From c28cdfbd431256a88a4cb117abcf67309fb26393 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Fri, 12 Jun 2015 11:40:34 -0400 Subject: [PATCH 1/4] For kubelet, differentiate between the nodeName and the hostname This will allow us to use a nodeName that is not the hostname, for example on clouds where the hostname is not the natural identifier for a node. --- cmd/kubelet/app/server.go | 12 ++++++--- pkg/kubelet/config/apiserver.go | 4 +-- pkg/kubelet/config/common.go | 14 +++++------ pkg/kubelet/config/file.go | 8 +++--- pkg/kubelet/config/http.go | 8 +++--- pkg/kubelet/kubelet.go | 43 ++++++++++++++++++--------------- 6 files changed, 49 insertions(+), 40 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 241b835e0e6..995a567b11b 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -565,8 +565,10 @@ func SimpleKubelet(client *client.Client, // Eventually, #2 will be replaced with instances of #3 func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error { kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride) + kcfg.NodeName = kcfg.Hostname + eventBroadcaster := record.NewBroadcaster() - kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname}) + kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName}) eventBroadcaster.StartLogging(glog.Infof) if kcfg.KubeClient != nil { glog.V(4).Infof("Sending events to api server.") @@ -625,17 +627,17 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { // define file config source if kc.ConfigFile != "" { glog.Infof("Adding manifest file: %v", kc.ConfigFile) - config.NewSourceFile(kc.ConfigFile, kc.Hostname, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource)) + config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource)) } // define url config source if kc.ManifestURL != "" { glog.Infof("Adding 
manifest url: %v", kc.ManifestURL) - config.NewSourceURL(kc.ManifestURL, kc.Hostname, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource)) + config.NewSourceURL(kc.ManifestURL, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource)) } if kc.KubeClient != nil { glog.Infof("Watching apiserver") - config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource)) + config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubelet.ApiserverSource)) } return cfg } @@ -656,6 +658,7 @@ type KubeletConfig struct { FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration Hostname string + NodeName string PodInfraContainerImage string SyncFrequency time.Duration RegistryPullQPS float64 @@ -715,6 +718,7 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod pc = makePodSourceConfig(kc) k, err = kubelet.NewMainKubelet( kc.Hostname, + kc.NodeName, kc.DockerClient, kubeClient, kc.RootDirectory, diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index 56794ca3e74..88bb84b48a0 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -26,8 +26,8 @@ import ( ) // NewSourceApiserver creates a config source that watches and pulls from the apiserver. 
-func NewSourceApiserver(c *client.Client, hostname string, updates chan<- interface{}) { - lw := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, hostname)) +func NewSourceApiserver(c *client.Client, nodeName string, updates chan<- interface{}) { + lw := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, nodeName)) newSourceApiserverFromLW(lw, updates) } diff --git a/pkg/kubelet/config/common.go b/pkg/kubelet/config/common.go index b757bba3aa7..d5a4bd42799 100644 --- a/pkg/kubelet/config/common.go +++ b/pkg/kubelet/config/common.go @@ -33,16 +33,16 @@ import ( "github.com/golang/glog" ) -// Generate a pod name that is unique among nodes by appending the hostname. -func generatePodName(name, hostname string) string { - return fmt.Sprintf("%s-%s", name, hostname) +// Generate a pod name that is unique among nodes by appending the nodeName. +func generatePodName(name, nodeName string) string { + return fmt.Sprintf("%s-%s", name, nodeName) } -func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) error { +func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName string) error { if len(pod.UID) == 0 { hasher := md5.New() if isFile { - fmt.Fprintf(hasher, "host:%s", hostname) + fmt.Fprintf(hasher, "host:%s", nodeName) fmt.Fprintf(hasher, "file:%s", source) } else { fmt.Fprintf(hasher, "url:%s", source) @@ -57,7 +57,7 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) er if len(pod.Name) == 0 { pod.Name = string(pod.UID) } - pod.Name = generatePodName(pod.Name, hostname) + pod.Name = generatePodName(pod.Name, nodeName) glog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source) if pod.Namespace == "" { @@ -66,7 +66,7 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) er glog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, 
pod.Name, source) // Set the Host field to indicate this pod is scheduled on the current node. - pod.Spec.NodeName = hostname + pod.Spec.NodeName = nodeName pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace) return nil diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 3ef6a17f877..3d73010dd2e 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -34,14 +34,14 @@ import ( type sourceFile struct { path string - hostname string + nodeName string updates chan<- interface{} } -func NewSourceFile(path string, hostname string, period time.Duration, updates chan<- interface{}) { +func NewSourceFile(path string, nodeName string, period time.Duration, updates chan<- interface{}) { config := &sourceFile{ path: path, - hostname: hostname, + nodeName: nodeName, updates: updates, } glog.V(1).Infof("Watching path %q", path) @@ -55,7 +55,7 @@ func (s *sourceFile) run() { } func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error { - return applyDefaults(pod, source, true, s.hostname) + return applyDefaults(pod, source, true, s.nodeName) } func (s *sourceFile) extractFromPath() error { diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index e8d69f85290..de536ae3b74 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -33,15 +33,15 @@ import ( type sourceURL struct { url string - hostname string + nodeName string updates chan<- interface{} data []byte } -func NewSourceURL(url, hostname string, period time.Duration, updates chan<- interface{}) { +func NewSourceURL(url, nodeName string, period time.Duration, updates chan<- interface{}) { config := &sourceURL{ url: url, - hostname: hostname, + nodeName: nodeName, updates: updates, data: nil, } @@ -56,7 +56,7 @@ func (s *sourceURL) run() { } func (s *sourceURL) applyDefaults(pod *api.Pod) error { - return applyDefaults(pod, s.url, false, s.hostname) + return applyDefaults(pod, s.url, false, s.nodeName) } func (s *sourceURL) 
extractFromURL() error { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 6d88c53001a..d1070cc390b 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -114,6 +114,7 @@ func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error // New creates a new Kubelet for use in main func NewMainKubelet( hostname string, + nodeName string, dockerClient dockertools.DockerInterface, kubeClient client.Interface, rootDirectory string, @@ -179,7 +180,7 @@ func NewMainKubelet( if kubeClient != nil { // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather // than an interface. There is no way to construct a list+watcher using resource name. - fieldSelector := fields.Set{client.ObjectNameField: hostname}.AsSelector() + fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector() listWatch := &cache.ListWatch{ ListFunc: func() (runtime.Object, error) { return kubeClient.Nodes().List(labels.Everything(), fieldSelector) @@ -197,8 +198,8 @@ func NewMainKubelet( // TODO: what is namespace for node? nodeRef := &api.ObjectReference{ Kind: "Node", - Name: hostname, - UID: types.UID(hostname), + Name: nodeName, + UID: types.UID(nodeName), Namespace: "", } @@ -224,6 +225,7 @@ func NewMainKubelet( klet := &Kubelet{ hostname: hostname, + nodeName: nodeName, dockerClient: dockerClient, kubeClient: kubeClient, rootDirectory: rootDirectory, @@ -362,6 +364,7 @@ type nodeLister interface { // Kubelet is the main kubelet implementation. 
type Kubelet struct { hostname string + nodeName string dockerClient dockertools.DockerInterface runtimeCache kubecontainer.RuntimeCache kubeClient client.Interface @@ -637,13 +640,13 @@ func (kl *Kubelet) GetNode() (*api.Node, error) { if err != nil { return nil, errors.New("cannot list nodes") } - host := kl.GetHostname() + nodeName := kl.nodeName for _, n := range l.Items { - if n.Name == host { + if n.Name == nodeName { return &n, nil } } - return nil, fmt.Errorf("node %v not found", host) + return nil, fmt.Errorf("node %v not found", nodeName) } // Starts garbage collection theads. @@ -709,7 +712,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { node := &api.Node{ ObjectMeta: api.ObjectMeta{ - Name: kl.hostname, + Name: kl.nodeName, Labels: map[string]string{"kubernetes.io/hostname": kl.hostname}, }, } @@ -718,18 +721,20 @@ func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { if !ok { return nil, fmt.Errorf("failed to get instances from cloud provider") } + // TODO(roberthbailey): Can we do this without having credentials to talk // to the cloud provider? // TODO: ExternalID is deprecated, we'll have to drop this code - externalID, err := instances.ExternalID(kl.hostname) + externalID, err := instances.ExternalID(kl.nodeName) if err != nil { return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err) } node.Spec.ExternalID = externalID + // TODO: We can't assume that the node has credentials to talk to the // cloudprovider from arbitrary nodes. At most, we should talk to a // local metadata server here. 
- node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.hostname) + node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName) if err != nil { return nil, err } @@ -760,13 +765,13 @@ func (kl *Kubelet) registerWithApiserver() { glog.V(2).Infof("Attempting to register node %s", node.Name) if _, err := kl.kubeClient.Nodes().Create(node); err != nil { if apierrors.IsAlreadyExists(err) { - currentNode, err := kl.kubeClient.Nodes().Get(kl.hostname) + currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName) if err != nil { - glog.Errorf("error getting node %q: %v", kl.hostname, err) + glog.Errorf("error getting node %q: %v", kl.nodeName, err) continue } if currentNode == nil { - glog.Errorf("no node instance returned for %q", kl.hostname) + glog.Errorf("no node instance returned for %q", kl.nodeName) continue } if currentNode.Spec.ExternalID == node.Spec.ExternalID { @@ -1824,10 +1829,10 @@ func (kl *Kubelet) updateNodeStatus() error { } func (kl *Kubelet) recordNodeStatusEvent(event string) { - glog.V(2).Infof("Recording %s event message for node %s", event, kl.hostname) + glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName) // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. - kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.hostname, event) + kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.nodeName, event) } // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus() @@ -1844,7 +1849,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error { } // TODO(roberthbailey): Can we do this without having credentials to talk // to the cloud provider? 
- nodeAddresses, err := instances.NodeAddresses(kl.hostname) + nodeAddresses, err := instances.NodeAddresses(kl.nodeName) if err != nil { return fmt.Errorf("failed to get node address from cloud provider: %v", err) } @@ -1898,7 +1903,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error { // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. kl.recorder.Eventf(kl.nodeRef, "rebooted", - "Node %s has been rebooted, boot id: %s", kl.hostname, info.BootID) + "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID) } node.Status.NodeInfo.BootID = info.BootID } @@ -1987,12 +1992,12 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error { // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 // is set, this function will also confirm that cbr0 is configured correctly. func (kl *Kubelet) tryUpdateNodeStatus() error { - node, err := kl.kubeClient.Nodes().Get(kl.hostname) + node, err := kl.kubeClient.Nodes().Get(kl.nodeName) if err != nil { - return fmt.Errorf("error getting node %q: %v", kl.hostname, err) + return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) } if node == nil { - return fmt.Errorf("no node instance returned for %q", kl.hostname) + return fmt.Errorf("no node instance returned for %q", kl.nodeName) } if err := kl.setNodeStatus(node); err != nil { return err From efaead81dc7220e3921ede18358aca5d6c566c19 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Fri, 12 Jun 2015 11:42:38 -0400 Subject: [PATCH 2/4] Allow cloud providers to return a node identifier different from the hostname --- cmd/kubelet/app/server.go | 22 +++++++++++++++++++++- pkg/cloudprovider/aws/aws.go | 4 ++++ pkg/cloudprovider/cloud.go | 3 +++ pkg/cloudprovider/fake/fake.go | 5 +++++ pkg/cloudprovider/gce/gce.go | 5 +++++ pkg/cloudprovider/mesos/mesos.go | 5 +++++ pkg/cloudprovider/openstack/openstack.go | 5 +++++ 
pkg/cloudprovider/ovirt/ovirt.go | 5 +++++ pkg/cloudprovider/rackspace/rackspace.go | 5 +++++ pkg/cloudprovider/vagrant/vagrant.go | 5 +++++ pkg/kubelet/kubelet.go | 1 + 11 files changed, 64 insertions(+), 1 deletion(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 995a567b11b..4d67bbb1d7c 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -565,7 +565,27 @@ func SimpleKubelet(client *client.Client, // Eventually, #2 will be replaced with instances of #3 func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error { kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride) - kcfg.NodeName = kcfg.Hostname + + if kcfg.NodeName == "" { + // Query the cloud provider for our node name, default to Hostname + nodeName := kcfg.Hostname + if kcfg.Cloud != nil { + var err error + instances, ok := kcfg.Cloud.Instances() + if !ok { + return fmt.Errorf("failed to get instances from cloud provider") + } + + nodeName, err = instances.CurrentNodeName(kcfg.Hostname) + if err != nil { + return fmt.Errorf("error fetching current instance name from cloud provider: %v", err) + } + + glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName) + } + + kcfg.NodeName = nodeName + } eventBroadcaster := record.NewBroadcaster() kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName}) diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 4a899966b42..c6a9d099ebf 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -234,6 +234,10 @@ func (self *AWSCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error return errors.New("unimplemented") } +func (a *AWSCloud) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + // Implementation of EC2.Instances func (self *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*ec2.Instance, error) { // Instances are paged diff --git 
a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index e5d371ffeb3..3e0d50086ab 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -111,6 +111,9 @@ type Instances interface { // AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances // expected format for the key is standard ssh-keygen format: <protocol> <blob> AddSSHKeyToAllInstances(user string, keyData []byte) error + // Returns the name of the node we are currently running on + // On most clouds (e.g. GCE) this is the hostname, so we provide the hostname + CurrentNodeName(hostname string) (string, error) } // Route is a representation of an advanced routing rule. diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index ed81d523899..96f39e3abde 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -149,6 +149,11 @@ func (f *FakeCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { return errors.New("unimplemented") } +// Implementation of Instances.CurrentNodeName +func (f *FakeCloud) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + // NodeAddresses is a test-spy implementation of Instances.NodeAddresses. // It adds an entry "node-addresses" into the internal method call record. 
func (f *FakeCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) { diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index d7cdb8a8d05..13be7164bb0 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -483,6 +483,11 @@ func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) { return res, nil } +// Implementation of Instances.CurrentNodeName +func (gce *GCECloud) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) { project, err := gce.service.Projects.Get(gce.projectID).Do() diff --git a/pkg/cloudprovider/mesos/mesos.go b/pkg/cloudprovider/mesos/mesos.go index 600c9050ac3..266fd80f989 100644 --- a/pkg/cloudprovider/mesos/mesos.go +++ b/pkg/cloudprovider/mesos/mesos.go @@ -78,6 +78,11 @@ func newMesosCloud(configReader io.Reader) (*MesosCloud, error) { } } +// Implementation of Instances.CurrentNodeName +func (c *MesosCloud) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + func (c *MesosCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { return errors.New("unimplemented") } diff --git a/pkg/cloudprovider/openstack/openstack.go b/pkg/cloudprovider/openstack/openstack.go index c4349ebf09d..f0ed6f0e7d7 100644 --- a/pkg/cloudprovider/openstack/openstack.go +++ b/pkg/cloudprovider/openstack/openstack.go @@ -317,6 +317,11 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro return s, nil } +// Implementation of Instances.CurrentNodeName +func (i *Instances) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error { return errors.New("unimplemented") } diff --git a/pkg/cloudprovider/ovirt/ovirt.go b/pkg/cloudprovider/ovirt/ovirt.go index 
abcfff64539..6b4682411df 100644 --- a/pkg/cloudprovider/ovirt/ovirt.go +++ b/pkg/cloudprovider/ovirt/ovirt.go @@ -275,6 +275,11 @@ func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, nil } +// Implementation of Instances.CurrentNodeName +func (v *OVirtCloud) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + func (v *OVirtCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { return errors.New("unimplemented") } diff --git a/pkg/cloudprovider/rackspace/rackspace.go b/pkg/cloudprovider/rackspace/rackspace.go index 38f2c25ebaf..e3fc2b4f3b2 100644 --- a/pkg/cloudprovider/rackspace/rackspace.go +++ b/pkg/cloudprovider/rackspace/rackspace.go @@ -380,6 +380,11 @@ func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error { return errors.New("unimplemented") } +// Implementation of Instances.CurrentNodeName +func (i *Instances) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { glog.V(2).Infof("GetNodeResources(%v) called", name) diff --git a/pkg/cloudprovider/vagrant/vagrant.go b/pkg/cloudprovider/vagrant/vagrant.go index e4e89ed453e..ed0bf2c91e9 100644 --- a/pkg/cloudprovider/vagrant/vagrant.go +++ b/pkg/cloudprovider/vagrant/vagrant.go @@ -135,6 +135,11 @@ func (v *VagrantCloud) AddSSHKeyToAllInstances(user string, keyData []byte) erro return errors.New("unimplemented") } +// Implementation of Instances.CurrentNodeName +func (v *VagrantCloud) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + // NodeAddresses returns the NodeAddresses of a particular machine instance. 
func (v *VagrantCloud) NodeAddresses(instance string) ([]api.NodeAddress, error) { // Due to vagrant not running with a dedicated DNS setup, we return the IP address of a minion as its hostname at this time diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d1070cc390b..7a6b5c40983 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1849,6 +1849,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error { } // TODO(roberthbailey): Can we do this without having credentials to talk // to the cloud provider? + // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface nodeAddresses, err := instances.NodeAddresses(kl.nodeName) if err != nil { return fmt.Errorf("failed to get node address from cloud provider: %v", err) From c89b0cd807526dfb690b3e16bb57d62809f0ffbe Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Thu, 11 Jun 2015 20:37:57 -0400 Subject: [PATCH 3/4] AWS: Use the instance id as the node name The EC2 instance id is the canonical node name on EC2. --- pkg/cloudprovider/aws/aws.go | 98 ++++++++++++------------------- pkg/cloudprovider/aws/aws_test.go | 28 ++++----- 2 files changed, 52 insertions(+), 74 deletions(-) diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index c6a9d099ebf..67dad251cd4 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -235,7 +235,11 @@ func (self *AWSCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error } func (a *AWSCloud) CurrentNodeName(hostname string) (string, error) { - return hostname, nil + selfInstance, err := a.getSelfAWSInstance() + if err != nil { + return "", err + } + return selfInstance.awsID, nil } // Implementation of EC2.Instances @@ -551,7 +555,7 @@ func (aws *AWSCloud) Routes() (cloudprovider.Routes, bool) { // NodeAddresses is an implementation of Instances.NodeAddresses. 
func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) { - instance, err := aws.getInstanceByDnsName(name) + instance, err := aws.getInstanceById(name) if err != nil { return nil, err } @@ -585,7 +589,8 @@ func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) { // ExternalID returns the cloud provider ID of the specified instance (deprecated). func (aws *AWSCloud) ExternalID(name string) (string, error) { - inst, err := aws.getInstanceByDnsName(name) + // TODO: Do we need to verify it exists, or can we just return name + inst, err := aws.getInstanceById(name) if err != nil { return "", err } @@ -594,7 +599,8 @@ func (aws *AWSCloud) ExternalID(name string) (string, error) { // InstanceID returns the cloud provider ID of the specified instance. func (aws *AWSCloud) InstanceID(name string) (string, error) { - inst, err := aws.getInstanceByDnsName(name) + // TODO: Do we need to verify it exists, or can we just construct it knowing our AZ (or via caching?) + inst, err := aws.getInstanceById(name) if err != nil { return "", err } @@ -603,46 +609,6 @@ func (aws *AWSCloud) InstanceID(name string) (string, error) { return "/" + orEmpty(inst.Placement.AvailabilityZone) + "/" + orEmpty(inst.InstanceID), nil } -// Return the instances matching the relevant private dns name. -func (s *AWSCloud) getInstanceByDnsName(name string) (*ec2.Instance, error) { - filters := []*ec2.Filter{ - newEc2Filter("private-dns-name", name), - } - filters = s.addFilters(filters) - request := &ec2.DescribeInstancesInput{ - Filters: filters, - } - - instances, err := s.ec2.DescribeInstances(request) - if err != nil { - return nil, err - } - - matchingInstances := []*ec2.Instance{} - for _, instance := range instances { - // TODO: Push running logic down into filter? - if !isAlive(instance) { - continue - } - - if orEmpty(instance.PrivateDNSName) != name { - // TODO: Should we warn here? 
- the filter should have caught this - // (this will happen in the tests if they don't fully mock the EC2 API) - continue - } - - matchingInstances = append(matchingInstances, instance) - } - - if len(matchingInstances) == 0 { - return nil, fmt.Errorf("no instances found for host: %s", name) - } - if len(matchingInstances) > 1 { - return nil, fmt.Errorf("multiple instances found for host: %s", name) - } - return matchingInstances[0], nil -} - // Check if the instance is alive (running or pending) // We typically ignore instances that are not alive func isAlive(instance *ec2.Instance) bool { @@ -702,16 +668,9 @@ func (s *AWSCloud) getInstancesByRegex(regex string) ([]string, error) { continue } - privateDNSName := orEmpty(instance.PrivateDNSName) - if privateDNSName == "" { - glog.V(2).Infof("skipping EC2 instance (no PrivateDNSName): %s", - orEmpty(instance.InstanceID)) - continue - } - for _, tag := range instance.Tags { if orEmpty(tag.Key) == "Name" && re.MatchString(orEmpty(tag.Value)) { - matchingInstances = append(matchingInstances, privateDNSName) + matchingInstances = append(matchingInstances, orEmpty(instance.InstanceID)) break } } @@ -728,7 +687,7 @@ func (aws *AWSCloud) List(filter string) ([]string, error) { // GetNodeResources implements Instances.GetNodeResources func (aws *AWSCloud) GetNodeResources(name string) (*api.NodeResources, error) { - instance, err := aws.getInstanceByDnsName(name) + instance, err := aws.getInstanceById(name) if err != nil { return nil, err } @@ -1189,7 +1148,7 @@ func (aws *AWSCloud) getAwsInstance(instanceName string) (*awsInstance, error) { return nil, fmt.Errorf("error getting self-instance: %v", err) } } else { - instance, err := aws.getInstanceByDnsName(instanceName) + instance, err := aws.getInstanceById(instanceName) if err != nil { return nil, fmt.Errorf("error finding instance: %v", err) } @@ -1656,7 +1615,7 @@ func (s *AWSCloud) CreateTCPLoadBalancer(name, region string, publicIP net.IP, p return nil, 
fmt.Errorf("publicIP cannot be specified for AWS ELB") } - instances, err := s.getInstancesByDnsNames(hosts) + instances, err := s.getInstancesByIds(hosts) if err != nil { return nil, err } @@ -2068,7 +2027,7 @@ func (s *AWSCloud) EnsureTCPLoadBalancerDeleted(name, region string) error { // UpdateTCPLoadBalancer implements TCPLoadBalancer.UpdateTCPLoadBalancer func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error { - instances, err := s.getInstancesByDnsNames(hosts) + instances, err := s.getInstancesByIds(hosts) if err != nil { return err } @@ -2143,21 +2102,40 @@ func (s *AWSCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) er } // TODO: Make efficient -func (a *AWSCloud) getInstancesByDnsNames(names []string) ([]*ec2.Instance, error) { +func (a *AWSCloud) getInstancesByIds(ids []string) ([]*ec2.Instance, error) { instances := []*ec2.Instance{} - for _, name := range names { - instance, err := a.getInstanceByDnsName(name) + for _, id := range ids { + instance, err := a.getInstanceById(id) if err != nil { return nil, err } if instance == nil { - return nil, fmt.Errorf("unable to find instance " + name) + return nil, fmt.Errorf("unable to find instance " + id) } instances = append(instances, instance) } return instances, nil } +// Returns the instance with the specified ID +func (a *AWSCloud) getInstanceById(instanceID string) (*ec2.Instance, error) { + request := &ec2.DescribeInstancesInput{ + InstanceIDs: []*string{&instanceID}, + } + + instances, err := a.ec2.DescribeInstances(request) + if err != nil { + return nil, err + } + if len(instances) == 0 { + return nil, fmt.Errorf("no instances found for instance: %s", instanceID) + } + if len(instances) > 1 { + return nil, fmt.Errorf("multiple instances found for instance: %s", instanceID) + } + return instances[0], nil +} + // Add additional filters, to match on our tags // This lets us run multiple k8s clusters in a single EC2 AZ func (s *AWSCloud) addFilters(filters 
[]*ec2.Filter) []*ec2.Filter { diff --git a/pkg/cloudprovider/aws/aws_test.go b/pkg/cloudprovider/aws/aws_test.go index 469b8e352da..60462a74155 100644 --- a/pkg/cloudprovider/aws/aws_test.go +++ b/pkg/cloudprovider/aws/aws_test.go @@ -412,7 +412,7 @@ func TestList(t *testing.T) { Value: aws.String("foo"), } instance0.Tags = []*ec2.Tag{&tag0} - instance0.PrivateDNSName = aws.String("instance1") + instance0.InstanceID = aws.String("instance0") state0 := ec2.InstanceState{ Name: aws.String("running"), } @@ -424,7 +424,7 @@ func TestList(t *testing.T) { Value: aws.String("bar"), } instance1.Tags = []*ec2.Tag{&tag1} - instance1.PrivateDNSName = aws.String("instance2") + instance1.InstanceID = aws.String("instance1") state1 := ec2.InstanceState{ Name: aws.String("running"), } @@ -436,7 +436,7 @@ func TestList(t *testing.T) { Value: aws.String("baz"), } instance2.Tags = []*ec2.Tag{&tag2} - instance2.PrivateDNSName = aws.String("instance3") + instance2.InstanceID = aws.String("instance2") state2 := ec2.InstanceState{ Name: aws.String("running"), } @@ -448,7 +448,7 @@ func TestList(t *testing.T) { Value: aws.String("quux"), } instance3.Tags = []*ec2.Tag{&tag3} - instance3.PrivateDNSName = aws.String("instance4") + instance3.InstanceID = aws.String("instance3") state3 := ec2.InstanceState{ Name: aws.String("running"), } @@ -462,8 +462,8 @@ func TestList(t *testing.T) { expect []string }{ {"blahonga", []string{}}, - {"quux", []string{"instance4"}}, - {"a", []string{"instance2", "instance3"}}, + {"quux", []string{"instance3"}}, + {"a", []string{"instance1", "instance2"}}, } for _, item := range table { @@ -493,7 +493,7 @@ func TestNodeAddresses(t *testing.T) { var instance1 ec2.Instance //0 - instance0.PrivateDNSName = aws.String("instance1") + instance0.InstanceID = aws.String("instance-same") instance0.PrivateIPAddress = aws.String("192.168.0.1") instance0.PublicIPAddress = aws.String("1.2.3.4") instance0.InstanceType = aws.String("c3.large") @@ -503,7 +503,7 @@ func 
TestNodeAddresses(t *testing.T) { instance0.State = &state0 //1 - instance1.PrivateDNSName = aws.String("instance1") + instance1.InstanceID = aws.String("instance-same") instance1.PrivateIPAddress = aws.String("192.168.0.2") instance1.InstanceType = aws.String("c3.large") state1 := ec2.InstanceState{ @@ -514,19 +514,19 @@ func TestNodeAddresses(t *testing.T) { instances := []*ec2.Instance{&instance0, &instance1} aws1 := mockInstancesResp([]*ec2.Instance{}) - _, err1 := aws1.NodeAddresses("instance") + _, err1 := aws1.NodeAddresses("instance-mismatch") if err1 == nil { t.Errorf("Should error when no instance found") } aws2 := mockInstancesResp(instances) - _, err2 := aws2.NodeAddresses("instance1") + _, err2 := aws2.NodeAddresses("instance-same") if err2 == nil { t.Errorf("Should error when multiple instances found") } aws3 := mockInstancesResp(instances[0:1]) - addrs3, err3 := aws3.NodeAddresses("instance1") + addrs3, err3 := aws3.NodeAddresses("instance-same") if err3 != nil { t.Errorf("Should not error when instance found") } @@ -562,7 +562,7 @@ func TestGetResources(t *testing.T) { var instance2 ec2.Instance //0 - instance0.PrivateDNSName = aws.String("m3.medium") + instance0.InstanceID = aws.String("m3.medium") instance0.InstanceType = aws.String("m3.medium") state0 := ec2.InstanceState{ Name: aws.String("running"), @@ -570,7 +570,7 @@ func TestGetResources(t *testing.T) { instance0.State = &state0 //1 - instance1.PrivateDNSName = aws.String("r3.8xlarge") + instance1.InstanceID = aws.String("r3.8xlarge") instance1.InstanceType = aws.String("r3.8xlarge") state1 := ec2.InstanceState{ Name: aws.String("running"), @@ -578,7 +578,7 @@ func TestGetResources(t *testing.T) { instance1.State = &state1 //2 - instance2.PrivateDNSName = aws.String("unknown.type") + instance2.InstanceID = aws.String("unknown.type") instance2.InstanceType = aws.String("unknown.type") state2 := ec2.InstanceState{ Name: aws.String("running"), From 77e1bd3f56fea95a128bd3d9fe02fa78e9c6492c Mon 
Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Tue, 16 Jun 2015 10:32:53 -0400 Subject: [PATCH 4/4] NodeName != HostName: Fixes for contrib/mesos --- contrib/mesos/pkg/executor/service/service.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index ad738fd8a24..dd3b6605b9e 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -259,6 +259,8 @@ func (s *KubeletExecutorServer) Run(hks hyperkube.Interface, _ []string) error { DockerExecHandler: dockerExecHandler, } + kcfg.NodeName = kcfg.Hostname + err = app.RunKubelet(&kcfg, app.KubeletBuilder(func(kc *app.KubeletConfig) (app.KubeletBootstrap, *kconfig.PodConfig, error) { return s.createAndInitKubelet(kc, hks, clientConfig, shutdownCloser) })) @@ -319,6 +321,7 @@ func (ks *KubeletExecutorServer) createAndInitKubelet( klet, err := kubelet.NewMainKubelet( kc.Hostname, + kc.NodeName, kc.DockerClient, kubeClient, kc.RootDirectory,