All Products
Search
Document Center

Realtime Compute for Apache Flink:Get started with dynamic Flink CEP

Last Updated:Mar 12, 2025

Realtime Compute for Apache Flink supports dynamic Flink complex event processing (CEP) in DataStream programs to update rules dynamically. This topic describes how to develop a Flink job that dynamically loads the latest rules to process upstream Kafka messages.

Use cases of dynamic Flink CEP

Flink CEP leverages Apache Flink's distributed, sub-millisecond latency data processing capabilities to detect complicated data patterns in data streams. It has a wide range of real-world applications, including:

  • Real-time risk control: Flink CEP can analyze user behavior logs to flag unusual customers, such as when they transfer a total of USD 10,000 in 10 transactions within a five-minute window.

  • Real-time marketing: By consuming user behavior logs, Flink CEP can offer insights for optimizing marketing strategies, such as identifying users who add over three items to their carts in 10 minutes but do not make a payment. Flink CEP is also useful for fraud detection in real-time marketing.

  • IoT: Flink CEP enables anomaly detection and alerting. For example, it can flag a shared bike that has been out of a specific area for over 15 minutes. Another example of Flink CEP is to identify assembly line anomalies in industrial production based on IoT sensor data. For example, when temperature data from sensors consistently exceeds a threshold over three consecutive time windows, alerts will be triggered.

Overview

This topic provides a practical guide to implementing dynamic Flink CEP. As an example, we will demonstrate how Flink consumes user clickstream data from Kafka, dynamically fetches rules from MySQL, and detects matching events. Upon finding a pattern match, the Flink job can trigger alerts or write the results to data stores. The data pipeline graph is as follows:

image

Initially, we will start the Flink job and insert Rule 1: Three events with action = 0, followed by an event with action != 1. This rule flags users who view a product page three times consecutively without making a payment. Then, Rule 1 is updated to Rule 2, which introduces a timeliness constraint: Three consecutive events with action = 0 occur within a 15-minute time window. The updated rule identifies users who repeatedly view the page of a product within 30 minutes without purchasing it.

Prerequisites

Procedure

The following sections describe how to develop a Flink program that looks for users whose actions match a pre-defined rule and how to dynamically update the rule. The procedure is as follows:

Step 1: Prepare test data

Create an upstream Kafka topic

  1. Log on to the ApsaraMQ for Kafka console.

  2. Create a topic named demo_topic to store simulated user behavior logs.

    For more information, see Step 1: Create a topic.

Prepare an ApsaraDB RDS for MySQL database

In the Data Management (DMS) console, prepare test data in an ApsaraDB RDS for MySQL database.

  1. Log on to the ApsaraDB RDS for MySQL instance by using a privileged account.

    For more information, see Log on to the RDS instance in the DMS console.

  2. Create a table named rds_demo to store rules and a table named match_results to receive found matches.

    Copy the following statements to the SQL editor and click Execute(F8):

    CREATE DATABASE cep_demo_db;
    USE cep_demo_db;
    
    CREATE TABLE rds_demo (
      `id` VARCHAR(64),
      `version` INT,
      `pattern` VARCHAR(4096),
      `function` VARCHAR(512)
    );
    
    CREATE TABLE match_results (
        rule_id INT,
        rule_version INT,
        user_id INT,
        user_name VARCHAR(255),
        production_id INT,
        PRIMARY KEY (rule_id,rule_version,user_id,production_id)
    );

    Each row of the rds_demo table represents a rule, consisting of four fields: id (a unique identifier for the rule), version, pattern, and function (defining how to process a matched event sequence.)

    Each row of the match_results table represents a found match, indicating a user's behavior aligning with a specific pattern. This record can empower the marketing team to make informed decisions, such as providing a coupon to the user.

Step 2: Configure an IP address whitelist

To enable your Flink workspace to access the ApsaraDB RDS for MySQL instance, you must add the CIDR block of the Flink workspace to the IP address whitelist of the ApsaraDB RDS for MySQL instance.

  1. Obtain the CIDR block of the vSwitch used by the Flink workspace.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the target workspace and choose More > Workspace Details in the Actions column.

    3. In the Workspace Details dialog box, copy the CIDR block of the vSwitch.

      网段信息

  2. Add the CIDR block to the IP address whitelist of the ApsaraDB RDS for MySQL instance.

    For more information, see Configure an IP address whitelist in the ApsaraDB RDS for MySQL documentation. RDS白名单

Step 3: Develop a dynamic CEP job

Note

