rocketmq4.9.6版本把filtersrv删除是出于什么目的呢?

在 RocketMQ 4.9.6 版本中,filtersrv 被移除了,这一变化是出于以下几个目的和考虑:

rocketmq4.9.6版本把filtersrv删除是出于什么目的呢?
(图片来源网络,侵删)

1、架构优化:RocketMQ 作为一个高性能、分布式的消息中间件,其架构设计和实现一直在不断演进和优化filtersrv 的移除是其中一部分,旨在简化架构,提高系统的可维护性和扩展性。

2、功能整合filtersrv 主要用于消息过滤,但在新版本中,这部分功能被整合到了其他组件中,Broker 或 NameServer,这样的整合有助于减少组件数量,降低系统复杂性。

3、性能提升:通过移除 filtersrv,可以减少消息传递的环节,从而降低延迟,提高整体性能。

4、资源节省filtersrv 作为额外的服务进程,会占用一定的系统资源,移除后,可以节省这部分资源,降低部署和维护成本。

5、易用性增强:对于用户来说,filtersrv 的存在可能增加了部署和维护的复杂性,移除后,用户可以更加便捷地进行部署和使用。

接下来,我们将详细探讨如何在移除 filtersrv 后,使用 RocketMQ 进行消息过滤。

消息过滤机制

在 RocketMQ 中,消息过滤主要通过两种方式实现:

1、消费端过滤:消费者在订阅主题时,可以通过设置过滤条件(如 SQL 表达式),来选择性地接收消息,这种方式适用于消费者对特定类型或属性的消息感兴趣,而不想处理全部消息的场景。

2、生产者端过滤:生产者在发送消息时,可以设置消息的属性(如键值对),这些属性可以被用于后续的消息筛选和处理。

示例代码

以下是一个简单的示例,展示了如何在移除 filtersrv 后,使用 RocketMQ 进行消息过滤。

生产者端

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();
        // 创建消息实例
        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
        // 设置消息属性
        msg.setKeys("KeyA");
        // 发送消息
        producer.send(msg);
        // 关闭生产者
        producer.shutdown();
    }
}

消费者端

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅主题
        consumer.subscribe("TopicTest", "*");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在上述示例中,生产者发送了一条带有属性 KeyA 的消息,消费者可以基于这些属性进行过滤,只消费满足特定条件的消息。

原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/537294.html

(0)
未希新媒体运营
上一篇 2024-04-29 12:10
下一篇 2024-04-29 12:12

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

云产品限时秒杀。精选云产品高防服务器,20M大带宽限量抢购  >>点击进入