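For the Kafka concepts and theory, see this blog post: https://www.jianshu.com/p/d3e963ff8b70
Here I will only briefly describe a problem I ran into and how I solved it. My Maven dependency version and the way I was consuming Kafka messages were both wrong, so once the service went live it kept throwing this error: Attempt to join group failed due to fatal error: The configured groupId is i xxx
Below is the setup that works. First, Maven. Note that if you do not specify a version for the Kafka dependency, it follows the version managed by Spring Boot, so when you are on an older Spring Boot it is best to set the version number explicitly.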
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.1.15</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!--jdbc -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
</dependencies>
The yml file
server:
  port: 8082
spring:
  kafka:
    consumer:
      enable-auto-commit: false
      # group-id: test-group-1
      auto-offset-reset: earliest
    bootstrap-servers: 192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092
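For orientation, the yml keys above correspond to plain Kafka consumer property names. The sketch below only lists that mapping with java.util.Properties. The class ConsumerProps and its build method are hypothetical names made up for this illustration, not something Spring or Kafka provides, and the host list is left elided exactly as in the yml.

```java
import java.util.Properties;

// Hypothetical helper for illustration only: it is not part of Spring or Kafka.
// It shows which Kafka consumer property each yml key stands for.
class ConsumerProps {

    static Properties build(String groupId) {
        Properties props = new Properties();
        // bootstrap-servers -> bootstrap.servers (hosts elided, as in the yml)
        props.setProperty("bootstrap.servers", "192.168.10.xx:9092,192.168.10.xx:9092");
        // enable-auto-commit -> enable.auto.commit
        props.setProperty("enable.auto.commit", "false");
        // auto-offset-reset -> auto.offset.reset
        props.setProperty("auto.offset.reset", "earliest");
        // group-id -> group.id; it is commented out in the yml,
        // so it is only set here when a value is passed in.
        if (groupId != null && !groupId.isEmpty()) {
            props.setProperty("group.id", groupId);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("test-group-1").getProperty("group.id"));
        System.out.println(build(null).containsKey("group.id"));
    }
}
```

With group-id commented out in the yml, the group has to come from somewhere else, which in this post is the annotation on each listener method.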
Java code
package com.xuebusi.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;

/**
 * Consumer.
 * Each listener method names its own topics.
 */
@Component
public class KafkaConsumer {

    @Autowired
    JdbcTemplate jdbcTemplate;

    @KafkaListener(topics = {"spectrum-data-vod"}, groupId = "test-group-1")
    public void receive(String message) {
        try {
            JSONObject json = JSONObject.parseObject(message);
            JSONObject videoInfo = json.getJSONObject("video_info");
            // Unique column
            String sndlvl_id = videoInfo.getString("sndlvl_id");
            String is_charge = videoInfo.getString("is_charge");
            String sndlvl_name = json.getString("sndlvl_name");
            String clum_id = json.getString("clum_id");
            // Use ? placeholders instead of concatenating values into the SQL text.
            String sqls = "insert into iptv_content_info(sndlvl_id,is_charge,sndlvl_name,clum_id) values (?,?,?,?)";
            System.out.println("vod: " + sndlvl_name + " ----- " + sndlvl_id + " ----- " + clum_id + " ----- " + is_charge + " ----- ");
            try {
                // Insert the row. Because of the primary key, inserting the same
                // row again throws, so that case is caught and skipped.
                jdbcTemplate.update(sqls, sndlvl_id, is_charge, sndlvl_name, clum_id);
            } catch (DuplicateKeyException e) {
                // Duplicate row: already stored, nothing to do.
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @KafkaListener(topics = {"spectrum-data-device"}, groupId = "test-group-2")
    public void receive2(String message) {
        try {
            JSONObject json = JSONObject.parseObject(message);
            String model = json.getString("model");
            String manufacturers = json.getString("manufacturers");
            String apk_version = json.getString("apk_version");
            String mac = json.getString("mac");
            String user_id = json.getString("user_id");
            String license = json.getString("license");
            System.out.println("device: " + model + " ----- " + manufacturers + " ----- " + apk_version + " ----- " + mac + " ----- " + user_id + "-----" + license);
            // Insert, or update the existing row when the key is already present.
            jdbcTemplate.update(
                    "insert into iptv_content_device(user_id,model,manufacturers,apk_version,mac,license) values (?,?,?,?,?,?) "
                            + "ON DUPLICATE KEY UPDATE model=?,manufacturers=?,apk_version=?",
                    user_id, model, manufacturers, apk_version, mac, license,
                    model, manufacturers, apk_version);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
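Both listeners end in an SQL insert whose values come straight from the message. As a side note, here is a minimal sketch in plain Java, with no database involved, of why it matters whether those values are glued into the SQL text or passed separately. The class InsertSqlSketch, its methods, and the sample value are all hypothetical, made up for this illustration.

```java
// Hypothetical illustration class: plain JDK only, nothing is sent to a database.
class InsertSqlSketch {

    // Placeholder form: the SQL text never changes, values are supplied separately.
    static final String PARAMETERIZED =
            "insert into iptv_content_info(sndlvl_name) values (?)";

    // Concatenated form: the field value becomes part of the SQL text itself.
    static String concatenated(String sndlvlName) {
        return "insert into iptv_content_info(sndlvl_name) values ('" + sndlvlName + "')";
    }

    // Counts single quotes; an odd count means the string literal is unbalanced.
    static long countQuotes(String sql) {
        return sql.chars().filter(c -> c == '\'').count();
    }

    public static void main(String[] args) {
        String name = "It's a test"; // made-up value containing a single quote
        String sql = concatenated(name);
        System.out.println(sql);
        System.out.println("quotes in concatenated SQL: " + countQuotes(sql));
        System.out.println("quotes in placeholder SQL: " + countQuotes(PARAMETERIZED));
    }
}
```

JdbcTemplate's update(String sql, Object... args) accepts the placeholder form, so a value like the one above is handed to the driver as data rather than as part of the statement.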
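To specify the topics and the groupId, you only need to declare them on the method's annotation. The catch is that if your jar version is wrong, the annotation has no groupId attribute at all and the code fails with an error (as far as I can tell, groupId only exists on @KafkaListener from spring-kafka 1.3 onward), so pay attention to the jar version.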
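As I understand it, a groupId on the annotation takes the place of the group-id from the configuration for that one listener. The sketch below is only a rough model of that precedence in plain Java, not spring-kafka's actual code. GroupIdSketch and effectiveGroupId are hypothetical names, and what the real client does when no group id is available depends on its version.

```java
import java.util.Optional;

// Hypothetical model for illustration; this is not how spring-kafka is implemented.
class GroupIdSketch {

    // The annotation value wins when present, otherwise the configured value is used.
    // An empty result stands for "no group id configured anywhere".
    static Optional<String> effectiveGroupId(String annotationGroupId, String configuredGroupId) {
        if (annotationGroupId != null && !annotationGroupId.isEmpty()) {
            return Optional.of(annotationGroupId);
        }
        if (configuredGroupId != null && !configuredGroupId.isEmpty()) {
            return Optional.of(configuredGroupId);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Annotation sets the group, yml entry commented out: the setup in this post.
        System.out.println(effectiveGroupId("test-group-1", null).orElse("<none>"));
        // No annotation attribute and no yml entry: nothing to join a group with.
        System.out.println(effectiveGroupId(null, null).orElse("<none>"));
    }
}
```

The second call models the situation behind the error at the top of this post: an old jar without the groupId attribute, and no group-id in the yml either.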