Netty 使用

最近研究Android Socket通讯,遇到Socket write timeout写超时现象,具体是:

OutputStream outStream = mSocket.getOutputStream();

outStream.write(data); //卡在这一行;

outStream.flush();

查了很多资料,大概清楚一些,flush()方法作用是“Flushes this output stream and forces any buffered output bytes to be written out.”,强制刷新输出流,并将缓存数据写出去。但是这个方法无法保证数据发送成功,所以,当flush成功,但是缓存区被写满之后,write方法就无法将data继续写入,从而到处卡住,直到缓存区的数据发出去,苦于无法解决这个问题,尝试去使用java的通讯框架。

Netty和Apache Mina大概是Java世界非常知名的通讯框架。它们出自同一个作者,Mina诞生略早,现在属于Apache基金会,而Netty开始在Jboss名下,3.0之前叫JBoss Netty,后来出来自立门户netty.io。关于Mina已有@FrankHui的Mina系列文章,我正好最近也要做一些网络方面的开发,就研究一下Netty的源码,顺便分享出来了。

Netty目前有三个分支:5.x、4.x和3.x。新的分支重写了很多东西,并对项目进行了分包,规模比较庞大,入手会困难一些,而3.x版本则已经被广泛使用。本系列文章针对netty 3.10.1 final。

Getting Started

我推荐大家从官方的UserGuide开始阅读 http://netty.io/3.8/guide/#preface.2,或者可以直接阅读源码,Java世界的框架普遍比较庞大,但是功能覆盖全面,个人爱好。

理解Netty的关键点在哪呢?我觉得,除了NIO的相关知识,另一个就是事件驱动的设计思想。什么叫事件驱动?我们回头看看EchoServerHandler的代码,其中的参数:public void messageReceived(ChannelHandlerContext ctx, MessageEvent e),MessageEvent就是一个事件。这个事件携带了一些信息,例如这里e.getMessage()就是消息的内容,而EchoServerHandler则描述了处理这种事件的方式。一旦某个事件触发,相应的Handler则会被调用,并进行处理。这种事件机制在UI编程里广泛应用,而Netty则将其应用到了网络编程领域。

在Netty里,所有事件都来自ChannelEvent接口,这些事件涵盖监听端口、建立连接、读写数据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler,这样,不但是业务逻辑,连网络通讯流程中底层的处理,都可以通过实现ChannelHandler来完成了。事实上,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成的。当弄明白其中的奥妙时,不得不佩服这种设计!

Netty的包结构如下:

org
└── jboss
    └── netty
        ├── bootstrap 配置并启动服务的类
        ├── buffer 缓冲相关类,对NIO Buffer做了一些封装
        ├── channel 核心部分,处理连接
        ├── container 连接其他容器的代码
        ├── example 使用示例
        ├── handler 基于handler的扩展部分,实现协议编解码等附加功能
        ├── logging 日志
        └── util 工具类

图中可以看到,除了之前说到的事件驱动机制之外,Netty的核心功能还包括两部分:

  • Zero-Copy-Capable Rich Byte Buffer

    零拷贝的Buffer。为什么叫零拷贝?因为在数据传输时,最终处理的数据会需要对单个传输层的报文,进行组合或者拆分。NIO原生的ByteBuffer要做到这件事,需要对ByteBuffer内容进行拷贝,产生新的ByteBuffer,而Netty通过提供Composite(组合)和Slice(切分)两种Buffer来实现零拷贝。这部分代码在org.jboss.netty.buffer包中。

  • Universal Communication API

    统一的通讯API。因为Java的Old I/O和New I/O,使用了互不兼容的API,而Netty则提供了统一的API(org.jboss.netty.channel.Channel)来封装这两种I/O模型。这部分代码在org.jboss.netty.channel包中。

此外,Protocol Support功能通过handler机制实现。

下面开始一个简单的类似helloworld的程序:

服务端:

DiscardServer.java
 1 import java.net.InetSocketAddress;
 2 import java.util.concurrent.Executors;
 3 
 4 import org.jboss.netty.bootstrap.ServerBootstrap;
 5 import org.jboss.netty.channel.ChannelFactory;
 6 import org.jboss.netty.channel.ChannelPipeline;
 7 import org.jboss.netty.channel.ChannelPipelineFactory;
 8 import org.jboss.netty.channel.Channels;
 9 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
