用Jetty 9.1运行Java WebSockets微服务
Jetty 9.1的发布将Java WebSockets (JSR-356) 带入了非Java EE环境,从而开启了微服务时代。我们可以将Jetty的容器包含在java应用程序中(注意,不是Java代码运行在容器中,而是相反),这种微服务轻量概念开始得到提倡推广,为模块化开启新的探索方向。
该案例目标是要建设一个从客户端程序接受消息并广播到当前连接的所有其他客户端WebSocket服务器。假设有一个消息模型:
package com.example.services;
public class Message {
private String
username;
private String message;
public Message() {
}
public Message( final String username, final String
message ) {
this.username =
username;
this.message =
message;
}
public String getMessage()
{
return
message;
}
public String getUsername()
{
return
username;
}
public void setMessage( final String message )
{
this.message =
message;
}
public void setUsername( final String username )
{
this.username =
username;
}
}
为了分离服务器端和客户端,JSR-356规定了两个元注解@ServerEndpoint 和@ClientEndpoit
客户端代码:
@ClientEndpoint
public class BroadcastClientEndpoint
{
private static final Logger log =
Logger.getLogger(
BroadcastClientEndpoint.class.getName() );
@OnOpen
public void onOpen( final
Session session ) throws IOException, EncodeException
{
session.getBasicRemote().sendObject( new Message( "Client", "Hello!" )
);
}
@OnMessage
public void onMessage(
final Message message ) {
log.info( String.format( "Received message ‘%s‘ from
‘%s‘",
message.getMessage(), message.getUsername() ) );
}
}
@OnOpen 是当客户端连接到服务器开始调用,@OnMessage是每次服务器向客户端发送消息时调用。
消息传递使用Json,这里使用JSR-353规范的Json类将对象进行序列化。我们需要在Message里面加上一下Json反序列化,也就是将Json转为Message对象:
public class Message {
public static class
MessageDecoder implements Decoder.Text< Message >
{
private JsonReaderFactory
factory = Json.createReaderFactory( Collections.< String, Object
>emptyMap() );
@Override
public void init( final
EndpointConfig config ) {
}
@Override
public Message decode(
final String str ) throws DecodeException
{
final
Message message = new Message();
try( final
JsonReader reader = factory.createReader( new StringReader( str ) ) )
{
final JsonObject json =
reader.readObject();
message.setUsername( json.getString( "username" )
);
message.setMessage( json.getString( "message" )
);
}
return
message;
}
@Override
public boolean
willDecode( final String str )
{
return
true;
}
@Override
public void destroy()
{
}
}
}
我们需要告诉客户端,我们有一个Json编码器和解码器,在BroadcastClientEndpoint类上加入:
@ClientEndpoint( encoders = { MessageEncoder.class }, decoders = {
MessageDecoder.class } )
public class BroadcastClientEndpoint {
}
下面是调用运行代码:
public class ClientStarter {
public static void main(
final String[] args ) throws Exception
{
final String client =
UUID.randomUUID().toString().substring( 0, 8 );
final WebSocketContainer container
=
ContainerProvider.getWebSocketContainer();
final String uri = "ws://localhost:8080/broadcast";
try( Session session =
container.connectToServer( BroadcastClientEndpoint.class, URI.create( uri ) ) )
{
for( int
i = 1; i <= 10; ++i )
{
session.getBasicRemote().sendObject( new Message( client, "Message #" + i )
);
Thread.sleep( 1000
);
}
}
// Application doesn‘t exit if
container‘s threads are still
running
( ( ClientContainer
)container ).stop();
}
}
这是连接URL ws://localhost:8080/broadcast,随机挑选一些客户端名称(从UUID),每1秒的延迟产生10条信息,(只是为了确保我们有时间去接收他们都回来了)。
下面是服务器端的代码:
@ServerEndpoint(
value =
"/broadcast",
encoders = { MessageEncoder.class
},
decoders = { MessageDecoder.class
}
)
public class BroadcastServerEndpoint
{
private static final Set< Session > sessions
=
Collections.synchronizedSet( new HashSet< Session >() );
@OnOpen
public void onOpen( final
Session session ) {
sessions.add(
session );
}
@OnClose
public void onClose( final
Session session ) {
sessions.remove( session );
}
@OnMessage
public void onMessage(
final Message message, final Session client
)
throws IOException, EncodeException
{
for( final Session session:
sessions )
{
session.getBasicRemote().sendObject( message
);
}
}
}
为了使这个服务器端点能够运行,我们将其注册入Jetty服务器,Jetty9.能够在嵌入下运行:
public class ServerStarter {
public static void
main( String[] args ) throws Exception
{
Server server = new Server( 8080
);
// Create the ‘root‘ Spring
application context
final
ServletHolder servletHolder = new ServletHolder( new DefaultServlet()
);
final ServletContextHandler
context = new ServletContextHandler();
context.setContextPath( "/"
);
context.addServlet(
servletHolder, "/*" );
context.addEventListener( new ContextLoaderListener()
);
context.setInitParameter( "contextClass",
AnnotationConfigWebApplicationContext.class.getName()
);
context.setInitParameter(
"contextConfigLocation", AppConfig.class.getName() );
server.setHandler( context
);
WebSocketServerContainerInitializer.configureContext(
context );
server.start();
server.join();
}
}
最重要的是WebSocketServerContainerInitializer.configureContext:,它是创建一个Websockets的容器,目前容器内什么也没有,我们没有注册进入我们的服务器端点。
Spring的AppConfig 能够帮助我们做到这点:
@Configuration
public class AppConfig {
@Inject private WebApplicationContext context;
private
ServerContainer container;
public class SpringServerEndpointConfigurator extends
ServerEndpointConfig.Configurator
{
@Override
public < T > T
getEndpointInstance( Class< T > endpointClass
)
throws InstantiationException
{
return
context.getAutowireCapableBeanFactory().createBean( endpointClass
);
}
}
@Bean
public
ServerEndpointConfig.Configurator configurator()
{
return new
SpringServerEndpointConfigurator();
}
@PostConstruct
public void init()
throws DeploymentException {
container = ( ServerContainer
)context.getServletContext().
getAttribute( javax.websocket.server.ServerContainer.class.getName() );
container.addEndpoint(
new
AnnotatedServerEndpointConfig(
BroadcastServerEndpoint.class,
BroadcastServerEndpoint.class.getAnnotation( ServerEndpoint.class
)
)
{
@Override
public Configurator getConfigurator()
{
return
configurator();
}
}
);
}
}
容器通过调用构造函数将创建container,然后每一次新的客户端连接创建一个服务器端点的新实例。
我们检索的WebSockets容器的方法是Jetty专用规范:查询来自名为“javax.websocket.server.ServerContainer”上下文的属性。
最后运行:
mvn clean package
java -jar
target\jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-server.jar // run server
java
-jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-client.jar // run yet
another client
输出结果部分:
Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint
onMessage
INFO: Received message ‘Hello!‘ from ‘Client‘
Nov 29, 2013
9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
INFO:
Received message ‘Message #1‘ from ‘392f68ef‘
Nov 29, 2013 9:21:29 PM
com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message
‘Message #2‘ from ‘8e3a869d‘
Nov 29, 2013 9:21:29 PM
com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message
‘Message #7‘ from ‘ca3a06d0‘
Nov 29, 2013 9:21:30 PM
com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message
‘Message #4‘ from ‘6cb82119‘
Nov 29, 2013 9:21:30 PM
com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message
‘Message #2‘ from ‘392f68ef‘
Nov 29, 2013 9:21:30 PM
com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message
‘Message #3‘ from ‘8e3a869d‘
Nov 29, 2013 9:21:30 PM
com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message
‘Message #8‘ from ‘ca3a06d0‘
Nov 29, 2013 9:21:31 PM
com.example.services.BroadcastClientEndpoint onMessage
INFO: Received message
‘Message #5‘ from ‘6cb82119‘
Nov 29, 2013 9:21:31 PM
com.example.services.BroadcastClientEndpoint onMessage
该项目源码下载: GitHub
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。