Merge pull request #17345 from erictune/job-patterns

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-12-10 21:39:07 -08:00
commit 08de178f24
16 changed files with 1367 additions and 22 deletions

View File

@ -42,12 +42,13 @@ Documentation for other releases can be found at
- [Writing a Job Spec](#writing-a-job-spec)
- [Pod Template](#pod-template)
- [Pod Selector](#pod-selector)
- [Multiple Completions](#multiple-completions)
- [Parallelism](#parallelism)
- [Parallelism and Completions](#parallelism-and-completions)
- [Handling Pod and Container Failures](#handling-pod-and-container-failures)
- [Alternatives to Job](#alternatives-to-job)
- [Job Patterns](#job-patterns)
- [Alternatives](#alternatives)
- [Bare Pods](#bare-pods)
- [Replication Controller](#replication-controller)
- [Single Job starts Controller Pod](#single-job-starts-controller-pod)
- [Caveats](#caveats)
- [Future work](#future-work)
@ -61,6 +62,9 @@ of successful completions is reached, the job itself is complete. Deleting a Jo
pods it created.
A simple case is to create 1 Job object in order to reliably run one Pod to completion.
The Job object will start a new Pod if the first pod fails or is deleted (for example
due to a node hardware failure or a node reboot).
A Job can also be used to run multiple pods in parallel.
## Running an example Job
@ -179,30 +183,28 @@ Also you should not normally create any pods whose labels match this selector, e
via another Job, or via another controller such as ReplicationController. Otherwise, the Job will
think that those pods were created by it. Kubernetes will not stop you from doing this.
### Multiple Completions
### Parallelism and Completions
By default, a Job is complete when one Pod runs to successful completion. You can also specify that
this needs to happen multiple times by specifying `.spec.completions` with a value greater than 1.
When multiple completions are requested, each Pod created by the Job controller has an identical
[`spec`](../devel/api-conventions.md#spec-and-status). In particular, all pods will have
the same command line and the same image, the same volumes, and mostly the same environment
variables. It is up to the user to arrange for the pods to do work on different things. For
example, the pods might all access a shared work queue service to acquire work units.
By default, a Job is complete when one Pod runs to successful completion.
To create multiple pods which are similar, but have slightly different arguments, environment
variables or images, use multiple Jobs.
A single Job object can also be used to control multiple pods running in
parallel. There are several different [patterns for running parallel
jobs](#job-patterns).
### Parallelism
You can suggest how many pods should run concurrently by setting `.spec.parallelism` to the number
of pods you would like to have running concurrently. This number is a suggestion. The number
running concurrently may be lower or higher for a variety of reasons. For example, it may be lower
if the number of remaining completions is less, or as the controller is ramping up, or if it is
throttling the job due to excessive failures. It may be higher for example if a pod is gracefully
shutdown, and the replacement starts early.
With some of these patterns, you can suggest how many pods should run
concurrently by setting `.spec.parallelism` to the number of pods you would
like to have running concurrently. This number is a suggestion. The number
running concurrently may be lower or higher for a variety of reasons. For
example, it may be lower if the number of remaining completions is less, or as
the controller is ramping up, or if it is throttling the job due to excessive
failures. It may be higher for example if a pod is gracefully shutdown, and
the replacement starts early.
If you do not specify `.spec.parallelism`, then it defaults to `.spec.completions`.
Depending on the pattern you are using, you will either set `.spec.completions`
to 1 or to the number of units of work (see [Job Patterns](#job-patterns) for an explanation).
## Handling Pod and Container Failures
A Container in a Pod may fail for a number of reasons, such as because the process in it exited with
@ -226,7 +228,61 @@ sometimes be started twice.
If you do specify `.spec.parallelism` and `.spec.completions` both greater than 1, then there may be
multiple pods running at once. Therefore, your pods must also be tolerant of concurrency.
## Alternatives to Job
## Job Patterns
The Job object can be used to support reliable parallel execution of Pods. The Job object is not
designed to support closely-communicating parallel processes, as commonly found in scientific
computing. It does support parallel processing of a set of independent but related *work items*.
These might be emails to be sent, frames to be rendered, files to be transcoded, ranges of keys in a
NoSQL database to scan, and so on.
In a complex system, there may be multiple different sets of work items. Here we are just
considering one set of work items that the user wants to manage together — a *batch job*.
There are several different patterns for parallel computation, each with strengths and weaknesses.
The tradeoffs are:
- One Job object for each work item, vs a single Job object for all work items. The latter is
better for large numbers of work items. The former creates some overhead for the user and for the
system to manage large numbers of Job objects. Also, with the latter, the resource usage of the job
(number of concurrently running pods) can be easily adjusted using the `kubectl scale` command.
- Number of pods created equals number of work items, vs each pod can process multiple work items.
The former typically requires less modification to existing code and containers. The latter
is better for large numbers of work items, for similar reasons to the previous bullet.
- Several approaches use a work queue. This requires running a queue service,
and modifications to the existing program or container to make it use the work queue.
Other approaches are easier to adapt to an existing containerized application.
The tradeoffs are summarized here, with columns 2 to 4 corresponding to the above tradeoffs.
The pattern names are also links to examples and more detailed description.
| Pattern | Single Job object | Fewer pods than work items? | Use app unmodified? | Works in Kube 1.1? |
| -------------------------------------------------------------------------- |:-----------------:|:---------------------------:|:-------------------:|:-------------------:|
| [Job Template Expansion](../../examples/job/expansions/README.md) | | | ✓ | ✓ |
| [Queue with Pod Per Work Item](../../examples/job/work-queue-1/README.md) | ✓ | | sometimes | ✓ |
| [Queue with Variable Pod Count](../../examples/job/work-queue-2/README.md) | ✓ | ✓ | | ✓ |
| Single Job with Static Work Assignment | ✓ | | ✓ | |
When you specify completions with `.spec.completions`, each Pod created by the Job controller
has an identical [`spec`](../devel/api-conventions.md#spec-and-status). This means that
all pods will have the same command line and the same
image, the same volumes, and (almost) the same environment variables. These patterns
are different ways to arrange for pods to work on different things.
This table shows the required settings for `.spec.parallelism` and `.spec.completions` for each of the patterns.
Here, `W` is the number of work items.
| Pattern | `.spec.completions` | `.spec.parallelism` |
| -------------------------------------------------------------------------- |:-------------------:|:--------------------:|
| [Job Template Expansion](../../examples/job/expansions/README.md) | 1 | should be 1 |
| [Queue with Pod Per Work Item](../../examples/job/work-queue-1/README.md) | W | any |
| [Queue with Variable Pod Count](../../examples/job/work-queue-2/README.md) | 1 | any |
| Single Job with Static Work Assignment | W | any |
## Alternatives
### Bare Pods
@ -245,6 +301,19 @@ As discussed in [life of a pod](pod-states.md), `Job` is *only* appropriate for
`RestartPolicy` equal to `OnFailure` or `Never`. (Note: If `RestartPolicy` is not set, the default
value is `Always`.)
### Single Job starts Controller Pod
Another pattern is for a single Job to create a pod which then creates other pods, acting as a sort
of custom controller for those pods. This allows the most flexibility, but may be somewhat
complicated to get started with and offers less integration with Kubernetes.
One example of this pattern would be a Job which starts a Pod which runs a script that in turn
starts a Spark master controller (see [spark example](../../examples/spark/README.md)), runs a spark
driver, and then cleans up.
An advantage of this approach is that the overall process gets the completion guarantee of a Job
object, while maintaining complete control over what pods are created and how work is assigned to them.
## Caveats
Job objects are in the [`extensions` API Group](../api.md#api-groups).

View File

@ -392,6 +392,14 @@ func TestExampleObjectSchemas(t *testing.T) {
"javaweb": &api.Pod{},
"javaweb-2": &api.Pod{},
},
"../examples/job/work-queue-1": {
"job": &extensions.Job{},
},
"../examples/job/work-queue-2": {
"redis-pod": &api.Pod{},
"redis-service": &api.Service{},
"job": &extensions.Job{},
},
}
capabilities.SetForTests(capabilities.Capabilities{

View File

@ -0,0 +1,255 @@
# Example: Multiple Job Objects from Template Expansion
In this example, we will run multiple Kubernetes Jobs created from
a common template. You may want to be familiar with the basic,
non-parallel, use of [Job](../../../docs/user-guide/jobs.md) first.
## Basic Template Expansion
First, create a template of a Job object:
<!-- BEGIN MUNGE: EXAMPLE job.yaml.txt -->
```
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: process-item-$ITEM
spec:
  selector:
    matchLabels:
      app: jobexample
      item: $ITEM
  template:
    metadata:
      name: jobexample
      labels:
        app: jobexample
        item: $ITEM
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"]
      restartPolicy: Never
```
[Download example](job.yaml.txt?raw=true)
<!-- END MUNGE: EXAMPLE job.yaml.txt -->
Unlike a *pod template*, our *job template* is not a Kubernetes API type. It is just
a yaml representation of a Job object that has some placeholders that need to be filled
in before it can be used. The `$ITEM` syntax is not meaningful to Kubernetes.
In this example, the only processing the container does is to `echo` a string and sleep for a bit.
In a real use case, the processing would be some substantial computation, such as rendering a frame
of a movie, or processing a range of rows in a database. The `$ITEM` parameter would specify,
for example, the frame number or the row range.
This Job has two labels. The first label, `app=jobexample`, distinguishes this group of jobs from
other groups of jobs (these are not shown, but there might be other ones). This label
makes it convenient to operate on all the jobs in the group at once. The second label, with
key `item`, distinguishes individual jobs in the group. Each Job object must have
a label that no other Job object shares; here, that is the `item` label.
Neither of these label keys is special to Kubernetes -- you can pick your own label scheme.
Next, expand the template into multiple files, one for each item to be processed.
```console
# Expand files into a temporary directory
$ mkdir ./jobs
$ for i in apple banana cherry
do
cat job.yaml.txt | sed "s/\$ITEM/$i/" > ./jobs/job-$i.yaml
done
$ ls jobs/
job-apple.yaml
job-banana.yaml
job-cherry.yaml
```
Here, we used `sed` to replace the string `$ITEM` with the loop variable.
You could use any type of template language (jinja2, erb) or write a program
to generate the Job objects.
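For instance, here is a minimal sketch of such a program in Python, using the standard library's `string.Template`, which happens to use the same `$ITEM` placeholder syntax as our template. The script name and its reuse of the items above are just illustrative; it assumes the `./jobs` directory from the `mkdir` step already exists.

```
#!/usr/bin/env python
# expand.py (hypothetical): expands job.yaml.txt once per work item,
# equivalent to the sed loop above.
from string import Template

with open("job.yaml.txt") as f:
    template = Template(f.read())

for item in ["apple", "banana", "cherry"]:
    # substitute() replaces every $ITEM placeholder in the template.
    with open("jobs/job-%s.yaml" % item, "w") as out:
        out.write(template.substitute(ITEM=item))
```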
Next, create all the jobs with one kubectl command:
```console
$ kubectl create -f ./jobs
job "process-item-apple" created
job "process-item-banana" created
job "process-item-cherry" created
```
Now, check on the jobs:
```console
$ kubectl get jobs -l app=jobexample -L item
JOB CONTAINER(S) IMAGE(S) SELECTOR SUCCESSFUL ITEM
process-item-apple c busybox app in (jobexample),item in (apple) 1 apple
process-item-banana c busybox app in (jobexample),item in (banana) 1 banana
process-item-cherry c busybox app in (jobexample),item in (cherry) 1 cherry
```
Here we use the `-l` option to select all jobs that are part of this
group of jobs. (There might be other unrelated jobs in the system that we
do not care to see.)
The `-L` option adds an extra column with just the `item` label value.
We can check on the pods as well using the same label selector:
```console
$ kubectl get pods -l app=jobexample -L item
NAME READY STATUS RESTARTS AGE ITEM
process-item-apple-kixwv 0/1 Completed 0 4m apple
process-item-banana-wrsf7 0/1 Completed 0 4m banana
process-item-cherry-dnfu9 0/1 Completed 0 4m cherry
```
There is not a single command to check on the output of all jobs at once,
but looping over all the pods is pretty easy:
```console
$ for p in $(kubectl get pods -l app=jobexample -o name)
do
kubectl logs $p
done
Processing item apple
Processing item banana
Processing item cherry
```
## Multiple Template Parameters
In the first example, each instance of the template had one parameter, and that parameter was also
used as a label. However, label keys are limited in [what characters they can
contain](labels.md#syntax-and-character-set).
This slightly more complex example uses the jinja2 template language to generate our objects.
We will use a one-line python program to render the template.
First, download or paste the following template file to a file called `job.yaml.jinja2`:
<!-- BEGIN MUNGE: EXAMPLE job.yaml.jinja2 -->
```
{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", },
                  { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", },
                  { "name": "raspberry", "url": "https://www.raspberrypi.org/" }]
%}
{%- for p in params %}
{%- set name = p["name"] %}
{%- set url = p["url"] %}
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: jobexample-{{ name }}
spec:
  selector:
    matchLabels:
      app: jobexample
      item: {{ name }}
  template:
    metadata:
      name: jobexample
      labels:
        app: jobexample
        item: {{ name }}
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
---
{%- endfor %}
```
[Download example](job.yaml.jinja2?raw=true)
<!-- END MUNGE: EXAMPLE job.yaml.jinja2 -->
The above template defines parameters for each job object using a list of
python dicts (lines 1-4). Then a for loop emits one job yaml object
for each set of parameters (remaining lines).
We take advantage of the fact that multiple yaml documents can be concatenated
with the `---` separator (second to last line). We can pipe the output directly to kubectl to
create the objects.
You will need the jinja2 package if you do not already have it: `pip install --user jinja2`.
Now, use this one-line python program to expand the template:
```
$ alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
```
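If you would rather keep this in a file than in a shell alias, the same one-liner can be written as a short script (the name `render_template.py` is hypothetical):

```
#!/usr/bin/env python
# render_template.py (hypothetical): reads a jinja2 template on stdin
# and writes the rendered result to stdout, like the alias above.
import sys
from jinja2 import Template

print(Template(sys.stdin.read()).render())
```

Invoke it the same way, for example `cat job.yaml.jinja2 | python render_template.py`.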
The output can be saved to a file, like this:
```
$ cat job.yaml.jinja2 | render_template > jobs.yaml
```
or sent directly to kubectl, like this:
```
$ cat job.yaml.jinja2 | render_template | kubectl create -f -
```
## Alternatives
If you have a large number of job objects, you may find that:
- even using labels, managing so many Job objects is cumbersome.
- you exceed resource quota when creating all the Jobs at once,
and do not want to wait to create them incrementally.
- you need a way to easily scale the number of pods running
concurrently. One reason would be to avoid using too many
compute resources. Another would be to limit the number of
concurrent requests to a shared resource, such as a database,
used by all the pods in the job.
- very large numbers of jobs created at once overload the
kubernetes apiserver, controller, or scheduler.
In this case, you can consider one of the
other [job patterns](../../../docs/user-guide/jobs.md#job-patterns).

View File

@ -0,0 +1,30 @@
{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", },
                  { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", },
                  { "name": "raspberry", "url": "https://www.raspberrypi.org/" }]
%}
{%- for p in params %}
{%- set name = p["name"] %}
{%- set url = p["url"] %}
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: jobexample-{{ name }}
spec:
  selector:
    matchLabels:
      app: jobexample
      item: {{ name }}
  template:
    metadata:
      name: jobexample
      labels:
        app: jobexample
        item: {{ name }}
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
---
{%- endfor %}

View File

@ -0,0 +1,21 @@
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: process-item-$ITEM
spec:
  selector:
    matchLabels:
      app: jobexample
      item: $ITEM
  template:
    metadata:
      name: jobexample
      labels:
        app: jobexample
        item: $ITEM
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"]
      restartPolicy: Never

View File

@ -0,0 +1,10 @@
# Specify BROKER_URL and QUEUE when running
FROM ubuntu:14.04
RUN apt-get update && \
    apt-get install -y curl ca-certificates amqp-tools python \
        --no-install-recommends \
    && rm -rf /var/lib/apt/lists/*
COPY ./worker.py /worker.py
CMD /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py

View File

@ -0,0 +1,374 @@
# Example: Job with Work Queue with Pod Per Work Item
In this example, we will run a Kubernetes Job with multiple parallel
worker processes. You may want to be familiar with the basic,
non-parallel, use of [Job](../../../docs/user-guide/jobs.md) first.
In this example, as each pod is created, it picks up one unit of work
from a task queue, completes it, deletes it from the queue, and exits.
Here is an overview of the steps in this example:
1. **Start a message queue service.** In this example, we use RabbitMQ, but you could use another
one. In practice you would set up a message queue service once and reuse it for many jobs.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
this example, a message is just an integer that we will do a lengthy computation on.
1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes
one task from the message queue, processes it, and repeats until the end of the queue is reached.
## Starting a message queue service
This example uses RabbitMQ, but it should be easy to adapt to another AMQP-type message service.
In practice you could set up a message queue service once in a
cluster and reuse it for many jobs, as well as for long-running services.
Start RabbitMQ as follows:
```console
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
$ kubectl create -f examples/celery-rabbitmq/rabbitmq-controller.yaml
replicationcontroller "rabbitmq-controller" created
```
We will only use the rabbitmq part from the celery-rabbitmq example.
## Testing the message queue service
Now, we can experiment with accessing the message queue. We will
create a temporary interactive pod, install some tools on it,
and experiment with queues.
First create a temporary interactive Pod.
```console
# Create a temporary interactive container
$ kubectl run -i --tty temp --image ubuntu:14.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...
```
Note that your pod name and command prompt will be different.
Next install the `amqp-tools` so we can work with message queues.
```console
# Install some tools
root@temp-loe07:/# apt-get update
.... [ lots of output ] ....
root@temp-loe07:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils
.... [ lots of output ] ....
```
Later, we will make a docker image that includes these packages.
Next, we will check that we can discover the rabbitmq service:
```
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
root@temp-loe07:/# nslookup rabbitmq-service
Server: 10.0.0.10
Address: 10.0.0.10#53
Name: rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152
# Your address will vary.
```
If Kube-DNS is not set up correctly, the previous step may not work for you.
You can also find the service IP in an env var:
```
# env | grep RABBIT | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
# Your address will vary.
```
Next we will verify we can create a queue, and publish and consume messages.
```console
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
# can be reached. 5672 is the standard port for rabbitmq.
root@temp-loe07:/# BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# If you could not resolve "rabbitmq-service" in the previous step,
# then use this command instead:
# root@temp-loe07:/# BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
# Now create a queue:
root@temp-loe07:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
foo
# Publish one message to it:
root@temp-loe07:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
# And get it back.
root@temp-loe07:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
root@temp-loe07:/#
```
In the last command, the `amqp-consume` tool takes one message (`-c 1`)
from the queue, and passes that message to the standard input of
an arbitrary command. In this case, the program `cat` just prints
out what it gets on standard input, and the echo just adds a trailing
newline so the example is readable.
## Filling the Queue with tasks
Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.
In practice, the content of the messages might be:
- names of files that need to be processed
- extra flags to the program
- ranges of keys in a database table
- configuration parameters to a simulation
- frame numbers of a scene to be rendered
In practice, if a job needs a large amount of data in read-only mode across all its pods,
you will typically put that data in a shared file system like NFS and mount
it read-only on all the pods, or have the program in the pod natively read data from
a cluster file system like HDFS.
For our example, we will create the queue and fill it using the amqp command line tools.
In practice, you might write a program to fill the queue using an amqp client library.
```console
$ /usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
$ for f in apple banana cherry date fig grape lemon melon
do
/usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done
```
So, we filled the queue with 8 messages.
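As noted above, you could also fill the queue from a program using an AMQP client library. Here is a minimal sketch using the `pika` Python client (`pip install pika`); the broker URL and queue name are the ones from this example, and the script itself is hypothetical:

```
#!/usr/bin/env python
# fill_queue.py (hypothetical): publishes one persistent message per
# work item to the "job1" queue, like the amqp-publish loop above.
import pika

params = pika.URLParameters("amqp://guest:guest@rabbitmq-service:5672")
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue="job1", durable=True)
for item in ["apple", "banana", "cherry", "date",
             "fig", "grape", "lemon", "melon"]:
    # delivery_mode=2 marks the message persistent, like amqp-publish -p.
    channel.basic_publish(exchange="", routing_key="job1", body=item,
                          properties=pika.BasicProperties(delivery_mode=2))
connection.close()
```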
## Create an Image
Now we are ready to create an image that we will run as a job.
We will use the `amqp-consume` utility to read the message
from the queue and run our actual program. Here is a very simple
example program:
<!-- BEGIN MUNGE: EXAMPLE worker.py -->
```
#!/usr/bin/env python
# Copyright 2015 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Reads one work item from standard input, prints it, and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.read())
time.sleep(10)
```
[Download example](worker.py?raw=true)
<!-- END MUNGE: EXAMPLE worker.py -->
Now, build an image. If you are working in the source
tree, then change directory to `examples/job/work-queue-1`.
Otherwise, make a temporary directory, change to it, and
download the [Dockerfile](Dockerfile?raw=true)
and [worker.py](worker.py?raw=true). In either case,
build the image with this command:
```console
$ docker build -t job-wq-1 .
```
For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.
```
docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1
```
If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.
```
docker tag job-wq-1 gcr.io/<project>/job-wq-1
gcloud docker push gcr.io/<project>/job-wq-1
```
## Defining a Job
Here is a job definition. You will need to make a copy of the Job, edit the
image to match the name you used, and save it as `./job.yaml`.
<!-- BEGIN MUNGE: EXAMPLE job.yaml -->
```yaml
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: job-wq-1
spec:
  selector:
    matchLabels:
      app: job-wq-1
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
      labels:
        app: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
      restartPolicy: OnFailure
```
[Download example](job.yaml?raw=true)
<!-- END MUNGE: EXAMPLE job.yaml -->
In this example, each pod works on one item from the queue and then exits.
So, the completion count of the Job corresponds to the number of work items
done. We set `.spec.completions: 8` for this example, since we put 8 items in the queue.
## Running the Job
So, now run the Job:
```console
$ kubectl create -f ./job.yaml
```
Now wait a bit, then check on the job.
```console
$ kubectl describe jobs/job-wq-1
Name: job-wq-1
Namespace: default
Image(s): gcr.io/causal-jigsaw-637/job-wq-1
Selector: app in (job-wq-1)
Parallelism: 4
Completions: 8
Labels: app=job-wq-1
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
No volumes.
Events:
FirstSeen LastSeen Count From SubobjectPath Reason Message
───────── ──────── ───── ──── ───────────── ────── ───────
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-hcobb
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-weytj
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-qaam5
27s 27s 1 {job } SuccessfulCreate Created pod: job-wq-1-b67sr
26s 26s 1 {job } SuccessfulCreate Created pod: job-wq-1-xe5hj
15s 15s 1 {job } SuccessfulCreate Created pod: job-wq-1-w2zqe
14s 14s 1 {job } SuccessfulCreate Created pod: job-wq-1-d6ppa
14s 14s 1 {job } SuccessfulCreate Created pod: job-wq-1-p17e0
```
All our pods succeeded. Yay.
## Alternatives
This approach has the advantage that you
do not need to modify your "worker" program to be aware that there is a work queue.
It does require that you run a message queue service.
If running a queue service is inconvenient, you may
want to consider one of the other [job patterns](../../../docs/user-guide/jobs.md#job-patterns).
This approach creates a pod for every work item. If your work items only take a few seconds,
though, creating a Pod for every work item may add a lot of overhead. Consider another
[example](../work-queue-2/README.md) that executes multiple work items per Pod.
In this example, we used the `amqp-consume` utility to read the message
from the queue and run our actual program. This has the advantage that you
do not need to modify your program to be aware of the queue.
A [different example](../work-queue-2/README.md) shows how to
communicate with the work queue using a client library.
## Caveats
If the number of completions is set to less than the number of items in the queue, then
not all items will be processed.
If the number of completions is set to more than the number of items in the queue,
then the Job will not appear to be complete, even though all items in the queue
have been processed. It will start additional pods which will block waiting
for a message.
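One way to avoid the mismatch in either direction is to read the queue depth when you create the Job, and substitute that number into `.spec.completions`. Here is a minimal sketch with the `pika` Python client (a passive declare inspects the queue without modifying it; the script itself is hypothetical):

```
#!/usr/bin/env python
# queue_depth.py (hypothetical): prints the number of messages
# currently in the "job1" queue, for use as .spec.completions.
import pika

params = pika.URLParameters("amqp://guest:guest@rabbitmq-service:5672")
connection = pika.BlockingConnection(params)
channel = connection.channel()
status = channel.queue_declare(queue="job1", passive=True)
print(status.method.message_count)
connection.close()
```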
There is an unlikely race with this pattern. If the container is killed in between the time
that the message is acknowledged by the amqp-consume command and the time that the container
exits with success, or if the node crashes before the kubelet is able to post the success of the pod
back to the api-server, then the Job will not appear to be complete, even though all items
in the queue have been processed.

View File

@ -0,0 +1,20 @@
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: job-wq-1
spec:
  selector:
    matchLabels:
      app: job-wq-1
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
      labels:
        app: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
      restartPolicy: OnFailure

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python
# Copyright 2015 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Reads one work item from standard input, prints it, and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.read())
time.sleep(10)

View File

@ -0,0 +1,6 @@
FROM python
RUN pip install redis
COPY ./worker.py /worker.py
COPY ./rediswq.py /rediswq.py
CMD python worker.py

View File

@ -0,0 +1,305 @@
# Example: Job with Work Queue with Variable Pod Count
In this example, we will run a Kubernetes Job with multiple parallel
worker processes. You may want to be familiar with the basic,
non-parallel, use of [Job](../../../docs/user-guide/jobs.md) first.
In this example, as each pod is created, it picks up one unit of work
from a task queue, processes it, and repeats until the end of the queue is reached.
Here is an overview of the steps in this example:
1. **Start a storage service to hold the work queue.** In this example, we use Redis to store
our work items. In the previous example, we used RabbitMQ. In this example, we use Redis and
a custom work-queue client library because AMQP does not provide a good way for clients to
detect when a finite-length work queue is empty. In practice you would set up a store such
as Redis once and reuse it for the work queues of many jobs, and other things.
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
this example, a message is just an integer that we will do a lengthy computation on.
1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes
one task from the message queue, processes it, and repeats until the end of the queue is reached.
## Starting Redis
For this example, for simplicity, we will start a single instance of Redis.
See the [Redis Example](../../../examples/redis/README.md) for an example
of deploying Redis scalably and redundantly.
Start a temporary Pod running Redis and a service so we can find it.
```console
$ kubectl create -f examples/job/work-queue-2/redis-pod.yaml
pod "redis-master" created
$ kubectl create -f examples/job/work-queue-2/redis-service.yaml
service "redis" created
```
## Filling the Queue with tasks
Now let's fill the queue with some "tasks". In our example, our tasks are just strings to be
printed.
Start a temporary interactive pod for running the Redis CLI.
```console
$ kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/temp-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt
```
Now hit enter, start the redis CLI, and create a list with some work items in it.
```
# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"
```
So, the list with key `job2` will be our work queue.
Note: if you do not have Kube DNS set up correctly, you may need to change
the first step of the above block to `redis-cli -h $REDIS_SERVICE_HOST`.
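You could also fill the queue from a program. Here is a minimal sketch using the `redis` Python package, with the same host and list key as the redis-cli session above (the script itself is hypothetical):

```
#!/usr/bin/env python
# fill_queue.py (hypothetical): pushes the same work items onto the
# "job2" list as the redis-cli session above.
import redis

r = redis.StrictRedis(host="redis")
r.rpush("job2", "apple", "banana", "cherry", "date", "fig",
        "grape", "lemon", "melon", "orange")
print(r.lrange("job2", 0, -1))
```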
## Create an Image
Now we are ready to create an image that we will run.
We will use a python worker program with a redis client to read
the messages from the message queue.
A simple Redis work queue client library is provided,
called rediswq.py ([Download](rediswq.py?raw=true)).
The "worker" program in each Pod of the Job uses the work queue
client library to get work. Here it is:
<!-- BEGIN MUNGE: EXAMPLE worker.py -->
```
#!/usr/bin/env python
# Copyright 2015 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import rediswq

host = "redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
    item = q.lease(lease_secs=10, block=True, timeout=2)
    if item is not None:
        itemstr = item.decode("utf-8")
        print("Working on " + itemstr)
        time.sleep(10)  # Put your actual work here instead of sleep.
        q.complete(item)
    else:
        print("Waiting for work")
print("Queue empty, exiting")
```
[Download example](worker.py?raw=true)
<!-- END MUNGE: EXAMPLE worker.py -->
If you are working from the source tree,
change directory to the `examples/job/work-queue-2` directory.
Otherwise, download [`worker.py`](worker.py?raw=true), [`rediswq.py`](rediswq.py?raw=true), and [`Dockerfile`](Dockerfile?raw=true)
using the links above. Then build the image:
```console
$ docker build -t job-wq-2 .
```
For the [Docker Hub](https://hub.docker.com/), tag your app image with
your username and push to the Hub with the below commands. Replace
`<username>` with your Hub username.
```
docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2
```
If you are using [Google Container
Registry](https://cloud.google.com/tools/container-registry/), tag
your app image with your project ID, and push to GCR. Replace
`<project>` with your project ID.
```
docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker push gcr.io/<project>/job-wq-2
```
## Defining a Job
Here is the job definition:
<!-- BEGIN MUNGE: EXAMPLE job.yaml -->
```yaml
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: job-wq-2
spec:
  selector:
    matchLabels:
      app: job-wq-2
  completions: 1
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
      labels:
        app: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure
```
[Download example](job.yaml?raw=true)
<!-- END MUNGE: EXAMPLE job.yaml -->
Be sure to edit the job template to
change `gcr.io/myproject` to your own path.
In this example, each pod works on several items from the queue and then exits when there are no more items.
Since the workers themselves detect when the work queue is empty, and the Job controller does not
know about the work queue, it relies on the workers to signal when they are done working.
The workers signal that the queue is empty by exiting with success. So, as soon as any worker
exits with success, the controller knows the work is done, and the Pods will exit soon.
So, we set the completion count of the Job to 1. The Job controller will wait for the other pods to complete
too.
## Running the Job
So, now run the Job:
```console
$ kubectl create -f ./job.yaml
```
Now wait a bit, then check on the job.
```console
$ kubectl describe jobs/job-wq-2
Name: job-wq-2
Namespace: default
Image(s): gcr.io/causal-jigsaw-637/job-wq-2
Selector: app in (job-wq-2)
Parallelism: 2
Completions: 1
Labels: app=job-wq-2
Pods Statuses: 0 Running / 1 Succeeded / 0 Failed
No volumes.
Events:
FirstSeen LastSeen Count From SubobjectPath Reason Message
───────── ──────── ───── ──── ───────────── ────── ───────
1m 1m 1 {job } SuccessfulCreate Created pod: job-wq-2-7r7b2
$ kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon
```
As you can see, one of our pods worked on several work units.
## Alternatives
If running a queue service or modifying your containers to use a work queue is inconvenient, you may
want to consider one of the other [job patterns](../../../docs/user-guide/jobs.md#job-patterns).
If you have a continuous stream of background processing work to run, then
consider running your background workers with a `ReplicationController` instead,
perhaps together with a background processing library such as
[resque](https://github.com/resque/resque).

View File

@ -0,0 +1,20 @@
apiVersion: extensions/v1beta1
kind: Job
metadata:
  name: job-wq-2
spec:
  selector:
    matchLabels:
      app: job-wq-2
  completions: 1
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
      labels:
        app: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

View File

@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
  name: redis-master
  labels:
    app: redis
spec:
  containers:
  - name: master
    image: redis
    env:
    - name: MASTER
      value: "true"
    ports:
    - containerPort: 6379

View File

@ -0,0 +1,10 @@
apiVersion: v1
kind: Service
metadata:
  name: redis
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis

View File

@ -0,0 +1,144 @@
#!/usr/bin/env python
# Copyright 2015 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
# and the suggestion in the redis documentation for RPOPLPUSH, at
# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue.
import redis
import uuid
import hashlib


class RedisWQ(object):
    """Simple Finite Work Queue with Redis Backend

    This work queue is finite: as long as no more work is added
    after workers start, the workers can detect when the queue
    is completely empty.

    The items in the work queue are assumed to have unique values.

    This object is not intended to be used by multiple threads
    concurrently.
    """

    def __init__(self, name, **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0

        The work queue is identified by "name".  The library may create other
        keys with "name" as a prefix.
        """
        self._db = redis.StrictRedis(**redis_kwargs)
        # The session ID will uniquely identify this "worker".
        self._session = str(uuid.uuid4())
        # Work queue is implemented as two queues: main, and processing.
        # Work is initially in main, and moved to processing when a client picks it up.
        self._main_q_key = name
        self._processing_q_key = name + ":processing"
        self._lease_key_prefix = name + ":leased_by_session:"

    def sessionID(self):
        """Return the ID for this session."""
        return self._session

    def _main_qsize(self):
        """Return the size of the main queue."""
        return self._db.llen(self._main_q_key)

    def _processing_qsize(self):
        """Return the size of the processing queue."""
        return self._db.llen(self._processing_q_key)

    def empty(self):
        """Return True if the queue is empty, including work being done, False otherwise.

        False does not necessarily mean that there is work available to work on right now.
        """
        return self._main_qsize() == 0 and self._processing_qsize() == 0

    # TODO: implement this
    # def check_expired_leases(self):
    #     """Return any expired leases' items to the main work queue."""
    #     # Processing list should not be _too_ long since it is approximately as long
    #     # as the number of active and recently active workers.
    #     processing = self._db.lrange(self._processing_q_key, 0, -1)
    #     for item in processing:
    #         # If the lease key is not present for an item (it expired or was
    #         # never created because the client crashed before creating it)
    #         # then move the item back to the main queue so others can work on it.
    #         if not self._lease_exists(item):
    #             # TODO: transactionally move the key from the processing queue
    #             # to the main queue, while detecting if a new lease is created
    #             # or if either queue is modified.

    def _itemkey(self, item):
        """Returns a string that uniquely identifies an item (bytes)."""
        return hashlib.sha224(item).hexdigest()

    def _lease_exists(self, item):
        """True if a lease on 'item' exists."""
        return self._db.exists(self._lease_key_prefix + self._itemkey(item))

    def lease(self, lease_secs=60, block=True, timeout=None):
        """Begin working on an item from the work queue.

        Lease the item for lease_secs.  After that time, other
        workers may consider this client to have crashed or stalled
        and pick up the item instead.

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout)
        else:
            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
        if item:
            # Record that we (this session id) are working on a key.  Expire that
            # note after the lease timeout.
            # Note: if we crash at this line of the program, then GC will see no lease
            # for this item and later return it to the main queue.
            itemkey = self._itemkey(item)
            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
        return item

    def complete(self, value):
        """Complete working on the item with 'value'.

        If the lease expired, the item may not have completed, and some
        other worker may have picked it up.  There is no indication
        of what happened.
        """
        self._db.lrem(self._processing_q_key, 0, value)
        # If we crash here, then the GC code will try to move the value, but it will
        # not be here, which is fine.  So this does not need to be a transaction.
        itemkey = self._itemkey(value)
        self._db.delete(self._lease_key_prefix + itemkey)

# TODO: add functions to clean up all keys associated with "name" when
# processing is complete.

# TODO: add a function to add an item to the queue.  Atomically
# check if the queue is empty and if so fail to add the item
# since other workers might think work is done and be in the process
# of exiting.

# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
# make it so it can be pip installed by anyone (see
# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)

# TODO(etune): finish code to GC expired leases, and call periodically
# e.g. each time lease times out.

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python
# Copyright 2015 The Kubernetes Authors All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import rediswq

host = "redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
    item = q.lease(lease_secs=10, block=True, timeout=2)
    if item is not None:
        itemstr = item.decode("utf-8")
        print("Working on " + itemstr)
        time.sleep(10)  # Put your actual work here instead of sleep.
        q.complete(item)
    else:
        print("Waiting for work")
print("Queue empty, exiting")