您的位置:首页 > 编程语言 > Java开发

Spring Boot 集成 RabbitMq 实战操作(二)

2017-07-10 09:13 866 查看
本人学习新框架的方法:
一、先学习框架基本知识,也就是看相关书籍的前三章,了解基本概念。比如这个RabbitMQ,我会先看一些概念,比如:交换机、路由键、队列、虚拟主机。
二、然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些。
三、然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果。
四、实际项目积累经验。

RabbitMq 消息生产者向队列发送消息 (一)
MQ分为消息生产者和消息消费者,这次主要讲述消息的生产者,也就是把消息发送到相应的队列上。

本文是用spring boot 来做的。

步骤1:
生成RabbitMqConfig配置类,里面配置了队列、连接工厂、交换机等信息。
package com.basic.rabbitmq.productor.config;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;

import javax.swing.event.ChangeEvent;

/**
 * RabbitMQ configuration for the producer module.
 *
 * <p>Reads the broker host, port and credentials plus the queue, exchange and routing-key
 * settings from the Spring {@link Environment} and exposes them as beans.
 *
 * <p>Optional boolean flags fall back to a default when the property is missing or blank;
 * names (queue, exchange, routing key, port) are required and fail fast with the offending
 * key in the message when absent.
 *
 * Created by sdc on 2017/7/4.
 */
