读入数据时 注意服务器内存大小 可用命令查看!
如果内存太小会出现读入报错的情况
free -h   (注:df 查看的是磁盘空间,查看内存应使用 free)
方法一
1.导入pom依赖
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0</version> </dependency> </dependencies>
2.编写代码
public class HbaseReadKafka{ static int i = 0; static Connection conn; static { try { Configuration cnf = HBaseConfiguration.create(); cnf.set("hbase.zookeeper.quorum", "192.168.64.128:2181"); conn = ConnectionFactory.createConnection(cnf); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.128:9092"); prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop); consumer.subscribe(Arrays.asList(new String[]{"user_friends_raw"})); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3)); System.out.println("获取数据:" + records.count() + "====>" + (++i)); List<Put> puts = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { //对kafka读取的数据处理 塞入集合 String[] lines = record.value().split(",", -1); if (lines.length > 1) { String[] fids = lines[1].split(" "); for (String fid : fids) { Put put = new Put((lines[0] + "-" + fid).getBytes()); put.addColumn("base".getBytes(), "userid".getBytes(), lines[0].getBytes()); put.addColumn("base".getBytes(), "friendid".getBytes(), fid.getBytes()); puts.add(put); } } } //找到表 //调用hbase写入数据 try { Table htable = conn.getTable(TableName.valueOf("userfriends")); htable.put(puts); } catch (IOException e) { e.printStackTrace(); } //清空一下list集合 puts.clear(); //异步提交 consumer.commitAsync(); //同步提交 // consumer.commitSync(); } } }
方法二
类型总览!
1 导入pom
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.kgc.UserinterestApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2 配置yml
spring: application: name: userinterest kafka: bootstrap-servers: 192.168.64.128:9092 consumer: #关闭自动提交(改用手动提交) enable-auto-commit: false #从首位置读取文件 auto-offset-reset: earliest #序列化编码 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: ack-mode: manual_immediate server: port: 8999
3 配置桥接模式
# 编写接口1
/**
 * Bridge-pattern conversion interface: turns a list of source records of
 * type E into a list of target records of type T.
 *
 * @param <T> target element type (e.g. HBase Put)
 * @param <E> source element type
 */
@FunctionalInterface
public interface FillData<T, E> {
    List<T> fillData(List<E> lst);
}
# 编写接口2
/**
 * Automatically fills HBase data from parsed results: specializes
 * {@link FillData} by fixing the target element type to HBase {@code Put}.
 *
 * @param <T> source entity type
 */
public interface FillHbaseData<T> extends FillData<Put, T> {
    List<Put> fillData(List<T> list);
}
# 编写接口3
/**
 * Converts one raw message line into entity objects, according to the data
 * feed and the entity model passed in via the type parameter.
 *
 * @param <T> entity type produced from the line
 */
public interface StringToEntity<T> {
    List<T> change(String line);
}
# 编写接口 4
/**
 * Data conversion interface: turns one raw Kafka message line into a list of
 * common-format entities.
 *
 * @param <T> entity type
 */
public interface DataChange<T> {
    List<T> change(String line);
}
# 编写抽象类 5 实现接口4
/**
 * Abstract role in the bridge pattern: couples a line-to-entity converter,
 * an entity-to-record filler, and a writer that persists the records.
 * (Name misspells "Abstract"; kept as-is because subclasses reference it.)
 *
 * @param <E> storage record type (e.g. HBase Put)
 * @param <T> intermediate entity type
 */
public abstract class AbstracDataChange<E, T> implements DataChange<T> {

    protected FillData<E, T> fillData;          // converts entities to storage records
    protected StringToEntity<T> stringToEntity; // parses one raw line into entities
    protected Writer<E> writer;                 // persists the storage records

    public AbstracDataChange(FillData<E, T> fillData, StringToEntity<T> stringToEntity, Writer<E> writer) {
        this.fillData = fillData;
        this.stringToEntity = stringToEntity;
        this.writer = writer;
    }

    /** Parses one raw message line into entities. */
    public abstract List<T> change(String line);

