rocketmq 控制台

地址:https://pai.flyrise.cn/rocketmq-dashboard

消息订阅

v1.8.1-RC2更新内容
1.添加了本地调试的功能,添加了一个静态页面/mq/status.html ,能查看状态以及消费的数据。

需配置 ,仅开发使用

v1.8.1-RC1更新内容
1.同一组件使用的是同一个group,group默认情况下是工程名,暂时无法修改。节省了配置以及在私有化的情况下,不需要创建太多的group

v1.7.8更新内容
1.添加子套件使用同一个后台进行监听时,因为租户隔离而监听不到数据的问题
2.在nacos 配置中添加配置 pai.mq.subSuiteList = (多个套件code使用逗号隔开) 例如 cn.flyrise.bussiness.lexxx,cn.flyrise.bussiness.conxxx
3.一定要记得提交到nacos.yml
4.由于某些功能是判断了某个套件是否订阅了才推送数据,所以类似流程需要在子套件也监听流程的主题才可以正常使用

消息订阅

v1.6.0更新内容
1.修复多主题订阅导致消息监听异常
2.添加自定义动态订阅分组

引入依赖

pom.xml

        <dependency>
            <groupId>cn.flyrise</groupId>
            <artifactId>pai-common-mq</artifactId>
            <version>1.7.8</version>
        </dependency>

消息订阅

v1.4.3更新内容

1.修复租户过滤bug

消息订阅

v1.4.2更新内容

消息订阅

v1.4.1更新内容

1.支持延迟消息
延迟时间级别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 1d 2d
s代表秒,m代表分钟,h代表小时,d代表天
DelayEnums.LEVEL1 对应上方的 1秒
DelayEnums.LEVEL4 对应上方的 30秒,以此类推

2.注解上的主题topic以及suiteCode 支持表达式${pai.mq.topic}

引入依赖

pom.xml

        <dependency>
            <groupId>cn.flyrise</groupId>
            <artifactId>pai-common-mq</artifactId>
            <version>1.4.1</version>
        </dependency>

配置

在 application.yml 或者 nacos 添加配置

AppID 和 appSecret 需要从开放平台中对应的套件去申请

pai:
  resource:
    appId: 'xxx'
    appSecret: 'xxx'
  mq:
    enable: true

延迟消息

消息的内容已经强制使用String了
java 代码

    @Resource
    private PaiMqTemplate<MyMsg> template;
    private final static String TAG = "YEWU1";
    private final static String TOPIC = "topic";

    //同步
    @GetMapping("send")
    public Reply send(MsgVo msg) {
       PaiMqMessageBean<MyMsg> bean = new PaiMqMessageBean.Builder<MyMsg>()
                .tag(TAG)
                .status(PaiMqConstants.ADD)
                .key(msg.getId())
                .data(msg)
                .build();

        template.send(topic, bean, DelayEnums.LEVEL4);
        return Reply.success();
    }
    //异步
    @GetMapping("sendAsync")
    public Reply sendAsync(MsgVo msg) {
       PaiMqMessageBean<MyMsg> bean = new PaiMqMessageBean.Builder<MyMsg>()
                .tag(TAG)
                .status(PaiMqConstants.ADD)
                .key(msg.getId())
                .data(msg)
                .build();
        template.sendAsync(topic, bean)
                .thenAccept(messageId -> logger.info("Published message: " + messageId)
                ).exceptionally(ex -> {
                    logger.error("Failed to publish: " + ex);
                return null;
        });
        return Reply.success();
    }

v1.4.0更新内容

1.简化代码,去掉生产者定义的配置文件
2.兼容了rocketmq
3.支持本地化配置
4.支持使用云端配置,无需配置中间件的服务地址以及其他配置
5.数据结构调整,添加了tag属性作为业务隔离参数

6. 发送接口以及消费的注解都改了

PulsarTemplate 改为 PaiMqTemplate,注解 @PulsarListener 改为 @PaiMqListener

7.由于数据结构以及各个方面的调整,新的配置需要重新创建主题才能使用

8.旧的消息主题将在1.5.0版本删除,如果暂时不想修改,pai-common-mq 版本使用 1.1.0即可。

使用

引入依赖

pom.xml

        <dependency>
            <groupId>cn.flyrise</groupId>
            <artifactId>pai-common-mq</artifactId>
            <version>1.4.0</version>
        </dependency>

配置

在 application.yml 或者 nacos 添加配置

AppID 和 appSecret 需要从开放平台中对应的套件去申请

pai:
  resource:
    appId: 'xxx'
    appSecret: 'xxx'
  mq:
    enable: true

消息定义

topic: 主题的名称,建议使用驼峰来命名,一个套件建议使用一个主题,除非你的业务真的没办法再去拆分
tag: tag作用于业务的区分,例如用户同步可以分为全量、增量
key: 业务唯一主键,方便查询
status: 该属性用作于业务细分,例如增量的数据可以分为新增,修改,删除等等
data: 消息的数据内容

生产者

消息的内容已经强制使用String了
java 代码

    @Resource
    private PaiMqTemplate<MyMsg> template;
    private final static String TAG = "YEWU1";
    private final static String TOPIC = "topic";
    //同步
    @GetMapping("send")
    public Reply send(MsgVo msg) {
        PaiMqMessageBean bean = new PaiMqMessageBean.Builder()
                .tag(TAG)
                .key(msg.getId())
                .status(PaiMqConstants.ADD)
                .data(msg)
                .build();
        pulsarTemplate.send(TOPIC, bean);
        return Reply.success();
    }

    //异步
    @GetMapping("sendAsync")
    public Reply sendAsync(MsgVo msg)  {
        PaiMqMessageBean bean = new PaiMqMessageBean.Builder()
                 .tag(TAG)
                .key(msg.getId())
                .status(PaiMqConstants.ADD)
                .data(msg)
                .build();
        pulsarTemplate.sendAsync(TOPIC, bean).thenAccept(messageId -> {
            System.out.println("Published message: " + messageId);
        }).exceptionally(ex -> {
            System.err.println("Failed to publish: " + ex);
            return null;
        });
        return Reply.success();
    }

消费者

使用 @PaiMqListener注解

java 代码:

/**
 * @author pai
 */
@Service
public class PulsarConsumer {
    //支持表达式,建议使用
    @PaiMqListener(suiteCode = "${yewu.app.code}", topic = "${yewu.app.topic.topicA}")
    public void consumer(String string) {

        System.out.println("Consumer Listener (:" + DateUtil.now() + "): msg:" + string);
    }


    @PaiMqListener(suiteCode = "cn.flyrise.demo.attendance", topic = "ceshi001")
    public void consumer2(String msg) {

        System.out.println("Consumer4 Listener (" + (i++) + ":" + DateUtil.now() + "): msg:" + msg);

        JSONObject jsonObject = JSONUtil.parseObj(msg);
        String tag = jsonObject.getStr("tag");
        JSONObject data = jsonObject.getJSONObject("data");
        AskLeaveVO askLeaveVO = JSONUtil.toBean(data, AskLeaveVO.class);
        switch (tag) {

            case "tagA":
                System.out.println("tag A :" + askLeaveVO);
                break;
            case "tagB":
                System.out.println("tag B :" + askLeaveVO);
                break;
            default:
                break;
        }


    }

}
文档更新时间: 2023-07-12 13:42   作者:朱灿奕