pyflink.table package¶

Module contents¶

Important classes of Flink Table API:

  • pyflink.table.TableEnvironment Main entry point for Table and SQL functionality

  • pyflink.table.Table The core component of the Table API. Use the methods of Table to transform data.

  • pyflink.table.TableConfig A config to define the runtime behavior of the Table API. It is necessary when creating TableEnvironment.

  • pyflink.table.EnvironmentSettings Defines all parameters that initialize a table environment.

  • pyflink.table.TableSource Defines an external data source as a table.

  • pyflink.table.TableSink Specifies how to emit a table to an external system or location.

  • pyflink.table.DataTypes Defines a list of data types available.

  • pyflink.table.Row A row in a Table.

  • pyflink.table.Expression A column expression in a Table.

  • pyflink.table.window Helper classes for working with pyflink.table.window.GroupWindow (pyflink.table.window.Tumble, pyflink.table.window.Session, pyflink.table.window.Slide) and pyflink.table.window.OverWindow window (pyflink.table.window.Over).

  • pyflink.table.descriptors Helper classes that describe DDL information, such as how to connect to another system, the format of data, the schema of table, the event time attribute in the schema, etc.

  • pyflink.table.catalog Responsible for reading and writing metadata such as database/table/views/UDFs from a registered pyflink.table.catalog.Catalog.

  • pyflink.table.TableSchema Represents a table’s structure with field names and data types.

  • pyflink.table.FunctionContext Used to obtain global runtime information about the context in which the user-defined function is executed, such as the metric group, and global job parameters, etc.

  • pyflink.table.ScalarFunction Base interface for user-defined scalar function.

  • pyflink.table.TableFunction Base interface for user-defined table function.

  • pyflink.table.AggregateFunction Base interface for user-defined aggregate function.

  • pyflink.table.StatementSet Base interface that accepts DML statements or Tables.

class pyflink.table.AggregateFunction[source]¶

Bases: pyflink.table.udf.UserDefinedFunction, typing.Generic

Base interface for user-defined aggregate function. A user-defined aggregate function maps scalar values of multiple rows to a new scalar value.

New in version 1.12.0.

abstract accumulate(accumulator: ACC, *args)[source]¶

Processes the input values and updates the provided accumulator instance.

Parameters
  • accumulator – the accumulator which contains the current aggregated results

  • args – the input value (usually obtained from newly arrived data)

abstract create_accumulator() → ACC[source]¶

Creates and initializes the accumulator for this AggregateFunction.

Returns

the accumulator with the initial value

get_accumulator_type() → pyflink.table.types.DataType[source]¶

Returns the DataType of the AggregateFunction’s accumulator.

Returns

The DataType of the AggregateFunction’s accumulator.

get_result_type() → pyflink.table.types.DataType[source]¶

Returns the DataType of the AggregateFunction’s result.

Returns

The DataType of the AggregateFunction’s result.

abstract get_value(accumulator: ACC) → T[source]¶

Called every time an aggregation result should be materialized. The returned value could be either an early and incomplete result (periodically emitted as data arrives) or the final result of the aggregation.

Parameters

accumulator – the accumulator which contains the current intermediate results

Returns

the aggregation result

merge(accumulator: ACC, accumulators)[source]¶

Merges a group of accumulator instances into one accumulator instance. This method must be implemented for unbounded session window grouping aggregates and bounded grouping aggregates.

Parameters
  • accumulator – the accumulator which will keep the merged aggregate results. Note that the accumulator may already contain previously aggregated results, so the custom merge method should not replace or clear this instance.

  • accumulators – a group of accumulators that will be merged.

retract(accumulator: ACC, *args)[source]¶

Retracts the input values from the accumulator instance. The current design assumes the inputs are the values that have been previously accumulated.

Parameters
  • accumulator – the accumulator which contains the current aggregated results

  • args – the input value (usually obtained from newly arrived data).
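
The accumulator life cycle described above (create, accumulate, retract, merge, get_value) can be sketched in plain Python. This is a minimal sketch and not PyFlink code: the class name WeightedAvgSketch and the two-element list used as accumulator are hypothetical, and the class deliberately does not subclass pyflink.table.AggregateFunction so that it runs without a Flink installation. It only mirrors the method names documented here.

```python
class WeightedAvgSketch:
    """Plain-Python stand-in that mirrors the AggregateFunction method names."""

    def create_accumulator(self):
        # Accumulator layout (an assumption of this sketch): [weighted_sum, total_weight]
        return [0, 0]

    def accumulate(self, accumulator, value, weight):
        # Fold one input row into the accumulator.
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator, value, weight):
        # Undo a row that was accumulated earlier.
        accumulator[0] -= value * weight
        accumulator[1] -= weight

    def merge(self, accumulator, accumulators):
        # Update the target in place; it may already hold earlier results,
        # so it is neither replaced nor cleared.
        for other in accumulators:
            accumulator[0] += other[0]
            accumulator[1] += other[1]

    def get_value(self, accumulator):
        # May be called for early results as well as for the final one.
        if accumulator[1] == 0:
            return None
        return accumulator[0] / accumulator[1]


agg = WeightedAvgSketch()
acc = agg.create_accumulator()
agg.accumulate(acc, 10, 1)
agg.accumulate(acc, 20, 3)
early_result = agg.get_value(acc)    # (10*1 + 20*3) / 4

agg.retract(acc, 10, 1)
after_retract = agg.get_value(acc)   # 60 / 3

other = agg.create_accumulator()
agg.accumulate(other, 40, 1)
agg.merge(acc, [other])
merged_result = agg.get_value(acc)   # (60 + 40) / 4
```

In a real job the same methods would live on a subclass of AggregateFunction, with the result and accumulator types declared through get_result_type() and get_accumulator_type().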

class pyflink.table.BatchTableEnvironment(j_tenv)[source]¶

Bases: pyflink.table.table_environment.TableEnvironment

connect(connector_descriptor: pyflink.table.descriptors.ConnectorDescriptor) → Union[pyflink.table.descriptors.BatchTableDescriptor, pyflink.table.descriptors.StreamTableDescriptor][source]¶

Creates a temporary table from a descriptor.

Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.

The following example shows how to read from a connector using a JSON format and register a temporary table as “MyTable”:

>>> table_env \
...     .connect(ExternalSystemXYZ()
...              .version("0.11")) \
...     .with_format(Json()
...                  .json_schema("{...}")
...                  .fail_on_missing_field(False)) \
...     .with_schema(Schema()
...                  .field("user-name", "VARCHAR")
...                  .from_origin_field("u_name")
...                  .field("count", "DECIMAL")) \
...     .create_temporary_table("MyTable")
Parameters

connector_descriptor – Connector descriptor describing the external system.

Returns

A BatchTableDescriptor or a StreamTableDescriptor (for blink planner) used to build the temporary table.

Note

Deprecated in 1.11. Use execute_sql() to register a table instead.

static create(execution_environment: pyflink.dataset.execution_environment.ExecutionEnvironment = None, table_config: pyflink.table.table_config.TableConfig = None, environment_settings: pyflink.table.environment_settings.EnvironmentSettings = None) → pyflink.table.table_environment.BatchTableEnvironment[source]¶

Creates a BatchTableEnvironment.

Example:

# create with ExecutionEnvironment.
>>> env = ExecutionEnvironment.get_execution_environment()
>>> table_env = BatchTableEnvironment.create(env)
# create with ExecutionEnvironment and TableConfig.
>>> table_config = TableConfig()
>>> table_config.set_null_check(False)
>>> table_env = BatchTableEnvironment.create(env, table_config)
# create with EnvironmentSettings.
>>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \
...     .use_blink_planner().build()
>>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings)
Parameters
  • execution_environment – The batch ExecutionEnvironment of the TableEnvironment.

  • table_config – The configuration of the TableEnvironment, optional.

  • environment_settings – The environment settings used to instantiate the TableEnvironment, optional. It provides the interfaces for planner selection (flink or blink).

Returns

The BatchTableEnvironment created from given ExecutionEnvironment and configuration.

class pyflink.table.CsvTableSink(field_names, field_types, path, field_delimiter=', ', num_files=-1, write_mode=None)[source]¶

Bases: pyflink.table.sinks.TableSink

A simple TableSink to emit data as CSV files.

Example:

>>> CsvTableSink(["a", "b"], [DataTypes.INT(), DataTypes.STRING()],
...              "/csv/file/path", "|", 1, WriteMode.OVERWRITE)
Parameters
  • field_names – The list of field names.

  • field_types – The list of field data types.

  • path – The output path to write the Table to.

  • field_delimiter – The field delimiter.

  • num_files – The number of files to write to.

  • write_mode – The write mode to specify whether existing files are overwritten or not, which contains: WriteMode.NO_OVERWRITE and WriteMode.OVERWRITE.

class pyflink.table.CsvTableSource(source_path, field_names, field_types, field_delim=None, line_delim=None, quote_character=None, ignore_first_line=None, ignore_comments=None, lenient=None, empty_column_as_null=None)[source]¶

Bases: pyflink.table.sources.TableSource

A TableSource for simple CSV files with a (logically) unlimited number of fields.

Example:

>>> CsvTableSource("/csv/file/path", ["a", "b"], [DataTypes.INT(), DataTypes.STRING()])
Parameters
  • source_path (str) – The path to the CSV file.

  • field_names (collections.Iterable[str]) – The names of the table fields.

  • field_types (collections.Iterable[str]) – The types of the table fields.

  • field_delim (str, optional) – The field delimiter, “,” by default.

  • line_delim (str, optional) – The row delimiter, “\n” by default.

  • quote_character (str, optional) – An optional quote character for String values, null by default.

  • ignore_first_line (bool, optional) – Flag to ignore the first line, false by default.

  • ignore_comments (str, optional) – An optional prefix to indicate comments, null by default.

  • lenient (bool, optional) – Flag to skip records with parse errors instead of failing, false by default.

  • empty_column_as_null (bool, optional) – Treat empty column as null, false by default.
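
The effect of the optional flags above can be illustrated with a small plain-Python reader. This is only a sketch of the intended meaning of each option, not how CsvTableSource is implemented: the function read_rows and its field_count parameter are hypothetical, "parse error" is narrowed here to a wrong number of fields, and quoting and type conversion are left out.

```python
def read_rows(lines, field_count, field_delim=",", ignore_first_line=False,
              ignore_comments=None, lenient=False, empty_column_as_null=False):
    """Hypothetical helper that mimics the documented CsvTableSource options."""
    rows = []
    for index, line in enumerate(lines):
        if ignore_first_line and index == 0:
            continue  # skip a header line
        if ignore_comments is not None and line.startswith(ignore_comments):
            continue  # skip lines that start with the comment prefix
        fields = line.split(field_delim)
        if len(fields) != field_count:
            if lenient:
                continue  # lenient: drop the malformed record
            raise ValueError(f"expected {field_count} fields, got {len(fields)}: {line!r}")
        if empty_column_as_null:
            fields = [None if field == "" else field for field in fields]
        rows.append(fields)
    return rows


sample = ["a|b", "# a comment", "1|x", "2|", "broken"]
rows = read_rows(sample, 2, field_delim="|", ignore_first_line=True,
                 ignore_comments="#", lenient=True, empty_column_as_null=True)
```

With lenient left at its default, the last sample line would raise an error instead of being skipped.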

class pyflink.table.DataTypes[source]¶

Bases: object

A DataType can be used to declare input and/or output types of operations. This class enumerates all supported data types of the Table & SQL API.

static ARRAY(element_type: pyflink.table.types.DataType, nullable: bool = True) → pyflink.table.types.ArrayType[source]¶

Data type of an array of elements with same subtype.

Compared to the SQL standard, the maximum cardinality of an array cannot be specified but is fixed at 2147483647(0x7fffffff). Also, any valid type is supported as a subtype.

Parameters
  • element_type – DataType of each element in the array.

  • nullable – boolean, whether the type can be null (None) or not.

static BIGINT(nullable: bool = True) → pyflink.table.types.BigIntType[source]¶

Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807.

Parameters

nullable – boolean, whether the type can be null (None) or not.

static BINARY(length: int, nullable: bool = True) → pyflink.table.types.BinaryType[source]¶

Data type of a fixed-length binary string (=a sequence of bytes).

Parameters
  • length – int, the number of bytes. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).

  • nullable – boolean, whether the type can be null (None) or not.

Note

BinaryType is not supported yet.

static BOOLEAN(nullable: bool = True) → pyflink.table.types.BooleanType[source]¶

Data type of a boolean with a (possibly) three-valued logic of TRUE, FALSE, UNKNOWN.

Parameters

nullable – boolean, whether the type can be null (None) or not.
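
Three-valued logic means that a nullable boolean can be TRUE, FALSE, or UNKNOWN. The sketch below models the usual SQL truth tables in plain Python; representing UNKNOWN as None and the helper names and3, or3, and not3 are assumptions of this sketch, not part of the PyFlink API.

```python
from typing import Optional


def and3(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    """SQL-style AND: FALSE dominates, otherwise UNKNOWN propagates."""
    if a is False or b is False:
        return False
    if a is None or b is None:
        return None
    return True


def or3(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    """SQL-style OR: TRUE dominates, otherwise UNKNOWN propagates."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False


def not3(a: Optional[bool]) -> Optional[bool]:
    """SQL-style NOT: UNKNOWN stays UNKNOWN."""
    return None if a is None else not a
```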

static BYTES(nullable: bool = True) → pyflink.table.types.VarBinaryType[source]¶

Data type of a variable-length binary string (=a sequence of bytes) with defined maximum length. This is a shortcut for DataTypes.VARBINARY(2147483647).

Parameters

nullable – boolean, whether the type can be null (None) or not.

See also

VARBINARY()

static CHAR(length: int, nullable: bool = True) → pyflink.table.types.CharType[source]¶

Data type of a fixed-length character string.

Parameters
  • length – int, the string representation length. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).

  • nullable – boolean, whether the type can be null (None) or not.

Note

CharType is not supported yet.

static DATE(nullable: bool = True) → pyflink.table.types.DateType[source]¶

Data type of a date consisting of year-month-day with values ranging from 0000-01-01 to 9999-12-31.

Compared to the SQL standard, the range starts at year 0000.

Parameters

nullable – boolean, whether the type can be null (None) or not.

static DAY(precision: int = 2) → pyflink.table.types.Resolution[source]¶

Resolution in days.

Parameters

precision – int, the number of digits of days. It must have a value between 1 and 6 (both inclusive), (default: 2).

Returns

the specified Resolution.

See also

INTERVAL()

static DECIMAL(precision: int, scale: int, nullable: bool = True) → pyflink.table.types.DecimalType[source]¶

Data type of a decimal number with fixed precision and scale.

Parameters
  • precision – the number of digits in a number. It must have a value between 1 and 38 (both inclusive).

  • scale – the number of digits on right side of dot. It must have a value between 0 and precision (both inclusive).

  • nullable – boolean, whether the type can be null (None) or not.

Note

The precision must be 38 and the scale must be 18 currently.
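
Precision and scale can be made concrete with Python's decimal module. The following is a minimal sketch; the helpers precision_and_scale and check_decimal_params are hypothetical and only restate the constraints listed above.

```python
from decimal import Decimal


def precision_and_scale(value: Decimal):
    """Return (total digits, digits right of the dot) for a finite Decimal."""
    if not value.is_finite():
        raise ValueError("only finite values have a precision and scale")
    _sign, digits, exponent = value.as_tuple()
    if exponent >= 0:
        return len(digits) + exponent, 0
    scale = -exponent
    return max(len(digits), scale), scale


def check_decimal_params(precision: int, scale: int) -> None:
    """Validate the documented bounds: 1 <= precision <= 38, 0 <= scale <= precision."""
    if not 1 <= precision <= 38:
        raise ValueError("precision must be between 1 and 38")
    if not 0 <= scale <= precision:
        raise ValueError("scale must be between 0 and precision")


simple = precision_and_scale(Decimal("123.45"))

# Rescale a value to 18 fractional digits, as a DECIMAL(38, 18) column would hold it.
rescaled = Decimal("123.45").quantize(Decimal(1).scaleb(-18))
rescaled_shape = precision_and_scale(rescaled)
check_decimal_params(38, 18)
```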

static DOUBLE(nullable: bool = True) → pyflink.table.types.DoubleType[source]¶

Data type of an 8-byte double precision floating point number.

Parameters

nullable – boolean, whether the type can be null (None) or not.

static FIELD(name: str, data_type: pyflink.table.types.DataType, description: str = None) → pyflink.table.types.RowField[source]¶

Field definition with field name, data type, and a description.

Parameters
  • name – string, name of the field.

  • data_type – DataType of the field.

  • description – string, description of the field.

static FLOAT(nullable: bool = True) → pyflink.table.types.FloatType[source]¶

Data type of a 4-byte single precision floating point number.

Parameters

nullable – boolean, whether the type can be null (None) or not.

static HOUR() → pyflink.table.types.Resolution[source]¶

Resolution in hours.

Returns

Resolution

See also

INTERVAL()

static INT(nullable: bool = True) → pyflink.table.types.IntType[source]¶

Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.

Parameters

nullable – boolean, whether the type can be null (None) or not.
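
The value ranges quoted for the integer types in this class follow directly from their byte widths (TINYINT 1, SMALLINT 2, INT 4, BIGINT 8). A short sketch of the arithmetic, using a hypothetical helper signed_range:

```python
def signed_range(num_bytes: int):
    """Smallest and largest value of a two's-complement integer of the given width."""
    bits = 8 * num_bytes
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1


ranges = {
    name: signed_range(width)
    for name, width in [("TINYINT", 1), ("SMALLINT", 2), ("INT", 4), ("BIGINT", 8)]
}
```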

static INTERVAL(upper_resolution: pyflink.table.types.Resolution, lower_resolution: pyflink.table.types.Resolution = None) → Union[pyflink.table.types.DayTimeIntervalType, pyflink.table.types.YearMonthIntervalType][source]¶

Data type of a temporal interval. There are two types of temporal intervals: day-time intervals with up to nanosecond granularity or year-month intervals with up to month granularity.

An interval of day-time consists of +days hours:minutes:seconds.fractional with values ranging from -999999 23:59:59.999999999 to +999999 23:59:59.999999999. The type must be parameterized to one of the following resolutions: interval of days, interval of days to hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds, or interval of seconds. The value representation is the same for all types of resolutions. For example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds format (with default precisions): +00 00:01:10.000000.

An interval of year-month consists of +years-months with values ranging from -9999-11 to +9999-11. The type must be parameterized to one of the following resolutions: interval of years, interval of years to months, or interval of months. The value representation is the same for all types of resolutions. For example, an interval of months of 50 is always represented in an interval-of-years-to-months format (with default year precision): +04-02.

Examples: INTERVAL(DAY(2), SECOND(9)) for a day-time interval or INTERVAL(YEAR(4), MONTH()) for a year-month interval.
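
The two value representations described above can be reproduced with a few lines of Python. This is a sketch of the formatting only; the function names are hypothetical, the day-time helper accepts whole seconds, and the fractional part is fixed at the default six digits.

```python
def format_year_month(total_months: int, year_digits: int = 2) -> str:
    """Render a month count in the +years-months representation."""
    sign = "-" if total_months < 0 else "+"
    years, months = divmod(abs(total_months), 12)
    return f"{sign}{years:0{year_digits}d}-{months:02d}"


def format_day_time(total_seconds: int, day_digits: int = 2) -> str:
    """Render whole seconds in the +days hours:minutes:seconds.fractional representation."""
    sign = "-" if total_seconds < 0 else "+"
    days, rest = divmod(abs(total_seconds), 86400)
    hours, rest = divmod(rest, 3600)
    minutes, seconds = divmod(rest, 60)
    return f"{sign}{days:0{day_digits}d} {hours:02d}:{minutes:02d}:{seconds:02d}.000000"


seventy_seconds = format_day_time(70)
fifty_months = format_year_month(50)
```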

Parameters
  • upper_resolution – Resolution, the upper resolution of the interval.

  • lower_resolution – Resolution, the lower resolution of the interval.

Note

Currently, the upper_resolution must be MONTH for YearMonthIntervalType or SECOND for DayTimeIntervalType, and the lower_resolution must be None.

See also

SECOND(), MINUTE(), HOUR(), DAY(), MONTH(), YEAR()

static LIST_VIEW(element_type: pyflink.table.types.DataType) → pyflink.table.types.ListViewType[source]¶

Data type of a pyflink.table.data_view.ListView.

It can only be used in accumulator type declaration of an Aggregate Function.

Parameters

element_type – DataType of each element in the list view.

static MAP(key_type: pyflink.table.types.DataType, value_type: pyflink.table.types.DataType, nullable: bool = True) → pyflink.table.types.MapType[source]¶

Data type of an associative array that maps keys to values. A map cannot contain duplicate keys; each key can map to at most one value.

There is no restriction of key types; it is the responsibility of the user to ensure uniqueness. The map type is an extension to the SQL standard.

Parameters
  • key_type – DataType of the keys in the map.

  • value_type – DataType of the values in the map.

  • nullable – boolean, whether the type can be null (None) or not.

static MAP_VIEW(key_type: pyflink.table.types.DataType, value_type: pyflink.table.types.DataType) → pyflink.table.types.MapViewType[source]¶

Data type of a pyflink.table.data_view.MapView.

It can only be used in accumulator type declaration of an Aggregate Function.

Parameters
  • key_type – DataType of the keys in the map view.

  • value_type – DataType of the values in the map view.

static MINUTE() → pyflink.table.types.Resolution[source]¶

Resolution in minutes.

Returns

the specified Resolution.

See also

INTERVAL()

static MONTH() → pyflink.table.types.Resolution[source]¶

Resolution in months.

Returns

the specified Resolution.

See also

INTERVAL()

static MULTISET(element_type: pyflink.table.types.DataType, nullable: bool = True) → pyflink.table.types.MultisetType[source]¶

Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its elements with a common subtype. Each unique value is mapped to some multiplicity.

There is no restriction of element types; it is the responsibility of the user to ensure uniqueness.

Parameters
  • element_type – DataType of each element in the multiset.

  • nullable – boolean, whether the type can be null (None) or not.
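
The idea that each unique value maps to a multiplicity is the same one behind Python's collections.Counter. The snippet below is only an analogy to illustrate the semantics, not the representation PyFlink uses.

```python
from collections import Counter

# A bag of elements: "apple" occurs three times, "pear" once.
bag = Counter(["apple", "pear", "apple", "apple"])

apple_multiplicity = bag["apple"]   # how often one element occurs
distinct_elements = len(bag)        # number of unique values
total_elements = sum(bag.values())  # size of the multiset, duplicates included
```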

static NULL() → pyflink.table.types.NullType[source]¶

Data type for representing untyped null (None) values. A null type has no other value except null (None), thus, it can be cast to any nullable type.

This type helps in representing unknown types in API calls that use a null (None) literal as well as bridging to formats such as JSON or Avro that define such a type as well.

The null type is an extension to the SQL standard.

Note

NullType is not supported yet.

static ROW(row_fields: List = [], nullable: bool = True) → pyflink.table.types.RowType[source]¶

Data type of a sequence of fields. A field consists of a field name, field type, and an optional description. The most specific type of a row of a table is a row type. In this case, each column of the row corresponds to the field of the row type that has the same ordinal position as the column.

Compared to the SQL standard, an optional field description simplifies the handling with complex structures.

Parameters
  • row_fields – a list of row field types which can be created via DataTypes.FIELD().

  • nullable – boolean, whether the type can be null (None) or not.

static SECOND(precision: int = 6) → pyflink.table.types.Resolution[source]¶

Resolution in seconds and (possibly) fractional seconds.

Parameters

precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive), (default: 6).

Returns

the specified Resolution.

Note

The precision must be 3 currently.

See also

INTERVAL()

static SMALLINT(nullable: bool = True) → pyflink.table.types.SmallIntType[source]¶

Data type of a 2-byte signed integer with values from -32,768 to 32,767.

Parameters

nullable – boolean, whether the type can be null (None) or not.

static STRING(nullable: bool = True) → pyflink.table.types.VarCharType[source]¶

Data type of a variable-length character string with defined maximum length. This is a shortcut for DataTypes.VARCHAR(2147483647).

Parameters

nullable – boolean, whether the type can be null (None) or not.

See also

VARCHAR()

static TIME(precision: int = 0, nullable: bool = True) → pyflink.table.types.TimeType[source]¶

Data type of a time WITHOUT time zone.

An instance consists of hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 00:00:00.000000000 to 23:59:59.999999999.

Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported.

Parameters
  • precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive).

  • nullable – boolean, whether the type can be null (None) or not.

Note

The precision must be 0 currently.

static TIMESTAMP(precision: int = 6, nullable: bool = True) → pyflink.table.types.TimestampType[source]¶

Data type of a timestamp WITHOUT time zone.

An instance consists of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999.

Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported.

This class does not store or represent a time-zone. Instead, it is a description of the date, as used for birthdays, combined with the local time as seen on a wall clock. It cannot represent an instant on the time-line without additional information such as an offset or time-zone.

Parameters
  • precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). (default: 6)

  • nullable – boolean, whether the type can be null (None) or not.

Note

The precision must be 3 currently.
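
The statement that a timestamp without time zone cannot represent an instant has a direct analogue in Python's naive datetime objects. The sketch below uses only the standard library; the two fixed offsets are arbitrary choices for illustration.

```python
from datetime import datetime, timedelta, timezone

# A naive datetime is a wall-clock reading: no offset, hence no instant.
wall_clock = datetime(2020, 1, 1, 12, 0, 0)
has_zone = wall_clock.tzinfo is not None

# The same reading becomes a different instant depending on the offset applied.
utc_instant = wall_clock.replace(tzinfo=timezone.utc)
plus_nine_instant = wall_clock.replace(tzinfo=timezone(timedelta(hours=9)))

# Noon at UTC+9 happens nine hours before noon at UTC.
gap = utc_instant - plus_nine_instant
```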

static TIMESTAMP_WITH_LOCAL_TIME_ZONE(precision: int = 6, nullable: bool = True) → pyflink.table.types.LocalZonedTimestampType[source]¶

Data type of a timestamp WITH LOCAL time zone.

An instance consists of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59.

Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported.

The value will be stored internally as a long value which stores all date and time fields, to a precision of nanoseconds, as well as the offset from UTC/Greenwich.

Parameters
  • precision – int, the number of digits of fractional seconds. It must have a value between 0 and 9 (both inclusive). (default: 6)

  • nullable – boolean, whether the type can be null (None) or not.

Note

LocalZonedTimestampType is currently only supported in blink planner and the precision must be 3.

static TINYINT(nullable: bool = True) → pyflink.table.types.TinyIntType[source]¶

Data type of a 1-byte signed integer with values from -128 to 127.

Parameters

nullable – boolean, whether the type can be null (None) or not.

static VARBINARY(length: int, nullable: bool = True) → pyflink.table.types.VarBinaryType[source]¶

Data type of a variable-length binary string (=a sequence of bytes)

Parameters
  • length – int, the maximum number of bytes. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).

  • nullable – boolean, whether the type can be null (None) or not.

Note

The length limit must be 0x7fffffff(2147483647) currently.

See also

BYTES()

static VARCHAR(length: int, nullable: bool = True) → pyflink.table.types.VarCharType[source]¶

Data type of a variable-length character string.

Parameters
  • length – int, the maximum string representation length. It must have a value between 1 and 2147483647(0x7fffffff) (both inclusive).

  • nullable – boolean, whether the type can be null (None) or not.

Note

The length limit must be 0x7fffffff(2147483647) currently.

See also

STRING()

static YEAR(precision: int = 2) → pyflink.table.types.Resolution[source]¶

Resolution in years with 2 digits for the number of years by default.

Parameters

precision – the number of digits of years. It must have a value between 1 and 4 (both inclusive), (default 2).

Returns

the specified Resolution.

See also

INTERVAL()

class pyflink.table.DataView[source]¶

Bases: abc.ABC

A DataView is a collection type that can be used in the accumulator of a user-defined pyflink.table.AggregateFunction. Depending on the context in which the function is used, a DataView can be backed by a normal collection or a state backend.

abstract clear() → None[source]¶

Clears the DataView and removes all data.

class pyflink.table.EnvironmentSettings(j_environment_settings)[source]¶

Bases: object

Defines all parameters that initialize a table environment. Those parameters are used only during instantiation of a TableEnvironment and cannot be changed afterwards.

Example:

>>> EnvironmentSettings.new_instance() \
...     .use_old_planner() \
...     .in_streaming_mode() \
...     .with_built_in_catalog_name("my_catalog") \
...     .with_built_in_database_name("my_database") \
...     .build()
class Builder[source]¶

Bases: object

A builder for EnvironmentSettings.

build() → pyflink.table.environment_settings.EnvironmentSettings[source]¶

Returns an immutable instance of EnvironmentSettings.

Returns

an immutable instance of EnvironmentSettings.

in_batch_mode() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Sets the components to work in batch mode. Streaming mode is the default.

Returns

This object.

in_streaming_mode() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Sets the components to work in streaming mode. Enabled by default.

Returns

This object.

use_any_planner() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Does not set a planner requirement explicitly.

A planner will be discovered automatically, if there is only one planner available.

By default, use_old_planner() is enabled.

Returns

This object.

use_blink_planner() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Sets the Blink planner as the required module. By default, use_old_planner() is enabled.

Returns

This object.

use_old_planner() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Sets the old Flink planner as the required module.

This is the default behavior.

Returns

This object.

with_built_in_catalog_name(built_in_catalog_name: str) → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Specifies the name of the initial catalog to be created when instantiating a TableEnvironment. This catalog will be used to store all non-serializable objects such as tables and functions registered via e.g. register_table_sink() or register_java_function(). It will also be the initial value for the current catalog which can be altered via use_catalog().

Default: “default_catalog”.

Parameters

built_in_catalog_name – The specified built-in catalog name.

Returns

This object.

with_built_in_database_name(built_in_database_name: str) → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Specifies the name of the default database in the initial catalog to be created when instantiating a TableEnvironment. The database will be used to store all non-serializable objects such as tables and functions registered via e.g. register_table_sink() or register_java_function(). It will also be the initial value for the current database which can be altered via use_database().

Default: “default_database”.

Parameters

built_in_database_name – The specified built-in database name.

Returns

This object.

get_built_in_catalog_name() → str[source]¶

Gets the specified name of the initial catalog to be created when instantiating a TableEnvironment.

Returns

The specified name of the initial catalog to be created.

get_built_in_database_name() → str[source]¶

Gets the specified name of the default database in the initial catalog to be created when instantiating a TableEnvironment.

Returns

The specified name of the default database in the initial catalog to be created.

is_streaming_mode() → bool[source]¶

Tells if the TableEnvironment should work in a batch or streaming mode.

Returns

True if the TableEnvironment should work in a streaming mode, false otherwise.

static new_instance() → pyflink.table.environment_settings.EnvironmentSettings.Builder[source]¶

Creates a builder for creating an instance of EnvironmentSettings.

By default, it does not specify a required planner and will use the one that is available on the classpath via discovery.

Returns

A builder of EnvironmentSettings.

class pyflink.table.ExplainDetail[source]¶

Bases: object

ExplainDetail defines the types of details for explain result.

New in version 1.11.0.

CHANGELOG_MODE = 1¶
ESTIMATED_COST = 0¶
class pyflink.table.Expression(j_expr_or_property_name)[source]¶

Bases: typing.Generic

Expressions represent a logical tree for producing a computation result. Expressions might be literal values, function calls, or field references.

New in version 1.12.0.

property abs¶

Calculates the absolute value of given value.

New in version 1.12.0.

property acos¶

Calculates the arc cosine of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

alias(name: str, *extra_names: str) → pyflink.table.expression.Expression[T][source]¶

Specifies a name for an expression i.e. a field.

Example:

>>> tab.select(col('a').alias('b'))
Parameters
  • name – name for one field.

  • extra_names – additional names if the expression expands to multiple fields

New in version 1.12.0.

property asc¶

Specifies ascending order of an expression i.e. a field for order_by.

Example:

>>> tab.order_by(col('a').asc)

See also

desc

New in version 1.12.0.

property asin¶

Calculates the arc sine of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

at(index) → pyflink.table.expression.Expression[source]¶

Accesses the element of an array or map based on a key or an index (starting at 1).

Parameters

index – index key or position of the element (array index starting at 1)

See also

cardinality, element

New in version 1.12.0.
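
Because array positions start at 1 rather than 0, the lookup differs from ordinary Python indexing. A plain-Python sketch of the semantics follows; the function at below is a hypothetical stand-in, and raising IndexError for an out-of-range position is a choice of this sketch, not documented behavior.

```python
def at(collection, key):
    """Look up a map entry by key, or an array element by 1-based position."""
    if isinstance(collection, dict):
        return collection[key]
    if key < 1 or key > len(collection):
        raise IndexError(f"position {key} is outside 1..{len(collection)}")
    return collection[key - 1]  # shift to Python's 0-based indexing


first_element = at([10, 20, 30], 1)
map_entry = at({"a": 1, "b": 2}, "b")
```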

property atan¶

Calculates the arc tangent of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

property avg¶

Returns the average (arithmetic mean) of the numeric field across all input values.

Example:

>>> tab \
...     .group_by(col("a")) \
...     .select(col("a"),
...             col("b").sum.alias("d"),
...             col("b").sum0.alias("e"),
...             col("b").min.alias("f"),
...             col("b").max.alias("g"),
...             col("b").count.alias("h"),
...             col("b").avg.alias("i"),
...             col("b").stddev_pop.alias("j"),
...             col("b").stddev_samp.alias("k"),
...             col("b").var_pop.alias("l"),
...             col("b").var_samp.alias("m"),
...             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.
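
To make the grouped aggregates in the example concrete, the sketch below computes several of them with Python's statistics module on a tiny in-memory data set. It is a plain-Python illustration of what the aggregates mean, not PyFlink code; the sample rows and the helper summarize are made up for this sketch.

```python
import statistics
from collections import defaultdict

# (a, b) pairs standing in for two columns of a table.
rows = [("x", 1), ("x", 2), ("x", 3), ("x", 4), ("y", 10), ("y", 20)]

# Equivalent of group_by(col("a")): collect the b values per key.
groups = defaultdict(list)
for a, b in rows:
    groups[a].append(b)


def summarize(values):
    """Aggregates over one group, named after the Expression properties."""
    return {
        "sum": sum(values),
        "min": min(values),
        "max": max(values),
        "count": len(values),
        "avg": statistics.mean(values),
        "stddev_pop": statistics.pstdev(values),    # population standard deviation
        "stddev_samp": statistics.stdev(values),    # sample standard deviation
        "var_pop": statistics.pvariance(values),    # population variance
        "var_samp": statistics.variance(values),    # sample variance
    }


summary = {key: summarize(values) for key, values in groups.items()}
```

The population variants divide by the number of values, the sample variants by one less, which is why var_samp is larger than var_pop for the same group.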

between(lower_bound, upper_bound) → pyflink.table.expression.Expression[bool][source]¶

Returns true if the given expression is between lower_bound and upper_bound (both inclusive). False otherwise. The parameters must be numeric types or identical comparable types.

e.g. lit(2.1).between(2.1, 2.1) leads to true, lit("2018-05-05").to_date.between(lit("2018-05-01").to_date, lit("2018-05-10").to_date) leads to true.

Parameters
  • lower_bound – numeric or comparable expression

  • upper_bound – numeric or comparable expression

See also

not_between()

New in version 1.12.0.
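
Both bounds are inclusive, which a chained comparison expresses directly in Python. The function below is a hypothetical plain-Python equivalent of the check, shown with the same numeric and date values as the examples above.

```python
from datetime import date


def between(value, lower_bound, upper_bound) -> bool:
    """True when lower_bound <= value <= upper_bound (both ends inclusive)."""
    return lower_bound <= value <= upper_bound


numeric_check = between(2.1, 2.1, 2.1)
date_check = between(date(2018, 5, 5), date(2018, 5, 1), date(2018, 5, 10))
outside_check = between(3, 1, 2)
```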

property bin¶

Returns a string representation of an integer numeric value in binary format. Returns null if numeric is null. E.g. “4” leads to “100”, “12” leads to “1100”.
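The two examples above can be reproduced with a small plain-Python sketch. `bin_sketch` is a hypothetical helper; it only models non-negative integers, since the representation of negative values is not stated here.

```python
def bin_sketch(value):
    """Binary string of a non-negative integer; None stands in for SQL null."""
    if value is None:
        return None
    if value < 0:
        # Assumption: negative inputs are outside the scope of this sketch.
        raise ValueError("negative values are not modelled")
    return format(value, "b")


four = bin_sketch(4)      # "100"
twelve = bin_sketch(12)   # "1100"
```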

See also

hex

New in version 1.12.0.

property cardinality¶

Returns the number of elements of an array or number of entries of a map.

See also

at(), element

New in version 1.12.0.

cast(data_type: pyflink.table.types.DataType) → pyflink.table.expression.Expression[source]¶

Converts a value to a given data type.

e.g. lit("42").cast(DataTypes.INT()) leads to 42.

New in version 1.12.0.

ceil(time_interval_unit: pyflink.table.expression.TimeIntervalUnit = None) → pyflink.table.expression.Expression[source]¶

If time_interval_unit is specified, it rounds up a time point to the given unit, e.g. lit("12:44:31").to_time.ceil(TimeIntervalUnit.MINUTE) leads to 12:45:00. Otherwise, it calculates the smallest integer greater than or equal to a given number.
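The difference between rounding a time point up and down to the minute can be sketched with the standard library. `floor_to_minute` and `ceil_to_minute` are hypothetical helpers working on `datetime` values, not PyFlink calls; the date used below is arbitrary.

```python
import math
from datetime import datetime, timedelta


def floor_to_minute(ts):
    """Drop seconds and microseconds."""
    return ts.replace(second=0, microsecond=0)


def ceil_to_minute(ts):
    """Round up to the next full minute unless ts is already on one."""
    floored = floor_to_minute(ts)
    return floored if floored == ts else floored + timedelta(minutes=1)


ts = datetime(2020, 1, 1, 12, 44, 31)
up = ceil_to_minute(ts)      # 12:45:00 on the same day
down = floor_to_minute(ts)   # 12:44:00 on the same day
numeric = math.ceil(3.2)     # 4, the no-argument (numeric) case
```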

See also

to_date, to_time, to_timestamp, extract(), floor(), ceil()

New in version 1.12.0.

property char_length¶

Returns the length of a string.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property collect¶

Returns multiset aggregate of a given expression.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property cos¶

Calculates the cosine of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

property cosh¶

Calculates the hyperbolic cosine of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

property cot¶

Calculates the cotangent of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

property count¶

Returns the number of input rows for which the field is not null.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property day¶

Creates an interval of the given number of days.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property days¶

Creates an interval of the given number of days.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property degrees¶

Converts numeric from radians to degrees.

See also

radians

New in version 1.12.0.

property desc¶

Specifies descending order of an expression, i.e. a field, for order_by.

Example:

>>> tab.order_by(col('a').desc)

See also

asc

New in version 1.12.0.

property distinct¶

Similar to a SQL distinct aggregation clause such as COUNT(DISTINCT a), declares that an aggregation function is only applied on distinct input values.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"), col("b").sum.distinct.alias("d"))

New in version 1.12.0.

property element¶

Returns the sole element of an array with a single element. Returns null if the array is empty. Throws an exception if the array has more than one element.
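The three cases described above can be summarised in a plain-Python sketch. `element_sketch` is a hypothetical helper; `None` stands in for SQL null and `ValueError` for the exception, whose actual type is not specified here.

```python
def element_sketch(array):
    """Sole element of a one-element list, None if empty, error otherwise."""
    if len(array) == 0:
        return None
    if len(array) == 1:
        return array[0]
    # Assumption: ValueError represents the engine's exception.
    raise ValueError("array has more than one element")


single = element_sketch([7])   # 7
empty = element_sketch([])     # None
```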

See also

at(), cardinality

New in version 1.12.0.

property end¶

Returns the end time (exclusive) of a window when applied on a window reference.

e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
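The exclusive end can be illustrated for a time-based tumbling window with a plain-Python sketch. `tumbling_window_bounds` is a hypothetical helper, and aligning windows to the Unix epoch is an assumption made for the illustration.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window_bounds(ts, size):
    """Return (start, end) of the window containing ts; end is exclusive."""
    offset = (ts - EPOCH) % size   # time elapsed since the window opened
    start = ts - offset
    return start, start + size


last_instant = datetime(2020, 1, 1, 10, 59, 59, 999000)
start, end = tumbling_window_bounds(last_instant, timedelta(hours=1))
# start is 10:00:00.000 and end is 11:00:00.000; the last millisecond
# inside the window is end - 1 ms, i.e. 10:59:59.999.
```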

Example:

>>> orders.window(Tumble
>>>             .over(row_interval(2))
>>>             .on(col("a"))
>>>             .alias("w")) \
>>>     .group_by(col("c"), col("w")) \
>>>     .select(col("c"), col("w").start, col("w").end, col("w").proctime)

See also

start

New in version 1.12.0.

property exp¶

Calculates Euler’s number raised to the given power.

New in version 1.12.0.

extract(time_interval_unit: pyflink.table.expression.TimeIntervalUnit) → pyflink.table.expression.Expression[source]¶

Extracts parts of a time point or time interval. Returns the part as a long value, e.g. lit("2006-06-05").to_date.extract(TimeIntervalUnit.DAY) leads to 5.

See also

to_date, to_time, to_timestamp, extract(), floor(), ceil()

New in version 1.12.0.

property flatten¶

Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field.

See also

get()

New in version 1.12.0.

floor(time_interval_unit: pyflink.table.expression.TimeIntervalUnit = None) → pyflink.table.expression.Expression[source]¶

If time_interval_unit is specified, it rounds down a time point to the given unit, e.g. lit("12:44:31").to_time.floor(TimeIntervalUnit.MINUTE) leads to 12:44:00. Otherwise, it calculates the largest integer less than or equal to a given number.

See also

to_date, to_time, to_timestamp, extract(), floor(), ceil()

New in version 1.12.0.

property from_base64¶

Returns the base string decoded with base64.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

get(name_or_index: Union[str, int]) → pyflink.table.expression.Expression[source]¶

Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name or index and returns its value.

Parameters

name_or_index – name or index of the field (similar to Flink’s field expressions)

See also

flatten

New in version 1.12.0.

property hex¶

Returns a string representation of an integer numeric value or a string in hex format. Returns null if numeric or string is null.

E.g. a numeric 20 leads to “14”, a numeric 100 leads to “64”, and a string “hello,world” leads to “68656c6c6f2c776f726c64”.
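The examples above can be reproduced with a short plain-Python sketch. `hex_sketch` is a hypothetical helper; UTF-8 encoding for strings is an assumption, negative integers are not modelled, and the lowercase digits simply follow the examples in this entry.

```python
def hex_sketch(value):
    """Hex string of a non-negative integer or of a string's UTF-8 bytes."""
    if value is None:
        return None                          # null in, null out
    if isinstance(value, str):
        return value.encode("utf-8").hex()   # two hex digits per byte
    return format(value, "x")


twenty = hex_sketch(20)            # "14"
hundred = hex_sketch(100)          # "64"
text = hex_sketch("hello,world")   # "68656c6c6f2c776f726c64"
```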

See also

bin

New in version 1.12.0.

property hour¶

Creates an interval of the given number of hours.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property hours¶

Creates an interval of the given number of hours.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

in_(first_element_or_table, *remaining_elements) → pyflink.table.expression.Expression[source]¶

If first_element_or_table is a Table, returns true if the expression exists in the given table sub-query. The sub-query table must consist of one column, and this column must have the same data type as the expression.

Note

This operation is not supported in a streaming environment yet if first_element_or_table is a Table.

Otherwise, returns true if the expression exists in the given list of expressions. This is a shorthand for multiple OR conditions.

If the testing set contains null, the result is null if the element cannot be found and true if it can be found. If the element is null, the result is always null.

e.g. lit("42").in_(1, 2, 3) leads to false.
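The null handling for the list form follows SQL three-valued logic, which a plain-Python sketch can make explicit. `in_sketch` is a hypothetical helper in which `None` stands in for SQL null; it does not cover the table sub-query form.

```python
def in_sketch(element, candidates):
    """Three-valued IN: returns True, False, or None (unknown)."""
    if element is None:
        return None                     # a null element is always unknown
    if any(c is not None and c == element for c in candidates):
        return True                     # found, regardless of nulls
    if any(c is None for c in candidates):
        return None                     # not found, but a null might match
    return False


plain_miss = in_sketch(42, [1, 2, 3])   # False
hit = in_sketch(2, [1, 2, None])        # True
unknown = in_sketch(5, [1, None])       # None
```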

Example:

>>> tab.where(col("a").in_(1, 2, 3))
>>> table_a.where(col("x").in_(table_b.select("y")))

New in version 1.12.0.

property init_cap¶

Converts the initial letter of each word in a string to uppercase. Assumes a string containing only [A-Za-z0-9]; everything else is treated as whitespace.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property is_false¶

Returns true if the given boolean expression is false. False otherwise (for null and true).
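The four boolean tests (is_true, is_false, is_not_true, is_not_false) differ only in how they treat null. A plain-Python sketch, with `None` standing in for null and hypothetical function names, shows the full truth table.

```python
def is_true(v):
    return v is True


def is_false(v):
    return v is False


def is_not_true(v):
    return v is not True      # true for None and False


def is_not_false(v):
    return v is not False     # true for None and True


# One row per input value: True, False, None (null).
table = {v: (is_true(v), is_false(v), is_not_true(v), is_not_false(v))
         for v in (True, False, None)}
```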

See also

is_true, is_not_true, is_not_false

New in version 1.12.0.

property is_not_false¶

Returns true if the given boolean expression is not false (for null and true). False otherwise.

See also

is_true, is_false, is_not_true

New in version 1.12.0.

property is_not_null¶

Returns true if the given expression is not null.

See also

is_null

New in version 1.12.0.

property is_not_true¶

Returns true if the given boolean expression is not true (for null and false). False otherwise.

See also

is_true, is_false, is_not_false

New in version 1.12.0.

property is_null¶

Returns true if the given expression is null.

See also

is_not_null

New in version 1.12.0.

property is_true¶

Returns true if the given boolean expression is true. False otherwise (for null and false).

See also

is_false, is_not_true, is_not_false

New in version 1.12.0.

like(pattern: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[bool][source]¶

Returns true if a string matches the specified LIKE pattern. e.g. 'Jo_n%' matches all strings that start with 'Jo(arbitrary character)n'.
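One way to picture LIKE matching is to translate the pattern into a regular expression, with `_` matching exactly one character and `%` matching any run of characters. The sketch below uses a hypothetical helper, `like_sketch`, and does not model escape characters.

```python
import re


def like_sketch(text, pattern):
    """Match text against a SQL LIKE pattern (no escape support)."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, text, flags=re.DOTALL) is not None


john = like_sketch("John", "Jo_n%")           # True
johnny = like_sketch("Johnny", "Jo_n%")       # True
jonathan = like_sketch("Jonathan", "Jo_n%")   # False: 4th character is "a"
```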

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property ln¶

Calculates the natural logarithm of the given value.

See also

log10, log2, ln, log()

New in version 1.12.0.

log(base=None) → pyflink.table.expression.Expression[float][source]¶

Calculates the natural logarithm of the given value if base is not specified. Otherwise, calculates the logarithm of the given value to the given base.
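The two modes map directly onto `math.log` from the Python standard library. `log_sketch` below is a hypothetical helper on plain floats, shown only to make the role of the optional base explicit.

```python
import math


def log_sketch(value, base=None):
    """Natural logarithm when base is None, otherwise logarithm to that base."""
    if base is None:
        return math.log(value)
    return math.log(value, base)


natural = log_sketch(math.e)   # approximately 1.0
binary = log_sketch(8, 2)      # approximately 3.0
```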

See also

log10, log2, ln, log()

New in version 1.12.0.

property log10¶

Calculates the base 10 logarithm of the given value.

See also

log10, log2, ln, log()

New in version 1.12.0.

property log2¶

Calculates the base 2 logarithm of the given value.

See also

log10, log2, ln, log()

New in version 1.12.0.

property lower_case¶

Returns all of the characters in a string in lower case using the rules of the default locale.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

lpad(length: Union[int, Expression[int]], pad: Union[str, Expression[str]]) → pyflink.table.expression.Expression[str][source]¶

Returns a string left-padded with the given pad string to a length of length characters. If the string is longer than length, the return value is shortened to length characters. e.g. lit('hi').lpad(4, '??') returns '??hi', and lit('hi').lpad(1, '??') returns 'h'.
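The padding and truncation rules can be written out in plain Python. `lpad_sketch` and its mirror image `rpad_sketch` are hypothetical helpers; behaviour for an empty pad string is not modelled.

```python
def lpad_sketch(text, length, pad):
    """Left-pad text to length characters, truncating if it is too long."""
    if len(text) >= length:
        return text[:length]
    missing = length - len(text)
    # Repeat the pad string and cut it to exactly the missing width.
    return (pad * missing)[:missing] + text


def rpad_sketch(text, length, pad):
    """Right-pad text to length characters, truncating if it is too long."""
    if len(text) >= length:
        return text[:length]
    missing = length - len(text)
    return text + (pad * missing)[:missing]


padded = lpad_sketch("hi", 4, "??")   # "??hi"
cut = lpad_sketch("hi", 1, "??")      # "h"
```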

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property ltrim¶

Returns the given string with leading whitespace removed.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property max¶

Returns the maximum value of the field across all input values.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property md5¶

Returns the MD5 hash of the string argument; null if string is null.

Returns

string of 32 hexadecimal digits or null.

See also

md5, sha1, sha224, sha256, sha384, sha512, sha2

New in version 1.12.0.

property milli¶

Creates an interval of the given number of milliseconds.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property millis¶

Creates an interval of the given number of milliseconds.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property min¶

Returns the minimum value of the field across all input values.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property minute¶

Creates an interval of the given number of minutes.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property minutes¶

Creates an interval of the given number of minutes.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property month¶

Creates an interval of the given number of months.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property months¶

Creates an interval of the given number of months.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

not_between(lower_bound, upper_bound) → pyflink.table.expression.Expression[bool][source]¶

Returns true if the given expression is not between lower_bound and upper_bound (both inclusive). False otherwise. The parameters must be numeric types or identical comparable types.

e.g. lit(2.1).not_between(2.1, 2.1) leads to false, and lit("2018-05-05").to_date.not_between(lit("2018-05-01").to_date, lit("2018-05-10").to_date) leads to false.

Parameters
  • lower_bound – numeric or comparable expression

  • upper_bound – numeric or comparable expression

See also

between()

New in version 1.12.0.

over(alias) → pyflink.table.expression.Expression[source]¶

Defines an aggregation to be used for a previously specified over window.

Example:

>>> tab.window(Over
>>>         .partition_by(col('c'))
>>>         .order_by(col('rowtime'))
>>>         .preceding(row_interval(2))
>>>         .following(CURRENT_ROW)
>>>         .alias("w")) \
>>>     .select(col('c'), col('a'), col('a').count.over(col('w')))

New in version 1.12.0.

overlay(new_string: Union[str, Expression[str]], starting: Union[int, Expression[int]], length: Union[int, Expression[int]] = None) → pyflink.table.expression.Expression[str][source]¶

Replaces a substring of the string with new_string, starting at the given position (counting from 1). e.g. lit('xxxxxtest').overlay('xxxx', 6) leads to 'xxxxxxxxx', and lit('xxxxxtest').overlay('xxxx', 6, 2) leads to 'xxxxxxxxxst'.
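Both examples can be traced with a plain-Python sketch of the splice. `overlay_sketch` is a hypothetical helper; it assumes that an omitted length defaults to the length of the new string, which is consistent with the first example but is not stated explicitly in this entry.

```python
def overlay_sketch(text, new_string, starting, length=None):
    """Replace length characters of text, from 1-based position starting."""
    if length is None:
        # Assumption: default to the length of the replacement string.
        length = len(new_string)
    head = text[:starting - 1]            # everything before the position
    tail = text[starting - 1 + length:]   # everything after the replaced span
    return head + new_string + tail


full = overlay_sketch("xxxxxtest", "xxxx", 6)        # "xxxxxxxxx"
partial = overlay_sketch("xxxxxtest", "xxxx", 6, 2)  # "xxxxxxxxxst"
```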

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

position(haystack: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[int][source]¶

Returns the position of the string in another string (the haystack), counting from 1. Returns 0 if the string cannot be found. e.g. lit('a').position('bbbbba') leads to 6.
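Note that the expression itself is the string being searched for and the argument is the string being searched. A plain-Python sketch with a hypothetical helper, `position_sketch`, makes the argument order and the 1-based result explicit.

```python
def position_sketch(needle, haystack):
    """1-based position of needle in haystack, or 0 if it is absent."""
    # str.find returns -1 when nothing is found, so adding 1 yields 0.
    return haystack.find(needle) + 1


found = position_sketch("a", "bbbbba")    # 6
missing = position_sketch("z", "bbbbba")  # 0
```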

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property proctime¶

Declares a field as the proctime attribute for indicating, accessing, and working in Flink’s processing time.

See also

rowtime

New in version 1.12.0.

property quarter¶

Creates an interval of the given number of quarters.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property quarters¶

Creates an interval of the given number of quarters.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property radians¶

Converts numeric from degrees to radians.

See also

degrees

New in version 1.12.0.

regexp_extract(regex: Union[str, Expression[str]], extract_index: Union[int, Expression[int]] = None) → pyflink.table.expression.Expression[str][source]¶

Returns a string extracted with a specified regular expression and a regex match group index.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

regexp_replace(regex: Union[str, Expression[str]], replacement: Union[str, Expression[str]]) → pyflink.table.expression.Expression[str][source]¶

Returns a string in which every substring that matches the regular expression is replaced, one after another, by the replacement.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

repeat(n: Union[int, Expression[int]]) → pyflink.table.expression.Expression[str][source]¶

Returns a string that repeats the base string n times.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

replace(search: Union[str, Expression[str]] = None, replacement: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶

Returns a new string which replaces all the occurrences of the search target with the replacement string (non-overlapping).

e.g. lit('This is a test String.').replace(' ', '_') leads to 'This_is_a_test_String.'

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

round(places: Union[int, Expression[int]])[source]¶

Rounds the given number to the given number of places to the right of the decimal point. A negative value rounds to the left of the decimal point.

e.g. lit(646.646).round(2) leads to 646.65, lit(646.646).round(3) leads to 646.646, lit(646.646).round(0) leads to 647, lit(646.646).round(-2) leads to 600.
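The four examples above, including the negative case, can be reproduced with the standard `decimal` module. `round_sketch` is a hypothetical helper, and the half-up rounding mode is an assumption chosen because it is consistent with these examples.

```python
from decimal import Decimal, ROUND_HALF_UP


def round_sketch(value, places):
    """Round value to places digits after the decimal point."""
    number = Decimal(str(value))
    # 10 ** -places as a Decimal, e.g. 0.01 for places=2, 1E+2 for places=-2.
    quantum = Decimal(1).scaleb(-places)
    # Assumption: ties are rounded half up.
    return number.quantize(quantum, rounding=ROUND_HALF_UP)


two_places = round_sketch(646.646, 2)    # equal to Decimal("646.65")
hundreds = round_sketch(646.646, -2)     # numerically equal to 600
```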

New in version 1.12.0.

property rowtime¶

Declares a field as the rowtime attribute for indicating, accessing, and working in Flink’s event time.

See also

proctime

New in version 1.12.0.

rpad(length: Union[int, Expression[int]], pad: Union[str, Expression[str]]) → pyflink.table.expression.Expression[str][source]¶

Returns a string right-padded with the given pad string to a length of length characters. If the string is longer than length, the return value is shortened to length characters. e.g. lit('hi').rpad(4, '??') returns 'hi??', and lit('hi').rpad(1, '??') returns 'h'.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property rtrim¶

Returns the given string with trailing whitespace removed.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property second¶

Creates an interval of the given number of seconds.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property seconds¶

Creates an interval of the given number of seconds.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property sha1¶

Returns the SHA-1 hash of the string argument; null if string is null.

Returns

string of 40 hexadecimal digits or null.

See also

md5, sha1, sha224, sha256, sha384, sha512, sha2

New in version 1.12.0.

sha2(hash_length: Union[int, Expression[int]]) → pyflink.table.expression.Expression[str][source]¶

Returns the hash for the given string expression using the SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, or SHA-512).

Parameters

hash_length – bit length of the result (either 224, 256, 384, or 512)

Returns

string or null if one of the arguments is null.
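The relationship between hash_length and the length of the returned string can be checked with Python's `hashlib`. `sha2_sketch` is a hypothetical helper on plain strings; encoding the input as UTF-8 is an assumption of the sketch.

```python
import hashlib

_SHA2 = {
    224: hashlib.sha224,
    256: hashlib.sha256,
    384: hashlib.sha384,
    512: hashlib.sha512,
}


def sha2_sketch(text, hash_length):
    """Hex digest of text using the SHA-2 variant with the given bit length."""
    if text is None or hash_length is None:
        return None  # null in, null out
    # Assumption: the string is hashed as its UTF-8 bytes.
    return _SHA2[hash_length](text.encode("utf-8")).hexdigest()


# Each hex digit encodes 4 bits, so the digest has hash_length / 4 digits.
lengths = {bits: len(sha2_sketch("abc", bits)) for bits in _SHA2}
```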

See also

md5, sha1, sha224, sha256, sha384, sha512

New in version 1.12.0.

property sha224¶

Returns the SHA-224 hash of the string argument; null if string is null.

Returns

string of 56 hexadecimal digits or null.

See also

md5, sha1, sha224, sha256, sha384, sha512, sha2

New in version 1.12.0.

property sha256¶

Returns the SHA-256 hash of the string argument; null if string is null.

Returns

string of 64 hexadecimal digits or null.

See also

md5, sha1, sha224, sha256, sha384, sha512, sha2

New in version 1.12.0.

property sha384¶

Returns the SHA-384 hash of the string argument; null if string is null.

Returns

string of 96 hexadecimal digits or null.

See also

md5, sha1, sha224, sha256, sha384, sha512, sha2

New in version 1.12.0.

property sha512¶

Returns the SHA-512 hash of the string argument; null if string is null.

Returns

string of 128 hexadecimal digits or null.

See also

md5, sha1, sha224, sha256, sha384, sha512, sha2

New in version 1.12.0.

property sign¶

Calculates the signum of a given number.

e.g. lit(1.23).sign leads to 1.00, lit(-1.23).sign leads to -1.00.

New in version 1.12.0.

similar(pattern: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[bool][source]¶

Returns true if a string matches the specified SQL regex pattern. e.g. 'A+' matches all strings that consist of at least one A.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property sin¶

Calculates the sine of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

property sinh¶

Calculates the hyperbolic sine of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

property sqrt¶

Calculates the square root of a given value.

New in version 1.12.0.

property start¶

Returns the start time (inclusive) of a window when applied on a window reference.

Example:

>>> tab.window(Tumble
>>>         .over(row_interval(2))
>>>         .on(col("a"))
>>>         .alias("w")) \
>>>     .group_by(col("c"), col("w")) \
>>>     .select(col("c"), col("w").start, col("w").end, col("w").proctime)

See also

end

New in version 1.12.0.

property stddev_pop¶

Returns the population standard deviation of an expression (the square root of var_pop).
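The relationship between the population and sample statistics can be checked with Python's `statistics` module on a small list. This is a plain-Python illustration on hypothetical data, not a PyFlink call: the population variants divide by n, the sample variants by n - 1.

```python
import math
import statistics

data = [2, 4, 4, 4, 5, 5, 7, 9]   # hypothetical values; the mean is 5

var_pop = statistics.pvariance(data)     # population variance, divides by n
stddev_pop = statistics.pstdev(data)     # square root of var_pop
var_samp = statistics.variance(data)     # sample variance, divides by n - 1
stddev_samp = statistics.stdev(data)     # square root of var_samp
```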

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property stddev_samp¶

Returns the sample standard deviation of an expression (the square root of var_samp).

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

substring(begin_index: Union[int, Expression[int]], length: Union[int, Expression[int]] = None) → pyflink.table.expression.Expression[str][source]¶

Creates a substring of the given string, starting at the given index and spanning the given length.

Parameters
  • begin_index – first character of the substring (starting at 1, inclusive)

  • length – number of characters of the substring
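Since begin_index counts from 1, it differs by one from Python slicing. The plain-Python sketch below uses a hypothetical helper, `substring_sketch`, and assumes that an omitted length means "to the end of the string"; indexes smaller than 1 are not modelled.

```python
def substring_sketch(text, begin_index, length=None):
    """Substring starting at 1-based begin_index, optionally length long."""
    start = begin_index - 1              # convert to a 0-based offset
    if length is None:
        # Assumption: without a length, take the rest of the string.
        return text[start:]
    return text[start:start + length]


hello = substring_sketch("hello world", 1, 5)   # "hello"
world = substring_sketch("hello world", 7)      # "world"
```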

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property sum¶

Returns the sum of the numeric field across all input values. If all values are null, null is returned.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property sum0¶

Returns the sum of the numeric field across all input values. If all values are null, 0 is returned.
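The only difference between sum and sum0 is the result when every input is null. A plain-Python sketch with hypothetical helpers, where `None` stands in for null, shows both side by side.

```python
def sum_sketch(values):
    """Sum of the non-null values, or None if there are none."""
    present = [v for v in values if v is not None]
    return sum(present) if present else None


def sum0_sketch(values):
    """Like sum_sketch, but 0 instead of None when all values are null."""
    result = sum_sketch(values)
    return 0 if result is None else result


mixed = sum_sketch([1, None, 2])          # 3
all_null = sum_sketch([None, None])       # None
all_null_0 = sum0_sketch([None, None])    # 0
```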

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property tan¶

Calculates the tangent of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

property tanh¶

Calculates the hyperbolic tangent of a given number.

See also

sin, cos, sinh, cosh, tan, cot, asin, acos, atan, tanh

New in version 1.12.0.

then(if_true, if_false) → pyflink.table.expression.Expression[source]¶

Ternary conditional operator that decides which of two other expressions should be evaluated based on an evaluated boolean condition.

e.g. (lit(42) > 5).then("A", "B") leads to "A"

Parameters
  • if_true – expression to be evaluated if condition holds

  • if_false – expression to be evaluated if condition does not hold

New in version 1.12.0.

property to_base64¶

Returns the base64-encoded result of the input string.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property to_date¶

Parses a date string in the form “yyyy-MM-dd” to a SQL Date. It’s equivalent to col.cast(DataTypes.DATE()).

Example:

>>> lit("2016-06-15").to_date

See also

to_date, to_time, to_timestamp, extract(), floor(), ceil()

New in version 1.12.0.

property to_time¶

Parses a time string in the form “HH:mm:ss” to a SQL Time. It’s equivalent to col.cast(DataTypes.TIME()).

Example:

>>> lit("3:30:00").to_time

See also

to_date, to_time, to_timestamp, extract(), floor(), ceil()

New in version 1.12.0.

property to_timestamp¶

Parses a timestamp string in the form “yyyy-MM-dd HH:mm:ss[.SSS]” to a SQL Timestamp. It’s equivalent to col.cast(DataTypes.TIMESTAMP(3)).

Example:

>>> lit('2016-06-15 3:30:00.001').to_timestamp

See also

to_date, to_time, to_timestamp, extract(), floor(), ceil()

New in version 1.12.0.
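
As a rough orientation, the three string layouts accepted by to_date, to_time and to_timestamp correspond to the following patterns in Python's standard datetime module. This is only a plain-Python analogy with illustrative variable names; Flink performs its own parsing and casting.

```python
from datetime import datetime

# Plain-Python analogy only: these calls do not involve Flink at all.
parsed_date = datetime.strptime("2016-06-15", "%Y-%m-%d").date()
parsed_time = datetime.strptime("3:30:00", "%H:%M:%S").time()
parsed_ts = datetime.strptime("2016-06-15 3:30:00.001", "%Y-%m-%d %H:%M:%S.%f")

print(parsed_date, parsed_time, parsed_ts)
```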

trim(character: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶

Removes leading and trailing space characters from the given string if character is None. Otherwise, removes leading and trailing specified characters from the given string.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

trim_leading(character: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶

Removes leading space characters from the given string if character is None. Otherwise, removes leading specified characters from the given string.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

trim_trailing(character: Union[str, Expression[str]] = None) → pyflink.table.expression.Expression[str][source]¶

Removes trailing space characters from the given string if character is None. Otherwise, removes trailing specified characters from the given string.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.
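
The behaviour of the three trim methods can be pictured with Python's built-in string methods. The functions below are plain-Python stand-ins that reuse the method names for readability. They assume a single-character argument, because str.strip treats a longer string as a set of characters, which may differ from Flink.

```python
def trim(s, character=None):
    # With no character given, only spaces are removed on both sides.
    return s.strip(" " if character is None else character)


def trim_leading(s, character=None):
    return s.lstrip(" " if character is None else character)


def trim_trailing(s, character=None):
    return s.rstrip(" " if character is None else character)


print(repr(trim("  ab  ")))
print(repr(trim_leading("xxabxx", "x")))
print(repr(trim_trailing("xxabxx", "x")))
```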

truncate(n: Union[int, Expression[int]] = 0) → pyflink.table.expression.Expression[T][source]¶

Returns a number truncated to n decimal places. If n is 0, the result has no decimal point or fractional part. n can be negative to cause n digits left of the decimal point of the value to become zero. E.g. lit(42.345).truncate(2) yields 42.34 and lit(42).truncate(-1) yields 40.

New in version 1.12.0.
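
The truncation rule can be sketched in plain Python with the decimal module. This is an illustration of the arithmetic, not Flink's implementation; the function reuses the name truncate only for readability, and the assumption that digits are dropped toward zero follows the usual SQL TRUNCATE convention.

```python
from decimal import Decimal, ROUND_DOWN


def truncate(value, n=0):
    """Drop all digits beyond n decimal places without rounding up."""
    quantum = Decimal(1).scaleb(-n)  # n=2 gives 0.01, n=-1 gives 10
    return Decimal(str(value)).quantize(quantum, rounding=ROUND_DOWN)


# The two examples from the description above.
print(truncate("42.345", 2) == Decimal("42.34"))
print(truncate(42, -1) == 40)
```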

property upper_case¶

Returns all of the characters in a string in upper case using the rules of the default locale.

See also

trim_leading(), trim_trailing(), trim(), replace(), char_length, upper_case, lower_case, init_cap, like(), similar(), position(), lpad(), rpad(), overlay(), regexp_replace(), regexp_extract(), substring(), from_base64, to_base64, ltrim, rtrim, repeat()

New in version 1.12.0.

property var_pop¶

Returns the population variance of an expression.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.

property var_samp¶

Returns the sample variance of a given expression.

Example:

>>> tab \
>>>     .group_by(col("a")) \
>>>     .select(col("a"),
>>>             col("b").sum.alias("d"),
>>>             col("b").sum0.alias("e"),
>>>             col("b").min.alias("f"),
>>>             col("b").max.alias("g"),
>>>             col("b").count.alias("h"),
>>>             col("b").avg.alias("i"),
>>>             col("b").stddev_pop.alias("j"),
>>>             col("b").stddev_samp.alias("k"),
>>>             col("b").var_pop.alias("l"),
>>>             col("b").var_samp.alias("m"),
>>>             col("b").collect.alias("n"))

See also

sum, sum0, min, max, count, avg, stddev_pop, stddev_samp, var_pop, var_samp, collect

New in version 1.12.0.
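
The difference between var_pop and var_samp is only the divisor: the population variance divides the sum of squared deviations by N, the sample variance by N - 1. The sketch below shows this with Python's statistics module on made-up values; it does not involve Flink.

```python
import statistics

values = [1, 2, 3, 4]  # made-up sample data

var_pop = statistics.pvariance(values)   # sum of squared deviations / N
var_samp = statistics.variance(values)   # sum of squared deviations / (N - 1)

print(var_pop, var_samp)
```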

property week¶

Creates an interval of the given number of weeks.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property weeks¶

Creates an interval of the given number of weeks.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property year¶

Creates an interval of the given number of years.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

property years¶

Creates an interval of the given number of years.

The produced expression is of type INTERVAL().

See also

year, years, quarter, quarters, month, months, week, weeks, day, days, hour, hours, minute, minutes, second, seconds, milli, millis

New in version 1.12.0.

class pyflink.table.FunctionContext(base_metric_group)[source]¶

Bases: object

Used to obtain global runtime information about the context in which the user-defined function is executed. The information includes, for example, the metric group and global job parameters.

get_metric_group() → pyflink.metrics.metricbase.MetricGroup[source]¶

Returns the metric group for this parallel subtask.

New in version 1.11.0.

class pyflink.table.GroupWindowedTable(java_group_windowed_table, t_env)[source]¶

Bases: object

A table that has been windowed for GroupWindow.

group_by(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.WindowGroupedTable[source]¶

Groups the elements by a mandatory window and one or more optional grouping attributes. The window is specified by referring to its alias.

If no additional grouping attribute is specified and if the input is a streaming table, the aggregation will be performed by a single task, i.e., with parallelism 1.

Aggregations are performed per group and defined by a subsequent select() clause, similar to a SQL SELECT-GROUP-BY query.

Example:

>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Tumble
>>> tab.window(Tumble.over(lit(10).minutes).on(tab.rowtime).alias('w')) \
...     .group_by(col('w')) \
...     .select(tab.a.sum.alias('a'),
...             col('w').start.alias('b'),
...             col('w').end.alias('c'),
...             col('w').rowtime.alias('d'))
Parameters

fields – Group keys.

Returns

A window grouped table.
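
What grouping by a tumbling window computes can be pictured in plain Python. The sketch below assigns each row to a fixed 10-minute window and sums column a per window. The sample data, the integer minute timestamps and the variable names are assumptions made for illustration; Flink's own window assignment works on time attributes.

```python
from collections import defaultdict

WINDOW_MINUTES = 10  # mirrors Tumble.over(lit(10).minutes)

# (rowtime in minutes, a) pairs; made-up sample data
events = [(1, 5), (4, 7), (12, 1), (19, 2), (25, 10)]

sums = defaultdict(int)
for rowtime, a in events:
    start = rowtime - rowtime % WINDOW_MINUTES  # inclusive window start
    sums[start] += a

# One output row per window: (sum of a, window start, window end)
result = [(total, start, start + WINDOW_MINUTES)
          for start, total in sorted(sums.items())]
print(result)
```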

class pyflink.table.GroupedTable(java_table, t_env)[source]¶

Bases: object

A table that has been grouped on a set of grouping keys.

select(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Performs a selection operation on a grouped table. Similar to an SQL SELECT statement. The field expressions can contain complex expressions and aggregations.

Example:

>>> tab.group_by(tab.key).select(tab.key, tab.value.avg.alias('average'))
>>> tab.group_by("key").select("key, value.avg as average")
Parameters

fields – Expression string that contains group keys and aggregate function calls.

Returns

The result table.

class pyflink.table.ListView[source]¶

Bases: pyflink.table.data_view.DataView, typing.Generic

A DataView that provides list-like functionality in the accumulator of an AggregateFunction when large amounts of data are expected.

add(value: T) → None[source]¶

Adds the given value to this list view.

add_all(values: List[T]) → None[source]¶

Adds all of the elements of the specified list to this list view.

clear() → None[source]¶

Clears the DataView and removes all data.

get() → Iterable[T][source]¶

Returns an iterable of this list view.

class pyflink.table.MapView[source]¶

Bases: typing.Generic

A DataView that provides dict-like functionality in the accumulator of an AggregateFunction when large amounts of data are expected.

clear() → None[source]¶

Removes all entries of this map.

contains(key: K) → bool[source]¶

Checks if the map view contains a value for a given key.

get(key: K) → V[source]¶

Return the value for the specified key.

is_empty() → bool[source]¶

Returns true if the map view contains no key-value mappings, otherwise false.

items() → Iterable[Tuple[K, V]][source]¶

Returns all entries of the map view.

keys() → Iterable[K][source]¶

Returns all the keys in the map view.

put(key: K, value: V) → None[source]¶

Inserts a value for the given key into the map view. If the map view already contains a value for the key, the existing value is overwritten.

put_all(dict_value: Dict[K, V]) → None[source]¶

Inserts all mappings from the specified map to this map view.

remove(key: K) → None[source]¶

Deletes the value for the given key.

values() → Iterable[V][source]¶

Returns all the values in the map view.
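
To show how the methods above fit together, the following sketch uses a hypothetical dict-backed stand-in, DictBackedMapView, that exposes the same method names. It is not the PyFlink class. How the real MapView behaves for a missing key is not stated here, so the usage below checks contains() before calling get().

```python
class DictBackedMapView:
    """Hypothetical in-memory stand-in using the MapView method names."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value  # overwrites any existing value

    def put_all(self, dict_value):
        self._data.update(dict_value)

    def get(self, key):
        return self._data[key]

    def contains(self, key):
        return key in self._data

    def remove(self, key):
        del self._data[key]

    def is_empty(self):
        return not self._data

    def keys(self):
        return self._data.keys()

    def values(self):
        return self._data.values()

    def items(self):
        return self._data.items()

    def clear(self):
        self._data.clear()


# Count occurrences per key, as an aggregate's accumulator might.
view = DictBackedMapView()
for word in ["a", "b", "a"]:
    count = view.get(word) if view.contains(word) else 0
    view.put(word, count + 1)

print(dict(view.items()))
```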

class pyflink.table.Module(j_module)[source]¶

Bases: object

Modules define a set of metadata, including functions, user-defined types, operators, rules, etc. Metadata from modules is regarded as built-in or system metadata that users can take advantage of.

New in version 1.12.0.

class pyflink.table.OverWindowedTable(java_over_windowed_table, t_env)[source]¶

Bases: object

A table that has been windowed for OverWindow.

Unlike group windows, which are specified in the GROUP BY clause, over windows do not collapse rows. Instead over window aggregates compute an aggregate for each input row over a range of its neighboring rows.

select(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Performs a selection operation on an over-windowed table. Similar to an SQL SELECT statement. The field expressions can contain complex expressions and aggregations.

Example:

>>> from pyflink.table.expressions import col
>>> over_windowed_table.select(col('c'),
...                            col('b').count.over(col('ow')),
...                            col('e').sum.over(col('ow')))
>>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
Parameters

fields – Expression string.

Returns

The result table.
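
The point that over windows do not collapse rows can be seen in a small plain-Python sketch. It assumes one particular window shape, unbounded preceding up to the current row with no partitioning, and uses made-up data. Every input row yields exactly one output row carrying the running count and sum.

```python
from itertools import accumulate

# Rows already in window order; columns (c, e) are made-up sample data.
rows = [("x", 10), ("y", 20), ("z", 5)]

running_sum = list(accumulate(e for _, e in rows))

# One output row per input row: (c, count so far, sum of e so far).
result = [(c, i + 1, running_sum[i]) for i, (c, _) in enumerate(rows)]
print(result)
```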

class pyflink.table.ResultKind[source]¶

Bases: object

ResultKind defines the types of the result.

SUCCESS:

The statement (e.g. DDL, USE) executes successfully, and the result only contains a simple “OK”.

SUCCESS_WITH_CONTENT:

The statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important content.

New in version 1.11.0.

SUCCESS = 0¶
SUCCESS_WITH_CONTENT = 1¶
class pyflink.table.Row(*args, **kwargs)[source]¶

Bases: object

A row in Table. The fields in it can be accessed:

  • like attributes (row.key)

  • like dictionary values (row[key])

key in row will search through row keys.

Row can be used to create a row object by using named arguments; the fields will be sorted by name. A named argument must not be omitted to indicate that a value is None or missing; set it explicitly to None instead.

>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)
>>> 'name' in row
True
>>> 'wrong_key' in row
False

Row can also be used to create another Row-like class, which can then be used to create Row objects, such as

>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> 'name' in Person
True
>>> 'wrong_key' in Person
False
>>> Person("Alice", 11)
Row(name='Alice', age=11)
as_dict(recursive=False)[source]¶

Returns the row as a dict.

Example:

>>> Row(name="Alice", age=11).as_dict() == {'name': 'Alice', 'age': 11}
True
>>> row = Row(key=1, value=Row(name='a', age=2))
>>> row.as_dict() == {'key': 1, 'value': Row(age=2, name='a')}
True
>>> row.as_dict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
True
Parameters

recursive – whether to convert nested Rows to dicts as well (default: False).

get_row_kind() → pyflink.common.types.RowKind[source]¶
set_field_names(field_names: List)[source]¶
set_row_kind(row_kind: pyflink.common.types.RowKind)[source]¶
class pyflink.table.RowKind[source]¶

Bases: enum.Enum

An enumeration.

DELETE = 3¶
INSERT = 0¶
UPDATE_AFTER = 2¶
UPDATE_BEFORE = 1¶
class pyflink.table.ScalarFunction[source]¶

Bases: pyflink.table.udf.UserDefinedFunction

Base interface for user-defined scalar functions. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.

New in version 1.10.0.

abstract eval(*args)[source]¶

Method which defines the logic of the scalar function.

class pyflink.table.SqlDialect[source]¶

Bases: object

Enumeration of valid SQL compatibility modes.

In most cases, the built-in compatibility mode should be sufficient. For some features, e.g. the “INSERT INTO T PARTITION(a='xxx') …” grammar, you may need to switch to the Hive dialect.

We may introduce other SQL dialects in the future.

DEFAULT:

Flink’s default SQL behavior.

HIVE:

SQL dialect that allows some Apache Hive specific grammar.

Note: We might never support all of the Hive grammar. See the documentation for supported features.

DEFAULT = 0¶
HIVE = 1¶
class pyflink.table.StatementSet(_j_statement_set, t_env)[source]¶

Bases: object

A StatementSet accepts DML statements or Tables. The planner can optimize all added statements and Tables together and then submit them as one job.

Note

The added statements and Tables will be cleared when calling the execute method.

New in version 1.11.0.

add_insert(target_path: str, table, overwrite: bool = False) → pyflink.table.statement_set.StatementSet[source]¶

Adds a Table with the given sink table name to the set.

Parameters
  • target_path – The path of the registered TableSink to which the Table is written.

  • table (pyflink.table.Table) – The Table to add.

  • overwrite – The flag that indicates whether the insert should overwrite existing data or not.

Returns

current StatementSet instance.

New in version 1.11.0.

add_insert_sql(stmt: str) → pyflink.table.statement_set.StatementSet[source]¶

Adds an INSERT statement to the set.

Parameters

stmt – The statement to be added.

Returns

current StatementSet instance.

New in version 1.11.0.

execute() → pyflink.table.table_result.TableResult[source]¶

Executes all statements and Tables as a batch.

Note

The added statements and Tables will be cleared when executing this method.

Returns

execution result.

New in version 1.11.0.

explain(*extra_details: pyflink.table.explain_detail.ExplainDetail) → str[source]¶

Returns the AST and the execution plan of all statements and Tables.

Parameters

extra_details – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming

Returns

The AST and execution plan of all statements and Tables, as a string.

New in version 1.11.0.

class pyflink.table.StreamTableEnvironment(j_tenv)[source]¶

Bases: pyflink.table.table_environment.TableEnvironment

connect(connector_descriptor: pyflink.table.descriptors.ConnectorDescriptor) → pyflink.table.descriptors.StreamTableDescriptor[source]¶

Creates a temporary table from a descriptor.

Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.

The following example shows how to read from a connector using a JSON format and registering a temporary table as “MyTable”:

>>> table_env \
...     .connect(ExternalSystemXYZ()
...              .version("0.11")) \
...     .with_format(Json()
...                  .json_schema("{...}")
...                  .fail_on_missing_field(False)) \
...     .with_schema(Schema()
...                  .field("user-name", "VARCHAR")
...                  .from_origin_field("u_name")
...                  .field("count", "DECIMAL")) \
...     .create_temporary_table("MyTable")
Parameters

connector_descriptor – Connector descriptor describing the external system.

Returns

A StreamTableDescriptor used to build the temporary table.

Note

Deprecated in 1.11. Use execute_sql() to register a table instead.

static create(stream_execution_environment: pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment = None, table_config: pyflink.table.table_config.TableConfig = None, environment_settings: pyflink.table.environment_settings.EnvironmentSettings = None) → pyflink.table.table_environment.StreamTableEnvironment[source]¶

Creates a StreamTableEnvironment.

Example:

# create with StreamExecutionEnvironment.
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> table_env = StreamTableEnvironment.create(env)
# create with StreamExecutionEnvironment and TableConfig.
>>> table_config = TableConfig()
>>> table_config.set_null_check(False)
>>> table_env = StreamTableEnvironment.create(env, table_config)
# create with StreamExecutionEnvironment and EnvironmentSettings.
>>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner() \
...     .build()
>>> table_env = StreamTableEnvironment.create(
...     env, environment_settings=environment_settings)
# create with EnvironmentSettings.
>>> table_env = StreamTableEnvironment.create(environment_settings=environment_settings)
Parameters
  • stream_execution_environment – The StreamExecutionEnvironment of the TableEnvironment.

  • table_config – The configuration of the TableEnvironment, optional.

  • environment_settings – The environment settings used to instantiate the TableEnvironment, optional. It provides the interfaces for planner selection (flink or blink).

Returns

The StreamTableEnvironment created from given StreamExecutionEnvironment and configuration.

from_data_stream(data_stream: pyflink.datastream.data_stream.DataStream, *fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Converts the given DataStream into a Table with specified field names.

There are two modes for mapping original fields to the fields of the Table:

1. Reference input fields by name: All fields in the schema definition are referenced by name (and possibly renamed using an alias (as)). Moreover, we can define proctime and rowtime attributes at arbitrary positions using arbitrary names (except those that exist in the result schema). In this mode, fields can be reordered and projected out. This mode can be used for any input type.

2. Reference input fields by position: In this mode, fields are simply renamed. Event-time attributes can replace the field at their position in the input data (if it is of the correct type) or be appended at the end. Proctime attributes must be appended at the end. This mode can only be used if the input type has a defined field order (tuple, case class, Row) and none of the fields references a field of the input type.

Parameters
  • data_stream – The datastream to be converted.

  • fields – The fields expressions to map original fields of the DataStream to the fields of the Table

Returns

The converted Table.

New in version 1.12.0.

to_append_stream(table: pyflink.table.table.Table, type_info: pyflink.common.typeinfo.TypeInformation) → pyflink.datastream.data_stream.DataStream[source]¶

Converts the given Table into a DataStream of a specified type. The Table must only have insert (append) changes. If the Table is also modified by update or delete changes, the conversion will fail.

The fields of the Table are mapped to DataStream as follows: Row and Tuple types: Fields are mapped by position, field types must match.

Parameters
  • table – The Table to convert.

  • type_info – The TypeInformation that specifies the type of the DataStream.

Returns

The converted DataStream.

New in version 1.12.0.

to_retract_stream(table: pyflink.table.table.Table, type_info: pyflink.common.typeinfo.TypeInformation) → pyflink.datastream.data_stream.DataStream[source]¶

Converts the given Table into a DataStream of add and retract messages. The message will be encoded as Tuple. The first field is a boolean flag, the second field holds the record of the specified type.

A true flag indicates an add message, a false flag indicates a retract message.

The fields of the Table are mapped to DataStream as follows: Row and Tuple types: Fields are mapped by position, field types must match.

Parameters
  • table – The Table to convert.

  • type_info – The TypeInformation of the requested record type.

Returns

The converted DataStream.

New in version 1.12.0.
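
The add/retract encoding can be illustrated without Flink. The sketch below replays a made-up sequence of (flag, record) messages for a per-user count and rebuilds the current table contents from them. An update appears as a retraction of the outdated record followed by an add of the new one. All names and data are illustrative.

```python
from collections import Counter

# (flag, record) pairs: True adds a record, False retracts it.
# Made-up messages for a per-user count where alice goes from 1 to 2.
messages = [
    (True, ("alice", 1)),
    (True, ("bob", 1)),
    (False, ("alice", 1)),  # retract the outdated count
    (True, ("alice", 2)),   # add the updated count
]

table = Counter()
for is_add, record in messages:
    table[record] += 1 if is_add else -1

# Records whose adds outnumber their retractions are still present.
current_rows = sorted(rec for rec, n in table.items() if n > 0)
print(current_rows)
```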

class pyflink.table.Table(j_table, t_env)[source]¶

Bases: object

A Table is the core component of the Table API. Similar to how the batch and streaming APIs have DataSet and DataStream, the Table API is built around Table.

Use the methods of Table to transform data.

Example:

>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> t_env = StreamTableEnvironment.create(env)
>>> ...
>>> t_env.register_table_source("source", ...)
>>> t = t_env.from_path("source")
>>> t.select(...)
>>> ...
>>> t_env.register_table_sink("result", ...)
>>> t.execute_insert("result")

Operations such as join(), select(), where() and group_by() take arguments in an expression string. Please refer to the documentation for the expression syntax.

add_columns(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Adds additional columns. Similar to a SQL SELECT statement. The field expressions can contain complex expressions, but cannot contain aggregations. It will throw an exception if the added fields already exist.

Example:

>>> from pyflink.table import expressions as expr
>>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b, 'sunny').alias('b1'))
>>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
Parameters

fields – Column list string.

Returns

The result table.

add_or_replace_columns(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Adds additional columns. Similar to a SQL SELECT statement. The field expressions can contain complex expressions, but cannot contain aggregations. An existing column is replaced if an added column has the same name. Moreover, if the added fields contain duplicate field names, the last one is used.

Example:

>>> from pyflink.table import expressions as expr
>>> tab.add_or_replace_columns((tab.a + 1).alias('a1'),
...                            expr.concat(tab.b, 'sunny').alias('b1'))
>>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as b1")
Parameters

fields – Column list string.

Returns

The result table.
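
The replace and last-one-wins rules of add_or_replace_columns can be mimicked on a single row with a Python dict. This is only a sketch of the naming rules. The row, the column functions and the assumption that every new expression is evaluated against the input row are illustrative and not taken from Flink's implementation.

```python
row = {"a": 1, "b": "rain"}  # made-up input row

# (column name, function of the input row); "a1" is listed twice on purpose.
new_columns = [
    ("b", lambda r: r["b"] + "sunny"),   # same name as existing column b
    ("a1", lambda r: r["a"] + 1),
    ("a1", lambda r: r["a"] + 100),      # duplicate name: the last one is used
]

result = dict(row)
for name, fn in new_columns:
    result[name] = fn(row)  # assumed: evaluated against the input row

print(result)
```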

alias(field: str, *fields: str) → pyflink.table.table.Table[source]¶

Renames the fields of the expression result. Use this to disambiguate fields before joining two tables.

Example:

>>> tab.alias("a", "b", "c")
>>> tab.alias("a, b, c")
Parameters
  • field – Field alias.

  • fields – Additional field aliases.

Returns

The result table.

distinct() → pyflink.table.table.Table[source]¶

Removes duplicate values and returns only distinct (different) values.

Example:

>>> tab.select(tab.key, tab.value).distinct()
Returns

The result table.

drop_columns(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Drops existing columns. The field expressions should be field reference expressions.

Example:

>>> tab.drop_columns(tab.a, tab.b)
>>> tab.drop_columns("a, b")
Parameters

fields – Column list string.

Returns

The result table.

execute() → pyflink.table.table_result.TableResult[source]¶

Collects the contents of the current table to the local client.

Example:

>>> tab.execute()
Returns

The content of the table.

New in version 1.11.0.

execute_insert(table_path: str, overwrite: bool = False) → pyflink.table.table_result.TableResult[source]¶

Writes the Table to a TableSink that was registered under the specified name, and then executes the insert operation. For the path resolution algorithm see use_database().

Example:

>>> tab.execute_insert("sink")
Parameters
  • table_path – The path of the registered TableSink to which the Table is written.

  • overwrite – The flag that indicates whether the insert should overwrite existing data or not.

Returns

The table result.

New in version 1.11.0.

explain(*extra_details: pyflink.table.explain_detail.ExplainDetail) → str[source]¶

Returns the AST of this table and the execution plan.

Parameters

extra_details – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming

Returns

The AST and execution plan of this table, as a string.

New in version 1.11.0.

fetch(fetch: int) → pyflink.table.table.Table[source]¶

Limits a (possibly sorted) result to the first n rows.

This method can be combined with a preceding order_by() call for a deterministic order and offset() call to return n rows after skipping the first o rows.

Example:

Returns the first 3 records.

>>> tab.order_by(tab.name.desc).fetch(3)
>>> tab.order_by("name.desc").fetch(3)

Skips the first 10 rows and returns the next 5 rows.

>>> tab.order_by(tab.name.desc).offset(10).fetch(5)
Parameters

fetch – The number of records to return. Fetch must be >= 0.

Returns

The result table.

filter(predicate: Union[str, pyflink.table.expression.Expression[bool]]) → pyflink.table.table.Table[source]¶

Filters out elements that don’t pass the filter predicate. Similar to a SQL WHERE clause.

Example:

>>> tab.filter(tab.name == 'Fred')
>>> tab.filter("name = 'Fred'")
Parameters

predicate – Predicate expression string.

Returns

The result table.

full_outer_join(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool]]) → pyflink.table.table.Table[source]¶

Joins two Tables. Similar to a SQL full outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.

Note

Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).

Example:

>>> left.full_outer_join(right, left.a == right.b)
>>> left.full_outer_join(right, "a = b")
Parameters
  • right – Right table.

  • join_predicate – The join predicate expression string.

Returns

The result table.
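
The row-level effect of a full outer join with the predicate a = b can be sketched in plain Python on made-up data. Unmatched rows from either side are kept and padded with None, which is why null handling matters for this operation. The nested-loop approach is chosen for clarity only and says nothing about how Flink executes the join.

```python
left = [(1, "l1"), (2, "l2")]    # columns (a, x); made-up data
right = [(2, "r2"), (3, "r3")]   # columns (b, y); made-up data

result = []
matched_right = set()
for a, x in left:
    hits = [(i, r) for i, r in enumerate(right) if r[0] == a]
    if hits:
        for i, (b, y) in hits:
            matched_right.add(i)
            result.append((a, x, b, y))
    else:
        result.append((a, x, None, None))  # left row without a match

for i, (b, y) in enumerate(right):
    if i not in matched_right:
        result.append((None, None, b, y))  # right row without a match

print(result)
```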

get_schema() → pyflink.table.table_schema.TableSchema[source]¶

Returns the TableSchema of this table.

Returns

The schema of this table.

group_by(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.GroupedTable[source]¶

Groups the elements on some grouping keys. Use this before a selection with aggregations to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.

Example:

>>> tab.group_by(tab.key).select(tab.key, tab.value.avg)
>>> tab.group_by("key").select("key, value.avg")
Parameters

fields – Group keys.

Returns

The grouped table.

insert_into(table_path: str)[source]¶

Writes the Table to a TableSink that was registered under the specified name. For the path resolution algorithm see use_database().

Example:

>>> tab.insert_into("sink")
Parameters

table_path – The path of the registered TableSink to which the Table is written.

Note

Deprecated in 1.11. Use execute_insert() for a single sink; use TableEnvironment.create_statement_set() for multiple sinks.

intersect(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶

Intersects two Tables with duplicate records removed. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Similar to a SQL INTERSECT. The fields of the two intersect operations must fully overlap.

Note

Both tables must be bound to the same TableEnvironment.

Example:

>>> left.intersect(right)
Parameters

right – Right table.

Returns

The result table.

intersect_all(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶

Intersects two Tables. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Similar to a SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap.

Note

Both tables must be bound to the same TableEnvironment.

Example:

>>> left.intersect_all(right)
Parameters

right – Right table.

Returns

The result table.
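
The difference between intersect() and intersect_all() is the handling of duplicates, which can be shown with Python sets and multisets on made-up single-column data. The sketch reads "as many times as it is present in both tables" as the usual SQL INTERSECT ALL rule: a record occurring n times on the left and m times on the right is kept min(n, m) times.

```python
from collections import Counter

left = ["a", "a", "a", "b", "c"]    # made-up records
right = ["a", "a", "b", "b", "d"]   # made-up records

# intersect(): each common record exactly once.
intersect = sorted(set(left) & set(right))

# intersect_all(): Counter's & keeps the minimum count per record.
intersect_all = sorted((Counter(left) & Counter(right)).elements())

print(intersect, intersect_all)
```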

join(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None)[source]¶

Joins two Tables. Similar to a SQL join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary. You can use where and select clauses after a join to further specify the behaviour of the join.

Note

Both tables must be bound to the same TableEnvironment.

Example:

>>> left.join(right).where((left.a == right.b) & (left.c > 3))
>>> left.join(right).where("a = b && c > 3")
>>> left.join(right, left.a == right.b)
Parameters
  • right – Right table.

  • join_predicate – Optional, the join predicate expression string.

Returns

The result table.

join_lateral(table_function_call: Union[str, pyflink.table.expression.Expression], join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None) → pyflink.table.table.Table[source]¶

Joins this Table with a user-defined TableFunction. This join is similar to a SQL inner join but works with a table function. Each row of the table is joined with the rows produced by the table function.

Example:

>>> t_env.create_java_temporary_system_function("split",
...     "java.table.function.class.name")
>>> tab.join_lateral("split(text, ' ') as (b)", "a = b")

>>> from pyflink.table import expressions as expr
>>> tab.join_lateral(expr.call('split', ' ').alias('b'), expr.col('a') == expr.col('b'))
Parameters
  • table_function_call – An expression representing a table function call.

  • join_predicate – Optional. The join predicate expression string; joins ON TRUE if not specified.

Returns

The result Table.

left_outer_join(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None) → pyflink.table.table.Table[source]¶

Joins two Tables. Similar to a SQL left outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.

Note

Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).

Example:

>>> left.left_outer_join(right)
>>> left.left_outer_join(right, left.a == right.b)
>>> left.left_outer_join(right, "a = b")
Parameters
  • right – Right table.

  • join_predicate – Optional, the join predicate expression string.

Returns

The result table.

left_outer_join_lateral(table_function_call: Union[str, pyflink.table.expression.Expression], join_predicate: Union[str, pyflink.table.expression.Expression[bool]] = None) → pyflink.table.table.Table[source]¶

Joins this Table with a user-defined TableFunction. This join is similar to a SQL left outer join but works with a table function. Each row of the table is joined with all rows produced by the table function. If the join does not produce any row, the outer row is padded with nulls.

Example:

>>> t_env.create_java_temporary_system_function("split",
...     "java.table.function.class.name")
>>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
>>> from pyflink.table import expressions as expr
>>> tab.left_outer_join_lateral(expr.call('split', ' ').alias('b'))
Parameters
  • table_function_call – An expression representing a table function call.

  • join_predicate – Optional. The join predicate, given as an expression or an expression string. If omitted, the join is performed ON TRUE.

Returns

The result Table.
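The difference between join_lateral() and left_outer_join_lateral() can be illustrated with a small pure-Python model. This is only a sketch of the semantics described above, not PyFlink code: the split function, the tuple-based rows, and the single output column are assumptions made for the illustration.

```python
def split(text, sep):
    """Hypothetical table function: emits one single-column row per non-empty token."""
    for token in text.split(sep):
        if token:
            yield (token,)


def join_lateral(rows, table_function, outer=False):
    """Join each input row with the rows the table function emits for it."""
    result = []
    for row in rows:
        produced = list(table_function(row))
        if not produced and outer:
            # Left outer variant: keep the input row and pad the
            # (single) table function column with None.
            produced = [(None,)]
        for extra in produced:
            result.append(row + extra)
    return result


rows = [("a b",), ("",)]
inner = join_lateral(rows, lambda r: split(r[0], " "))
outer = join_lateral(rows, lambda r: split(r[0], " "), outer=True)
print(inner)  # [('a b', 'a'), ('a b', 'b')]
print(outer)  # [('a b', 'a'), ('a b', 'b'), ('', None)]
```

The row with the empty string produces no tokens, so it disappears from the inner variant and survives, padded with None, in the outer variant.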

limit(fetch: int, offset: int = 0) → pyflink.table.table.Table[source]¶

Limits a (possibly sorted) result to the first n rows.

This method is a synonym for offset() followed by fetch().

Example:

Returns the first 3 records.

>>> tab.limit(3)

Skips the first 10 rows and returns the next 5 rows.

>>> tab.limit(5, 10)
Parameters
  • fetch – the number of rows to fetch.

  • offset – the number of records to skip, default 0.

Returns

The result table.
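The relationship between limit(), offset() and fetch() can be sketched on a plain Python list. The helper functions below are hypothetical stand-ins that only model the row-skipping semantics; they are not the PyFlink methods.

```python
from itertools import islice


def offset(rows, n):
    """Skip the first n rows."""
    return islice(rows, n, None)


def fetch(rows, n):
    """Keep at most the first n rows."""
    return islice(rows, n)


def limit(rows, fetch_count, offset_count=0):
    """Model of limit(fetch, offset): offset first, then fetch."""
    return fetch(offset(rows, offset_count), fetch_count)


rows = list(range(100))
first_three = list(limit(rows, 3))
next_five = list(limit(rows, 5, 10))
print(first_three)  # [0, 1, 2]
print(next_five)    # [10, 11, 12, 13, 14]
```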

minus(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶

Minus of two Table with duplicate records removed. Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.

Note

Both tables must be bound to the same TableEnvironment.

Example:

>>> left.minus(right)
Parameters

right – Right table.

Returns

The result table.

minus_all(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶

Minus of two Table. Similar to a SQL EXCEPT ALL clause. MinusAll returns the records of the left table that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.

Note

Both tables must be bound to the same TableEnvironment.

Example:

>>> left.minus_all(right)
Parameters

right – Right table.

Returns

The result table.
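The duplicate handling of minus() and minus_all() can be modelled with Python collections. The sketch below works on plain lists of hashable rows and is only meant to illustrate the EXCEPT and EXCEPT ALL semantics described above; the function bodies are assumptions of the illustration, not the PyFlink implementation.

```python
from collections import Counter


def minus(left, right):
    """EXCEPT: distinct left rows that do not appear in right."""
    right_set = set(right)
    seen = set()
    result = []
    for row in left:
        if row not in right_set and row not in seen:
            seen.add(row)
            result.append(row)
    return result


def minus_all(left, right):
    """EXCEPT ALL: a row present n times in left and m times in right
    is kept (n - m) times, or not at all if m >= n."""
    # Counter subtraction drops entries whose count is zero or negative.
    remaining = Counter(left) - Counter(right)
    return sorted(remaining.elements())


left = ["a", "a", "a", "b", "c", "c"]
right = ["a", "c", "c", "d"]
print(minus(left, right))      # ['b']
print(minus_all(left, right))  # ['a', 'a', 'b']
```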

offset(offset: int) → pyflink.table.table.Table[source]¶

Limits a (possibly sorted) result from an offset position.

This method can be combined with a preceding order_by() call for a deterministic order and a subsequent fetch() call to return n rows after skipping the first o rows.

Example:

# skips the first 3 rows and returns all following rows.
>>> tab.order_by(tab.name.desc).offset(3)
>>> tab.order_by("name.desc").offset(3)
# skips the first 10 rows and returns the next 5 rows.
>>> tab.order_by(tab.name.desc).offset(10).fetch(5)

For unbounded tables, this operation requires a subsequent fetch operation.

Parameters

offset – Number of records to skip.

Returns

The result table.

order_by(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Sorts the given Table. Similar to SQL ORDER BY. The resulting Table is globally sorted across all parallel partitions.

Example:

>>> tab.order_by(tab.name.desc)
>>> tab.order_by("name.desc")

For unbounded tables, this operation requires a sorting on a time attribute or a subsequent fetch operation.

Parameters

fields – Order fields expression string.

Returns

The result table.

over_window(*over_windows: pyflink.table.window.OverWindow) → pyflink.table.table.OverWindowedTable[source]¶

Defines over-windows on the records of a table.

An over-window defines for each record an interval of records over which aggregation functions can be computed.

Example:

>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Over
>>> tab.over_window(Over.partition_by(tab.c).order_by(tab.rowtime) \
...     .preceding(lit(10).seconds).alias("ow")) \
...     .select(tab.c, tab.b.count.over(col('ow')), tab.e.sum.over(col('ow')))

Note

Computing over window aggregates on a streaming table is only a parallel operation if the window is partitioned. Otherwise, the whole stream will be processed by a single task, i.e., with parallelism 1.

Note

Over-windows for batch tables are currently not supported.

Parameters

over_windows – over windows created from Over.

Returns

An over-windowed table.
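What an over-window computes per record can be sketched in plain Python. The model below assumes rows of the form (key, timestamp in seconds, value) and an inclusive time range of the given number of preceding seconds; both the row layout and the inclusive bounds are assumptions of the illustration.

```python
from collections import defaultdict


def over_window(rows, preceding_seconds):
    """For each (key, ts, value) row, count and sum the values of rows with
    the same key whose ts lies in [ts - preceding_seconds, ts]."""
    by_key = defaultdict(list)
    for key, ts, value in rows:
        by_key[key].append((ts, value))

    result = []
    for key, ts, _ in rows:
        in_range = [v for t, v in by_key[key]
                    if ts - preceding_seconds <= t <= ts]
        # One output row per input row: the input is not collapsed.
        result.append((key, ts, len(in_range), sum(in_range)))
    return result


rows = [("x", 0, 1), ("x", 5, 2), ("x", 20, 3), ("y", 5, 10)]
aggregated = over_window(rows, 10)
for row in aggregated:
    print(row)
```

Unlike a group window, every input record yields exactly one output record, and partitioning by key is what allows the real operation to run in parallel.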

print_schema()[source]¶

Prints the schema of this table to the console in a tree format.

rename_columns(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Renames existing columns. Similar to a field alias statement. The field expressions should be alias expressions, and only the existing fields can be renamed.

Example:

>>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1'))
>>> tab.rename_columns("a as a1, b as b1")
Parameters

fields – Column list string.

Returns

The result table.

right_outer_join(right: pyflink.table.table.Table, join_predicate: Union[str, pyflink.table.expression.Expression[bool][bool]]) → pyflink.table.table.Table[source]¶

Joins two Tables. Similar to a SQL right outer join. The fields of the two joined operations must not overlap; use alias() to rename fields if necessary.

Note

Both tables must be bound to the same TableEnvironment and its TableConfig must have null check enabled (default).

Example:

>>> left.right_outer_join(right, left.a == right.b)
>>> left.right_outer_join(right, "a = b")
Parameters
  • right – Right table.

  • join_predicate – The join predicate expression string.

Returns

The result table.

select(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Performs a selection operation. Similar to a SQL SELECT statement. The field expressions can contain complex expressions.

Example:

>>> from pyflink.table import expressions as expr
>>> tab.select(tab.key, expr.concat(tab.value, 'hello'))
>>> tab.select(expr.col('key'), expr.concat(expr.col('value'), 'hello'))

>>> tab.select("key, value + 'hello'")
Returns

The result table.

to_pandas()[source]¶

Converts the table to a pandas DataFrame. It will collect the content of the table to the client side and so please make sure that the content of the table could fit in memory before calling this method.

Example:

>>> import numpy as np
>>> import pandas as pd
>>> pdf = pd.DataFrame(np.random.rand(1000, 2))
>>> table = table_env.from_pandas(pdf, ["a", "b"])
>>> table.filter(table.a > 0.5).to_pandas()
Returns

the result pandas DataFrame.

New in version 1.11.0.

union(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶

Unions two Table with duplicate records removed. Similar to a SQL UNION. The fields of the two union operations must fully overlap.

Note

Both tables must be bound to the same TableEnvironment.

Example:

>>> left.union(right)
Parameters

right – Right table.

Returns

The result table.

union_all(right: pyflink.table.table.Table) → pyflink.table.table.Table[source]¶

Unions two Table. Similar to a SQL UNION ALL. The fields of the two union operations must fully overlap.

Note

Both tables must be bound to the same TableEnvironment.

Example:

>>> left.union_all(right)
Parameters

right – Right table.

Returns

The result table.

where(predicate: Union[str, pyflink.table.expression.Expression[bool][bool]]) → pyflink.table.table.Table[source]¶

Filters out elements that don’t pass the filter predicate. Similar to a SQL WHERE clause.

Example:

>>> tab.where(tab.name == 'Fred')
>>> tab.where("name = 'Fred'")
Parameters

predicate – Predicate expression string.

Returns

The result table.

window(window: pyflink.table.window.GroupWindow) → pyflink.table.table.GroupWindowedTable[source]¶

Defines group window on the records of a table.

A group window groups the records of a table by assigning them to windows defined by a time or row interval.

For streaming tables of infinite size, grouping into windows is required to define finite groups on which group-based aggregates can be computed.

For batch tables of finite size, windowing essentially provides shortcuts for time-based groupBy.

Note

Computing windowed aggregates on a streaming table is only a parallel operation if additional grouping attributes are added to the group_by() clause. If the group_by() only references a GroupWindow alias, the streamed table will be processed by a single task, i.e., with parallelism 1.

Example:

>>> from pyflink.table.expressions import col, lit
>>> from pyflink.table.window import Tumble
>>> tab.window(Tumble.over(lit(10).minutes).on(tab.rowtime).alias('w')) \
...     .group_by(col('w')) \
...     .select(tab.a.sum.alias('a'),
...             col('w').start.alias('b'),
...             col('w').end.alias('c'),
...             col('w').rowtime.alias('d'))
Parameters

window – A GroupWindow created from Tumble, Session or Slide.

Returns

A group windowed table.
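How a tumbling group window assigns records to finite groups can be sketched in plain Python. The model assumes non-negative integer timestamps in seconds and rows of the form (timestamp, value); it illustrates the grouping idea only and is not how Flink implements windows.

```python
from collections import defaultdict


def tumble(rows, size):
    """Group (ts, value) rows into fixed, non-overlapping windows of
    length `size` and sum the values per window."""
    sums = defaultdict(int)
    for ts, value in rows:
        start = ts - ts % size  # start of the window that contains ts
        sums[start] += value
    # Each result row: (window start, exclusive window end, sum of values)
    return [(start, start + size, total) for start, total in sorted(sums.items())]


rows = [(1, 10), (4, 20), (10, 5), (19, 1), (25, 7)]
windows = tumble(rows, 10)
print(windows)  # [(0, 10, 30), (10, 20, 6), (20, 30, 7)]
```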

class pyflink.table.TableConfig(j_table_config=None)[source]¶

Bases: object

Configuration for the current TableEnvironment session to adjust Table & SQL API programs.

For common or important configuration options, this class provides getter and setter methods with detailed inline documentation.

For more advanced configuration, users can directly access the underlying key-value map via get_configuration(). Currently, key-value options are only supported for the Blink planner.

Note

Because options are read at different points in time when performing operations, it is recommended to set configuration options early after instantiating a table environment.

add_configuration(configuration: pyflink.common.configuration.Configuration)[source]¶

Adds the given key-value configuration to the underlying configuration. It overwrites existing keys.

Parameters

configuration – Key-value configuration to be added.

get_configuration() → pyflink.common.configuration.Configuration[source]¶

Gives direct access to the underlying key-value map for advanced configuration.

Returns

Entire key-value configuration.

get_decimal_context() → Tuple[int, str][source]¶

Returns current context for decimal division calculation, (precision=34, rounding_mode=HALF_EVEN) by default.

See also

set_decimal_context()

Returns

the current context for decimal division calculation.

static get_default() → pyflink.table.table_config.TableConfig[source]¶
Returns

A TableConfig object with default settings.

get_idle_state_retention() → datetime.timedelta[source]¶
Returns

The duration until state which was not updated will be retained.

get_local_timezone() → str[source]¶

Returns the local timezone id for timestamp with local time zone, either an abbreviation such as “PST”, a full name such as “America/Los_Angeles”, or a custom timezone_id such as “GMT-8:00”.

get_max_generated_code_length() → int[source]¶

The current threshold where generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. Default is 64000.

get_max_idle_state_retention_time() → int[source]¶

State will be cleared and removed if it was not updated for the defined period of time.

Note

Currently the concept of min/max idle state retention has been deprecated and only idle state retention time is supported. The min idle state retention is regarded as idle state retention and the max idle state retention is derived from idle state retention as 1.5 x idle state retention.

Returns

The maximum time until state which was not updated will be retained.

get_min_idle_state_retention_time() → int[source]¶

State might be cleared and removed if it was not updated for the defined period of time.

Note

Currently the concept of min/max idle state retention has been deprecated and only idle state retention time is supported. The min idle state retention is regarded as idle state retention and the max idle state retention is derived from idle state retention as 1.5 x idle state retention.

Returns

The minimum time until state which was not updated will be retained.

get_null_check() → bool[source]¶

A boolean value, “True” enables NULL check and “False” disables NULL check.

get_python_executable() → str[source]¶

Gets the path of the python interpreter which is used to execute the python udf workers. If no path has been specified, None is returned.

Returns

The path of the python interpreter which is used to execute the python udf workers.

New in version 1.10.0.

get_sql_dialect() → pyflink.table.sql_dialect.SqlDialect[source]¶

Returns the current SQL dialect.

set_decimal_context(precision: int, rounding_mode: str)[source]¶

Sets the default context for decimal division calculation. (precision=34, rounding_mode=HALF_EVEN) by default.

The precision is the number of digits to be used for an operation. A value of 0 indicates that unlimited precision (as many digits as are required) will be used. Note that leading zeros (in the coefficient of a number) are never significant.

The rounding mode is the rounding algorithm to be used for an operation. It could be:

UP, DOWN, CEILING, FLOOR, HALF_UP, HALF_DOWN, HALF_EVEN, UNNECESSARY

The table below shows the results of rounding input to one digit with the given rounding mode:

Input   UP   DOWN   CEILING   FLOOR   HALF_UP   HALF_DOWN   HALF_EVEN   UNNECESSARY
5.5     6    5      6         5       6         5           6           Exception
2.5     3    2      3         2       3         2           2           Exception
1.6     2    1      2         1       2         2           2           Exception
1.1     2    1      2         1       1         1           1           Exception
1.0     1    1      1         1       1         1           1           1
-1.0    -1   -1     -1        -1      -1        -1          -1          -1
-1.1    -2   -1     -1        -2      -1        -1          -1          Exception
-1.6    -2   -1     -1        -2      -2        -2          -2          Exception
-2.5    -3   -2     -2        -3      -3        -2          -2          Exception
-5.5    -6   -5     -5        -6      -6        -5          -6          Exception

Parameters
  • precision – The precision of the decimal context.

  • rounding_mode – The rounding mode of the decimal context.
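The rounding modes can be explored with Python's standard decimal module, which offers constants with matching semantics for seven of the eight modes. This is only an illustration of the rounding behaviour, not of how Flink evaluates decimals; the mapping dictionary and the helper name are assumptions of the sketch, and UNNECESSARY is left out because the decimal module has no rounding constant for it.

```python
from decimal import (Decimal, ROUND_UP, ROUND_DOWN, ROUND_CEILING, ROUND_FLOOR,
                     ROUND_HALF_UP, ROUND_HALF_DOWN, ROUND_HALF_EVEN)

# Hypothetical mapping from the mode names listed above to decimal constants.
MODES = {
    "UP": ROUND_UP,
    "DOWN": ROUND_DOWN,
    "CEILING": ROUND_CEILING,
    "FLOOR": ROUND_FLOOR,
    "HALF_UP": ROUND_HALF_UP,
    "HALF_DOWN": ROUND_HALF_DOWN,
    "HALF_EVEN": ROUND_HALF_EVEN,
}


def round_to_integer(value, mode):
    """Round a decimal string to an integer with the named rounding mode."""
    return int(Decimal(value).quantize(Decimal("1"), rounding=MODES[mode]))


for value in ["5.5", "2.5", "-2.5"]:
    print(value, [round_to_integer(value, mode) for mode in MODES])
# 5.5 [6, 5, 6, 5, 6, 5, 6]
# 2.5 [3, 2, 3, 2, 3, 2, 2]
# -2.5 [-3, -2, -2, -3, -3, -2, -2]
```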

set_idle_state_retention(duration: datetime.timedelta)[source]¶

Specifies a retention time interval for how long idle state, i.e., state which was not updated, will be retained.

State will never be cleared while it has been idle for less than the duration, and will never be kept once it has been idle for more than 1.5 x the duration.

When new data arrives for previously cleaned-up state, the new data will be handled as if it was the first data. This can result in previous results being overwritten.

Set to 0 (zero) to never clean-up the state.

Example:

>>> import datetime
>>> table_config = TableConfig()
>>> table_config.set_idle_state_retention(datetime.timedelta(days=1))

Note

Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of minTime and maxTime. The difference between minTime and maxTime must be at least 5 minutes.

Parameters

duration – The retention time interval for which idle state is retained. Set to 0 (zero) to never clean-up the state.
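The retention rule described above can be sketched with datetime.timedelta arithmetic. The helper names and the three status labels below are hypothetical; the sketch only encodes the stated bounds (retained below the duration, cleared above 1.5 x the duration, zero meaning no clean-up).

```python
import datetime


def derived_max_retention(duration):
    """Upper bound after which idle state is no longer kept: 1.5 x duration."""
    return duration * 1.5


def state_status(idle, duration):
    """Classify idle state according to the retention rule described above."""
    if duration == datetime.timedelta(0):
        return "retained"  # a duration of zero disables clean-up
    if idle < duration:
        return "retained"
    if idle > derived_max_retention(duration):
        return "cleared"
    return "may be cleared"


duration = datetime.timedelta(days=1)
print(derived_max_retention(duration))                        # 1 day, 12:00:00
print(state_status(datetime.timedelta(hours=12), duration))   # retained
print(state_status(datetime.timedelta(hours=30), duration))   # may be cleared
print(state_status(datetime.timedelta(days=2), duration))     # cleared
```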

set_idle_state_retention_time(min_time: datetime.timedelta, max_time: datetime.timedelta)[source]¶

Specifies a minimum and a maximum time interval for how long idle state, i.e., state which was not updated, will be retained.

State will never be cleared while it has been idle for less than the minimum time, and will never be kept once it has been idle for more than the maximum time.

When new data arrives for previously cleaned-up state, the new data will be handled as if it was the first data. This can result in previous results being overwritten.

Set to 0 (zero) to never clean-up the state.

Example:

>>> import datetime
>>> table_config = TableConfig()
>>> table_config.set_idle_state_retention_time(datetime.timedelta(days=1),
...                                            datetime.timedelta(days=3))

Note

Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of minTime and maxTime. The difference between minTime and maxTime must be at least 5 minutes.

Method set_idle_state_retention_time is deprecated now. The suggested way to set idle state retention time is set_idle_state_retention(). Currently, setting max_time has no effect; the max_time is derived directly from the min_time as 1.5 x min_time.

Parameters
  • min_time – The minimum time interval for which idle state is retained. Set to 0 (zero) to never clean-up the state.

  • max_time – The maximum time interval for which idle state is retained. Must be at least 5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.

set_local_timezone(timezone_id: str)[source]¶

Sets the local timezone id for timestamp with local time zone.

Parameters

timezone_id – The timezone id, either an abbreviation such as “PST”, a full name such as “America/Los_Angeles”, or a custom timezone_id such as “GMT-8:00”.

set_max_generated_code_length(max_generated_code_length: int)[source]¶

Sets the threshold where generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. Default is 64000.

set_null_check(null_check: bool)[source]¶

Sets the NULL check. If enabled, all fields need to be checked for NULL first.

set_python_executable(python_exec: str)[source]¶

Sets the path of the python interpreter which is used to execute the python udf workers.

e.g. “/usr/local/bin/python3”.

If python UDF depends on a specific python version which does not exist in the cluster, the method pyflink.table.TableEnvironment.add_python_archive() can be used to upload a virtual environment. The path of the python interpreter contained in the uploaded environment can be specified via this method.

Example:

# command executed in shell
# assume that the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env

# python code
>>> table_env.add_python_archive("py_env.zip")
>>> table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

Note

Please make sure that the uploaded python environment matches the platform that the cluster is running on and that the python version is 3.5 or higher.

Note

The python udf worker depends on Apache Beam (version == 2.23.0). Please ensure that the specified environment meets the above requirements.

Parameters

python_exec – The path of python interpreter.

New in version 1.10.0.

set_sql_dialect(sql_dialect: pyflink.table.sql_dialect.SqlDialect)[source]¶

Sets the current SQL dialect used to parse a SQL query. Flink’s SQL behavior is used by default.

Parameters

sql_dialect – The given SQL dialect.

class pyflink.table.TableEnvironment(j_tenv, serializer=PickleSerializer())[source]¶

Bases: object

A table environment is the base class, entry point, and central context for creating Table and SQL API programs.

It is unified for bounded and unbounded data processing.

A table environment is responsible for:

  • Connecting to external systems.

  • Registering and retrieving Table and other meta objects from a catalog.

  • Executing SQL statements.

  • Offering further configuration options.

The path in methods such as create_temporary_view() should be a proper SQL identifier. The syntax is [[catalog-name.]database-name.]object-name, where the catalog name and the database name are optional. For path resolution see use_catalog() and use_database(). All keywords or other special characters need to be escaped.

Example: `cat.1`.`db`.`Table` resolves to an object named ‘Table’ (table is a reserved keyword, thus must be escaped) in a catalog named ‘cat.1’ and database named ‘db’.
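The effect of backtick escaping on a path can be sketched with a small parser. This is a simplified, hypothetical model for illustration: it treats backticks purely as quoting delimiters and does not reproduce the SQL parser Flink actually uses (for instance, it does not handle backticks inside quoted names).

```python
def split_object_path(path):
    """Split a path of the form [[catalog.]database.]object into its parts.

    Dots inside backtick-quoted identifiers do not separate parts.
    """
    parts, current, quoted = [], [], False
    for ch in path:
        if ch == "`":
            quoted = not quoted          # enter or leave a quoted identifier
        elif ch == "." and not quoted:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    if quoted:
        raise ValueError("unterminated backtick in path: " + path)
    parts.append("".join(current))
    if not 1 <= len(parts) <= 3:
        raise ValueError("expected 1 to 3 path parts, got %d" % len(parts))
    return parts


print(split_object_path("`cat.1`.`db`.`Table`"))  # ['cat.1', 'db', 'Table']
print(split_object_path("db.my_view"))            # ['db', 'my_view']
print(split_object_path("my_view"))               # ['my_view']
```

Without the backticks around cat.1, the same string would split into four parts and be rejected, which is why names containing special characters must be escaped.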

Note

This environment is meant for pure table programs. If you would like to convert from or to other Flink APIs, it might be necessary to use one of the available language-specific table environments in the corresponding bridging modules.

add_python_archive(archive_path: str, target_dir: str = None)[source]¶

Adds a python archive file. The file will be extracted to the working directory of python UDF worker.

If the parameter “target_dir” is specified, the archive file will be extracted to a directory named ${target_dir}. Otherwise, the archive file will be extracted to a directory with the same name as the archive file.

If python UDF depends on a specific python version which does not exist in the cluster, this method can be used to upload the virtual environment. Note that the path of the python interpreter contained in the uploaded environment should be specified via the method pyflink.table.TableConfig.set_python_executable().

The files uploaded via this method are also accessible in UDFs via relative path.

Example:

# command executed in shell
# assume that the relative path of python interpreter is py_env/bin/python
$ zip -r py_env.zip py_env

# python code
>>> table_env.add_python_archive("py_env.zip")
>>> table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

# or
>>> table_env.add_python_archive("py_env.zip", "myenv")
>>> table_env.get_config().set_python_executable("myenv/py_env/bin/python")

# the files contained in the archive file can be accessed in UDF
>>> def my_udf():
...     with open("myenv/py_env/data/data.txt") as f:
...         ...

Note

Please make sure that the uploaded python environment matches the platform that the cluster is running on and that the python version is 3.5 or higher.

Note

Currently only zip-format archives are supported, i.e. zip, jar, whl, egg, etc. Other archive formats such as tar, tar.gz, 7z, rar, etc. are not supported.

Parameters
  • archive_path – The archive file path.

  • target_dir – Optional, the name of the target directory that the archive file is extracted to.

New in version 1.10.0.
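As an alternative to the zip shell command, a zip-format archive with the same layout can be built from Python with the standard library. The sketch below uses a placeholder directory instead of a real virtual environment; the helper name and the directory contents are assumptions, and details such as symbolic link handling may differ from zip -r.

```python
import os
import shutil
import tempfile
import zipfile


def build_env_archive(parent_dir, env_name="py_env"):
    """Zip <parent_dir>/<env_name> into <parent_dir>/<env_name>.zip,
    keeping env_name as the top-level folder inside the archive."""
    base_name = os.path.join(parent_dir, env_name)
    return shutil.make_archive(base_name, "zip",
                               root_dir=parent_dir, base_dir=env_name)


# Stand-in for a real environment: py_env/bin/python is just a placeholder file.
work_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(work_dir, "py_env", "bin"))
with open(os.path.join(work_dir, "py_env", "bin", "python"), "w") as f:
    f.write("placeholder")

archive_path = build_env_archive(work_dir)
with zipfile.ZipFile(archive_path) as archive:
    names = archive.namelist()
print("py_env/bin/python" in names)  # True
```

Because the archive keeps py_env as its top-level folder, the interpreter path inside the extracted archive matches the py_env/bin/python layout assumed in the example above.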

add_python_file(file_path: str)[source]¶

Adds a python dependency which could be python files, python packages or local directories. They will be added to the PYTHONPATH of the python UDF worker. Please make sure that these dependencies can be imported.

Parameters

file_path – The path of the python dependency.

New in version 1.10.0.

abstract connect(connector_descriptor: pyflink.table.descriptors.ConnectorDescriptor) → pyflink.table.descriptors.ConnectTableDescriptor[source]¶

Creates a temporary table from a descriptor.

Descriptors allow for declaring the communication to external systems in an implementation-agnostic way. The classpath is scanned for suitable table factories that match the desired configuration.

The following example shows how to read from a connector using a JSON format and register a temporary table as “MyTable”:

Example:

>>> table_env \
...     .connect(ExternalSystemXYZ()
...              .version("0.11")) \
...     .with_format(Json()
...                  .json_schema("{...}")
...                  .fail_on_missing_field(False)) \
...     .with_schema(Schema()
...                  .field("user-name", "VARCHAR")
...                  .from_origin_field("u_name")
...                  .field("count", "DECIMAL")) \
...     .create_temporary_table("MyTable")
Parameters

connector_descriptor – Connector descriptor describing the external system.

Returns

A ConnectTableDescriptor used to build the temporary table.

Note

Deprecated in 1.11. Use execute_sql() to register a table instead.

create_java_function(path: str, function_class_name: str, ignore_if_exists: bool = None)[source]¶

Registers a java user defined function class as a catalog function in the given path.

Compared to system functions with a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.

There must not be another function (temporary or permanent) registered under the same path.

Example:

>>> table_env.create_java_function("func", "java.user.defined.function.class.name")
Parameters
  • path – The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.

  • function_class_name – The fully qualified Java class name of the function class containing the implementation. The function must have a public no-argument constructor and must be found in the current Java classloader.

  • ignore_if_exists – If a function exists under the given path and this flag is set, no operation is executed. An exception is thrown otherwise.

New in version 1.12.0.

create_java_temporary_function(path: str, function_class_name: str)[source]¶

Registers a java user defined function class as a temporary catalog function.

Compared to a system function registered with create_java_temporary_system_function() under a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.

Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary function.

Example:

>>> table_env.create_java_temporary_function("func",
...     "java.user.defined.function.class.name")
Parameters
  • path – The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.

  • function_class_name – The fully qualified Java class name of the function class containing the implementation. The function must have a public no-argument constructor and must be found in the current Java classloader.

New in version 1.12.0.

create_java_temporary_system_function(name: str, function_class_name: str)[source]¶

Registers a java user defined function class as a temporary system function.

Compared to a catalog function registered with create_java_temporary_function(), system functions are identified by a global name that is independent of the current catalog and current database. Thus, this method allows extending the set of built-in system functions like TRIM, ABS, etc.

Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary system function.

Example:

>>> table_env.create_java_temporary_system_function("func",
...     "java.user.defined.function.class.name")
Parameters
  • name – The name under which the function will be registered globally.

  • function_class_name – The fully qualified Java class name of the function class containing the implementation. The function must have a public no-argument constructor and must be found in the current Java classloader.

New in version 1.12.0.

create_statement_set() → pyflink.table.statement_set.StatementSet[source]¶

Creates a StatementSet instance which accepts DML statements or Tables. The planner can optimize all added statements and Tables together and then submit them as one job.

Returns

A StatementSet instance.

New in version 1.11.0.

create_temporary_function(path: str, function: Union[pyflink.table.udf.UserDefinedFunctionWrapper, pyflink.table.udf.AggregateFunction])[source]¶

Registers a python user defined function class as a temporary catalog function.

Compared to a system function registered with create_temporary_system_function() under a globally defined name, catalog functions are always (implicitly or explicitly) identified by a catalog and database.

Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary function.

Example:

>>> table_env.create_temporary_function(
...     "add_one", udf(lambda i: i + 1, result_type=DataTypes.BIGINT()))

>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j
>>> table_env.create_temporary_function("add", add)

>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1
>>> table_env.create_temporary_function(
...     "subtract_one", udf(SubtractOne(), result_type=DataTypes.BIGINT()))
Parameters
  • path – The path under which the function will be registered. See also the TableEnvironment class description for the format of the path.

  • function – The python user defined function to register, e.g. one created with udf().

New in version 1.12.0.

create_temporary_system_function(name: str, function: Union[pyflink.table.udf.UserDefinedFunctionWrapper, pyflink.table.udf.AggregateFunction])[source]¶

Registers a python user defined function class as a temporary system function.

Compared to a catalog function registered with create_temporary_function(), system functions are identified by a global name that is independent of the current catalog and current database. Thus, this method allows extending the set of built-in system functions like TRIM, ABS, etc.

Temporary functions can shadow permanent ones. If a permanent function under a given name exists, it will be inaccessible in the current session. To make the permanent function available again one can drop the corresponding temporary system function.

Example:

>>> table_env.create_temporary_system_function(
...     "add_one", udf(lambda i: i + 1, result_type=DataTypes.BIGINT()))

>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j
>>> table_env.create_temporary_system_function("add", add)

>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1
>>> table_env.create_temporary_system_function(
...     "subtract_one", udf(SubtractOne(), result_type=DataTypes.BIGINT()))
Parameters
  • name – The name under which the function will be registered globally.

  • function – The python user defined function to register, e.g. one created with udf().

New in version 1.12.0.

create_temporary_view(view_path: str, table: pyflink.table.table.Table)[source]¶

Registers a Table API object as a temporary view similar to SQL temporary views.

Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.

Parameters
  • view_path – The path under which the view will be registered. See also the TableEnvironment class description for the format of the path.

  • table – The view to register.

New in version 1.10.0.

drop_function(path: str) → bool[source]¶

Drops a catalog function registered in the given path.

Parameters

path – The path under which the function has been registered. See also the TableEnvironment class description for the format of the path.

Returns

True if a function existed in the given path and was removed.

New in version 1.12.0.

drop_temporary_function(path: str) → bool[source]¶

Drops a temporary catalog function registered in the given path.

If a permanent function with the given path exists, it will be used from now on for any queries that reference this path.

Parameters

path – The path under which the function has been registered. See also the TableEnvironment class description for the format of the path.

Returns

True if a function existed in the given path and was removed.

New in version 1.12.0.
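The shadowing behaviour of temporary functions can be sketched with a small in-memory registry. The FunctionRegistry class below is a hypothetical model written for this illustration; it is not part of PyFlink and ignores catalogs, databases and system functions.

```python
class FunctionRegistry:
    """Toy model: temporary functions shadow permanent ones under the same path."""

    def __init__(self):
        self._permanent = {}
        self._temporary = {}

    def create_function(self, path, fn, ignore_if_exists=False):
        # No other function (temporary or permanent) may use the same path.
        if path in self._permanent or path in self._temporary:
            if ignore_if_exists:
                return
            raise ValueError("function already exists: " + path)
        self._permanent[path] = fn

    def create_temporary_function(self, path, fn):
        if path in self._temporary:
            raise ValueError("temporary function already exists: " + path)
        self._temporary[path] = fn

    def drop_temporary_function(self, path):
        """Return True if a temporary function existed and was removed."""
        if path in self._temporary:
            del self._temporary[path]
            return True
        return False

    def lookup(self, path):
        # Temporary functions take precedence over permanent ones.
        if path in self._temporary:
            return self._temporary[path]
        return self._permanent[path]


registry = FunctionRegistry()
registry.create_function("func", lambda i: i + 1)
registry.create_temporary_function("func", lambda i: i * 100)
print(registry.lookup("func")(2))                # 200, the temporary one wins
print(registry.drop_temporary_function("func"))  # True
print(registry.lookup("func")(2))                # 3, the permanent one is visible again
print(registry.drop_temporary_function("func"))  # False
```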

drop_temporary_system_function(name: str) → bool[source]¶

Drops a temporary system function registered under the given name.

If a permanent function with the given name exists, it will be used from now on for any queries that reference this name.

Parameters

name – The name under which the function has been registered globally.

Returns

true if a function existed under the given name and was removed.

New in version 1.12.0.

drop_temporary_table(table_path: str) → bool[source]¶

Drops a temporary table registered in the given path.

If a permanent table with a given path exists, it will be used from now on for any queries that reference this path.

Parameters

table_path – The path of the registered temporary table.

Returns

True if a table existed in the given path and was removed.

New in version 1.10.0.

drop_temporary_view(view_path: str) → bool[source]¶

Drops a temporary view registered in the given path.

If a permanent table or view with a given path exists, it will be used from now on for any queries that reference this path.

Returns

True if a view existed in the given path and was removed.

New in version 1.10.0.

execute(job_name: str) → pyflink.common.job_execution_result.JobExecutionResult[source]¶

Triggers the program execution. The environment will execute all parts of the program.

The program execution will be logged and displayed with the provided name.

Note

It is highly advised to set all parameters in the TableConfig at the very beginning of the program. It is undefined which configuration values will be used for the execution if queries are mixed with config changes; it depends on the characteristics of the particular parameter. For some of them, the value from the point in time of query construction (e.g. the current catalog) will be used. Other values might be evaluated according to the state at the time this method is called (e.g. the time zone).

Parameters

job_name – Desired name of the job.

Returns

The result of the job execution, containing elapsed time and accumulators.

Note

Deprecated in 1.11. Use execute_sql() for single sink, use create_statement_set() for multiple sinks.

execute_sql(stmt: str) → pyflink.table.table_result.TableResult[source]¶

Execute the given single statement, and return the execution result.

The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. For DML and DQL, this method returns TableResult once the job has been submitted. For DDL and DCL statements, TableResult is returned once the operation has finished.

Returns

Content for DQL/SHOW/DESCRIBE/EXPLAIN, the affected row count for DML (-1 means unknown), or a string message (“OK”) for other statements.

New in version 1.11.0.
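
As a rough illustration of the behaviour described above, the following sketch submits several statements one at a time and blocks on each result. The helper name run_statements and its timeout_ms parameter are hypothetical and not part of PyFlink; table_env is assumed to be an existing TableEnvironment, and TableResult.wait() is assumed to be available (1.12.0 or later).

```python
def run_statements(table_env, statements, timeout_ms=None):
    """Execute each statement with execute_sql() and wait for its result.

    `table_env` is assumed to be an already created TableEnvironment.
    This helper is illustrative only and is not part of PyFlink.
    """
    results = []
    for stmt in statements:
        # execute_sql() takes exactly one statement per call.
        table_result = table_env.execute_sql(stmt)
        # For DDL the result is already available locally, so wait() returns
        # immediately; for INSERT it blocks until the job has finished.
        table_result.wait(timeout_ms)
        results.append(table_result)
    return results
```

A call such as run_statements(table_env, [source_ddl, sink_ddl, insert_stmt]) would then cover the pattern that previously required sql_update() followed by execute().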

explain(table: pyflink.table.table.Table = None, extended: bool = False) → str[source]¶

Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table or multi-sinks plan.

Parameters
  • table – The table to be explained. If table is None, explain for multi-sinks plan, else for given table.

  • extended – If the plan should contain additional properties. e.g. estimated cost, traits

Returns

The AST and the execution plan, as a string.

Note

Deprecated in 1.11. Use Table.explain() instead.

explain_sql(stmt: str, *extra_details: pyflink.table.explain_detail.ExplainDetail) → str[source]¶

Returns the AST of the specified statement and the execution plan.

Parameters
  • stmt – The statement for which the AST and execution plan will be returned.

  • extra_details – The extra explain details which the explain result should include, e.g. estimated cost, changelog mode for streaming

Returns

The AST and the execution plan of the given statement, as a string.

New in version 1.11.0.

from_elements(elements: Iterable, schema: Union[pyflink.table.types.DataType, List[str]] = None, verify_schema: bool = True) → pyflink.table.table.Table[source]¶

Creates a table from a collection of elements. The element types must be acceptable atomic types or acceptable composite types. All elements must be of the same type. If the element types are composite types, the composite types must be strictly equal, and their subtypes must also be acceptable types. For example, if the elements are tuples, the tuples must have the same length and their element types must match position by position.

The built-in acceptable atomic element types are:

int, long, str, unicode, bool, float, bytearray, datetime.date, datetime.time, datetime.datetime, datetime.timedelta, decimal.Decimal

The built-in acceptable composite element types are:

list, tuple, dict, array, Row

If the element type is a composite type, it will be unboxed. e.g. table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) will return a table like:

+----+-------+
| _1 | _2    |
+----+-------+
| 1  | Hi    |
+----+-------+
| 2  | Hello |
+----+-------+

“_1” and “_2” are generated field names.

Example:

# use the second parameter to specify custom field names
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
# use the second parameter to specify custom table schema
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
...                         DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                                        DataTypes.FIELD("b", DataTypes.STRING())]))
# use the third parameter to switch whether to verify the elements against the schema
>>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
...                         DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                                        DataTypes.FIELD("b", DataTypes.STRING())]),
...                         False)
# create Table from expressions
>>> table_env.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
...                         DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
...                                        DataTypes.FIELD("b", DataTypes.STRING()),
...                                        DataTypes.FIELD("c", DataTypes.FLOAT())]))
Parameters
  • elements – The elements to create a table from.

  • schema – The schema of the table.

  • verify_schema – Whether to verify the elements against the schema.

Returns

The result table.
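
The shape rules for tuple elements and the generated field names described above can be modelled in plain Python. This is only a sketch of the documented constraints, not PyFlink's actual type inference (which may, for instance, treat None values more leniently); the function name generated_field_names is hypothetical.

```python
def generated_field_names(elements):
    """Return the generated field names for a list of equally shaped tuples.

    Pure-Python model of the documented rules; not PyFlink code.
    """
    elements = list(elements)
    if not elements:
        raise ValueError("at least one element is required")
    first = elements[0]
    signature = tuple(type(value) for value in first)
    for element in elements[1:]:
        # Tuples must have the same length ...
        if len(element) != len(first):
            raise TypeError("all tuples must have the same length")
        # ... and the same element types, position by position.
        if tuple(type(value) for value in element) != signature:
            raise TypeError("element types must match position by position")
    # Field names are generated as _1, _2, ... in positional order.
    return ["_%d" % (index + 1) for index in range(len(first))]
```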

from_pandas(pdf, schema: Union[pyflink.table.types.RowType, List[str], Tuple[str], List[pyflink.table.types.DataType], Tuple[pyflink.table.types.DataType]] = None, splits_num: int = 1) → pyflink.table.table.Table[source]¶

Creates a table from a pandas DataFrame.

Example:

>>> pdf = pd.DataFrame(np.random.rand(1000, 2))
# use the second parameter to specify custom field names
>>> table_env.from_pandas(pdf, ["a", "b"])
# use the second parameter to specify custom field types
>>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
# use the second parameter to specify custom table schema
>>> table_env.from_pandas(pdf,
...                       DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
...                                      DataTypes.FIELD("b", DataTypes.DOUBLE())]))
Parameters
  • pdf – The pandas DataFrame.

  • schema – The schema of the converted table.

  • splits_num – The number of splits the given Pandas DataFrame will be split into. It determines the number of parallel source tasks. If not specified, the default parallelism will be used.

Returns

The result table.

New in version 1.11.0.

from_path(path: str) → pyflink.table.table.Table[source]¶

Reads a registered table and returns the resulting Table.

A table to scan must be registered in the TableEnvironment.

See the documentation of use_database() or use_catalog() for the rules on the path resolution.

Examples:

Reading a table from default catalog and database.

>>> tab = table_env.from_path("tableName")

Reading a table from a registered catalog.

>>> tab = table_env.from_path("catalogName.dbName.tableName")

Reading a table from a registered catalog with escaping. (Table is a reserved keyword). Dots in e.g. a database name also must be escaped.

>>> tab = table_env.from_path("catalogName.`db.Name`.`Table`")
Parameters

path – The path of a table API object to scan.

Returns

Either a table or virtual table (=view).

See also

use_catalog()

See also

use_database()

New in version 1.10.0.
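
When path components come from variables, the escaping shown in the last example can be applied mechanically. The helper below is a minimal sketch with a hypothetical name (quote_path); it assumes that wrapping every component in backticks is acceptable and deliberately rejects components that themselves contain a backtick rather than guessing at an escaping rule.

```python
def quote_path(*parts):
    """Join identifier parts into a path, wrapping each part in backticks.

    Illustrative helper only; it is not part of PyFlink.
    """
    quoted = []
    for part in parts:
        if not part:
            raise ValueError("empty identifier")
        if "`" in part:
            # Escaping embedded backticks is out of scope for this sketch.
            raise ValueError("identifier contains a backtick: %r" % part)
        quoted.append("`%s`" % part)
    return ".".join(quoted)
```

The result can then be passed on, for example table_env.from_path(quote_path("catalogName", "db.Name", "Table")).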

from_table_source(table_source: pyflink.table.sources.TableSource) → pyflink.table.table.Table[source]¶

Creates a table from a table source.

Example:

>>> csv_table_source = CsvTableSource(
...     csv_file_path, ['a', 'b'], [DataTypes.STRING(), DataTypes.BIGINT()])
>>> table_env.from_table_source(csv_table_source)
Parameters

table_source – The table source used as table.

Returns

The result table.

get_catalog(catalog_name: str) → pyflink.table.catalog.Catalog[source]¶

Gets a registered Catalog by name.

Parameters

catalog_name – The name to look up the Catalog.

Returns

The requested catalog, None if there is no registered catalog with given name.

get_config() → pyflink.table.table_config.TableConfig[source]¶

Returns the table config to define the runtime behavior of the Table API.

Returns

Current table config.

get_current_catalog() → str[source]¶

Gets the current default catalog name of the current session.

Returns

The current default catalog name that is used for the path resolution.

See also

use_catalog()

get_current_database() → str[source]¶

Gets the current default database name of the running session.

Returns

The name of the current database of the current catalog.

See also

use_database()

insert_into(target_path: str, table: pyflink.table.table.Table)[source]¶

Instructs to write the content of a Table API object into a table.

See the documentation of use_database() or use_catalog() for the rules on the path resolution.

Example:

>>> tab = table_env.scan("tableName")
>>> table_env.insert_into("sink", tab)
Parameters
  • target_path – The path of the registered TableSink to which the Table is written.

  • table – The Table to write to the sink.

Changed in version 1.10.0: The signature is changed, e.g. the parameter table_path_continued was removed and the parameter target_path is moved before the parameter table.

Note

Deprecated in 1.11. Use execute_insert() for single sink, use create_statement_set() for multiple sinks.
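
The two replacements named in the deprecation note can be combined in a small helper. This is a sketch under assumptions: write_to_sinks is a hypothetical name, table_env is an existing TableEnvironment, table is a Table, and the sink paths refer to tables that have already been registered.

```python
def write_to_sinks(table_env, table, sink_paths):
    """Write `table` to one or more registered sinks and return the result.

    Illustrative helper only; it is not part of PyFlink.
    """
    sink_paths = list(sink_paths)
    if not sink_paths:
        raise ValueError("at least one sink path is required")
    if len(sink_paths) == 1:
        # Single sink: Table.execute_insert() submits the job directly.
        return table.execute_insert(sink_paths[0])
    # Multiple sinks: collect the inserts in a StatementSet and
    # submit them together with a single execute() call.
    statement_set = table_env.create_statement_set()
    for path in sink_paths:
        statement_set.add_insert(path, table)
    return statement_set.execute()
```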

list_catalogs() → List[str][source]¶

Gets the names of all catalogs registered in this environment.

Returns

List of catalog names.

list_databases() → List[str][source]¶

Gets the names of all databases in the current catalog.

Returns

List of database names in the current catalog.

list_functions() → List[str][source]¶

Gets the names of all functions in this environment.

Returns

List of the names of all functions in this environment.

New in version 1.10.0.

list_modules() → List[str][source]¶

Gets the names of all modules registered in this environment.

Returns

List of module names.

New in version 1.10.0.

list_tables() → List[str][source]¶

Gets the names of all tables and views in the current database of the current catalog. It returns both temporary and permanent tables and views.

Returns

List of table and view names in the current database of the current catalog.

list_temporary_tables() → List[str][source]¶

Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog).

Returns

A list of the names of all registered temporary tables and views in the current database of the current catalog.

See also

list_tables()

New in version 1.10.0.

list_temporary_views() → List[str][source]¶

Gets the names of all temporary views available in the current namespace (the current database of the current catalog).

Returns

A list of the names of all registered temporary views in the current database of the current catalog.

See also

list_tables()

New in version 1.10.0.

list_user_defined_functions() → List[str][source]¶

Gets the names of all user defined functions registered in this environment.

Returns

List of the names of all user defined functions registered in this environment.

list_views() → List[str][source]¶

Gets the names of all views in the current database of the current catalog. It returns both temporary and permanent views.

Returns

List of view names in the current database of the current catalog.

New in version 1.11.0.

load_module(module_name: str, module: pyflink.table.module.Module)[source]¶

Loads a Module under a unique name. Modules will be kept in the loaded order. ValidationException is thrown when there is already a module with the same name.

Parameters
  • module_name – Name of the Module.

  • module – The module instance.

New in version 1.12.0.

register_catalog(catalog_name: str, catalog: pyflink.table.catalog.Catalog)[source]¶

Registers a Catalog under a unique name. All tables registered in the Catalog can be accessed.

Parameters
  • catalog_name – The name under which the catalog will be registered.

  • catalog – The catalog to register.

register_function(name: str, function: pyflink.table.udf.UserDefinedFunctionWrapper)[source]¶

Registers a Python user-defined function under a unique name. Replaces an already existing user-defined function under this name.

Example:

>>> table_env.register_function(
...     "add_one", udf(lambda i: i + 1, result_type=DataTypes.BIGINT()))

>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j
>>> table_env.register_function("add", add)

>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1
>>> table_env.register_function(
...     "subtract_one", udf(SubtractOne(), result_type=DataTypes.BIGINT()))
Parameters
  • name – The name under which the function is registered.

  • function – The python user-defined function to register.

New in version 1.10.0.

Note

Deprecated in 1.12. Use create_temporary_system_function() instead.

register_java_function(name: str, function_class_name: str)[source]¶

Registers a Java user-defined function under a unique name. Replaces an already existing user-defined function under this name. The acceptable function types are ScalarFunction, TableFunction and AggregateFunction.

Example:

>>> table_env.register_java_function("func1", "java.user.defined.function.class.name")
Parameters
  • name – The name under which the function is registered.

  • function_class_name – The fully qualified Java class name of the function to register. The function must have a public no-argument constructor and must be found in the current Java classloader.

Note

Deprecated in 1.12. Use create_java_temporary_system_function() instead.

register_table(name: str, table: pyflink.table.table.Table)[source]¶

Registers a Table under a unique name in the TableEnvironment’s catalog. Registered tables can be referenced in SQL queries.

Example:

>>> tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
>>> table_env.register_table("source", tab)
Parameters
  • name – The name under which the table will be registered.

  • table – The table to register.

Note

Deprecated in 1.10. Use create_temporary_view() instead.

register_table_sink(name: str, table_sink: pyflink.table.sinks.TableSink)[source]¶

Registers an external TableSink with given field names and types in this TableEnvironment’s catalog. Registered sink tables can be referenced in SQL DML statements.

Example:

>>> table_env.register_table_sink("sink",
...                               CsvTableSink(["a", "b"],
...                                            [DataTypes.INT(),
...                                             DataTypes.STRING()],
...                                            "./2.csv"))
Parameters
  • name – The name under which the table sink is registered.

  • table_sink – The table sink to register.

Note

Deprecated in 1.10. Use execute_sql() instead.

register_table_source(name: str, table_source: pyflink.table.sources.TableSource)[source]¶

Registers an external TableSource in this TableEnvironment’s catalog. Registered tables can be referenced in SQL queries.

Example:

>>> table_env.register_table_source("source",
...                                 CsvTableSource("./1.csv",
...                                                ["a", "b"],
...                                                [DataTypes.INT(),
...                                                 DataTypes.STRING()]))
Parameters
  • name – The name under which the table source is registered.

  • table_source – The table source to register.

Note

Deprecated in 1.10. Use execute_sql() instead.

scan(*table_path: str) → pyflink.table.table.Table[source]¶

Scans a registered table and returns the resulting Table. A table to scan must be registered in the TableEnvironment. It can be either directly registered or be an external member of a Catalog.

See the documentation of use_database() or use_catalog() for the rules on the path resolution.

Examples:

Scanning a directly registered table

>>> tab = table_env.scan("tableName")

Scanning a table from a registered catalog

>>> tab = table_env.scan("catalogName", "dbName", "tableName")
Parameters

table_path – The path of the table to scan.

Throws

Exception if no table is found using the given table path.

Returns

The resulting table.

Note

Deprecated in 1.10. Use from_path() instead.

set_python_requirements(requirements_file_path: str, requirements_cache_dir: str = None)[source]¶

Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the python UDF worker.

For dependencies that cannot be accessed in the cluster, a directory which contains the installation packages of these dependencies can be specified using the parameter “requirements_cache_dir”. It will be uploaded to the cluster to support offline installation.

Example:

# commands executed in shell
$ echo numpy==1.16.5 > requirements.txt
$ pip download -d cached_dir -r requirements.txt --no-binary :all:

# python code
>>> table_env.set_python_requirements("requirements.txt", "cached_dir")

Note

Please make sure the installation packages match the platform of the cluster and the Python version used. These packages will be installed using pip, so also make sure that pip (version >= 7.1.0) and setuptools (version >= 37.0.0) are available.

Parameters
  • requirements_file_path – The path of “requirements.txt” file.

  • requirements_cache_dir – The path of the local directory which contains the installation packages.

New in version 1.10.0.

sql_query(query: str) → pyflink.table.table.Table[source]¶

Evaluates a SQL query on registered tables and retrieves the result as a Table.

All tables referenced by the query must be registered in the TableEnvironment.

A Table is automatically registered when its __str__() method is called, for example when it is embedded into a String.

Hence, SQL queries can directly reference a Table as follows:

>>> table = ...
# the table is not registered to the table environment
>>> table_env.sql_query("SELECT * FROM %s" % table)
Parameters

query – The sql query string.

Returns

The result table.

sql_update(stmt: str)[source]¶

Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.

Note

Currently only SQL INSERT statements and CREATE TABLE statements are supported.

All tables referenced by the query must be registered in the TableEnvironment. A Table is automatically registered when its __str__() method is called, for example when it is embedded into a String. Hence, SQL queries can directly reference a Table as follows:

# register the table sink into which the result is inserted.
>>> table_env.register_table_sink("sink_table", table_sink)
>>> source_table = ...
# source_table is not registered to the table environment
>>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)

A DDL statement can also be executed to create or drop a table. For example, the DDL statement below creates a CSV table named tbl1 in the current catalog:

create table tbl1(
    a int,
    b bigint,
    c varchar
) with (
    'connector.type' = 'filesystem',
    'format.type' = 'csv',
    'connector.path' = 'xxx'
)

SQL statements can be executed directly as follows:

>>> source_ddl = \
... '''
... create table sourceTable(
...     a int,
...     b varchar
... ) with (
...     'connector.type' = 'kafka',
...     'update-mode' = 'append',
...     'connector.topic' = 'xxx',
...     'connector.properties.bootstrap.servers' = 'localhost:9092'
... )
... '''

>>> sink_ddl = \
... '''
... create table sinkTable(
...     a int,
...     b varchar
... ) with (
...     'connector.type' = 'filesystem',
...     'format.type' = 'csv',
...     'connector.path' = 'xxx'
... )
... '''

>>> query = "INSERT INTO sinkTable SELECT * FROM sourceTable"
>>> table_env.sql_update(source_ddl)
>>> table_env.sql_update(sink_ddl)
>>> table_env.sql_update(query)
>>> table_env.execute("MyJob")
Parameters

stmt – The SQL statement to evaluate.

Note

Deprecated in 1.11. Use execute_sql() for single statement, use create_statement_set() for multiple DML statements.

unload_module(module_name: str)[source]¶

Unloads a Module with given name. ValidationException is thrown when there is no module with the given name.

Parameters

module_name – Name of the Module.

New in version 1.12.0.

use_catalog(catalog_name: str)[source]¶

Sets the current catalog to the given value. It also sets the default database to the catalog’s default one. See also use_database().

This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in the following paths, in this order:

  • [current-catalog].[current-database].[requested-path]

  • [current-catalog].[requested-path]

  • [requested-path]

Example:

Given structure with default catalog set to default_catalog and default database set to default_database.

root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1

The following table describes resolved paths:

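+----------------+---------------------------------------+
| Requested path | Resolved path                         |
+----------------+---------------------------------------+
| tab1           | default_catalog.default_database.tab1 |
+----------------+---------------------------------------+
| db1.tab1       | default_catalog.db1.tab1              |
+----------------+---------------------------------------+
| cat1.db1.tab1  | cat1.db1.tab1                         |
+----------------+---------------------------------------+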

Parameters

catalog_name – The name of the catalog to set as the current default catalog.

Throws

CatalogException thrown if a catalog with given name could not be set as the default one.

See also

use_database()
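
The lookup order described for use_catalog() can be modelled with a few lines of plain Python. This is a simplified sketch, not Flink's resolver: the function name resolve_path and the nested-dict catalog layout are assumptions made for illustration, paths are split on dots without handling backtick escaping, and only candidates with exactly three components are considered.

```python
def resolve_path(catalogs, current_catalog, current_database, requested_path):
    """Return the fully qualified path of the first matching object, or None.

    `catalogs` maps catalog name -> database name -> set of table names.
    Simplified model of the documented lookup order; not Flink code.
    """
    parts = requested_path.split(".")
    # Candidates are tried in the documented order.
    candidates = [
        [current_catalog, current_database] + parts,
        [current_catalog] + parts,
        parts,
    ]
    for candidate in candidates:
        if len(candidate) != 3:
            continue  # this model only knows catalog.database.table paths
        catalog, database, table = candidate
        if table in catalogs.get(catalog, {}).get(database, ()):
            return ".".join(candidate)
    return None
```

Applied to the example structure, this model reproduces the rows of the resolved-path table.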

use_database(database_name: str)[source]¶

Sets the current default database. It has to exist in the current catalog. That path will be used as the default one when looking for unqualified object names.

This is used during the resolution of object paths. Both the catalog and database are optional when referencing catalog objects such as tables, views etc. The algorithm looks for requested objects in the following paths, in this order:

  • [current-catalog].[current-database].[requested-path]

  • [current-catalog].[requested-path]

  • [requested-path]

Example:

Given structure with default catalog set to default_catalog and default database set to default_database.

root:
  |- default_catalog
      |- default_database
          |- tab1
      |- db1
          |- tab1
  |- cat1
      |- db1
          |- tab1

The following table describes resolved paths:

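+----------------+---------------------------------------+
| Requested path | Resolved path                         |
+----------------+---------------------------------------+
| tab1           | default_catalog.default_database.tab1 |
+----------------+---------------------------------------+
| db1.tab1       | default_catalog.db1.tab1              |
+----------------+---------------------------------------+
| cat1.db1.tab1  | cat1.db1.tab1                         |
+----------------+---------------------------------------+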

Throws

CatalogException thrown if the given catalog and database could not be set as the default ones.

See also

use_catalog()

Parameters

database_name – The name of the database to set as the current database.

class pyflink.table.TableFunction[source]¶

Bases: pyflink.table.udf.UserDefinedFunction

Base interface for user-defined table function. A user-defined table function maps zero, one, or multiple scalar values to zero, one, or multiple rows.

New in version 1.11.0.

abstract eval(*args)[source]¶

Method which defines the logic of the table function.

class pyflink.table.TableResult(j_table_result)[source]¶

Bases: object

A TableResult is the representation of the statement execution result.

New in version 1.11.0.

collect() → pyflink.table.table_result.CloseableIterator[source]¶

Get the result contents as a closeable row iterator.

Note:

For a SELECT operation, the job will not finish until all result data has been collected. Close the iterator explicitly through the CloseableIterator#close method to avoid a resource leak; calling it cancels the job and releases the related resources.

For a DML operation, Flink does not currently support returning the real affected row count, so the affected row count is always -1 (unknown) for every sink, and it is returned only after the job has finished. Calling the CloseableIterator#close method cancels the job.

For other operations, no Flink job is submitted (get_job_client() is always empty) and the result is bounded. Calling the CloseableIterator#close method does nothing.

The recommended way to make sure the CloseableIterator#close method is called looks like:

>>> table_result = t_env.execute_sql("select ...")
>>> with table_result.collect() as results:
...     for result in results:
...         ...

To fetch the result locally, you can call either collect() or print(), but not both on the same TableResult instance.

Returns

A CloseableIterator.

New in version 1.12.0.
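
Because closing the iterator cancels the job, the with statement is also a convenient way to stop early on an unbounded query. The helper below is a sketch with a hypothetical name (fetch_rows); table_env is assumed to be an existing TableEnvironment.

```python
def fetch_rows(table_env, query, max_rows):
    """Return at most `max_rows` rows produced by `query` as a list.

    Illustrative helper only; it is not part of PyFlink.
    """
    rows = []
    if max_rows <= 0:
        return rows  # nothing requested, so do not submit a job at all
    table_result = table_env.execute_sql(query)
    # Leaving the with block closes the iterator, which cancels the job
    # and releases its resources even if not all rows were consumed.
    with table_result.collect() as results:
        for row in results:
            rows.append(row)
            if len(rows) >= max_rows:
                break
    return rows
```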

get_job_client() → Optional[pyflink.common.job_client.JobClient][source]¶

For DML and DQL statements, returns the JobClient associated with the submitted Flink job. For other statements (e.g. DDL, DCL), returns None.

Returns

The job client, optional.

Return type

pyflink.common.JobClient

New in version 1.11.0.

get_result_kind() → pyflink.table.result_kind.ResultKind[source]¶

Return the ResultKind which represents the result type.

For DDL operation and USE operation, the result kind is always SUCCESS. For other operations, the result kind is always SUCCESS_WITH_CONTENT.

Returns

The result kind.

New in version 1.11.0.

get_table_schema() → pyflink.table.table_schema.TableSchema[source]¶

Get the schema of result.

The schema of DDL, USE, EXPLAIN:

+-------------+-------------+----------+
| column name | column type | comments |
+-------------+-------------+----------+
| result      | STRING      |          |
+-------------+-------------+----------+

The schema of SHOW:

+---------------+-------------+----------+
|  column name  | column type | comments |
+---------------+-------------+----------+
| <object name> | STRING      |          |
+---------------+-------------+----------+
The column name of `SHOW CATALOGS` is "catalog name",
the column name of `SHOW DATABASES` is "database name",
the column name of `SHOW TABLES` is "table name",
the column name of `SHOW VIEWS` is "view name",
the column name of `SHOW FUNCTIONS` is "function name".

The schema of DESCRIBE:

+------------------+-------------+-------------------------------------------------+
| column name      | column type |                 comments                        |
+------------------+-------------+-------------------------------------------------+
| name             | STRING      | field name                                      |
+------------------+-------------+-------------------------------------------------+
| type             | STRING      | field type expressed as a String                |
+------------------+-------------+-------------------------------------------------+
| null             | BOOLEAN     | field nullability: true if a field is nullable, |
|                  |             | else false                                      |
+------------------+-------------+-------------------------------------------------+
| key              | STRING      | key constraint: 'PRI' for primary keys,         |
|                  |             | 'UNQ' for unique keys, else null                |
+------------------+-------------+-------------------------------------------------+
| computed column  | STRING      | computed column: string expression              |
|                  |             | if a field is computed column, else null        |
+------------------+-------------+-------------------------------------------------+
| watermark        | STRING      | watermark: string expression if a field is      |
|                  |             | watermark, else null                            |
+------------------+-------------+-------------------------------------------------+

The schema of INSERT: (one column per one sink)

+----------------------------+-------------+-----------------------+
| column name                | column type | comments              |
+----------------------------+-------------+-----------------------+
| (name of the insert table) | BIGINT      | the insert table name |
+----------------------------+-------------+-----------------------+

The schema of SELECT is the selected field names and types.

Returns

The schema of result.

Return type

pyflink.table.TableSchema

New in version 1.11.0.
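
To inspect which of the schemas above a given result carries, the field names and types can be read back from the returned TableSchema. The helper name describe_columns is hypothetical; it only relies on get_table_schema(), get_field_names() and get_field_data_types().

```python
def describe_columns(table_result):
    """Return (column name, column type as string) pairs for a TableResult.

    Illustrative helper only; it is not part of PyFlink.
    """
    schema = table_result.get_table_schema()
    names = schema.get_field_names()
    types = schema.get_field_data_types()
    # Names and types are returned in the same field order.
    return [(name, str(data_type)) for name, data_type in zip(names, types)]
```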

print()[source]¶

Prints the result contents in tableau form to the client console.

This method has slightly different behaviors under different checkpointing settings.

  • For batch jobs or streaming jobs without checkpointing, this method has neither exactly-once nor at-least-once guarantee. Query results are immediately accessible by the clients once they’re produced, but exceptions will be thrown when the job fails and restarts.

  • For streaming jobs with exactly-once checkpointing, this method guarantees an end-to-end exactly-once record delivery. A result will be accessible by clients only after its corresponding checkpoint completes.

  • For streaming jobs with at-least-once checkpointing, this method guarantees an end-to-end at-least-once record delivery. Query results are immediately accessible by the clients once they’re produced, but it is possible for the same result to be delivered multiple times.

New in version 1.11.0.

wait(timeout_ms: int = None)[source]¶

Wait if necessary for at most the given time (milliseconds) for the data to be ready.

For a select operation, this method will wait until the first row can be accessed locally. For an insert operation, this method will wait for the job to finish, because the result contains only one row. For other operations, this method will return immediately, because the result is already available locally.

New in version 1.12.0.

class pyflink.table.TableSchema(field_names: List[str] = None, data_types: List[pyflink.table.types.DataType] = None, j_table_schema=None)[source]¶

Bases: object

A table schema that represents a table’s structure with field names and data types.

class Builder[source]¶

Bases: object

Builder for creating a TableSchema.

build() → pyflink.table.table_schema.TableSchema[source]¶

Returns a TableSchema instance.

Returns

The TableSchema instance.

field(name: str, data_type: pyflink.table.types.DataType) → pyflink.table.table_schema.TableSchema.Builder[source]¶

Add a field with name and data type.

The call order of this method determines the order of fields in the schema.

Parameters
  • name – The field name.

  • data_type – The field data type.

Returns

This object.
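
Since field() returns the builder itself, a schema can be assembled from a list of (name, data type) pairs. The helper below is a sketch with a hypothetical name (build_schema); the builder argument is assumed to come from TableSchema.builder().

```python
def build_schema(builder, fields):
    """Add (name, data_type) pairs to a TableSchema.Builder and build it.

    Illustrative helper only; it is not part of PyFlink.
    """
    for name, data_type in fields:
        # The call order determines the order of fields in the schema.
        builder = builder.field(name, data_type)
    return builder.build()
```

A call would look like build_schema(TableSchema.builder(), [("a", DataTypes.INT()), ("b", DataTypes.STRING())]).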

classmethod builder()[source]¶

copy() → pyflink.table.table_schema.TableSchema[source]¶

Returns a deep copy of the table schema.

Returns

A deep copy of the table schema.

get_field_count() → int[source]¶

Returns the number of fields.

Returns

The number of fields.

get_field_data_type(field: Union[int, str]) → Optional[pyflink.table.types.DataType][source]¶

Returns the specified data type for the given field index or field name.

Parameters

field – The index of the field or the name of the field.

Returns

The data type of the specified field.

get_field_data_types() → List[pyflink.table.types.DataType][source]¶

Returns all field data types as a list.

Returns

A list of all field data types.

get_field_name(field_index: int) → Optional[str][source]¶

Returns the specified name for the given field index.

Parameters

field_index – The index of the field.

Returns

The field name.

get_field_names() → List[str][source]¶

Returns all field names as a list.

Returns

The list of all field names.

to_row_data_type() → pyflink.table.types.RowType[source]¶

Converts a table schema into a (nested) data type describing a pyflink.table.types.DataTypes.ROW().

Returns

The row data type.

class pyflink.table.TableSink(j_table_sink)[source]¶

Bases: object

A TableSink specifies how to emit a table to an external system or location.

class pyflink.table.TableSource(j_table_source)[source]¶

Bases: object

Defines a table from an external system or location.

class pyflink.table.UserDefinedType(nullable=True)[source]¶

Bases: pyflink.table.types.DataType

User-defined type (UDT).

Note

WARN: Flink Internal Use Only

deserialize(datum)[source]¶

Converts a SQL datum into a user-type object.

from_sql_type(obj)[source]¶

Converts an internal SQL object into a native Python object.

classmethod java_udt()[source]¶

The class name of the paired Java UDT (could be ‘’, if there is no corresponding one).

classmethod module()[source]¶

The Python module of the UDT.

need_conversion()[source]¶

Whether this type needs conversion between Python objects and internal SQL objects.

This is used to avoid the unnecessary conversion for ArrayType/MultisetType/MapType/RowType.

serialize(obj)[source]¶

Converts a user-type object into a SQL datum.

classmethod sql_type()[source]¶

Underlying SQL storage type for this UDT.

to_sql_type(obj)[source]¶

Converts a Python object into an internal SQL object.

classmethod type_name()[source]¶
class pyflink.table.WindowGroupedTable(java_window_grouped_table, t_env)[source]¶

Bases: object

A table that has been windowed and grouped for GroupWindow.

select(*fields: Union[str, pyflink.table.expression.Expression]) → pyflink.table.table.Table[source]¶

Performs a selection operation on a window grouped table. Similar to an SQL SELECT statement. The field expressions can contain complex expressions and aggregations.

Example:

>>> window_grouped_table.select(col('key'),
...                             col('window').start,
...                             col('value').avg.alias('valavg'))
>>> window_grouped_table.select("key, window.start, value.avg as valavg")
Parameters

fields – Expression strings or Expression objects.

Returns

The result table.

class pyflink.table.WriteMode[source]¶

Bases: object

NO_OVERWRITE = 0¶
OVERWRITE = 1¶

pyflink.table.expressions module¶

pyflink.table.expressions.if_then_else(condition: Union[bool, pyflink.table.expression.Expression[bool]], if_true, if_false) → pyflink.table.expression.Expression[source]¶

Ternary conditional operator that decides which of two other expressions should be evaluated based on an evaluated boolean condition.

e.g. if_then_else(col(“f0”) > 5, “A”, “B”) leads to “A” for rows where f0 is greater than 5

Parameters
  • condition – boolean condition

  • if_true – expression to be evaluated if condition holds

  • if_false – expression to be evaluated if condition does not hold

New in version 1.12.0.

pyflink.table.expressions.lit(v, data_type: pyflink.table.types.DataType = None) → pyflink.table.expression.Expression[source]¶

Creates a SQL literal.

The data type is derived from the object’s class and its value. For example, lit(12) leads to INT, lit(“abc”) leads to CHAR(3).

Example:

>>> tab.select(col("key"), lit("abc"))

New in version 1.12.0.

pyflink.table.expressions.col(name: str) → pyflink.table.expression.Expression[source]¶

Creates an expression which refers to a table’s field.

Example:

>>> tab.select(col("key"), col("value"))
Parameters

name – the field name to refer to

New in version 1.12.0.

pyflink.table.expressions.range_(start: Union[str, int], end: Union[str, int]) → pyflink.table.expression.Expression[source]¶

Indicates a range from ‘start’ to ‘end’, which can be used in column selection.

Example:

>>> tab.select(with_columns(range_('b', 'c')))

See also

with_columns()

New in version 1.12.0.

pyflink.table.expressions.and_(predicate0: Union[bool, pyflink.table.expression.Expression[bool]], predicate1: Union[bool, pyflink.table.expression.Expression[bool]], *predicates: Union[bool, pyflink.table.expression.Expression[bool]]) → pyflink.table.expression.Expression[bool][source]¶

Boolean AND in three-valued logic.

New in version 1.12.0.

pyflink.table.expressions.or_(predicate0: Union[bool, pyflink.table.expression.Expression[bool]], predicate1: Union[bool, pyflink.table.expression.Expression[bool]], *predicates: Union[bool, pyflink.table.expression.Expression[bool]]) → pyflink.table.expression.Expression[bool][source]¶

Boolean OR in three-valued logic.

New in version 1.12.0.
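The three-valued logic used by and_() and or_() can be modelled in plain Python. The following is only a sketch, assuming that SQL NULL is represented by None; the helper names and3 and or3 are hypothetical and are not part of PyFlink.

```python
from typing import Optional


def and3(*values: Optional[bool]) -> Optional[bool]:
    """Three-valued AND: any False wins, otherwise any NULL (None) wins."""
    if any(v is False for v in values):
        return False
    if any(v is None for v in values):
        return None
    return True


def or3(*values: Optional[bool]) -> Optional[bool]:
    """Three-valued OR: any True wins, otherwise any NULL (None) wins."""
    if any(v is True for v in values):
        return True
    if any(v is None for v in values):
        return None
    return False
```

In this model a NULL operand only affects the result when the other operands do not already decide it.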

pyflink.table.expressions.UNBOUNDED_ROW¶

Offset constant to be used in the preceding clause of unbounded Over windows. Use this constant for a row-count interval. Unbounded over windows start with the first row of a partition.

New in version 1.12.0.

pyflink.table.expressions.UNBOUNDED_RANGE¶

Offset constant to be used in the preceding clause of unbounded Over windows. Use this constant for a time interval. Unbounded over windows start with the first row of a partition.

New in version 1.12.0.

pyflink.table.expressions.CURRENT_ROW¶

Offset constant to be used in the following clause of Over windows. Use this for setting the upper bound of the window to the current row.

New in version 1.12.0.

pyflink.table.expressions.CURRENT_RANGE¶

Offset constant to be used in the following clause of Over windows. Use this for setting the upper bound of the window to the sort key of the current row, i.e., all rows with the same sort key as the current row are included in the window.

New in version 1.12.0.

pyflink.table.expressions.current_date() → pyflink.table.expression.Expression[source]¶

Returns the current SQL date in UTC time zone.

New in version 1.12.0.

pyflink.table.expressions.current_time() → pyflink.table.expression.Expression[source]¶

Returns the current SQL time in UTC time zone.

New in version 1.12.0.

pyflink.table.expressions.current_timestamp() → pyflink.table.expression.Expression[source]¶

Returns the current SQL timestamp in UTC time zone.

New in version 1.12.0.

pyflink.table.expressions.local_time() → pyflink.table.expression.Expression[source]¶

Returns the current SQL time in local time zone.

New in version 1.12.0.

pyflink.table.expressions.local_timestamp() → pyflink.table.expression.Expression[source]¶

Returns the current SQL timestamp in local time zone.

New in version 1.12.0.

pyflink.table.expressions.temporal_overlaps(left_time_point, left_temporal, right_time_point, right_temporal) → pyflink.table.expression.Expression[source]¶

Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates left_end >= right_start && right_end >= left_start.

e.g. temporal_overlaps(lit(“2:55:00”).to_time, lit(1).hours, lit(“3:30:00”).to_time, lit(2).hours) leads to true.

Parameters
  • left_time_point – The left time point

  • left_temporal – The time interval from the left time point

  • right_time_point – The right time point

  • right_temporal – The time interval from the right time point

Returns

An expression which indicates whether two anchored time intervals overlap.

New in version 1.12.0.
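The overlap condition stated above can be checked with plain datetime arithmetic. This is a sketch only: the function name overlaps is hypothetical, the date is arbitrary, and it handles only the case where each side is given as a time point plus an interval.

```python
from datetime import datetime, timedelta


def overlaps(left_start, left_length, right_start, right_length):
    """Evaluate left_end >= right_start and right_end >= left_start."""
    left_end = left_start + left_length
    right_end = right_start + right_length
    return left_end >= right_start and right_end >= left_start


day = datetime(2020, 1, 1)  # arbitrary date, only the time of day matters here
result = overlaps(day.replace(hour=2, minute=55), timedelta(hours=1),
                  day.replace(hour=3, minute=30), timedelta(hours=2))
```

With the values from the example, the left range is 2:55 to 3:55 and the right range is 3:30 to 5:30, so result is True.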

pyflink.table.expressions.date_format(timestamp, format) → pyflink.table.expression.Expression[source]¶

Formats a timestamp as a string using a specified format. The format must be compatible with MySQL’s date formatting syntax as used by the date_parse function.

For example date_format(col(“time”), “%Y, %d %M”) results in strings formatted as “2017, 05 May”.

Parameters
  • timestamp – The timestamp to format as string.

  • format – The format of the string.

Returns

The formatted timestamp as string.

New in version 1.12.0.

pyflink.table.expressions.timestamp_diff(time_point_unit: pyflink.table.expression.TimePointUnit, time_point1, time_point2) → pyflink.table.expression.Expression[source]¶

Returns the (signed) number of TimePointUnit between time_point1 and time_point2.

For example, timestamp_diff(TimePointUnit.DAY, lit(“2016-06-15”).to_date, lit(“2016-06-18”).to_date) leads to 3.

Parameters
  • time_point_unit – The unit to compute diff.

  • time_point1 – The first point in time.

  • time_point2 – The second point in time.

Returns

The number of intervals as integer value.

New in version 1.12.0.
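The sign convention in the example (a positive result when time_point2 is later than time_point1) can be illustrated with Python dates. This is a sketch for the DAY unit only; day_diff is a hypothetical helper, not a PyFlink function.

```python
from datetime import date


def day_diff(time_point1: date, time_point2: date) -> int:
    """Signed number of days from time_point1 to time_point2."""
    return (time_point2 - time_point1).days


forward = day_diff(date(2016, 6, 15), date(2016, 6, 18))
backward = day_diff(date(2016, 6, 18), date(2016, 6, 15))
```

Swapping the two arguments flips the sign of the result.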

pyflink.table.expressions.array(head, *tail) → pyflink.table.expression.Expression[source]¶

Creates an array of literals.

Example:

>>> tab.select(array(1, 2, 3))

New in version 1.12.0.

pyflink.table.expressions.row(head, *tail) → pyflink.table.expression.Expression[source]¶

Creates a row of expressions.

Example:

>>> tab.select(row("key1", 1))

New in version 1.12.0.

pyflink.table.expressions.map_(key, value, *tail) → pyflink.table.expression.Expression[source]¶

Creates a map of expressions.

Example:

>>> tab.select(
...     map_(
...         "key1", 1,
...         "key2", 2,
...         "key3", 3
...     ))

Note

keys and values should have the same types for all entries.

New in version 1.12.0.

pyflink.table.expressions.row_interval(rows: int) → pyflink.table.expression.Expression[source]¶

Creates an interval of rows.

Example:

>>> tab.window(Over
...         .partition_by(col('a'))
...         .order_by(col('proctime'))
...         .preceding(row_interval(4))
...         .following(CURRENT_ROW)
...         .alias('w'))
Parameters

rows – the number of rows

New in version 1.12.0.
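To make the effect of a row interval concrete, the following plain-Python sketch computes a running sum over the current row and up to N preceding rows of the same partition, which is the shape of window defined in the example above. The function over_sum and the sample rows are hypothetical, and the rows are assumed to arrive already ordered by the time attribute.

```python
from collections import defaultdict, deque


def over_sum(rows, preceding):
    """rows: iterable of (partition_key, value) pairs in time order."""
    # One bounded buffer per partition: the current row plus `preceding` rows.
    windows = defaultdict(lambda: deque(maxlen=preceding + 1))
    result = []
    for key, value in rows:
        window = windows[key]
        window.append(value)  # the oldest row drops out once the buffer is full
        result.append((key, sum(window)))
    return result


rows = [("a", 1), ("a", 2), ("b", 10), ("a", 3), ("a", 4), ("a", 5), ("a", 6)]
sums = over_sum(rows, preceding=4)
```

Each input row produces exactly one output row, and rows of partition "b" never contribute to the sums of partition "a".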

pyflink.table.expressions.pi() → pyflink.table.expression.Expression[float][source]¶

Returns a value that is closer than any other value to pi.

New in version 1.12.0.

pyflink.table.expressions.e() → pyflink.table.expression.Expression[float][source]¶

Returns a value that is closer than any other value to e.

New in version 1.12.0.

pyflink.table.expressions.rand(seed: Union[int, pyflink.table.expression.Expression[int]] = None) → pyflink.table.expression.Expression[float][source]¶

Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed if specified. Two rand() functions will return identical sequences of numbers if they have the same initial seed.

New in version 1.12.0.

pyflink.table.expressions.rand_integer(bound: Union[int, pyflink.table.expression.Expression[int]], seed: Union[int, pyflink.table.expression.Expression[int]] = None) → pyflink.table.expression.Expression[source]¶

Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed if specified. Two rand_integer() functions will return identical sequences of numbers if they have the same initial seed and the same bound.

New in version 1.12.0.

pyflink.table.expressions.atan2(y, x) → pyflink.table.expression.Expression[float][source]¶

Calculates the arc tangent of a given coordinate.

New in version 1.12.0.

pyflink.table.expressions.negative(v) → pyflink.table.expression.Expression[source]¶

Returns negative numeric.

New in version 1.12.0.

pyflink.table.expressions.concat(first: Union[str, pyflink.table.expression.Expression[str]], *others: Union[str, pyflink.table.expression.Expression[str]]) → pyflink.table.expression.Expression[str][source]¶

Returns the string that results from concatenating the arguments. Returns NULL if any argument is NULL.

New in version 1.12.0.

pyflink.table.expressions.concat_ws(separator: Union[str, pyflink.table.expression.Expression[str]], first: Union[str, pyflink.table.expression.Expression[str]], *others: Union[str, pyflink.table.expression.Expression[str]]) → pyflink.table.expression.Expression[str][source]¶

Returns the string that results from concatenating the arguments and separator. Returns NULL if the separator is NULL.

Note

This function does not skip empty strings. However, it does skip any NULL values after the separator argument.

New in version 1.12.0.
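The NULL handling of concat() and concat_ws() differs, which a small plain-Python model makes visible. This is a sketch, assuming SQL NULL is represented by None; concat_model and concat_ws_model are hypothetical names and do not call PyFlink.

```python
from typing import Optional


def concat_model(*values: Optional[str]) -> Optional[str]:
    """NULL if any argument is NULL, otherwise the plain concatenation."""
    if any(v is None for v in values):
        return None
    return "".join(values)


def concat_ws_model(separator: Optional[str], *values: Optional[str]) -> Optional[str]:
    """NULL only if the separator is NULL; NULL values are skipped."""
    if separator is None:
        return None
    # Empty strings are kept, so they still produce a separator.
    return separator.join(v for v in values if v is not None)
```

For example, concat_ws_model("-", "a", None, "", "b") keeps the empty string and drops the None, giving "a--b".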

pyflink.table.expressions.uuid() → pyflink.table.expression.Expression[str][source]¶

Returns a UUID (Universally Unique Identifier) string (e.g., “3d3c68f7-f608-473f-b60c-b0c44ad4cc4e”) according to RFC 4122 type 4 (pseudo-randomly generated) UUID. The UUID is generated using a cryptographically strong pseudo-random number generator.

New in version 1.12.0.

pyflink.table.expressions.null_of(data_type: pyflink.table.types.DataType) → pyflink.table.expression.Expression[source]¶

Returns a null literal value of a given data type.

New in version 1.12.0.

pyflink.table.expressions.log(v, base=None) → pyflink.table.expression.Expression[float][source]¶

If base is specified, calculates the logarithm of the given value to the given base. Otherwise, calculates the natural logarithm of the given value.

New in version 1.12.0.

pyflink.table.expressions.with_columns(head, *tails) → pyflink.table.expression.Expression[source]¶

Creates an expression that selects a range of columns. It can be used wherever an array of expressions is accepted, such as function calls, projections, or groupings.

A range can either be index-based or name-based. Indices start at 1 and boundaries are inclusive.

e.g. with_columns(range_(“b”, “c”)) or with_columns(col(“*”))

See also

range_(), without_columns()

New in version 1.12.0.

pyflink.table.expressions.without_columns(head, *tails) → pyflink.table.expression.Expression[source]¶

Creates an expression that selects all columns except for the given range of columns. It can be used wherever an array of expressions is accepted, such as function calls, projections, or groupings.

A range can either be index-based or name-based. Indices start at 1 and boundaries are inclusive.

e.g. without_columns(range_(“b”, “c”)) or without_columns(col(“c”))

See also

range_(), with_columns()

New in version 1.12.0.
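The range rules shared by with_columns() and without_columns() (name-based or index-based, indices starting at 1, both boundaries inclusive) can be sketched on a plain list of column names. The helpers select_range and drop_range are hypothetical and only model the selection logic.

```python
def select_range(columns, start, end):
    """Return the inclusive range; int bounds are 1-based, str bounds are names."""
    def position(bound):
        return bound - 1 if isinstance(bound, int) else columns.index(bound)

    return columns[position(start):position(end) + 1]


def drop_range(columns, start, end):
    """Return all columns except those in the inclusive range."""
    excluded = set(select_range(columns, start, end))
    return [c for c in columns if c not in excluded]


columns = ["a", "b", "c", "d"]
by_name = select_range(columns, "b", "c")
by_index = select_range(columns, 2, 3)
remaining = drop_range(columns, "b", "c")
```

Here the name-based range from "b" to "c" and the index-based range from 2 to 3 select the same two columns.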

pyflink.table.expressions.call(f: Union[str, pyflink.table.udf.UserDefinedFunctionWrapper], *args) → pyflink.table.expression.Expression[source]¶

The first parameter f could be a str or a Python user-defined function.

When it is str, this is a call to a function that will be looked up in a catalog. There are two kinds of functions:

  • System functions - which are identified with one-part names

  • Catalog functions - which are always identified with three-part names (catalog, database, function)

Moreover, each function can be either a temporary function or a permanent one (which is stored in an external catalog).

Based on these two properties, the resolution order for looking up a function by the provided name is as follows:

  • Temporary system function

  • System function

  • Temporary catalog function

  • Catalog function

Parameters
  • f – the path of the function or the Python user-defined function.

  • args – parameters of the user-defined function.

New in version 1.12.0.
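The resolution order listed above amounts to checking four registries in sequence and returning the first match. The following sketch models this with plain dicts; resolve_function and the registry contents are hypothetical, and the distinction between one-part and three-part names is deliberately left out.

```python
def resolve_function(name, temp_system, system, temp_catalog, catalog):
    """Return (kind, function) for the first registry that contains `name`."""
    lookup_order = (
        ("temporary system function", temp_system),
        ("system function", system),
        ("temporary catalog function", temp_catalog),
        ("catalog function", catalog),
    )
    for kind, registry in lookup_order:
        if name in registry:
            return kind, registry[name]
    raise LookupError(f"function not found: {name}")


# Hypothetical registries: "f" exists both as a temporary and a permanent system function.
temp_system = {"f": "temporary f"}
system = {"f": "built-in f", "g": "built-in g"}
temp_catalog = {}
catalog = {"h": "catalog h"}
```

Because temporary functions are checked first, a temporary function shadows a permanent one with the same name.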

pyflink.table.window module¶

class pyflink.table.window.Tumble[source]¶

Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping windows of a specified fixed length. For example, a tumbling window of 5 minutes groups elements into 5-minute intervals.

Example:

>>> from pyflink.table import expressions as expr
>>> Tumble.over(expr.lit(10).minutes) \
...       .on(expr.col("rowtime")) \
...       .alias("w")

>>> Tumble.over("10.minutes").on("rowtime").alias("w")
classmethod over(size: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.TumbleWithSize[source]¶

Creates a tumbling window. Tumbling windows are consecutive, non-overlapping windows of a specified fixed length. For example, a tumbling window of 5 minutes groups elements into 5-minute intervals.

Parameters

size – The size of the window as time or row-count interval.

Returns

A partially defined tumbling window.
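How an element is assigned to a tumbling window can be sketched with integer arithmetic. This is a simplified model, assuming millisecond timestamps, windows aligned to timestamp 0, and an exclusive window end; tumbling_window is a hypothetical helper.

```python
def tumbling_window(timestamp_ms: int, size_ms: int):
    """Return the (start, end) of the single window containing the timestamp."""
    start = timestamp_ms - (timestamp_ms % size_ms)
    return start, start + size_ms


FIVE_MINUTES = 5 * 60 * 1000
window = tumbling_window(7 * 60 * 1000, FIVE_MINUTES)  # element at minute 7
```

Every timestamp falls into exactly one window, which is what makes tumbling windows non-overlapping.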

class pyflink.table.window.Session[source]¶

Helper class for creating a session window. The boundaries of session windows are defined by intervals of inactivity, i.e., a session window closes if no event appears for a defined gap period.

Example:

>>> from pyflink.table import expressions as expr
>>> Session.with_gap(expr.lit(10).minutes) \
...        .on(expr.col("rowtime")) \
...        .alias("w")

>>> Session.with_gap("10.minutes").on("rowtime").alias("w")
classmethod with_gap(gap: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.SessionWithGap[source]¶

Creates a session window. The boundaries of session windows are defined by intervals of inactivity, i.e., a session window closes if no event appears for a defined gap period.

Parameters

gap – Specifies how long (as interval of milliseconds) to wait for new data before closing the session window.

Returns

A partially defined session window.
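Grouping by inactivity gap can be sketched as follows. The helper session_windows is hypothetical and works on a finite list of timestamps; how an event that arrives exactly one gap after the previous one is treated is an assumption of this sketch (it starts a new session here), since the text above does not specify it.

```python
def session_windows(timestamps, gap):
    """Group timestamps into sessions separated by at least `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # still within the gap: extend the session
        else:
            sessions.append([ts])    # gap exceeded: open a new session
    return sessions


sessions = session_windows([1, 3, 4, 20, 22], gap=10)
```

Unlike tumbling windows, the resulting windows have no fixed length; their extent depends entirely on the data.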

class pyflink.table.window.Slide[source]¶

Helper class for creating a sliding window. Sliding windows have a fixed size and slide by a specified slide interval. If the slide interval is smaller than the window size, sliding windows are overlapping. Thus, an element can be assigned to multiple windows.

For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive window evaluations.

Example:

>>> from pyflink.table import expressions as expr
>>> Slide.over(expr.lit(10).minutes) \
...      .every(expr.lit(5).minutes) \
...      .on(expr.col("rowtime")) \
...      .alias("w")

>>> Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
classmethod over(size: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.SlideWithSize[source]¶

Creates a sliding window. Sliding windows have a fixed size and slide by a specified slide interval. If the slide interval is smaller than the window size, sliding windows are overlapping. Thus, an element can be assigned to multiple windows.

For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups elements of 15 minutes and evaluates every five minutes. Each element is contained in three consecutive window evaluations.

Parameters

size – The size of the window as time or row-count interval.

Returns

A partially specified sliding window.
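The statement that each element belongs to three consecutive windows for a size of 15 minutes and a slide of 5 minutes can be verified with a short sketch. It assumes integer timestamps in minutes, windows aligned to timestamp 0, and exclusive window ends; sliding_windows is a hypothetical helper.

```python
def sliding_windows(timestamp: int, size: int, slide: int):
    """Return every (start, end) window that contains the timestamp."""
    last_start = timestamp - (timestamp % slide)
    # Walk backwards in steps of `slide` while the window still covers the timestamp.
    starts = range(last_start, timestamp - size, -slide)
    return [(start, start + size) for start in starts]


windows = sliding_windows(17, size=15, slide=5)
```

In general an element is assigned to size / slide windows when the size is a multiple of the slide.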

class pyflink.table.window.Over[source]¶

Helper class for creating an over window. Similar to SQL, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.

Over-windows for batch tables are currently not supported.

Example:

>>> from pyflink.table import expressions as expr
>>> Over.partition_by(expr.col("a")) \
...     .order_by(expr.col("rowtime")) \
...     .preceding(expr.UNBOUNDED_RANGE) \
...     .alias("w")

>>> Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
classmethod order_by(order_by: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.OverWindowPartitionedOrdered[source]¶

Specifies the time attribute on which rows are ordered.

For streaming tables, reference a rowtime or proctime time attribute here to specify the time mode.

Parameters

order_by – Field reference.

Returns

An over window with defined order.

classmethod partition_by(*partition_by: Union[str, pyflink.table.expression.Expression]) → pyflink.table.window.OverWindowPartitioned[source]¶

Partitions the elements on some partition keys.

Each partition is individually sorted and aggregate functions are applied to each partition separately.

Parameters

partition_by – List of field references.

Returns

An over window with defined partitioning.

class pyflink.table.window.GroupWindow(java_window)[source]¶

A group window specification.

Group windows group rows based on time or row-count intervals and are therefore essentially a special type of groupBy. Just like groupBy, group windows allow computing aggregates on groups of elements.

Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping is required to apply aggregations on streaming tables.

For finite batch tables, group windows provide shortcuts for time-based groupBy.

class pyflink.table.window.OverWindow(java_over_window)[source]¶

An over window specification.

Similar to SQL, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.

pyflink.table.descriptors module¶

class pyflink.table.descriptors.Rowtime[source]¶

Rowtime descriptor for describing an event time attribute in the schema.

timestamps_from_extractor(extractor: str) → pyflink.table.descriptors.Rowtime[source]¶

Sets a custom timestamp extractor to be used for the rowtime attribute.

Parameters

extractor – The fully-qualified Java class name of the TimestampExtractor used to extract the rowtime attribute from the physical type. The TimestampExtractor must have a public no-argument constructor and must be loadable by the current Java classloader.

Returns

This rowtime descriptor.

timestamps_from_field(field_name: str)[source]¶

Sets a built-in timestamp extractor that converts an existing LONG or TIMESTAMP field into the rowtime attribute.

Parameters

field_name – The field to convert into a rowtime attribute.

Returns

This rowtime descriptor.

timestamps_from_source() → pyflink.table.descriptors.Rowtime[source]¶

Sets a built-in timestamp extractor that converts the assigned timestamps from a DataStream API record into the rowtime attribute and thus preserves the assigned timestamps from the source.

Note

This extractor only works in streaming environments.

Returns

This rowtime descriptor.

watermarks_from_source() → pyflink.table.descriptors.Rowtime[source]¶

Sets a built-in watermark strategy which indicates the watermarks should be preserved from the underlying DataStream API and thus preserves the assigned watermarks from the source.

Returns

This rowtime descriptor.

watermarks_from_strategy(strategy: str) → pyflink.table.descriptors.Rowtime[source]¶

Sets a custom watermark strategy to be used for the rowtime attribute.

Parameters

strategy – The fully-qualified Java class name of the WatermarkStrategy. The WatermarkStrategy must have a public no-argument constructor and must be loadable by the current Java classloader.

Returns

This rowtime descriptor.

watermarks_periodic_ascending() → pyflink.table.descriptors.Rowtime[source]¶

Sets a built-in watermark strategy for ascending rowtime attributes.

Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp are not late.

Returns

This rowtime descriptor.

watermarks_periodic_bounded(delay: int) → pyflink.table.descriptors.Rowtime[source]¶

Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.

Emits watermarks which are the maximum observed timestamp minus the specified delay.

Parameters

delay – Delay in milliseconds.

Returns

This rowtime descriptor.
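The two built-in periodic strategies differ only in how far the watermark trails the maximum observed timestamp. The following sketch models both on a list of timestamps in milliseconds; the function names are hypothetical, and the lateness check assumes that a row is late when its timestamp is less than or equal to the current watermark.

```python
def ascending_watermark(observed):
    """Maximum observed timestamp so far minus 1."""
    return max(observed) - 1


def bounded_watermark(observed, delay_ms):
    """Maximum observed timestamp so far minus the allowed delay."""
    return max(observed) - delay_ms


def is_late(timestamp, watermark):
    """Assumption: a row is late once the watermark has reached its timestamp."""
    return timestamp <= watermark


observed = [1000, 1500, 1200]
```

With the ascending strategy the watermark is 1499, so another row with timestamp 1500 is not late; with a bounded delay of 200 the watermark is 1300, so rows between 1301 and 1500 may still arrive out of order.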

class pyflink.table.descriptors.Schema(schema=None, fields=None, rowtime=None)[source]¶

Describes a schema of a table.

Note

Field names are matched by the exact name by default (case sensitive).

field(field_name: str, field_type: Union[pyflink.table.types.DataType, str]) → pyflink.table.descriptors.Schema[source]¶

Adds a field with the field name and the data type or type string. Required. This method can be called multiple times. The call order of this method defines also the order of the fields in a row. Here is a document that introduces the type strings: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#type-strings

Parameters
  • field_name – The field name.

  • field_type – The data type or type string of the field.

Returns

This schema object.

fields(fields: Dict[str, Union[pyflink.table.types.DataType, str]]) → pyflink.table.descriptors.Schema[source]¶

Adds a set of fields with the field name and the data type or type string stored in a dict.

Parameters

fields – Dict of fields with the field name and the data type or type string stored. E.g., {‘int_field’: DataTypes.INT(), ‘string_field’: DataTypes.STRING()}.

Returns

This schema object.

New in version 1.11.0.

from_origin_field(origin_field_name: str) → pyflink.table.descriptors.Schema[source]¶

Specifies the origin of the previously defined field. The origin field is defined by a connector or format.

E.g. field(“myString”, DataTypes.STRING()).from_origin_field(“CSV_MY_STRING”)

Note

Field names are matched by the exact name by default (case sensitive).

Parameters

origin_field_name – The origin field name.

Returns

This schema object.

proctime() → pyflink.table.descriptors.Schema[source]¶

Specifies the previously defined field as a processing-time attribute.

E.g. field(“proctime”, DataTypes.TIMESTAMP(3)).proctime()

Returns

This schema object.

rowtime(rowtime: pyflink.table.descriptors.Rowtime) → pyflink.table.descriptors.Schema[source]¶

Specifies the previously defined field as an event-time attribute.

E.g. field(“rowtime”, DataTypes.TIMESTAMP(3)).rowtime(…)

Parameters

rowtime – A Rowtime descriptor.

Returns

This schema object.

schema(table_schema: pyflink.table.table_schema.TableSchema) → pyflink.table.descriptors.Schema[source]¶

Sets the schema with field names and the types. Required.

This method overwrites existing fields added with field().

Parameters

table_schema – The TableSchema object.

Returns

This schema object.

class pyflink.table.descriptors.OldCsv(schema=None, field_delimiter=None, line_delimiter=None, quote_character=None, comment_prefix=None, ignore_parse_errors=False, ignore_first_line=False)[source]¶

Format descriptor for comma-separated values (CSV).

Note

This descriptor describes Flink’s non-standard CSV table source/sink. In the future, the descriptor will be replaced by a proper RFC-compliant version. Use the RFC-compliant Csv format in the dedicated flink-formats/flink-csv module instead when writing to Kafka. Use the old one for stream/batch filesystem operations for now.

Note

Deprecated: use the RFC-compliant Csv format instead when writing to Kafka.

comment_prefix(prefix: str) → pyflink.table.descriptors.OldCsv[source]¶

Sets a prefix to indicate comments, null by default.

Parameters

prefix – The prefix to indicate comments.

Returns

This OldCsv object.

field(field_name: str, field_type: Union[pyflink.table.types.DataType, str]) → pyflink.table.descriptors.OldCsv[source]¶

Adds a format field with the field name and the data type or type string. Required. This method can be called multiple times. The call order of this method defines also the order of the fields in the format.

Parameters
  • field_name – The field name.

  • field_type – The data type or type string of the field.

Returns

This OldCsv object.

field_delimiter(delimiter: str) → pyflink.table.descriptors.OldCsv[source]¶

Sets the field delimiter, “,” by default.

Parameters

delimiter – The field delimiter.

Returns

This OldCsv object.

ignore_first_line() → pyflink.table.descriptors.OldCsv[source]¶

Ignores the first line. By default, the first line is not skipped.

Returns

This OldCsv object.

ignore_parse_errors() → pyflink.table.descriptors.OldCsv[source]¶

Skips records with parse errors instead of failing. By default, an exception is thrown.

Returns

This OldCsv object.

line_delimiter(delimiter: str) → pyflink.table.descriptors.OldCsv[source]¶

Sets the line delimiter, “\n” by default.

Parameters

delimiter – The line delimiter.

Returns

This OldCsv object.

quote_character(quote_character: str) → pyflink.table.descriptors.OldCsv[source]¶

Sets a quote character for String values, null by default.

Parameters

quote_character – The quote character.

Returns

This OldCsv object.

schema(table_schema: pyflink.table.table_schema.TableSchema) → pyflink.table.descriptors.OldCsv[source]¶

Sets the schema with field names and the types. Required.

This method overwrites existing fields added with field().

Parameters

table_schema – The TableSchema object.

Returns

This OldCsv object.

class pyflink.table.descriptors.FileSystem(path=None)[source]¶

Connector descriptor for a file system.

path(path_str: str) → pyflink.table.descriptors.FileSystem[source]¶

Sets the path to a file or directory in a file system.

Parameters

path_str – The path of a file or directory.

Returns

This FileSystem object.

class pyflink.table.descriptors.Kafka(version=None, topic=None, properties=None, start_from_earliest=False, start_from_latest=False, start_from_group_offsets=True, start_from_specific_offsets_dict=None, start_from_timestamp=None, sink_partitioner_fixed=None, sink_partitioner_round_robin=None, custom_partitioner_class_name=None)[source]¶

Connector descriptor for the Apache Kafka message queue.

properties(property_dict: Dict[str, str]) → pyflink.table.descriptors.Kafka[source]¶

Sets the configuration properties for the Kafka consumer. Resets previously set properties.

Parameters

property_dict – The dict object contains configuration properties for the Kafka consumer. Both the keys and values should be strings.

Returns

This object.

property(key: str, value: str) → pyflink.table.descriptors.Kafka[source]¶

Adds a configuration property for the Kafka consumer.

Parameters
  • key – Property key string for the Kafka consumer.

  • value – Property value string for the Kafka consumer.

Returns

This object.

sink_partitioner_custom(partitioner_class_name: str) → pyflink.table.descriptors.Kafka[source]¶

Configures how to partition records from Flink’s partitions into Kafka’s partitions.

This strategy allows for a custom partitioner by providing an implementation of FlinkKafkaPartitioner.

Parameters

partitioner_class_name – The canonical Java class name of the FlinkKafkaPartitioner. The FlinkKafkaPartitioner must have a public no-argument constructor and must be loadable by the current Java classloader.

Returns

This object.

sink_partitioner_fixed() → pyflink.table.descriptors.Kafka[source]¶

Configures how to partition records from Flink’s partitions into Kafka’s partitions.

This strategy ensures that each Flink partition ends up in one Kafka partition.

Note

One Kafka partition can contain multiple Flink partitions. Examples:

More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain the output of more than one flink partition:

Flink Sinks --------- Kafka Partitions
1 ----------------> 1
2 --------------/
3 -------------/
4 ------------/

Fewer Flink partitions than Kafka partitions:

Flink Sinks --------- Kafka Partitions
1 ----------------> 1
2 ----------------> 2
................... 3
................... 4
................... 5
Returns

This object.
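Both diagrams above are consistent with a simple modulo assignment of sink instances to Kafka partitions. The sketch below assumes that assignment and uses 0-based indices, whereas the diagrams count from 1; fixed_partition is a hypothetical helper and not the connector's actual code.

```python
def fixed_partition(sink_index: int, kafka_partitions):
    """Map one Flink sink instance to exactly one Kafka partition."""
    return kafka_partitions[sink_index % len(kafka_partitions)]


# Four sink instances, one Kafka partition: all sinks share the partition.
many_to_one = [fixed_partition(i, [0]) for i in range(4)]

# Two sink instances, five Kafka partitions: three partitions stay unused.
few_sinks = [fixed_partition(i, [0, 1, 2, 3, 4]) for i in range(2)]
```

In the second case, partitions 2, 3 and 4 receive no data, which is the trade-off of this strategy compared with round-robin distribution.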

sink_partitioner_round_robin() → pyflink.table.descriptors.Kafka[source]¶

Configures how to partition records from Flink’s partitions into Kafka’s partitions.

This strategy ensures that records will be distributed to Kafka partitions in a round-robin fashion.

Note

This strategy is useful to avoid an unbalanced partitioning. However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.

Returns

This object.

start_from_earliest() → pyflink.table.descriptors.Kafka[source]¶

Specifies the consumer to start reading from the earliest offset for all partitions. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.

Returns

This object.

start_from_group_offsets() → pyflink.table.descriptors.Kafka[source]¶

Specifies the consumer to start reading from any committed group offsets found in Zookeeper / Kafka brokers. The “group.id” property must be set in the configuration properties. If no offset can be found for a partition, the behaviour in “auto.offset.reset” set in the configuration properties will be used for the partition.

This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.

Returns

This object.

start_from_latest() → pyflink.table.descriptors.Kafka[source]¶

Specifies the consumer to start reading from the latest offset for all partitions. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.

Returns

This object.

start_from_specific_offset(partition: int, specific_offset: int) → pyflink.table.descriptors.Kafka[source]¶

Configures to start reading partitions from specific offsets and specifies the given offset for the given partition.

see pyflink.table.descriptors.Kafka.start_from_specific_offsets()

Parameters
  • partition – Partition id.

  • specific_offset – Specified offset in given partition.

Returns

This object.

start_from_specific_offsets(specific_offsets_dict: Dict[int, int]) → pyflink.table.descriptors.Kafka[source]¶

Specifies the consumer to start reading partitions from specific offsets, set independently for each partition. The specified offset should be the offset of the next record that will be read from partitions. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

If the provided map of offsets contains entries whose partition is not subscribed to by the consumer, those entries will be ignored. If the consumer subscribes to a partition that does not exist in the provided map of offsets, the consumer will fall back to the default group offset behaviour (see pyflink.table.descriptors.Kafka.start_from_group_offsets()) for that particular partition.

If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group offsets but still no group offset could be found for it, then the “auto.offset.reset” behaviour set in the configuration properties will be used for the partition.

This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.

Parameters

specific_offsets_dict – Dict of specific offsets, where each key is an int-type partition id and each value is an int-type offset.

Returns

This object.
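The fallback order described above can be sketched in plain Python. This is only an illustration of the documented rules, not the connector's implementation: the function name `resolve_start_offsets`, its parameters, and the single `reset_offset` value standing in for the “auto.offset.reset” behaviour are all hypothetical, and the handling of invalid offsets is left out.

```python
from typing import Dict, Set


def resolve_start_offsets(
    subscribed: Set[int],
    specific_offsets: Dict[int, int],
    committed_offsets: Dict[int, int],
    reset_offset: int,
) -> Dict[int, int]:
    """Model the chain: specific offset -> committed group offset -> reset."""
    resolved = {}
    for partition in sorted(subscribed):
        if partition in specific_offsets:
            # An explicitly specified offset wins for this partition.
            resolved[partition] = specific_offsets[partition]
        elif partition in committed_offsets:
            # Otherwise fall back to the committed group offset.
            resolved[partition] = committed_offsets[partition]
        else:
            # No group offset either: use the stand-in for "auto.offset.reset".
            resolved[partition] = reset_offset
    return resolved


starts = resolve_start_offsets(
    subscribed={0, 1, 2},
    specific_offsets={0: 42, 7: 100},  # partition 7 is not subscribed: ignored
    committed_offsets={1: 15},
    reset_offset=0,  # hypothetical value representing "earliest"
)
print(starts)
```

Partition 0 takes its specific offset, partition 1 falls back to the group offset, and partition 2 ends up with the reset value.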

start_from_timestamp(timestamp: int) → pyflink.table.descriptors.Kafka[source]¶

Specifies the consumer to start reading partitions from a specified timestamp. The specified timestamp must be before the current timestamp. This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

The consumer will look up the earliest offset whose timestamp is greater than or equal to the specified timestamp from Kafka. If there’s no such offset, the consumer will use the latest offset to read data from Kafka.

This method does not affect where partitions are read from when the consumer is restored from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint, only the offsets in the restored state will be used.

Parameters

timestamp – Timestamp for the startup offsets, as milliseconds from epoch.

Returns

This object.

New in version 1.11.0.
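The lookup rule for start_from_timestamp (earliest offset whose timestamp is greater than or equal to the given one, otherwise the latest offset) can be sketched as follows. The helper `offset_for_timestamp` and the in-memory list of `(offset, timestamp)` pairs are hypothetical, and the sketch assumes that timestamps never decrease as offsets grow.

```python
from bisect import bisect_left
from typing import List, Tuple


def offset_for_timestamp(
    records: List[Tuple[int, int]], timestamp: int, latest_offset: int
) -> int:
    """records holds (offset, timestamp_ms) pairs sorted by timestamp."""
    timestamps = [ts for _, ts in records]
    # Index of the first record whose timestamp is >= the requested one.
    index = bisect_left(timestamps, timestamp)
    if index == len(records):
        # No such record: fall back to the latest offset.
        return latest_offset
    return records[index][0]


log = [(0, 1000), (1, 2000), (2, 3000)]
print(offset_for_timestamp(log, 1500, latest_offset=3))
print(offset_for_timestamp(log, 5000, latest_offset=3))
```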

topic(topic: str) → pyflink.table.descriptors.Kafka[source]¶

Sets the topic from which the table is read.

Parameters

topic – The topic from which the table is read.

Returns

This object.

version(version: str) → pyflink.table.descriptors.Kafka[source]¶

Sets the Kafka version to be used.

Parameters

version – Kafka version. E.g., “0.8”, “0.11”, etc.

Returns

This object.

class pyflink.table.descriptors.Elasticsearch(version=None, hostname=None, port=None, protocol=None, index=None, document_type=None, key_delimiter=None, key_null_literal=None, failure_handler_fail=False, failure_handler_ignore=False, failure_handler_retry_rejected=False, custom_failure_handler_class_name=None, disable_flush_on_checkpoint=False, bulk_flush_max_actions=None, bulk_flush_max_size=None, bulk_flush_interval=None, bulk_flush_backoff_constant=False, bulk_flush_backoff_exponential=False, bulk_flush_backoff_max_retries=None, bulk_flush_backoff_delay=None, connection_max_retry_timeout=None, connection_path_prefix=None)[source]¶

Connector descriptor for the Elasticsearch search engine.

bulk_flush_backoff_constant() → pyflink.table.descriptors.Elasticsearch[source]¶

Configures how to buffer elements before sending them in bulk to the cluster for efficiency.

Sets a constant backoff type to use when flushing bulk requests.

Returns

This object.

bulk_flush_backoff_delay(delay: int) → pyflink.table.descriptors.Elasticsearch[source]¶

Configures how to buffer elements before sending them in bulk to the cluster for efficiency.

Sets the amount of delay between each backoff attempt when flushing bulk requests (in milliseconds).

Make sure to enable backoff by selecting a strategy ( pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_constant() or pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_exponential()).

Parameters

delay – Delay between each backoff attempt (in milliseconds).

Returns

This object.

bulk_flush_backoff_exponential() → pyflink.table.descriptors.Elasticsearch[source]¶

Configures how to buffer elements before sending them in bulk to the cluster for efficiency.

Sets an exponential backoff type to use when flushing bulk requests.

Returns

This object.

bulk_flush_backoff_max_retries(max_retries: int) → pyflink.table.descriptors.Elasticsearch[source]¶

Configures how to buffer elements before sending them in bulk to the cluster for efficiency.

Sets the maximum number of retries for a backoff attempt when flushing bulk requests.

Make sure to enable backoff by selecting a strategy ( pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_constant() or pyflink.table.descriptors.Elasticsearch.bulk_flush_backoff_exponential()).

Parameters

max_retries – The maximum number of retries.

Returns

This object.
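To illustrate how the backoff strategy, delay, and maximum retries interact, here is a rough plain-Python sketch. The function `backoff_delays` is hypothetical, and the doubling factor used for the exponential strategy is an assumption; the actual schedule is determined by the Elasticsearch client.

```python
from typing import List


def backoff_delays(strategy: str, delay_ms: int, max_retries: int) -> List[int]:
    """Return the wait (in milliseconds) before each retry attempt."""
    if strategy == "constant":
        # The same delay before every retry.
        return [delay_ms] * max_retries
    if strategy == "exponential":
        # Assumption: the delay doubles with every attempt.
        return [delay_ms * 2 ** attempt for attempt in range(max_retries)]
    raise ValueError(f"Unknown backoff strategy: {strategy!r}")


print(backoff_delays("constant", delay_ms=50, max_retries=3))
print(backoff_delays("exponential", delay_ms=50, max_retries=4))
```

Without a selected strategy there is no schedule at all, which is why the delay and retry settings only take effect once backoff has been enabled.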

bulk_flush_interval(interval: int) → pyflink.table.descriptors.Elasticsearch[source]¶

Configures how to buffer elements before sending them in bulk to the cluster for efficiency.

Sets the bulk flush interval (in milliseconds).

Parameters

interval – Bulk flush interval (in milliseconds).

Returns

This object.

bulk_flush_max_actions(max_actions_num: int) → pyflink.table.descriptors.Elasticsearch[source]¶

Configures how to buffer elements before sending them in bulk to the cluster for efficiency.

Sets the maximum number of actions to buffer for each bulk request.

Parameters

max_actions_num – the maximum number of actions to buffer per bulk request.

Returns

This object.

bulk_flush_max_size(max_size: int) → pyflink.table.descriptors.Elasticsearch[source]¶

Configures how to buffer elements before sending them in bulk to the cluster for efficiency.

Sets the maximum size of buffered actions per bulk request (using the syntax of MemorySize).

Parameters

max_size – The maximum size. E.g. “42 mb”. Only MB granularity is supported.

Returns

This object.

connection_max_retry_timeout(max_retry_timeout: int) → pyflink.table.descriptors.Elasticsearch[source]¶

Sets connection properties to be used during REST communication to Elasticsearch.

Sets the maximum timeout (in milliseconds) in case of multiple retries of the same request.

Parameters

max_retry_timeout – Maximum timeout (in milliseconds).

Returns

This object.

connection_path_prefix(path_prefix: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Sets connection properties to be used during REST communication to Elasticsearch.

Adds a path prefix to every REST communication.

Parameters

path_prefix – Prefix string to be added to every REST communication.

Returns

This object.

disable_flush_on_checkpoint() → pyflink.table.descriptors.Elasticsearch[source]¶

Disables flushing on checkpoint. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints.

Note

If flushing on checkpoint is disabled, an Elasticsearch sink does NOT provide any strong guarantees for at-least-once delivery of action requests.

Returns

This object.

document_type(document_type: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Declares the Elasticsearch document type for every record. Required.

Parameters

document_type – Elasticsearch document type.

Returns

This object.

failure_handler_custom(failure_handler_class_name: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Configures a failure handling strategy in case a request to Elasticsearch fails.

This strategy allows for custom failure handling using an ActionRequestFailureHandler.

Parameters

failure_handler_class_name – Class name of the custom ActionRequestFailureHandler.

Returns

This object.

failure_handler_fail() → pyflink.table.descriptors.Elasticsearch[source]¶

Configures a failure handling strategy in case a request to Elasticsearch fails.

This strategy throws an exception if a request fails and thus causes a job failure.

Returns

This object.

failure_handler_ignore() → pyflink.table.descriptors.Elasticsearch[source]¶

Configures a failure handling strategy in case a request to Elasticsearch fails.

This strategy ignores failures and drops the request.

Returns

This object.

failure_handler_retry_rejected() → pyflink.table.descriptors.Elasticsearch[source]¶

Configures a failure handling strategy in case a request to Elasticsearch fails.

This strategy re-adds requests that have failed due to queue capacity saturation.

Returns

This object.

host(hostname: str, port: Union[int, str], protocol: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Adds an Elasticsearch host to connect to. Required.

Multiple hosts can be declared by calling this method multiple times.

Parameters
  • hostname – Connection hostname.

  • port – Connection port.

  • protocol – Connection protocol; e.g. “http”.

Returns

This object.

index(index: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Declares the Elasticsearch index for every record. Required.

Parameters

index – Elasticsearch index.

Returns

This object.

key_delimiter(key_delimiter: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Sets a custom key delimiter in case the Elasticsearch ID needs to be constructed from multiple fields. Optional.

Parameters

key_delimiter – Key delimiter; e.g., “$” would result in IDs “KEY1$KEY2$KEY3”.

Returns

This object.

key_null_literal(key_null_literal: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Sets a custom representation for null fields in keys. Optional.

Parameters

key_null_literal – key null literal string; e.g. “N/A” would result in IDs “KEY1_N/A_KEY3”.

Returns

This object.
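The two examples given for key_delimiter and key_null_literal can be reproduced with a small sketch. The helper `build_document_id` is hypothetical; the default delimiter “_” is inferred from the “KEY1_N/A_KEY3” example above, and the default null literal “null” is an assumption.

```python
from typing import Optional, Sequence


def build_document_id(
    key_fields: Sequence[Optional[object]],
    key_delimiter: str = "_",        # inferred from the documented example
    key_null_literal: str = "null",  # assumed default
) -> str:
    """Join key fields into one ID, substituting the literal for nulls."""
    parts = [key_null_literal if field is None else str(field) for field in key_fields]
    return key_delimiter.join(parts)


print(build_document_id(["KEY1", "KEY2", "KEY3"], key_delimiter="$"))
print(build_document_id(["KEY1", None, "KEY3"], key_null_literal="N/A"))
```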

version(version: str) → pyflink.table.descriptors.Elasticsearch[source]¶

Sets the Elasticsearch version to be used. Required.

Parameters

version – Elasticsearch version. E.g., “6”.

Returns

This object.

class pyflink.table.descriptors.HBase(version=None, table_name=None, zookeeper_quorum=None, zookeeper_node_parent=None, write_buffer_flush_max_size=None, write_buffer_flush_max_rows=None, write_buffer_flush_interval=None)[source]¶

Connector descriptor for Apache HBase.

New in version 1.11.0.

table_name(table_name: str) → pyflink.table.descriptors.HBase[source]¶

Sets the HBase table name. Required.

Parameters

table_name – Name of HBase table. E.g., “testNamespace:testTable”, “testDefaultTable”

Returns

This object.

New in version 1.11.0.

version(version: str) → pyflink.table.descriptors.HBase[source]¶

Sets the Apache HBase version to be used. Required.

Parameters

version – HBase version. E.g., “1.4.3”.

Returns

This object.

New in version 1.11.0.

write_buffer_flush_interval(interval: Union[str, int]) → pyflink.table.descriptors.HBase[source]¶

Sets the interval at which buffered requests are flushed, in milliseconds. Not set by default, i.e. no flushing based on a flush interval. Optional.

Parameters

interval – Flush interval. A string should be in the format “{length value}{time unit label}”, e.g. “123ms” or “1 s”. If no time unit label is specified, the value is interpreted as milliseconds.

Returns

This object.

New in version 1.11.0.
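The accepted interval forms (an int, or a string such as “123ms” or “1 s”, defaulting to milliseconds when no unit is given) can be modelled with a short parser. This is a sketch only: `parse_interval_ms` is a hypothetical helper, and the set of unit labels it recognises is an assumed subset, not the list Flink actually supports.

```python
import re
from typing import Union

# Assumed subset of unit labels; an empty label means milliseconds.
_UNIT_TO_MS = {"": 1, "ms": 1, "s": 1000, "min": 60_000, "h": 3_600_000}


def parse_interval_ms(interval: Union[str, int]) -> int:
    """Convert an int or a '{length value}{time unit label}' string to ms."""
    if isinstance(interval, int):
        return interval
    match = re.fullmatch(r"\s*(\d+)\s*([a-z]*)\s*", interval.lower())
    if match is None or match.group(2) not in _UNIT_TO_MS:
        raise ValueError(f"Unsupported interval: {interval!r}")
    return int(match.group(1)) * _UNIT_TO_MS[match.group(2)]


for value in ("123ms", "1 s", "250", 500):
    print(value, "->", parse_interval_ms(value))
```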

write_buffer_flush_max_rows(write_buffer_flush_max_rows: int) → pyflink.table.descriptors.HBase[source]¶

Sets the threshold for flushing buffered requests based on the number of rows currently added. Not set by default, i.e. no flushing based on the number of buffered rows. Optional.

Parameters

write_buffer_flush_max_rows – Number of added rows at which request flushing begins.

Returns

This object.

New in version 1.11.0.

write_buffer_flush_max_size(max_size: Union[int, str]) → pyflink.table.descriptors.HBase[source]¶

Sets the threshold for flushing buffered requests based on the memory byte size of rows currently added.

Parameters

max_size – the maximum size.

Returns

This object.

New in version 1.11.0.

zookeeper_node_parent(zookeeper_node_parent: str) → pyflink.table.descriptors.HBase[source]¶

Sets the ZooKeeper node parent path of the HBase cluster. Defaults to “/hbase”. Optional.

Parameters

zookeeper_node_parent – ZooKeeper node path of the HBase cluster. E.g., “/hbase/example-root-znode”.

Returns

This object.

New in version 1.11.0.

zookeeper_quorum(zookeeper_quorum: str) → pyflink.table.descriptors.HBase[source]¶

Sets the ZooKeeper quorum address used to connect to the HBase cluster. Required.

Parameters

zookeeper_quorum – zookeeper quorum address to connect the HBase cluster. E.g., “localhost:2181,localhost:2182,localhost:2183”

Returns

This object.

New in version 1.11.0.

class pyflink.table.descriptors.Csv(schema=None, field_delimiter=None, line_delimiter=None, quote_character=None, allow_comments=False, ignore_parse_errors=False, array_element_delimiter=None, escape_character=None, null_literal=None)[source]¶

Format descriptor for comma-separated values (CSV).

This descriptor aims to comply with RFC-4180 (“Common Format and MIME Type for Comma-Separated Values (CSV) Files”) proposed by the Internet Engineering Task Force (IETF).

Note

This descriptor does not describe Flink’s old non-standard CSV table source/sink. Currently, this descriptor can be used when writing to Kafka. The old one is still available as OldCsv for stream/batch filesystem operations.

allow_comments() → pyflink.table.descriptors.Csv[source]¶

Ignores comment lines that start with ‘#’ (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows.

Returns

This Csv object.

array_element_delimiter(delimiter: str) → pyflink.table.descriptors.Csv[source]¶

Sets the array element delimiter string for separating array or row element values (“;” by default).

Parameters

delimiter – The array element delimiter.

Returns

This Csv object.

derive_schema() → pyflink.table.descriptors.Csv[source]¶

Derives the format schema from the table’s schema. Required if no format schema is defined.

This allows for defining schema information only once.

The names, types, and fields’ order of the format are determined by the table’s schema. Time attributes are ignored if their origin is not a field. A “from” definition is interpreted as a field renaming in the format.

Returns

This Csv object.

escape_character(escape_character: str) → pyflink.table.descriptors.Csv[source]¶

Sets the escape character for escaping values (disabled by default).

Parameters

escape_character – Escaping character (e.g. backslash).

Returns

This Csv object.

field_delimiter(delimiter: str) → pyflink.table.descriptors.Csv[source]¶

Sets the field delimiter character (‘,’ by default).

Parameters

delimiter – The field delimiter character.

Returns

This Csv object.

ignore_parse_errors() → pyflink.table.descriptors.Csv[source]¶

Skips records with parse errors instead of failing. By default, an exception is thrown.

Returns

This Csv object.

line_delimiter(delimiter: str) → pyflink.table.descriptors.Csv[source]¶

Sets the line delimiter (“\n” by default; otherwise “\r” or “\r\n” are allowed).

Parameters

delimiter – The line delimiter.

Returns

This Csv object.

null_literal(null_literal: str) → pyflink.table.descriptors.Csv[source]¶

Sets the null literal string that is interpreted as a null value (disabled by default).

Parameters

null_literal – The null literal string.

Returns

This Csv object.

quote_character(quote_character: str) → pyflink.table.descriptors.Csv[source]¶

Sets the quote character for enclosing field values (‘"’ by default).

Parameters

quote_character – The quote character.

Returns

This Csv object.

schema(schema_data_type: pyflink.table.types.DataType) → pyflink.table.descriptors.Csv[source]¶

Sets the format schema with field names and the types. Required if schema is not derived.

Parameters

schema_data_type – Data type from DataTypes that describes the schema.

Returns

This Csv object.
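The effect of several of the options above (field delimiter, quote character, comment lines, null literal) can be illustrated with Python's standard csv module. This is a stand-in for explanation only, not Flink's CSV parser; the helper `parse_csv_lines` and its parameters are hypothetical names that mirror the descriptor methods.

```python
import csv
from typing import List, Optional


def parse_csv_lines(
    lines: List[str],
    field_delimiter: str = ",",
    quote_character: str = '"',
    allow_comments: bool = False,
    null_literal: Optional[str] = None,
) -> List[List[Optional[str]]]:
    """Parse lines into rows, honouring a few descriptor-like options."""
    if allow_comments:
        # Drop comment lines that start with '#'.
        lines = [line for line in lines if not line.startswith("#")]
    reader = csv.reader(lines, delimiter=field_delimiter, quotechar=quote_character)
    rows = []
    for row in reader:
        # Map the configured null literal (if any) to None.
        rows.append(
            [None if null_literal is not None and value == null_literal else value
             for value in row]
        )
    return rows


rows = parse_csv_lines(
    ["# exported data", "1;'a;b';n/a", "2;c;d"],
    field_delimiter=";",
    quote_character="'",
    allow_comments=True,
    null_literal="n/a",
)
print(rows)
```

The quoted value `'a;b'` keeps its embedded delimiter, and the `n/a` field is read as a null value.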

class pyflink.table.descriptors.Avro(record_class=None, avro_schema=None)[source]¶

Format descriptor for Apache Avro records.

avro_schema(avro_schema: str) → pyflink.table.descriptors.Avro[source]¶

Sets the Avro schema for specific or generic Avro records.

Parameters

avro_schema – Avro schema string.

Returns

This object.

record_class(record_class: str) → pyflink.table.descriptors.Avro[source]¶

Sets the class of the Avro specific record.

Parameters

record_class – The java fully-qualified class name of the Avro record.

Returns

This object.

class pyflink.table.descriptors.Json(json_schema=None, schema=None, derive_schema=False)[source]¶

Format descriptor for JSON.

derive_schema() → pyflink.table.descriptors.Json[source]¶

Derives the format schema from the table’s schema.

This allows for defining schema information only once.

The names, types, and fields’ order of the format are determined by the table’s schema. Time attributes are ignored if their origin is not a field. A “from” definition is interpreted as a field renaming in the format.

Returns

This object.

fail_on_missing_field(fail_on_missing_field: bool) → pyflink.table.descriptors.Json[source]¶

Sets flag whether to fail if a field is missing or not.

Parameters

fail_on_missing_field – If set to True, the operation fails if there is a missing field. If set to False, a missing field is set to null.

Returns

This object.

ignore_parse_errors(ignore_parse_errors: bool) → pyflink.table.descriptors.Json[source]¶

Sets the flag that controls whether parse errors are ignored instead of causing a failure.

Parameters

ignore_parse_errors – If set to True, the operation ignores parse errors. If set to False, the operation fails when parsing JSON fails.

Returns

This object.
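The two flags fail_on_missing_field and ignore_parse_errors can be illustrated with Python's standard json module. This is a sketch of the documented behaviour, not the Flink JSON format itself: `read_json_records` is a hypothetical helper, and dropping the whole record on a parse error is an assumption about what “ignore” means here.

```python
import json
from typing import Dict, List, Optional


def read_json_records(
    lines: List[str],
    fields: List[str],
    fail_on_missing_field: bool = False,
    ignore_parse_errors: bool = False,
) -> List[Dict[str, Optional[object]]]:
    """Read one JSON object per line and project it onto the given fields."""
    rows = []
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            if ignore_parse_errors:
                continue  # assumption: the malformed record is skipped
            raise
        missing = [name for name in fields if name not in record]
        if missing and fail_on_missing_field:
            raise ValueError(f"Missing fields: {missing}")
        # A missing field is set to null (None) when failing is disabled.
        rows.append({name: record.get(name) for name in fields})
    return rows


records = read_json_records(
    ['{"id": 1, "name": "a"}', '{"id": 2}', "not json"],
    fields=["id", "name"],
    ignore_parse_errors=True,
)
print(records)
```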

json_schema(json_schema: str) → pyflink.table.descriptors.Json[source]¶

Sets the JSON schema string with field names and the types according to the JSON schema specification: http://json-schema.org/specification.html

The schema might be nested.

Parameters

json_schema – The JSON schema string.

Returns

This object.

schema(schema_data_type: pyflink.table.types.DataType) → pyflink.table.descriptors.Json[source]¶

Sets the schema using DataTypes.

JSON objects are represented as ROW types.

The schema might be nested.

Parameters

schema_data_type – Data type that describes the schema.

Returns

This object.

class pyflink.table.descriptors.ConnectTableDescriptor(j_connect_table_descriptor)[source]¶

Common class for tables created with pyflink.table.TableEnvironment.connect.

create_temporary_table(path: str) → pyflink.table.descriptors.ConnectTableDescriptor[source]¶

Registers the table described by underlying properties in a given path.

There is no distinction between source and sink at the descriptor level anymore as this method does not perform actual class lookup. It only stores the underlying properties. The actual source/sink lookup is performed when the table is used.

Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will be inaccessible in the current session. To make the permanent object available again you can drop the corresponding temporary object.

Note

The schema must be explicitly defined.

Parameters

path – path where to register the temporary table

New in version 1.10.0.
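The shadowing rule for temporary tables can be pictured with a toy registry. The class `TableRegistry` and its methods are hypothetical and exist only to illustrate the lookup order; they are not part of PyFlink.

```python
from typing import Dict


class TableRegistry:
    """Toy model: temporary entries shadow permanent ones at the same path."""

    def __init__(self) -> None:
        self.permanent: Dict[str, str] = {}
        self.temporary: Dict[str, str] = {}

    def lookup(self, path: str) -> str:
        # Temporary objects are consulted first, so they hide permanent ones.
        if path in self.temporary:
            return self.temporary[path]
        return self.permanent[path]

    def drop_temporary(self, path: str) -> None:
        # Dropping the temporary object makes the permanent one visible again.
        del self.temporary[path]


registry = TableRegistry()
registry.permanent["db.orders"] = "permanent definition"
registry.temporary["db.orders"] = "temporary definition"
print(registry.lookup("db.orders"))
registry.drop_temporary("db.orders")
print(registry.lookup("db.orders"))
```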

with_format(format_descriptor: pyflink.table.descriptors.FormatDescriptor) → pyflink.table.descriptors.ConnectTableDescriptor[source]¶

Specifies the format that defines how to read data from a connector.

Returns

This object.

with_schema(schema: pyflink.table.descriptors.Schema) → pyflink.table.descriptors.ConnectTableDescriptor[source]¶

Specifies the resulting table schema.

Returns

This object.

class pyflink.table.descriptors.StreamTableDescriptor(j_stream_table_descriptor, in_append_mode=False, in_retract_mode=False, in_upsert_mode=False)[source]¶

Descriptor for specifying a table source and/or sink in a streaming environment.

See also

parent class: ConnectTableDescriptor

in_append_mode() → pyflink.table.descriptors.StreamTableDescriptor[source]¶

Declares how to perform the conversion between a dynamic table and an external connector.

In append mode, a dynamic table and an external connector only exchange INSERT messages.

Returns

This object.

in_retract_mode() → pyflink.table.descriptors.StreamTableDescriptor[source]¶

Declares how to perform the conversion between a dynamic table and an external connector.

In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.

An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for the updating (new) row.

In this mode, a key must not be defined as opposed to upsert mode. However, every update consists of two messages which is less efficient.

Returns

This object.

in_upsert_mode() → pyflink.table.descriptors.StreamTableDescriptor[source]¶

Declares how to perform the conversion between a dynamic table and an external connector.

In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.

This mode requires a (possibly composite) unique key by which updates can be propagated. The external connector needs to be aware of the unique key attribute in order to apply messages correctly. INSERT and UPDATE changes are encoded as UPSERT messages, and DELETE changes as DELETE messages.

The main difference to a retract stream is that UPDATE changes are encoded with a single message and are therefore more efficient.

Returns

This object.
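The difference between the retract and upsert encodings can be made concrete with a small sketch. The functions `encode_retract` and `encode_upsert` and the tuple-based message representation are hypothetical; they only mirror the message kinds named above.

```python
from typing import List, Optional, Tuple

Row = Tuple[str, int]
Message = Tuple[str, Row]


def encode_retract(old: Optional[Row], new: Optional[Row]) -> List[Message]:
    """INSERT -> ADD, DELETE -> RETRACT, UPDATE -> RETRACT(old) + ADD(new)."""
    messages: List[Message] = []
    if old is not None:
        messages.append(("RETRACT", old))
    if new is not None:
        messages.append(("ADD", new))
    return messages


def encode_upsert(old: Optional[Row], new: Optional[Row]) -> List[Message]:
    """INSERT and UPDATE -> UPSERT(new), DELETE -> DELETE(old)."""
    if new is not None:
        return [("UPSERT", new)]
    if old is not None:
        return [("DELETE", old)]
    return []


changes = {
    "insert": (None, ("a", 1)),
    "update": (("a", 1), ("a", 2)),
    "delete": (("a", 2), None),
}
for name, (old, new) in changes.items():
    print(name, encode_retract(old, new), encode_upsert(old, new))
```

An update produces two messages in retract mode but only one in upsert mode, which is the efficiency difference described above.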

class pyflink.table.descriptors.BatchTableDescriptor(j_batch_table_descriptor)[source]¶

Descriptor for specifying a table source and/or sink in a batch environment.

See also

parent class: ConnectTableDescriptor

class pyflink.table.descriptors.CustomConnectorDescriptor(type, version, format_needed)[source]¶

Describes a custom connector to another system.

properties(property_dict: Dict[str, str]) → pyflink.table.descriptors.CustomConnectorDescriptor[source]¶

Adds a set of properties for the connector.

Parameters

property_dict – The dict object contains configuration properties for the connector. Both the keys and values should be strings.

Returns

This object.

property(key: str, value: str) → pyflink.table.descriptors.CustomConnectorDescriptor[source]¶

Adds a configuration property for the connector.

Parameters
  • key – The property key to be set.

  • value – The property value to be set.

Returns

This object.

class pyflink.table.descriptors.CustomFormatDescriptor(type, version)[source]¶

Describes the custom format of data.

properties(property_dict: Dict[str, str]) → pyflink.table.descriptors.CustomFormatDescriptor[source]¶

Adds a set of properties for the format.

Parameters

property_dict – The dict object contains configuration properties for the format. Both the keys and values should be strings.

Returns

This object.

property(key: str, value: str) → pyflink.table.descriptors.CustomFormatDescriptor[source]¶

Adds a configuration property for the format.

Parameters
  • key – The property key to be set.

  • value – The property value to be set.

Returns

This object.

pyflink.table.catalog module¶

class pyflink.table.catalog.Catalog(j_catalog)[source]¶

Catalog is responsible for reading and writing metadata such as database/table/views/UDFs from a registered catalog. It connects a registered catalog and Flink’s Table API.

alter_database(name: str, new_database: pyflink.table.catalog.CatalogDatabase, ignore_if_not_exists: bool)[source]¶

Modify an existing database.

Parameters
  • name – Name of the database to be modified.

  • new_database – The new database CatalogDatabase definition.

  • ignore_if_not_exists – Flag to specify behavior when the given database does not exist: if set to false, throw an exception, if set to true, do nothing.

Raise

CatalogException in case of any runtime exception. DatabaseNotExistException if the given database does not exist.

alter_function(function_path: pyflink.table.catalog.ObjectPath, new_function: pyflink.table.catalog.CatalogFunction, ignore_if_not_exists: bool)[source]¶

Modify an existing function.

Parameters
  • function_path – Path ObjectPath of the function.

  • new_function – The function CatalogFunction to be modified.

  • ignore_if_not_exists – Flag to specify behavior if the function does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. FunctionNotExistException if the function does not exist.

alter_partition(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec, new_partition: pyflink.table.catalog.CatalogPartition, ignore_if_not_exists: bool)[source]¶

Alter a partition.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition to alter.

  • new_partition – New partition CatalogPartition to replace the old one.

  • ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. PartitionNotExistException thrown if the target partition does not exist.

alter_partition_column_statistics(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec, column_statistics: pyflink.table.catalog.CatalogColumnStatistics, ignore_if_not_exists: bool)[source]¶

Update the column statistics of a table partition.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition.

  • column_statistics – New column statistics CatalogColumnStatistics to update.

  • ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.

alter_partition_statistics(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec, partition_statistics: pyflink.table.catalog.CatalogTableStatistics, ignore_if_not_exists: bool)[source]¶

Update the statistics of a table partition.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition.

  • partition_statistics – New statistics CatalogTableStatistics to update.

  • ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.

alter_table(table_path: pyflink.table.catalog.ObjectPath, new_table: pyflink.table.catalog.CatalogBaseTable, ignore_if_not_exists)[source]¶

Modify an existing table or view. Note that the new and old CatalogBaseTable must be of the same type. For example, this does not allow altering a regular table to a partitioned table, or altering a view to a table, and vice versa.

Parameters
  • table_path – Path ObjectPath of the table or view to be modified.

  • new_table – The new table definition CatalogBaseTable.

  • ignore_if_not_exists – Flag to specify behavior when the table or view does not exist: if set to false, throw an exception, if set to true, do nothing.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the table does not exist.

alter_table_column_statistics(table_path: pyflink.table.catalog.ObjectPath, column_statistics: pyflink.table.catalog.CatalogColumnStatistics, ignore_if_not_exists: bool)[source]¶

Update the column statistics of a table.

Parameters
  • table_path – Path ObjectPath of the table.

  • column_statistics – New column statistics CatalogColumnStatistics to update.

  • ignore_if_not_exists – Flag to specify behavior if the column does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.

alter_table_statistics(table_path: pyflink.table.catalog.ObjectPath, table_statistics: pyflink.table.catalog.CatalogTableStatistics, ignore_if_not_exists: bool)[source]¶

Update the statistics of a table.

Parameters
  • table_path – Path ObjectPath of the table.

  • table_statistics – New statistics CatalogTableStatistics to update.

  • ignore_if_not_exists – Flag to specify behavior if the table does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.

create_database(name: str, database: pyflink.table.catalog.CatalogDatabase, ignore_if_exists: bool)[source]¶

Create a database.

Parameters
  • name – Name of the database to be created.

  • database – The CatalogDatabase database definition.

  • ignore_if_exists – Flag to specify behavior when a database with the given name already exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do nothing.

Raise

CatalogException in case of any runtime exception. DatabaseAlreadyExistException if the given database already exists and ignore_if_exists is false.

create_function(function_path: pyflink.table.catalog.ObjectPath, function: pyflink.table.catalog.CatalogFunction, ignore_if_exists: bool)[source]¶

Create a function.

Parameters
  • function_path – Path ObjectPath of the function.

  • function – The function CatalogFunction to be created.

  • ignore_if_exists – Flag to specify behavior if a function with the given name already exists: if set to false, it throws a FunctionAlreadyExistException, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. FunctionAlreadyExistException if the function already exists. DatabaseNotExistException if the given database does not exist.

create_partition(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec, partition: pyflink.table.catalog.CatalogPartition, ignore_if_exists: bool)[source]¶

Create a partition.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition.

  • partition – The partition CatalogPartition to add.

  • ignore_if_exists – Flag to specify behavior if the given partition already exists: if set to false, it throws a PartitionAlreadyExistsException, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. TableNotExistException thrown if the target table does not exist. TableNotPartitionedException thrown if the target table is not partitioned. PartitionSpecInvalidException thrown if the given partition spec is invalid. PartitionAlreadyExistsException thrown if the target partition already exists.

create_table(table_path: pyflink.table.catalog.ObjectPath, table: pyflink.table.catalog.CatalogBaseTable, ignore_if_exists: bool)[source]¶

Create a new table or view.

Parameters
  • table_path – Path ObjectPath of the table or view to be created.

  • table – The table definition CatalogBaseTable.

  • ignore_if_exists – Flag to specify behavior when a table or view already exists at the given path: if set to false, it throws a TableAlreadyExistException, if set to true, do nothing.

Raise

CatalogException in case of any runtime exception. DatabaseNotExistException if the database in table_path doesn’t exist. TableAlreadyExistException if the table already exists and ignore_if_exists is false.

database_exists(database_name: str) → bool[source]¶

Check if a database exists in this catalog.

Parameters

database_name – Name of the database.

Returns

true if the given database exists in the catalog, false otherwise.

Raise

CatalogException in case of any runtime exception.

drop_database(name: str, ignore_if_exists: bool)[source]¶

Drop a database.

Parameters
  • name – Name of the database to be dropped.

  • ignore_if_exists – Flag to specify behavior when the database does not exist: if set to false, throw an exception, if set to true, do nothing.

Raise

CatalogException in case of any runtime exception. DatabaseNotExistException if the given database does not exist.

drop_function(function_path: pyflink.table.catalog.ObjectPath, ignore_if_not_exists: bool)[source]¶

Drop a function.

Parameters
  • function_path – Path ObjectPath of the function to be dropped.

  • ignore_if_not_exists – Flag to specify behavior if the function does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. FunctionNotExistException if the function does not exist.

drop_partition(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec, ignore_if_not_exists: bool)[source]¶

Drop a partition.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition to drop.

  • ignore_if_not_exists – Flag to specify behavior if the partition does not exist: if set to false, throw an exception, if set to true, nothing happens.

Raise

CatalogException in case of any runtime exception. PartitionNotExistException thrown if the target partition does not exist.

drop_table(table_path: pyflink.table.catalog.ObjectPath, ignore_if_not_exists: bool)[source]¶

Drop a table or view.

Parameters
  • table_path – Path ObjectPath of the table or view to be dropped.

  • ignore_if_not_exists – Flag to specify behavior when the table or view does not exist: if set to false, throw an exception, if set to true, do nothing.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the table or view does not exist.

function_exists(function_path: pyflink.table.catalog.ObjectPath) → bool[source]¶

Check whether a function exists or not.

Parameters

function_path – Path ObjectPath of the function.

Returns

true if the function exists in the catalog, false otherwise.

Raise

CatalogException in case of any runtime exception.

get_database(database_name: str) → pyflink.table.catalog.CatalogDatabase[source]¶

Get a database from this catalog.

Parameters

database_name – Name of the database.

Returns

The requested database CatalogDatabase.

Raise

CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.

get_default_database() → str[source]¶

Get the name of the default database for this catalog. The default database is used as the current database for the catalog when the user’s session doesn’t specify one. The value probably comes from configuration and does not change for the lifetime of the catalog instance.

Returns

The name of the default database.

Raise

CatalogException in case of any runtime exception.

get_function(function_path: pyflink.table.catalog.ObjectPath) → pyflink.table.catalog.CatalogFunction[source]¶

Get the function.

Parameters

function_path – Path ObjectPath of the function.

Returns

The requested function CatalogFunction.

Raise

CatalogException in case of any runtime exception. FunctionNotExistException if the function does not exist in the catalog.

get_partition(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec) → pyflink.table.catalog.CatalogPartition[source]¶

Get a partition of the given table. The given partition spec keys and values need to be matched exactly for a result.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – The partition spec CatalogPartitionSpec of partition to get.

Returns

The requested partition CatalogPartition.

Raise

CatalogException in case of any runtime exception. PartitionNotExistException thrown if the partition doesn’t exist.
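To illustrate what matching a partition spec exactly means, the sketch below models partition specs as plain dictionaries. It is an assumption-laden toy rather than PyFlink code: find_partition is a hypothetical helper, and it returns None where the real method would raise PartitionNotExistException.

```python
from typing import Dict, List, Optional


def find_partition(partitions: List[Dict[str, str]],
                   spec: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Return the partition whose spec equals `spec` exactly, else None."""
    for candidate in partitions:
        if candidate == spec:  # every key and every value must match
            return candidate
    return None


# Hypothetical partitions of a table partitioned by date and hour.
partitions = [
    {"dt": "2020-01-01", "hr": "00"},
    {"dt": "2020-01-01", "hr": "12"},
]
```

A spec that names only some of the partition keys, such as {"dt": "2020-01-01"}, matches nothing in this model even though two partitions share that date.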

get_partition_column_statistics(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec) → pyflink.table.catalog.CatalogColumnStatistics[source]¶

Get the column statistics of a partition.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition.

Returns

The column statistics CatalogColumnStatistics of the given partition.

Raise

CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.

get_partition_statistics(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec) → pyflink.table.catalog.CatalogTableStatistics[source]¶

Get the statistics of a partition.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition.

Returns

The statistics CatalogTableStatistics of the given partition.

Raise

CatalogException in case of any runtime exception. PartitionNotExistException if the partition does not exist.

get_table(table_path: pyflink.table.catalog.ObjectPath) → pyflink.table.catalog.CatalogBaseTable[source]¶

Get a CatalogTable or CatalogView identified by table_path.

Parameters

table_path – Path ObjectPath of the table or view.

Returns

The requested table or view CatalogBaseTable.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the target does not exist.

get_table_column_statistics(table_path: pyflink.table.catalog.ObjectPath) → pyflink.table.catalog.CatalogColumnStatistics[source]¶

Get the column statistics of a table.

Parameters

table_path – Path ObjectPath of the table.

Returns

The column statistics CatalogColumnStatistics of the given table.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.

get_table_statistics(table_path: pyflink.table.catalog.ObjectPath) → pyflink.table.catalog.CatalogTableStatistics[source]¶

Get the statistics of a table.

Parameters

table_path – Path ObjectPath of the table.

Returns

The statistics CatalogTableStatistics of the given table.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the table does not exist in the catalog.

list_databases() → List[str][source]¶

Get the names of all databases in this catalog.

Returns

A list of the names of all databases.

Raise

CatalogException in case of any runtime exception.

list_functions(database_name: str) → List[str][source]¶

List the names of all functions in the given database. An empty list is returned if none is registered.

Parameters

database_name – Name of the database.

Returns

A list of the names of the functions in this database.

Raise

CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.

list_partitions(table_path: pyflink.table.catalog.ObjectPath, partition_spec: Optional[pyflink.table.catalog.CatalogPartitionSpec] = None) → List[pyflink.table.catalog.CatalogPartitionSpec][source]¶

Get CatalogPartitionSpec of all partitions of the table.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – The partition spec CatalogPartitionSpec to list.

Returns

A list of CatalogPartitionSpec of the table.

Raise

CatalogException in case of any runtime exception. TableNotExistException thrown if the table does not exist in the catalog. TableNotPartitionedException thrown if the table is not partitioned.

list_tables(database_name: str) → List[str][source]¶

Get names of all tables and views under this database. An empty list is returned if none exists.

Parameters

database_name – Name of the given database.

Returns

A list of the names of all tables and views in this database.

Raise

CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.

list_views(database_name: str) → List[str][source]¶

Get names of all views under this database. An empty list is returned if none exists.

Parameters

database_name – Name of the given database.

Returns

A list of the names of all views in the given database.

Raise

CatalogException in case of any runtime exception. DatabaseNotExistException if the database does not exist.

partition_exists(table_path: pyflink.table.catalog.ObjectPath, partition_spec: pyflink.table.catalog.CatalogPartitionSpec) → bool[source]¶

Check whether a partition exists or not.

Parameters
  • table_path – Path ObjectPath of the table.

  • partition_spec – Partition spec CatalogPartitionSpec of the partition to check.

Returns

true if the partition exists.

Raise

CatalogException in case of any runtime exception.

rename_table(table_path: pyflink.table.catalog.ObjectPath, new_table_name: str, ignore_if_not_exists: bool)[source]¶

Rename an existing table or view.

Parameters
  • table_path – Path ObjectPath of the table or view to be renamed.

  • new_table_name – The new name of the table or view.

  • ignore_if_not_exists – Flag to specify behavior when the table or view does not exist: if set to false, throw an exception, if set to true, do nothing.

Raise

CatalogException in case of any runtime exception. TableNotExistException if the table does not exist.

table_exists(table_path: pyflink.table.catalog.ObjectPath) → bool[source]¶

Check if a table or view exists in this catalog.

Parameters

table_path – Path ObjectPath of the table or view.

Returns

true if the given table exists in the catalog, false otherwise.

Raise

CatalogException in case of any runtime exception.

class pyflink.table.catalog.CatalogDatabase(j_catalog_database)[source]¶

Represents a database object in a catalog.

copy() → pyflink.table.catalog.CatalogDatabase[source]¶

Get a deep copy of the CatalogDatabase instance.

Returns

A copy of CatalogDatabase instance.

static create_instance(properties: Dict[str, str], comment: str = None) → pyflink.table.catalog.CatalogDatabase[source]¶

Creates an instance of CatalogDatabase.

Parameters
  • properties – Property of the database

  • comment – Comment of the database

get_comment() → str[source]¶

Get comment of the database.

Returns

Comment of the database.

get_description() → Optional[str][source]¶

Get a brief description of the database.

Returns

An optional short description of the database.

get_detailed_description() → Optional[str][source]¶

Get a detailed description of the database.

Returns

An optional long description of the database.

get_properties() → Dict[str, str][source]¶

Get a map of properties associated with the database.

class pyflink.table.catalog.CatalogBaseTable(j_catalog_base_table)[source]¶

CatalogBaseTable is the common parent of table and view. It has a map of key-value pairs defining the properties of the table.

copy() → pyflink.table.catalog.CatalogBaseTable[source]¶

Get a deep copy of the CatalogBaseTable instance.

Returns

A copy of the CatalogBaseTable instance.

static create_table(schema: pyflink.table.table_schema.TableSchema, partition_keys: List[str] = [], properties: Dict[str, str] = {}, comment: str = None) → pyflink.table.catalog.CatalogBaseTable[source]¶

Create an instance of CatalogBaseTable for the catalog table.

Parameters
  • schema – the table schema

  • partition_keys – the partition keys, default empty

  • properties – the properties of the catalog table

  • comment – the comment of the catalog table

static create_view(original_query: str, expanded_query: str, schema: pyflink.table.table_schema.TableSchema, properties: Dict[str, str], comment: str = None) → pyflink.table.catalog.CatalogBaseTable[source]¶

Create an instance of CatalogBaseTable for the catalog view.

Parameters
  • original_query – the original text of the view definition

  • expanded_query – the expanded text of the original view definition. This is needed because context such as the current database is lost once the session in which the view was defined is gone. The expanded query text takes care of this, for example.

  • schema – the table schema

  • properties – the properties of the catalog view

  • comment – the comment of the catalog view

get_comment() → str[source]¶

Get comment of the table or view.

Returns

Comment of the table/view.

get_description() → Optional[str][source]¶

Get a brief description of the table or view.

Returns

An optional short description of the table/view.

get_detailed_description() → Optional[str][source]¶

Get a detailed description of the table or view.

Returns

An optional long description of the table/view.

get_options()[source]¶

Returns a map of string-based options.

In case of CatalogTable, these options may determine the kind of connector and its configuration for accessing the data in the external system.

Returns

Property map of the table/view.

New in version 1.11.0.

get_properties() → Dict[str, str][source]¶

Get the properties of the table.

Returns

Property map of the table/view.

Note

This method is deprecated. Use get_options() instead.

get_schema() → pyflink.table.table_schema.TableSchema[source]¶

Get the schema of the table.

Returns

Schema of the table/view.

class pyflink.table.catalog.CatalogPartition(j_catalog_partition)[source]¶

Represents a partition object in catalog.

copy() → pyflink.table.catalog.CatalogPartition[source]¶

Get a deep copy of the CatalogPartition instance.

Returns

A copy of CatalogPartition instance.

static create_instance(properties: Dict[str, str], comment: str = None) → pyflink.table.catalog.CatalogPartition[source]¶

Creates an instance of CatalogPartition.

Parameters
  • properties – Property of the partition

  • comment – Comment of the partition

get_comment() → str[source]¶

Get comment of the partition.

Returns

Comment of the partition.

get_description() → Optional[str][source]¶

Get a brief description of the partition object.

Returns

An optional short description of the partition object.

get_detailed_description() → Optional[str][source]¶

Get a detailed description of the partition object.

Returns

An optional long description of the partition object.

get_properties() → Dict[str, str][source]¶

Get a map of properties associated with the partition.

Returns

A map of properties associated with the partition.

class pyflink.table.catalog.CatalogFunction(j_catalog_function)[source]¶

Interface for a function in a catalog.

copy() → pyflink.table.catalog.CatalogFunction[source]¶

Create a deep copy of the function.

Returns

A deep copy of “this” instance.

static create_instance(class_name: str, function_language: str = 'Python') → pyflink.table.catalog.CatalogFunction[source]¶

Creates an instance of CatalogFunction.

Parameters
  • class_name – fully qualified name of the class

  • function_language – language of the function, must be one of ‘Python’, ‘Java’ or ‘Scala’. (default Python)

get_class_name() → str[source]¶

Get the full name of the class backing the function.

Returns

The full name of the class.

get_description() → Optional[str][source]¶

Get a brief description of the function.

Returns

An optional short description of the function.

get_detailed_description() → Optional[str][source]¶

Get a detailed description of the function.

Returns

An optional long description of the function.

get_function_language()[source]¶

Get the language used for the function definition.

Returns

the language type of the function definition

New in version 1.10.0.

is_generic() → bool[source]¶

Whether or not the function is a Flink UDF.

Returns

Whether the function is a Flink UDF.

New in version 1.10.0.

class pyflink.table.catalog.ObjectPath(database_name=None, object_name=None, j_object_path=None)[source]¶

A database name and object (table/view/function) name combo in a catalog.

static from_string(full_name: str) → pyflink.table.catalog.ObjectPath[source]¶

get_database_name() → str[source]¶

get_full_name() → str[source]¶

get_object_name() → str[source]¶

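The ObjectPath methods carry no description here. The sketch below models the relationship between the full name and its two parts in pure Python. It assumes the full name has the form database.object with a single dot separator, as in Flink's Java ObjectPath. split_full_name and join_full_name are hypothetical helpers, not calls into PyFlink.

```python
from typing import Tuple


def split_full_name(full_name: str) -> Tuple[str, str]:
    """Split 'database.object' into (database_name, object_name)."""
    parts = full_name.split(".")
    # Assumption: exactly one dot, with a non-empty name on each side.
    if len(parts) != 2 or not all(parts):
        raise ValueError("expected '<database>.<object>', got %r" % full_name)
    return parts[0], parts[1]


def join_full_name(database_name: str, object_name: str) -> str:
    """Build the full name from its two parts."""
    return "%s.%s" % (database_name, object_name)
```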
class pyflink.table.catalog.CatalogPartitionSpec(partition_spec)[source]¶

Represents a partition spec object in catalog. Partition columns and values are NOT of strict order, and they need to be re-arranged to the correct order by comparing with a list of strictly ordered partition keys.

get_partition_spec() → Dict[str, str][source]¶

Get the partition spec as key-value map.

Returns

A map of partition spec keys and values.
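Since the map returned by get_partition_spec() carries no ordering guarantee, callers that need the values in partition-key order have to re-arrange them. A minimal sketch of that step follows, with the spec modelled as a plain dictionary. ordered_partition_values is a hypothetical helper and the key names are made up for illustration.

```python
from typing import Dict, List


def ordered_partition_values(spec: Dict[str, str],
                             partition_keys: List[str]) -> List[str]:
    """Arrange spec values in the order given by the table's partition keys."""
    unknown = set(spec) - set(partition_keys)
    if unknown:
        raise KeyError("not partition keys: %s" % sorted(unknown))
    # Keys missing from the spec are skipped, so partial specs keep their order.
    return [spec[key] for key in partition_keys if key in spec]
```

For example, a spec of {"hr": "12", "dt": "2020-01-01"} with partition keys ["dt", "hr"] yields the values in dt, hr order regardless of how the dictionary was built.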

class pyflink.table.catalog.CatalogTableStatistics(row_count=None, field_count=None, total_size=None, raw_data_size=None, properties=None, j_catalog_table_statistics=None)[source]¶

Statistics for a non-partitioned table or a partition of a partitioned table.

copy() → pyflink.table.catalog.CatalogTableStatistics[source]¶

Create a deep copy of “this” instance.

get_field_count() → int[source]¶

The number of files on disk.

get_properties() → Dict[str, str][source]¶

get_raw_data_size() → int[source]¶

The raw data size (size when loaded in memory) in bytes.

get_row_count() → int[source]¶

The number of rows in the table or partition.

get_total_size() → int[source]¶

The total size in bytes.

class pyflink.table.catalog.CatalogColumnStatistics(column_statistics_data=None, properties=None, j_catalog_column_statistics=None)[source]¶

Column statistics of a table or partition.

copy() → pyflink.table.catalog.CatalogColumnStatistics[source]¶

get_column_statistics_data()[source]¶

get_properties() → Dict[str, str][source]¶

class pyflink.table.catalog.HiveCatalog(catalog_name: str, default_database: str = None, hive_conf_dir: str = None)[source]¶

A catalog implementation for Hive.
