mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Merge pull request #7901 from vishh/kube2sky
Updated kube2sky to use the controller framework.
This commit is contained in:
commit
caddcd8cd3
@ -1,10 +1,10 @@
|
|||||||
all: kube2sky
|
all: kube2sky
|
||||||
|
|
||||||
kube2sky: kube2sky.go
|
kube2sky: kube2sky.go
|
||||||
CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go
|
CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go
|
||||||
|
|
||||||
container: kube2sky
|
container: kube2sky
|
||||||
sudo docker build -t gcr.io/google_containers/kube2sky .
|
docker build -t gcr.io/google_containers/kube2sky .
|
||||||
|
|
||||||
push:
|
push:
|
||||||
gcloud preview docker push gcr.io/google_containers/kube2sky
|
gcloud preview docker push gcr.io/google_containers/kube2sky
|
||||||
|
@ -23,41 +23,62 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
|
kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
|
||||||
kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
|
kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
|
||||||
kfields "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
||||||
klabels "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
kcontrollerFramework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
||||||
|
kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||||
kwatch "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
|
||||||
etcd "github.com/coreos/go-etcd/etcd"
|
etcd "github.com/coreos/go-etcd/etcd"
|
||||||
|
"github.com/golang/glog"
|
||||||
skymsg "github.com/skynetservices/skydns/msg"
|
skymsg "github.com/skynetservices/skydns/msg"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
domain = flag.String("domain", "kubernetes.local", "domain under which to create names")
|
argDomain = flag.String("domain", "kubernetes.local", "domain under which to create names")
|
||||||
etcd_mutation_timeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration")
|
argEtcdMutationTimeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration")
|
||||||
etcd_server = flag.String("etcd-server", "http://127.0.0.1:4001", "URL to etcd server")
|
argEtcdServer = flag.String("etcd-server", "http://127.0.0.1:4001", "URL to etcd server")
|
||||||
verbose = flag.Bool("verbose", false, "log extra information")
|
argKubecfgFile = flag.String("kubecfg_file", "", "Location of kubecfg file for access to kubernetes service")
|
||||||
kubecfg_file = flag.String("kubecfg_file", "", "Location of kubecfg file for access to kubernetes service")
|
argKubeMasterUrl = flag.String("kube_master_url", "http://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}", "Url to reach kubernetes master. Env variables in this flag will be expanded.")
|
||||||
)
|
)
|
||||||
|
|
||||||
func removeDNS(record string, etcdClient *etcd.Client) error {
|
const (
|
||||||
log.Printf("Removing %s from DNS", record)
|
// Maximum number of retries to connect to etcd server.
|
||||||
_, err := etcdClient.Delete(skymsg.Path(record), true)
|
maxConnectRetries = 12
|
||||||
|
// Resync period for the kube controller loop.
|
||||||
|
resyncPeriod = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type kube2sky struct {
|
||||||
|
// Etcd client.
|
||||||
|
etcdClient *etcd.Client
|
||||||
|
// Kubernetes client.
|
||||||
|
kubeClient *kclient.Client
|
||||||
|
// DNS domain name.
|
||||||
|
domain string
|
||||||
|
// Etcd mutation timeout.
|
||||||
|
etcdMutationTimeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) removeDNS(record string) error {
|
||||||
|
glog.V(2).Infof("Removing %s from DNS", record)
|
||||||
|
_, err := ks.etcdClient.Delete(skymsg.Path(record), true)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error {
|
func (ks *kube2sky) addDNS(record string, service *kapi.Service) error {
|
||||||
// if PortalIP is not set, a DNS entry should not be created
|
// if PortalIP is not set, a DNS entry should not be created
|
||||||
if !kapi.IsServiceIPSet(service) {
|
if !kapi.IsServiceIPSet(service) {
|
||||||
log.Printf("Skipping dns record for headless service: %s\n", service.Name)
|
glog.V(1).Infof("Skipping dns record for headless service: %s\n", service.Name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,8 +96,8 @@ func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error
|
|||||||
}
|
}
|
||||||
// Set with no TTL, and hope that kubernetes events are accurate.
|
// Set with no TTL, and hope that kubernetes events are accurate.
|
||||||
|
|
||||||
log.Printf("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port)
|
glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port)
|
||||||
_, err = etcdClient.Set(skymsg.Path(record), string(b), uint64(0))
|
_, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -86,16 +107,16 @@ func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error
|
|||||||
|
|
||||||
// Implements retry logic for arbitrary mutator. Crashes after retrying for
|
// Implements retry logic for arbitrary mutator. Crashes after retrying for
|
||||||
// etcd_mutation_timeout.
|
// etcd_mutation_timeout.
|
||||||
func mutateEtcdOrDie(mutator func() error) {
|
func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) {
|
||||||
timeout := time.After(*etcd_mutation_timeout)
|
timeout := time.After(ks.etcdMutationTimeout)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timeout:
|
case <-timeout:
|
||||||
log.Fatalf("Failed to mutate etcd for %v using mutator: %v", *etcd_mutation_timeout, mutator)
|
glog.Fatalf("Failed to mutate etcd for %v using mutator: %v", ks.etcdMutationTimeout, mutator)
|
||||||
default:
|
default:
|
||||||
if err := mutator(); err != nil {
|
if err := mutator(); err != nil {
|
||||||
delay := 50 * time.Millisecond
|
delay := 50 * time.Millisecond
|
||||||
log.Printf("Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v", mutator, err, delay)
|
glog.V(1).Infof("Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v", mutator, err, delay)
|
||||||
time.Sleep(delay)
|
time.Sleep(delay)
|
||||||
} else {
|
} else {
|
||||||
return
|
return
|
||||||
@ -104,28 +125,33 @@ func mutateEtcdOrDie(mutator func() error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEtcdClient() (client *etcd.Client) {
|
func newEtcdClient(etcdServer string) (*etcd.Client, error) {
|
||||||
maxConnectRetries := 12
|
var (
|
||||||
for maxConnectRetries > 0 {
|
client *etcd.Client
|
||||||
if _, err := tools.GetEtcdVersion(*etcd_server); err != nil {
|
err error
|
||||||
log.Fatalf("Failed to connect to etcd server: %v, error: %v", *etcd_server, err)
|
)
|
||||||
if maxConnectRetries > 0 {
|
retries := maxConnectRetries
|
||||||
log.Println("Retrying request after 5 second sleep.")
|
for retries > 0 {
|
||||||
time.Sleep(5 * time.Second)
|
if _, err = tools.GetEtcdVersion(etcdServer); err == nil {
|
||||||
maxConnectRetries--
|
|
||||||
} else {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Printf("Etcd server found: %v", *etcd_server)
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
if maxConnectRetries == 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
glog.Info("[Attempt: %d] Retrying request after 5 second sleep", retries)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
retries--
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to etcd server: %v, error: %v", etcdServer, err)
|
||||||
|
}
|
||||||
|
glog.Infof("Etcd server found: %v", etcdServer)
|
||||||
|
|
||||||
// loop until we have > 0 machines && machines[0] != ""
|
// loop until we have > 0 machines && machines[0] != ""
|
||||||
poll, timeout := 1*time.Second, 10*time.Second
|
poll, timeout := 1*time.Second, 10*time.Second
|
||||||
if err := wait.Poll(poll, timeout, func() (bool, error) {
|
if err := wait.Poll(poll, timeout, func() (bool, error) {
|
||||||
if client = etcd.NewClient([]string{*etcd_server}); client == nil {
|
if client = etcd.NewClient([]string{etcdServer}); client == nil {
|
||||||
log.Fatal("etcd.NewClient returned nil")
|
return false, fmt.Errorf("etcd.NewClient returned nil")
|
||||||
}
|
}
|
||||||
client.SyncCluster()
|
client.SyncCluster()
|
||||||
machines := client.GetCluster()
|
machines := client.GetCluster()
|
||||||
@ -134,195 +160,105 @@ func newEtcdClient() (client *etcd.Client) {
|
|||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Fatalf("Timed out after %s waiting for at least 1 synchronized etcd server in the cluster", timeout)
|
return nil, fmt.Errorf("Timed out after %s waiting for at least 1 synchronized etcd server in the cluster. Error: %v", timeout, err)
|
||||||
}
|
}
|
||||||
return client
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getKubeMasterUrl() (string, error) {
|
||||||
|
if *argKubeMasterUrl == "" {
|
||||||
|
return "", fmt.Errorf("no --kube_master_url specified")
|
||||||
|
}
|
||||||
|
parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl))
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err)
|
||||||
|
}
|
||||||
|
if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" {
|
||||||
|
return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl)
|
||||||
|
}
|
||||||
|
return parsedUrl.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: evaluate using pkg/client/clientcmd
|
// TODO: evaluate using pkg/client/clientcmd
|
||||||
func newKubeClient() (*kclient.Client, error) {
|
func newKubeClient() (*kclient.Client, error) {
|
||||||
var config *kclient.Config
|
var config *kclient.Config
|
||||||
if *kubecfg_file == "" {
|
masterUrl, err := getKubeMasterUrl()
|
||||||
// No kubecfg file provided. Use kubernetes_ro service.
|
if err != nil {
|
||||||
masterHost := os.Getenv("KUBERNETES_RO_SERVICE_HOST")
|
return nil, err
|
||||||
if masterHost == "" {
|
}
|
||||||
log.Fatalf("KUBERNETES_RO_SERVICE_HOST is not defined")
|
if *argKubecfgFile == "" {
|
||||||
}
|
|
||||||
masterPort := os.Getenv("KUBERNETES_RO_SERVICE_PORT")
|
|
||||||
if masterPort == "" {
|
|
||||||
log.Fatalf("KUBERNETES_RO_SERVICE_PORT is not defined")
|
|
||||||
}
|
|
||||||
config = &kclient.Config{
|
config = &kclient.Config{
|
||||||
Host: fmt.Sprintf("http://%s:%s", masterHost, masterPort),
|
Host: masterUrl,
|
||||||
Version: "v1beta1",
|
Version: "v1beta3",
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
masterHost := os.Getenv("KUBERNETES_SERVICE_HOST")
|
|
||||||
if masterHost == "" {
|
|
||||||
log.Fatalf("KUBERNETES_SERVICE_HOST is not defined")
|
|
||||||
}
|
|
||||||
masterPort := os.Getenv("KUBERNETES_SERVICE_PORT")
|
|
||||||
if masterPort == "" {
|
|
||||||
log.Fatalf("KUBERNETES_SERVICE_PORT is not defined")
|
|
||||||
}
|
|
||||||
master := fmt.Sprintf("https://%s:%s", masterHost, masterPort)
|
|
||||||
var err error
|
var err error
|
||||||
if config, err = kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(
|
if config, err = kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(
|
||||||
&kclientcmd.ClientConfigLoadingRules{ExplicitPath: *kubecfg_file},
|
&kclientcmd.ClientConfigLoadingRules{ExplicitPath: *argKubecfgFile},
|
||||||
&kclientcmd.ConfigOverrides{ClusterInfo: kclientcmdapi.Cluster{Server: master}}).ClientConfig(); err != nil {
|
&kclientcmd.ConfigOverrides{ClusterInfo: kclientcmdapi.Cluster{Server: masterUrl}}).ClientConfig(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("Using %s for kubernetes master", config.Host)
|
glog.Infof("Using %s for kubernetes master", config.Host)
|
||||||
log.Printf("Using kubernetes API %s", config.Version)
|
glog.Infof("Using kubernetes API %s", config.Version)
|
||||||
return kclient.New(config)
|
return kclient.New(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildNameString(service, namespace, domain string) string {
|
func (ks *kube2sky) buildNameString(service, namespace, domain string) string {
|
||||||
return fmt.Sprintf("%s.%s.%s.", service, namespace, domain)
|
return fmt.Sprintf("%s.%s.%s.", service, namespace, domain)
|
||||||
}
|
}
|
||||||
|
|
||||||
func watchOnce(etcdClient *etcd.Client, kubeClient *kclient.Client) {
|
// Returns a cache.ListWatch that gets all changes to services.
|
||||||
// Start the goroutine to produce update events.
|
func (ks *kube2sky) createServiceLW() *cache.ListWatch {
|
||||||
updates := make(chan serviceUpdate)
|
return cache.NewListWatchFromClient(ks.kubeClient, "services", kapi.NamespaceAll, kSelector.Everything())
|
||||||
startWatching(kubeClient.Services(kapi.NamespaceAll), updates)
|
}
|
||||||
|
|
||||||
// This loop will break if the channel closes, which is how the
|
func (ks *kube2sky) newService(obj interface{}) {
|
||||||
// goroutine signals an error.
|
if s, ok := obj.(*kapi.Service); ok {
|
||||||
for ev := range updates {
|
name := ks.buildNameString(s.Name, s.Namespace, ks.domain)
|
||||||
if *verbose {
|
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
||||||
log.Printf("Received update event: %#v", ev)
|
|
||||||
}
|
|
||||||
switch ev.Op {
|
|
||||||
case SetServices, AddService:
|
|
||||||
for i := range ev.Services {
|
|
||||||
s := &ev.Services[i]
|
|
||||||
name := buildNameString(s.Name, s.Namespace, *domain)
|
|
||||||
mutateEtcdOrDie(func() error { return addDNS(name, s, etcdClient) })
|
|
||||||
}
|
|
||||||
case RemoveService:
|
|
||||||
for i := range ev.Services {
|
|
||||||
s := &ev.Services[i]
|
|
||||||
name := buildNameString(s.Name, s.Namespace, *domain)
|
|
||||||
mutateEtcdOrDie(func() error { return removeDNS(name, etcdClient) })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
//TODO: fully resync periodically.
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) removeService(obj interface{}) {
|
||||||
|
if s, ok := obj.(*kapi.Service); ok {
|
||||||
|
name := ks.buildNameString(s.Name, s.Namespace, ks.domain)
|
||||||
|
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) watchForServices() {
|
||||||
|
var serviceController *kcontrollerFramework.Controller
|
||||||
|
_, serviceController = framework.NewInformer(
|
||||||
|
ks.createServiceLW(),
|
||||||
|
&kapi.Service{},
|
||||||
|
resyncPeriod,
|
||||||
|
framework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: ks.newService,
|
||||||
|
DeleteFunc: ks.removeService,
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
ks.newService(newObj)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
serviceController.Run(util.NeverStop)
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
var err error
|
||||||
etcdClient := newEtcdClient()
|
// TODO: Validate input flags.
|
||||||
if etcdClient == nil {
|
ks := kube2sky{
|
||||||
log.Fatal("Failed to create etcd client")
|
domain: *argDomain,
|
||||||
|
etcdMutationTimeout: *argEtcdMutationTimeout,
|
||||||
|
}
|
||||||
|
if ks.etcdClient, err = newEtcdClient(*argEtcdServer); err != nil {
|
||||||
|
glog.Fatalf("Failed to create etcd client - %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
kubeClient, err := newKubeClient()
|
if ks.kubeClient, err = newKubeClient(); err != nil {
|
||||||
if err != nil {
|
glog.Fatalf("Failed to create a kubernetes client: %v", err)
|
||||||
log.Fatalf("Failed to create a kubernetes client: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// In case of error, the watch will be aborted. At that point we just
|
ks.watchForServices()
|
||||||
// retry.
|
|
||||||
for {
|
|
||||||
watchOnce(etcdClient, kubeClient)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//FIXME: make the below part of the k8s client lib?
|
|
||||||
|
|
||||||
// servicesWatcher is capable of listing and watching for changes to services
|
|
||||||
// across ALL namespaces
|
|
||||||
type servicesWatcher interface {
|
|
||||||
List(label klabels.Selector) (*kapi.ServiceList, error)
|
|
||||||
Watch(label klabels.Selector, field kfields.Selector, resourceVersion string) (kwatch.Interface, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type operation int
|
|
||||||
|
|
||||||
// These are the available operation types.
|
|
||||||
const (
|
|
||||||
SetServices operation = iota
|
|
||||||
AddService
|
|
||||||
RemoveService
|
|
||||||
)
|
|
||||||
|
|
||||||
// serviceUpdate describes an operation of services, sent on the channel.
|
|
||||||
//
|
|
||||||
// You can add or remove a single service by sending an array of size one with
|
|
||||||
// Op == AddService|RemoveService. For setting the state of the system to a given state, just
|
|
||||||
// set Services as desired and Op to SetServices, which will reset the system
|
|
||||||
// state to that specified in this operation for this source channel. To remove
|
|
||||||
// all services, set Services to empty array and Op to SetServices
|
|
||||||
type serviceUpdate struct {
|
|
||||||
Services []kapi.Service
|
|
||||||
Op operation
|
|
||||||
}
|
|
||||||
|
|
||||||
// startWatching launches a goroutine that watches for changes to services.
|
|
||||||
func startWatching(watcher servicesWatcher, updates chan<- serviceUpdate) {
|
|
||||||
serviceVersion := ""
|
|
||||||
go watchLoop(watcher, updates, &serviceVersion)
|
|
||||||
}
|
|
||||||
|
|
||||||
// watchLoop loops forever looking for changes to services. If an error occurs
|
|
||||||
// it will close the channel and return.
|
|
||||||
func watchLoop(svcWatcher servicesWatcher, updates chan<- serviceUpdate, resourceVersion *string) {
|
|
||||||
defer close(updates)
|
|
||||||
|
|
||||||
if len(*resourceVersion) == 0 {
|
|
||||||
services, err := svcWatcher.List(klabels.Everything())
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to load services: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
*resourceVersion = services.ResourceVersion
|
|
||||||
updates <- serviceUpdate{Op: SetServices, Services: services.Items}
|
|
||||||
}
|
|
||||||
|
|
||||||
watcher, err := svcWatcher.Watch(klabels.Everything(), kfields.Everything(), *resourceVersion)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Failed to watch for service changes: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer watcher.Stop()
|
|
||||||
|
|
||||||
ch := watcher.ResultChan()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
log.Printf("watchLoop channel closed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if event.Type == kwatch.Error {
|
|
||||||
if status, ok := event.Object.(*kapi.Status); ok {
|
|
||||||
log.Printf("Error during watch: %#v", status)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
log.Fatalf("Received unexpected error: %#v", event.Object)
|
|
||||||
}
|
|
||||||
|
|
||||||
if service, ok := event.Object.(*kapi.Service); ok {
|
|
||||||
sendUpdate(updates, event, service, resourceVersion)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func sendUpdate(updates chan<- serviceUpdate, event kwatch.Event, service *kapi.Service, resourceVersion *string) {
|
|
||||||
*resourceVersion = service.ResourceVersion
|
|
||||||
|
|
||||||
switch event.Type {
|
|
||||||
case kwatch.Added, kwatch.Modified:
|
|
||||||
updates <- serviceUpdate{Op: AddService, Services: []kapi.Service{*service}}
|
|
||||||
case kwatch.Deleted:
|
|
||||||
updates <- serviceUpdate{Op: RemoveService, Services: []kapi.Service{*service}}
|
|
||||||
default:
|
|
||||||
log.Fatalf("Unknown event.Type: %v", event.Type)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user