    /** Consumes one Kafka record and writes the converted rows to tableName. */
    public abstract void fill(ConsumerRecord<String, String> record, String tableName);
}
# 编写接口实现类1 实现接口 2
1.1
/**
 * Handles eventAttendees data: converts each EventAttendees entity into an
 * HBase Put. Rowkey is the concatenation eventid + userid + answer; all
 * three fields are stored in column family "base".
 */
public class EventAttendeesFillDataImp implements FillHbaseData<EventAttendees> {
    @Override
    public List<Put> fillData(List<EventAttendees> list) {
        List<Put> puts = new ArrayList<>();
        for (EventAttendees ea : list) {
            byte[] rowKey = (ea.getEventid() + ea.getUserid() + ea.getAnswer()).getBytes();
            Put put = new Put(rowKey);
            put.addColumn("base".getBytes(), "eventid".getBytes(), ea.getEventid().getBytes());
            put.addColumn("base".getBytes(), "userid".getBytes(), ea.getUserid().getBytes());
            put.addColumn("base".getBytes(), "answer".getBytes(), ea.getAnswer().getBytes());
            puts.add(put);
        }
        return puts;
    }
}
# 编写接口实现类2 实现接口 2
1.2
/**
 * Converts Events entities into HBase Puts; rowkey is the event id and every
 * remaining field is stored in column family "base".
 */
public class EventsFillDataImp implements FillHbaseData<Events> {
    @Override
    public List<Put> fillData(List<Events> list) {
        List<Put> puts = new ArrayList<>();
        for (Events ev : list) {
            Put put = new Put(ev.getEventid().getBytes());
            put.addColumn("base".getBytes(), "userid".getBytes(), ev.getUserid().getBytes());
            put.addColumn("base".getBytes(), "starttime".getBytes(), ev.getStarttime().getBytes());
            put.addColumn("base".getBytes(), "city".getBytes(), ev.getCity().getBytes());
            put.addColumn("base".getBytes(), "state".getBytes(), ev.getState().getBytes());
            put.addColumn("base".getBytes(), "zip".getBytes(), ev.getZip().getBytes());
            put.addColumn("base".getBytes(), "country".getBytes(), ev.getCountry().getBytes());
            put.addColumn("base".getBytes(), "lat".getBytes(), ev.getLat().getBytes());
            put.addColumn("base".getBytes(), "lng".getBytes(), ev.getLng().getBytes());
            puts.add(put);
        }
        return puts;
    }
}
# 编写接口实现类3 实现接口 2
1.3
/**
 * Converts the List of UserFriends entities (parsed from the userfriends
 * message queue) into a List of HBase Puts with rowkey "userid-friendid".
 */
public class UserFriendsFillDataImp implements FillHbaseData<UserFriends> {
    @Override
    public List<Put> fillData(List<UserFriends> list) {
        List<Put> puts = new ArrayList<>();
        for (UserFriends uf : list) {
            Put put = new Put((uf.getUserid() + "-" + uf.getFriendid()).getBytes());
            put.addColumn("base".getBytes(), "userid".getBytes(), uf.getUserid().getBytes());
            put.addColumn("base".getBytes(), "friendid".getBytes(), uf.getFriendid().getBytes());
            puts.add(put);
        }
        return puts;
    }
}
# 编写接口实现类1 实现接口 3
public class EventAttendeesChangeImp implements StringToEntity<EventAttendees> { /** * 数据进入为 eventid yes maybe invited no * ex:123,112233 34343,234234 45454,112233 23232 234234,343343 34343 * 将数据格式转为 123 112233 yes,123 34343 yes,123 234234 maybe ...... * @param line * @return */ @Override public List<EventAttendees> change(String line) { String[] infos = line.split(",", -1); List<EventAttendees> eas= new ArrayList<>(); //先计算所有回答yes的人 if (!infos[1].trim().equals("")&&infos[1]!=null) { Arrays.asList(infos[1].split(" ")).stream() .forEach(yes->{ EventAttendees eventAttendees = EventAttendees.builder().eventid(infos[0]).userid(yes).answer("yes").build(); eas.add(eventAttendees); }); } //计算所有maybe的人 if (!infos[2].trim().equals("")&&infos[2]!=null) { Arrays.asList(infos[2].split(" ")).stream() .forEach(maybe->{ EventAttendees eventAttendees = EventAttendees.builder().eventid(infos[0]).userid(maybe).answer("maybe").build(); eas.add(eventAttendees); }); } //计算所有invited的人 if (!infos[3].trim().equals("")&&infos[3]!=null) { Arrays.asList(infos[3].split(" ")).stream() .forEach(invited->{ EventAttendees eventAttendees = EventAttendees.builder().eventid(infos[0]).userid(invited).answer("invited").build(); eas.add(eventAttendees); }); } //计算no的人 if (!infos[4].trim().equals("")&&infos[4]!=null) { Arrays.asList(infos[4].split(" ")).stream() .forEach(no->{ EventAttendees eventAttendees = EventAttendees.builder().eventid(infos[0]).userid(no).answer("no").build(); eas.add(eventAttendees); }); } return eas; } }# 编写接口实现类2 实现接口 3
public class EventsChangeImp implements StringToEntity<Events> {
    /**
     * Parses one events line
     * "eventid,userid,starttime,city,state,zip,country,lat,lng" into a
     * single-element list.
     * FIX: lines with fewer than 9 fields are skipped (empty list) — the
     * original indexed infos[0..8] unconditionally and threw
     * ArrayIndexOutOfBoundsException on malformed input.
     */
    @Override
    public List<Events> change(String line) {
        String[] infos = line.split(",", -1);
        List<Events> events = new ArrayList<>();
        if (infos.length < 9) {
            return events;
        }
        events.add(Events.builder()
                .eventid(infos[0]).userid(infos[1]).starttime(infos[2])
                .city(infos[3]).state(infos[4]).zip(infos[5])
                .country(infos[6]).lat(infos[7]).lng(infos[8])
                .build());
        return events;
    }
}
# 编写接口实现类 3 实现接口 3
/**
 * Expands "123123,123435 435455 345345" into
 * (123123,123435) (123123,435455) (123123,345345).
 */
public class UserFriendsChangeImp implements StringToEntity<UserFriends> {
    @Override
    public List<UserFriends> change(String line) {
        String[] infos = line.split(",", -1);
        List<UserFriends> ufs = new ArrayList<>();
        // BUG FIX: the original condition was `infos.length >= 1`, which is
        // always true after split, so every line produced a single record with
        // an empty friendid and the splitting else-branch was dead code.
        // Emit the empty-friend record only when no friend list is present.
        if (infos.length < 2 || infos[1].trim().isEmpty()) {
            ufs.add(UserFriends.builder().userid(infos[0]).friendid("").build());
        } else {
            for (String fid : infos[1].split(" ")) {
                ufs.add(UserFriends.builder().userid(infos[0]).friendid(fid).build());
            }
        }
        return ufs;
    }
}
3.1 配置
DataChangeFillHbaseDatabase实体类 完成接口整合
public class DataChangeFillHbaseDatabase<T> extends AbstracDataChange<Put,T> { static int count; public DataChangeFillHbaseDatabase(FillData<Put, T> fillData, StringToEntity<T> stringToEntity, Writer<Put> writer) { super(fillData, stringToEntity, writer); } @Override public List<T> change(String line) { return stringToEntity.change(line); } @Override public void fill(ConsumerRecord<String,String> record,String tableName) { //读kafka获得的ConsumerRecord 转字符串 List<Put> puts = fillData.fillData(change(record.value())); // puts.forEach( System.out::println); //将集合填充到对应的Hbase数据库中 writer.write(puts,tableName); System.out.println("hu获取到kafka数据======>"+count++); } }
3.2 配置实体类
# 实体类 1
// Entity for one attendee-answer row: (eventid, userid, answer).
// Lombok generates getters/setters, both constructors, and a builder.
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class EventAttendees {
    private String eventid;
    private String userid;
    private String answer;
}
# 实体类 2
// Entity for one event row (all fields kept as raw strings from the feed).
// Lombok generates getters/setters, both constructors, and a builder.
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Events {
    private String eventid;
    private String userid;
    private String starttime;
    private String city;
    private String state;
    private String zip;
    private String country;
    private String lat;
    private String lng;
}
# 实体类 3
// Entity for one (userid, friendid) pair.
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserFriends {
    private String userid;
    private String friendid;
}
3.3 配置读写文件service层
# 接口 writer
/**
 * Persists a batch of records of type T into the named table.
 *
 * @param <T> record type (e.g. HBase Put)
 */
public interface Writer<T> {
    void write(List<T> puts, String tableName);
}
#接口实现类
/**
 * Receives the converted List&lt;Put&gt; batches and writes them into the
 * HBase database.
 */
@Component
public class HbaseWriter implements Writer<Put> {

