您的位置:首页 > 其它

RabbitMq消息队列配置

2017-08-27 15:49 513 查看
安装 Erlang
下载 RabbitMQ
启动 RabbitMQ:rabbitmq-server.bat start
访问 RabbitMQ 管理界面(默认地址 http://localhost:15672,需先启用 rabbitmq_management 插件),默认账号和密码均为 guest


import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Provides one lazily created RabbitMQ {@link Connection} that is shared by
 * every caller in the process.
 */
public class ConnectionUtil {

    /** Cached shared connection; {@code null} until the first successful call to {@link #getConnection()}. */
    private static Connection conn = null;

    /**
     * Kept public so existing callers that instantiate this class still compile;
     * the class itself only exposes static members.
     */
    public ConnectionUtil() {
    }

    /**
     * Returns the shared connection, opening a new one when none has been created
     * yet or when the cached one is no longer open.
     *
     * <p>The method is {@code synchronized} so that concurrent first callers do not
     * each open their own connection.
     *
     * @return an open connection to the broker on {@code localhost}
     * @throws IOException if the connection cannot be established
     * @throws TimeoutException if establishing the connection times out
     */
    public static synchronized Connection getConnection() throws IOException, TimeoutException {
        // A cached connection that has been closed (for example through
        // EndPoint.close()) is useless to callers, so it is replaced here
        // instead of being handed out again.
        if (conn == null || !conn.isOpen()) {
            ConnectionFactory factory = new ConnectionFactory();

            // hostname of your rabbitmq server
            factory.setHost("localhost");

            conn = factory.newConnection();
        }
        return conn;
    }

}

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Base class for objects that talk to a single named queue. It obtains the shared
 * connection from {@code ConnectionUtil}, opens its own channel on it and declares
 * the queue.
 */
public class EndPoint {

    /** Channel opened for this endpoint in the constructor. */
    protected Channel channel;
    /** Connection returned by {@code ConnectionUtil.getConnection()}; shared with other endpoints. */
    protected Connection connection;
    /** Name of the queue this endpoint publishes to or consumes from. */
    protected String endPointName;

    /**
     * Opens a channel on the shared connection and declares the queue.
     *
     * @param endpointName name of the queue to declare and use
     * @throws IOException if the channel cannot be created or the queue cannot be declared
     * @throws TimeoutException if obtaining the connection times out
     */
    public EndPoint(String endpointName) throws IOException, TimeoutException {
        this.endPointName = endpointName;

        connection = ConnectionUtil.getConnection();
        channel = connection.createChannel();

        // Prefetch count of 1: at most one unacknowledged message is delivered at a time.
        channel.basicQos(1);

        // Declare the queue for this channel. If the queue does not exist it is
        // created on the server (arguments: not durable, not exclusive, not auto-delete).
        channel.queueDeclare(endpointName, false, false, false, null);
    }

    /**
     * Closes this endpoint's channel and then the connection.
     *
     * <p>The connection comes from {@code ConnectionUtil} and is shared, so closing it
     * also affects every other endpoint created from the same connection. Resources
     * that are already closed are skipped, which makes it safe to call this method on
     * several endpoints that share one connection, or more than once.
     *
     * @throws IOException if closing the channel or the connection fails
     * @throws TimeoutException if closing the channel times out
     */
    public void close() throws IOException, TimeoutException {
        try {
            if (this.channel != null && this.channel.isOpen()) {
                this.channel.close();
            }
        } finally {
            // Runs even when closing the channel throws, so the connection is not leaked.
            if (this.connection != null && this.connection.isOpen()) {
                this.connection.close();
            }
        }
    }

}

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeoutException;

import org.springframework.util.SerializationUtils;

/**
 * Endpoint that publishes serialized objects to its queue through the default exchange.
 */
public class Producer extends EndPoint {

    /**
     * Producers handed out by {@link #getInstance(String)}, keyed by queue name.
     * Only accessed from the synchronized factory method.
     */
    private static final java.util.Map<String, Producer> INSTANCES =
            new java.util.HashMap<String, Producer>();

    /**
     * Creates a producer for the given queue.
     *
     * @param endpointName name of the queue to publish to
     * @throws IOException if the channel or queue cannot be set up
     * @throws TimeoutException if obtaining the connection times out
     */
    public Producer(String endpointName) throws IOException, TimeoutException {
        super(endpointName);
    }

    /**
     * Returns the cached producer for {@code endpointName}, creating it on first use.
     *
     * <p>Instances are cached per queue name, so asking for a second queue no longer
     * returns a producer that publishes to the first one. The method is
     * {@code synchronized} so that concurrent first callers share one instance.
     *
     * @param endpointName name of the queue to publish to
     * @return the producer bound to {@code endpointName}
     * @throws IOException if a new producer cannot be set up
     * @throws TimeoutException if obtaining the connection times out
     */
    public static synchronized Producer getInstance(String endpointName)
            throws IOException, TimeoutException {
        Producer cached = INSTANCES.get(endpointName);
        if (cached == null) {
            cached = new Producer(endpointName);
            INSTANCES.put(endpointName, cached);
        }
        return cached;
    }

    /**
     * Serializes {@code object} and publishes it to this producer's queue.
     *
     * @param object payload to send
     * @throws IOException if publishing fails
     */
    public void sendMessage(Serializable object) throws IOException {
        // Empty exchange name = default exchange, which routes by queue name.
        channel.basicPublish("", endPointName, null, SerializationUtils.serialize(object));
    }

}

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.springframework.util.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Endpoint that consumes messages from its queue. It can be run on a thread and is
 * itself the callback object registered with the channel.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

    /**
     * Creates a consumer for the given queue.
     *
     * @param endPointName name of the queue to consume from
     * @throws IOException if the channel or queue cannot be set up
     * @throws TimeoutException if obtaining the connection times out
     */
    public QueueConsumer(String endPointName) throws IOException, TimeoutException {
        super(endPointName);
    }

    /**
     * Registers this object as a consumer on the queue, with automatic acknowledgement.
     */
    public void run() {
        try {
            // start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when consumer is registered.
     */
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer "+consumerTag +" registered");
    }

    /**
     * Called when new message is available. Deserializes the body and prints the
     * value stored under the {@code "message number"} key.
     *
     * <p>Bodies that do not deserialize to a {@link Map} are reported on standard
     * error and skipped instead of failing with a {@code ClassCastException} inside
     * the consumer callback.
     *
     * @throws IOException declared by the {@code Consumer} interface; not thrown here
     */
    public void handleDelivery(String consumerTag, Envelope env,
            BasicProperties props, byte[] body) throws IOException {
        // NOTE(review): this uses Java native deserialization on the message body.
        // That is only safe when every publisher on this queue is trusted.
        Object payload = SerializationUtils.deserialize(body);
        if (!(payload instanceof Map)) {
            String type = (payload == null) ? "null" : payload.getClass().getName();
            System.err.println("Ignoring message with unexpected payload type: " + type);
            return;
        }
        Map<?, ?> map = (Map<?, ?>) payload;
        System.out.println("Message Number "+ map.get("message number") + " received.");
    }

    /** Intentionally empty: cancellation is not handled. */
    public void handleCancel(String consumerTag) {}

    /** Intentionally empty: cancellation confirmation is not handled. */
    public void handleCancelOk(String consumerTag) {}

    /** Intentionally empty: recovery confirmation is not handled. */
    public void handleRecoverOk(String consumerTag) {}

    /** Intentionally empty: shutdown signals are not handled. */
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}

}

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