10 
11 
12 
13 public class DiscardServer {
14 
15     public static void main(String[] args) throws Exception{
16         // TODO Auto-generated method stub
17         ChannelFactory factory = new NioServerSocketChannelFactory(
18                 Executors.newCachedThreadPool(), 
19                 Executors.newCachedThreadPool());
20         
21         ServerBootstrap bootstrap = new ServerBootstrap(factory);
22         
23         bootstrap.setPipelineFactory(new ChannelPipelineFactory(){
24 
25             @Override
26             public ChannelPipeline getPipeline() {
27                 ChannelPipeline pipeline = Channels.pipeline();
28                 pipeline .addLast("handler", new DiscardServerHandler());
29                 return pipeline;
30             }
31         });
32         
33         bootstrap.setOption("child.tcpNoDelay", true);
34         bootstrap.setOption("child.keepAlive", true);
35         
36         bootstrap.bind(new InetSocketAddress(18081));
37     }
38 
39 }
DiscardServerHandler.java
 1 import org.jboss.netty.buffer.ChannelBuffer;
 2 import org.jboss.netty.channel.Channel;
 3 import org.jboss.netty.channel.ChannelHandlerContext;
 4 import org.jboss.netty.channel.ExceptionEvent;
 5 import org.jboss.netty.channel.MessageEvent;
 6 import org.jboss.netty.channel.SimpleChannelHandler;
 7 
 8 
 9 public class DiscardServerHandler extends SimpleChannelHandler {
10     @Override
11     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
12             throws Exception {
13         // TODO Auto-generated method stub
14         
15         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
16         while(buf.readable()) {
17             System.out.println(buf.readByte());
18             System.out.flush();
19         }
20         
21         super.messageReceived(ctx, e);
22     }
23     
24     @Override
25     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
26             throws Exception {
27         // TODO Auto-generated method stub
28         e.getCause().printStackTrace();
29         
30         Channel ch = e.getChannel();
31         ch.close();
32         
33         super.exceptionCaught(ctx, e);
34     }
35 }

客户端:

NettyHandler.java
 1 public class NettyHandler {
 2     private final static String TAG = "NettyHandler";
 3     
 4     ChannelFactory mFactory = null;
 5     private ClientBootstrap mBootstrap = null;
 6     private Channel mChannel = null;
 7     private boolean isConnect = false;
 8     
 9     private String     mHost;
10     private int        mPort;
11     
12     
13     public NettyHandler(String host, int port) {
14         mHost = host;
15         mPort = port;
16     }
17     
18     public boolean isConnected() {
19         
20         return isConnect;
21     }
22     
23     public void connect() {
24         Log.d(TAG, "connect to:" + mHost);
25         mFactory = new NioClientSocketChannelFactory(  
26                 Executors.newCachedThreadPool(),  
27                 Executors.newCachedThreadPool());  
28     
29         mBootstrap = new ClientBootstrap(mFactory);  
30         mBootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
31             public ChannelPipeline getPipeline() {  
32                 ChannelPipeline pipeline = Channels.pipeline();  
33                 pipeline.addLast("handler",new ClientHandler());  
34                 return pipeline;  
35             }  
36         });  
37         mBootstrap.setOption("tcpNoDelay" , true);  
38         mBootstrap.setOption("keepAlive", true);  
39         
40         
41         ChannelFuture future = mBootstrap.connect (new InetSocketAddress(mHost, mPort));
42         future.awaitUninterruptibly();  
43         mChannel = future.awaitUninterruptibly().getChannel();  
44     }
45     
46     public void sendData(String message) {
47         Log.d(TAG, "send data to PC: [" + message + "]");
48         
49         ChannelBuffer buffer = ChannelBuffers.buffer(message.length());
50         buffer.writeBytes(message.getBytes());
51         
52         if (isConnect) {
53             if (mChannel.isConnected() && mChannel.isWritable()) {
54                 mStartTime = System.currentTimeMillis();
55                 mChannel.write(buffer);
56             }
57         }
58     }
59     
60     public class ClientHandler extends SimpleChannelUpstreamHandler  {  
61         @Override  
62         public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {  
63             Log.d(TAG, "Socket is connected.");
64             isConnect = true;
65             
66         }  
67         
68         @Override
69         public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
70             Log.d(TAG, "Socket is disconnected.");
71             isConnect = false;
72         }
73       
74         @Override  
75         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
76             Log.d(TAG, "Socket get message: " + e.getMessage());    
77         }  
78           
79         @Override  
80         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
81             Log.d(TAG, "Socket get exceptionCaught: " + e.getCause().toString());
82             
83             isConnect = false;
84         }  
85         
86         @Override 
87         public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {
88         }
89     } 

实例化 NettyHandler,调用connect与sendData即可发送消息,这只是一个最简单的例子,接下来我们将深入理解netty的结构。

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。