文章目录
- 1. Flink Yarn 模式高可用性配置(自己通过源码编译的Flink)
- 2. Hive
- 2.1. Hive简介
- 2.2. Hive的数据存储
- 2.3. [Hive基本架构](https://blog.csdn.net/u013595419/article/details/79632928)
- 2.4. [Hive为什么要启用Metastore](https://blog.csdn.net/qq_35440040/article/details/82462269)
- 2.5. [hive集群搭建](https://blog.csdn.net/yangang1223/article/details/80183038)
- 3. [Hive Integration](https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/hive/)
- 4. [Flink 1.10 SQL、HiveCatalog与事件时间整合示例](https://www.jianshu.com/p/fe49b8f25313)
- 5. 寄语:当你遇到困难时,你会如何去面对, 这将会决定你的人生最终能够走多远!
1. Flink Yarn 模式高可用性配置(自己通过源码编译的Flink)
本次采用自编译(基于Flink-1.10,hadoop-2.9.2)搭建集群
-
参考链接
-
启动集群时遇到的错误
-
错误一
java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder; at org.apache.flink.yarn.cli.FlinkYarnSessionCli.<init>(FlinkYarnSessionCli.java:197) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.<init>(FlinkYarnSessionCli.java:173) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:836)
-
解决方案
在 flink-shaded-9.0/flink-shaded-hadoop-2-uber
/pom.xml 中的 dependencyManagement 标签中添加如下依赖Apache Commons CLI » 1.4;重新编译mvn clean install -DskipTests -Drat.skip=true -Pvendor-repos -Dhadoop.version=2.9.2
;最后再重新编译flink源码mvn clean package -T 4 -Dfast -Drat.skip=true -Pinclude-hadoop -Dhadoop.version=2.9.2 -Dmaven.compile.fork=true -DskipTests -Dscala-2.11
-
错误二
java.io.IOException: Could not create FileSystem for highly available storage path (hdfs://xiaofan/flink/ha/default) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103) at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89) at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125) at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:132) at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308) at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$2(TaskManagerRunner.java:322) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:321) at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:287) Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://xiaofan/flink/ha/default at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:192) at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100) ... 11 more Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: xiaofan at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:443) at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:132) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:351) at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:285) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:164) at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:164) ... 15 more Caused by: java.net.UnknownHostException: xiaofan ... 21 more
-
解决方案:
修改配置文件
-
-
- 每个 Flink TaskManager 在集群中提供 slot。 slot 的数量通常与每个 TaskManager 的可用 CPU 内核数成比例。
一般情况下你的 slot 数是你每个 TaskManager 的 cpu 的核数。
- parallelism 是指 taskmanager 实际使用的并发能力
- 每个 Flink TaskManager 在集群中提供 slot。 slot 的数量通常与每个 TaskManager 的可用 CPU 内核数成比例。
-
Flink集群高可用HA测试
扫描二维码关注公众号,回复: 10302910 查看本文章-
standalone模式
- 启动集群
- 浏览器访问
由于端口占用,我用了8083端口号
- 关闭集群
- 启动集群
-
Flink On Yarn模式
-
首先需要修改hadoop中 yarn-site.xml 中的配置,设置提交应用程序的最大尝试次数
<property> <name>yarn.resourcemanager.am.max-attempts</name> <value>8</value> <description> The maximum number of application master execution attempts. </description> </property>
-
配置Yarn重试次数(
此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
)vi conf/flink-conf.yaml yarn.application-attempts: 8
-
注意:Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps.
-
Flink on yarn实现逻辑
-
启动方式
-
第一种方式(
启动一个一直运行的flink集群
):yarn-session.sh
(开辟资源)+flink run
(提交任务)# 下面的命令会申请4个taskmanager,每个1G内存和2个solt,超过集群总资源将会启动失败。 ./bin/yarn-session.sh -n 4 -tm 1024 -s 2 --nm xiaofan-flink -d
- -n ,–container 分配多少个yarn容器(=taskmanager的数量)
- -D 动态属性
- -d, --detached 独立运行
- -jm,–jobManagerMemory JobManager的内存 [in MB]
- -nm,–name 在YARN上为一个自定义的应用设置一个名字
- -q,–query 显示yarn中可用的资源 (内存, cpu核数)
- -qu,–queue 指定YARN队列.
- -s,–slots 每个TaskManager使用的slots(vcore)数量
- -tm,–taskManagerMemory 每个TaskManager的内存 [in MB]
- -z,–zookeeperNamespace 针对HA模式在zookeeper上创建NameSpace
-
运行结果如图
-
浏览器中访问
http://baojiabei:8083
-
yarn web-ui中
http://192.168.1.27:8088
-
注意:部署长期运行的flink on yarn实例后,在flink web上看到的TaskManager以及Slots都为0。只有在提交任务的时候,才会依据分配资源给对应的任务执行
-
提交Job到长期运行的flink on yarn实例上(
注意:要在FlinkYarnSessionCli启动的节点上提交
)./bin/flink run ./examples/batch/WordCount.jar -input hdfs://cluster/words.txt -output hdfs://cluster/flink-word-count
-
通过web ui可以看到已经运行完成的任务
-
第二种启动方式flink run -m yarn-cluster(开辟资源+提交任务)
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar -input hdfs://cluster/words.txt -output hdfs://cluster/flink-word-count—2
-
注意jar包的位置
-
运行结果
-
-
-
2. Hive
2.1. Hive简介
- Hive是基于Hadoop的一个离线数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。
- 元数据存储:Hive 将元数据存储在数据库中。
Hive 中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。
- Hive可以自由的扩展集群的规模,一般情况下不需要重启服务。
2.2. Hive的数据存储
- Hive中所有的数据都存储在 HDFS 中,没有专门的数据存储格式(可支持Text,SequenceFile,ParquetFile,RCFILE等)
- 只需要在创建表的时候告诉 Hive 数据中的列分隔符和行分隔符,Hive 就可以解析数据。
- Hive 中包含以下数据模型:
DB
、Table
,External Table
,Partition
,Bucket
。- db:在hdfs中表现为${hive.metastore.warehouse.dir}目录下一个文件夹
- table:在hdfs中表现所属db目录下一个文件夹
- external table:与table类似,不过其数据存放位置可以在任意指定路径
- partition:在hdfs中表现为table目录下的子目录
- bucket:在hdfs中表现为同一个表目录下根据hash散列之后的多个文件
2.3. Hive基本架构
2.4. Hive为什么要启用Metastore
- 服务端metastore 启动方式:
hive --service metastore -p 9083 &
- hiveserver2启动方式:前台
bin/hiveserver2
后台nohup bin/hiveserver2 1>/var/log/hiveserver.log 2>/var/log/hiveserver.err &
(注意权限问题) - client端链接hiveserver2:
bin/beeline -u jdbc:hive2://192.168.1.27:10000 -n hadoop
2.5. hive集群搭建
3. Hive Integration
- ${FLINK_HOME}/lib目录添加jar包
- HiveCatalog
-
启动metastore服务, 并通过hive客户端测试连接该服务
-
配置Flink群集和SQL CLI
-
建立Kafka集群,并验验证集群
- 生产端生产数据
bin/kafka-console-producer.sh --broker-list 192.168.1.25:9091 --topic xiaofan_test
- 消费端消费数据
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.25:9091 --topic xiaofan_test --from-beginning
- 生产端生产数据
-
启动SQL Client,并使用Flink SQL DDL创建一个Kafka表
CREATE TABLE mykafka (name String, age Int) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'xiaofan_test', 'connector.properties.zookeeper.connect' = '192.168.1.23:2181', 'connector.properties.bootstrap.servers' = '192.168.1.23:9091', 'format.type' = 'csv', 'update-mode' = 'append' );
-
通过Hive Cli验证该表对Hive也可见,并注意该表具有属性is_generic=true:
-
运行Flink SQL查询Kakfa表, 在Kafka主题中产生更多消息,应该立即在SQL Client中看到Flink产生的结果,如下所示
-
注意:要在${FLINK_HOME}/lib目录下面配置相关的jar
-