基于Netty与RabbitMQ的消息服务
Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架。
前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下:
1、设备TCP消息解析:
NettyMQServer和采集设备Device之间采用TCP通信,TCP消息的消息的解析可以使用LengthFieldBasedFrameDecoder(消息头和消息体),可以有效的解决TCP消息“粘包”问题。
2、给设备发消息:
首先在连接创建时,要保留TCP的连接:
static final ChannelGroup channels = new DefaultChannelGroup( GlobalEventExecutor.INSTANCE); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // A closed channel will be removed from ChannelGroup automatically channels.add(ctx.channel()); }
当需要给某个设备发消息的时候,可以遍历该ChannelGroup,找到对应的Channel,给该Channel发送消息:
for (io.netty.channel.Channel c : EchoServerHandler.channels) { ByteBuf msg = Unpooled.copiedBuffer(message.getBytes()); c.writeAndFlush(msg); }
3、心跳检测
当某个设备Device由于断电或是其他原因导致设备不正常无法采集数据,Netty服务端需要知道该设备是否在正常工作,可以使用Netty的IdleStateHandler,示例代码如下:
ch.pipeline().addLast(new IdleStateHandler(3*60,0,0)); ch.pipeline().addLast(new HeartBeatHandler()); /** * Handler implementation for heart beating. */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter{ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { // Read timeout System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress()); //ctx.disconnect(); //Channel disconnect } } } }
4、RabbitMQ消息接收与发送
NettyMQServer消息发送采用了Spring AMQP,只需要在配置文件中简单配置一下,就可以方便使用。
NettyMQServer消息接收同样可以采用Spring AMQP,但由于对Spring相关的配置不是很熟悉,为了更好的使用MQ,这里使用了RabbitMQ Client Java API来实现:
Connection connection = connnectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, "direct", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routeKey); // process the message one by one channel.basicQos(1); QueueingConsumer queueingConsumer = new QueueingConsumer( channel); // auto-ack is false channel.basicConsume(queueName, false, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery = queueingConsumer .nextDelivery(); String message = new String(delivery.getBody()); log.debug("Mq Receiver get message"); // Send the message to all connected clients // If you want to send to a specified client, just add // your own logic and ack manually // Be aware that ChannelGroup is thread safe log.info(String.format("Conneted client number: %d", EchoServerHandler.channels.size())); for (io.netty.channel.Channel c : EchoServerHandler.channels) { ByteBuf msg = Unpooled.copiedBuffer(message .getBytes()); c.writeAndFlush(msg); } // manually ack to MQ server the message is consumed. channel.basicAck(delivery.getEnvelope() .getDeliveryTag(), false);
RabbitMQ的使用方式:http://www.cnblogs.com/luxiaoxun/p/3918054.html
源代码:https://github.com/luxiaoxun/NettyMqServer
参考:
http://netty.io/
http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html
http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。