Python API Tutorial

    First, fire up your favorite IDE and create a Python project, then install the PyFlink package. Please see Build PyFlink for more details.
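
    If a released PyFlink distribution is available on PyPI for your Flink version (building from source, as described in Build PyFlink, is the alternative), a typical installation looks like this:

    $ python -m pip install apache-flink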

    The first step in a Flink Python Table API program is to create a BatchTableEnvironment (or StreamTableEnvironment if you are writing a streaming job). It is the main entry point for Python Table API jobs.

    The ExecutionEnvironment (or StreamExecutionEnvironment if you are writing a streaming job) can be used to set execution parameters, such as the restart strategy, default parallelism, etc.
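
    As a minimal sketch (assuming the 1.9-era PyFlink API, where a BatchTableEnvironment is created from an ExecutionEnvironment and a TableConfig), the setup might look like this:

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem

    # Create the batch execution environment and set execution parameters.
    exec_env = ExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)

    # The table environment is the main entry point for Table API programs.
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(exec_env, t_config)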

    Next we will create a source table and a sink table.

    t_env.connect(FileSystem().path('/tmp/input')) \
        .with_format(OldCsv()
                     .line_delimiter(' ')
                     .field('word', DataTypes.STRING())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())) \
        .register_table_source('mySource')

    t_env.connect(FileSystem().path('/tmp/output')) \
        .with_format(OldCsv()
                     .field_delimiter('\t')
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .register_table_sink('mySink')

    This registers a table named mySource and a table named mySink in the TableEnvironment. The table mySource has only one column, word, which represents the words read from the file /tmp/input. The table mySink has two columns, word and count, and writes data to the file /tmp/output, with \t as the field delimiter.

    Then we need to create a job which reads input from table mySource, performs some operations and writes the results to table mySink.
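
    A sketch of that job, assuming the 1.9-era Table API (scan, group_by, select, insert_into) and a job name of your choosing:

    # Read from mySource, count the occurrences of each word, and write to mySink.
    t_env.scan('mySource') \
        .group_by('word') \
        .select('word, count(1)') \
        .insert_into('mySink')

    # Trigger execution of the job.
    t_env.execute("wordcount")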

    The complete code so far is as follows:
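
    (Note that the imports, environment setup, and job name below follow the 1.9-era PyFlink API sketched above and are assumptions not shown in the original snippets.)

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem

    # Set up the batch execution environment and the table environment.
    exec_env = ExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(exec_env, t_config)

    # Source table: one word per space-delimited token read from /tmp/input.
    t_env.connect(FileSystem().path('/tmp/input')) \
        .with_format(OldCsv()
                     .line_delimiter(' ')
                     .field('word', DataTypes.STRING())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())) \
        .register_table_source('mySource')

    # Sink table: (word, count) pairs written to /tmp/output, tab-delimited.
    t_env.connect(FileSystem().path('/tmp/output')) \
        .with_format(OldCsv()
                     .field_delimiter('\t')
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .register_table_sink('mySink')

    # Count the occurrences of each word and write the result to the sink.
    t_env.scan('mySource') \
        .group_by('word') \
        .select('word, count(1)') \
        .insert_into('mySink')

    t_env.execute("wordcount")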

    You can run this example in your IDE or on the command line (suppose the job script file is WordCount.py):

    $ python WordCount.py

    The command builds and runs the Python Table API program in a local mini cluster. You can also submit the Python Table API program to a remote cluster; refer to the job submission documentation for more details.
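
    For example, assuming the flink CLI in your Flink distribution supports the -py option for Python jobs, a submission could look like this:

    $ ./bin/flink run -py WordCount.py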