@Configuration
@ComponentScan(basePackages = {"com.basic.rabbitmq.productor"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMqConfig {

    /** Property key of the queue name; its flags use this key as a prefix. */
    private static final String QUEUE_KEY = "emial.server.queue";

    /** Property key of the exchange name; its flags use this key as a prefix. */
    private static final String EXCHANGE_KEY = "emial.server.exchange";

    @Autowired
    private Environment env;

    /**
     * Builds the RabbitMQ client connection factory from the {@code spring.rabbitmq.*} properties.
     * The virtual host is fixed to {@code "/"}.
     *
     * @return the configured client connection factory
     * @throws Exception declared for compatibility with existing callers
     */
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
        connectionFactory.setPort(Integer.parseInt(requiredProperty("spring.rabbitmq.port")));
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
        return connectionFactory;
    }

    /**
     * Wraps the client connection factory in Spring's {@link CachingConnectionFactory}.
     *
     * @return the caching connection factory with publisher confirms enabled
     * @throws Exception if the underlying factory cannot be built
     */
    @Bean
    public CachingConnectionFactory cachingConnectionFactory() throws Exception {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory());
        // Per the original author's note: this must be true if message confirm callbacks are used.
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    /**
     * Creates the {@link RabbitTemplate} used to publish messages.
     *
     * <p>The original author left the following settings disabled: channel transactions,
     * the mandatory flag, and the confirm/return callbacks.
     *
     * @return a template backed by {@link #cachingConnectionFactory()}
     * @throws Exception if the connection factory cannot be built
     */
    @Bean
    public RabbitTemplate rabbitTemplate() throws Exception {
        return new RabbitTemplate(cachingConnectionFactory());
    }

    /**
     * Creates the AMQP admin bean.
     *
     * @return a {@link RabbitAdmin} backed by {@link #cachingConnectionFactory()}
     * @throws Exception if the connection factory cannot be built
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws Exception {
        return new RabbitAdmin(cachingConnectionFactory());
    }

    /**
     * Opens a raw client channel and declares the configured queue on it.
     *
     * @return the channel on which the queue was declared
     * @throws Exception if the connection or channel cannot be opened, or the declare fails
     */
    @Bean
    public Channel channel() throws Exception {
        // Resolve all settings first so a configuration error surfaces before any socket is opened.
        String name = queueName();
        boolean durable = queueDurable();
        boolean exclusive = queueExclusive();
        boolean autoDelete = queueAutoDelete();

        // NOTE(review): this opens a second, raw connection next to the CachingConnectionFactory
        // one, and nothing in this class closes it explicitly - confirm whether that is intended.
        Connection connection = connectionFactory().newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(name, durable, exclusive, autoDelete, null);
        return channel;
    }

    /**
     * Describes the queue using the same settings that {@link #channel()} declares.
     *
     * @return the queue definition
     * @throws Exception declared for compatibility with existing callers
     */
    @Bean
    public Queue queue() throws Exception {
        return new Queue(queueName(), queueDurable(), queueExclusive(), queueAutoDelete());
    }

    /**
     * Describes the exchange; its type is topic.
     *
     * @return the topic exchange definition
     */
    @Bean
    public TopicExchange exchange() {
        // Trimmed like the queue name and routing key, so stray whitespace cannot create a
        // differently named exchange.
        String name = requiredProperty(EXCHANGE_KEY);
        boolean durable = booleanProperty(EXCHANGE_KEY + ".durable", true);
        boolean autoDelete = booleanProperty(EXCHANGE_KEY + ".autoDelete", false);
        return new TopicExchange(name, durable, autoDelete);
    }

    /**
     * Binds the queue to the exchange with the configured routing key, so the exchange can
     * route matching messages into the queue.
     *
     * @return the binding definition
     * @throws Exception if the queue definition cannot be built
     */
    @Bean
    public Binding binding() throws Exception {
        String routekey = requiredProperty("emial.server.routekey");
        return BindingBuilder.bind(queue()).to(exchange()).with(routekey);
    }

    /** Returns the trimmed queue name. */
    private String queueName() {
        return requiredProperty(QUEUE_KEY);
    }

    /** Returns whether the queue is durable; defaults to {@code true}. */
    private boolean queueDurable() {
        return booleanProperty(QUEUE_KEY + ".durable", true);
    }

    /** Returns whether the queue is exclusive to its creator; defaults to {@code false}. */
    private boolean queueExclusive() {
        return booleanProperty(QUEUE_KEY + ".exclusive", false);
    }

    /** Returns whether the queue is auto-deleted; defaults to {@code false}. */
    private boolean queueAutoDelete() {
        return booleanProperty(QUEUE_KEY + ".autoDelete", false);
    }

    /**
     * Returns the trimmed value of a mandatory property.
     *
     * @throws IllegalStateException if the key cannot be resolved
     */
    private String requiredProperty(String key) {
        return env.getRequiredProperty(key).trim();
    }

    /**
     * Returns a boolean property, or {@code defaultValue} when it is missing or blank.
     * The null check happens before trimming, so an absent key no longer raises a
     * NullPointerException.
     */
    private boolean booleanProperty(String key, boolean defaultValue) {
        String raw = env.getProperty(key);
        return StringUtils.isNotBlank(raw) ? Boolean.parseBoolean(raw.trim()) : defaultValue;
    }

}
步骤二配置文件:
application.properties
#指定具体使用哪种配置环境,此处指定使用application-dev.properties配置文件中的环境
spring.profiles.active=dev


application-dev.properties
# Application name, left commented out by the original author.
#spring.application.name=spring-boot-rabbitmq
# RabbitMQ broker connection settings, read by RabbitMqConfig.connectionFactory().
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

# Exchange name and flags, read by RabbitMqConfig.exchange().
# The "emial" spelling is the key the Java code looks up, so it must not be
# corrected here without changing the code as well.
emial.server.exchange=email_exchange
emial.server.exchange.durable=true
emial.server.exchange.autoDelete=false

# Queue name and flags, read by RabbitMqConfig.queue() and RabbitMqConfig.channel().
emial.server.queue=email_queue
emial.server.queue.durable=true
emial.server.queue.exclusive=false
emial.server.queue.autoDelete=false

# Routing key used by RabbitMqConfig.binding() to bind the queue to the exchange.
emial.server.routekey=email_route_key
# NOTE(review): no code shown alongside this file reads the bindingkey entry - confirm whether it is used elsewhere.
emial.server.exchange.bindingkey=email_route_key


具体接口EmailService服务:
package com.basic.rabbitmq.productor.service;

/**
 * Mail service contract.
 * Created by sdc on 2017/7/5.
 */
public interface EmailService {

    /**
     * Puts a send-mail task onto the message queue.
     *
     * @param message the message content to enqueue
     * @throws Exception if the implementation fails to enqueue the message
     */
    void sendEmailForQueue(String message) throws Exception;

}
package com.basic.rabbitmq.productor.service.impl;

import com.basic.rabbitmq.productor.model.SendMessage;
import com.basic.rabbitmq.productor.service.EmailService;
import com.basic.rabbitmq.productor.util.MessageSender;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Mail service implementation that hands each message to the {@code messageSender} bean.
 * Created by sdc on 2017/7/5.
 */
@Service("emailService")
public class EmailServiceImpl implements EmailService {

    private static final Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class);

    // Injected but not read anywhere in this class; kept so the existing wiring
    // (and its fail-fast check that the properties exist) is unchanged.
    @Value("${emial.server.exchange}")
    private String exchange;

    @Value("${emial.server.routekey}")
    private String routeKey;

    @Resource(name = "messageSender")
    private MessageSender messageSender;

    /**
     * Hands the message to the sender so it is published to the queue.
     *
     * <p>A failure is logged with its stack trace and then rethrown. Swallowing it would let
     * callers report success for a message that was never queued.
     *
     * @param message the message content to enqueue
     * @throws Exception whatever the sender raised, unchanged
     */
    @Override
    public void sendEmailForQueue(String message) throws Exception {
        try {
            messageSender.handlerMessage(message);
        } catch (Exception e) {
            logger.error("EmailServiceImpl.sendEmailForQueue failed to hand the message to the sender", e);
            throw e;
        }
    }

}


Controller层,写一个接口,往Rabbitmq上发送消息。
package com.basic.rabbitmq.productor.controller;

import com.basic.rabbitmq.productor.constant.WebStatusEnum;
import com.basic.rabbitmq.productor.model.ResponseVo;
import com.basic.rabbitmq.productor.service.EmailService;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

import javax.annotation.Resource;

/**
 * Test endpoint for the mail service.
 *
 * Created by sdc on 2017/7/5.
 */
@RestController
@RequestMapping("/email")
public class EmailController extends BaseController {

    @Resource(name = "emailService")
    private EmailService emailService;

    /**
     * Queues one fixed mail message and reports success.
     *
     * @return a response carrying the SUCCESS status and an empty string as data
     * @throws Exception if the mail service throws
     */
    @RequestMapping(value = "/sendEmail", method = RequestMethod.GET)
    public ResponseVo<?> sendEmail() throws Exception {
        emailService.sendEmailForQueue("邮件消息");
        ResponseVo<?> response = generateResponseVo(WebStatusEnum.SUCCESS, "");
        return response;
    }

}


EmailController的父类,抽出来,用于别的controller类继承。
package com.basic.rabbitmq.productor.controller;

import com.basic.rabbitmq.productor.constant.WebStatusEnum;
import com.basic.rabbitmq.productor.model.ResponseVo;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

/**
 * Base class for controllers, providing a single way to build the unified response object.
 *
 * Created by sdc on 2017/7/6.
 */
public class BaseController {

    /**
     * Builds the unified response object from a status enum and a payload.
     *
     * <p>The return type carries the payload type {@code T} rather than the raw
     * {@code ResponseVo}, so callers get compile-time checking of the payload.
     *
     * @param webStatusEnum status whose code and description are copied into the response
     * @param data          payload object, passed through as-is
     * @param <T>           payload type
     * @return a new response holding the status code, status description and payload
     */
    public <T> ResponseVo<T> generateResponseVo(WebStatusEnum webStatusEnum, T data) {
        return new ResponseVo<>(webStatusEnum.getCode(), webStatusEnum.getDesc(), data);
    }

    /**
     * Returns the session of the given request, as provided by {@code request.getSession()}.
     *
     * @param request the current HTTP request
     * @return the HTTP session
     */
    public HttpSession getCurrentSession(HttpServletRequest request) {
        return request.getSession();
    }

}


统一返回对象类ResponseVo以及状态码枚举WebStatusEnum,用于给页面返回统一格式的数据:
package com.basic.rabbitmq.productor.model;

import java.io.Serializable;

/**
 * Unified response format shared with the front end.
 *
 * Created by sdc on 2017/7/7.
 *
 * @param <T> type of the payload
 */
