Merge pull request #20688 from ArtfulCoder/podhostname

Specify Pod hostname by Annotation
This commit is contained in:
Abhi Shah 2016-03-04 15:17:34 -08:00
commit 9bfd70f8f6
13 changed files with 261 additions and 54 deletions

View File

@ -1,3 +1,7 @@
## Version 1.14 (Mar 4 2016 Abhishek Shah <abshah@google.com>)
- If Endpoint has hostnames-map annotation (endpoints.net.beta.kubernetes.io/hostnames-map),
the hostnames supplied via the annotation will be used to generate A Records for Headless Service.
## Version 1.13 (Mar 1 2016 Prashanth.B <beeps@google.com>) ## Version 1.13 (Mar 1 2016 Prashanth.B <beeps@google.com>)
- Synchronously wait for the Kubernetes service at startup. - Synchronously wait for the Kubernetes service at startup.
- Add a SIGTERM/SIGINT handler. - Add a SIGTERM/SIGINT handler.

View File

@ -18,7 +18,7 @@
.PHONY: all kube2sky container push clean test .PHONY: all kube2sky container push clean test
TAG = 1.13 TAG = 1.14
PREFIX = gcr.io/google_containers PREFIX = gcr.io/google_containers
all: container all: container

View File

@ -37,6 +37,7 @@ import (
skymsg "github.com/skynetservices/skydns/msg" skymsg "github.com/skynetservices/skydns/msg"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
kapi "k8s.io/kubernetes/pkg/api" kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
kcache "k8s.io/kubernetes/pkg/client/cache" kcache "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
@ -46,6 +47,7 @@ import (
kselector "k8s.io/kubernetes/pkg/fields" kselector "k8s.io/kubernetes/pkg/fields"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
@ -159,14 +161,28 @@ func getSkyMsg(ip string, port int) *skymsg.Service {
} }
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error { func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error {
glog.V(4).Infof("Endpoints Annotations: %v", e.Annotations)
for idx := range e.Subsets { for idx := range e.Subsets {
for subIdx := range e.Subsets[idx].Addresses { for subIdx := range e.Subsets[idx].Addresses {
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, 0)) endpointIP := e.Subsets[idx].Addresses[subIdx].IP
b, err := json.Marshal(getSkyMsg(endpointIP, 0))
if err != nil { if err != nil {
return err return err
} }
recordValue := string(b) recordValue := string(b)
recordLabel := getHash(recordValue) recordLabel := getHash(recordValue)
if serializedPodHostnames := e.Annotations[endpoints.PodHostnamesAnnotation]; len(serializedPodHostnames) > 0 {
podHostnames := map[string]endpoints.HostRecord{}
err := json.Unmarshal([]byte(serializedPodHostnames), &podHostnames)
if err != nil {
return err
}
if hostRecord, exists := podHostnames[string(endpointIP)]; exists {
if validation.IsDNS1123Label(hostRecord.HostName) {
recordLabel = hostRecord.HostName
}
}
}
recordKey := buildDNSNameString(subdomain, recordLabel) recordKey := buildDNSNameString(subdomain, recordLabel)
glog.V(2).Infof("Setting DNS record: %v -> %q\n", recordKey, recordValue) glog.V(2).Infof("Setting DNS record: %v -> %q\n", recordKey, recordValue)

View File

@ -47,7 +47,7 @@ spec:
- name: etcd-storage - name: etcd-storage
mountPath: /var/etcd/data mountPath: /var/etcd/data
- name: kube2sky - name: kube2sky
image: gcr.io/google_containers/kube2sky:1.13 image: gcr.io/google_containers/kube2sky:1.14
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in

View File

@ -28,6 +28,16 @@ import (
hashutil "k8s.io/kubernetes/pkg/util/hash" hashutil "k8s.io/kubernetes/pkg/util/hash"
) )
const (
// Its value is the json representation of map[string(IP)][HostRecord]
// example: '{"10.245.1.6":{"HostName":"my-webserver"}}'
PodHostnamesAnnotation = "endpoints.beta.kubernetes.io/hostnames-map"
)
type HostRecord struct {
HostName string
}
// RepackSubsets takes a slice of EndpointSubset objects, expands it to the full // RepackSubsets takes a slice of EndpointSubset objects, expands it to the full
// representation, and then repacks that into the canonical layout. This // representation, and then repacks that into the canonical layout. This
// ensures that code which operates on these objects can rely on the common // ensures that code which operates on these objects can rely on the common

View File

@ -23,6 +23,18 @@ import (
"k8s.io/kubernetes/pkg/util/intstr" "k8s.io/kubernetes/pkg/util/intstr"
) )
const (
// The annotation value is a string specifying the hostname to be used for the pod e.g 'my-webserver-1'
PodHostnameAnnotation = "pod.beta.kubernetes.io/hostname"
// The annotation value is a string specifying the subdomain e.g. "my-web-service"
// If specified, on the the pod itself, "<hostname>.my-web-service.<namespace>.svc.<cluster domain>" would resolve to
// the pod's IP.
// If there is a headless service named "my-web-service" in the same namespace as the pod, then,
// <hostname>.my-web-service.<namespace>.svc.<cluster domain>" would be resolved by the cluster DNS Server.
PodSubdomainAnnotation = "pod.beta.kubernetes.io/subdomain"
)
// FindPort locates the container port for the given pod and portName. If the // FindPort locates the container port for the given pod and portName. If the
// targetPort is a number, use that. If the targetPort is a string, look that // targetPort is a number, use that. If the targetPort is a string, look that
// string up in all named ports in all containers in the target pod. If no // string up in all named ports in all containers in the target pod. If no

View File

@ -27,7 +27,10 @@ import (
"regexp" "regexp"
"strings" "strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
apiservice "k8s.io/kubernetes/pkg/api/service" apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/capabilities"
@ -36,8 +39,6 @@ import (
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/validation/field"
"github.com/golang/glog"
) )
// TODO: delete this global variable when we enable the validation of common // TODO: delete this global variable when we enable the validation of common
@ -113,10 +114,34 @@ func ValidateAnnotations(annotations map[string]string, fldPath *field.Path) fie
if totalSize > (int64)(totalAnnotationSizeLimitB) { if totalSize > (int64)(totalAnnotationSizeLimitB) {
allErrs = append(allErrs, field.TooLong(fldPath, "", totalAnnotationSizeLimitB)) allErrs = append(allErrs, field.TooLong(fldPath, "", totalAnnotationSizeLimitB))
} }
return allErrs
}
func ValidatePodSpecificAnnotations(annotations map[string]string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if annotations[api.AffinityAnnotationKey] != "" { if annotations[api.AffinityAnnotationKey] != "" {
allErrs = append(allErrs, ValidateAffinityInPodAnnotations(annotations, fldPath)...) allErrs = append(allErrs, ValidateAffinityInPodAnnotations(annotations, fldPath)...)
} }
if hostname, exists := annotations[utilpod.PodHostnameAnnotation]; exists && !validation.IsDNS1123Label(hostname) {
allErrs = append(allErrs, field.Invalid(fldPath, utilpod.PodHostnameAnnotation, DNS1123LabelErrorMsg))
}
if subdomain, exists := annotations[utilpod.PodSubdomainAnnotation]; exists && !validation.IsDNS1123Label(subdomain) {
allErrs = append(allErrs, field.Invalid(fldPath, utilpod.PodSubdomainAnnotation, DNS1123LabelErrorMsg))
}
return allErrs
}
func ValidateEndpointsSpecificAnnotations(annotations map[string]string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
hostnamesMap, exists := annotations[endpoints.PodHostnamesAnnotation]
if exists && !isValidHostnamesMap(hostnamesMap) {
allErrs = append(allErrs, field.Invalid(fldPath, endpoints.PodHostnamesAnnotation,
`must be a valid json representation of map[string(IP)][HostRecord] e.g. "{"10.245.1.6":{"HostName":"my-webserver"}}"`))
}
return allErrs return allErrs
} }
@ -1356,7 +1381,9 @@ func validateImagePullSecrets(imagePullSecrets []api.LocalObjectReference, fldPa
// ValidatePod tests if required fields in the pod are set. // ValidatePod tests if required fields in the pod are set.
func ValidatePod(pod *api.Pod) field.ErrorList { func ValidatePod(pod *api.Pod) field.ErrorList {
allErrs := ValidateObjectMeta(&pod.ObjectMeta, true, ValidatePodName, field.NewPath("metadata")) fldPath := field.NewPath("metadata")
allErrs := ValidateObjectMeta(&pod.ObjectMeta, true, ValidatePodName, fldPath)
allErrs = append(allErrs, ValidatePodSpecificAnnotations(pod.ObjectMeta.Annotations, fldPath.Child("annotations"))...)
allErrs = append(allErrs, ValidatePodSpec(&pod.Spec, field.NewPath("spec"))...) allErrs = append(allErrs, ValidatePodSpec(&pod.Spec, field.NewPath("spec"))...)
return allErrs return allErrs
} }
@ -1520,8 +1547,9 @@ func ValidatePodSecurityContext(securityContext *api.PodSecurityContext, spec *a
// ValidatePodUpdate tests to see if the update is legal for an end user to make. newPod is updated with fields // ValidatePodUpdate tests to see if the update is legal for an end user to make. newPod is updated with fields
// that cannot be changed. // that cannot be changed.
func ValidatePodUpdate(newPod, oldPod *api.Pod) field.ErrorList { func ValidatePodUpdate(newPod, oldPod *api.Pod) field.ErrorList {
allErrs := ValidateObjectMetaUpdate(&newPod.ObjectMeta, &oldPod.ObjectMeta, field.NewPath("metadata")) fldPath := field.NewPath("metadata")
allErrs := ValidateObjectMetaUpdate(&newPod.ObjectMeta, &oldPod.ObjectMeta, fldPath)
allErrs = append(allErrs, ValidatePodSpecificAnnotations(newPod.ObjectMeta.Annotations, fldPath.Child("annotations"))...)
specPath := field.NewPath("spec") specPath := field.NewPath("spec")
if len(newPod.Spec.Containers) != len(oldPod.Spec.Containers) { if len(newPod.Spec.Containers) != len(oldPod.Spec.Containers) {
//TODO: Pinpoint the specific container that causes the invalid error after we have strategic merge diff //TODO: Pinpoint the specific container that causes the invalid error after we have strategic merge diff
@ -1884,6 +1912,7 @@ func ValidatePodTemplateSpec(spec *api.PodTemplateSpec, fldPath *field.Path) fie
allErrs := field.ErrorList{} allErrs := field.ErrorList{}
allErrs = append(allErrs, ValidateLabels(spec.Labels, fldPath.Child("labels"))...) allErrs = append(allErrs, ValidateLabels(spec.Labels, fldPath.Child("labels"))...)
allErrs = append(allErrs, ValidateAnnotations(spec.Annotations, fldPath.Child("annotations"))...) allErrs = append(allErrs, ValidateAnnotations(spec.Annotations, fldPath.Child("annotations"))...)
allErrs = append(allErrs, ValidatePodSpecificAnnotations(spec.Annotations, fldPath.Child("annotations"))...)
allErrs = append(allErrs, ValidatePodSpec(&spec.Spec, fldPath.Child("spec"))...) allErrs = append(allErrs, ValidatePodSpec(&spec.Spec, fldPath.Child("spec"))...)
return allErrs return allErrs
} }
@ -2503,6 +2532,7 @@ func ValidateNamespaceFinalizeUpdate(newNamespace, oldNamespace *api.Namespace)
// ValidateEndpoints tests if required fields are set. // ValidateEndpoints tests if required fields are set.
func ValidateEndpoints(endpoints *api.Endpoints) field.ErrorList { func ValidateEndpoints(endpoints *api.Endpoints) field.ErrorList {
allErrs := ValidateObjectMeta(&endpoints.ObjectMeta, true, ValidateEndpointsName, field.NewPath("metadata")) allErrs := ValidateObjectMeta(&endpoints.ObjectMeta, true, ValidateEndpointsName, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateEndpointsSpecificAnnotations(endpoints.Annotations, field.NewPath("annotations"))...)
allErrs = append(allErrs, validateEndpointSubsets(endpoints.Subsets, field.NewPath("subsets"))...) allErrs = append(allErrs, validateEndpointSubsets(endpoints.Subsets, field.NewPath("subsets"))...)
return allErrs return allErrs
} }
@ -2586,6 +2616,7 @@ func validateEndpointPort(port *api.EndpointPort, requireName bool, fldPath *fie
func ValidateEndpointsUpdate(newEndpoints, oldEndpoints *api.Endpoints) field.ErrorList { func ValidateEndpointsUpdate(newEndpoints, oldEndpoints *api.Endpoints) field.ErrorList {
allErrs := ValidateObjectMetaUpdate(&newEndpoints.ObjectMeta, &oldEndpoints.ObjectMeta, field.NewPath("metadata")) allErrs := ValidateObjectMetaUpdate(&newEndpoints.ObjectMeta, &oldEndpoints.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, validateEndpointSubsets(newEndpoints.Subsets, field.NewPath("subsets"))...) allErrs = append(allErrs, validateEndpointSubsets(newEndpoints.Subsets, field.NewPath("subsets"))...)
allErrs = append(allErrs, ValidateEndpointsSpecificAnnotations(newEndpoints.Annotations, field.NewPath("annotations"))...)
return allErrs return allErrs
} }
@ -2651,3 +2682,24 @@ func ValidateLoadBalancerStatus(status *api.LoadBalancerStatus, fldPath *field.P
} }
return allErrs return allErrs
} }
func isValidHostnamesMap(serializedPodHostNames string) bool {
if len(serializedPodHostNames) == 0 {
return false
}
podHostNames := map[string]endpoints.HostRecord{}
err := json.Unmarshal([]byte(serializedPodHostNames), &podHostNames)
if err != nil {
return false
}
for ip, hostRecord := range podHostNames {
if !validation.IsDNS1123Label(hostRecord.HostName) {
return false
}
if net.ParseIP(ip) == nil {
return false
}
}
return true
}

View File

@ -22,10 +22,13 @@ import (
"reflect" "reflect"
"time" "time"
"encoding/json"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints" "k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
podutil "k8s.io/kubernetes/pkg/api/pod" podutil "k8s.io/kubernetes/pkg/api/pod"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
@ -37,8 +40,6 @@ import (
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
) )
const ( const (
@ -187,7 +188,8 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
oldPod := cur.(*api.Pod) oldPod := cur.(*api.Pod)
// Only need to get the old services if the labels changed. // Only need to get the old services if the labels changed.
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) { if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) ||
!hostNameAndDomainAnnotationsAreEqual(newPod.Annotations, oldPod.Annotations) {
oldServices, err := e.getPodServiceMemberships(oldPod) oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil { if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err) glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
@ -200,6 +202,17 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
} }
} }
func hostNameAndDomainAnnotationsAreEqual(annotation1, annotation2 map[string]string) bool {
if annotation1 == nil {
annotation1 = map[string]string{}
}
if annotation2 == nil {
annotation2 = map[string]string{}
}
return annotation1[utilpod.PodHostnameAnnotation] == annotation2[utilpod.PodHostnameAnnotation] &&
annotation1[utilpod.PodSubdomainAnnotation] == annotation2[utilpod.PodSubdomainAnnotation]
}
// When a pod is deleted, enqueue the services the pod used to be a member of. // When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (e *EndpointController) deletePod(obj interface{}) { func (e *EndpointController) deletePod(obj interface{}) {
@ -294,6 +307,8 @@ func (e *EndpointController) syncService(key string) {
} }
subsets := []api.EndpointSubset{} subsets := []api.EndpointSubset{}
podHostNames := map[string]endpoints.HostRecord{}
for i := range pods.Items { for i := range pods.Items {
pod := &pods.Items[i] pod := &pods.Items[i]
@ -316,14 +331,26 @@ func (e *EndpointController) syncService(key string) {
continue continue
} }
hostname := pod.Annotations[utilpod.PodHostnameAnnotation]
if len(hostname) > 0 &&
pod.Annotations[utilpod.PodSubdomainAnnotation] == service.Name &&
service.Namespace == pod.Namespace {
hostRecord := endpoints.HostRecord{
HostName: hostname,
}
podHostNames[string(pod.Status.PodIP)] = hostRecord
}
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto} epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{ epa := api.EndpointAddress{
Kind: "Pod", IP: pod.Status.PodIP,
Namespace: pod.ObjectMeta.Namespace, TargetRef: &api.ObjectReference{
Name: pod.ObjectMeta.Name, Kind: "Pod",
UID: pod.ObjectMeta.UID, Namespace: pod.ObjectMeta.Namespace,
ResourceVersion: pod.ObjectMeta.ResourceVersion, Name: pod.ObjectMeta.Name,
}} UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
if api.IsPodReady(pod) { if api.IsPodReady(pod) {
subsets = append(subsets, api.EndpointSubset{ subsets = append(subsets, api.EndpointSubset{
Addresses: []api.EndpointAddress{epa}, Addresses: []api.EndpointAddress{epa},
@ -356,14 +383,38 @@ func (e *EndpointController) syncService(key string) {
return return
} }
} }
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
serializedPodHostNames := ""
if len(podHostNames) > 0 {
b, err := json.Marshal(podHostNames)
if err != nil {
glog.Errorf("Error updating endpoints. Marshalling of hostnames failed.: %v", err)
e.queue.Add(key) // Retry
return
}
serializedPodHostNames = string(b)
}
podHostNamesAreEqual := verifyPodHostNamesAreEqual(serializedPodHostNames, currentEndpoints.Annotations)
newAnnotations := make(map[string]string)
newAnnotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) &&
reflect.DeepEqual(currentEndpoints.Labels, service.Labels) && podHostNamesAreEqual {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return return
} }
newEndpoints := currentEndpoints newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
newEndpoints.Annotations = make(map[string]string)
}
if len(serializedPodHostNames) == 0 {
delete(newEndpoints.Annotations, endpoints.PodHostnamesAnnotation)
} else {
newEndpoints.Annotations[endpoints.PodHostnamesAnnotation] = serializedPodHostNames
}
if len(currentEndpoints.ResourceVersion) == 0 { if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them // No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
@ -377,6 +428,14 @@ func (e *EndpointController) syncService(key string) {
} }
} }
func verifyPodHostNamesAreEqual(newPodHostNames string, oldAnnotations map[string]string) bool {
oldPodHostNames := ""
if oldAnnotations != nil {
oldPodHostNames = oldAnnotations[endpoints.PodHostnamesAnnotation]
}
return oldPodHostNames == newPodHostNames
}
// checkLeftoverEndpoints lists all currently existing endpoints and adds their // checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no // service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to // corresponding service; these endpoints need to be deleted. We only need to

