Elasticsearch Backup and Restore 3: Writing HDFS Data into Elasticsearch with ES-Hadoop

Background: see the previous post in this series, "Elasticsearch Backup and Restore 2: snapshot and restore API support via the repository-hdfs plugin".

With the repository-hdfs plugin, Elasticsearch can use snapshot and restore to write indices into HDFS and recover them from HDFS. However, the snapshot files stored in HDFS are not in a form that is convenient to process further.

ES-Hadoop lets you synchronize data between HDFS and Elasticsearch from Java: data in HDFS can be imported into an Elasticsearch index and then visualized in Kibana, and Elasticsearch index data can likewise be exported to HDFS for offline batch processing.

This post covers the first direction: importing HDFS data into Elasticsearch.

The project is based on the WordCount example from the book 《Elasticsearch集成Hadoop最佳实践》.

ES-Hadoop is pulled in as a Maven-managed dependency; its version must match the Elasticsearch version, which here is 6.2.3.

Project source: https://gitee.com/constfafa/HDFSToES.git
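The repository above contains the complete project. The sketch below shows the three core pieces of such a job (mapper, reducer, and driver), assuming Elasticsearch listens on docker02 (192.168.211.104) at the default HTTP port 9200. The class names and the document fields ("word", "count") are illustrative rather than copied from the repository; the target index eshadoop/wordcount matches the Kibana query in step 6.

// Minimal sketch of an HDFS-to-Elasticsearch WordCount job (illustrative names; see the repository for the real code)
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class WordCountToEs {

    // Standard WordCount mapper: emit (word, 1) for every token in a line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Sum the counts and emit one MapWritable per word;
    // EsOutputFormat serializes each MapWritable as a JSON document.
    public static class SumToEsReducer
            extends Reducer<Text, IntWritable, NullWritable, MapWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            MapWritable doc = new MapWritable();
            doc.put(new Text("word"), key);                   // illustrative field names
            doc.put(new Text("count"), new IntWritable(sum));
            context.write(NullWritable.get(), doc);           // the key is ignored by EsOutputFormat
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("es.nodes", "192.168.211.104:9200");   // Elasticsearch on docker02, assumed default port
        conf.set("es.resource", "eshadoop/wordcount");  // target index/type, queried in Kibana in step 6

        Job job = Job.getInstance(conf, "hdfs-to-es wordcount");
        job.setJarByClass(WordCountToEs.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class); // write to Elasticsearch instead of an HDFS path
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(SumToEsReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MapWritable.class);

        // The jar is invoked with a single argument: the HDFS input path.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that EsOutputFormat ignores the record key: every MapWritable value emitted by the reducer becomes one JSON document in the target index, which is why the job counters below show Documents Sent=232, matching Reduce output records=232.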

Note: when using ES-Hadoop, port 9000 (the HDFS NameNode port in this setup) must be open in the docker02 firewall.

 

Development process

1. First, create a directory on HDFS (on docker02) to hold the input files, using the following commands:

hadoop fs -mkdir /input

hadoop fs -mkdir /input/ch01

2. Create a jar directory under the root path of the local filesystem (mkdir /jar).

Upload the job jar built from the project source, ch01-0.0.1-job.jar, together with sample.txt, into the /jar directory via XFTP.

3. Upload sample.txt to the corresponding HDFS directory with the following command:

hadoop fs -put /jar/sample.txt /input/ch01/sample.txt

4. Verify that the file was uploaded to the expected HDFS location:

hadoop fs -ls /input/ch01

5. Run the job: hadoop jar ch01-0.0.1-job.jar /input/ch01/sample.txt

The execution output is shown below:

[root@docker02 jar]# hadoop jar ch01-0.0.1-job.jar /input/ch01/sample.txt
18/06/07 01:31:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/07 01:31:18 INFO client.RMProxy: Connecting to ResourceManager at /192.168.211.104:8032
18/06/07 01:31:22 WARN mr.EsOutputFormat: Speculative execution enabled for reducer - consider disabling it to prevent data corruption
18/06/07 01:31:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/06/07 01:31:34 INFO input.FileInputFormat: Total input paths to process : 1
18/06/07 01:31:34 INFO mapreduce.JobSubmitter: number of splits:1
18/06/07 01:31:36 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1528305729734_0003
18/06/07 01:31:45 INFO impl.YarnClientImpl: Submitted application application_1528305729734_0003
18/06/07 01:31:47 INFO mapreduce.Job: The url to track the job: http://docker02:8088/proxy/application_1528305729734_0003/
18/06/07 01:31:47 INFO mapreduce.Job: Running job: job_1528305729734_0003
18/06/07 01:34:56 INFO mapreduce.Job: Job job_1528305729734_0003 running in uber mode : false
18/06/07 01:35:03 INFO mapreduce.Job:  map 0% reduce 0%
18/06/07 01:39:24 INFO mapreduce.Job:  map 67% reduce 0%
18/06/07 01:39:29 INFO mapreduce.Job:  map 100% reduce 0%
18/06/07 01:41:31 INFO mapreduce.Job:  map 100% reduce 67%
18/06/07 01:41:43 INFO mapreduce.Job:  map 100% reduce 100%
18/06/07 01:41:57 INFO mapreduce.Job: Job job_1528305729734_0003 completed successfully
18/06/07 01:42:24 INFO mapreduce.Job: Counters: 66
	File System Counters
		FILE: Number of bytes read=5103
		FILE: Number of bytes written=241415
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=2917
		HDFS: Number of bytes written=0
		HDFS: Number of read operations=2
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=0
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=287474
		Total time spent by all reduces in occupied slots (ms)=86672
		Total time spent by all map tasks (ms)=287474
		Total time spent by all reduce tasks (ms)=86672
		Total vcore-seconds taken by all map tasks=287474
		Total vcore-seconds taken by all reduce tasks=86672
		Total megabyte-seconds taken by all map tasks=294373376
		Total megabyte-seconds taken by all reduce tasks=88752128
	Map-Reduce Framework
		Map input records=26
		Map output records=383
		Map output bytes=4331
		Map output materialized bytes=5103
		Input split bytes=114
		Combine input records=0
		Combine output records=0
		Reduce input groups=232
		Reduce shuffle bytes=5103
		Reduce input records=383
		Reduce output records=232
		Spilled Records=766
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=18474
		CPU time spent (ms)=12780
		Physical memory (bytes) snapshot=233213952
		Virtual memory (bytes) snapshot=4123394048
		Total committed heap usage (bytes)=143073280
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=2803
	File Output Format Counters 
		Bytes Written=0
	Elasticsearch Hadoop Counters
		Bulk Retries=0
		Bulk Retries Total Time(ms)=0
		Bulk Total=1
		Bulk Total Time(ms)=2489
		Bytes Accepted=9655
		Bytes Received=4000
		Bytes Retried=0
		Bytes Sent=9655
		Documents Accepted=232
		Documents Received=0
		Documents Retried=0
		Documents Sent=232
		Network Retries=0
		Network Total Time(ms)=3054
		Node Retries=0
		Scroll Total=0
		Scroll Total Time(ms)=0

The job completed successfully. The messages reported afterwards (a connection retry followed by a redirect to the job history server) do not need any handling:

18/06/07 01:42:26 INFO ipc.Client: Retrying connect to server: docker02/192.168.211.104:43790. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=3, sleepTime=1000 MILLISECONDS)
18/06/07 01:42:30 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server

6. Run the following query in Kibana:

GET eshadoop/wordcount/_search
{
  "query": {
    "match_all": {}
  }
}

The results show that the data was successfully imported from HDFS into Elasticsearch and can now be viewed in Kibana.

Reposted from blog.csdn.net/u013905744/article/details/81623981