kafka入门01

一、Kafka

1、消息中间件(现在改名为分布式流式平台

kafka

MQ

Redis

2、kafka的框架

生产者:Broker:消费者

正常部署的是:Broker

3、企业常用的框架

Flume-->Kafka(sink到Kafka)-->Spark streaming    流式处理

              Kafka streaming  

4、特点

1)发布/订阅

2)处理实时应用

3)分布式、可复制、容错性

5、官网介绍

由于企业很多用spark-streaming-kafka-0-10,所以kafka用0.10版本

二、安装部署kafka(kafka源码是scala编写的)

1、版本要求

kafka_2.11-0.10.0.0.tgz

scala version:2.11

2、环境要求

1)JDK

2)Scala

3)Zookeeper

3、软连接

ln -s zookeeper-3.4.6 zookeeper

删除:rm -rf zookeeper

类似window桌面快捷方式

ln -s 物理文件/文件夹 快捷方式的文件/文件夹

好处:

1)删除快捷的文件夹/文件,物理的没删除,增加安全系数

2)方便于升级和多版本的使用

3)了解硬连接(作业)

4、部署zk

1)conf/zoo.cfg

# the directory where the snapshot is stored.

# do not use /tmp for storage, /tmp here is just

# example sakes.

存储临时数据的,维护用

dataDir=/opt/software/zookeeper/data

server.1=hadoop001:2888:3888

server.2=hadoop002:2888:3888

server.3=hadoop003:2888:3888

三台机器的zoo.cfg一样

2)创建data

mkdir /opt/software/zookeeper/data

touch data/myid

echo 1 > data/myid

其他机器:
echo 2 > data/myid

echo 3 > data/myid

3)启动

./zkServer.sh start

4)查状态

./zkServer.sh status

5、部署kafka(kafka_2.11-0.10.0.1)

1)解压kafka

tar -zvxf kafka_2.11-0.10.0.1.tgz

2)软连接

ln -s kafka_2.11-0.10.0.1 kafka

3)修改conf/server.properties文件

Server Basics:

broker.id=1

port=9092

host.name=hadoop001

Log Basics:

log.dirs=/opt/software/kafka/logs(保存写入kafka的数据)

log Flush Policy:

数据写入磁盘的刷新方式。一个是信息刷了一万行就写入磁盘;一个是等待1s,就写入磁盘。

Zookeeper:

zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka

4)将server.properties发给其他机器并修改属于自己的server.properties

scp server.properties hadoop002://opt/software/kafka/config/

scp server.properties hadoop003://opt/software/kafka/config/

5)创建logs目录

mkdir logs

6)启动

企业上一般放在后台启动:
nohup ./kafka-server-start.sh ../config/server.properties &

./kafka-server-start.sh -daemon /opt/app/kafka/config/server.properties 

表示启动成功。

6、kafka的常用命令

1)kafka-topics.sh

概念

topic:主题

ERP系统log-->erp_topic

OA系统log-->oa_topic

为什么要用三台机器部署kafka:

因为要提高并发写/读性能。体现在partition 分区上。

创建topic,在topic里面生产消费数据

./kafka-topics.sh --create \
--zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka \
--replication-factor 3 \
--partitions 3 \
--topic test

--zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka

--replication-factor 3    1个分区3个副本 

--partitions 3                 3个分区,下标从0开始

进去logs目录,发现:

(1)为什么创建topic的时候zk地址要用到/kafka

因为这里/kafka指向zk的节点,便于删除zk上的kafka信息。

2)查看kafka有多少个分区

bin/kafka-topics.sh --list \

--zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka

3)发送信息和接收信息

生产者,发送消息:

bin/kafka-console-producer.sh \

--broker-list hadoop001:9092,hadoop002:9092,hadoop003:9092 \

--topic test

消费者,接收消息:

bin/kafka-console-consumer.sh \

--zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka \

--topic test \

--from-beginning

4)实验一下

在hadoop001启动生产者:

在hadoop002启动消费者:

5)单分区:有序,全局分区:无序

hadoop001:

6)如何保证生产上全局有序

案例:

MySQL BINLOG 日志文件 按顺序----有序-----》Kafka test 3个分区,但是消费的时候,读取这些分区时是无序的。

table:

ruozedata.stu

属性:

id name age

SQL:

insert into stu values(1,'jepson',18);
insert into stu values(2,'ruoze',28);
update stu set age=26 where id=1;
delete from stu where id=1;

test-0:
insert into stu values(1,'jepson',18);

test-1:
insert into stu values(2,'ruoze',28);
delete from stu where id=1;

test-2:
update stu set age=26 where id=1;

消费时
insert into stu values(2,'ruoze',28);
delete from stu where id=1;

insert into stu values(1,'jepson',18);

update stu set age=26 where id=1;

消费者读取数据时读分区是无序的,可能针对同一表的操作顺序有误。

解决方案:

核心点: 共性数据发送到同一个topic的1个分区

例如生产者:

test-0:
insert into stu values(1,'jepson',18);
update stu set age=26 where id=1;
delete from stu where id=1;

test-1:
insert into stu values(2,'ruoze',28);

test-2:
消费者:
无论怎么读,同个表的操作都在同一个分区中,而同分区是有序的,就不会出现数据不同步的问题。

拼装的Key: ruozedata_stu_id=1  

自定义分区(代码):

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        if (keyBytes == null) {

            int nextValue = nextValue(topic);

            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

            if (availablePartitions.size() > 0) {

                int part = Utils.toPositive(nextValue) % availablePartitions.size();

                return availablePartitions.get(part).partition();

            } else {

                // no partitions are available, give a non-available partition

                return Utils.toPositive(nextValue) % numPartitions;

            }

        } else {

            // hash the keyBytes to choose a partition

            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

        }

    }

  = hash(ruozedata_stu_id=1 ) 取模 0,1,2

    value: sq

2)./kafka-topics.sh --describe 

./kafka-topics.sh --describe \
--zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka \
--topic test

(1)解释

第一行:哪个topic 多少个分区 每个分区多少副本数

第二行开始,每一行列出了一个分区的信息,如它是第几个分区,这个分区的leader是哪个broker(之前设的broker id)(先读写到这),副本位于哪些broker(replicas),有哪些副本处于同步状态(Isr)(活跃的副本列表,有可能成为leader,比如2挂了1顶上)。

测试:kill -9 $(pgrep -f kafka)

因为我们在第二台机器kill掉了kafka,所以副本数落在第二台机器的分区的2挂了,选举1成为读写的副本

3)添加分区

--alter

Alter the number of partitions,  replica assignment, and/or configuration for the topic.  

改变分区数,例如,副本分配,配置主题。

将分区数改为4:

./kafka-topics.sh --alter \

--zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka \

--partitions 4 \

--topic test

成功。

4)副本的数据保存有效期

server.properties

5)删除kafka

--delete

./kafka-topics.sh --delete \

--zookeeper hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka \

--topic test

(1)暴力删除

bin/kafka-topics.sh --delete \

--zookeeper yws85:2181,yws86:2181,yws87:2181/kafka \

--topic test 

进入zk地址:

rmr /kafka/admin/delete_topics/test

rmr /kafka/config/topics/test

rmr /kafka/brokers/topics/test

logs物理数据:

rm -rf logs/test-*

猜你喜欢

转载自blog.csdn.net/Binbinhb/article/details/88585739
今日推荐