普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
FAQ
  • CDC同步关系型数据库示例
  • 环境准备说明
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# CDC同步关系型数据库示例

本示例演示从 MySQL 数据库表 empsinfo中实时读取数据,通过 FieldMapper 组件进行字段配置,去掉 EMPNO字段后,将数据写入 MySQL empsinfo_miggration表中。

主要步骤如下:

  • 环境准备说明
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 运行
  • 查看数据
  • 提交版本

# 环境准备说明

增量同步与多表迁移的向导界面类似,且都支持多表增量同步。与多表迁移的不同点在于,增量同步不是一次性将数据从源表同步到目标表,而是源表的数据发生变化(增加、修改、删除)时才会同步到目标表。

提示:

1、源表和目标表都必须定义主键;
2、目标表不存在时,模型运行时会自动创建表,和源表结构一致;
3、目标表存在时,要确保源表与目标表的数据结构、字段类型一致。同时源表开启了 cdc 配置。

CDC 增量同步支持的数据源如下,需要提前对来源数据库进行配置,具体的配置方法参考:CDC连接器组件使用说明

数据源类型 版本 驱动
MySQL-CDC MySQL: 5.6,5.7,8.0.x JDBC Driver: 8.0.28
PostgreSQL-CDC PostgresSQL: 10, 11, 12, 13,14 JDBC Driver: 42.5.1
SQLServer-CDC Sqlserver: 2012, 2014, 2016, 2017, 2019 JDBC Driver: 9.4.1.jre8
MongoDB-CDC MongoDB: 3.12.11 JDBC Driver: 3.12.11
Oracle-CDC] Oracle: 12c, 19c JDBC Driver: 19.3.0.0

# 准备数据

创建表 empsinfo 和 empsinfo_miggration 分别作为源表和目标表,并给源表empsinfoINSERT 一些数据。

-- ---------------------------------------
-- Table structure for empsinfo
-- ---------------------------------------
DROP TABLE IF EXISTS `empsinfo`;
CREATE TABLE `empsinfo` (
  `ID` int NOT NULL,
  `NAME` varchar(10) DEFAULT NULL,
  `AGE` decimal(3,0) DEFAULT NULL,
  `EMPNO` int DEFAULT NULL,
  PRIMARY KEY (`ID`)
);

-- ---------------------------------------
-- Table structure for empsinfo_miggration
-- ---------------------------------------
DROP TABLE IF EXISTS `empsinfo_miggration`;
CREATE TABLE `empsinfo_miggration` (
  `ID` int NOT NULL,
  `NAME` varchar(10) DEFAULT NULL,
  `AGE` decimal(3,0) DEFAULT NULL,
  `EMPNO` int DEFAULT NULL,
  PRIMARY KEY (`ID`)
);

-- ----------------------------
-- Records of empsinfo
-- ----------------------------
BEGIN;
INSERT INTO `empsinfo` (`ID`, `NAME`, `AGE`, `EMPNO`) VALUES (10001, 'WARD', 25, 7521);
INSERT INTO `empsinfo` (`ID`, `NAME`, `AGE`, `EMPNO`) VALUES (10002, 'JONES', 32, 7566);
INSERT INTO `empsinfo` (`ID`, `NAME`, `AGE`, `EMPNO`) VALUES (10003, 'BLAKE', 15, 7698);
INSERT INTO `empsinfo` (`ID`, `NAME`, `AGE`, `EMPNO`) VALUES (10004, 'SCOTT', 53, 7788);
INSERT INTO `empsinfo` (`ID`, `NAME`, `AGE`, `EMPNO`) VALUES (10005, 'KING', 22, 7839);
COMMIT;

# 新建同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:CDCSource-FieldMapper-JDBCSink。

# 拖拽图元

依次拖拽数据源中的CDC组件、转换中的FieldMapper组件和目标中的关系型数据库组件,依次连线。如下图所示:

cdcsource

# 配置组件属性

1、双击"CDC"组件,根据下图所示步骤依次配置。

cdcsource

2、双击"FieldMapper"组件,根据下图所示步骤依次配置。

cdcsource

3、双击"关系型数据库"组件,根据下图所示步骤依次配置。

启用upsert语法:否则会导致cdc源表已同步过的数据发生变更时,目标表更新报错。

image-20241121160117639

4、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

cdcsource

# 查看数据

从SQL客户端中去目标表查看 empsinfo 源表数据已经同步到 empsinfo_miggration 目标表中。

cdcsource

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

cdcsource

← SQL插入数据 ClickHouse同步示例 →