Flume: storing data received on a network port into Hive

1. a1.sinks.s1.type = hive

(1) Requirements on the Hive table when using the Hive sink:

  • The table must be transactional
  • The table must be partitioned (the example below uses an unpartitioned table; bucketing, transactions, and ORC are the strict requirements)
  • The table must be bucketed (clustered)
  • The table must be stored as ORC

That is: clustered into buckets, transactional, and stored in ORC format.

(2) Copy Hive's HCatalog jars into Flume's lib directory; otherwise the Hive sink fails to start.

cp /usr/local/apache-hive-3.1.2-bin/hcatalog/share/hcatalog/*.jar /usr/local/apache-flume-1.9.0-bin/lib/
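To confirm the copy worked, the streaming jars should now be visible under Flume's lib directory (a quick check, assuming the same install paths as above):

```shell
# The Hive sink needs the HCatalog streaming classes at runtime;
# these jars should now be on Flume's classpath
ls /usr/local/apache-flume-1.9.0-bin/lib/ | grep -i hive
```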

(3) Edit the Hive configuration file to enable transaction support.

vi /usr/local/apache-hive-3.1.2-bin/conf/hive-site.xml

Add the following properties:

<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
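After restarting Hive, the three properties can be checked from the CLI (a sketch; this simply echoes the values configured above):

```shell
# Each "set" prints the current value of the named property
hive -e "set hive.support.concurrency; set hive.exec.dynamic.partition.mode; set hive.txn.manager;"
```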

(4) Start Hadoop, MySQL, and the Hive metastore

Start the Hadoop cluster:

cd /usr/local/hadoop-3.1.4/sbin/

./start-all.sh

Start the MySQL service:

service mysqld start

Start the Hive metastore service:

hive --service metastore &
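The Hive sink below connects to thrift://master:9083, so it is worth confirming the metastore is actually listening before starting the agent (a sketch; 9083 is the metastore's default thrift port):

```shell
# The metastore thrift service listens on port 9083 by default
netstat -lnpt | grep 9083
```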

Create the table:

create database flume;
use flume;

create table people(
id int,
name string,
age int)
clustered by (id) into 2 buckets
row format delimited lines terminated by '\n'
stored as orc
tblproperties("transactional"='true');
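Whether the table meets the sink's requirements can be verified from its metadata (a sketch using the table created above):

```shell
# "Num Buckets" should be 2 and the table properties should show transactional=true
hive -e "describe formatted flume.people;" | grep -iE 'bucket|transactional'
```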

Agent configuration:

# Name the agent's three components: source, channel, sink
a1.sources = r1
a1.channels = c1
a1.sinks = s1

# Configure the source: collect data from port 44444 via netcat
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = ic
a1.sources.r1.interceptors.ic.type = timestamp
a1.sources.r1.interceptors.ic.headerName = time
a1.sources.r1.interceptors.ic.preserveExisting = false

# Configure the channel: buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keep-alive = 5

# Configure the sink: write the data into Hive
a1.sinks.s1.type = hive
# URL of the Hive metastore
a1.sinks.s1.hive.metastore = thrift://master:9083
# Hive database name
a1.sinks.s1.hive.database = flume
# Hive table name
a1.sinks.s1.hive.table = people
a1.sinks.s1.batchSize = 150
a1.sinks.s1.serializer = DELIMITED
a1.sinks.s1.serializer.delimiter = "\t"
a1.sinks.s1.serializer.serdeSeparator = '\t'
a1.sinks.s1.serializer.fieldnames = id,name,age

# Wire the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.s1.channel = c1
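With the configuration above saved to a file (hive-sink.conf is a hypothetical name), the agent can be started and fed a test record; fields must be tab-separated in the order given by serializer.fieldnames:

```shell
# Start the agent named a1 (the config file name is an assumption)
flume-ng agent -n a1 -c conf -f hive-sink.conf -Dflume.root.logger=INFO,console &

# From another terminal: send one tab-separated record (id, name, age)
printf '1\tzhangsan\t20\n' | nc master 44444
```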

2. a1.sinks.sk1.type = hdfs

Create the table:
create database flume;
use flume;
create table student(
source string,
name string,
grade string
)
row format delimited fields terminated by '\t';

Agent configuration:
a1.sources=s1
a1.channels=c1
a1.sinks=sk1

a1.sources.s1.type = netcat
a1.sources.s1.bind = master
a1.sources.s1.port = 8888

a1.channels.c1.type=memory

a1.sinks.sk1.type=hdfs
a1.sinks.sk1.hdfs.path=/user/hive/warehouse/flume.db/student
a1.sinks.sk1.hdfs.fileType=DataStream
a1.sinks.sk1.channel=c1
a1.sources.s1.channels = c1
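As above, the agent can be started against this configuration (hdfs-sink.conf is a hypothetical file name) and tested with netcat; because the HDFS sink writes straight into the table's warehouse directory, the rows become visible to Hive once the files are flushed and rolled:

```shell
# Start the agent (the config file name is an assumption)
flume-ng agent -n a1 -c conf -f hdfs-sink.conf -Dflume.root.logger=INFO,console &

# Send one tab-separated record matching the student table (source, name, grade)
printf 'web\tzhangsan\t90\n' | nc master 8888

# After the file has been rolled, query the table
hive -e "select * from flume.student;"
```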


Reposted from blog.csdn.net/GX_0824/article/details/126964559