Ingesting Kafka data into Paimon (Flink SQL)

Published 2023-12-07 10:16:14 by whiteY

1. Create the catalog

CREATE CATALOG paimon_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://hadoopm111:9083',
    'warehouse' = 'hdfs:///apps/hive/paimon'
);
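
To confirm the catalog was registered before switching to it, you can run a quick check from the SQL client:

-- paimon_hive should appear in the list
SHOW CATALOGS;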

2. Create a source table for the Kafka data (the payload is plain text, so it is read with the raw format as a single STRING column)

CREATE TABLE IF NOT EXISTS kafka_bus_user_info_source (
    log_new STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'top-bus-user-info',
    'properties.bootstrap.servers' = '192.168.11.106:6667,192.168.11.108:6667',
    'properties.group.id' = 'top-bus-user-info-paimon',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'raw'
);
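
Before building the full pipeline, it can help to preview a few raw messages and confirm the connection works. A minimal sketch (the LIMIT terminates the streaming preview after ten rows):

SELECT log_new
FROM kafka_bus_user_info_source
LIMIT 10;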

Create the UDF (a UDTF that splits the raw log line into fields and discards lines with fewer than the specified number of fields):

CREATE TEMPORARY SYSTEM FUNCTION rawUdf AS 'cn.leadeon.dbgrp.udf.SplitRawUDTF';
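
The function can be smoke-tested directly in the SQL client. A sketch assuming the UDTF splits on '|' (the actual delimiter is defined inside cn.leadeon.dbgrp.udf.SplitRawUDTF, so adjust the sample string to match):

-- emits one row with arr = ['a', 'b', 'c']; a line with fewer
-- than 3 fields would emit nothing, so the query returns no rows
SELECT T.arr
FROM (VALUES ('a|b|c')) AS src(log_new),
LATERAL TABLE(rawUdf(log_new, 3)) AS T(arr);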

3. Use the catalog

USE CATALOG paimon_hive;

4. Create the Paimon table (it is synced to the Hive metastore automatically)
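
If the paimon_ods database does not exist yet in this catalog, create it first:

CREATE DATABASE IF NOT EXISTS paimon_ods;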

CREATE TABLE IF NOT EXISTS paimon_ods.bus_user_info (
    column1 STRING,
    column2 STRING,
    column3 STRING,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'write-mode' = 'append-only',
    'bucket' = '5',
    'bucket-key' = 'column1,column2',
    'full-compaction.delta-commits' = '20',
    'sink.parallelism' = '3'
);

5. Configure checkpointing

SET 'execution.checkpointing.interval' = '300s';
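
Paimon only commits a new snapshot when a Flink checkpoint completes, so streaming writes stay invisible until the first checkpoint finishes. Related options can be set the same way if needed; a sketch using standard Flink keys (the values here are illustrative, not from the original setup):

SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.min-pause' = '60s';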

6. Write the Kafka data into the Paimon table. The Kafka source table lives in the default catalog, so it must be referenced by its fully qualified name; rows for which the UDTF emitted nothing survive the LEFT JOIN ... ON TRUE as NULL arrays and are then dropped by the WHERE clause.

INSERT INTO paimon_ods.bus_user_info
SELECT
    arr[1] AS column1,
    arr[2] AS column2,
    arr[3] AS column3,
    REPLACE(SUBSTR(arr[2], 1, 10), '-', '') AS dt
FROM default_catalog.default_database.kafka_bus_user_info_source AS S
LEFT JOIN LATERAL TABLE(rawUdf(log_new, 3)) AS T(arr) ON TRUE
WHERE arr[3] IS NOT NULL;
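
After the job has completed at least one checkpoint, the written data can be verified with a batch query against the Paimon table (a minimal sketch; batch mode reads the latest committed snapshot instead of tailing the stream):

SET 'execution.runtime-mode' = 'batch';

SELECT dt, COUNT(*) AS cnt
FROM paimon_ods.bus_user_info
GROUP BY dt;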