版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/shenzhen_zsw/article/details/89284521
Spring Boot集成Kafka
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
application.properties
......
#============== kafka ===================
kafka.consumer.zookeeper.connect=192.168.2.10:2181
kafka.consumer.servers=192.168.2.10:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
kafka.producer.servers=192.168.2.10:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
......
KafkaProducerConfig
package com.youfan.kafka;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Comma-separated Kafka bootstrap servers, e.g. {@code host1:9092,host2:9092}. */
    @Value("${kafka.producer.servers}")
    private String servers;

    /** Number of retries for failed sends (0 = no retry). */
    @Value("${kafka.producer.retries}")
    private int retries;

    /** Producer batch size in bytes. */
    @Value("${kafka.producer.batch.size}")
    private int batchSize;

    /** Linger time in milliseconds before sending a batch. */
    @Value("${kafka.producer.linger}")
    private int linger;

    /** Total memory (bytes) the producer may use for buffering records. */
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /**
     * Builds the raw Kafka producer configuration map from the injected
     * {@code kafka.producer.*} properties. Keys and values are serialized
     * as plain strings.
     *
     * @return producer configuration properties keyed by {@link ProducerConfig} constants
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Producer factory backed by {@link #producerConfigs()}.
     *
     * <p>Declared as a {@code @Bean} (the original was a plain method) so the
     * {@link DefaultKafkaProducerFactory} is Spring-managed and its underlying
     * producers are closed when the application context shuts down.
     *
     * @return a factory producing {@code KafkaProducer<String, String>} instances
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Shared {@link KafkaTemplate} for sending String key/value messages.
     *
     * @return the template wired to {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
@Autowired
private KafkaTemplate kafkaTemplate; // NOTE(review): raw type — ideally KafkaTemplate<String, String> to match the configured bean
/**
 * Tracking-point log collection. The number of channels is small and this
 * module's channel id is fixed at 1, which represents fresh produce.
 */
long pindaoid = 1;// channel id (fixed to 1 for this module)
//productytpeid// product category id — presumably a parameter of the enclosing method (not visible here); TODO confirm
//producid// product id — presumably a parameter of the enclosing method (not visible here); TODO confirm
// user id
long userid = 0;// 0 = anonymous visitor (guest); overwritten below if a logged-in user is in the session
// client IP address
String ipaddress = IpUtil.getIpAddress(request);
System.out.println(ipaddress);
HttpSession sesion = request.getSession();
Object userobject = sesion.getAttribute("user");
if(userobject!=null){
User user = (User)userobject;
userid = user.getId();
}
// obtain browser and operating-system information from the request
String osandbrowser = BrowserInfoUtil.getOsAndBrowserInfo(request);
System.out.println(osandbrowser);
// the helper's result is split on "---": index 0 = OS, index 1 = browser
String[] temps = osandbrowser.split("---");
String os = temps[0].trim();
String browser = temps[1].trim();
System.out.println(os);
System.out.println(browser);
Productscanlog productscanlog = new Productscanlog();
// resolve region and network carrier from the IP address; best-effort —
// failures are logged and the log record is still sent without region data
try {
AreaAndnetwork areaAndnetwork = AreaAndNetworkUtil.getAddressByIp(ipaddress);
productscanlog.setCounty(areaAndnetwork.getCounty());
productscanlog.setProvice(areaAndnetwork.getProvice());
productscanlog.setCity(areaAndnetwork.getCity());
productscanlog.setCounty(areaAndnetwork.getCounty()); // NOTE(review): duplicate of the setCounty call above — likely one of them was meant to set a different field; confirm against Productscanlog
productscanlog.setNetwork(areaAndnetwork.getNetwork());
}catch (Exception e){
e.printStackTrace();
}
productscanlog.setPindaoid(pindaoid);
productscanlog.setProductytpeid(productytpeid);
productscanlog.setProducid(Long.valueOf(producid+""));
productscanlog.setUserid(userid);
productscanlog.setIp(ipaddress);
productscanlog.setBrowser(browser);
productscanlog.setOs(os);
productscanlog.setTimestamp(new Date().getTime());
// serialize the scan-log entity to JSON and publish it to the "productscanlog" topic
String productscanlogstring = JSONObject.toJSONString(productscanlog);
System.out.println(productscanlogstring);
kafkaTemplate.send("productscanlog", "key", productscanlogstring);
// tab-separated compact record (userid, channel id, timestamp) for a second, Flume-oriented topic
String productflume = userid +"\t" + pindaoid+"\t"+productscanlog.getTimestamp();
kafkaTemplate.send("productscanlogflume","key",productflume);