版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/chenzhenguo123/article/details/83414549
参考 https://gitee.com/eminem89/Hbase-Observer-ElasticSearch 上面的代码,但是由于我的 es 版本是 2.4.0,和原作者使用的版本不对应导致功能无法正常使用,所以特此记录修改后的可用版本,以供参考。
代码如下:
maven pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.eminem.hbase</groupId>
<artifactId>hbase-observer-elasticsearch</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hbase-observer-elasticsearch</name>
<url>http://maven.apache.org</url>
<!-- Cloudera repository is required for the CDH build of hbase-server -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- ES client version MUST match the target cluster (2.4.0 here) -->
<elasticsearch.version>2.4.0</elasticsearch.version>
<!-- CDH 5.8.2 build of HBase 1.2.0; resolved from the Cloudera repo above -->
<hbase-server.version>1.2.0-cdh5.8.2</hbase-server.version>
<maven-assembly-plugin.version>2.6</maven-assembly-plugin.version>
<commons-logging.version>1.2</commons-logging.version>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Provides the coprocessor API (BaseRegionObserver etc.) -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase-server.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- Guava pinned to 18.0 to satisfy the ES 2.x transport client;
     NOTE(review): HBase/CDH may ship a different Guava on the region
     server classpath - confirm no conflict at deploy time -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Tests are skipped: this jar is deployed as an HBase coprocessor
     and has no standalone unit tests in this project -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- Builds the fat/assembly jar (layout defined in assembly.xml) that
     gets uploaded to HDFS and loaded by the region servers.
     mainClass is intentionally empty: a coprocessor jar has no entry point -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptors>
<descriptor>assembly.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
项目目录图
参考原作者的项目基础之上改动 https://gitee.com/eminem89/Hbase-Observer-ElasticSearch
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Bulk hbase data to ElasticSearch Class
*/
public class ElasticSearchBulkOperator {
private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);
private static final int MAX_BULK_COUNT = 10000;
private static BulkRequestBuilder bulkRequestBuilder = null;
private static final Lock commitLock = new ReentrantLock();
private static ScheduledExecutorService scheduledExecutorService = null;
static {
// init es bulkRequestBuilder
bulkRequestBuilder = ESClient.client.prepareBulk();
bulkRequestBuilder.setRefresh(true);
// init thread pool and set size 1
scheduledExecutorService = Executors.newScheduledThreadPool(1);
// create beeper thread( it will be sync data to ES cluster)
// use a commitLock to protected bulk es as thread-save
final Runnable beeper = new Runnable() {
public void run() {
commitLock.lock();
try {
bulkRequest(0);
} catch (Exception ex) {
System.out.println(ex.getMessage());
LOG.error("Time Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
} finally {
commitLock.unlock();
}
}
};
// set time bulk task
// set beeper thread(10 second to delay first execution , 30 second period between successive executions)
scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);
}
/**
* shutdown time task immediately
*/
public static void shutdownScheduEx() {
if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
scheduledExecutorService.shutdown();
}
}
/**
* bulk request when number of builders is grate then threshold
*
* @param threshold
*/
private static void bulkRequest(int threshold) {
if (bulkRequestBuilder.numberOfActions() > threshold) {
BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
if (!bulkItemResponse.hasFailures()) {
bulkRequestBuilder = ESClient.client.prepareBulk();
}
}
}
/**
* add update builder to bulk
* use commitLock to protected bulk as thread-save
* @param builder
*/
public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
commitLock.lock();
try {
bulkRequestBuilder.add(builder);
bulkRequest(MAX_BULK_COUNT);
} catch (Exception ex) {
LOG.error(" update Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
} finally {
commitLock.unlock();
}
}
/**
* add delete builder to bulk
* use commitLock to protected bulk as thread-save
*
* @param builder
*/
public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
commitLock.lock();
try {
bulkRequestBuilder.add(builder);
bulkRequest(MAX_BULK_COUNT);
} catch (Exception ex) {
LOG.error(" delete Bulk " + ESClient.indexName + " index error : " + ex.getMessage());
} finally {
commitLock.unlock();
}
}
}
ESClient.java
package org.eminem.hbase.observer6;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
/**
 * Holder for the Elasticsearch transport client and its connection
 * configuration. Fields are populated by the observer from the coprocessor
 * parameters before {@link #initEsClient()} is called.
 */
public class ESClient {
    private static final Log LOG = LogFactory.getLog(ESClient.class);
    // ES cluster name (cluster.name setting)
    public static String clusterName;
    // ES node host for the transport client
    public static String nodeHost;
    // ES transport (TCP) port used by the Java API, typically 9300
    public static int nodePort;
    // target index name
    public static String indexName;
    // target type name
    public static String typeName;
    // shared transport client; null until initEsClient() succeeds
    public static Client client;

    /**
     * Returns a comma-joined dump of this class's static fields, used for
     * diagnostics when the observer starts.
     *
     * @return "field=value" pairs joined with commas
     */
    public static String getInfo() {
        List<String> fields = new ArrayList<String>();
        try {
            for (Field f : ESClient.class.getDeclaredFields()) {
                fields.add(f.getName() + "=" + f.get(null));
            }
        } catch (IllegalAccessException ex) {
            LOG.error("Failed to read ESClient config fields", ex);
        }
        return StringUtils.join(",", fields);
    }

