如何解决使用RocketMQ的消息轨迹信息无法查看问题

容器与中间件中间件技术服务知识库
问题描述

RocketMQ 正常生产和消费消息,但是消费轨迹无法查看的问题该如何排查?

问题分析

此类问题原因一般如下:

  1. 客户端 SDK 使用的版本不对, 需要使用 SDK 版本为 4.8.0, 4.7 和 4.9 的版本均会导致前端页面报错如下图:

图片

  1. 生产端和消费端没有开启消费轨迹功能,enableMsgTrace 需要设置为 true,
  • 生产者开启消息轨迹:
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", aclHook,true,null);
  • 消费者开启消息轨迹:
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", aclHook, new AllocateMessageQueueAveragely(),true,null);
  1. 轨迹的 Topic 设置的不对,暂不支持customizedTraceTopic参数,即customizedTraceTopic参数应传null或者传空。
  2. 友商阿里的迁移的代码中有会 AccessChannel.CLOUD 的配置,设置后也会导致查询异常,这个在友商云迁移过来的客户需要做代码的适配调整。
代码示例

1.在Java项目的pom.xml中添加以下依赖。指定版本 4.8.0

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.8.0</version>
        </dependency>

  1. 生产者示例代码
package org.example;
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("$ak", "$sk"));
    }

    public static void main(String[] args) throws MQClientException {

        DefaultMQProducer producer = new DefaultMQProducer("GID_*****", getAclRPCHook(),true,null);
        producer.setNamesrvAddr("http://MQ_INST_*********.rocketmq.volces.com:9876");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("lxbtest1",
                        "4.7.1",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        //在应用退出前,销毁Producer对象。
        producer.shutdown();
    }
}
  1. 消费者示例代码
package org.example;
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为您的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("$ak", "$sk"));
    }
    public static void main(String[] args) throws MQClientException {
        //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为您火山引擎账号的AccessKey ID和AccessKey Secret。
        //设置为火山引擎消息队列 RocketMQ版实例的接入点。
        // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_lxb_test", getAclRPCHook(), new AllocateMessageQueueAveragely(),true,null);
        consumer.setNamesrvAddr("http://MQ_INST_********.rocketmq.volces.com:9876");
        // 订阅多 topic 消费
        consumer.subscribe("lxbtest", "*");
        consumer.subscribe("lxbtest1", "*");
        consumer.subscribe("lxbtest2", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}
  1. 前端查看消费轨迹信息如下: 图片
参考文档

如果您有其他问题,欢迎您联系火山引擎技术支持服务

60
0
0
0
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论