netty websocket 简单消息推送demo

今天心情很不好!!! 原因保密。


这篇是基于"netty与websocket通信demo"。

错误想法:大量客户请求,共用一个worker,来实现推送。

正确做法:应该是对Channel对应的ChannelGroup进行操作,来实现推送。

一个Channel可以划分到多个ChannelGroup中。


PushServerChannelHandler和DynMessage这两个类最重要,其他类基本没变。


package org.sl.demo.chatserver;

import java.util.List;
import java.util.Map;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;

public class PushServerChannelHandler extends SimpleChannelHandler {
	static boolean debug = true;
	
	@Override
	public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e){
		if(debug){
			System.out.println("channelOpen");
		}
		DynMessage.addAudience(e.getChannel());
	}
	
	@Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
		Channel ch = e.getChannel();
		Object msg = e.getMessage();
		
		if(debug){
			System.out.println("---------------");
			System.out.println("message: "+msg.getClass());
		}
		try{
			if(msg instanceof HttpRequest){
				processHttpRequest(ch, (HttpRequest)msg);
			}else if(msg instanceof WebSocketFrame){
				processWebsocketRequest(ch,(WebSocketFrame)msg);
			}else{
				//未处理的请求类型
			}
		}catch(Exception ex){
			ch.close().sync();
		}
		super.messageReceived(ctx, e);
	}
	
	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e){
		if(debug){
			System.out.println("channelClosed");
		}
		if(e instanceof MessageEvent){
			MessageEvent me = (MessageEvent) e;			
		}
		DynMessage.removeAudience(e.getChannel());
		e.getChannel().close();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e){
		if(debug){
			System.out.println("channelClosed");
		}
		DynMessage.removeAudience(e.getChannel());
		e.getCause().printStackTrace();
		e.getChannel().close();
		try {
			super.exceptionCaught(ctx, e);
		} catch (Exception e1) {		
			e1.printStackTrace();
		}
	}
	
	void processHttpRequest(Channel channel,HttpRequest request){
		HttpHeaders headers = request.headers();
		if(debug){
			List<Map.Entry<String,String>> ls = headers.entries();
			for(Map.Entry<String,String> i: ls){
				System.out.println("header  "+i.getKey()+":"+i.getValue());
			}
		}	
		
		//non-get request
		if(!HttpMethod.GET.equals(request.getMethod())){
			DefaultHttpResponse resp = new DefaultHttpResponse(
					HttpVersion.HTTP_1_1,
					HttpResponseStatus.BAD_REQUEST);
			channel.write(resp);			
			channel.close();
			return;
		}
				
		WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory(
				"ws://"+request.headers().get(HttpHeaders.Names.HOST),
				null,false );
		WebSocketServerHandshaker wsShakerHandler = wsShakerFactory.newHandshaker(request);
		if(null==wsShakerHandler){
			//无法处理的websocket版本
			wsShakerFactory.sendUnsupportedWebSocketVersionResponse(channel);
		}else{
			//向客户端发送websocket握手,完成握手
			//客户端收到的状态是101 sitching protocol
			wsShakerHandler.handshake(channel, request);
		}		
	}
	
	void processWebsocketRequest(Channel channel, WebSocketFrame request) throws Exception{		
		if(request instanceof CloseWebSocketFrame){
			DynMessage.removeAudience(channel);
			channel.close().sync();
		}else if(request instanceof PingWebSocketFrame){			
			channel.write(new PongWebSocketFrame(request.getBinaryData()));  
		}else if(request instanceof TextWebSocketFrame){
			//这个地方 可以根据需求,加上一些业务逻辑
			TextWebSocketFrame txtReq = (TextWebSocketFrame) request;		
			if(debug){ System.out.println("txtReq:"+txtReq.getText());}
			if("disconnect".equalsIgnoreCase(txtReq.getText())){
				DynMessage.removeAudience(channel);
				channel.close().sync();
				return;
			}
			//把符合条件的channel添加到DynMessage的channelGroup中
			DynMessage.addAudience(channel);
		}else{
			//WebSocketFrame还有一些
		}
	}
}

package org.sl.demo.chatserver;

