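Some time ago, a business requirement called for storing Kafka messages in two ways: offline, in batches, and online, in a distributed store. Before storing Kafka messages, it helps to understand how Kafka works.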
1 Offline batch storage
2 Online distributed storage
The script below writes each consumed Kafka message directly to HBase.
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 5 14:12:09 2019
@author: yoyoyo
"""
from kafka import KafkaConsumer
import happybase
import json
import time

hbase_ip = 'localhost'
hbase_port = 9090
ip = hbase_ip
port = hbase_port
# Connection pool to the HBase Thrift server (the port is passed explicitly)
pool = happybase.ConnectionPool(size=1, host=ip, port=port)


# Insert a list of dicts into an HBase table
def hbase_load(tableName, lists):
    # The pool opens the connection and takes it back when the block exits
    with pool.connection() as connection:
        # tables() may return bytes under Python 3, so normalise before comparing
        existing = [t.decode() if isinstance(t, bytes) else t
                    for t in connection.tables()]
        if tableName not in existing:
            create_table(connection, tableName)
        table = connection.table(tableName)
        b = table.batch(batch_size=1024)  # batched inserts
        for li in lists:
            try:
                rowkey = li['xxx']  # 'xxx' is a placeholder for the row-key field
                data_dicts = {}
                for d, x in li.items():
                    key = "timeStamp:" + d  # column family "timeStamp", qualifier = field name
                    value = str(x)
                    data_dicts[key] = value
                b.put(row=rowkey, data=data_dicts)
                b.send()  # flush now so the success message below is accurate
                print("rowkey:" + rowkey + " data append success")
            except Exception as ex:
                print(str(ex) + " data append failure.")


# Create the table with a single column family
def create_table(conn, table):
    try:
        conn.create_table(
            table,
            {
                "timeStamp": dict(max_versions=10)
            }
        )
    except Exception as ex:
        print(str(ex) + " table exists.")


# Print a timestamped log line
def log(text):
    t = time.strftime(r"%Y-%m-%d %H:%M:%S", time.localtime())
    print("[%s]%s" % (t, text))


lst = []
log('start consumer')
# Broker address: the IP of the cluster master node
consumer = KafkaConsumer('test1', group_id='logGroup', bootstrap_servers=['xx.xx.xx.xx:9092'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    log(recv)
    # strict=False tolerates raw control characters inside JSON strings
    dict_data = json.loads(msg.value, strict=False)
    dict_data['xxx'] = str(dict_data['xxx']) + '-' + dict_data['xxx']
    lst.append(dict_data)
    hbase_load('kafka2Hbase_test', lst)
    lst.clear()  # otherwise every earlier message is rewritten on each iteration
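The row-building step inside hbase_load can be pulled out into a pure function, which makes the column mapping testable without a running HBase. The sketch below is only an illustration: to_hbase_row and its rowkey_field parameter are hypothetical names, not part of the original script or of HappyBase, and the sample message is made up. It assumes the same "timeStamp" column family as the script.

```python
def to_hbase_row(message, rowkey_field, family="timeStamp"):
    """Return (rowkey, columns) for one decoded Kafka message.

    Hypothetical helper: mirrors the loop in hbase_load, where every
    field becomes a "family:qualifier" column holding str(value).
    """
    rowkey = str(message[rowkey_field])
    columns = {"%s:%s" % (family, field): str(value)
               for field, value in message.items()}
    return rowkey, columns


# Made-up sample message, for illustration only
rowkey, columns = to_hbase_row({"id": 7, "temp": 21.5}, rowkey_field="id")
print(rowkey)
print(columns)
```

The returned pair can be passed straight to a batch's put(row=..., data=...) call, which keeps the HBase-specific code down to a few lines.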
Handling errors while debugging on the server
Error 1: failure when connecting to HBase
thriftpy2.transport.TTransportException: TTransportException(type=1, message="Could not connect to ('localhost', 9090)")
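Cause analysis
HBase is written in Java and natively exposes a Java API. Programs written in other languages have to go through HBase's Thrift gateway, which is a separate server process. In other words, the Thrift server must be running before Python can operate on HBase through HappyBase; the error above means nothing was accepting connections on localhost:9090.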
A separate error: json.loads fails with JSONDecodeError: Invalid control character
Fix: json.loads(msg.value, strict=False)  # turning off strict mode is enough
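The effect of strict=False can be reproduced without Kafka. In the minimal sketch below, the payload is a made-up example, not taken from the original messages: it contains a raw newline inside a JSON string value, which strict parsing rejects and non-strict parsing accepts.

```python
import json

# The Python escape \n puts a raw newline character inside the JSON string value.
raw = '{"msg": "line one\nline two"}'

try:
    json.loads(raw)  # strict=True is the default
    strict_failed = False
except json.JSONDecodeError:
    strict_failed = True

# With strict=False, control characters are allowed inside strings.
parsed = json.loads(raw, strict=False)
print(strict_failed)
print(repr(parsed["msg"]))
```

Note that strict=False only relaxes the rule about control characters inside strings; otherwise malformed JSON still raises an error.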
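Fix for Error 1, and checking the running processes
Start the HBase Thrift server, then use jps to confirm that a ThriftServer process is running. Note that HappyBase is written against the HBase Thrift 1 API, so if calls still fail against the thrift2 server started below, start the server with `hbase-daemon.sh start thrift` instead.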
(base) root@master:/opt/hbase/hbase-1.2.10/bin# hbase-daemon.sh start thrift2
(base) root@master:~# jps
3266 QuorumPeerMain
4772 HMaster
996 Kafka
4102 ResourceManager
4231 NodeManager
11850 ThriftServer
12048 Jps
3890 SecondaryNameNode
3703 DataNode
3578 NameNode
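Besides jps, a plain TCP check shows whether anything is listening on the Thrift port before the consumer is started. The sketch below uses only the standard library; port_is_open is a hypothetical helper name, and localhost:9090 is taken from the script above. It only confirms that the port accepts connections, not that the process behind it is the HBase Thrift server.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts
        return False


# 9090 is the Thrift port used by the script above
print(port_is_open("localhost", 9090))
```

If this prints False, the TTransportException from Error 1 is expected, and the Thrift server needs to be started first.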