Realtime Compute for Apache Flink: Quick start for real-time database ingestion

Last Updated: Aug 08, 2025

Realtime Compute for Apache Flink provides powerful features for real-time data ingestion. It simplifies the data ingestion pipeline with features such as automatic switching between full and incremental synchronization, automatic metadata discovery, automatic synchronization of schema evolution, and full database synchronization. These features make real-time data synchronization more efficient and convenient. This topic describes how to quickly build a data ingestion job from MySQL to Hologres.

Background information

For example, a MySQL instance has a tpc_ds database that contains 24 business tables with different table schemas. The instance also has three other databases: user_db1, user_db2, and user_db3. Because of database and table sharding, each of these three databases contains three tables with the same schema, for a total of nine tables named user01 to user09. The following figure shows the databases and tables in the MySQL instance as viewed in the Alibaba Cloud DMS console.
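
For reference, the nine sharded tables share one schema. The following sketch is hypothetical: only the id and name columns are confirmed by the examples later in this topic, and the downloaded SQL files define the authoritative schema.

    -- Hypothetical schema shared by user01 through user09.
    -- Only id and name are confirmed by this topic; see the test data files for the real definition.
    CREATE TABLE user01 (
        id   INT NOT NULL PRIMARY KEY,
        name VARCHAR(255)
    );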

To develop a data ingestion job that synchronizes these tables and their data to Hologres and merges the sharded user tables into a single Hologres table, perform the steps in this topic.

This topic uses the Flink CDC data ingestion job development (in public preview) feature to perform full database synchronization and merge sharded tables. This feature lets you complete full and incremental data synchronization, and real-time synchronization of schema evolution with a single job.

Prerequisites

Prepare MySQL test data and a Hologres database

  1. Click tpc_ds.sql, user_db1.sql, user_db2.sql, and user_db3.sql to download the test data to your local computer.

  2. In the DMS console, prepare the test data for the RDS for MySQL instance.

    1. Log on to the RDS for MySQL instance from DMS.

    2. In the SQL Console window, enter the following commands and click Execute.

      The following commands create the tpc_ds, user_db1, user_db2, and user_db3 databases.

      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. From the top shortcut menu, click Data Import.

    4. On the Batch Data Import tab, select the destination database and upload the SQL file. Click Submit Application and then Execute Change. In the dialog box that appears, click Confirm Execution.

      Repeat this operation to import the corresponding data files into the tpc_ds, user_db1, user_db2, and user_db3 databases.

  3. In the Hologres console, create the my_user database to store the data from the merged user tables.

    For more information, see Create a database.
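
    If you prefer SQL to the console UI, you can create the same database with a single statement after you connect to the Hologres instance, for example through HoloWeb:

      -- Creates the database that will hold the merged users table.
      CREATE DATABASE my_user;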

Configure IP whitelists

To allow Flink to access the MySQL and Hologres instances, add the CIDR block of the Flink workspace to the IP whitelists of MySQL and Hologres.

  1. Obtain the CIDR block of the Flink workspace VPC.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column for the target Workspace, choose More > Workspace Details.

    3. In the Workspace Details dialog box, view the CIDR block of the vSwitch that the workspace uses.


  2. Add the Flink CIDR block to the IP whitelist of the RDS for MySQL instance.

    For more information, see Configure an IP address whitelist.

  3. Add the Flink CIDR block to the IP whitelist of the Hologres instance.

    When you configure a data connection in HoloWeb, set the Logon Method to Password-free Logon for Current User to configure an IP whitelist for the current connection. For more information, see IP whitelists.

Step 1: Develop a data ingestion job

  1. Log on to the Flink development console and create a job.

    1. In the left navigation pane, go to Data Development > Data Ingestion. On the page that opens, click New.

    2. Click Blank Data Ingestion Draft.

      Flink provides a rich set of code templates. Each template covers a specific scenario and includes code samples and instructions. You can click a template to quickly understand Flink features and related syntax, and then implement your business logic.

    3. Click Next.

    4. In the New Data Ingestion Job Draft dialog box, configure the job parameters.

      Job Parameter | Description | Example
      --- | --- | ---
      File Name | The name of the job. Note: The job name must be unique in the current project. | flink-test
      Location | The folder where the code file for the job is stored. You can also click the create folder icon next to an existing folder to create a subfolder. | Job Drafts
      Engine Version | The Flink engine version used by the current job. For more information about engine version numbers, version compatibility, and lifecycle milestones, see Engine versions. | vvr-11.1-jdk11-flink-1.20

    5. Click OK.

  2. Copy the following job code to the editor.

    The following sample code synchronizes all tables from the tpc_ds database to Hologres and merges the sharded user tables into a single table in Hologres.

    source:
      type: mysql
      name: MySQL Source
      hostname: localhost
      port: 3306
      username: username
      password: password
      tables: tpc_ds.\.*,user_db[0-9]+.user[0-9]+
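      # Each parallel source reader uses one unique server ID. The range below provides four IDs, enough for a source parallelism of up to 4.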
      server-id: 8601-8604
      # (Optional) Synchronize table and field comments.
      include-comments.enabled: true
      # (Optional) Prioritize distributing unbounded splits to prevent potential TaskManager OutOfMemory issues.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Enable parsing filters to accelerate reads.
      scan.only.deserialize.captured.tables.changelog.enabled: true  
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: ****.hologres.aliyuncs.com:80
      dbname: cdcyaml_test
      username: ${secret_values.holo-username}
      password: ${secret_values.holo-password}
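      # BROADEN normalizes source column types to broader Hologres types so that compatible type changes can be applied without conflicts.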
      sink.type-normalize-strategy: BROADEN
      
    route:
      # Merge and synchronize sharded user databases and tables to the my_user.users table.
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
    Note

    All tables in the MySQL tpc_ds database are directly mapped to downstream tables with the same names. Therefore, you do not need to configure mappings in the route module. If you want to synchronize the tables to a database with a different name, such as ods_tps_ds, you can configure the route module as follows:

    route:
      # Merge and synchronize sharded user databases and tables to the my_user.users table.
      - source-table: user_db[0-9]+.user[0-9]+
        sink-table: my_user.users
      # Uniformly change table names to synchronize all tables from the tpc_ds database to the ods_tps_ds database.
      - source-table: tpc_ds.\.*
        sink-table: ods_tps_ds.<>
        replace-symbol: <>

Step 2: Start the job

  1. On the Data Development > Data Ingestion page, click Deploy. In the dialog box, click Confirm.

  2. In the navigation pane on the left, choose Operation Center > Job O&M. On the Job O&M page, find the target job and click Start in the Actions column. Configure the parameters. For more information, see Start a job.

  3. Click Start.

    After the job starts, you can view its running information and status on the Job O&M page.

Step 3: Observe the full synchronization result

  1. Log on to the Hologres console.

  2. On the Metadata Management tab, you can view the 24 tables and their data in the tpc_ds database of the Hologres instance.


  3. On the Metadata Management tab, you can view the schema of the users table in the my_user database.

    The following figures show the table schema and data after synchronization.

    • Table schema

      The schema of the users table has two more columns than the MySQL source tables: _db_name and _table_name. These columns represent the source database name and table name. They are part of the composite primary key to ensure data uniqueness after the sharded tables are merged.
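
      For illustration, the automatically created users table has roughly the following shape. This DDL is only a sketch: the connector generates the actual schema, and only the id and name columns are confirmed by this topic.

      -- Hypothetical shape of the merged table. The connector creates it automatically;
      -- this sketch only illustrates the composite primary key described above.
      CREATE TABLE users (
          _db_name    TEXT NOT NULL,  -- name of the source database
          _table_name TEXT NOT NULL,  -- name of the source table
          id          INT  NOT NULL,
          name        TEXT,
          PRIMARY KEY (_db_name, _table_name, id)
      );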

    • Table data

      In the upper-right corner of the users table details page, click Query Table. Enter the following command and click Run.

      select * from users order by _db_name,_table_name,id;

      The following figure shows the table data result.

Step 4: Observe the incremental synchronization result

The synchronization job automatically switches to the incremental data synchronization phase after the full data synchronization is complete. No manual intervention is required. You can check the value of the currentEmitEventTimeLag metric on the Monitoring and Alerts tab to determine the data synchronization phase.

  1. Log on to the Realtime Compute for Apache Flink console.

  2. In the Actions column for the workspace, click Console.

  3. In the left navigation pane, choose Operation Center > Job O&M and click the name of the target job.

  4. Click the Monitoring and Alerts (or Data Curves) tab.

  5. Observe the currentEmitEventTimeLag curve to determine the data synchronization phase.


    • A value of 0 indicates that the job is in the full synchronization phase.

    • A value greater than 0 indicates that the job has entered the incremental synchronization phase.

  6. Verify the real-time synchronization of data and schema changes.

    The MySQL CDC data source supports real-time synchronization of data changes and schema changes during the incremental phase. After the job enters the incremental synchronization phase, you can modify the schema and data of the sharded user tables in MySQL to verify this capability.

    1. Log on to the RDS for MySQL instance from DMS.

    2. In the user_db2 database, run the following commands to modify the user02 table schema, and insert and update data.

      USE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;                     -- Add the age column.
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);  -- Insert a row that includes age.
      UPDATE `user05` SET name='JARK' WHERE id=15;                   -- Update a row in another sharded table, changing the name to uppercase.
    3. In the Hologres console, you can view the schema and data changes in the users table.

      In the upper-right corner of the users table page, click Query Table. Enter the following command and click Run.

      select * from users order by _db_name,_table_name,id;

      The following figure shows the resulting table data.

      Although the schemas of the sharded tables are not identical, schema and data changes to the user02 table are synchronized to the downstream table in real time. In the Hologres users table, you can see the new age column, the inserted row for Tony, and the name updated to the uppercase JARK.
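
      As an optional check, the following query, which assumes the column names shown above, returns only the rows that carry the new age value:

      select * from users where age is not null order by _db_name,_table_name,id;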

(Optional) Step 5: Configure job resources

Depending on the data volume, you may need to adjust concurrency and TaskManager resources for better job performance. Use the resource configuration settings to change the job's concurrency, memory, or CU count.

  1. In the navigation pane on the left, choose Operation Center > Job O&M. Then, click the name of the target job.

  2. On the Deployment Details tab, click Edit in the upper-right corner of the Resource Configuration section.

  3. You can manually set resource parameters such as TaskManager Memory and concurrency.

  4. On the right side of the Resource Configuration section, click Save.

  5. Restart the job.

    The configuration changes take effect only after the job is restarted.
