0%

rocketmq源码剖析之producer配置

在rocketmq中,MQProducer是承载消息发送的,消息的发送又可以分为常规消息的发送和事务消息的发送,其中常规消息发送用的是DefaultMQProducer,事务消息的发送用的是TransactionMQProducer。他们集成关系图如:

MQProducer

DefaultMQProducer

DefaultMQProducer 中有一些常规的producer端的配置,如下所示:

  • producerGroup : 生产者所属的组;
  • defaultTopicQueueNums:在创建topic时,topic上默认的队列数,默认为4 ;
  • sendMsgTimeout:发送消息的超时时间,默认为3000ms;
  • compressMsgBodyOverHowmuch:压缩消息体的阙值,默认为1024*4=4k,即当消息的body大小为4k的时候对body内容进行压缩;
  • retryTimesWhenSendFailed:同步发送失败时重试的次数,默认为2次;
  • retryTimesWhenSendAsyncFailed:异步发送失败时重试的次数,默认为2次;
  • retryAnotherBrokerWhenNotStoreOK:表示当发送失败的时候是否重试发送到另一个broker,默认为false;
  • maxMessageSize:消息body最大值,1024*1024*4 = 4M

DefaultMQProducer相关配置源码信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
* important when transactional messages are involved.
* </p>
*
* For non-transactional messages, it does not matter as long as it's unique per process.
* </p>
*
* See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
*/
private String producerGroup;

/**
* Just for testing or demo program
*/
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

/**
* Number of queues to create per default topic.
*/
private volatile int defaultTopicQueueNums = 4;

/**
* Timeout for sending messages.
*/
private int sendMsgTimeout = 3000;

/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
private int compressMsgBodyOverHowmuch = 1024 * 4;

/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;

/**
* Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
* </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;

/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryAnotherBrokerWhenNotStoreOK = false;

/**
* Maximum allowed message size in bytes.
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M

TransactionMQProducer

TransactionMQProducer中的附加配置项主要是针对事务回查的,如下所示:

1
2
3
private int checkThreadPoolMinSize = 1;
private int checkThreadPoolMaxSize = 1;
private int checkRequestHoldMax = 2000;

其中checkRequestHoldMax是事务回查请求阻塞等待队列的最大值,即最多支持2000个请求等待。