kafka api-实现生产者
本次测试依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
可根据自己的版本进行选择,注意,spark-streaming-kafka_2.11的version2.0以后会有很大变动,api请参照其他文档。
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
/**
* 实现一个生产者
* 模拟一些数据并把数据发送到kafka集群的topic中
*/
object ProderceDemo {
def main(args: Array[String]): Unit = {
// 定义一个topic
val topic = "test1"
// 创建配置信息对象
val props = new Properties()
// 把配置信息put到props里
// 指定序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder")
// 指定kafka列表
props.put("metadata.broker.list", "mini1:9092,mini2:9092")
// 设置发送数据后的响应方式:0、1、-1(0要求不严格,1,一般,-1很严格)
props.put("request.required.acks", "1")
// 调用分区器
props.put("partitioner.class", "kafka.producer.DefaultPartitioner")
//v可自定义分区器,如下
// props.put("partitioner.class", "自定义分区器的path")
// 创建Producer配置对象
val config = new ProducerConfig(props)
// 创建生产者对象
val producer: Producer[String, String] = new Producer(config)
// 生成一些数据
for (i <- 1 to 10000) {
val msg: String = s"$i : Producer send data...."
producer.send(new KeyedMessage[String, String](topic, msg))
// Thread.sleep(1000)
}
}
}
自定义分区器:
import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties
class CustomPartitioner(props: VerifiableProperties = null) extends Partitioner{
//用hash值模于分区数,特别注意返回值↑↑↑↑↑
override def partition(key: Any, numPartitions: Int) = key.hashCode() % numPartitions
}