【Elasticsearch】Java Client连接池代码实现

用过Elasticsearch API的都知道,在Java端使用ES服务需要创建Java Client,但是每一次连接都实例化一个client,对系统的消耗很大,而且最令人头疼的是它的连接非常慢。所以为了解决上述问题并提高client利用率,可以用池化技术复用client:第一次使用时创建client,之后直接从池子里取就可以。不过第一次连接依然很慢,在这里就不做过多阐述。

第一步,创建一个PoolableUserFactory,需要继承BasePooledObjectFactory:

package com.yuguo.es.poolutils;

/**
 * Commons-pool factory that creates, wraps and destroys {@link EsClient} instances.
 *
 * <p>Every client produced by this factory is built from the same {@link EsEnv}.
 */
@Slf4j
public class PoolableUserFactory extends BasePooledObjectFactory<EsClient> {

	/** Connection settings handed to every {@link EsClient} this factory creates. */
	private final EsEnv esEnv;

	/**
	 * @param esEnv connection settings used for every client created by this factory
	 */
	public PoolableUserFactory(EsEnv esEnv) {
		super();
		this.esEnv = esEnv;
	}

	/**
	 * Builds a new client from the configured {@link EsEnv}.
	 *
	 * @return the newly created client
	 */
	@Override
	public EsClient create() throws Exception {
		EsClient esClient = new EsClient(esEnv);
		// Logged once through the logger; the duplicate stdout print was removed.
		log.info("create EsClient! {}", esClient);
		return esClient;
	}

	/**
	 * Wraps a client in the pool's bookkeeping object.
	 *
	 * @param esClient the client to wrap
	 * @return a {@link DefaultPooledObject} holding {@code esClient}
	 */
	@Override
	public PooledObject<EsClient> wrap(EsClient esClient) {
		return new DefaultPooledObject<>(esClient);
	}

	/**
	 * Delegates to the base implementation; no extra work is done when a client is returned.
	 */
	@Override
	public void passivateObject(PooledObject<EsClient> p) throws Exception {
		super.passivateObject(p);
	}

	/**
	 * Closes the underlying client when the pool discards it.
	 *
	 * @param poolObject the pooled wrapper whose client is being destroyed
	 */
	@Override
	public void destroyObject(PooledObject<EsClient> poolObject) throws Exception {
		EsClient esClient = poolObject.getObject();
		try {
			esClient.close();
			log.info("destroyObject EsClient! {}", esClient);
		} finally {
			// Always let the base class run, even if closing the client failed.
			super.destroyObject(poolObject);
		}
	}
}

第二步,创建封装ES Client的实例类EsClient:

package com.yuguo.es.poolutils;

/**
 * 获取ES Client单例
 * 
 * @ClassName: ClusterClient
 */
@Slf4j
public  class EsClient {
	
	private EsEnv esEnv;
	private Client client;
	
	protected EsClient(String clusterName, int numberOfShards, int numberOfReplicas) {
		Settings settings = Settings.builder().put("cluster.name", esEnv.getClusterName()).build();
		client = new PreBuiltTransportClient(settings);
	}
	
	protected EsClient addTransport(String host, int port) {
		((TransportClient) client).addTransportAddress(new TransportAddress(new InetSocketAddress(host, port)));
		return this;
	}
	
	private void buildClient(){
		Builder builder = Settings.builder();
		builder.put("cluster.name",  esEnv.getClusterName());
		
		String[] arrIp = esEnv.getIp().split(",");
		String[] arrPort = esEnv.getPort().split(",");
		
		TransportAddress[] addressArr = new TransportAddress[arrPort.length];
		for (int i = 0 ,size = arrIp.length; i < size; i++) {
			String objIp = arrIp[i];
			int port = 9300;
			try {
				port = Integer.valueOf(arrPort[i]);
			} catch (NumberFormatException e) {
				log.error("port trans error !");
			}
			addressArr[i] = new TransportAddress(new InetSocketAddress(objIp, port));
		}
		try {
			Settings settings = Settings.builder().put("cluster.name", esEnv.getClusterName()).build();
			client = new PreBuiltTransportClient(settings).addTransportAddresses(addressArr);
		} catch (Exception e) {
			e.printStackTrace();
		}  
		log.info("开辟集群连接,address:"+"\t连接对象"+client);
	}
	
	public Client getClient() {
		return client;
	}

	public void rebuildClient(){
		log.info("上次client连接发生错误,重新开辟连接!");
		if(client != null){
			close();
		}
		buildClient();
	}
	
	public EsClient(EsEnv esEnv) {
		this.esEnv = esEnv;
		buildClient();
	}
	
	public BulkRequestBuilder getBulkRequestBuilder(){
		return client.prepareBulk();
	}
	
	public IndexRequestBuilder getIndexRequestBuilder(){
		return client.prepareIndex();
	}
	
	public SearchRequestBuilder getEsSearch(){
		return client.prepareSearch();
	}
	
	public void esRefresh(){
		client.admin().indices().prepareRefresh().execute().actionGet();
	}
	