View File

@ -356,6 +356,8 @@ type RunContainerOptions struct {
CgroupParent string CgroupParent string
// The type of container rootfs // The type of container rootfs
ReadOnly bool ReadOnly bool
// hostname for pod containers
Hostname string
} }
// VolumeInfo contains information about the volume. // VolumeInfo contains information about the volume.

View File

@ -539,7 +539,6 @@ func (dm *DockerManager) runContainer(
// of CPU shares. // of CPU shares.
cpuShares = milliCPUToShares(cpuRequest.MilliValue()) cpuShares = milliCPUToShares(cpuRequest.MilliValue())
} }
podHasSELinuxLabel := pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.SELinuxOptions != nil podHasSELinuxLabel := pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.SELinuxOptions != nil
binds := makeMountBindings(opts.Mounts, podHasSELinuxLabel) binds := makeMountBindings(opts.Mounts, podHasSELinuxLabel)
// The reason we create and mount the log file in here (not in kubelet) is because // The reason we create and mount the log file in here (not in kubelet) is because
@ -646,14 +645,7 @@ func setInfraContainerNetworkConfig(pod *api.Pod, netMode string, opts *kubecont
dockerOpts.HostConfig.PortBindings = portBindings dockerOpts.HostConfig.PortBindings = portBindings
if netMode != namespaceModeHost { if netMode != namespaceModeHost {
// TODO(vmarmol): Handle better. dockerOpts.Config.Hostname = opts.Hostname
// Cap hostname at 63 chars (specification is 64bytes which is 63 chars and the null terminating char).
const hostnameMaxLen = 63
containerHostname := pod.Name
if len(containerHostname) > hostnameMaxLen {
containerHostname = containerHostname[:hostnameMaxLen]
}
dockerOpts.Config.Hostname = containerHostname
if len(opts.DNS) > 0 { if len(opts.DNS) > 0 {
dockerOpts.HostConfig.DNS = opts.DNS dockerOpts.HostConfig.DNS = opts.DNS
} }

View File

@ -36,6 +36,7 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2" cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors" apierrors "k8s.io/kubernetes/pkg/api/errors"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/api/validation"
@ -80,6 +81,7 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/selinux" "k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
utilvalidation "k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/version"
@ -1215,7 +1217,7 @@ func (kl *Kubelet) relabelVolumes(pod *api.Pod, volumes kubecontainer.VolumeMap)
return nil return nil
} }
func makeMounts(pod *api.Pod, podDir string, container *api.Container, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) { func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
// Kubernetes only mounts on /etc/hosts if : // Kubernetes only mounts on /etc/hosts if :
// - container does not use hostNetwork and // - container does not use hostNetwork and
// - container is not a infrastructure(pause) container // - container is not a infrastructure(pause) container
@ -1249,7 +1251,7 @@ func makeMounts(pod *api.Pod, podDir string, container *api.Container, podVolume
}) })
} }
if mountEtcHostsFile { if mountEtcHostsFile {
hostsMount, err := makeHostsMount(podDir, pod.Status.PodIP, pod.Name) hostsMount, err := makeHostsMount(podDir, pod.Status.PodIP, hostName, hostDomain)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1258,9 +1260,9 @@ func makeMounts(pod *api.Pod, podDir string, container *api.Container, podVolume
return mounts, nil return mounts, nil
} }
func makeHostsMount(podDir, podIP, podName string) (*kubecontainer.Mount, error) { func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontainer.Mount, error) {
hostsFilePath := path.Join(podDir, "etc-hosts") hostsFilePath := path.Join(podDir, "etc-hosts")
if err := ensureHostsFile(hostsFilePath, podIP, podName); err != nil { if err := ensureHostsFile(hostsFilePath, podIP, hostName, hostDomainName); err != nil {
return nil, err return nil, err
} }
return &kubecontainer.Mount{ return &kubecontainer.Mount{
@ -1271,7 +1273,7 @@ func makeHostsMount(podDir, podIP, podName string) (*kubecontainer.Mount, error)
}, nil }, nil
} }
func ensureHostsFile(fileName string, hostIP, hostName string) error { func ensureHostsFile(fileName, hostIP, hostName, hostDomainName string) error {
if _, err := os.Stat(fileName); os.IsExist(err) { if _, err := os.Stat(fileName); os.IsExist(err) {
glog.V(4).Infof("kubernetes-managed etc-hosts file exits. Will not be recreated: %q", fileName) glog.V(4).Infof("kubernetes-managed etc-hosts file exits. Will not be recreated: %q", fileName)
return nil return nil
@ -1284,7 +1286,11 @@ func ensureHostsFile(fileName string, hostIP, hostName string) error {
buffer.WriteString("fe00::0\tip6-mcastprefix\n") buffer.WriteString("fe00::0\tip6-mcastprefix\n")
buffer.WriteString("fe00::1\tip6-allnodes\n") buffer.WriteString("fe00::1\tip6-allnodes\n")
buffer.WriteString("fe00::2\tip6-allrouters\n") buffer.WriteString("fe00::2\tip6-allrouters\n")
buffer.WriteString(fmt.Sprintf("%s\t%s\n", hostIP, hostName)) if len(hostDomainName) > 0 {
buffer.WriteString(fmt.Sprintf("%s\t%s.%s\t%s\n", hostIP, hostName, hostDomainName, hostName))
} else {
buffer.WriteString(fmt.Sprintf("%s\t%s\n", hostIP, hostName))
}
return ioutil.WriteFile(fileName, buffer.Bytes(), 0644) return ioutil.WriteFile(fileName, buffer.Bytes(), 0644)
} }
@ -1318,12 +1324,40 @@ func makePortMappings(container *api.Container) (ports []kubecontainer.PortMappi
return return
} }
func generatePodHostNameAndDomain(pod *api.Pod, clusterDomain string) (string, string) {
// TODO(vmarmol): Handle better.
// Cap hostname at 63 chars (specification is 64bytes which is 63 chars and the null terminating char).
const hostnameMaxLen = 63
podAnnotations := pod.Annotations
if podAnnotations == nil {
podAnnotations = make(map[string]string)
}
hostname := pod.Name
hostnameCandidate := podAnnotations[utilpod.PodHostnameAnnotation]
if utilvalidation.IsDNS1123Label(hostnameCandidate) {
// use hostname annotation, if specified.
hostname = hostnameCandidate
}
if len(hostname) > hostnameMaxLen {
hostname = hostname[:hostnameMaxLen]
glog.Errorf("hostname for pod:%q was longer than %d. Truncated hostname to :%q", pod.Name, hostnameMaxLen, hostname)
}
hostDomain := ""
subdomainCandidate := pod.Annotations[utilpod.PodSubdomainAnnotation]
if utilvalidation.IsDNS1123Label(subdomainCandidate) {
hostDomain = fmt.Sprintf("%s.%s.svc.%s", subdomainCandidate, pod.Namespace, clusterDomain)
}
return hostname, hostDomain
}
// GenerateRunContainerOptions generates the RunContainerOptions, which can be used by // GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container. // the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container) (*kubecontainer.RunContainerOptions, error) { func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container) (*kubecontainer.RunContainerOptions, error) {
var err error var err error
opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot} opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot}
hostname, hostDomainName := generatePodHostNameAndDomain(pod, kl.clusterDomain)
opts.Hostname = hostname
vol, ok := kl.volumeManager.GetVolumes(pod.UID) vol, ok := kl.volumeManager.GetVolumes(pod.UID)
if !ok { if !ok {
return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", format.Pod(pod)) return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", format.Pod(pod))
@ -1340,7 +1374,7 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Cont
} }
} }
opts.Mounts, err = makeMounts(pod, kl.getPodDir(pod.UID), container, vol) opts.Mounts, err = makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, vol)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -727,7 +727,7 @@ func TestMakeVolumeMounts(t *testing.T) {
}, },
} }
mounts, _ := makeMounts(&pod, "/pod", &container, podVolumes) mounts, _ := makeMounts(&pod, "/pod", &container, "fakepodname", "", podVolumes)
expectedMounts := []kubecontainer.Mount{ expectedMounts := []kubecontainer.Mount{
{ {

View File

@ -21,19 +21,22 @@ import (
"strings" "strings"
"time" "time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apimachinery/registered"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
) )
var dnsServiceLableSelector = labels.Set{ const dnsTestPodHostName = "dns-querier-1"
const dnsTestServiceName = "dns-test-service"
var dnsServiceLabelSelector = labels.Set{
"k8s-app": "kube-dns", "k8s-app": "kube-dns",
"kubernetes.io/cluster-service": "true", "kubernetes.io/cluster-service": "true",
}.AsSelector() }.AsSelector()
@ -47,6 +50,10 @@ func createDNSPod(namespace, wheezyProbeCmd, jessieProbeCmd string) *api.Pod {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "dns-test-" + string(util.NewUUID()), Name: "dns-test-" + string(util.NewUUID()),
Namespace: namespace, Namespace: namespace,
Annotations: map[string]string{
pod.PodHostnameAnnotation: dnsTestPodHostName,
pod.PodSubdomainAnnotation: dnsTestServiceName,
},
}, },
Spec: api.PodSpec{ Spec: api.PodSpec{
Volumes: []api.Volume{ Volumes: []api.Volume{
@ -103,7 +110,7 @@ func createDNSPod(namespace, wheezyProbeCmd, jessieProbeCmd string) *api.Pod {
return pod return pod
} }
func createProbeCommand(namesToResolve []string, fileNamePrefix string) (string, []string) { func createProbeCommand(namesToResolve []string, hostEntries []string, fileNamePrefix, namespace string) (string, []string) {
fileNames := make([]string, 0, len(namesToResolve)*2) fileNames := make([]string, 0, len(namesToResolve)*2)
probeCmd := "for i in `seq 1 600`; do " probeCmd := "for i in `seq 1 600`; do "
for _, name := range namesToResolve { for _, name := range namesToResolve {
@ -121,6 +128,21 @@ func createProbeCommand(namesToResolve []string, fileNamePrefix string) (string,
fileNames = append(fileNames, fileName) fileNames = append(fileNames, fileName)
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s %s)" && echo OK > /results/%s;`, name, lookup, fileName) probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s %s)" && echo OK > /results/%s;`, name, lookup, fileName)
} }
for _, name := range hostEntries {
fileName := fmt.Sprintf("%s_hosts@%s", fileNamePrefix, name)
fileNames = append(fileNames, fileName)
probeCmd += fmt.Sprintf(`test -n "$$(getent hosts %s)" && echo OK > /results/%s;`, name, fileName)
}
podARecByUDPFileName := fmt.Sprintf("%s_udp@PodARecord", fileNamePrefix)
podARecByTCPFileName := fmt.Sprintf("%s_tcp@PodARecord", fileNamePrefix)
probeCmd += fmt.Sprintf(`podARec=$$(hostname -i| awk -F. '{print $$1"-"$$2"-"$$3"-"$$4".%s.pod.cluster.local"}');`, namespace)
probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search $${podARec} A)" && echo OK > /results/%s;`, podARecByUDPFileName)
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search $${podARec} A)" && echo OK > /results/%s;`, podARecByTCPFileName)
fileNames = append(fileNames, podARecByUDPFileName)
fileNames = append(fileNames, podARecByTCPFileName)
probeCmd += "sleep 1; done" probeCmd += "sleep 1; done"
return probeCmd, fileNames return probeCmd, fileNames
} }
@ -186,7 +208,6 @@ func validateDNSResults(f *Framework, pod *api.Pod, fileNames []string) {
if err != nil { if err != nil {
Failf("Failed to get pod %s: %v", pod.Name, err) Failf("Failed to get pod %s: %v", pod.Name, err)
} }
// Try to find results for each expected name. // Try to find results for each expected name.
By("looking for the results for each expected name from probiers") By("looking for the results for each expected name from probiers")
assertFilesExist(fileNames, "results", pod, f.Client) assertFilesExist(fileNames, "results", pod, f.Client)
@ -202,16 +223,15 @@ var _ = Describe("DNS", func() {
It("should provide DNS for the cluster", func() { It("should provide DNS for the cluster", func() {
// TODO: support DNS on vagrant #3580 // TODO: support DNS on vagrant #3580
SkipIfProviderIs("vagrant") SkipIfProviderIs("vagrant")
systemClient := f.Client.Pods(api.NamespaceSystem) systemClient := f.Client.Pods(api.NamespaceSystem)
By("Waiting for DNS Service to be Running") By("Waiting for DNS Service to be Running")
options := api.ListOptions{LabelSelector: dnsServiceLableSelector} options := api.ListOptions{LabelSelector: dnsServiceLabelSelector}
dnsPods, err := systemClient.List(options) dnsPods, err := systemClient.List(options)
if err != nil { if err != nil {
Failf("Failed to list all dns service pods") Failf("Failed to list all dns service pods")
} }
if len(dnsPods.Items) != 1 { if len(dnsPods.Items) != 1 {
Failf("Unexpected number of pods (%d) matches the label selector %v", len(dnsPods.Items), dnsServiceLableSelector.String()) Failf("Unexpected number of pods (%d) matches the label selector %v", len(dnsPods.Items), dnsServiceLabelSelector.String())
} }
expectNoError(waitForPodRunningInNamespace(f.Client, dnsPods.Items[0].Name, api.NamespaceSystem)) expectNoError(waitForPodRunningInNamespace(f.Client, dnsPods.Items[0].Name, api.NamespaceSystem))
@ -228,8 +248,10 @@ var _ = Describe("DNS", func() {
namesToResolve = append(namesToResolve, "metadata") namesToResolve = append(namesToResolve, "metadata")
} }
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, "wheezy") wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, []string{dnsTestPodHostName}, "wheezy", f.Namespace.Name)
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, "jessie") jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, []string{dnsTestPodHostName}, "jessie", f.Namespace.Name)
By("Running these commands on wheezy:" + wheezyProbeCmd + "\n")
By("Running these commands on jessie:" + jessieProbeCmd + "\n")
// Run a pod which probes DNS and exposes the results by HTTP. // Run a pod which probes DNS and exposes the results by HTTP.
By("creating a pod to probe DNS") By("creating a pod to probe DNS")
@ -244,13 +266,13 @@ var _ = Describe("DNS", func() {
systemClient := f.Client.Pods(api.NamespaceSystem) systemClient := f.Client.Pods(api.NamespaceSystem)
By("Waiting for DNS Service to be Running") By("Waiting for DNS Service to be Running")
options := api.ListOptions{LabelSelector: dnsServiceLableSelector} options := api.ListOptions{LabelSelector: dnsServiceLabelSelector}
dnsPods, err := systemClient.List(options) dnsPods, err := systemClient.List(options)
if err != nil { if err != nil {
Failf("Failed to list all dns service pods") Failf("Failed to list all dns service pods")
} }
if len(dnsPods.Items) != 1 { if len(dnsPods.Items) != 1 {
Failf("Unexpected number of pods (%d) matches the label selector %v", len(dnsPods.Items), dnsServiceLableSelector.String()) Failf("Unexpected number of pods (%d) matches the label selector %v", len(dnsPods.Items), dnsServiceLabelSelector.String())
} }
expectNoError(waitForPodRunningInNamespace(f.Client, dnsPods.Items[0].Name, api.NamespaceSystem)) expectNoError(waitForPodRunningInNamespace(f.Client, dnsPods.Items[0].Name, api.NamespaceSystem))
@ -261,7 +283,7 @@ var _ = Describe("DNS", func() {
} }
headlessService := &api.Service{ headlessService := &api.Service{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "test-service", Name: dnsTestServiceName,
}, },
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
ClusterIP: "None", ClusterIP: "None",
@ -303,16 +325,20 @@ var _ = Describe("DNS", func() {
// All the names we need to be able to resolve. // All the names we need to be able to resolve.
// TODO: Create more endpoints and ensure that multiple A records are returned // TODO: Create more endpoints and ensure that multiple A records are returned
// for headless service. // for headless service.
hostFQDN := fmt.Sprintf("%s.%s.%s.svc.cluster.local", dnsTestPodHostName, dnsTestServiceName, f.Namespace.Name)
namesToResolve := []string{ namesToResolve := []string{
fmt.Sprintf("%s", headlessService.Name), fmt.Sprintf("%s", headlessService.Name),
fmt.Sprintf("%s.%s", headlessService.Name, f.Namespace.Name), fmt.Sprintf("%s.%s", headlessService.Name, f.Namespace.Name),
fmt.Sprintf("%s.%s.svc", headlessService.Name, f.Namespace.Name), fmt.Sprintf("%s.%s.svc", headlessService.Name, f.Namespace.Name),
fmt.Sprintf("_http._tcp.%s.%s.svc", headlessService.Name, f.Namespace.Name), fmt.Sprintf("_http._tcp.%s.%s.svc", headlessService.Name, f.Namespace.Name),
fmt.Sprintf("_http._tcp.%s.%s.svc", regularService.Name, f.Namespace.Name), fmt.Sprintf("_http._tcp.%s.%s.svc", regularService.Name, f.Namespace.Name),
hostFQDN,
} }
wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, "wheezy") wheezyProbeCmd, wheezyFileNames := createProbeCommand(namesToResolve, []string{hostFQDN, dnsTestPodHostName}, "wheezy", f.Namespace.Name)
jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, "jessie") jessieProbeCmd, jessieFileNames := createProbeCommand(namesToResolve, []string{hostFQDN, dnsTestPodHostName}, "jessie", f.Namespace.Name)
By("Running these commands on wheezy:" + wheezyProbeCmd + "\n")
By("Running these commands on jessie:" + jessieProbeCmd + "\n")
// Run a pod which probes DNS and exposes the results by HTTP. // Run a pod which probes DNS and exposes the results by HTTP.
By("creating a pod to probe DNS") By("creating a pod to probe DNS")