Kafka简单入门与Spring结合实践
一、【安装部署kafka服务器环境(centos7.0)】:
1.【注意】新版的kafka已经内置了一个zookeeper环境
2.【安装与运行】:
可以在kafka官网 http://kafka.apache.org/downloads 下载到最新的kafka安装包,选择下载二进制版本的tgz文件。本章用的是 kafka_2.11-2.0.0(即基于Scala 2.11编译的Kafka 2.0.0),在centos7.0上直接解压即可
3.【运行命令】(在kafka解压目录下的bin目录中执行,先启动zookeeper,再启动kafka):
./zookeeper-server-start.sh ../config/zookeeper.properties &
./kafka-server-start.sh ../config/server.properties &
4.【注意】:
【问题一】:Java端通过kafka-clients的API写的代码始终不能跟kafka通信:java producer的消息发不出去,java consumer也收不到任何消息
【解决办法】:修改kafka/config/server.properties,取消advertised.listeners的注释,并将其中的主机名改成自己虚拟机的IP地址(例如 advertised.listeners=PLAINTEXT://虚拟机IP:9092),然后重启kafka
【问题二】:WARN [Consumer clientId=consumer-1, groupId=console-consumer-950] Connection to node -1 could not be established. Broker may not be available.
【解决办法】:检查宿主机是否可以ping通centos7.0虚拟机的IP地址;如果ping不通,通常是虚拟机的IP地址已经改变,需要把配置和代码中的IP更新为新的地址。
二、【Java程序 + Spring-Kafka 运行实例】:
参考博客:https://www.cnblogs.com/hei12138/p/7805475.html
1.【Java程序】 :
pom.xml 依赖配置:【注意】:其中版本号要与服务器端版本一致
<!-- Kafka Java client: provides the admin, producer and consumer APIs imported by the plain-Java examples. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
<exclusions>
<!-- Keeps the slf4j-log4j12 binding off the classpath. NOTE(review): presumably to avoid a clash with the project's own SLF4J binding; confirm. -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Kafka Streams library. NOTE(review): none of the examples shown here import it; confirm it is actually needed. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
<exclusions>
<!-- Same logging-binding exclusion as for kafka-clients. -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
【主题配置】:TopicMain.java
package com.caox.kafka._01_topic;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Creates the demo topic on the Kafka broker through the {@link AdminClient} API.
 *
 * Created by nazi on 2018/8/27.
 * @author nazi
 */
public class TopicMain {
    /**
     * Requests creation of the topic {@code topic-test3} and waits for the broker's answer.
     *
     * @param argv command-line arguments (unused)
     * @throws Exception if the admin client cannot be created
     */
    public static void main(String[] argv) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.150:9092");

        // try-with-resources guarantees the admin client is closed, even when topic creation fails.
        try (AdminClient adminClient = AdminClient.create(props)) {
            ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
            // NewTopic(String name, int numPartitions, short replicationFactor):
            // a topic named "topic-test3" with 1 partition and a replication factor of 1.
            topics.add(new NewTopic("topic-test3", 1, (short) 1));

            CreateTopicsResult result = adminClient.createTopics(topics);
            try {
                // createTopics() is asynchronous; block until the request has completed.
                result.all().get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                // The creation request itself failed; report it and exit normally.
                e.printStackTrace();
            }
        }
    }
}
【生产者】:ProducerMain.java
package com.caox.kafka._01_topic;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by nazi on 2018/8/27.
* @author nazi
*/
@Slf4j
public class ProducerMain {
public static void main(String[] argv) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.80.149:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++){
producer.send(new ProducerRecord<String, String>("topic-test2", Integer.toString(i), Integer.toString(i)));
}
log.info("call SUCCESS");
producer.close();
}
}
【消费者】:ConsumerMain.java
package com.caox.kafka._01_topic;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
/**
* Created by nazi on 2018/8/27.
* @author nazi
*/
@Slf4j
public class ConsumerMain {
public static void main(String[] argv) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.80.149:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("topic-test2"),new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//将偏移设置到最开始
consumer.seekToBeginning(collection);
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
2.【Spring-Kafka配置程序】 :
【pom.xml】依赖配置:
<!-- Spring context module: the examples import @Configuration, @Bean and AnnotationConfigApplicationContext from it. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
<!-- Spring core module, pinned to the same version as spring-context. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
<!-- Spring beans module, pinned to the same version as spring-context. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
<!-- Spring for Apache Kafka: the examples import KafkaTemplate, @EnableKafka and @KafkaListener from it. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
【kafka配置】:KafkaConfig.java:
package com.caox.kafka._02_spring_kafka;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for the Spring-Kafka example: declares the admin client, a topic,
 * the producer side (factory and template) and the consumer side (factory, listener container
 * factory and the listener bean).
 *
 * Created by nazi on 2018/8/28.
 * @author nazi
 */
@Configuration
@EnableKafka
public class KafkaConfig {
    /** Broker address shared by the admin, producer and consumer configuration. */
    private static final String BOOTSTRAP_SERVERS = "192.168.80.150:9092";

