巧用apache httpcore-nio实现android推送服务
1. 背景
Android推送服务应用非常广泛,一般有轮询、SMS推送、IP推送几种实现方式。由于轮询的即时性不高、SMS推送需要不菲的费用,所以一般采取IP推送。由于google的IP推送平台C2DM国内被屏蔽,国内涌现很多优秀的推送平台如个推、极光推送。由于实现推送服务有一定技术难度,很多移动互联网应用直接租用这些平台服务,达到快速拓展业务的目标。
但是在一些企业和行业应用场合,限制手机接入互联网,不能采用互联网推送平台,就必须实现自己的推送服务。国内流行的方案是采用开源的androidpn,基于XMPP协议。由于XMPP是一个用于IM即时消息的协议,用来做推送服务有很多冗余的东西,显得很不优雅。因此本文提出另一种简洁思路。
2. 采用HTTP的可能性
从服务器向手机推送存在两个限制:1)移动设备IP经常改变,Server端无法通过配置解决;2)设备常在NAT后面(如无线路由器),无法由Server发起建立socket连接。因此必须由手机端发起向服务器发起socket连接,并保持这个连接,服务在这个连接上向手机推送消息。
因此推送服务存在这样的场景:在socket建连时,手机是client端,推送服务器是server端。但是在消息推送时,推送服务器是发起端成了client,手机作为接收端变成了Server。一般的HTTP开源实现,如apache http core,socket server端绑定http server端,socket client端绑定HTTP client端,无法拆分。但是幸运的是,apache http core的NIO版本,却实现可socket层和http层的分离,可以在socket server上发起http client请求。推送服务器因为要保持成千上万的持久连接,NIO成了不二的选择,apache httpcore-nio的NIO模型是理想选择。
3. Demo描述
在公司项目中,服务器端采用apache httpcore-nio,手机端则采用普通的http协议栈,以简化手机的开发难度。因为基于简单协议HTTP、及成熟开源代码,开发量非常小,千把行代码就实现了一个简洁够用的推送服务器,目前该推送服务器已经用于生产环境一年多,性能和稳定性都表现相当不错。
在demo中,手机和服务器端统一采用apache httpcore-nio,基于HTTP通信。通信模型如下图所示:两个手机、一台服务器都运行于localhost,采用不同的端口区分。
Java工程如下图所示。
采用apache httpcore-nio-4.2.4版本。ReverseNHttpServer.java实现推送客户端功能。ReverseNHttpClient.java和ConnectionManager.java实现推送服务器功能,其中ConnectionManager用一个Queue保存已经建立的连接。ReverseHttpTest.java用于测试。
3.1. ReverseNHttpClient
下面是源码。
package com.cangfu.reversehttp; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.http.ContentTooLongException; import org.apache.http.HttpEntity; import org.apache.http.HttpException; import org.apache.http.HttpHost; import org.apache.http.HttpRequestInterceptor; import org.apache.http.HttpResponse; import org.apache.http.entity.ContentType; import org.apache.http.impl.DefaultConnectionReuseStrategy; import org.apache.http.impl.nio.DefaultHttpClientIODispatch; import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.message.BasicHttpRequest; import org.apache.http.message.BasicHttpResponse; import org.apache.http.nio.ContentDecoder; import org.apache.http.nio.IOControl; import org.apache.http.nio.NHttpClientConnection; import org.apache.http.nio.entity.ContentBufferEntity; import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer; import org.apache.http.nio.protocol.BasicAsyncRequestProducer; import org.apache.http.nio.protocol.HttpAsyncRequestExecutor; import org.apache.http.nio.protocol.HttpAsyncRequester; import org.apache.http.nio.reactor.IOEventDispatch; import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.nio.reactor.ListenerEndpoint; import org.apache.http.nio.reactor.ListeningIOReactor; import org.apache.http.nio.util.HeapByteBufferAllocator; import org.apache.http.nio.util.SimpleInputBuffer; import org.apache.http.params.CoreConnectionPNames; import org.apache.http.params.CoreProtocolPNames; import org.apache.http.params.HttpParams; import org.apache.http.params.SyncBasicHttpParams; import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpProcessor; import org.apache.http.protocol.ImmutableHttpProcessor; import org.apache.http.protocol.RequestConnControl; import org.apache.http.protocol.RequestContent; import org.apache.http.protocol.RequestExpectContinue; import org.apache.http.protocol.RequestTargetHost; import org.apache.http.protocol.RequestUserAgent; public class ReverseNHttpClient { ListeningIOReactor ioReactor; HttpParams params; Thread t; static ConnectionManager<NHttpClientConnection> connMgr = new ConnectionManager<NHttpClientConnection>(); public static ConnectionManager<NHttpClientConnection> getConnectionManager() { return connMgr; } public ReverseNHttpClient(IOReactorConfig ioconfig) throws IOReactorException { this.params = new SyncBasicHttpParams(); this.params .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000) .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000) .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1") .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1"); // Create client-side I/O reactor ioReactor = new DefaultListeningIOReactor(ioconfig); } public void start() { HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor() { @Override public void connected( final NHttpClientConnection conn, final Object attachment) throws IOException, HttpException { try { ReverseNHttpClient.getConnectionManager().putConnection(conn); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(conn + ": connection open in ReverseClient side"); super.connected(conn, attachment); } }; // Create client-side I/O event dispatch final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params); // Run the I/O reactor in a separate thread t = new Thread(new Runnable() { public void run() { try { // Ready to go! ioReactor.execute(ioEventDispatch); } catch (IOException e) { System.err.println("I/O error: " + e.getMessage()); } System.out.println("Shutdown"); } }); // Start the client thread t.start(); } public void stop() { if (t != null) t.interrupt(); } public void accept(int port) throws InterruptedException { ListenerEndpoint request = ioReactor.listen(new InetSocketAddress(port)); request.waitFor(); } public void HttpExchange(HttpHost target, BasicHttpRequest request) throws InterruptedException { HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] { // Use standard client-side protocol interceptors new RequestContent(), new RequestTargetHost(), new RequestConnControl(), new RequestUserAgent(), new RequestExpectContinue()}); HttpAsyncRequester requester = new HttpAsyncRequester( httpproc, new DefaultConnectionReuseStrategy(), params); requester.execute( new BasicAsyncRequestProducer(target, request), new MyResponseConsumer(), ReverseNHttpClient.getConnectionManager().takeConnection() ); } static class MyResponseConsumer extends AbstractAsyncResponseConsumer<HttpResponse> { private volatile HttpResponse response; private volatile SimpleInputBuffer buf; public MyResponseConsumer() { super(); } @Override protected void onResponseReceived(final HttpResponse response) throws IOException { this.response = response; } @Override protected void onEntityEnclosed( final HttpEntity entity, final ContentType contentType) throws IOException { long len = entity.getContentLength(); if (len > Integer.MAX_VALUE) { throw new ContentTooLongException("Entity content is too long: " + len); } if (len < 0) { len = 4096; } this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator()); this.response.setEntity(new ContentBufferEntity(entity, this.buf)); } @Override protected void onContentReceived( final ContentDecoder decoder, final IOControl ioctrl) throws IOException { if (this.buf == null) { throw new IllegalStateException("Content buffer is null"); } this.buf.consumeContent(decoder); } @Override protected void releaseResources() { this.response = null; this.buf = null; } @Override protected HttpResponse buildResult(final HttpContext context) throws IOException { System.out.println(); System.out.println(((BasicHttpResponse)response).toString()); response.getEntity().writeTo(System.out); System.out.println(); return this.response; } } }
其中,start()方法用于启动httpclient,accept()用于启动socket监听,httpExchange用于发送http请求,MyResponseconsumer是一个内部类,用于处理返回的响应。
3.2. ReverseNHttpServer
下面是源码。
package com.cangfu.reversehttp; import java.io.File; import java.io.IOException; import java.net.SocketAddress; import java.net.URL; import java.net.URLDecoder; import java.security.KeyStore; import java.util.Locale; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import org.apache.http.HttpException; import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.HttpResponseInterceptor; import org.apache.http.HttpStatus; import org.apache.http.MethodNotSupportedException; import org.apache.http.entity.ContentType; import org.apache.http.impl.DefaultConnectionReuseStrategy; import org.apache.http.impl.nio.DefaultHttpServerIODispatch; import org.apache.http.impl.nio.DefaultNHttpServerConnection; import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory; import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory; import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.nio.NHttpConnection; import org.apache.http.nio.NHttpConnectionFactory; import org.apache.http.nio.NHttpServerConnection; import org.apache.http.nio.entity.NFileEntity; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.nio.protocol.BasicAsyncRequestConsumer; import org.apache.http.nio.protocol.BasicAsyncResponseProducer; import org.apache.http.nio.protocol.HttpAsyncExchange; import org.apache.http.nio.protocol.HttpAsyncRequestConsumer; import org.apache.http.nio.protocol.HttpAsyncRequestHandler; import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry; import org.apache.http.nio.protocol.HttpAsyncService; import org.apache.http.nio.reactor.ConnectingIOReactor; import org.apache.http.nio.reactor.IOEventDispatch; import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.nio.reactor.SessionRequest; import org.apache.http.nio.reactor.SessionRequestCallback; import org.apache.http.params.CoreConnectionPNames; import org.apache.http.params.CoreProtocolPNames; import org.apache.http.params.HttpParams; import org.apache.http.params.SyncBasicHttpParams; import org.apache.http.protocol.ExecutionContext; import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpProcessor; import org.apache.http.protocol.ImmutableHttpProcessor; import org.apache.http.protocol.ResponseConnControl; import org.apache.http.protocol.ResponseContent; import org.apache.http.protocol.ResponseDate; import org.apache.http.protocol.ResponseServer; public class ReverseNHttpServer { HttpParams params; final ConnectingIOReactor ioReactor; Thread t; static ConnectionManager<NHttpServerConnection> connMgr = new ConnectionManager<NHttpServerConnection>(); public static ConnectionManager<NHttpServerConnection> getConnectionManager() { return connMgr; } public ReverseNHttpServer(IOReactorConfig ioconfig) throws IOReactorException { this.params = new SyncBasicHttpParams(); this.params .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000) .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000) .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1") .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1"); ioReactor = new DefaultConnectingIOReactor(ioconfig); } public void start(String path, boolean ssl) throws Exception { // Create HTTP protocol processing chain HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] { // Use standard server-side protocol interceptors new ResponseDate(), new ResponseServer(), new ResponseContent(), new ResponseConnControl() }); // Create request handler registry HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry(); // Register the default handler for all URIs registry.register("*", new HttpFileHandler(new File(path))); // Create server-side HTTP protocol handler HttpAsyncService protocolHandler = new HttpAsyncService( httpproc, new DefaultConnectionReuseStrategy(), registry, params) { @Override public void connected(final NHttpServerConnection conn) { System.out.println(conn + ": connection open in ReverseServer side"); try { ReverseNHttpServer.getConnectionManager().putConnection(conn); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } super.connected(conn); } @Override public void closed(final NHttpServerConnection conn) { System.out.println(conn + ": connection closed"); super.closed(conn); } }; // Create HTTP connection factory NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory; if (ssl) { // Initialize SSL context ClassLoader cl = ReverseNHttpServer.class.getClassLoader(); URL url = cl.getResource("my.keystore"); if (url == null) { System.out.println("Keystore not found"); System.exit(1); } KeyStore keystore = KeyStore.getInstance("jks"); keystore.load(url.openStream(), "secret".toCharArray()); KeyManagerFactory kmfactory = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm()); kmfactory.init(keystore, "secret".toCharArray()); KeyManager[] keymanagers = kmfactory.getKeyManagers(); SSLContext sslcontext = SSLContext.getInstance("TLS"); sslcontext.init(keymanagers, null, null); connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params); } else { connFactory = new DefaultNHttpServerConnectionFactory(params); } // Create server-side I/O event dispatch final IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory); // Run the I/O reactor in a separate thread t = new Thread(new Runnable() { public void run() { try { // Ready to go! ioReactor.execute(ioEventDispatch); } catch (IOException e) { System.err.println("I/O error: " + e.getMessage()); } System.out.println("Shutdown"); } }); // Start the client thread t.start(); } public void stop() { if (t != null) { t.interrupt(); } } public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress) throws InterruptedException { SessionRequest sessionReq = ioReactor.connect(remoteAddress, localAddress, null, new SessionRequestCallback() { @Override public void completed(SessionRequest request) { System.out.println("connection established succeed in device side!!!"); } @Override public void failed(SessionRequest request) { System.err.println("Device side connection failed!"); System.err.println(request.getException().getMessage()); } @Override public void timeout(SessionRequest request) { System.err.println("Device side connection timeout!"); System.err.println(request.getException().getMessage()); } @Override public void cancelled(SessionRequest request) { System.err.println("Device side connection cancelled!"); System.err.println(request.getException().getMessage()); } } ); sessionReq.waitFor(); } static class HttpFileHandler implements HttpAsyncRequestHandler<HttpRequest> { private final File docRoot; public HttpFileHandler() { docRoot = new File("."); } public HttpFileHandler(final File docRoot) { super(); this.docRoot = docRoot; } public HttpAsyncRequestConsumer<HttpRequest> processRequest( final HttpRequest request, final HttpContext context) { // Buffer request content in memory for simplicity return new BasicAsyncRequestConsumer(); } public void handle( final HttpRequest request, final HttpAsyncExchange httpexchange, final HttpContext context) throws HttpException, IOException { HttpResponse response = httpexchange.getResponse(); handleInternal(request, response, context); httpexchange.submitResponse(new BasicAsyncResponseProducer(response)); } private void handleInternal( final HttpRequest request, final HttpResponse response, final HttpContext context) throws HttpException, IOException { String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH); if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) { throw new MethodNotSupportedException(method + " method not supported"); } String target = request.getRequestLine().getUri(); final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8")); if (!file.exists()) { response.setStatusCode(HttpStatus.SC_NOT_FOUND); NStringEntity entity = new NStringEntity( "<html><body><h1>File" + file.getPath() + " not found</h1></body></html>", ContentType.create("text/html", "UTF-8")); response.setEntity(entity); System.out.println("File " + file.getPath() + " not found"); } else if (!file.canRead() || file.isDirectory()) { response.setStatusCode(HttpStatus.SC_FORBIDDEN); NStringEntity entity = new NStringEntity( "<html><body><h1>Access denied</h1></body></html>", ContentType.create("text/html", "UTF-8")); response.setEntity(entity); System.out.println("Cannot read file " + file.getPath()); } else { NHttpConnection conn = (NHttpConnection) context.getAttribute( ExecutionContext.HTTP_CONNECTION); response.setStatusCode(HttpStatus.SC_OK); NFileEntity body = new NFileEntity(file, ContentType.create("text/html")); response.setEntity(body); System.out.println(conn + ": serving file " + file.getPath()); } } } }
其中,start()方法用于启动httpserver,connect()用于发起socket连接。HttpFileHandler()是一个处理http请求的内部钩子类,它从本地读取一个文本文件,通过response发送到服务器。
3.3. ConnectionManager
源码。
package com.cangfu.reversehttp; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ConnectionManager<E> { private final BlockingQueue<E> connections; public ConnectionManager() { connections = new LinkedBlockingQueue<E>(); } public void putConnection(E conn) throws InterruptedException { this.connections.put(conn); } public E takeConnection() throws InterruptedException { return this.connections.take(); } }
3.4. ReverseHttpTest
源码如下。
package com.cangfu.reversehttp; import java.net.InetSocketAddress; import org.apache.http.HttpHost; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.message.BasicHttpRequest; import org.junit.Test; public class ReverseHttpTest { @Test public void testReverseHttp() throws Exception { int sockServerPort = 60010; int sockClientPort1 = 60011; int sockClientPort2 = 60012; IOReactorConfig ioconfig = new IOReactorConfig(); ioconfig.setIoThreadCount(1); ioconfig.setSoKeepalive(true); ioconfig.setSoReuseAddress(true); ioconfig.setSelectInterval(1000); // Start ReverseClient, at APNS Server side ReverseNHttpClient reverseClient = new ReverseNHttpClient(ioconfig); reverseClient.start(); reverseClient.accept(sockServerPort); // Start ReverseServer, at Device side ReverseNHttpServer reverseServer1 = new ReverseNHttpServer(ioconfig); reverseServer1.start(".", false); reverseServer1.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort1)); // Start another ReverseServer, at Device side ReverseNHttpServer reverseServer2 = new ReverseNHttpServer(ioconfig); reverseServer2.start(".", false); reverseServer2.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort2)); // send a request from APNS server to phone 1 HttpHost target1 = new HttpHost("localhost", sockClientPort1, "http"); BasicHttpRequest request1 = new BasicHttpRequest("GET", "/Hello1.txt"); reverseClient.HttpExchange(target1, request1); // send another request from APNS server to phone 2 HttpHost target2 = new HttpHost("localhost", sockClientPort2, "http"); BasicHttpRequest request2 = new BasicHttpRequest("GET", "/Hello2.txt"); reverseClient.HttpExchange(target2, request2); Thread.sleep(2*1000); reverseServer1.stop(); reverseServer2.stop(); reverseClient.stop(); } }
启动一个推送服务器的HttpClient和两个手机端的HttpServer。推送服务器分别向两个手机发送GET请求。手机读取本地的hello.txt文件,放置于http response的消息体中,返回给服务器。下面是测试运行结果:
本文出自 “伧夫的博客” 博客,请务必保留此出处http://cangfu.blog.51cto.com/5966711/1580017
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。