【实时数仓】在Hbase建立维度表、保存维度数据到Hbase、保存业务数据到kafka主题

一 分流Sink之建立维度表到HBase(Phoenix)

1 拼接建表语句

如果读取到的配置信息是维度数据,提前在hbase中通过Phoenix创建维度表。

(1)定义配置常量类

定义一个项目中常用的配置常量类GmallConfig。

package com.hzy.gmall.realtime.common;

/**
 * 实时数仓中的常量类
 */
public class GmallConfig {
    
    
    public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}

(2)引入依赖

<!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,可以很方便的对bean对象的属性进行操作-->
<dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.9.3</version>
</dependency>

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(3)hbase-site.xml

因为要用单独的schema,所以在Idea程序中加入hbase-site.xml。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop101:8020/hbase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop101,hadoop102,hadoop103</value>
    </property>

    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>

    <property>
        <name>hbase.wal.provider</name>
        <value>filesystem</value>
    </property>

    <property>
        <name>phoenix.schema.isNamespaceMappingEnabled</name>
        <value>true</value>
    </property>

    <property>
        <name>phoenix.schema.mapSystemTablesToNamespace</name>
        <value>true</value>
    </property>

</configuration>

注意:为了开启hbase的namespace和phoenix的schema的映射,在程序中需要加这个配置文件,另外在linux服务上,也需要在hbase以及phoenix的hbase-site.xml配置文件中,加上以上后两个配置,并使用xsync进行同步

/opt/module/hbase-2.0.5/conf  注意分发
/opt/module/phoenix-5.0.0/bin

重启Hbase服务

(4)在phoenix中执行

create schema GMALL2022_REALTIME;
# 在hbase查看是否创建成功
cd /opt/module/hbase-2.0.5/bin/
hbase shell
list_namespace

(5)增加代码

a TableProcessFunction

// 声明连接对象
private Connection conn;

@Override
public void open(Configuration parameters) throws Exception {
    
    
    // 注册驱动
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
    // 获取连接
    conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
// 如果读取到的配置信息是维度数据,提前创建维度表
if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){
    
    
    checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
}

// 将配置信息放到状态中
// 拼接key
String key = sourceTable + ":" + operateType;
broadcastState.put(key,tableProcess);

b checkTable

// 在处理配置数据时,提前建立维度表
// create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {
    
    
    // 对主键进行空值处理
    if (pk == null){
    
    
        pk = "id";
    }
    // 对建表扩展进行空值处理
    if (ext == null){
    
    
        ext = "";
    }
    StringBuilder createSql = new StringBuilder("create table if not exists "+
            GmallConfig.HBASE_SCHEMA + "." + tableName +"(");

    String[] fieldArr = fields.split(",");
    for (int i = 0; i < fieldArr.length; i++) {
    
    
        String field = fieldArr[i];
        // 判断是否为主键
        if (field.equals(pk)){
    
    
            createSql.append(field + " varchar primary key ");
        }else {
    
    
            createSql.append(field + " varchar ");
        }
        if(i < fieldArr.length - 1){
    
    
            createSql.append(",");
        }
    }
    createSql.append(")" + ext);
    System.out.println("phoenix中的建表语句:" + createSql);


    // 创建数据库操作对象
    PreparedStatement ps = null;
    try {
    
    
        ps = conn.prepareStatement(createSql.toString());
        // 执行sql语句
        ps.execute();
    } catch (SQLException e) {
    
    
        e.printStackTrace();
        throw new RuntimeException("在phoenix中建表失败");
    } finally {
    
    
        // 释放资源
        if(ps != null){
    
    
            ps.close();
        }
    }
}

(6)测试

启动hadoop(等待安全模式关闭再进行下一步),zookeeper,kafka,hbase,phoenix,maxwell,在phoenix中查看表数据
!tables
select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;

2 过滤字段

数据在向下游传递之前,过滤掉不需要的字段,只保留配置表中sink_columns存在的字段。

(1)代码编写

