Python API Tutorial
Firstly, you can fire up your favorite IDE and create a Python project, and then you need to install the PyFlink package. Please see Build PyFlink for more details about this.
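If you install from PyPI rather than building from source, the package can usually be installed with pip (apache-flink is the package name published by the Flink project):

$ python -m pip install apache-flink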
The first step in a Flink Python Table API program is to create a BatchTableEnvironment (or StreamTableEnvironment if you are writing a streaming job). It is the main entry point for Python Table API jobs. The ExecutionEnvironment (or StreamExecutionEnvironment if you are writing a streaming job) can be used to set execution parameters, such as the restart strategy, default parallelism, etc.
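For a batch word count job, a minimal setup along these lines should work (the parallelism of 1 is simply a convenient choice for a local test):

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment

# create a batch execution environment and run with parallelism 1
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)

# the BatchTableEnvironment is the entry point for Table API programs
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)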
Next we will create a source table and a sink table.
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

# the source table reads space-delimited words from /tmp/input
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .line_delimiter(' ')
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

# the sink table writes (word, count) pairs to /tmp/output, tab-delimited
t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')
This registers a table named mySource and a table named mySink in the table environment. The table mySource has only one column, word. It represents the words read from file /tmp/input. The table mySink has two columns, word and count. It writes data to file /tmp/output, with \t as the field delimiter.
Then we need to create a job which reads input from table mySource, performs some operations and writes the results to table mySink.
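The counting itself is a plain group-by aggregation. A sketch in the same style as the descriptor code above, scanning the registered source and inserting into the registered sink:

# read mySource, count occurrences per word, and write to mySink
t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')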
The complete code so far is as follows:
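Assembled from the snippets above, the full script might look like this (the trailing t_env.execute call triggers the job; the job name tutorial_job is an arbitrary choice):

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .line_delimiter(' ')
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')

t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

t_env.execute("tutorial_job")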
You can run this example in your IDE or on the command line (suppose the job script file is WordCount.py):
$ python WordCount.py
The command builds and runs the Python Table API program in a local mini cluster. You can also submit the Python Table API program to a remote cluster; you can refer to the job submission examples for more details.
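Note that the job expects the input file to exist before it runs. One quick way to try it out, assuming a Unix-like shell:

$ echo "flink pyflink flink" > /tmp/input
$ python WordCount.py
$ cat /tmp/output

With this input, /tmp/output should contain tab-separated counts such as flink 2 and pyflink 1.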