rocketmq的client在启动的时候,会通过开启一个定时任务来定期刷新topic信息,这里就来看一下这个刷新的过程。
首先来看一下这个定时任务:
1 | this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { |
A thousand miles begins with a single step .
rocketmq的client在启动的时候,会通过开启一个定时任务来定期刷新topic信息,这里就来看一下这个刷新的过程。
首先来看一下这个定时任务:
1 | this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { |
producer在rocketmq的作用是消息的生产者,consumer在rocketmq的作用是消息的消费者,它的生命周期是跟项目相关的,即是由使用者控制的。而为什么要将这两个角色的启动关闭流程放在一起剖析呢?是因为他们都是MQ的客户端,在启动和关闭的行为上,有着很多共同的地方。接下来便将会来仔细探究其启动和关闭的过程。
producer在启动的时候会做一系列的内部初始化,其启动的源码如下所示:
1 | public void start() throws MQClientException { |
这里以在ubuntu18.04上安装为例,来说明安装单个实例rocketmq的过程和验证情况。
下载地址如下,选择想要下载的版本即可:
http://rocketmq.apache.org/release_notes/
在自己的环境安装目录下解压
1 | $ unzip rocketmq-all-4.5.2-bin-release.zip |
官网下载,地址如下:
https://maven.apache.org/download.cgi
在自己的环境安装目录下解压:
1 | tar -zxvf apache-maven-3.6.2-bin.tar.gz |
在rocketmq中,MQProducer是承载消息发送的,消息的发送又可以分为常规消息的发送和事务消息的发送,其中常规消息发送用的是DefaultMQProducer,事务消息的发送用的是TransactionMQProducer。他们集成关系图如:
从功能上来说,rocketmq支持三种发送消息的方式,分别是同步发送(sync),异步发送(async)和直接发送(oneway)。下面来简单说明一下这三种发送消息的方式,以便了解它们之间的差异。
以下的案例代码将会使用spring-message风格进行展示,即使用
rocketMQTemplate方式,详见rocketmq-spring
业务需求中,在计算人均通话数时,使用聚合的时候使用到了内联以及反内联聚合,当bucket_script聚合作为反内联reverse_nested的子聚和的时候,会报如下错误:
1 | org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=class_cast_exception, reason=org.elasticsearch.search.aggregations.bucket.nested.InternalReverseNested cannot be cast to org.elasticsearch.search.aggregations.InternalMultiBucketAggregation] |