pom文件中添加:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
一、kafka消息生产:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
*
* <p>Description: kafka 1.0.0</p>
* @author wanwei
* @date 2018年9月19日
*
*/
public class kafkaproducer1 {
public static void main(String[] args) {
Map<String,Object> props = new HashMap<String, Object>();
/*
* acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
* 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
* 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
* 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
* 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
* 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
* -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
* 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
*/
props.put("acks", "1");
//配置默认的分区方式
props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
//配置topic的序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//配置value的序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*
* kafka broker对应的主机,格式为host1:port1,host2:port2
*/
props.put("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092");
//topic
String topic = "cy_test1";
KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props);
for(int i = 1 ;i < 2 ; i++) {
String real_time ="{\"fname\":\"5661215645410014\"\"incom\":\"25000\",\"serviceyear\":\"3\",\"work_city\":\"2\",\"marriage\":\"2\",\"education\":\"103\"}";
ProducerRecord<String,String> record_real_time = new ProducerRecord<String,String>(topic,real_time);
producer.send(record_real_time);
}
producer.close();
}
}
二、kafka消息消费:
package com.feidee.recommend.strategy.model;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* @Author
* @Date 2019/2/21 10:58
* @Description
* @Reviewer
*/
public class kafkaconsumer2 {
public static void main(String[] args) {
String tuijian_group = "test_1030_2";
String topic_11 = "11_online_system";
String recordStrFormat = "offset = %d, key = %s, value = %s\n";
Map<String,Object> props = new HashMap<String, Object>();
//配置topic的序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//配置value的序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/*
* kafka broker对应的主机,格式为host1:port1,host2:port2
*/
// props.put("bootstrap.servers", "10.201.7.174:9092,10.201.7.175:9092");
props.put("bootstrap.servers", "10.201.7.187:9093,10.201.7.188:9093");
//groupid
props.put("group.id", tuijian_group);
//topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//test test2 为topic的名字
consumer.subscribe(Arrays.asList(topic_11));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format(recordStrFormat, record.offset(), record.key(), record.value()));
}
}
} finally {
consumer.close();
}
}
}