diff --git a/cluster/addons/dns/README.md b/cluster/addons/dns/README.md
index e82c1fe8d2a..4fe540276da 100644
--- a/cluster/addons/dns/README.md
+++ b/cluster/addons/dns/README.md
@@ -23,7 +23,9 @@ The following sections detail the supported record types and layout that is
 supported. Any other layout or names or queries that happen to work are
 considered implementation details and are subject to change without warning.
-### A records
+### Services
+
+#### A records
 
 "Normal" (not headless) Services are assigned a DNS A record for a name of the
 form `my-svc.my-namespace.svc.cluster.local`. This resolves to the cluster IP
 of the Service.
@@ -50,6 +52,13 @@ Previous versions of kube-dns made names of the form
 `my-svc.my-namespace.cluster.local` (the 'svc' level was added later). This
 is no longer supported.
 
+### Pods
+
+#### A records
+When enabled, pods are assigned a DNS A record of the form `pod-ip-address.my-namespace.pod.cluster.local`.
+
+For example, a pod with IP `1.2.3.4` in the namespace `default` in a cluster whose DNS domain is `cluster.local` would have the A record `1-2-3-4.default.pod.cluster.local`.
+
 ## How do I find the DNS server?
 The DNS server itself runs as a Kubernetes Service. This gives it a stable IP
 address. When you run the SkyDNS service, you want to assign a static IP to use for
@@ -126,7 +135,7 @@ Then create a pod using this file:
 kubectl create -f busybox.yaml
 ```
 
-### 2 Wait for this pod to go into the running state. 
+### 2 Wait for this pod to go into the running state.
 
 You can get its status with:
 ```
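
To make the pod record layout above concrete, here is a small lookup sketch. It assumes the example values from the README (pod IP `1.2.3.4`, namespace `default`, cluster domain `cluster.local`) and a cluster whose DNS addon serves pod records; run anywhere else, the lookup simply fails.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

func main() {
	// Example values taken from the README; these are assumptions, not real cluster state.
	podIP := "1.2.3.4"
	namespace := "default"
	clusterDomain := "cluster.local"

	// Dots in the pod IP become dashes so the IP fits into a single DNS label.
	name := fmt.Sprintf("%s.%s.pod.%s",
		strings.Replace(podIP, ".", "-", -1), namespace, clusterDomain)

	// Resolvable only from inside a cluster whose DNS addon serves pod records.
	addrs, err := net.LookupHost(name)
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println(name, "resolves to", addrs)
}
```

The dash-for-dot substitution shown here is the same transformation the new `sanitizeIP` helper performs on the server side.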
diff --git a/cluster/addons/dns/kube2sky/README.md b/cluster/addons/dns/kube2sky/README.md
index e8cd5373c18..9a726accf20 100644
--- a/cluster/addons/dns/kube2sky/README.md
+++ b/cluster/addons/dns/kube2sky/README.md
@@ -17,11 +17,11 @@ description of `-domain` below.
 `-domain`: Set the domain under which all DNS names will be hosted. For
 example, if this is set to `kubernetes.io`, then a service named "nifty" in the
 "default" namespace would be exposed through DNS as
-"nifty.default.kubernetes.io".
+"nifty.default.svc.kubernetes.io".
 
 `-v`: Set logging level
 
-`-etcd_mutation_timeout`: For how long the application will keep retrying etcd 
+`-etcd_mutation_timeout`: For how long the application will keep retrying etcd
 mutation (insertion or removal of a dns entry) before giving up and crashing.
 
 `-etcd-server`: The etcd server that is being used by skydns.
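
As a rough illustration of how the `-domain` flag documented above shapes the generated names, the sketch below parses the same flag names and prints the service FQDN from the README example. The default values are placeholders, not kube2sky's actual defaults, and `-v` is left out since it belongs to the logging package.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	// Flag names mirror the kube2sky README above; the defaults here are
	// placeholders for illustration, not the binary's real defaults.
	domain := flag.String("domain", "cluster.local", "domain under which all DNS names are hosted")
	etcdServer := flag.String("etcd-server", "http://127.0.0.1:4001", "etcd server used by skydns")
	etcdMutationTimeout := flag.Duration("etcd_mutation_timeout", 10*time.Second, "how long to retry etcd mutations before giving up")
	flag.Parse()

	// With -domain=kubernetes.io, the service "nifty" in the "default"
	// namespace is exposed as nifty.default.svc.kubernetes.io.
	fmt.Printf("service DNS name: nifty.default.svc.%s\n", *domain)
	fmt.Printf("etcd server: %s, mutation timeout: %s\n", *etcdServer, *etcdMutationTimeout)
}
```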
diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go
index 686c4da6316..9c0c7a27218 100644
--- a/cluster/addons/dns/kube2sky/kube2sky.go
+++ b/cluster/addons/dns/kube2sky/kube2sky.go
@@ -61,6 +61,8 @@ const (
 	resyncPeriod = 30 * time.Minute
 	// A subdomain added to the user specified domain for all services.
 	serviceSubdomain = "svc"
+	// A subdomain added to the user specified domain for all pods.
+	podSubdomain = "pod"
 )
 
 type etcdClient interface {
@@ -222,6 +224,59 @@ func (ks *kube2sky) handleEndpointAdd(obj interface{}) {
 	}
 }
 
+func (ks *kube2sky) handlePodCreate(obj interface{}) {
+	if pod, ok := obj.(*kapi.Pod); ok {
+		// If the pod IP is not yet available, do not attempt to create a record.
+		if pod.Status.PodIP != "" {
+			name := buildDNSNameString(ks.domain, podSubdomain, pod.Namespace, sanitizeIP(pod.Status.PodIP))
+			ks.mutateEtcdOrDie(func() error { return ks.generateRecordsForPod(name, pod) })
+		}
+	}
+}
+
+func (ks *kube2sky) handlePodUpdate(oldObj interface{}, newObj interface{}) {
+	oldPod, okOld := oldObj.(*kapi.Pod)
+	newPod, okNew := newObj.(*kapi.Pod)
+
+	// Only act on the objects that are actually pods.
+	if okOld && okNew {
+		if oldPod.Status.PodIP != newPod.Status.PodIP {
+			ks.handlePodDelete(oldPod)
+			ks.handlePodCreate(newPod)
+		}
+	} else if okNew {
+		ks.handlePodCreate(newPod)
+	} else if okOld {
+		ks.handlePodDelete(oldPod)
+	}
+}
+
+func (ks *kube2sky) handlePodDelete(obj interface{}) {
+	if pod, ok := obj.(*kapi.Pod); ok {
+		if pod.Status.PodIP != "" {
+			name := buildDNSNameString(ks.domain, podSubdomain, pod.Namespace, sanitizeIP(pod.Status.PodIP))
+			ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
+		}
+	}
+}
+
+func (ks *kube2sky) generateRecordsForPod(subdomain string, pod *kapi.Pod) error {
+	b, err := json.Marshal(getSkyMsg(pod.Status.PodIP, 0))
+	if err != nil {
+		return err
+	}
+	recordValue := string(b)
+	recordLabel := getHash(recordValue)
+	recordKey := buildDNSNameString(subdomain, recordLabel)
+
+	glog.V(2).Infof("Setting DNS record: %v -> %q, with recordKey: %v\n", subdomain, recordValue, recordKey)
+	if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error {
 	b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, 0))
 	if err != nil {
@@ -249,6 +304,10 @@ func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *k
 	return nil
 }
 
+func sanitizeIP(ip string) string {
+	return strings.Replace(ip, ".", "-", -1)
+}
+
 func buildPortSegmentString(portName string, portProtocol kapi.Protocol) string {
 	if portName == "" {
 		// we don't create a random name
@@ -328,6 +387,11 @@ func createEndpointsLW(kubeClient *kclient.Client) *kcache.ListWatch {
 	return kcache.NewListWatchFromClient(kubeClient, "endpoints", kapi.NamespaceAll, kSelector.Everything())
 }
 
+// Returns a cache.ListWatch that gets all changes to pods.
+func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch {
+	return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kSelector.Everything())
+}
+
 func (ks *kube2sky) newService(obj interface{}) {
 	if s, ok := obj.(*kapi.Service); ok {
 		name := buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
@@ -411,6 +475,7 @@ func newKubeClient() (*kclient.Client, error) {
 			return nil, err
 		}
 	}
+
 	if masterURL != "" && *argKubecfgFile == "" {
 		// Only --kube_master_url was provided.
 		config = &kclient.Config{
@@ -470,6 +535,24 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
 	return eStore
 }
 
+func watchPods(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
+	eStore, eController := kframework.NewInformer(
+		createPodLW(kubeClient),
+		&kapi.Pod{},
+		resyncPeriod,
+		kframework.ResourceEventHandlerFuncs{
+			AddFunc: ks.handlePodCreate,
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				ks.handlePodUpdate(oldObj, newObj)
+			},
+			DeleteFunc: ks.handlePodDelete,
+		},
+	)
+
+	go eController.Run(util.NeverStop)
+	return eStore
+}
+
 func getHash(text string) string {
 	h := fnv.New32a()
 	h.Write([]byte(text))
@@ -499,6 +582,7 @@ func main() {
 
 	ks.endpointsStore = watchEndpoints(kubeClient, &ks)
 	ks.servicesStore = watchForServices(kubeClient, &ks)
+	watchPods(kubeClient, &ks)
 
 	select {}
 }
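
To visualize what `generateRecordsForPod` ends up storing, the sketch below reconstructs the etcd key and value for the README example. It is only an approximation built from the `basePath` used by the tests (`/skydns/local/cluster`): `skyMsg` and `etcdKeyForDNSName` are stand-ins invented for this illustration, and the real code additionally appends a hashed record label through `getHash` and writes via `writeSkyRecord`, neither of which is shown in full in this diff.

```go
package main

import (
	"encoding/json"
	"fmt"
	"path"
	"strings"
)

// skyMsg is a minimal stand-in for the value produced by getSkyMsg; the real
// message may carry more fields (priority, weight, TTL, ...).
type skyMsg struct {
	Host string `json:"host"`
}

// etcdKeyForDNSName reverses the labels of a DNS name into the /skydns
// keyspace, e.g. "1-2-3-4.default.pod.cluster.local" becomes
// "/skydns/local/cluster/pod/default/1-2-3-4".
func etcdKeyForDNSName(dnsName string) string {
	labels := strings.Split(strings.TrimSuffix(dnsName, "."), ".")
	for i, j := 0, len(labels)-1; i < j; i, j = i+1, j-1 {
		labels[i], labels[j] = labels[j], labels[i]
	}
	return path.Join(append([]string{"/skydns"}, labels...)...)
}

func main() {
	dnsName := "1-2-3-4.default.pod.cluster.local"
	value, _ := json.Marshal(skyMsg{Host: "1.2.3.4"})
	fmt.Printf("%s -> %s\n", etcdKeyForDNSName(dnsName), value)
}
```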
diff --git a/cluster/addons/dns/kube2sky/kube2sky_test.go b/cluster/addons/dns/kube2sky/kube2sky_test.go
index a6e154973f9..1a410742aaf 100644
--- a/cluster/addons/dns/kube2sky/kube2sky_test.go
+++ b/cluster/addons/dns/kube2sky/kube2sky_test.go
@@ -82,6 +82,7 @@ const (
 	testDomain       = "cluster.local."
 	basePath         = "/skydns/local/cluster"
 	serviceSubDomain = "svc"
+	podSubDomain     = "pod"
 )
 
 func newKube2Sky(ec etcdClient) *kube2sky {
@@ -94,8 +95,8 @@ func newKube2Sky(ec etcdClient) *kube2sky {
 	}
 }
 
-func getEtcdPathForA(name, namespace string) string {
-	return path.Join(basePath, serviceSubDomain, namespace, name)
+func getEtcdPathForA(name, namespace, subDomain string) string {
+	return path.Join(basePath, subDomain, namespace, name)
 }
 
 func getEtcdPathForSRV(portName, protocol, name, namespace string) string {
@@ -121,7 +122,7 @@ func getHostPortFromString(data string) (*hostPort, error) {
 }
 
 func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName, namespace string, expectedHostPort *hostPort) {
-	key := getEtcdPathForA(serviceName, namespace)
+	key := getEtcdPathForA(serviceName, namespace, serviceSubDomain)
 	values := ec.Get(key)
 	//require.True(t, exists)
 	require.True(t, len(values) > 0, "entry not found.")
@@ -130,6 +131,20 @@ func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName,
 	assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
 }
 
+func assertDnsPodEntryInEtcd(t *testing.T, ec *fakeEtcdClient, podIP, namespace string) {
+	key := getEtcdPathForA(podIP, namespace, podSubDomain)
+	values := ec.Get(key)
+	//require.True(t, exists)
+	require.True(t, len(values) > 0, "entry not found.")
+}
+
+func assertDnsPodEntryNotInEtcd(t *testing.T, ec *fakeEtcdClient, podIP, namespace string) {
+	key := getEtcdPathForA(podIP, namespace, podSubDomain)
+	values := ec.Get(key)
+	//require.True(t, exists)
+	require.True(t, len(values) == 0, "entry found.")
+}
+
 func assertSRVEntryInEtcd(t *testing.T, ec *fakeEtcdClient, portName, protocol, serviceName, namespace string, expectedPortNumber, expectedEntriesCount int) {
 	srvKey := getEtcdPathForSRV(portName, protocol, serviceName, namespace)
 	values := ec.Get(srvKey)
@@ -173,6 +188,20 @@ func newService(namespace, serviceName, clusterIP, portName string, portNumber i
 	return service
 }
 
+func newPod(namespace, podName, podIP string) kapi.Pod {
+	pod := kapi.Pod{
+		ObjectMeta: kapi.ObjectMeta{
+			Name:      podName,
+			Namespace: namespace,
+		},
+		Status: kapi.PodStatus{
+			PodIP: podIP,
+		},
+	}
+
+	return pod
+}
+
 func newSubset() kapi.EndpointSubset {
 	subset := kapi.EndpointSubset{
 		Addresses: []kapi.EndpointAddress{},
@@ -393,3 +422,46 @@ func TestBuildDNSName(t *testing.T) {
 	newExpectedDNSName := "00.name.ns.svc.cluster.local."
 	assert.Equal(t, newExpectedDNSName, buildDNSNameString(expectedDNSName, "00"))
 }
+
+func TestPodDns(t *testing.T) {
+	const (
+		testPodIP      = "1.2.3.4"
+		sanitizedPodIP = "1-2-3-4"
+		testNamespace  = "default"
+		testPodName    = "testPod"
+	)
+	ec := &fakeEtcdClient{make(map[string]string)}
+	k2s := newKube2Sky(ec)
+
+	// A pod without an IP address should not produce a record.
+	pod := newPod(testNamespace, testPodName, "")
+	k2s.handlePodCreate(&pod)
+	assert.Empty(t, ec.writes)
+
+	// Create a pod with an IP address.
+	pod = newPod(testNamespace, testPodName, testPodIP)
+	k2s.handlePodCreate(&pod)
+	assertDnsPodEntryInEtcd(t, ec, sanitizedPodIP, testNamespace)
+
+	// Update the pod with the same IP; the record should remain.
+	newPod := pod
+	newPod.Status.PodIP = testPodIP
+	k2s.handlePodUpdate(&pod, &newPod)
+	assertDnsPodEntryInEtcd(t, ec, sanitizedPodIP, testNamespace)
+
+	// Update the pod with a different IP; the old record should be replaced.
+	newPod = pod
+	newPod.Status.PodIP = "4.3.2.1"
+	k2s.handlePodUpdate(&pod, &newPod)
+	assertDnsPodEntryInEtcd(t, ec, "4-3-2-1", testNamespace)
+	assertDnsPodEntryNotInEtcd(t, ec, "1-2-3-4", testNamespace)
+
+	// Delete the pod; no records should remain.
+	k2s.handlePodDelete(&newPod)
+	assert.Empty(t, ec.writes)
+}
+
+func TestSanitizeIP(t *testing.T) {
+	expectedIP := "1-2-3-4"
+	assert.Equal(t, expectedIP, sanitizeIP("1.2.3.4"))
+}
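
If broader coverage of the IP sanitization is wanted later, a table-driven variant in the spirit of `TestSanitizeIP` could look like the sketch below. `sanitizeIPCopy` is a local copy of the one-line helper so the snippet compiles on its own; in the real test file you would call `sanitizeIP` directly, and only IPv4 dotted quads are covered, matching what this change handles.

```go
package main

import (
	"strings"
	"testing"
)

// sanitizeIPCopy mirrors the one-line helper added in kube2sky.go so this
// sketch is self-contained.
func sanitizeIPCopy(ip string) string {
	return strings.Replace(ip, ".", "-", -1)
}

// TestSanitizeIPTable checks the dash-for-dot substitution over a few IPv4
// addresses; IPv6 is not handled by the change above and is not covered here.
func TestSanitizeIPTable(t *testing.T) {
	cases := map[string]string{
		"1.2.3.4":        "1-2-3-4",
		"10.245.1.3":     "10-245-1-3",
		"192.168.100.42": "192-168-100-42",
	}
	for in, want := range cases {
		if got := sanitizeIPCopy(in); got != want {
			t.Errorf("sanitizeIPCopy(%q) = %q, want %q", in, got, want)
		}
	}
}
```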