java http连接池的实现(带有失败重试等高级功能)

Java 自带的 java.net.HttpURLConnection 只提供基于 keep-alive 的隐式连接复用,没有可配置的连接池功能。如果不想从头实现,最好的方式是引入第三方依赖包。目前比较成熟的选择是 org.apache.httpcomponents:httpclient,引入方式如下。

<dependency>
	<groupId>org.apache.httpcomponents</groupId>
	<artifactId>httpclient</artifactId>
	<version>4.5.13</version>
</dependency>

使用httpclient依赖

在开始使用连接池之前,要学会如何使用httpclient去完成http请求,其请求方式与java的原生http请求完全不同。
其中CloseableHttpClient对象便是我们的http请求连接池,其声明方式会在下面介绍。

// 引用的包
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.*;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that execute HTTP requests through a caller-supplied
 * {@link CloseableHttpClient} and return the response body as a string.
 *
 * <p>Every helper returns {@code null} when the request fails: either the
 * status code is not 200 or an exception is thrown while executing the request
 * or reading the response. Failures are logged, never propagated.
 */
public class DoHttp {

    private static final Logger LOG = LoggerFactory.getLogger(DoHttp.class);

    /** Charset used to encode request bodies and as the fallback when decoding responses. */
    private static final String DEFAULT_CHARSET = "UTF-8";

    /**
     * Sends a GET request.
     *
     * @param httpClient client used to execute the request
     * @param url        target URL
     * @return the response body, or {@code null} on a non-200 status or on error
     */
    public static String get(CloseableHttpClient httpClient, String url) {
        return doRequest(httpClient, url, new HttpGet(url));
    }

    /**
     * Sends a POST request whose body is the given JSON text, encoded as UTF-8.
     *
     * @param httpClient client used to execute the request
     * @param url        target URL
     * @param json       request body
     * @return the response body, or {@code null} on a non-200 status or on error
     */
    public static String jsonPost(CloseableHttpClient httpClient, String url, String json) {
        HttpPost httpPost = new HttpPost(url);
        httpPost.setHeader("Content-Type", "application/json");
        httpPost.setEntity(new StringEntity(json, DEFAULT_CHARSET));
        return doRequest(httpClient, url, httpPost);
    }

    /**
     * Shared execution logic: runs the request, reads the body and checks the status code.
     *
     * @param httpClient  client used to execute the request
     * @param url         target URL, used only for logging
     * @param httpRequest fully prepared request
     * @return the response body when the status is 200, otherwise {@code null}
     */
    private static String doRequest(CloseableHttpClient httpClient, String url, HttpRequestBase httpRequest) {
        // Log the real HTTP method; this path serves GET as well as POST.
        String method = httpRequest.getMethod();
        // try-with-resources closes the response, which hands the connection back to the client.
        try (CloseableHttpResponse response = httpClient.execute(httpRequest)) {
            int code = response.getStatusLine().getStatusCode();
            HttpEntity responseEntity = response.getEntity();
            String responseBody = null;
            if (responseEntity != null) {
                // Without an explicit default, EntityUtils falls back to ISO-8859-1 whenever the
                // response Content-Type carries no charset, which garbles UTF-8 bodies.
                responseBody = EntityUtils.toString(responseEntity, DEFAULT_CHARSET);
            }
            if (code != 200) {
                LOG.error("http {} error, url: {}, code: {}, result: {}", method, url, code, responseBody);
                return null;
            }
            return responseBody;
        } catch (Exception e) {
            LOG.error("http {} error, url: {}", method, url, e);
        }
        return null;
    }
}

连接池的实现

连接池的配置类如下:

/**
 * Plain settings holder read by {@code HttpPool} when it builds its connection
 * manager, HTTP client and monitor task. All fields are public and mutable.
 */
public class HttpPoolConfig {
    
    
    /** Connection pool size; HttpPool uses it for both the total and the per-route maximum. */
    public int httpPoolSize;
    /** Connect timeout, passed to RequestConfig.setConnectTimeout (milliseconds per HttpClient's RequestConfig). */
    public int httpConnectTimeout;
    /** Timeout for waiting on a pooled connection, passed to RequestConfig.setConnectionRequestTimeout. */
    public int httpWaitTimeout;
    /** Socket timeout (max gap between response packets), passed to RequestConfig.setSocketTimeout. */
    public int httpSocketTimeout;
    /** Retry limit; HttpPool's retry handler stops once the execution count exceeds this value. */
    public int httpRetryCount;
    /** Interval between retries; unit is not established here — presumably milliseconds like the other intervals, TODO confirm. */
    public int httpRetryInterval;
    /** Monitor period in milliseconds: how often idle connections are cleaned up and pool status is logged. */
    public int httpMonitorInterval;
    /** Idle time in milliseconds after which the monitor closes a pooled connection. */
    public int httpCloseIdleConnectionWaitTime;
}

连接池实现类

