您的位置:首页 > 其它

RabbitMQ随手笔记(三)RabbitMQ-“Hello World” 之消费者(.netCore2.0)

2018-03-10 10:48 267 查看
消费者代码主要包含以下几方面:
01.创建factory

02.创建连接

03.创建channel

04.创建消费者

05.回收资源

消费者代码:using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace rabbitMQConsum
{
public static class SimpleConsum
{
private const string QUEUE_NAME = "queue_demo";
private const string IP_ADDRESS = "127.0.0.1";
private const int PORT = 5672;//RabbitMQ 服务端默认端口5672;
private const string USER_NAME = "guest";
private const string PASSWORD = "guest";

public static void Consumer()
{
try
{
//01.创建factory
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = USER_NAME;
factory.Password = PASSWORD;
//02.创建连接
IConnection con = factory.CreateConnection();
//03.创建channel
IModel channel = con.CreateModel();
//创建一个持久的、非排他的、非自动删除的队列
channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
//队列最大接收未被ack的消息的个数
channel.BasicQos(64, 1000, true);
//04.创建消费者-监听方式
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
Run(body);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(QUEUE_NAME, false, consumer);
//05.回收资源
channel.Close();
con.Close();
}
catch (Exception ex)
{
throw;
}

}

private static void Run(byte[] body)
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
}

简单异常处理后代码:using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace rabbitMQConsum
{
public static class SimpleConsum
{
private const string QUEUE_NAME = "queue_demo";
private const string IP_ADDRESS = "127.0.0.1";
private const int PORT = 5672;//RabbitMQ 服务端默认端口5672;
private const string USER_NAME = "guest";
private const string PASSWORD = "guest";

public static void Consumer()
{
IConnection con = null;
IModel channel = null;
try
{
//01.创建factory
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = USER_NAME;
factory.Password = PASSWORD;
//02.创建连接
con = factory.CreateConnection();
//03.创建channel
channel = con.CreateModel();
//创建一个持久的、非排他的、非自动删除的队列
channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
//队列最大接收未被ack的消息的个数
channel.BasicQos(64, 1000, true);
//04.创建消费者-监听方式
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
Run(body);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(QUEUE_NAME, false, consumer);
}
catch (IOException ioE)
{
throw;
}
catch (SocketException socketEx)//RabbitMQ 用TCP协议,这里除了socket异常
{
throw;
}
catch (Exception ex)
{
throw;
}
finally
{
//05.关闭资源
if (channel != null)
channel.Close();
if (con != null)
con.Close();
}

}

private static void Run(byte[] body)
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息