【RocketMq-生产者】消息发送者参数详解

#rocketmq 【RocketMq-生产者】消息发送者参数详解

引言

首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。

这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。

ClientConfig 部分

ClientConfig 定义了一些配置的获取方法,定义了命名空间等参数。无论是消息的发送者还是消费者都是通用的。

下面根据本次的版本的源代码介绍相关参数。

名称

描述

参数类型

默认值

有效值

重要性

namesrvAddr

NameServer的地址列表

String

从-D系统参数rocketmq.namesrv.addr或环境变量。NAMESRV_ADDR

instanceName

客户端实例名称

String

从-D系统参数rocketmq.client.name获取,否则就是DEFAULT

clientIP

客户端IP

String

RemotingUtil.getLocalAddress()

namespace

客户端命名空间

String

accessChannel

设置访问通道

AccessChannel

LOCAL

clientCallbackExecutorThreads

客户端通信层接收到网络请求的时候,处理器的核数

int

Runtime.getRuntime().availableProcessors()

pollNameServerInterval

轮询从NameServer获取路由信息的时间间隔

int

30000,单位毫秒

heartbeatBrokerInterval

定期发送注册心跳到broker的间隔

int

30000,单位毫秒

persistConsumerOffsetInterval

作用于Consumer,持久化消费进度的间隔

int

默认值5000,单位毫秒

pullTimeDelayMillsWhenException

拉取消息出现异常的延迟时间设置

long

1000,单位毫秒

unitName

单位名称

String

unitMode

单位模式

boolean

false

vipChannelEnabled

是否启用vip netty通道以发送消息

boolean

从-D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true

useTLS

是否使用安全传输。

boolean

从-D系统参数tls.enable获取,否则就是false

mqClientApiTimeout

mq客户端api超时设置

int

3000,单位毫秒

language

客户端实现语言

LanguageCode

LanguageCode.JAVA

namesrvAddr

NameServer 的地址列表。

clientIp

private String clientIP = RemotingUtil.getLocalAddress();

从代码中可以看到,使用RemotingUtil#getLocalAddress 获取IP信息,在当前版本中默认返回不是127.0或者192.168开头的 IPV4地址,否则尝试获取IPV6的地址,如果都找不到就用LocalHost地址。

instanceName

private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

instanceName主要获取当前默认的系统参数客户端实例名称,它是客户端标识 CID 的组成部分

unitName 单元名称

也是CID的组成部分之一,如果获取 NameServer 的地址是通过 URL 进行动态更新的话,会通过这个单元名称进行附加,用来区分不同的NameServer地址服务。

clientCallbackExecutorThreads 回调线程池数量

表示public回调线程池的数量,默认为CPU的核数,通常这个值直接根据JVM获取的结果为基准即可。

private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

namespace 命名空间

4.5.1 之后才加入的新机制。主要适用场景为全链路压测的时候可以利用不同的命名空间划分出真实消息和压测消息,使得线上业务正常执行的情况下同步处理测试流程。

pollNameServerInterval NameServer同步间隔

生产者客户端默认每隔出30S向NameServer 更新Topic的相关信息,注意这个参数在消费端同样存在相同的配置,这个配置通常不建议修改。

/**  
 * Pulling topic information interval from the named server */
   private int pollNameServerInterval = 1000 * 30;

heartbeatBrokerInterval Broker心跳间隔

客户端向 Broker 发送心跳包的时间间隔,默认为 30s,不建议修改该值。

/**  
 * Heartbeat interval in microseconds with message broker */
   private int heartbeatBrokerInterval = 1000 * 30;

persistConsumerOffsetInterval

客户端持久化消息消费进度的间隔,默认为 5s,该值不建议修改。

/**  
 * Offset persistent interval for consumer */
   private int persistConsumerOffsetInterval = 1000 * 5;

DefaultMQProducer 部分

这部分定义了日志和常见的使用消息队列方法,注意在类的开头定义了一个 transient 变量执行内部的保护方法。

官方文档中极少DefaultMQProducer配置如下:

名称

描述

参数类型

默认值

有效值

重要性

producerGroup

生产组的名称,一类Producer的标识

String

DEFAULT_PRODUCER

createTopicKey

发送消息的时候,如果没有找到topic,若想自动创建该topic,需要一个key topic,这个值即是key topic的值

String

TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC

defaultTopicQueueNums

自动创建topic的话,默认queue数量是多少

int

4

sendMsgTimeout

默认的发送超时时间

int

3000,单位毫秒

compressMsgBodyOverHowmuc

消息body需要压缩的阈值

int

1024 * 4,4K

retryTimesWhenSendFailed

同步发送失败的话,rocketmq内部重试多少次

int

2

retryTimesWhenSendAsyncFailed

异步发送失败的话,rocketmq内部重试多少次

int

2

retryAnotherBrokerWhenNotStoreOK