if (tableProcess != null){
    
    
    // 在配置表中找到了该操作对应的配置
    // 判断是事实数据还是维度数据
    String sinkTable = tableProcess.getSinkTable();
    jsonObj.put("sink_table",sinkTable);

    // 在向下游传递数据之前,将不需要的字段过滤掉
    // 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤
    JSONObject dataJsonObj = jsonObj.getJSONObject("data");
    filterColumns(dataJsonObj,tableProcess.getSinkColumns());

    String sinkType = tableProcess.getSinkType();
    if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
    
    
        // 维度数据,放到维度侧输出流中
        ctx.output(dimTag,jsonObj);
    }else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
    
    
        // 事实数据,放到主流中
        out.collect(jsonObj);
    }
}else {
    
    
    // 在配置表中没有该操作对应的配置
    System.out.println("No This Key In TableProcess:" + key);
}
// 过滤字段
    private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {
    
    
        // dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}
        // sinkColumns : id,tm_name
        String[] columnArr = sinkColumns.split(",");
        // 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法
        List<String> columnList = Arrays.asList(columnArr);

        // 获取json中的每一个名值对(KV)
        Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
//        // 获取迭代器
//        Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
//        // 遍历,如果不包含则删除
//        for (;it.hasNext();) {
    
    
//            if(!columnList.contains(it.next().getKey())){
    
    
//                it.remove();
//            }
//        }
        entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));
    }

(2)测试

开启相关环境,在表中添加或者删除数据,查看输出结果。

(3)总结

在这里插入图片描述

动态分流总结:

  • 广播流数据处理,FlinkCDC从MySQL中读取配置信息
    • 获取状态
    • 读取配置**(维度表的创建)**
    • 将配置封装为Map<sourceTable:operateType,TableProcess>放到状态中。
  • 主流数据处理,maxwell从业务数据库中采集到的数据
    • 获取状态
    • 从状态中获取当前处理数据的配置信息(字段过滤)
    • 根据配置信息进行分流(事实与维度)
  • 维度表的创建
    • 拼接建表语句
    • 通过jdbc执行建表语句
    • 测试
      • schema和namespace的映射
      • 拼接sql,空格处理
      • 提前创建schema
  • 字段过滤
    • 获取保留字段,放到List集合中
    • 获取dataJsonObj,转换为EntrySet
    • 根据保留字段判断EntrySet遍历出来的entry,是否保留

二 分流Sink之保存维度数据到HBase(Phoenix)

1 程序执行流程

在这里插入图片描述

DimSink 继承了RichSinkFunction,这个function得分两条时间线。

  • 一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行。
  • 另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。

2 DimSink

package com.hzy.gmall.realtime.app.fun;
/**
 * 将维度侧输出流的数据写到hbase(Phoenix)中
 */
public class DimSink extends RichSinkFunction<JSONObject> {
    
    
    private Connection conn;


    @Override
    public void open(Configuration parameters) throws Exception {
    
    
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
    
    
        // 上游传递过来的数据格式如下:
        // {"database":"gmall2022",
        // "data":{"tm_name":"a","id":13},
        // "commit":true,
        // "sink_table":"dim_base_trademark",
        // "type":"insert",
        // "table":"base_trademark","
        // ts":1670131087}

        // 获取维度表表名
        String tableName = jsonObj.getString("sink_table");
        // 获取数据
        JSONObject dataJsonObj = jsonObj.getJSONObject("data");
        // 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
        String upsertSql = genUpsertSql(tableName,dataJsonObj);

        System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);

        PreparedStatement ps = null;
        try {
    
    
            // 创建数据库操作对象
            ps = conn.prepareStatement(upsertSql);
            // 执行sql语句
            ps.executeUpdate();
            // 手动提交事务,phoenix的连接实现类不是自动提交事务
            conn.commit();
        }catch (SQLException e){
    
    
            e.printStackTrace();
            throw new RuntimeException("向phoenix维度表中插入数据失败了");
        } finally {
    
    
            // 释放资源
            if (ps != null){
    
    
                ps.close();
            }
        }
    }

    // 拼接插入语句
    private String genUpsertSql(String tableName, JSONObject dataJsonObj) {
    
    
        // id 10
        // tm_name zs
        String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +
                " ("+ StringUtils.join(dataJsonObj.keySet(),",") +
                ") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";
        return upsertSql;
    }
}

