RocketMQ概览

时间:2021-04-19 14:11:21   收藏:0   阅读:0

核心概念

Topic:消息主题,一级消息类型,通过Topic对消息进行分类。

Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。

Message ID:消息的全局唯一标识,由消息队列RocketMQ版系统自动生成,唯一标识某条消息。

Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。

Producer:消息生产者,也称为消息发布者,负责生产并发送消息。

Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:

Group:一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。

                 例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。

 

广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。

                 例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。

 

消息收发模型

消息队列RocketMQ版支持发布和订阅模型,消息生产者应用创建Topic并将消息发送到Topic。消费者应用创建对Topic的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。

技术图片

生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。

一个生产者集群可以发送多个Topic消息。发送分布式事务消息时,如果生产者中途意外宕机,消息队列RocketMQ版服务端会主动回调生产者集群的任意一台机器来确认事务状态。

 

消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。

一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。

一个消费者集群对应一个Group ID,一个Group ID可以订阅多个Topic

 

集群消费和广播消费

消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:

集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。
集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

 

集群消费模式

适用场景
适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。

技术图片

 

注意事项

 

 

广播消费模式

适用场景
适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。

技术图片

 

注意事项

 

系统部署架构

技术图片

图中所涉及到的概念如下所述:

 

消息类型

  

普通消息

消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

定时和延时消息

允许消息生产者对指定消息进行定时(延时)投递,最长支持40天。

顺序消息

允许消息消费者按照消息发送的顺序对消息进行消费。

事务消息

实现类似X或Open XA的分布事务功能,以达到事务最终一致性状态。

 

普通消息是指消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。

定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。

延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

 

顺序消息(FIFO消息)是消息队列RocketMQ版提供的一种严格按照顺序来发布和消费的消息。顺序发布和顺序消费是指对于指定的一个Topic,生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被客户端接收到。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

技术图片

技术图片

对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // Message所属的Topic。
                    "Order_global_topic",
                    // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    "TagA",
                    // Message Body,可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                    "send order global msg".getBytes()
            );
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey(orderId);
            // 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
            // 全局顺序消息,该字段可以设置为任意非空字符串。
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // 发送消息,只要不抛异常就是成功。
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // 在应用退出前,销毁Producer对象。
        // 注意:如果不销毁也没有问题。
 producer.shutdown();

 

事务消息适用于所有对数据最终一致性有强需求的场景

技术图片

事务消息发送步骤如下:

  1. 发送方将半事务消息发送至消息队列RocketMQ版服务端。
  2. 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

 

RocketMQ最佳实践


Topic与Tag

 

Topic和Tag的关系如下图

技术图片

 

到底什么时候该用Topic,什么时候该用Tag?

建议从以下几个方面进行判断:

总的来说,针对消息分类,您可以选择创建多个Topic,或者在同一个Topic下创建多个Tag。但通常情况下,不同的Topic之间的消息没有必然的联系,而Tag则用来区分同一个Topic下相互关联的消息,例如全集和子集的关系、流程先后的关系。

 

消息幂等

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。

 

处理方法

因为不同的Message ID对应的消息内容可能相同,有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。

以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);     

 

消费者收到消息时可以根据消息的Key,即订单号来实现消息幂等:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // 根据业务唯一标识的Key做幂等处理。
    }
}); 

 

订阅关系一致

订阅关系一致指的是同一个消费者Group ID下所有Consumer实例所订阅的Topic、Group ID、Tag必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

消息队列RocketMQ版里的一个消费者Group ID代表一个Consumer实例群组。对于大多数分布式应用来说,一个消费者Group ID下通常会挂载多个Consumer实例。

由于消息队列RocketMQ版的订阅关系主要由Topic+Tag共同组成,因此,保持订阅关系一致意味着同一个消费者Group ID下所有的实例需在以下两方面均保持一致:

 

正确订阅关系

技术图片

 

消息堆积和延迟问题

消息处理流程中,如果客户端的消费速度跟不上服务端的发送速度,未处理的消息会越来越多,这部分消息就被称为堆积消息。消息出现堆积进而会造成消息消费延迟。以下场景需要重点关注消息堆积和延迟的问题:

客户端消费原理

技术图片

 

通过以上客户端消费原理可以看出,消息堆积的主要瓶颈在于本地客户端的消费能力,即消费耗时和消费并发度。想要避免和解决消息堆积问题,必须合理的控制消费耗时和消息并发度,其中消费耗时的优先级高于消费并发度,必须先保证消费耗时的合理性,再考虑消费并发度问题。

 

消费耗时

影响消费耗时的消费逻辑主要分为CPU内存计算和外部I/O操作,通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部I/O操作来说几乎可以忽略。外部I/O操作通常包括如下业务逻辑:

这类外部调用的逻辑和系统容量您需要提前梳理,掌握每个调用操作预期的耗时,这样才能判断消费逻辑中I/O操作的耗时是否合理。通常消费堆积都是由于这些下游系统出现了服务异常、容量限制导致的消费耗时增加。

例如:某业务消费逻辑中需要写一条数据到数据库,单次消费耗时为1 ms,平时消息量小未出现异常。业务侧进行大促活动时,写数据库TPS爆发式增长,并很快达到数据库容量限制,导致消费单条消息的耗时增加到100 ms,业务侧可以明显感受到消费速度大幅下跌。此时仅通过调整消息队列RocketMQ版SDK的消费并发度并不能解决问题,需要对数据库容量进行升配才能从根本上提高客户端消费能力。

 

消费并发度

RocketMQ并发度计算方法如下表所示。

消息类型消费并发度
普通消息 单节点线程数*节点数量

定时和延时消息
事务消息
顺序消息 Min(单节点线程数*节点数量,分区数)

客户端消费并发度由单节点线程数和节点数量共同决定,一般情况下需要优先调整单节点的线程数,若单机硬件资源达到上限,则必须通过扩容节点来提高消费并发度。

 

如何避免消息堆积和延迟

为了避免在业务使用时出现非预期的消息堆积和延迟问题,您需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。

 

如何处理消息堆积

确认消息的消费耗时是否合理。

查看客户端堆栈信息。只需要关注线程名为ConsumeMessageThread的线程,这些都是业务消费消息的逻辑。

常见的异常堆栈信息如下:

示例一:空闲无堆积的堆栈。

消费空闲情况下消费线程都会处于WAITING状态等待从消费任务队里中获取消息。

技术图片

示例二:消费逻辑有抢锁休眠等待等情况。
消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。

技术图片

示例三:消费逻辑操作数据库等外部存储卡住。
消费线程阻塞在外部的HTTP调用上,导致消费缓慢。

技术图片

 

针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以丢弃,您可以通过重置消费位点跳过这些堆积的消息做到快速恢复。

 

规范&特性

Group

Topic

死信消息

消息

 

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!