本篇我们在一台ECS服务器上安装Flink SQL Client, 然后尝试在数据湖中建表。
flink
- 下载flink包
先看官网说明
当前Iceberg仅支持flink 1.11.x
https://archive.apache.org/dist/flink/flink-1.11.2/
flink-1.11.2-bin-scala_2.12.tgz
复制代码
2. 解压:tar -zxvf ```
flink-1.11.2-bin-scala_2.12.tgz
复制代码
注:可以本地启动flink
- 进目录1.11.2/bin/,执行命令./start-cluster.sh启动本地flink;
- 访问该机器的8081端口,可见本地flink启动成功:
配置flink的checkpoint
配置flink的checkpoint,因为目前flink提交iceberg的信息是在每次checkpoint的时候提交的。在sql client配置checkpoint的方法如下:
在flink-conf.yaml添加如下配置
cd ./flink-1.11.2/conf
vim flink-conf.yaml
添加
execution.checkpointing.interval: 10s # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 10 # checkpoint 失败容忍次数
复制代码
注:无论通过 SQL 还是 Datastream 入湖,都必须开启 Checkpoint。
Flink1.11.2 集成s3保存checkpoint
参考:www.jianshu.com/p/8fbe4434c…
state.backend: rocksdb
state.checkpoints.dir: s3://flink/flink-checkpoints
state.savepoints.dir: s3://flink/flink-savepoints
state.backend.incremental: true
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.ssl.enabled: false
s3.path.style.access: true
s3.endpoint: http://10.132.19.52:9000
复制代码
下载iceberg-hive-runtime和flink-sql-connector-hive
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.2.0_2.11/1.14.0
https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/0.12.1/
复制代码
启动 SQL 客户端命令行界面
./flink-1.11.2/bin/sql-client.sh embedded \
-j ./iceberg-flink-runtime-0.10.0.jar \
-j ./flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar \
shell
复制代码
创建Hive catalog
创建一个名为iceberg的 iceberg catalog ,用来从 hive metastore 中加载表。
CREATE CATALOG iceberg WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://10.132.19.6:9083',
'clients'='5',
'property-version'='1',
'warehouse'='http://minioadmin:[email protected]:9000/buckets/datalake/'
);
复制代码
遇到的问题1
参考的文章 www.cnblogs.com/swordfall/p… 搞到这一步, 报错:java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
一开始觉得是自己的版本有适配问题(iceberg-flink-runtime,flink-sql-connector-hive两个包,一开始我找的高版本),最后全部缓存和参考文章里一样的,还是报同样的错误。浪费了一下午,一直在下载包,换包,再执行,还是一样的错,继续下载包,循环......
第二天一早来,翻了下iceberg的官网,看到下面一行,里面知道问题在哪儿了。因为报错里提示的就是没有hadoop的class。
下载了Hadoop的包,配置了一下,问题就解决了。 hadoop.apache.org/releases.ht…
wget https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
export HADOOP_HOME = XXX
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
复制代码
总结:
- 遇到问题耗费了不少时间,出去转转或者做做其他事,换个脑袋的你解决问题更迅速。
- 看官网文档,看官网文档,看官网文档
遇到的问题2
报错org.apache.hadoop.hive.metastore.api.MetaException: Got exception: java.lang.ClassCastException class [Ljava.lang.Object; cannot be cast to class [Ljava.net.URI; ([Ljava.lang.Object; and [Ljava.net.URI; are in module java.base of loader 'bootstrap')
当前版本的flink只能在Java8上工作,而我的JDK版本是Java11,抱着试一试的心态把JDK版本降为了Java8,再次启动flink,成功。
配置sql-client-defaults.yaml
该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下。
如果不想每次启动sql client都重新创建catalog,可以在sql-client-defaults.yaml
里面设置下默认catalog。
catalogs:
- name: iceberg
type: iceberg
default-database: iceberg_db
catalog-type: hive
warehouse: s3a://datalake/
uri: thrift://10.132.19.6:9083
clients: 5
property-version: 1
复制代码
创建DATABASE
Flink SQL> use catalog iceberg;
Flink SQL> CREATE DATABASE iceberg_db;
复制代码
报错 [ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.hive.metastore.api.MetaException: Unable to create database path http://minioadmin:[email protected]:9000/buckets/datalake/iceberg_db.db, failed to create database iceberg_db
看来我yy出来的warehouse访问鉴权的方式minioadmin:minioadmin@是不行的
解决方案如下:
vim hadoop-3.3.1/etc/hadoop/core-site.xml
添加如下配置
<configuration>
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>false</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>http://XXX:9000</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>minioadmin</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>minioadmin</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.fast.upload</name>
<value>true</value>
</property>
</configuration>
创建catalog命令改为
CREATE CATALOG iceberg WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://10.132.19.6:9083',
'clients'='5',
'property-version'='1',
'warehouse'='s3a://datalake/'
);
复制代码
./flink-1.11.2/bin/sql-client.sh embedded \
-j ./iceberg-flink-runtime-0.10.0.jar \
-j ./flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar \
shell
CREATE CATALOG iceberg WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://10.132.19.6:9083',
'clients'='5',
'property-version'='1',
'warehouse'='s3a://datalake/'
);
use catalog iceberg;
CREATE DATABASE iceberg_db;
use iceberg_db;
复制代码
创建表
CREATE TABLE iceberg.iceberg_db.sample (
id BIGINT COMMENT 'unique id',
data STRING);
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
复制代码
又报错了!
找到如下文章,提示 included /hadoop-3.3.0/share/hadoop/tools/lib
in HADOOP_CLASSPATH
我看了下HADOOP_CLASSPATH,竟然真没有,也就是执行 export HADOOP_CLASSPATH=$HADOOP_HOME/bin/hadoop classpath
并没有包含路径/tools/lib,添加了一下,果真解决了。
插入数据
INSERT INTO iceberg.iceberg_db.sample VALUES (1, 'a');
INSERT INTO sample VALUES (1, 'a');
复制代码