博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot整合xxl-mq学习笔记
阅读量:5167 次
发布时间:2019-06-13

本文共 8562 字,大约阅读时间需要 28 分钟。

首先xxl-mq是大神xuxueli开发的一个消息中间件框架:

与springboot整合过程:

com.xuxueli
xxl-mq-samples
1.3.0-SNAPSHOT
4.0.0
xxl-mq-samples-springboot
jar
org.springframework.boot
spring-boot-starter-parent
${spring-boot.version}
pom
import
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-freemarker
com.xuxueli
xxl-mq-client
${parent.version}
org.springframework.boot
spring-boot-maven-plugin
${spring-boot.version}
repackage

2  propeties

### webserver.port=8081server.context-path=/### resourcesspring.mvc.static-path-pattern=/static/**spring.resources.static-locations=classpath:/static/### freemarkerspring.freemarker.templateLoaderPath=classpath:/templates/spring.freemarker.suffix=.ftlspring.freemarker.charset=UTF-8spring.freemarker.request-context-attribute=requestspring.freemarker.settings.number_format=0.########### xxl-mq, admin conf 这个配置时admin部署的位置xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin### xxl-mq, access tokenxxl.mq.accessToken=

index。html

        

需要在juery下面引入:

日志管理配置:

logback。xml

logback
%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{
36} - %msg%n
${log.path}
${log.path}.%d{yyyy-MM-dd}.zip
%date %level [%thread] %logger{
36} [%file : %line] %msg%n

配置XxlMqConf.java

package com.xxl.mq.sample.springboot.conf;import com.xxl.mq.client.factory.impl.XxlMqSpringClientFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;@Componentpublic class XxlMqConf {    // ---------------------- param ----------------------    @Value("${xxl.mq.admin.address}")    private String adminAddress;    @Value("${xxl.mq.accessToken}")    private String accessToken;    @Bean    public XxlMqSpringClientFactory getXxlMqConsumer(){        XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory();        xxlMqSpringClientFactory.setAdminAddress(adminAddress);        xxlMqSpringClientFactory.setAccessToken(accessToken);        return xxlMqSpringClientFactory;    }}

controller 根据传入的参数进行设置:

import com.xxl.mq.client.message.XxlMqMessage;import com.xxl.mq.client.producer.XxlMqProducer;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.ExceptionHandler;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import java.util.Calendar;import java.util.Date;/** * index controller * @author xuxueli 2015-12-19 16:13:16 */@Controllerpublic class IndexController {    @RequestMapping("/")    public String index(){        return "index";    }    @RequestMapping("/produce")    @ResponseBody    public String produce(int type){        String topic = "topic_1";        String data = "时间戳:" + System.currentTimeMillis();        if (type == 0) {            /**             * 并行消费             */            XxlMqProducer.produce(new XxlMqMessage(topic, data));        } else if (type == 1) {            /**             * 串行消费             */            XxlMqProducer.produce(new XxlMqMessage(topic, data, 1L));        } else if (type == 2) {            /**             * 广播消费             */            XxlMqProducer.broadcast(new XxlMqMessage(topic, data));        } else if (type == 3) {            Calendar calendar = Calendar.getInstance();            calendar.add(Calendar.MINUTE, 5);            Date effectTime = calendar.getTime();            /**             * 延时消息             */            XxlMqProducer.produce(new XxlMqMessage(topic, data, effectTime));        } else if (type == 4) {            int msgNum = 10000;            long start = System.currentTimeMillis();            for (int i = 0; i < msgNum; i++) {                XxlMqProducer.produce(new XxlMqMessage("topic_1", "No:"+i));            }            long end = System.currentTimeMillis();            return "Cost = " + (end-start);        } else {            return "Type Error.";        }        return "SUCCESS";    }    @ExceptionHandler({Exception.class})     public String exception(Exception e) {         e.printStackTrace();         return e.getMessage();     }}

对应的消费者:

import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_2")@Servicepublic class Demo2MqComsumer implements IMqConsumer {    private Logger logger = LoggerFactory.getLogger(Demo2MqComsumer.class);    @Override    public MqResult consume(String data) throws Exception {        logger.info("[Demo2MqComsumer] 消费一条消息:{}", data);        return MqResult.SUCCESS;    }}
import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_1")@Servicepublic class DemoAMqComsumer implements IMqConsumer {    private Logger logger = LoggerFactory.getLogger(DemoAMqComsumer.class);    @Override    public MqResult consume(String data) throws Exception {        logger.info("[DemoAMqComsumer] 消费一条消息:{}", data);        return MqResult.SUCCESS;    }}
import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_1")@Servicepublic class DemoBMqComsumer implements IMqConsumer {    private Logger logger = LoggerFactory.getLogger(DemoBMqComsumer.class);    @Override    public MqResult consume(String data) throws Exception {        logger.info("[DemoBMqComsumer] 消费一条消息:{}", data);        return MqResult.SUCCESS;    }}
import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_1", group = MqConsumer.EMPTY_GROUP)@Servicepublic class DemoCMqComsumer implements IMqConsumer {    private Logger logger = LoggerFactory.getLogger(DemoCMqComsumer.class);    @Override    public MqResult consume(String data) throws Exception {        logger.info("[DemoCMqComsumer] 消费一条消息:{}", data);        return MqResult.SUCCESS;    }}

 

转载于:https://www.cnblogs.com/xiufengchen/p/10406020.html

你可能感兴趣的文章
有关快速幂取模
查看>>
NOI2018垫底记
查看>>
注意java的对象引用
查看>>
C++ 面向对象 类成员函数this指针
查看>>
NSPredicate的使用,超级强大
查看>>
自动分割mp3等音频视频文件的脚本
查看>>
判断字符串是否为空的注意事项
查看>>
布兰诗歌
查看>>
js编码
查看>>
Pycharm Error loading package list:Status: 403错误解决方法
查看>>
steps/train_sat.sh
查看>>
转:Linux设备树(Device Tree)机制
查看>>
iOS 组件化
查看>>
(转)Tomcat 8 安装和配置、优化
查看>>
(转)Linxu磁盘体系知识介绍及磁盘介绍
查看>>
tkinter布局
查看>>
命令ord
查看>>
Sharepoint 2013搜索服务配置总结(实战)
查看>>
博客盈利请先考虑这七点
查看>>
使用 XMLBeans 进行编程
查看>>