import java.util.Random;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
*动态产生消息,并向Channel组推送。
*/
public class DynMessage implements Runnable{
	public static ChannelGroup audiences = new DefaultChannelGroup("msg-group");
	
	static public void addAudience(Channel ch){		
		audiences.add(ch);
	}
	
	static public void removeAudience(Channel ch){
		audiences.remove(ch);
	}
	
	static String[] names = {
		"Tom", "Jerry",
		"Terry", "Looney",
		"Merrie", "William",
		"Joseph", "Hanna",
		"Speike", "Tyke",
		"Tuffy", "Lightning",
	};
	static String message = "";
	
	public static String getMessage(){
		StringBuffer sb = new StringBuffer();
		sb.append("hello,my name is ");
		sb.append(names[new Random().nextInt(names.length)]);
		sb.append(".");		
		return sb.toString();
//		return message;
	}

	@Override
	public void run() {		
		System.out.println("DynMessage start");
		for(;;){
			String msg = getMessage();			
			radiate(msg);
			try{Thread.sleep(1000); }catch(Exception ex){}
		}
	}
	
	void radiate(String msg){
		audiences.write(new TextWebSocketFrame(msg));
	}
}

<html>
<head>
<script src="jquery-1.9.1.js"></script>
<script src="messagepush.js"></script>
<script >
// Stops the running push session, if any.
function doStop(){
	stopMsgPush();
}

// Starts a WebSocket push session: a random 6-character token is shown in the
// "send" field and sent to the server, received messages are shown in "recv".
function doWsStart(){
	var  r6 = generateMixed(6);
	$("#txtReq").val(r6);
	var  params = $("#txtReq").val();
	// Make sure a previous session is shut down before opening a new one.
	doStop();
	
	wsMsgPush('127.0.0.1',params,
		function(data){
			$("#txtResp").val(data);			
		},
		function(){
			$("#txtResp").val("ws close...");
		} ,
		function(){
			$("#txtResp").val("ws error...");
		} );		
}
</script>
</head>

<body>
 
<br/>
<br/><br/>
send: <input id="txtReq" readonly="readonly" type="text" value="" />
<input type="button" value="start" onclick="doWsStart()">
<input type="button" value="stop" onclick="doStop()"/> 
<br/>

recv: <input id="txtResp" type="text" value=""  size="50"/>
</body>
</html>

var _mp_ws = null;
var _mp_ajax_it = null;

/**
 * Starts message push over WebSocket and, when no WebSocket object is
 * available afterwards, falls back to AJAX polling every 10 seconds.
 *
 * url       - host (and optional path) to connect to, without scheme
 * params    - payload handed to the transport
 * onmessage - callback receiving each message's data
 * onclose   - callback invoked when the transport closes
 * onerror   - callback invoked on a transport error
 */
function msgPush(url, params,onmessage,onclose,onerror){
	wsMsgPush(url,params,onmessage,onclose,onerror);
	if(_mp_ws){
		// WebSocket transport is in place, no fallback needed.
		return;
	}
	ajaxMsgPush(url,params,10000,onmessage,onclose,onerror);
}

/**
 * Earlier variant of wsMsgPush, kept for reference: opens a WebSocket, sends
 * the fixed text "1111" once connected and forwards received data to
 * onmessage. The params, onclose and onerror arguments are not used.
 */
function old_wsMsgPush(url, params,onmessage,onclose,onerror){	
	var ws = new WebSocket("ws://"+url); 
	ws.onopen = function(){ws.send('1111')};
	ws.onmessage = function(evt){ onmessage(evt.data);};
}

/**
 * Opens a WebSocket to "ws://" + url, sends params once the connection is
 * open and wires the optional callbacks. The socket is stored in the global
 * _mp_ws; when the environment has no WebSocket support, _mp_ws is set to
 * null so that callers can detect this and fall back to another transport.
 *
 * url       - host (and optional path) to connect to, without scheme
 * params    - text sent to the server right after the connection opens
 * onmessage - optional callback receiving each message's data
 * onclose   - optional callback invoked when the socket closes
 * onerror   - optional callback invoked on a socket error
 */
