发布时间:2016-01-04 00:00 来源:陆晨
前天在扒Tomcat源码的时候在装配Servlet的时候我们除了看见了比较熟悉的loadOnStartup参数之外,另外一个不太熟悉的参数asyncSupported就是我们今天要讨论的主题,我们的关注点随即也从Servlet上下文转向了Tomcat对请求的处理与分发,也就是更底层一些的东西,待会会涉及Tomcat Endpoint相关的东西,很开心和大家一起分享。
我们先看下conf/server.xml里面的一端配置:
<Connector?port="8080"?protocol="HTTP/1.1" ???????????connectionTimeout="20000" ???????????redirectPort="8443"?/>
这个配置位于Service组件标签的里面,在Tomcat的容器架构图中Connector和Service是父子关系,我先画一张图:
解释下这张图,Connector是作为Service容器的组件,当Service被父容器启动的时候同事会启动Connector组件,Connector组件关联一个ProtocolHandler,Connector会启动这个ProtocolHandler,ProtocolHandler关联着一个Endpoint,ProtocolHandler同样也会启动这个Endpoint。Endpoint是干嘛的呢,Tomcat定义Endpoint作为网络层的组件,用于绑定及监听服务端的端口,将接收到的客户端的连接分发到工作线程去处理,Endpoint启动的时候做些什么事情以及包括哪些内容呢?Endpoint具体有多个实现,我拿最简单的JIoEndpoint来扒一扒,它启动的时候会做下面这些事情:
bind本地指定的端口,我们最熟悉的就是8080了。 初始化内部工作线程池。 启动Acceptor线程,Acceptor线程是用来接受客户端socket并包装交给工作线程处理了,Acceptor线程只负责接客,接完之后就包装成SocketProcessor丢给工作线程池去处理了。 启动Timeout线程,用来异步检查超时连接。好了,下面继续看看Tomcat对请求处理的逻辑。
我们在SocketProcessor的实现里面找到了一个代码片段:
if?(state?==?SocketState.CLOSED)?{ ????//?Close?socket ????if?(log.isTraceEnabled())?{ ????????log.trace("Closing?socket:"+socket); ????} ????countDownConnection(); ????try?{ ????????socket.getSocket().close(); ????}?catch?(IOException?e)?{ ????????//?Ignore ????} }?else?if?(state?==?SocketState.OPEN?|| ????????state?==?SocketState.UPGRADING?|| ????????state?==?SocketState.UPGRADING_TOMCAT??|| ????????state?==?SocketState.UPGRADED){ ????socket.setKeptAlive(true); ????socket.access(); ????launch?=?true; }?else?if?(state?==?SocketState.LONG)?{ ????socket.access(); ????waitingRequests.add(socket); }
上面可以看出,第一个if分支是当状态等于CLOSED的时候,这里会将连接数减1并且关闭服务器与客户端的socket连接,其他两个分支并没有断开连接。再看看SocketProcessor的实现中另一个代码片段:
if?((state?!=?SocketState.CLOSED))?{ ????if?(status?==?null)?{ ????????state?=?handler.process(socket,?SocketStatus.OPEN_READ); ????}?else?{ ????????state?=?handler.process(socket,status); ????} }
(下面我想用记流水账的形式描述逻辑代码的执行堆栈)上面的handler process是具体处理socket的分支,相关实现由AbstractProtocol下沉到AbstractHttp11Processor的asyncDispatch中,在asyncDispatch会调用adapter的asyncDispatch方法来处理,这个adapter的具体实现在Connector被启动的时候初始化的,具体是CoyoteAdapter类,在CoyoteAdapter的实现中会去调用StandardWrapperValve的invoke方法,再具体一点就会调用用户在WebXML中配置的过滤器链以及Servlet啦。
上面讲了那么一连串的源码堆栈逻辑,其实是想连贯Tomcat从接收到客户端请求与调用Servlet这条线。
简单来说,Tomcat对异步Servlet的处理逻辑即Tomcat接收客户端的请求之后,如果这个请求对应的Servlet是异步的,那么Tomcat会将请求委托给异步线程来处理,并会保持与客户端的连接,当请求处理完成之后再由委托线程来通知监听器异步处理已经完成,于此同时Tomcat的工作线程已经被Tomcat工作线程池回收。
下面我们就可以继续看看上层是如何写异步Servlet的了。
在这一节,我们主要看看如何从零开始实现一个异步的Servlet,为了不让篇幅过长,我尽量精简一下例子。
一、实现一个ServletContextListener来初始化我们自己的线程池,这个池子和Tomcat的工作线程池是完全独立的:
/** ?*?@author?float.lu ?*/ @WebListener public?class?AppContextListener?implements?ServletContextListener?{ ????private?static?final?String?EXECUTOR_KEY?=?AppContextListener.class.getName(); ????@Override ????public?void?contextInitialized(ServletContextEvent?servletContextEvent)?{ ????????ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(100,?200,?50000L, ????????????????TimeUnit.MILLISECONDS,?new?ArrayBlockingQueue<Runnable>(100)); ????????servletContextEvent.getServletContext().setAttribute(EXECUTOR_KEY, ????????????????executor); ????} ????@Override ????public?void?contextDestroyed(ServletContextEvent?servletContextEvent)?{ ????????ThreadPoolExecutor?executor?=?(ThreadPoolExecutor)?servletContextEvent ????????????????.getServletContext().getAttribute(EXECUTOR_KEY); ????????executor.shutdown(); ????} }
这里只做两件事情,第一、在Servlet容器初始化完成的时候初始化线程池,这个时候Servlet还没有被初始化,这是上篇文章的知识了。第二,在Servlet容器销毁的时候销毁线程池。
二、实现一个AsyncListener接口的类,这个接口是Servlet3 API提供的接口,用于监听工作线程的执行情况从而正确的响应异步处理结果,因为我的例子实现代码没有什么意义这里就不贴了,记住实现javax.servlet.AsyncListener这个接口就好。
三、自定义一个实现Runnable接口的类,我的实现是这样的:
/** ?*?@author?float.lu ?*/ public?class?AsyncRequestProcessor?implements?Runnable?{ ????private?AsyncContext?asyncContext; ????public?AsyncRequestProcessor(AsyncContext?asyncCtx)?{ ????????this.asyncContext?=?asyncCtx; ????} ????@Override ????public?void?run()?{ ????????try?{ ????????????PrintWriter?out?=?this.asyncContext.getResponse().getWriter(); ????????????out.write("Async?servlet?started?!/n"); ????????????out.flush(); ????????}?catch?(Exception?e)?{ ????????} ????????asyncContext.complete(); ????} }
主要是通过构造方法拿到了异步上下文AsyncContext对应于ServletContext。然后线程实现里面可以拿到请求进行响应的处理。
四,最后一个是异步Servlet的实现:
/** ?*?@author?float.lu ?*/ @WebServlet(value?=?"/asyncservlet",?asyncSupported?=?true) public?class?AsyncServlet?extends?HttpServlet?{ ????@Override ????protected?void?doGet(HttpServletRequest?req,?HttpServletResponse?resp)?throws?ServletException,?IOException?{ ????????AsyncContext?asyncContext?=?req.startAsync(); ????????asyncContext.addListener(new?AppAsyncListener()); ????????asyncContext.setTimeout(2000); ????????ThreadPoolExecutor?executor?=?(ThreadPoolExecutor)?req ????????????????.getServletContext().getAttribute("executor"); ????????executor.execute(new?AsyncRequestProcessor(asyncContext)); ????} }
这里面需要注意的有几点:
将@WebServlet注解的asyncSupported的值设置为true,代表这个Servlet是异步Servlet。 通过req.startAsync获取异步上下文。 设置上文中自定义的Listener。 设置超时时间。 以异步上下文为参数构造线程丢进工作线程池中。到此,我们自己的异步Servlet实现就结束了,其实这只是其中一种实现方式,具体可以根据实际情况巧妙设计。举个例子,如果使用单线程模型的话我们可以维护着一个队列来保存异步上下文,一个工作线程不断的从队列中拿到异步上下文进行处理,完了之后调用AsyncContext定义的complete接口告知监听器处理完成即可。第一种模型其实只是将原来可能附加给Tomcat工作线程池的任务拿到自定义的线程池处理而已,而第二种模型是只用一个工作线程去利用队列来处理异步任务。具体应用要看实际情况来定。
现在知道了Tomcat对异步Servlet的支持,有知道了如何实现异步Servlet,那么问题来了,异步Servlet适合什么样的场景呢?
我们分析下并设想一下,当然下面可能是我自己在YY,不正确的欢迎指出,也欢迎读者能够举一些其他的应用场景。首先问题肯定出现在当请求处理时间可能很长的时候,这让我想到了报表导出功能。报表导出其实是一个非常常见的功能,我们需要通过查询数据库,对数据进行处理,然后根据处理完的数据生成Excel并导出。这个过程时间一般都是相对比较长的,通常会引发数据库连接数不够这种问题,当然这是另外一个话题了,数据层相关问题我可能会通过为报表导出任务建立单独的数据源来处理,或者是其他方法。而我们现在讨论的是比较上层的请求占用问题,这个时候我们可以使用异步Servlet来处理这个耗时比较长的任务,从而不会长时间占用Tomcat宝贵的工作线程,因为Tomcat工作线程被占用完的后果将是不接受任何请求。
无论场景如何,结果是我们可以用自己的线程代理工作线程来处理请求了,当然用单线程还是用多线程模型这个也要看实际情况,如果你能拿出实验数据来证明具体的应用场景下哪种模型更好,这是再好不过的了,
上面的例子都是直接使用Servlet来实现的,实际应用中这种方式可能很少有人用了,不过没关系。Spring MVC从3.2版本就支持异步Servlet了,可能上层的表现形式不一样也就是具体码的姿势不一样,但是都知道原理了,可以直接Hack起。Struts貌似还不支持???另外提一下,对于异步Servlet,其实tomcat支持的comet Servlet就是一种异步Servlet。comet的原理是请求到达Servlet之后客户端就和服务器保持着长连接,这样服务端可以随时将内容推送到客户端。
本文相关代码基于tomcat7.0.56和servlet3.1.0版本,由作者原创,欢迎补充或纠正。