This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
MySQL
MySQL Connector #
MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。
依赖配置 #
由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。
您可能需要手动配置以下依赖,并在提交 YAML 作业时使用 Flink CDC CLI 的 --jar
参数将其传入:
依赖名称 | 说明 |
---|---|
mysql:mysql-connector-java:8.0.27 | 用于连接到 MySQL 数据库。 |
示例 #
从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
连接器配置项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
hostname | required | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 |
port | optional | 3306 | Integer | MySQL 数据库服务器的整数端口号。 |
username | required | (none) | String | 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。 |
password | required | (none) | String | 连接 MySQL 数据库服务器时使用的密码。 |
tables | required | (none) | String | 需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。 需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。 例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.* |
tables.exclude | optional | (none) | String | 需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。 用法和tables参数相同 |
schema-change.enabled | optional | true | Boolean | 是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。 |
server-id | optional | (none) | String | 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 |
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 |
scan.snapshot.fetch.size | optional | 1024 | Integer | 读取表快照时每次读取数据的最大条数。 |
scan.startup.mode | optional | initial | String | MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset","timestamp" 和 ""snapshot"。 |
scan.startup.specific-offset.file | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 |
scan.startup.specific-offset.pos | optional | (none) | Long | 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 |
scan.startup.specific-offset.gtid-set | optional | (none) | String | 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 |
scan.startup.timestamp-millis | optional | (none) | Long | 在 "timestamp" 启动模式下,启动位点的毫秒时间戳。 |
scan.startup.specific-offset.skip-events | optional | (none) | Long | 在指定的启动位点后需要跳过的事件数量。 |
scan.startup.specific-offset.skip-rows | optional | (none) | Long | 在指定的启动位点后需要跳过的数据行数量。 |
connect.timeout | optional | 30s | Duration | 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。 |
connect.max-retries | optional | 3 | Integer | 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 |
connection.pool.size | optional | 20 | Integer | 连接池大小。 |
jdbc.properties.* | optional | 20 | String | 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. |
heartbeat.interval | optional | 30s | Duration | 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 |
debezium.* | optional | (none) | String | 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。
例如: 'debezium.snapshot.mode' = 'never' .
查看更多关于 Debezium 的 MySQL 连接器属性 |
scan.incremental.close-idle-reader.enabled | optional | false | Boolean | 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。 |
scan.newly-added-table.enabled | optional | false | Boolean | 是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。 |
scan.binlog.newly-added-table.enabled | optional | false | Boolean | 在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。 scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于: scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取; scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。 |
scan.parse.online.schema.changes.enabled | optional | false | Boolean |
是否尝试解析由 gh-ost 或 pt-osc 工具生成的表结构变更事件。
这些工具会在变更表结构时,将变更语句应用到“影子表”之上,并稍后将其与主表进行交换,以达到表结构变更的目的。
这是一项实验性功能。 |
include-comments.enabled | optional | false | Boolean | 是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。 |
treat-tinyint1-as-boolean.enabled | optional | true | Boolean | 是否将TINYINT(1)类型当做Boolean类型处理,默认true。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | optional | false | Boolean |
快照读取阶段是否先分配 UnboundedChunk。 这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。 这是一项实验特性,默认为 false。 |
scan.incremental.snapshot.backfill.skip | optional | false | Boolean |
是否在快照读取阶段跳过 backfill 。 如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。 警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。 |
metadata.list | optional | false | String | 可额外读取的SourceRecord中元数据的列表,后续可直接使用在transform模块,英文逗号 `,` 分割。目前可用值包含:op_ts。 |
use.legacy.json.format | optional | true | Boolean | 是否使用 legacy JSON 格式来转换 Binlog 中的 JSON 类型的数据。 这代表着是否使用 legacy JSON 格式来转换 Binlog 中的 JSON 类型的数据。 如果用户配置 'use.legacy.json.format' = 'true',则从 Binlog 中转换 JSON 类型的数据时,会移除值之前的空格和逗号之后的空格。例如, Binlog 中 JSON 类型的数据 {"key1": "value1", "key2": "value2"} 会被转换为 {"key1":"value1","key2":"value2"}。 如果设置 'use.legacy.json.format' = 'false', 这条数据会被转换为 {"key1": "value1", "key2": "value2"}, 也就是 key 和 value 前的空格都会被保留。 |
启动模式 #
配置选项scan.startup.mode
指定 MySQL CDC 使用者的启动模式。有效枚举包括:
initial
(默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。earliest-offset
:跳过快照阶段,从可读取的最早 binlog 位点开始读取latest-offset
:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。specific-offset
:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。timestamp
:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。snapshot
: 只进行快照阶段,跳过增量阶段,快照阶段读取结束后退出。
例如,可以在 YAML 配置文件中这样指定启动模式:
source:
type: mysql
scan.startup.mode: earliest-offset # Start from earliest offset
scan.startup.mode: latest-offset # Start from latest offset
scan.startup.mode: specific-offset # Start from specific offset
scan.startup.mode: timestamp # Start from timestamp
scan.startup.mode: snapshot # Read snapshot only
scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
scan.startup.specific-offset.pos: 4 # Binlog position under specific offset mode
scan.startup.specific-offset.gtid-set: 24DA167-... # GTID set under specific offset startup mode
scan.startup.timestamp-millis: 1667232000000 # Timestamp under timestamp startup mode
# ...
可用的指标 #
指标系统能够帮助了解分片分发的进展, 下面列举出了支持的 Flink 指标 Flink metrics:
Group | Name | Type | Description |
---|---|---|---|
namespace.schema.table | isSnapshotting | Gauge | 表是否在快照读取阶段 |
namespace.schema.table | isStreamReading | Gauge | 表是否在增量读取阶段 |
namespace.schema.table | numTablesSnapshotted | Gauge | 已经被快照读取完成的表的数量 |
namespace.schema.table | numTablesRemaining | Gauge | 还没有被快照读取的表的数据 |
namespace.schema.table | numSnapshotSplitsProcessed | Gauge | 正在处理的分片的数量 |
namespace.schema.table | numSnapshotSplitsRemaining | Gauge | 还没有被处理的分片的数量 |
namespace.schema.table | numSnapshotSplitsFinished | Gauge | 已经处理完成的分片的数据 |
namespace.schema.table | snapshotStartTime | Gauge | 快照读取阶段开始的时间 |
namespace.schema.table | snapshotEndTime | Gauge | 快照读取阶段结束的时间 |
注意:
- Group 名称是
namespace.schema.table
,这里的namespace
是实际的数据库名称,schema
是实际的 schema 名称,table
是实际的表名称。 - 对于 MySQL,这里的
namespace
会被设置成默认值 “",也就是一个空字符串,Group 名称的格式会类似于test_database.test_table
。
数据类型映射 #
MySQL type | CDC type | NOTE |
---|---|---|
TINYINT(n) | TINYINT | |
SMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILL |
SMALLINT | |
INT YEAR MEDIUMINT MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL |
INT | |
BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL |
BIGINT | |
BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL |
DECIMAL(20, 0) | |
FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL |
FLOAT | |
REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL FLOAT(p, s) REAL(p, s) DOUBLE(p, s) |
DOUBLE | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 |
DECIMAL(p, s) | |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 |
STRING | 在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。 |
BOOLEAN TINYINT(1) BIT(1) |
BOOLEAN | |
DATE | DATE | |
TIME [(p)] | TIME [(p)] | |
TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
DATETIME [(p)] | TIMESTAMP [(p)] | |
CHAR(n) | CHAR(n) | |
VARCHAR(n) | VARCHAR(n) | |
BIT(n) | BINARY(⌈(n + 7) / 8⌉) | |
BINARY(n) | BINARY(n) | |
VARBINARY(N) | VARBINARY(N) | |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT LONG LONG VARCHAR |
STRING | |
TINYBLOB BLOB MEDIUMBLOB LONGBLOB |
BYTES | 目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。 |
ENUM | STRING | |
JSON | STRING | JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。 |
SET | - | 暂不支持 |
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION |
STRING | MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。 |
空间数据类型映射 #
MySQL中除GEOMETRYCOLLECTION
之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
字段srid
标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。
由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid
将始终为 0。
字段type
标识空间数据类型,例如POINT
/LINESTRING
/POLYGON
。
字段coordinates
表示空间数据的坐标
。
对于GEOMETRYCOLLECTION
,它将转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
Geometrics
字段是一个包含所有空间数据的数组。
不同空间数据类型映射的示例如下:
Spatial data in MySQL | Json String converted in Flink |
---|---|
POINT(1 1) | {"coordinates":[1,1],"type":"Point","srid":0} |
LINESTRING(3 0, 3 3, 3 5) | {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0} |
POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) | {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0} |
MULTIPOINT((1 1),(2 2)) | {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0} |
MultiLineString((1 1,2 2,3 3),(4 4,5 5)) | {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0} |
MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) | {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0} |
GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) | {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0} |