function wsMsgPush(url, params,onmessage,onclose,onerror){	
	// "new WebSocket" never yields a falsy value; without support the
	// constructor is simply undefined, so detect that up front.
	if(typeof WebSocket === "undefined"){
		_mp_ws = null;
		return;
	}
	_mp_ws = new WebSocket("ws://"+url); 
		
	_mp_ws.onopen = function(){ 
		_mp_ws.send(params); 
	};
	if(onmessage) _mp_ws.onmessage = function(evt){ onmessage(evt.data); }
	if(onerror) _mp_ws.onerror = function (evt){ onerror(); }
	if(onclose) _mp_ws.onclose = function (evt){ onclose(); }	
}

/**
 * Polls url with a GET request every interval milliseconds and stores the
 * timer handle in the global _mp_ajax_it.
 *
 * url       - address to poll
 * params    - request data
 * interval  - polling period in milliseconds
 * onmessage - optional callback receiving the response text of each poll
 * onclose   - optional callback invoked after every request completes
 * onerror   - optional callback invoked when a request fails
 */
function ajaxMsgPush(url, params,interval,onmessage,onclose,onerror){	
	function __getmsg(){
		$.ajax({
			url:				url,
			data:			params,
			cache:			true,
			type:			"get",
			dataType:		"text",		
			success:		function(data, textStatus, jqXHR){ 
				if(onmessage) onmessage(data);
			},
			error:			function(jqXHR, textStatus, errorThrown){
				if(onerror) onerror();
			},
			complete:		function(jqXHR, textStatus){
				if(onclose) onclose();
			}
		});
	}	
	
	// Pass the function itself: a code string would be evaluated in the
	// global scope, where the nested __getmsg is not visible.
	_mp_ajax_it = setInterval(__getmsg,interval);
}

/**
 * Stops message push: tells the server to disconnect and closes the
 * WebSocket, cancels the AJAX polling timer, and clears both global handles.
 */
function stopMsgPush(){
	if(_mp_ws){
		// send() throws while the socket is still connecting, so only
		// announce the disconnect on an open connection.
		if(_mp_ws.readyState === WebSocket.OPEN){
			_mp_ws.send("disconnect");
		}
		_mp_ws.close();
		_mp_ws = null;
	}

	if(_mp_ajax_it){
		clearInterval(_mp_ajax_it);
		_mp_ajax_it = null;
	}
}

// Alphabet used by generateMixed: digits followed by upper-case letters.
var chars = ['0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'];

/**
 * Returns a random string of n characters taken from chars.
 * An n of 0 (or less) yields the empty string.
 */
function generateMixed(n) {
     var res = "";
     for(var i = 0; i < n ; i ++) {
         // floor over the full length gives every index the same chance;
         // rounding up would practically never select index 0.
         var id = Math.floor(Math.random()*chars.length);
         res += chars[id];
     }
     return res;
}

package org.sl.demo.chatserver;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.timeout.WriteTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer;

public class PushServerChannelPiplelineFactory  implements ChannelPipelineFactory{

	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline cp = Channels.pipeline();
		cp.addLast("decoder", new HttpRequestDecoder());
		cp.addLast("encoder", new HttpResponseEncoder());
		cp.addLast("writeTimeout", new WriteTimeoutHandler(new HashedWheelTimer(),10));
		cp.addLast("handler", new PushServerChannelHandler());
		return cp;
	}

}

package org.sl.demo.chatserver;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class PushServer implements Runnable{
	int port = 80;
	
	public PushServer(int port){
		this.port = port;
	}

	@Override
	public void run() {
		System.out.println("ChatServer "+port);
		
		ServerBootstrap b = new ServerBootstrap(
				new NioServerSocketChannelFactory(
						Executors.newCachedThreadPool(),
						Executors.newCachedThreadPool()));
		b.setOption("child.tcpNoDelay", true);  
		b.setOption("child.keepAlive", true);
		b.setPipelineFactory(new PushServerChannelPiplelineFactory());
		b.bind(new InetSocketAddress(port));
	}
	
	public static void main(String[] args){
		Thread t = new Thread(new DynMessage(),"DynMessage");
		t.start();
		new PushServer(80).run();
	}
}































郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。