This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
MongoDB CDC 连接器 #
MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。 本文档描述了如何设置 MongoDB CDC 连接器以针对 MongoDB 运行 SQL 查询。
依赖 #
为了设置 MongoDB CDC 连接器, 下表提供了使用构建自动化工具(如 Maven 或 SBT )和带有 SQLJar 捆绑包的 SQLClient 的两个项目的依赖关系信息。
Maven dependency #
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<!-- 请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->
<version>3.3-SNAPSHOT</version>
</dependency>
SQL Client JAR #
下载链接仅适用于稳定版本。
下载 flink-sql-connector-mongodb-cdc-3.5-SNAPSHOT.jar, 把它放在 <FLINK_HOME>/lib/
.
注意: 参考 flink-sql-connector-mongodb-cdc, 当前已发布的版本将在 Maven 中央仓库中提供。
设置 MongoDB #
可用性 #
-
MongoDB 版本
MongoDB 版本 >= 3.6
我们使用 更改流 功能(3.6 版中新增),以捕获更改数据。 -
集群部署
-
存储引擎
WiredTiger 存储引擎是必需的。
-
副本集协议版本 1 (pv1) 是必需的。
从 4.0 版本开始,MongoDB 只支持pv1。 pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。 -
权限
changeStream
andread
是 MongoDB Kafka Connector 必需权限。你可以使用以下示例进行简单的授权。
有关更详细的授权, 请参照 MongoDB 数据库用户角色.use admin; db.createRole( { role: "flinkrole", privileges: [{ // 所有数据库中所有非系统集合的 grant 权限 resource: { db: "", collection: "" }, actions: [ "splitVector", "listDatabases", "listCollections", "collStats", "find", "changeStream" ] }], roles: [ // 阅读 config.collections 和 config.chunks // 用于分片集群快照拆分。 { role: 'read', db: 'config' } ] } ); db.createUser( { user: 'flinkuser', pwd: 'flinkpw', roles: [ { role: 'flinkrole', db: 'admin' } ] } );
如何创建 MongoDB CDC 表 #
MongoDB CDC 表可以定义如下:
-- 在 Flink SQL 中注册 MongoDB 表 `products`
CREATE TABLE products (
_id STRING, // 必须声明
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
-- 从 `products` 集合中读取快照和更改事件
SELECT * FROM products;
请注意
MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能将其转换为 Flink 的 UPSERT 更改日志流。
因为 upsert 流需要唯一键,所以我们必须声明 _id
作为主键。
我们不能将其他列声明为主键, 因为删除操作不包含除 _id
和 sharding key
之外的键和值。
连接器选项 #
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | 指定要使用的连接器,此处应为 mongodb-cdc . |
scheme | optional | mongodb | String | 指定 MongoDB 连接协议。 eg. mongodb or mongodb+srv. |
hosts | required | (none) | String | MongoDB 服务器的主机名和端口对的逗号分隔列表。 eg. localhost:27017,localhost:27018
|
username | optional | (none) | String | 连接到 MongoDB 时要使用的数据库用户的名称。 只有当 MongoDB 配置为使用身份验证时,才需要这样做。 |
password | optional | (none) | String | 连接到 MongoDB 时要使用的密码。 只有当 MongoDB 配置为使用身份验证时,才需要这样做。 |
database | optional | (none) | String | 要监视更改的数据库的名称。 如果未设置,则将捕获所有数据库。 该数据库还支持正则表达式来监视与正则表达式匹配的多个数据库。 |
collection | optional | (none) | String | 数据库中要监视更改的集合的名称。 如果未设置,则将捕获所有集合。 该集合还支持正则表达式来监视与完全限定的集合标识符匹配的多个集合。 |
connection.options | optional | (none) | String | MongoDB连接选项。 例如: replicaSet=test&connectTimeoutMS=300000
|
scan.startup.mode | optional | initial | String | MongoDB CDC 消费者可选的启动模式, 合法的模式为 "initial","latest-offset" 和 "timestamp"。 请查阅 启动模式 章节了解更多详细信息。 |
scan.startup.timestamp-millis | optional | (none) | Long | 起始毫秒数, 仅适用于 'timestamp' 启动模式. |
initial.snapshotting.queue.size | optional | 16000 | Integer | 进行初始快照时的队列大小。仅在 scan.startup.mode 选项设置为 initial 时生效。 注意:已弃用的选项名是 copy.existing.queue.size,为了兼容旧版本的作业,该选项名仍可用,但是推荐升级到新选项名 |
initial.snapshotting.max.threads | optional | Processors Count | Integer | 执行数据复制时使用的线程数。仅在 scan.startup.mode 选项设置为 initial 时生效。 注意:已弃用的选项名是 copy.existing.max.threads,为了兼容旧版本的作业,该选项名仍可用,但是推荐升级到新选项名 |
initial.snapshotting.pipeline | optional | (none) | String | MongoDB 管道操作的 JSON 对象数组,在快照读取阶段,会把该操作下推到 MongoDB,只筛选所需的数据,从而提高读取效率,
比如管道操作 [{"$match": {"closed": "false"}}] 表示只复制 closed 字段为 "false" 的文档。 该选项仅在 scan.startup.mode 选项设置为 initial 时生效,且仅限于在 Debezium 模式下使用,不能用于增量快照模式,因为会出现语义不一致的问题。 注意:已弃用的选项名是 copy.existing.pipeline,为了兼容旧版本的作业,该选项名仍可用,但是推荐升级到新选项名 |
batch.size | optional | 1024 | Integer | Cursor 批次大小。 |
poll.max.batch.size | optional | 1024 | Integer | 轮询新数据时,单个批处理中要包含的更改流文档的最大数量。 |
poll.await.time.ms | optional | 1000 | Integer | 在更改流上检查新结果之前等待的时间。 |
heartbeat.interval.ms | optional | 0 | Integer | 心跳间隔(毫秒)。使用 0 禁用。 |
scan.incremental.snapshot.enabled | optional | false | Boolean | 是否启用增量快照。增量快照功能仅支持 MongoDB 4.0 之后的版本。 |
scan.incremental.snapshot.chunk.size.mb | optional | 64 | Integer | 增量快照的区块大小 mb。 |
scan.incremental.close-idle-reader.enabled | optional | false | Boolean | 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | optional | false | Boolean |
快照读取阶段是否先分配 UnboundedChunk。 这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。 这是一项实验特性,默认为 false。 |
scan.incremental.snapshot.backfill.skip | optional | false | Boolean |
Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially. |
注意: heartbeat.interval.ms
强烈建议设置一个大于 0 的适当值 如果集合更改缓慢.
当我们从检查点或保存点恢复 Flink 作业时,心跳事件可以向前推送 resumeToken
,以避免 resumeToken
过期。
可用元数据 #
以下格式元数据可以在表定义中公开为只读(VIRTUAL)列。
Key | DataType | Description |
---|---|---|
database_name | STRING NOT NULL | 包含该行的数据库的名称。 |
collection_name | STRING NOT NULL | 包含该行的集合的名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 它指示在数据库中进行更改的时间。 如果记录是从表的快照而不是改变流中读取的,该值将始终为0。 |
row_kind | STRING NOT NULL | 当前记录对应的 changelog 类型。注意:当 Source 算子选择为每条记录输出 row_kind 字段后,下游 SQL 算子在处理消息撤回时会因为这个字段不同而比对失败,
建议只在简单的同步作业中引用该元数据列。 '+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。 |
扩展的 CREATE TABLE 示例演示了用于公开这些元数据字段的语法:
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
_id STRING, // 必须声明
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'products'
);
特性 #
精确一次处理 #
MongoDB CDC 连接器是一个 Flink Source 连接器,它将首先读取数据库快照,然后在处理甚至失败时继续读取带有的更改流事件。
启动模式 #
配置选项scan.startup.mode
指定 MongoDB CDC 消费者的启动模式。有效枚举包括:
initial
(默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 oplog。latest-offset
:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 oplog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。timestamp
:跳过快照阶段,从指定的时间戳开始读取 oplog 事件。
例如使用 DataStream API:
MongoDBSource.builder()
.startupOptions(StartupOptions.latest()) // Start from latest offset
.startupOptions(StartupOptions.timestamp(1667232000000L) // Start from timestamp
.build()
and with SQL:
CREATE TABLE mongodb_source (...) WITH (
'connector' = 'mongodb-cdc',
'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动
...
'scan.incremental.snapshot.enabled' = 'true', -- 指定时间戳启动,需要开启增量快照读
'scan.startup.mode' = 'timestamp', -- 指定时间戳启动模式
'scan.startup.timestamp-millis' = '1667232000000' -- 启动毫秒时间
...
)
Notes:
- ’timestamp’ 指定时间戳启动模式,需要开启增量快照读。
快照数据筛选器 #
配置选项 initial.snapshotting.pipeline
描述复制现有数据时的筛选器。
在快照读取阶段,会把该筛选器下推到 MongoDB,只筛选所需的数据,从而提高读取效率。
在下面的示例中,$match
聚合运算符确保只复制 closed 字段设置为 “false” 的文档。
'initial.snapshotting.pipeline' = '[ { "$match": { "closed": "false" } } ]'
更改流 #
我们将 MongoDB’s official Kafka Connector 从 MongoDB 中读取快照或更改事件,并通过 Debezium 的 EmbeddedEngine
进行驱动。
Debezium 的 EmbeddedEngine
提供了一种在应用程序进程中运行单个 Kafka Connect SourceConnector
的机制,并且它可以正确地驱动任何标准的 Kafka Connect SourceConnector
,即使它不是由 Debezium 提供的。
我们选择 MongoDB 的官方 Kafka连接器,而不是 Debezium 的MongoDB 连接器,因为它们使用了不同的更改数据捕获机制。
- 对于 Debezium 的 MongoDB 连接器,它读取每个复制集主节点的
oplog.rs
集合。 - 对于 MongoDB 的 Kafka 连接器,它订阅了 MongoDB 的
更改流
。
MongoDB 的oplog.rs
集合没有在状态之前保持更改记录的更新, 因此,很难通过单个 oplog.rs
记录提取完整的文档状态,并将其转换为 Flink 接受的更改日志流(Insert Only,Upsert,All)。
此外,MongoDB 5(2021 7月发布)改变了 oplog 格式,因此当前的 Debezium 连接器不能与其一起使用。
Change Stream是 MongoDB 3.6 为副本集和分片集群提供的一项新功能,它允许应用程序访问实时数据更改,而不会带来跟踪操作日志的复杂性和风险。
应用程序可以使用更改流来订阅单个集合上的所有数据更改, 数据库或整个部署,并立即对其做出反应。
查找更新操作的完整文档是变更流提供的一项功能,它可以配置变更流以返回更新文档的最新多数提交版本。由于该功能,我们可以轻松收集最新的完整文档,并将更改日志转换为 Flink 的Upsert Changelog Stream。
顺便说一句,DBZ-435提到的Debezium的MongoDB变更流探索,正在制定路线图。
如果完成了,我们可以考虑集成两种源连接器供用户选择。
DataStream Source #
MongoDB CDC 连接器也可以是一个数据流源。 你可以创建 SourceFunction,如下所示:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mongodb.MongoDBSource;
public class MongoDBSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
.hosts("localhost:27017")
.username("flink")
.password("flinkpw")
.databaseList("inventory") // 设置捕获的数据库,支持正则表达式
.collectionList("inventory.products", "inventory.orders") //设置捕获的集合,支持正则表达式
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(sourceFunction)
.print().setParallelism(1); // 对 sink 使用并行度 1 以保持消息顺序
env.execute();
}
}
MongoDB CDC 增量连接器(2.3.0 之后)可以使用,如下所示:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
public class MongoDBIncrementalSourceExample {
public static void main(String[] args) throws Exception {
MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts("localhost:27017")
.databaseList("inventory") // 设置捕获的数据库,支持正则表达式
.collectionList("inventory.products", "inventory.orders") //设置捕获的集合,支持正则表达式
.username("flink")
.password("flinkpw")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点
env.enableCheckpointing(3000);
// 将 source 并行度设置为 2
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
.setParallelism(2)
.print()
.setParallelism(1);
env.execute("Print MongoDB Snapshot + Change Stream");
}
}
注意:
- 如果使用数据库正则表达式,则需要
readAnyDatabase
角色。 - 增量快照功能仅支持 MongoDB 4.0 之后的版本。
可用的指标 #
指标系统能够帮助了解分片分发的进展, 下面列举出了支持的 Flink 指标 Flink metrics:
Group | Name | Type | Description |
---|---|---|---|
namespace.schema.table | isSnapshotting | Gauge | 表是否在快照读取阶段 |
namespace.schema.table | isStreamReading | Gauge | 表是否在增量读取阶段 |
namespace.schema.table | numTablesSnapshotted | Gauge | 已经被快照读取完成的表的数量 |
namespace.schema.table | numTablesRemaining | Gauge | 还没有被快照读取的表的数据 |
namespace.schema.table | numSnapshotSplitsProcessed | Gauge | 正在处理的分片的数量 |
namespace.schema.table | numSnapshotSplitsRemaining | Gauge | 还没有被处理的分片的数量 |
namespace.schema.table | numSnapshotSplitsFinished | Gauge | 已经处理完成的分片的数据 |
namespace.schema.table | snapshotStartTime | Gauge | 快照读取阶段开始的时间 |
namespace.schema.table | snapshotEndTime | Gauge | 快照读取阶段结束的时间 |
注意:
- Group 名称是
namespace.schema.table
,这里的namespace
是实际的数据库名称,schema
是实际的 schema 名称,table
是实际的表名称。 - 对于 MongoDB,这里的
namespace
会被设置成默认值 “",也就是一个空字符串,Group 名称的格式会类似于test_database.test_table
。
完整的 Changelog #
MongoDB 6.0 以及更高的版本支持发送变更流事件,其中包含文档的更新前和更新后的内容(或者说数据的前后镜像)。
-
前镜像是指被替换、更新或删除之前的文档。对于插入操作没有前镜像。
-
后镜像是指被替换、更新或删除之后的文档。对于删除操作没有后镜像。
MongoDB CDC 能够使用前镜像和后镜像来生成完整的变更日志流,包括插入、更新前、更新后和删除的数据行,从而避免了额外的 ChangelogNormalize
下游节点。
为了启用此功能,你需要满足以下条件:
- MongoDB 的版本必须为 6.0 或更高版本。
- 启用
preAndPostImages
功能。
db.runCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: {
expireAfterSeconds: 'off' // replace with custom image expiration time
}
}
}
})
- 为希望监控的 collection 启用
changeStreamPreAndPostImages
功能:
db.runCommand({
collMod: "<< collection name >>",
changeStreamPreAndPostImages: {
enabled: true
}
})
在 DataStream 中开启 MongoDB CDC 的 scan.full-changelog
功能:
MongoDBSource.builder()
.scanFullChangelog(true)
...
.build()
或者使用 Flink SQL:
CREATE TABLE mongodb_source (...) WITH (
'connector' = 'mongodb-cdc',
'scan.full-changelog' = 'true',
...
)
数据类型映射 #
BSON 二进制 JSON的缩写是一种类似 JSON 格式的二进制编码序列,用于在 MongoDB 中存储文档和进行远程过程调用。
Flink SQL Data Type 类似于 SQL 标准的数据类型术语,该术语描述了表生态系统中值的逻辑类型。它可以用于声明操作的输入和/或输出类型。
为了使 Flink SQL 能够处理来自异构数据源的数据,异构数据源的数据类型需要统一转换为 Flink SQL 数据类型。
以下是 BSON 类型和 Flink SQL 类型的映射。
BSON type | Flink SQL type |
---|---|
TINYINT | |
SMALLINT | |
Int | INT |
Long | BIGINT |
FLOAT | |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
DateTimestamp | DATE |
DateTimestamp | TIME |
Date | TIMESTAMP(3)TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0)TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex |
STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point : ROW<type STRING, coordinates ARRAY<DOUBLE>> Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> ... |