RabbitMQ学习(四).NET Client之Routing

转载请注明出处:jiq?钦‘s technical Blog

Routing(路由)

(using the .NET client)

上一节教程我们已经构建好了一个简单的日志系统,能够广播日志消息到多个receiver。

这一节我们将给这个日志系统加一个功能,即我们只订阅这些消息中的一个子集,而不是全部。

例如,前面我们是将订阅到的所有日志消息要么存储到磁盘,要么打印到控制台屏幕,现在我们要实现的是只将关键性的错误日志消息存储到磁盘上的日志文件,但是仍然能够打印所有的日志消息到控制台。

Bindings(绑定)

前面的例子中已经创建了一个绑定,代码类似这样:

channel.QueueBind(queueName, "direct_logs", "");

一个绑定是将一个交换机和一个队列关联起来,可以理解为: 某个特定的队列是对 某个特定的交换机的消息感兴趣。

绑定可以接受 routingKey 参数,为了避免和 BasicPublish 函数的routingKey 参数混淆,我们称之为 binding key. 下面展示了如何用一个key来创建一个绑定(bingding):

channel.QueueBind(queueName, "direct_logs", "black");

binding key的含义取决于交换类型(exchange type).前面用到的分列交换机(fanout exchanges)简单地忽略了这个值。

Direct exchange(direct交换机)

前面介绍的日志系统会吧所有消息都广播给所有的consumer,这里我们想要拓展这个系统,根据消息的严重程度来对消息进行过滤。例如我们可能只想将一些关键性错误的日志消息存储到磁盘,而不想把一些info日志也存下来浪费磁盘空间。

之前我们使用的交换机是fanout,没有什么灵活性,只会无脑地广播所有消息。

现在我们使用direct交换机,这个交换机可以很简便地实现消息的路由算法,一个消息只会发送到这样的一个队列,即这个队列的binding key和消息的routing key完全匹配。

以下图为例进行说明:

我们可以看到 direct 交换机 X 绑定到了两个队列,交换机 X 到第一个队列的binding key是 orange, 交换机 X 到第二个队列有两个bindings, 其中一个的 binding key 是 black,另一个的是 green.

在这样的一个设置之下,一个具备routing key为 orange 的消息将会被路由到队列 Q1. 具备 routing key 为 black 或者 green 的消息将会被路由到队列 Q2. 而交换机 X 接收到的其他所有消息都会被抛弃掉。

Multiple bindings(多个绑定)

一个交换机可以设置与多个队列的 binding key 为同一个. 

基于上面的设置,你可以再增加一个绑定,用 black 作为binding key来绑定交换机 X 和 队列 Q1,这个时候这个 direct 交换机就看起来跟 fanout 交换机看起来很像了,它会将消息广播给所有匹配的队列,因为一个具备 routing key 为 black的消息会被路由到 Q1和Q2两个队列。

Emitting logs(日志消息发送)

现在我们的日志系统将用 direct 交换机来代替 fanout 交换机,我们将日志的严重级别作为日志消息的 routing key,这样接收程序就能够选择性地接收自己想要接收的特定严重级别的日志消息,首先看发出日志消息的代码.

首先创建一个 direct 交换机:

channel.ExchangeDeclare("direct_logs", "direct");

然后准备发送消息到这个交换机:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("direct_logs", severity, null, body);

简单来看我们可以假设 ‘severity‘ 是 ‘info‘, ‘warning‘, ‘error‘ 中的一个.

Subscribing(日志消息接收)

接收消息的代码和前面的类似,但是我们首先要创建好一个临时队列到 direct 交换机的绑定,同时需要指定 binding key。

var queueName = channel.QueueDeclare().QueueName;

foreach (var severity in args)
{
    channel.QueueBind(queueName, "direct_logs", severity);
}

Putting it all together(代码总览)

The code for EmitLogDirect.cs class:

class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("direct_logs", "direct");

                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                                ? string.Join(" ", args.Skip(1).ToArray())
                                : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("direct_logs", severity, null, body);
                Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", severity, message);
            }
        }
    }
}

The code for ReceiveLogsDirect.cs:

class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("direct_logs", "direct");
                var queueName = channel.QueueDeclare().QueueName;

                if (args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                            Environment.GetCommandLineArgs()[0]);
                    Environment.ExitCode = 1;
                    return;
                }

                foreach (var severity in args)
                {
                    channel.QueueBind(queueName, "direct_logs", severity);
                }

                Console.WriteLine(" [*] Waiting for messages. " +
                                  "To exit press CTRL+C");

                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);

                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘",
                                      routingKey, message);
                }
            }
        }
    }
}

像之前那样编译代码 (参考 tutorial one ).

假如你只想保存 ‘warning‘ 和 ‘error‘日志消息 (and not ‘info‘) 到文件,那么这样运行接收程序:

$ ReceiveLogsDirect.exe warning error > logs_from_rabbit.log

假如你想要打印所有类型的日志消息到屏幕,这样运行接收程序:

$ ReceiveLogsDirect.exe info warning error
 [*] Waiting for logs. To exit press CTRL+C

另外,发送error类型的日志消息可以这样运行发送程序:

$ EmitLogDirect.exe error "Run. Run. Or it will explode."
 [x] Sent ‘error‘:‘Run. Run. Or it will explode.‘

(Full source code for (EmitLogDirect.cs source) and (ReceiveLogsDirect.cs source))

Move on to tutorial 5 to find out how to listen for messages based on a pattern.

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。