Kafka开发环境搭建(Java)
新建maven工程,然后这是用到的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kafkaDevEnv</groupId>
<artifactId>kafkaDevEnv</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka.api</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging -->
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.11</artifactId>
<version>3.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang.modules/scala-parser-combinators -->
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parser-combinators_2.11</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/net.sf.jopt-simple/jopt-simple -->
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
</dependencies>
</project>
然后写一个测试的代码
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaDemoAPI {
private final String TOPIC = "testAPI";
// 设置kafka属性
private Properties props = new Properties();
public KafkaDemoAPI() {
}
public void run() {
//指定序列化方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//生产者 指定bootstrapServers
props.put("bootstrap.servers", "Hadoop01:9092");
//初始化KafkaProducer,通过创建对象向KafkaProducer传kafka属性
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
//生产数据,并在控制台上打印
int messageNo = 51;
while (messageNo < 101) {
String messageStr = new String("producerMessage_" + messageNo);
//send方法 异步传输数据到topic
producer.send(new ProducerRecord<Integer, String>(TOPIC, messageStr));
System.out.println(messageStr);
messageNo++;
}
producer.close();
}
public static void main(String[] args) {
new KafkaDemoAPI().run();
}
}
然后通过kafka-console-consumer.sh消费数据
可以看到生产的数据确实如代码里写的一样