本文及后续文章我们会基于Kafka官网由浅入深逐步了解原理
1. 创建与查看主题
$ kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
$ kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: CKrUNvoiSZCdmOSrN6qokQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
2. 生产者生产事件至主题
# Ctrl-C中断事件写入
$ kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>shenjian.online
>公众号算法小生
3. 消费者从主题中消费事件
$ kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
shenjian.online
公众号算法小生
因为事件持久存储在Kafka中,所以它们可以被任意多次读取,并且可以被任意多的消费者读取
4. KAFKA CONNECT持续导入导出数据
我们先用修改容器内配置文件的方式进行实践,暂时不先K8S挂载
# 配置连接器依赖包
cd /opt/bitnami/kafka/config
echo 'plugin.path=/opt/bitnami/kafka/libs/connect-file-3.3.1.jar' >> connect-standalone.properties
cat connect-standalone.properties
查看connect-file-source.properties配置信息:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
该配置主要是将test.txt按行读取信息写入connect-test主题中
查看connect-file-sink.properties配置信息:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
该配置主要是将connect-test主题中消息按行写入文件test.sink.txt中
# 在test.txt文件中写入信息
$ pwd
/opt/bitnami/kafka
$ echo '算法小生' >> test.txt
$ echo '算法小生' >> test.txt
启动connect连接器
connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
然后我们新开cmd窗口,重新docker exec -it containerid bash
进入容器
# 查看该文件可以看到输出
cat test.sink.txt
算法小生
算法小生
当我们继续追加信息至test.txt中,test.sink.txt及主题connect-test中可以看到shenjian.online
5. KAFKA STREAMS处理事件
我们可以通过KAFKA STREAMS来对主题中的事件进行转换、聚合等操作,如WordCount经典案例,后面会详细讲解,可以先了解
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
欢迎关注公众号算法小生与我沟通交流