大数据组件使用 总文章
====================================================
代码下载 链接:https://pan.baidu.com/s/1gBi1epr2oli3y9of2jdI-A
提取码:vpf9
使用MR消费kafka数据,需要手动管理kafka的offset,因此有必要先阅读下面一遍文章
参考文章:kafka 生产/消费API、offset管理/原理、kafka命令
====================================================
重写InputFormat、RecordReader:
1.执行流程
1.主类:提交程序 job.submit()/job.waitForCompletion(true)
2.InputFormat:getSplits --> createRecordReader
1.getSplits:topic中 有多少个分区,那么getSplits就应该返回同样个数的InputSplit对象并封装到List中并进行return
2.createRecordReader:getSplits方法return的List中有多少个InputSplit对象,就会调用多少次createRecordReader创建同样数量的RecordReader对象
3.RecordReader:
注意:调用多少次createRecordReader 就会创建同样数量的RecordReader对象,等同topic的分区数量
只执行一次 initialize
开始循环执行 nextKeyValue --> getProgress --> getCurrentKey --> getCurrentValue
最终执行 close
1.每个RecordReader对象 均对应 一个 topic的分区
2.创建每个RecordReader对象时,都会调用一次 initialize
3.然后循环执行 nextKeyValue --> getProgress --> getCurrentKey --> getCurrentValue,直到 nextKeyValue 返回 false为止 结束循环
4.最终调用 close方法 关闭 “消费kafka的”对象
2.注意:
configuration.setInt("mapreduce.local.map.tasks.maximum", 并行数);
对于单机运行的情况,需要指定mapreduce.local.map.tasks.maximum参数,表示并行执行的最大map个数。不指定的话默认是1,所有任务都是串行执行的。
map个数的并行度:应设置为 topic分区的个数 也即 class InputFormat类中 getSplits方法返回的 InputSplit对象的个数
======================================
第一种方式:一次性把指定范围内的offset的kafka数据消费完毕
======================================
package com.mrkafka;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
/*
mapreduce之 自定义分片策略
1.自定义分片策略的作用之一:可解决大量小文件问题
1.默认的TextInputFormat
应该都知道默认的TextInputFormat是一行行的读取文件内容,这对于一个或几个超大型的文件来说并没有什么问题,
但是在实验读取大量小文件的时候,性能及其低下。
2.实验过程
分别有5个文件夹,每个文件夹下有不同数量(1-2千个)的小文件(10+k大小),总量大概有8k+个文件,使用CLI命令上传到HDFS就花费了一个多小时。
环境为本地电脑安装的伪分布式Hadoop集群,机器配置为四核I7的CPU,16G的RAM。
编写简单的worldCount程序,一切默认,放到集群上跑的时候出现以下情况:
1.启动的mapper总数量为8k+个!而一个节点能同时运行的mapper数量为4
2.整个map过程及其缓慢,50%跑了2h
3.CPU总用率高达80%,整个机器开始发出呲呲呲的声音
可见大量的小文件对mapreduce程序性能的影响有多大。
3.问题的根本所在
HDFS上的文件是按block来存储的。
如果一个文件很大,超出了一个block的设定,那么它就会被划分为多个block存储,mapreduce程序读取时,每个block都会对应输入一个mapper,
所以大文件,默认的分片策略是可以hold住的。
但是如果是很多小文件的话,每个小文件存储的时候都会是一个block,即使它很小,远远达不到block大小(默认128M),
HDFS还是会将其存储在一个block中,那么问题就来了,默认的分片策略在读取这些小文件的时候,每个block都会产生一个mapper,
所以就有了上面程序中出现了8k+个mapper的情况。
4.解决方案
既然知道了问题所在,那么就可以指定对应的解决方案,无非就是从两点入手:
1.默认每个小文件对应一个block,那么可以采取压缩等手段将多个小文件进行合并存储,以达到每个block存储的内容都是足够大的。
2.修改mapreduce默认的分片策略,使得读取文件进行分片的时候让每个block可以对应多个小文件,而不再是仅仅一个小文件。
2.自定义的分片策略
1.关于Hadoop的InputFormat类,默认的分片策略使用的就是其TextInputFormat子类,这里将介绍另外一个子类:CombineFileInputFormat。
CombineFileInputFormat是用来将输入的小文件进行合并,然后输入到一个mapper中的策略。
这是一个抽象类,只实现了InputFomat接口的getSplit方法。所有的分片策略都要继承InputFormat,并实现getSplit和createRecordReader两个方法。
既然我们需要用到CombineFileInputFormat,但他留了一个接口方法让我们实现,那么就可以自定义一个MyInputFormat类继承自CombineFileInputFormat,
重写createRecordReader。
2.关于InputFormat接口的两个方法:
1.getSplit是从HDFS上读取文件,并形成逻辑的分片,在本文中,这个分片会包含多个小文件。
2.createRecordReader会创建一个RecordReader对象,用来读取getSplit产生的分片,mapper中的键值对就是这个RecordReader输出的。
之前讨论到的自定义MyInputFormat类实现分片策略,但是分片之后如何读取分片内的数据是createRecordReader方法创建的RecordReader对象决定的。
所以自定义分片策略的关键在于两点:
1.MyInputFormat类自定义分片策略
2.MyRecordReader类自定义读取分片内的数据
3.KafkaInputFormat类
1.KafkaInputFormat类 这是一个自定义的抽象类,只实现了InputFomat接口的getSplit方法,即重写getSplit和createRecordReader两个方法。
所有的分片策略(比如 自定义的KafkaInputFormat类)都要继承InputFormat,并实现getSplit和createRecordReader两个方法。
2.关于InputFormat接口的两个方法:
1.getSplit是从HDFS上读取文件,并形成逻辑的分片,在本文中,这个分片会包含多个小文件。
2.createRecordReader会创建一个RecordReader对象,用来读取getSplit产生的分片,mapper中的键值对就是这个RecordReader输出的。
3.自定义的MyInputFormat(比如当前自定义的KafkaInputFormat类)类实现分片策略,但是分片之后如何读取分片内的数据,
是createRecordReader方法创建的RecordReader对象决定的。
所以自定义分片策略的关键在于两点:
1.MyInputFormat类(比如当前自定义的KafkaInputFormat类)自定义分片策略
2.MyRecordReader类(比如当前自定义的KafkaRecordReader类)自定义读取分片内的数据
*/
public class KafkaInputFormat<K, V> extends InputFormat<K, V>
{
//封装多个topic主题
// private final String[] topics;
private String[] topics = {"test"};
// //需要了解如何接收Kafka主题/分区的 开始偏移信息,接收Kafka主题/分区 结束偏移信息
// public KafkaInputFormat(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String...topics)
// {
// //封装多个topic主题
// this.topics = topics;
// }
/*
把每个topic主题的中每个分区信息都封装到InputSplit对象中,目的是在 MapReduce过程中,Map之前要进行的是Inputsplit,通过数据的切分,
然后才能把数据分发到不同的节点上进行计算。
InputSplit整个过程包含了个重要的方法getSplits和getRecorder方法。
getSplits主要返回了一个InputSplit的对象,这个对象在逻辑上对整个数据进行了切片(注意,没有进行实际的物理切分)
*/
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException
{
System.out.println("-------------3 getSplits------------");
// 通过Properties配置文件的方式 配置Kafka的"连接集群"的信息
// Properties connectionProps = KafkaUtils.getKafkaConnectionProperties(jobContext.getConfiguration());
Properties connectionProps = new Properties();
connectionProps.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
// connectionProps.put("bootstrap.servers", "cdh01.hadoop.com:9092,cdh02.hadoop.com:9092,cdh03.hadoop.com:9092");
connectionProps.put("group.id", "testGroup");
connectionProps.put("enable.auto.commit", "true"); // 自动提交 enable.auto.commit:true
connectionProps.put("auto.commit.interval.ms", "1000");
connectionProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectionProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectionProps.put("auto.offset.reset","earliest");//默认是 latest, earliest
// List<InputSplit> splits = new LinkedList<>();
List<InputSplit> splits = new ArrayList<InputSplit>();
/*
在集群中运行mr的时候,可能会引用不到 kafka-clients-1.0.1.jar 而报错找不到 KafkaConsumer类,
因此就需要自己打包mr程序的jar包之后,还要把 kafka-clients-1.0.1.jar 中的 org整个目录给 打包进 mr程序的jar包中,
那么集群中跑mr程序时就能使用自己手动打包进去的kafka-clients-1.0.1.jar
*/
//创建KafkaConsumer消费者,传入所配置好的Kafka"连接集群"的信息
try (KafkaConsumer<Text, Text> consumer = new KafkaConsumer<Text, Text>(connectionProps))
{
//遍历每个topic主题
for (String topic : topics)
{
//获取该topic主题的分区列表,即所有分区的信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
//遍历该topic主题中的每个分区Partition信息
for(PartitionInfo info: partitionInfos)
{
//了解如何查找 当前最低 + 最大偏移量,尝试只使用Kafka客户机,但可能需要Kafka_2.x
//new KafkaInputSplit(topic字符串名称, partition分区序号(代表第几个分区), 开始偏移量, 结束偏移量)
splits.add(new KafkaInputSplit(topic, info.partition(), 0, 4000));
// System.out.println("topic:"+topic+"\t 分区号"+info.partition());
}
}
}
// System.out.println("splits.size:"+splits.size());
// System.out.println("-------------3.1------------");
// 把每个topic主题的中每个分区信息都封装到InputSplit对象中,目的是在 MapReduce过程中,Map之前要进行的是Inputsplit,通过数据的切分,
// 然后才能把数据分发到不同的节点上进行计算
return splits;
}
@Override
//自定义MyInputFormat类(比如当前自定义的KafkaInputFormat类)实现分片策略,但是分片之后如何读取分片内的数据是createRecordReader方法创建的RecordReader对象决定的
//createRecordReader(InputSplit 当前输入的分片,TaskAttemptContext 当前输入的系统环境)
public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
System.out.println("-------------4 new KafkaRecordReader()------------");
return new KafkaRecordReader();
}
}
------------------------------------------------------------------------------------------------------------------------------
package com.mrkafka;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.kafka.common.TopicPartition;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//现在只需简单地假设每个topic partition有一个split拆分,但可以分组来减少mapper映射器
public class KafkaInputSplit extends InputSplit implements Writable
{
private String topic; //topic主题
private int partition; //topic中的分区序号,代表第几个分区
private long startingOffset; //开始偏移量
private long endingOffset; //结束偏移量
//被声明为transient的TopicPartition类不会被序列化,这就是transient关键字的作用
//TopicPartition类 实现了 序列化接口 java.io.Serializable。 构造函数 TopicPartition(java.lang.String topic主题名称, int partition分区号)
private transient TopicPartition topicPartition; //topic分区对象
/*
transient、序列化、反序列化
1.平时我们在Java内存中的对象,是无 法进行IO操作或者网络通信的,因为在进行IO操作或者网络通信的时候,
人家根本不知道内存中的对象是个什么东西,因此必须将对象以某种方式表示出来,即 存储对象中的状态。
一个Java对象的表示有各种各样的方式,Java本身也提供给了用户一种表示对象的方式,那就是序列化。
换句话说,序列化只是表示对 象的一种方式而已。OK,有了序列化,那么必然有反序列化,我们先看一下序列化、反序列化是什么意思。
2.序列化只需要实现java.io.Serializable接口就可以了。序列化的时候有一个serialVersionUID参数,
Java序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。
在进行反序列化,Java虚拟机会把传过来的字节流中的serialVersionUID和本地相应实体类的serialVersionUID进行比较,
如果相同就认为是一致的实体类,可以进行反序列化,否则Java虚拟机会拒绝对这个实体类进行反序列化并抛出异常。
serialVersionUID有两 种生成方式:
1、默认的1L
2、根据类名、接口名、成员方法以及属性等来生成一个64位的Hash字段
如果实现 java.io.Serializable接口的实体类没有显式定义一个名为serialVersionUID、类型为long的变量时,
Java序列化 机制会根据编译的.class文件自动生成一个serialVersionUID,如果.class文件没有变化,那么就算编译再多次,
serialVersionUID也不会变化。换言之,Java为用户定义了默认的序列化、反序列化方法,
其实就是ObjectOutputStream的defaultWriteObject方法和ObjectInputStream的defaultReadObject方法。
3.序列化:将一个对象转换成一串二进制表示的字节数组,通过保存或转移这些字节数据来达到持久化的目的。
4.反序列化:将字节数组重新构造成对象。
5.被声明为transient的属性不会被序列化,这就是transient关键字的作用
6.手动指定序列化过程
Java并不强求用户非要使用默认的序列化方式,用户也可以按照自己的喜好自己指定自己想要的序列化方式,
只要你自己能保证序列化前后能得到想要的数据就好了。
手动指定序列化方式的规则是:
进行 行序列化、反序列化时,虚拟机会首先试图调用对象里的writeObject和readObject方法,进行用户自定义的序列化和反序列化。
如果没有这样的方法,那么默认调用的是ObjectOutputStream的defaultWriteObject以及ObjectInputStream的 defaultReadObject方法。
换言之,利用自定义的writeObject方法和readObject方法,用户可以自己控制序列化和反序列 化的过程。
这是非常有用的。
比如:
1、有些 场景下,某些字段我们并不想要使用Java提供给我们的序列化方式,而是想要以自定义的方式去序列化它,
比如ArrayList的 elementData、HashMap的table,就可以通过将这些字段声明为transient,
然后在writeObject和readObject中去使用自己想要的方式去序列化它们
2、因为 序列化并不安全,因此有些场景下我们需要对一些敏感字段进行加密再序列化,然后再反序列化的时候按照同样的方式进行解密,
就在一定程度上保证了安全性了。 要这么做,就必须自己写writeObject和readObject,writeObject方法在序列化前对字段加密,
readObject方法在序 列化之后对字段解密
*/
public KafkaInputSplit(){
}
public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset){
this.topic = topic;
this.partition = partition;
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
}
@Override
public long getLength() throws IOException, InterruptedException
{
//如果开始偏移量大于0的话,那么取 结束偏移量减去开始偏移量的差值,否则取 结束偏移量
return startingOffset > 0 ? endingOffset - startingOffset : endingOffset;
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
//保留为空,因为数据位置不是真正的问题
return new String[0];
}
public TopicPartition getTopicPartition()
{
if(topicPartition == null)
{
// TopicPartition类 实现了 序列化接口 java.io.Serializable。
// 构造函数 TopicPartition(java.lang.String topic主题名称, int partition分区号)
topicPartition = new TopicPartition(topic, partition);
}
return topicPartition;
}
public String getTopic() {
return topic;
}
public int getPartition() {
return partition;
}
//获取开始偏移量
public long getStartingOffset()
{
return startingOffset;
}
//获取结束偏移量
public long getEndingOffset()
{
return endingOffset;
}
@Override
public void write(DataOutput dataOutput) throws IOException
{
Text.writeString(dataOutput, topic, 1048576);
dataOutput.writeInt(partition);
dataOutput.writeLong(startingOffset);
dataOutput.writeLong(endingOffset);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.topic = Text.readString(dataInput, 1048576);
this.partition = dataInput.readInt();
this.startingOffset= dataInput.readLong();
this.endingOffset= dataInput.readLong();
}
}
------------------------------------------------------------------------------------------------------------------------------
package com.mrkafka;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.hadoop.io.Text;
import javax.security.auth.kerberos.KerberosTicket;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import static com.mrkafka.KafkaUtils.getKafkaConnectionProperties;
/*
RecordReader:其作用就是将数据切分成key/value的形式然后作为输入传给Mapper。
继承RecordReader需要实现RecordReader的6个抽象方法。
一 方法分析:
1.1 initialize: 初始化RecordReader,只能被调用一次。
1.2 nextKeyValue: 读取下一个key/value键值对
1.3 getCurrentKey: 获取当前的key
1.4 getCurrentValue: 获取当前的value
1.5 getProgress: 当前分片的处理进度
1.6 close: 关闭 RecordReader
*/
public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
private Consumer<K, V> consumer;
private ConsumerRecord<K, V> record;
private long startingOffset;
private long endingOffset;
private long currentOffset;
private int partition;
private String topic ;
private Iterator<ConsumerRecord<K, V>> recordIterator;
//该initialize方法会在InputFormat(KafkaInputFormat类)中的createRecordReader创建一个RecordReader的时候调用。
//初始化RecordReader,只能被调用一次。
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
{
System.out.println("-------------5 initialize------------");
// Configuration configuration = taskAttemptContext.getConfiguration();
// String bootstrapServers = configuration.get("bootstrap.servers");
// String groupID = configuration.get("group.id");
// System.out.println("bootstrapServers:"+bootstrapServers);
// System.out.println("groupID:"+groupID);
// consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
Properties connectionProps = new Properties();
connectionProps.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
// connectionProps.put("bootstrap.servers", "cdh01.hadoop.com:9092,cdh02.hadoop.com:9092,cdh03.hadoop.com:9092");
connectionProps.put("group.id", "testGroup");
connectionProps.put("enable.auto.commit", "false"); // 手动提交 enable.auto.commit:false
connectionProps.put("auto.commit.interval.ms", "1000");
connectionProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectionProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
connectionProps.put("auto.offset.reset","earliest");//默认是 latest, earliest
/*
earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
消费者就可以从最早或者最新的offset开始消费,但在实际上测试的时候发现并不是那么回事,因为他们生效都有一个前提条件,
那就是对于同一个groupid的消费者,如果这个topic某个分区有已经提交的offset,
那么无论是把auto.offset.reset=earliest还是latest,都将失效,消费者会从已经提交的offset开始消费.
*/
consumer = new KafkaConsumer<>(connectionProps);
KafkaInputSplit split = (KafkaInputSplit) inputSplit;
TopicPartition topicPartition = split.getTopicPartition();
// int partition = split.getPartition();
// String topic = split.getTopic();
partition = split.getPartition();
topic = split.getTopic();
startingOffset = split.getStartingOffset();
endingOffset = split.getEndingOffset();
currentOffset = split.getStartingOffset();
// System.out.println("-----------------------");
// System.out.println("partition:"+partition+"\t topic:"+topic+"\t startingOffset:"+startingOffset+"\t endingOffset:"+endingOffset);
// System.out.println("-----------------------");
/*
1.控制消费者的position
kafka允许通过seek(TopicPartition,long)指定新的位置,或者seekToBeginning,seekToEnd定位到最早或最近的offset。
2.使用 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0))) 或 consumer.assign(Collections.singletonList(new TopicPartition(topicName, 0)))
来分配topic和partition
3.使用 consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0))) 指定从这个topic和partition的开始位置获取。
4.使用 consumer.seek(new TopicPartition(topicName, 0), 10)中的10是表示从这个topic的partition中的offset为10的开始获取消息。
*/
// consumer.assign(Collections.singletonList(new TopicPartition(topic, partition))); //分配assign
// consumer.seek(new TopicPartition(topic, partition), startingOffset); //寻找seek
consumer.assign(Collections.singletonList(topicPartition)); //分配assign
consumer.seek(topicPartition, split.getStartingOffset()); //寻找seek
}
//mapper中获得键值对的方式就是调用当前处理的RecordReader对象的nextKeyValue方式,mapper中获得键值对的逻辑主要是在MyRecordReader的nextKeyValue中实现的。
//返回true就取出key和value,返回false就结束 表示没有文件内容可读取了,之后在父类的nextKeyValue方法中进行index的前移
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
System.out.println("-------------6 nextKeyValue------------");
// recordIterator = getRecords();
// record = recordIterator.hasNext() ? recordIterator.next() : null;
// //返回true就取出key和value,返回false就结束 表示没有文件内容可读取了,之后在父类的nextKeyValue方法中进行index的前移
// return record != null && record.offset() >= startingOffset && record.offset() <= endingOffset;
if(currentOffset <= endingOffset)
{
recordIterator = getRecords();
if (recordIterator.hasNext())
{
record = recordIterator.next();
//将提交方式改成false之后,需要手动提交
consumer.commitSync();
// System.out.println("manual commit offset 手动提交offset");
currentOffset++;
consumer.seek(new TopicPartition(topic, partition), currentOffset); //寻找seek
return true;
}
else
{
return false;
}
}
else
{
return false;
}
}
//获取RecordReader
private Iterator<ConsumerRecord<K, V>> getRecords()
{
ConsumerRecords<K, V> records = null;
if(recordIterator == null || recordIterator.hasNext())
{
records = consumer.poll(1000);
}
return records != null ? records.iterator(): ConsumerRecords.<K, V>empty().iterator();
}
//当前分片的处理进度
@Override
public float getProgress() throws IOException, InterruptedException {
if (record == null || record.offset() < 0)
{
// System.out.println("====处理进度结束,任务结束=====");
return 0;
}
System.out.println("-------------7 getProgress------------");
float v = record.offset() * 1.0f / endingOffset;
// System.out.println("当前分片的处理进度:"+v);
return v;
//不是最准确,但给出了合理的估计
// return record.offset()*1.0f/endingOffset;
}
//获取当前的key,通过 nextKeyValue 读取下一个key/value键值对 来作为当前获取的key
@Override
public K getCurrentKey() throws IOException, InterruptedException {
int partition = record.partition();//分区号
long offset = record.offset(); //分区中的 offset
/*
Text转换为String对象,这样就可以用String的API操作
Text t= new Text("hadoop");
t.set("hbase");
t.toString();
Text t= new Text("hadoop");
t.set("hbase");
*/
// Text Textkey = new Text();
// Textkey.set(record.key().toString());
//
// System.out.println("getCurrentKey 分区号:"+partition+"\t offset:"+offset+"\t key:"+Textkey);
// return (K)Textkey;
Text Textkey = new Text();
Textkey.set("Textkey".toString());
System.out.println("getCurrentKey 分区号:"+partition+"\t offset:"+offset+"\t key:"+Textkey);
return (K)Textkey;
}
//获取当前的value,通过 nextKeyValue 读取下一个key/value键值对 来作为当前获取的value
@Override
public V getCurrentValue() throws IOException, InterruptedException {
int partition = record.partition();//分区号
long offset = record.offset(); //分区中的 offset
// Text Textvalue = new Text();
// Textvalue.set(record.value().toString());
// System.out.println("getCurrentValue 分区号:"+partition+"\t offset:"+offset+"\t value:"+Textvalue);
// return (V)Textvalue;
Text Textvalue = new Text();
String s = "offset:"+offset+"\t value:"+record.value().toString();
Textvalue.set(s);
System.out.println("getCurrentValue 分区号:"+partition+"\t "+s);
return (V)Textvalue;
}
//关闭 RecordReader
@Override
public void close() throws IOException {
System.out.println("------8 close consumer 关闭----------");
consumer.close();
}
}
======================================
第二种方式:
MR实时监控并消费kafka中offset的最新数据,并实时把消费完的offset存储到mysql中
======================================
package com.mrkafka;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class KafkaInputFormat extends InputFormat<IntWritable, LongWritable>
{
private static final Logger logger = LoggerFactory.getLogger(KafkaInputFormat.class);
/*
CREATE TABLE `topic_partition_offset` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`topicName` varchar(255) NOT NULL DEFAULT '' COMMENT '主题名',
`kafkaPartition` LONG NOT NULL COMMENT '分区号',
`offset` int(11) NOT NULL COMMENT '消费到的位置',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='topic对应分区的所消费到的offset';
*/
public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException
{
//jobContext.getConfiguration() 获取的是 main主类中Job job = Job.getInstance(conf)所设置的 conf配置信息
Configuration configuration = jobContext.getConfiguration();
//kafka.partitions.count 代表自定义的 topic分区数量,如果自己没有手动指定的话,则使用 defaultVaule默认值 0
//分区数。而分区号是从0开始
int KafkaPartitionCounts = 3;
String topicName = "test";
/*
从mysql中查询该topic对应每个分区的offset是多少,如果mysql中没有存在对应的offset的话,那么offset默认为0
Class QueryRunner 核心运行类:提供了对sql语句操作的方法。提供了自定事务处理、自动释放资源等操作,无需再手动
传入连接池,交由QueryRunner自动操作连接池中的连接
所以无需手动调用conn.close(),交由QueryRunner自动管理。
*/
QueryRunner queryRunner = new QueryRunner(MysqlUtil.getDataSource());
String sql = "select kafkaPartition,offset from topic_partition_offset where topicName = '"+topicName+"'";
List<Object[]> arrayListResult = null;
try
{
arrayListResult = queryRunner.query(sql, new ArrayListHandler());
}
catch (SQLException e)
{
logger.error("-------查询失败--------");
logger.error(e.getMessage());
// e.printStackTrace();
}
//每个InputSplit对象 负责 封装topic的每个分区信息,那么getSplits即返回封装了多个分区的ArrayList<InputSplit>列表
ArrayList<InputSplit> arrayList = new ArrayList<InputSplit>();
//遍历分区
for (int i = 0; i < KafkaPartitionCounts; ++i)
{
//每个KafkaInputSplit封装一个topic的分区信息
KafkaInputSplit kafkaInputSplit = new KafkaInputSplit();
kafkaInputSplit.partition = i; //分区号:从0开始
//方案一:程序启动时 先从mysql中查询出该offset,然后作为这一次开始消费的offset的位置
//startOffset读取分区的开始offset:如果自己没有手动指定的话,则使用 defaultVaule默认值 0
Integer o = (Integer)arrayListResult.get(i)[1];
kafkaInputSplit.startOffset = o.longValue() ;
//方案二:不管配置auto.offset.reset 为latest还是为earliest,只要分区中有已提交的offset,那么就从该分区中已提交的offset开始消费,
// 因此此处默认值为0L即可,反正后面用不到该0L值
// kafkaInputSplit.startOffset = 0L;
System.out.println("分区号:"+kafkaInputSplit.partition+"\t startOffset:"+kafkaInputSplit.startOffset);
//把封装了每个分区信息的多个InputSplit对象再封装到ArrayList中
arrayList.add(kafkaInputSplit);
}
//getSplits返回 ArrayList<InputSplit>:代表要读取的分片策略是要 读取多少个分区,每个InputSplit代表一个分区
return arrayList;
}
public RecordReader<IntWritable, LongWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//每个InputSplit代表一个分区,把InputSplit对象传入到RecordReader对象中,代表开始如何从该分区中如何读取数据
return new KafkaRecordReader(inputSplit);
}
}
------------------------------------------------------------------------------------------------------------------------------
package com.mrkafka;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class KafkaInputSplit extends InputSplit implements Writable
{
public int partition = 0; //分区号:从0开始
public long startOffset = -1L;
public int getPartition() {
return this.partition;
}
public long getStartOffset() {
return this.startOffset;
}
public long getLength() throws IOException {
return 0L;
}
public String[] getLocations() throws IOException {
return new String[0];
}
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.partition);
dataOutput.writeLong(this.startOffset);
}
public void readFields(DataInput dataInput) throws IOException {
this.partition = dataInput.readInt();
this.startOffset = dataInput.readLong();
}
}
------------------------------------------------------------------------------------------------------------------------------
package com.mrkafka;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class KafkaRecordReader extends RecordReader<IntWritable, LongWritable>
{
public boolean hasNext = true;
public KafkaInputSplit kafkaInputSplit;
public KafkaRecordReader(InputSplit inputSplit) {
this.kafkaInputSplit = (KafkaInputSplit)inputSplit;
}
//每个KafkaRecordReader对象 只执行一次 initialize
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
}
//mapper中获得键值对的方式就是调用当前处理的RecordReader对象的nextKeyValue方式,mapper中获得键值对的逻辑主要是在MyRecordReader的nextKeyValue中实现的。
//返回true就取出key和value,返回false就结束 表示没有文件内容可读取了,之后在父类的nextKeyValue方法中进行index的前移
public boolean nextKeyValue() throws IOException, InterruptedException
{
//第一次执行时,hasNext为true,因此执行一次以下判断会成立
if (this.hasNext)
{
//执行一次判断成立后,立即重置为false,下次则不会再执行判断成立
this.hasNext = false;
//返回true表示 读取下一个key/value键值对。返回false表示没有key/value键值对可以读取了
return true;
}
return false;
}
//获取keyin:分区号
public IntWritable getCurrentKey() throws IOException, InterruptedException {
return new IntWritable(this.kafkaInputSplit.getPartition());
}
//获取valuein:StartOffset分区的开始读取offset
public LongWritable getCurrentValue() throws IOException, InterruptedException {
return new LongWritable(this.kafkaInputSplit.getStartOffset());
}
//进度直接为0:代表并不是连续多次获取key/value键值对,只读取一次,只为了获取 keyin(分区号) 和 vlauein(StartOffset分区的开始读取offset)
public float getProgress() throws IOException, InterruptedException {
return 0.0f;
}
public void close() throws IOException {
}
}