Merge pull request #36013 from bowei/kubedns-logging

Automatic merge from submit-queue

Kubedns logging

fixes 
https://github.com/kubernetes/kubernetes/issues/29053

may resolve https://github.com/kubernetes/kubernetes/issues/29054, but depends on what the specific ask is
This commit is contained in:
Kubernetes Submit Queue 2016-11-06 03:38:27 -08:00 committed by GitHub
commit 5e8b22fdcb
12 changed files with 370 additions and 188 deletions

View File

@ -20,8 +20,10 @@ go_binary(
"//pkg/client/metrics/prometheus:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/util/logs:go_default_library",
"//pkg/version:go_default_library",
"//pkg/version/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/spf13/pflag",
],
)

View File

@ -21,7 +21,6 @@ go_library(
"//pkg/client/restclient:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library",
"//pkg/dns:go_default_library",
"//pkg/version:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/skynetservices/skydns/metrics",
"//vendor:github.com/skynetservices/skydns/server",

View File

@ -34,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
kdns "k8s.io/kubernetes/pkg/dns"
"k8s.io/kubernetes/pkg/version"
)
type KubeDNSServer struct {
@ -47,9 +46,7 @@ type KubeDNSServer struct {
}
func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
ks := KubeDNSServer{
domain: config.ClusterDomain,
}
ks := KubeDNSServer{domain: config.ClusterDomain}
kubeClient, err := newKubeClient(config)
if err != nil {
@ -93,28 +90,32 @@ func newKubeClient(dnsConfig *options.KubeDNSConfig) (clientset.Interface, error
}
}
glog.Infof("Using %s for kubernetes master, kubernetes API: %v", config.Host, config.GroupVersion)
glog.V(0).Infof("Using %v for kubernetes master, kubernetes API: %v",
config.Host, config.GroupVersion)
return clientset.NewForConfig(config)
}
func (server *KubeDNSServer) Run() {
glog.Infof("%+v", version.Get())
pflag.VisitAll(func(flag *pflag.Flag) {
glog.Infof("FLAG: --%s=%q", flag.Name, flag.Value)
glog.V(0).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
})
setupSignalHandlers()
server.startSkyDNSServer()
server.kd.Start()
server.setupHealthzHandlers()
glog.Infof("Setting up Healthz Handler(/readiness, /cache) on port :%d", server.healthzPort)
server.setupHandlers()
glog.V(0).Infof("Status HTTP port %v", server.healthzPort)
glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", server.healthzPort), nil))
}
// setupHealthzHandlers sets up a readiness and liveness endpoint for kube2sky.
func (server *KubeDNSServer) setupHealthzHandlers() {
func (server *KubeDNSServer) setupHandlers() {
glog.V(0).Infof("Setting up Healthz Handler (/readiness)")
http.HandleFunc("/readiness", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, "ok\n")
})
glog.V(0).Infof("Setting up cache handler (/cache)")
http.HandleFunc("/cache", func(w http.ResponseWriter, req *http.Request) {
serializedJSON, err := server.kd.GetCacheAsJSON()
if err == nil {
@ -126,25 +127,32 @@ func (server *KubeDNSServer) setupHealthzHandlers() {
})
}
// setupSignalHandlers runs a goroutine that waits on SIGINT or SIGTERM and logs it
// program will be terminated by SIGKILL when grace period ends.
// setupSignalHandlers installs signal handler to ignore SIGINT and
// SIGTERM. This daemon will be killed by SIGKILL after the grace
// period to allow for some manner of graceful shutdown.
func setupSignalHandlers() {
sigChan := make(chan os.Signal)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
glog.Infof("Received signal: %s, will exit when the grace period ends", <-sigChan)
glog.V(0).Infof("Ignoring signal %v (can only be terminated by SIGKILL)", <-sigChan)
}()
}
func (d *KubeDNSServer) startSkyDNSServer() {
glog.Infof("Starting SkyDNS server. Listening on %s:%d", d.dnsBindAddress, d.dnsPort)
skydnsConfig := &server.Config{Domain: d.domain, DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort)}
glog.V(0).Infof("Starting SkyDNS server (%v:%v)", d.dnsBindAddress, d.dnsPort)
skydnsConfig := &server.Config{
Domain: d.domain,
DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort),
}
server.SetDefaults(skydnsConfig)
s := server.New(d.kd, skydnsConfig)
if err := metrics.Metrics(); err != nil {
glog.Fatalf("skydns: %s", err)
glog.Fatalf("Skydns metrics error: %s", err)
} else if metrics.Port != "" {
glog.V(0).Infof("Skydns metrics enabled (%v:%v)", metrics.Path, metrics.Port)
} else {
glog.V(0).Infof("Skydns metrics not enabled")
}
glog.Infof("skydns: metrics enabled on : %s:%s", metrics.Path, metrics.Port)
go s.Run()
}

View File

@ -17,12 +17,14 @@ limitations under the License.
package main
import (
"github.com/golang/glog"
"github.com/spf13/pflag"
"k8s.io/kubernetes/cmd/kube-dns/app"
"k8s.io/kubernetes/cmd/kube-dns/app/options"
_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/logs"
"k8s.io/kubernetes/pkg/version"
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
"k8s.io/kubernetes/pkg/version/verflag"
)
@ -36,6 +38,9 @@ func main() {
defer logs.FlushLogs()
verflag.PrintAndExitIfRequested()
glog.V(0).Infof("version: %+v", version.Get())
server := app.NewKubeDNSServerDefault(config)
server.Run()
}

View File

@ -15,7 +15,6 @@ go_library(
srcs = [
"dns.go",
"doc.go",
"treecache.go",
],
tags = ["automanaged"],
deps = [
@ -24,6 +23,7 @@ go_library(
"//pkg/api/unversioned:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/dns/treecache:go_default_library",
"//pkg/dns/util:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/validation:go_default_library",
@ -47,6 +47,7 @@ go_test(
"//pkg/api/unversioned:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/dns/treecache:go_default_library",
"//pkg/dns/util:go_default_library",
"//pkg/util/sets:go_default_library",
"//vendor:github.com/coreos/etcd/client",

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
kcache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/dns/treecache"
"k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/validation"
@ -78,7 +79,7 @@ type KubeDNS struct {
// stores DNS records for the domain.
// A Records and SRV Records for (regular) services and headless Services.
// CNAME Records for ExternalName Services.
cache *TreeCache
cache treecache.TreeCache
// TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap.
reverseRecordMap map[string]*skymsg.Service
@ -125,7 +126,7 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin
kd := &KubeDNS{
kubeClient: client,
domain: domain,
cache: NewTreeCache(),
cache: treecache.NewTreeCache(),
cacheLock: sync.RWMutex{},
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
reverseRecordMap: make(map[string]*skymsg.Service),
@ -135,34 +136,46 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin
}
kd.setEndpointsStore()
kd.setServicesStore()
return kd, nil
}
func (kd *KubeDNS) Start() {
glog.V(2).Infof("Starting endpointsController")
go kd.endpointsController.Run(wait.NeverStop)
glog.V(2).Infof("Starting serviceController")
go kd.serviceController.Run(wait.NeverStop)
// Wait synchronously for the Kubernetes service and add a DNS record for it.
// This ensures that the Start function returns only after having received Service objects
// from APIServer.
// TODO: we might not have to wait for kubernetes service specifically. We should just wait
// for a list operation to be complete from APIServer.
// Wait synchronously for the Kubernetes service and add a DNS
// record for it. This ensures that the Start function returns only
// after having received Service objects from APIServer.
//
// TODO: we might not have to wait for kubernetes service
// specifically. We should just wait for a list operation to be
// complete from APIServer.
glog.V(2).Infof("Waiting for Kubernetes service")
kd.waitForKubernetesService()
}
func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
glog.Infof("Waiting for service: %v", name)
glog.V(2).Infof("Waiting for service: %v", name)
var err error
servicePollInterval := 1 * time.Second
for {
svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
if err != nil || svc == nil {
glog.Infof("Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", name, err, servicePollInterval)
glog.V(3).Infof(
"Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.",
name, err, servicePollInterval)
time.Sleep(servicePollInterval)
continue
}
break
}
return
}
@ -230,7 +243,9 @@ func assertIsService(obj interface{}) (*kapi.Service, bool) {
func (kd *KubeDNS) newService(obj interface{}) {
if service, ok := assertIsService(obj); ok {
glog.V(4).Infof("Add/Updated for service %v", service.Name)
glog.V(2).Infof("New service: %v", service.Name)
glog.V(4).Infof("Service details: %v", service)
// ExternalName services are a special kind that return CNAME records
if service.Spec.Type == kapi.ServiceTypeExternalName {
kd.newExternalNameService(service)
@ -242,7 +257,8 @@ func (kd *KubeDNS) newService(obj interface{}) {
return
}
if len(service.Spec.Ports) == 0 {
glog.Warningf("Unexpected service with no ports, this should not have happened: %v", service)
glog.Warningf("Service with no ports, this should not have happened: %v",
service)
}
kd.newPortalService(service)
}
@ -253,8 +269,11 @@ func (kd *KubeDNS) removeService(obj interface{}) {
subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
success := kd.cache.deletePath(subCachePath...)
glog.V(2).Infof("Removing service %v at path %v. Success: ", s.Name, subCachePath, success)
success := kd.cache.DeletePath(subCachePath...)
glog.V(2).Infof("removeService %v at path %v. Success: %v",
s.Name, subCachePath, success)
// ExternalName services have no IP
if kapi.IsServiceIPSet(s) {
delete(kd.reverseRecordMap, s.Spec.ClusterIP)
@ -268,7 +287,8 @@ func (kd *KubeDNS) updateService(oldObj, newObj interface{}) {
if old, ok := assertIsService(oldObj); ok {
// Remove old cache path only if changing type to/from ExternalName.
// In all other cases, we'll update records in place.
if (new.Spec.Type == kapi.ServiceTypeExternalName) != (old.Spec.Type == kapi.ServiceTypeExternalName) {
if (new.Spec.Type == kapi.ServiceTypeExternalName) !=
(old.Spec.Type == kapi.ServiceTypeExternalName) {
kd.removeService(oldObj)
}
kd.newService(newObj)
@ -304,7 +324,8 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
return nil, fmt.Errorf("failed to get service object from services store - %v", err)
}
if !exists {
glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
glog.V(3).Infof("No service for endpoint %q in namespace %q",
e.Name, e.Namespace)
return nil, nil
}
if svc, ok := assertIsService(obj); ok {
@ -321,9 +342,9 @@ func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string {
}
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
subCache := NewTreeCache()
subCache := treecache.NewTreeCache()
recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0)
subCache.setEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
subCache.SetEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
// Generate SRV Records
for i := range service.Spec.Ports {
@ -332,7 +353,9 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
srvValue := kd.generateSRVRecordValue(service, int(port.Port))
l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name}
subCache.setEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
glog.V(2).Infof("Added SRV record %+v", srvValue)
subCache.SetEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
@ -341,7 +364,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.setSubCache(service.Name, subCache, subCachePath...)
kd.cache.SetSubCache(service.Name, subCache, subCachePath...)
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
}
@ -352,7 +375,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
if err != nil {
return err
}
subCache := NewTreeCache()
subCache := treecache.NewTreeCache()
glog.V(4).Infof("Endpoints Annotations: %v", e.Annotations)
for idx := range e.Subsets {
for subIdx := range e.Subsets[idx].Addresses {
@ -362,14 +385,15 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
if hostLabel, exists := getHostname(address, podHostnames); exists {
endpointName = hostLabel
}
subCache.setEntry(endpointName, recordValue, kd.fqdn(svc, endpointName))
subCache.SetEntry(endpointName, recordValue, kd.fqdn(svc, endpointName))
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
if endpointPort.Name != "" && endpointPort.Protocol != "" {
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
glog.V(2).Infof("Added SRV record %+v", srvValue)
l := []string{"_" + strings.ToLower(string(endpointPort.Protocol)), "_" + endpointPort.Name}
subCache.setEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
subCache.SetEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
}
}
}
@ -377,7 +401,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.setSubCache(svc.Name, subCache, subCachePath...)
kd.cache.SetSubCache(svc.Name, subCache, subCachePath...)
return nil
}
@ -430,7 +454,8 @@ func (kd *KubeDNS) newHeadlessService(service *kapi.Service) error {
return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
}
if !exists {
glog.V(1).Infof("Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.", service.Name, service.Namespace)
glog.V(1).Infof("Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.",
service.Name, service.Namespace)
return nil
}
if e, ok := e.(*kapi.Endpoints); ok {
@ -446,11 +471,12 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) {
recordValue, _ := util.GetSkyMsg(service.Spec.ExternalName, 0)
cachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
fqdn := kd.fqdn(service)
glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v", service.Name, recordValue, fqdn, cachePath)
glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v",
service.Name, recordValue, fqdn, cachePath)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
// Store the service name directly as the leaf key
kd.cache.setEntry(service.Name, recordValue, fqdn, cachePath...)
kd.cache.SetEntry(service.Name, recordValue, fqdn, cachePath...)
}
// Records responds with DNS records that match the given name, in a format
@ -458,7 +484,7 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) {
// matching the given name is returned, otherwise all records stored under
// the subtree matching the name are returned.
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
glog.V(2).Infof("Received DNS Request:%s, exact:%v", name, exact)
glog.V(3).Infof("Query for %q, exact: %v", name, exact)
trimmed := strings.TrimRight(name, ".")
segments := strings.Split(trimmed, ".")
@ -466,14 +492,14 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
federationSegments := []string{}
if !exact && kd.isFederationQuery(segments) {
glog.V(2).Infof(
"federation service query: Received federation query. Going to try to find local service first")
// Try quering the non-federation (local) service first.
// Will try the federation one later, if this fails.
glog.V(3).Infof("Received federation query, trying local service first")
// Try querying the non-federation (local) service first. Will try
// the federation one later, if this fails.
isFederationQuery = true
federationSegments = append(federationSegments, segments...)
// To try local service, remove federation name from segments.
// Federation name is 3rd in the segment (after service name and namespace).
// Federation name is 3rd in the segment (after service name and
// namespace).
segments = append(segments[:2], segments[3:]...)
}
@ -487,9 +513,11 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
if isFederationQuery {
return kd.recordsForFederation(records, path, exact, federationSegments)
} else if len(records) > 0 {
glog.V(4).Infof("Records for %v: %v", name, records)
return records, nil
}
glog.V(3).Infof("No record found for %v", name)
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
@ -497,17 +525,18 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string,
// For federation query, verify that the local service has endpoints.
validRecord := false
for _, val := range records {
// We know that a headless service has endpoints for sure if a record was returned for it.
// The record contains endpoint IPs. So nothing to check for headless services.
// We know that a headless service has endpoints for sure if a
// record was returned for it. The record contains endpoint
// IPs. So nothing to check for headless services.
if !kd.isHeadlessServiceRecord(&val) {
ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
if err != nil {
glog.V(2).Infof(
"federation service query: unexpected error while trying to find if service has endpoint: %v", err)
"Federation: error finding if service has endpoint: %v", err)
continue
}
if !ok {
glog.Infof("federation service query: skipping record since service has no endpoint: %v", val)
glog.V(2).Infof("Federation: skipping record since service has no endpoint: %v", val)
continue
}
}
@ -518,21 +547,24 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string,
if validRecord {
// There is a local service with valid endpoints, return its CNAME.
name := strings.Join(util.ReverseArray(path), ".")
// Ensure that this name that we are returning as a CNAME response is a fully qualified
// domain name so that the client's resolver library doesn't have to go through its
// search list all over again.
// Ensure that this name that we are returning as a CNAME response
// is a fully qualified domain name so that the client's resolver
// library doesn't have to go through its search list all over
// again.
if !strings.HasSuffix(name, ".") {
name = name + "."
}
glog.V(2).Infof("federation service query: Returning CNAME for local service : %s", name)
glog.V(3).Infof(
"Federation: Returning CNAME for local service: %v", name)
return []skymsg.Service{{Host: name}}, nil
}
// If the name query is not an exact query and does not match any records in the local store,
// attempt to send a federation redirect (CNAME) response.
// If the name query is not an exact query and does not match any
// records in the local store, attempt to send a federation redirect
// (CNAME) response.
if !exact {
glog.V(2).Infof(
"federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
glog.V(3).Infof(
"Federation: Did not find a local service. Trying federation redirect (CNAME)")
return kd.federationRecords(util.ReverseArray(federationSegments))
}
@ -556,25 +588,26 @@ func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Servic
}
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
if record, ok := kd.cache.getEntry(key, path[:len(path)-1]...); ok {
glog.V(2).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1])
if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok {
glog.V(3).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1])
return []skymsg.Service{*(record.(*skymsg.Service))}, nil
}
glog.V(2).Infof("Exact match for %v not found in cache", path)
glog.V(3).Infof("Exact match for %v not found in cache", path)
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...)
glog.V(2).Infof("Received %d records for %v from cache", len(records), path)
records := kd.cache.GetValuesForPathWithWildcards(path...)
glog.V(3).Infof("Found %d records for %v in the cache", len(records), path)
retval := []skymsg.Service{}
for _, val := range records {
retval = append(retval, *val)
}
glog.V(2).Infof("records:%v, retval:%v, path:%v", records, retval, path)
glog.V(4).Infof("getRecordsForPath retval=%+v, path=%v", retval, path)
return retval, nil
}
@ -619,7 +652,7 @@ func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool,
// ReverseRecords performs a reverse lookup for the given name.
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
glog.V(2).Infof("Received ReverseRecord Request:%s", name)
glog.V(3).Infof("Query for ReverseRecord %q", name)
// if portalIP is not a valid IP, the reverseRecordMap lookup will fail
portalIP, ok := util.ExtractIP(name)
@ -676,34 +709,39 @@ func (kd *KubeDNS) getPodIP(path []string) (string, error) {
// We can add support for wildcard queries later, if needed.
func (kd *KubeDNS) isFederationQuery(path []string) bool {
if len(path) != 4+len(kd.domainPath) {
glog.V(2).Infof("not a federation query: len(%q) != 4+len(%q)", path, kd.domainPath)
glog.V(4).Infof("Not a federation query: len(%q) != 4+len(%q)", path, kd.domainPath)
return false
}
if errs := validation.IsDNS1035Label(path[0]); len(errs) != 0 {
glog.V(2).Infof("not a federation query: %q is not an RFC 1035 label: %q", path[0], errs)
glog.V(4).Infof("Not a federation query: %q is not an RFC 1035 label: %q",
path[0], errs)
return false
}
if errs := validation.IsDNS1123Label(path[1]); len(errs) != 0 {
glog.V(2).Infof("not a federation query: %q is not an RFC 1123 label: %q", path[1], errs)
glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q",
path[1], errs)
return false
}
if errs := validation.IsDNS1123Label(path[2]); len(errs) != 0 {
glog.V(2).Infof("not a federation query: %q is not an RFC 1123 label: %q", path[2], errs)
glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q",
path[2], errs)
return false
}
if path[3] != serviceSubdomain {
glog.V(2).Infof("not a federation query: %q != %q (serviceSubdomain)", path[3], serviceSubdomain)
glog.V(4).Infof("Not a federation query: %q != %q (serviceSubdomain)",
path[3], serviceSubdomain)
return false
}
for i, domComp := range kd.domainPath {
// kd.domainPath is reversed, so we need to look in the `path` in the reverse order.
if domComp != path[len(path)-i-1] {
glog.V(2).Infof("not a federation query: kd.domainPath[%d] != path[%d] (%q != %q)", i, len(path)-i-1, domComp, path[len(path)-i-1])
glog.V(4).Infof("Not a federation query: kd.domainPath[%d] != path[%d] (%q != %q)",
i, len(path)-i-1, domComp, path[len(path)-i-1])
return false
}
}
if _, ok := kd.federations[path[2]]; !ok {
glog.V(2).Infof("not a federation query: kd.federations[%q] not found", path[2])
glog.V(4).Infof("Not a federation query: kd.federations[%q] not found", path[2])
return false
}
return true

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/dns/treecache"
"k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -51,7 +52,7 @@ func newKubeDNS() *KubeDNS {
domain: testDomain,
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
cache: NewTreeCache(),
cache: treecache.NewTreeCache(),
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service),
cacheLock: sync.RWMutex{},

26
pkg/dns/treecache/BUILD Normal file
View File

@ -0,0 +1,26 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["treecache.go"],
tags = ["automanaged"],
deps = ["//vendor:github.com/skynetservices/skydns/msg"],
)
go_test(
name = "go_default_test",
srcs = ["treecache_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = ["//vendor:github.com/skynetservices/skydns/msg"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package dns
package treecache
import (
"encoding/json"
@ -23,19 +23,48 @@ import (
skymsg "github.com/skynetservices/skydns/msg"
)
type TreeCache struct {
ChildNodes map[string]*TreeCache
type TreeCache interface {
// GetEntry with the given key for the given path.
GetEntry(key string, path ...string) (interface{}, bool)
// Get a list of values including wildcards labels (e.g. "*").
GetValuesForPathWithWildcards(path ...string) []*skymsg.Service
// SetEntry creates the entire path if it doesn't already exist in
// the cache, then sets the given service record under the given
// key. The path this entry would have occupied in an etcd datastore
// is computed from the given fqdn and stored as the "Key" of the
// skydns service; this is only required because skydns expects the
// service record to contain a key in a specific format (presumably
// for legacy compatibility). Note that the fqnd string typically
// contains both the key and all elements in the path.
SetEntry(key string, val *skymsg.Service, fqdn string, path ...string)
// SetSubCache inserts the given subtree under the given
// path:key. Usually the key is the name of a Kubernetes Service,
// and the path maps to the cluster subdomains matching the Service.
SetSubCache(key string, subCache TreeCache, path ...string)
// DeletePath removes all entries associated with a given path.
DeletePath(path ...string) bool
// Serialize dumps a JSON representation of the cache.
Serialize() (string, error)
}
type treeCache struct {
ChildNodes map[string]*treeCache
Entries map[string]interface{}
}
func NewTreeCache() *TreeCache {
return &TreeCache{
ChildNodes: make(map[string]*TreeCache),
func NewTreeCache() TreeCache {
return &treeCache{
ChildNodes: make(map[string]*treeCache),
Entries: make(map[string]interface{}),
}
}
func (cache *TreeCache) Serialize() (string, error) {
func (cache *treeCache) Serialize() (string, error) {
prettyJSON, err := json.MarshalIndent(cache, "", "\t")
if err != nil {
return "", err
@ -43,14 +72,7 @@ func (cache *TreeCache) Serialize() (string, error) {
return string(prettyJSON), nil
}
// setEntry creates the entire path if it doesn't already exist in the cache,
// then sets the given service record under the given key. The path this entry
// would have occupied in an etcd datastore is computed from the given fqdn and
// stored as the "Key" of the skydns service; this is only required because
// skydns expects the service record to contain a key in a specific format
// (presumably for legacy compatibility). Note that the fqnd string typically
// contains both the key and all elements in the path.
func (cache *TreeCache) setEntry(key string, val *skymsg.Service, fqdn string, path ...string) {
func (cache *treeCache) SetEntry(key string, val *skymsg.Service, fqdn string, path ...string) {
// TODO: Consolidate setEntry and setSubCache into a single method with a
// type switch.
// TODO: Instead of passing the fqdn as an argument, we can reconstruct
@ -70,7 +92,7 @@ func (cache *TreeCache) setEntry(key string, val *skymsg.Service, fqdn string, p
node.Entries[key] = val
}
func (cache *TreeCache) getSubCache(path ...string) *TreeCache {
func (cache *treeCache) getSubCache(path ...string) *treeCache {
childCache := cache
for _, subpath := range path {
childCache = childCache.ChildNodes[subpath]
@ -81,15 +103,12 @@ func (cache *TreeCache) getSubCache(path ...string) *TreeCache {
return childCache
}
// setSubCache inserts the given subtree under the given path:key. Usually the
// key is the name of a Kubernetes Service, and the path maps to the cluster
// subdomains matching the Service.
func (cache *TreeCache) setSubCache(key string, subCache *TreeCache, path ...string) {
func (cache *treeCache) SetSubCache(key string, subCache TreeCache, path ...string) {
node := cache.ensureChildNode(path...)
node.ChildNodes[key] = subCache
node.ChildNodes[key] = subCache.(*treeCache)
}
func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool) {
func (cache *treeCache) GetEntry(key string, path ...string) (interface{}, bool) {
childNode := cache.getSubCache(path...)
if childNode == nil {
return nil, false
@ -98,11 +117,11 @@ func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool)
return val, ok
}
func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []*skymsg.Service {
func (cache *treeCache) GetValuesForPathWithWildcards(path ...string) []*skymsg.Service {
retval := []*skymsg.Service{}
nodesToExplore := []*TreeCache{cache}
nodesToExplore := []*treeCache{cache}
for idx, subpath := range path {
nextNodesToExplore := []*TreeCache{}
nextNodesToExplore := []*treeCache{}
if idx == len(path)-1 {
// if path ends on an entry, instead of a child node, add the entry
for _, node := range nodesToExplore {
@ -150,7 +169,7 @@ func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []*skymsg.
return retval
}
func (cache *TreeCache) deletePath(path ...string) bool {
func (cache *treeCache) DeletePath(path ...string) bool {
if len(path) == 0 {
return false
}
@ -169,19 +188,7 @@ func (cache *TreeCache) deletePath(path ...string) bool {
return false
}
func (cache *TreeCache) deleteEntry(key string, path ...string) bool {
childNode := cache.getSubCache(path...)
if childNode == nil {
return false
}
if _, ok := childNode.Entries[key]; ok {
delete(childNode.Entries, key)
return true
}
return false
}
func (cache *TreeCache) appendValues(recursive bool, ref [][]interface{}) {
func (cache *treeCache) appendValues(recursive bool, ref [][]interface{}) {
for _, value := range cache.Entries {
ref[0] = append(ref[0], value)
}
@ -192,83 +199,15 @@ func (cache *TreeCache) appendValues(recursive bool, ref [][]interface{}) {
}
}
func (cache *TreeCache) ensureChildNode(path ...string) *TreeCache {
func (cache *treeCache) ensureChildNode(path ...string) *treeCache {
childNode := cache
for _, subpath := range path {
newNode, ok := childNode.ChildNodes[subpath]
if !ok {
newNode = NewTreeCache()
newNode = NewTreeCache().(*treeCache)
childNode.ChildNodes[subpath] = newNode
}
childNode = newNode
}
return childNode
}
// unused function. keeping it around in commented-fashion
// in the future, we might need some form of this function so that
// we can serialize to a file in a mounted empty dir..
//const (
// dataFile = "data.dat"
// crcFile = "data.crc"
//)
//func (cache *TreeCache) Serialize(dir string) (string, error) {
// cache.m.RLock()
// defer cache.m.RUnlock()
// b, err := json.Marshal(cache)
// if err != nil {
// return "", err
// }
//
// if err := ensureDir(dir, os.FileMode(0755)); err != nil {
// return "", err
// }
// if err := ioutil.WriteFile(path.Join(dir, dataFile), b, 0644); err != nil {
// return "", err
// }
// if err := ioutil.WriteFile(path.Join(dir, crcFile), getMD5(b), 0644); err != nil {
// return "", err
// }
// return string(b), nil
//}
//func ensureDir(path string, perm os.FileMode) error {
// s, err := os.Stat(path)
// if err != nil || !s.IsDir() {
// return os.Mkdir(path, perm)
// }
// return nil
//}
//func getMD5(b []byte) []byte {
// h := md5.New()
// h.Write(b)
// return []byte(fmt.Sprintf("%x", h.Sum(nil)))
//}
// unused function. keeping it around in commented-fashion
// in the future, we might need some form of this function so that
// we can restart kube-dns, deserialize the tree and have a cache
// without having to wait for kube-dns to reach out to API server.
//func Deserialize(dir string) (*TreeCache, error) {
// b, err := ioutil.ReadFile(path.Join(dir, dataFile))
// if err != nil {
// return nil, err
// }
//
// hash, err := ioutil.ReadFile(path.Join(dir, crcFile))
// if err != nil {
// return nil, err
// }
// if !reflect.DeepEqual(hash, getMD5(b)) {
// return nil, fmt.Errorf("Checksum failed")
// }
//
// var cache TreeCache
// err = json.Unmarshal(b, &cache)
// if err != nil {
// return nil, err
// }
// cache.m = &sync.RWMutex{}
// return &cache, nil
//}

View File

@ -0,0 +1,161 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package treecache
import (
"testing"
"github.com/skynetservices/skydns/msg"
)
func TestTreeCache(t *testing.T) {
tc := NewTreeCache()
{
_, ok := tc.GetEntry("key1", "p1", "p2")
if ok {
t.Errorf("key should not exist")
}
}
checkExists := func(key string, expectedSvc *msg.Service, path ...string) {
svc, ok := tc.GetEntry(key, path...)
if !ok {
t.Fatalf("key %v should exist", key)
}
if svc := svc.(*msg.Service); svc != nil {
if svc != expectedSvc {
t.Errorf("value is not correct (%v != %v)", svc, expectedSvc)
}
} else {
t.Errorf("entry is not of the right type: %T", svc)
}
}
setEntryTC := []struct {
key string
svc *msg.Service
fqdn string
path []string
}{
{"key1", &msg.Service{}, "key1.p2.p1.", []string{"p1", "p2"}},
{"key2", &msg.Service{}, "key2.p2.p1.", []string{"p1", "p2"}},
{"key3", &msg.Service{}, "key3.p2.p1.", []string{"p1", "p3"}},
}
for _, testCase := range setEntryTC {
tc.SetEntry(testCase.key, testCase.svc, testCase.fqdn, testCase.path...)
checkExists(testCase.key, testCase.svc, testCase.path...)
}
wildcardTC := []struct {
path []string
count int
}{
{[]string{"p1"}, 0},
{[]string{"p1", "p2"}, 2},
{[]string{"p1", "p3"}, 1},
{[]string{"p1", "p2", "key1"}, 1},
{[]string{"p1", "p2", "key2"}, 1},
{[]string{"p1", "p2", "key3"}, 0},
{[]string{"p1", "p3", "key3"}, 1},
{[]string{"p1", "p2", "*"}, 2},
{[]string{"p1", "*", "*"}, 3},
}
for _, testCase := range wildcardTC {
services := tc.GetValuesForPathWithWildcards(testCase.path...)
if len(services) != testCase.count {
t.Fatalf("Expected %v services for path %v, got %v",
testCase.count, testCase.path, len(services))
}
}
// Delete some paths
if !tc.DeletePath("p1", "p2") {
t.Fatal("should delete path p2.p1.")
}
if _, ok := tc.GetEntry("key3", "p1", "p3"); !ok {
t.Error("should not affect p3.p1.")
}
if tc.DeletePath("p1", "p2") {
t.Fatalf("should not be able to delete p2.p1")
}
if !tc.DeletePath("p1", "p3") {
t.Fatalf("should be able to delete p3.p1")
}
if tc.DeletePath("p1", "p3") {
t.Fatalf("should not be able to delete p3.t1")
}
for _, testCase := range []struct {
k string
p []string
}{
{"key1", []string{"p1", "p2"}},
{"key2", []string{"p1", "p2"}},
{"key3", []string{"p1", "p3"}},
} {
if _, ok := tc.GetEntry(testCase.k, testCase.p...); ok {
t.Error()
}
}
}
func TestTreeCacheSetSubCache(t *testing.T) {
tc := NewTreeCache()
m := &msg.Service{}
branch := NewTreeCache()
branch.SetEntry("key1", m, "key", "p2")
tc.SetSubCache("p1", branch, "p0")
if _, ok := tc.GetEntry("key1", "p0", "p1", "p2"); !ok {
t.Errorf("should be able to get entry p0.p1.p2.key1")
}
}
func TestTreeCacheSerialize(t *testing.T) {
tc := NewTreeCache()
tc.SetEntry("key1", &msg.Service{}, "key1.p2.p1.", "p1", "p2")
const expected = `{
"ChildNodes": {
"p1": {
"ChildNodes": {
"p2": {
"ChildNodes": {},
"Entries": {
"key1": {}
}
}
},
"Entries": {}
}
},
"Entries": {}
}`
actual, err := tc.Serialize()
if err != nil {
}
if actual != expected {
t.Errorf("expected %q, got %q", expected, actual)
}
}

View File

@ -63,7 +63,8 @@ func ReverseArray(arr []string) []string {
func GetSkyMsg(ip string, port int) (*msg.Service, string) {
msg := NewServiceRecord(ip, port)
hash := HashServiceRecord(msg)
glog.V(2).Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash)
glog.V(5).Infof("Constructed new DNS record: %s, hash:%s",
fmt.Sprintf("%v", msg), hash)
return msg, fmt.Sprintf("%x", hash)
}

View File

@ -598,6 +598,7 @@ k8s.io/kubernetes/pkg/credentialprovider,justinsb,1
k8s.io/kubernetes/pkg/credentialprovider/aws,zmerlynn,1
k8s.io/kubernetes/pkg/credentialprovider/gcp,mml,1
k8s.io/kubernetes/pkg/dns,jdef,1
k8s.io/kubernetes/pkg/dns/treecache,bowei,0
k8s.io/kubernetes/pkg/fieldpath,childsb,1
k8s.io/kubernetes/pkg/fields,jsafrane,1
k8s.io/kubernetes/pkg/genericapiserver,nikhiljindal,0

1 name owner auto-assigned
598 k8s.io/kubernetes/pkg/credentialprovider/aws zmerlynn 1
599 k8s.io/kubernetes/pkg/credentialprovider/gcp mml 1
600 k8s.io/kubernetes/pkg/dns jdef 1
601 k8s.io/kubernetes/pkg/dns/treecache bowei 0
602 k8s.io/kubernetes/pkg/fieldpath childsb 1
603 k8s.io/kubernetes/pkg/fields jsafrane 1
604 k8s.io/kubernetes/pkg/genericapiserver nikhiljindal 0