All code files in this topic can be downloaded from the GitHub repository. For demonstration purposes, sample code in this topic has been modified on the timeOrMoreAndWindow branch. For the full code, download the ververica-cep-demo-master.zip package.

  1. Add the flink-cep dependency to your project's pom.xml file.

    For information about dependency configurations and conflict handling, see Configure environment dependencies for Apache Flink.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-cep</artifactId>
        <version>1.17-vvr-8.0.8</version>
        <scope>provided</scope>
    </dependency>
  2. Write the dynamic CEP program code.

    1. Create a Kafka source.

      For more information, see Kafka DataStream Connector.

    2. Define transformations using the CEP.dynamicPatterns() method.

      To enable dynamic rule modification and support multiple rules, Alibaba Cloud Realtime Compute for Apache Flink defines the CEP.dynamicPatterns() method:

      public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
               DataStream<T> input,
               PatternProcessorDiscovererFactory<T> discovererFactory,
               TimeBehaviour timeBehaviour,
               TypeInformation<R> outTypeInfo)

      The following table describes the required parameters. Replace the placeholder values with your actual configuration values.

      Parameter

      Description

      DataStream<T> input

      The input event stream.

      PatternProcessorDiscovererFactory<T> discovererFactory

      The factory object which constructs the PatternProcessorDiscoverer. The PatternProcessorDiscoverer retrieves the latest rule to construct a PatternProcessor interface.

      TimeBehaviour timeBehaviour

      The time attribute that defines how the Flink CEP job processes events. Valid values:

      • TimeBehaviour.ProcessingTime: events are processed based on the processing time.

      • TimeBehaviour.EventTime: events are processed based on the event time.

      TypeInformation<R> outTypeInfo

      The type information of the output stream.

      For information about concepts such as DataStream, TimeBehavior, and TypeInformation, see Flink DataStream API Programming Guide, Notions of Time: Event Time and Processing Time, and Class TypeInformation<T>.

      The PatternProcessor interface contains a Pattern that defines how to match events and a PatternProcessFunction that specifies how to process matched events, such as sending alerts. Additionally, it includes identification properties, such as id and version. For more information, see FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP).

      The PatternProcessorDiscovererFactory constructs the PatternProcessorDiscoverer that fetches the latest PatternProcessor. Realtime Compute for Apache Flink offers an abstract class that periodically scans external storage. The following code constructs a timer to regularly poll external storage to fetch the latest PatternProcessor:

      public abstract class PeriodicPatternProcessorDiscoverer<T>
              implements PatternProcessorDiscoverer<T> {
      
          ...
          @Override
          public void discoverPatternProcessorUpdates(
                  PatternProcessorManager<T> patternProcessorManager) {
              // Periodically discovers the pattern processor updates.
              timer.schedule(
                      new TimerTask() {
                          @Override
                          public void run() {
                              if (arePatternProcessorsUpdated()) {
                                  List<PatternProcessor<T>> patternProcessors = null;
                                  try {
                                      patternProcessors = getLatestPatternProcessors();
                                  } catch (Exception e) {
                                      e.printStackTrace();
                                  }
                                  patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                              }
                          }
                      },
                      0,
                      intervalMillis);
          }
      
          ...
      }

      Realtime Compute for Apache Flink also offers the implementation of the JDBCPeriodicPatternProcessorDiscoverer to fetch the most recent rules from a JDBC database, such as ApsaraDB RDS or Hologres. The following table describes the required parameters:

      Parameter

      Description

      jdbcUrl

      The JDBC URL of the database.

      jdbcDriver

      The name of the database driver class.

      tableName

      The name of the table in the database.

      initialPatternProcessors

      The list of the initial PatternProcessors.

      intervalMillis

      The interval at which the database is polled.

      The following sample code provides an example of using the JDBCPeriodicPatternProcessorDiscoverer. Matching events are printed to the logs of the TaskManager.

      // import ......
      public class CepDemo {
      
          public static void main(String[] args) throws Exception {
      
              ......
              // DataStream Source
              DataStreamSource<Event> source =
                      env.fromSource(
                              kafkaSource,
                              WatermarkStrategy.<Event>forMonotonousTimestamps()
                                      .withTimestampAssigner((event, ts) -> event.getEventTime()),
                              "Kafka Source");
      
              env.setParallelism(1);
              // keyBy userId and productionId
              // Notes, only events with the same key will be processd to see if there is a match
              KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                      source.assignTimestampsAndWatermarks(
                              WatermarkStrategy.<Event>forGenerator(ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                      ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
                          @Override
                          public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                              return Tuple2.of(value.getId(), value.getProductionId());
                          }
                      });
      
              SingleOutputStreamOperator<String> output =
                      CEP.dynamicPatterns(
                              keyedStream,
                              new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                      params.get(JDBC_URL_ARG),
                                      JDBC_DRIVE,
                                      params.get(TABLE_NAME_ARG),
                                      null,
                                      Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                              Boolean.parseBoolean(params.get(USING_EVENT_TIME)) ?  TimeBehaviour.EventTime : TimeBehaviour.ProcessingTime,
                              TypeInformation.of(new TypeHint<String>() {}));
      
              output.print();
              // Compile and submit the job
              env.execute("CEPDemo");
          }
      }
      Note

      In the demo code, the input data stream is keyed by userId and productionId before the CEP.dynamicPatterns() is called. This way, Flink only looks for patterns in events with the same userId and productionId.

  3. In the Realtime Compute for Apache Flink console, upload the program JAR and create a JAR deployment.

    The following table describes the parameters that you need to configure when you create the deployment:

    Note

    To facilitate testing, download the cep-demo.jar and create a JAR deployment from it. As no data is stored in the upstream Kafka source and the rule table is empty, no output is returned after you start the deployment.

    Parameter

    Description

    Deployment Mode

    Select Stream Mode.

    Deployment Name

    Enter the name of the JAR deployment.

    Engine Version

    For more information about engine versions, see Engine version and Lifecycle policies. We recommend that you use a recommended version or a stable version. Engine versions are classified into the following types:

    • Recommended version: the latest minor version of the latest major version.

    • Stable version: the latest minor version of a major version that is still in the service period of the product. Defects in previous versions are fixed in such a version.

    • Normal version: other minor versions that are still in the service period of the product.

    • Deprecated version: the versions that exceed the service period of the product.

    JAR URL

    Upload your program JAR or the test cep-demo.jar.

    Entry Point Class

    Set the value to com.alibaba.ververica.cep.demo.CepDemo.

    Entry Point Main Arguments

    If you use your program JAR with the upstream and downstream systems configured, skip this field. If you use the test cep-demo.jar that we provided, copy the following code to the field:

    --kafkaBrokers YOUR_KAFKA_BROKERS 
    --inputTopic YOUR_KAFKA_TOPIC 
    --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP 
    --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD
    --tableName YOUR_TABLE_NAME  
    --jdbcIntervalMs 3000
    --usingEventTime false

    Replace placeholder values in the above code with the following:

    • kafkaBrokers: the addresses of your Kafka brokers.

    • inputTopic: the name of your Kafka topic.

    • inputTopicGroup: your Kafka consumer group.

    • jdbcUrl: the JDBC URL of your database.

      Note

      To connect to a database via JDBC URL, use a standard account with a password composed solely of letters and digits. You can use an authentication method for your deployment based on your business requirements.

    • tableName: the name of the destination table.

    • jdbcIntervalMs: the interval at which the database is polled.

    • usingEventTime: specifies whether to use event time. Valid values: true and false.

    Note
    • Replace the placeholder values with your actual configuration values.

    • Use variables rather than plaintext credentials in production. For more information, see Manage variables.

  4. On the Deployment tab of the Deployments page, click Edit in the Parameters section. Then, add the following parameters in the Other Configuration field.

    The flink-cep dependency relies on the AppClassLoader while aviator classes in the user JAR depends on the UserCodeClassLoader. To prevent loading failure and ensure AppClassLoader can access classes in the user JAR, add the following configurations.

    kubernetes.application-mode.classpath.include-user-jar: 'true' 
    classloader.resolve-order: parent-first

    For more information about how to configure the parameters in the Parameters section, see Parameters.

  5. Navigate to O&M > Deployments. Find the target deployment, and click Start in the Actions column.

    For more information, see Start a deployment.

