RabbitMQ消息队列配置
2017-08-27 15:49
513 查看
安装Erlang
下载RabbitMq
启动RabbitMq rabbitmq-server.bat start
访问RabbitMQ管理界面,默认账号和密码均为guest
下载RabbitMq
启动RabbitMq rabbitmq-server.bat start
访问RabbitMQ管理界面,默认账号和密码均为guest
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { private static Connection conn=null; public ConnectionUtil(){ } public static synchronized Connection getConnection() throws IOException, TimeoutException{ if(conn==null){ ConnectionFactory factory = new ConnectionFactory(); //hostname of your rabbitmq server factory.setHost("localhost"); //getting a connection conn = factory.newConnection(); return conn; } return conn; } }
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException, TimeoutException{ this.endPointName = endpointName; //Create a connection factory connection = ConnectionUtil.getConnection(); //creating a channel channel = connection.createChannel(); channel.basicQos(1); //declaring a queue for this channel. If queue does not exist, //it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * @throws IOException * @throws TimeoutException */ public void close() throws IOException, TimeoutException{ this.channel.close(); this.connection.close(); } }
import java.io.IOException; import java.io.Serializable; import java.util.concurrent.TimeoutException; import org.springframework.util.SerializationUtils; public class Producer extends EndPoint { public Producer(String endpointName) throws IOException, TimeoutException { //this.endPointName=endpointName; super(endpointName); // TODO Auto-generated constructor stub } private static Producer pro=null; public static Producer getInstance(String endpointName) throws IOException, TimeoutException{ if(pro==null){ pro=new Producer(endpointName); return pro; } return pro; } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } }
import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import org.springframework.util.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; public class QueueConsumer extends EndPoint implements Runnable, Consumer{ public QueueConsumer(String endPointName) throws IOException, TimeoutException{ super(endPointName); } public void run() { try { //start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true,this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer "+consumerTag +" registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("message number") + " received."); } public void handleCancel(String consumerTag) {} public void handleCancelOk(String consumerTag) {} public void handleRecoverOk(String consumerTag) {} public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {} }
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.TimeoutException; public class Java { public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub Producer producer = Producer.getInstance("queue"); for (int i = 0; i < 100; i++) { HashMap message = new HashMap(); ArrayList list=new ArrayList(); list.add("kjjklkj"+i); message.put("message number", list); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
import java.io.IOException; import java.util.concurrent.TimeoutException; public class Java01 { public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub QueueConsumer consumer = new QueueConsumer("queue"); QueueConsumer2 consumer2 = new QueueConsumer2("queue"); //QueueConsumer consumer1 = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); Thread consumerThread1 = new Thread(consumer2); consumerThread.start(); consumerThread1.start(); } }
相关文章推荐
- RabbitMQ消息队列安装和配置以及推送消息
- (四)RabbitMQ消息队列-服务详细配置与日常监控管理
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- 在C#中使用消息队列RabbitMQ -摘自网络(包括RabbitMQ的配置)
- RabbitMQ消息队列之一:RabbitMQ的环境安装及配置
- RabbitMQ 消息队列 配置
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- (四)RabbitMQ消息队列-服务详细配置与日常监控管理
- rabbitmq队列中消息过期配置
- RabbitMQ消息队列之一:RabbitMQ的环境安装及配置
- linux下安装配置rabbitmq(消息队列系统)
- 柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装、配置与监控
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- 柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装、配置与监控
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- (四)RabbitMQ消息队列-服务详细配置与日常监控管理
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- 消息队列_RabbitMQ-0003.深入RabbitMQ节点/配置/管理及日志实时化?
- 通过haproxy 配置rabbitmq消息队列主从