It has been a long time since my last post. Our systems at work were recently upgraded, and the existing ActiveMQ + Redis architecture had some problems that I was unable to solve. After weighing the options, I decided to switch to Kafka + ZooKeeper, using a multi-node, multi-broker cluster.
First, my server environment:
No. | Hostname | Internal IP |
0 | server1.novalocal | 192.168.0.5 |
1 | server2.novalocal | 192.168.0.6 |
2 | server3.novalocal | 192.168.0.7 |
Now for the deployment:
1. Install the JDK. I used jdk-10.0.2_linux-x64_bin.rpm.
2. Download Kafka. I used kafka_2.12-2.0.0.tgz.
3. tar -zxvf kafka_2.12-2.0.0.tgz
4. mv kafka_2.12-2.0.0 /usr/local/
5. Edit the ZooKeeper configuration file
Configuration file path:
/usr/local/kafka_2.12-2.0.0/config/zookeeper.properties
Settings already present in the file:
dataDir=/tmp/zookeeper — directory where ZooKeeper stores its data
clientPort=2181 — the port clients connect on
maxClientCnxns=0 — limits the number of concurrent client connections to ZooKeeper, distinguishing clients by IP address; this can help block certain kinds of DoS attacks. Setting it to 0, or leaving it unset, removes the limit.
Settings to add manually:
initLimit=5 — the time allowed for followers (the "clients" relative to the leader) to connect to and sync with the leader, expressed as a multiple of tickTime. If more than this many tickTime intervals pass, the connection fails.
syncLimit=2 — the time allowed for a request/response exchange between leader and follower. If a follower cannot communicate with the leader within this window, the follower is dropped.
server.N=YYY:A:B — cluster membership (server id, server address, leader-follower communication port, election port)
server.0=192.168.0.5:2888:3888
server.1=192.168.0.6:2889:3889
server.2=192.168.0.7:2890:3890
Here N is the server id, YYY is the server's IP address, A is the leader-follower communication port, i.e. the port this server uses to exchange data with the cluster leader, and B is the election port, used for inter-server communication when a new leader is elected (when the leader goes down, the remaining servers talk to each other to choose a new one). Normally every server in the cluster uses the same A port, and likewise the same B port; only in a pseudo-cluster, where all the IP addresses are identical, must the A and B ports differ from server to server.
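Putting these items together, zookeeper.properties ends up looking roughly like this, identical on all three machines. One assumption on my part: the stock file shipped with Kafka does not set tickTime, so I add it explicitly at its common sample value of 2000 ms, since initLimit and syncLimit are expressed as multiples of it — leave it out if you prefer ZooKeeper's default.

```properties
# settings already present
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
# settings added by hand
tickTime=2000
initLimit=5
syncLimit=2
server.0=192.168.0.5:2888:3888
server.1=192.168.0.6:2889:3889
server.2=192.168.0.7:2890:3890
```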
6. Create a myid file under the path set in dataDir. This id identifies the ZooKeeper host, and each host gets a different one: 0 on the first machine, 1 on the second, 2 on the third. In other words, the three ZooKeeper configuration files are identical except for myid.
Following the paths in this example, the file is /tmp/zookeeper/myid, containing 0, 1, and 2 respectively.
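For example, on server1 (id 0) the file can be created like this; use 1 and 2 on the other two machines:

```shell
# create the dataDir and write this host's id into myid
mkdir -p /tmp/zookeeper
echo 0 > /tmp/zookeeper/myid
cat /tmp/zookeeper/myid
```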
7. Start the ZooKeeper service
./zookeeper-server-start.sh ../config/zookeeper.properties &
If you cannot connect after starting, it is almost always a matter of ports not being open — check your firewall rules carefully.
8. Edit the Kafka configuration file
Configuration file path:
/usr/local/kafka_2.12-2.0.0/config/server.properties
broker.id=0, 1, or 2, matching the server's number (the listener values below are server3's)
listeners=PLAINTEXT://192.168.0.7:9092
advertised.listeners=PLAINTEXT://server3.novalocal:9092
zookeeper.connect=192.168.0.5:2181,192.168.0.6:2181,192.168.0.7:2181
zookeeper.connection.timeout.ms=60000
num.partitions=3
Also configure the log settings so that Kafka deletes old log data by size rather than by time. I set the limit to 256 MB: 256 × 1024 × 1024 = 268435456.
Comment out: #log.retention.hours=168
Enable: log.retention.bytes=268435456
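For reference, here is how the broker-specific portion of server.properties would look on server1 (broker 0). This is my inference from the host table above, since the listener values quoted in this post are server3's; on each node only broker.id, listeners, and advertised.listeners change:

```properties
broker.id=0
listeners=PLAINTEXT://192.168.0.5:9092
advertised.listeners=PLAINTEXT://server1.novalocal:9092
zookeeper.connect=192.168.0.5:2181,192.168.0.6:2181,192.168.0.7:2181
zookeeper.connection.timeout.ms=60000
num.partitions=3
# delete old data by size instead of time: 256 x 1024 x 1024 = 268435456
#log.retention.hours=168
log.retention.bytes=268435456
```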
9. Edit producer.properties
bootstrap.servers=192.168.0.5:9092,192.168.0.6:9092,192.168.0.7:9092
10. Edit consumer.properties
bootstrap.servers=192.168.0.5:9092,192.168.0.6:9092,192.168.0.7:9092
11. Add the server names to /etc/hosts
192.168.0.5 server1.novalocal
192.168.0.6 server2.novalocal
192.168.0.7 server3.novalocal
12. Finally, start the Kafka service
./kafka-server-start.sh ../config/server.properties &
Installation and deployment are now complete. Produce a test message:
./kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1
Then consume it. (In Kafka 2.0 the old ZooKeeper-based console consumer is gone, so connect via --bootstrap-server.)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic testKJ1
Next, producing and consuming from Java:
13. Producing and consuming Strings from Java, with two priority levels
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.0.5:9092,192.168.0.6:9092,192.168.0.7:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = null;
try {
    producer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 50000000; i++) {
        String msg = "Message " + i;
        producer.send(new ProducerRecord<String, String>("HelloWorld", msg)); // high-priority topic
        msg = "Message_1 " + i;
        producer.send(new ProducerRecord<String, String>("HelloWorld_1", msg)); // low-priority topic
        System.out.println("Sent:" + msg);
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (producer != null) { // guard against the constructor having thrown
        producer.close();
    }
}
try {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "192.168.0.5:9092,192.168.0.6:9092,192.168.0.7:9092");
    properties.put("group.id", "group-1");
    properties.put("enable.auto.commit", "true");
    properties.put("auto.commit.interval.ms", "1000");
    properties.put("auto.offset.reset", "earliest");
    properties.put("session.timeout.ms", "30000");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("max.poll.records", "1");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
    Properties properties_1 = new Properties();
    properties_1.put("bootstrap.servers", "192.168.0.5:9092,192.168.0.6:9092,192.168.0.7:9092");
    properties_1.put("group.id", "group-2");
    properties_1.put("enable.auto.commit", "true");
    properties_1.put("auto.commit.interval.ms", "1000");
    properties_1.put("auto.offset.reset", "earliest");
    properties_1.put("session.timeout.ms", "30000");
    properties_1.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties_1.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties_1.put("max.poll.records", "1");
    KafkaConsumer<String, String> kafkaConsumer_1 = new KafkaConsumer<>(properties_1);
    kafkaConsumer_1.subscribe(Arrays.asList("HelloWorld_1"));
    while (true) {
        // poll the high-priority topic first
        ConsumerRecords<String, String> records = kafkaConsumer.poll(2);
        if (records.count() == 0) {
            // nothing high-priority pending, so fall back to the low-priority topic
            records = kafkaConsumer_1.poll(2);
        }
        if (records.count() == 0) {
            Thread.sleep(1000);
        } else {
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(Tools.GetCreatetime("yyyy-MM-dd HH:mm:ss") + "\toffset = " + record.offset() + ", value = " + record.value());
            }
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
14. Producing and consuming custom objects from Java. (This uses the old Scala client API — kafka.javaapi.producer.Producer and ConsumerConnector — which was removed in Kafka 2.0, so it needs a pre-2.0 client library on the classpath.)
try {
    String topic = "test"; // the topic to operate on
    Properties pro = new Properties();
    pro.setProperty("zookeeper.connect", "192.168.0.5:2181,192.168.0.6:2181,192.168.0.7:2181"); // your ZooKeeper connection string
    pro.setProperty("metadata.broker.list", "192.168.0.5:9092,192.168.0.6:9092,192.168.0.7:9092"); // your broker list
    pro.setProperty("serializer.class", ObjectEncoder.class.getName()); // the custom Encoder class defined earlier
    Producer<Integer, Object> prod = new Producer<Integer, Object>(new ProducerConfig(pro));
    for (int i = 0; i < 200000000; i++) {
        String name = Tools.getName();
        int age = Tools.getAge();
        Date birthday = Tools.getBirthday(age);
        Sex sex = Tools.getSex(name);
        Member member = new Member(name, age, birthday, sex);
        prod.send(new KeyedMessage<Integer, Object>(topic, member));
        System.out.println(name + "\t" + sex + "\t" + age + "\t" + Tools.GetCreatetime(birthday, "yyyy-MM-dd"));
    }
} catch (Exception e) {
    e.printStackTrace();
}
try {
    String topic = "test"; // the topic to operate on
    Properties pro = new Properties();
    pro.setProperty("zookeeper.connect", "192.168.0.5:2181,192.168.0.6:2181,192.168.0.7:2181"); // your ZooKeeper connection string
    pro.setProperty("metadata.broker.list", "server1.novalocal:9092"); // your broker list
    pro.setProperty("group.id", "group1");
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(pro));
    // map each topic to the number of streams used to consume it
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1); // read the one topic with a single stream
    Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicMap);
    // only one topic and one stream here, so take the first
    KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> iter = stream.iterator();
    while (iter.hasNext()) {
        // receive the message and turn the byte array back into an object
        Member vo = (Member) BeanUtils.BytesToObject(iter.next().message());
        // System.out.println(vo.getName() + "\t" + vo.getSex() + "\t" + vo.getAge() + "\t" + Tools.GetCreatetime(vo.getBirthday(), "yyyy-MM-dd"));
        Speed++;
    }
} catch (Exception e) {
    e.printStackTrace();
}
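The ObjectEncoder and BeanUtils.BytesToObject helpers referenced above are not shown in this post. A minimal sketch of the underlying conversion using plain java.io serialization could look like the following (class and method names here are my own placeholders — match them to your project; ObjectEncoder would simply wrap objectToBytes behind the old kafka.serializer.Encoder interface):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // serialize any Serializable object to a byte array
    // (what an Encoder's toBytes(Object) would delegate to)
    public static byte[] objectToBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // restore the object from the byte array
    // (what BeanUtils.BytesToObject would do on the consumer side)
    public static Object bytesToObject(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = objectToBytes("hello kafka");
        String restored = (String) bytesToObject(data);
        System.out.println(restored); // prints "hello kafka"
    }
}
```

Your Member class just has to implement Serializable for this round trip to work.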
The full project is attached; feel free to use it as a reference.