Merge pull request #3331 from pmorie/services

Service visibility w/in namespaces, master services, set env vars in kubelet
This commit is contained in:
Eric Tune 2015-01-14 15:01:05 -08:00
commit 88c68e0349
14 changed files with 790 additions and 140 deletions

View File

@ -192,13 +192,13 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Kubelet (localhost) // Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.") testRootDir := makeTempDirOrDie("kubelet_integ_1.")
glog.Infof("Using %s as root dir for kubelet #1", testRootDir) glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250) standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault)
// Kubelet (machine) // Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both // Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule. // have a place they can schedule.
testRootDir = makeTempDirOrDie("kubelet_integ_2.") testRootDir = makeTempDirOrDie("kubelet_integ_2.")
glog.Infof("Using %s as root dir for kubelet #2", testRootDir) glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251) standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault)
return apiServer.URL return apiServer.URL
} }

View File

@ -28,6 +28,7 @@ import (
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/admission" "github.com/GoogleCloudPlatform/kubernetes/pkg/admission"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -87,6 +88,7 @@ var (
Port: 10250, Port: 10250,
EnableHttps: false, EnableHttps: false,
} }
masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods")
) )
func init() { func init() {
@ -177,25 +179,26 @@ func main() {
admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, *admissionControlConfigFile) admissionController := admission.NewFromPlugins(client, admissionControlPluginNames, *admissionControlConfigFile)
config := &master.Config{ config := &master.Config{
Client: client, Client: client,
Cloud: cloud, Cloud: cloud,
EtcdHelper: helper, EtcdHelper: helper,
HealthCheckMinions: *healthCheckMinions, HealthCheckMinions: *healthCheckMinions,
EventTTL: *eventTTL, EventTTL: *eventTTL,
KubeletClient: kubeletClient, KubeletClient: kubeletClient,
PortalNet: &n, PortalNet: &n,
EnableLogsSupport: *enableLogsSupport, EnableLogsSupport: *enableLogsSupport,
EnableUISupport: true, EnableUISupport: true,
EnableSwaggerSupport: true, EnableSwaggerSupport: true,
APIPrefix: *apiPrefix, APIPrefix: *apiPrefix,
CorsAllowedOriginList: corsAllowedOriginList, CorsAllowedOriginList: corsAllowedOriginList,
ReadOnlyPort: *readOnlyPort, ReadOnlyPort: *readOnlyPort,
ReadWritePort: *port, ReadWritePort: *port,
PublicAddress: *publicAddressOverride, PublicAddress: *publicAddressOverride,
Authenticator: authenticator, Authenticator: authenticator,
Authorizer: authorizer, Authorizer: authorizer,
AdmissionControl: admissionController, AdmissionControl: admissionController,
EnableV1Beta3: v1beta3, EnableV1Beta3: v1beta3,
MasterServiceNamespace: *masterServiceNamespace,
} }
m := master.New(config) m := master.New(config)

View File

@ -26,6 +26,7 @@ import (
"net" "net"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
@ -64,6 +65,7 @@ var (
oomScoreAdj = flag.Int("oom_score_adj", -900, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]") oomScoreAdj = flag.Int("oom_score_adj", -900, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]")
apiServerList util.StringList apiServerList util.StringList
clusterDomain = flag.String("cluster_domain", "", "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") clusterDomain = flag.String("cluster_domain", "", "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains")
masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods")
clusterDNS = util.IP(nil) clusterDNS = util.IP(nil)
) )
@ -130,6 +132,7 @@ func main() {
DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint), DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint),
KubeClient: client, KubeClient: client,
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile), EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
MasterServiceNamespace: *masterServiceNamespace,
} }
standalone.RunKubelet(&kcfg) standalone.RunKubelet(&kcfg)

View File

@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone" "github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
@ -40,19 +41,20 @@ var (
dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with")
etcdServer = flag.String("etcd_server", "http://localhost:4001", "If non-empty, path to the set of etcd server to use") etcdServer = flag.String("etcd_server", "http://localhost:4001", "If non-empty, path to the set of etcd server to use")
// TODO: Discover these by pinging the host machines, and rip out these flags. // TODO: Discover these by pinging the host machines, and rip out these flags.
nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
nodeMemory = flag.Int64("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") nodeMemory = flag.Int64("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods")
) )
func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string, port int) { func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string, port int) {
machineList := []string{"localhost"} machineList := []string{"localhost"}
standalone.RunApiServer(cl, etcdClient, addr, port) standalone.RunApiServer(cl, etcdClient, addr, port, *masterServiceNamespace)
standalone.RunScheduler(cl) standalone.RunScheduler(cl)
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint) dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250) standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace)
} }
func newApiClient(addr string, port int) *client.Client { func newApiClient(addr string, port int) *client.Client {

View File

@ -878,7 +878,10 @@ func TestValidateService(t *testing.T) {
}, },
existing: api.ServiceList{ existing: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
{Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true}}, {
ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true},
},
}, },
}, },
numErrs: 1, numErrs: 1,
@ -895,7 +898,10 @@ func TestValidateService(t *testing.T) {
}, },
existing: api.ServiceList{ existing: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
{Spec: api.ServiceSpec{Port: 80}}, {
ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{Port: 80},
},
}, },
}, },
numErrs: 0, numErrs: 0,
@ -911,7 +917,10 @@ func TestValidateService(t *testing.T) {
}, },
existing: api.ServiceList{ existing: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
{Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true}}, {
ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{Port: 80, CreateExternalLoadBalancer: true},
},
}, },
}, },
numErrs: 0, numErrs: 0,
@ -927,7 +936,10 @@ func TestValidateService(t *testing.T) {
}, },
existing: api.ServiceList{ existing: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
{Spec: api.ServiceSpec{Port: 80}}, {
ObjectMeta: api.ObjectMeta{Name: "def123", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{Port: 80},
},
}, },
}, },
numErrs: 0, numErrs: 0,

