C# 使用RabbitMQ消息队列

时间:2020-10-22 22:52:25   收藏:0   阅读:43

参考文章 https://www.cnblogs.com/kiba/p/11703073.htmlhttps://www.cnblogs.com/longlongogo/p/6489574.html

经过代码检测两份文章的代码部分不适用,只参考了RabbitMQ的介绍和安装部分。

安装版本是

rabbitmq-server-3.8.9.exe  地址 https://www.rabbitmq.com/install-windows.html#installer

otp_win64_23.1.exe

废话不多说,直入主题,用的是控制台程序,其他平台类似,安装包是RabbitMQ.Client 6.2.1版本

技术图片

代码如下:

一、RabbitMQ消息队列发送端的代码

 1 using RabbitMQ.Client;
 2 using System;
 3 using System.Collections.Generic;
 4 using System.Linq;
 5 using System.Text;
 6 using System.Threading;
 7 using System.Threading.Tasks;
 8 
 9 namespace MQDemo
10 {
11     class Program
12     {
13         static void Main(string[] args)
14         {
15             for (int i = 1; i < 100; i++)
16             {
17                 bool isTrue = SendMsg("first RabbitMQ message"+i, "firstQueue");
18                 string msg = isTrue ? "发送成功" : "发送失败";
19                 Console.WriteLine(msg+i);
20                 Thread.Sleep(500);
21             }
22             Console.ReadKey();
23           
24         }
25 
26 
27         /// <summary>
28         /// RabbitMQ发送消息
29         /// </summary>
30         /// <param name="jsonstr">具体json格式的字符串</param>
31         /// <param name="queuqname">具体入队的队列名称</param>
32         /// <returns></returns>
33         public static bool SendMsg(string jsonstr, string queuqname)
34         {
35             try
36             {
37                 //1.实例化连接工厂
38                 var factory = new ConnectionFactory();
39                  factory.HostName = "localhost";
40             factory.UserName = "guest";
41             factory.Password = "guest";
42                 factory.AutomaticRecoveryEnabled = true;////设置端口后自动恢复连接属性
43                 //2. 建立连接
44                 var connection = factory.CreateConnection();
45                 //3. 创建信道
46                 var channel = connection.CreateModel();
47                 try
48                 {
49                     var queue_name = queuqname;//具体入队的队列名称
50                     bool durable = true;//队列是否持久化
51                     bool exclusive = false;
52                     //设置 autoDeleted=true 的队列,当没有消费者之后,队列会自动被删除
53                     bool autoDelete = false;
54                     //4. 申明队列
55                     channel.QueueDeclare(queue_name, durable, exclusive, autoDelete, null);
56 
57                     //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
58                     var properties = channel.CreateBasicProperties();
59                     properties.Persistent = true; //持久化的消息
60 
61                     string message = jsonstr; //传递的消息内容
62                     var body = Encoding.UTF8.GetBytes(message);
63 
64                     var exchange_name = "";
65                     var routingKey = queue_name;//routingKey=queue_name,则为对应队列接收=queue_name
66 
67                     channel.BasicPublish(exchange_name, routingKey, properties, body); //开始传递(指定basicProperties) 
68 
69                     return true;
70                 }
71                 catch (Exception ex)
72                 {
73                     Console.WriteLine("RabbitMQ 发送数据异常:" + ex.Message);
74                     //PubTool.ConnError("RabbitMQ", "RunLog", "发送数据异常:" + ex.Message);
75                 }
76                 finally
77                 {
78                     connection.Close();
79                     channel.Close();
80                 }
81             }
82             catch (Exception ex)
83             {
84                 Console.WriteLine("RabbitMQ 外层调用发送方法,发生异常:" + ex.Message);
85                 //PubTool.ConnError("RabbitMQ", "RunLog", "外层调用发送方法,发生异常:" + ex.Message);
86             }
87             return false;
88         }
89     }
90 }

运行结果如下:

技术图片

