MySQL
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

MySQL Connector #

MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。

依赖配置 #

由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。 您可能需要手动配置以下依赖,并在提交 YAML 作业时使用 Flink CDC CLI 的 --jar 参数将其传入:

依赖名称 说明
mysql:mysql-connector-java:8.0.27 用于连接到 MySQL 数据库。

示例 #

从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4

连接器配置项 #

Option Required Default Type Description
hostname required (none) String MySQL 数据库服务器的 IP 地址或主机名。
port optional 3306 Integer MySQL 数据库服务器的整数端口号。
username required (none) String 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称。
password required (none) String 连接 MySQL 数据库服务器时使用的密码。
tables required (none) String 需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。
需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*
tables.exclude optional (none) String 需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。
用法和tables参数相同
schema-change.enabled optional true Boolean 是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。
server-id optional (none) String 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。
scan.incremental.snapshot.chunk.size optional 8096 Integer 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。
scan.snapshot.fetch.size optional 1024 Integer 读取表快照时每次读取数据的最大条数。
scan.startup.mode optional initial String MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset","timestamp" 和 ""snapshot"。
scan.startup.specific-offset.file optional (none) String 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。
scan.startup.specific-offset.pos optional (none) Long 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。
scan.startup.specific-offset.gtid-set optional (none) String 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。
scan.startup.timestamp-millis optional (none) Long 在 "timestamp" 启动模式下,启动位点的毫秒时间戳。
scan.startup.specific-offset.skip-events optional (none) Long 在指定的启动位点后需要跳过的事件数量。
scan.startup.specific-offset.skip-rows optional (none) Long 在指定的启动位点后需要跳过的数据行数量。
connect.timeout optional 30s Duration 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。
connect.max-retries optional 3 Integer 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。
connection.pool.size optional 20 Integer 连接池大小。
jdbc.properties.* optional 20 String 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'.
heartbeat.interval optional 30s Duration 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。
debezium.* optional (none) String 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 例如: 'debezium.snapshot.mode' = 'never'. 查看更多关于 Debezium 的 MySQL 连接器属性
scan.incremental.close-idle-reader.enabled optional false Boolean 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。
scan.newly-added-table.enabled optional false Boolean 是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。
scan.binlog.newly-added-table.enabled optional false Boolean 在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。
scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于:
scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取;
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
scan.parse.online.schema.changes.enabled optional false Boolean 是否尝试解析由 gh-ostpt-osc 工具生成的表结构变更事件。 这些工具会在变更表结构时,将变更语句应用到“影子表”之上,并稍后将其与主表进行交换,以达到表结构变更的目的。
这是一项实验性功能。
include-comments.enabled optional false Boolean 是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。
treat-tinyint1-as-boolean.enabled optional true Boolean 是否将TINYINT(1)类型当做Boolean类型处理,默认true。
scan.incremental.snapshot.unbounded-chunk-first.enabled optional false Boolean 快照读取阶段是否先分配 UnboundedChunk。
这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。
这是一项实验特性,默认为 false。
scan.incremental.snapshot.backfill.skip optional false Boolean 是否在快照读取阶段跳过 backfill 。
如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。
警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。
metadata.list optional false String 可额外读取的SourceRecord中元数据的列表,后续可直接使用在transform模块,英文逗号 `,` 分割。目前可用值包含:op_ts。
use.legacy.json.format optional true Boolean 是否使用 legacy JSON 格式来转换 Binlog 中的 JSON 类型的数据。
这代表着是否使用 legacy JSON 格式来转换 Binlog 中的 JSON 类型的数据。 如果用户配置 'use.legacy.json.format' = 'true',则从 Binlog 中转换 JSON 类型的数据时,会移除值之前的空格和逗号之后的空格。例如, Binlog 中 JSON 类型的数据 {"key1": "value1", "key2": "value2"} 会被转换为 {"key1":"value1","key2":"value2"}。 如果设置 'use.legacy.json.format' = 'false', 这条数据会被转换为 {"key1": "value1", "key2": "value2"}, 也就是 key 和 value 前的空格都会被保留。

启动模式 #

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  • snapshot: 只进行快照阶段,跳过增量阶段,快照阶段读取结束后退出。

例如,可以在 YAML 配置文件中这样指定启动模式:

source:
  type: mysql
  scan.startup.mode: earliest-offset                    # Start from earliest offset
  scan.startup.mode: latest-offset                      # Start from latest offset
  scan.startup.mode: specific-offset                    # Start from specific offset
  scan.startup.mode: timestamp                          # Start from timestamp
  scan.startup.mode: snapshot                          # Read snapshot only
  scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
  scan.startup.specific-offset.pos: 4                   # Binlog position under specific offset mode
  scan.startup.specific-offset.gtid-set: 24DA167-...    # GTID set under specific offset startup mode
  scan.startup.timestamp-millis: 1667232000000          # Timestamp under timestamp startup mode
  # ...

可用的指标 #

