Kubernetes Setup
Info: This page describes deploying a standalone Flink session on top of Kubernetes. For information on native Kubernetes deployments, see the dedicated documentation page.
Please follow Kubernetes’ setup guide in order to deploy a Kubernetes cluster. If you want to run Kubernetes locally, we recommend using MiniKube.
Note: If using MiniKube, please make sure to execute before deploying a Flink cluster. Otherwise, Flink components are not able to reference themselves through a Kubernetes service.
A Flink session cluster is executed as a long-running Kubernetes Deployment. Note that you can run multiple Flink jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.
A basic Flink session cluster deployment in Kubernetes has three components:
- a Deployment/Job which runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager’s REST and UI ports
Using the resource definitions for a session cluster, launch the cluster with the kubectl command:
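The launch step can be sketched as follows. This is a minimal sketch, assuming the four resource files named on this page (flink-configuration-configmap.yaml, jobmanager-service.yaml, jobmanager-deployment.yaml, taskmanager-deployment.yaml) are in the current directory. The function name deploy_session_cluster and the KUBECTL override variable are hypothetical helpers introduced for illustration; they are not part of Flink or kubectl.

```shell
#!/usr/bin/env bash
# Sketch only: create the session cluster resources with kubectl.
# KUBECTL is a hypothetical override (e.g. KUBECTL=echo previews the commands).
KUBECTL="${KUBECTL:-kubectl}"

deploy_session_cluster() {
  local f
  # The ConfigMap is created first because the JobManager Deployment mounts it.
  for f in \
    flink-configuration-configmap.yaml \
    jobmanager-service.yaml \
    jobmanager-deployment.yaml \
    taskmanager-deployment.yaml
  do
    # Stop at the first failure so a partial cluster is easy to spot.
    "$KUBECTL" create -f "$f" || return 1
  done
}

# Usage: deploy_session_cluster
```

Running the function with KUBECTL=echo prints the four kubectl invocations without contacting a cluster, which can help when checking file names.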
You can then access the Flink UI in several ways:
- kubectl proxy:
  - Run kubectl proxy in a terminal.
  - Navigate to the proxied flink-jobmanager service UI in your browser.
- kubectl port-forward:
  - Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward your JobManager’s web UI port to local port 8081.
  - Navigate to http://localhost:8081 in your browser.
  - You can also submit jobs to the cluster with the following command:
    ./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar
- Create a NodePort service on the REST service of the JobManager:
  - Run kubectl create -f jobmanager-rest-service.yaml to create the NodePort service on the JobManager. An example jobmanager-rest-service.yaml can be found below.
  - Run kubectl get svc flink-jobmanager-rest to find the node-port of this service, then navigate to http://<public-node-ip>:<node-port> in your browser.
  - As with the port-forward solution, you can also submit jobs to the cluster with the following command:
    ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/WordCount.jar
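The lookups behind these three access paths can be sketched as small shell helpers. This is a sketch under stated assumptions: kubectl proxy listens on its default port 8001, the resources live in the default namespace, and the Service and label names match the resource definitions on this page (flink-jobmanager, port name ui, labels app=flink and component=jobmanager). The function names and the KUBECTL override variable are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch only: helpers for reaching the Flink UI.
# KUBECTL is a hypothetical override (e.g. KUBECTL=echo previews the commands).
KUBECTL="${KUBECTL:-kubectl}"

# Build the API-server proxy URL for a named Service port.
# Args (all optional): namespace, service name, port name, local proxy port.
flink_ui_proxy_url() {
  printf 'http://localhost:%s/api/v1/namespaces/%s/services/%s:%s/proxy\n' \
    "${4:-8001}" "${1:-default}" "${2:-flink-jobmanager}" "${3:-ui}"
}

# Print the name of the first pod carrying the JobManager labels.
jobmanager_pod() {
  "$KUBECTL" get pods -l app=flink,component=jobmanager \
    -o jsonpath='{.items[0].metadata.name}'
}

# Print the node port assigned to the flink-jobmanager-rest Service.
jobmanager_node_port() {
  "$KUBECTL" get svc flink-jobmanager-rest \
    -o jsonpath='{.spec.ports[0].nodePort}'
}

# Usage:
#   flink_ui_proxy_url
#   kubectl port-forward "$(jobmanager_pod)" 8081:8081
#   ./bin/flink run -m "<public-node-ip>:$(jobmanager_node_port)" ./examples/streaming/WordCount.jar
```

The pod lookup stands in for the ${flink-jobmanager-pod} placeholder, and the node port lookup avoids reading the port out of the kubectl get svc table by hand.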
To terminate the Flink session cluster, use kubectl:
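The teardown can be sketched as the mirror image of the launch step. This is a minimal sketch, assuming the cluster was created from the four resource files named on this page and that they are in the current directory. The function name delete_session_cluster and the KUBECTL override variable are hypothetical helpers.

```shell
#!/usr/bin/env bash
# Sketch only: delete the session cluster resources with kubectl.
# KUBECTL is a hypothetical override (e.g. KUBECTL=echo previews the commands).
KUBECTL="${KUBECTL:-kubectl}"

delete_session_cluster() {
  local f status=0
  # Remove the Deployments first, then the Service and the ConfigMap they use.
  for f in \
    jobmanager-deployment.yaml \
    taskmanager-deployment.yaml \
    jobmanager-service.yaml \
    flink-configuration-configmap.yaml
  do
    # Keep going on failure so the remaining resources are still removed.
    "$KUBECTL" delete -f "$f" || status=1
  done
  return "$status"
}

# Usage: delete_session_cluster
```

If the NodePort service was created from jobmanager-rest-service.yaml, it would need to be deleted the same way.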
A Flink job cluster is a dedicated cluster which runs a single job. The job is part of the image and, thus, there is no extra job submission needed.
The Flink job cluster image needs to contain the user code jars of the job for which the cluster is started. Therefore, one needs to build a dedicated container image for every job. Please follow these instructions to build the Docker image.
An early version of a Flink Helm chart is available on GitHub.
The Deployment definitions use the pre-built image flink:latest, which can be found on Docker Hub. The image is built from this GitHub repository.
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
jobmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        workingDir: /opt/flink
        # Start the JobManager, then tail its log so the container keeps running.
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123 # tcpSocket requires a port; assumed to be the rpc port declared above
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf # required field; assumed to be the conf directory under workingDir
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
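Once this Deployment has been created, its progress can be checked from the command line. The following is a minimal sketch, assuming the Deployment name flink-jobmanager from the definition above. The function names, the default timeout of 120s, and the KUBECTL override variable are hypothetical choices made for illustration.

```shell
#!/usr/bin/env bash
# Sketch only: check on the JobManager Deployment after creating it.
# KUBECTL is a hypothetical override (e.g. KUBECTL=echo previews the commands).
KUBECTL="${KUBECTL:-kubectl}"

# Block until the Deployment has rolled out, or the timeout expires.
# Arg (optional): timeout such as 30s or 2m.
wait_for_jobmanager() {
  "$KUBECTL" rollout status deployment/flink-jobmanager --timeout="${1:-120s}"
}

# Stream the JobManager log, which the container command tails to stdout.
jobmanager_logs() {
  "$KUBECTL" logs -f deployment/flink-jobmanager
}

# Usage:
#   wait_for_jobmanager 60s && jobmanager_logs
```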
taskmanager-deployment.yaml
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector: