For kubelet, differentiate between the nodeName and the hostname

This will allow us to use a nodeName that is not the hostname,
for example on clouds where the hostname is not the natural identifier
for a node.
This commit is contained in:
Justin Santa Barbara 2015-06-12 11:40:34 -04:00
parent 43889c612c
commit c28cdfbd43
6 changed files with 49 additions and 40 deletions

View File

@ -565,8 +565,10 @@ func SimpleKubelet(client *client.Client,
// Eventually, #2 will be replaced with instances of #3 // Eventually, #2 will be replaced with instances of #3
func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error { func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride) kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride)
kcfg.NodeName = kcfg.Hostname
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.Hostname}) kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName})
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
if kcfg.KubeClient != nil { if kcfg.KubeClient != nil {
glog.V(4).Infof("Sending events to api server.") glog.V(4).Infof("Sending events to api server.")
@ -625,17 +627,17 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// define file config source // define file config source
if kc.ConfigFile != "" { if kc.ConfigFile != "" {
glog.Infof("Adding manifest file: %v", kc.ConfigFile) glog.Infof("Adding manifest file: %v", kc.ConfigFile)
config.NewSourceFile(kc.ConfigFile, kc.Hostname, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource)) config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
} }
// define url config source // define url config source
if kc.ManifestURL != "" { if kc.ManifestURL != "" {
glog.Infof("Adding manifest url: %v", kc.ManifestURL) glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.Hostname, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource)) config.NewSourceURL(kc.ManifestURL, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
} }
if kc.KubeClient != nil { if kc.KubeClient != nil {
glog.Infof("Watching apiserver") glog.Infof("Watching apiserver")
config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource)) config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubelet.ApiserverSource))
} }
return cfg return cfg
} }
@ -656,6 +658,7 @@ type KubeletConfig struct {
FileCheckFrequency time.Duration FileCheckFrequency time.Duration
HTTPCheckFrequency time.Duration HTTPCheckFrequency time.Duration
Hostname string Hostname string
NodeName string
PodInfraContainerImage string PodInfraContainerImage string
SyncFrequency time.Duration SyncFrequency time.Duration
RegistryPullQPS float64 RegistryPullQPS float64
@ -715,6 +718,7 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
pc = makePodSourceConfig(kc) pc = makePodSourceConfig(kc)
k, err = kubelet.NewMainKubelet( k, err = kubelet.NewMainKubelet(
kc.Hostname, kc.Hostname,
kc.NodeName,
kc.DockerClient, kc.DockerClient,
kubeClient, kubeClient,
kc.RootDirectory, kc.RootDirectory,

View File

@ -26,8 +26,8 @@ import (
) )
// NewSourceApiserver creates a config source that watches and pulls from the apiserver. // NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c *client.Client, hostname string, updates chan<- interface{}) { func NewSourceApiserver(c *client.Client, nodeName string, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, hostname)) lw := cache.NewListWatchFromClient(c, "pods", api.NamespaceAll, fields.OneTermEqualSelector(client.PodHost, nodeName))
newSourceApiserverFromLW(lw, updates) newSourceApiserverFromLW(lw, updates)
} }

View File

@ -33,16 +33,16 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// Generate a pod name that is unique among nodes by appending the hostname. // Generate a pod name that is unique among nodes by appending the nodeName.
func generatePodName(name, hostname string) string { func generatePodName(name, nodeName string) string {
return fmt.Sprintf("%s-%s", name, hostname) return fmt.Sprintf("%s-%s", name, nodeName)
} }
func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) error { func applyDefaults(pod *api.Pod, source string, isFile bool, nodeName string) error {
if len(pod.UID) == 0 { if len(pod.UID) == 0 {
hasher := md5.New() hasher := md5.New()
if isFile { if isFile {
fmt.Fprintf(hasher, "host:%s", hostname) fmt.Fprintf(hasher, "host:%s", nodeName)
fmt.Fprintf(hasher, "file:%s", source) fmt.Fprintf(hasher, "file:%s", source)
} else { } else {
fmt.Fprintf(hasher, "url:%s", source) fmt.Fprintf(hasher, "url:%s", source)
@ -57,7 +57,7 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) er
if len(pod.Name) == 0 { if len(pod.Name) == 0 {
pod.Name = string(pod.UID) pod.Name = string(pod.UID)
} }
pod.Name = generatePodName(pod.Name, hostname) pod.Name = generatePodName(pod.Name, nodeName)
glog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source) glog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)
if pod.Namespace == "" { if pod.Namespace == "" {
@ -66,7 +66,7 @@ func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) er
glog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source) glog.V(5).Infof("Using namespace %q for pod %q from %s", pod.Namespace, pod.Name, source)
// Set the Host field to indicate this pod is scheduled on the current node. // Set the Host field to indicate this pod is scheduled on the current node.
pod.Spec.NodeName = hostname pod.Spec.NodeName = nodeName
pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace) pod.ObjectMeta.SelfLink = getSelfLink(pod.Name, pod.Namespace)
return nil return nil

View File

@ -34,14 +34,14 @@ import (
type sourceFile struct { type sourceFile struct {
path string path string
hostname string nodeName string
updates chan<- interface{} updates chan<- interface{}
} }
func NewSourceFile(path string, hostname string, period time.Duration, updates chan<- interface{}) { func NewSourceFile(path string, nodeName string, period time.Duration, updates chan<- interface{}) {
config := &sourceFile{ config := &sourceFile{
path: path, path: path,
hostname: hostname, nodeName: nodeName,
updates: updates, updates: updates,
} }
glog.V(1).Infof("Watching path %q", path) glog.V(1).Infof("Watching path %q", path)
@ -55,7 +55,7 @@ func (s *sourceFile) run() {
} }
func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error { func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
return applyDefaults(pod, source, true, s.hostname) return applyDefaults(pod, source, true, s.nodeName)
} }
func (s *sourceFile) extractFromPath() error { func (s *sourceFile) extractFromPath() error {

View File

@ -33,15 +33,15 @@ import (
type sourceURL struct { type sourceURL struct {
url string url string
hostname string nodeName string
updates chan<- interface{} updates chan<- interface{}
data []byte data []byte
} }
func NewSourceURL(url, hostname string, period time.Duration, updates chan<- interface{}) { func NewSourceURL(url, nodeName string, period time.Duration, updates chan<- interface{}) {
config := &sourceURL{ config := &sourceURL{
url: url, url: url,
hostname: hostname, nodeName: nodeName,
updates: updates, updates: updates,
data: nil, data: nil,
} }
@ -56,7 +56,7 @@ func (s *sourceURL) run() {
} }
func (s *sourceURL) applyDefaults(pod *api.Pod) error { func (s *sourceURL) applyDefaults(pod *api.Pod) error {
return applyDefaults(pod, s.url, false, s.hostname) return applyDefaults(pod, s.url, false, s.nodeName)
} }
func (s *sourceURL) extractFromURL() error { func (s *sourceURL) extractFromURL() error {

View File

@ -114,6 +114,7 @@ func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error
// New creates a new Kubelet for use in main // New creates a new Kubelet for use in main
func NewMainKubelet( func NewMainKubelet(
hostname string, hostname string,
nodeName string,
dockerClient dockertools.DockerInterface, dockerClient dockertools.DockerInterface,
kubeClient client.Interface, kubeClient client.Interface,
rootDirectory string, rootDirectory string,
@ -179,7 +180,7 @@ func NewMainKubelet(
if kubeClient != nil { if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name. // than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{client.ObjectNameField: hostname}.AsSelector() fieldSelector := fields.Set{client.ObjectNameField: nodeName}.AsSelector()
listWatch := &cache.ListWatch{ listWatch := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) { ListFunc: func() (runtime.Object, error) {
return kubeClient.Nodes().List(labels.Everything(), fieldSelector) return kubeClient.Nodes().List(labels.Everything(), fieldSelector)
@ -197,8 +198,8 @@ func NewMainKubelet(
// TODO: what is namespace for node? // TODO: what is namespace for node?
nodeRef := &api.ObjectReference{ nodeRef := &api.ObjectReference{
Kind: "Node", Kind: "Node",
Name: hostname, Name: nodeName,
UID: types.UID(hostname), UID: types.UID(nodeName),
Namespace: "", Namespace: "",
} }
@ -224,6 +225,7 @@ func NewMainKubelet(
klet := &Kubelet{ klet := &Kubelet{
hostname: hostname, hostname: hostname,
nodeName: nodeName,
dockerClient: dockerClient, dockerClient: dockerClient,
kubeClient: kubeClient, kubeClient: kubeClient,
rootDirectory: rootDirectory, rootDirectory: rootDirectory,
@ -362,6 +364,7 @@ type nodeLister interface {
// Kubelet is the main kubelet implementation. // Kubelet is the main kubelet implementation.
type Kubelet struct { type Kubelet struct {
hostname string hostname string
nodeName string
dockerClient dockertools.DockerInterface dockerClient dockertools.DockerInterface
runtimeCache kubecontainer.RuntimeCache runtimeCache kubecontainer.RuntimeCache
kubeClient client.Interface kubeClient client.Interface
@ -637,13 +640,13 @@ func (kl *Kubelet) GetNode() (*api.Node, error) {
if err != nil { if err != nil {
return nil, errors.New("cannot list nodes") return nil, errors.New("cannot list nodes")
} }
host := kl.GetHostname() nodeName := kl.nodeName
for _, n := range l.Items { for _, n := range l.Items {
if n.Name == host { if n.Name == nodeName {
return &n, nil return &n, nil
} }
} }
return nil, fmt.Errorf("node %v not found", host) return nil, fmt.Errorf("node %v not found", nodeName)
} }
// Starts garbage collection theads. // Starts garbage collection theads.
@ -709,7 +712,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{ node := &api.Node{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: kl.hostname, Name: kl.nodeName,
Labels: map[string]string{"kubernetes.io/hostname": kl.hostname}, Labels: map[string]string{"kubernetes.io/hostname": kl.hostname},
}, },
} }
@ -718,18 +721,20 @@ func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
if !ok { if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider") return nil, fmt.Errorf("failed to get instances from cloud provider")
} }
// TODO(roberthbailey): Can we do this without having credentials to talk // TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider? // to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code // TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(kl.hostname) externalID, err := instances.ExternalID(kl.nodeName)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err) return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
} }
node.Spec.ExternalID = externalID node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the // TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a // cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here. // local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.hostname) node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -760,13 +765,13 @@ func (kl *Kubelet) registerWithApiserver() {
glog.V(2).Infof("Attempting to register node %s", node.Name) glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Nodes().Create(node); err != nil { if _, err := kl.kubeClient.Nodes().Create(node); err != nil {
if apierrors.IsAlreadyExists(err) { if apierrors.IsAlreadyExists(err) {
currentNode, err := kl.kubeClient.Nodes().Get(kl.hostname) currentNode, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil { if err != nil {
glog.Errorf("error getting node %q: %v", kl.hostname, err) glog.Errorf("error getting node %q: %v", kl.nodeName, err)
continue continue
} }
if currentNode == nil { if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.hostname) glog.Errorf("no node instance returned for %q", kl.nodeName)
continue continue
} }
if currentNode.Spec.ExternalID == node.Spec.ExternalID { if currentNode.Spec.ExternalID == node.Spec.ExternalID {
@ -1824,10 +1829,10 @@ func (kl *Kubelet) updateNodeStatus() error {
} }
func (kl *Kubelet) recordNodeStatusEvent(event string) { func (kl *Kubelet) recordNodeStatusEvent(event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, kl.hostname) glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
// TODO: This requires a transaction, either both node status is updated // TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055. // and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.hostname, event) kl.recorder.Eventf(kl.nodeRef, event, "Node %s status is now: %s", kl.nodeName, event)
} }
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus() // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
@ -1844,7 +1849,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
} }
// TODO(roberthbailey): Can we do this without having credentials to talk // TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider? // to the cloud provider?
nodeAddresses, err := instances.NodeAddresses(kl.hostname) nodeAddresses, err := instances.NodeAddresses(kl.nodeName)
if err != nil { if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err) return fmt.Errorf("failed to get node address from cloud provider: %v", err)
} }
@ -1898,7 +1903,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// TODO: This requires a transaction, either both node status is updated // TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055. // and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, "rebooted", kl.recorder.Eventf(kl.nodeRef, "rebooted",
"Node %s has been rebooted, boot id: %s", kl.hostname, info.BootID) "Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
} }
node.Status.NodeInfo.BootID = info.BootID node.Status.NodeInfo.BootID = info.BootID
} }
@ -1987,12 +1992,12 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 // tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly. // is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error { func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname) node, err := kl.kubeClient.Nodes().Get(kl.nodeName)
if err != nil { if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.hostname, err) return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
} }
if node == nil { if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.hostname) return fmt.Errorf("no node instance returned for %q", kl.nodeName)
} }
if err := kl.setNodeStatus(node); err != nil { if err := kl.setNodeStatus(node); err != nil {
return err return err