
上一篇文章《高并发+海量数据下如何实现系统解耦?发海【中】》分析了一下如何利用消息中间件对系统进行解耦处理。
同时,量数我们也提到了使用消息中间件还有利于一份数据被多个系统同时订阅,据下供多个系统来使用于不同的何实目的。
目前的现系一个架构如下图所示。

在这个图里,统解我们可以清晰的耦下看到,实时计算平台发布的高并一份数据到消息中间件里,接着,发海会进行如下步骤:
数据查询平台,量数会订阅这份数据,据下并落入自己本地的何实数据库集群和缓存集群里,接着对外提供数据查询的现系服务数据质量监控系统,会对计算结果按照一定的统解业务规则进行监控,如果发现有数据计算错误,则会立马进行报警数据链路追踪系统,会采集计算结果作为一个链路节点,同时对一条数据的整个完整计算链路都进行采集并组装出来一系列的数据计算链路落地存储,最后如果某个数据计算错误了,就可以立马通过计算链路进行回溯排查问题因此上述场景中,使用消息中间件一来可以解耦,二来还可以实现消息“Pub/Sub”模型,实现消息的免费信息发布网发布与订阅。
这篇文章,咱们就来看看,假如说基于RabbitMQ作为消息中间件,如何实现一份数据被多个系统同时订阅的“Pub/Sub”模型。

上面那个图,其实就是采用的RabbitMQ最基本的队列消费模型的支持。
也就是说,你可以理解为RabbitMQ内部有一个队列,生产者不断的发送数据到队列里,消息按照先后顺序进入队列中排队。
接着,假设队列里有4条数据,然后我们有2个消费者一起消费这个队列的数据。
此时每个消费者会均匀的被分配到2条数据,也就是说4条数据会均匀的分配给各个消费者,每个消费者只不过是处理一部分数据罢了,这个就是典型的源码库队列消费模型。
但是消息中间件还可以实现一种“Pub/Sub”模型,也就是“发布/订阅”模型,Pub就是Publish,Sub就是Subscribe。
这种模型是可以支持多个系统同时消费一份数据的。也就是说,你发布出去的每条数据,都会广播给每个系统。
给大家来一张图,一起来感受一下。

如上图所示。也就是说,我们想要实现的上图的效果,实时计算平台发布一系列的数据到消息中间件里。
然后数据查询平台、数据质量监控系统、数据链路追踪系统,都会订阅数据,b2b信息网都会消费到同一份完整的数据,每个系统都可以根据自己的需要使用数据。
这,就是所谓的“Pub/Sub”模型,一个系统发布一份数据出去,多个系统订阅和消费到一模一样的一份数据。
那如果要实现上述的效果,基于RabbitMQ应该怎么来处理呢?
实际上来说,在RabbitMQ里面是不允许生产者直接投递消息到某个queue(队列)里的,而是只能让生产者投递消息给RabbitMQ内部的一个特殊组件,叫做“exchange”。
关于这个exchange,大概你可以把这个组件理解为一种消息路由的组件。
也就是说,实时计算平台发送出去的message到RabbitMQ中都是由一个exchange来接收的。
然后这个exchange会根据一定的规则决定要将这个message路由转发到哪个queue里去,这个实际上就是RabbitMQ中的一个核心的消息模型。
大家看下面的图,一起来理解一下。

在之前的文章里,我们投递消息到RabbitMQ的时候,也没有用什么exchange,但是为什么就还是把消息投递到了queue里去呢?
那是因为我们用了默认的exchange,他会直接把消息路由到你指定的那个queue里去,所以如果简单用队列消费模型,不就省去了exchange的概念了吗。

上面这段就是之前我们给大家展示的,让消息持久化的一种投递消息的方式。
大家注意里面的第一个参数,是一个空的字符串,这个空字符串的意思,就是说投递消息到默认的exchange里去,然后他就会路由消息到我们指定的queue里去。
在RabbitMQ里,exchange这种组件有很多种类型,比如说:direct、topic、headers以及fanout。这里咱们就来看看最后一种,fanout这种类型的exchange组件。
这种exchange组件其实非常的简单,你可以创建一个fanout类型的exchange,然后给这个exchange绑定多个queue。
接着只要你投递一条消息到这个exchange,他就会把消息路由给他绑定的所有queue。
使用下面的代码就可以创建一个exchange,比如说在实时计算平台(生产者)的代码里,可以加入下面的一段,创建一个fanout类型的exchange。
第一个参数我们叫做“rt_compute_data”,这个就是exchange的名字,rt就是“RealTime”的缩写,意思就是实时计算系统的计算结果数据。
第二个参数就是定义了这个exchange的类型是“fanout”。
channel.exchangeDeclare(
"rt_compute_data",
"fanout");接着我们就采用下面的代码来投递消息到我们创建好的exchange组件里去:

