跳到正文

消息队列学习笔记

Posted by lili Blog on July 1, 2022 · 读取中...

    消息队列学习笔记

    参考资料:

    https://zhuanlan.zhihu.com/p/74063251

    https://mp.weixin.qq.com/s/Qhw4oS0OeN1N7uT1z6rbqg

    面试官让我重构 Kafka,懵了……https://mp.weixin.qq.com/s/_RIvZwK1sJJP8xnUDyAk1Q

    一、消息队列(MQ)是什么

    消息队列(MQ),是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。

    消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    消息中间件的组成

    1、Broker:消息服务器,作为server提供消息核心服务

    2、Producer:消息生产者,业务的发起方,负责生产消息传输给broker,

    3、Consumer:消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

    4、Topic:主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播

    5、Queue:队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收

    6、Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

    1、mq的本质

    对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费。再直白点就是一个「转发器」。

    生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者,仅此而已。

    img

    上面这个图便是消息队列最原始的模型,它包含了两个关键词:消息和队列。

    1、消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。 2、队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。

    2、原始模型的进化

    再看今天我们最常用的消息队列产品(RocketMQ、Kafka 等等),你会发现:它们都在最原始的消息模型上做了扩展,同时提出了一些新名词,比如:主题(topic)、分区(partition)、队列(queue)等等。

    要彻底理解这些五花八门的新概念,我们化繁为简,先从消息模型的演进说起(道理好比:架构从来不是设计出来的,而是演进而来的

    队列模型

    最初的消息队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。消息按照什么顺序写进去,就按照什么顺序读出来。不过,队列没有 “读” 这个操作,读就是出队,从队头中 “删除” 这个消息。

    img

    生产者–>队列(队尾->队头)–>消费者

    这便是队列模型:它允许多个生产者往同一个队列发送消息。但是,如果有多个消费者,实际上是竞争的关系,也就是一条消息只能被其中一个消费者接收到,读完即被删除。

    发布-订阅模型

    如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。

    一个可行的方案是:为每个消费者创建一个单独的队列,让生产者发送多份。这种做法比较笨,而且同一份数据会被复制多份,也很浪费空间。

    为了解决这个问题,就演化出了另外一种消息模型:发布-订阅模型。

    img

    在发布-订阅模型中,存放消息的容器变成了 “主题”,订阅者在接收消息之前需要先 “订阅主题”。最终,每个订阅者都可以收到同一个主题的全量消息。

    仔细对比下它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费。

    小结

    最后做个小结,上面两种模型说白了就是:单播和广播的区别。而且,当发布-订阅模型中只有 1 个订阅者时,它和队列模型就一样了,因此在功能上是完全兼容队列模型的。

    这也解释了为什么现代主流的 RocketMQ、Kafka 都是直接基于发布-订阅模型实现的?此外,RabbitMQ 中之所以有一个 Exchange 模块?其实也是为了解决消息的投递问题,可以变相实现发布-订阅模型。

    包括大家接触到的 “消费组”、“集群消费”、“广播消费” 这些概念,都和上面这两种模型相关,以及在应用层面大家最常见的情形:组间广播、组内单播,也属于此范畴。

    3、 MQ 的应用场景

    目前,MQ 的应用场景非常多,大家能倒背如流的是:系统解耦、异步通信和流量削峰。除此之外,还有延迟通知、最终一致性保证、顺序消息、流式处理等等。

    那到底是先有消息模型,还是先有应用场景呢?答案肯定是:先有应用场景(也就是先有问题),再有消息模型,因为消息模型只是解决方案的抽象而已。

    MQ 经过 30 多年的发展,能从最原始的队列模型发展到今天百花齐放的各种消息中间件(平台级的解决方案),我觉得万变不离其宗,还是得益于:消息模型的适配性很广。

    我们试着重新理解下消息队列的模型。它其实解决的是:生产者和消费者的通信问题。那它对比 RPC 有什么联系和区别呢?

    img

    通过对比,能很明显地看出两点差异:

    1、引入 MQ 后,由之前的一次 RPC 变成了现在的两次 RPC,而且生产者只跟队列耦合,它根本无需知道消费者的存在。 2、多了一个中间节点「队列」进行消息转储,相当于将同步变成了异步。

    再返过来思考 MQ 的所有应用场景,就不难理解 MQ 为什么适用了?因为这些应用场景无外乎都利用了上面两个特性。

    举一个实际例子,比如说电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。

    img

    引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。

    改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,而且后续业务再扩展(比如:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的修改,从而保证了核心流程的稳定性,降低了维护成本。

    这个改造还带来了另外一个好处:因为 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。这便是 MQ 的另一个典型应用场景:异步通信。

    除此以外,由于队列能转储消息,对于超出系统承载能力的场景,可以用 MQ 作为 “漏斗” 进行限流保护,即所谓的流量削峰

    我们还可以利用队列本身的顺序性,来满足消息必须按顺序投递的场景;利用队列 + 定时任务来实现消息的延时消费 ……

    MQ 其他的应用场景基本类似,都能回归到消息模型的特性上,找到它适用的原因,这里就不一一分析了。

    总之,就是建议大家多从复杂多变的实践场景再回归到理论层面进行思考和抽象,这样能吃得更透。

    4、消息队列的缺点

    系统复杂性

    本来蛮简单的一个系统,我代码随便写都没事,现在你凭空接入一个中间件在那,我是不是要考虑去维护他,而且使用的过程中是不是要考虑各种问题,比如消息重复消费消息丢失消息的顺序消费等等,反正用了之后就是贼烦。

    数据一致性

    这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点。

    就像我开头说的,你下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败你就不管了?

    分布式事务:把下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。

    可用性

    你搞个系统本身没啥问题,你现在突然接入一个中间件在那放着,万一挂了怎么办?我下个单MQ挂了,优惠券不扣了,积分不减了,这不是杀一个程序员能搞定的吧,感觉得杀一片。

    至于怎么保证高可用,还是那句话也不在这里展开讨论了,我后面一样会写,像写Redis那样写出来的。

    消息重复消费

    由于消息队列的重试机制,也就是下游业务发生异常会进行充重试,包括服务的网络抖动开发人员代码Bug,还有数据问题等都可能处理失败要求重发的,因此其他在监听的服务便会收到两次消息。

    解决办法:做接口幂等处理,通俗了讲就是同样的参数调用我这个接口,调用多少次结果都是一个。

    强校验场景(金钱):

    比如你监听到用户支付成功的消息,你监听到了去加GMV是不是要调用加钱的接口,那加钱接口下面再调用一个加流水的接口,两个放在一个事务,成功一起成功失败一起失败。每次消息过来都要拿着订单号+业务场景这样的唯一标识(比是天猫双十一活动)去流水表查,看看有没有这条流水,有就直接return不要走下面的流程了,没有就执行后面的逻辑。之所以用流水表,是因为涉及到金钱这样的活动,有啥问题后面也可以去流水表对账,还有就是帮助开发人员定位问题。

    弱校验场景(其他):

    一些不重要的场景,比如给谁发短信啥的,我就把这个id+场景唯一标识作为Redis的key,放到缓存里面失效时间看你场景,一定时间内的这个消息就去Redis判断。用KV就算消息丢了可能这样的场景也没关系,反正丢条无关痛痒的通知短信嘛

    消息如何保证绝对有序

    同个业务场景下不同几个操作的消息同时过去,本身顺序是对的,但是你发出去的时候同时发出去了,消费的时候却乱掉了,这样就有问题了。

    解决办法

    生产者消费者一般需要保证顺序消息的话,可能就是一个业务场景下的,比如订单的创建、支付、发货、收货。

    一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制,他有三种实现:

    我们可使用Hash取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。RocketMQ仅保证顺序发送,顺序消费由消费者业务保证!!!这里很好理解,一个订单你发送的时候放到一个队列里面去,你同意的订单号Hash一下是不是还是一样的结果,那肯定是一个消费者消费,那顺序是不是就保证了?

    Tip:我写到这点的时候人才群里也有人问我,一个队列有序出去,一个消费者消费不就好了,我想说的是消费者是多线程的,你消息是有序的给他的,你能保证他是有序的处理的?还是一个消费成功了再发下一个稳妥

    5、如何设计一个mq?

    任何 MQ 无外乎:一发一存一消费,这是 MQ 最核心的功能需求。另外,从技术维度来看 MQ 的通信模型,可以理解成:两次 RPC + 消息转储。

    1、直接利用成熟的 RPC 框架(Dubbo 或者 Thrift),实现两个接口:发消息和读消息。 2、消息放在本地内存中即可,数据结构可以用 JDK 自带的 ArrayBlockingQueue 。

    1、需要从功能性需求(收发消息)和非功能性需求(高性能、高可用、高扩展等)两方面入手。

    2、功能性需求不是重点,能覆盖 MQ 最基础的功能即可,至于延时消息、事务消息、重试队列等高级特性只是锦上添花的东西。

    3、最核心的是:能结合功能性需求,理清楚整体的数据流,然后顺着这个思路去考虑非功能性的诉求如何满足,这才是技术难点所在。

    6、什么是分布式事务

    分布式事务事务隔离级别ACID我相信大家这些东西都耳熟能详了,那什么是事务呢?

    概念:一般是指要做的或所做的事情。

    在计算机术语中是指访问并可能更新数据库中各种数据项的一个程序执行单元(unit)。

    事务通常由高级数据库操纵语言或编程语言(如SQL,C++或Java)书写的用户程序用户程序的执行所引起,并用形如begin transactionend transaction语句(或函数调用)来界定。

    事务由事务开始(begin transaction)和事务结束(end transaction)之间执行的全体操作组成

    我接触和了解到的分布式事务大概分为:

    • 2pc(两段式提交)
    • 3pc(三段式提交)
    • TCC(Try、Confirm、Cancel)
    • 最大努力通知
    • XA
    • 本地消息表(ebay研发出的)
    • 半消息/最终一致性(RocketMQ)

    这里我就介绍下最简单的2pc(两段式),以及大家以后可能比较常用的半消息事务也就是最终一致性,目的是让大家理解下分布式事务里面消息中间件的作用,别的事务都大同小异,都有很多优点。

    2pc(两段式提交) :

    image-20220901152958544

    2pc(两段式提交)可以说是分布式事务的最开始的样子了,像极了媒婆,就是通过消息中间件协调多个系统,在两个系统操作事务的时候都锁定资源但是不提交事务,等两者都准备好了,告诉消息中间件,然后再分别提交事务。

    但此时存在问题,如果A系统事务提交成功了,但是B系统在提交的时候网络波动或者各种原因提交失败了,其实还是会失败的。

    image-20220901153102515

    整个流程中,我们能保证是:

    • 业务主动方本地事务提交失败,业务被动方不会收到消息的投递。
    • 只要业务主动方本地事务执行成功,那么消息服务一定会投递消息给下游的业务被动方,并最终保证业务被动方一定能成功消费该消息(消费成功或失败,即最终一定会有一个最终态)。

    不过呢技术就是这样,各种极端的情况我们都需要考虑,也很难有完美的方案,所以才会有这么多的方案三段式TCC最大努力通知等等分布式事务方案,大家只需要知道为啥要做,做了有啥好处,有啥坏处,在实际开发的时候都注意下就好好了,系统都是根据业务场景设计出来的,离开业务的技术没有意义,离开技术的业务没有底气

    还是那句话:没有最完美的系统,只有最适合的系统。

    二、消息队列的种类

    目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等这几种。

    ActiveMQRabbitMQ这两着因为吞吐量还有GitHub的社区活跃度的原因,在各大互联网公司都已经基本上绝迹了,业务体量一般的公司会是有在用的,但是越来越多的公司更青睐RocketMQ这样的消息中间件了。

    KafkaRocketMQ一直在各自擅长的领域发光发亮,不过写这篇文章的时候我问了蚂蚁金服,字节跳动和美团的朋友,好像大家用的都有点不一样,应该都是各自的中间件,可能做过修改,也可能是自研的,大多没有开源

    img

    大家其实一下子就能看到差距了,就拿吞吐量来说,早期比较活跃的ActiveMQRabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量是真的很重要

    比如现在突然爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?

    再说这些用户大量涌进来看了你的新闻产生了一系列的附带流量,你怎么应对这些数据,很多场景离开消息队列基本上难以为继

    部署方式而言前两者也是大不如后面两个天然分布式架构的哥哥,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。

    我们再聊一下RabbitMQ这个中间件其实还行,但是这玩意开发语言居然是erlang,我敢说绝大部分工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。

    至于RocketMQ(阿里开源的),git活跃度还可以。基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)可能是因为师出同门的原因吧。

    Kafka我放到最后说,你们也应该知道了,压轴的这是个大哥,大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。

    以上这些都只是一些我自己的个人意见,真正的选型还是要去深入研究的,不然那你公司一天UV就1000你告诉我你要去用Kafka我只能说你吃饱撑的。

    记住,没有最好的技术,只有最适合的技术,不要为了用而用

    三、kafka安装和使用

    环境安装

    1、Homebrew安装kakfa

    1
    
    brew install kafka
    
    • kafka使用zookeeper管理,安装过程会自动安装zookeeper
    • 安装目录:/opt/homebrew/Cellar/kafka/* (*为具体安装的版本)
    • 配置文件目录:/opt/homebrew/etc/kafka

    image-20220831135902897

    2、修改kafka启动配置文件

    1
    
    vim /opt/homebrew/etc/kafka/server.properties
    

    本地先配置单机版

    • 修改kafka的监听地址和端口为localhost:9092 listeners=PLAINTEXT://localhost:9092

    3、依次启动服务

    1
    2
    
    brew services start zookeeper
    brew services start kafka
    

    4、测试kafka

    创建topic
    1
    
    kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    查看创建的topic
    1
    
    kafka-topics --list --bootstrap-server localhost:9092
    
    生产消息

    通过控制台向“test”topic添加消息,>可以随便输入消息

    1
    
    kafka-console-producer --broker-list localhost:9092 --topic test
    
    消费消息

    开启新的terminal,通过控制台查看消费“test”topic中的消息

    1
    
    kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
    
    关闭zookeeper

    开启新的terminal,进入kafka的bin目录,执行如下命令

    1
    
    brew services stop zookeeper
    
    关闭kafka

    在上面的terminal中,执行如下命令关闭kafka

    1
    
    brew services stop kafka
    

    常见错误

    1、/opt/kafka/bin/kafka-run-class.sh: 第 342 行:exec: java: 未找到

    https://blog.csdn.net/Drink_hot_water/article/details/125913288

    2、Exception in thread “main” joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option

    创建topic报错

    1
    
    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    原因:是在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。

    解决办法:使用 Kafka Broker的 –bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。

    1
    
    kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    

    kafka消费数据是怎么消费的,用的是什么方式?

    Kafka消费数据的方式主要包含如下几种:

    1、指定多主题消费

    1
    
    consumer.subscribe(Arrays.asList(“t4”,“t5”));
    

    2、指定分区消费

    1
    
    consumer.assign(list);
    

    3、手动修改偏移量

    1
    2
    3
    4
    5
    
    consumer.commitSync();                //提交当前消费偏移量
    
    consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)    //提交指定偏移量
    
    consumer.assign(Arrays.asList(tp));
    

    4、seek,修改偏移量搜索指针,顺序读取数据

    1
    2
    3
    
    consumer.assign(Arrays.asList(tp));
    
        consumer.seek(tp,0);
    

    ###