基于Netty与RabbitMQ的消息服务

Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架。

前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下:

技术分享

1、设备TCP消息解析:

NettyMQServer和采集设备Device之间采用TCP通信,TCP消息的消息的解析可以使用LengthFieldBasedFrameDecoder(消息头和消息体),可以有效的解决TCP消息“粘包”问题。

2、给设备发消息:

首先在连接创建时,要保留TCP的连接:

static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // A closed channel will be removed from ChannelGroup automatically
        channels.add(ctx.channel());
    }

当需要给某个设备发消息的时候,可以遍历该ChannelGroup,找到对应的Channel,给该Channel发送消息:

for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
                            c.writeAndFlush(msg);
                        }

3、心跳检测

当某个设备Device由于断电或是其他原因导致设备不正常无法采集数据,Netty服务端需要知道该设备是否在正常工作,可以使用Netty的IdleStateHandler,示例代码如下:

ch.pipeline().addLast(new IdleStateHandler(3*60,0,0));
ch.pipeline().addLast(new HeartBeatHandler());

/**
 * Handler implementation for heart beating.
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Read timeout
                System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress());
                //ctx.disconnect(); //Channel disconnect
            }
        }
    }
}

4、RabbitMQ消息接收与发送

NettyMQServer消息发送采用了Spring AMQP,只需要在配置文件中简单配置一下,就可以方便使用。

NettyMQServer消息接收同样可以采用Spring AMQP,但由于对Spring相关的配置不是很熟悉,为了更好的使用MQ,这里使用了RabbitMQ Client Java API来实现:

                    Connection connection = connnectionFactory.newConnection();
                    Channel channel = connection.createChannel();
                    channel.exchangeDeclare(exchangeName, "direct", true,
                            false, null);
                    channel.queueDeclare(queueName, true, false, false, null);
                    channel.queueBind(queueName, exchangeName, routeKey);

                    // process the message one by one
                    channel.basicQos(1);

                    QueueingConsumer queueingConsumer = new QueueingConsumer(
                            channel);
                    // auto-ack is false
                    channel.basicConsume(queueName, false, queueingConsumer);
                    while (true) {
                        QueueingConsumer.Delivery delivery = queueingConsumer
                                .nextDelivery();
                        String message = new String(delivery.getBody());

                        log.debug("Mq Receiver get message");
                        // Send the message to all connected clients
                        // If you want to send to a specified client, just add
                        // your own logic and ack manually
                        // Be aware that ChannelGroup is thread safe
                        log.info(String.format("Conneted client number: %d",
                                EchoServerHandler.channels.size()));
                        for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                            ByteBuf msg = Unpooled.copiedBuffer(message
                                    .getBytes());
                            c.writeAndFlush(msg);
                        }
                        // manually ack to MQ server the message is consumed.
                        channel.basicAck(delivery.getEnvelope()
                                .getDeliveryTag(), false);

RabbitMQ的使用方式:http://www.cnblogs.com/luxiaoxun/p/3918054.html

 

源代码:https://github.com/luxiaoxun/NettyMqServer

 

参考:

http://netty.io/

http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。