public class ResponseVo<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Status code. */
    private String code;

    /** Message that goes with the status code. */
    private String message;

    /** Payload object. */
    private T data;

    /**
     * Creates a response with all three parts set.
     *
     * @param code    status code
     * @param message message for the status code
     * @param data    payload object
     */
    public ResponseVo(String code, String message, T data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    /** @return the status code */
    public String getCode() {
        return this.code;
    }

    /** @param code the status code to store */
    public void setCode(String code) {
        this.code = code;
    }

    /** @return the message for the status code */
    public String getMessage() {
        return this.message;
    }

    /** @param message the message to store */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the payload object */
    public T getData() {
        return this.data;
    }

    /** @param data the payload object to store */
    public void setData(T data) {
        this.data = data;
    }
}
package com.basic.rabbitmq.productor.constant;

/**
 * Status codes returned by the web API.
 *
 * <p>Per the original author's note: common codes are 5000 and above, and the range
 * 2000 to 4000 is for business-specific codes.
 *
 * Created by sdc on 2017/7/7.
 */
public enum WebStatusEnum {

    SUCCESS("5000", "成功"),
    FAILED("7000", "失败"),

    PARAM_ERROR("7001", "参数错误"),
    PARAM_NOT_NULL("7002", "参数不能为空");

    /** System code. */
    private String code;

    /** Description. */
    private String desc;

    WebStatusEnum(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    /**
     * Finds the constant whose code equals the given key.
     *
     * @param key the code to look for
     * @return the first constant, in declaration order, with that code, or {@code null} if none has it
     */
    public static WebStatusEnum getWebStatusEnumByKey(String key) {
        WebStatusEnum[] candidates = values();
        for (int i = 0; i < candidates.length; i++) {
            WebStatusEnum candidate = candidates[i];
            if (candidate.code.equals(key)) {
                return candidate;
            }
        }
        return null;
    }

    /** @return the description */
    public String getDesc() {
        return this.desc;
    }

    /** @param desc the description to store on this constant */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /** @return the system code */
    public String getCode() {
        return this.code;
    }

    /** @param code the system code to store on this constant */
    public void setCode(String code) {
        this.code = code;
    }
}


最后pom.xml文件,用于下载jar包的。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent>
<artifactId>email-server</artifactId>
<groupId>com.basic.email</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>email-server-productor</artifactId>

<!-- Build encoding plus the version numbers that the dependency declarations in this POM reference through ${...}. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<!-- Third-party library versions. -->
<fastjson.version>1.2.6</fastjson.version>
<commons-lang3.version>3.4</commons-lang3.version>
<guava.version>18.0</guava.version>
<javax.mail.version>1.4.7</javax.mail.version>
<log4j.version>1.2.17</log4j.version>
</properties>

<!-- Libraries this module builds against. -->
<dependencies>
<!-- Spring Boot starters. No version is given here; presumably it is managed by the parent POM - confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- Spring AMQP / RabbitMQ support. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>

<!-- General-purpose utility libraries, versions taken from the properties section. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>

<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>${javax.mail.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.4</version>
</dependency>

<!-- Logging -->
<!-- NOTE(review): spring-boot-starter-logging is declared above together with slf4j-log4j12 below, -->
<!-- so more than one SLF4J binding may end up on the classpath - confirm which backend is intended. -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>

<!-- Servlet API, used by BaseController (HttpServletRequest / HttpSession). -->
<!-- NOTE(review): declared at compile scope next to spring-boot-starter-web - confirm whether "provided" scope was intended. -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>

<!-- Build settings: artifact file name and compiler configuration. -->
<build>
<finalName>basic-model</finalName>
<plugins>
<!-- Compiler plugin -->
<!-- Compiler plugin settings: Java 1.8 source and target. -->
<!-- NOTE(review): for maven-compiler-plugin, skip=true appears to bypass compilation of test sources - confirm this is intended. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<skip>true</skip>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>


项目启动成功后,在浏览器中访问接口 http://localhost:8080/email/sendEmail,
每成功调用一次,就可以在RabbitMQ的管理页面上看到队列里多出一条消息。





最后就是成功了,用到的一些概念请自己查找,记忆会深刻些。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RabbitMQ SpringBoot