Pods should see services only from their own ns

This commit is contained in:
Paul Morie
2015-01-08 10:25:14 -05:00
parent 9117260bfd
commit fd834ae84d
14 changed files with 790 additions and 140 deletions

View File

@@ -34,9 +34,12 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
@@ -76,7 +79,8 @@ func NewMainKubelet(
maxContainerCount int,
sourceReady SourceReadyFn,
clusterDomain string,
clusterDNS net.IP) (*Kubelet, error) {
clusterDNS net.IP,
masterServiceNamespace string) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@@ -86,24 +90,31 @@ func NewMainKubelet(
if minimumGCAge <= 0 {
return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge)
}
serviceStore := cache.NewStore()
cache.NewReflector(&cache.ListWatch{kubeClient, labels.Everything(), "services", api.NamespaceAll}, &api.Service{}, serviceStore).Run()
serviceLister := &cache.StoreToServiceLister{serviceStore}
klet := &Kubelet{
hostname: hostname,
dockerClient: dockerClient,
etcdClient: etcdClient,
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
networkContainerImage: networkContainerImage,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourceReady: sourceReady,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
hostname: hostname,
dockerClient: dockerClient,
etcdClient: etcdClient,
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
networkContainerImage: networkContainerImage,
podWorkers: newPodWorkers(),
dockerIDToRef: map[dockertools.DockerID]*api.ObjectReference{},
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
httpClient: &http.Client{},
pullQPS: pullQPS,
pullBurst: pullBurst,
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourceReady: sourceReady,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
masterServiceNamespace: masterServiceNamespace,
}
if err := klet.setupDataDirs(); err != nil {
@@ -117,10 +128,15 @@ type httpGetter interface {
Get(url string) (*http.Response, error)
}
type serviceLister interface {
List() (api.ServiceList, error)
}
// Kubelet is the main kubelet implementation.
type Kubelet struct {
hostname string
dockerClient dockertools.DockerInterface
kubeClient *client.Client
rootDirectory string
networkContainerImage string
podWorkers *podWorkers
@@ -168,6 +184,9 @@ type Kubelet struct {
// If non-nil, use this for container DNS server.
clusterDNS net.IP
masterServiceNamespace string
serviceLister serviceLister
}
// GetRootDir returns the full path to the directory under which kubelet can
@@ -423,14 +442,6 @@ func (self *podWorkers) Run(podFullName string, action func()) {
}()
}
func makeEnvironmentVariables(container *api.Container) []string {
var result []string
for _, value := range container.Env {
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
}
return result
}
func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
for _, mount := range container.VolumeMounts {
@@ -607,7 +618,10 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
envVariables := makeEnvironmentVariables(container)
envVariables, err := kl.makeEnvironmentVariables(pod.Namespace, container)
if err != nil {
return "", err
}
binds := makeBinds(pod, container, podVolumes)
exposedPorts, portBindings := makePortsAndBindings(container)
@@ -692,6 +706,91 @@ func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, pod
return dockertools.DockerID(dockerContainer.ID), err
}
var masterServices = util.NewStringSet("kubernetes", "kubernetes-ro")
// getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
var (
serviceMap = make(map[string]api.Service)
m = make(map[string]string)
)
// Get all service resources from the master (via a cache),
// and populate them into service enviroment variables.
if kl.serviceLister == nil {
// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
return m, nil
}
services, err := kl.serviceLister.List()
if err != nil {
return m, fmt.Errorf("Failed to list services when setting up env vars.")
}
// project the services in namespace ns onto the master services
for _, service := range services.Items {
serviceName := service.Name
switch service.Namespace {
// for the case whether the master service namespace is the namespace the pod
// is in, pod should receive all the pods in the namespace.
//
// ordering of the case clauses below enforces this
case ns:
serviceMap[serviceName] = service
case kl.masterServiceNamespace:
if masterServices.Has(serviceName) {
_, exists := serviceMap[serviceName]
if !exists {
serviceMap[serviceName] = service
}
}
}
}
services.Items = []api.Service{}
for _, service := range serviceMap {
services.Items = append(services.Items, service)
}
for _, e := range envvars.FromServices(&services) {
m[e.Name] = e.Value
}
return m, nil
}
// Make the service environment variables for a pod in the given namespace.
func (kl *Kubelet) makeEnvironmentVariables(ns string, container *api.Container) ([]string, error) {
var result []string
// Note: These are added to the docker.Config, but are not included in the checksum computed
// by dockertools.BuildDockerName(...). That way, we can still determine whether an
// api.Container is already running by its hash. (We don't want to restart a container just
// because some service changed.)
//
// Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
// To avoid this users can: (1) wait between starting a service and starting; or (2) detect
// missing service env var and exit and be restarted; or (3) use DNS instead of env vars
// and keep trying to resolve the DNS name of the service (recommended).
serviceEnv, err := kl.getServiceEnvVarMap(ns)
if err != nil {
return result, err
}
for _, value := range container.Env {
// The code is in transition from using etcd+BoundPods to apiserver+Pods.
// So, the master may set service env vars, or kubelet may. In case both are doing
// it, we delete the key from the kubelet-generated ones so we don't have duplicate
// env vars.
// TODO: remove this net line once all platforms use apiserver+Pods.
delete(serviceEnv, value.Name)
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
}
// Append remaining service env vars.
for k, v := range serviceEnv {
result = append(result, fmt.Sprintf("%s=%s", k, v))
}
return result, nil
}
func (kl *Kubelet) applyClusterDNS(hc *docker.HostConfig, pod *api.BoundPod) error {
// Get host DNS settings and append them to cluster DNS settings.
f, err := os.Open("/etc/resolv.conf")

View File

@@ -58,6 +58,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools
kubelet.rootDirectory = "/tmp/kubelet"
kubelet.podWorkers = newPodWorkers()
kubelet.sourceReady = func(source string) bool { return true }
return kubelet, fakeEtcdClient, fakeDocker
}
@@ -913,31 +914,6 @@ func TestSyncPodUnhealthy(t *testing.T) {
}
}
func TestMakeEnvVariables(t *testing.T) {
container := api.Container{
Env: []api.EnvVar{
{
Name: "foo",
Value: "bar",
},
{
Name: "baz",
Value: "blah",
},
},
}
vars := makeEnvironmentVariables(&container)
if len(vars) != len(container.Env) {
t.Errorf("Vars don't match. Expected: %#v Found: %#v", container.Env, vars)
}
for ix, env := range container.Env {
value := fmt.Sprintf("%s=%s", env.Name, env.Value)
if value != vars[ix] {
t.Errorf("Unexpected value: %s. Expected: %s", vars[ix], value)
}
}
}
func TestMountExternalVolumes(t *testing.T) {
kubelet, _, _ := newTestKubelet(t)
pod := api.BoundPod{
@@ -1936,3 +1912,275 @@ func TestParseResolvConf(t *testing.T) {
}
}
}
type testServiceLister struct {
services []api.Service
}
func (ls testServiceLister) List() (api.ServiceList, error) {
return api.ServiceList{
Items: ls.services,
}, nil
}
func TestMakeEnvironmentVariables(t *testing.T) {
services := []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Port: 8081,
PortalIP: "1.2.3.1",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes-ro", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Port: 8082,
PortalIP: "1.2.3.2",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test1"},
Spec: api.ServiceSpec{
Port: 8083,
PortalIP: "1.2.3.3",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "test2"},
Spec: api.ServiceSpec{
Port: 8084,
PortalIP: "1.2.3.4",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "test", Namespace: "test2"},
Spec: api.ServiceSpec{
Port: 8085,
PortalIP: "1.2.3.5",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes", Namespace: "kubernetes"},
Spec: api.ServiceSpec{
Port: 8086,
PortalIP: "1.2.3.6",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "kubernetes-ro", Namespace: "kubernetes"},
Spec: api.ServiceSpec{
Port: 8087,
PortalIP: "1.2.3.7",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "not-special", Namespace: "kubernetes"},
Spec: api.ServiceSpec{
Port: 8088,
PortalIP: "1.2.3.8",
},
},
}
testCases := []struct {
name string // the name of the test case
ns string // the namespace to generate environment for
container *api.Container // the container to use
masterServiceNamespace string // the namespace to read master service info from
nilLister bool // whether the lister should be nil
expectedEnvs util.StringSet // a set of expected environment vars
expectedEnvSize int // total number of expected env vars
}{
{
"api server = Y, kubelet = Y",
"test1",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "BAR"},
{Name: "TEST_SERVICE_HOST", Value: "1.2.3.3"},
{Name: "TEST_SERVICE_PORT", Value: "8083"},
{Name: "TEST_PORT", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP_PROTO", Value: "tcp"},
{Name: "TEST_PORT_8083_TCP_PORT", Value: "8083"},
{Name: "TEST_PORT_8083_TCP_ADDR", Value: "1.2.3.3"},
},
},
api.NamespaceDefault,
false,
util.NewStringSet("FOO=BAR",
"TEST_SERVICE_HOST=1.2.3.3",
"TEST_SERVICE_PORT=8083",
"TEST_PORT=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP_PROTO=tcp",
"TEST_PORT_8083_TCP_PORT=8083",
"TEST_PORT_8083_TCP_ADDR=1.2.3.3",
"KUBERNETES_SERVICE_HOST=1.2.3.1",
"KUBERNETES_SERVICE_PORT=8081",
"KUBERNETES_PORT=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP_PROTO=tcp",
"KUBERNETES_PORT_8081_TCP_PORT=8081",
"KUBERNETES_PORT_8081_TCP_ADDR=1.2.3.1",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.2",
"KUBERNETES_RO_SERVICE_PORT=8082",
"KUBERNETES_RO_PORT=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8082_TCP_PORT=8082",
"KUBERNETES_RO_PORT_8082_TCP_ADDR=1.2.3.2"),
22,
},
{
"api server = Y, kubelet = N",
"test1",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "BAR"},
{Name: "TEST_SERVICE_HOST", Value: "1.2.3.3"},
{Name: "TEST_SERVICE_PORT", Value: "8083"},
{Name: "TEST_PORT", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP", Value: "tcp://1.2.3.3:8083"},
{Name: "TEST_PORT_8083_TCP_PROTO", Value: "tcp"},
{Name: "TEST_PORT_8083_TCP_PORT", Value: "8083"},
{Name: "TEST_PORT_8083_TCP_ADDR", Value: "1.2.3.3"},
},
},
api.NamespaceDefault,
true,
util.NewStringSet("FOO=BAR",
"TEST_SERVICE_HOST=1.2.3.3",
"TEST_SERVICE_PORT=8083",
"TEST_PORT=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP_PROTO=tcp",
"TEST_PORT_8083_TCP_PORT=8083",
"TEST_PORT_8083_TCP_ADDR=1.2.3.3"),
8,
},
{
"api server = N; kubelet = Y",
"test1",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "BAZ"},
},
},
api.NamespaceDefault,
false,
util.NewStringSet("FOO=BAZ",
"TEST_SERVICE_HOST=1.2.3.3",
"TEST_SERVICE_PORT=8083",
"TEST_PORT=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP=tcp://1.2.3.3:8083",
"TEST_PORT_8083_TCP_PROTO=tcp",
"TEST_PORT_8083_TCP_PORT=8083",
"TEST_PORT_8083_TCP_ADDR=1.2.3.3",
"KUBERNETES_SERVICE_HOST=1.2.3.1",
"KUBERNETES_SERVICE_PORT=8081",
"KUBERNETES_PORT=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP=tcp://1.2.3.1:8081",
"KUBERNETES_PORT_8081_TCP_PROTO=tcp",
"KUBERNETES_PORT_8081_TCP_PORT=8081",
"KUBERNETES_PORT_8081_TCP_ADDR=1.2.3.1",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.2",
"KUBERNETES_RO_SERVICE_PORT=8082",
"KUBERNETES_RO_PORT=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP=tcp://1.2.3.2:8082",
"KUBERNETES_RO_PORT_8082_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8082_TCP_PORT=8082",
"KUBERNETES_RO_PORT_8082_TCP_ADDR=1.2.3.2"),
22,
},
{
"master service in pod ns",
"test2",
&api.Container{
Env: []api.EnvVar{
{Name: "FOO", Value: "ZAP"},
},
},
"kubernetes",
false,
util.NewStringSet("FOO=ZAP",
"TEST_SERVICE_HOST=1.2.3.5",
"TEST_SERVICE_PORT=8085",
"TEST_PORT=tcp://1.2.3.5:8085",
"TEST_PORT_8085_TCP=tcp://1.2.3.5:8085",
"TEST_PORT_8085_TCP_PROTO=tcp",
"TEST_PORT_8085_TCP_PORT=8085",
"TEST_PORT_8085_TCP_ADDR=1.2.3.5",
"KUBERNETES_SERVICE_HOST=1.2.3.4",
"KUBERNETES_SERVICE_PORT=8084",
"KUBERNETES_PORT=tcp://1.2.3.4:8084",
"KUBERNETES_PORT_8084_TCP=tcp://1.2.3.4:8084",
"KUBERNETES_PORT_8084_TCP_PROTO=tcp",
"KUBERNETES_PORT_8084_TCP_PORT=8084",
"KUBERNETES_PORT_8084_TCP_ADDR=1.2.3.4",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.7",
"KUBERNETES_RO_SERVICE_PORT=8087",
"KUBERNETES_RO_PORT=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8087_TCP_PORT=8087",
"KUBERNETES_RO_PORT_8087_TCP_ADDR=1.2.3.7"),
22,
},
{
"pod in master service ns",
"kubernetes",
&api.Container{},
"kubernetes",
false,
util.NewStringSet(
"NOT_SPECIAL_SERVICE_HOST=1.2.3.8",
"NOT_SPECIAL_SERVICE_PORT=8088",
"NOT_SPECIAL_PORT=tcp://1.2.3.8:8088",
"NOT_SPECIAL_PORT_8088_TCP=tcp://1.2.3.8:8088",
"NOT_SPECIAL_PORT_8088_TCP_PROTO=tcp",
"NOT_SPECIAL_PORT_8088_TCP_PORT=8088",
"NOT_SPECIAL_PORT_8088_TCP_ADDR=1.2.3.8",
"KUBERNETES_SERVICE_HOST=1.2.3.6",
"KUBERNETES_SERVICE_PORT=8086",
"KUBERNETES_PORT=tcp://1.2.3.6:8086",
"KUBERNETES_PORT_8086_TCP=tcp://1.2.3.6:8086",
"KUBERNETES_PORT_8086_TCP_PROTO=tcp",
"KUBERNETES_PORT_8086_TCP_PORT=8086",
"KUBERNETES_PORT_8086_TCP_ADDR=1.2.3.6",
"KUBERNETES_RO_SERVICE_HOST=1.2.3.7",
"KUBERNETES_RO_SERVICE_PORT=8087",
"KUBERNETES_RO_PORT=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP=tcp://1.2.3.7:8087",
"KUBERNETES_RO_PORT_8087_TCP_PROTO=tcp",
"KUBERNETES_RO_PORT_8087_TCP_PORT=8087",
"KUBERNETES_RO_PORT_8087_TCP_ADDR=1.2.3.7"),
21,
},
}
for _, tc := range testCases {
kl, _, _ := newTestKubelet(t)
kl.masterServiceNamespace = tc.masterServiceNamespace
if tc.nilLister {
kl.serviceLister = nil
} else {
kl.serviceLister = testServiceLister{services}
}
result, err := kl.makeEnvironmentVariables(tc.ns, tc.container)
if err != nil {
t.Errorf("[%v] Unexpected error: %v", tc.name, err)
}
resultSet := util.NewStringSet(result...)
if !resultSet.IsSuperset(tc.expectedEnvs) {
t.Errorf("[%v] Unexpected env entries; expected {%v}, got {%v}", tc.name, tc.expectedEnvs, resultSet)
}
if a := len(resultSet); a != tc.expectedEnvSize {
t.Errorf("[%v] Unexpected number of env vars; expected %v, got %v", tc.name, tc.expectedEnvSize, a)
}
}
}