View File

@ -34,9 +34,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health" "github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
@ -76,7 +79,8 @@ func NewMainKubelet(
maxContainerCount int, maxContainerCount int,
sourceReady SourceReadyFn, sourceReady SourceReadyFn,
clusterDomain string, clusterDomain string,
clusterDNS net.IP) (*Kubelet, error) { clusterDNS net.IP,
masterServiceNamespace string) (*Kubelet, error) {
if rootDirectory == "" { if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory) return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
} }
@ -86,24 +90,31 @@ func NewMainKubelet(
if minimumGCAge <= 0 { if minimumGCAge <= 0 {
return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge) return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge)
} }
serviceStore := cache.NewStore()
cache.NewReflector(&cache.ListWatch{kubeClient, labels.Everything(), "services", api.NamespaceAll}, &api.Service{}, serviceStore).Run()
serviceLister := &cache.StoreToServiceLister{serviceStore}
klet := &Kubelet{ klet := &Kubelet{
hostname: hostname, hostname: hostname,
dockerClient: dockerClient, dockerClient: dockerClient,
etcdClient: etcdClient, etcdClient: etcdClient,
rootDirectory: rootDirectory, rootDirectory: rootDirectory,
resyncInterval: resyncInterval, resyncInterval: resyncInterval,
networkContainerImage: networkContainerImage, networkContainerImage: networkContainerImage,
podWorkers: newPodWorkers(), podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{}, dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient), runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{}, httpClient: &http.Client{},
pullQPS: pullQPS, pullQPS: pullQPS,
pullBurst: pullBurst, pullBurst: pullBurst,
minimumGCAge: minimumGCAge, minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount, maxContainerCount: maxContainerCount,
sourceReady: sourceReady, sourceReady: sourceReady,
clusterDomain: clusterDomain, clusterDomain: clusterDomain,
clusterDNS: clusterDNS, clusterDNS: clusterDNS,
serviceLister: serviceLister,
masterServiceNamespace: masterServiceNamespace,
} }
if err := klet.setupDataDirs(); err != nil { if err := klet.setupDataDirs(); err != nil {
@ -117,10 +128,15 @@ type httpGetter interface {
Get(url string) (*http.Response, error) Get(url string) (*http.Response, error)
} }
type serviceLister interface {
List() (api.ServiceList, error)
}
// Kubelet is the main kubelet implementation. // Kubelet is the main kubelet implementation.
type Kubelet struct { type Kubelet struct {
hostname string hostname string
dockerClient dockertools.DockerInterface dockerClient dockertools.DockerInterface
kubeClient *client.Client
rootDirectory string rootDirectory string
networkContainerImage string networkContainerImage string
podWorkers *podWorkers podWorkers *podWorkers
@ -168,6 +184,9 @@ type Kubelet struct {
// If non-nil, use this for container DNS server. // If non-nil, use this for container DNS server.
clusterDNS net.IP clusterDNS net.IP
masterServiceNamespace string
serviceLister serviceLister
} }
// GetRootDir returns the full path to the directory under which kubelet can // GetRootDir returns the full path to the directory under which kubelet can
@ -423,14 +442,6 @@ func (self *podWorkers) Run(podFullName string, action func()) {
}() }()
} }
func makeEnvironmentVariables(container *api.Container) []string {
var result []string
for _, value := range container.Env {
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
}
return result
}
func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{} binds := []string{}
for _, mount := range container.VolumeMounts { for _, mount := range container.VolumeMounts {
@ -607,7 +618,10 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
} }
envVariables := makeEnvironmentVariables(container) envVariables, err := kl.makeEnvironmentVariables(pod.Namespace, container)
if err != nil {
return "", err
}
binds := makeBinds(pod, container, podVolumes) binds := makeBinds(pod, container, podVolumes)
exposedPorts, portBindings := makePortsAndBindings(container) exposedPorts, portBindings := makePortsAndBindings(container)
@ -692,6 +706,91 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
return dockertools.DockerID(dockerContainer.ID), err return dockertools.DockerID(dockerContainer.ID), err
} }
var masterServices = util.NewStringSet("kubernetes", "kubernetes-ro")
// getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
var (
serviceMap = make(map[string]api.Service)
m = make(map[string]string)
)
// Get all service resources from the master (via a cache),
// and populate them into service enviroment variables.
if kl.serviceLister == nil {
// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
return m, nil
}
services, err := kl.serviceLister.List()
if err != nil {
return m, fmt.Errorf("Failed to list services when setting up env vars.")
}
// project the services in namespace ns onto the master services
for _, service := range services.Items {
serviceName := service.Name
switch service.Namespace {
// for the case whether the master service namespace is the namespace the pod
// is in, pod should receive all the pods in the namespace.
//
// ordering of the case clauses below enforces this
case ns:
serviceMap[serviceName] = service
case kl.masterServiceNamespace:
if masterServices.Has(serviceName) {
_, exists := serviceMap[serviceName]
if !exists {
serviceMap[serviceName] = service
}
}
}
}
services.Items = []api.Service{}
for _, service := range serviceMap {
services.Items = append(services.Items, service)
}
for _, e := range envvars.FromServices(&services) {
m[e.Name] = e.Value
}
return m, nil
}
// Make the service environment variables for a pod in the given namespace.
func (kl *Kubelet) makeEnvironmentVariables(ns string, container *api.Container) ([]string, error) {
var result []string
// Note: These are added to the docker.Config, but are not included in the checksum computed
// by dockertools.BuildDockerName(...). That way, we can still determine whether an
// api.Container is already running by its hash. (We don't want to restart a container just
// because some service changed.)
//
// Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
// To avoid this users can: (1) wait between starting a service and starting; or (2) detect
// missing service env var and exit and be restarted; or (3) use DNS instead of env vars
// and keep trying to resolve the DNS name of the service (recommended).
serviceEnv, err := kl.getServiceEnvVarMap(ns)
if err != nil {
return result, err
}
for _, value := range container.Env {
// The code is in transition from using etcd+BoundPods to apiserver+Pods.
// So, the master may set service env vars, or kubelet may. In case both are doing
// it, we delete the key from the kubelet-generated ones so we don't have duplicate
// env vars.
// TODO: remove this net line once all platforms use apiserver+Pods.
delete(serviceEnv, value.Name)
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
}
// Append remaining service env vars.
for k, v := range serviceEnv {
result = append(result, fmt.Sprintf("%s=%s", k, v))
}
return result, nil
}
func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.BoundPod) error { func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.BoundPod) error {
// Get host DNS settings and append them to cluster DNS settings. // Get host DNS settings and append them to cluster DNS settings.
f, err := os.Open("/etc/resolv.conf") f, err := os.Open("/etc/resolv.conf")

View File

@ -58,6 +58,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools
kubelet.rootDirectory = "/tmp/kubelet" kubelet.rootDirectory = "/tmp/kubelet"
kubelet.podWorkers = newPodWorkers() kubelet.podWorkers = newPodWorkers()
kubelet.sourceReady = func(source string) bool { return true } kubelet.sourceReady = func(source string) bool { return true }
return kubelet, fakeEtcdClient, fakeDocker return kubelet, fakeEtcdClient, fakeDocker
} }
@ -913,31 +914,6 @@ func TestSyncPodUnhealthy(t *testing.T) {
} }
} }
func TestMakeEnvVariables(t *testing.T) {
container := api.Container{
Env: []api.EnvVar{
{
Name: "foo",
Value: "bar",
},
{
Name: "baz",
Value: "blah",
},
},
}
vars := makeEnvironmentVariables(&container)
if len(vars) != len(container.Env) {
t.Errorf("Vars don't match. Expected: %#v Found: %#v", container.Env, vars)
}
for ix, env := range container.Env {
value := fmt.Sprintf("%s=%s", env.Name, env.Value)
if value != vars[ix] {
t.Errorf("Unexpected value: %s. Expected: %s", vars[ix], value)
}
}
}
func TestMountExternalVolumes(t *testing.T) { func TestMountExternalVolumes(t *testing.T) {
kubelet, _, _ := newTestKubelet(t) kubelet, _, _ := newTestKubelet(t)
pod := api.BoundPod{ pod := api.BoundPod{
@ -1936,3 +1912,275 @@ func TestParseResolvConf(t *testing.T) {
} }
} }
} }
type testServiceLister struct {
services []api.Service
}
func (ls testServiceLister) List() (api.ServiceList, error) {
return api.ServiceList{
Items: ls.services,
}, nil
}
func TestMakeEnvironmentVariables(t *testing.T) {
services := []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Port: 8081,
PortalIP: "1.2.3.1",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes-ro", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Port: 8082,
PortalIP: "1.2.3.2",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test1"},
Spec: api.ServiceSpec{
Port: 8083,
PortalIP: "1.2.3.3",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "test2"},
Spec: api.ServiceSpec{
Port: 8084,
PortalIP: "1.2.3.4",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test2"},
Spec: api.ServiceSpec{
Port: 8085,
PortalIP: "1.2.3.5",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "kubernetes"},
Spec: api.ServiceSpec{
Port: 8086,
PortalIP: "1.2.3.6",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes-ro", Namespace: "kubernetes"},
Spec: api.ServiceSpec{
Port: 8087,
PortalIP: "1.2.3.7",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "not-special", Namespace: "kubernetes"},
Spec: api.ServiceSpec{
Port: 8088,
PortalIP: "1.2.3.8",
},
},
}
testCases := []struct {
name string // the name of the test case
ns string // the namespace to generate environment for
container *api.Container // the container to use
masterServiceNamespace string // the namespace to read master service info from
nilLister bool // whether the lister should be nil
expectedEnvs util.StringSet // a set of expected environment vars
expectedEnvSize int // total number of expected env vars
}{
{
"api server = Y, kubelet = Y",
"test1",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "BAR"},
{Name: "TEST_SERVICE_HOST", Value: "1.2.3.3"},
{Name: "TEST_SERVICE_PORT", Value: "8083"},
{Name: "TEST_PORT", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP_PROTO", Value: "tcp"},
{Name: "TEST_PORT_8083_TCP_PORT", Value: "8083"},
{Name: "TEST_PORT_8083_TCP_ADDR", Value: "1.2.3.3"},
},
},
api.NamespaceDefault,
false,
util.NewStringSet("FOO=BAR",
"TEST_SERVICE_HOST=1.2.3.3",
"TEST_SERVICE_PORT=8083",
"TEST_PORT=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP_PROTO=tcp",
"TEST_PORT_8083_TCP_PORT=8083",
"TEST_PORT_8083_TCP_ADDR=1.2.3.3",
"KUBERNETES_SERVICE_HOST=1.2.3.1",
"KUBERNETES_SERVICE_PORT=8081",
"KUBERNETES_PORT=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP_PROTO=tcp",
"KUBERNETES_PORT_8081_TCP_PORT=8081",
"KUBERNETES_PORT_8081_TCP_ADDR=1.2.3.1",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.2",
"KUBERNETES_RO_SERVICE_PORT=8082",
"KUBERNETES_RO_PORT=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8082_TCP_PORT=8082",
"KUBERNETES_RO_PORT_8082_TCP_ADDR=1.2.3.2"),
22,
},
{
"api server = Y, kubelet = N",
"test1",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "BAR"},
{Name: "TEST_SERVICE_HOST", Value: "1.2.3.3"},
{Name: "TEST_SERVICE_PORT", Value: "8083"},
{Name: "TEST_PORT", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP_PROTO", Value: "tcp"},
{Name: "TEST_PORT_8083_TCP_PORT", Value: "8083"},
{Name: "TEST_PORT_8083_TCP_ADDR", Value: "1.2.3.3"},
},
},
api.NamespaceDefault,
true,
util.NewStringSet("FOO=BAR",
"TEST_SERVICE_HOST=1.2.3.3",
"TEST_SERVICE_PORT=8083",
"TEST_PORT=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP_PROTO=tcp",
"TEST_PORT_8083_TCP_PORT=8083",
"TEST_PORT_8083_TCP_ADDR=1.2.3.3"),
8,
},
{
"api server = N; kubelet = Y",
"test1",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "BAZ"},
},
},
api.NamespaceDefault,
false,
util.NewStringSet("FOO=BAZ",
"TEST_SERVICE_HOST=1.2.3.3",
"TEST_SERVICE_PORT=8083",
"TEST_PORT=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP_PROTO=tcp",
"TEST_PORT_8083_TCP_PORT=8083",
"TEST_PORT_8083_TCP_ADDR=1.2.3.3",
"KUBERNETES_SERVICE_HOST=1.2.3.1",
"KUBERNETES_SERVICE_PORT=8081",
"KUBERNETES_PORT=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP_PROTO=tcp",
"KUBERNETES_PORT_8081_TCP_PORT=8081",
"KUBERNETES_PORT_8081_TCP_ADDR=1.2.3.1",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.2",
"KUBERNETES_RO_SERVICE_PORT=8082",
"KUBERNETES_RO_PORT=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8082_TCP_PORT=8082",
"KUBERNETES_RO_PORT_8082_TCP_ADDR=1.2.3.2"),
22,
},
{
"master service in pod ns",
"test2",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "ZAP"},
},
},
"kubernetes",
false,
util.NewStringSet("FOO=ZAP",
"TEST_SERVICE_HOST=1.2.3.5",
"TEST_SERVICE_PORT=8085",
"TEST_PORT=tcp://1.2.3.5:8085",
"TEST_PORT_8085_TCP=tcp://1.2.3.5:8085",
"TEST_PORT_8085_TCP_PROTO=tcp",
"TEST_PORT_8085_TCP_PORT=8085",
"TEST_PORT_8085_TCP_ADDR=1.2.3.5",
"KUBERNETES_SERVICE_HOST=1.2.3.4",
"KUBERNETES_SERVICE_PORT=8084",
"KUBERNETES_PORT=tcp://1.2.3.4:8084",
"KUBERNETES_PORT_8084_TCP=tcp://1.2.3.4:8084",
"KUBERNETES_PORT_8084_TCP_PROTO=tcp",
"KUBERNETES_PORT_8084_TCP_PORT=8084",
"KUBERNETES_PORT_8084_TCP_ADDR=1.2.3.4",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.7",
"KUBERNETES_RO_SERVICE_PORT=8087",
"KUBERNETES_RO_PORT=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8087_TCP_PORT=8087",
"KUBERNETES_RO_PORT_8087_TCP_ADDR=1.2.3.7"),
22,
},
{
"pod in master service ns",
"kubernetes",
&api.Container{},
"kubernetes",
false,
util.NewStringSet(
"NOT_SPECIAL_SERVICE_HOST=1.2.3.8",
"NOT_SPECIAL_SERVICE_PORT=8088",
"NOT_SPECIAL_PORT=tcp://1.2.3.8:8088",
"NOT_SPECIAL_PORT_8088_TCP=tcp://1.2.3.8:8088",
"NOT_SPECIAL_PORT_8088_TCP_PROTO=tcp",
"NOT_SPECIAL_PORT_8088_TCP_PORT=8088",
"NOT_SPECIAL_PORT_8088_TCP_ADDR=1.2.3.8",
"KUBERNETES_SERVICE_HOST=1.2.3.6",
"KUBERNETES_SERVICE_PORT=8086",
"KUBERNETES_PORT=tcp://1.2.3.6:8086",
"KUBERNETES_PORT_8086_TCP=tcp://1.2.3.6:8086",
"KUBERNETES_PORT_8086_TCP_PROTO=tcp",
"KUBERNETES_PORT_8086_TCP_PORT=8086",
"KUBERNETES_PORT_8086_TCP_ADDR=1.2.3.6",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.7",
"KUBERNETES_RO_SERVICE_PORT=8087",
"KUBERNETES_RO_PORT=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8087_TCP_PORT=8087",
"KUBERNETES_RO_PORT_8087_TCP_ADDR=1.2.3.7"),
21,
},
}
for _, tc := range testCases {
kl, _, _ := newTestKubelet(t)
kl.masterServiceNamespace = tc.masterServiceNamespace
if tc.nilLister {
kl.serviceLister = nil
} else {
kl.serviceLister = testServiceLister{services}
}
result, err := kl.makeEnvironmentVariables(tc.ns, tc.container)
if err != nil {
t.Errorf("[%v] Unexpected error: %v", tc.name, err)
}
resultSet := util.NewStringSet(result...)
if !resultSet.IsSuperset(tc.expectedEnvs) {
t.Errorf("[%v] Unexpected env entries; expected {%v}, got {%v}", tc.name, tc.expectedEnvs, resultSet)
}
if a := len(resultSet); a != tc.expectedEnvSize {
t.Errorf("[%v] Unexpected number of env vars; expected %v, got %v", tc.name, tc.expectedEnvSize, a)
}
}
}