	/**
	 * 获取ES服务器的所有打开的索引
	 * 
	 * @Description: 
	 * @return
	 */
	public ClusterHealthResponse getClusterHealthResponse(){
		return client.admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus()).actionGet();
	}
	
	/**
	 * 获取ES服务器的所有索引(包括打开和关闭的索引)
	 * 
	 * @Description: 
	 * @return
	 */
	public MetaData getMetaData(){
		ClusterState state = client.admin().cluster().prepareState().execute().actionGet().getState();
		return state.getMetaData();
	}
	
	
	public boolean isExists(String indexName){
		return client.admin().indices().prepareExists(indexName).execute().actionGet().isExists();
	}
	
	public boolean closeIndex(String indexName){
		return client.admin().indices().prepareClose(indexName).execute().actionGet().isAcknowledged();
	}
	
	public boolean deleteIndex(String indexName){
		return client.admin().indices().prepareDelete(indexName).execute().actionGet().isAcknowledged();
	}
	
	public boolean createIndex(Settings settings, String indexName){
		return client.admin().indices().prepareCreate(indexName).setSettings(settings).execute().actionGet().isAcknowledged();
	}
	
	public void createAlias(String indexName, String aliasName){
		client.admin().indices().prepareAliases().addAlias(indexName, aliasName).execute().actionGet();
	}
	
	public boolean putMapping(PutMappingRequest mappingRequest){
		return client.admin().indices().putMapping(mappingRequest).actionGet().isAcknowledged();
	}
	
	
	/**
	 * 关闭ES客户端
	 * 
	 * @Description:
	 */
	public void close(){
		log.info("\t关闭连接对象"+client);
		client.close(); 
	}	
}

第三步,创建连接池对象:

package com.yuguo.es.poolutils;

/**
 * Pool of reusable {@link EsClient} instances backed by a commons-pool
 * {@link GenericObjectPool}.
 *
 * <p>The pool is configured once in the constructor; calling the generated setters
 * afterwards updates the fields only and does not reconfigure the existing pool.
 */
@Slf4j
public class EsClientPool {

	/** Elasticsearch cluster name passed to every pooled client. */
	@Getter
	@Setter
	private String clusterName;

	/** Comma-separated list of node hosts. */
	@Getter
	@Setter
	private String ip;

	/** Comma-separated list of node ports. */
	@Getter
	@Setter
	private String port;

	/** Upper bound on the number of clients the pool may hold. */
	@Getter
	@Setter
	private int keepClienNum;

	private final ObjectPool<EsClient> pool;

	/**
	 * Creates the pool; clients are built lazily on first borrow.
	 *
	 * @param clusterName  Elasticsearch cluster name
	 * @param ip           comma-separated node hosts
	 * @param port         comma-separated node ports
	 * @param keepClienNum maximum number of clients in the pool
	 */
	public EsClientPool(String clusterName, String ip, String port,int keepClienNum) {
		this.clusterName = clusterName;
		this.ip = ip;
		this.port = port;
		this.keepClienNum = keepClienNum;

		EsEnv esEnv = new EsEnv();
		esEnv.setClusterName(clusterName);
		esEnv.setIp(ip);
		esEnv.setPort(port);

		GenericObjectPoolConfig config = new GenericObjectPoolConfig();
		config.setMaxTotal(keepClienNum);
		pool = new GenericObjectPool<EsClient>(new PoolableUserFactory(esEnv), config);
	}

	/**
	 * Borrows a client from the pool.
	 *
	 * @return a client, or {@code null} if borrowing failed (the failure is logged)
	 */
	public EsClient getEsClient(){
		EsClient esClient = null;
		try {
			esClient = pool.borrowObject();
		} catch (Exception e) {
			log.error("create Client error!" , e);
		}
		return esClient;
	}

	/**
	 * Returns a previously borrowed client to the pool. Despite the name, the client is
	 * not removed or destroyed.
	 *
	 * @param esClient the client to give back; {@code null} is ignored
	 * @return the same {@code esClient} reference that was passed in
	 */
	public EsClient removeEsClient(EsClient esClient){
		if (esClient == null) {
			// getEsClient() can hand out null; nothing to return to the pool in that case.
			return null;
		}
		try {
			pool.returnObject(esClient);
		} catch (Exception e) {
			log.error("Client return to pool error!" , e);
		}
		return esClient;
	}

	/**
	 * Closes the pool so that its clients can be released.
	 * Call this on application shutdown.
	 */
	public void close() {
		pool.close();
	}

}

然后写一个utils类来获取elasticsearch连接:

/** Number of clients kept by the shared pool. */
private static final int DEFAULT_POOL_SIZE = 5;

/** Shared pool instance, created on the first call to {@code esClientPool}. */
private static EsClientPool pool;

/**
 * Returns the shared {@link EsClientPool}, creating it on first use.
 *
 * <p>The method is synchronized so that concurrent first calls cannot each build
 * (and then leak) their own pool. Only the configuration passed to the first call
 * is used; later calls return the existing pool regardless of {@code esConfig}.
 *
 * @param esConfig source of the cluster name, host list and port
 * @return the shared pool
 */
public static synchronized EsClientPool esClientPool(ElasticsearchConfig esConfig){
	if(pool == null){
		pool = new EsClientPool(getEsClusterName(esConfig), getEsIp(esConfig), getEsPort(esConfig).toString(), DEFAULT_POOL_SIZE);
	}
	return pool;
}

到此,一个简单的Elasticsearch连接池就写好了!

猜你喜欢

转载自blog.csdn.net/weixin_43833817/article/details/84937857