Python REPL

    To use the shell with an integrated Flink cluster just execute:
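
    bin/pyflink-shell.sh local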

    in the root directory of your binary Flink directory. To run the Shell on a cluster, please see the Setup section below.

    The shell currently only supports the Table API. The Table Environments are automatically prebound after startup. Use “bt_env” and “st_env” to access the BatchTableEnvironment and StreamTableEnvironment respectively.

    Stream Table API example:

    >>> import tempfile
    >>> import os
    >>> import shutil
    >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
    >>> if os.path.exists(sink_path):
    ...     if os.path.isfile(sink_path):
    ...         os.remove(sink_path)
    ...     else:
    ...         shutil.rmtree(sink_path)
    >>> s_env.set_parallelism(1)
    >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    >>> st_env.connect(FileSystem().path(sink_path))\
    ...     .with_format(OldCsv()
    ...                  .field_delimiter(',')
    ...                  .field("a", DataTypes.BIGINT())
    ...                  .field("b", DataTypes.STRING())
    ...                  .field("c", DataTypes.STRING()))\
    ...     .with_schema(Schema()
    ...                  .field("a", DataTypes.BIGINT())
    ...                  .field("b", DataTypes.STRING())
    ...                  .field("c", DataTypes.STRING()))\
    ...     .register_table_sink("stream_sink")
    >>> t.select("a + 1, b, c")\
    ...     .insert_into("stream_sink")
    >>> st_env.execute("stream_job")
    >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
    >>> with open(sink_path, 'r') as f:
    ...     print(f.read())
    Batch Table API example:

    >>> import tempfile
    >>> import os
    >>> import shutil
    >>> sink_path = tempfile.gettempdir() + '/batch.csv'
    >>> if os.path.exists(sink_path):
    ...     if os.path.isfile(sink_path):
    ...         os.remove(sink_path)
    ...     else:
    ...         shutil.rmtree(sink_path)
    >>> b_env.set_parallelism(1)
    >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    >>> bt_env.connect(FileSystem().path(sink_path))\
    ...     .with_format(OldCsv()
    ...                  .field_delimiter(',')
    ...                  .field("a", DataTypes.BIGINT())
    ...                  .field("b", DataTypes.STRING())
    ...                  .field("c", DataTypes.STRING()))\
    ...     .with_schema(Schema()
    ...                  .field("a", DataTypes.BIGINT())
    ...                  .field("b", DataTypes.STRING())
    ...                  .field("c", DataTypes.STRING()))\
    ...     .register_table_sink("batch_sink")
    >>> t.select("a + 1, b, c")\
    ...     .insert_into("batch_sink")
    >>> bt_env.execute("batch_job")
    >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
    >>> with open(sink_path, 'r') as f:
    ...     print(f.read())
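
    With the example rows above, both jobs should produce the same result, so printing the sink file is expected to show something like:

    2,hi,hello
    3,hi,hello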

    To get an overview of what options the Python Shell provides, please use
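
    bin/pyflink-shell.sh --help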

    To use the shell with an integrated Flink cluster just execute:

    bin/pyflink-shell.sh local

    To use it with a running cluster, please start the Python shell with the keyword remote and supply the host and port of the JobManager with:
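
    bin/pyflink-shell.sh remote <host> <port>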

      For example, to start a Yarn cluster for the Python Shell with two TaskManagers, use the following:
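
      bin/pyflink-shell.sh yarn -n 2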

      For all other options, see the full reference at the bottom.

      If you have previously deployed a Flink cluster using the Flink Yarn Session, the Python shell can connect with it using the following command:

      bin/pyflink-shell.sh yarn
      Full reference:

      Flink Python Shell
      Command: local [options]
      Starts Flink Python shell with a local Flink cluster
      usage:
       -h,--help   Show the help message with descriptions of all options.
      Command: remote [options] <host> <port>
      Starts Flink Python shell connecting to a remote cluster
        <host>
              Remote host name as string
        <port>
              Remote port as integer
      usage:
       -h,--help   Show the help message with descriptions of all options.
      Command: yarn [options]
      Starts Flink Python shell connecting to a yarn cluster
      usage:
       -h,--help                       Show the help message with descriptions of all options.
       -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
       -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
       -nm,--name <arg>                Set a custom name for the application on YARN
       -qu,--queue <arg>               Specify YARN queue.
       -s,--slots <arg>                Number of slots per TaskManager
       -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)