pr feedback

This commit is contained in:
Abhishek Shah 2016-05-18 10:33:17 -07:00
parent fc040645eb
commit 3ada2170a3
7 changed files with 218 additions and 241 deletions

View File

@ -19,12 +19,9 @@ spec:
version: v12
kubernetes.io/cluster-service: "true"
spec:
{% if grains['cloud'] is defined and grains['cloud'] in [ 'vsphere', 'photon-controller' ] %}
hostNetwork: true
{% endif %}
containers:
- name: kubedns
image: artfulcoder/kubedns-amd64:1.0
image: gcr.io/google_containers/kubedns-amd64:1.0
resources:
# TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -52,8 +52,8 @@ func (m clusterDomainVar) Set(v string) error {
v = strings.TrimSuffix(v, ".")
segments := strings.Split(v, ".")
for _, segment := range segments {
if !validation.IsDNS1123Label(segment) {
return fmt.Errorf("Not a valid DNS label")
if errs := validation.IsDNS1123Label(segment); len(errs) > 0 {
return fmt.Errorf("Not a valid DNS label. %v", errs)
}
}
if !strings.HasSuffix(v, ".") {

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -24,7 +24,6 @@ import (
"syscall"
"github.com/golang/glog"
"github.com/skynetservices/skydns/metrics"
"github.com/skynetservices/skydns/server"
"k8s.io/kubernetes/cmd/kube-dns/app/options"
@ -104,7 +103,13 @@ func (server *KubeDNSServer) setupHealthzHandlers() {
fmt.Fprintf(w, "ok\n")
})
http.HandleFunc("/cache", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, server.kd.GetCacheAsJSON())
serializedJSON, err := server.kd.GetCacheAsJSON()
if err == nil {
fmt.Fprint(w, serializedJSON)
} else {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err)
}
})
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -23,11 +23,9 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/version/verflag"
"runtime"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
config := options.NewKubeDNSConfig()
config.AddFlags(pflag.CommandLine)

View File

