Kube2sky synchronously waits for the Kubernetes service.

Author: Prashanth Balasubramanian, 2016-02-29 19:40:15 -08:00
parent 3f16f5f2b8
commit 8de2fbfa0a
4 changed files with 112 additions and 28 deletions

File: CHANGELOG.md

@@ -1,3 +1,7 @@
+## Version 1.13 (Mar 1 2016 Prashanth.B <beeps@google.com>)
+- Synchronously wait for the Kubernetes service at startup.
+- Add a SIGTERM/SIGINT handler.
+
 ## Version 1.12 (Dec 15 2015 Abhishek Shah <abshah@google.com>)
 - Gave pods their own cache store. (034ecbd)
 - Allow pods to have dns. (717660a)

File: Makefile

@@ -18,7 +18,7 @@
 .PHONY: all kube2sky container push clean test

-TAG = 1.12
+TAG = 1.13
 PREFIX = gcr.io/google_containers

 all: container

File: kube2sky.go

@@ -26,8 +26,10 @@ import (
 	"net/http"
 	"net/url"
 	"os"
+	"os/signal"
 	"strings"
 	"sync"
+	"syscall"
 	"time"

 	etcd "github.com/coreos/go-etcd/etcd"
@@ -47,12 +49,16 @@ import (
 	"k8s.io/kubernetes/pkg/util/wait"
 )

+// The name of the "master" Kubernetes Service.
+const kubernetesSvcName = "kubernetes"
+
 var (
 	argDomain              = flag.String("domain", "cluster.local", "domain under which to create names")
 	argEtcdMutationTimeout = flag.Duration("etcd-mutation-timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration")
 	argEtcdServer          = flag.String("etcd-server", "http://127.0.0.1:4001", "URL to etcd server")
 	argKubecfgFile         = flag.String("kubecfg-file", "", "Location of kubecfg file for access to kubernetes master service; --kube-master-url overrides the URL part of this; if neither this nor --kube-master-url are provided, defaults to service account tokens")
 	argKubeMasterURL       = flag.String("kube-master-url", "", "URL to reach kubernetes master. Env variables in this flag will be expanded.")
+	healthzPort            = flag.Int("healthz-port", 8081, "port on which to serve a kube2sky HTTP readiness probe.")
 )

 const (
@@ -410,7 +416,11 @@ func (ks *kube2sky) removeService(obj interface{}) {
 }

 func (ks *kube2sky) updateService(oldObj, newObj interface{}) {
-	// TODO: Avoid unwanted updates.
+	// TODO: We shouldn't leave etcd in a state where it doesn't have a
+	// record for a Service. This removal is needed to completely clean
+	// the directory of a Service, which has SRV records and A records
+	// that are hashed according to oldObj. Unfortunately, this is the
+	// easiest way to purge the directory.
 	ks.removeService(oldObj)
 	ks.newService(newObj)
 }
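The remove-then-add dance above exists because kube2sky names etcd entries with a content hash (see getHash later in this diff): when a Service changes, the new records land under new keys, so the old keys must be purged explicitly. A minimal sketch of the effect; the etcd path layout and the fnv variant here are illustrative assumptions, not lifted from this file:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// getHash mirrors the helper later in this diff (h.Sum32 rendered as hex);
// whether the file uses fnv.New32 or New32a is an assumption here.
func getHash(text string) string {
	h := fnv.New32a()
	h.Write([]byte(text))
	return fmt.Sprintf("%x", h.Sum32())
}

func main() {
	// Hypothetical record keys for a Service before and after an update.
	oldKey := "/skydns/local/cluster/svc/default/nginx/" + getHash("10.0.0.1:80")
	newKey := "/skydns/local/cluster/svc/default/nginx/" + getHash("10.0.0.2:80")
	// The keys differ, so writing the new record never overwrites the old
	// one: without removeService(oldObj), the stale entry would linger.
	fmt.Println(oldKey != newKey) // true
}
```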
@@ -562,10 +572,56 @@ func getHash(text string) string {
 	return fmt.Sprintf("%x", h.Sum32())
 }

+// waitForKubernetesService waits for the "kubernetes" master service.
+// Since the health probe on the kube2sky container is essentially an nslookup
+// of this service, we cannot serve any DNS records if it doesn't show up.
+// Once the Service is found, we start answering on this container's readiness
+// probe endpoint.
+func waitForKubernetesService(client *kclient.Client) (svc *kapi.Service) {
+	name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
+	glog.Infof("Waiting for service: %v", name)
+	var err error
+	servicePollInterval := 1 * time.Second
+	for {
+		svc, err = client.Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
+		if err != nil || svc == nil {
+			glog.Infof("Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", name, err, servicePollInterval)
+			time.Sleep(servicePollInterval)
+			continue
+		}
+		break
+	}
+	return
+}
+
+// setupSignalHandlers runs a goroutine that waits on SIGINT or SIGTERM and
+// logs it before exiting.
+func setupSignalHandlers() {
+	// Buffer the channel so a signal delivered before the goroutine below
+	// is scheduled isn't dropped (os/signal does not block on delivery).
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+	// This program should always exit gracefully, logging that it received
+	// either a SIGINT or SIGTERM. Since kube2sky is run in a container
+	// without a liveness probe as part of the kube-dns pod, it shouldn't
+	// restart unless the pod is deleted. If it restarts without logging
+	// anything, it means something is seriously wrong.
+	// TODO: Remove once #22290 is fixed.
+	go func() {
+		glog.Fatalf("Received signal %s", <-sigChan)
+	}()
+}
+
+// setupHealthzHandlers sets up the readiness endpoint for kube2sky.
+func setupHealthzHandlers(ks *kube2sky) {
+	http.HandleFunc("/readiness", func(w http.ResponseWriter, req *http.Request) {
+		fmt.Fprintf(w, "ok\n")
+	})
+}
+
 func main() {
 	flag.CommandLine.SetNormalizeFunc(util.WarnWordSepNormalizeFunc)
 	flag.Parse()

 	var err error
+	setupSignalHandlers()
 	// TODO: Validate input flags.
 	domain := *argDomain
 	if !strings.HasSuffix(domain, ".") {
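As an aside, the hand-rolled polling loop in waitForKubernetesService could also be phrased with the wait package this file already imports. A sketch only, assuming the in-tree wait package of this era exposes PollInfinite with this signature, and relying on the file's existing kclient/kapi/glog/time imports:

```go
// Equivalent behavior to the loop above, expressed via wait.PollInfinite.
func waitForKubernetesServiceAlt(client *kclient.Client) (svc *kapi.Service) {
	name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
	glog.Infof("Waiting for service: %v", name)
	wait.PollInfinite(1*time.Second, func() (bool, error) {
		var err error
		svc, err = client.Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
		if err != nil || svc == nil {
			glog.Infof("Ignoring error while waiting for service %v: %v", name, err)
			return false, nil // a nil error keeps the poll going
		}
		return true, nil
	})
	return
}
```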
@@ -583,10 +639,20 @@ func main() {
 	if err != nil {
 		glog.Fatalf("Failed to create a kubernetes client: %v", err)
 	}
+	// Wait synchronously for the Kubernetes service and add a DNS record for it.
+	ks.newService(waitForKubernetesService(kubeClient))
+	glog.Infof("Successfully added DNS record for Kubernetes service.")

 	ks.endpointsStore = watchEndpoints(kubeClient, &ks)
 	ks.servicesStore = watchForServices(kubeClient, &ks)
 	ks.podsStore = watchPods(kubeClient, &ks)

-	select {}
+	// We declare kube2sky ready when:
+	// 1. It has retrieved the Kubernetes master service from the apiserver.
+	//    If this doesn't happen, skydns will fail its liveness probe, since it
+	//    can't perform any cluster-local DNS lookups.
+	// 2. It has set up the 3 watches above.
+	// Once ready, this container never flips to not-ready.
+	setupHealthzHandlers(&ks)
+	glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *healthzPort), nil))
 }
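The /readiness handler is deliberately trivial: once registered it always answers "ok", which is what makes the container "never flip to not-ready". A hypothetical standalone check (not part of this commit) exercising the same handler shape:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Register the same handler shape kube2sky uses for /readiness.
	mux := http.NewServeMux()
	mux.HandleFunc("/readiness", func(w http.ResponseWriter, req *http.Request) {
		fmt.Fprintf(w, "ok\n")
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/readiness")
	if err != nil {
		panic(err)
	}
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Printf("status=%d body=%q\n", resp.StatusCode, string(body)) // status=200 body="ok\n"
}
```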

File: skydns-rc.yaml.in

@@ -1,32 +1,35 @@
 apiVersion: v1
 kind: ReplicationController
 metadata:
-  name: kube-dns-v10
+  name: kube-dns-v11
   namespace: kube-system
   labels:
     k8s-app: kube-dns
-    version: v10
+    version: v11
     kubernetes.io/cluster-service: "true"
 spec:
   replicas: {{ pillar['dns_replicas'] }}
   selector:
     k8s-app: kube-dns
-    version: v10
+    version: v11
   template:
     metadata:
       labels:
         k8s-app: kube-dns
-        version: v10
+        version: v11
         kubernetes.io/cluster-service: "true"
     spec:
       containers:
       - name: etcd
         image: gcr.io/google_containers/etcd-amd64:2.2.1
         resources:
-          # keep request = limit to keep this container in guaranteed class
+          # TODO: Set memory limits when we've profiled the container for large
+          # clusters, then set request = limit to keep this container in
+          # guaranteed class. Currently, this container falls into the
+          # "burstable" category so the kubelet doesn't back off from restarting it.
           limits:
             cpu: 100m
-            memory: 50Mi
+            memory: 500Mi
           requests:
             cpu: 100m
             memory: 50Mi
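The TODO comments above lean on the request-versus-limit rule for QoS classes. A simplified sketch of that rule follows; the real kubelet decision looks at every resource of every container in the pod, so this single-resource helper is illustrative only:

```go
package main

import "fmt"

// qosClass is a simplified, single-resource version of the rule the TODOs
// above refer to: request = limit means Guaranteed, anything less means
// Burstable, nothing declared at all means BestEffort.
func qosClass(requestMi, limitMi int64) string {
	switch {
	case requestMi == 0 && limitMi == 0:
		return "BestEffort"
	case requestMi == limitMi:
		return "Guaranteed"
	default:
		return "Burstable"
	}
}

func main() {
	fmt.Println(qosClass(50, 200)) // kube2sky memory after this change: Burstable
}
```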
@@ -44,25 +47,50 @@ spec:
         - name: etcd-storage
           mountPath: /var/etcd/data
       - name: kube2sky
-        image: gcr.io/google_containers/kube2sky:1.12
+        image: gcr.io/google_containers/kube2sky:1.13
         resources:
-          # keep request = limit to keep this container in guaranteed class
+          # TODO: Set memory limits when we've profiled the container for large
+          # clusters, then set request = limit to keep this container in
+          # guaranteed class. Currently, this container falls into the
+          # "burstable" category so the kubelet doesn't back off from restarting it.
           limits:
             cpu: 100m
-            memory: 50Mi
+            # Kube2sky watches all pods.
+            memory: 200Mi
           requests:
             cpu: 100m
             memory: 50Mi
+        livenessProbe:
+          httpGet:
+            path: /healthz
+            port: 8080
+            scheme: HTTP
+          initialDelaySeconds: 60
+          timeoutSeconds: 5
+          successThreshold: 1
+          failureThreshold: 5
+        readinessProbe:
+          httpGet:
+            path: /readiness
+            port: 8081
+            scheme: HTTP
+          # we poll on pod startup for the Kubernetes master service and
+          # only set up the /readiness HTTP server once that's available.
+          initialDelaySeconds: 30
+          timeoutSeconds: 5
         args:
         # command = "/kube2sky"
         - --domain={{ pillar['dns_domain'] }}
       - name: skydns
         image: gcr.io/google_containers/skydns:2015-10-13-8c72f8c
         resources:
-          # keep request = limit to keep this container in guaranteed class
+          # TODO: Set memory limits when we've profiled the container for large
+          # clusters, then set request = limit to keep this container in
+          # guaranteed class. Currently, this container falls into the
+          # "burstable" category so the kubelet doesn't back off from restarting it.
           limits:
             cpu: 100m
-            memory: 50Mi
+            memory: 200Mi
           requests:
             cpu: 100m
             memory: 50Mi
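Mechanically, the httpGet probes above amount to the kubelet issuing a GET against the pod IP with the configured timeout and treating a 2xx/3xx answer as success; with failureThreshold: 5 and the default 10-second period, kube2sky gets roughly 50 seconds of consecutive failures after the 60-second initial delay before it is restarted. A sketch of the probe mechanics (not kubelet code; the pod IP below is a placeholder):

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// probe issues one HTTP probe the way the kubelet's httpGet check does:
// GET with a deadline, success on any 2xx/3xx status.
func probe(url string, timeout time.Duration) bool {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Get(url)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode >= 200 && resp.StatusCode < 400
}

func main() {
	// Values from the kube2sky readinessProbe above; "10.2.3.4" stands in
	// for the pod IP.
	fmt.Println(probe("http://10.2.3.4:8081/readiness", 5*time.Second))
}
```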
@@ -79,20 +107,6 @@ spec:
         - containerPort: 53
           name: dns-tcp
           protocol: TCP
-        livenessProbe:
-          httpGet:
-            path: /healthz
-            port: 8080
-            scheme: HTTP
-          initialDelaySeconds: 30
-          timeoutSeconds: 5
-        readinessProbe:
-          httpGet:
-            path: /healthz
-            port: 8080
-            scheme: HTTP
-          initialDelaySeconds: 1
-          timeoutSeconds: 5
       - name: healthz
         image: gcr.io/google_containers/exechealthz:1.0
         resources:
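Note that kube2sky's livenessProbe above targets :8080/healthz, which is served by this exechealthz sidecar rather than by kube2sky itself; per the comment in kube2sky.go, that health check is essentially an nslookup of the kubernetes Service. A minimal sketch of the sidecar's idea, not its actual code (the real container runs the command on its own schedule and caches the result; the exact nslookup target here is an assumption):

```go
package main

import (
	"net/http"
	"os/exec"
)

func main() {
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		// Run the configured health command and map its exit status to an
		// HTTP status the kubelet understands.
		cmd := "nslookup kubernetes.default.svc.cluster.local 127.0.0.1 >/dev/null"
		if err := exec.Command("sh", "-c", cmd).Run(); err != nil {
			http.Error(w, err.Error(), http.StatusServiceUnavailable)
			return
		}
		w.Write([]byte("ok"))
	})
	http.ListenAndServe(":8080", nil)
}
```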