3 BaseDBApp

//TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
dimDS.addSink(new DimSink());

4 测试

开启必要环境,向base_tradmark表中添加一条数据,查看phoenix是否插入成功。

三 分流Sink之保存业务数据到Kafka主题

1 BaseDBApp

//TODO 9 将主流数据写回kafka的dwd层
realDS.addSink(
        MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
    
    
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
    
    
                return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),
                        jsonObj.getJSONObject("data").toJSONString().getBytes());
            }
        })
);

2 MyKafkaUtil

package com.hzy.gmall.realtime.utils;
// 获取kafka的生产者
// 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
//    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
    
    
//        return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
//    }
    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
    
    
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
    
    
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
    
    
                return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());
            }
        },props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    // 获取kafka的生产者
    public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){
    
    
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

3 测试

在base_trademark表中添加一条数据,查看程序输出结果,在phoenix中查看结果。

维度数据::3> {
    
    "database":"gmall2022","xid":24993,"data":{
    
    "tm_name":"c","id":14},"commit":true,"sink_table":"dim_base_trademark","type":"insert","table":"base_trademark","ts":1670150491}
向phoenix维度表中插入数据的sql:upsert into GMALL2022_REALTIME.dim_base_trademark (tm_name,id) values('c','14')

select * from GMALL2022_REALTIME.DIM_BASE_TRADEMARK;

启动kafka消费者,kfkcon.sh dwd_order_info,在order_info中修改一条数据,查看程序输出结果,在kafka中查看结果。

事实数据::3> {
    
    "database":"gmall2022","xid":25144,"data":{
    
    "delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00},"old":{
    
    "consignee":"苗冰"},"commit":true,"sink_table":"dwd_order_info","type":"update","table":"order_info","ts":1670150618}
kafka中输出内容
{
    
    "delivery_address":"第19大街第35号楼3单元488门","consignee":"苗冰11","original_total_amount":976.00,"coupon_reduce_amount":0.00,"order_status":"1001","total_amount":990.00,"user_id":39,"province_id":16,"feight_fee":14.00,"consignee_tel":"13437571252","id":28782,"activity_reduce_amount":0.00}

4 总结

动态分流测试执行流程

  • 需要启动的进程:zookeeper,kafka,maxwell,hdfs,hbase,BaseDBApp

  • 当业务数据发生变化,maxwell会采集变化数据到ods层

  • BaseDBApp从ods层读取到变化数据,作为业务主流

  • BaseDBApp在启动时,会通过FlinkCDC读取配置表,作为广播流

  • 业务流和广播流通过connect进行连接

  • 对连接之后的数据通过process进行处理

    • processElement
    • processBroadcastElement

    具体执行流程见一 2(3)总结

  • 将维度侧输出流的数据写到Hbase中 – DimSink

    • 拼接upsert
    • 执行sql(手动提交事务)
  • 将主流数据写回到kafka的dwd层

    • 重写获取FlinkKafkaProducer的方法,自定义序列化的过程
    • 将主流的数据写到kafka不同的主题中,并且保存精准一次性

四 总结

DWD的实时计算核心就是数据分流,其次是状态识别。在开发过程中使用了几个灵活度较强算子,比如RichMapFunction, ProcessFunction, RichSinkFunction。 那这几个算子何时会用到,如何进行选择,汇总见下表:

Function 可转换结构 可过滤数据 侧输出 open****方法 可以使用状态 输出至
MapFunction Yes No No No No 下游算子
FilterFunction No Yes No No No 下游算子
RichMapFunction Yes No No Yes Yes 下游算子
RichFilterFunction No Yes No Yes Yes 下游算子
ProcessFunction Yes Yes Yes Yes Yes 下游算子
SinkFunction Yes Yes No No No 外部
RichSinkFunction Yes Yes No Yes Yes 外部

从对比表中能明显看出,Rich系列能功能强大,ProcessFunction功能更强大,但是相对的越全面的算子使用起来也更加繁琐。

五 附录:完整代码

0 BaseDBApp

package com.hzy.gmall.realtime.app.dwd;

public class BaseDBApp {
    
    
    public static void main(String[] args) throws Exception {
    
    
        //TODO 1 基本环境准备
        //流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(4);

//        //TODO 2 检查点设置
//        //开启检查点
//        env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
//        // 设置检查点超时时间
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        // 设置重启策略
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
//        // 设置job取消后,检查点是否保留
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
//        // 指定操作HDFS的用户
//        System.setProperty("HADOOP_USER_NAME","hzy");

        //TODO 3 从kafka中读取数据
        //声明消费的主题以及消费者组
        String topic = "ods_base_db_m";
        String groupId = "base_db_app_group";
        // 获取消费者对象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        // 读取数据,封装成流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4 对数据类型进行转换 String -> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //TODO 5 简单的ETL
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
                new FilterFunction<JSONObject>() {
    
    
                    @Override
                    public boolean filter(JSONObject jsonobj) throws Exception {
    
    
                        boolean flag =
                                jsonobj.getString("table") != null &&
                                        jsonobj.getString("table").length() > 0 &&
                                        jsonobj.getJSONObject("data") != null &&
                                        jsonobj.getString("data").length() > 3;
                        return flag;
                    }
                }
        );
//        filterDS.print("<<<");

        //TODO 6 使用FlinkCDC读取配置表数据
        //获取dataSource
        Properties props = new Properties();
        props.setProperty("scan.startup.mode","initial");
        SourceFunction<String> mySqlSourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                // 可配置多个库
                .databaseList("gmall2022_realtime")
                ///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
                //注意:指定的时候需要使用"db.table"的方式
                .tableList("gmall2022_realtime.table_process")
                .debeziumProperties(props)
                .deserializer(new MyDeserializationSchemaFunction())
                .build();

        // 读取数据封装流
        DataStreamSource<String> mySqlDS = env.addSource(mySqlSourceFunction);

        // 为了让每一个并行度上处理业务数据的时候,都能使用配置流的数据,需要将配置流广播下去
        // 想要使用广播状态,状态描述器只能是map,使用map状态存储
        MapStateDescriptor<String, TableProcess> mapStateDescriptor =
                new MapStateDescriptor<>("table_process", String.class, TableProcess.class);

        BroadcastStream<String> broadcastDS = mySqlDS.broadcast(mapStateDescriptor);

        // 调用非广播流的connect方法,将业务流与配置流进行连接
        BroadcastConnectedStream<JSONObject, String> connectDS = filterDS.connect(broadcastDS);

        //TODO 7 动态分流,将维度数据放到维度侧输出流,事实数据放到主流中
        //声明维度侧输出流的标记
        OutputTag<JSONObject> dimTag = new OutputTag<JSONObject>("dimTag") {
    
    };
        SingleOutputStreamOperator<JSONObject> realDS = connectDS.process(
                new TableProcessFunction(dimTag,mapStateDescriptor)
        );
        // 获取维度侧输出流
        DataStream<JSONObject> dimDS = realDS.getSideOutput(dimTag);
        realDS.print("事实数据:");
        dimDS.print("维度数据:");

        //TODO 8 将维度侧输出流的数据写到hbase(Phoenix)中
        dimDS.addSink(new DimSink());

        //TODO 9 将主流数据写回kafka的dwd层
        realDS.addSink(
                MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
    
    
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObj, @Nullable Long timestamp) {
    
    
                        return new ProducerRecord<byte[], byte[]>(jsonObj.getString("sink_table"),
                                jsonObj.getJSONObject("data").toJSONString().getBytes());
                    }
                })
        );

        env.execute();
    }
}