Step 4: Add a rule

After the JAR deployment is started, add Rule 1: Three consecutive events with action = 0, followed by an event with action != 1. This event sequence indicates the user views the product page three times but does not make a payment.

  1. Log on to the ApsaraDB RDS for MySQL console.

  2. Add a rule.

    Concatenate the JSON string that defines a pattern with the id, version, and function fields, and then execute the INSERT INTO statement to insert data into the rule table in the ApsaraDB RDS for MySQL database.

    INSERT INTO rds_demo (
     `id`,
     `version`,
     `pattern`,
     `function`
    ) values(
      '1',
       1,
      '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
      'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction')
    ;

    To improve the readability of the pattern field in a database for your convenience, Realtime Compute for Apache Flink allows you to define a pattern in the JSON format. For more information, see Definitions of rules in the JSON format in dynamic Flink CEP. The value of the pattern field in the preceding SQL statement is an example of a JSON-formatted, serialized pattern string supported by Realtime Compute for Apache Flink. This pattern matches an event sequence where three consecutive events have action = 0, immediately followed by an event with action != 1.

    Note

    The EndCondition is defined in code to check for "action != 1".

    • Use the Pattern API to define the pattern:

      Pattern<Event, Event> pattern =
          Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
              .where(new StartCondition("action == 0"))
              .timesOrMore(3)
              .followedBy("end")
              .where(new EndCondition());
    • It can be converted to a JSON string using the convertPatternToJSONString method in CepJsonUtils.

      public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException {
          System.out.println(CepJsonUtils.convertPatternToJSONString(pattern));
      }
    • Define the pattern in JSON:

      JSON-formatted pattern

      {
        "name": "end",
        "quantifier": {
          "consumingStrategy": "SKIP_TILL_NEXT",
          "properties": [
            "SINGLE"
          ],
          "times": null,
          "untilCondition": null
        },
        "condition": null,
        "nodes": [
          {
            "name": "end",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "SINGLE"
              ],
              "times": null,
              "untilCondition": null
            },
            "condition": {
              "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
              "type": "CLASS"
            },
            "type": "ATOMIC"
          },
          {
            "name": "start",
            "quantifier": {
              "consumingStrategy": "SKIP_TILL_NEXT",
              "properties": [
                "LOOPING"
              ],
              "times": {
                "from": 3,
                "to": 3,
                "windowTime": null
              },
              "untilCondition": null
            },
            "condition": {
              "expression": "action == 0",
              "type": "AVIATOR"
            },
            "type": "ATOMIC"
          }
        ],
        "edges": [
          {
            "source": "start",
            "target": "end",
            "type": "SKIP_TILL_NEXT"
          }
        ],
        "window": null,
        "afterMatchStrategy": {
          "type": "SKIP_PAST_LAST_EVENT",
          "patternName": null
        },
        "type": "COMPOSITE",
        "version": 1
      }
  3. Use a Kafka client to send messages to the demo_topic topic.

    In this demo, you can also send test messages in the Start to Send and Consume Message panel of the demo_topic topic in the ApsaraMQ for Kafka console.

    1,Ken,0,1,1662022777000
    1,Ken,0,1,1662022778000
    1,Ken,0,1,1662022779000
    1,Ken,0,1,1662022780000

    发消息

    The following table describes the fields of messages in the demo_topic topic.

    Field

    Description

    id

    The user ID.

    username

    The username.

    action

    The user action. Valid values:

    • 0: the view operation.

    • 1: the purchase operation.

    product_id

    The product ID.

    event_time

    The event time when the action was performed.

  4. View the latest rule printed to the JobManager logs and the found matches output to the TaskManager logs.

    • In the JobManager logs, use JDBCPeriodicPatternProcessorDiscoverer as a keyword to search the latest rule.

      image

    • On the Running Task Managers subtab under the Logs tab, choose the log file with the .out suffix. Search A match for Pattern of (id, version): (1, 1) and view the log entry.

      image

  5. In the match_results table, execute SELECT * FROM `match_results` ; to query found matches.

    image