大家会注意到,此时消息就是投递到指定的exchange里去了,但是路由到哪个queue里去呢?此时我们暂时还没确定,要让消费者自己来把自己的queue绑定到这个exchange上去才可以。
我们对消费者的代码也进行修改,之前我们在这里关闭了autoAck机制,然后每次都是自己手动ack。

上面的代码里,每个消费者系统,都会有一些不一样,就是每个消费者都需要定义自己的队列,然后绑定到exchange上去。比如:
数据查询平台的队列是“rt_compute_data_query”。数据质量监控平台的队列是“rt_compute_data_monitor”。数据链路追踪系统的队列是“rt_compute_data_link”。这样,每个订阅这份数据的系统其实都有一个属于自己的队列,然后队列里被会被exchange路由进去实时计算平台生产的所有数据。
而且因为是多个队列的模式,每个系统都可以部署消费者集群来进行数据的消费和处理,非常的方便。
最后,给大家来一张大图,我们再跟着图,来捋一捋整个流程。

如上图所示,首先,实时计算平台会投递消息到“rt_compute_data”这个“exchange”里去,但是他没指定这个exchange要路由消息到哪个队列,因为这个他本身是不知道的。
接着数据查询平台、数据质量监控系统、数据链路追踪系统,就可以声明自己的队列,都绑定到exchange上去。
因为queue和exchange的绑定,在这里是要由订阅数据的平台自己指定的。而且因为这个exchange是fanout类型的,他只要接收到了数据,就会路由数据到所有绑定到他的队列里去,这样每个队列里都有同样的一份数据,供对应的平台来消费。
而且针对每个平台自己的队列,自己还可以部署消费服务集群来消费自己的一个队列,自己的队列里的数据还是会均匀分发给各个消费服务实例来处理,每个消费服务实例会获取到一部分的数据。
大家思考一下,这样是不是就实现了不同的系统订阅一份数据的“Pub/Sub”的模型?
当然,其实RabbitMQ还支持各种不同类型的exchange,可以实现各种复杂的功能。
PPPOECONF 是 Linux 平台上连接有线宽带的命令行工具软件,它的历史已经很久远了(我从 07 年开始接触 Linux 的时候就有了)。后来,Linux 的 Gnome 桌面环境中出现了一个叫 Network Manager 的组件,提供了图形化连接有线宽带的方法,PPPOECONF 就再也没有用过。但是今年升级 Ubuntu 15.04 之后,再使用 Network Manager,却一直无法通过虚拟拨号连接有线宽带。没办法,只有再次请出 PPPOECONF 这个老朋友。本经验就介绍一下如何使用 PPPOECONF 连接宽带。1、升级 Ubuntu 15.04 之后,使用 Network Manager 连接有线宽带的虚拟拨号,会出现以下错误,错误信息是:连接启用失败:(1)Creating object for path /org/freedesktop/NetworkManager/ActiveConnection/18 failed in libnm-glib.2、搜索国外论坛,有大神提出了重装 Network Manager 的办法。我试过了,然并卵。3、那就再请 PPPOECONF 这一员老将出山吧。按 Ctrl + Alt + t 组合快捷键,启动终端。4、在命令提示符 $ 后面,键入命令:$ sudo pppoeconf启动 PPPOECONF 的配置向导。在后面的步骤中,假如不明白各个步骤内容是啥意思(我就是不明白的一员嘿嘿),只要选“是”就行了。5、感谢我们的网络服务提供商,我们在这一步骤中选“是”,启用所谓的“经典模式”。6、到了输入宽带拨号用户账号的步骤了,我们按键盘上的 Backspace 删除键,删除图中文本框中预置的 username,然后输入我们自己的账号。7、当然,接下来输入密码:在空白的文本框里直接输入就行了。8、这一步骤的意思是自动获取我们的网络服务提供商分配给我们的 DNS 域名,我们还是选“是”。9、看到这一步骤中的大片文字,我能做的也只能选“是”。10、在这里,假如你只有有线连接而且要一直使用,那就选”是“,让 PPPOECONF 在开机时自动启动;假如只是像我一样偶尔用用有线,那就选”否“,在需要的时候键入命令,去启动 pppoe 虚拟拨号。11、终于接近尾声了。在这里,选“是”会立即启动虚拟拨号并建立连接。我选“否”,等会打命令启动,老长时间没打命令了,过过瘾、受受虐。12、在上一步骤中选否,PPPOECONF 向导结束,回到终端的命令提示符,我输入:$ sudo pon dsl-provider则宽带连接建立;断开有线宽带、回到 WLAN 无线网络环境,则键入:$ sudo poff可以看到,在原来的 NetworkManager 不被删除、不停止运行的前提下,可以随时启动、关闭 PPPOE;也就是说,两者和平共处、互不影响。
一文快速上手 Nacos 注册中心+配置中心
使用 Vite 和 TypeScript 带你从零打造一个属于自己的 Vue3 组件库