CXF中Web服务请求处理流程
本文将从源码(CXF版本为2.7.6)层面来分析一下CXF是如何完成对一个Web服务(SOAP类型为例)请求的处理的,注意这里分析的是请求的处理,并不包含这个请求是如何生成的,分析将将从CXF服务端接收到一个请求开始。在CXF中,处理HTTP请求是使用Jetty实现的,其处理请求的Handler为org.apache.cxf.transport.http_jetty.JettyHTTPHandler在其handle方法中调用的是org.apache.cxf.transport.http_jetty.JettyHTTPDestination的doService()方法,其中最重要的一句代码是调用了serviceRequest(context, req, resp)方法,下面是源码:
protected void serviceRequest(final ServletContext context, final HttpServletRequest req, final HttpServletResponse resp) throws IOException { Request baseRequest = (req instanceof Request) ? (Request)req : getCurrentRequest(); if (LOG.isLoggable(Level.FINE)) { LOG.fine("Service http request on thread: " + Thread.currentThread()); } Message inMessage = retrieveFromContinuation(req); if (inMessage == null) { inMessage = new MessageImpl(); ExchangeImpl exchange = new ExchangeImpl(); exchange.setInMessage(inMessage); setupMessage(inMessage, context, req, resp); ((MessageImpl)inMessage).setDestination(this); exchange.setSession(new HTTPSession(req)); } try { //处理消息 //incomingObserver的现实类为org.apache.cxf.transport.ChainInitiationObserver incomingObserver.onMessage(inMessage); resp.flushBuffer(); baseRequest.setHandled(true); } catch (SuspendedInvocationException ex) { //省略... } //省略... }
在这个方法中,完成了对Message的创建,初始化设置(setupMessage);Exchange、Session的创建,并将Message与Session放置进Exchange对象中。Message对象的初始化设置(setupMessage方法中)非常重要,向Message中放置了很多信息,如Http请求对象,响应对象,请求的地址等一系列信息,具体请查看setupMessage方法,需要的对象创建完成后调用ChainInitiationObserver.onMessage()方法,源码如下:
public void onMessage(Message m) { Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus); ClassLoaderHolder origLoader = null; try { if (loader != null) { origLoader = ClassLoaderUtils.setThreadContextClassloader(loader); } InterceptorChain phaseChain = null; //如果拦截器链不为空,则检查其状态,如果是停止或挂起状态则使其恢复 if (m.getInterceptorChain() != null) { phaseChain = m.getInterceptorChain(); // To make sure the phase chain is run by one thread once synchronized (phaseChain) { if (phaseChain.getState() == InterceptorChain.State.PAUSED || phaseChain.getState() == InterceptorChain.State.SUSPENDED) { phaseChain.resume(); return; } } } //重新创建Message对象,m为MessageImpl对象,而CXF中支持SOAP协议服务与restful服务 //所以要根据具体使用的是哪一种类型创建出更具体的Message对象,这里,如果是SOAP协议则创建的是SOAPMessage对象 //如果是restful服务则创建的是XMLMessage对象 Message message = getBinding().createMessage(m); Exchange exchange = message.getExchange(); if (exchange == null) { exchange = new ExchangeImpl(); m.setExchange(exchange); } exchange.setInMessage(message); //往Exchage对象中填充信息 setExchangeProperties(exchange, message); InterceptorProvider dbp = null; if (endpoint.getService().getDataBinding() instanceof InterceptorProvider) { //数据绑定对象不为空 dbp = (InterceptorProvider)endpoint.getService().getDataBinding(); } // setup chain if (dbp == null) { phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(), bus.getInInterceptors(), endpoint.getService().getInInterceptors(), endpoint.getInInterceptors(), getBinding().getInInterceptors()); } else { //将Bus、Service、Endpoint、协议绑定、数据绑定中的所有输入拦截器汇聚到拦截器链中 phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(), bus.getInInterceptors(), endpoint.getService().getInInterceptors(), endpoint.getInInterceptors(), getBinding().getInInterceptors(), dbp.getInInterceptors()); } //将拦截器链设置进Message中 message.setInterceptorChain(phaseChain); phaseChain.setFaultObserver(endpoint.getOutFaultObserver()); addToChain(phaseChain, message); //调用拦截器链的doIntercept方法,该方法中即依次调用链中的各个拦截器的handMessage方法。 phaseChain.doIntercept(message); } finally { if (origBus != bus) { BusFactory.setThreadDefaultBus(origBus); } if (origLoader != null) { origLoader.reset(); } } }
到这里我们知道了拦截器是如何被调用的,但奇怪是我们并还没有看到服务的调用,而当doIntercept执行完成后onMessage()方法将退出,serviceRequest方法也就基本上执行完成了。大家应该猜得出来,服务的调用正是在doIntercept方法中完成的,确切点是在拦截器链接中的某个拦截器的handMessage()方法中完成的。使用拦截器来实现框架自身某些特定的功能是不是和struts2中的拦截器有点类似呢(并不是说拦截器的实现方式)。
真实情况是拦截器链从Service注册的拦截器中获取了一个org.apache.cxf.interceptor.ServiceInvokerInterceptor对象,服务的调用由该拦截器完成。
public void handleMessage(final Message message) { final Exchange exchange = message.getExchange(); final Endpoint endpoint = exchange.get(Endpoint.class); final Service service = endpoint.getService(); //获取调用都对象org.apache.cxf.jaxws.JAXWSMethodInvoker final Invoker invoker = service.getInvoker(); //创建一个Runnable可执行对象 Runnable invocation = new Runnable() { public void run() { Exchange runableEx = message.getExchange(); //调用JAXWSMethodInvoker的invoke方法,其返回值result为MessageContentsList对象 Object result = invoker.invoke(runableEx, getInvokee(message)); if (!exchange.isOneWay()) {//如果不是单向Exchange,即是双向的,也就是还有输出 Endpoint ep = exchange.get(Endpoint.class); //创建输出Message Message outMessage = runableEx.getOutMessage(); if (outMessage == null) { outMessage = new MessageImpl(); outMessage.setExchange(exchange); outMessage = ep.getBinding().createMessage(outMessage); exchange.setOutMessage(outMessage); } copyJaxwsProperties(message, outMessage); if (result != null) { MessageContentsList resList = null; if (result instanceof MessageContentsList) { resList = (MessageContentsList)result; } else if (result instanceof List) { resList = new MessageContentsList((List<?>)result); } else if (result.getClass().isArray()) { resList = new MessageContentsList((Object[])result); } else { outMessage.setContent(Object.class, result); } if (resList != null) { //将结果放置在outMessage中 outMessage.setContent(List.class, resList); } } } } }; //从Service中获取Executor,不为null Executor executor = getExecutor(endpoint); //从Exchange中获取Executor,为null Executor executor2 = exchange.get(Executor.class); if (executor2 == executor || executor == null) { // already executing on the appropriate executor invocation.run(); } else { //执行 exchange.put(Executor.class, executor); FutureTask<Object> o = new FutureTask<Object>(invocation, null) { @Override protected void done() { super.done(); synchronized (this) { this.notifyAll(); } } }; synchronized (o) { //执行FutureTask,最后还是执行了刚创建Runnable的run方法,返回run方法中 executor.execute(o); //省略... } } }
调用JAXWSMethodInvoker的invoke(Exchange exchange, Object o)从AbstractInvoker类中继承而来,如下:
public Object invoke(Exchange exchange, Object o) { //获取服务实现类对象,默认使用单例工厂创建 final Object serviceObject = getServiceObject(exchange); try { //获取方法信息 BindingOperationInfo bop = exchange.get(BindingOperationInfo.class); MethodDispatcher md = (MethodDispatcher) exchange.get(Service.class).get(MethodDispatcher.class.getName()); //查找出要执行的方法 Method m = bop == null ? null : md.getMethod(bop); if (m == null && bop == null) { LOG.severe(new Message("MISSING_BINDING_OPERATION", LOG).toString()); throw new Fault(new Message("EXCEPTION_INVOKING_OBJECT", LOG, "No binding operation info", "unknown method", "unknown")); } List<Object> params = null; if (o instanceof List) { params = CastUtils.cast((List<?>)o); } else if (o != null) { params = new MessageContentsList(o); } m = adjustMethodAndParams(m, exchange, params); //Method m = (Method)bop.getOperationInfo().getProperty(Method.class.getName()); m = matchMethod(m, serviceObject);//再次匹配方法 //将服务对象,方法,方法参数传入 return invoke(exchange, serviceObject, m, params); } finally { releaseServiceObject(exchange, serviceObject); } }
将服务对象,方法,方法参数传入入调用了重载的protected Object invoke(Exchange exchange, final Object serviceObject, Method m, List<Object> params)方法该方法位于JAXWSMethodInvoker类中,其夫归调用父类的invoke方法,最终调用AbstractInvoker的performInvocation方法,如下:
protected Object performInvocation(Exchange exchange, final Object serviceObject, Method m, Object[] paramArray) throws Exception { paramArray = insertExchange(m, paramArray, exchange); //省略... return m.invoke(serviceObject, paramArray);//通过反射调用服务实现对象方法 }
到这里,你应该知道,服务是如何被调用的,那么服务被调用后又是如何返回的呢?现在回到ServiceInvokerInterceptor拦截器中创建的Runnable对象,因为Exchange是双向的,所以会创建出outMessage,然后将服务执行的结果封装成一个MessageContentsList对象放置在outMessage的content中。
在输入拦截器链中还注册了另外一个org.apache.cxf.interceptor.OutgoingChainInterceptor拦截器:
public void handleMessage(Message message) { Exchange ex = message.getExchange(); BindingOperationInfo binding = ex.get(BindingOperationInfo.class); if (null != binding && null != binding.getOperationInfo() && binding.getOperationInfo().isOneWay()) { closeInput(message); return; } Message out = ex.getOutMessage(); if (out != null) { getBackChannelConduit(message); if (binding != null) { out.put(MessageInfo.class, binding.getOperationInfo().getOutput()); out.put(BindingMessageInfo.class, binding.getOutput()); } InterceptorChain outChain = out.getInterceptorChain(); if (outChain == null) { //对于输出消息来说,输出拦截器链就是其输入,这里与收集输入拦截器类似,这只过这里是 //获取Bus、Service、Endpoint、协议绑定、数据绑定中的输出拦截器 outChain = OutgoingChainInterceptor.getChain(ex, chainCache); out.setInterceptorChain(outChain); } //负责收集输出拦截器并创建出输出拦截器链,然后调用其doIntercept方法 outChain.doIntercept(out); } }
在输出拦截器链中有几个关于向客端输出的拦截器:
1. org.apache.cxf.interceptor.MessageSenderInterceptor
在其handleMessage方法中执行了getConduit(message).prepare(message);而在prepare方法中向Message中设置了OutputStream,并传入了HttpServletResponse对象
public void prepare(Message message) throws IOException { message.put(HTTP_RESPONSE, response); OutputStream os = message.getContent(OutputStream.class); if (os == null) { message.setContent(OutputStream.class, new WrappedOutputStream(message, response)); } }
Conduit实现类为org.apache.cxf.transport.http.AbstractHTTPDestination.BackChannelConduit
2. org.apache.cxf.interceptor.StaxOutInterceptor
创建XMLStreamWriter对象,并设置进Message中
3. org.apache.cxf.interceptor.BareOutInterceptor
获取服务调用的返回结果,使用XMLStreamWriter对象将包装好的返回结果以XML格式写入WrappedOutputStream中,而WrappedOutputStream是包装了HttpServletRespone.getOutputStream()对象
即,将结果返回给了客户端。
当输出拦截器链执行完成后返回到JettyHTTPDestination的serviceRequest()方法,刷新HttpServletRespone并标记请求处理完成。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。