当前位置:首页 > 财经

mq 详解RocketMQ不同类型的消费者

云简介:本文摘自社区“火箭MQ原理与实践分析”系列。作者:阿里巴巴数据专家杨开元。本节将关注不同类型的RocketMQ消费者。

根据用户对阅读操作的控制,可以分为两种。一种是DefaultMQPushConsumer,由系统控制,收到消息后自动调用传入处理方法进行处理;另一个是DefaultMQPullConsumer,读取操作中的大部分功能都是由用户控制的。

1.1.1默认mqpushconsumer的使用

使用DefaultMQPushConsumer的主要目的是设置各种参数,传入处理消息的函数。系统收到消息后自动调用处理函数处理消息,自动保存Offset,添加新的DefaultMQPushConsumer后自动进行负载均衡。下面是org . Apache . rockemq . example . quick start包中源代码的描述。

代码清单1-1缺省QPushConsumer示例

DefaultMQPushConsumer有三个参数需要设置:该使用者的组名、名称服务器的地址和端口号以及主题的名称,这将在下面详细描述。

消费者的组名用于将多个消费者组织在一起,以提高并发处理能力,组名需要与消息模型结合使用。

RocketMQ支持两种消息模式:集群和广播。

在Clustering 模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。

NameServer的地址和端口号可以用多个数字填充,并用分号分隔,以消除单点故障,如“ip1:port;ip2:端口;ip3:端口.

主题名称用于标识消息类型,需要提前创建。如果您不需要使用某个主题下的所有消息,您可以通过指定消息的标签来过滤消息,例如:消费者。订阅(“主题测试”,“标签1 | |标签2 | |标签3”),这意味着该消费者需要在“主题测试”下使用标签1、标签2或标签3来消费消息(标签是发送消息时设置的标签)。在填充标记参数的位置,使用空值或“*”来指示使用此主题的所有消息。

1.1.2默认mqpushconsumer的处理流程

本节通过对源代码的分析来说明DefaultMQPushConsumer的处理流程。

DefaultMQPushConsumer的主要功能在DefaultMQPushConsumerImpl类中实现,消息处理逻辑在pullMessage函数的PullCallBack中。PullCallBack函数中有switch语句,根据Broker返回的消息类型进行处理,具体处理逻辑可以查看源代码。

代码清单1-2默认mqpushconsuer的处理逻辑

DefaultMQPushConsuer的源代码中有很多PullRequest语句,比如DefaultMQPushConumerimpl。这个。立即执行请求。为什么在“PushConsumer”中使用“pull request”?这是一种通过“长轮询”实现推送效果的方法,既有Pull的优点,又有Push的实时性。

推送模式是服务器端收到消息后主动将消息推送至客户端,实时性高。对于一个提供队列服务的服务器来说,主动推送有很多缺点;首先是增加服务器的工作负载,进而影响服务器的性能。其次,客户端的处理能力不同,客户端的状态不受服务器控制。如果客户端不能及时处理服务器推送的消息,将会导致各种潜在的问题。

拉模式是客户端循环从服务器端拉消息,主动权在客户端手里。自己拉一定量的消息后,就可以妥善处理,然后拿走。Pull模式的问题是循环拉消息的间隔不容易设置,间隔过短就会处于“忙等待”状态,浪费资源;每次提取的时间间隔太长,来自服务器的消息可能无法及时处理。

“长轮询”模式是通过客户端和服务器端的协作,既有Pull的优点,又能达到保证实时的目的。我们结合源代码来分析:

代码清单1- 3发送请求消息的代码片段

源代码中有这行设置语句

Requestheader。setsuspendtimeoutmillis(Broker suspenmaxtimemillis),设置Broker的最长阻塞时间。默认设置为15秒。请注意,当没有新消息时,代理会阻止,消息会立即返回。

从Broker的源代码可以看出,服务器收到新的消息请求后,如果队列中没有新的消息,就不急着返回,通过一个周期不断检查状态,等待运行一定时间(默认为5秒),然后再检查。默认情况下,当代理中没有新消息,并且第三次检查的等待时间超过请求中的代理挂起最大时间毫秒数时,将返回空。在等待过程中,代理将在收到新消息后直接调用notifyMessageArriving函数返回请求结果。“长轮询”的核心是代理端短时间保持来自客户端的请求,如果在这段时间内有新的消息到达,它将立即使用现有的连接将消息返回给消费者。“长轮询”的主动权还在消费者手里,即使Broker有大量的消息积压,也不会主动推给消费者。

长轮询的限制是,它需要在保存消费者请求时占用资源。它适用于消息队列,这是一种可以控制客户端连接数的场景。

1.1.3 DefaultMQPullConsumer

