qmq-spring-boot-starter 启用消费者模式配置消费监听器 介绍
使用方式
- QMQ - https://github.com/qunarcorp/qmq
- Spring Boot Starter for QMQ - https://gitee.com/wjtree/qmq-spring-boot-starter
引入 Maven 依赖(已上传到中央仓库)
<dependency> <groupId>xin.wjtree.qmq</groupId> <artifactId>qmq-spring-boot-starter</artifactId> <version>1.0.0</version> </dependency>
添加 Spring Boot 配置(YML)
spring: application: name: qmq-demo qmq: # 应用标识 appcode,必填 app-code: qmq-demo # 服务器地址 metaserver,必填 meta-server: http://127.0.0.1:8080/meta/address # 生产者配置,发送消息的线程池的设置,选填 producer: # 发送线程数,默认 3 send-threads: 3 # 默认每次发送时最大批量大小,默认 30 send-batch: 30 # 如果消息发送失败,重试次数,默认 10 send-try-count: 10 # 异步发送队列大小,默认 10000 max-queue-size: 10000 # 使用 QmqTemplate 发送消息的默认主题,默认值 default_subject template: default-subject: default_subject # 消费者配置,消费消息的线程池的设置,选填 consumer: # 线程名称前缀,默认 qmq-process thread-name-prefix: qmq-process # 线程池大小,默认 2 core-pool-size: 2 # 最大线程池大小,默认 2 max-pool-size: 2 # 线程池队列大小,默认 1000 queue-capacity: 1000 # 消息主题和分组配置,选填 # 使用 QmqConsumer 注解时,可使用 SpEL 表达式引入以下主题和分组 subject: sub1: sub1 sub2: sub2 sub3: sub3 # more subject ... group: group1: group1 group2: group2 group3: group3 # more group ... logging: level: # 设置 qmq-spring-boot-starter 的日志级别 xin.wjtree.qmq: trace server: port: 8989
发送消息
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import qunar.tc.qmq.Message; import qunar.tc.qmq.MessageSendStateListener; import xin.wjtree.qmq.QmqTemplate; import xin.wjtree.qmq.autoconfigure.QmqProperties; import xin.wjtree.qmq.constant.QmqTimeUnit; import xin.wjtree.qmq.internal.QmqAlias; import xin.wjtree.qmq.internal.QmqIgnore; import javax.annotation.Resource; import java.math.BigDecimal; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; @RunWith(SpringRunner.class) @SpringBootTest public class QmqTest { @Resource private QmqTemplate template; @Resource private QmqProperties properties; /** * 发送即时消息 * @throws InterruptedException */ @Test public void sendImmediate() throws InterruptedException { // 计数器,执行1次结束 CountDownLatch latch = new CountDownLatch(1); // 一般使用 template.send(properties.getSubject().get("sub1"), getUser()) 即可 template.withSendStateListener(new MessageSendStateListener() { @Override public void onSuccess(Message m) { latch.countDown(); } @Override public void onFailed(Message m) { latch.countDown(); } }).send(properties.getSubject().get("sub1"), getUser()); // 计数器减1 latch.await(); } /** * 发送延时消息 * @throws InterruptedException */ @Test public void sendDelay() throws InterruptedException { // 计数器,执行1次结束 CountDownLatch latch = new CountDownLatch(1); // 延时 10 秒发送消息 // 一般使用 template.sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS) 即可 template.withSendStateListener(new MessageSendStateListener() { @Override public void onSuccess(Message m) { latch.countDown(); } @Override public void onFailed(Message m) { latch.countDown(); } }).sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS); // 计数器减1 latch.await(); } /** * 发送定时消息 * @throws InterruptedException */ @Test public void sendSchedule() throws InterruptedException, ParseException { // 计数器,执行1次结束 CountDownLatch latch = new CountDownLatch(1); // 定时发送的日期时间 Date date = new SimpleDateFormat("yyyy-MM-dd HHqmq-spring-boot-starter 启用消费者模式配置消费监听器 官网
https://gitee.com/wjtree/qmq-spring-boot-starter
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。