flume
--------------
收集日志、移动、聚合框架。
基于事件。
agent
-------------
source //接收数据,生产者
//put()
//NetcatSource
//ExecSource,实时收集 tail -F xxx.txt
//spooldir
//seq
//Stress
//avroSource
channel //暂存数据,缓冲区,
//非永久性:MemoryChannel
//永久性 :FileChannel,磁盘.
//SpillableMemoryChannel :Mem + FileChannel.Capacity
sink //输出数据,消费者
//从channel提取take()数据,write()destination.
//HdfsSink
//HbaseSink
//avroSink
JMS
---------------
java message service,java消息服务。
queue //只有能有一个消费者。P2P模式(点对点).
//发布订阅(publish-subscribe,主题模式),
kafka
--------------
分布式流处理平台。
在系统之间构建实时数据流管道。
以topic分类对记录进行存储
每个记录包含key-value+timestamp
每秒钟百万消息吞吐量。
producer //消息生产者
consumer //消息消费者
consumer group //消费者组
kafka server //broker,kafka服务器
topic //主题,副本数,分区.
zookeeper //hadoop namenoade + RM HA | hbase | kafka
安装kafka
----------------
0.选择s202 ~ s204三台主机安装kafka
1.准备zk
略
2.jdk
略
3.tar文件
4.环境变量
略
5.配置kafka
[kafka/config/server.properties]
...
broker.id=202
...
listeners=PLAINTEXT://:9092
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:2181
6.分发server.properties,同时修改每个文件的broker.id
7.启动kafka服务器
a)先启动zk
b)启动kafka 这样启动可以在后台:bin/kafka-server-start.sh -daemon config/server.properties
[s202 ~ s204]
$>bin/kafka-server-start.sh config/server.properties
c)验证kafka服务器是否启动
$>netstat -anop | grep 9092
8.创建主题 副本:三个节点就是三个副本。我自己在一个节点上启动了一个kafka,那么副本就只能设置为1
$>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 1 --partitions 3 --topic test
9.查看主题列表
$>bin/kafka-topics.sh --list --zookeeper s201:2181
10.启动控制台生产者
$>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test //生产者不连接zookeeper,只连接broker
11.启动控制台消费者
$>bin/kafka-console-consumer.sh --bootstrap-server s201:9092 --topic test --from-beginning --zookeeper s201:2181
从头消费
12.在生产者控制台输入hello world
删除主题:bin/kafka-topics.sh --zookeeper spark1:2181 --delete --topic test
from-beginning从头消费:意思就是不管什么时候启动消费者都能把生产者之前发的消息收到
kafka集群在zk的配置
-----------------------
/controller ===> {"version":1,"brokerid":202,"timestamp":"1490926369148"
/controller_epoch ===> 1
/brokers
/brokers/ids
/brokers/ids/202 ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
/brokers/ids/203
/brokers/ids/204
/brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
/brokers/topics/test/partitions/1/state ===>...
/brokers/topics/test/partitions/2/state ===>...
/brokers/seqid ===> null
/admin
/admin/delete_topics/test ===>标记删除的主题
/isr_change_notification
/consumers/xxxx/
/config
----------
删除主题 在zookeeper里彻底删除
ls /brokers
[ids, topics, seqid]
ls /brokers/topics/
[test]
删除主题
rmr /brokers/topics/test
rmr /admin/delete_topics/test ===>标记删除的主题
容错
----------------------
创建主题
-------------------
repliation_factor 2 partitions 5
$>kafka-topic.sh --zookeeper s202:2181 --replication_factor 3 --partitions 4 --create --topic test3
2 x 5 = 10 //是个文件夹
[s202]
test2-1 //
test2-2 //
test2-3 //
[s203]
test2-0
test2-2
test2-3
test2-4
[s204]
test2-0
test2-1
test2-4
重新布局分区和副本,手动再平衡
--------------------------------
$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204
副本
--------------
broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。(都是知道副本在哪里)
支持到n-1故障。每个分区都有leader,follow.
leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。
新leader的选举是通过isr进行,第一个注册的follower成为leader。
kafka支持副本模式
---------------------
[同步复制]
1.producer联系zk识别leader
2.向leader发送消息
3.leadr收到消息写入到本地log
4.follower从leader pull消息
5.follower向本地写入log
6.follower向leader发送ack消息
7.leader收到所有follower的ack消息
8.leader向producer回传ack
[异步副本]
和同步复制的区别在与leader写入本地log之后,
直接向client回传ack消息,不需要等待所有follower复制完成。
--------------
收集日志、移动、聚合框架。
基于事件。
agent
-------------
source //接收数据,生产者
//put()
//NetcatSource
//ExecSource,实时收集 tail -F xxx.txt
//spooldir
//seq
//Stress
//avroSource
channel //暂存数据,缓冲区,
//非永久性:MemoryChannel
//永久性 :FileChannel,磁盘.
//SpillableMemoryChannel :Mem + FileChannel.Capacity
sink //输出数据,消费者
//从channel提取take()数据,write()destination.
//HdfsSink
//HbaseSink
//avroSink
JMS
---------------
java message service,java消息服务。
queue //只有能有一个消费者。P2P模式(点对点).
//发布订阅(publish-subscribe,主题模式),
kafka
--------------
分布式流处理平台。
在系统之间构建实时数据流管道。
以topic分类对记录进行存储
每个记录包含key-value+timestamp
每秒钟百万消息吞吐量。
producer //消息生产者
consumer //消息消费者
consumer group //消费者组
kafka server //broker,kafka服务器
topic //主题,副本数,分区.
zookeeper //hadoop namenoade + RM HA | hbase | kafka
安装kafka
----------------
0.选择s202 ~ s204三台主机安装kafka
1.准备zk
略
2.jdk
略
3.tar文件
4.环境变量
略
5.配置kafka
[kafka/config/server.properties]
...
broker.id=202
...
listeners=PLAINTEXT://:9092
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:2181
6.分发server.properties,同时修改每个文件的broker.id
7.启动kafka服务器
a)先启动zk
b)启动kafka 这样启动可以在后台:bin/kafka-server-start.sh -daemon config/server.properties
[s202 ~ s204]
$>bin/kafka-server-start.sh config/server.properties
c)验证kafka服务器是否启动
$>netstat -anop | grep 9092
8.创建主题 副本:三个节点就是三个副本。我自己在一个节点上启动了一个kafka,那么副本就只能设置为1
$>bin/kafka-topics.sh --create --zookeeper s201:2181 --replication-factor 1 --partitions 3 --topic test
9.查看主题列表
$>bin/kafka-topics.sh --list --zookeeper s201:2181
10.启动控制台生产者
$>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test //生产者不连接zookeeper,只连接broker
11.启动控制台消费者
$>bin/kafka-console-consumer.sh --bootstrap-server s201:9092 --topic test --from-beginning --zookeeper s201:2181
从头消费
12.在生产者控制台输入hello world
删除主题:bin/kafka-topics.sh --zookeeper spark1:2181 --delete --topic test
from-beginning从头消费:意思就是不管什么时候启动消费者都能把生产者之前发的消息收到
kafka集群在zk的配置
-----------------------
/controller ===> {"version":1,"brokerid":202,"timestamp":"1490926369148"
/controller_epoch ===> 1
/brokers
/brokers/ids
/brokers/ids/202 ===> {"jmx_port":-1,"timestamp":"1490926370304","endpoints":["PLAINTEXT://s202:9092"],"host":"s202","version":3,"port":9092}
/brokers/ids/203
/brokers/ids/204
/brokers/topics/test/partitions/0/state ===>{"controller_epoch":1,"leader":203,"version":1,"leader_epoch":0,"isr":[203,204,202]}
/brokers/topics/test/partitions/1/state ===>...
/brokers/topics/test/partitions/2/state ===>...
/brokers/seqid ===> null
/admin
/admin/delete_topics/test ===>标记删除的主题
/isr_change_notification
/consumers/xxxx/
/config
----------
删除主题 在zookeeper里彻底删除
ls /brokers
[ids, topics, seqid]
ls /brokers/topics/
[test]
删除主题
rmr /brokers/topics/test
rmr /admin/delete_topics/test ===>标记删除的主题
容错
----------------------
创建主题
-------------------
repliation_factor 2 partitions 5
$>kafka-topic.sh --zookeeper s202:2181 --replication_factor 3 --partitions 4 --create --topic test3
2 x 5 = 10 //是个文件夹
[s202]
test2-1 //
test2-2 //
test2-3 //
[s203]
test2-0
test2-2
test2-3
test2-4
[s204]
test2-0
test2-1
test2-4
重新布局分区和副本,手动再平衡
--------------------------------
$>kafka-topics.sh --alter --zookeeper s202:2181 --topic test2 --replica-assignment 203:204,203:204,203:204,203:204,203:204
副本
--------------
broker存放消息以消息达到顺序存放。生产和消费都是副本感知的。(都是知道副本在哪里)
支持到n-1故障。每个分区都有leader,follow.
leader挂掉时,消息分区写入到本地log或者,向生产者发送消息确认回执之前,生产者向新的leader发送消息。
新leader的选举是通过isr进行,第一个注册的follower成为leader。
kafka支持副本模式
---------------------
[同步复制]
1.producer联系zk识别leader
2.向leader发送消息
3.leadr收到消息写入到本地log
4.follower从leader pull消息
5.follower向本地写入log
6.follower向leader发送ack消息
7.leader收到所有follower的ack消息
8.leader向producer回传ack
[异步副本]
和同步复制的区别在与leader写入本地log之后,
直接向client回传ack消息,不需要等待所有follower复制完成。