同步工具-Oceanus打通mysql到Iceberg

发布时间 2024-01-02 09:51:30作者: 黑水滴

一、服务配置

已配置好gt_oneline_2,其它集群还需按照下面方式特殊配置

1、需要腾讯Oceanus同学在后端修改高途flink集群配置core-site.xml文件,增加如下配置。只能绑定一个chdfs环境,这边绑定的测试环境,线上环境需要改qcloud.object.storage.zk.address

<property>
    <name>fs.AbstractFileSystem.ofs.impl</name>
    <value>com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter</value>
</property>
<property>
    <name>fs.ofs.impl</name>
    <value>com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter</value>
</property>
<property>
    <name>fs.ofs.tmp.cache.dir</name>
    <value>/tmp/chdfs/</value>
</property>
<property>
    <name>fs.ofs.user.appid</name>
    <value>1234</value>
</property>
<property>
    <name>fs.ofs.bucket.region</name>
    <value>ap-beijing</value>
</property>

 2、chdfs产品挂载点调整

测试环境挂载点:gaotu-chdfs-test

挂载地址:chdfs产品查看地址

权限组增加Oceanus的信息,Oceanus伙伴提供vpc信息,chdfs伙伴王帅后端添加。

      如:VpcId: 无权限 | 权限组Id: ag-1234

 

二、任务配置

可参考test_iceberg_2的V7版本,最精简

1、登录hadoop集群hive metastore节点,如测试环境127,获取以下配置文件

(1)krb认证文件:bdg_app.keytab

路径:/root/add_princals/certificate

(2)krb配置文件:krb5.conf

路径:/etc/krb5.conf

(3)hdfs配置:core-site.xml

路径:/usr/local/service/hadoop/etc/hadoop/core-site.xml

(4)hdfs配置:hdfs-site.xml

路径:/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml

(5)hive配置:hive-site.xml

路径:/usr/local/service/hive/conf/hive-site.xml

(6)空配置:hivemetastore-site.xml

下载地址:https://cloud.tencent.com/document/product/849/55238

(6)空配置:hiveserver2-site.xml

下载地址同上:https://cloud.tencent.com/document/product/849/55238

 

2、配置文件打包到一起

jar cvf hdfs-bdg_app.jar krb5.conf bdg_app.keytab core-site.xml hdfs-site.xml hive-site.xml hivemetastore-site.xml hiveserver2-site.xml

 

3、特殊依赖jar下载

地址:https://cloud.tencent.com/document/product/849/53852

文件:flink-chdfs-hadoop-1.10.0-0.1.4.jar(改良后仅这一个文件即可,下图可忽略)

 

 

4、Oceanus依赖管理上传依赖信息(改良后就两个文件即可,测试阶段是四个)

flink-chdfs-hadoop-1.10.0-0.1.4.jar (V1),hdfs-bdg_app.jar (V2)

 

5、任务配置上面四个依赖,以及高级参数。线上需要替换值

pipeline.max-parallelism: 2048
security.kerberos.login.principal: bdg_app@EMR-QX9XE1IH
security.kerberos.login.keytab: bdg_app.keytab
security.kerberos.login.conf: krb5.conf
containerized.taskmanager.env.HADOOP_USER_NAME: bdg_app
containerized.master.env.HADOOP_USER_NAME: bdg_app

 

6、任务配置信息

CREATE TABLE `test_mysql_metrices` (
  id                            bigint
  ,metric_id                    STRING
  ,`version`                    int
  ,name_en                      STRING
  ,name_cn                      STRING
  ,biz_code                     int
  ,topic_code                   int
  ,procedure_code               int
  ,create_time                  timestamp
  ,update_time                  timestamp
  ,isdel                        tinyint
  ,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
  'connector' = 'mysql-cdc',      -- 固定值 'mysql-cdc'
  'hostname' = 'mysql地址',   -- 数据库的 IP
  'port' = '3306',                -- 数据库的访问端口
  'username' = 'user',        -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
  'password' = 'pass',    -- 数据库访问的密码
  -- 'scan.incremental.snapshot.enabled' = 'false' -- 如果 source 表没有设置 PRIMARY Key,需要启用该设置
  'database-name' = 'umetric',   -- 需要同步的数据库
  'table-name' = 'metrices'      -- 需要同步的数据表名
);
 
CREATE TABLE `stock_basic_iceberg_sink` (
  `id`  bigint NOT NULL,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'write.upsert.enabled'='true', -- 是否开启upsert
  'catalog-type' = 'hive',
  'catalog-name'='iceberg_catalog',
  'catalog-database'='bdg_app',
  'catalog-table'='test2',
  -- Hive metastore 的 thrift URI,可以从hive-site.xml配置文件中获取,对应的Key为:hive-metastore-uris
  'uri'='thrift://127.0.0.:7004',
  'engine.hive.enabled' = 'true',
  'format-version' = '2'
);
 
insert into stock_basic_iceberg_sink select id from test_mysql_metrices;