Realtime Compute for Apache Flink provides powerful features for real-time data ingestion. It simplifies the data ingestion pipeline with features such as automatic switching between full and incremental synchronization, automatic metadata discovery, automatic synchronization of schema evolution, and full database synchronization. These features make real-time data synchronization more efficient and convenient. This topic describes how to quickly build a data ingestion job from MySQL to Hologres.
Background information
For example, a MySQL instance has a tpc_ds database that contains 24 business tables with different table schemas. The instance also has three other databases: user_db1, user_db2, and user_db3. Because of database and table sharding, each of these three databases contains three tables with the same schema, for a total of nine tables named user01 to user09. The following figure shows the databases and tables in the MySQL instance as viewed in the Alibaba Cloud DMS console.
To develop a data ingestion job that synchronizes these tables and their data to Hologres and merges the sharded user tables into a single table in Hologres, follow the steps in this topic.
This topic uses the Flink CDC data ingestion job development (in public preview) feature to perform full database synchronization and merge sharded tables. This feature lets you complete full and incremental data synchronization, and real-time synchronization of schema evolution with a single job.
Prerequisites
If you use a Resource Access Management (RAM) user or RAM role to access the service, ensure that the required permissions are granted. For more information, see Permission Management.
A Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
The upstream and downstream storage is ready.
An RDS for MySQL instance is created. For more information, see Create an RDS for MySQL instance.
A Hologres instance is created. For more information, see Purchase a Hologres instance.
Note: The RDS for MySQL and Hologres instances must be in the same region and VPC as the Flink workspace. Otherwise, you must establish a network connection. For more information, see How do I access other services across VPCs? and How do I access the internet?.
Test data is prepared and whitelists are configured. For more information, see Prepare MySQL test data and a Hologres database and Configure IP whitelists.
Prepare MySQL test data and a Hologres database
Click tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql to download the test data to your local computer.
In the DMS console, prepare the test data for the RDS for MySQL instance.
Log on to the RDS for MySQL instance from DMS.
For more information, see Log on to an RDS for MySQL instance using DMS.
In the SQL Console window, enter the following commands and click Execute.
The following commands create the tpc_ds, user_db1, user_db2, and user_db3 databases.
CREATE DATABASE tpc_ds;
CREATE DATABASE user_db1;
CREATE DATABASE user_db2;
CREATE DATABASE user_db3;
From the top shortcut menu, click Data Import.
On the Batch Data Import tab, select the destination database and upload the SQL file. Click Submit Application and then Execute Change. In the dialog box that appears, click Confirm Execution.
Repeat this operation to import the corresponding data files into the tpc_ds, user_db1, user_db2, and user_db3 databases.
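Before moving on, it can help to confirm that the import produced the expected layout. The following query is a minimal sketch to run in the DMS SQL Console against the MySQL instance. It assumes the four databases were created with the names used above and relies only on the standard information_schema.tables view.

```sql
-- Count the base tables in each imported database (run against MySQL).
SELECT table_schema, COUNT(*) AS table_count
FROM information_schema.tables
WHERE table_schema IN ('tpc_ds', 'user_db1', 'user_db2', 'user_db3')
  AND table_type = 'BASE TABLE'
GROUP BY table_schema
ORDER BY table_schema;
```

If the import matches the description in the background information, tpc_ds should report 24 tables and each user database should report three.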
In the Hologres console, create the my_user database to store the data from the merged user tables.
For more information, see Create a database.
Configure IP whitelists
To allow Flink to access the MySQL and Hologres instances, add the CIDR block of the Flink workspace to the IP whitelists of MySQL and Hologres.
Obtain the CIDR block of the Flink workspace VPC.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column for the target workspace, open the workspace details.
In the Workspace Details dialog box, you can view the CIDR Block of the Flink virtual switch.
Add the Flink CIDR block to the IP whitelist of the RDS for MySQL instance.
For more information, see Configure an IP address whitelist.
Add the Flink CIDR block to the IP whitelist of the Hologres instance.
When you configure a data connection in HoloWeb, set the Logon Method to Password-free Logon for Current User to configure an IP whitelist for the current connection. For more information, see IP whitelists.
Step 1: Develop a data ingestion job
Log on to the Flink development console and create a job.
In the left navigation pane, go to . On the page that opens, click New.
Click Blank Data Ingestion Draft.
Flink provides a rich set of code templates. Each template provides specific scenarios, code samples, and instructions. You can click a template to quickly understand Flink features and related syntax, and then implement your business logic.
Click Next.
In the New Data Ingestion Job Draft dialog box, configure the job parameters.
| Job Parameter | Description | Example |
| --- | --- | --- |
| File Name | The name of the job. Note: The job name must be unique in the current project. | flink-test |
| Location | The folder where the code file for the job is stored. You can also click the icon next to an existing folder to create a subfolder. | Job Drafts |
| Engine Version | The Flink engine version used by the current job. For more information about engine version numbers, version compatibility, and lifecycle milestones, see Engine versions. | vvr-11.1-jdk11-flink-1.20 |
Click OK.
Copy the following job code to the editor.
The following sample code synchronizes all tables from the tpc_ds database to Hologres and merges the sharded user tables into a single table in Hologres.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: tpc_ds.\.*,user_db[0-9]+.user[0-9]+
  server-id: 8601-8604
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize distributing unbounded splits to prevent potential TaskManager OutOfMemory issues.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to accelerate reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

route:
  # Merge and synchronize sharded user databases and tables to the my_user.users table.
  - source-table: user_db[0-9]+.user[0-9]+
    sink-table: my_user.users
Note: All tables in the MySQL tpc_ds database are directly mapped to downstream tables with the same names. Therefore, you do not need to configure mappings in the route module. If you want to synchronize the tables to a database with a different name, such as ods_tps_ds, you can configure the route module as follows:
route:
  # Merge and synchronize sharded user databases and tables to the my_user.users table.
  - source-table: user_db[0-9]+.user[0-9]+
    sink-table: my_user.users
  # Uniformly change table names to synchronize all tables from the tpc_ds database to the ods_tps_ds database.
  - source-table: tpc_ds.\.*
    sink-table: ods_tps_ds.<>
    replace-symbol: <>
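To preview which source tables the tables option in the job code is likely to capture, you can approximate its patterns with a query against the MySQL instance. This is only a sketch: it assumes that the unescaped dot in each pattern separates the database name from the table name, that \.* therefore means "any table name", and that each part must match the whole name. MySQL REGEXP may also treat letter case differently from the connector, depending on the collation.

```sql
-- Approximate the two patterns of the tables option (run against MySQL):
--   tpc_ds.\.*                 -> every table in the tpc_ds database
--   user_db[0-9]+.user[0-9]+   -> the sharded user tables
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
  AND (
        table_schema = 'tpc_ds'
        OR (table_schema REGEXP '^user_db[0-9]+$'
            AND table_name REGEXP '^user[0-9]+$')
      )
ORDER BY table_schema, table_name;
```

With the test data described in this topic, the result should list the 24 tpc_ds tables and the nine sharded user tables. Any other table that appears would also be picked up by the job, so a pattern that is too broad shows up here before the job runs.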
Step 2: Start the job
In the editor, click Deploy. In the dialog box, click Confirm.
In the navigation pane on the left, go to the Job O&M page, find the target job, and click Start in the Actions column. Configure the parameters. For more information, see Start a job.
Click Start.
After the job starts, you can view its running information and status on the Job O&M page.
Step 3: Observe the full synchronization result
Log on to the Hologres console.
On the Metadata Management tab, you can view the 24 tables and their data in the tpc_ds database of the Hologres instance.
On the Metadata Management tab, you can view the schema of the users table in the my_user database.
The following figures show the table schema and data after synchronization.
Table schema
The schema of the users table has two more columns than the MySQL source tables: _db_name and _table_name. These columns represent the source database name and table name. They are part of the composite primary key to ensure data uniqueness after the sharded tables are merged.
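The role of the two extra columns can be checked with a pair of queries in Hologres. This is a sketch that assumes the source primary key is the id column (the topic orders and filters by id but does not state the key explicitly) and that the queries run in the my_user database.

```sql
-- id values that occur in more than one row of the merged table.
-- With a primary key on id alone, these rows would overwrite each other.
SELECT id, COUNT(*) AS rows_with_same_id
FROM users
GROUP BY id
HAVING COUNT(*) > 1
ORDER BY id;

-- The full composite key, by contrast, is expected to be unique,
-- so this query should return no rows.
SELECT _db_name, _table_name, id, COUNT(*) AS copies
FROM users
GROUP BY _db_name, _table_name, id
HAVING COUNT(*) > 1;
```

If the first query returns rows, the same id is reused across shards, which is the situation the composite key is designed to handle.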
Table data
In the upper-right corner of the users table page, click Query Table. Enter the following command and click Run.
select * from users order by _db_name,_table_name,id;
The following figure shows the table data result.
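Scanning the full result by eye becomes impractical as the tables grow. As a sketch, the following Hologres query summarizes how many rows arrived from each source shard, using the _db_name and _table_name columns described above.

```sql
-- Row count per source database and table in the merged users table (run in Hologres).
SELECT _db_name, _table_name, COUNT(*) AS row_count
FROM users
GROUP BY _db_name, _table_name
ORDER BY _db_name, _table_name;
```

Each count can then be compared with a SELECT COUNT(*) on the corresponding MySQL table to confirm that the full synchronization is complete.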
Step 4: Observe the incremental synchronization result
The synchronization job automatically switches to the incremental data synchronization phase after the full data synchronization is complete. No manual intervention is required. You can check the value of the currentEmitEventTimeLag metric on the Monitoring and Alerts tab to determine the data synchronization phase.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column for the workspace, click Console.
In the left navigation pane, go to the Job O&M page and click the name of the target job.
Click the Monitoring and Alerts (or Data Curves) tab.
Observe the currentEmitEventTimeLag curve to determine the data synchronization phase.
A value of 0 indicates that the job is in the full synchronization phase.
A value greater than 0 indicates that the job has entered the incremental synchronization phase.
Verify the real-time synchronization of data and schema changes.
The MySQL CDC data source supports real-time synchronization of data changes and schema changes during the incremental phase. After the job enters the incremental synchronization phase, you can modify the schema and data of the sharded user tables in MySQL to verify this capability.
Log on to the RDS for MySQL instance from DMS.
For more information, see Connect to an RDS for MySQL instance using DMS.
In the user_db2 database, run the following commands to modify the user02 table schema, and insert and update data.
USE `user_db2`;
-- Add the age column.
ALTER TABLE `user02` ADD COLUMN `age` INT;
-- Insert data with age.
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);
-- Update another table, changing the name to uppercase.
UPDATE `user05` SET name='JARK' WHERE id=15;
In the Hologres console, you can view the schema and data changes in the users table.
In the upper-right corner of the users table page, click Query Table. Enter the following command and click Run.
select * from users order by _db_name,_table_name,id;
The following figure shows the resulting table data.
Although the schemas of the sharded tables are not identical, schema and data changes to the user02 table are synchronized to the downstream table in real time. In the Hologres users table, you can see the new age field, the inserted data for Tony, and the name JARK updated to uppercase.
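To check only the rows touched by the statements above instead of reading the whole table, a narrower query can be used. This sketch assumes the id and name values from the MySQL statements in this step and that the age column has already been propagated to Hologres.

```sql
-- Show only the inserted row (Tony) and the updated row (JARK) in Hologres.
SELECT _db_name, _table_name, id, name, age
FROM users
WHERE (id = 27 AND name = 'Tony')
   OR (id = 15 AND name = 'JARK')
ORDER BY _db_name, _table_name, id;
```

Rows that never received an age value, including rows from shards whose schema was not changed, would be expected to show NULL in the age column.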
(Optional) Step 5: Configure job resources
Depending on the data volume, you may need to adjust the concurrency and TaskManager resources for better job performance. You can use the resource configuration settings to adjust the job's concurrency, memory, or CU count.
In the navigation pane on the left, go to the Job O&M page. Then, click the name of the target job.
On the Deployment Details tab, click Edit in the upper-right corner of the Resource Configuration section.
You can manually set resource parameters such as TaskManager Memory and concurrency.
On the right side of the Resource Configuration section, click Save.
Restart the job.
The configuration changes take effect only after the job is restarted.
References
For the syntax of data ingestion modules, see Flink CDC data ingestion job development reference.
If an exception occurs while a data ingestion job is running, see FAQ and solutions for data ingestion jobs.
For a related scenario, see Build a streaming lakehouse using Flink, Paimon, and StarRocks.