OceanBase
This documentation is for an unreleased version of Apache Flink CDC. We recommend you use the latest stable version.

OceanBase Connector #

OceanBase Pipeline 连接器可以用作 Pipeline 的 Data Sink,将数据写入OceanBase。 本文档介绍如何设置 OceanBase Pipeline 连接器。

连接器的功能 #

  • 自动建表
  • 表结构变更同步
  • 数据实时同步

示例 #

从 MySQL 读取数据同步到 OceanBase 的 Pipeline 可以定义如下:

source:
  type: mysql
  hostname: mysql
  port: 3306
  username: mysqluser
  password: mysqlpw
  tables: mysql_2_oceanbase_test_17l13vc.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: oceanbase
  url: jdbc:mysql://oceanbase:2881/test
  username: root@test
  password: password

pipeline:
  name: MySQL to OceanBase Pipeline
  parallelism: 1

连接器配置项 #

参数名 是否必需 默认值 类型 描述
type String 指定要使用的连接器, 这里需要设置成 'oceanbase'.
url String 数据库的 JDBC url。
username String 连接用户名。
password String 连接密码。
schema-name String 连接的 schema 名或 db 名。
table-name String 表名。
driver-class-name com.mysql.cj.jdbc.Driver String 驱动类名,默认为 'com.mysql.cj.jdbc.Driver'。同时该connector并不包含对应驱动,需手动引入。
druid-properties String Druid 连接池属性,多个值用分号分隔。
sync-write false Boolean 是否开启同步写,设置为 true 时将不使用 buffer 直接写入数据库。
buffer-flush.interval 1s Duration 缓冲区刷新周期。设置为 '0' 时将关闭定期刷新。
buffer-flush.buffer-size 1000 Integer 缓冲区大小。
max-retries 3 Integer 失败重试次数。
memstore-check.enabled true Boolean 是否开启内存检查。
memstore-check.threshold 0.9 Double 内存使用的阈值相对最大限制值的比例。
memstore-check.interval 30s Duration 内存使用检查周期。
partition.enabled false Boolean 是否启用分区计算功能,按照分区来写数据。仅当 'sync-write' 和 'direct-load.enabled' 都为 false 时生效。
direct-load.enabled false Boolean 是否开启旁路导入。需要注意旁路导入需要将 sink 的并发度设置为1。
direct-load.host String 旁路导入使用的域名或 IP 地址,开启旁路导入时为必填项。
direct-load.port 2882 Integer 旁路导入使用的 RPC 端口,开启旁路导入时为必填项。
direct-load.parallel 8 Integer 旁路导入任务的并发度。
direct-load.max-error-rows 0 Long 旁路导入任务最大可容忍的错误行数目。
direct-load.dup-action REPLACE STRING 旁路导入任务中主键重复时的处理策略。可以是 'STOP_ON_DUP'(本次导入失败),'REPLACE'(替换)或 'IGNORE'(忽略)。
direct-load.timeout 7d Duration 旁路导入任务的超时时间。
direct-load.heartbeat-timeout 30s Duration 旁路导入任务客户端的心跳超时时间。

使用说明 #

  • 暂仅支持OceanBase的MySQL租户

  • at-least-once语义保证,暂不支持 exactly-once

  • 对于自动建表

    • 没有分区键
  • 对于表结构变更同步

    • 暂只支持新增列、重命名列
    • 新增列只能添加到最后一列
  • 对于数据同步,pipeline 连接器使用 OceanBase Sink 连接器 将数据写入 OceanBase,具体可以参考 Sink 文档

数据类型映射 #

CDC type OceanBase type under MySQL tenant NOTE
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIMESTAMP TIMESTAMP
TIMESTAMP_LTZ TIMESTAMP
CHAR(n) where n <= 256 CHAR(n)
CHAR(n) where n > 256 VARCHAR(n)
VARCHAR(n) VARCHAR(n)

Back to top