知识点1: JPA、EnCache
PlayHandler主要就是接收客户端的请求处理
public class PlayHandler extends SimpleChannelUpstreamHandler {
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent messageEvent) { //客户端传来的信息
final Object msg = messageEvent.getMessage();
//如果是http请求
if (msg instanceof HttpRequest) {
final HttpRequest nettyRequest = (HttpRequest) msg;
//如果是websocket,就升级,这个以后再讨论
try {
//转换成Play的Request
final Request request = parseRequest(ctx, nettyRequest, messageEvent);
//创建Response
final Response response = new Response();
//current:就是 ThreadLocal<Response>();
Http.Response.current.set(response);
response.out = new ByteArrayOutputStream();
response.direct = null;
//将事件放入到list中
response.onWriteChunk(new Action<Object>() {
public void invoke(Object result) {
writeChunk(request, response, ctx, nettyRequest, result);
}
});
//处理插件eg:JPAPlugin,验证插件
boolean raw = Play.pluginCollection.rawInvocation(request, response);
//处理事件
Invoker.invoke(new NettyInvocation(request, response, ctx, nettyRequest, messageEvent));
}
}
//将http的请求转换成Play的Request
public Request parseRequest(ChannelHandlerContext ctx, HttpRequest nettyRequest, MessageEvent messageEvent) {
String uri = nettyRequest.getUri(); //url
String encoding = Play.defaultWebEncoding; //UTF-8
String remoteAddress = getRemoteIPAddress(messageEvent);
String method = nettyRequest.getMethod().getName();//GET\POST
InputStream body = null;
//获取buffer
ChannelBuffer b = nettyRequest.getContent();
//将读取到body中
if (b instanceof FileChannelBuffer) {
FileChannelBuffer buffer = (FileChannelBuffer) b;
// An error occurred
Integer max = Integer.valueOf(Play.configuration.getProperty("play.netty.maxContentLength", "-1"));
body = buffer.getInputStream();
if (!(max == -1 || body.available() < max)) {
body = new ByteArrayInputStream(new byte[0]);
}
} else {
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copy(new ChannelBufferInputStream(b), out);
byte[] n = out.toByteArray();
body = new ByteArrayInputStream(n);
}
//创建Request.
final Request request = Request.createRequest(
remoteAddress,
method,
path,
querystring,
contentType,
body,
uri,
host,
isLoopback,
port,
domain,
secure,
getHeaders(nettyRequest),
getCookies(nettyRequest));
}
}
处理reponse
public void copyResponse(ChannelHandlerContext ctx, Request request, Response response, HttpRequest nettyRequest) throws Exception {
//创建Netty 下的Reponse
HttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(response.status));
nettyResponse.setHeader(SERVER, signature);
if (response.contentType != null) {
nettyResponse.setHeader(CONTENT_TYPE, response.contentType + (response.contentType.startsWith("text/") && !response.contentType.contains("charset") ? "; charset=" + response.encoding : ""));
} else {
nettyResponse.setHeader(CONTENT_TYPE, "text/plain; charset=" + response.encoding);
}
设置一些头部信息
addToResponse(response, nettyResponse);
final Object obj = response.direct;
File file = null;
ChunkedInput stream = null;
InputStream is = null;
if (obj instanceof File) {
file = (File) obj;
} else if (obj instanceof InputStream) {
is = (InputStream) obj;
} else if (obj instanceof ChunkedInput) {
// Streaming we don't know the content length
stream = (ChunkedInput) obj;
}
final boolean keepAlive = HttpHeaders.isKeepAlive(message) && message.getProtocolVersion().equals(HttpVersion.HTTP_1_1)
if (file != null && file.isFile()) {
//等下再看
}else if (is != null) {
}else if (stream != null) {
}else {
writeResponse(ctx, response, nettyResponse, nettyRequest);
}
}
//平常的写入到response
protected static void writeResponse(ChannelHandlerContext ctx, Response response, HttpResponse nettyResponse, HttpRequest nettyRequest) {
byte[] content = null;
final boolean keepAlive = HttpHeaders.isKeepAlive(message) && message.getProtocolVersion().equals(HttpVersion.HTTP_1_1)
//获取内容
if (nettyRequest.getMethod().equals(HttpMethod.HEAD)) {
content = new byte[0];
} else {
content = response.out.toByteArray();
}
//将byte[]转换成Buffer,并设置到response
ChannelBuffer buf = ChannelBuffers.copiedBuffer(content);
nettyResponse.setContent(buf);
//设置长度
message.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(response.out.size()));
//写入
ChannelFuture f = ctx.getChannel().write(nettyResponse);
if (!keepAlive) {
// Close the connection when the whole content is written out.
f.addListener(ChannelFutureListener.CLOSE);
}
}
//设置一些头部信息
protected static void addToResponse(Response response, HttpResponse nettyResponse) {
Map<String, Http.Header> headers = response.headers;
for (Map.Entry<String, Http.Header> entry : headers.entrySet()) {
Http.Header hd = entry.getValue();
for (String value : hd.values) {
nettyResponse.setHeader(entry.getKey(), value);
}
}
nettyResponse.setHeader(DATE, Utils.getHttpDateFormatter().format(new Date()));
Map<String, Http.Cookie> cookies = response.cookies;
for (Http.Cookie cookie : cookies.values()) {
CookieEncoder encoder = new CookieEncoder(true);
Cookie c = new DefaultCookie(cookie.name, cookie.value);
c.setSecure(cookie.secure);
c.setPath(cookie.path);
if (cookie.domain != null) {
c.setDomain(cookie.domain);
}
if (cookie.maxAge != null) {
c.setMaxAge(cookie.maxAge);
}
c.setHttpOnly(cookie.httpOnly);
encoder.addCookie(c);
nettyResponse.addHeader(SET_COOKIE, encoder.encode());
}
if (!response.headers.containsKey(CACHE_CONTROL) && !response.headers.containsKey(EXPIRES) && !(response.direct instanceof File)) {
nettyResponse.setHeader(CACHE_CONTROL, "no-cache");
}
}
- 如何处理事件呢,上面是Invoker.invoke(new NettyInvocation(request, response, ctx, nettyRequest, messageEvent));
主要是放入到线程池中
Invoker {
public static ScheduledThreadPoolExecutor executor = null;
public static Future<?> invoke(final Invocation invocation) {
//时间监听器
Monitor monitor = MonitorFactory.getMonitor("Invoker queue size", "elmts.");
monitor.add(executor.getQueue().size());
invocation.waitInQueue = MonitorFactory.start("Waiting for execution");
//放进线程池中
return executor.submit(invocation);
}
}
主要的线程是用NettyInvocation
NettyInvocation implements Runnable {
public boolean init() {
Thread.currentThread().setContextClassLoader(Play.classloader);
Play.detectChanges();
if (!Play.started) {
if (Play.mode == Mode.PROD) {
throw new UnexpectedException("Application is not started");
}
Play.start();
}
InvocationContext.current.set(getInvocationContext());
}
//信息
public InvocationContext getInvocationContext() {
ActionInvoker.resolve(request, response);
return new InvocationContext(Http.invocationType,
request.invokedMethod.getAnnotations(),
request.invokedMethod.getDeclaringClass().getAnnotations());
}
public void run() {
//如果是第一次访问
if (init()) {
before();//Play.pluginCollection.beforeInvocation();插件
execute();
after();// Play.pluginCollection.afterInvocation();连接关闭
onSuccess();
} catch (Suspend e) {
suspend(e);
after();
}catch (Throwable e) {
Play.pluginCollection.onInvocationException(e);
}finally{
Play.pluginCollection.invocationFinally();
}
}
}
//成功之后的操作
public void onSuccess() throws Exception {
Play.pluginCollection.onInvocationSuccess();
//这个和之前的那个response.onWriteChunk有联系
if (response.chunked) {
closeChunked(request, response, ctx, nettyRequest);
} else {
copyResponse(ctx, request, response, nettyRequest);
}
if (Logger.isTraceEnabled()) {
Logger.trace("execute: end");
}
}
//执行方法
public void execute() throws Exception {
//如果链接断开了,直接关闭连接
if (!ctx.getChannel().isConnected()) {
ctx.getChannel().close();
return;
}
saveExceededSizeError(nettyRequest, request, response);
ActionInvoker.invoke(request, response);
}
真正的处理请求:
public static void invoke(Http.Request request, Http.Response response) {
try{
resolve(request, response);
//ValidationPlugin 验证参数等 Guard
Play.pluginCollection.beforeActionInvocation(actionMethod);
//处理Before注解
handleBefores(request);
//利用java反射调用,并根据返回结果
actionResult inferResult(invokeControllerMethod(actionMethod));
handleAfters(request);
monitor.stop();
if (actionResult != null) {
throw actionResult;
}
throw new NoResult();
}cacth(Result r) {
//处理结果
result.apply(request, response);
}
}
//处理请求结果
public static void inferResult(Object o) {
// Return type inference
if (o != null) {
if (o instanceof NoResult) {
return;
}
if (o instanceof Result) {
// Of course
throw (Result) o;
}
if (o instanceof InputStream) {
Controller.renderBinary((InputStream) o);
}
if (o instanceof File) {
Controller.renderBinary((File) o);
}
if (o instanceof Map) {
Controller.renderTemplate((Map<String, Object>) o);
}
if (o instanceof Object[]) {
Controller.render(o);
}
Controller.renderHtml(o);
}
}
public static void resolve(Http.Request request, Http.Response response) {
Http.Request.current.set(request);
Http.Response.current.set(response);
...
//栈
ControllersEnhancer.currentAction.set(new Stack<String>());
}
第一次访问:Play.detectChanges();
public static synchronized void detectChanges() {
if (mode == Mode.PROD) {
return;
}
try {
//重新加载class
if(!pluginCollection.detectClassesChange()) {
classloader.detectChanges();
}
//重新加载route
Router.detectChanges(ctxPath);
for(VirtualFile conf : confs) {
if (conf.lastModified() > startedAt) {
//重启
start();
return;
}
}
pluginCollection.detectChange();
if (!Play.started) {
throw new RuntimeException("Not started");
}
} catch (PlayException e) {
throw e;
} catch (Exception e) {
// We have to do a clean refresh
start();
}
}
重启步骤:Play.start()
public static synchronized void start() {
//已经重启过了,就
if (started) {
stop();
}
//在JVM关闭前关闭服务
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
Play.stop();
}
});
//重新加载插件
pluginCollection.reloadApplicationPlugins();
//读取application.conf
readConfiguration();
//清除模板
TemplateLoader.cleanCompiledCache();
//加载所有的类(包含)
Play.classloader.getAllClasses();
//初始化缓存
Cache.init();
//各个插件初始化
pluginCollection.onApplicationStart();
//插件
pluginCollection.afterApplicationStart();
}
关闭:
//关闭
public static synchronized void stop() {
if (started) {
Logger.trace("Stopping the play application");
//插件关闭eg: JPAPlugin.entityManagerFactory.close();WS;JobsPlugin..
pluginCollection.onApplicationStop();
started = false;
//缓存关闭Memcached/EhCache
Cache.stop();
Router.lastLoading = 0L;
}
}