版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_15014327/article/details/83421755
一.项目环境搭建
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.lv</groupId>
    <artifactId>kafka-study</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-study</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>kafka-study</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <!-- Pin the plugin version for reproducible builds. -->
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <!-- 'single' is the goal intended for lifecycle bindings;
                                 the old 'assembly' goal forks the lifecycle and is
                                 deprecated/removed in current plugin versions. -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
二.编写代码
1.生产者
package kafka;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerOps {

    /**
     * Loads producer.properties from the classpath and sends ten sample
     * records to the topic named by Constants.KAFKA_PRODUCER_TOPIC.
     *
     * @throws IOException if the configuration cannot be read
     */
    public static void main(String[] args) throws IOException {
        // Load the producer configuration; try-with-resources closes the
        // stream (the original leaked it).
        Properties prop = new Properties();
        try (InputStream in = KafkaProducerOps.class.getClassLoader()
                .getResourceAsStream("producer.properties")) {
            if (in == null) {
                throw new IOException("producer.properties not found on classpath");
            }
            prop.load(in);
        }

        // Sample record values; hoist the topic lookup out of the loop
        // (it is loop-invariant).
        String[] players = new String[] { "科比", "詹姆斯", "杜兰特", "库里" };
        String topic = prop.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
        Random random = new Random();

        /*
         * KafkaProducer<K, V>: K is the record key type, V the value type.
         * try-with-resources calls close(), which flushes buffered records;
         * without it, sends still sitting in the producer's buffer can be
         * lost when the JVM exits.
         */
        try (Producer<String, String> producer = new KafkaProducer<String, String>(prop)) {
            int start = 1;
            for (int i = start; i <= start + 9; i++) {
                String key = i + "";
                String value = "今天<--" + players[random.nextInt(players.length)] + "-->得分非常高!";
                producer.send(new ProducerRecord<String, String>(topic, key, value));
            }
        }
    }
}
生产者的配置文件:
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=192.168.2.15:9092,192.168.2.15:9093,192.168.2.15:9094
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=
partitioner.class=kafka.MyKafkaPartitioner
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
#####设置自定义的topic
producer.topic=nbaStar
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
2.消费者
package kafka;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerOps {

    /**
     * Loads consumer.properties from the classpath, subscribes to the
     * "nbaStar" topic, and prints every record received (offset, partition,
     * key, value). Runs until the process is killed.
     *
     * @throws IOException if the configuration cannot be read
     */
    public static void main(String[] args) throws IOException {
        // Load the consumer configuration; try-with-resources closes the
        // stream (the original leaked it).
        Properties properties = new Properties();
        try (InputStream in = KafkaConsumerOps.class.getClassLoader()
                .getResourceAsStream("consumer.properties")) {
            if (in == null) {
                throw new IOException("consumer.properties not found on classpath");
            }
            properties.load(in);
        }

        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            // Subscribe to the topic(s) we want to consume.
            consumer.subscribe(Arrays.asList("nbaStar"));
            while (true) {
                // poll() blocks for up to 1000 ms waiting for records.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.format("%d\t%d\t%s\t%s\n",
                            record.offset(), record.partition(), record.key(), record.value());
                }
            }
        } finally {
            // Not reached in the normal (infinite) flow, but releases the
            // consumer's network resources if poll() throws.
            consumer.close();
        }
    }
}
消费者的配置文件:
# Zookeeper connection string — NOTE: only the legacy Scala consumer reads
# this; the new Java consumer (KafkaConsumer from kafka-clients, used in the
# code above) ignores it and connects via bootstrap.servers instead.
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=192.168.2.15:2181,192.168.2.15:2182,192.168.2.15:2183
bootstrap.servers=192.168.2.15:9092,192.168.2.15:9093,192.168.2.15:9094
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
3.自定义分区
package kafka;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class MyKafkaPartitioner implements Partitioner {

    // Reused for keyless records; the original allocated a new Random on
    // every call. java.util.Random is thread-safe.
    private final Random random = new Random();

    @Override
    public void configure(Map<String, ?> config) {
        // No configuration needed.
    }

    @Override
    public void close() {
        // No resources to release.
    }

    /**
     * Chooses the target partition for a record: random for keyless
     * records, hash-of-key modulo partition count otherwise.
     *
     * @param topic      topic name
     * @param key        record key (may be null)
     * @param keyBytes   serialized key (may be null)
     * @param value      record value
     * @param valueBytes serialized value
     * @param cluster    current cluster metadata
     * @return a partition index in [0, partitionCount)
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition;
        if (key == null || keyBytes == null) {
            // nextInt(bound) is uniform; the original nextInt(10000) % n
            // was biased whenever n does not divide 10000.
            targetPartition = random.nextInt(partitionNums);
        } else {
            int hashCode = key.hashCode();
            // Mask off the sign bit before taking the modulus: hashCode can
            // be negative, and a plain "hashCode % partitionNums" would then
            // produce a negative — i.e. invalid — partition number.
            targetPartition = (hashCode & Integer.MAX_VALUE) % partitionNums;
            System.out.println(
                    "key:" + key + ",value" + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }
}
4.打包上传到服务器执行