import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.pool.PoolStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * HTTP connection pool built on Apache HttpClient.
 *
 * <p>Owns a pooled {@link CloseableHttpClient} and a single-threaded scheduler that
 * periodically closes expired and idle connections and logs the pool statistics.
 * Call {@link #close()} (or use try-with-resources) when the pool is no longer
 * needed; otherwise the scheduler thread keeps running.
 */
public class HttpPool implements AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(HttpPool.class);

    private final CloseableHttpClient httpClient;
    private final ScheduledExecutorService monitorExecutor;

    /**
     * Initializes the connection pool and starts its monitor task.
     *
     * @param httpPoolConfig pool, timeout, retry and monitor settings
     */
    public HttpPool(HttpPoolConfig httpPoolConfig) {
        PoolingHttpClientConnectionManager manager = buildHttpManager(httpPoolConfig);
        httpClient = buildHttpClient(httpPoolConfig, manager);
        monitorExecutor = buildMonitorExecutor(httpPoolConfig, manager);
    }

    /**
     * Builds the pooling connection manager.
     *
     * <p>The default constructor registers the plain "http" socket factory and the
     * default SSL "https" socket factory. A registry containing only "https" makes
     * every {@code http://} request fail with an unsupported-scheme error.
     */
    private PoolingHttpClientConnectionManager buildHttpManager(HttpPoolConfig httpPoolConfig) {
        PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
        manager.setMaxTotal(httpPoolConfig.httpPoolSize);
        // Same limit per route, so a single host may use the whole pool.
        manager.setDefaultMaxPerRoute(httpPoolConfig.httpPoolSize);
        return manager;
    }

    /**
     * Builds the HTTP client with the configured timeouts and the retry policy.
     */
    private CloseableHttpClient buildHttpClient(HttpPoolConfig httpPoolConfig, PoolingHttpClientConnectionManager manager) {
        // Request configuration
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(httpPoolConfig.httpConnectTimeout)
                .setSocketTimeout(httpPoolConfig.httpSocketTimeout)
                .setConnectionRequestTimeout(httpPoolConfig.httpWaitTimeout)
                .build();
        // Retry policy
        HttpRequestRetryHandler retryHandler = (exception, executionCount, context) -> {
            if (executionCount > httpPoolConfig.httpRetryCount) {
                LOG.error("HttpPool request retry more than {} times", httpPoolConfig.httpRetryCount, exception);
                return false;
            }
            if (exception == null) {
                LOG.info("HttpPool request exception is null.");
                return false;
            }
            boolean retry;
            if (exception instanceof NoHttpResponseException) {
                // No response at all: the server probably dropped the connection, so retry.
                LOG.error("HttpPool receive no response from server, retry");
                retry = true;
            } else if (exception instanceof InterruptedIOException // timeout
                    || exception instanceof UnknownHostException // unknown host
                    || exception instanceof SSLException) { // SSL failure
                // NOTE(review): these are retried for every request, including POST, so a
                // non-idempotent call may be sent more than once after a timeout.
                LOG.error("HttpPool request error, retry", exception);
                retry = true;
            } else {
                // Any other failure: retry only requests that carry no body.
                HttpClientContext clientContext = HttpClientContext.adapt(context);
                HttpRequest request = clientContext.getRequest();
                retry = !(request instanceof HttpEntityEnclosingRequest);
                LOG.error("HttpPool request unknown error, retry: {}", retry, exception);
            }
            // Honor the configured pause between attempts; give up if interrupted while waiting.
            return retry && waitBeforeRetry(httpPoolConfig.httpRetryInterval);
        };
        // Build the client
        return HttpClients.custom().setDefaultRequestConfig(config)
                .setConnectionManager(manager).setRetryHandler(retryHandler).build();
    }

    /**
     * Sleeps for the retry interval before the next attempt.
     *
     * @param intervalMillis pause length, interpreted as milliseconds like the other
     *                       interval settings; values {@code <= 0} mean no pause
     * @return {@code true} if the retry should proceed, {@code false} if the thread was
     *         interrupted while waiting (the interrupt flag is restored)
     */
    private static boolean waitBeforeRetry(int intervalMillis) {
        if (intervalMillis <= 0) {
            return true;
        }
        try {
            Thread.sleep(intervalMillis);
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            LOG.warn("HttpPool interrupted while waiting to retry, give up");
            return false;
        }
    }

    /**
     * Starts the periodic task that cleans up the pool and logs its status.
     */
    private ScheduledExecutorService buildMonitorExecutor(HttpPoolConfig httpPoolConfig,
                                                          PoolingHttpClientConnectionManager manager) {
        Runnable monitorTask = () -> {
            // An exception escaping a scheduleAtFixedRate task cancels all later runs,
            // so failures are logged here instead of being allowed to propagate.
            try {
                // Close expired connections
                manager.closeExpiredConnections();
                // Close connections that have been idle longer than the configured time
                manager.closeIdleConnections(httpPoolConfig.httpCloseIdleConnectionWaitTime, TimeUnit.MILLISECONDS);
                // Log pool status: max = limit, available = idle connections, leased = in use,
                // pending = threads currently waiting for a connection
                PoolStats poolStats = manager.getTotalStats();
                LOG.info("HttpPool status {}", poolStats);
            } catch (RuntimeException e) {
                LOG.error("HttpPool monitor error", e);
            }
        };
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        int time = httpPoolConfig.httpMonitorInterval;
        executor.scheduleAtFixedRate(monitorTask, time, time, TimeUnit.MILLISECONDS);
        return executor;
    }

    /**
     * Closes the pool: stops the monitor task, then closes the HTTP client.
     * Errors while closing the client are logged, not thrown.
     */
    @Override
    public void close() {
        // Stop the monitor first so it is always shut down, even if closing the client fails.
        monitorExecutor.shutdown();
        try {
            httpClient.close();
        } catch (Exception e) {
            LOG.error("HttpPool close http client error", e);
        }
    }

    /**
     * Sends a GET request through the pool.
     *
     * @param url target URL
     * @return whatever {@code DoHttp.get} returns for this client and URL
     */
    public String get(String url) {
        return DoHttp.get(httpClient, url);
    }

    /**
     * Sends a JSON POST request through the pool.
     *
     * @param url  target URL
     * @param json request body
     * @return whatever {@code DoHttp.jsonPost} returns for this client, URL and body
     */
    public String jsonPost(String url, String json) {
        return DoHttp.jsonPost(httpClient, url, json);
    }
}

猜你喜欢

转载自blog.csdn.net/weixin_44927769/article/details/130951270