一、RabbitMQ消息队列接收端的代码:

  1 using RabbitMQ.Client;
  2 using RabbitMQ.Client.Events;
  3 using System;
  4 using System.Collections.Generic;
  5 using System.IO;
  6 using System.Linq;
  7 using System.Text;
  8 using System.Threading;
  9 using System.Threading.Tasks;
 10 
 11 namespace RabbitMQReceived
 12 {
 13     class Program
 14     {
 15         static void Main(string[] args)
 16         {
 17             string queuqname = "firstQueue";
 18             ushort limitnum = 3;
 19 
 20             try
 21             {
 22                 #region 构建消息队列
 23                 //1.实例化连接工厂
 24                 var factory = new RabbitMQ.Client.ConnectionFactory();
 25                 factory.HostName = "localhost";
 26                 factory.UserName = "guest";
 27                 factory.Password = "guest";
 28                 factory.AutomaticRecoveryEnabled = true;
 29                 //2. 建立连接 
 30                 var connection = factory.CreateConnection();
 31                 //3. 创建信道
 32                 var channel = connection.CreateModel();
 33 
 34                 var queue_name = queuqname;//项目下游上传的队列信息
 35                 bool durable = true;//队列是否持久化
 36                 bool exclusive = false;
 37                 //设置 autoDeleted=true 的队列,当没有消费者之后,队列会自动被删除
 38                 bool autoDelete = false;
 39                 //4. 申明队列
 40                 channel.QueueDeclare(queue_name, durable, exclusive, autoDelete, null);
 41                 //5. 构造消费者实例
 42                 var consumer = new RabbitMQ.Client.Events.EventingBasicConsumer(channel);
 43                 bool autoAck = false;
 44                 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 45                 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认 
 46                 //6. 绑定消息接收后的事件委托
 47 
 48                 //8. 启动消费者
 49                 //设置prefetchCount : 3 来告知RabbitMQ,在未收到消费端的N条消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
 50                 channel.BasicQos(0, limitnum, false);
 51 
 52                 channel.BasicConsume(queue_name, autoAck, consumer);
 53 
 54                 #endregion
 55 
 56                 #region 队列-接收消息的处理方法
 57 
 58                 consumer.Received += (model, ea) =>
 59                 {
 60                     try
 61                     {
 62                         //var body = ea.Body.ToArray();
 63                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
 64                         //获取消息后进行操作,do something
 65                         bool flag = false;
 66                         if (!string.IsNullOrEmpty(message))
 67                         {
 68                             try
 69                             {
 70                                 //做其他存储或处理操作
 71                                 //File.WriteAllText(@"C:\Users\Administrator\Desktop\333.txt", message, Encoding.UTF8);
 72                                 Console.WriteLine("接收消息:" + message);
 73                                 flag = true;
 74 
 75 
 76                             }
 77                             catch (Exception ex)
 78                             {
 79                             }
 80                         }
 81                         else
 82                         {
 83                             flag = true;
 84                         }
 85                         if (flag)
 86                         {
 87                             //操作完毕,则手动确认消息可删除
 88                             // 7. 发送消息确认信号(手动消息确认)
 89                             channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 90                         }
 91                     }
 92                     catch (Exception ex)
 93                     {
 94                     }
 95                 };
 96                 #endregion
 97 
 98             }
 99             catch (Exception ex)
100             { 
101             
102             }
103             //finally
104             //{
105             //    connection.Close();//不能关,关了就停止接收消息了
106             //    channel.Close();
107             //}
108            
109 
110             Console.ReadKey();
111         }
112 
113         static void Methed(object model, BasicDeliverEventArgs ea, IModel channel)
114         {
115             try
116             {
117                 //var body = ea.Body.ToArray();
118                 var message = Encoding.UTF8.GetString(ea.Body.ToArray());
119                 //获取消息后进行操作,do something
120                 bool flag = false;
121                 if (!string.IsNullOrEmpty(message))
122                 {
123                     try
124                     {
125                         //做其他存储或处理操作
126                         File.WriteAllText(@"C:\Users\Administrator\Desktop\333.txt", message, Encoding.UTF8);
127                         Console.WriteLine("ok :" + message);
128                         flag = true;
129 
130 
131                     }
132                     catch (Exception ex)
133                     {
134                     }
135                 }
136                 else
137                 {
138                     flag = true;
139                 }
140                 if (flag)
141                 {
142                     //操作完毕,则手动确认消息可删除
143                     // 7. 发送消息确认信号(手动消息确认)
144                     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
145                 }
146             }
147             catch (Exception ex)
148             {
149             }
150         }
151        
152     }
153 
154      
155 }

 

运行结果如下:

技术图片

 

下面是安装教程及介绍

关于消息队列

其实消息队列没有那么神秘,我们这样想一下,用户访问网站,最终是要将数据以HTTP的协议的方式,通过网络传输到主机的某个端口上的。

那么,接收数据的方式是什么呢?自然是端口监听啦。

那消息队列是什么就很好解释了?

它就是端口监听,接到数据后,将数据排列起来。

那这件事,我们不用中间件能做吗?

当然能做啦,写个TCP/UDP/Socket的软件就可以做啦。

举个简单的例子,如下图:

技术图片

既然自己可以做消息队列,那为什么要用RabbitMQ?

因为,RabbitMQ成熟的开源中间件,可靠性有保证,bug少,性能也非常好。