发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发

boolean

false

maxMessageSize

客户端验证,允许发送的最大消息体大小

int

1024 1024 4,4M

traceDispatcher

异步传输数据接口

TraceDispatcher

null

DefaultMQProducerImpl 内部对象

defaultMQProducerImpl 比较意思,因为此对象是 DefaultMQProducerImpl 整个实现类的实际调用者,这里用了受保护的内部对象完成所有方法调用,用final是规避旧版本多个线程初始化对象非原子性的问题,同时保证持有的内部对象不可变。

/**  
 * Wrapping internal implementations for virtually all methods presented in this class. */
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

为什么这里要用 transient? transient 关键字确保对象被序列化之后不会泄漏 DefaultMQProducerImpl 对象。

InternalLogger 日志对象

接着是日志对象,日志对象 InternalLogger 如下定义,内部实现比较简单,基本是一些info和debug日志打印。

InternalLogger log = ClientLogger.getLog()

客户端日志的实现类存储路径时是:${user.home}/logs/rocketmqlogs/rocketmq_client.log,这个路径的获取细节在org.apache.rocketmq.client.log.ClientLogger#createClientAppender可以看到有关细节。使用System.getProperty("user.home")获取的路径在Unix系统中相当于用户的主目录。

user.home 如果是 xxx 则是 /usr/home/xxx 为开始,比如个人的Mac电脑最终的存放地址为:/Users/zxd/logs/rocketmqlogs/rocketmq_client.log

producerGroup 消息组

表示发送者所属组定义如下,根据注释可以得知,gropu 可以实现生产者实例的聚合,主要用在事务的的时候需要使用到,而如果是非事务的消息,每一个进程都是唯一的,彼此没有关联。

有关事务的内容涉及需要用到Broker反查机制,这里不做过多牵扯,继续介绍。

/**  
 * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly * important when transactional messages are involved. </p>  
 *  
 * For non-transactional messages, it does not matter as long as it's unique per process. </p>  
 *  
 * See <a href="http://rocketmq.apache.org/docs/core-concept/">core concepts</a> for more discussion.  
 */
 private String producerGroup;

我们可以通过相关命令或者可视化工具查看发送者所属组的状态。注意默认的主题队列数量,RocketMq默认设置为4。

这里用了volatile保证多线程对于主题队列的数量时可见的,多个生产者实例观察的数量是一致的。

/**  
 * Number of queues to create per default topic. */private volatile int defaultTopicQueueNums = 4;

sendMsgTimeout 消息发送默认超时时间

消息默认发送的超时时间为3秒,

注意的是在 RocketMQ 4.3.0 版本之前由于存在重试机制,程序设置的设计为单次重试的超时时间,即如果设置重试次数为 3 次,则 DefaultMQProducer#send 方法可能会超过 9s 才返回。

/**  
 * Timeout for sending messages. */
   private int sendMsgTimeout = 3000;

主要的改动点在org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl 这个对象里面

修复的方式比较简单粗暴,是增加一个纳秒值进行计算 ,如果请求时间超过发送请求的时间太久就抛出异常。下一次请求对应的扣除掉本次耗费的时间再进行重试,如果重试超过的总时间超过超时时间也同样抛出异常。

这就意味着如果超时次数设置10次,可能不到10次就会因为超时时间的判断抛出异常信息。

long costTimeAsync = System.currentTimeMillis() - beginStartTime;  
if (timeout < costTimeAsync) {  
    throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");  
}

compressMsgBodyOverHowmuch 压缩阈值

默认情况下,如果消息的长度超过4K,那么RocketMq默认会对于消息开启压缩,虽然会增加CPU的性能损耗,但是可以有效减少网络方便的开销。

/**  
 * Compress message body threshold, namely, message body larger than 4k will be compressed on default. */
 // 压缩消息体阈值,即默认压缩大于4k的消息体。
   private int compressMsgBodyOverHowmuch = 1024 * 4;
private boolean tryToCompressMessage(final Message msg) {  
	// 批量数据目前不支持压缩
    if (msg instanceof MessageBatch) {  
        //batch does not support compressing right now  
        return false;  
    }  
    byte[] body = msg.getBody();  
    if (body != null) {  
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {  
            try {  
	            // 压缩之后的数据
                byte[] data = compressor.compress(body, compressLevel);  
                if (data != null) {  
                    msg.setBody(data);  
                    return true;                }  
            } catch (IOException e) {  
                log.error("tryToCompressMessage exception", e);  
                log.warn(msg.toString());  
            }  
        }  
    }  
  
    return false;  
}

retryTimesWhenSendFailed 失败重试

同步消息发送重试次数。RocketMQ 客户端内部在消息发送失败时默认会重试 2 次。该参数与 sendMsgTimeout 联合生效,但是需要注意这个参数在SYNC模式下才会重试2次,如果是其他模式则默认是一次失败不再进行重试。