1 MyKafkaUtil

package com.hzy.gmall.realtime.utils;
/**
 * 操作kafka工具类
 */
public class MyKafkaUtil {
    
    
    private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";
    private static final String DEFAULT_TOPIC = "default_topic";

    // 获取kafka的消费者
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){
    
    
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        // 定义消费者组
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);
    }

    // 获取kafka的生产者
    // 这种实现只能保证数据不丢失,不能保证精准一次,只能保证数据不丢失
//    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
    
    
//        return new FlinkKafkaProducer<String>(KAFKA_SERVER,topic,new SimpleStringSchema());
//    }
    public static FlinkKafkaProducer<String> getKafkaSink(String topic){
    
    
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<String>(DEFAULT_TOPIC, new KafkaSerializationSchema<String>() {
    
    
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String str, @Nullable Long timestamp) {
    
    
                return new ProducerRecord<byte[], byte[]>(topic,str.getBytes());
            }
        },props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    // 获取kafka的生产者
    public static <T>FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema){
    
    
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,1000 * 60 * 15 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,kafkaSerializationSchema,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}

2 GmallConfig

package com.hzy.gmall.realtime.common;

/**
 * 实时数仓中的常量类
 */
public class GmallConfig {
    
    
    public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop101,hadoop102,hadoop103:2181";
}