    @Resource
    private Connection connection;

    /**
     * Writes the batch to {@code tableName}.
     * FIX: the Table handle is now closed via try-with-resources — the
     * original leaked one handle per call. Errors are still printed and
     * swallowed, preserving the original best-effort contract for callers.
     */
    public void write(List<Put> puts, String tableName) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(puts);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
#KafkaReader 类
// Kafka entry point: one listener per raw topic, each wiring a bridge handler
// (parser + Put filler + writer) and manually acking after the HBase write
// (ack-mode manual_immediate is set in the yml).
@Component
public class KafkaReader {

    // Shared writer bean injected by Spring; passed into every handler.
    @Resource
    private Writer<Put> writer;

    // events_raw -> HBase table "events".
    // NOTE(review): a new handler is allocated per record; could be hoisted
    // to a field if throughput matters — confirm before changing wiring.
    @KafkaListener(groupId = "cm", topics = {"events_raw"})
    public void readEventToHbase(ConsumerRecord<String, String> record, Acknowledgment ack) {
        AbstracDataChange<Put, Events> eventHandler = new DataChangeFillHbaseDatabase<Events>(
                new EventsFillDataImp(),
                new EventsChangeImp(),
                writer
        );
        eventHandler.fill(record, "events");
        ack.acknowledge();
    }

