直入主题:Kafka是一个消息系统,通过消费端订阅生产端,从而消费所需的数据。
问题的产生原因是生成端发送大量数据,但是海量的数据只对应一个topic,且对这个topic开辟多个分区并未成功发送数据,因此自己测试了生成端发送数据至一个topic,十个分区。生产方发送数据至一个toppic,十个分区中,消费端采用十个线程采集这一个topic与十个分区的数据(建议数据量大的数据可以采用创建多个topic并每个topic对应多个分区,这个可以大大提高采集数据的效率)。结果代码如下,可直接粘贴运行:
生成端模拟发送数据至一个topic并创建十个分区代码(在创建一个topic多个分区的方法之下有创建一个topic一个分区的默认的方法,这里使用的上面那个方法,一个topic对应十个分区且每个分区中发送6条数据):
public KafkaProducer() { Properties props = new Properties(); props.put("zookeeper.connect", "xxxx:2181,xxxx:2181,xxxx:2181"); // props.put("zookeeper.connect", "localhost:2181"); // 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[] props.put("serializer.class", "kafka.serializer.StringEncoder"); // 同步还是异步,默认2表同步,1表异步。异步可以提高发送吞吐量,但是也可能导致丢失未发送过去的消息 props.put("producer.type", "sync"); // 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。 props.put("compression.codec", "1"); // 指定kafka节点列表,用于获取metadata(元数据),不必全部指定 props.put("metadata.broker.list", "lognn1te:6667,lognn2te:6667,logrmte:6667"); config = new ProducerConfig(props); } @Override public void run() { producer = new Producer<String, String>(config); for(int i = 1; i <= 5; i++){ //5个分区 List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); for(int j = 0; j < 6; j++){ //每个分区6条讯息 //针对topic创建相应分区数并发送数据 messageList.add(new KeyedMessage<String, String>("wujun", "我是分区名称partition[" + i + "]", "我是发送的内容message[The " + i + " message]")); producer.send(messageList); } } //针对topic创建一个分区并发送数据 //List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); // for(int i = 1; i <= 10; i++){ // messageList.add(new KeyedMessage<String, String>("wj", "我是发送的内容message"+i)); //} //producer.send(messageList); // producer.close(); // } } public static void main(String[] args) { Thread t = new Thread(new KafkaProducer()); t.start(); } }
以上是生成端的代码,发送10条数据至一个topic(wj)十个分区中
消费端代码:
public class testKafka implements Runnable { public void run() { //所要开辟的线程数量 final int a_numThreads = 5; //对应的kafka采集地址(本地测试的可以写为localhost:2181),对个地址用逗号隔开 String zk = "lognn1te:2181,lognn2te:2181,logrmte:2181"; //topic,与发送端生成的topic名称一致 String topic = "wj"; //groupid,生成端从新生成一个topic,消费端消费时最好变动groupid String groupId = "test"; Properties props = new Properties(); props.put("zookeeper.connect", zk); props.put("zookeeper.connectiontimeout.ms", "30000"); props.put("group.id", groupId); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); //所要开辟的线程数量 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); ExecutorService executor = Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream stream : streams) { //使用相应数量的线程数量采集数据 executor.submit(new KafkaConsumerThread(stream, threadNumber)); //查看消费对应的线程 threadNumber++; } } public static void main(String[] args) { Thread t = new Thread(new testKafka()); t.start(); } }
public class KafkaConsumerThread implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public KafkaConsumerThread(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()){ // System.out.println(Thread.currentThread().getName() + ":" +"partition:["+ mam.partition() +"]"+ "," + new String(mam.message())); System.out.println("使用线程:" + m_threadNumber + " 发送的内容:" + new String(it.next().message())); // System.out.println("Shutting down Thread: " + m_threadNumber); } } }
以上两个类是消费端的代码,消费端使用了5个线程去采集该topic十个分区中的数据,以下是测试的结果:
打印结果:
使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 4 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 0 message] 使用线程:3 发送的内容:我是发送的内容message[The 1 message] 使用线程:3 发送的内容:我是发送的内容message[The 2 message] 使用线程:3 发送的内容:我是发送的内容message[The 3 message] 使用线程:3 发送的内容:我是发送的内容message[The 4 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 4 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 0 message] 使用线程:4 发送的内容:我是发送的内容message[The 1 message] 使用线程:4 发送的内容:我是发送的内容message[The 2 message] 使用线程:4 发送的内容:我是发送的内容message[The 3 message] 使用线程:4 发送的内容:我是发送的内容message[The 4 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 2 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 2 message] 使用线程:2 发送的内容:我是发送的内容message[The 3 message] 使用线程:2 发送的内容:我是发送的内容message[The 0 message] 使用线程:2 发送的内容:我是发送的内容message[The 1 message] 使用线程:2 发送的内容:我是发送的内容message[The 2 message] 使用线程:2 发送的内容:我是发送的内容message[The 3 message] 使用线程:2 发送的内容:我是发送的内容message[The 4 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 0 message] 使用线程:1 发送的内容:我是发送的内容message[The 1 message] 使用线程:1 发送的内容:我是发送的内容message[The 2 message] 使用线程:1 发送的内容:我是发送的内容message[The 3 message] 使用线程:1 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 4 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 0 message] 使用线程:0 发送的内容:我是发送的内容message[The 1 message] 使用线程:0 发送的内容:我是发送的内容message[The 2 message] 使用线程:0 发送的内容:我是发送的内容message[The 3 message] 使用线程:0 发送的内容:我是发送的内容message[The 4 message]
从以上的打印结果可以清楚的说明结果:
1.当使用的分区数大于开辟的线程数,消费端消费数据时会有一个线程同时采集1个以上分区的数据(不会出现一个分区对应多个线程的情况,这样采集数据会重复混乱)当某个分区中的数据较少时,采集的线程快速的采完了该分区的数据,处于空闲的状态则有可能从新分给别的分区进行采集任务。
2.当使用的分区数等于或者小于线程数且每个分区数据量比较大时,这样就会一个线程对应采集一个分区中的数据,开辟多余的线程数处于闲置状态。
以上是消费端,生产端对应topic与分区数量所采用的线程大小采集问题总结。