3 TableProcess

package com.hzy.gmall.realtime.beans;

import lombok.Data;
@Data
public class TableProcess {
    
    
    //动态分流Sink常量   改为小写和脚本一致
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //来源表
    String sourceTable;
    //操作类型 insert,update,delete
    String operateType;
    //输出类型 hbase kafka
    String sinkType;
    //输出表(主题)
    String sinkTable;
    //输出字段
    String sinkColumns;
    //主键字段
    String sinkPk;
    //建表扩展
    String sinkExtend;
}

4 MyDeserializationSchemaFunction

package com.hzy.gmall.realtime.app.fun;

public class MyDeserializationSchemaFunction  implements DebeziumDeserializationSchema<String> {
    
    
    // 反序列化
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
    
    
        // 导入的是org.apache.kafka.connnect.data包
        Struct valueStruct = (Struct) sourceRecord.value();
        // 获取数据的来源
        Struct afterStruct = valueStruct.getStruct("after");
        // 获取数据库和表名的来源
        Struct sourceStruct = valueStruct.getStruct("source");
        // 获取数据库
        String database = sourceStruct.getString("db");
        // 获取表名
        String table = sourceStruct.getString("table");
        // 获取操作类型
//        String op = valueStruct.getString("op");
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if(type.equals("create")){
    
    
            type = "insert";
        }
        JSONObject jsonObj = new JSONObject();
        jsonObj.put("database",database);
        jsonObj.put("table",table);
        jsonObj.put("type",type);
        // 获取影响的数据
        // 删除时,afterStruct为空
        JSONObject dataJsonObj = new JSONObject();
        if (afterStruct != null){
    
    
            // schema获取源数据的格式,fields获取里面的各个元素
            for (Field field : afterStruct.schema().fields()) {
    
    
                String fieldName = field.name();
                Object fieldValue = afterStruct.get(field);
                dataJsonObj.put(fieldName,fieldValue);
            }
        }
        // 删除操作会使得data属性不为空,但size为0
        jsonObj.put("data",dataJsonObj);

        // 向下游发送数据
        collector.collect(jsonObj.toJSONString()) ;
    }

    // 指定类型
    @Override
    public TypeInformation<String> getProducedType() {
    
    
        return TypeInformation.of(String.class);
    }
}

5 TableProcessFunction

package com.hzy.gmall.realtime.app.fun;

