使用工作队列进行粗粒度并行处理

    本例中,每个 Pod 一旦被创建,会立即从任务队列中取走一个工作单元并完成它,然后将工作单元从队列中删除后再退出。

    下面是本次示例的主要步骤:

    1. 启动一个消息队列服务 本例中,我们使用 RabbitMQ,你也可以用其他的消息队列服务。在实际工作环境中,你可以创建一次消息队列服务然后在多个任务中重复使用。

    2. 创建一个队列,放上消息数据 每个消息表示一个要执行的任务。本例中,每个消息是一个整数值。我们将基于这个整数值执行很长的计算操作。

    3. 启动一个在队列中执行这些任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中取走一个任务,处理它,然后重复执行,直到队列的队尾。

    要熟悉 Job 基本用法(非并行的),请参考 。

    你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 如果你还没有集群,你可以通过 Minikube 构建一 个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:

    您的 Kubernetes 服务器版本必须不低于版本 v1.8. 要获知版本信息,请输入 .

    启动消息队列服务

    本例使用了 RabbitMQ,使用其他 AMQP 类型的消息服务应该比较容易。

    在实际工作中,在集群中一次性部署某个消息队列服务,之后在很多 Job 中复用,包括需要长期运行的服务。

    按下面的方法启动 RabbitMQ:

    1. service "rabbitmq-service" created
    1. kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-controller.yaml
    1. replicationcontroller "rabbitmq-controller" created

    我们仅用到 中描述的部分功能。

    测试消息队列服务

    现在,我们可以试着访问消息队列。我们将会创建一个临时的可交互的 Pod,在它上面安装一些工具,然后用队列做实验。

    首先创建一个临时的可交互的 Pod:

    1. # 创建一个临时的可交互的 Pod
    2. kubectl run -i --tty temp --image ubuntu:14.04
    1. Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
    2. ... [ previous line repeats several times .. hit return when it stops ] ...

    接下来安装 amqp-tools ,这样我们就能用消息队列了。

    后续,我们将制作一个包含这些包的 Docker 镜像。

    接着,我们将要验证我们发现 RabbitMQ 服务:

    1. # 请注意 rabbitmq-service 有Kubernetes 提供的 DNS 名称,
    2. root@temp-loe07:/# nslookup rabbitmq-service
    3. Server: 10.0.0.10
    4. Address: 10.0.0.10#53
    5. Name: rabbitmq-service.default.svc.cluster.local
    6. Address: 10.0.147.152
    7. # 你的 IP 地址会不同

    如果 Kube-DNS 没有正确安装,上一步可能会出错。 你也可以在环境变量中找到服务 IP。

    1. # env | grep RABBIT | grep HOST
    2. RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
    3. # 你的 IP 地址会有所不同

    接着我们将要确认可以创建队列,并能发布消息和消费消息。

    1. # 下一行,rabbitmq-service 是访问 rabbitmq-service 的主机名。5672是 rabbitmq 的标准端口。
    2. root@temp-loe07:/# export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
    3. # 如果上一步中你不能解析 "rabbitmq-service",可以用下面的命令替换:
    4. # root@temp-loe07:/# BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
    5. # 现在创建队列:
    6. root@temp-loe07:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d foo
    7. # 向它推送一条消息:
    8. root@temp-loe07:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
    9. # 然后取回它.
    10. root@temp-loe07:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
    11. Hello
    12. root@temp-loe07:/#

    最后一个命令中, amqp-consume 工具从队列中取走了一个消息,并把该消息传递给了随机命令的标准输出。在这种情况下,cat 只会打印它从标准输入或得的内容,echo 只会添加回车符以便示例可读。

    现在让我们给队列增加一些任务。在我们的示例中,任务是多个待打印的字符串。

    实践中,消息的内容可以是:

    • 待处理的文件名
    • 程序额外的参数
    • 数据库表的关键字范围
    • 模拟任务的配置参数
    • 待渲染的场景的帧序列号

    本例中,如果有大量的数据需要被 Job 的所有 Pod 读取,典型的做法是把它们放在一个共享文件系统中,如NFS,并以只读的方式挂载到所有 Pod,或者 Pod 中的程序从类似 HDFS 的集群文件系统中读取。

    例如,我们创建队列并使用 amqp 命令行工具向队列中填充消息。实践中,你可以写个程序来利用 amqp 客户端库来填充这些队列。

    1. /usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d job1
    2. for f in apple banana cherry date fig grape lemon melon
    3. do
    4. /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
    5. done

    这样,我们给队列中填充了8个消息。

    创建镜像

    现在我们可以创建一个做为 Job 来运行的镜像。

    我们将用 amqp-consume 来从队列中读取消息并实际运行我们的程序。这里给出一个非常简单的示例程序:

    application/job/rabbitmq/worker.py

    1. #!/usr/bin/env python
    2. # Just prints standard out and sleeps for 10 seconds.
    3. import sys
    4. import time
    5. print("Processing " + sys.stdin.readlines()[0])
    6. time.sleep(10)

    对于 , 给你的应用镜像打上标签, 标签为你的用户名,然后用下面的命令推送到 Hub。用你的 Hub 用户名替换 <username>

    1. docker push <username>/job-wq-1

    如果你在用谷歌容器仓库, 用你的项目 ID 作为标签打到你的应用镜像上,然后推送到 GCR。 用你的项目 ID 替换 <project>

    1. docker tag job-wq-1 gcr.io/<project>/job-wq-1
    2. gcloud docker -- push gcr.io/<project>/job-wq-1

    定义 Job

    这里给出一个 Job 定义 yaml文件。你需要拷贝一份并编辑镜像以匹配你使用的名称,保存为 ./job.yaml

    使用工作队列进行粗粒度并行处理 - 图1

    1. apiVersion: batch/v1
    2. kind: Job
    3. metadata:
    4. name: job-wq-1
    5. spec:
    6. completions: 8
    7. parallelism: 2
    8. template:
    9. metadata:
    10. name: job-wq-1
    11. spec:
    12. containers:
    13. - name: c
    14. image: gcr.io/<project>/job-wq-1
    15. env:
    16. - name: BROKER_URL
    17. value: amqp://guest:guest@rabbitmq-service:5672
    18. - name: QUEUE
    19. value: job1
    20. restartPolicy: OnFailure

    本例中,每个 Pod 使用队列中的一个消息然后退出。这样,Job 的完成计数就代表了完成的工作项的数量。本例中我们设置 .spec.completions: 8,因为我们放了8项内容在队列中。

    现在我们运行 Job:

    1. kubectl create -f ./job.yaml

    稍等片刻,然后检查 Job。

    1. kubectl describe jobs/job-wq-1

    我们所有的 Pod 都成功了。耶!

    替代方案

    本文所讲述的处理方法的好处是你不需要修改你的 “worker” 程序使其知道工作队列的存在。

    本文所描述的方法需要你运行一个消息队列服务。如果不方便运行消息队列服务,你也许会考虑另外一种 任务模式

    本文所述的方法为每个工作项创建了一个 Pod。 如果你的工作项仅需数秒钟,为每个工作项创建 Pod会增加很多的常规消耗。 可以考虑另外的方案请参考, 这种方案可以实现每个 Pod 执行多个工作项。

    示例中,我们使用 amqp-consume 从消息队列读取消息并执行我们真正的程序。 这样的好处是你不需要修改你的程序使其知道队列的存在。 要了解怎样使用客户端库和工作队列通信,请参考 不同的示例

    友情提醒

    如果设置的完成数量小于队列中的消息数量,会导致一部分消息项不会被执行。

    如果设置的完成数量大于队列中的消息数量,当队列中所有的消息都处理完成后, Job 也会显示为未完成。Job 将创建 Pod 并阻塞等待消息输入。

    • 在 amqp-consume 命令拿到消息和容器成功退出之间的时间段内,执行杀死容器操作;
    • 在 kubelet 向 api-server 传回 Pod 成功运行之前,发生节点崩溃。