当前位置: 当前位置:首页 >人工智能 >RabbitMQ 客户端源码系列 - Channel 正文

RabbitMQ 客户端源码系列 - Channel

2025-11-05 14:07:59 来源:多维IT资讯作者:IT科技类资讯 点击:295次

前言

续上次分享 RabbitMQ 客户端源码系列 - Connection ,客户继续分享Channel相关的端源源码分析 (com.rabbitmq:amqp-client:4.8.3)。

友情提醒:本次分享适合的码系人群,需要对 RabbitMQ 有一定的客户了解

Channels

https://www.rabbitmq.com/channels.html。

RabbitMQ client Demo

基于上次 Java Client Connecting to RabbitMQ Demo 针对 RabbitMQ Channel 继续深入分析。端源

ConnectionFactory factory = new ConnectionFactory();

// "guest"/"guest" by default,码系 limited to localhost connections

factory.setUsername(userName);

factory.setPassword(password);

factory.setVirtualHost(virtualHost);

factory.setHost(hostName);

factory.setPort(portNumber);

Connection conn = factory.newConnection();

//本次重点分析内容

Channel channel = connection.createChannel();

byte[] messageBodyBytes = "Hello, world!".getBytes();

channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

channel.close();

connection.close();AMQP 协议交互 -- Channel

可以看到简单地调用了 Channel channel = connection.createChannel(); 方法创建Channel,以及可以看到 Channel 相应的客户 AMQP 协议交互:「客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客户端 创建通道;broker 收到并创建通道完成)」。端源

整个 AMQP 协议的码系交互流程(172.30.0.74 为客户端即本机 ip;192.168.17.160 为 RabbitMQ Broker 的 ip)

RabbitMQ client 缓存模式为 Channel

本次分析 RabbitMQ client 采用缓存模式为 Channel:一个 Connection 对应多个 Channel(默认情况下 2048个 channel,其中一个是客户特殊 channel0)

「Connection」:主要用于AMQP协议解析,信道复用。端源「Channel」:路由、码系安全性、客户协调。端源「Queue」:内存中的码系消息,永久队列索引(在 channel 与 queue 之间还有一个 exchange作为交换机,此处就不展开说了)。

RabbitMQ client CacheMode为 Channel模式

channel 源码分析

上面简单地介绍 AMQP 协议交互流程中 Channel 连接、Connection 与 Channel的关系。

开始本次主要介绍 Channel 以及涉及到 Connection 相关的源码,亿华云从connection.createChannel开始深入分析。

/** Public API - {@inheritDoc} */

@Override

public Channel createChannel() throws IOException {

// 确认 connection 为打开的状态

ensureIsOpen();

// 管理channel

ChannelManager cm = _channelManager;

if (cm == null) return null;

// 创建 channel 核心的方法

Channel channel = cm.createChannel(this);

// 用于暴露指标

metricsCollector.newChannel(channel);

return channel;

}

可以看到 channel 由 connection 调用并管理:

ensureIsOpen() -- 确认 connection 为打开的状态,逻辑比较简单判断 shutdownCause 为空即可(connection关闭的话,shutdownCause同时会附带指示关闭的情况)。channelManager -- 统一由 connection 进行初始化及管理,在之前connection与broker 创建连接交互(Connection.Tune --> Connection.TuneOk)中初始化完成,默认 ChannelMax 为 2047 (2048 - 1,这个1对应的特殊的 channel0 )。

重点看下 channelManager.createChannel(this) 逻辑。

public ChannelN createChannel(AMQConnection connection) throws IOException {

ChannelN ch;

// 该 monitor 主要监控 _channelMap 和 channelNumberAllocator

synchronized (this.monitor) {

// 获取 channel 分配的编号

int channelNumber = channelNumberAllocator.allocate();

if (channelNumber == -1) {

return null;

} else {

// 新增新的 channel

ch = addNewChannel(connection, channelNumber);

}

}

// 将新增的 channel 打开

ch.open(); // now that its been safely added

return ch;

}

channelManager 管理着 channel 的创建连接释放等:

synchronized (this.monitor) -- 首先获取 channelManager 的 monitor 锁,防止多线程并发操作。channelNumberAllocator.allocate -- 获取范围内未被分配的 channelNumber,返回 -1 则认为不可再分配新的 channel,内部主要的逻辑由 BitSet 实现的云服务器提供商(感兴趣的可以了解下)。

后续重点分析 addNewChannel 和 open 逻辑。

private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {

// 判重

if (_channelMap.containsKey(channelNumber)) {

// That numbers already allocated! Cant do it

// This should never happen unless something has gone

// badly wrong with our implementation.

throw new IllegalStateException("We have attempted to "

+ "create a channel with a number that is already in "

+ "use. This should never happen. "

+ "Please report this as a bug.");

}

// 构建

ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);

// 放入 _channelMap 统一管理

_channelMap.put(ch.getChannelNumber(), ch);

return ch;

}

public ChannelN(AMQConnection connection, int channelNumber,

ConsumerWorkService workService, MetricsCollector metricsCollector) {

// AMQChannel 构造函数

super(connection, channelNumber);

// 构建 消费分配器

this.dispatcher = new ConsumerDispatcher(connection, this, workService);

this.metricsCollector = metricsCollector;

}

这块逻辑比较简单,执行 instantiateChannel 构建和初始化 channel,主要涉及到 连接、channel编号、超时时间、dispatcher等等,每一个 channel 都拥有一个 dispatcher,但是 「连接和线程池」 是与同一个 connection 共享。

最终获取到新创建的 channel,进行打开 ch.open()。

public void open() throws IOException {

// 对rabbitmq broker 发送Channel.Open,并等待broker返回 Channel.OpenOk

exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));

}

public AMQCommand exnWrappingRpc(Method m)

throws IOException

{

try {

// 针对该方法进行rpc调用

return privateRpc(m);

} catch (AlreadyClosedException ace) {

// Do not wrap it since it means that connection/channel

// was closed in some action in the past

throw ace;

} catch (ShutdownSignalException ex) {

throw wrap(ex);

}

}

private AMQCommand privateRpc(Method m)

throws IOException, ShutdownSignalException

{

// 用于 rpc调用过程中 阻塞等待

SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);

rpc(m, k);

// 不超时等待

if(_rpcTimeout == NO_RPC_TIMEOUT) {

return k.getReply();

} else {

try {

// 超时等待

return k.getReply(_rpcTimeout);

} catch (TimeoutException e) {

throw wrapTimeoutException(m, e);

}

}

}

打开新的 channel 逻辑比较简单:主要是和 rabbitmq broker 进行 rpc 调用:「客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk,这个过程完成后 创建通道完成,也就可以进行后续的 channel使用」。

最后

本次分享 RabbitMQ Client 与 RabbitMQ Broker 根据 AMQP 协议交互流程中 根据 channel 源码进行分析,其中还有很多 channel 源码细节感兴趣的读者可以进行深入了解。

作者:IT科技
------分隔线----------------------------
头条新闻
图片新闻
新闻排行榜