Kafka学习总结(六)——应用开发

1、开发一个生产者应用,如下:

package com.cattsoft;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    static String message = "kafka message producer!";

    public static void main(String[] args) {
        System.out.println("生产消息开始!!!");
        Properties props = new Properties();
        /* 发送实际数据的socket连接将基于返回的metadata数据信息而建立 */
        props.setProperty("metadata.broker.list", "172.168.10.48:9092,172.168.10.49:9092,172.168.10.54:9092");
        /* 消息的序列化类别。默认编码器输入一个字节byte[],然后返回相同的字节byte[] */
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");

        /*
         * 此选项置顶了消息是否在后台线程中异步发送。正确的值: (1) async: 异步发送 (2) sync: 同步发送
         * 通过将producer设置为异步,我们可以批量处理请求(有利于提高吞吐率)但是这也就造成了客户端机器丢掉未发送数据的可能性
         */
        props.setProperty("producer.type", "asyc");

        /*
         * 仅仅for sync 0:
         * 表示producer从来不等待来自broker的确认信息(和0.7一样的行为)。这个选择提供了最小的时延但同时风险最大(
         * 因为当server宕机时,数据将会丢失)。 1:表示获得leader
         * replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。
         * -1:producer会获得所有同步replicas都收到数据的确认,同时时延最大。
         */
        props.put("request.required.acks", "1");
        /* 确认超时时间 */
        props.put("request.timeout.ms", 1000);
        /* broker尽力实现request.required.acks需求时的等待时间,否则会发送错误到客户端 */
        props.put("request.timeout.ms", 10000);
        /* 此项参数可以设置压缩数据的codec,可选codec为:“none”, “gzip”, “snappy” */
        props.put("compression.codec", "none");
        /* 在设置了压缩的情况下,可以指定特定的topic压缩,为指定则全部压缩 */
        props.put("compressed.topics", null);
        /* 消息发送最大尝试次数 */
        props.put("message.send.max.retries", 3);

        /* 批量消息的数量,仅仅for asyc */
        props.put("batch.num.messages", 100);

        /*
         * 当应用async模式时,用户缓存数据的最大时间间隔。例如,设置为100时,将会批量处理100ms之内消息。这将改善吞吐率,
         * 但是会增加由于缓存产生的延迟。
         */
        props.put("queue.buffering.max.ms", 5000);

        /* producer 缓存的消息的最大数量,仅仅for asyc */
        props.put("queue.buffering.max.message", 1000);

        /* 必须实现kafka.producer.Partitioner,根据Key提供一个分区策略 */
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        String topic = "ADAPTER_QUEUE_SOLR_S1000";

        try {
            int i = 0;
            int count = 100;
            while (i < count) {
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, message);
                producer.send(data);
                i++;
            }
            System.out.println("生成消息完成!");
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

2、开发一个消费者应用

package com.cattsoft;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
            consumer =(ConsumerConnector) kafka.consumer.Consumer
                    .createJavaConsumerConnector(createConsumerConfig());   
            this.topic =topic;
        }

    private ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();

        /* 指定zookeeper的连接的字符串 */
        props.put("zookeeper.connect", "130.60.23.193:2181,130.60.23.194:2181,130.60.23.195:2181");
        props.put("group.id", "zk999");
        // 如果true,consumer定期地往zookeeper写入每个分区的offset
        props.put("auto.commit.enable", "true");
        /* consumer向zookeeper提交offset的频率,单位是秒 */
        props.put("auto.commit.interval.ms", 60 * 1000);

        /*
         * zookeeper
         * 会话的超时限制。如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生
         */
        props.put("zookeeper.session.timeout.ms", "50000");
        /* 客户端在建立通zookeeper连接中的最大等待时间 */
        props.put("zookeeper.connection.timeout.ms", "20000");
        /* rebalance时的最大尝试次数 */
        /*
         * 当新的consumer加入到consumer group时,
         * consumers集合试图重新平衡分配到每个consumer的partitions数目。
         * 如果consumers集合改变了,当分配正在执行时,这个重新平衡会失败并重入
         */
        props.put("rebalance.max.retries", "5");
        /* 在重试reblance之前backoff时间 */
        props.put("rebalance.backoff.ms", "12000");

        /*
         * zookeeper中没有初始化的offset时,如果offset是以下值的回应:
         * smallest:自动复位offset为smallest的offset largest:自动复位offset为largest的offset
         * anything else:向consumer抛出异常
         */
        props.put("auto.offset.reset", "largest");

        /* ZK follower可以落后ZK leader的最大时间 */
        props.put("zookeeper.sync.time.ms", "1200");

        /* 这个参数避免在没有新数据的情况下重复频繁的拉数据。 如果拉到空数据,则多推后这个时间 */
        props.put("backoff.increment.ms", 1000);
        return new ConsumerConfig(props);
    }

    public void run() {
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("*********Results********");
        int i = 1;
        while (it.hasNext()) {
            System.out.println(
                    Thread.currentThread() + " 接收到的第几个消息  " + i++ + " ------ " + new String(it.next().message()));
            try {
                Thread.sleep(800);
                this.consumer.commitOffsets();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        KafkaComsuerUtil consumerThread = new KafkaComsuerUtil("ADAPTER_QUEUE_SOLR_S1000");
        consumerThread.start();
    }
}

猜你喜欢

转载自blog.csdn.net/lyf_ldh/article/details/79776853