    // ------------------------------------------------------------------ topic configuration

    /**
     * @return a {@link KafkaAdmin} configured with the broker address
     */
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        return new KafkaAdmin(configs);
    }

    /**
     * @return the definition of topic {@code foo} with 10 partitions and a replication factor of 1
     */
    @Bean
    public NewTopic topic1() {
        return new NewTopic("foo", 10, (short) 1);
    }

    // ------------------------------------------------------------------ producer factory and template

    /**
     * @return a producer factory built from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<Integer, String>(producerConfigs());
    }

    /**
     * @return the producer properties: integer keys, string values, {@code acks=all}, no retries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * @return the template used by the example producer to send messages
     */
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }

    // ------------------------------------------------------------------ consumer factory

    /**
     * @return the listener container factory, backed by {@link #consumerFactory()}
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<Integer, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * @return a consumer factory built from {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
    }

    /**
     * @return the consumer properties: group {@code test}, auto-commit every 1000 ms,
     *         integer keys and string values
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Registers the listener bean. Original author's note: by default spring-kafka creates one
     * thread per listener method to pull messages from the Kafka server.
     *
     * @return the listener instance
     */
    @Bean
    public SimpleConsumerListener simpleConsumerListener() {
        return new SimpleConsumerListener();
    }
}
【生产者配置】:ProducerMain.java
package com.caox.kafka._02_spring_kafka;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Created by nazi on 2018/8/28.
* @author nazi
*/
public class ProducerMain {
/**
* 创建消息生产者
* @param argv 参数
* @throws Exception 异常
*/
public static void main(String[] argv) throws Exception {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(KafkaConfig.class);
KafkaTemplate<Integer, String> kafkaTemplate = (KafkaTemplate<Integer, String>) ctx.getBean("kafkaTemplate");
String data="this is a test message";
ListenableFuture<SendResult<Integer, String>> send = kafkaTemplate.send("topic-test3", 1, data);
send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
public void onFailure(Throwable throwable) {
}
public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
System.out.println("success to receive message !");
}
});
}
}
【消费者配置】:消费者监听配置 SimpleConsumerListener.java:
package com.caox.kafka._02_spring_kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
/**
 * Listener bean for the Spring-Kafka example: prints every record received from {@code topic-test3}.
 *
 * Created by nazi on 2018/8/28.
 * @author nazi
 */
public class SimpleConsumerListener {
    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerListener.class);

    /**
     * Prints key and value of each record delivered by the listener container.
     *
     * NOTE(review): in spring-kafka 2.x the listener {@code id} may also be used as the consumer
     * group id, overriding the {@code group.id} from the consumer configuration - confirm.
     *
     * @param record the received record; key/value types follow the Integer/String deserializers
     *               configured in {@code KafkaConfig}
     */
    @KafkaListener(id = "foo", topics = "topic-test3")
    public void listen(ConsumerRecord<Integer, String> record) {
        System.out.println("listen : " + " key:" + record.key() + " value: " + record.value());
    }
}