diff --git a/examples/workqueue/BUILD b/examples/workqueue/BUILD new file mode 100644 index 00000000..fc608cc7 --- /dev/null +++ b/examples/workqueue/BUILD @@ -0,0 +1,34 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", +) + +go_binary( + name = "workqueue", + library = ":go_default_library", + tags = ["automanaged"], +) + +go_library( + name = "go_default_library", + srcs = ["main.go"], + tags = ["automanaged"], + deps = [ + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/pkg/api:go_default_library", + "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", + ], +) diff --git a/examples/workqueue/README.md b/examples/workqueue/README.md new file mode 100644 index 00000000..f222d700 --- /dev/null +++ b/examples/workqueue/README.md @@ -0,0 +1,17 @@ +# Workqueue Example + +This example demonstrates how to write a controller which follows the states +of watched resources. + +It demonstrates how to: + * combine the workqueue with a cache to a full controller + * synchronize the controller on startup + +The example is based on https://github.com/kubernetes/community/blob/master/contributors/devel/controllers.md. + +## Running + +``` +# if outside of the cluster +go run *.go -kubeconfig=/my/config -logtostderr=true +``` diff --git a/examples/workqueue/main.go b/examples/workqueue/main.go new file mode 100644 index 00000000..fb6efd3d --- /dev/null +++ b/examples/workqueue/main.go @@ -0,0 +1,218 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "fmt" + "time" + + "github.com/golang/glog" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" +) + +type Controller struct { + indexer cache.Indexer + queue workqueue.RateLimitingInterface + informer cache.Controller +} + +func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller { + return &Controller{ + informer: informer, + indexer: indexer, + queue: queue, + } +} + +func (c *Controller) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.queue.Get() + if quit { + return false + } + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. + defer c.queue.Done(key) + + // Invoke the method containing the business logic + err := c.syncToStdout(key.(string)) + // Handle the error if something went wrong during the execution of the business logic + c.handleErr(err, key) + return true +} + +// syncToStdout is the business logic of the controller. In this controller it simply prints +// information about the pod to stdout. In case an error happened, it has to simply return the error. +// The retry logic should not be part of the business logic. +func (c *Controller) syncToStdout(key string) error { + obj, exists, err := c.indexer.GetByKey(key) + if err != nil { + glog.Errorf("Fetching object with key %s from store failed with %v", key, err) + return err + } + + if !exists { + // Below we will warm up our cache with a Pod, so that we will see a delete for one pod + fmt.Printf("Pod %s does not exist anymore\n", key) + } else { + // Note that you also have to check the uid if you have a local controlled resource, which + // is dependent on the actual instance, to detect that a Pod was recreated with the same name + fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName()) + } + return nil +} + +// handleErr checks if an error happened and makes sure we will retry later. +func (c *Controller) handleErr(err error, key interface{}) { + if err == nil { + // Forget about the #AddRateLimited history of the key on every successful synchronization. + // This ensures that future processing of updates for this key is not delayed because of + // an outdated error history. + c.queue.Forget(key) + return + } + + // This controller retries 5 times if something goes wrong. After that, it stops trying. + if c.queue.NumRequeues(key) < 5 { + glog.Infof("Error syncing pod %v: %v", key, err) + + // Re-enqueue the key rate limited. Based on the rate limiter on the + // queue and the re-enqueue history, the key will be processed later again. + c.queue.AddRateLimited(key) + return + } + + c.queue.Forget(key) + // Report to an external entity that, even after several retries, we could not successfully process this key + runtime.HandleError(err) + glog.Infof("Dropping pod %q out of the queue: %v", key, err) +} + +func (c *Controller) Run(threadiness int, stopCh chan struct{}) { + defer runtime.HandleCrash() + + // Let the workers stop when we are done + defer c.queue.ShutDown() + glog.Info("Starting Pod controller") + + go c.informer.Run(stopCh) + + // Wait for all involved caches to be synced, before processing items from the queue is started + if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) + return + } + + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh + glog.Info("Stopping Pod controller") +} + +func (c *Controller) runWorker() { + for c.processNextItem() { + } +} + +func main() { + var kubeconfig string + var master string + + flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file") + flag.StringVar(&master, "master", "", "master url") + flag.Parse() + + // creates the connection + config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig) + if err != nil { + glog.Fatal(err) + } + + // creates the clientset + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + glog.Fatal(err) + } + + // create the pod watcher + podListWatcher := cache.NewListWatchFromClient(clientset.Core().RESTClient(), "pods", api.NamespaceDefault, fields.Everything()) + + // create the workqueue + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + + // Bind the workqueue to a cache with the help of an informer. This way we make sure that + // whenever the cache is updated, the pod key is added to the workqueue. + // Note that when we finally process the item from the workqueue, we might see a newer version + // of the Pod than the version which was responsible for triggering the update. + indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + queue.Add(key) + } + }, + UpdateFunc: func(old interface{}, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + queue.Add(key) + } + }, + DeleteFunc: func(obj interface{}) { + // IndexerInformer uses a delta queue, therefore for deletes we have to use this + // key function. + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + queue.Add(key) + } + }, + }, cache.Indexers{}) + + controller := NewController(queue, indexer, informer) + + // We can now warm up the cache for initial synchronization. + // Let's suppose that we knew about a pod "mypod" on our last run, therefore add it to the cache. + // If this pod is not there anymore, the controller will be notified about the removal after the + // cache has synchronized. + indexer.Add(&v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "mypod", + Namespace: v1.NamespaceDefault, + }, + }) + + // Now let's start the controller + stop := make(chan struct{}) + defer close(stop) + go controller.Run(1, stop) + + // Wait forever + select {} +}