qmq-spring-boot-starter 启用消费者模式配置消费监听器

程序名称:qmq-spring-boot-starter 启用消费者模式配置消费监听器

授权协议: Apache

操作系统: 跨平台

开发语言: Java

qmq-spring-boot-starter 启用消费者模式配置消费监听器 介绍

使用方式

引入 Maven 依赖(已上传到中央仓库)

<dependency>
    <groupId>xin.wjtree.qmq</groupId>
    <artifactId>qmq-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>

添加 Spring Boot 配置(YML)

spring:
  application:
    name: qmq-demo
  qmq:
    # 应用标识 appcode,必填
    app-code: qmq-demo
    # 服务器地址 metaserver,必填
    meta-server: http://127.0.0.1:8080/meta/address

    # 生产者配置,发送消息的线程池的设置,选填
    producer:
      # 发送线程数,默认 3
      send-threads: 3
      # 默认每次发送时最大批量大小,默认 30
      send-batch: 30
      # 如果消息发送失败,重试次数,默认 10
      send-try-count: 10
      # 异步发送队列大小,默认 10000
      max-queue-size: 10000

    # 使用 QmqTemplate 发送消息的默认主题,默认值 default_subject
    template:
      default-subject: default_subject

    # 消费者配置,消费消息的线程池的设置,选填
    consumer:
      # 线程名称前缀,默认 qmq-process
      thread-name-prefix: qmq-process
      # 线程池大小,默认 2
      core-pool-size: 2
      # 最大线程池大小,默认 2
      max-pool-size: 2
      # 线程池队列大小,默认 1000
      queue-capacity: 1000

    # 消息主题和分组配置,选填
    # 使用 QmqConsumer 注解时,可使用 SpEL 表达式引入以下主题和分组
    subject:
      sub1: sub1
      sub2: sub2
      sub3: sub3
      # more subject ...
    group:
      group1: group1
      group2: group2
      group3: group3
      # more group ...

logging:
  level:
    # 设置 qmq-spring-boot-starter 的日志级别
    xin.wjtree.qmq: trace

server:
  port: 8989

发送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.MessageSendStateListener;
import xin.wjtree.qmq.QmqTemplate;
import xin.wjtree.qmq.autoconfigure.QmqProperties;
import xin.wjtree.qmq.constant.QmqTimeUnit;
import xin.wjtree.qmq.internal.QmqAlias;
import xin.wjtree.qmq.internal.QmqIgnore;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest
public class QmqTest {
    @Resource
    private QmqTemplate template;
    @Resource
    private QmqProperties properties;

    /**
     * 发送即时消息
     * @throws InterruptedException
     */
    @Test
    public void sendImmediate() throws InterruptedException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 一般使用 template.send(properties.getSubject().get("sub1"), getUser()) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).send(properties.getSubject().get("sub1"), getUser());

        // 计数器减1
        latch.await();
    }

    /**
     * 发送延时消息
     * @throws InterruptedException
     */
    @Test
    public void sendDelay() throws InterruptedException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 延时 10 秒发送消息
        // 一般使用 template.sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS);

        // 计数器减1
        latch.await();
    }

    /**
     * 发送定时消息
     * @throws InterruptedException
     */
    @Test
    public void sendSchedule() throws InterruptedException, ParseException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 定时发送的日期时间
        Date date = new SimpleDateFormat("yyyy-MM-dd HH

qmq-spring-boot-starter 启用消费者模式配置消费监听器 官网

https://gitee.com/wjtree/qmq-spring-boot-starter

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


memcached-session-manager 将session存储到memchached实现方案时。他主要功能是修改tomcat的session存储机制,使之能够把session序列化存放到memcached中。
Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。
EasyTomcat 是一个用来帮助简化 Tomcat 和MySQL 管理的系统,你可以启动、停止和配置 Tomcat和MySQL
riak-session-manager 是使用 Riak 来存储Tomcat session 信息的项目。 配置方法:
tomcat-redis-session-manager 是一个用来将 Tomcat 的 Session 数据存储在 Redis 库中的项目。
这是一款在 Oracle 的 JDeveloper 开发环境下管理Tomcat 的插件,如下图所示:
扩展Tomcat 6.x,使用redis存放session信息!是一个Eclipse项目,最好用EGit来Clone(因为里面有个中文文件名的说明文件).
dhcpcd 是一个兼容 RFC2131的DHCP客户端程序,支持DHCP的全部功能并且体积非常小,只有差不多 46k。
phpDHCPAdmin 是一个基于 Web 的动态主机配置协议(DHCP Daemon)的管理工具,可单独设置组、用户级别;PXE、多子网;空间租赁管理功能。可对数据进行可视化展示、分类。适合大规模的 dhcpd 环境管理。
JDHCP 项目的目的是为 Java 应用增加简单操作 DHCP 协议的方法,DHCP是动态主机配置协议的简称。使用这个API可以轻松的发送、接收和解析DHCP消息,可用于编写DHCP的客户端、服务器端应用。
DHCP服务器为客户端计算机分配IP地址,通常应用在企业网络中以减小配置成本,所有客户端的IP地址都保存在服务器端。
dhcp4java是一个用于操作DHCP信息包的纯Java类库。适用于DHCP服务器, DHCP客户端或DHCP转发。
dhcp-forwarder 是一个 DHCP 中继代理,它将在不同的子网广播域中转发 DHCP 广播信息。
不用看都知道是一个开源的 DHCP 服务器。 Open DHCP Server is a multi-subnet DHCP server. It supports both dynamic and
GAdmin-ProFTPD是一个基于GTK的可视化DHCP服务端管理工具。 更多的屏幕截图请看:http://mange.dynalias.org/linux/gadmin-dhcpd/screenshots/
Dual DHCP DNS Server 是一个提供 DHCP 和 DNS 服务的服务器软件,每一个功能都可以单独启用或者关闭。
Dhcpy6d 是一个开源的 DHCPv6 的服务器软件,相当于为 IPv6 客户端提供 DHCP 协议。
DHCP as a filesystem,要求 FUSE 的支持,使用 Go 语言开发。 安装: GOFUSE=github.com/hanwen/go-fuse
简易图床支持 HDFS 本地存储远端存储等。 Status Esay Graph bed Use HDFS Use Qiniu Use upyun Use Local
一个使用python开发的简单好用的 PXE (DHCP/TFTP/HTTP) 服务器,同时支持netboot、dhcp-