@ -1,5 +1,5 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -19,13 +19,14 @@ package dns
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"hash/fnv"
"net"
"strings"
"sync"
"time"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
skymsg "github.com/skynetservices/skydns/msg"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
@ -47,20 +48,40 @@ const (
podSubdomain = "pod"
// Resync period for the kube controller loop.
resyncPeriod = 30 * time.Minute
resyncPeriod = 5 * time.Minute
)
type KubeDNS struct {
// kubeClient makes calls to API Server and registers calls with API Server
// to get Endpoints and Service objects.
kubeClient *kclient.Client
// DNS domain name.
// The domain for which this DNS Server is authoritative.
domain string
// A cache that contains all the endpoints in the system.
endpointsStore kcache.Store
// A cache that contains all the services in the system.
servicesStore kcache.Store
cache *TreeCache
domainPath []string
eController *kframework.Controller
servicesStore kcache.Store
// stores DNS records for the domain.
// A Records and SRV Records for (regular) services and headless Services.
cache *TreeCache
// caller is responsible for using the cacheLock before invoking methods on cache
// the cache is not thread-safe, and the caller can guarantee thread safety by using
// the cacheLock
cacheLock sync.RWMutex
// The domain for which this DNS Server is authoritative, in array format and reversed.
// e.g. if domain is "cluster.local", domainPath is []string{"local", "cluster"}
domainPath []string
// endpointsController invokes registered callbacks when endpoints change.
endpointsController *kframework.Controller
// serviceController invokes registered callbacks when services change.
serviceController *kframework.Controller
}
@ -69,6 +90,7 @@ func NewKubeDNS(client *kclient.Client, domain string) *KubeDNS {
kubeClient: client,
domain: domain,
cache: NewTreeCache(),
cacheLock: sync.RWMutex{},
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
}
kd.setEndpointsStore()
@ -77,11 +99,14 @@ func NewKubeDNS(client *kclient.Client, domain string) *KubeDNS {
}
func (kd *KubeDNS) Start() {
go kd.eController.Run(wait.NeverStop)
go kd.endpointsController.Run(wait.NeverStop)
go kd.serviceController.Run(wait.NeverStop)
// Wait synchronously for the Kubernetes service and add a DNS record for it.
// TODO (abshah) UNCOMMENT AFTER TEST COMPLETE
//kd.waitForKubernetesService()
// This ensures that the Start function returns only after having received Service objects
// from APIServer.
// TODO: we might not have to wait for kubernetes service specifically. We should just wait
// for a list operation to be complete from APIServer.
kd.waitForKubernetesService()
}
func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
@ -101,9 +126,11 @@ func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
return
}
func (kd *KubeDNS) GetCacheAsJSON() string {
json, _ := kd.cache.Serialize("")
return json
func (kd *KubeDNS) GetCacheAsJSON() (string, error) {
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
json, err := kd.cache.Serialize()
return json, err
}
func (kd *KubeDNS) setServicesStore() {
@ -124,7 +151,7 @@ func (kd *KubeDNS) setServicesStore() {
func (kd *KubeDNS) setEndpointsStore() {
// Returns a cache.ListWatch that gets all changes to endpoints.
endpointsWatch := kcache.NewListWatchFromClient(kd.kubeClient, "endpoints", kapi.NamespaceAll, kselector.Everything())
kd.endpointsStore, kd.eController = kframework.NewInformer(
kd.endpointsStore, kd.endpointsController = kframework.NewInformer(
endpointsWatch,
&kapi.Endpoints{},
resyncPeriod,
@ -138,24 +165,35 @@ func (kd *KubeDNS) setEndpointsStore() {
)
}
func (kd *KubeDNS) newService(obj interface{}) {
func assertIsService(obj interface{}) (*kapi.Service, bool) {
if service, ok := obj.(*kapi.Service); ok {
return service, ok
} else {
glog.Errorf("Type assertion failed! Expected 'Service', got %T", service)
return nil, ok
}
}
func (kd *KubeDNS) newService(obj interface{}) {
if service, ok := assertIsService(obj); ok {
// if ClusterIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) {
kd.newHeadlessService(service)
return
}
if len(service.Spec.Ports) == 0 {
glog.Info("Unexpected service with no ports, this should not have happend: %v", service)
glog.Warning("Unexpected service with no ports, this should not have happend: %v", service)
}
kd.newPortalService(service)
}
}
func (kd *KubeDNS) removeService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
if s, ok := assertIsService(obj); ok {
subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name)
kd.cache.DeletePath(subCachePath...)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.deletePath(subCachePath...)
}
}
@ -194,7 +232,7 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
return nil, nil
}
if svc, ok := obj.(*kapi.Service); ok {
if svc, ok := assertIsService(obj); ok {
return svc, nil
}
return nil, fmt.Errorf("got a non service object in services store %v", obj)
@ -203,18 +241,20 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
subCache := NewTreeCache()
recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0)
subCache.SetEntry(recordLabel, recordValue)
subCache.setEntry(recordLabel, recordValue)
// Generate SRV Records
for i := range service.Spec.Ports {
port := &service.Spec.Ports[i]
if port.Name != "" && port.Protocol != "" {
srvValue := kd.generateSRVRecordValue(service, int(port.Port))
subCache.SetEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name)
subCache.setEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name)
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
kd.cache.SetSubCache(service.Name, subCache, subCachePath...)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.setSubCache(service.Name, subCache, subCachePath...)
}
func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error {
@ -233,18 +273,20 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
if hostLabel, exists := getHostname(address, podHostnames); exists {
endpointName = hostLabel
}
subCache.SetEntry(endpointName, recordValue)
subCache.setEntry(endpointName, recordValue)
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
if endpointPort.Name != "" && endpointPort.Protocol != "" {
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
subCache.SetEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name)
subCache.setEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name)
}
}
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace)
kd.cache.SetSubCache(svc.Name, subCache, subCachePath...)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.setSubCache(svc.Name, subCache, subCachePath...)
return nil
}
@ -252,7 +294,7 @@ func getHostname(address *kapi.EndpointAddress, podHostnames map[string]endpoint
if len(address.Hostname) > 0 {
return address.Hostname, true
}
if hostRecord, exists := podHostnames[address.IP]; exists && validation.IsDNS1123Label(hostRecord.HostName) {
if hostRecord, exists := podHostnames[address.IP]; exists && len(validation.IsDNS1123Label(hostRecord.HostName)) == 0 {
return hostRecord.HostName, true
}
return "", false
@ -272,12 +314,12 @@ func getPodHostnamesFromAnnotation(annotations map[string]string) (map[string]en
return hostnames, nil
}
func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, cNameLabels ...string) *skymsg.Service {
cName := strings.Join([]string{svc.Name, svc.Namespace, serviceSubdomain, kd.domain}, ".")
for _, cNameLabel := range cNameLabels {
cName = cNameLabel + "." + cName
func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, labels ...string) *skymsg.Service {
host := strings.Join([]string{svc.Name, svc.Namespace, serviceSubdomain, kd.domain}, ".")
for _, cNameLabel := range labels {
host = cNameLabel + "." + host
}
recordValue, _ := getSkyMsg(cName, portNumber)
recordValue, _ := getSkyMsg(host, portNumber)
return recordValue
}
@ -312,9 +354,10 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
segments := strings.Split(trimmed, ".")
path := reverseArray(segments)
if kd.isPodRecord(path) {
response, err := kd.getPodRecord(path)
ip, err := kd.getPodIP(path)
if err == nil {
return []skymsg.Service{*response}, nil
skyMsg, _ := getSkyMsg(ip, 0)
return []skymsg.Service{*skyMsg}, nil
}
return nil, err
}
@ -324,15 +367,17 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
if key == "" {
return []skymsg.Service{}, nil
}
if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok {
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
if record, ok := kd.cache.getEntry(key, path[:len(path)-1]...); ok {
return []skymsg.Service{*(record.(*skymsg.Service))}, nil
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
// tmp, _ := kd.cache.Serialize("")
// glog.Infof("Searching path:%q, %v", path, tmp)
records := kd.cache.GetValuesForPathWithRegex(path...)
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...)
retval := []skymsg.Service{}
for _, val := range records {
retval = append(retval, *(val.(*skymsg.Service)))
@ -350,7 +395,7 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
segments := strings.Split(strings.TrimRight(name, "."), ".")
for _, k := range segments {
if k == "*" || k == "any" {
if k == "*" {
return nil, fmt.Errorf("reverse can not contain wildcards")
}
}
@ -374,20 +419,13 @@ func (kd *KubeDNS) isPodRecord(path []string) bool {
return true
}
func (kd *KubeDNS) getPodRecord(path []string) (*skymsg.Service, error) {
func (kd *KubeDNS) getPodIP(path []string) (string, error) {
ipStr := path[len(path)-1]
ip := strings.Replace(ipStr, "-", ".", -1)
if parsed := net.ParseIP(ip); parsed != nil {
msg := &skymsg.Service{
Host: ip,
Port: 0,
Priority: 10,
Weight: 10,
Ttl: 30,
}
return msg, nil
return ip, nil
}
return nil, fmt.Errorf("Invalid IP Address %v", ip)
return "", fmt.Errorf("Invalid IP Address %v", ip)
}
// Returns record in a format that SkyDNS understands.

View File

@ -1,5 +1,5 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -18,7 +18,9 @@ package dns
import (
"fmt"
"net"
"strings"
"sync"
"testing"
skymsg "github.com/skynetservices/skydns/msg"
@ -26,16 +28,12 @@ import (
"github.com/stretchr/testify/require"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"net"
)
const (
testDomain = "cluster.local."
basePath = "/skydns/local/cluster"
serviceSubDomain = "svc"
podSubDomain = "pod"
testService = "testservice"
testNamespace = "default"
testDomain = "cluster.local."
testService = "testservice"
testNamespace = "default"
)
func newKubeDNS() *KubeDNS {
@ -44,6 +42,7 @@ func newKubeDNS() *KubeDNS {
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
cache: NewTreeCache(),
cacheLock: sync.RWMutex{},
domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
}
return kd
@ -53,7 +52,6 @@ func TestPodDns(t *testing.T) {
const (
testPodIP = "1.2.3.4"
sanitizedPodIP = "1-2-3-4"
testPodName = "testPod"
)
kd := newKubeDNS()
@ -240,9 +238,7 @@ func newEndpoints(service *kapi.Service, subsets ...kapi.EndpointSubset) *kapi.E
Subsets: []kapi.EndpointSubset{},
}
for _, subset := range subsets {
endpoints.Subsets = append(endpoints.Subsets, subset)
}
endpoints.Subsets = append(endpoints.Subsets, subsets...)
return &endpoints
}
@ -310,7 +306,7 @@ func assertCNameRecordsMatchEndpointIPs(t *testing.T, kd *KubeDNS, e []kapi.Endp
assert.Equal(t, len(e), len(records), "unexpected record count")
for _, record := range records {
_, found := endpoints[getIPForCName(t, kd, record.Host)]
assert.True(t, found, "Did not endpoint with address:%s", record.Host)
assert.True(t, found, "Did not find endpoint with address:%s", record.Host)
}
}
@ -342,20 +338,18 @@ func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portNam
}
func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
records, err := kd.Records(getServiceFQDN(kd, s), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
serviceFQDN := getServiceFQDN(kd, s)
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
}
func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
serviceFQDN := getServiceFQDN(kd, s)
queries := []string{
serviceFQDN,
strings.Replace(serviceFQDN, ".svc.", ".*.", 1),
strings.Replace(serviceFQDN, s.Namespace, "*", 1),
strings.Replace(strings.Replace(serviceFQDN, s.Namespace, "*", 1), ".svc.", ".*.", 1),
"*." + serviceFQDN,
}
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
require.NoError(t, err)
@ -364,12 +358,22 @@ func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
}
}
func getEquivalentQueries(serviceFQDN, namespace string) []string {
return []string{
serviceFQDN,
strings.Replace(serviceFQDN, ".svc.", ".*.", 1),
strings.Replace(serviceFQDN, namespace, "*", 1),
strings.Replace(strings.Replace(serviceFQDN, namespace, "*", 1), ".svc.", ".*.", 1),
"*." + serviceFQDN,
}
}
func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string {
return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain)
}
func getEndpointsFQDN(kd *KubeDNS, e *kapi.Endpoints) string {
return fmt.Sprintf("%s.%s.svc.%s", e.ObjectMeta.Name, e.ObjectMeta.Namespace, kd.domain)
return fmt.Sprintf("%s.%s.svc.%s", e.Name, e.Namespace, kd.domain)
}
func getSRVFQDN(kd *KubeDNS, s *kapi.Service, portName string) string {

View File

@ -1,5 +1,5 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -18,136 +18,65 @@ package dns
import (
"bytes"
"crypto/md5"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"strings"
"sync"
)
const (
dataFile = "data.dat"
crcFile = "data.crc"
)
type object interface{}
type TreeCache struct {
ChildNodes map[string]*TreeCache
Entries map[string]interface{}
m *sync.RWMutex
}
func NewTreeCache() *TreeCache {
return &TreeCache{
ChildNodes: make(map[string]*TreeCache),
Entries: make(map[string]interface{}),
m: &sync.RWMutex{},
}
}
func Deserialize(dir string) (*TreeCache, error) {
b, err := ioutil.ReadFile(path.Join(dir, dataFile))
if err != nil {
return nil, err
}
var hash []byte
hash, err = ioutil.ReadFile(path.Join(dir, crcFile))
if err != nil {
return nil, err
}
if !reflect.DeepEqual(hash, getMD5(b)) {
return nil, fmt.Errorf("Checksum failed")
}
var cache TreeCache
err = json.Unmarshal(b, &cache)
if err != nil {
return nil, err
}
cache.m = &sync.RWMutex{}
return &cache, nil
}
func (cache *TreeCache) Serialize(dir string) (string, error) {
cache.m.RLock()
defer cache.m.RUnlock()
func (cache *TreeCache) Serialize() (string, error) {
b, err := json.Marshal(cache)
if err != nil {
return "", err
}
if len(dir) == 0 {
var prettyJSON bytes.Buffer
err = json.Indent(&prettyJSON, b, "", "\t")
var prettyJSON bytes.Buffer
err = json.Indent(&prettyJSON, b, "", "\t")
if err != nil {
return "", err
}
return string(prettyJSON.Bytes()), nil
}
if err := ensureDir(dir, os.FileMode(0755)); err != nil {
if err != nil {
return "", err
}
if err := ioutil.WriteFile(path.Join(dir, dataFile), b, 0644); err != nil {
return "", err
}
if err := ioutil.WriteFile(path.Join(dir, crcFile), getMD5(b), 0644); err != nil {
return "", err
}
return string(b), nil
return string(prettyJSON.Bytes()), nil
}
func (cache *TreeCache) SetEntry(key string, val interface{}, path ...string) {
cache.m.Lock()
defer cache.m.Unlock()
func (cache *TreeCache) setEntry(key string, val interface{}, path ...string) {
node := cache.ensureChildNode(path...)
node.Entries[key] = val
}
func (cache *TreeCache) ReplaceEntries(entries map[string]interface{}, path ...string) {
cache.m.Lock()
defer cache.m.Unlock()
node := cache.ensureChildNode(path...)
node.Entries = make(map[string]interface{})
for key, val := range entries {
node.Entries[key] = val
}
}
func (cache *TreeCache) GetSubCache(path ...string) *TreeCache {
func (cache *TreeCache) getSubCache(path ...string) *TreeCache {
childCache := cache
for _, subpath := range path {
childCache = childCache.ChildNodes[subpath]
if childCache == nil {
return childCache
return nil
}
}
return childCache
}
func (cache *TreeCache) SetSubCache(key string, subCache *TreeCache, path ...string) {
cache.m.Lock()
defer cache.m.Unlock()
func (cache *TreeCache) setSubCache(key string, subCache *TreeCache, path ...string) {
node := cache.ensureChildNode(path...)
node.ChildNodes[key] = subCache
}
func (cache *TreeCache) GetEntry(key string, path ...string) (interface{}, bool) {
cache.m.RLock()
defer cache.m.RUnlock()
childNode := cache.GetSubCache(path...)
func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool) {
childNode := cache.getSubCache(path...)
val, ok := childNode.Entries[key]
return val, ok
}
func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{} {
cache.m.RLock()
defer cache.m.RUnlock()
func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interface{} {
retval := []interface{}{}
nodesToExplore := []*TreeCache{cache}
for idx, subpath := range path {
@ -155,7 +84,7 @@ func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{}
if idx == len(path)-1 {
// if path ends on an entry, instead of a child node, add the entry
for _, node := range nodesToExplore {
if subpath == "*" || subpath == "any" {
if subpath == "*" {
nextNodesToExplore = append(nextNodesToExplore, node)
} else {
if val, ok := node.Entries[subpath]; ok {
@ -172,7 +101,7 @@ func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{}
break
}
if subpath == "*" || subpath == "any" {
if subpath == "*" {
for _, node := range nodesToExplore {
for subkey, subnode := range node.ChildNodes {
if !strings.HasPrefix(subkey, "_") {
@ -200,26 +129,11 @@ func (cache *TreeCache) GetValuesForPathWithRegex(path ...string) []interface{}
return retval
}
func (cache *TreeCache) GetEntries(recursive bool, path ...string) []interface{} {
cache.m.RLock()
defer cache.m.RUnlock()
childNode := cache.GetSubCache(path...)
if childNode == nil {
return nil
}
retval := [][]interface{}{{}}
childNode.appendValues(recursive, retval)
return retval[0]
}
func (cache *TreeCache) DeletePath(path ...string) bool {
func (cache *TreeCache) deletePath(path ...string) bool {
if len(path) == 0 {
return false
}
cache.m.Lock()
defer cache.m.Unlock()
if parentNode := cache.GetSubCache(path[:len(path)-1]...); parentNode != nil {
if parentNode := cache.getSubCache(path[:len(path)-1]...); parentNode != nil {
if _, ok := parentNode.ChildNodes[path[len(path)-1]]; ok {
delete(parentNode.ChildNodes, path[len(path)-1])
return true
@ -228,10 +142,8 @@ func (cache *TreeCache) DeletePath(path ...string) bool {
return false
}
func (tn *TreeCache) DeleteEntry(key string, path ...string) bool {
tn.m.Lock()
defer tn.m.Unlock()
childNode := tn.GetSubCache(path...)
func (cache *TreeCache) deleteEntry(key string, path ...string) bool {
childNode := cache.getSubCache(path...)
if childNode == nil {
return false
}
@ -242,22 +154,22 @@ func (tn *TreeCache) DeleteEntry(key string, path ...string) bool {
return false
}
func (tn *TreeCache) appendValues(recursive bool, ref [][]interface{}) {
for _, value := range tn.Entries {
func (cache *TreeCache) appendValues(recursive bool, ref [][]interface{}) {
for _, value := range cache.Entries {
ref[0] = append(ref[0], value)
}
if recursive {
for _, node := range tn.ChildNodes {
for _, node := range cache.ChildNodes {
node.appendValues(recursive, ref)
}
}
}
func (tn *TreeCache) ensureChildNode(path ...string) *TreeCache {
childNode := tn
func (cache *TreeCache) ensureChildNode(path ...string) *TreeCache {
childNode := cache
for _, subpath := range path {
newNode := childNode.ChildNodes[subpath]
if newNode == nil {
newNode, ok := childNode.ChildNodes[subpath]
if !ok {
newNode = NewTreeCache()
childNode.ChildNodes[subpath] = newNode
}
@ -266,47 +178,70 @@ func (tn *TreeCache) ensureChildNode(path ...string) *TreeCache {
return childNode
}
func ensureDir(path string, perm os.FileMode) error {
s, err := os.Stat(path)
if err != nil || !s.IsDir() {
return os.Mkdir(path, perm)
}
return nil
}
// unused function. keeping it around in commented-fashion
// in the future, we might need some form of this function so that
// we can serialize to a file in a mounted empty dir..
//const (
// dataFile = "data.dat"
// crcFile = "data.crc"
//)
//func (cache *TreeCache) Serialize(dir string) (string, error) {
// cache.m.RLock()
// defer cache.m.RUnlock()
// b, err := json.Marshal(cache)
// if err != nil {
// return "", err
// }
//
// if err := ensureDir(dir, os.FileMode(0755)); err != nil {
// return "", err
// }
// if err := ioutil.WriteFile(path.Join(dir, dataFile), b, 0644); err != nil {
// return "", err
// }
// if err := ioutil.WriteFile(path.Join(dir, crcFile), getMD5(b), 0644); err != nil {
// return "", err
// }
// return string(b), nil
//}
func getMD5(b []byte) []byte {
h := md5.New()
h.Write(b)
return []byte(fmt.Sprintf("%x", h.Sum(nil)))
}
//func ensureDir(path string, perm os.FileMode) error {
// s, err := os.Stat(path)
// if err != nil || !s.IsDir() {
// return os.Mkdir(path, perm)
// }
// return nil
//}
func main() {
root := NewTreeCache()
fmt.Println("Adding Entries")
root.SetEntry("k", "v")
root.SetEntry("foo", "bar", "local")
root.SetEntry("foo1", "bar1", "local", "cluster")
//func getMD5(b []byte) []byte {
// h := md5.New()
// h.Write(b)
// return []byte(fmt.Sprintf("%x", h.Sum(nil)))
//}
fmt.Println("Fetching Entries")
for _, entry := range root.GetEntries(true, "local") {
fmt.Printf("%s\n", entry)
}
fmt.Println("Serializing")
if _, err := root.Serialize("./foo"); err != nil {
fmt.Printf("Serialization Error: %v,\n", err)
return
}
fmt.Println("Deserializing")
tn, err := Deserialize("./foo")
if err != nil {
fmt.Printf("Deserialization Error: %v\n", err)
return
}
fmt.Println("Fetching Entries")
for _, entry := range tn.GetEntries(true, "local") {
fmt.Printf("%s\n", entry)
}
}
// unused function. keeping it around in commented-fashion
// in the future, we might need some form of this function so that
// we can restart kube-dns, deserialize the tree and have a cache
// without having to wait for kube-dns to reach out to API server.
//func Deserialize(dir string) (*TreeCache, error) {
// b, err := ioutil.ReadFile(path.Join(dir, dataFile))
// if err != nil {
// return nil, err
// }
//
// hash, err := ioutil.ReadFile(path.Join(dir, crcFile))
// if err != nil {
// return nil, err
// }
// if !reflect.DeepEqual(hash, getMD5(b)) {
// return nil, fmt.Errorf("Checksum failed")
// }
//
// var cache TreeCache
// err = json.Unmarshal(b, &cache)
// if err != nil {
// return nil, err
// }
// cache.m = &sync.RWMutex{}
// return &cache, nil
//}