2.Kafka系列之主题与事件

本文及后续文章我们会基于Kafka官网由浅入深逐步了解原理

1. 创建与查看主题

$ kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092                                                                    
Created topic quickstart-events.
$ kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: CKrUNvoiSZCdmOSrN6qokQ PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: quickstart-events        Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

2. 生产者生产事件至主题

# Ctrl-C中断事件写入
$ kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>shenjian.online
>公众号算法小生 

3. 消费者从主题中消费事件

$ kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
shenjian.online
公众号算法小生

因为事件持久存储在Kafka中,所以它们可以被任意多次读取,并且可以被任意多的消费者读取

4. KAFKA CONNECT持续导入导出数据

我们先用修改容器内配置文件的方式进行实践,暂时不先K8S挂载

# 配置连接器依赖包
cd /opt/bitnami/kafka/config
echo 'plugin.path=/opt/bitnami/kafka/libs/connect-file-3.3.1.jar' >> connect-standalone.properties
cat connect-standalone.properties

查看connect-file-source.properties配置信息:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt

该配置主要是将test.txt按行读取信息写入connect-test主题中

查看connect-file-sink.properties配置信息:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt

该配置主要是将connect-test主题中消息按行写入文件test.sink.txt中

# 在test.txt文件中写入信息
$ pwd
/opt/bitnami/kafka
$ echo '算法小生' >> test.txt
$ echo '算法小生' >> test.txt

启动connect连接器

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

然后我们新开cmd窗口,重新docker exec -it containerid bash进入容器

# 查看该文件可以看到输出
cat test.sink.txt
算法小生
算法小生

当我们继续追加信息至test.txt中,test.sink.txt及主题connect-test中可以看到shenjian.online

5. KAFKA STREAMS处理事件

我们可以通过KAFKA STREAMS来对主题中的事件进行转换、聚合等操作,如WordCount经典案例,后面会详细讲解,可以先了解

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

欢迎关注公众号算法小生与我沟通交流

猜你喜欢

转载自blog.csdn.net/SJshenjian/article/details/129780096