注意:在配置文件server.properties中指定了partition的数量num.partitions。这指的是单个topic的partition数量之和。若有多个broker,partition可能分布在不同的节点上,则多个broker上的所有partition数量加起来为num.partitions
0.7中producer的配置有几项是相排斥的,设置了其一,就不能设置其二
比如:
broker.list 与 zk.connect 不能同时设置
broker.list 与 partitioner.class 不能同时设置
如果这么干,编译时无所谓,运行时会抛异常
1,指定broker
props.put("broker.list", "0:10.10.10.10:9092");//直接连接kafka
设置这项后,就不能设置partitioner.class了,可是我在运行的时候发现,此时所有的数据都发往10.10.10.10的4个分区,并没有只发给一个分区。我换了syncproducer里的send(topic,partitionid,list)都没用。
2,指定partition
props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
props.put("zk.connect", "10.10.10.10:2181");//连接zk
上面的 com.kafka.myparitioner.CidPartitioner 为自己实现的类,注意要自己实现完整的包名
CidPartitioner实现了Partitioner接口,其中实现的partition方法指定了通过key计算partition的方法
package com.kafka.myparitioner;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
//设定依据key将当前这条消息发送到哪个partition的规则
/**
 * Partitioner that routes each message to a partition derived from its key.
 * The key is expected to be a numeric string; the partition is the key value
 * modulo the partition count. Non-numeric keys fall back to hashCode-based
 * assignment, and a null key is sent to partition 0.
 */
public class CidPartitioner implements Partitioner {
public CidPartitioner(VerifiableProperties props) {
    // Intentionally empty, but the (VerifiableProperties) constructor is
    // required: Kafka instantiates the partitioner reflectively with it.
}
@Override
public int partition(Object key, int numPartitions) {
    if (key == null) {
        // Original code threw NPE here (key.hashCode() in the catch block
        // would also NPE); route keyless messages to partition 0 instead.
        return 0;
    }
    try {
        long partitionNum = Long.parseLong((String) key);
        // |x % n| < n for n > 0, so the result is a valid partition index.
        return (int) Math.abs(partitionNum % numPartitions);
    } catch (Exception e) {
        // Key is not a parseable numeric string (or not a String at all):
        // fall back to a stable hash-based assignment.
        return Math.abs(key.hashCode() % numPartitions);
    }
}
}
想要依据key来进行partition的分配,需要在发送消息的时候指定key。
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
//与KafkaReceiverLTELogSocket的区别在于,指定了消息的partition分配规则
/**
 * Listens on a TCP port, reads newline-terminated LTE signaling records from a
 * single client connection, and forwards each record to Kafka. Differs from
 * KafkaReceiverLTELogSocket in that a custom partitioner assigns each message
 * to a partition based on its key (the substring between the first ':' and the
 * first ',' of the record).
 */
public class KafkaReceiveLTELogSocketPartition extends Thread{
// Filter pattern for the signaling data (kept from the original; unused here).
String regEx ="[^0-9.\\+\\-\\s+\\,E]";
Pattern p = Pattern.compile(regEx);
// First type parameter is the key type, second the message type.
private final kafka.javaapi.producer.Producer<String, String> producer;
private final String topic;
private final Properties props = new Properties();
private final int port = 12345;

public KafkaReceiveLTELogSocketPartition(String topic) {
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "192.168.1.164:9093"); // Kafka broker host:port
    // Custom partitioner: derives the target partition from the message key.
    props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
    //props.put("zk.connect", "192.168.1.164:2181"); // ZooKeeper; not needed by the newer producer API
    producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
    this.topic = topic;
}

/**
 * Accepts one client connection on the given port and streams its data,
 * sending one Kafka message per newline-terminated record. The key of each
 * message is the substring between the first ':' and the first ',' of the
 * record, which the configured partitioner uses to pick a partition.
 *
 * @param outputFileName unused; retained for interface compatibility
 * @param port TCP port to listen on
 * @throws IOException if the socket cannot be opened or read
 */
public void receiveAndWrite2(String outputFileName , int port) throws IOException{
    ServerSocket serverSocket = new ServerSocket(port);
    try {
        Socket socket = serverSocket.accept();
        try {
            InputStream istream = socket.getInputStream();
            StringBuilder sb = new StringBuilder();
            byte[] buf = new byte[4096];
            int n;
            // Blocking read replaces the original busy-wait on available()
            // (which spun the CPU) and also fixes the ignored return value of
            // read(): we now process exactly the n bytes actually received.
            while ((n = istream.read(buf)) != -1) {
                for (int i = 0; i < n; i++) {
                    if (buf[i] == '\n') { // full record accumulated; send it
                        String str = sb.toString();
                        // Extract the key between the first ':' and first ','.
                        String key_cid_str = str.substring(str.indexOf(":")+1, str.indexOf(","));
                        System.out.println("接收长度:"+str.length());
                        System.out.println(str);
                        // First type parameter: key type; second: message type.
                        producer.send(new KeyedMessage<String, String>(topic,key_cid_str,str));
                        sb = new StringBuilder();
                    } else {
                        // NOTE(review): appends byte-by-byte, so multi-byte
                        // (e.g. UTF-8) characters would be garbled — confirm
                        // the input is a single-byte encoding.
                        sb.append(Character.toChars(buf[i]));
                    }
                }
            }
        } finally {
            // Close once after the stream ends; closing inside the loop would
            // force the sender to reconnect for every record.
            socket.close();
        }
    } finally {
        serverSocket.close();
    }
}

@Override
public void run() {
    String filename = "JSON1_Yanming_DriveTesting_09-04.16-17.16-27_TIME.json";
    String outputFileName = ""+filename;
    try {
        receiveAndWrite2(outputFileName,port);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) {
    String topic = "kafka_flume_topic";
    new KafkaReceiveLTELogSocketPartition(topic).start();
}
}