This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.
OceanBase
OceanBase Connector #
OceanBase Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入OceanBase。 本文档介绍如何设置 OceanBase Pipeline 连接器。
连接器的功能 #
- 自动建表
- 表结构变更同步
- 数据实时同步
示例 #
从 MySQL 读取数据同步到 OceanBase 的 Pipeline 可以定义如下:
source:
type: mysql
hostname: mysql
port: 3306
username: mysqluser
password: mysqlpw
tables: mysql_2_oceanbase_test_17l13vc.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: oceanbase
url: jdbc:mysql://oceanbase:2881/test
username: root@test
password: password
pipeline:
name: MySQL to OceanBase Pipeline
parallelism: 1
连接器配置项 #
参数名 | 是否必需 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
type | 是 | String | 指定要使用的连接器, 这里需要设置成 'oceanbase' . |
|
url | 是 | String | 数据库的 JDBC url。 | |
username | 是 | String | 连接用户名。 | |
password | 是 | String | 连接密码。 | |
schema-name | 否 | String | 连接的 schema 名或 db 名。 | |
table-name | 否 | String | 表名。 | |
driver-class-name | 否 | com.mysql.cj.jdbc.Driver | String | 驱动类名,默认为 'com.mysql.cj.jdbc.Driver'。同时该connector并不包含对应驱动,需手动引入。 |
druid-properties | 否 | String | Druid 连接池属性,多个值用分号分隔。 | |
sync-write | 否 | false | Boolean | 是否开启同步写,设置为 true 时将不使用 buffer 直接写入数据库。 |
buffer-flush.interval | 否 | 1s | Duration | 缓冲区刷新周期。设置为 '0' 时将关闭定期刷新。 |
buffer-flush.buffer-size | 否 | 1000 | Integer | 缓冲区大小。 |
max-retries | 否 | 3 | Integer | 失败重试次数。 |
memstore-check.enabled | 否 | true | Boolean | 是否开启内存检查。 |
memstore-check.threshold | 否 | 0.9 | Double | 内存使用的阈值相对最大限制值的比例。 |
memstore-check.interval | 否 | 30s | Duration | 内存使用检查周期。 |
partition.enabled | 否 | false | Boolean | 是否启用分区计算功能,按照分区来写数据。仅当 'sync-write' 和 'direct-load.enabled' 都为 false 时生效。 |
direct-load.enabled | 否 | false | Boolean | 是否开启旁路导入。需要注意旁路导入需要将 sink 的并发度设置为1。 |
direct-load.host | 否 | String | 旁路导入使用的域名或 IP 地址,开启旁路导入时为必填项。 | |
direct-load.port | 否 | 2882 | Integer | 旁路导入使用的 RPC 端口,开启旁路导入时为必填项。 |
direct-load.parallel | 否 | 8 | Integer | 旁路导入任务的并发度。 |
direct-load.max-error-rows | 否 | 0 | Long | 旁路导入任务最大可容忍的错误行数目。 |
direct-load.dup-action | 否 | REPLACE | STRING | 旁路导入任务中主键重复时的处理策略。可以是 'STOP_ON_DUP'(本次导入失败),'REPLACE'(替换)或 'IGNORE'(忽略)。 |
direct-load.timeout | 否 | 7d | Duration | 旁路导入任务的超时时间。 |
direct-load.heartbeat-timeout | 否 | 30s | Duration | 旁路导入任务客户端的心跳超时时间。 |
使用说明 #
-
暂仅支持OceanBase的MySQL租户
-
at-least-once语义保证,暂不支持 exactly-once
-
对于自动建表
- 没有分区键
-
对于表结构变更同步
- 暂只支持新增列、重命名列
- 新增列只能添加到最后一列
-
对于数据同步,pipeline 连接器使用 OceanBase Sink 连接器 将数据写入 OceanBase,具体可以参考 Sink 文档。
数据类型映射 #
CDC type | OceanBase type under MySQL tenant | NOTE |
---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
DECIMAL(p, s) | DECIMAL(p, s) | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | TIMESTAMP | |
TIMESTAMP_LTZ | TIMESTAMP | |
CHAR(n) where n <= 256 | CHAR(n) | |
CHAR(n) where n > 256 | VARCHAR(n) | |
VARCHAR(n) | VARCHAR(n) |