问题描述
RocketMQ 正常生产和消费消息,但是消费轨迹无法查看的问题该如何排查?
问题分析
此类问题原因一般如下:
- 客户端 SDK 使用的版本不对, 需要使用 SDK 版本为 4.8.0, 4.7 和 4.9 的版本均会导致前端页面报错如下图:
- 生产端和消费端没有开启消费轨迹功能,enableMsgTrace 需要设置为 true,
- 生产者开启消息轨迹:
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", aclHook,true,null);
- 消费者开启消息轨迹:
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", aclHook, new AllocateMessageQueueAveragely(),true,null);
- 轨迹的 Topic 设置的不对,暂不支持customizedTraceTopic参数,即customizedTraceTopic参数应传null或者传空。
- 友商阿里的迁移的代码中有会 AccessChannel.CLOUD 的配置,设置后也会导致查询异常,这个在友商云迁移过来的客户需要做代码的适配调整。
代码示例
1.在Java项目的pom.xml中添加以下依赖。指定版本 4.8.0
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.8.0</version>
</dependency>
- 生产者示例代码
package org.example;
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("$ak", "$sk"));
}
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("GID_*****", getAclRPCHook(),true,null);
producer.setNamesrvAddr("http://MQ_INST_*********.rocketmq.volces.com:9876");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("lxbtest1",
"4.7.1",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
//消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
//在应用退出前,销毁Producer对象。
producer.shutdown();
}
}
- 消费者示例代码
package org.example;
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
//设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为您的AccessKey ID和AccessKey Secret。
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("$ak", "$sk"));
}
public static void main(String[] args) throws MQClientException {
//设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为您火山引擎账号的AccessKey ID和AccessKey Secret。
//设置为火山引擎消息队列 RocketMQ版实例的接入点。
// 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_lxb_test", getAclRPCHook(), new AllocateMessageQueueAveragely(),true,null);
consumer.setNamesrvAddr("http://MQ_INST_********.rocketmq.volces.com:9876");
// 订阅多 topic 消费
consumer.subscribe("lxbtest", "*");
consumer.subscribe("lxbtest1", "*");
consumer.subscribe("lxbtest2", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
- 前端查看消费轨迹信息如下:
参考文档
如果您有其他问题,欢迎您联系火山引擎技术支持服务