Realtime Compute for Apache Flink supports dynamic Flink complex event processing (CEP) in DataStream programs to update rules dynamically. This topic describes how to develop a Flink job that dynamically loads the latest rules to process upstream Kafka messages.
Use cases of dynamic Flink CEP
Flink CEP leverages Apache Flink's distributed, sub-millisecond latency data processing capabilities to detect complicated data patterns in data streams. It has a wide range of real-world applications, including:
Real-time risk control: Flink CEP can analyze user behavior logs to flag unusual customers, such as when they transfer a total of USD 10,000 in 10 transactions within a five-minute window.
Real-time marketing: By consuming user behavior logs, Flink CEP can offer insights for optimizing marketing strategies, such as identifying users who add over three items to their carts in 10 minutes but do not make a payment. Flink CEP is also useful for fraud detection in real-time marketing.
IoT: Flink CEP enables anomaly detection and alerting. For example, it can flag a shared bike that has been outside a designated area for more than 15 minutes. It can also identify assembly line anomalies in industrial production based on IoT sensor data, for example by triggering an alert when sensor temperature readings exceed a threshold in three consecutive time windows.
Overview
This topic provides a practical guide to implementing dynamic Flink CEP. As an example, we will demonstrate how Flink consumes user clickstream data from Kafka, dynamically fetches rules from MySQL, and detects matching events. Upon finding a pattern match, the Flink job can trigger alerts or write the results to data stores. The data pipeline graph is as follows:
First, we start the Flink job and insert Rule 1: three events with action = 0, followed by an event with action != 1. This rule flags users who view a product page three times consecutively without making a payment. Then, Rule 1 is updated to Rule 2, which introduces a timeliness constraint: three consecutive events with action = 0 occur within a 15-minute time window. The updated rule identifies users who repeatedly view the page of a product within 15 minutes without purchasing it.
Prerequisites
A Realtime Compute for Apache Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
A RAM user or RAM role has the required permissions to access the console of Realtime Compute for Apache Flink. For more information, see Permission management.
Upstream and downstream storage
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
An ApsaraMQ for Kafka instance is created. For more information, see Overview of ApsaraMQ for Kafka.
Procedure
The following sections describe how to develop a Flink program that looks for users whose actions match a pre-defined rule and how to dynamically update the rule. The procedure is as follows:
Step 1: Prepare test data
Create an upstream Kafka topic
Log on to the ApsaraMQ for Kafka console.
Create a topic named demo_topic to store simulated user behavior logs.
For more information, see Step 1: Create a topic.
Prepare an ApsaraDB RDS for MySQL database
In the Data Management (DMS) console, prepare test data in an ApsaraDB RDS for MySQL database.
Log on to the ApsaraDB RDS for MySQL instance by using a privileged account.
For more information, see Log on to the RDS instance in the DMS console.
Create a table named rds_demo to store rules and a table named match_results to receive found matches.
Copy the following statements to the SQL editor and click Execute(F8):
CREATE DATABASE cep_demo_db;
USE cep_demo_db;

CREATE TABLE rds_demo (
  `id` VARCHAR(64),
  `version` INT,
  `pattern` VARCHAR(4096),
  `function` VARCHAR(512)
);

CREATE TABLE match_results (
  rule_id INT,
  rule_version INT,
  user_id INT,
  user_name VARCHAR(255),
  production_id INT,
  PRIMARY KEY (rule_id, rule_version, user_id, production_id)
);
Each row of the rds_demo table represents a rule and consists of four fields: id (a unique identifier for the rule), version, pattern, and function (which defines how to process a matched event sequence).
Each row of the match_results table represents a found match, indicating a user's behavior aligning with a specific pattern. This record can empower the marketing team to make informed decisions, such as providing a coupon to the user.
Step 2: Configure an IP address whitelist
To enable your Flink workspace to access the ApsaraDB RDS for MySQL instance, you must add the CIDR block of the Flink workspace to the IP address whitelist of the ApsaraDB RDS for MySQL instance.
Obtain the CIDR block of the vSwitch used by the Flink workspace.
Log on to the Realtime Compute for Apache Flink console.
Find the target workspace and open its workspace details from the Actions column.
In the Workspace Details dialog box, copy the CIDR block of the vSwitch.
Add the CIDR block to the IP address whitelist of the ApsaraDB RDS for MySQL instance.
For more information, see Configure an IP address whitelist in the ApsaraDB RDS for MySQL documentation.
Step 3: Develop a dynamic CEP job
All code files in this topic can be downloaded from the GitHub repository. For demonstration purposes, sample code in this topic has been modified on the timeOrMoreAndWindow branch. For the full code, download the ververica-cep-demo-master.zip package.
Add the flink-cep dependency to your project's pom.xml file.
For information about dependency configurations and conflict handling, see Configure environment dependencies for Apache Flink.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-cep</artifactId>
    <version>1.17-vvr-8.0.8</version>
    <scope>provided</scope>
</dependency>
Write the dynamic CEP program code.
Create a Kafka source.
For more information, see Kafka DataStream Connector.
Define transformations using the CEP.dynamicPatterns() method.
To enable dynamic rule modification and support multiple rules, Alibaba Cloud Realtime Compute for Apache Flink defines the CEP.dynamicPatterns() method:
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
        DataStream<T> input,
        PatternProcessorDiscovererFactory<T> discovererFactory,
        TimeBehaviour timeBehaviour,
        TypeInformation<R> outTypeInfo)
The following table describes the required parameters. Replace the placeholder values with your actual configuration values.
| Parameter | Description |
| --- | --- |
| DataStream<T> input | The input event stream. |
| PatternProcessorDiscovererFactory<T> discovererFactory | The factory object that constructs the PatternProcessorDiscoverer. The PatternProcessorDiscoverer retrieves the latest rule to construct a PatternProcessor. |
| TimeBehaviour timeBehaviour | The time attribute that defines how the Flink CEP job processes events. Valid values: TimeBehaviour.ProcessingTime (events are processed based on the processing time) and TimeBehaviour.EventTime (events are processed based on the event time). |
| TypeInformation<R> outTypeInfo | The type information of the output stream. |
For information about concepts such as DataStream, TimeBehaviour, and TypeInformation, see Flink DataStream API Programming Guide, Notions of Time: Event Time and Processing Time, and Class TypeInformation<T>.
The PatternProcessor interface contains a Pattern that defines how to match events and a PatternProcessFunction that specifies how to process matched events, such as sending alerts. Additionally, it includes identification properties, such as id and version. For more information, see FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP).
The PatternProcessorDiscovererFactory constructs the PatternProcessorDiscoverer that fetches the latest PatternProcessor. Realtime Compute for Apache Flink offers an abstract class that periodically scans external storage. The following code constructs a timer to regularly poll external storage to fetch the latest PatternProcessor:
public abstract class PeriodicPatternProcessorDiscoverer<T>
        implements PatternProcessorDiscoverer<T> {

    ...
    @Override
    public void discoverPatternProcessorUpdates(
            PatternProcessorManager<T> patternProcessorManager) {
        // Periodically discovers the pattern processor updates.
        timer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        if (arePatternProcessorsUpdated()) {
                            List<PatternProcessor<T>> patternProcessors = null;
                            try {
                                patternProcessors = getLatestPatternProcessors();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                        }
                    }
                },
                0,
                intervalMillis);
    }

    ...
}
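The polling behavior described above can be illustrated outside Flink with plain Java. The following is a minimal sketch, not the Realtime Compute for Apache Flink implementation: RulePollerSketch, ref, and pollOnce are hypothetical names, and each rule is reduced to an "id:version" string. The sketch forwards an update only when the fetched set differs from the last one it saw. As a design choice of the sketch, it keeps the current rules when a fetch fails instead of forwarding an empty result.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Plain-Java illustration of periodic rule discovery. All names are hypothetical. */
public class RulePollerSketch {
    private final Callable<Set<String>> fetcher;  // stands in for a query against the rule table
    private final Consumer<Set<String>> onUpdate; // stands in for the update callback
    private Set<String> lastSeen = new HashSet<>();

    public RulePollerSketch(Callable<Set<String>> fetcher, Consumer<Set<String>> onUpdate) {
        this.fetcher = fetcher;
        this.onUpdate = onUpdate;
    }

    /** Identifies a rule by "id:version", for example "1:2". */
    public static String ref(String id, int version) {
        return id + ":" + version;
    }

    /** Runs one polling round. Returns true if an update was forwarded. */
    public synchronized boolean pollOnce() {
        Set<String> latest;
        try {
            latest = fetcher.call();
        } catch (Exception e) {
            return false; // keep the current rules if the fetch fails
        }
        if (latest == null || latest.equals(lastSeen)) {
            return false; // nothing changed since the previous round
        }
        lastSeen = new HashSet<>(latest);
        onUpdate.accept(Collections.unmodifiableSet(lastSeen));
        return true;
    }

    /** Schedules pollOnce() at a fixed interval, starting immediately. */
    public ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::pollOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Calling pollOnce() directly makes the change detection easy to exercise without waiting for the scheduler. The caller is responsible for shutting down the executor returned by start().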
Realtime Compute for Apache Flink also offers the implementation of the JDBCPeriodicPatternProcessorDiscoverer to fetch the most recent rules from a JDBC database, such as ApsaraDB RDS or Hologres. The following table describes the required parameters:
| Parameter | Description |
| --- | --- |
| jdbcUrl | The JDBC URL of the database. |
| jdbcDriver | The name of the database driver class. |
| tableName | The name of the table in the database. |
| initialPatternProcessors | The list of the initial PatternProcessors. |
| intervalMillis | The interval at which the database is polled. |
The following sample code provides an example of using the JDBCPeriodicPatternProcessorDiscoverer. Matching events are printed to the logs of the TaskManager.
// import ......
public class CepDemo {

    public static void main(String[] args) throws Exception {

        ......
        // DataStream Source
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");

        env.setParallelism(1);

        // keyBy userId and productionId
        // Note: only events with the same key are processed to see if there is a match
        KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                source.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forGenerator(
                                ctx -> new EventBoundedOutOfOrdernessWatermarks(Duration.ofSeconds(5)))
                ).keyBy(new KeySelector<Event, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                        return Tuple2.of(value.getId(), value.getProductionId());
                    }
                });

        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream,
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                params.get(JDBC_URL_ARG),
                                JDBC_DRIVE,
                                params.get(TABLE_NAME_ARG),
                                null,
                                Long.parseLong(params.get(JDBC_INTERVAL_MILLIS_ARG))),
                        Boolean.parseBoolean(params.get(USING_EVENT_TIME))
                                ? TimeBehaviour.EventTime
                                : TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {}));

        output.print();

        // Compile and submit the job
        env.execute("CEPDemo");
    }
}
Note: In the demo code, the input data stream is keyed by userId and productionId before CEP.dynamicPatterns() is called. This way, Flink only looks for patterns in events with the same userId and productionId.
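To make the effect of keying concrete, the following plain-Java sketch partitions a small interleaved event list by a composite (id, productionId) key, which is what keyBy does conceptually before pattern matching. It does not use Flink: KeyedGroupingSketch, Event, keyOf, and groupByKey are hypothetical names, and the string key stands in for the Tuple2 key used in the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of per-key partitioning. All names are hypothetical. */
public class KeyedGroupingSketch {

    /** Minimal event holding only the fields needed for keying. */
    public static final class Event {
        public final int id;
        public final int productionId;
        public final int action;

        public Event(int id, int productionId, int action) {
            this.id = id;
            this.productionId = productionId;
            this.action = action;
        }
    }

    /** Builds the composite key from the user ID and the product ID. */
    public static String keyOf(Event e) {
        return e.id + ":" + e.productionId;
    }

    /** Partitions events by key and preserves arrival order within each key. */
    public static Map<String, List<Event>> groupByKey(List<Event> events) {
        Map<String, List<Event>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            groups.computeIfAbsent(keyOf(e), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Five view events (action = 0) from two users across two products.
        List<Event> events = Arrays.asList(
                new Event(1, 1, 0), new Event(2, 1, 0), new Event(1, 1, 0),
                new Event(1, 2, 0), new Event(1, 1, 0));
        groupByKey(events).forEach(
                (key, group) -> System.out.println(key + " -> " + group.size() + " events"));
    }
}
```

In this input only the key 1:1 accumulates three views, so a rule that requires three views can be satisfied only for that user and product combination, even though user 1 produced four events in total.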
In the Realtime Compute for Apache Flink console, upload the program JAR and create a JAR deployment.
The following table describes the parameters that you need to configure when you create the deployment:
Note: To facilitate testing, download the cep-demo.jar and create a JAR deployment from it. As no data is stored in the upstream Kafka source and the rule table is empty, no output is returned after you start the deployment.
Parameter
Description
Deployment Mode
Select Stream Mode.
Deployment Name
Enter the name of the JAR deployment.
Engine Version
For more information about engine versions, see Engine version and Lifecycle policies. We recommend that you use a recommended version or a stable version. Engine versions are classified into the following types:
Recommended version: the latest minor version of the latest major version.
Stable version: the latest minor version of a major version that is still in the service period of the product. Defects in previous versions are fixed in such a version.
Normal version: other minor versions that are still in the service period of the product.
Deprecated version: the versions that exceed the service period of the product.
JAR URL
Upload your program JAR or the test cep-demo.jar.
Entry Point Class
Set the value to com.alibaba.ververica.cep.demo.CepDemo.
Entry Point Main Arguments
If you use your program JAR with the upstream and downstream systems configured, skip this field. If you use the test cep-demo.jar that we provided, copy the following code to the field:
--kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000 --usingEventTime false
Replace placeholder values in the above code with the following:
kafkaBrokers: the addresses of your Kafka brokers.
inputTopic: the name of your Kafka topic.
inputTopicGroup: your Kafka consumer group.
jdbcUrl: the JDBC URL of your database.
Note: To connect to a database via JDBC URL, use a standard account with a password composed solely of letters and digits. You can use an authentication method for your deployment based on your business requirements.
tableName: the name of the table that stores the rules (rds_demo in this topic).
jdbcIntervalMs: the interval at which the database is polled.
usingEventTime: specifies whether to use event time. Valid values: true and false.
Note: Replace the placeholder values with your actual configuration values.
Use variables rather than plaintext credentials in production. For more information, see Manage variables.
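The entry point arguments are plain "--name value" pairs that the program reads by name. How the demo parses them is not shown in this topic, so the following is only a stand-in sketch in plain Java: MainArgsSketch and parse are hypothetical names. It shows how jdbcIntervalMs and usingEventTime end up as a long and a boolean.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java illustration of reading "--name value" arguments. All names are hypothetical. */
public class MainArgsSketch {

    /** Parses "--name value" pairs into a map and rejects a name without a value. */
    public static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected --name value at position " + i);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {
            "--tableName", "rds_demo",
            "--jdbcIntervalMs", "3000",
            "--usingEventTime", "false"
        });
        // Numeric and boolean settings arrive as strings and are converted on use.
        long intervalMillis = Long.parseLong(params.get("jdbcIntervalMs"));
        boolean usingEventTime = Boolean.parseBoolean(params.get("usingEventTime"));
        System.out.println(params.get("tableName") + " " + intervalMillis + " " + usingEventTime);
    }
}
```

Because every value is a single whitespace-delimited token, a JDBC URL is passed as one argument that includes its query string.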
On the Deployment tab of the Deployments page, click Edit in the Parameters section. Then, add the following parameters in the Other Configuration field.
The flink-cep dependency relies on the AppClassLoader, while the aviator classes in the user JAR depend on the UserCodeClassLoader. To prevent loading failures and ensure that the AppClassLoader can access classes in the user JAR, add the following configurations.
kubernetes.application-mode.classpath.include-user-jar: 'true'
classloader.resolve-order: parent-first
For more information about how to configure the parameters in the Parameters section, see Parameters.
Go to the Deployments page. Find the target deployment, and click Start in the Actions column.
For more information, see Start a deployment.
Step 4: Add a rule
After the JAR deployment is started, add Rule 1: Three consecutive events with action = 0, followed by an event with action != 1. This event sequence indicates the user views the product page three times but does not make a payment.
Log on to the ApsaraDB RDS for MySQL console.
Add a rule.
Concatenate the JSON string that defines a pattern with the id, version, and function fields, and then execute the INSERT INTO statement to insert data into the rule table in the ApsaraDB RDS for MySQL database.
INSERT INTO rds_demo (
  `id`,
  `version`,
  `pattern`,
  `function`
) VALUES (
  '1',
  1,
  '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
  'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction'
);
To make the pattern field in the database easier to read, Realtime Compute for Apache Flink allows you to define a pattern in the JSON format. For more information, see Definitions of rules in the JSON format in dynamic Flink CEP. The value of the pattern field in the preceding SQL statement is an example of a JSON-formatted, serialized pattern string supported by Realtime Compute for Apache Flink. This pattern matches an event sequence in which three consecutive events with action = 0 are followed by an event with action != 1.
Note: The EndCondition is defined in code to check for action != 1.
Use the Pattern API to define the pattern:
Pattern<Event, Event> pattern =
        Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                .where(new StartCondition("action == 0"))
                .timesOrMore(3)
                .followedBy("end")
                .where(new EndCondition());
It can be converted to a JSON string using the convertPatternToJSONString method in CepJsonUtils.
public void printTestPattern(Pattern<?, ?> pattern) throws JsonProcessingException {
    System.out.println(CepJsonUtils.convertPatternToJSONString(pattern));
}
Define the pattern in JSON:
Use a Kafka client to send messages to the demo_topic topic.
In this demo, you can also send test messages in the Start to Send and Consume Message panel of the demo_topic topic in the ApsaraMQ for Kafka console.
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022778000
1,Ken,0,1,1662022779000
1,Ken,0,1,1662022780000
The following table describes the fields of messages in the demo_topic topic.
| Field | Description |
| --- | --- |
| id | The user ID. |
| username | The username. |
| action | The user action. Valid values: 0 (the view operation) and 1 (the purchase operation). |
| product_id | The product ID. |
| event_time | The event time when the action was performed. |
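The following plain-Java sketch shows why the four sample messages satisfy Rule 1. It parses each comma-separated message into the five fields listed above and then checks for three views followed by an event whose action is not 1. It is an illustration only and does not use Flink CEP: Rule1Sketch, Event, parse, and matchesRule1 are hypothetical names, and the check assumes strict contiguity within one key, which is narrower than the SKIP_TILL_NEXT strategy in the rule definition.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of Rule 1 on the sample messages. All names are hypothetical. */
public class Rule1Sketch {

    /** One message from demo_topic: id,username,action,product_id,event_time. */
    public static final class Event {
        public final int id;
        public final String username;
        public final int action;
        public final int productId;
        public final long eventTime;

        public Event(int id, String username, int action, int productId, long eventTime) {
            this.id = id;
            this.username = username;
            this.action = action;
            this.productId = productId;
            this.eventTime = eventTime;
        }
    }

    /** Parses one comma-separated message into an Event. */
    public static Event parse(String line) {
        String[] f = line.trim().split(",");
        if (f.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields: " + line);
        }
        return new Event(Integer.parseInt(f[0]), f[1], Integer.parseInt(f[2]),
                Integer.parseInt(f[3]), Long.parseLong(f[4]));
    }

    /**
     * Returns true if at least three views (action == 0) in a row are followed
     * by an event with action != 1. Simplification: strict contiguity, one key.
     */
    public static boolean matchesRule1(List<Event> sameKeyEvents) {
        int views = 0;
        for (Event e : sameKeyEvents) {
            if (views >= 3 && e.action != 1) {
                return true;
            }
            views = (e.action == 0) ? views + 1 : 0;
        }
        return false;
    }

    public static void main(String[] args) {
        String[] lines = {
            "1,Ken,0,1,1662022777000", "1,Ken,0,1,1662022778000",
            "1,Ken,0,1,1662022779000", "1,Ken,0,1,1662022780000"
        };
        List<Event> events = new ArrayList<>();
        for (String line : lines) {
            events.add(parse(line));
        }
        System.out.println("Rule 1 matched: " + matchesRule1(events));
    }
}
```

The first three messages fill the view part of the rule. The fourth message is also a view, so its action is not 1 and it closes the match. If the fourth message were a purchase (action = 1), this sketch would report no match.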
View the latest rule printed to the JobManager logs and the found matches output to the TaskManager logs.
In the JobManager logs, use JDBCPeriodicPatternProcessorDiscoverer as a keyword to search for the latest rule.
On the Running Task Managers subtab under the Logs tab, choose the log file with the .out suffix. Search for A match for Pattern of (id, version): (1, 1) and view the log entry.
In the match_results table, execute SELECT * FROM `match_results`; to query found matches.
Step 5: Update the rule
A tailored marketing strategy usually takes timeliness into consideration. Therefore, Rule 2 is inserted: Three consecutive events with action = 0 occur within a 15-minute time window.
Set usingEventTime to true.
Go to the Deployments page. Find the target deployment, and click Cancel in the Actions column.
Click the name of the deployment. On the deployment details page, select the Configuration tab. Click Edit in the upper-right corner of the Basic section. In the Entry Point Main Arguments field, set usingEventTime to true. Click Save.
Start the deployment again.
Insert a new rule.
Use the Pattern API to define the pattern in Java:
Pattern<Event, Event> pattern =
        Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                .where(new StartCondition("action == 0"))
                .timesOrMore(3, Time.minutes(15))
                .followedBy("end")
                .where(new EndCondition());
printTestPattern(pattern);
Insert a new rule into the rds_demo table.
# For demonstration purposes, Rule 1 is deleted.
DELETE FROM `rds_demo` WHERE `id` = '1';

# Insert Rule 2: Three consecutive events with action = 0 occur in a 15-minute window, followed by an event with action != 1. The (id, version) pair of the rule is (1, 2).
INSERT INTO rds_demo (`id`, `version`, `pattern`, `function`) VALUES (
  '1',
  2,
  '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":{"unit":"MINUTES","size":15}},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
  'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction'
);
Send eight simplified messages in the ApsaraMQ for Kafka console.
Example:
2,Tom,0,1,1739584800000 #10:00
2,Tom,0,1,1739585400000 #10:10
2,Tom,0,1,1739585700000 #10:15
2,Tom,0,1,1739586000000 #10:20
3,Ali,0,1,1739586600000 #10:30
3,Ali,0,1,1739588400000 #11:00
3,Ali,0,1,1739589000000 #11:10
3,Ali,0,1,1739590200000 #11:30
In the match_results table, execute SELECT * FROM `match_results`; to query found matches.
The result indicates that Tom's behavior matches the pre-defined pattern while Ali's does not. This is because Ali's clicks do not meet the 15-minute time constraint. With these insights, the marketing team can implement targeted interventions during limited-time sales. For example, they can issue coupons to users who repeatedly view a product page within a specific time frame to incentivize purchase.
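The difference between Tom and Ali can be checked with simple timestamp arithmetic. The sketch below is plain Java and only approximates the rule as described in the text: it tests whether any three consecutive views fall within 15 minutes and ignores the end condition. WindowCheckSketch, hasThreeWithin, and hhmm are hypothetical names. Treating a span of exactly 15 minutes as inside the window is an assumption, and the engine may handle that boundary differently. The clock times in the message comments correspond to UTC+8.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Plain-Java illustration of the 15-minute constraint. All names are hypothetical. */
public class WindowCheckSketch {
    public static final long WINDOW_MILLIS = 15 * 60 * 1000L;

    /** Returns true if any three consecutive timestamps span at most windowMillis. */
    public static boolean hasThreeWithin(long[] sortedTimes, long windowMillis) {
        for (int i = 0; i + 2 < sortedTimes.length; i++) {
            if (sortedTimes[i + 2] - sortedTimes[i] <= windowMillis) {
                return true;
            }
        }
        return false;
    }

    /** Formats an epoch-millisecond timestamp as HH:mm in UTC+8. */
    public static String hhmm(long epochMillis) {
        return DateTimeFormatter.ofPattern("HH:mm")
                .withZone(ZoneOffset.ofHours(8))
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long[] tom = {1739584800000L, 1739585400000L, 1739585700000L, 1739586000000L};
        long[] ali = {1739586600000L, 1739588400000L, 1739589000000L, 1739590200000L};
        System.out.println("Tom from " + hhmm(tom[0]) + ": " + hasThreeWithin(tom, WINDOW_MILLIS));
        System.out.println("Ali from " + hhmm(ali[0]) + ": " + hasThreeWithin(ali, WINDOW_MILLIS));
    }
}
```

Tom's views at 10:00, 10:10, and 10:15 span 15 minutes, and his views at 10:10, 10:15, and 10:20 span 10 minutes, so the check passes under either reading of the boundary. Ali's tightest run of three views spans 30 minutes, so the check fails.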