消息订阅

获取配置

获取套件的appId、appSecret

添加主题

属性描述:
公开:公开表示提供给别的套件去订阅的主题

租户隔离:表示该主题订阅后,业务数据是隔离的不是通用的消息

例如:如果是通讯录这种通用的数据,无需使用租户隔离的配置
如果是流程,由于每个套件都会有流程表单,并且是隔离的。这种时候就需要使用租户隔离,在发送消息的时候需要通过套件隔离,并且产生的主题都是不一样的,非常安全。

订阅

各自的套件去订阅

使用

引入依赖

pom.xml

        <dependency>
            <groupId>cn.flyrise</groupId>
            <artifactId>pai-common-mq</artifactId>
            <version>1.1.0</version>
        </dependency>

配置

在 application.yml 或者 nacos 添加配置

pai:
  resource:
    appId: '1407538241518833665'
    appSecret: '3ohefxbmwedezsq6gg050ja56azk00av'
  pulsar:
    service-url: 'pulsar://47.99.45.161:6050,47.96.127.14:6050,47.99.45.161:6050'  

生产者

建议都用String,不然在多版本的管理下,某些场景会出现订阅不了。
添加生产者 ProducerConfiguration 配置类

@Configuration
public class ProducerConfiguration {

    @Resource
    private PulsarProperties pulsarProperties;

    @Bean
    @Primary
    public ProducerFactory producerFactory() {
        return new ProducerFactory(pulsarProperties)
                .addProducer("topic1",String.class)
                .addProducer("topic2",MyMsg.class)
                ;
    }
}

addProducer(“主题”, class);

发送消息:
java 代码

    @Resource
    private PulsarTemplate<MyMsg> pulsarTemplate;

    //同步
    @GetMapping("send")
    public Reply send() throws PulsarClientException {
        MyMsg myMsg = new MyMsg();
        myMsg.setId(IdUtils.snowFlakeId());
        myMsg.setName("cm");
        myMsg.setMsg("我今天成为了优秀党员。");
        pulsarTemplate.send("topic2", myMsg);
        return Reply.success();
    }

    //异步
    @GetMapping("sendAsync")
    public Reply sendAsync() throws PulsarClientException {
        MyMsg myMsg = new MyMsg();
        myMsg.setName("cm");
        myMsg.setMsg("我今天成为了优秀党员。");
        myMsg.setId(IdUtils.snowFlakeId());
        pulsarTemplate.sendAsync("topic1", myMsg)
                .thenAccept(messageId -> {
                    System.out.println("Published message: " + messageId);
                })
                .exceptionally(ex -> {
                    System.err.println("Failed to publish: " + ex);
                    return null;
                });
        return Reply.success();
    }

消费者

公开的消费 统一用Shared,

使用 @PulsarListener注解
PulsarListener 属性
· suiteKey 套件标识
· namespace 命名空间 一般设置为dev 和prod 标记开发还是生产环境
· topic 主题
· class 序列化的java类
· serialization 序列化的类型,一般json 就可以了
· subscriptionType 订阅的类型 使用枚举类 SubscriptionType

Exclusive  排斥:

只能有一个消费者在同一个主题与相同的订阅名称。


Shared  共享的:

多个消费者将能够使用相同的订阅名称,消息将被发送
根据连接的消费者之间的循环轮换。

在此模式下,不保证消费顺序。

Failover  故障转移


多个消费者将能够使用相同的订阅名称,但只有一个消费者将收到消息。
如果那个消费者断开连接,另一个连接的消费者将开始接收消息。

在故障转移模式下,保证消费顺序。
在分区主题的情况下,排序是保证在每个分区的基础上。
分区分配将被划分到可用的用户。在每个分区,
在一个给定的时间点,最多有一个消费者是活跃的。

Key_Shared

多个消费者将能够使用相同的订阅和所有消息与相同的密钥
将被分派给仅一个消费者。
使用ordering_key覆盖消息排序的消息键。

java 代码:

/**
 * @author pai
 */
@Service
public class PulsarConsumer {

    @PulsarListener(topic = "topic1",clazz = String.class)
    public void consumer1(String msg){

        System.out.println("PulsarConsumer Listener:"+msg);
    }

    @PulsarListener(topic = "topic2",clazz = MyMsg.class)
    public void consumer2(MyMsg message){

        System.out.println("PulsarConsumer Listener2:"+ JSONUtil.toJsonPrettyStr(message));
    }

}

输出结果:

租户(套件)隔离

介绍

pai-common-mq

额外的用法

  1. PulsarMessage Wrapper

    @Service
    class MyConsumer {
    
     @PulsarListener(topic="my-topic", clazz=MyMsg.class)
     void consume(PulsarMessage<MyMsg> myMsg) { 
         producer.send(TOPIC, msg.getValue()); 
     }
    }
  2. SpeL 支持

my.custom.topic.name=foo
@PulsarListener(topic = "${my.custom.topic.name}", clazz = MyMsg.class)
public void consume(MyMsg myMsg) {
}
文档更新时间: 2023-07-12 13:42   作者:朱灿奕