/**
 * 实现动态分流
 * 目前流中有两条流中的数据,使用以下两个方法分别进行处理
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    
    
    // 声明维度侧输出流标签
    private OutputTag<JSONObject> dimTag;
    // 声明广播状态描述器
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    // 声明连接对象
    private Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
    
    
        // 注册驱动
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        // 获取连接
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
    
    
        this.dimTag = dimTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    // 处理业务流中的数据,maxwell从业务数据库中采集到的数据
    @Override
    public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
    
    
        String table = jsonObj.getString("table");
        String type = jsonObj.getString("type");

        // 在使用maxwell处理历史数据的时候,类型是bootstrap-insert,修复为insert
        if (type.equals("bootstrap-insert")){
    
    
            type = "insert";
            jsonObj.put("type",type);
        }

        // 拼接key
        String key = table  + ":" + type;
        // 获取状态
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 从状态中获取配置信息
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess != null){
    
    
            // 在配置表中找到了该操作对应的配置
            // 判断是事实数据还是维度数据
            String sinkTable = tableProcess.getSinkTable();
            jsonObj.put("sink_table",sinkTable);

            // 在向下游传递数据之前,将不需要的字段过滤掉
            // 过滤思路:从配置表中读取保留字段,根据保留字段对data中的属性进行过滤
            JSONObject dataJsonObj = jsonObj.getJSONObject("data");
            filterColumns(dataJsonObj,tableProcess.getSinkColumns());

            String sinkType = tableProcess.getSinkType();
            if (sinkType.equals(TableProcess.SINK_TYPE_HBASE)){
    
    
                // 维度数据,放到维度侧输出流中
                ctx.output(dimTag,jsonObj);
            }else if (sinkType.equals(TableProcess.SINK_TYPE_KAFKA)){
    
    
                // 事实数据,放到主流中
                out.collect(jsonObj);
            }
        }else {
    
    
            // 在配置表中没有该操作对应的配置
            System.out.println("No This Key In TableProcess:" + key);
        }
    }

    // 过滤字段
    private void filterColumns(JSONObject dataJsonObj, String sinkColumns) {
    
    
        // dataJsonObj : {"tm_name":"aaa","logo_url":"aaa","id":12}
        // sinkColumns : id,tm_name
        String[] columnArr = sinkColumns.split(",");
        // 将数组转换成集合,以便下面和entrySet进行比较,数组中没有”包含“方法
        List<String> columnList = Arrays.asList(columnArr);

        // 获取json中的每一个名值对(KV)
        Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();
//        // 获取迭代器
//        Iterator<Map.Entry<String, Object>> it = entrySet.iterator();
//        // 遍历,如果不包含则删除
//        for (;it.hasNext();) {
    
    
//            if(!columnList.contains(it.next().getKey())){
    
    
//                it.remove();
//            }
//        }
        entrySet.removeIf(entry -> !columnList.contains(entry.getKey()));
    }

    // 处理广播流中的数据,FlinkCDC从MySQL中读取配置信息
    @Override
    public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
    
    
        // 获取状态
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        // 将json格式字符串转换为JSON对象
        JSONObject jsonObj = JSONObject.parseObject(jsonStr);
        // 获取配置表中的一条配置信息
        // parseObject:将json格式字符串转化为json格式对象
        // 第二个参数为将json字符串转化为何种格式的对象
        TableProcess tableProcess = JSONObject.parseObject(jsonObj.getString("data"), TableProcess.class);
        // 业务数据库表名
        String sourceTable = tableProcess.getSourceTable();
        // 操作类型
        String operateType = tableProcess.getOperateType();
        // 数据类型 hbase -- 维度数据    kafka -- 事实数据
        String sinkType = tableProcess.getSinkType();
        // 指定输出目的地
        String sinkTable = tableProcess.getSinkTable();
        // 主键
        String sinkPk = tableProcess.getSinkPk();
        // 指定保留字段(列)
        String sinkColumns = tableProcess.getSinkColumns();
        // 指定建表扩展语句
        String sinkExtend = tableProcess.getSinkExtend();

        // 如果读取到的配置信息是维度数据,提前创建维度表
        if (sinkType.equals(TableProcess.SINK_TYPE_HBASE) && "insert".equals(operateType)){
    
    
            checkTable(sinkTable,sinkPk,sinkColumns,sinkExtend);
        }

        // 将配置信息放到状态中
        // 拼接key
        String key = sourceTable + ":" + operateType;
        broadcastState.put(key,tableProcess);
    }

    // 在处理配置数据时,提前建立维度表
    // create table if not exists 表空间.表名(字段名 数据类型,字段名 数据类型)
    private void checkTable(String tableName, String pk, String fields, String ext) throws SQLException {
    
    
        // 对主键进行空值处理
        if (pk == null){
    
    
            pk = "id";
        }
        // 对建表扩展进行空值处理
        if (ext == null){
    
    
            ext = "";
        }
        StringBuilder createSql = new StringBuilder("create table if not exists "+
                GmallConfig.HBASE_SCHEMA + "." + tableName +"(");

        String[] fieldArr = fields.split(",");
        for (int i = 0; i < fieldArr.length; i++) {
    
    
            String field = fieldArr[i];
            // 判断是否为主键
            if (field.equals(pk)){
    
    
                createSql.append(field + " varchar primary key ");
            }else {
    
    
                createSql.append(field + " varchar ");
            }
            if(i < fieldArr.length - 1){
    
    
                createSql.append(",");
            }
        }
        createSql.append(")" + ext);
        System.out.println("phoenix中的建表语句:" + createSql);


        // 创建数据库操作对象
        PreparedStatement ps = null;
        try {
    
    
            ps = conn.prepareStatement(createSql.toString());
            // 执行sql语句
            ps.execute();
        } catch (SQLException e) {
    
    
            e.printStackTrace();
            throw new RuntimeException("在phoenix中建表失败");
        } finally {
    
    
            // 释放资源
            if(ps != null){
    
    
                ps.close();
            }
        }
    }
}

6 DimSink

package com.hzy.gmall.realtime.app.fun;

/**
 * 将维度侧输出流的数据写到hbase(Phoenix)中
 */