    /**
     * Initializes the shared transport client from the configured cluster
     * name, host and port. On an unknown host the error is logged and
     * {@code client} stays null.
     */
    public static void initEsClient() {
        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", ESClient.clusterName).build();
            client = TransportClient
                    .builder()
                    .settings(settings)
                    .build()
                    .addTransportAddress(
                            new InetSocketTransportAddress(InetAddress
                                    .getByName(ESClient.nodeHost), ESClient.nodePort));
        } catch (UnknownHostException e) {
            // keep client == null; closeEsClient() tolerates that state
            LOG.error("Unknown ES host [" + ESClient.nodeHost + "]", e);
        }
    }

    /**
     * Closes the shared client if it was ever created. Safe to call even
     * when {@link #initEsClient()} failed (previously this threw an NPE).
     */
    public static void closeEsClient() {
        LOG.info("-----------Close ES client--------------");
        if (client != null) {
            client.close();
            client = null;
        }
    }

    /**
     * Standalone connectivity check against a hard-coded cluster.
     * Prints success only after the client was actually built, and closes
     * the client instead of leaking it.
     */
    public static void main(String[] args) {
        TransportClient testClient = null;
        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "myClusterName").build();
            testClient = TransportClient
                    .builder()
                    .settings(settings)
                    .build()
                    .addTransportAddress(
                            new InetSocketTransportAddress(InetAddress
                                    .getByName("192.168.1.181"), 9300));
            // BUG FIX: was printed unconditionally (even after a failure),
            // and was misspelled "conntect"
            System.out.println("connect success");
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } finally {
            if (testClient != null) {
                testClient.close();
            }
        }
    }
}
HbaseDataSyncEsObserver.java
package org.eminem.hbase.observer6;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
/**
 * HBase region observer that mirrors row mutations into Elasticsearch:
 * every Put becomes an upsert and every Delete becomes a document delete,
 * buffered through {@link ElasticSearchBulkOperator}.
 */
public class HbaseDataSyncEsObserver extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(HbaseDataSyncEsObserver.class);

    /**
     * Reads the ES connection settings from the coprocessor parameters
     * (set when the coprocessor is attached to the table).
     *
     * @param env coprocessor environment carrying the configuration
     */
    private static void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        ESClient.clusterName = conf.get("es_cluster");
        ESClient.nodeHost = conf.get("es_host");
        ESClient.nodePort = conf.getInt("es_port", -1);
        ESClient.indexName = conf.get("es_index");
        ESClient.typeName = conf.get("es_type");
    }

    /**
     * Coprocessor start hook: reads the config and connects the ES client.
     *
     * @param e coprocessor environment
     * @throws IOException per the coprocessor contract
     */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        readConfiguration(e);
        ESClient.initEsClient();
        // BUG FIX: startup diagnostics were logged at ERROR level
        LOG.info("------observer init EsClient ------" + ESClient.getInfo());
    }

    /**
     * Coprocessor stop hook: stops the periodic bulk task first so it cannot
     * fire against a closed client, then closes the ES client.
     *
     * @param e coprocessor environment
     * @throws IOException per the coprocessor contract
     */
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // BUG FIX: previous order closed the client BEFORE shutting down the
        // scheduler, so the timer could bulk against a closed client; and a
        // throw from close skipped the scheduler shutdown entirely
        try {
            ElasticSearchBulkOperator.shutdownScheduEx();
        } finally {
            ESClient.closeEsClient();
        }
    }

    /**
     * Called after the client stores a value: converts the Put's cells into
     * a {"info": {qualifier: value, ...}} document and queues an upsert.
     *
     * @param e observer context
     * @param put the stored mutation
     * @param edit WAL edit (unused)
     * @param durability durability setting (unused)
     * @throws IOException per the observer contract
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // BUG FIX: row key was decoded with the platform default charset via
        // new String(byte[]); Bytes.toString is always UTF-8
        String indexId = Bytes.toString(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> infoJson = new HashMap<String, Object>();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            // wrap all qualifiers under the "info" key in the ES document
            infoJson.put("info", json);
            // BUG FIX: per-row tracing was logged at ERROR level
            if (LOG.isDebugEnabled()) {
                LOG.debug("------observer postPut------ indexId=" + indexId + " doc=" + infoJson);
            }
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(ESClient.indexName, ESClient.typeName, indexId).setDocAsUpsert(true).setDoc(infoJson));
        } catch (Exception ex) {
            LOG.error("observer put a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage(), ex);
        }
    }

    /**
     * Called after the client deletes a value: queues a document delete for
     * the same row key.
     *
     * @param e observer context
     * @param delete the delete mutation
     * @param edit WAL edit (unused)
     * @param durability durability setting (unused)
     * @throws IOException per the observer contract
     */
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        // charset-safe row key decoding (see postPut)
        String indexId = Bytes.toString(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(ESClient.indexName, ESClient.typeName, indexId));
        } catch (Exception ex) {
            LOG.error("observer delete a doc, index [ " + ESClient.indexName + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage(), ex);
        }
    }
}
部署方式参考 https://blog.csdn.net/fxsdbt520/article/details/53884338