From 3ada2170a320b4bee15d6b1d372d0f9e0cadb838 Mon Sep 17 00:00:00 2001 From: Abhishek Shah Date: Wed, 18 May 2016 10:33:17 -0700 Subject: [PATCH] pr feedback --- .../saltbase/salt/kube-dns/kubedns-rc.yaml.in | 5 +- cmd/kube-dns/app/options/options.go | 6 +- cmd/kube-dns/app/server.go | 11 +- cmd/kube-dns/dns.go | 4 +- pkg/dns/dns.go | 136 ++++++---- pkg/dns/dns_test.go | 52 ++-- pkg/dns/treecache.go | 245 +++++++----------- 7 files changed, 218 insertions(+), 241 deletions(-) diff --git a/cluster/saltbase/salt/kube-dns/kubedns-rc.yaml.in b/cluster/saltbase/salt/kube-dns/kubedns-rc.yaml.in index 631939edc6f..985d1066e4e 100644 --- a/cluster/saltbase/salt/kube-dns/kubedns-rc.yaml.in +++ b/cluster/saltbase/salt/kube-dns/kubedns-rc.yaml.in @@ -19,12 +19,9 @@ spec: version: v12 kubernetes.io/cluster-service: "true" spec: -{% if grains['cloud'] is defined and grains['cloud'] in [ 'vsphere', 'photon-controller' ] %} - hostNetwork: true -{% endif %} containers: - name: kubedns - image: artfulcoder/kubedns-amd64:1.0 + image: gcr.io/google_containers/kubedns-amd64:1.0 resources: # TODO: Set memory limits when we've profiled the container for large # clusters, then set request = limit to keep this container in diff --git a/cmd/kube-dns/app/options/options.go b/cmd/kube-dns/app/options/options.go index 973a24901a9..8991ae2e6f0 100644 --- a/cmd/kube-dns/app/options/options.go +++ b/cmd/kube-dns/app/options/options.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -52,8 +52,8 @@ func (m clusterDomainVar) Set(v string) error { v = strings.TrimSuffix(v, ".") segments := strings.Split(v, ".") for _, segment := range segments { - if !validation.IsDNS1123Label(segment) { - return fmt.Errorf("Not a valid DNS label") + if errs := validation.IsDNS1123Label(segment); len(errs) > 0 { + return fmt.Errorf("Not a valid DNS label. %v", errs) } } if !strings.HasSuffix(v, ".") { diff --git a/cmd/kube-dns/app/server.go b/cmd/kube-dns/app/server.go index d7d3d20f1d8..07637469d2f 100644 --- a/cmd/kube-dns/app/server.go +++ b/cmd/kube-dns/app/server.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ import ( "syscall" "github.com/golang/glog" - "github.com/skynetservices/skydns/metrics" "github.com/skynetservices/skydns/server" "k8s.io/kubernetes/cmd/kube-dns/app/options" @@ -104,7 +103,13 @@ func (server *KubeDNSServer) setupHealthzHandlers() { fmt.Fprintf(w, "ok\n") }) http.HandleFunc("/cache", func(w http.ResponseWriter, req *http.Request) { - fmt.Fprint(w, server.kd.GetCacheAsJSON()) + serializedJSON, err := server.kd.GetCacheAsJSON() + if err == nil { + fmt.Fprint(w, serializedJSON) + } else { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, err) + } }) } diff --git a/cmd/kube-dns/dns.go b/cmd/kube-dns/dns.go index 07d4661d855..2e08a002939 100644 --- a/cmd/kube-dns/dns.go +++ b/cmd/kube-dns/dns.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -23,11 +23,9 @@ import ( "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/flag" "k8s.io/kubernetes/pkg/version/verflag" - "runtime" ) func main() { - runtime.GOMAXPROCS(runtime.NumCPU()) config := options.NewKubeDNSConfig() config.AddFlags(pflag.CommandLine) diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 6b184ce26c8..d991c510f55 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,13 +19,14 @@ package dns import ( "encoding/json" "fmt" - "github.com/golang/glog" "hash/fnv" "net" "strings" + "sync" "time" etcd "github.com/coreos/etcd/client" + "github.com/golang/glog" skymsg "github.com/skynetservices/skydns/msg" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/endpoints" @@ -47,20 +48,40 @@ const ( podSubdomain = "pod" // Resync period for the kube controller loop. - resyncPeriod = 30 * time.Minute + resyncPeriod = 5 * time.Minute ) type KubeDNS struct { + // kubeClient makes calls to API Server and registers calls with API Server + // to get Endpoints and Service objects. kubeClient *kclient.Client - // DNS domain name. + + // The domain for which this DNS Server is authoritative. domain string + // A cache that contains all the endpoints in the system. endpointsStore kcache.Store + // A cache that contains all the services in the system. - servicesStore kcache.Store - cache *TreeCache - domainPath []string - eController *kframework.Controller + servicesStore kcache.Store + + // stores DNS records for the domain. + // A Records and SRV Records for (regular) services and headless Services. + cache *TreeCache + + // caller is responsible for using the cacheLock before invoking methods on cache + // the cache is not thread-safe, and the caller can guarantee thread safety by using + // the cacheLock + cacheLock sync.RWMutex + + // The domain for which this DNS Server is authoritative, in array format and reversed. + // e.g. if domain is "cluster.local", domainPath is []string{"local", "cluster"} + domainPath []string + + // endpointsController invokes registered callbacks when endpoints change. + endpointsController *kframework.Controller + + // serviceController invokes registered callbacks when services change. serviceController *kframework.Controller } @@ -69,6 +90,7 @@ func NewKubeDNS(client *kclient.Client, domain string) *KubeDNS { kubeClient: client, domain: domain, cache: NewTreeCache(), + cacheLock: sync.RWMutex{}, domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), } kd.setEndpointsStore() @@ -77,11 +99,14 @@ func NewKubeDNS(client *kclient.Client, domain string) *KubeDNS { } func (kd *KubeDNS) Start() { - go kd.eController.Run(wait.NeverStop) + go kd.endpointsController.Run(wait.NeverStop) go kd.serviceController.Run(wait.NeverStop) // Wait synchronously for the Kubernetes service and add a DNS record for it. - // TODO (abshah) UNCOMMENT AFTER TEST COMPLETE - //kd.waitForKubernetesService() + // This ensures that the Start function returns only after having received Service objects + // from APIServer. + // TODO: we might not have to wait for kubernetes service specifically. We should just wait + // for a list operation to be complete from APIServer. + kd.waitForKubernetesService() } func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) { @@ -101,9 +126,11 @@ func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) { return } -func (kd *KubeDNS) GetCacheAsJSON() string { - json, _ := kd.cache.Serialize("") - return json +func (kd *KubeDNS) GetCacheAsJSON() (string, error) { + kd.cacheLock.RLock() + defer kd.cacheLock.RUnlock() + json, err := kd.cache.Serialize() + return json, err } func (kd *KubeDNS) setServicesStore() { @@ -124,7 +151,7 @@ func (kd *KubeDNS) setServicesStore() { func (kd *KubeDNS) setEndpointsStore() { // Returns a cache.ListWatch that gets all changes to endpoints. endpointsWatch := kcache.NewListWatchFromClient(kd.kubeClient, "endpoints", kapi.NamespaceAll, kselector.Everything()) - kd.endpointsStore, kd.eController = kframework.NewInformer( + kd.endpointsStore, kd.endpointsController = kframework.NewInformer( endpointsWatch, &kapi.Endpoints{}, resyncPeriod, @@ -138,24 +165,35 @@ func (kd *KubeDNS) setEndpointsStore() { ) } -func (kd *KubeDNS) newService(obj interface{}) { +func assertIsService(obj interface{}) (*kapi.Service, bool) { if service, ok := obj.(*kapi.Service); ok { + return service, ok + } else { + glog.Errorf("Type assertion failed! Expected 'Service', got %T", service) + return nil, ok + } +} + +func (kd *KubeDNS) newService(obj interface{}) { + if service, ok := assertIsService(obj); ok { // if ClusterIP is not set, a DNS entry should not be created if !kapi.IsServiceIPSet(service) { kd.newHeadlessService(service) return } if len(service.Spec.Ports) == 0 { - glog.Info("Unexpected service with no ports, this should not have happend: %v", service) + glog.Warning("Unexpected service with no ports, this should not have happend: %v", service) } kd.newPortalService(service) } } func (kd *KubeDNS) removeService(obj interface{}) { - if s, ok := obj.(*kapi.Service); ok { + if s, ok := assertIsService(obj); ok { subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name) - kd.cache.DeletePath(subCachePath...) + kd.cacheLock.Lock() + defer kd.cacheLock.Unlock() + kd.cache.deletePath(subCachePath...) } } @@ -194,7 +232,7 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace) return nil, nil } - if svc, ok := obj.(*kapi.Service); ok { + if svc, ok := assertIsService(obj); ok { return svc, nil } return nil, fmt.Errorf("got a non service object in services store %v", obj) @@ -203,18 +241,20 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er func (kd *KubeDNS) newPortalService(service *kapi.Service) { subCache := NewTreeCache() recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0) - subCache.SetEntry(recordLabel, recordValue) + subCache.setEntry(recordLabel, recordValue) // Generate SRV Records for i := range service.Spec.Ports { port := &service.Spec.Ports[i] if port.Name != "" && port.Protocol != "" { srvValue := kd.generateSRVRecordValue(service, int(port.Port)) - subCache.SetEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name) + subCache.setEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name) } } subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace) - kd.cache.SetSubCache(service.Name, subCache, subCachePath...) + kd.cacheLock.Lock() + defer kd.cacheLock.Unlock() + kd.cache.setSubCache(service.Name, subCache, subCachePath...) } func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error { @@ -233,18 +273,20 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap if hostLabel, exists := getHostname(address, podHostnames); exists { endpointName = hostLabel } - subCache.SetEntry(endpointName, recordValue) + subCache.setEntry(endpointName, recordValue) for portIdx := range e.Subsets[idx].Ports { endpointPort := &e.Subsets[idx].Ports[portIdx] if endpointPort.Name != "" && endpointPort.Protocol != "" { srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName) - subCache.SetEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name) + subCache.setEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name) } } } } subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace) - kd.cache.SetSubCache(svc.Name, subCache, subCachePath...) + kd.cacheLock.Lock() + defer kd.cacheLock.Unlock() + kd.cache.setSubCache(svc.Name, subCache, subCachePath...) return nil } @@ -252,7 +294,7 @@ func getHostname(address *kapi.EndpointAddress, podHostnames map[string]endpoint if len(address.Hostname) > 0 { return address.Hostname, true } - if hostRecord, exists := podHostnames[address.IP]; exists && validation.IsDNS1123Label(hostRecord.HostName) { + if hostRecord, exists := podHostnames[address.IP]; exists && len(validation.IsDNS1123Label(hostRecord.HostName)) == 0 { return hostRecord.HostName, true } return "", false @@ -272,12 +314,12 @@ func getPodHostnamesFromAnnotation(annotations map[string]string) (map[string]en return hostnames, nil } -func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, cNameLabels ...string) *skymsg.Service { - cName := strings.Join([]string{svc.Name, svc.Namespace, serviceSubdomain, kd.domain}, ".") - for _, cNameLabel := range cNameLabels { - cName = cNameLabel + "." + cName +func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, labels ...string) *skymsg.Service { + host := strings.Join([]string{svc.Name, svc.Namespace, serviceSubdomain, kd.domain}, ".") + for _, cNameLabel := range labels { + host = cNameLabel + "." + host } - recordValue, _ := getSkyMsg(cName, portNumber) + recordValue, _ := getSkyMsg(host, portNumber) return recordValue } @@ -312,9 +354,10 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) { segments := strings.Split(trimmed, ".") path := reverseArray(segments) if kd.isPodRecord(path) { - response, err := kd.getPodRecord(path) + ip, err := kd.getPodIP(path) if err == nil { - return []skymsg.Service{*response}, nil + skyMsg, _ := getSkyMsg(ip, 0) + return []skymsg.Service{*skyMsg}, nil } return nil, err } @@ -324,15 +367,17 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) { if key == "" { return []skymsg.Service{}, nil } - if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok { + kd.cacheLock.RLock() + defer kd.cacheLock.RUnlock() + if record, ok := kd.cache.getEntry(key, path[:len(path)-1]...); ok { return []skymsg.Service{*(record.(*skymsg.Service))}, nil } return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} } - // tmp, _ := kd.cache.Serialize("") - // glog.Infof("Searching path:%q, %v", path, tmp) - records := kd.cache.GetValuesForPathWithRegex(path...) + kd.cacheLock.RLock() + defer kd.cacheLock.RUnlock() + records := kd.cache.getValuesForPathWithWildcards(path...) retval := []skymsg.Service{} for _, val := range records { retval = append(retval, *(val.(*skymsg.Service))) @@ -350,7 +395,7 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) { segments := strings.Split(strings.TrimRight(name, "."), ".") for _, k := range segments { - if k == "*" || k == "any" { + if k == "*" { return nil, fmt.Errorf("reverse can not contain wildcards") } } @@ -374,20 +419,13 @@ func (kd *KubeDNS) isPodRecord(path []string) bool { return true } -func (kd *KubeDNS) getPodRecord(path []string) (*skymsg.Service, error) { +func (kd *KubeDNS) getPodIP(path []string) (string, error) { ipStr := path[len(path)-1] ip := strings.Replace(ipStr, "-", ".", -1) if parsed := net.ParseIP(ip); parsed != nil { - msg := &skymsg.Service{ - Host: ip, - Port: 0, - Priority: 10, - Weight: 10, - Ttl: 30, - } - return msg, nil + return ip, nil } - return nil, fmt.Errorf("Invalid IP Address %v", ip) + return "", fmt.Errorf("Invalid IP Address %v", ip) } // Returns record in a format that SkyDNS understands. diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index 9a7a6e61e90..7aa4316804e 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,7 +18,9 @@ package dns import ( "fmt" + "net" "strings" + "sync" "testing" skymsg "github.com/skynetservices/skydns/msg" @@ -26,16 +28,12 @@ import ( "github.com/stretchr/testify/require" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" - "net" ) const ( - testDomain = "cluster.local." - basePath = "/skydns/local/cluster" - serviceSubDomain = "svc" - podSubDomain = "pod" - testService = "testservice" - testNamespace = "default" + testDomain = "cluster.local." + testService = "testservice" + testNamespace = "default" ) func newKubeDNS() *KubeDNS { @@ -44,6 +42,7 @@ func newKubeDNS() *KubeDNS { endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), cache: NewTreeCache(), + cacheLock: sync.RWMutex{}, domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")), } return kd @@ -53,7 +52,6 @@ func TestPodDns(t *testing.T) { const ( testPodIP = "1.2.3.4" sanitizedPodIP = "1-2-3-4" - testPodName = "testPod" ) kd := newKubeDNS() @@ -240,9 +238,7 @@ func newEndpoints(service *kapi.Service, subsets ...kapi.EndpointSubset) *kapi.E Subsets: []kapi.EndpointSubset{}, } - for _, subset := range subsets { - endpoints.Subsets = append(endpoints.Subsets, subset) - } + endpoints.Subsets = append(endpoints.Subsets, subsets...) return &endpoints } @@ -310,7 +306,7 @@ func assertCNameRecordsMatchEndpointIPs(t *testing.T, kd *KubeDNS, e []kapi.Endp assert.Equal(t, len(e), len(records), "unexpected record count") for _, record := range records { _, found := endpoints[getIPForCName(t, kd, record.Host)] - assert.True(t, found, "Did not endpoint with address:%s", record.Host) + assert.True(t, found, "Did not find endpoint with address:%s", record.Host) } } @@ -342,20 +338,18 @@ func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portNam } func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) { - records, err := kd.Records(getServiceFQDN(kd, s), false) - require.Error(t, err) - assert.Equal(t, 0, len(records)) + serviceFQDN := getServiceFQDN(kd, s) + queries := getEquivalentQueries(serviceFQDN, s.Namespace) + for _, query := range queries { + records, err := kd.Records(query, false) + require.Error(t, err) + assert.Equal(t, 0, len(records)) + } } func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) { serviceFQDN := getServiceFQDN(kd, s) - queries := []string{ - serviceFQDN, - strings.Replace(serviceFQDN, ".svc.", ".*.", 1), - strings.Replace(serviceFQDN, s.Namespace, "*", 1), - strings.Replace(strings.Replace(serviceFQDN, s.Namespace, "*", 1), ".svc.", ".*.", 1), - "*." + serviceFQDN, - } + queries := getEquivalentQueries(serviceFQDN, s.Namespace) for _, query := range queries { records, err := kd.Records(query, false) require.NoError(t, err) @@ -364,12 +358,22 @@ func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) { } } +func getEquivalentQueries(serviceFQDN, namespace string) []string { + return []string{ + serviceFQDN, + strings.Replace(serviceFQDN, ".svc.", ".*.", 1), + strings.Replace(serviceFQDN, namespace, "*", 1), + strings.Replace(strings.Replace(serviceFQDN, namespace, "*", 1), ".svc.", ".*.", 1), + "*." + serviceFQDN, + } +} + func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string { return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain) } func getEndpointsFQDN(kd *KubeDNS, e *kapi.Endpoints) string { - return fmt.Sprintf("%s.%s.svc.%s", e.ObjectMeta.Name, e.ObjectMeta.Namespace, kd.domain) + return fmt.Sprintf("%s.%s.svc.%s", e.Name, e.Namespace, kd.domain) } func getSRVFQDN(kd *KubeDNS, s *kapi.Service, portName string) string { diff --git a/pkg/dns/treecache.go b/pkg/dns/treecache.go index ee3ba206ee7..8403f445415 100644 --- a/pkg/dns/treecache.go +++ b/pkg/dns/treecache.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors All rights reserved. +Copyright 2016 The Kubernetes Authors All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,136 +18,65 @@ package dns import ( "bytes" - "crypto/md5" "encoding/json" - "fmt" - "io/ioutil" - "os" - "path" - "reflect" "strings" - "sync" ) -const ( - dataFile = "data.dat" - crcFile = "data.crc" -) - -type object interface{} - type TreeCache struct { ChildNodes map[string]*TreeCache Entries map[string]interface{} - m *sync.RWMutex } func NewTreeCache() *TreeCache { return &TreeCache{ ChildNodes: make(map[string]*TreeCache), Entries: make(map[string]interface{}), - m: &sync.RWMutex{}, } } -func Deserialize(dir string) (*TreeCache, error) { - b, err := ioutil.ReadFile(path.Join(dir, dataFile)) - if err != nil { - return nil, err - } - var hash []byte - hash, err = ioutil.ReadFile(path.Join(dir, crcFile)) - if err != nil { - return nil, err - } - if !reflect.DeepEqual(hash, getMD5(b)) { - return nil, fmt.Errorf("Checksum failed") - } - - var cache TreeCache - err = json.Unmarshal(b, &cache) - if err != nil { - return nil, err - } - cache.m = &sync.RWMutex{} - return &cache, nil -} - -func (cache *TreeCache) Serialize(dir string) (string, error) { - cache.m.RLock() - defer cache.m.RUnlock() +func (cache *TreeCache) Serialize() (string, error) { b, err := json.Marshal(cache) if err != nil { return "", err } - if len(dir) == 0 { - var prettyJSON bytes.Buffer - err = json.Indent(&prettyJSON, b, "", "\t") + var prettyJSON bytes.Buffer + err = json.Indent(&prettyJSON, b, "", "\t") - if err != nil { - return "", err - } - return string(prettyJSON.Bytes()), nil - } - if err := ensureDir(dir, os.FileMode(0755)); err != nil { + if err != nil { return "", err } - if err := ioutil.WriteFile(path.Join(dir, dataFile), b, 0644); err != nil { - return "", err - } - if err := ioutil.WriteFile(path.Join(dir, crcFile), getMD5(b), 0644); err != nil { - return "", err - } - return string(b), nil + return string(prettyJSON.Bytes()), nil } -func (cache *TreeCache) SetEntry(key string, val interface{}, path ...string) { - cache.m.Lock() - defer cache.m.Unlock() +func (cache *TreeCache) setEntry(key string, val interface{}, path ...string) { node := cache.ensureChildNode(path...) node.Entries[key] = val } -func (cache *TreeCache) ReplaceEntries(entries map[string]interface{}, path ...string) { - cache.m.Lock() - defer cache.m.Unlock() - node := cache.ensureChildNode(path...) - node.Entries = make(map[string]interface{}) - for key, val := range entries { - node.Entries[key] = val - } -} - -func (cache *TreeCache) GetSubCache(path ...string) *TreeCache { +func (cache *TreeCache) getSubCache(path ...string) *TreeCache { childCache := cache for _, subpath := range path { childCache = childCache.ChildNodes[subpath] if childCache == nil { - return childCache + return nil } } return childCache } -func (cache *TreeCache) SetSubCache(key string, subCache *TreeCache, path ...string) { - cache.m.Lock() - defer cache.m.Unlock() +func (cache *TreeCache) setSubCache(key string, subCache *TreeCache, path ...string) { node := cache.ensureChildNode(path...) node.ChildNodes[key] = subCache } -func (cache *TreeCache) GetEntry(key string, path ...string) (interface{}, bool) { - cache.m.RLock() - defer cache.m.RUnlock() - childNode := cache.GetSubCache(path...) +func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool) { + childNode := cache.getSubCache(path...) val, ok := childNode.Entries[key] return val, ok } -func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{} { - cache.m.RLock() - defer cache.m.RUnlock() +func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interface{} { retval := []interface{}{} nodesToExplore := []*TreeCache{cache} for idx, subpath := range path { @@ -155,7 +84,7 @@ func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{} if idx == len(path)-1 { // if path ends on an entry, instead of a child node, add the entry for _, node := range nodesToExplore { - if subpath == "*" || subpath == "any" { + if subpath == "*" { nextNodesToExplore = append(nextNodesToExplore, node) } else { if val, ok := node.Entries[subpath]; ok { @@ -172,7 +101,7 @@ func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{} break } - if subpath == "*" || subpath == "any" { + if subpath == "*" { for _, node := range nodesToExplore { for subkey, subnode := range node.ChildNodes { if !strings.HasPrefix(subkey, "_") { @@ -200,26 +129,11 @@ func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{} return retval } -func (cache *TreeCache) GetEntries(recursive bool, path ...string) []interface{} { - cache.m.RLock() - defer cache.m.RUnlock() - childNode := cache.GetSubCache(path...) - if childNode == nil { - return nil - } - - retval := [][]interface{}{{}} - childNode.appendValues(recursive, retval) - return retval[0] -} - -func (cache *TreeCache) DeletePath(path ...string) bool { +func (cache *TreeCache) deletePath(path ...string) bool { if len(path) == 0 { return false } - cache.m.Lock() - defer cache.m.Unlock() - if parentNode := cache.GetSubCache(path[:len(path)-1]...); parentNode != nil { + if parentNode := cache.getSubCache(path[:len(path)-1]...); parentNode != nil { if _, ok := parentNode.ChildNodes[path[len(path)-1]]; ok { delete(parentNode.ChildNodes, path[len(path)-1]) return true @@ -228,10 +142,8 @@ func (cache *TreeCache) DeletePath(path ...string) bool { return false } -func (tn *TreeCache) DeleteEntry(key string, path ...string) bool { - tn.m.Lock() - defer tn.m.Unlock() - childNode := tn.GetSubCache(path...) +func (cache *TreeCache) deleteEntry(key string, path ...string) bool { + childNode := cache.getSubCache(path...) if childNode == nil { return false } @@ -242,22 +154,22 @@ func (tn *TreeCache) DeleteEntry(key string, path ...string) bool { return false } -func (tn *TreeCache) appendValues(recursive bool, ref [][]interface{}) { - for _, value := range tn.Entries { +func (cache *TreeCache) appendValues(recursive bool, ref [][]interface{}) { + for _, value := range cache.Entries { ref[0] = append(ref[0], value) } if recursive { - for _, node := range tn.ChildNodes { + for _, node := range cache.ChildNodes { node.appendValues(recursive, ref) } } } -func (tn *TreeCache) ensureChildNode(path ...string) *TreeCache { - childNode := tn +func (cache *TreeCache) ensureChildNode(path ...string) *TreeCache { + childNode := cache for _, subpath := range path { - newNode := childNode.ChildNodes[subpath] - if newNode == nil { + newNode, ok := childNode.ChildNodes[subpath] + if !ok { newNode = NewTreeCache() childNode.ChildNodes[subpath] = newNode } @@ -266,47 +178,70 @@ func (tn *TreeCache) ensureChildNode(path ...string) *TreeCache { return childNode } -func ensureDir(path string, perm os.FileMode) error { - s, err := os.Stat(path) - if err != nil || !s.IsDir() { - return os.Mkdir(path, perm) - } - return nil -} +// unused function. keeping it around in commented-fashion +// in the future, we might need some form of this function so that +// we can serialize to a file in a mounted empty dir.. +//const ( +// dataFile = "data.dat" +// crcFile = "data.crc" +//) +//func (cache *TreeCache) Serialize(dir string) (string, error) { +// cache.m.RLock() +// defer cache.m.RUnlock() +// b, err := json.Marshal(cache) +// if err != nil { +// return "", err +// } +// +// if err := ensureDir(dir, os.FileMode(0755)); err != nil { +// return "", err +// } +// if err := ioutil.WriteFile(path.Join(dir, dataFile), b, 0644); err != nil { +// return "", err +// } +// if err := ioutil.WriteFile(path.Join(dir, crcFile), getMD5(b), 0644); err != nil { +// return "", err +// } +// return string(b), nil +//} -func getMD5(b []byte) []byte { - h := md5.New() - h.Write(b) - return []byte(fmt.Sprintf("%x", h.Sum(nil))) -} +//func ensureDir(path string, perm os.FileMode) error { +// s, err := os.Stat(path) +// if err != nil || !s.IsDir() { +// return os.Mkdir(path, perm) +// } +// return nil +//} -func main() { - root := NewTreeCache() - fmt.Println("Adding Entries") - root.SetEntry("k", "v") - root.SetEntry("foo", "bar", "local") - root.SetEntry("foo1", "bar1", "local", "cluster") +//func getMD5(b []byte) []byte { +// h := md5.New() +// h.Write(b) +// return []byte(fmt.Sprintf("%x", h.Sum(nil))) +//} - fmt.Println("Fetching Entries") - for _, entry := range root.GetEntries(true, "local") { - fmt.Printf("%s\n", entry) - } - - fmt.Println("Serializing") - if _, err := root.Serialize("./foo"); err != nil { - fmt.Printf("Serialization Error: %v,\n", err) - return - } - - fmt.Println("Deserializing") - tn, err := Deserialize("./foo") - if err != nil { - fmt.Printf("Deserialization Error: %v\n", err) - return - } - - fmt.Println("Fetching Entries") - for _, entry := range tn.GetEntries(true, "local") { - fmt.Printf("%s\n", entry) - } -} +// unused function. keeping it around in commented-fashion +// in the future, we might need some form of this function so that +// we can restart kube-dns, deserialize the tree and have a cache +// without having to wait for kube-dns to reach out to API server. +//func Deserialize(dir string) (*TreeCache, error) { +// b, err := ioutil.ReadFile(path.Join(dir, dataFile)) +// if err != nil { +// return nil, err +// } +// +// hash, err := ioutil.ReadFile(path.Join(dir, crcFile)) +// if err != nil { +// return nil, err +// } +// if !reflect.DeepEqual(hash, getMD5(b)) { +// return nil, fmt.Errorf("Checksum failed") +// } +// +// var cache TreeCache +// err = json.Unmarshal(b, &cache) +// if err != nil { +// return nil, err +// } +// cache.m = &sync.RWMutex{} +// return &cache, nil +//}