Configure Functions runtime
- Thread: Invoke functions in threads in the functions worker.
- Process: Invoke functions in processes forked by the functions worker.
- Kubernetes: Submit functions as Kubernetes StatefulSets through the functions worker.
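The Thread and Process modes differ as follows:
- Thread mode: when a function runs in thread mode, it runs in the same Java virtual machine (JVM) as the Pulsar functions worker.
- Process mode: when a function runs in process mode, it runs on the same machine as the functions worker, but in a separate JVM.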
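Configuring the Thread runtime is easy. In most cases, you do not need to configure anything. You can customize the thread group name with the following settings:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
functionRuntimeFactoryConfigs:
  threadGroupName: "Your Function Container Group"
The Thread runtime is only supported for Java functions.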
When you enable the Process runtime, you do not need to configure anything else. You can optionally adjust the following settings:
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
functionRuntimeFactoryConfigs:
  # the directory for storing the function logs
  logDirectory:
  # change the jar location only when you put the java instance jar in a different location
  javaInstanceJarLocation:
  # change the python instance location only when you put the python instance in a different location
  pythonInstanceLocation:
  # change the extra dependencies location:
  extraFunctionDependenciesDir:
Process runtime is supported in Java, Python, and Go functions.
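As a compact restatement of the runtime options described above, the Java sketch below maps each runtime to a `functionRuntimeFactoryClassName` value and to the languages it can host. The names `RuntimeChoiceDemo`, `FunctionsRuntime`, and `Language` are hypothetical. The Kubernetes factory class name is taken from the Kubernetes example in this section; the thread and process factory class names follow the same package layout and should be checked against your Pulsar version. The Kubernetes language set is an assumption inferred from the `JAVA`/`PYTHON`/`GO` keys of `functionDockerImages`.

```java
import java.util.EnumSet;
import java.util.Set;

final class RuntimeChoiceDemo {
    enum Language { JAVA, PYTHON, GO }

    enum FunctionsRuntime {
        // Threads inside the worker JVM: Java functions only.
        THREAD("org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory",
                EnumSet.of(Language.JAVA)),
        // Processes forked by the worker on the same machine.
        PROCESS("org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory",
                EnumSet.of(Language.JAVA, Language.PYTHON, Language.GO)),
        // Assumption: inferred from the JAVA/PYTHON/GO keys of functionDockerImages.
        KUBERNETES("org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory",
                EnumSet.of(Language.JAVA, Language.PYTHON, Language.GO));

        final String factoryClassName;
        final Set<Language> languages;

        FunctionsRuntime(String factoryClassName, Set<Language> languages) {
            this.factoryClassName = factoryClassName;
            this.languages = languages;
        }

        // True if functions written in the given language can run on this runtime.
        boolean supports(Language language) {
            return languages.contains(language);
        }
    }

    public static void main(String[] args) {
        for (FunctionsRuntime runtime : FunctionsRuntime.values()) {
            System.out.println(runtime + " (supports Python: " + runtime.supports(Language.PYTHON) + ")");
            System.out.println("  functionRuntimeFactoryClassName: " + runtime.factoryClassName);
        }
    }
}
```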
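The Kubernetes runtime works by having the functions worker generate Kubernetes manifests and apply them. If you run the functions worker on Kubernetes, it can use the `serviceAccount` associated with the pod that runs the functions worker. Otherwise, you can configure it to communicate with a Kubernetes cluster.
The manifests generated by the functions worker include a `StatefulSet`, a `Service` (used to communicate with the pods), and a `Secret` (used for authentication, when needed). By default, the `StatefulSet` has a single pod; the function's parallelism determines the number of pods. When a pod starts, it downloads the function payload through the functions worker REST API. The pod's container image is configurable, but it must include the functions runtime.
The Kubernetes runtime supports secrets, so you can create a Kubernetes secret and expose it as an environment variable inside the pod. The Kubernetes runtime is also extensible: you can implement classes to customize how Kubernetes manifests are generated, how authentication data is passed to pods, and how secrets are integrated.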
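The relationship between parallelism and pods can be sketched in Java. The sketch assumes one StatefulSet replica per function instance, so the replica count equals the function's parallelism (1 when unset); pod names follow the standard Kubernetes StatefulSet convention `<statefulset-name>-<ordinal>`. The class `StatefulSetSketch` and the StatefulSet name used in `main` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class StatefulSetSketch {
    // Assumption: one replica per function instance; an unset parallelism means 1.
    static int replicas(Integer parallelism) {
        if (parallelism == null) {
            return 1;
        }
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        return parallelism;
    }

    // Kubernetes names StatefulSet pods <statefulSetName>-<ordinal>, with ordinals 0..replicas-1.
    static List<String> podNames(String statefulSetName, int replicas) {
        List<String> names = new ArrayList<>();
        for (int ordinal = 0; ordinal < replicas; ordinal++) {
            names.add(statefulSetName + "-" + ordinal);
        }
        return names;
    }

    public static void main(String[] args) {
        // "pf-public-default-exclamation" is a hypothetical StatefulSet name.
        System.out.println(podNames("pf-public-default-exclamation", replicas(3)));
        // prints [pf-public-default-exclamation-0, pf-public-default-exclamation-1, pf-public-default-exclamation-2]
    }
}
```

Because StatefulSet ordinals are stable, each function instance keeps the same pod name across restarts.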
Configuring the Kubernetes runtime is easy. Uncomment the `kubernetesContainerFactory` settings in the `functions_worker.yml` file. The following is an example.
functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
functionRuntimeFactoryConfigs:
  # URI of the Kubernetes cluster. Leave it empty to use the Kubernetes settings of the functions worker
  k8Uri:
  # the Kubernetes namespace to run the function instances in. It is `default` if this setting is left empty
  jobNamespace:
  # The Kubernetes pod name to run the function instances. It is set to
  # `pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this setting is left empty
  jobName:
  # the Docker image to run function instances. By default it is `apachepulsar/pulsar`
  pulsarDockerImageName:
  # the Docker images to run function instances, according to the per-language configuration you provide.
  # By default it is `apachepulsar/pulsar`
  # e.g.:
  # functionDockerImages:
  #   JAVA: JAVA_IMAGE_NAME
  #   PYTHON: PYTHON_IMAGE_NAME
  #   GO: GO_IMAGE_NAME
  functionDockerImages:
  # The image pull policy for the image used to run function instances. By default it is `IfNotPresent`
  imagePullPolicy: IfNotPresent
  # the root directory of the Pulsar home directory in `pulsarDockerImageName`. By default it is `/pulsar`.
  # If you use your own image in `pulsarDockerImageName`, set this setting accordingly
  pulsarRootDir:
  # The admin CLI tool that function instances use, such as `/bin/pulsar-admin` or `/bin/pulsarctl`.
  # By default it is `/bin/pulsar-admin`. If you want to use `pulsarctl`, set this setting accordingly
  configAdminCLI:
  # This setting only takes effect if `k8Uri` is set to null. If your functions worker runs as a Kubernetes pod,
  # set this to true to let the functions worker submit functions to the same Kubernetes cluster it runs in.
  # Set this to false if your functions worker is not running as a Kubernetes pod.
  submittingInsidePod: false
  # the Pulsar service URL that Pulsar functions use to connect to Pulsar.
  # If it is not set, the Pulsar service URL configured in the worker service is used
  pulsarServiceUrl:
  # the Pulsar admin URL that Pulsar functions use to connect to Pulsar.
  # If it is not set, the Pulsar admin URL configured in the worker service is used
  pulsarAdminUrl:
  # Whether to install user code dependencies (applies to Python packages)
  installUserCodeDependencies:
  # The repository that Pulsar functions use to download Python dependencies
  pythonDependencyRepository:
  # The repository that Pulsar functions use to download extra Python dependencies
  pythonExtraDependencyRepository:
  # the custom labels that the functions worker uses to select the nodes for pods
  customLabels:
  expectedMetricsCollectionInterval: 30
  # If this ConfigMap is defined, the Kubernetes runtime periodically checks it
  # and applies any changes to the Kubernetes-specific settings
  changeConfigMap:
  # The namespace for storing the change ConfigMap
  changeConfigMapNamespace:
  # The ratio between the CPU request and the CPU limit set for a function/source/sink.
  # The formula for the CPU request is cpuRequest = userRequestCpu / cpuOverCommitRatio
  cpuOverCommitRatio: 1.0
  # The ratio between the memory request and the memory limit set for a function/source/sink.
  # The formula for the memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio
  memoryOverCommitRatio: 1.0
  # The port inside the function pod that the worker uses to communicate with the pod
  grpcPort: 9093
  # The port inside the function pod on which Prometheus metrics are exposed
  metricsPort: 9094
  # The directory inside the function pod where NAR packages are extracted
  narExtractionDirectory:
  # The classpath where function instance files are stored
  functionInstanceClassPath:
  # the directory for dropping extra function dependencies.
  # If it is not an absolute path, it is relative to `pulsarRootDir`
  extraFunctionDependenciesDir:
  # Additional memory padding, in percent, added on top of the memory requested by the function on a per-instance basis
  percentMemoryPadding: 10
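Two of the defaults above can be made concrete with a small Java sketch: the pod-name pattern from the `jobName` comment, and the request formulas from the `cpuOverCommitRatio`, `memoryOverCommitRatio`, and `percentMemoryPadding` comments. This is not the runtime's own code. The class `KubernetesDefaultsSketch` is hypothetical, applying the padding before the memory ratio is an assumption, and the real runtime may additionally sanitize generated names to satisfy Kubernetes naming rules.

```java
import java.util.UUID;

final class KubernetesDefaultsSketch {
    // Pattern from the jobName comment: pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>.
    static String defaultJobName(String tenant, String namespace, String functionName) {
        String randomUuid8 = UUID.randomUUID().toString().substring(0, 8);
        return "pf-" + tenant + "-" + namespace + "-" + functionName + "-" + randomUuid8;
    }

    // cpuRequest = userRequestCpu / cpuOverCommitRatio; the limit presumably stays at userRequestCpu.
    static double cpuRequest(double userRequestCpu, double cpuOverCommitRatio) {
        return userRequestCpu / cpuOverCommitRatio;
    }

    // Assumption: percentMemoryPadding is added to the user's request before any ratio is applied.
    static long paddedMemory(long userRequestMemory, int percentMemoryPadding) {
        return userRequestMemory + Math.round(userRequestMemory * (percentMemoryPadding / 100.0));
    }

    // memoryRequest = (padded) userRequestMemory / memoryOverCommitRatio, truncated to whole bytes.
    static long memoryRequest(long userRequestMemory, int percentMemoryPadding, double memoryOverCommitRatio) {
        return (long) (paddedMemory(userRequestMemory, percentMemoryPadding) / memoryOverCommitRatio);
    }

    public static void main(String[] args) {
        System.out.println(defaultJobName("public", "default", "exclamation"));
        System.out.println(cpuRequest(2.0, 2.0));          // 1.0
        System.out.println(paddedMemory(1000, 10));        // 1100
        System.out.println(memoryRequest(1000, 10, 2.0));  // 550
    }
}
```

With the default ratios of 1.0, the request equals the limit. A ratio above 1.0 lowers the request, so Kubernetes can schedule more function pods on a node than it could serve if all of them reached their limits at once.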
If you run the functions worker embedded in a broker on Kubernetes, you can use the default settings.
Run a standalone functions worker on Kubernetes
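For example, suppose both the Pulsar brokers and the functions worker run in the `pulsar` K8S namespace. The brokers have a service called `brokers`, and the functions worker has a service called `func-worker`. The settings are as follows: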
pulsarServiceUrl: pulsar://broker.pulsar:6650 # or pulsar+ssl://broker.pulsar:6651 if using TLS
pulsarAdminUrl: http://func-worker.pulsar:8080 # or https://func-worker:8443 if using TLS
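The two URLs above follow a simple pattern: `<service>.<k8s-namespace>` plus the plain or TLS port. The hypothetical `WorkerUrlsSketch` below builds them in Java. Unlike the TLS admin URL in the comment above (`https://func-worker:8443`), it uses the namespace-qualified host name in every case; that is an assumption, chosen because Kubernetes resolves `<service>.<namespace>` from any namespace in the cluster.

```java
final class WorkerUrlsSketch {
    // Broker URL: pulsar:// on 6650, or pulsar+ssl:// on 6651 when TLS is enabled.
    static String pulsarServiceUrl(String brokerService, String k8sNamespace, boolean tls) {
        String host = brokerService + "." + k8sNamespace;
        return tls ? "pulsar+ssl://" + host + ":6651" : "pulsar://" + host + ":6650";
    }

    // Functions worker admin URL: http:// on 8080, or https:// on 8443 when TLS is enabled.
    static String pulsarAdminUrl(String workerService, String k8sNamespace, boolean tls) {
        String host = workerService + "." + k8sNamespace;
        return tls ? "https://" + host + ":8443" : "http://" + host + ":8080";
    }

    public static void main(String[] args) {
        System.out.println("pulsarServiceUrl: " + pulsarServiceUrl("broker", "pulsar", false));
        System.out.println("pulsarAdminUrl: " + pulsarAdminUrl("func-worker", "pulsar", false));
    }
}
```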
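If you run RBAC in your cluster, make sure that the service account used to run the functions worker (or the brokers, if the functions workers run alongside the brokers) has permissions on the following Kubernetes APIs.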
- services
- configmaps
- pods
- apps.statefulsets
The following is a complete configuration:
If the service account is not properly configured, an error message similar to the following appears.
22:04:27.696 [Timer-0] ERROR org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory - Error while trying to fetch configmap example-pulsar-4qvmb5gur3c6fc9dih0x1xn8b-function-worker-config at namespace pulsar
at io.kubernetes.client.ApiClient.handleResponse(ApiClient.java:882) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.ApiClient.execute(ApiClient.java:798) ~[io.kubernetes-client-java-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMapWithHttpInfo(CoreV1Api.java:23673) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at io.kubernetes.client.apis.CoreV1Api.readNamespacedConfigMap(CoreV1Api.java:23655) ~[io.kubernetes-client-java-api-2.0.0.jar:?]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory.fetchConfigMap(KubernetesRuntimeFactory.java:284) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory$1.run(KubernetesRuntimeFactory.java:275) [org.apache.pulsar-pulsar-functions-runtime-2.4.0-42c3bf949.jar:2.4.0-42c3bf949]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_212]
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_212]
Integrate Kubernetes secrets
To keep sensitive information secure, Pulsar Functions can reference Kubernetes secrets. To enable this feature, set `secretsProviderConfiguratorClassName` to `org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator`.
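You can create a secret in the Kubernetes namespace where your functions are deployed. For example, suppose you deploy functions to the `pulsar-func` Kubernetes namespace, and you have a secret named `database-creds` with a field `password`, which you want to mount in the pod as an environment variable called `DATABASE_PASSWORD`. The following function configuration enables you to reference that secret and mount its value as an environment variable in the pod.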
tenant: "mytenant"
namespace: "mynamespace"
name: "myfunction"
topicName: "persistent://mytenant/mynamespace/myfuncinput"
className: "com.company.pulsar.myfunction"
secrets:
  # the `password` field of the `database-creds` secret is mounted as an environment variable called `DATABASE_PASSWORD`
  DATABASE_PASSWORD:
    path: "database-creds"
    key: "password"
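Once the secret is mounted, the function reads it like any other environment variable. The hypothetical `SecretEnvSketch` below takes the environment as a map so it can be tested outside a pod, and `main` passes `System.getenv()`; it reports only whether the variable is set, so the password never reaches the logs. Inside a Java function, the Pulsar Functions `Context` interface also offers a `getSecret(String)` method intended for this kind of lookup.

```java
import java.util.Map;
import java.util.Optional;

final class SecretEnvSketch {
    // Looks up the variable populated from the `password` field of the `database-creds` secret.
    static Optional<String> databasePassword(Map<String, String> env) {
        return Optional.ofNullable(env.get("DATABASE_PASSWORD"));
    }

    public static void main(String[] args) {
        // Inside the function pod, System.getenv() contains DATABASE_PASSWORD.
        String status = databasePassword(System.getenv()).isPresent() ? "set" : "not set";
        System.out.println("DATABASE_PASSWORD is " + status);
    }
}
```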
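When you enable authentication for your Pulsar cluster, you need a mechanism for the pods running your functions to authenticate with the broker.
The `org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider` interface provides support for any authentication mechanism. The `functionAuthProviderClassName` setting in the `functions_worker.yml` file lets you specify your implementation.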
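Pulsar includes an implementation for token authentication, which also distributes the certificate authority in the same way. The configuration is as follows: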
functionAuthProviderClassName: org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider
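With token authentication, the functions worker captures the token that is used to deploy or update the function. The token is saved as a secret and mounted into the pod.
For custom authentication or TLS, you need to implement this interface or use an alternative mechanism to provide authentication. If you use token authentication and TLS encryption to secure communication with the cluster, Pulsar passes your certificate authority (CA) to the client, so the client obtains what it needs to authenticate to the cluster and trusts the cluster's signed certificate.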
Enable authentication
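When you run a standalone functions worker in a cluster with authentication enabled, you must configure the functions worker to pass authentication data when it interacts with the brokers. You must also configure the authentication and authorization options that the brokers require.
For example, if you use token authentication, you need to configure the following properties in the `functions_worker.yml` file.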
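The Kubernetes integration enables you to implement a class and customize how the manifests are generated. To do so, set `runtimeCustomizerClassName` in the `functions_worker.yml` file to a fully qualified class name. The class must implement the `org.apache.pulsar.functions.runtime.kubernetes.KubernetesManifestCustomizer` interface.
The functions (and sources/sinks) API provides a flag, `customRuntimeOptions`, which is passed to this interface.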
To initialize the `KubernetesManifestCustomizer`, you can provide `runtimeCustomizerConfig` in the `functions_worker.yml` file. `runtimeCustomizerConfig` is passed to the `public void initialize(Map<String, Object> config)` function of the interface. `runtimeCustomizerConfig` differs from `customRuntimeOptions` in that `runtimeCustomizerConfig` is the same across all functions. If you provide both `runtimeCustomizerConfig` and `customRuntimeOptions`, you need to decide how to manage these two configurations in your implementation of `KubernetesManifestCustomizer`.
Pulsar includes a built-in implementation. To use the basic implementation, set `runtimeCustomizerClassName` to `org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer`. The built-in implementation, initialized with `runtimeCustomizerConfig`, enables you to pass a JSON document as `customRuntimeOptions` with certain properties that augment how the manifests are generated. If both `runtimeCustomizerConfig` and `customRuntimeOptions` are provided, `BasicKubernetesManifestCustomizer` uses `customRuntimeOptions` to override the configuration wherever the two conflict.
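The override rule can be sketched as a map merge. In the hypothetical `CustomizerConfigMergeSketch` below, the cluster-wide `runtimeCustomizerConfig` is copied first and the per-function `customRuntimeOptions` are laid on top, so a key present in both takes the per-function value. This is a shallow, top-level merge chosen for illustration; `BasicKubernetesManifestCustomizer` may combine nested fields, such as label maps, differently, and a custom `KubernetesManifestCustomizer` is free to choose its own policy. The option values in `main` are made up.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CustomizerConfigMergeSketch {
    // Shallow merge: keys in customRuntimeOptions replace the same keys from runtimeCustomizerConfig.
    static Map<String, Object> effectiveOptions(Map<String, Object> runtimeCustomizerConfig,
                                                Map<String, Object> customRuntimeOptions) {
        Map<String, Object> merged = new HashMap<>();
        if (runtimeCustomizerConfig != null) {
            merged.putAll(runtimeCustomizerConfig);
        }
        if (customRuntimeOptions != null) {
            merged.putAll(customRuntimeOptions);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> runtimeCustomizerConfig = new HashMap<>();
        runtimeCustomizerConfig.put("jobNamespace", "functions");
        runtimeCustomizerConfig.put("nodeSelectorLabels", Collections.singletonMap("pool", "functions"));

        Map<String, Object> customRuntimeOptions = new HashMap<>();
        customRuntimeOptions.put("jobNamespace", "team-a");

        Map<String, Object> effective = effectiveOptions(runtimeCustomizerConfig, customRuntimeOptions);
        System.out.println(effective.get("jobNamespace"));       // team-a
        System.out.println(effective.get("nodeSelectorLabels")); // {pool=functions}
    }
}
```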
Below is an example of `customRuntimeOptions`.
{
"jobName": "jobname", // the k8s pod name to run this function instance
"jobNamespace": "namespace", // the k8s namespace to run this function in
"extraLabels": { // extra labels to attach to the statefulSet, service, and pods
"extraLabel": "value"
},
"extraAnnotations": { // extra annotations to attach to the statefulSet, service, and pods
"extraAnnotation": "value"
},
"nodeSelectorLabels": { // node selector labels to add on to the pod spec
"customLabel": "value"
},
"tolerations": [ // tolerations to add to the pod spec
{
"key": "custom-key",
"value": "value",
"effect": "NoSchedule"
}
],
"resourceRequirements": { // values for cpu and memory should be defined as described here: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
"requests": {
"cpu": 1,
"memory": "4G"
},
"limits": {
"cpu": 2,
"memory": "8G"
}
}
}
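If you run multiple clusters tied together with geo-replication, each cluster must use a different function namespace. Otherwise, all functions share the same namespace and may be scheduled across clusters.
For example, if you have two clusters, `east-1` and `west-1`, you can configure the functions workers of the two clusters as follows: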
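# functions worker in east-1
pulsarFunctionsCluster: east-1
pulsarFunctionsNamespace: public/functions-east-1
# functions worker in west-1
pulsarFunctionsCluster: west-1
pulsarFunctionsNamespace: public/functions-west-1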
This ensures that the two functions workers use distinct sets of topics for their internal coordination.
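Because the rule is one function namespace per cluster, the per-cluster settings can be generated rather than written by hand. The hypothetical `GeoReplicationSketch` below follows the `public/functions-<cluster>` naming from the `east-1` example; that convention is an assumption, and any namespace works as long as no two clusters share it.

```java
final class GeoReplicationSketch {
    // Naming convention taken from the example: public/functions-<cluster>.
    static String functionsNamespace(String cluster) {
        return "public/functions-" + cluster;
    }

    // Renders the two functions worker settings for one cluster.
    static String workerSettings(String cluster) {
        return "pulsarFunctionsCluster: " + cluster + "\n"
                + "pulsarFunctionsNamespace: " + functionsNamespace(cluster);
    }

    public static void main(String[] args) {
        for (String cluster : new String[] {"east-1", "west-1"}) {
            System.out.println(workerSettings(cluster));
            System.out.println();
        }
    }
}
```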
You must configure the following required parameters.