Deploy Pulsar Functions

    运行一个非 集群,需要获取集群服务的 URL。 如何获取集群服务的 URL 取决于如何部署 Pulsar 集群。

    想要部署并触发 Python 的用户自定义 function,需要在所有运行 functions workers 的设备上安装 。

    Pulsar Functions 使用 pulsar-admin functions 接口进行部署和管理,通过 在 cluster mode 下部署 functions;通过 使用 triggering functions,通过 列出已部署的 functions。

    了解更多命令,请参阅 pulsar-admin functions

    在管理 Pulsar Functions 时,需要指定关于 functions 的各种信息,包括租户、命名空间、输入主题、输出主题等。 但是,在不输入信息时,有些参数会使用默认值。 如下表所示。

    默认参数示例

    create 命令为例。

    此 function 具有默认值的参数包括:function 名称(MyFunction)、租户(public)、命名空间(default)、订阅类型(SHARED)、处理保证(ATLEAST_ONCE)、Pulsar 服务 URL (pulsar://localhost:6650)。

    If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the commands (on your laptop, an AWS EC2 instance, and so on). 命令示例如下。

    1. $ bin/pulsar-admin functions localrun \
    2. --py myfunc.py \
    3. --classname myfunc.SomeFunction \
    4. --inputs persistent://public/default/input-1 \
    5. --output persistent://public/default/output-1

    默认情况下,该 function 通过 broker 服务的 URL 连接到在同一设备上运行的 Pulsar 集群。 如果想要在本地运行模式下运行一个 function ,并连接到非本地 Pulsar 集群,则可以通过 --brokerServiceUrl 标志来指定不同的 broker URL。 The following is an example.

    1. $ bin/pulsar-admin functions localrun \
    2. --broker-service-url pulsar://my-cluster-host:6650 \

    When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your . 您可以使用 create 命令在群集模式下运行函数。

    1. $ bin/pulsar-admin functions create \
    2. --py myfunc.py \
    3. --classname myfunc.SomeFunction \
    4. --inputs persistent://public/default/input-1 \
    5. --output persistent://public/default/output-1
    1. $ bin/pulsar-admin functions update \
    2. --py myfunc.py \
    3. --classname myfunc.SomeFunction \
    4. --inputs persistent://public/default/new-input-topic \
    5. --output persistent://public/default/new-output-topic

    Parallelism

    Pulsar Functions run as processes or threads, which are called instances. 在运行 Pulsar Function 时,默认为单个实例。 使用一个本地运行命令只能运行 function 的单个实例。 想要运行多个实例则需要多次使用本地运行命令。

    When you create a function, you can specify the parallelism of a function (the number of instances to run). You can set the parallelism factor using the --parallelism flag of the create command.

    You can adjust the parallelism of an already created function using the interface.

    1. $ bin/pulsar-admin functions update \
    2. --parallelism 5 \
    3. # Other function

    通过 YAML ,使用 parallelism function 指定其配置。 配置示例如下。

    1. # function-config.yaml
    2. parallelism: 3
    3. inputs:
    4. - persistent://public/default/input-1
    5. output: persistent://public/default/output-1
    6. # other parameters

    相关更新命令如下。

    1. $ bin/pulsar-admin functions update \
    2. --function-config-file function-config.yaml

    集群模式下运行 Pulsar Functions 时,可以指定资源分配给 function 的每个。

    ResourceSpecified asRuntimes
    CPUThe number of coresKubernetes
    RAMThe number of bytesProcess, Docker
    Disk spaceThe number of bytesDocker

    为一个 function 分配 8 个内核、8GB 内存、10GB 磁盘空间的 function 创建命令如下。

    1. $ bin/pulsar-admin functions create \
    2. --classname org.example.functions.MyFunction \
    3. --cpu 8 \
    4. --ram 8589934592 \
    5. --disk 10737418240

    Use Package management service

    Package 管理允许使用版本管理,并将简化对 Function、Sink 和 Source 的升级回滚过程。 当需要在不同的命名空间下使用相同的 Function、Sink 或 Source 时,可以将它们上传到一个共用的 Package 管理系统中。

    To use , ensure that the package management service has been enabled in your cluster by setting the following properties in broker.conf.

    With Package management service enabled, you can upload your function packages by upload a package to the service and get the .

    When you have a ready to use package URL, you can create the function with package URL by setting --jar, --py, or --go to the package URL with pulsar-admin functions create.

    If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. 触发 function 意味着向其发送具有特定值的消息,并通过命令行获取其输出(如有输入)。

    要学习如何触发 function,可以从 Python function 开始,Python function 会返回基于输入的简单字符串。

    1. # myfunc.py
    2. def process(input):
    3. return "This function has been triggered with a value of {0}".format(input)

    可以在下运行 function。

    1. $ bin/pulsar-admin functions create \
    2. --tenant public \
    3. --namespace default \
    4. --name myfunc \
    5. --py myfunc.py \
    6. --classname myfunc \
    7. --inputs persistent://public/default/in \
    8. --output persistent://public/default/out

    指定 consumer 以 pulsar-client consume 命令在输出 topic 上接收来自 myfunc function 的消息。

    1. $ bin/pulsar-client consume persistent://public/default/out \
    2. --subscription-name my-subscription
    3. --num-messages 0 # Listen indefinitely

    然后可以触发 function。

    1. $ bin/pulsar-admin functions trigger \
    2. --tenant public \
    3. --namespace default \
    4. --name myfunc \

    监听输出 topic 的 consumer 会在日志中生成如下内容。