View File

@ -62,23 +62,24 @@ import (
// Config is a structure used to configure a Master. // Config is a structure used to configure a Master.
type Config struct { type Config struct {
Client *client.Client Client *client.Client
Cloud cloudprovider.Interface Cloud cloudprovider.Interface
EtcdHelper tools.EtcdHelper EtcdHelper tools.EtcdHelper
HealthCheckMinions bool HealthCheckMinions bool
EventTTL time.Duration EventTTL time.Duration
MinionRegexp string MinionRegexp string
KubeletClient client.KubeletClient KubeletClient client.KubeletClient
PortalNet *net.IPNet PortalNet *net.IPNet
EnableLogsSupport bool EnableLogsSupport bool
EnableUISupport bool EnableUISupport bool
EnableSwaggerSupport bool EnableSwaggerSupport bool
EnableV1Beta3 bool EnableV1Beta3 bool
APIPrefix string APIPrefix string
CorsAllowedOriginList util.StringList CorsAllowedOriginList util.StringList
Authenticator authenticator.Request Authenticator authenticator.Request
Authorizer authorizer.Authorizer Authorizer authorizer.Authorizer
AdmissionControl admission.Interface AdmissionControl admission.Interface
MasterServiceNamespace string
// If specified, all web services will be registered into this container // If specified, all web services will be registered into this container
RestfulContainer *restful.Container RestfulContainer *restful.Container
@ -231,7 +232,8 @@ func New(c *Config) *Master {
minionRegistry := makeMinionRegistry(c) minionRegistry := makeMinionRegistry(c)
serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil)
boundPodFactory := &pod.BasicBoundPodFactory{ boundPodFactory := &pod.BasicBoundPodFactory{
ServiceRegistry: serviceRegistry, ServiceRegistry: serviceRegistry,
MasterServiceNamespace: c.MasterServiceNamespace,
} }
if c.KubeletClient == nil { if c.KubeletClient == nil {
glog.Fatalf("master.New() called with config.KubeletClient == nil") glog.Fatalf("master.New() called with config.KubeletClient == nil")

View File

@ -130,7 +130,8 @@ func TestEtcdCreatePod(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: api.NamespaceDefault,
}, },
Spec: api.PodSpec{ Spec: api.PodSpec{
Containers: []api.Container{ Containers: []api.Container{
@ -240,7 +241,8 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: api.NamespaceDefault,
}, },
}) })
if err != nil { if err != nil {
@ -282,7 +284,8 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: api.NamespaceDefault,
}, },
Spec: api.PodSpec{ Spec: api.PodSpec{
Containers: []api.Container{ Containers: []api.Container{
@ -346,7 +349,8 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient) registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreatePod(ctx, &api.Pod{ err := registry.CreatePod(ctx, &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: api.NamespaceDefault,
}, },
Spec: api.PodSpec{ Spec: api.PodSpec{
Containers: []api.Container{ Containers: []api.Container{

View File

@ -20,6 +20,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
type BoundPodFactory interface { type BoundPodFactory interface {
@ -29,25 +30,49 @@ type BoundPodFactory interface {
type BasicBoundPodFactory struct { type BasicBoundPodFactory struct {
// TODO: this should really point at the API rather than a registry // TODO: this should really point at the API rather than a registry
ServiceRegistry service.Registry ServiceRegistry service.Registry
MasterServiceNamespace string
} }
// getServiceEnvironmentVariables populates a list of environment variables that are use var masterServiceNames = util.NewStringSet("kubernetes", "kubernetes-ro")
// getServiceEnvironmentVariables populates a list of environment variables that are used
// in the container environment to get access to services. // in the container environment to get access to services.
func getServiceEnvironmentVariables(ctx api.Context, registry service.Registry, machine string) ([]api.EnvVar, error) { func (b *BasicBoundPodFactory) getServiceEnvironmentVariables(ctx api.Context, registry service.Registry, machine string) ([]api.EnvVar, error) {
var result []api.EnvVar var result []api.EnvVar
services, err := registry.ListServices(ctx) servicesInNs, err := registry.ListServices(ctx)
if err != nil { if err != nil {
return result, err return result, err
} }
return envvars.FromServices(services), nil
masterServices, err := registry.ListServices(api.WithNamespace(api.NewContext(), b.MasterServiceNamespace))
if err != nil {
return result, err
}
projection := map[string]api.Service{}
services := []api.Service{}
for _, service := range masterServices.Items {
if masterServiceNames.Has(service.Name) {
projection[service.Name] = service
}
}
for _, service := range servicesInNs.Items {
projection[service.Name] = service
}
for _, service := range projection {
services = append(services, service)
}
return envvars.FromServices(&api.ServiceList{Items: services}), nil
} }
func (b *BasicBoundPodFactory) MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) { func (b *BasicBoundPodFactory) MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) {
envVars, err := getServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine) envVars, err := b.getServiceEnvironmentVariables(api.WithNamespace(api.NewContext(), pod.Namespace), b.ServiceRegistry, machine)
if err != nil { if err != nil {
return nil, err return nil, err
} }
boundPod := &api.BoundPod{} boundPod := &api.BoundPod{}
if err := api.Scheme.Convert(pod, boundPod); err != nil { if err := api.Scheme.Convert(pod, boundPod); err != nil {
return nil, err return nil, err

View File

@ -22,13 +22,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
func TestMakeBoundPodNoServices(t *testing.T) { func TestMakeBoundPodNoServices(t *testing.T) {
registry := registrytest.ServiceRegistry{} registry := registrytest.ServiceRegistry{}
factory := &BasicBoundPodFactory{ factory := &BasicBoundPodFactory{
ServiceRegistry: &registry, ServiceRegistry: &registry,
MasterServiceNamespace: api.NamespaceDefault,
} }
pod, err := factory.MakeBoundPod("machine", &api.Pod{ pod, err := factory.MakeBoundPod("machine", &api.Pod{
@ -63,13 +63,9 @@ func TestMakeBoundPodServices(t *testing.T) {
List: api.ServiceList{ List: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
{ {
ObjectMeta: api.ObjectMeta{Name: "test"}, ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Port: 8080, Port: 8080,
ContainerPort: util.IntOrString{
Kind: util.IntstrInt,
IntVal: 900,
},
PortalIP: "1.2.3.4", PortalIP: "1.2.3.4",
}, },
}, },
@ -77,11 +73,12 @@ func TestMakeBoundPodServices(t *testing.T) {
}, },
} }
factory := &BasicBoundPodFactory{ factory := &BasicBoundPodFactory{
ServiceRegistry: &registry, ServiceRegistry: &registry,
MasterServiceNamespace: api.NamespaceDefault,
} }
pod, err := factory.MakeBoundPod("machine", &api.Pod{ pod, err := factory.MakeBoundPod("machine", &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foobar"}, ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"},
Spec: api.PodSpec{ Spec: api.PodSpec{
Containers: []api.Container{ Containers: []api.Container{
{ {
@ -140,13 +137,9 @@ func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) {
List: api.ServiceList{ List: api.ServiceList{
Items: []api.Service{ Items: []api.Service{
{ {
ObjectMeta: api.ObjectMeta{Name: "test"}, ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Port: 8080, Port: 8080,
ContainerPort: util.IntOrString{
Kind: util.IntstrInt,
IntVal: 900,
},
PortalIP: "1.2.3.4", PortalIP: "1.2.3.4",
}, },
}, },
@ -154,10 +147,12 @@ func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) {
}, },
} }
factory := &BasicBoundPodFactory{ factory := &BasicBoundPodFactory{
ServiceRegistry: &registry, ServiceRegistry: &registry,
MasterServiceNamespace: api.NamespaceDefault,
} }
pod, err := factory.MakeBoundPod("machine", &api.Pod{ pod, err := factory.MakeBoundPod("machine", &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"},
Spec: api.PodSpec{ Spec: api.PodSpec{
Containers: []api.Container{ Containers: []api.Container{
{ {
@ -220,3 +215,238 @@ func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) {
} }
} }
} }
func TestMakeBoundPodOnlyVisibleServices(t *testing.T) {
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Port: 8080,
PortalIP: "1.2.3.4",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"},
Spec: api.ServiceSpec{
Port: 8081,
PortalIP: "1.2.3.5",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test3", Namespace: "test"},
Spec: api.ServiceSpec{
Port: 8083,
PortalIP: "1.2.3.7",
},
},
},
},
}
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
MasterServiceNamespace: api.NamespaceDefault,
}
pod, err := factory.MakeBoundPod("machine", &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
container := pod.Spec.Containers[0]
envs := map[string]string{
"TEST_SERVICE_HOST": "1.2.3.5",
"TEST_SERVICE_PORT": "8081",
"TEST_PORT": "tcp://1.2.3.5:8081",
"TEST_PORT_8081_TCP": "tcp://1.2.3.5:8081",
"TEST_PORT_8081_TCP_PROTO": "tcp",
"TEST_PORT_8081_TCP_PORT": "8081",
"TEST_PORT_8081_TCP_ADDR": "1.2.3.5",
"TEST3_SERVICE_HOST": "1.2.3.7",
"TEST3_SERVICE_PORT": "8083",
"TEST3_PORT": "tcp://1.2.3.7:8083",
"TEST3_PORT_8083_TCP": "tcp://1.2.3.7:8083",
"TEST3_PORT_8083_TCP_PROTO": "tcp",
"TEST3_PORT_8083_TCP_PORT": "8083",
"TEST3_PORT_8083_TCP_ADDR": "1.2.3.7",
}
if len(container.Env) != len(envs) {
t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod)
}
for _, env := range container.Env {
expectedValue := envs[env.Name]
if expectedValue != env.Value {
t.Errorf("expected env %v value %v, got %v", env.Name, expectedValue, env.Value)
}
}
}
func TestMakeBoundPodMasterServices(t *testing.T) {
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Port: 8080,
PortalIP: "1.2.3.4",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"},
Spec: api.ServiceSpec{
Port: 8081,
PortalIP: "1.2.3.5",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test3", Namespace: "test"},
Spec: api.ServiceSpec{
Port: 8083,
PortalIP: "1.2.3.7",
},
},
},
},
}
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
MasterServiceNamespace: api.NamespaceDefault,
}
pod, err := factory.MakeBoundPod("machine", &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
container := pod.Spec.Containers[0]
envs := map[string]string{
"TEST_SERVICE_HOST": "1.2.3.5",
"TEST_SERVICE_PORT": "8081",
"TEST_PORT": "tcp://1.2.3.5:8081",
"TEST_PORT_8081_TCP": "tcp://1.2.3.5:8081",
"TEST_PORT_8081_TCP_PROTO": "tcp",
"TEST_PORT_8081_TCP_PORT": "8081",
"TEST_PORT_8081_TCP_ADDR": "1.2.3.5",
"TEST3_SERVICE_HOST": "1.2.3.7",
"TEST3_SERVICE_PORT": "8083",
"TEST3_PORT": "tcp://1.2.3.7:8083",
"TEST3_PORT_8083_TCP": "tcp://1.2.3.7:8083",
"TEST3_PORT_8083_TCP_PROTO": "tcp",
"TEST3_PORT_8083_TCP_PORT": "8083",
"TEST3_PORT_8083_TCP_ADDR": "1.2.3.7",
"KUBERNETES_SERVICE_HOST": "1.2.3.4",
"KUBERNETES_SERVICE_PORT": "8080",
"KUBERNETES_PORT": "tcp://1.2.3.4:8080",
"KUBERNETES_PORT_8080_TCP": "tcp://1.2.3.4:8080",
"KUBERNETES_PORT_8080_TCP_PROTO": "tcp",
"KUBERNETES_PORT_8080_TCP_PORT": "8080",
"KUBERNETES_PORT_8080_TCP_ADDR": "1.2.3.4",
}
if len(container.Env) != len(envs) {
t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod)
}
for _, env := range container.Env {
expectedValue := envs[env.Name]
if expectedValue != env.Value {
t.Errorf("expected env %v value %v, got %v", env.Name, expectedValue, env.Value)
}
}
}
func TestMakeBoundPodMasterServiceInNs(t *testing.T) {
registry := registrytest.ServiceRegistry{
List: api.ServiceList{
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Port: 8080,
PortalIP: "1.2.3.4",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test"},
Spec: api.ServiceSpec{
Port: 8081,
PortalIP: "1.2.3.5",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "test"},
Spec: api.ServiceSpec{
Port: 8083,
PortalIP: "1.2.3.7",
},
},
},
},
}
factory := &BasicBoundPodFactory{
ServiceRegistry: &registry,
MasterServiceNamespace: api.NamespaceDefault,
}
pod, err := factory.MakeBoundPod("machine", &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "test"},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
},
},
},
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
container := pod.Spec.Containers[0]
envs := map[string]string{
"TEST_SERVICE_HOST": "1.2.3.5",
"TEST_SERVICE_PORT": "8081",
"TEST_PORT": "tcp://1.2.3.5:8081",
"TEST_PORT_8081_TCP": "tcp://1.2.3.5:8081",
"TEST_PORT_8081_TCP_PROTO": "tcp",
"TEST_PORT_8081_TCP_PORT": "8081",
"TEST_PORT_8081_TCP_ADDR": "1.2.3.5",
"KUBERNETES_SERVICE_HOST": "1.2.3.7",
"KUBERNETES_SERVICE_PORT": "8083",
"KUBERNETES_PORT": "tcp://1.2.3.7:8083",
"KUBERNETES_PORT_8083_TCP": "tcp://1.2.3.7:8083",
"KUBERNETES_PORT_8083_TCP_PROTO": "tcp",
"KUBERNETES_PORT_8083_TCP_PORT": "8083",
"KUBERNETES_PORT_8083_TCP_ADDR": "1.2.3.7",
}
if len(container.Env) != len(envs) {
t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod)
}
for _, env := range container.Env {
expectedValue := envs[env.Name]
if expectedValue != env.Value {
t.Errorf("expected env %v value %v, got %v", env.Name, expectedValue, env.Value)
}
}
}

View File

@ -45,10 +45,23 @@ func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
// Return by copy to avoid data races ns, _ := api.NamespaceFrom(ctx)
// Copy metadata from internal list into result
res := new(api.ServiceList) res := new(api.ServiceList)
*res = r.List res.TypeMeta = r.List.TypeMeta
res.Items = append([]api.Service{}, r.List.Items...) res.ListMeta = r.List.ListMeta
if ns != api.NamespaceAll {
for _, service := range r.List.Items {
if ns == service.Namespace {
res.Items = append(res.Items, service)
}
}
} else {
res.Items = append([]api.Service{}, r.List.Items...)
}
return res, r.Err return res, r.Err
} }

View File

@ -347,13 +347,13 @@ func TestServiceRegistryList(t *testing.T) {
machines := []string{"foo", "bar", "baz"} machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
registry.CreateService(ctx, &api.Service{ registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
}, },
}) })
registry.CreateService(ctx, &api.Service{ registry.CreateService(ctx, &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo2"}, ObjectMeta: api.ObjectMeta{Name: "foo2", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Selector: map[string]string{"bar2": "baz2"}, Selector: map[string]string{"bar2": "baz2"},
}, },
@ -585,7 +585,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
rest1.portalMgr.randomAttempts = 0 rest1.portalMgr.randomAttempts = 0
svc := &api.Service{ svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
Port: 6502, Port: 6502,
@ -595,7 +595,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
c, _ := rest1.Create(ctx, svc) c, _ := rest1.Create(ctx, svc)
<-c <-c
svc = &api.Service{ svc = &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
Port: 6502, Port: 6502,
@ -609,7 +609,7 @@ func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
rest2.portalMgr.randomAttempts = 0 rest2.portalMgr.randomAttempts = 0
svc = &api.Service{ svc = &api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"}, ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"}, Selector: map[string]string{"bar": "baz"},
Port: 6502, Port: 6502,

View File

@ -83,7 +83,7 @@ func GetAPIServerClient(authPath string, apiServerList util.StringList) (*client
} }
// RunApiServer starts an API server in a go routine. // RunApiServer starts an API server in a go routine.
func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, port int) { func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, port int, masterServiceNamespace string) {
handler := delegateHandler{} handler := delegateHandler{}
helper, err := master.NewEtcdHelper(etcdClient, "") helper, err := master.NewEtcdHelper(etcdClient, "")
@ -104,9 +104,10 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p
APIPrefix: "/api", APIPrefix: "/api",
Authorizer: apiserver.NewAlwaysAllowAuthorizer(), Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
ReadWritePort: port, ReadWritePort: port,
ReadOnlyPort: port, ReadOnlyPort: port,
PublicAddress: addr, PublicAddress: addr,
MasterServiceNamespace: masterServiceNamespace,
}) })
handler.delegate = m.InsecureHandler handler.delegate = m.InsecureHandler
@ -144,7 +145,12 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
// SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient. // SimpleRunKubelet is a simple way to start a Kubelet talking to dockerEndpoint, using an etcdClient.
// Under the hood it calls RunKubelet (below) // Under the hood it calls RunKubelet (below)
func SimpleRunKubelet(client *client.Client, etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) { func SimpleRunKubelet(client *client.Client,
etcdClient tools.EtcdClient,
dockerClient dockertools.DockerInterface,
hostname, rootDir, manifestURL, address string,
port uint,
masterServiceNamespace string) {
kcfg := KubeletConfig{ kcfg := KubeletConfig{
KubeClient: client, KubeClient: client,
EtcdClient: etcdClient, EtcdClient: etcdClient,
@ -160,6 +166,7 @@ func SimpleRunKubelet(client *client.Client, etcdClient tools.EtcdClient, docker
SyncFrequency: 3 * time.Second, SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second, MinimumGCAge: 10 * time.Second,
MaxContainerCount: 5, MaxContainerCount: 5,
MasterServiceNamespace: masterServiceNamespace,
} }
RunKubelet(&kcfg) RunKubelet(&kcfg)
} }
@ -255,6 +262,7 @@ type KubeletConfig struct {
EnableDebuggingHandlers bool EnableDebuggingHandlers bool
Port uint Port uint
Runonce bool Runonce bool
MasterServiceNamespace string
} }
func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) { func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kubelet, error) {
@ -275,7 +283,8 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.MaxContainerCount, kc.MaxContainerCount,
pc.IsSourceSeen, pc.IsSourceSeen,
kc.ClusterDomain, kc.ClusterDomain,
net.IP(kc.ClusterDNS)) net.IP(kc.ClusterDNS),
kc.MasterServiceNamespace)
if err != nil { if err != nil {
return nil, err return nil, err