Java 中的 Comet
现在有很多 Web 服务器是用 Java 构建的。一个原因是 Java 有一个丰富的本地线程模型。因此实现典型的每个连接一个线程的模型便非常简单。该模型对于 Comet 不大适用,但是,Java 对此同样有解决的办法。为了有效地处理 Comet,需要非阻塞 IO,Java 通过它的 NIO 库提供非阻塞 IO。两种最流行的开源服务器 Apache Tomcat 和 Jetty 都利用 NIO 增加非阻塞 IO,从而支持 Comet。然而,这两种服务器中的实现却各不相同。我们来看看 Tomcat 和 Jetty 对 Comet 的支持。
Tomcat 和 Comet
对于 Apache Tomcat,要使用 Comet,主要需要做两件事。首先,需要对 Tomcat 的配置文件 server.xml 稍作修改。默认情况下启用的是更典型的同步 IO 连接器。现在只需将它切换成异步版本,如清单 1 所示。
清单 1. 修改 Tomcat 的 server.xml
<!-- This is the usual Connector, comment it out and add the NIO one -->
<!-- Connector URIEncoding="utf-8" connectionTimeout="20000" port="8084"
protocol="HTTP/1.1" redirectPort="8443"/ -->
<Connector connectionTimeout="20000" port="8080" protocol="org.apache.
coyote.http11.Http11NioProtocol" redirectPort="8443"/>
|
这使 Tomcat 可以处理更多的并发连接,但需要说明的是,其中大多数连接有很多时间都处于空闲状态。利用这一点的最容易的方式是创建一个实现 org.apache.catalina.CometProcessor 接口的 servlet。这显然是 Tomcat 特有的一个接口。清单 2 显示了一个这样的例子。
清单 2. Tomcat Comet servlet
public class TomcatWeatherServlet extends HttpServlet implements CometProcessor {
private MessageSender messageSender = null;
private static final Integer TIMEOUT = 60 * 1000;
@Override
public void destroy() {
messageSender.stop();
messageSender = null;
}
@Override
public void init() throws ServletException {
messageSender = new MessageSender();
Thread messageSenderThread =
new Thread(messageSender, "MessageSender[" + getServletContext()
.getContextPath() + "]");
messageSenderThread.setDaemon(true);
messageSenderThread.start();
}
public void event(final CometEvent event) throws IOException, ServletException {
HttpServletRequest request = event.getHttpServletRequest();
HttpServletResponse response = event.getHttpServletResponse();
if (event.getEventType() == CometEvent.EventType.BEGIN) {
request.setAttribute("org.apache.tomcat.comet.timeout", TIMEOUT);
log("Begin for session: " + request.getSession(true).getId());
messageSender.setConnection(response);
Weatherman weatherman = new Weatherman(95118, 32408);
new Thread(weatherman).start();
} else if (event.getEventType() == CometEvent.EventType.ERROR) {
log("Error for session: " + request.getSession(true).getId());
event.close();
} else if (event.getEventType() == CometEvent.EventType.END) {
log("End for session: " + request.getSession(true).getId());
event.close();
} else if (event.getEventType() == CometEvent.EventType.READ) {
throw new UnsupportedOperationException("This servlet does not accept
data");
}
}
}
|
CometProcessor 接口要求实现 event 方法。这是用于 Comet 交互的一个生命周期方法。Tomcat 将使用不同的 CometEvent 实例调用。通过检查 CometEvent 的 eventType ,可以判断正处在生命周期的哪个阶段。当请求第一次传入时,即发生 BEGIN 事件。READ 事件表明数据正在被发送,只有当请求为 POST 时才需要该事件。遇到 END 或 ERROR 事件时,请求终止。
在清单 2 的例子中,servlet 使用一个 MessageSender 类发送数据。这个类的实例是在 servlet 的 init 方法中在其自身的线程中创建,并在 servlet 的 destroy 方法中销毁的。清单 3 显示了 MessageSender 。
清单 3. MessageSender
private class MessageSender implements Runnable {
protected boolean running = true;
protected final ArrayList<String> messages = new ArrayList<String>();
private ServletResponse connection;
private synchronized void setConnection(ServletResponse connection){
this.connection = connection;
notify();
}
public void send(String message) {
synchronized (messages) {
messages.add(message);
log("Message added #messages=" + messages.size());
messages.notify();
}
}
public void run() {
while (running) {
if (messages.size() == 0) {
try {
synchronized (messages) {
messages.wait();
}
} catch (InterruptedException e) {
// Ignore
}
}
String[] pendingMessages = null;
synchronized (messages) {
pendingMessages = messages.toArray(new String[0]);
messages.clear();
}
try {
if (connection == null){
try{
synchronized(this){
wait();
}
} catch (InterruptedException e){
// Ignore
}
}
PrintWriter writer = connection.getWriter();
for (int j = 0; j < pendingMessages.length; j++) {
final String forecast = pendingMessages[j] + "<br>";
writer.println(forecast);
log("Writing:" + forecast);
}
writer.flush();
writer.close();
connection = null;
log("Closing connection");
} catch (IOException e) {
log("IOExeption sending message", e);
}
}
}
}
|
这个类基本上是样板代码,与 Comet 没有直接的关系。但是,有两点要注意。这个类含有一个 ServletResponse 对象。回头看看清单 2 中的 event 方法,当事件为 BEGIN 时,response 对象被传入到 MessageSender 中。在 MessageSender 的 run 方法中,它使用 ServletResponse 将数据发送回客户机。注意,一旦发送完所有排队等待的消息后,它将关闭连接。这样就实现了长轮询。如果要实现流风格的 Comet,那么需要使连接保持开启,但是仍然刷新数据。
回头看清单 2 可以发现,其中创建了一个 Weatherman 类。正是这个类使用 MessageSender 将数据发送回客户机。这个类使用 Yahoo RSS feed 获得不同地区的天气信息,并将该信息发送到客户机。这是一个特别设计的例子,用于模拟以异步方式发送数据的数据源。清单 4 显示了它的代码。
清单 4. Weatherman
private class Weatherman implements Runnable{
private final List<URL> zipCodes;
private final String YAHOO_WEATHER = "http://weather.yahooapis.com/forecastrss?p=";
public Weatherman(Integer... zips) {
zipCodes = new ArrayList<URL>(zips.length);
for (Integer zip : zips) {
try {
zipCodes.add(new URL(YAHOO_WEATHER + zip));
} catch (Exception e) {
// dont add it if it sucks
}
}
}
public void run() {
int i = 0;
while (i >= 0) {
int j = i % zipCodes.size();
SyndFeedInput input = new SyndFeedInput();
try {
SyndFeed feed = input.build(new InputStreamReader(zipCodes.get(j)
.openStream()));
SyndEntry entry = (SyndEntry) feed.getEntries().get(0);
messageSender.send(entryToHtml(entry));
Thread.sleep(30000L);
} catch (Exception e) {
// just eat it, eat it
}
i++;
}
}
private String entryToHtml(SyndEntry entry){
StringBuilder html = new StringBuilder("<h2>");
html.append(entry.getTitle());
html.append("</h2>");
html.append(entry.getDescription().getValue());
return html.toString();
}
}
|
这个类使用 Project Rome 库解析来自 Yahoo Weather 的 RSS feed。如果需要生成或使用 RSS 或 Atom feed,这是一个非常有用的库。此外,这个代码中只有一个地方值得注意,那就是它产生另一个线程,用于每过 30 秒钟发送一次天气数据。最后,我们再看一个地方:使用该 servlet 的客户机代码。在这种情况下,一个简单的 JSP 加上少量的 JavaScript 就足够了。清单 5 显示了该代码。
清单 5. 客户机 Comet 代码
<%@page contentType="text/html" pageEncoding="UTF-8"%>
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
"http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Comet Weather</title>
<SCRIPT TYPE="text/javascript">
function go(){
var url = "http://localhost:8484/WeatherServer/Weather"
var request = new XMLHttpRequest();
request.open("GET", url, true);
request.setRequestHeader("Content-Type","application/x-javascript;");
request.onreadystatechange = function() {
if (request.readyState == 4) {
if (request.status == 200){
if (request.responseText) {
document.getElementById("forecasts").innerHTML =
request.responseText;
}
}
go();
}
};
request.send(null);
}
</SCRIPT>
</head>
<body>
<h1>Rapid Fire Weather</h1>
<input type="button" onclick="go()" value="Go!"></input>
<div id="forecasts"></div>
</body>
</html>
|
该代码只是在用户单击 Go 按钮时开始长轮询。注意,它直接使用 XMLHttpRequest 对象,所以这在 Internet Explorer 6 中将不能工作。您可能需要使用一个 Ajax 库解决浏览器差异问题。除此之外,惟一需要注意的是回调函数,或者为请求的 onreadystatechange 函数创建的闭包。该函数粘贴来自服务器的新的数据,然后重新调用 go 函数。
现在,我们看过了一个简单的 Comet 应用程序在 Tomcat 上是什么样的。有两件与 Tomcat 密切相关的事情要做:一是配置它的连接器,二是在 servlet 中实现一个特定于 Tomcat 的接口。您可能想知道,将该代码 “移植” 到 Jetty 有多大难度。接下来我们就来看看这个问题。
Jetty 和 Comet
Jetty 服务器使用稍微不同的技术来支持 Comet 的可伸缩的实现。Jetty 支持被称作 continuations 的编程结构。其思想很简单。请求先被暂停,然后在将来的某个时间点再继续。规定时间到期,或者某种有意义的事件发生,都可能导致请求继续。当请求被暂停时,它的线程被释放。
可以使用 Jetty 的 org.mortbay.util.ajax.ContinuationSupport 类为任何 HttpServletRequest 创建 org.mortbay.util.ajax.Continuation 的一个实例。这种方法与 Comet 有很大的不同。但是,continuations 可用于实现逻辑上等效的 Comet。清单 6 显示清单 2 中的 weather servlet “移植” 到 Jetty 后的代码。
清单 6. Jetty Comet servlet
public class JettyWeatherServlet extends HttpServlet {
private MessageSender messageSender = null;
private static final Integer TIMEOUT = 5 * 1000;
public void begin(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
request.setAttribute("org.apache.tomcat.comet", Boolean.TRUE);
request.setAttribute("org.apache.tomcat.comet.timeout", TIMEOUT);
messageSender.setConnection(response);
Weatherman weatherman = new Weatherman(95118, 32408);
new Thread(weatherman).start();
}
public void end(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
synchronized (request) {
request.removeAttribute("org.apache.tomcat.comet");
Continuation continuation = ContinuationSupport.getContinuation
(request, request);
if (continuation.isPending()) {
continuation.resume();
}
}
}
public void error(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
end(request, response);
}
public boolean read(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
throw new UnsupportedOperationException();
}
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
synchronized (request) {
Continuation continuation = ContinuationSupport.getContinuation
(request, request);
if (!continuation.isPending()) {
begin(request, response);
}
Integer timeout = (Integer) request.getAttribute
("org.apache.tomcat.comet.timeout");
boolean resumed = continuation.suspend(timeout == null ? 10000 :
timeout.intValue());
if (!resumed) {
error(request, response);
}
}
}
public void setTimeout(HttpServletRequest request, HttpServletResponse response,
int timeout) throws IOException, ServletException,
UnsupportedOperationException {
request.setAttribute("org.apache.tomcat.comet.timeout", new Integer(timeout));
}
}
|
这里最需要注意的是,该结构与 Tomcat 版本的代码非常类似。begin 、read 、end 和 error 方法都与 Tomcat 中相同的事件匹配。该 servlet 的 service 方法被覆盖为在请求第一次进入时创建一个 continuation 并暂停该请求,直到超时时间已到,或者发生导致它重新开始的事件。上面没有显示 init 和 destroy 方法,因为它们与 Tomcat 版本是一样的。该 servlet 使用与 Tomcat 相同的 MessageSender 。因此不需要修改。注意 begin 方法如何创建 Weatherman 实例。对这个类的使用与 Tomcat 版本中也是完全相同的。甚至客户机代码也是一样的。只有 servlet 有更改。虽然 servlet 的变化比较大,但是与 Tomcat 中的事件模型仍是一一对应的。
希望这足以鼓舞人心。虽然完全相同的代码不能同时在 Tomcat 和 Jetty 中运行,但是它是非常相似的。当然,JavaEE 吸引人的一点是可移植性。大多数在 Tomcat 中运行的代码,无需修改就可以在 Jetty 中运行,反之亦然。因此,毫不奇怪,下一个版本的 Java Servlet 规范包括异步请求处理(即 Comet 背后的底层技术)的标准化。 我们来看看这个规范:Servlet 3.0 规范。
|