public class DimSink extends RichSinkFunction<JSONObject> {
    
    
    private Connection conn;


    @Override
    public void open(Configuration parameters) throws Exception {
    
    
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    @Override
    public void invoke(JSONObject jsonObj, Context context) throws Exception {
    
    
        // 上游传递过来的数据格式如下:
        // {"database":"gmall2022",
        // "data":{"tm_name":"a","id":13},
        // "commit":true,
        // "sink_table":"dim_base_trademark",
        // "type":"insert",
        // "table":"base_trademark","
        // ts":1670131087}

        // 获取维度表表名
        String tableName = jsonObj.getString("sink_table");
        // 获取数据
        JSONObject dataJsonObj = jsonObj.getJSONObject("data");
        // 拼接插入语句 upsert into 表空间.表 (a,b,c) values(aa,bb,cc);
        String upsertSql = genUpsertSql(tableName,dataJsonObj);

        System.out.println("向phoenix维度表中插入数据的sql:" + upsertSql);

        PreparedStatement ps = null;
        try {
    
    
            // 创建数据库操作对象
            ps = conn.prepareStatement(upsertSql);
            // 执行sql语句
            ps.executeUpdate();
            // 手动提交事务,phoenix的连接实现类不是自动提交事务
            conn.commit();
        }catch (SQLException e){
    
    
            e.printStackTrace();
            throw new RuntimeException("向phoenix维度表中插入数据失败了");
        } finally {
    
    
            // 释放资源
            if (ps != null){
    
    
                ps.close();
            }
        }
    }

    // 拼接插入语句
    private String genUpsertSql(String tableName, JSONObject dataJsonObj) {
    
    
        // id 10
        // tm_name zs
        String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "."+ tableName +
                " ("+ StringUtils.join(dataJsonObj.keySet(),",") +
                ") values('"+ StringUtils.join(dataJsonObj.values(),"','")+"')";
        return upsertSql;
    }
}

猜你喜欢

转载自blog.csdn.net/weixin_43923463/article/details/128226822