指标系统能够帮助了解分片分发的进展, 下面列举出了支持的 Flink 指标 Flink metrics:

Group Name Type Description
namespace.schema.table isSnapshotting Gauge 表是否在快照读取阶段
namespace.schema.table isStreamReading Gauge 表是否在增量读取阶段
namespace.schema.table numTablesSnapshotted Gauge 已经被快照读取完成的表的数量
namespace.schema.table numTablesRemaining Gauge 还没有被快照读取的表的数据
namespace.schema.table numSnapshotSplitsProcessed Gauge 正在处理的分片的数量
namespace.schema.table numSnapshotSplitsRemaining Gauge 还没有被处理的分片的数量
namespace.schema.table numSnapshotSplitsFinished Gauge 已经处理完成的分片的数据
namespace.schema.table snapshotStartTime Gauge 快照读取阶段开始的时间
namespace.schema.table snapshotEndTime Gauge 快照读取阶段结束的时间

注意:

  1. Group 名称是 namespace.schema.table,这里的 namespace 是实际的数据库名称, schema 是实际的 schema 名称, table 是实际的表名称。
  2. 对于 MySQL,这里的 namespace 会被设置成默认值 “",也就是一个空字符串,Group 名称的格式会类似于 test_database.test_table

数据类型映射 #

MySQL type CDC type NOTE
TINYINT(n) TINYINT
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL
SMALLINT
INT
YEAR
MEDIUMINT
MEDIUMINT UNSIGNED
MEDIUMINT UNSIGNED ZEROFILL
SMALLINT UNSIGNED
SMALLINT UNSIGNED ZEROFILL
INT
BIGINT
INT UNSIGNED
INT UNSIGNED ZEROFILL
BIGINT
BIGINT UNSIGNED
BIGINT UNSIGNED ZEROFILL
SERIAL
DECIMAL(20, 0)
FLOAT
FLOAT UNSIGNED
FLOAT UNSIGNED ZEROFILL
FLOAT
REAL
REAL UNSIGNED
REAL UNSIGNED ZEROFILL
DOUBLE
DOUBLE UNSIGNED
DOUBLE UNSIGNED ZEROFILL
DOUBLE PRECISION
DOUBLE PRECISION UNSIGNED
DOUBLE PRECISION UNSIGNED ZEROFILL
FLOAT(p, s)
REAL(p, s)
DOUBLE(p, s)
DOUBLE
NUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
NUMERIC(p, s) UNSIGNED ZEROFILL
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
DECIMAL(p, s) UNSIGNED ZEROFILL
FIXED(p, s)
FIXED(p, s) UNSIGNED
FIXED(p, s) UNSIGNED ZEROFILL
where p <= 38
DECIMAL(p, s)
NUMERIC(p, s)
NUMERIC(p, s) UNSIGNED
NUMERIC(p, s) UNSIGNED ZEROFILL
DECIMAL(p, s)
DECIMAL(p, s) UNSIGNED
DECIMAL(p, s) UNSIGNED ZEROFILL
FIXED(p, s)
FIXED(p, s) UNSIGNED
FIXED(p, s) UNSIGNED ZEROFILL
where 38 < p <= 65
STRING 在 MySQL 中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。
BOOLEAN
TINYINT(1)
BIT(1)
BOOLEAN
DATE DATE
TIME [(p)] TIME [(p)]
TIMESTAMP [(p)] TIMESTAMP_LTZ [(p)]
DATETIME [(p)] TIMESTAMP [(p)]
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
BIT(n) BINARY(⌈(n + 7) / 8⌉)
BINARY(n) BINARY(n)
VARBINARY(N) VARBINARY(N)
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
LONG
LONG VARCHAR
STRING
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTES 目前,对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。
ENUM STRING
JSON STRING JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。
SET - 暂不支持
GEOMETRY
POINT
LINESTRING
POLYGON
MULTIPOINT
MULTILINESTRING
MULTIPOLYGON
GEOMETRYCOLLECTION
STRING MySQL 中的空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 MySQL 空间数据类型映射 章节了解更多详细信息。

空间数据类型映射 #

MySQL中除GEOMETRYCOLLECTION之外的空间数据类型都会转换为 Json 字符串,格式固定,如:

{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}

字段srid标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。 由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid将始终为 0。

字段type标识空间数据类型,例如POINT/LINESTRING/POLYGON

字段coordinates表示空间数据的坐标

对于GEOMETRYCOLLECTION,它将转换为 Json 字符串,格式固定,如:

{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}

Geometrics字段是一个包含所有空间数据的数组。

不同空间数据类型映射的示例如下:

Spatial data in MySQL Json String converted in Flink
POINT(1 1) {"coordinates":[1,1],"type":"Point","srid":0}
LINESTRING(3 0, 3 3, 3 5) {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}
POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0}
MULTIPOINT((1 1),(2 2)) {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}
MultiLineString((1 1,2 2,3 3),(4 4,5 5)) {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}
MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}
GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}

Back to top