Kubernetes Setup

    Info This page describes deploying a standalone Flink session cluster on top of Kubernetes. For information on Flink's native Kubernetes deployment, see the native Kubernetes documentation.

    Please follow Kubernetes' setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.

    Note: If using MiniKube, please make sure to execute minikube ssh 'sudo ip link set docker0 promisc on' before deploying a Flink cluster. Otherwise Flink components are not able to reference themselves through a Kubernetes service.

    A Flink session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

    A basic Flink session cluster deployment in Kubernetes has three components:

    • a Deployment/Job which runs the JobManager
    • a Deployment for a pool of TaskManagers
    • a Service exposing the JobManager’s REST and UI ports

    Using the resource definitions for a session cluster, launch the cluster with the kubectl command:
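    Assuming the resource definitions are saved under the file names used on this page (adjust the names if yours differ), launching the session cluster would look like the following sketch:

```shell
# Create the ConfigMap holding flink-conf.yaml and log4j.properties
kubectl create -f flink-configuration-configmap.yaml
# Create the service exposing the JobManager's RPC, blob, and UI ports
kubectl create -f jobmanager-service.yaml
# Launch the JobManager and the pool of TaskManagers
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
```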

    You can then access the Flink UI in several ways:

    • kubectl proxy:

      • Run kubectl proxy in a terminal.
      • Navigate to http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy in your browser.
    • kubectl port-forward:

      • Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your jobmanager’s web UI port to local port 8081.
      • Navigate to http://localhost:8081 in your browser.
      • Moreover, you can use the following command to submit jobs to the cluster:

    ./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
    • Create a NodePort service on the rest service of jobmanager:
      • Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on the jobmanager. An example of jobmanager-rest-service.yaml can be found below.
      • Run kubectl get svc flink-jobmanager-rest to find the node port of this service, then navigate to http://<public-node-ip>:<node-port> in your browser.
      • Similarly to the port-forward solution, you can use the following command to submit jobs to the cluster:

    ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar

    In order to terminate the Flink session cluster, use kubectl:
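    Assuming the same file names as above, tearing down the session cluster is a matter of deleting the resources that were created:

```shell
# Delete the deployments, the service, and the ConfigMap created above
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f flink-configuration-configmap.yaml
```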

    A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.

    The Flink job cluster image needs to contain the user code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. Please follow these instructions to build the Docker image.
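    As a minimal sketch, a job-specific image can be derived from the base image by adding the user-code jar. The jar name and target path below are illustrative assumptions, not paths mandated by Flink:

```dockerfile
# Start from the same base image used by the session cluster examples
FROM flink:latest
# Add the user-code jar; name and destination are illustrative assumptions
COPY ./target/my-flink-job.jar /opt/flink/usrlib/my-flink-job.jar
```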

    An early version of a Flink Helm chart is available on GitHub.

    The Deployment definitions use the pre-built image flink:latest, which can be found on Docker Hub. The image is built from this GitHub repository.

    flink-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 1
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        jobmanager.heap.size: 1024m
        taskmanager.memory.process.size: 1024m
      log4j.properties: |+
        log4j.rootLogger=INFO, file
        log4j.logger.akka=INFO
        log4j.logger.org.apache.hadoop=INFO
        log4j.logger.org.apache.zookeeper=INFO
        log4j.appender.file=org.apache.log4j.FileAppender
        log4j.appender.file.file=${log.file}
        log4j.appender.file.layout=org.apache.log4j.PatternLayout
        log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

    jobmanager-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:latest
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
              while :;
              do
                if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*jobmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties

    taskmanager-deployment.yaml
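    The taskmanager-deployment.yaml listing was not reproduced here. A sketch mirroring the JobManager Deployment above would look like the following; the replica count is an example and should be adjusted to your needs, and the ports and mount path follow the ConfigMap and JobManager definitions on this page:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2  # example value; scale the TaskManager pool as needed
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:latest
        workingDir: /opt/flink
        # Start the TaskManager and tail its log, analogous to the JobManager command
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # user _flink_ from the official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
```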

    jobmanager-service.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124
      - name: ui
        port: 8081
      selector:
        app: flink
        component: jobmanager
    jobmanager-rest-service.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081
        targetPort: 8081
      selector:
        app: flink
        component: jobmanager