在SYNC模式只重试一次可以看下面代码:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

retryTimesWhenSendAsyncFailed 异步消息重试

见名知义,异步消息发送重试次数,默认为 2,即重试 2 次,一共有 3 次机会。关键的代码在org.apache.rocketmq.client.impl.MQClientAPIImpl#onExceptionImpl 这个参数巨多的方法当中,简单判断当前的异步消息总的重试次数,如果重试多次超过次数则通过sendCallback回调发送异常。

/**  
 * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>  
 *  
 * This may potentially cause message duplication which is up to application developers to resolve. */
   private int retryTimesWhenSendFailed = 2;

retryAnotherBrokerWhenNotStoreOK 失败向其他Broker重试

根据方法的本意按照道理来说如果客户端收到的结果不是 SEND_OK,应该直接向另外一个 Broker 重试,但根据代码分析目前这个参数并不能按预期运作,官方一致也没有关注过这个问题。

  
/**  
 * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>  
 *  
 * This may potentially cause message duplication which is up to application developers to resolve. */
   private int retryTimesWhenSendAsyncFailed = 2;

maxMessageSize 最大消息体

允许发送的最大消息体,默认为 4M,具体可以看下面的判断,注意Broker也有 maxMessageSize 这个参数的设置,故客户端的设置不能超过服务端的配置:

客户端的发送限制如下:

/**  
 * Maximum allowed message body size in bytes. */
   private int maxMessageSize = 1024 * 1024 * 4; // 4M

...

if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {  
    throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,  
        "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());  
}

maxMessageSize 另一个使用地点是在RocketMq的轨迹消息长度判断中,不过这一块的代码在2022年的上半年被某位大神大改优化过,里面的优化代码比较值得学习,但是因为这一块牵扯的内容比较大部头需要先放放,我们看其他参数内容。

// 轨迹消息中累计到3/4左右的时候就进行合并提交
if (currentMsgSize >= traceProducer.getMaxMessageSize() - 10 * 1000) {  
    List<TraceTransferBean> dataToSend = new ArrayList(traceTransferBeanList);  
    AsyncDataSendTask asyncDataSendTask = new AsyncDataSendTask(traceTopicName, regionId, dataToSend);  
    traceExecutor.submit(asyncDataSendTask);  
  
    this.clear();  
  
}

sendLatencyFaultEnable 失败延迟规避

失败规避机制默认为false,它的含义是当Product向Broker发送消息失败之后,客户端的在内部重试的时候会规避掉上一次发送失败的Broker,并且一段时间内不会再向该Broker进行发送。

notAvailableDuration 不可用延迟数组

不可用延迟数组,利用等比数列的时间发送消息,根据数组的设置在多少时间内不向Broker发送消息。从默认值可以看到这里是按照阶层的方式进行增长的。

private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

latencyMax 延迟最大值

设置消息发送的最大延迟级别,同样涉及了延迟推送机制。这里暂时略过。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

MqAdmin

定义了一些基础的规范接口,由于和我们平时写业务代码的Service Interface类似,这里不在过多展开介绍,而是简单罗列一些比较常用的接口:

/**
-   String key:根据 key 查找 Broker,即新主题创建在哪些 Broker 上
-   String newTopic:主题名称
-   int queueNum:主题队列个数
-   int topicSysFlag:主题的系统参数
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
 
/**
	根据队列与时间戳,从消息消费队列中查找消息,返回消息的物理偏移量(在 commitlog 文件中的偏移量)。
	MessageQueue mq:消息消费队列
	long timestamp:时间戳
*/
long searchOffset(MessageQueue mq, long timestamp)

 /**
 查询消息消费队列当前最大的逻辑偏移量,在 consumequeue 文件中的偏移量。
 */
long maxOffset(final MessageQueue mq)
    
 /**
 查询消息消费队列当前最小的逻辑偏移量。
 */
long minOffset(final MessageQueue mq) 
    
/**
返回消息消费队列中第一条消息的存储时间戳。
*/
long earliestMsgStoreTime(MessageQueue mq)
    
/**
根据消息的物理偏移量查找消息
*/
MessageExt viewMessage(String offsetMsgId)
    
/**
根据主题与消息的全局唯一 ID 查找消息。
*/    
MessageExt viewMessage(String topic, String msgId)
    
/**
批量查询消息,其参数列表如下:

String topic:主题名称
String key:消息索引 Key
int maxNum:本次查询最大返回消息条数
long begin:开始时间戳
long end:结束时间戳
*/
QueryResult queryMessage(String topic, String key, int maxNum, long begin,long end)

写在最后

简单的进行一些API讲解,我们可以下具体使用到之后再来本文查阅会更有实际意义。

原文地址:https://cloud.tencent.com/developer/article/2176251

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340