消息订阅
获取配置
获取套件的appId、appSecret
添加主题
属性描述:
公开:公开表示提供给别的套件去订阅的主题
租户隔离:表示该主题订阅后,业务数据是隔离的不是通用的消息
例如:如果是通讯录这种通用的数据,无需使用租户隔离的配置
如果是流程,由于每个套件都会有流程表单,并且是隔离的。这种时候就需要使用租户隔离,在发送消息的时候需要通过套件隔离,并且产生的主题都是不一样的,非常安全。
订阅
各自的套件去订阅
使用
引入依赖
pom.xml
<dependency>
<groupId>cn.flyrise</groupId>
<artifactId>pai-common-mq</artifactId>
<version>1.1.0</version>
</dependency>
配置
在 application.yml 或者 nacos 添加配置
pai:
resource:
appId: '1407538241518833665'
appSecret: '3ohefxbmwedezsq6gg050ja56azk00av'
pulsar:
service-url: 'pulsar://47.99.45.161:6050,47.96.127.14:6050,47.99.45.161:6050'
生产者
建议都用String,不然在多版本的管理下,某些场景会出现订阅不了。
添加生产者 ProducerConfiguration 配置类
@Configuration
public class ProducerConfiguration {
@Resource
private PulsarProperties pulsarProperties;
@Bean
@Primary
public ProducerFactory producerFactory() {
return new ProducerFactory(pulsarProperties)
.addProducer("topic1",String.class)
.addProducer("topic2",MyMsg.class)
;
}
}
addProducer(“主题”, class);
发送消息:
java 代码
@Resource
private PulsarTemplate<MyMsg> pulsarTemplate;
//同步
@GetMapping("send")
public Reply send() throws PulsarClientException {
MyMsg myMsg = new MyMsg();
myMsg.setId(IdUtils.snowFlakeId());
myMsg.setName("cm");
myMsg.setMsg("我今天成为了优秀党员。");
pulsarTemplate.send("topic2", myMsg);
return Reply.success();
}
//异步
@GetMapping("sendAsync")
public Reply sendAsync() throws PulsarClientException {
MyMsg myMsg = new MyMsg();
myMsg.setName("cm");
myMsg.setMsg("我今天成为了优秀党员。");
myMsg.setId(IdUtils.snowFlakeId());
pulsarTemplate.sendAsync("topic1", myMsg)
.thenAccept(messageId -> {
System.out.println("Published message: " + messageId);
})
.exceptionally(ex -> {
System.err.println("Failed to publish: " + ex);
return null;
});
return Reply.success();
}
消费者
公开的消费 统一用Shared,
使用 @PulsarListener
注解
PulsarListener 属性
· suiteKey 套件标识
· namespace 命名空间 一般设置为dev 和prod 标记开发还是生产环境
· topic 主题
· class 序列化的java类
· serialization 序列化的类型,一般json 就可以了
· subscriptionType 订阅的类型 使用枚举类 SubscriptionType
Exclusive 排斥:
只能有一个消费者在同一个主题与相同的订阅名称。
Shared 共享的:
多个消费者将能够使用相同的订阅名称,消息将被发送
根据连接的消费者之间的循环轮换。
在此模式下,不保证消费顺序。
Failover 故障转移
多个消费者将能够使用相同的订阅名称,但只有一个消费者将收到消息。
如果那个消费者断开连接,另一个连接的消费者将开始接收消息。
在故障转移模式下,保证消费顺序。
在分区主题的情况下,排序是保证在每个分区的基础上。
分区分配将被划分到可用的用户。在每个分区,
在一个给定的时间点,最多有一个消费者是活跃的。
Key_Shared
多个消费者将能够使用相同的订阅和所有消息与相同的密钥
将被分派给仅一个消费者。
使用ordering_key覆盖消息排序的消息键。
java 代码:
/**
* @author pai
*/
@Service
public class PulsarConsumer {
@PulsarListener(topic = "topic1",clazz = String.class)
public void consumer1(String msg){
System.out.println("PulsarConsumer Listener:"+msg);
}
@PulsarListener(topic = "topic2",clazz = MyMsg.class)
public void consumer2(MyMsg message){
System.out.println("PulsarConsumer Listener2:"+ JSONUtil.toJsonPrettyStr(message));
}
}
输出结果:
租户(套件)隔离
介绍
pai-common-mq
额外的用法
PulsarMessage Wrapper
@Service class MyConsumer { @PulsarListener(topic="my-topic", clazz=MyMsg.class) void consume(PulsarMessage<MyMsg> myMsg) { producer.send(TOPIC, msg.getValue()); } }
SpeL 支持
my.custom.topic.name=foo
@PulsarListener(topic = "${my.custom.topic.name}", clazz = MyMsg.class)
public void consume(MyMsg myMsg) {
}
文档更新时间: 2023-07-12 13:42 作者:朱灿奕