This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Model Inference #
Flink SQL provides the ML_PREDICT
table-valued function (TVF) to perform model inference in SQL queries. This function allows you to apply machine learning models to your data streams directly in SQL.
See Model Creation for how to create a model.
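As a quick reference, a model is registered with a CREATE MODEL statement along the following lines. The model name, column names, and options below are illustrative; the available options depend on the chosen provider, and the full syntax is covered on the Model Creation page.

```sql
-- Sketch of model registration; option keys are provider-specific.
CREATE MODEL my_model
INPUT (feature1 DOUBLE, feature2 DOUBLE)
OUTPUT (prediction DOUBLE)
WITH (
  'provider' = '...'  -- provider identifier used to look up the ModelProvider
);
```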
ML_PREDICT Function #
The ML_PREDICT
function takes a table input, applies a model to it, and returns a new table with the model’s predictions. The function supports both synchronous and asynchronous inference modes when the underlying model provider permits both.
Syntax #
SELECT * FROM
ML_PREDICT(
TABLE input_table,
MODEL model_name,
DESCRIPTOR(feature_columns),
[CONFIG => MAP['key', 'value']]
)
Parameters #
input_table
: The input table containing the data to be processed
model_name
: The name of the model to use for inference
feature_columns
: A descriptor specifying which columns from the input table should be used as features for the model
config
: (Optional) A map of configuration options for the model inference
Configuration Options #
The following configuration options can be specified in the config map:
| Key | Default | Type | Description |
|---|---|---|---|
| async | (none) | Boolean | Value can be 'true' or 'false' to suggest which predict function the planner should choose. If the backend predict function provider does not support the suggested mode, an exception is thrown. |
| max-concurrent-operations | (none) | Integer | The maximum number of concurrent asynchronous I/O operations that the async ML_PREDICT call can trigger. |
| output-mode | (none) | Enum | Output mode for asynchronous operations, mapped to AsyncDataStream.OutputMode; ORDERED by default. If set to ALLOW_UNORDERED, UNORDERED is used when it does not affect the correctness of the result; otherwise ORDERED is still used. Possible values: ORDERED, ALLOW_UNORDERED. |
| timeout | (none) | Duration | Timeout from the first invocation to the final completion of the asynchronous operation; this may span multiple retries and is reset in case of failover. |
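The options above can be combined in a single config map. For instance, an asynchronous call that tolerates out-of-order results might look like the following (all values are illustrative):

```sql
-- Asynchronous inference with a bounded number of in-flight requests
-- and unordered output where correctness allows it.
SELECT * FROM ML_PREDICT(
  TABLE input_table,
  MODEL my_model,
  DESCRIPTOR(feature1, feature2),
  MAP[
    'async', 'true',
    'timeout', '100s',
    'max-concurrent-operations', '10',
    'output-mode', 'ALLOW_UNORDERED'
  ]
);
```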
Example #
-- Basic usage
SELECT * FROM ML_PREDICT(
TABLE input_table,
MODEL my_model,
DESCRIPTOR(feature1, feature2)
);
-- With configuration options
SELECT * FROM ML_PREDICT(
TABLE input_table,
MODEL my_model,
DESCRIPTOR(feature1, feature2),
MAP['async', 'true', 'timeout', '100s']
);
-- Using named parameters
SELECT * FROM ML_PREDICT(
INPUT => TABLE input_table,
MODEL => MODEL my_model,
ARGS => DESCRIPTOR(feature1, feature2),
CONFIG => MAP['async', 'true']
);
Output #
The output table contains all columns from the input table plus the model’s prediction columns. The prediction columns are added based on the model’s output schema.
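As an illustration, assume a hypothetical input_table with columns (id, feature1, feature2) and a model my_model whose output schema declares a single column prediction. The result of ML_PREDICT then has the schema (id, feature1, feature2, prediction), and the prediction can be selected like any other column:

```sql
-- prediction is appended by ML_PREDICT based on my_model's output schema.
SELECT id, prediction
FROM ML_PREDICT(
  TABLE input_table,
  MODEL my_model,
  DESCRIPTOR(feature1, feature2)
);
```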
Notes #
- The model must be registered in the catalog before it can be used with ML_PREDICT.
- The number of feature columns specified in the descriptor must match the model’s input schema.
- If column names in the output conflict with existing column names in the input table, an index will be added to the output column names to avoid conflicts. For example, if the output column is named prediction and a column with that name already exists in the input table, it will be renamed to prediction0.
- For asynchronous inference, the model provider must support the AsyncPredictRuntimeProvider interface.
- ML_PREDICT only supports append-only tables. CDC (Change Data Capture) tables are not supported because ML_PREDICT results are non-deterministic.
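The column-renaming rule can be seen in a query like the following, assuming a hypothetical input_table that already contains a column named prediction and a model whose output column is also named prediction:

```sql
-- The model's output column is renamed to prediction0 because
-- input_table already has a column named prediction.
SELECT id, prediction, prediction0
FROM ML_PREDICT(
  TABLE input_table,
  MODEL my_model,
  DESCRIPTOR(feature1, feature2)
);
```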
Model Provider #
The ML_PREDICT function uses a ModelProvider to perform the actual model inference. The provider is looked up based on the provider identifier specified when registering the model. There are two types of model providers:
- PredictRuntimeProvider: For synchronous model inference
  - Implements the createPredictFunction method to create a synchronous prediction function
  - Used when async is set to false in the config
- AsyncPredictRuntimeProvider: For asynchronous model inference
  - Implements the createAsyncPredictFunction method to create an asynchronous prediction function
  - Used when async is set to true in the config
  - Requires additional configuration for timeout and buffer capacity

If async is not set in the config, the system will pick either the sync or the async model provider, preferring the async provider if both exist.
Error Handling #
The function will throw an exception in the following cases:
- The model does not exist in the catalog
- The number of feature columns does not match the model’s input schema
- The model parameter is missing
- Too few or too many arguments are provided
Performance Considerations #
- For high-throughput scenarios, consider using asynchronous inference mode.
- Configure appropriate timeout and buffer capacity values for asynchronous inference.
- The function’s performance depends on the underlying model provider implementation.