Step 5: Update the rule

A tailored marketing strategy usually takes timeliness into consideration. Therefore, Rule 2 is inserted: Three consecutive events with action = 0 occur within a 15-minute time window.

  1. Set usingEventTime to true.

    1. Go to O&M > Deployments. Find the target deployment, and click Cancel in the Actions column.

    2. Click the name of the deployment. In the deployment details page, select the Configuration tab. Click Edit in the upper-right corner of the Basic section. In the Entry Point Main Arguments field, set usingEventTime to true. Click Save.

    3. Start the deployment again.

  2. Insert a new rule.

    Use the Pattern API to define the pattern in Java:

    Pattern<Event, Event> pattern =
            Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                    .where(new StartCondition("action == 0"))
                    .timesOrMore(3,Time.minutes(15))
                    .followedBy("end")
                    .where(new EndCondition());
    printTestPattern(pattern);

    Insert a new rule into the rds_demo table.

    # For demonstration purposes, Rule 1 is deleted.
    DELETE FROM `rds_demo` WHERE `id` = 1;
    
    # Insert Rule 2: Three consecutive events with action = 0 occur in a 15-minute window, followed by an event with action != 1. The rule version is (1,2) 
    INSERT INTO rds_demo (`id`,`version`,`pattern`,`function`) values('1',2,'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
  3. Send eight simplified messages in the ApsaraMQ for Kafka console.

    Example:

    2,Tom,0,1,1739584800000   #10:00
    2,Tom,0,1,1739585400000   #10:10
    2,Tom,0,1,1739585700000   #10:15
    2,Tom,0,1,1739586000000   #10:20
    3,Ali,0,1,1739586600000   #10:30
    3,Ali,0,1,1739588400000   #11:00
    3,Ali,0,1,1739589000000   #11:10
    3,Ali,0,1,1739590200000   #11:30
  4. In the match_results table, execute SELECT * FROM `match_results` ; to query found matches.

    image

    The result indicates Tom's behavior matches the pre-defined pattern while Ali's does not. This is because Ali's clicks do not meet the 15-minute time constraint. With these insights, the marketing team can implement targeted interventions during limited-time sales. For example, they can issue coupons to users who repeatedly view a product page within a specific time frame to incentivize purchase.