Rocketmq消息中间件中通过message key找消息的问题

增长营销容器服务机器学习
  1. Rocketmq的安装布署:

参考:http://rocketmq.apache.org/docs/quick-start/

  1. Rocketmq的简单应用

参考:https://github.com/apache/rocketmq/tree/master/example

  1. MessageQueueSelector


      1. `public interface MessageQueueSelector {`
2. `MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);`
3. `}`


    

RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上,RocketMQ默认提供了三种实现,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三个入参,分别为消息队列集合、消息和扩展参数。本示例通过使用扩展参数来实现消息通道的定向发送和接收。

RocketMQ在设计的时候就支持tag了,因为他的索引文件就包含了tag的。 后来为了更去的过滤功能,更是扩展格式里,能进一步根据SQL92或者创建时间来过滤了。可以自定义MessageSelector来获取需要的消息。

ConsumeQueue扩展格式:支持sql92标准来过滤 ConsumeQueue标准格式只能通过tags搜索,不能使用用filters和commitTime搜索,于是扩展格式增加了: 参考:http://rocketmq.apache.org/docs/filter-by-sql92-example/

  1. 生产者示例:

      1. `DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");`
2. `producer.start();`
3. 
4. `Message msg = new Message("TopicTest",`
5. `tag,`
6. `("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)`
7. `);`
8. `// Set some properties.`
9. `msg.putUserProperty("a", String.valueOf(i));`
10. 
11. `SendResult sendResult = producer.send(msg);`
12. 
13. `producer.shutdown();`


    
  1. 消费者示例:

      1. `DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");`
2. 
3. `// only subsribe messages have property a, also a >=0 and a <= 3`
4. `consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");`
5. 
6. `consumer.registerMessageListener(new MessageListenerConcurrently() {`
7. `@Override`
8. `public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {`
9. `return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;`
10. `}`
11. `});`
12. `consumer.start();`


    

picture.image

IndexFile:支持查询消息,topic+key+最多条数+开始时间+结束时间 public QueryOffsetResult queryOffset(String topic,String key,int maxNum,long begin,long end){...}

  1. 怎样设计IndexFile的物理存储内容才能满足上面的要求?

RocketMQ的物理存储总结:

  • 消息实际内容存储在CommitLog中(这点和Kafka大有不同,这也是RocketMQ没有kafka那么大的吞吐但是吞吐更稳定的原因);
  • 为了能有多个Consumer并行消费,设计了基于(topic,queued)区分的ConsumeQueue;
  • 为了在消费时在Broker上就过滤掉不感兴趣的内容,支持为Message打tag,订阅时只得到相关的tag的消息,将tagCode存储于其上。
  • 为了订阅时能做到除了tag外的更多过滤,设计ConsumeQueueExt格式,通过BloomFilter;
  • 为了满足根据key和时间段进行查询,设计了IndexFile
  • Kafka是不支持broker端过滤的,只能通过offset拿数据,拿到Consumer里,自己把Message解析出来,在Consumer里过滤。

本文主要参考RocketMq实战和for神的群发言总结而来。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
云原生环境下的日志采集存储分析实践
云原生场景下,日志数据的规模和种类剧增,日志采集、加工、分析的多样性也大大增加。面对这些挑战,火山引擎基于超大规模下的 Kubernetes 日志实践孵化出了一套完整的日志采集、加工、查询、分析、消费的平台。本次主要分享了火山引擎云原生日志平台的相关实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论