From 65e421b1d74aaf1cf07b880ea6c40fbd1cb6ffc9 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Tue, 1 Nov 2016 14:50:28 -0700 Subject: [PATCH 1/2] Move treecache to its own package, add unit test --- pkg/dns/dns_test.go | 3 +- pkg/dns/treecache/BUILD | 26 +++++ pkg/dns/{ => treecache}/treecache.go | 157 ++++++++------------------ pkg/dns/treecache/treecache_test.go | 161 +++++++++++++++++++++++++++ 4 files changed, 237 insertions(+), 110 deletions(-) create mode 100644 pkg/dns/treecache/BUILD rename pkg/dns/{ => treecache}/treecache.go (54%) create mode 100644 pkg/dns/treecache/treecache_test.go diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index b0a39a57a10..019d78f36c5 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/dns/treecache" "k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/util/sets" ) @@ -51,7 +52,7 @@ func newKubeDNS() *KubeDNS { domain: testDomain, endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), - cache: NewTreeCache(), + cache: treecache.NewTreeCache(), reverseRecordMap: make(map[string]*skymsg.Service), clusterIPServiceMap: make(map[string]*kapi.Service), cacheLock: sync.RWMutex{}, diff --git a/pkg/dns/treecache/BUILD b/pkg/dns/treecache/BUILD new file mode 100644 index 00000000000..d721c5160be --- /dev/null +++ b/pkg/dns/treecache/BUILD @@ -0,0 +1,26 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_binary", + "go_library", + "go_test", + "cgo_library", +) + +go_library( + name = "go_default_library", + srcs = ["treecache.go"], + tags = ["automanaged"], + deps = ["//vendor:github.com/skynetservices/skydns/msg"], +) + +go_test( + name = "go_default_test", + srcs = ["treecache_test.go"], + library = "go_default_library", + tags = ["automanaged"], + deps = ["//vendor:github.com/skynetservices/skydns/msg"], +) diff --git a/pkg/dns/treecache.go b/pkg/dns/treecache/treecache.go similarity index 54% rename from pkg/dns/treecache.go rename to pkg/dns/treecache/treecache.go index 4ed1f52ef4e..b4ddbfb56d2 100644 --- a/pkg/dns/treecache.go +++ b/pkg/dns/treecache/treecache.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dns +package treecache import ( "encoding/json" @@ -23,19 +23,48 @@ import ( skymsg "github.com/skynetservices/skydns/msg" ) -type TreeCache struct { - ChildNodes map[string]*TreeCache +type TreeCache interface { + // GetEntry with the given key for the given path. + GetEntry(key string, path ...string) (interface{}, bool) + + // Get a list of values including wildcards labels (e.g. "*"). + GetValuesForPathWithWildcards(path ...string) []*skymsg.Service + + // SetEntry creates the entire path if it doesn't already exist in + // the cache, then sets the given service record under the given + // key. The path this entry would have occupied in an etcd datastore + // is computed from the given fqdn and stored as the "Key" of the + // skydns service; this is only required because skydns expects the + // service record to contain a key in a specific format (presumably + // for legacy compatibility). Note that the fqnd string typically + // contains both the key and all elements in the path. + SetEntry(key string, val *skymsg.Service, fqdn string, path ...string) + + // SetSubCache inserts the given subtree under the given + // path:key. Usually the key is the name of a Kubernetes Service, + // and the path maps to the cluster subdomains matching the Service. + SetSubCache(key string, subCache TreeCache, path ...string) + + // DeletePath removes all entries associated with a given path. + DeletePath(path ...string) bool + + // Serialize dumps a JSON representation of the cache. + Serialize() (string, error) +} + +type treeCache struct { + ChildNodes map[string]*treeCache Entries map[string]interface{} } -func NewTreeCache() *TreeCache { - return &TreeCache{ - ChildNodes: make(map[string]*TreeCache), +func NewTreeCache() TreeCache { + return &treeCache{ + ChildNodes: make(map[string]*treeCache), Entries: make(map[string]interface{}), } } -func (cache *TreeCache) Serialize() (string, error) { +func (cache *treeCache) Serialize() (string, error) { prettyJSON, err := json.MarshalIndent(cache, "", "\t") if err != nil { return "", err @@ -43,14 +72,7 @@ func (cache *TreeCache) Serialize() (string, error) { return string(prettyJSON), nil } -// setEntry creates the entire path if it doesn't already exist in the cache, -// then sets the given service record under the given key. The path this entry -// would have occupied in an etcd datastore is computed from the given fqdn and -// stored as the "Key" of the skydns service; this is only required because -// skydns expects the service record to contain a key in a specific format -// (presumably for legacy compatibility). Note that the fqnd string typically -// contains both the key and all elements in the path. -func (cache *TreeCache) setEntry(key string, val *skymsg.Service, fqdn string, path ...string) { +func (cache *treeCache) SetEntry(key string, val *skymsg.Service, fqdn string, path ...string) { // TODO: Consolidate setEntry and setSubCache into a single method with a // type switch. // TODO: Instead of passing the fqdn as an argument, we can reconstruct @@ -70,7 +92,7 @@ func (cache *TreeCache) setEntry(key string, val *skymsg.Service, fqdn string, p node.Entries[key] = val } -func (cache *TreeCache) getSubCache(path ...string) *TreeCache { +func (cache *treeCache) getSubCache(path ...string) *treeCache { childCache := cache for _, subpath := range path { childCache = childCache.ChildNodes[subpath] @@ -81,15 +103,12 @@ func (cache *TreeCache) getSubCache(path ...string) *TreeCache { return childCache } -// setSubCache inserts the given subtree under the given path:key. Usually the -// key is the name of a Kubernetes Service, and the path maps to the cluster -// subdomains matching the Service. -func (cache *TreeCache) setSubCache(key string, subCache *TreeCache, path ...string) { +func (cache *treeCache) SetSubCache(key string, subCache TreeCache, path ...string) { node := cache.ensureChildNode(path...) - node.ChildNodes[key] = subCache + node.ChildNodes[key] = subCache.(*treeCache) } -func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool) { +func (cache *treeCache) GetEntry(key string, path ...string) (interface{}, bool) { childNode := cache.getSubCache(path...) if childNode == nil { return nil, false @@ -98,11 +117,11 @@ func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool) return val, ok } -func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []*skymsg.Service { +func (cache *treeCache) GetValuesForPathWithWildcards(path ...string) []*skymsg.Service { retval := []*skymsg.Service{} - nodesToExplore := []*TreeCache{cache} + nodesToExplore := []*treeCache{cache} for idx, subpath := range path { - nextNodesToExplore := []*TreeCache{} + nextNodesToExplore := []*treeCache{} if idx == len(path)-1 { // if path ends on an entry, instead of a child node, add the entry for _, node := range nodesToExplore { @@ -150,7 +169,7 @@ func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []*skymsg. return retval } -func (cache *TreeCache) deletePath(path ...string) bool { +func (cache *treeCache) DeletePath(path ...string) bool { if len(path) == 0 { return false } @@ -169,19 +188,7 @@ func (cache *TreeCache) deletePath(path ...string) bool { return false } -func (cache *TreeCache) deleteEntry(key string, path ...string) bool { - childNode := cache.getSubCache(path...) - if childNode == nil { - return false - } - if _, ok := childNode.Entries[key]; ok { - delete(childNode.Entries, key) - return true - } - return false -} - -func (cache *TreeCache) appendValues(recursive bool, ref [][]interface{}) { +func (cache *treeCache) appendValues(recursive bool, ref [][]interface{}) { for _, value := range cache.Entries { ref[0] = append(ref[0], value) } @@ -192,83 +199,15 @@ func (cache *TreeCache) appendValues(recursive bool, ref [][]interface{}) { } } -func (cache *TreeCache) ensureChildNode(path ...string) *TreeCache { +func (cache *treeCache) ensureChildNode(path ...string) *treeCache { childNode := cache for _, subpath := range path { newNode, ok := childNode.ChildNodes[subpath] if !ok { - newNode = NewTreeCache() + newNode = NewTreeCache().(*treeCache) childNode.ChildNodes[subpath] = newNode } childNode = newNode } return childNode } - -// unused function. keeping it around in commented-fashion -// in the future, we might need some form of this function so that -// we can serialize to a file in a mounted empty dir.. -//const ( -// dataFile = "data.dat" -// crcFile = "data.crc" -//) -//func (cache *TreeCache) Serialize(dir string) (string, error) { -// cache.m.RLock() -// defer cache.m.RUnlock() -// b, err := json.Marshal(cache) -// if err != nil { -// return "", err -// } -// -// if err := ensureDir(dir, os.FileMode(0755)); err != nil { -// return "", err -// } -// if err := ioutil.WriteFile(path.Join(dir, dataFile), b, 0644); err != nil { -// return "", err -// } -// if err := ioutil.WriteFile(path.Join(dir, crcFile), getMD5(b), 0644); err != nil { -// return "", err -// } -// return string(b), nil -//} - -//func ensureDir(path string, perm os.FileMode) error { -// s, err := os.Stat(path) -// if err != nil || !s.IsDir() { -// return os.Mkdir(path, perm) -// } -// return nil -//} - -//func getMD5(b []byte) []byte { -// h := md5.New() -// h.Write(b) -// return []byte(fmt.Sprintf("%x", h.Sum(nil))) -//} - -// unused function. keeping it around in commented-fashion -// in the future, we might need some form of this function so that -// we can restart kube-dns, deserialize the tree and have a cache -// without having to wait for kube-dns to reach out to API server. -//func Deserialize(dir string) (*TreeCache, error) { -// b, err := ioutil.ReadFile(path.Join(dir, dataFile)) -// if err != nil { -// return nil, err -// } -// -// hash, err := ioutil.ReadFile(path.Join(dir, crcFile)) -// if err != nil { -// return nil, err -// } -// if !reflect.DeepEqual(hash, getMD5(b)) { -// return nil, fmt.Errorf("Checksum failed") -// } -// -// var cache TreeCache -// err = json.Unmarshal(b, &cache) -// if err != nil { -// return nil, err -// } -// cache.m = &sync.RWMutex{} -// return &cache, nil -//} diff --git a/pkg/dns/treecache/treecache_test.go b/pkg/dns/treecache/treecache_test.go new file mode 100644 index 00000000000..e03cb0b21fe --- /dev/null +++ b/pkg/dns/treecache/treecache_test.go @@ -0,0 +1,161 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package treecache + +import ( + "testing" + + "github.com/skynetservices/skydns/msg" +) + +func TestTreeCache(t *testing.T) { + tc := NewTreeCache() + + { + _, ok := tc.GetEntry("key1", "p1", "p2") + if ok { + t.Errorf("key should not exist") + } + } + + checkExists := func(key string, expectedSvc *msg.Service, path ...string) { + svc, ok := tc.GetEntry(key, path...) + if !ok { + t.Fatalf("key %v should exist", key) + } + if svc := svc.(*msg.Service); svc != nil { + if svc != expectedSvc { + t.Errorf("value is not correct (%v != %v)", svc, expectedSvc) + } + } else { + t.Errorf("entry is not of the right type: %T", svc) + } + } + setEntryTC := []struct { + key string + svc *msg.Service + fqdn string + path []string + }{ + {"key1", &msg.Service{}, "key1.p2.p1.", []string{"p1", "p2"}}, + {"key2", &msg.Service{}, "key2.p2.p1.", []string{"p1", "p2"}}, + {"key3", &msg.Service{}, "key3.p2.p1.", []string{"p1", "p3"}}, + } + + for _, testCase := range setEntryTC { + tc.SetEntry(testCase.key, testCase.svc, testCase.fqdn, testCase.path...) + checkExists(testCase.key, testCase.svc, testCase.path...) + } + + wildcardTC := []struct { + path []string + count int + }{ + {[]string{"p1"}, 0}, + {[]string{"p1", "p2"}, 2}, + {[]string{"p1", "p3"}, 1}, + {[]string{"p1", "p2", "key1"}, 1}, + {[]string{"p1", "p2", "key2"}, 1}, + {[]string{"p1", "p2", "key3"}, 0}, + {[]string{"p1", "p3", "key3"}, 1}, + {[]string{"p1", "p2", "*"}, 2}, + {[]string{"p1", "*", "*"}, 3}, + } + + for _, testCase := range wildcardTC { + services := tc.GetValuesForPathWithWildcards(testCase.path...) + if len(services) != testCase.count { + t.Fatalf("Expected %v services for path %v, got %v", + testCase.count, testCase.path, len(services)) + } + } + + // Delete some paths + if !tc.DeletePath("p1", "p2") { + t.Fatal("should delete path p2.p1.") + } + if _, ok := tc.GetEntry("key3", "p1", "p3"); !ok { + t.Error("should not affect p3.p1.") + } + if tc.DeletePath("p1", "p2") { + t.Fatalf("should not be able to delete p2.p1") + } + if !tc.DeletePath("p1", "p3") { + t.Fatalf("should be able to delete p3.p1") + } + if tc.DeletePath("p1", "p3") { + t.Fatalf("should not be able to delete p3.t1") + } + + for _, testCase := range []struct { + k string + p []string + }{ + {"key1", []string{"p1", "p2"}}, + {"key2", []string{"p1", "p2"}}, + {"key3", []string{"p1", "p3"}}, + } { + if _, ok := tc.GetEntry(testCase.k, testCase.p...); ok { + t.Error() + } + } +} + +func TestTreeCacheSetSubCache(t *testing.T) { + tc := NewTreeCache() + + m := &msg.Service{} + + branch := NewTreeCache() + branch.SetEntry("key1", m, "key", "p2") + + tc.SetSubCache("p1", branch, "p0") + + if _, ok := tc.GetEntry("key1", "p0", "p1", "p2"); !ok { + t.Errorf("should be able to get entry p0.p1.p2.key1") + } +} + +func TestTreeCacheSerialize(t *testing.T) { + tc := NewTreeCache() + tc.SetEntry("key1", &msg.Service{}, "key1.p2.p1.", "p1", "p2") + + const expected = `{ + "ChildNodes": { + "p1": { + "ChildNodes": { + "p2": { + "ChildNodes": {}, + "Entries": { + "key1": {} + } + } + }, + "Entries": {} + } + }, + "Entries": {} +}` + + actual, err := tc.Serialize() + if err != nil { + } + + if actual != expected { + t.Errorf("expected %q, got %q", expected, actual) + } +} From d9557d4eaff96b59a59d45e019decfc1d6789007 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Tue, 1 Nov 2016 14:52:26 -0700 Subject: [PATCH 2/2] kube-dns logging cleanup --v=2 is low noise (record changes), can be default --v=3 will shows per request logging Note: due to the code path with which we integrate with skydns, we don't see non-PILLAR_DOMAIN requests, so these will never be logged. --- cmd/kube-dns/BUILD | 2 + cmd/kube-dns/app/BUILD | 1 - cmd/kube-dns/app/server.go | 42 ++++++---- cmd/kube-dns/dns.go | 5 ++ pkg/dns/BUILD | 3 +- pkg/dns/dns.go | 154 +++++++++++++++++++++++-------------- pkg/dns/util/util.go | 3 +- test/test_owners.csv | 1 + 8 files changed, 133 insertions(+), 78 deletions(-) diff --git a/cmd/kube-dns/BUILD b/cmd/kube-dns/BUILD index 3645756f231..37255f1703c 100644 --- a/cmd/kube-dns/BUILD +++ b/cmd/kube-dns/BUILD @@ -20,8 +20,10 @@ go_binary( "//pkg/client/metrics/prometheus:go_default_library", "//pkg/util/flag:go_default_library", "//pkg/util/logs:go_default_library", + "//pkg/version:go_default_library", "//pkg/version/prometheus:go_default_library", "//pkg/version/verflag:go_default_library", + "//vendor:github.com/golang/glog", "//vendor:github.com/spf13/pflag", ], ) diff --git a/cmd/kube-dns/app/BUILD b/cmd/kube-dns/app/BUILD index 66cce8f92ae..3b019704c19 100644 --- a/cmd/kube-dns/app/BUILD +++ b/cmd/kube-dns/app/BUILD @@ -21,7 +21,6 @@ go_library( "//pkg/client/restclient:go_default_library", "//pkg/client/unversioned/clientcmd:go_default_library", "//pkg/dns:go_default_library", - "//pkg/version:go_default_library", "//vendor:github.com/golang/glog", "//vendor:github.com/skynetservices/skydns/metrics", "//vendor:github.com/skynetservices/skydns/server", diff --git a/cmd/kube-dns/app/server.go b/cmd/kube-dns/app/server.go index a7f470bb2ae..7f17df93497 100644 --- a/cmd/kube-dns/app/server.go +++ b/cmd/kube-dns/app/server.go @@ -34,7 +34,6 @@ import ( "k8s.io/kubernetes/pkg/client/restclient" kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" kdns "k8s.io/kubernetes/pkg/dns" - "k8s.io/kubernetes/pkg/version" ) type KubeDNSServer struct { @@ -47,9 +46,7 @@ type KubeDNSServer struct { } func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { - ks := KubeDNSServer{ - domain: config.ClusterDomain, - } + ks := KubeDNSServer{domain: config.ClusterDomain} kubeClient, err := newKubeClient(config) if err != nil { @@ -93,28 +90,32 @@ func newKubeClient(dnsConfig *options.KubeDNSConfig) (clientset.Interface, error } } - glog.Infof("Using %s for kubernetes master, kubernetes API: %v", config.Host, config.GroupVersion) + glog.V(0).Infof("Using %v for kubernetes master, kubernetes API: %v", + config.Host, config.GroupVersion) return clientset.NewForConfig(config) } func (server *KubeDNSServer) Run() { - glog.Infof("%+v", version.Get()) pflag.VisitAll(func(flag *pflag.Flag) { - glog.Infof("FLAG: --%s=%q", flag.Name, flag.Value) + glog.V(0).Infof("FLAG: --%s=%q", flag.Name, flag.Value) }) setupSignalHandlers() server.startSkyDNSServer() server.kd.Start() - server.setupHealthzHandlers() - glog.Infof("Setting up Healthz Handler(/readiness, /cache) on port :%d", server.healthzPort) + server.setupHandlers() + + glog.V(0).Infof("Status HTTP port %v", server.healthzPort) glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", server.healthzPort), nil)) } // setupHealthzHandlers sets up a readiness and liveness endpoint for kube2sky. -func (server *KubeDNSServer) setupHealthzHandlers() { +func (server *KubeDNSServer) setupHandlers() { + glog.V(0).Infof("Setting up Healthz Handler (/readiness)") http.HandleFunc("/readiness", func(w http.ResponseWriter, req *http.Request) { fmt.Fprintf(w, "ok\n") }) + + glog.V(0).Infof("Setting up cache handler (/cache)") http.HandleFunc("/cache", func(w http.ResponseWriter, req *http.Request) { serializedJSON, err := server.kd.GetCacheAsJSON() if err == nil { @@ -126,25 +127,32 @@ func (server *KubeDNSServer) setupHealthzHandlers() { }) } -// setupSignalHandlers runs a goroutine that waits on SIGINT or SIGTERM and logs it -// program will be terminated by SIGKILL when grace period ends. +// setupSignalHandlers installs signal handler to ignore SIGINT and +// SIGTERM. This daemon will be killed by SIGKILL after the grace +// period to allow for some manner of graceful shutdown. func setupSignalHandlers() { sigChan := make(chan os.Signal) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { - glog.Infof("Received signal: %s, will exit when the grace period ends", <-sigChan) + glog.V(0).Infof("Ignoring signal %v (can only be terminated by SIGKILL)", <-sigChan) }() } func (d *KubeDNSServer) startSkyDNSServer() { - glog.Infof("Starting SkyDNS server. Listening on %s:%d", d.dnsBindAddress, d.dnsPort) - skydnsConfig := &server.Config{Domain: d.domain, DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort)} + glog.V(0).Infof("Starting SkyDNS server (%v:%v)", d.dnsBindAddress, d.dnsPort) + skydnsConfig := &server.Config{ + Domain: d.domain, + DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort), + } server.SetDefaults(skydnsConfig) s := server.New(d.kd, skydnsConfig) if err := metrics.Metrics(); err != nil { - glog.Fatalf("skydns: %s", err) + glog.Fatalf("Skydns metrics error: %s", err) + } else if metrics.Port != "" { + glog.V(0).Infof("Skydns metrics enabled (%v:%v)", metrics.Path, metrics.Port) + } else { + glog.V(0).Infof("Skydns metrics not enabled") } - glog.Infof("skydns: metrics enabled on : %s:%s", metrics.Path, metrics.Port) go s.Run() } diff --git a/cmd/kube-dns/dns.go b/cmd/kube-dns/dns.go index 22dcb08cf51..845edebf329 100644 --- a/cmd/kube-dns/dns.go +++ b/cmd/kube-dns/dns.go @@ -17,12 +17,14 @@ limitations under the License. package main import ( + "github.com/golang/glog" "github.com/spf13/pflag" "k8s.io/kubernetes/cmd/kube-dns/app" "k8s.io/kubernetes/cmd/kube-dns/app/options" _ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration "k8s.io/kubernetes/pkg/util/flag" "k8s.io/kubernetes/pkg/util/logs" + "k8s.io/kubernetes/pkg/version" _ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration "k8s.io/kubernetes/pkg/version/verflag" ) @@ -36,6 +38,9 @@ func main() { defer logs.FlushLogs() verflag.PrintAndExitIfRequested() + + glog.V(0).Infof("version: %+v", version.Get()) + server := app.NewKubeDNSServerDefault(config) server.Run() } diff --git a/pkg/dns/BUILD b/pkg/dns/BUILD index 2985001f0e6..9acaf709e68 100644 --- a/pkg/dns/BUILD +++ b/pkg/dns/BUILD @@ -15,7 +15,6 @@ go_library( srcs = [ "dns.go", "doc.go", - "treecache.go", ], tags = ["automanaged"], deps = [ @@ -24,6 +23,7 @@ go_library( "//pkg/api/unversioned:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/dns/treecache:go_default_library", "//pkg/dns/util:go_default_library", "//pkg/runtime:go_default_library", "//pkg/util/validation:go_default_library", @@ -47,6 +47,7 @@ go_test( "//pkg/api/unversioned:go_default_library", "//pkg/client/cache:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/dns/treecache:go_default_library", "//pkg/dns/util:go_default_library", "//pkg/util/sets:go_default_library", "//vendor:github.com/coreos/etcd/client", diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 1fa5b4e70ac..cc0e53a0689 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" kcache "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/dns/treecache" "k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/validation" @@ -78,7 +79,7 @@ type KubeDNS struct { // stores DNS records for the domain. // A Records and SRV Records for (regular) services and headless Services. // CNAME Records for ExternalName Services. - cache *TreeCache + cache treecache.TreeCache // TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap. reverseRecordMap map[string]*skymsg.Service @@ -125,7 +126,7 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin kd := &KubeDNS{ kubeClient: client, domain: domain, - cache: NewTreeCache(), + cache: treecache.NewTreeCache(), cacheLock: sync.RWMutex{}, nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), reverseRecordMap: make(map[string]*skymsg.Service), @@ -135,34 +136,46 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin } kd.setEndpointsStore() kd.setServicesStore() + return kd, nil } func (kd *KubeDNS) Start() { + glog.V(2).Infof("Starting endpointsController") go kd.endpointsController.Run(wait.NeverStop) + + glog.V(2).Infof("Starting serviceController") go kd.serviceController.Run(wait.NeverStop) - // Wait synchronously for the Kubernetes service and add a DNS record for it. - // This ensures that the Start function returns only after having received Service objects - // from APIServer. - // TODO: we might not have to wait for kubernetes service specifically. We should just wait - // for a list operation to be complete from APIServer. + + // Wait synchronously for the Kubernetes service and add a DNS + // record for it. This ensures that the Start function returns only + // after having received Service objects from APIServer. + // + // TODO: we might not have to wait for kubernetes service + // specifically. We should just wait for a list operation to be + // complete from APIServer. + glog.V(2).Infof("Waiting for Kubernetes service") kd.waitForKubernetesService() } func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) { name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName) - glog.Infof("Waiting for service: %v", name) + glog.V(2).Infof("Waiting for service: %v", name) var err error servicePollInterval := 1 * time.Second + for { svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName) if err != nil || svc == nil { - glog.Infof("Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", name, err, servicePollInterval) + glog.V(3).Infof( + "Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", + name, err, servicePollInterval) time.Sleep(servicePollInterval) continue } break } + return } @@ -230,7 +243,9 @@ func assertIsService(obj interface{}) (*kapi.Service, bool) { func (kd *KubeDNS) newService(obj interface{}) { if service, ok := assertIsService(obj); ok { - glog.V(4).Infof("Add/Updated for service %v", service.Name) + glog.V(2).Infof("New service: %v", service.Name) + glog.V(4).Infof("Service details: %v", service) + // ExternalName services are a special kind that return CNAME records if service.Spec.Type == kapi.ServiceTypeExternalName { kd.newExternalNameService(service) @@ -242,7 +257,8 @@ func (kd *KubeDNS) newService(obj interface{}) { return } if len(service.Spec.Ports) == 0 { - glog.Warningf("Unexpected service with no ports, this should not have happened: %v", service) + glog.Warningf("Service with no ports, this should not have happened: %v", + service) } kd.newPortalService(service) } @@ -253,8 +269,11 @@ func (kd *KubeDNS) removeService(obj interface{}) { subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name) kd.cacheLock.Lock() defer kd.cacheLock.Unlock() - success := kd.cache.deletePath(subCachePath...) - glog.V(2).Infof("Removing service %v at path %v. Success: ", s.Name, subCachePath, success) + + success := kd.cache.DeletePath(subCachePath...) + glog.V(2).Infof("removeService %v at path %v. Success: %v", + s.Name, subCachePath, success) + // ExternalName services have no IP if kapi.IsServiceIPSet(s) { delete(kd.reverseRecordMap, s.Spec.ClusterIP) @@ -268,7 +287,8 @@ func (kd *KubeDNS) updateService(oldObj, newObj interface{}) { if old, ok := assertIsService(oldObj); ok { // Remove old cache path only if changing type to/from ExternalName. // In all other cases, we'll update records in place. - if (new.Spec.Type == kapi.ServiceTypeExternalName) != (old.Spec.Type == kapi.ServiceTypeExternalName) { + if (new.Spec.Type == kapi.ServiceTypeExternalName) != + (old.Spec.Type == kapi.ServiceTypeExternalName) { kd.removeService(oldObj) } kd.newService(newObj) @@ -304,7 +324,8 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er return nil, fmt.Errorf("failed to get service object from services store - %v", err) } if !exists { - glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace) + glog.V(3).Infof("No service for endpoint %q in namespace %q", + e.Name, e.Namespace) return nil, nil } if svc, ok := assertIsService(obj); ok { @@ -321,9 +342,9 @@ func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string { } func (kd *KubeDNS) newPortalService(service *kapi.Service) { - subCache := NewTreeCache() + subCache := treecache.NewTreeCache() recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0) - subCache.setEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel)) + subCache.SetEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel)) // Generate SRV Records for i := range service.Spec.Ports { @@ -332,7 +353,9 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) { srvValue := kd.generateSRVRecordValue(service, int(port.Port)) l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name} - subCache.setEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...) + glog.V(2).Infof("Added SRV record %+v", srvValue) + + subCache.SetEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...) } } subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace) @@ -341,7 +364,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) { kd.cacheLock.Lock() defer kd.cacheLock.Unlock() - kd.cache.setSubCache(service.Name, subCache, subCachePath...) + kd.cache.SetSubCache(service.Name, subCache, subCachePath...) kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord kd.clusterIPServiceMap[service.Spec.ClusterIP] = service } @@ -352,7 +375,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap if err != nil { return err } - subCache := NewTreeCache() + subCache := treecache.NewTreeCache() glog.V(4).Infof("Endpoints Annotations: %v", e.Annotations) for idx := range e.Subsets { for subIdx := range e.Subsets[idx].Addresses { @@ -362,14 +385,15 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap if hostLabel, exists := getHostname(address, podHostnames); exists { endpointName = hostLabel } - subCache.setEntry(endpointName, recordValue, kd.fqdn(svc, endpointName)) + subCache.SetEntry(endpointName, recordValue, kd.fqdn(svc, endpointName)) for portIdx := range e.Subsets[idx].Ports { endpointPort := &e.Subsets[idx].Ports[portIdx] if endpointPort.Name != "" && endpointPort.Protocol != "" { srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName) + glog.V(2).Infof("Added SRV record %+v", srvValue) l := []string{"_" + strings.ToLower(string(endpointPort.Protocol)), "_" + endpointPort.Name} - subCache.setEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...) + subCache.SetEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...) } } } @@ -377,7 +401,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace) kd.cacheLock.Lock() defer kd.cacheLock.Unlock() - kd.cache.setSubCache(svc.Name, subCache, subCachePath...) + kd.cache.SetSubCache(svc.Name, subCache, subCachePath...) return nil } @@ -430,7 +454,8 @@ func (kd *KubeDNS) newHeadlessService(service *kapi.Service) error { return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err) } if !exists { - glog.V(1).Infof("Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.", service.Name, service.Namespace) + glog.V(1).Infof("Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.", + service.Name, service.Namespace) return nil } if e, ok := e.(*kapi.Endpoints); ok { @@ -446,11 +471,12 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) { recordValue, _ := util.GetSkyMsg(service.Spec.ExternalName, 0) cachePath := append(kd.domainPath, serviceSubdomain, service.Namespace) fqdn := kd.fqdn(service) - glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v", service.Name, recordValue, fqdn, cachePath) + glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v", + service.Name, recordValue, fqdn, cachePath) kd.cacheLock.Lock() defer kd.cacheLock.Unlock() // Store the service name directly as the leaf key - kd.cache.setEntry(service.Name, recordValue, fqdn, cachePath...) + kd.cache.SetEntry(service.Name, recordValue, fqdn, cachePath...) } // Records responds with DNS records that match the given name, in a format @@ -458,7 +484,7 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) { // matching the given name is returned, otherwise all records stored under // the subtree matching the name are returned. func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) { - glog.V(2).Infof("Received DNS Request:%s, exact:%v", name, exact) + glog.V(3).Infof("Query for %q, exact: %v", name, exact) trimmed := strings.TrimRight(name, ".") segments := strings.Split(trimmed, ".") @@ -466,14 +492,14 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er federationSegments := []string{} if !exact && kd.isFederationQuery(segments) { - glog.V(2).Infof( - "federation service query: Received federation query. Going to try to find local service first") - // Try quering the non-federation (local) service first. - // Will try the federation one later, if this fails. + glog.V(3).Infof("Received federation query, trying local service first") + // Try querying the non-federation (local) service first. Will try + // the federation one later, if this fails. isFederationQuery = true federationSegments = append(federationSegments, segments...) // To try local service, remove federation name from segments. - // Federation name is 3rd in the segment (after service name and namespace). + // Federation name is 3rd in the segment (after service name and + // namespace). segments = append(segments[:2], segments[3:]...) } @@ -487,9 +513,11 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er if isFederationQuery { return kd.recordsForFederation(records, path, exact, federationSegments) } else if len(records) > 0 { + glog.V(4).Infof("Records for %v: %v", name, records) return records, nil } + glog.V(3).Infof("No record found for %v", name) return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} } @@ -497,17 +525,18 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string, // For federation query, verify that the local service has endpoints. validRecord := false for _, val := range records { - // We know that a headless service has endpoints for sure if a record was returned for it. - // The record contains endpoint IPs. So nothing to check for headless services. + // We know that a headless service has endpoints for sure if a + // record was returned for it. The record contains endpoint + // IPs. So nothing to check for headless services. if !kd.isHeadlessServiceRecord(&val) { ok, err := kd.serviceWithClusterIPHasEndpoints(&val) if err != nil { glog.V(2).Infof( - "federation service query: unexpected error while trying to find if service has endpoint: %v", err) + "Federation: error finding if service has endpoint: %v", err) continue } if !ok { - glog.Infof("federation service query: skipping record since service has no endpoint: %v", val) + glog.V(2).Infof("Federation: skipping record since service has no endpoint: %v", val) continue } } @@ -518,21 +547,24 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string, if validRecord { // There is a local service with valid endpoints, return its CNAME. name := strings.Join(util.ReverseArray(path), ".") - // Ensure that this name that we are returning as a CNAME response is a fully qualified - // domain name so that the client's resolver library doesn't have to go through its - // search list all over again. + // Ensure that this name that we are returning as a CNAME response + // is a fully qualified domain name so that the client's resolver + // library doesn't have to go through its search list all over + // again. if !strings.HasSuffix(name, ".") { name = name + "." } - glog.V(2).Infof("federation service query: Returning CNAME for local service : %s", name) + glog.V(3).Infof( + "Federation: Returning CNAME for local service: %v", name) return []skymsg.Service{{Host: name}}, nil } - // If the name query is not an exact query and does not match any records in the local store, - // attempt to send a federation redirect (CNAME) response. + // If the name query is not an exact query and does not match any + // records in the local store, attempt to send a federation redirect + // (CNAME) response. if !exact { - glog.V(2).Infof( - "federation service query: Did not find a local service. Trying federation redirect (CNAME) response") + glog.V(3).Infof( + "Federation: Did not find a local service. Trying federation redirect (CNAME)") return kd.federationRecords(util.ReverseArray(federationSegments)) } @@ -556,25 +588,26 @@ func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Servic } kd.cacheLock.RLock() defer kd.cacheLock.RUnlock() - if record, ok := kd.cache.getEntry(key, path[:len(path)-1]...); ok { - glog.V(2).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1]) + if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok { + glog.V(3).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1]) return []skymsg.Service{*(record.(*skymsg.Service))}, nil } - glog.V(2).Infof("Exact match for %v not found in cache", path) + + glog.V(3).Infof("Exact match for %v not found in cache", path) return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} } kd.cacheLock.RLock() defer kd.cacheLock.RUnlock() - records := kd.cache.getValuesForPathWithWildcards(path...) - glog.V(2).Infof("Received %d records for %v from cache", len(records), path) + records := kd.cache.GetValuesForPathWithWildcards(path...) + glog.V(3).Infof("Found %d records for %v in the cache", len(records), path) retval := []skymsg.Service{} for _, val := range records { retval = append(retval, *val) } - glog.V(2).Infof("records:%v, retval:%v, path:%v", records, retval, path) + glog.V(4).Infof("getRecordsForPath retval=%+v, path=%v", retval, path) return retval, nil } @@ -619,7 +652,7 @@ func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool, // ReverseRecords performs a reverse lookup for the given name. func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) { - glog.V(2).Infof("Received ReverseRecord Request:%s", name) + glog.V(3).Infof("Query for ReverseRecord %q", name) // if portalIP is not a valid IP, the reverseRecordMap lookup will fail portalIP, ok := util.ExtractIP(name) @@ -676,34 +709,39 @@ func (kd *KubeDNS) getPodIP(path []string) (string, error) { // We can add support for wildcard queries later, if needed. func (kd *KubeDNS) isFederationQuery(path []string) bool { if len(path) != 4+len(kd.domainPath) { - glog.V(2).Infof("not a federation query: len(%q) != 4+len(%q)", path, kd.domainPath) + glog.V(4).Infof("Not a federation query: len(%q) != 4+len(%q)", path, kd.domainPath) return false } if errs := validation.IsDNS1035Label(path[0]); len(errs) != 0 { - glog.V(2).Infof("not a federation query: %q is not an RFC 1035 label: %q", path[0], errs) + glog.V(4).Infof("Not a federation query: %q is not an RFC 1035 label: %q", + path[0], errs) return false } if errs := validation.IsDNS1123Label(path[1]); len(errs) != 0 { - glog.V(2).Infof("not a federation query: %q is not an RFC 1123 label: %q", path[1], errs) + glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q", + path[1], errs) return false } if errs := validation.IsDNS1123Label(path[2]); len(errs) != 0 { - glog.V(2).Infof("not a federation query: %q is not an RFC 1123 label: %q", path[2], errs) + glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q", + path[2], errs) return false } if path[3] != serviceSubdomain { - glog.V(2).Infof("not a federation query: %q != %q (serviceSubdomain)", path[3], serviceSubdomain) + glog.V(4).Infof("Not a federation query: %q != %q (serviceSubdomain)", + path[3], serviceSubdomain) return false } for i, domComp := range kd.domainPath { // kd.domainPath is reversed, so we need to look in the `path` in the reverse order. if domComp != path[len(path)-i-1] { - glog.V(2).Infof("not a federation query: kd.domainPath[%d] != path[%d] (%q != %q)", i, len(path)-i-1, domComp, path[len(path)-i-1]) + glog.V(4).Infof("Not a federation query: kd.domainPath[%d] != path[%d] (%q != %q)", + i, len(path)-i-1, domComp, path[len(path)-i-1]) return false } } if _, ok := kd.federations[path[2]]; !ok { - glog.V(2).Infof("not a federation query: kd.federations[%q] not found", path[2]) + glog.V(4).Infof("Not a federation query: kd.federations[%q] not found", path[2]) return false } return true diff --git a/pkg/dns/util/util.go b/pkg/dns/util/util.go index a8f92b3c720..fc01e6dc4e3 100644 --- a/pkg/dns/util/util.go +++ b/pkg/dns/util/util.go @@ -63,7 +63,8 @@ func ReverseArray(arr []string) []string { func GetSkyMsg(ip string, port int) (*msg.Service, string) { msg := NewServiceRecord(ip, port) hash := HashServiceRecord(msg) - glog.V(2).Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash) + glog.V(5).Infof("Constructed new DNS record: %s, hash:%s", + fmt.Sprintf("%v", msg), hash) return msg, fmt.Sprintf("%x", hash) } diff --git a/test/test_owners.csv b/test/test_owners.csv index dc32285226b..883656bbd7a 100644 --- a/test/test_owners.csv +++ b/test/test_owners.csv @@ -593,6 +593,7 @@ k8s.io/kubernetes/pkg/credentialprovider,justinsb,1 k8s.io/kubernetes/pkg/credentialprovider/aws,zmerlynn,1 k8s.io/kubernetes/pkg/credentialprovider/gcp,mml,1 k8s.io/kubernetes/pkg/dns,jdef,1 +k8s.io/kubernetes/pkg/dns/treecache,bowei,0 k8s.io/kubernetes/pkg/fieldpath,childsb,1 k8s.io/kubernetes/pkg/fields,jsafrane,1 k8s.io/kubernetes/pkg/genericapiserver,nikhiljindal,0