1. 配置文件 producer.properties #kafka broker list metadata.broker.list=master:9092,slave1:9092,slave2:9092,slave3:9092 #异步 producer.type=sync #压缩方式 compression.codec=0 #序列化 serializer.class=kafka.serializer.StringEncoder #batch.num.messages=100 2.生产者代码 import java.io.FileNotFoundException; import java.io.IOException; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 生产者 */ public class TestProducer { public static void main(String[] args) throws FileNotFoundException, IOException, InterruptedException { /** * 1、读取配置文件 */ Properties properties = new Properties(); //properties.load(new FileInputStream(new File("producer.properties"))); properties.load(TestProducer.class.getClassLoader().getResourceAsStream("producer.properties")); //2、传入配置文件,创建配置 ProducerConfig config = new ProducerConfig(properties); //3、通过配置文件,创建生产者 Producer<String, String> producer = new Producer<String, String>(config); for (int i = 0; i < 50; i++) { //4、创建消息,传入topic和消息实体 KeyedMessage<String, String> km = new KeyedMessage<String, String>("test-topic","this is a msg"+i); //5、发送消息 producer.send(km); Thread.sleep(300); } } }
kafka生产者实例
猜你喜欢
转载自houston123.iteye.com/blog/2317479
今日推荐
周排行