Deploy Pulsar Functions
- 可以在本地运行 standalone cluster。
- You can deploy a Pulsar cluster on , Amazon Web Services, , DC/OS, and more.
运行一个非 集群,需要获取集群服务的 URL。 如何获取集群服务的 URL 取决于如何部署 Pulsar 集群。
想要部署并触发 Python 的用户自定义 function,需要在所有运行 functions workers 的设备上安装 。
Pulsar Functions 使用 pulsar-admin functions 接口进行部署和管理,通过 在 cluster mode 下部署 functions;通过 使用 triggering functions,通过 列出已部署的 functions。
了解更多命令,请参阅 pulsar-admin functions。
在管理 Pulsar Functions 时,需要指定关于 functions 的各种信息,包括租户、命名空间、输入主题、输出主题等。 但是,在不输入信息时,有些参数会使用默认值。 如下表所示。
默认参数示例
以 create
命令为例。
此 function 具有默认值的参数包括:function 名称(MyFunction
)、租户(public
)、命名空间(default
)、订阅类型(SHARED
)、处理保证(ATLEAST_ONCE
)、Pulsar 服务 URL (pulsar://localhost:6650
)。
If you run a Pulsar Function in local run mode, it runs on the machine from which you enter the commands (on your laptop, an AWS EC2 instance, and so on). 命令示例如下。
$ bin/pulsar-admin functions localrun \
--py myfunc.py \
--classname myfunc.SomeFunction \
--inputs persistent://public/default/input-1 \
--output persistent://public/default/output-1
默认情况下,该 function 通过 broker 服务的 URL 连接到在同一设备上运行的 Pulsar 集群。 如果想要在本地运行模式下运行一个 function ,并连接到非本地 Pulsar 集群,则可以通过 --brokerServiceUrl
标志来指定不同的 broker URL。 The following is an example.
$ bin/pulsar-admin functions localrun \
--broker-service-url pulsar://my-cluster-host:6650 \
When you run a Pulsar Function in cluster mode, the function code is uploaded to a Pulsar broker and runs alongside the broker rather than in your . 您可以使用 create 命令在群集模式下运行函数。
$ bin/pulsar-admin functions create \
--py myfunc.py \
--classname myfunc.SomeFunction \
--inputs persistent://public/default/input-1 \
--output persistent://public/default/output-1
$ bin/pulsar-admin functions update \
--py myfunc.py \
--classname myfunc.SomeFunction \
--inputs persistent://public/default/new-input-topic \
--output persistent://public/default/new-output-topic
Parallelism
Pulsar Functions run as processes or threads, which are called instances. 在运行 Pulsar Function 时,默认为单个实例。 使用一个本地运行命令只能运行 function 的单个实例。 想要运行多个实例则需要多次使用本地运行命令。
When you create a function, you can specify the parallelism of a function (the number of instances to run). You can set the parallelism factor using the --parallelism
flag of the create command.
You can adjust the parallelism of an already created function using the interface.
$ bin/pulsar-admin functions update \
--parallelism 5 \
# Other function
通过 YAML ,使用 parallelism
function 指定其配置。 配置示例如下。
# function-config.yaml
parallelism: 3
inputs:
- persistent://public/default/input-1
output: persistent://public/default/output-1
# other parameters
相关更新命令如下。
$ bin/pulsar-admin functions update \
--function-config-file function-config.yaml
在集群模式下运行 Pulsar Functions 时,可以指定资源分配给 function 的每个。
Resource | Specified as | Runtimes |
---|---|---|
CPU | The number of cores | Kubernetes |
RAM | The number of bytes | Process, Docker |
Disk space | The number of bytes | Docker |
为一个 function 分配 8 个内核、8GB 内存、10GB 磁盘空间的 function 创建命令如下。
$ bin/pulsar-admin functions create \
--classname org.example.functions.MyFunction \
--cpu 8 \
--ram 8589934592 \
--disk 10737418240
Use Package management service
Package 管理允许使用版本管理,并将简化对 Function、Sink 和 Source 的升级回滚过程。 当需要在不同的命名空间下使用相同的 Function、Sink 或 Source 时,可以将它们上传到一个共用的 Package 管理系统中。
To use , ensure that the package management service has been enabled in your cluster by setting the following properties in broker.conf
.
With Package management service enabled, you can upload your function packages by upload a package to the service and get the .
When you have a ready to use package URL, you can create the function with package URL by setting --jar
, --py
, or --go
to the package URL with pulsar-admin functions create
.
If a Pulsar Function is running in cluster mode, you can trigger it at any time using the command line. 触发 function 意味着向其发送具有特定值的消息,并通过命令行获取其输出(如有输入)。
要学习如何触发 function,可以从 Python function 开始,Python function 会返回基于输入的简单字符串。
# myfunc.py
def process(input):
return "This function has been triggered with a value of {0}".format(input)
可以在下运行 function。
$ bin/pulsar-admin functions create \
--tenant public \
--namespace default \
--name myfunc \
--py myfunc.py \
--classname myfunc \
--inputs persistent://public/default/in \
--output persistent://public/default/out
指定 consumer 以 pulsar-client consume 命令在输出 topic 上接收来自 myfunc
function 的消息。
$ bin/pulsar-client consume persistent://public/default/out \
--subscription-name my-subscription
--num-messages 0 # Listen indefinitely
然后可以触发 function。
$ bin/pulsar-admin functions trigger \
--tenant public \
--namespace default \
--name myfunc \
监听输出 topic 的 consumer 会在日志中生成如下内容。