/**
 * Demo publisher: sends 100 messages to the queue named {@code "queue"}, one per second.
 */
public class Java {

    /**
     * Publishes the demo messages and then closes the producer.
     *
     * @param args unused
     * @throws IOException if setting up, publishing or closing fails
     * @throws TimeoutException if connecting or closing times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Producer producer = Producer.getInstance("queue");

        try {
            for (int i = 0; i < 100; i++) {
                // Concrete HashMap/ArrayList types are kept because sendMessage
                // requires a Serializable argument.
                HashMap<String, ArrayList<String>> message = new HashMap<String, ArrayList<String>>();
                ArrayList<String> list = new ArrayList<String>();
                list.add("kjjklkj" + i);
                message.put("message number", list);
                producer.sendMessage(message);

                System.out.println("Message Number "+ i +" sent.");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending: continuing the
                    // loop would ignore the request to stop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Release the channel and connection once sending is done or has failed.
            producer.close();
        }
    }

}

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Demo receiver: starts two consumers on the queue named {@code "queue"}, each on
 * its own thread.
 */
public class Java01 {

    /**
     * Builds both consumers first and only then starts their threads.
     *
     * <p>{@code QueueConsumer2} is not part of this listing and must be provided
     * elsewhere in the project.
     *
     * @param args unused
     * @throws IOException if a consumer cannot be set up
     * @throws TimeoutException if obtaining the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        QueueConsumer firstConsumer = new QueueConsumer("queue");
        QueueConsumer2 secondConsumer = new QueueConsumer2("queue");

        // Both threads are created before either is started.
        Thread[] workers = { new Thread(firstConsumer), new Thread(secondConsumer) };
        for (Thread worker : workers) {
            worker.start();
        }
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: