Kafka Training topic 二

pom


    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

6、在kafka集群中创建18BD-10主题 副本为2个,分区为3个

生产者设置:

消息确认机制 为all

重试次数 为2

批量处理消息字节数 为16384

设置缓冲区大小 为 33554432

设置每条数据生产延迟1ms

设置key的序列化为org.apache.kafka.common.serialization.StringSerializer

设置value的序列化为org.apache.kafka.common.serialization.StringSerializer

数据分发策略为指定分区2,把数据发送到指定的分区中

消费者设置:

消费者组id为test

设置自动提交偏移量

设置自动提交偏移量的时间间隔

设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer

设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer

消费指定分区2中的数据

模拟生产者,请写出代码向18BD-10主题中生产数据test0-test99

模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台

Priducer 代码

package Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by 一个蔡狗 on 2020/3/20.
 *
 * 在kafka集群中创建 18BD-10 主题 副本为2个,分区为3个
 生产者设置:
 消息确认机制 为all
 重试次数 为2
 批量处理消息字节数 为16384
 设置缓冲区大小 为 33554432
 设置每条数据生产延迟1ms
 设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
 设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
 数据分发策略为指定分区2,把数据发送到指定的分区中

 模拟生产者,请写出代码向18BD-10主题中生产数据  test0 -  test99
 */
public class Priducer_06 {

    public static void main(String[] args) {


        //编写 生产数据程序
        //1、配置kafka集群环境(设置)
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "node001:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 2);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //批量的缓冲区大小
        props.put("buffer.memory", 33554432);
        // kafka   key 和value的序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");




        //2.  实例一个 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100; i++) {
            //3.    模拟生产者,请写出代码向18BD-10主题中生产数据  test0 -  test99
            ProducerRecord producerRecord = new ProducerRecord("18BD-10",2,"","test"+i);
            kafkaProducer.send(producerRecord);


        }

    }









}

Consumer 代码

package Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 消费者设置:
 消费者组id为test
 设置自动提交偏移量
 设置自动提交偏移量的时间间隔
 设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
 设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 消费指定分区2中的数据


 模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台
 */
public class Consumer_06 {
    public static void main(String[] args) {


        //1、添加哦配置文件
        Properties props = new Properties();
        //指定kafka服务器
        props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
        //消费组
        props.put("group.id", "test");
        //以下两行代码 ---消费者自动提交offset值
        props.put("enable.auto.commit", "true");


        //自动提交的周期
        props.put("auto.commit.interval.ms",  "1000");

        //      设置 topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        props.put("auto.offset.reset","none");


        //kafka   key 和value的反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


        //2、实例消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);

        //3、设置读取的topic

        //Collection<TopicPartition>
//         模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台
        TopicPartition topicPartition2 = new TopicPartition("18BD-10",2);

        kafkaConsumer.assign(Arrays.asList(topicPartition2));

        //循环遍历
        while (true){
            //4、拉取数据,并输出
            //获取到所有的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);

            Set<TopicPartition> partitions = consumerRecords.partitions();

            for (TopicPartition partition : partitions) {

                //获取分区的数据,多条数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("数据是 "+record.value()+"      "+"分区是  "+record.partition());
                }

            }



        }

    }




}

7、在kafka集群中创建18BD-20主题 副本为2个,分区为3个

生产者设置:

消息确认机制 为all

重试次数 为1

批量处理消息字节数 为16384

设置缓冲区大小 为 33554432

设置每条数据生产延迟1ms

设置key的序列化为org.apache.kafka.common.serialization.StringSerializer

设置value的序列化为org.apache.kafka.common.serialization.StringSerializer

数据分发策略为轮询方式发送到每个分区中

手动提交每条数据

消费者设置:

消费者组id为test

设置手动提交偏移量

设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer

设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer

模拟生产者,请写出代码向18BD-20主题中生产数据test0-test99

模拟消费者,请写出代码把18BD-20主题中的2号分区的数据消费掉 ,打印输出到控制台

Priducer 代码

package Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 *
 * 在kafka集群中创建18BD-20主题 副本为2个,分区为3个
 生产者设置:
 消息确认机制 为all
 重试次数 为1
 批量处理消息字节数 为16384
 设置缓冲区大小 为 33554432
 设置每条数据生产延迟1ms
 设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
 设置value的序列化为org.apache.kafka.common.serialization.StringSerializer

 数据分发策略为轮询方式发送到每个分区中
 手动提交每条数据


 模拟生产者,请写出代码向18BD-20主题中生产数据test0-test99

 */
