diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index a2e44080506..d3377d07fc0 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -192,13 +192,13 @@ func startComponents(manifestURL string) (apiServerURL string) { // Kubelet (localhost) testRootDir := makeTempDirOrDie("kubelet_integ_1.") glog.Infof("Using %s as root dir for kubelet #1", testRootDir) - standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250) + standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. testRootDir = makeTempDirOrDie("kubelet_integ_2.") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) - standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251) + standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault) return apiServer.URL } diff --git a/cmd/kube-apiserver/apiserver.go b/cmd/kube-apiserver/apiserver.go index 1ee5c938af7..671e2b0f52e 100644 --- a/cmd/kube-apiserver/apiserver.go +++ b/cmd/kube-apiserver/apiserver.go @@ -28,6 +28,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -87,6 +88,7 @@ var ( Port: 10250, EnableHttps: false, } + masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods") ) func init() { @@ -177,25 +179,26 @@ func main() { admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, *admissionControlConfigFile) config := &master.Config{ - Client: client, - Cloud: cloud, - EtcdHelper: helper, - HealthCheckMinions: *healthCheckMinions, - EventTTL: *eventTTL, - KubeletClient: kubeletClient, - PortalNet: &n, - EnableLogsSupport: *enableLogsSupport, - EnableUISupport: true, - EnableSwaggerSupport: true, - APIPrefix: *apiPrefix, - CorsAllowedOriginList: corsAllowedOriginList, - ReadOnlyPort: *readOnlyPort, - ReadWritePort: *port, - PublicAddress: *publicAddressOverride, - Authenticator: authenticator, - Authorizer: authorizer, - AdmissionControl: admissionController, - EnableV1Beta3: v1beta3, + Client: client, + Cloud: cloud, + EtcdHelper: helper, + HealthCheckMinions: *healthCheckMinions, + EventTTL: *eventTTL, + KubeletClient: kubeletClient, + PortalNet: &n, + EnableLogsSupport: *enableLogsSupport, + EnableUISupport: true, + EnableSwaggerSupport: true, + APIPrefix: *apiPrefix, + CorsAllowedOriginList: corsAllowedOriginList, + ReadOnlyPort: *readOnlyPort, + ReadWritePort: *port, + PublicAddress: *publicAddressOverride, + Authenticator: authenticator, + Authorizer: authorizer, + AdmissionControl: admissionController, + EnableV1Beta3: v1beta3, + MasterServiceNamespace: *masterServiceNamespace, } m := master.New(config) diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 19433098348..d9385d9bb70 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -26,6 +26,7 @@ import ( "net" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" @@ -64,6 +65,7 @@ var ( oomScoreAdj = flag.Int("oom_score_adj", -900, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]") apiServerList util.StringList clusterDomain = flag.String("cluster_domain", "", "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") + masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods") clusterDNS = util.IP(nil) ) @@ -130,6 +132,7 @@ func main() { DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint), KubeClient: client, EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile), + MasterServiceNamespace: *masterServiceNamespace, } standalone.RunKubelet(&kcfg) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 38b36188c36..44ccaaa34ed 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -25,6 +25,7 @@ import ( "fmt" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/standalone" @@ -40,19 +41,20 @@ var ( dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") etcdServer = flag.String("etcd_server", "http://localhost:4001", "If non-empty, path to the set of etcd server to use") // TODO: Discover these by pinging the host machines, and rip out these flags. - nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") - nodeMemory = flag.Int64("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") + nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") + nodeMemory = flag.Int64("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") + masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods") ) func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string, port int) { machineList := []string{"localhost"} - standalone.RunApiServer(cl, etcdClient, addr, port) + standalone.RunApiServer(cl, etcdClient, addr, port, *masterServiceNamespace) standalone.RunScheduler(cl) standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint) - standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250) + standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace) } func newApiClient(addr string, port int) *client.Client { diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 2757f268ff0..352c9d6f0eb 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -878,7 +878,10 @@ func TestValidateService(t *testing.T) { }, existing: api.ServiceList{ Items: []api.Service{ - {Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true}}, + { + ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true}, + }, }, }, numErrs: 1, @@ -895,7 +898,10 @@ func TestValidateService(t *testing.T) { }, existing: api.ServiceList{ Items: []api.Service{ - {Spec: api.ServiceSpec{Port: 80}}, + { + ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{Port: 80}, + }, }, }, numErrs: 0, @@ -911,7 +917,10 @@ func TestValidateService(t *testing.T) { }, existing: api.ServiceList{ Items: []api.Service{ - {Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true}}, + { + ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true}, + }, }, }, numErrs: 0, @@ -927,7 +936,10 @@ func TestValidateService(t *testing.T) { }, existing: api.ServiceList{ Items: []api.Service{ - {Spec: api.ServiceSpec{Port: 80}}, + { + ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{Port: 80}, + }, }, }, numErrs: 0, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a65fb55045c..2958ff4e2f5 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -34,9 +34,12 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" @@ -76,7 +79,8 @@ func NewMainKubelet( maxContainerCount int, sourceReady SourceReadyFn, clusterDomain string, - clusterDNS net.IP) (*Kubelet, error) { + clusterDNS net.IP, + masterServiceNamespace string) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) } @@ -86,24 +90,31 @@ func NewMainKubelet( if minimumGCAge <= 0 { return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge) } + + serviceStore := cache.NewStore() + cache.NewReflector(&cache.ListWatch{kubeClient, labels.Everything(), "services", api.NamespaceAll}, &api.Service{}, serviceStore).Run() + serviceLister := &cache.StoreToServiceLister{serviceStore} + klet := &Kubelet{ - hostname: hostname, - dockerClient: dockerClient, - etcdClient: etcdClient, - rootDirectory: rootDirectory, - resyncInterval: resyncInterval, - networkContainerImage: networkContainerImage, - podWorkers: newPodWorkers(), - dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, - runner: dockertools.NewDockerContainerCommandRunner(dockerClient), - httpClient: &http.Client{}, - pullQPS: pullQPS, - pullBurst: pullBurst, - minimumGCAge: minimumGCAge, - maxContainerCount: maxContainerCount, - sourceReady: sourceReady, - clusterDomain: clusterDomain, - clusterDNS: clusterDNS, + hostname: hostname, + dockerClient: dockerClient, + etcdClient: etcdClient, + rootDirectory: rootDirectory, + resyncInterval: resyncInterval, + networkContainerImage: networkContainerImage, + podWorkers: newPodWorkers(), + dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, + runner: dockertools.NewDockerContainerCommandRunner(dockerClient), + httpClient: &http.Client{}, + pullQPS: pullQPS, + pullBurst: pullBurst, + minimumGCAge: minimumGCAge, + maxContainerCount: maxContainerCount, + sourceReady: sourceReady, + clusterDomain: clusterDomain, + clusterDNS: clusterDNS, + serviceLister: serviceLister, + masterServiceNamespace: masterServiceNamespace, } if err := klet.setupDataDirs(); err != nil { @@ -117,10 +128,15 @@ type httpGetter interface { Get(url string) (*http.Response, error) } +type serviceLister interface { + List() (api.ServiceList, error) +} + // Kubelet is the main kubelet implementation. type Kubelet struct { hostname string dockerClient dockertools.DockerInterface + kubeClient *client.Client rootDirectory string networkContainerImage string podWorkers *podWorkers @@ -168,6 +184,9 @@ type Kubelet struct { // If non-nil, use this for container DNS server. clusterDNS net.IP + + masterServiceNamespace string + serviceLister serviceLister } // GetRootDir returns the full path to the directory under which kubelet can @@ -423,14 +442,6 @@ func (self *podWorkers) Run(podFullName string, action func()) { }() } -func makeEnvironmentVariables(container *api.Container) []string { - var result []string - for _, value := range container.Env { - result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value)) - } - return result -} - func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { binds := []string{} for _, mount := range container.VolumeMounts { @@ -607,7 +618,10 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } - envVariables := makeEnvironmentVariables(container) + envVariables, err := kl.makeEnvironmentVariables(pod.Namespace, container) + if err != nil { + return "", err + } binds := makeBinds(pod, container, podVolumes) exposedPorts, portBindings := makePortsAndBindings(container) @@ -692,6 +706,91 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod return dockertools.DockerID(dockerContainer.ID), err } +var masterServices = util.NewStringSet("kubernetes", "kubernetes-ro") + +// getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see +func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) { + var ( + serviceMap = make(map[string]api.Service) + m = make(map[string]string) + ) + + // Get all service resources from the master (via a cache), + // and populate them into service enviroment variables. + if kl.serviceLister == nil { + // Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars. + return m, nil + } + services, err := kl.serviceLister.List() + if err != nil { + return m, fmt.Errorf("Failed to list services when setting up env vars.") + } + + // project the services in namespace ns onto the master services + for _, service := range services.Items { + serviceName := service.Name + + switch service.Namespace { + // for the case whether the master service namespace is the namespace the pod + // is in, pod should receive all the pods in the namespace. + // + // ordering of the case clauses below enforces this + case ns: + serviceMap[serviceName] = service + case kl.masterServiceNamespace: + if masterServices.Has(serviceName) { + _, exists := serviceMap[serviceName] + if !exists { + serviceMap[serviceName] = service + } + } + } + } + services.Items = []api.Service{} + for _, service := range serviceMap { + services.Items = append(services.Items, service) + } + + for _, e := range envvars.FromServices(&services) { + m[e.Name] = e.Value + } + return m, nil +} + +// Make the service environment variables for a pod in the given namespace. +func (kl *Kubelet) makeEnvironmentVariables(ns string, container *api.Container) ([]string, error) { + var result []string + // Note: These are added to the docker.Config, but are not included in the checksum computed + // by dockertools.BuildDockerName(...). That way, we can still determine whether an + // api.Container is already running by its hash. (We don't want to restart a container just + // because some service changed.) + // + // Note that there is a race between Kubelet seeing the pod and kubelet seeing the service. + // To avoid this users can: (1) wait between starting a service and starting; or (2) detect + // missing service env var and exit and be restarted; or (3) use DNS instead of env vars + // and keep trying to resolve the DNS name of the service (recommended). + serviceEnv, err := kl.getServiceEnvVarMap(ns) + if err != nil { + return result, err + } + + for _, value := range container.Env { + // The code is in transition from using etcd+BoundPods to apiserver+Pods. + // So, the master may set service env vars, or kubelet may. In case both are doing + // it, we delete the key from the kubelet-generated ones so we don't have duplicate + // env vars. + // TODO: remove this net line once all platforms use apiserver+Pods. + delete(serviceEnv, value.Name) + result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value)) + } + + // Append remaining service env vars. + for k, v := range serviceEnv { + result = append(result, fmt.Sprintf("%s=%s", k, v)) + } + return result, nil +} + func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.BoundPod) error { // Get host DNS settings and append them to cluster DNS settings. f, err := os.Open("/etc/resolv.conf") diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index e7babf7fd3a..71effbfdfd8 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -58,6 +58,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools kubelet.rootDirectory = "/tmp/kubelet" kubelet.podWorkers = newPodWorkers() kubelet.sourceReady = func(source string) bool { return true } + return kubelet, fakeEtcdClient, fakeDocker } @@ -913,31 +914,6 @@ func TestSyncPodUnhealthy(t *testing.T) { } } -func TestMakeEnvVariables(t *testing.T) { - container := api.Container{ - Env: []api.EnvVar{ - { - Name: "foo", - Value: "bar", - }, - { - Name: "baz", - Value: "blah", - }, - }, - } - vars := makeEnvironmentVariables(&container) - if len(vars) != len(container.Env) { - t.Errorf("Vars don't match. Expected: %#v Found: %#v", container.Env, vars) - } - for ix, env := range container.Env { - value := fmt.Sprintf("%s=%s", env.Name, env.Value) - if value != vars[ix] { - t.Errorf("Unexpected value: %s. Expected: %s", vars[ix], value) - } - } -} - func TestMountExternalVolumes(t *testing.T) { kubelet, _, _ := newTestKubelet(t) pod := api.BoundPod{ @@ -1936,3 +1912,275 @@ func TestParseResolvConf(t *testing.T) { } } } + +type testServiceLister struct { + services []api.Service +} + +func (ls testServiceLister) List() (api.ServiceList, error) { + return api.ServiceList{ + Items: ls.services, + }, nil +} + +func TestMakeEnvironmentVariables(t *testing.T) { + services := []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{ + Port: 8081, + PortalIP: "1.2.3.1", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes-ro", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{ + Port: 8082, + PortalIP: "1.2.3.2", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test1"}, + Spec: api.ServiceSpec{ + Port: 8083, + PortalIP: "1.2.3.3", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "test2"}, + Spec: api.ServiceSpec{ + Port: 8084, + PortalIP: "1.2.3.4", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test2"}, + Spec: api.ServiceSpec{ + Port: 8085, + PortalIP: "1.2.3.5", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "kubernetes"}, + Spec: api.ServiceSpec{ + Port: 8086, + PortalIP: "1.2.3.6", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes-ro", Namespace: "kubernetes"}, + Spec: api.ServiceSpec{ + Port: 8087, + PortalIP: "1.2.3.7", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "not-special", Namespace: "kubernetes"}, + Spec: api.ServiceSpec{ + Port: 8088, + PortalIP: "1.2.3.8", + }, + }, + } + + testCases := []struct { + name string // the name of the test case + ns string // the namespace to generate environment for + container *api.Container // the container to use + masterServiceNamespace string // the namespace to read master service info from + nilLister bool // whether the lister should be nil + expectedEnvs util.StringSet // a set of expected environment vars + expectedEnvSize int // total number of expected env vars + }{ + { + "api server = Y, kubelet = Y", + "test1", + &api.Container{ + Env: []api.EnvVar{ + {Name: "FOO", Value: "BAR"}, + {Name: "TEST_SERVICE_HOST", Value: "1.2.3.3"}, + {Name: "TEST_SERVICE_PORT", Value: "8083"}, + {Name: "TEST_PORT", Value: "tcp://1.2.3.3:8083"}, + {Name: "TEST_PORT_8083_TCP", Value: "tcp://1.2.3.3:8083"}, + {Name: "TEST_PORT_8083_TCP_PROTO", Value: "tcp"}, + {Name: "TEST_PORT_8083_TCP_PORT", Value: "8083"}, + {Name: "TEST_PORT_8083_TCP_ADDR", Value: "1.2.3.3"}, + }, + }, + api.NamespaceDefault, + false, + util.NewStringSet("FOO=BAR", + "TEST_SERVICE_HOST=1.2.3.3", + "TEST_SERVICE_PORT=8083", + "TEST_PORT=tcp://1.2.3.3:8083", + "TEST_PORT_8083_TCP=tcp://1.2.3.3:8083", + "TEST_PORT_8083_TCP_PROTO=tcp", + "TEST_PORT_8083_TCP_PORT=8083", + "TEST_PORT_8083_TCP_ADDR=1.2.3.3", + "KUBERNETES_SERVICE_HOST=1.2.3.1", + "KUBERNETES_SERVICE_PORT=8081", + "KUBERNETES_PORT=tcp://1.2.3.1:8081", + "KUBERNETES_PORT_8081_TCP=tcp://1.2.3.1:8081", + "KUBERNETES_PORT_8081_TCP_PROTO=tcp", + "KUBERNETES_PORT_8081_TCP_PORT=8081", + "KUBERNETES_PORT_8081_TCP_ADDR=1.2.3.1", + "KUBERNETES_RO_SERVICE_HOST=1.2.3.2", + "KUBERNETES_RO_SERVICE_PORT=8082", + "KUBERNETES_RO_PORT=tcp://1.2.3.2:8082", + "KUBERNETES_RO_PORT_8082_TCP=tcp://1.2.3.2:8082", + "KUBERNETES_RO_PORT_8082_TCP_PROTO=tcp", + "KUBERNETES_RO_PORT_8082_TCP_PORT=8082", + "KUBERNETES_RO_PORT_8082_TCP_ADDR=1.2.3.2"), + 22, + }, + { + "api server = Y, kubelet = N", + "test1", + &api.Container{ + Env: []api.EnvVar{ + {Name: "FOO", Value: "BAR"}, + {Name: "TEST_SERVICE_HOST", Value: "1.2.3.3"}, + {Name: "TEST_SERVICE_PORT", Value: "8083"}, + {Name: "TEST_PORT", Value: "tcp://1.2.3.3:8083"}, + {Name: "TEST_PORT_8083_TCP", Value: "tcp://1.2.3.3:8083"}, + {Name: "TEST_PORT_8083_TCP_PROTO", Value: "tcp"}, + {Name: "TEST_PORT_8083_TCP_PORT", Value: "8083"}, + {Name: "TEST_PORT_8083_TCP_ADDR", Value: "1.2.3.3"}, + }, + }, + api.NamespaceDefault, + true, + util.NewStringSet("FOO=BAR", + "TEST_SERVICE_HOST=1.2.3.3", + "TEST_SERVICE_PORT=8083", + "TEST_PORT=tcp://1.2.3.3:8083", + "TEST_PORT_8083_TCP=tcp://1.2.3.3:8083", + "TEST_PORT_8083_TCP_PROTO=tcp", + "TEST_PORT_8083_TCP_PORT=8083", + "TEST_PORT_8083_TCP_ADDR=1.2.3.3"), + 8, + }, + { + "api server = N; kubelet = Y", + "test1", + &api.Container{ + Env: []api.EnvVar{ + {Name: "FOO", Value: "BAZ"}, + }, + }, + api.NamespaceDefault, + false, + util.NewStringSet("FOO=BAZ", + "TEST_SERVICE_HOST=1.2.3.3", + "TEST_SERVICE_PORT=8083", + "TEST_PORT=tcp://1.2.3.3:8083", + "TEST_PORT_8083_TCP=tcp://1.2.3.3:8083", + "TEST_PORT_8083_TCP_PROTO=tcp", + "TEST_PORT_8083_TCP_PORT=8083", + "TEST_PORT_8083_TCP_ADDR=1.2.3.3", + "KUBERNETES_SERVICE_HOST=1.2.3.1", + "KUBERNETES_SERVICE_PORT=8081", + "KUBERNETES_PORT=tcp://1.2.3.1:8081", + "KUBERNETES_PORT_8081_TCP=tcp://1.2.3.1:8081", + "KUBERNETES_PORT_8081_TCP_PROTO=tcp", + "KUBERNETES_PORT_8081_TCP_PORT=8081", + "KUBERNETES_PORT_8081_TCP_ADDR=1.2.3.1", + "KUBERNETES_RO_SERVICE_HOST=1.2.3.2", + "KUBERNETES_RO_SERVICE_PORT=8082", + "KUBERNETES_RO_PORT=tcp://1.2.3.2:8082", + "KUBERNETES_RO_PORT_8082_TCP=tcp://1.2.3.2:8082", + "KUBERNETES_RO_PORT_8082_TCP_PROTO=tcp", + "KUBERNETES_RO_PORT_8082_TCP_PORT=8082", + "KUBERNETES_RO_PORT_8082_TCP_ADDR=1.2.3.2"), + 22, + }, + { + "master service in pod ns", + "test2", + &api.Container{ + Env: []api.EnvVar{ + {Name: "FOO", Value: "ZAP"}, + }, + }, + "kubernetes", + false, + util.NewStringSet("FOO=ZAP", + "TEST_SERVICE_HOST=1.2.3.5", + "TEST_SERVICE_PORT=8085", + "TEST_PORT=tcp://1.2.3.5:8085", + "TEST_PORT_8085_TCP=tcp://1.2.3.5:8085", + "TEST_PORT_8085_TCP_PROTO=tcp", + "TEST_PORT_8085_TCP_PORT=8085", + "TEST_PORT_8085_TCP_ADDR=1.2.3.5", + "KUBERNETES_SERVICE_HOST=1.2.3.4", + "KUBERNETES_SERVICE_PORT=8084", + "KUBERNETES_PORT=tcp://1.2.3.4:8084", + "KUBERNETES_PORT_8084_TCP=tcp://1.2.3.4:8084", + "KUBERNETES_PORT_8084_TCP_PROTO=tcp", + "KUBERNETES_PORT_8084_TCP_PORT=8084", + "KUBERNETES_PORT_8084_TCP_ADDR=1.2.3.4", + "KUBERNETES_RO_SERVICE_HOST=1.2.3.7", + "KUBERNETES_RO_SERVICE_PORT=8087", + "KUBERNETES_RO_PORT=tcp://1.2.3.7:8087", + "KUBERNETES_RO_PORT_8087_TCP=tcp://1.2.3.7:8087", + "KUBERNETES_RO_PORT_8087_TCP_PROTO=tcp", + "KUBERNETES_RO_PORT_8087_TCP_PORT=8087", + "KUBERNETES_RO_PORT_8087_TCP_ADDR=1.2.3.7"), + 22, + }, + { + "pod in master service ns", + "kubernetes", + &api.Container{}, + "kubernetes", + false, + util.NewStringSet( + "NOT_SPECIAL_SERVICE_HOST=1.2.3.8", + "NOT_SPECIAL_SERVICE_PORT=8088", + "NOT_SPECIAL_PORT=tcp://1.2.3.8:8088", + "NOT_SPECIAL_PORT_8088_TCP=tcp://1.2.3.8:8088", + "NOT_SPECIAL_PORT_8088_TCP_PROTO=tcp", + "NOT_SPECIAL_PORT_8088_TCP_PORT=8088", + "NOT_SPECIAL_PORT_8088_TCP_ADDR=1.2.3.8", + "KUBERNETES_SERVICE_HOST=1.2.3.6", + "KUBERNETES_SERVICE_PORT=8086", + "KUBERNETES_PORT=tcp://1.2.3.6:8086", + "KUBERNETES_PORT_8086_TCP=tcp://1.2.3.6:8086", + "KUBERNETES_PORT_8086_TCP_PROTO=tcp", + "KUBERNETES_PORT_8086_TCP_PORT=8086", + "KUBERNETES_PORT_8086_TCP_ADDR=1.2.3.6", + "KUBERNETES_RO_SERVICE_HOST=1.2.3.7", + "KUBERNETES_RO_SERVICE_PORT=8087", + "KUBERNETES_RO_PORT=tcp://1.2.3.7:8087", + "KUBERNETES_RO_PORT_8087_TCP=tcp://1.2.3.7:8087", + "KUBERNETES_RO_PORT_8087_TCP_PROTO=tcp", + "KUBERNETES_RO_PORT_8087_TCP_PORT=8087", + "KUBERNETES_RO_PORT_8087_TCP_ADDR=1.2.3.7"), + 21, + }, + } + + for _, tc := range testCases { + kl, _, _ := newTestKubelet(t) + kl.masterServiceNamespace = tc.masterServiceNamespace + if tc.nilLister { + kl.serviceLister = nil + } else { + kl.serviceLister = testServiceLister{services} + } + + result, err := kl.makeEnvironmentVariables(tc.ns, tc.container) + if err != nil { + t.Errorf("[%v] Unexpected error: %v", tc.name, err) + } + + resultSet := util.NewStringSet(result...) + if !resultSet.IsSuperset(tc.expectedEnvs) { + t.Errorf("[%v] Unexpected env entries; expected {%v}, got {%v}", tc.name, tc.expectedEnvs, resultSet) + } + + if a := len(resultSet); a != tc.expectedEnvSize { + t.Errorf("[%v] Unexpected number of env vars; expected %v, got %v", tc.name, tc.expectedEnvSize, a) + } + } +} diff --git a/pkg/master/master.go b/pkg/master/master.go index 40af845a25d..259a113c1a3 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -62,23 +62,24 @@ import ( // Config is a structure used to configure a Master. type Config struct { - Client *client.Client - Cloud cloudprovider.Interface - EtcdHelper tools.EtcdHelper - HealthCheckMinions bool - EventTTL time.Duration - MinionRegexp string - KubeletClient client.KubeletClient - PortalNet *net.IPNet - EnableLogsSupport bool - EnableUISupport bool - EnableSwaggerSupport bool - EnableV1Beta3 bool - APIPrefix string - CorsAllowedOriginList util.StringList - Authenticator authenticator.Request - Authorizer authorizer.Authorizer - AdmissionControl admission.Interface + Client *client.Client + Cloud cloudprovider.Interface + EtcdHelper tools.EtcdHelper + HealthCheckMinions bool + EventTTL time.Duration + MinionRegexp string + KubeletClient client.KubeletClient + PortalNet *net.IPNet + EnableLogsSupport bool + EnableUISupport bool + EnableSwaggerSupport bool + EnableV1Beta3 bool + APIPrefix string + CorsAllowedOriginList util.StringList + Authenticator authenticator.Request + Authorizer authorizer.Authorizer + AdmissionControl admission.Interface + MasterServiceNamespace string // If specified, all web services will be registered into this container RestfulContainer *restful.Container @@ -231,7 +232,8 @@ func New(c *Config) *Master { minionRegistry := makeMinionRegistry(c) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) boundPodFactory := &pod.BasicBoundPodFactory{ - ServiceRegistry: serviceRegistry, + ServiceRegistry: serviceRegistry, + MasterServiceNamespace: c.MasterServiceNamespace, } if c.KubeletClient == nil { glog.Fatalf("master.New() called with config.KubeletClient == nil") diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index ef6647eb6b1..8a30c7c5d45 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -130,7 +130,8 @@ func TestEtcdCreatePod(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) err := registry.CreatePod(ctx, &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "foo", + Name: "foo", + Namespace: api.NamespaceDefault, }, Spec: api.PodSpec{ Containers: []api.Container{ @@ -240,7 +241,8 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) err := registry.CreatePod(ctx, &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "foo", + Name: "foo", + Namespace: api.NamespaceDefault, }, }) if err != nil { @@ -282,7 +284,8 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) err := registry.CreatePod(ctx, &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "foo", + Name: "foo", + Namespace: api.NamespaceDefault, }, Spec: api.PodSpec{ Containers: []api.Container{ @@ -346,7 +349,8 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) err := registry.CreatePod(ctx, &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "foo", + Name: "foo", + Namespace: api.NamespaceDefault, }, Spec: api.PodSpec{ Containers: []api.Container{ diff --git a/pkg/registry/pod/bound_pod_factory.go b/pkg/registry/pod/bound_pod_factory.go index 5cd915874d6..2664aaa4438 100644 --- a/pkg/registry/pod/bound_pod_factory.go +++ b/pkg/registry/pod/bound_pod_factory.go @@ -20,6 +20,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) type BoundPodFactory interface { @@ -29,25 +30,49 @@ type BoundPodFactory interface { type BasicBoundPodFactory struct { // TODO: this should really point at the API rather than a registry - ServiceRegistry service.Registry + ServiceRegistry service.Registry + MasterServiceNamespace string } -// getServiceEnvironmentVariables populates a list of environment variables that are use +var masterServiceNames = util.NewStringSet("kubernetes", "kubernetes-ro") + +// getServiceEnvironmentVariables populates a list of environment variables that are used // in the container environment to get access to services. -func getServiceEnvironmentVariables(ctx api.Context, registry service.Registry, machine string) ([]api.EnvVar, error) { +func (b *BasicBoundPodFactory) getServiceEnvironmentVariables(ctx api.Context, registry service.Registry, machine string) ([]api.EnvVar, error) { var result []api.EnvVar - services, err := registry.ListServices(ctx) + servicesInNs, err := registry.ListServices(ctx) if err != nil { return result, err } - return envvars.FromServices(services), nil + + masterServices, err := registry.ListServices(api.WithNamespace(api.NewContext(), b.MasterServiceNamespace)) + if err != nil { + return result, err + } + + projection := map[string]api.Service{} + services := []api.Service{} + for _, service := range masterServices.Items { + if masterServiceNames.Has(service.Name) { + projection[service.Name] = service + } + } + for _, service := range servicesInNs.Items { + projection[service.Name] = service + } + for _, service := range projection { + services = append(services, service) + } + + return envvars.FromServices(&api.ServiceList{Items: services}), nil } func (b *BasicBoundPodFactory) MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) { - envVars, err := getServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine) + envVars, err := b.getServiceEnvironmentVariables(api.WithNamespace(api.NewContext(), pod.Namespace), b.ServiceRegistry, machine) if err != nil { return nil, err } + boundPod := &api.BoundPod{} if err := api.Scheme.Convert(pod, boundPod); err != nil { return nil, err diff --git a/pkg/registry/pod/bound_pod_factory_test.go b/pkg/registry/pod/bound_pod_factory_test.go index c0ec3000822..84323bd16a2 100644 --- a/pkg/registry/pod/bound_pod_factory_test.go +++ b/pkg/registry/pod/bound_pod_factory_test.go @@ -22,13 +22,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) func TestMakeBoundPodNoServices(t *testing.T) { registry := registrytest.ServiceRegistry{} factory := &BasicBoundPodFactory{ - ServiceRegistry: ®istry, + ServiceRegistry: ®istry, + MasterServiceNamespace: api.NamespaceDefault, } pod, err := factory.MakeBoundPod("machine", &api.Pod{ @@ -63,13 +63,9 @@ func TestMakeBoundPodServices(t *testing.T) { List: api.ServiceList{ Items: []api.Service{ { - ObjectMeta: api.ObjectMeta{Name: "test"}, + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"}, Spec: api.ServiceSpec{ - Port: 8080, - ContainerPort: util.IntOrString{ - Kind: util.IntstrInt, - IntVal: 900, - }, + Port: 8080, PortalIP: "1.2.3.4", }, }, @@ -77,11 +73,12 @@ func TestMakeBoundPodServices(t *testing.T) { }, } factory := &BasicBoundPodFactory{ - ServiceRegistry: ®istry, + ServiceRegistry: ®istry, + MasterServiceNamespace: api.NamespaceDefault, } pod, err := factory.MakeBoundPod("machine", &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foobar"}, + ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"}, Spec: api.PodSpec{ Containers: []api.Container{ { @@ -140,13 +137,9 @@ func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) { List: api.ServiceList{ Items: []api.Service{ { - ObjectMeta: api.ObjectMeta{Name: "test"}, + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"}, Spec: api.ServiceSpec{ - Port: 8080, - ContainerPort: util.IntOrString{ - Kind: util.IntstrInt, - IntVal: 900, - }, + Port: 8080, PortalIP: "1.2.3.4", }, }, @@ -154,10 +147,12 @@ func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) { }, } factory := &BasicBoundPodFactory{ - ServiceRegistry: ®istry, + ServiceRegistry: ®istry, + MasterServiceNamespace: api.NamespaceDefault, } pod, err := factory.MakeBoundPod("machine", &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"}, Spec: api.PodSpec{ Containers: []api.Container{ { @@ -220,3 +215,238 @@ func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) { } } } + +func TestMakeBoundPodOnlyVisibleServices(t *testing.T) { + registry := registrytest.ServiceRegistry{ + List: api.ServiceList{ + Items: []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{ + Port: 8080, + PortalIP: "1.2.3.4", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"}, + Spec: api.ServiceSpec{ + Port: 8081, + PortalIP: "1.2.3.5", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "test3", Namespace: "test"}, + Spec: api.ServiceSpec{ + Port: 8083, + PortalIP: "1.2.3.7", + }, + }, + }, + }, + } + factory := &BasicBoundPodFactory{ + ServiceRegistry: ®istry, + MasterServiceNamespace: api.NamespaceDefault, + } + + pod, err := factory.MakeBoundPod("machine", &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "foo", + }, + }, + }, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + container := pod.Spec.Containers[0] + envs := map[string]string{ + "TEST_SERVICE_HOST": "1.2.3.5", + "TEST_SERVICE_PORT": "8081", + "TEST_PORT": "tcp://1.2.3.5:8081", + "TEST_PORT_8081_TCP": "tcp://1.2.3.5:8081", + "TEST_PORT_8081_TCP_PROTO": "tcp", + "TEST_PORT_8081_TCP_PORT": "8081", + "TEST_PORT_8081_TCP_ADDR": "1.2.3.5", + "TEST3_SERVICE_HOST": "1.2.3.7", + "TEST3_SERVICE_PORT": "8083", + "TEST3_PORT": "tcp://1.2.3.7:8083", + "TEST3_PORT_8083_TCP": "tcp://1.2.3.7:8083", + "TEST3_PORT_8083_TCP_PROTO": "tcp", + "TEST3_PORT_8083_TCP_PORT": "8083", + "TEST3_PORT_8083_TCP_ADDR": "1.2.3.7", + } + + if len(container.Env) != len(envs) { + t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod) + } + for _, env := range container.Env { + expectedValue := envs[env.Name] + if expectedValue != env.Value { + t.Errorf("expected env %v value %v, got %v", env.Name, expectedValue, env.Value) + } + } +} + +func TestMakeBoundPodMasterServices(t *testing.T) { + registry := registrytest.ServiceRegistry{ + List: api.ServiceList{ + Items: []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{ + Port: 8080, + PortalIP: "1.2.3.4", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"}, + Spec: api.ServiceSpec{ + Port: 8081, + PortalIP: "1.2.3.5", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "test3", Namespace: "test"}, + Spec: api.ServiceSpec{ + Port: 8083, + PortalIP: "1.2.3.7", + }, + }, + }, + }, + } + factory := &BasicBoundPodFactory{ + ServiceRegistry: ®istry, + MasterServiceNamespace: api.NamespaceDefault, + } + + pod, err := factory.MakeBoundPod("machine", &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "foo", + }, + }, + }, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + container := pod.Spec.Containers[0] + envs := map[string]string{ + "TEST_SERVICE_HOST": "1.2.3.5", + "TEST_SERVICE_PORT": "8081", + "TEST_PORT": "tcp://1.2.3.5:8081", + "TEST_PORT_8081_TCP": "tcp://1.2.3.5:8081", + "TEST_PORT_8081_TCP_PROTO": "tcp", + "TEST_PORT_8081_TCP_PORT": "8081", + "TEST_PORT_8081_TCP_ADDR": "1.2.3.5", + "TEST3_SERVICE_HOST": "1.2.3.7", + "TEST3_SERVICE_PORT": "8083", + "TEST3_PORT": "tcp://1.2.3.7:8083", + "TEST3_PORT_8083_TCP": "tcp://1.2.3.7:8083", + "TEST3_PORT_8083_TCP_PROTO": "tcp", + "TEST3_PORT_8083_TCP_PORT": "8083", + "TEST3_PORT_8083_TCP_ADDR": "1.2.3.7", + "KUBERNETES_SERVICE_HOST": "1.2.3.4", + "KUBERNETES_SERVICE_PORT": "8080", + "KUBERNETES_PORT": "tcp://1.2.3.4:8080", + "KUBERNETES_PORT_8080_TCP": "tcp://1.2.3.4:8080", + "KUBERNETES_PORT_8080_TCP_PROTO": "tcp", + "KUBERNETES_PORT_8080_TCP_PORT": "8080", + "KUBERNETES_PORT_8080_TCP_ADDR": "1.2.3.4", + } + + if len(container.Env) != len(envs) { + t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod) + } + for _, env := range container.Env { + expectedValue := envs[env.Name] + if expectedValue != env.Value { + t.Errorf("expected env %v value %v, got %v", env.Name, expectedValue, env.Value) + } + } +} + +func TestMakeBoundPodMasterServiceInNs(t *testing.T) { + registry := registrytest.ServiceRegistry{ + List: api.ServiceList{ + Items: []api.Service{ + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{ + Port: 8080, + PortalIP: "1.2.3.4", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"}, + Spec: api.ServiceSpec{ + Port: 8081, + PortalIP: "1.2.3.5", + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "test"}, + Spec: api.ServiceSpec{ + Port: 8083, + PortalIP: "1.2.3.7", + }, + }, + }, + }, + } + factory := &BasicBoundPodFactory{ + ServiceRegistry: ®istry, + MasterServiceNamespace: api.NamespaceDefault, + } + + pod, err := factory.MakeBoundPod("machine", &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"}, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "foo", + }, + }, + }, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + container := pod.Spec.Containers[0] + envs := map[string]string{ + "TEST_SERVICE_HOST": "1.2.3.5", + "TEST_SERVICE_PORT": "8081", + "TEST_PORT": "tcp://1.2.3.5:8081", + "TEST_PORT_8081_TCP": "tcp://1.2.3.5:8081", + "TEST_PORT_8081_TCP_PROTO": "tcp", + "TEST_PORT_8081_TCP_PORT": "8081", + "TEST_PORT_8081_TCP_ADDR": "1.2.3.5", + "KUBERNETES_SERVICE_HOST": "1.2.3.7", + "KUBERNETES_SERVICE_PORT": "8083", + "KUBERNETES_PORT": "tcp://1.2.3.7:8083", + "KUBERNETES_PORT_8083_TCP": "tcp://1.2.3.7:8083", + "KUBERNETES_PORT_8083_TCP_PROTO": "tcp", + "KUBERNETES_PORT_8083_TCP_PORT": "8083", + "KUBERNETES_PORT_8083_TCP_ADDR": "1.2.3.7", + } + + if len(container.Env) != len(envs) { + t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod) + } + for _, env := range container.Env { + expectedValue := envs[env.Name] + if expectedValue != env.Value { + t.Errorf("expected env %v value %v, got %v", env.Name, expectedValue, env.Value) + } + } +} diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 02f10c3aa2a..ec0d393b3eb 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -45,10 +45,23 @@ func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error r.mu.Lock() defer r.mu.Unlock() - // Return by copy to avoid data races + ns, _ := api.NamespaceFrom(ctx) + + // Copy metadata from internal list into result res := new(api.ServiceList) - *res = r.List - res.Items = append([]api.Service{}, r.List.Items...) + res.TypeMeta = r.List.TypeMeta + res.ListMeta = r.List.ListMeta + + if ns != api.NamespaceAll { + for _, service := range r.List.Items { + if ns == service.Namespace { + res.Items = append(res.Items, service) + } + } + } else { + res.Items = append([]api.Service{}, r.List.Items...) + } + return res, r.Err } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 36f16eeb983..ab288b0ae91 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -347,13 +347,13 @@ func TestServiceRegistryList(t *testing.T) { machines := []string{"foo", "bar", "baz"} storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) registry.CreateService(ctx, &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, }, }) registry.CreateService(ctx, &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo2"}, + ObjectMeta: api.ObjectMeta{Name: "foo2", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar2": "baz2"}, }, @@ -585,7 +585,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { rest1.portalMgr.randomAttempts = 0 svc := &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, Port: 6502, @@ -595,7 +595,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { c, _ := rest1.Create(ctx, svc) <-c svc = &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, Port: 6502, @@ -609,7 +609,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) { rest2.portalMgr.randomAttempts = 0 svc = &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, Port: 6502, diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 2e42a4b6174..36da0e6deac 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -83,7 +83,7 @@ func GetAPIServerClient(authPath string, apiServerList util.StringList) (*client } // RunApiServer starts an API server in a go routine. -func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, port int) { +func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, port int, masterServiceNamespace string) { handler := delegateHandler{} helper, err := master.NewEtcdHelper(etcdClient, "") @@ -104,9 +104,10 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p APIPrefix: "/api", Authorizer: apiserver.NewAlwaysAllowAuthorizer(), - ReadWritePort: port, - ReadOnlyPort: port, - PublicAddress: addr, + ReadWritePort: port, + ReadOnlyPort: port, + PublicAddress: addr, + MasterServiceNamespace: masterServiceNamespace, }) handler.delegate = m.InsecureHandler @@ -144,7 +145,12 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, // SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient. // Under the hood it calls RunKubelet (below) -func SimpleRunKubelet(client *client.Client, etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) { +func SimpleRunKubelet(client *client.Client, + etcdClient tools.EtcdClient, + dockerClient dockertools.DockerInterface, + hostname, rootDir, manifestURL, address string, + port uint, + masterServiceNamespace string) { kcfg := KubeletConfig{ KubeClient: client, EtcdClient: etcdClient, @@ -160,6 +166,7 @@ func SimpleRunKubelet(client *client.Client, etcdClient tools.EtcdClient, docker SyncFrequency: 3 * time.Second, MinimumGCAge: 10 * time.Second, MaxContainerCount: 5, + MasterServiceNamespace: masterServiceNamespace, } RunKubelet(&kcfg) } @@ -255,6 +262,7 @@ type KubeletConfig struct { EnableDebuggingHandlers bool Port uint Runonce bool + MasterServiceNamespace string } func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) { @@ -275,7 +283,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.MaxContainerCount, pc.IsSourceSeen, kc.ClusterDomain, - net.IP(kc.ClusterDNS)) + net.IP(kc.ClusterDNS), + kc.MasterServiceNamespace) if err != nil { return nil, err