而C#代码默认是使用托管内存的,所以,想写出媲美RabbitMQ性能的消息队列,就必须离开我们常用的托管内存,使用非托管内存,但这个代价就太大了;而且最终能否达到RabbitMQ的性能水平还是个未知数。

还有就是RabbitMQ除了基础的消息队列管理,还有很多很强大的额外功能,而自己开发消息队列,很难如此尽善尽美。

----------------------------------------------------------------------------------------------------

我们还会发现,在消息队列里有很多概念,什么消息总线啊,什么工作队列啊等等。

要怎么理解这些概念呢?

很简单,不要去理解。这些概念其实是人家代码架构的模式,不要去理解他们,【记】就完了,人家的中间件就是按照这个模式工作的。

比如,我写了一个接收消息的总控制器,然后我为他命名为总线,那这个控制器就是总线,没有理由,这就是定义。

准备工作

首先,我们访问官网【https://www.rabbitmq.com/】,点击Get Started。

然后,网站会自动跳转到当前首页Get Started的锚点位置,如下图:

技术图片

Get Started锚点:

技术图片

然后我们点击DownLoad+Installation,进入到下载界面。

在下载页面中,我们找到安装指南,然后在点击官网推荐的Windows系统的安装包,如下图:

技术图片

现在,我们进入了Windows安装指南界面了。

首先,我们看一下预览信息,如下图:

技术图片

在预览里,我们得知,安装RabbitMQ有两种方法,一种是使用Chocolatey安装,一种是使用官方安装包安装。

Chocolatey是什么呢?随手百度一下,原来他是一个软件包管理工具,也就是说,Chocolatey是类似于Nuget的一种工具。

由于Chocolatey的使用,我不是很熟悉,所以,这里选择使用官方安装包安装。

点击【Using the official installer】,我们进入了【Using the official installer】对应的锚点,如下图。

 技术图片

在【Using the official installer】段落里找到有推荐标志的安装包,然后下载。 

下载完成后,我们可以得到这样一个安装包,如下图:

技术图片

除了下载安装包,我们还会发现,在【Using the official installer】段落里,有提醒我们,RabbitMQ是有依赖的,依赖一个Erlang语言的框架(类似于C#语言的NetFramework)。

我们可以发现,在依赖的段落里,官网非常坑的给出了三个链接网址,如下:

supported version of Erlang:https://www.rabbitmq.com/which-erlang.html

Windows installer:https://www.erlang.org/downloads

Erlang Solutions:https://www.erlang-solutions.com/resources/download.html

因为,我们是无法通过文字描述来判断,哪一个是真的依赖框架的下载地址,所以只好每个都点击进去看看。。。

打开网址后发现,在后两个网址中都可以找到框架下载地址,但第二个地址明显更友好一点,所以我们在第二个网址内下载Erlang的框架。

技术图片

 下载完成得到如下图文件:

技术图片

 PS:这里下载的是OTP的22.1的版本,我的理解是Erlang等于C#语言,而OTP等于NetFramework。

安装Erlang\OTP

首先,我们运行otp_win64_22.1.exe,安装依赖框架Erlang\OTP。

安装完成后,设置环境变量如下:

技术图片

然后运行CMD,输入erl,测试安装是否成功,如下图:

技术图片

安装成功。

安装rabbitmq-server

安装完依赖后,我们接着安装rabbitmq-server-3.8.0.exe。

【rabbitmq-server-3.8.0.exe】?从这个文件名上,我们发现了一个问题,那就是,我们即将安装的RabbitMQ,是一个服务端啊。

什么?服务端?难道还有客户端???

其实这也很好理解,想一下最开始我举的那个例子,消息队列是需要一个监听端口的服务端的,然后客户端向这个服务端发送请求。

这样是不是就很好的理解RabbitMQ了呢:)

----------------------------------------------------------------------------------------------------

安装完RabbitMQ服务端后,我们还是启动CMD,用命令行来查看下安装状态。

首先输入下面的命令,将路径定位到RabbitMQ的路径下:

【CD /D C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.0\sbin】

技术图片

然后输入rabbitmqctl status查看状态。

技术图片

启动管理工具的命令行:rabbitmq-plugins enable rabbitmq_management。

技术图片

启动成功后,在浏览器输入地址http://127.0.0.1:15672/,进入管理页面,账户密码都是guest。

技术图片

RabbitMQ还有很多常用命令,大家可以自行百度。

----------------------------------------------------------------------------------------------------

到此,RabbitMQ服务端的环境配置好了,正常情况,这些配置应该在服务器进行,但我为了测试方便,就把服务端也安装在本机了,因此我下面调用RabbitMQ时,连接的主机IP都是localhost。

 

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!