public class Priducer_07 {

    public static void main(String[] args) {


        //编写 生产数据程序
        //1、配置kafka集群环境(设置)
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "node001:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 1);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //批量的缓冲区大小
        props.put("buffer.memory", 33554432);
        // kafka   key 和value的序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");




        //2.  实例一个 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100; i++) {
            //3.   模拟生产者,请写出代码向18BD-20主题中生产数据test0-test99,  数据分发策略为轮询方式发送到每个分区中
            ProducerRecord producerRecord = new ProducerRecord("18BD-20","test"+i);


            kafkaProducer.send(producerRecord);



        }

    }









}

Consumer 代码

package Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 *
 * 消费者设置:
 消费者组id为test
 设置手动提交偏移量
 设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer


 模拟消费者,请写出代码把18BD-20主题中的   2号分区  的  数据  消费  掉 ,打印输出到控制台
 */
public class Consumer_07 {



    //程序入口
    public static void main(String[] args){

        //1配置文件
        Properties props = new Properties();
        //指定kafka服务器
        props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
        //消费组
        props.put("group.id", "test");
        //以下两行代码 ---消费者自动提交offset值 为 true   手动为 false
        props.put("enable.auto.commit", "false");
        //自动提交的周期
        //props.put("auto.commit.interval.ms",  "1000");
        //kafka   key 和value的反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //2消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);


        //3、设置读取的topic

        //Collection<TopicPartition>
//         模拟消费者,请写出代码把18BD-20主题中的   2号分区  的  数据  消费  掉 ,打印输出到控制台

        TopicPartition topicPartition2 = new TopicPartition("18BD-20",2);

        //循环遍历
        while (true){
            //4、拉取数据,并输出
            //获取到所有的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);

            Set<TopicPartition> partitions = consumerRecords.partitions();

            for (TopicPartition partition : partitions) {

                //获取分区的数据,多条数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("数据是 "+record.value()+"      "+"分区是 "+record.partition());
                }

            }



        }


    }

}

8、在kafka集群中创建18BD-30主题 副本为2个,分区为3个

生产者设置:

消息确认机制 为all

重试次数 为1

批量处理消息字节数 为16384

设置缓冲区大小 为 33554432

设置每条数据生产延迟1ms

设置key的序列化为org.apache.kafka.common.serialization.StringSerializer

设置value的序列化为org.apache.kafka.common.serialization.StringSerializer

数据分发策略为轮询方式发送到每个分区中

消费者设置:

消费者组id为test

设置手动提交偏移量

设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer

设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer

依次消费完每个分区之后手动提交offset

模拟生产者,请写出代码向18BD-30主题中生产数据test0-test99

模拟消费者,请写出代码把18BD-30主题中的2号分区的数据消费掉 ,打印输出到控制台

Priducer 代码

package Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 * 在kafka集群中创建18BD-30主题 副本为2个,分区为3个
 生产者设置:
 消息确认机制 为all
 重试次数 为1
 批量处理消息字节数 为16384
 设置缓冲区大小 为 33554432
 设置每条数据生产延迟1ms
 设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
 设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
 数据分发策略为轮询方式发送到每个分区中

 模拟生产者,请写出代码向 18BD-30 主题中生产数据 test0 - test99
 */
public class Priducer_08 {


    public static void main(String[] args) {


        //编写 生产数据程序
        //1、配置kafka集群环境(设置)
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "node001:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 1);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //批量的缓冲区大小
        props.put("buffer.memory", 33554432);
        // kafka   key 和value的序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");




        //2.  实例一个 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100; i++) {
            //3.   模拟生产者,请写出代码向18BD-30主题中生产数据test0-test99,  数据分发策略为轮询方式发送到每个分区中
            ProducerRecord producerRecord = new ProducerRecord("18BD-30","test"+i);


            kafkaProducer.send(producerRecord);



        }

    }

}

Consumer 代码

package Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 * 消费者设置:
 消费者组id为test
 设置手动提交偏移量
 设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 依次消费完每个分区之后手动提交offset


 模拟消费者,请写出代码把18BD-30主题中的2号分区的数据消费掉 ,打印输出到控制台
 */
public class Consumer_08 {


    //程序入口
    public static void main(String[] args){

        //1配置文件
        Properties props = new Properties();
        //指定kafka服务器
        props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
        //消费组
        props.put("group.id", "test");
        //以下两行代码 ---消费者自动提交offset值 为 true   手动为 false
        props.put("enable.auto.commit", "false");
        //自动提交的周期
        //props.put("auto.commit.interval.ms",  "1000");
        //kafka   key 和value的反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //2消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);


        //3、设置读取的topic

        //Collection<TopicPartition>
//         模拟消费者,请写出代码把18BD-30主题中的2号分区的数据消费掉 ,打印输出到控制台

        TopicPartition topicPartition2 = new TopicPartition("18BD-30",2);

        //循环遍历
        while (true){
            //4、拉取数据,并输出
            //获取到所有的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);

            Set<TopicPartition> partitions = consumerRecords.partitions();

            for (TopicPartition partition : partitions) {

                //获取分区的数据,多条数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("数据是 "+record.value()+"      "+"分区是"+record.partition());
                }

            }



        }


    }


}

9、在kafka集群中创建18BD-40主题 副本为2个,分区为3个

生产者设置:

消息确认机制 为all

重试次数 为1

批量处理消息字节数 为16384

设置缓冲区大小 为 33554432

设置每条数据生产延迟1ms

设置key的序列化为org.apache.kafka.common.serialization.StringSerializer

设置value的序列化为org.apache.kafka.common.serialization.StringSerializer

数据分发策略为轮询方式发送到每个分区中

消费者设置:

消费者组id为test

设置自动提交偏移量

设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer

设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer

消费指定分区0和分区2中的数据

模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99

模拟消费者,请写出代码把18BD-40主题中的0和2号分区的数据消费掉 ,打印输出到控制台

Priducer 代码

package Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 * 在kafka集群中创建18BD-40主题 副本为2个,分区为3个
 生产者设置:
 消息确认机制 为all
 重试次数 为1
 批量处理消息字节数 为16384
 设置缓冲区大小 为 33554432
 设置每条数据生产延迟1ms
 设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
 设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
 数据分发策略为轮询方式发送到每个分区中

 模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99
 */
public class Priducer_09 {

    public static void main(String[] args) {


        //编写 生产数据程序
        //1、配置kafka集群环境(设置)
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "node001:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 1);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //批量的缓冲区大小
        props.put("buffer.memory", 33554432);
        // kafka   key 和value的序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");




        //2.  实例一个 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100; i++) {
            //3.   模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99,  数据分发策略为轮询方式发送到每个分区中
            ProducerRecord producerRecord = new ProducerRecord("18BD-40","test"+i);


            kafkaProducer.send(producerRecord);



        }

    }


}

Consumer 代码

package Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 *
 消费者设置:
 消费者组id为test
 设置自动提交偏移量
 设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
 设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 消费指定分区0和分区2中的数据

 模拟消费者,请写出代码把18BD-40主题中的0和2号分区的数据消费掉 ,打印输出到控制台
 */
public class Consumer_09 {

    public static void main(String[] args) {


        //1、添加哦配置文件
        Properties props = new Properties();
        //指定kafka服务器
        props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
        //消费组
        props.put("group.id", "test");
        //以下两行代码 ---消费者自动提交offset值
        props.put("enable.auto.commit", "true");

        //自动提交的周期
        props.put("auto.commit.interval.ms",  "1000");

        //      设置 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        props.put("auto.offset.reset","earliest");


        //kafka   key 和value的反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


        //2、实例消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);

        //3、设置读取的topic

        //Collection<TopicPartition>
//         模拟消费者,请写出代码把18BD-10主题中的2号分区的数据消费掉 ,打印输出到控制台
        TopicPartition topicPartition0 = new TopicPartition("18BD-40",0);
        TopicPartition topicPartition2 = new TopicPartition("18BD-40",2);

        kafkaConsumer.assign(Arrays.asList(topicPartition2));

        //循环遍历
        while (true){
            //4、拉取数据,并输出
            //获取到所有的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);

            Set<TopicPartition> partitions = consumerRecords.partitions();

            for (TopicPartition partition : partitions) {

                //获取分区的数据,多条数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("数据是"+record.value()+"      "+"分区是"+record.partition());
                }

            }



        }

    }



}

10、在kafka集群中创建18BD-50主题 副本为2个,分区为3个

生产者设置:

消息确认机制 为all

重试次数 为1

批量处理消息字节数 为16384

设置缓冲区大小 为 33554432

设置每条数据生产延迟1ms

设置key的序列化为org.apache.kafka.common.serialization.StringSerializer

设置value的序列化为org.apache.kafka.common.serialization.StringSerializer