    // event_attendees_raw -> HBase table "eventsattends".
    @KafkaListener(groupId = "cm", topics = {"event_attendees_raw"})
    public void readEventAttendeesToHbase(ConsumerRecord<String, String> record, Acknowledgment ack) {
        AbstracDataChange<Put, EventAttendees> eventHandler = new DataChangeFillHbaseDatabase<EventAttendees>(
                new EventAttendeesFillDataImp(),
                new EventAttendeesChangeImp(),
                writer
        );
        eventHandler.fill(record, "eventsattends");
        ack.acknowledge();
    }

    // DISABLED: the @KafkaListener annotation below is commented out, so this
    // method is never invoked by Spring (user_friends_raw is handled by the
    // standalone consumer in method one instead).
    // @KafkaListener(groupId = "cm", topics = {"user_friends_raw"})
    public void readUserFriendsToHbase(ConsumerRecord<String, String> record, Acknowledgment ack) {
        AbstracDataChange<Put, UserFriends> eventHandler = new DataChangeFillHbaseDatabase<UserFriends>(
                new UserFriendsFillDataImp(),
                new UserFriendsChangeImp(),
                writer
        );
        eventHandler.fill(record, "userfriends");
        ack.acknowledge();
    }
}
3.4 hbase配置
/**
 * Spring configuration exposing the HBase client Configuration, a Connection,
 * and a Connection supplier as beans.
 */
@Configuration
public class HbaseConfig {

    /** HBase client configuration pointing at the ZooKeeper quorum. */
    @Bean
    public org.apache.hadoop.conf.Configuration getConfig() {
        org.apache.hadoop.conf.Configuration cfg = HBaseConfiguration.create();
        cfg.set(HConstants.ZOOKEEPER_QUORUM, "192.168.64.128:2181");
        return cfg;
    }

    /**
     * HBase connection bean.
     * FIX: the original caught IOException, printed the stack trace, and
     * returned null — yielding a null bean that only failed later with an NPE
     * at first use. Now the failure surfaces at context startup with the
     * original cause preserved.
     */
    @Bean
    @Scope(value = "prototype")
    public Connection getConnection() {
        try {
            return ConnectionFactory.createConnection(getConfig());
        } catch (IOException e) {
            throw new IllegalStateException("Cannot create HBase connection", e);
        }
    }

    /** Lazy supplier over getConnection() for consumers that defer creation. */
    @Bean
    public Supplier<Connection> hBaseConnection() {
        return this::getConnection;
    }
}