Netty5入门(3)

一、示例介绍

示例取自《基于Netty5.0高级案例一之NettyWebsocket》,和《Netty inAction》中11章的例子一样,这个例子通过WebSocket实现了一个聊天室的群发功能。但后者的例子我没本事跑通。

新建一个Maven项目,项目名称叫NettyWebSocket,具体过程请参考前一贴。别忘了在pom.xml中加入netty5.0的依赖。

在项目中新建4个class:

技术分享

4个类的代码你可以从后面的内容中找到,这里先不考虑代码的问题。用“复制>>粘贴”将代码原样拷贝到这4个源文件中再说。

注意,如果源代码中缺少import语句,请自行fixed一下。

 

在NettyServe.java上右键,Run As >> Java Application,运行服务端代码。此时控制台将输出:

服务端开启等待客户端连接 ... ...

然后在磁盘上创建一个index.html文件,文件内容你也可以在后面的内容中找到。在Finder中双击index.html文件,用Safari打开。

在Dock栏上右击Safari图标,选择“新建窗口”,打开另一个Safari窗口。然后将第一个Safari窗口中的地址复制到第二个Safari窗口地址栏中,回车。

将两个窗口并列,你可以看到在一个窗口中输入的聊天消息,在另一个窗口中会实时得到刷新,显然是服务端通过WebSocket同时向所有连接的客户端进行了推送:

技术分享

在服务端控制台中也会有输出:

技术分享

测试完毕,我们下面再来介绍代码。

注意,如果使用Safari测试,当你关闭Safari,服务端会输出“客户端与服务端连接关闭”。如果使用Chrome测试,当你关闭Chrome时,服务端会抛出一个“UnsupportedOperationException”异常。

 

二、Global.java

这个类很简单,就是定义了一个全局变量ChannelGroup  group,这样在后面的其它类(主要是MyWebSocketServerHandler)中就不用定义group了,直接使用就行了。

源代码(如果你已经复制/粘贴过源代码了,请跳过):

public class Global {

   public staticChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}

 

三、ChildChannelHandler.java

这个类实现了ChannelInitializer,即Channel与ChannelHandler的绑定,也就是向pipeline中填入各个ChannelHandler。

源代码(如果你已经复制/粘贴过源代码了,请跳过):

public classChildChannelHandler extends ChannelInitializer<SocketChannel>{

   @Override

   protected void initChannel(SocketChannel e) throws Exception {

     

      e.pipeline().addLast("http-codec",new HttpServerCodec());

      e.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));

      e.pipeline().addLast("http-chunked",new ChunkedWriteHandler());

      e.pipeline().addLast("handler",newMyWebSocketServerHandler());

   }

}

 

四、MyWebSocketServerHandler.java

服务器所有的业务逻辑被放到这里,当然也包括我们的WebSocket群发。

源代码(如果你已经复制/粘贴过源代码了,请跳过):

public class MyWebSocketServerHandlerextends

      SimpleChannelInboundHandler<Object>{

   private static final Logger logger = Logger

         .getLogger(WebSocketServerHandshaker.class.getName());

   private WebSocketServerHandshaker handshaker;

   @Override

   public voidchannelActive(ChannelHandlerContext ctx) throws Exception {

      // 添加

      Global.group.add(ctx.channel());

      System.out.println("客户端与服务端连接开启");

   }

   @Override

   public voidchannelInactive(ChannelHandlerContext ctx) throws Exception {

      // 移除

      Global.group.remove(ctx.channel());

      System.out.println("客户端与服务端连接关闭");

   }

   @Override

   protected void messageReceived(ChannelHandlerContext ctx, Object msg)

         throws Exception {

      if (msg instanceof FullHttpRequest) {

         handleHttpRequest(ctx, ((FullHttpRequest) msg));

      } else if (msg instanceofWebSocketFrame) {

         handlerWebSocketFrame(ctx, (WebSocketFrame) msg);

      }

   }

   @Override

   public voidchannelReadComplete(ChannelHandlerContext ctx) throws Exception {

      ctx.flush();

   }

   private voidhandlerWebSocketFrame(ChannelHandlerContext ctx,

         WebSocketFrameframe) {

      // 判断是否关闭链路的指令

      if (frame instanceof CloseWebSocketFrame) {

         handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame

               .retain());

         return;

      }

      // 判断是否ping消息

      else if (frame instanceofPingWebSocketFrame) {

         ctx.channel().write(

               new PongWebSocketFrame(frame.content().retain()));

         return;

      }

      // 本例程仅支持文本消息,不支持二进制消息

      else if (!(frame instanceofTextWebSocketFrame)) {

         System.out.println("本例程仅支持文本消息,不支持二进制消息");

         throw newUnsupportedOperationException(String.format(

               "%s frame types notsupported", frame.getClass().getName()));

      }

      // 返回应答消息

      Stringrequest = ((TextWebSocketFrame) frame).text();

      System.out.println("服务端收到:" + request);

      if (logger.isLoggable(Level.FINE)) {

         logger

               .fine(String.format("%s received %s", ctx.channel(),

                     request));

      }

      TextWebSocketFrametws = new TextWebSocketFrame(new Date().toString()

            +ctx.channel().id() + "" + request);

      // 群发

      Global.group.writeAndFlush(tws);

      // 返回【谁发的发给谁】

      // ctx.channel().writeAndFlush(tws);

   }

   private voidhandleHttpRequest(ChannelHandlerContext ctx,

         FullHttpRequestreq) {

      if (!req.getDecoderResult().isSuccess()

            ||(!"websocket".equals(req.headers().get("Upgrade")))) {

         sendHttpResponse(ctx, req, new DefaultFullHttpResponse(

               HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));

         return;

      }

      WebSocketServerHandshakerFactorywsFactory = new WebSocketServerHandshakerFactory(

            "ws://localhost:7397/websocket", null, false);

      handshaker = wsFactory.newHandshaker(req);

      if (handshaker == null) {

         WebSocketServerHandshakerFactory

               .sendUnsupportedWebSocketVersionResponse(ctx.channel());

      } else {

         handshaker.handshake(ctx.channel(), req);

      }

   }

   private static void sendHttpResponse(ChannelHandlerContext ctx,

         FullHttpRequestreq, DefaultFullHttpResponse res) {

      // 返回应答给客户端

      if (res.getStatus().code()!= 200) {

         ByteBufbuf = Unpooled.copiedBuffer(res.getStatus().toString(),

               CharsetUtil.UTF_8);

         res.content().writeBytes(buf);

         buf.release();

      }

      // 如果是非Keep-Alive,关闭连接

      ChannelFuturef = ctx.channel().writeAndFlush(res);

      if (!isKeepAlive(req) || res.getStatus().code() != 200) {

         f.addListener(ChannelFutureListener.CLOSE);

      }

   }

   private static boolean isKeepAlive(FullHttpRequest req) {

      return false;

   }

   @Override

   public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause)

         throws Exception {

      cause.printStackTrace();

      ctx.close();

   }

}


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。