数据分发策略为轮询方式发送到每个分区中

消费者设置:

消费者组id为test

设置自动提交偏移量

设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer

设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer

消费指定分区0和分区2中的数据,并且设置消费0分区的数据offerset值从0开始,消费2分区的数据offerset值从10开始

模拟生产者,请写出代码向18BD-50主题中生产数据test0-test99

模拟消费者,请写出代码把18BD-50主题中的0和2号分区的数据消费掉 ,打印输出到控制台

Priducer 代码

package Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 * 在kafka集群中创建18BD-50主题 副本为2个,分区为3个
 生产者设置:
 消息确认机制 为all
 重试次数 为1
 批量处理消息字节数 为16384
 设置缓冲区大小 为 33554432
 设置每条数据生产延迟1ms
 设置key的序列化为org.apache.kafka.common.serialization.StringSerializer
 设置value的序列化为org.apache.kafka.common.serialization.StringSerializer
 数据分发策略为轮询方式发送到每个分区中

 模拟生产者,请写出代码向18BD-50主题中生产数据test0-test99
 */
public class Priducer_10 {

    public static void main(String[] args) {


        //编写 生产数据程序
        //1、配置kafka集群环境(设置)
        Properties props = new Properties();
        //kafka服务器地址
        props.put("bootstrap.servers", "node001:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 1);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //批量的缓冲区大小
        props.put("buffer.memory", 33554432);
        // kafka   key 和value的序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");




        //2.  实例一个 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100; i++) {
            //3.   模拟生产者,请写出代码向18BD-40主题中生产数据test0-test99,  数据分发策略为轮询方式发送到每个分区中
            ProducerRecord producerRecord = new ProducerRecord("18BD-50","test"+i);


            kafkaProducer.send(producerRecord);



        }

    }
}

Consumer 代码

package Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Created by 一个蔡狗 on 2020/3/22.
 *
 * 消费者设置:
 消费者组id为test
 设置自动提交偏移量
 设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
 设置key的反序列化为org.apache.kafka.common.serialization.StringDeserializer
 设置value的反序列化为org.apache.kafka.common.serialization.StringDeserializer



 消费指定分区0和分区2中的数据,并且设置消费0分区的数据offerset值从0开始,消费2分区的数据offerset值从10开始

 模拟消费者,请写出代码把18BD-50主题中的0和2号分区的数据消费掉 ,打印输出到控制台
 */
public class Consumer_10 {


    public static void main(String[] args) {


        //1配置文件
        Properties props = new Properties();
        //指定kafka服务器
        props.put("bootstrap.servers", "node001:9092,node002:9092,node003:9092");
        //消费组
        props.put("group.id", "test");
        //以下两行代码 ---消费者自动提交offset值
        props.put("auto.offset.reset", "true");

        //只有当offset不存在的时候,才用latest或者earliest    设置当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        props.put("auto.offset.reset", "earliest");

        //自动提交的周期
        props.put("auto.commit.interval.ms",  "1000");
        //kafka   key 和value的反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //2消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        //3、设置topic



        //Collection<TopicPartition>  topicPartition0  只消费分区号为 0 的 分区
        TopicPartition topicPartition0 = new TopicPartition("18BD-50",0);

//        topicPartition2  只消费分区号为 2 的 分区
        TopicPartition topicPartition2 = new TopicPartition("18BD-50",2);

        kafkaConsumer.assign(Arrays.asList(topicPartition0,topicPartition2));

//        消费指定分区0和分区2中的数据,并且设置消费0分区的数据offerset值从0开始,     topicPartition0
        kafkaConsumer.seek(topicPartition0,0);

        //从头开始消费
//        kafkaConsumer.seekToBeginning(Arrays.asList(topicPartition0));

//        消费2分区的数据offerset值从10开始
        kafkaConsumer.seek(topicPartition2,10);



        while (true){

            //4、拉取数据,并输出
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //通过数据获取到多有的分区  0   2
            Set<TopicPartition> partitions = consumerRecords.partitions();
            //遍历所有分区,或得到一个分区
            for (TopicPartition partition : partitions) {
                //获取每个分区的数据,多条数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
                //遍历分区内的所有数据,或得到一条
                for (ConsumerRecord<String, String> record : records) {


                    System.out.println(record.value()+"      "+record.partition());
                }


            }




        }




    }





}
发布了218 篇原创文章 · 获赞 291 · 访问量 29万+

猜你喜欢

转载自blog.csdn.net/bbvjx1314/article/details/105217000