首先xxl-mq是大神xuxueli开发的一个消息中间件框架:
与springboot整合过程:
com.xuxueli xxl-mq-samples 1.3.0-SNAPSHOT 4.0.0 xxl-mq-samples-springboot jar org.springframework.boot spring-boot-starter-parent ${spring-boot.version} pom import org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-freemarker com.xuxueli xxl-mq-client ${parent.version} org.springframework.boot spring-boot-maven-plugin ${spring-boot.version} repackage
2 properties
### webserver.port=8081server.context-path=/### resourcesspring.mvc.static-path-pattern=/static/**spring.resources.static-locations=classpath:/static/### freemarkerspring.freemarker.templateLoaderPath=classpath:/templates/spring.freemarker.suffix=.ftlspring.freemarker.charset=UTF-8spring.freemarker.request-context-attribute=requestspring.freemarker.settings.number_format=0.########### xxl-mq, admin conf 这个配置是admin部署的位置xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin### xxl-mq, access tokenxxl.mq.accessToken=
index.html
需要在jquery下面引入:
日志管理配置:
logback.xml
logback %d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{ 36} - %msg%n ${log.path} ${log.path}.%d{yyyy-MM-dd}.zip %date %level [%thread] %logger{ 36} [%file : %line] %msg%n
配置XxlMqConf.java:
package com.xxl.mq.sample.springboot.conf;import com.xxl.mq.client.factory.impl.XxlMqSpringClientFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.stereotype.Component;@Componentpublic class XxlMqConf { // ---------------------- param ---------------------- @Value("${xxl.mq.admin.address}") private String adminAddress; @Value("${xxl.mq.accessToken}") private String accessToken; @Bean public XxlMqSpringClientFactory getXxlMqConsumer(){ XxlMqSpringClientFactory xxlMqSpringClientFactory = new XxlMqSpringClientFactory(); xxlMqSpringClientFactory.setAdminAddress(adminAddress); xxlMqSpringClientFactory.setAccessToken(accessToken); return xxlMqSpringClientFactory; }}
controller 根据传入的参数进行设置:
import com.xxl.mq.client.message.XxlMqMessage;import com.xxl.mq.client.producer.XxlMqProducer;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.ExceptionHandler;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import java.util.Calendar;import java.util.Date;/** * index controller * @author xuxueli 2015-12-19 16:13:16 */@Controllerpublic class IndexController { @RequestMapping("/") public String index(){ return "index"; } @RequestMapping("/produce") @ResponseBody public String produce(int type){ String topic = "topic_1"; String data = "时间戳:" + System.currentTimeMillis(); if (type == 0) { /** * 并行消费 */ XxlMqProducer.produce(new XxlMqMessage(topic, data)); } else if (type == 1) { /** * 串行消费 */ XxlMqProducer.produce(new XxlMqMessage(topic, data, 1L)); } else if (type == 2) { /** * 广播消费 */ XxlMqProducer.broadcast(new XxlMqMessage(topic, data)); } else if (type == 3) { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); Date effectTime = calendar.getTime(); /** * 延时消息 */ XxlMqProducer.produce(new XxlMqMessage(topic, data, effectTime)); } else if (type == 4) { int msgNum = 10000; long start = System.currentTimeMillis(); for (int i = 0; i < msgNum; i++) { XxlMqProducer.produce(new XxlMqMessage("topic_1", "No:"+i)); } long end = System.currentTimeMillis(); return "Cost = " + (end-start); } else { return "Type Error."; } return "SUCCESS"; } @ExceptionHandler({Exception.class}) public String exception(Exception e) { e.printStackTrace(); return e.getMessage(); }}
对应的消费者:
import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_2")@Servicepublic class Demo2MqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(Demo2MqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[Demo2MqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; }}
import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_1")@Servicepublic class DemoAMqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(DemoAMqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[DemoAMqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; }}
import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_1")@Servicepublic class DemoBMqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(DemoBMqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[DemoBMqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; }}
import com.xxl.mq.client.consumer.IMqConsumer;import com.xxl.mq.client.consumer.MqResult;import com.xxl.mq.client.consumer.annotation.MqConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;/** * Created by xuxueli on 16/8/28. */@MqConsumer(topic = "topic_1", group = MqConsumer.EMPTY_GROUP)@Servicepublic class DemoCMqComsumer implements IMqConsumer { private Logger logger = LoggerFactory.getLogger(DemoCMqComsumer.class); @Override public MqResult consume(String data) throws Exception { logger.info("[DemoCMqComsumer] 消费一条消息:{}", data); return MqResult.SUCCESS; }}