Deploy and manage functions worker

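    The Pulsar functions-worker is a logic component that runs Pulsar Functions in cluster mode. Two options are available, and you can choose the one that fits your needs: run functions-workers with brokers, or run them separately.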

    The following diagram illustrates the deployment of functions-workers running along with brokers.

    To enable functions-worker running as part of a broker, you need to set functionsWorkerEnabled to true in the broker.conf file.
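
    As a minimal sketch of that edit, assuming broker.conf uses plain key=value lines, the following Python helper rewrites or appends a setting in the file's text. The helper name set_property is hypothetical and not part of Pulsar; editing the file by hand works just as well.

```python
def set_property(conf_text: str, key: str, value: str) -> str:
    """Return conf_text with key=value set, replacing an existing entry or appending one."""
    lines = conf_text.splitlines()
    replaced = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        # Leave comments and lines without '=' untouched.
        if stripped.startswith("#") or "=" not in stripped:
            continue
        # Match only when the text before '=' is exactly the key.
        if stripped.split("=", 1)[0].strip() == key:
            lines[i] = f"{key}={value}"
            replaced = True
    if not replaced:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    sample = "# Broker settings\nfunctionsWorkerEnabled=false\n"
    print(set_property(sample, "functionsWorkerEnabled", "true"))
```

    The function works on text rather than on a file path, so the result can be reviewed before it is written back to conf/broker.conf.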

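    If functionsWorkerEnabled is set to true, the functions-worker starts as part of a broker. You need to configure the conf/functions_worker.yml file to customize your functions-worker.

    Before you run functions-worker with brokers, configure the functions-worker first, and then start it together with the brokers.

    In this mode, because the functions-worker runs as part of a broker, most of its settings are inherited from the broker configuration (for example, configuration store settings, authentication settings, and so on).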

    Pay attention to the following required settings when configuring functions-worker in this mode.

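    • numFunctionPackageReplicas: The number of replicas used to store function packages. The default value is 1, which suits standalone deployment. For production deployment, set it to a value larger than 2 to ensure high availability.
    • initializedDlogMetadata: Whether to initialize distributed log metadata in runtime. If it is set to true, make sure that the metadata has been initialized with the bin/pulsar initialize-cluster-metadata command.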
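
    The two settings above lend themselves to a quick pre-flight check. The sketch below is a hypothetical helper (check_worker_settings is not a Pulsar API) that takes the settings as a plain dictionary, for example one loaded from functions_worker.yml, and returns reminders based only on the rules stated above.

```python
def check_worker_settings(settings: dict, production: bool = False) -> list:
    """Return human-readable reminders for the required functions-worker settings."""
    notes = []
    # The default of 1 replica suits standalone deployments only.
    replicas = int(settings.get("numFunctionPackageReplicas", 1))
    if production and replicas <= 2:
        notes.append(
            f"numFunctionPackageReplicas is {replicas}; "
            "set it to a value larger than 2 for production."
        )
    # When true, the distributed log metadata must already be initialized.
    if settings.get("initializedDlogMetadata") is True:
        notes.append(
            "initializedDlogMetadata is true; make sure "
            "bin/pulsar initialize-cluster-metadata has been run."
        )
    return notes
```

    An empty list means neither rule was triggered; it does not prove that the rest of the configuration is valid.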

    If authentication is enabled on the BookKeeper cluster, configure the following BookKeeper authentication settings.

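    • bookkeeperClientAuthenticationPlugin: The name of the BookKeeper client authentication plugin.
    • bookkeeperClientAuthenticationParametersName: The parameters name of the BookKeeper client authentication plugin.
    • bookkeeperClientAuthenticationParameters: The parameters of the BookKeeper client authentication plugin.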

    Configure Stateful-Functions to run with broker

    If you want to use Stateful-Functions related functions (for example, the putState() and queryState() related interfaces), follow the steps below.

    1. Enable the streamStorage service in the BookKeeper.

      Currently, the service uses the NAR package, so you need to configure it in the bookkeeper.conf file.

        extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent

      After starting the bookie, use the following method to check whether the streamStorage service has started correctly.

      Input:

        telnet localhost 4181

      Output:

        Trying 127.0.0.1...
        Connected to localhost.
        Escape character is '^]'.
    2. Set stateStorageServiceUrl in the functions_worker.yml file.

      ```text
      stateStorageServiceUrl: bk://<bk-service-url>:4181
      ```

      `bk-service-url` is the service URL pointing to the BookKeeper table service.
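
    The telnet check in step 1 can also be scripted. The following is a minimal sketch using only the Python standard library; is_port_open is a hypothetical helper name, and like telnet it only confirms that something accepts TCP connections on the port, not that the streamStorage service itself is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # 4181 is the port used in the telnet example above.
    print("streamStorage reachable:", is_port_open("localhost", 4181))
```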

    Once you have configured the functions_worker.yml file, you can start or restart your broker.

    You can then use the following command to check whether functions-worker is running:

      curl <broker-ip>:8080/admin/v2/worker/cluster

    After entering the command above, a list of active function workers in the cluster is returned. The output is similar to the following:

      [{"workerId":"<worker-id>","workerHostname":"<worker-hostname>","port":8080}]
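
    For scripted checks, the same request and response can be handled in Python. This is a sketch under the assumption that the endpoint returns a JSON array shaped like the sample output above; parse_workers and fetch_workers are hypothetical helper names.

```python
import json
import urllib.request


def parse_workers(body: str) -> list:
    """Turn the JSON array returned by the endpoint into (workerId, hostname, port) tuples."""
    return [
        (w["workerId"], w["workerHostname"], w["port"])
        for w in json.loads(body)
    ]


def fetch_workers(broker_url: str, timeout: float = 5.0) -> list:
    """Query <broker_url>/admin/v2/worker/cluster and parse the response."""
    url = broker_url.rstrip("/") + "/admin/v2/worker/cluster"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_workers(resp.read().decode("utf-8"))
```

    Calling fetch_workers("http://<broker-ip>:8080") mirrors the curl command; an empty list means that no active worker was reported.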

    This section illustrates how to run functions-worker as a separate process in separate machines.

    Figure: functions-workers running separately from brokers (assets/functions-worker-separated.png)

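    Configure Functions-worker to run separately

    To run functions-worker separately, you have to configure the following parameters.

    Worker parameters

    • : The type is string. It must be unique across the cluster and is used to identify a worker machine.
    • workerHostname: The hostname of the worker machine.
    • workerPort: The port that the worker server listens on. Keep the default value if you have not customized it.
    • workerPortTls: The TLS port that the worker server listens on. Keep the default value if you have not customized it.

    Function package parameter

    • numFunctionPackageReplicas: The number of replicas used to store function packages. The default value is 1.

    Function metadata parameters

    • pulsarServiceUrl: The Pulsar service URL for your broker cluster.
    • pulsarWebServiceUrl: The Pulsar web service URL for your broker cluster.
    • pulsarFunctionsCluster: The Pulsar cluster name (the same as the clusterName setting in the broker configuration).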
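
    A small check can flag settings from the lists above that have been left empty before a separate worker is started. The sketch below is illustrative only: missing_settings is a hypothetical helper, and the choice to check just the parameters for which no default is mentioned above is an assumption, not a rule taken from Pulsar.

```python
# Names taken from the parameter lists above; parameters described as having
# a default value (ports, replica count) are deliberately left out.
REVIEW_SETTINGS = (
    "workerHostname",
    "pulsarServiceUrl",
    "pulsarWebServiceUrl",
    "pulsarFunctionsCluster",
)


def missing_settings(settings: dict) -> list:
    """Return the names in REVIEW_SETTINGS that are absent or empty in settings."""
    return [k for k in REVIEW_SETTINGS if settings.get(k) in (None, "")]
```

    The dictionary can come from any YAML loader applied to functions_worker.yml; the helper itself does not read files.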

    If authentication is enabled for your broker cluster, you should configure the authentication plugin and parameters for the functions worker to communicate with the brokers.

    • clientAuthenticationPlugin
    • clientAuthenticationParameters

    Security settings

    If you want to enable security on functions workers, you should:

    Enable TLS transport encryption

    To enable TLS transport encryption, configure the following settings.

    For details on TLS encryption, refer to the Pulsar documentation on TLS transport encryption.

    Enable Authentication Provider

    To enable authentication on functions workers, configure the following settings.

      authenticationEnabled: true
      authenticationProviders: [ provider1, provider2 ]

    For TLS Authentication provider, follow the example below to add the necessary settings. See the Pulsar documentation on TLS authentication for more details.

      brokerClientAuthenticationPlugin: org.apache.pulsar.client.impl.auth.AuthenticationTls
      brokerClientAuthenticationParameters: tlsCertFile:/path/to/admin.cert.pem,tlsKeyFile:/path/to/admin.key-pk8.pem
      authenticationEnabled: true
      authenticationProviders: ['org.apache.pulsar.broker.authentication.AuthenticationProviderTls']

    For SASL Authentication provider, add saslJaasClientAllowedIds and saslJaasBrokerSectionName under properties if needed.

      properties:
        saslJaasClientAllowedIds: .*pulsar.*
        saslJaasBrokerSectionName: Broker

    For Token Authentication provider, add necessary settings for properties if needed. For more details, see Token Authentication. Note: key files must be DER-encoded.

      properties:
        # If using public/private
        # tokenPublicKey: file:///path/to/public.key

    Enable Authorization Provider

    To enable authorization on functions workers, you need to configure authorizationEnabled, authorizationProvider, and configurationStoreServers. The authorization provider connects to configurationStoreServers to receive namespace policies.

      authorizationEnabled: true
      authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
      configurationStoreServers: <configuration-store-servers>

    You should also configure a list of superuser roles. The superuser roles are able to access any admin API. The following is a configuration example.

      superUserRoles:
        - role1
        - role2
        - role3

    Enable End-to-End Encryption

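    You can use the public and private key pair that the application configures to perform encryption. Only consumers with a valid key can decrypt the encrypted messages.

    To enable end-to-end encryption on functions-worker, you can specify --producer-config on the command line. For more information, refer to the Pulsar documentation on end-to-end encryption.

    • : The action to take when the producer fails to encrypt data. It is one of FAIL or SEND.
    • consumerCryptoFailureAction: The action to take when the consumer fails to decrypt data. It is one of FAIL, DISCARD, or CONSUME.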

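    BookKeeper Authentication

    If authentication is enabled on the BookKeeper cluster, you need to configure the following BookKeeper authentication settings:

    • bookkeeperClientAuthenticationPlugin: The name of the BookKeeper client authentication plugin.
    • bookkeeperClientAuthenticationParametersName: The parameters name of the BookKeeper client authentication plugin.
    • bookkeeperClientAuthenticationParameters: The parameters of the BookKeeper client authentication plugin.

    Once you have configured the functions_worker.yml file, you can start functions-worker in the background with the pulsar-daemon CLI tool:

      bin/pulsar-daemon start functions-worker

    You can also start functions-worker in the foreground with the pulsar CLI tool:

      bin/pulsar functions-worker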

    Configure Proxies for Functions-workers

    When you run functions-worker in a separate cluster, the admin REST endpoints are split across two clusters. The functions, function-worker, source, and sink endpoints are served by the functions-worker cluster, while all other endpoints are served by the broker cluster. You therefore need to point pulsar-admin at the right service URL for each request.

    To address this inconvenience, you can start a proxy cluster that routes the admin REST requests accordingly. This gives you one central entry point for your admin service.

    If you already have a proxy cluster, continue reading. If you haven't set up a proxy cluster before, you can follow the instructions to start proxies.

    To enable routing functions related admin requests to functions-worker in a proxy, you can edit the proxy.conf file to modify the following settings:

      functionWorkerWebServiceURL=<pulsar-functions-worker-web-service-url>
      functionWorkerWebServiceURLTLS=<pulsar-functions-worker-web-service-url>
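
    To picture the routing that a proxy performs once these settings are in place, consider the sketch below. It is not the proxy's implementation: pick_service_url is a hypothetical helper, and the path prefixes are assumptions for illustration (only /admin/v2/worker appears in this document).

```python
# Assumed path prefixes for endpoints served by the functions-worker cluster.
# Only /admin/v2/worker is taken from this document; the rest are illustrative.
WORKER_PREFIXES = (
    "/admin/v2/worker",
    "/admin/v3/functions",
    "/admin/v3/sources",
    "/admin/v3/sinks",
)


def pick_service_url(path: str, broker_url: str, worker_url: str) -> str:
    """Return the base URL that should serve the given admin REST path."""
    # str.startswith accepts a tuple and matches any of its prefixes.
    if path.startswith(WORKER_PREFIXES):
        return worker_url
    return broker_url
```

    Without a proxy, a client has to make this choice itself for every request, which is the inconvenience described above.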

    As described above, you can run functions-worker with brokers or run it separately. Running functions-workers along with brokers is more convenient. However, running functions-workers in a separate cluster provides better resource isolation for running functions in Process or Thread mode.

    To determine which mode suits your case, refer to the following guidelines.

    Use the Run-with-Broker mode in the following cases:

    • a) resource isolation is not required when you run functions in Process or Thread mode;
    • b) you configure the functions-worker to run functions on Kubernetes (where Kubernetes addresses the resource isolation problem).

    Use the Run-separately mode in the following cases:

    • a) you do not have a Kubernetes cluster;
    • b) you want to run functions and brokers separately.

    Error message: Namespace missing local cluster name in clusters list

      Failed to get partitioned topic metadata: org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: Namespace missing local cluster name in clusters list: local_cluster=xyz ns=public/functions clusters=[standalone]

    The error message appears when either of the following cases occurs:

    • a) a broker is started with functionsWorkerEnabled=true, but pulsarFunctionsCluster is not set to the correct cluster in the conf/functions_worker.yml file;
    • b) a geo-replicated Pulsar cluster is set up with functionsWorkerEnabled=true while the brokers in one cluster run well and the brokers in the other cluster do not.
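
    When this error shows up in logs, the three values it carries are what the fix depends on. The following sketch pulls them out with a regular expression; parse_missing_cluster_error is a hypothetical helper, and the assumption that multiple clusters are comma-separated inside the brackets goes beyond the single-cluster sample shown above.

```python
import re

_PATTERN = re.compile(
    r"local_cluster=(?P<local>\S+)\s+ns=(?P<ns>\S+)\s+clusters=\[(?P<clusters>[^\]]*)\]"
)


def parse_missing_cluster_error(message: str):
    """Extract (local_cluster, namespace, clusters) from the error text, or None."""
    match = _PATTERN.search(message)
    if match is None:
        return None
    # Assumption: several clusters would be listed comma-separated.
    clusters = [c.strip() for c in match.group("clusters").split(",") if c.strip()]
    return match.group("local"), match.group("ns"), clusters
```

    If the local cluster name is not among the parsed clusters, the namespace's cluster list needs to be updated.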

    Workaround

    If any of these cases happens, follow the instructions below to fix the problem:

    1. Disable Functions Worker by setting functionsWorkerEnabled=false, and restart brokers.

    2. Get the current clusters list of the public/functions namespace.

      bin/pulsar-admin namespaces get-clusters public/functions

    3. Check whether the cluster is in the clusters list. If the cluster is not in the list, add it to the list and update the clusters list.

      bin/pulsar-admin namespaces set-clusters --clusters <existing-clusters>,<new-cluster> public/functions

    4. Set the correct cluster name in pulsarFunctionsCluster in the conf/functions_worker.yml file, and restart brokers.
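
    The check-and-update step of the workaround can be expressed as a small helper that builds the set-clusters command only when it is needed. This is a sketch: set_clusters_command is a hypothetical name, and it only produces the command string in the form shown above without running it.

```python
def set_clusters_command(current: list, local_cluster: str,
                         namespace: str = "public/functions"):
    """Return the pulsar-admin command that adds local_cluster, or None if it is already listed."""
    if local_cluster in current:
        return None
    # Keep the existing clusters and append the missing one.
    clusters = ",".join(current + [local_cluster])
    return f"bin/pulsar-admin namespaces set-clusters --clusters {clusters} {namespace}"
```

    The current list is the output of the get-clusters command from step 2; a return value of None means the clusters list is already correct and only the remaining steps apply.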