使用DefaultMQPullConsumer需要设置各种参数,编写处理消息的函数,做额外的事情就像使用DefaultMQPullConsumer一样。然后介绍了org . Apache . rockemq . example . simple包中的示例源代码。

示例代码的处理逻辑是逐个读取一个主题下的所有消息队列的内容,然后再次读取后退出,主要处理另外三件事:

(1)获取消息队列并遍历它

一个主题包含多个消息队列。如果消费者需要获取主题下的所有消息,他必须遍历多个消息队列。如果有特殊情况,也可以选择一些特定的消息队列来读取消息。

(2)维护偏移存储

从消息队列中获取消息时,应该传入一个偏移量参数(一个长类型的值)。随着消息的不断读取,偏移量将不断增加。此时,用户负责存储偏移量,偏移量可以存储在内存中,也可以写入磁盘或数据库等。

(3)根据不同的消息状态进行不同的处理

发送拉消息的请求后,会返回四个状态:FOUND、NO_MATCHED_MSG、NO_NEW_MSG、OFFSET _ invalid,应该根据每个状态进行不同的处理。最重要的两个状态是FOUNT和NO_NEW_MSG,表示获得消息,没有新消息。

实际上,while(true)可以放在外层,达到无限循环的目的。因为用户需要遍历消息队列并自己保存偏移量,所以PullConsumer具有更大的自主性和灵活性。

-

结束

1.《mq 详解RocketMQ不同类型的消费者》援引自互联网,旨在传递更多网络信息知识,仅代表作者本人观点,与本网站无关,侵删请联系页脚下方联系方式。

2.《mq 详解RocketMQ不同类型的消费者》仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证。

3.文章转载时请保留本站内容来源地址,https://www.lu-xu.com/caijing/1191602.html

上一篇

我出生那年的祖国记忆 你出生那年祖国发生了什么

下一篇

六大行上半年日赚35.8亿 哪六大排名依次是什么

唐山市古冶区地图 好消息!唐山新开通一条直达古冶公交线路

  • 唐山市古冶区地图 好消息!唐山新开通一条直达古冶公交线路
  • 唐山市古冶区地图 好消息!唐山新开通一条直达古冶公交线路
  • 唐山市古冶区地图 好消息!唐山新开通一条直达古冶公交线路
9c8858 好消息!兰州新区将高规格打造中川机场“迎宾大道”

9c8858 好消息!兰州新区将高规格打造中川机场“迎宾大道”

10月18日,记者从新区管委会获悉,《兰州新区航空服务集团及周边地区城市设计方案》(以下简称《方案》)顺利通过专家评审。根据规划,机场运营区、服务区和机场高速公路的功能布局将整合,形成“一心两带六区”的总体空格局。   根据规划,中川机场航站楼...

北京医科大学口腔医院 好消息!北京医保定点医院可以自己换啦!(附定点及专科医院名单)

崇文区口腔医院 好消息!北京医保定点医院可以自己换啦!(附定点及专科医院名单)

首都医科大学附属北京口腔医院 好消息!北京医保定点医院可以自己换啦!(附定点及专科医院名单)

柯文哲曝林志玲怀孕怎么回事林志玲怀孕消息属实吗

柯文哲曝林志玲怀孕怎么回事林志玲怀孕消息属实吗

据台湾媒体报道,台北市长柯在一次采访中爆料说林志玲怀孕了,并说他听了台大医生的话。但是当被问到医生的名字时,他避免回答:“我不记得是谁告诉我的了。”对此,有媒体要求林志玲的工作人员核实,对方表示“无回应”。等官方宣布吧~林志玲和她的丈夫黑泽明6...

青海高考状元 【好消息】海东市高考理科状元花落乐都一中!(附乐都一中高考简报)

青海高考状元 【好消息】海东市高考理科状元花落乐都一中!(附乐都一中高考简报)

六月飞歌,要第一。 昨天,青海省公布了2017年高考分数和成绩 网上君第一次了解高考是在乐都一中 给朋友一个干巴巴的结果 让我们崇拜大神吧!  今年海东市理科状元是乐都一中学霸 ↓↓↓  乐都一中2017年高考成绩不错! 以下是乐都一中高考情况...

曝林志玲已怀孕 消息可靠吗谁说的官方有回应吗

曝林志玲已怀孕 消息可靠吗谁说的官方有回应吗

据台湾自由电子新闻5日报道,台北市长柯4日在被问及“柯的助手和超级名模林志玲哪个更漂亮”时脱口而出,“林志玲已经结婚并怀孕,所以她退出了这场比赛”。林志玲怀孕的消息在岛上引起了争议。随后,柯文哲5日被媒体质疑林志玲怀孕的消息来源,柯文哲称自己听...