
简单的来说,Java NIO 提供了一种异步非阻塞模型,使得网络请求都可以并发执行。

服务器端采用这种模型,响应速度将大大提高,Apache,Nginx 都是这种模型。


1. 使用范例

  1. public static void main(String[] args) throws Exception { 
  2. //初始化 
  3. NHttpClient httpClient = new NHttpClient(); 
  4. httpClient.init(); 
  5. //调用的url 
  6. String url = ""
  7. //调用的方法 
  8. httpClient.getUrl(url, new NHttpClientCallback() { 
  9. public void finished(String content) { 
  10. System.out.println("content=" + content.substring(01000)); 
  11. }); 
  12. //注意这里是立即返回,可以根据需要进行处理 

2. NHttpClient 的代码

  1.  /** 
  2.  * 专注互联网,分享创造价值 
  3.  * [email protected] 
  4.  */ 
  5.  package; 
  6.  import common.util.ValidateUtil; 
  7.  import; 
  8.  import; 
  9.  import; 
  10.  import; 
  11.  import; 
  12.  import; 
  13.  import java.util.HashMap; 
  14.  import java.util.Iterator; 
  15.  import java.util.Map; 
  16.  import java.util.concurrent.CountDownLatch; 
  17.  import java.util.concurrent.locks.Condition; 
  18.  import java.util.concurrent.locks.Lock; 
  19.  import java.util.concurrent.locks.ReentrantLock; 
  20.  import; 
  21.  import; 
  22.  import org.apache.commons.logging.Log; 
  23.  import org.apache.commons.logging.LogFactory; 
  24.  import org.apache.http.Header; 
  25.  import org.apache.http.HttpEntity; 
  26.  import org.apache.http.HttpException; 
  27.  import org.apache.http.HttpRequest; 
  28.  import org.apache.http.HttpResponse; 
  29.  import org.apache.http.impl.DefaultConnectionReuseStrategy; 
  30.  import org.apache.http.impl.nio.DefaultClientIOEventDispatch; 
  31.  import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; 
  32.  import org.apache.http.message.BasicHttpRequest; 
  33.  import org.apache.http.nio.NHttpConnection; 
  34.  import org.apache.http.nio.protocol.BufferingHttpClientHandler; 
  35.  import org.apache.http.nio.protocol.EventListener; 
  36.  import org.apache.http.nio.protocol.HttpRequestExecutionHandler; 
  37.  import org.apache.http.nio.reactor.IOEventDispatch; 
  38.  import org.apache.http.nio.reactor.IOReactorExceptionHandler; 
  39.  import org.apache.http.nio.reactor.SessionRequest; 
  40.  import org.apache.http.nio.reactor.SessionRequestCallback; 
  41.  import org.apache.http.params.BasicHttpParams; 
  42.  import org.apache.http.params.CoreConnectionPNames; 
  43.  import org.apache.http.params.HttpParams; 
  44.  import org.apache.http.protocol.BasicHttpProcessor; 
  45.  import org.apache.http.protocol.HttpContext; 
  46.  import org.apache.http.protocol.RequestConnControl; 
  47.  import org.apache.http.protocol.RequestContent; 
  48.  import org.apache.http.protocol.RequestExpectContinue; 
  49.  import org.apache.http.protocol.RequestTargetHost; 
  50.  import org.apache.http.protocol.RequestUserAgent; 
  51.  import org.apache.http.util.EntityUtils; 
  52.  /** 
  53.  * 
  54.  * 作用: 支持异步读取的httpClient 
  55.  * 暂时不支持socks代理 
  56.  */ 
  57.  public class NHttpClient { 
  58.  private final static Log log = LogFactory.getLog(NHttpClient.class); 
  59.  private int timeOut = 10000// 10秒 
  60.  private String localAddress = null
  61.  private SocketAddress localSocketAddress = null//本地端口 
  62.  private boolean useProxy = false
  63.  private int maxConnection = 2
  64.  private Map defaultHeaders = new HashMap(); 
  65.  private DefaultConnectingIOReactor ioReactor; 
  66.  private String host; 
  67.  private String proxyServerType = "http"
  68.  private String directHost = ",localhost"
  69.  private String proxyServer; 
  70.  private int proxyPort; 
  71.  private String proxyUser; 
  72.  private String proxyPassword; 
  73.  private int connections = 0
  74.  private Lock lock = new ReentrantLock(); 
  75.  private final Condition full = lock.newCondition(); 
  76.  public void addConnection() throws Exception { 
  77.  lock.lock(); 
  78.  try { 
  79.  if (connections > maxConnection) { 
  80.  full.await(); 
  81.  } 
  82.  connections++; 
  83.  } finally { 
  84.  lock.unlock(); 
  85.  } 
  86.  } 
  87.  public void removeConnection() { 
  88.  lock.lock(); 
  89.  try { 
  90.  if (connections <= maxConnection) { 
  91.  full.signal(); 
  92.  } 
  93.  connections--; 
  94.  } finally { 
  95.  lock.unlock(); 
  96.  } 
  97.  } 
  98.  public boolean isRunning() { 
  99.  return connections > 0
  100.  } 
  101.  public int getConnections() { 
  102.  return connections; 
  103.  } 
  104.  public Map getDefaultHeaders() { 
  105.  return defaultHeaders; 
  106.  } 
  107.  public void setDefaultHeaders(Map defaultHeaders) { 
  108.  this.defaultHeaders = defaultHeaders; 
  109.  } 
  110.  public String getDirectHost() { 
  111.  return directHost; 
  112.  } 
  113.  public void setDirectHost(String directHost) { 
  114.  this.directHost = directHost; 
  115.  } 
  116.  public String getHost() { 
  117.  return host; 
  118.  } 
  119.  public void setHost(String host) { 
  120. = host; 
  121.  } 
  122.  public String getLocalAddress() { 
  123.  return localAddress; 
  124.  } 
  125.  public void setLocalAddress(String localAddress) { 
  126.  this.localAddress = localAddress; 
  127.  } 
  128.  public SocketAddress getLocalSocketAddress() { 
  129.  return localSocketAddress; 
  130.  } 
  131.  public void setLocalSocketAddress(SocketAddress localSocketAddress) { 
  132.  this.localSocketAddress = localSocketAddress; 
  133.  } 
  134.  public int getMaxConnection() { 
  135.  return maxConnection; 
  136.  } 
  137.  public void setMaxConnection(int maxConnection) { 
  138.  this.maxConnection = maxConnection; 
  139.  } 
  140.  public String getProxyPassword() { 
  141.  return proxyPassword; 
  142.  } 
  143.  public void setProxyPassword(String proxyPassword) { 
  144.  this.proxyPassword = proxyPassword; 
  145.  } 
  146.  public int getProxyPort() { 
  147.  return proxyPort; 
  148.  } 
  149.  public void setProxyPort(int proxyPort) { 
  150.  this.proxyPort = proxyPort; 
  151.  } 
  152.  public String getProxyServer() { 
  153.  return proxyServer; 
  154.  } 
  155.  public void setProxyServer(String proxyServer) { 
  156.  this.proxyServer = proxyServer; 
  157.  } 
  158.  public String getProxyServerType() { 
  159.  return proxyServerType; 
  160.  } 
  161.  public void setProxyServerType(String proxyServerType) { 
  162.  this.proxyServerType = proxyServerType; 
  163.  } 
  164.  public String getProxyUser() { 
  165.  return proxyUser; 
  166.  } 
  167.  public void setProxyUser(String proxyUser) { 
  168.  this.proxyUser = proxyUser; 
  169.  } 
  170.  public int getTimeOut() { 
  171.  return timeOut; 
  172.  } 
  173.  public void setTimeOut(int timeOut) { 
  174.  this.timeOut = timeOut; 
  175.  } 
  176.  public boolean isUseProxy() { 
  177.  return useProxy; 
  178.  } 
  179.  public void setUseProxy(boolean useProxy) { 
  180.  this.useProxy = useProxy; 
  181.  } 
  182.  public void init() throws Exception { 
  183.  HttpParams params = new BasicHttpParams(); 
  184.  params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000). 
  185.  setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, timeOut). 
  186.  setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 512 * 1024). 
  187.  setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, true); 
  188.  // setBooleanParameter(CoreConnectionPNames., true); 
  189.  if (!ValidateUtil.isNull(localAddress)) { 
  190.  localSocketAddress = InetSocketAddress.createUnresolved(localAddress, 0); 
  191.  } 
  192.  defaultHeaders.put("User-Agent""Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5 GTB5"); 
  193.  defaultHeaders.put("Accept-Language""zh-cn,zh;q=0.5"); 
  194.  defaultHeaders.put("Accept-Charset""GB2312,utf-8;q=0.7,*;q=0.7"); 
  195.  defaultHeaders.put("Accept""*/*"); 
  196.  /** 
  197.  * 设置几个固定的http 头 
  198.  */ 
  199.  // defaultHeaders.put("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5 GTB5"); 
  200.  // defaultHeaders.put("Accept-Language", "zh-cn,zh;q=0.5"); 
  201.  // defaultHeaders.put("Accept-Charset", "GB2312,utf-8;q=0.7,*;q=0.7"); 
  202.  // defaultHeaders.put("Accept", "*/*"); 
  203.  ioReactor = new DefaultConnectingIOReactor(2, params); 
  204.  BasicHttpProcessor httpproc = new BasicHttpProcessor(); 
  205.  httpproc.addInterceptor(new RequestContent()); 
  206.  httpproc.addInterceptor(new RequestTargetHost()); 
  207.  httpproc.addInterceptor(new RequestConnControl()); 
  208.  httpproc.addInterceptor(new RequestUserAgent()); 
  209.  httpproc.addInterceptor(new RequestExpectContinue()); 
  210.  BufferingHttpClientHandler handler = new BufferingHttpClientHandler( 
  211.  httpproc, 
  212.  new MyHttpRequestExecutionHandler(), 
  213.  new DefaultConnectionReuseStrategy(), 
  214.  params); 
  215.  handler.setEventListener(new EventLogger()); 
  216.  final IOEventDispatch ioEventDispatch = new DefaultClientIOEventDispatch(handler, params); 
  217.  ioReactor.setExceptionHandler(new IOReactorExceptionHandler() { 
  218.  public boolean handle(IOException e) { 
  219.  e.printStackTrace(); 
  220.  log.error("IOException=" + e.getMessage()); 
  221.  return true
  222.  } 
  223.  public boolean handle(RuntimeException e) { 
  224.  e.printStackTrace(); 
  225.  log.error("RuntimeException=" + e.getMessage()); 
  226.  return true
  227.  } 
  228.  }); 
  229.  Thread t = new Thread(new Runnable() { 
  230.  public void run() { 
  231.  try { 
  232.  ioReactor.execute(ioEventDispatch); 
  233.  } catch (InterruptedIOException ex) { 
  234.  log.error("Interrupted." + ex.getMessage()); 
  235.  } catch (Exception e) { 
  236.  log.error("I/O error: " + e.getMessage()); 
  237.  } 
  238.  log.debug("shutdown"); 
  239.  } 
  240.  }); 
  241.  t.start(); 
  242.  } 
  243.  public void destroy() throws Exception { 
  244.  if (ioReactor != null) { 
  245.  ioReactor.shutdown(); 
  246.  } 
  247.  } 
  248.  //减少dns查询 
  249.  private Map dns = new HashMap(); 
  250.  public void getUrl(String url, NHttpClientCallback callback) throws Exception { 
  251.  addConnection(); 
  252.  if (!url.startsWith("http://")) { 
  253.  url += "http://" + host; 
  254.  } 
  255.  URL u = new URL(url); 
  256.  int port = u.getPort() < 0 ? u.getDefaultPort() : u.getPort(); 
  257.  String path = u.getPath(); 
  258.  if (ValidateUtil.isNull(path)) { 
  259.  path = "/"
  260.  } 
  261.  if (u.getQuery() != null) { 
  262.  path += "?" + u.getQuery(); 
  263.  } 
  264.  if (dns.get(u.getHost()) == null) { 
  265.  InetAddress address = InetAddress.getByName(u.getHost()); 
  266.  dns.put(u.getHost(), address); 
  267.  } 
  268.  InetAddress address = dns.get(u.getHost()); 
  269.  SessionRequest sessionRequest = null
  270.  InternalObject object = new InternalObject(path, callback); 
  271.  object.setUrl(url); 
  272.  if (!useProxy) { 
  273.  sessionRequest = ioReactor.connect( 
  274.  new InetSocketAddress(address, port), 
  275.  localSocketAddress, //localhost 
  276.  object,//attachment 
  277.  new MySessionRequestCallback()); 
  278.  } else { 
  279.  //TODO 
  280.  SocketAddress addr = new InetSocketAddress(proxyServer, proxyPort); 
  281.  sessionRequest = ioReactor.connect( 
  282.  addr, 
  283.  localSocketAddress, //localhost 
  284.  object,//attachment 
  285.  new MySessionRequestCallback()); 
  286.  } 
  287.  /* * */ 
  288.  sessionRequest.waitFor(); 
  289.  if (sessionRequest.getException() != null) { 
  290.  throw sessionRequest.getException(); 
  291.  } 
  292.  } 
  293.  private class InternalObject { 
  294.  private NHttpClientCallback callback; 
  295.  private String uri; 
  296.  private String url; 
  297.  public InternalObject(String uri, NHttpClientCallback callback) { 
  298.  this.uri = uri; 
  299. this.callback = callback; 
  300.  } 
  301.  public NHttpClientCallback getCallback() { 
  302.  return callback; 
  303.  } 
  304.  public void setCallback(NHttpClientCallback callback) { 
  305.  this.callback = callback; 
  306.  } 
  307.  public String getUri() { 
  308.  return uri; 
  309.  } 
  310.  public void setUri(String uri) { 
  311.  this.uri = uri; 
  312.  } 
  313.  public String getUrl() { 
  314.  return url; 
  315.  } 
  316.  public void setUrl(String url) { 
  317.  this.url = url; 
  318.  } 
  319.  } 
  320.  private class MySessionRequestCallback implements SessionRequestCallback { 
  321.  public MySessionRequestCallback() { 
  322.  super(); 
  323.  } 
  324.  public void cancelled(final SessionRequest request) { 
  325.  log.debug("Connect request cancelled: " + request.getRemoteAddress()); 
  326.  } 
  327.  public void completed(final SessionRequest request) { 
  328.  log.debug("Connect request completed: " + request.getRemoteAddress()); 
  329.  } 
  330.  public void failed(final SessionRequest request) { 
  331.  log.debug("Connect request failed: " + request.getRemoteAddress()); 
  332.  } 
  333.  public void timeout(final SessionRequest request) { 
  334.  log.debug("Connect request timed out: " + request.getRemoteAddress()); 
  335.  } 
  336.  } 
  337.  private class EventLogger implements EventListener { 
  338.  public void connectionOpen(final NHttpConnection conn) { 
  339.  log.debug("Connection open: " + conn); 
  340.  } 
  341.  public void connectionTimeout(final NHttpConnection conn) { 
  342.  log.debug("Connection timed out: " + conn); 
  343.  } 
  344.  public void connectionClosed(final NHttpConnection conn) { 
  345.  log.debug("Connection closed: " + conn); 
  346.  } 
  347.  public void fatalIOException(final IOException ex, final NHttpConnection conn) { 
  348.  log.error("I/O error: " + ex.getMessage()); 
  349.  } 
  350.  public void fatalProtocolException(final HttpException ex, final NHttpConnection conn) { 
  351.  log.error("HTTP error: " + ex.getMessage()); 
  352.  } 
  353.  } 
  354.  private class MyHttpRequestExecutionHandler implements HttpRequestExecutionHandler { 
  355.  private final static String REQUEST_SENT = "request-sent"
  356.  private final static String RESPONSE_RECEIVED = "response-received"
  357.  public MyHttpRequestExecutionHandler() { 
  358.  super(); 
  359.  } 
  360.  public void initalizeContext(final HttpContext context, final Object attachment) { 
  361.  InternalObject internalObject = (InternalObject) attachment; 
  362.  context.setAttribute("internalObject", internalObject); 
  363.  } 
  364.  public void finalizeContext(final HttpContext context) { 
  365.  Object flag = context.getAttribute(RESPONSE_RECEIVED); 
  366.  if (flag == null) { 
  367.  // Signal completion of the request execution 
  368.  } 
  369.  } 
  370. public HttpRequest submitRequest(final HttpContext context) { 
  371. InternalObject internalObject = (InternalObject) context.getAttribute("internalObject"); 
  372.  Object flag = context.getAttribute(REQUEST_SENT); 
  373.  if (flag == null) { 
  374.  try { 
  375.  // Stick some object into the context 
  376.  context.setAttribute(REQUEST_SENT, Boolean.TRUE); 
  377.  log.debug("Sending request to " + internalObject.getUrl()); 
  378.  System.out.println("Sending request to " + internalObject.getUrl()); 
  379.  BasicHttpRequest httpRequest = new BasicHttpRequest("GET", internalObject.getUri()); 
  380.  //FIXMED me 
  381.  // httpRequest.addHeader("Accept-Encoding", "gzip,deflate"); 
  382.  Iterator iteratorDefault = defaultHeaders.keySet().iterator(); 
  383.  while (iteratorDefault.hasNext()) { 
  384.  String key =; 
  385.  httpRequest.setHeader(key, defaultHeaders.get(key)); 
  386.  log.debug(key + "=" + defaultHeaders.get(key)); 
  387.  } 
  388.  return httpRequest; 
  389.  } catch (Exception e) { 
  390.  e.printStackTrace(); 
  391.  } 
  392.  return null
  393.  } else { 
  394.  // No new request to submit 
  395.  return null
  396.  } 
  397.  } 
  398.  public void handleResponse(final HttpResponse response, final HttpContext context) { 
  399.  InternalObject internalObject = (InternalObject) context.getAttribute("internalObject"); 
  400.  HttpEntity entity = response.getEntity(); 
  401.  String content = ""
  402.  try { 
  403.  if (response.getStatusLine().getStatusCode() != 200) { 
  404.  throw new IOException("invalid response code=" + response.getStatusLine().getStatusCode() + ",url=" + internalObject.getUrl()); 
  405.  } 
  406.  log.debug(response.getStatusLine()); 
  407.  Header[] headers = response.getAllHeaders(); 
  408.  for (Header header : headers) { 
  409.  log.debug(header.getName() + "=" + header.getValue()); 
  410.  } 
  411.  if (entity.getContentEncoding() != null && "gzip".equals(entity.getContentEncoding().getValue())) { 
  412.  //是压缩的流 
  413.  GZIPInputStream inStream = new GZIPInputStream(entity.getContent()); 
  414.  content = IOUtils.toString(inStream); 
  415.  } else { 
  416.  content = IOUtils.toString(entity.getContent(), "GBK"); 
  417.  // content = EntityUtils.toString(entity, "GBK"); 
  418.  } 
  419.  System.out.println("-----------------------"); 
  420.  System.out.println("response " + response.getStatusLine() + " of url=" + internalObject.getUrl() + ",content=" + content.length()); 
  421.  System.out.println("content=" + content.indexOf("page-info")); 
  422.  System.out.println("-----------------------"); 
  423.  //System.out.println("content="+content); 
  424.  internalObject.getCallback().finished(content); 
  425.  log.debug("Document length: " + content.length()); 
  426.  } catch (Exception e) { 
  427.  e.printStackTrace(); 
  428.  log.error("I/O error: " + e.getMessage()); 
  429.  } finally { 
  430.  removeConnection(); 
  431.  } 
  432.  context.setAttribute(RESPONSE_RECEIVED, Boolean.TRUE); 
  433.  } 
  434.  } 
  435.  /** 
  436.  * 
  437.  * 作用: 
  438.  */ 
  439.  public interface NHttpClientCallback { 
  440.  public void finished(String content); 
  441.  } 
  442.  } 

3. 说明

