diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index b38e761da59..584f3581488 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -21,8 +21,10 @@ package main import ( "flag" "net" + "net/http" "strconv" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -35,6 +37,7 @@ var ( apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs") + minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.") etcdServerList, machineList util.StringList ) @@ -68,11 +71,16 @@ func main() { } } + podInfoGetter := &client.HTTPPodInfoGetter{ + Client: http.DefaultClient, + Port: *minionPort, + } + var m *master.Master if len(etcdServerList) > 0 { - m = master.New(etcdServerList, machineList, cloud, *minionRegexp) + m = master.New(etcdServerList, machineList, podInfoGetter, cloud, *minionRegexp) } else { - m = master.NewMemoryServer(machineList, cloud) + m = master.NewMemoryServer(machineList, podInfoGetter, cloud) } glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix)) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index a90f7fb99af..e6e9f183bed 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -41,6 +41,29 @@ var ( fakeDocker1, fakeDocker2 kubelet.FakeDockerClient ) +type fakePodInfoGetter struct{} + +func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) { + // This is a horrible hack to get around the fact that we can't provide + // different port numbers per kubelet... + var c client.PodInfoGetter + switch host { + case "localhost": + c = &client.HTTPPodInfoGetter{ + Client: http.DefaultClient, + Port: 10250, + } + case "machine": + c = &client.HTTPPodInfoGetter{ + Client: http.DefaultClient, + Port: 10251, + } + default: + glog.Fatalf("Can't get info for: %v, %v", host, podID) + } + return c.GetPodInfo("localhost", podID) +} + func startComponents(manifestURL string) (apiServerURL string) { // Setup servers := []string{"http://localhost:4001"} @@ -48,7 +71,7 @@ func startComponents(manifestURL string) (apiServerURL string) { machineList := []string{"localhost", "machine"} // Master - m := master.New(servers, machineList, nil, "") + m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "") apiserver := httptest.NewServer(m.ConstructHandler("/api/v1beta1")) controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(apiserver.URL, nil)) @@ -64,7 +87,7 @@ func startComponents(manifestURL string) (apiServerURL string) { SyncFrequency: 5 * time.Second, HTTPCheckFrequency: 5 * time.Second, } - go myKubelet.RunKubelet("", manifestURL, servers[0], "localhost", "", 0) + go myKubelet.RunKubelet("", "", manifestURL, servers[0], "localhost", 10250) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. @@ -76,7 +99,7 @@ func startComponents(manifestURL string) (apiServerURL string) { SyncFrequency: 5 * time.Second, HTTPCheckFrequency: 5 * time.Second, } - go otherKubelet.RunKubelet("", "", servers[0], "localhost", "", 0) + go otherKubelet.RunKubelet("", "", "", servers[0], "localhost", 10251) return apiserver.URL } @@ -150,7 +173,7 @@ func main() { if len(createdPods) != 7 { glog.Fatalf("Unexpected list of created pods:\n\n%#v\n\n%#v\n\n%#v\n\n", createdPods.List(), fakeDocker1.Created, fakeDocker2.Created) } - glog.Infof("OK") + glog.Infof("OK - found created pods: %#v", createdPods.List()) } // Serve a file for kubelet to read. diff --git a/cmd/kubecfg/kubecfg.go b/cmd/kubecfg/kubecfg.go index bb95652972a..ae13408b394 100644 --- a/cmd/kubecfg/kubecfg.go +++ b/cmd/kubecfg/kubecfg.go @@ -74,11 +74,11 @@ func readConfig(storage string) []byte { } data, err := ioutil.ReadFile(*config) if err != nil { - glog.Fatalf("Unable to read %v: %#v\n", *config, err) + glog.Fatalf("Unable to read %v: %v\n", *config, err) } data, err = kubecfg.ToWireFormat(data, storage) if err != nil { - glog.Fatalf("Error parsing %v as an object for %v: %#v\n", *config, storage, err) + glog.Fatalf("Error parsing %v as an object for %v: %v\n", *config, storage, err) } if *verbose { glog.Infof("Parsed config file successfully; sending:\n%v\n", string(data)) @@ -122,7 +122,7 @@ func main() { if secure { auth, err = kubecfg.LoadAuthInfo(*authConfig) if err != nil { - glog.Fatalf("Error loading auth: %#v", err) + glog.Fatalf("Error loading auth: %v", err) } } @@ -175,7 +175,8 @@ func executeAPIRequest(method string, s *kube_client.Client) bool { if method == "create" || method == "update" { r.Body(readConfig(parseStorage())) } - obj, err := r.Do().Get() + result := r.Do() + obj, err := result.Get() if err != nil { glog.Fatalf("Got request error: %v\n", err) return false @@ -191,7 +192,8 @@ func executeAPIRequest(method string, s *kube_client.Client) bool { } if err = printer.PrintObj(obj, os.Stdout); err != nil { - glog.Fatalf("Failed to print: %#v\nRaw received object:\n%#v\n", err, obj) + body, _ := result.Raw() + glog.Fatalf("Failed to print: %v\nRaw received object:\n%#v\n\nBody received: %v", err, obj, string(body)) } fmt.Print("\n") @@ -223,7 +225,7 @@ func executeControllerRequest(method string, c *kube_client.Client) bool { replicas, err := strconv.Atoi(flag.Arg(2)) name := flag.Arg(3) if err != nil { - glog.Fatalf("Error parsing replicas: %#v", err) + glog.Fatalf("Error parsing replicas: %v", err) } err = kubecfg.RunController(image, name, replicas, c, *portSpec, *servicePort) case "resize": @@ -234,14 +236,14 @@ func executeControllerRequest(method string, c *kube_client.Client) bool { name := args[1] replicas, err := strconv.Atoi(args[2]) if err != nil { - glog.Fatalf("Error parsing replicas: %#v", err) + glog.Fatalf("Error parsing replicas: %v", err) } err = kubecfg.ResizeController(name, replicas, c) default: return false } if err != nil { - glog.Fatalf("Error: %#v", err) + glog.Fatalf("Error: %v", err) } return true } diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 55e880a3d39..865ea19ef11 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -89,5 +89,5 @@ func main() { SyncFrequency: *syncFrequency, HTTPCheckFrequency: *httpCheckFrequency, } - my_kubelet.RunKubelet(*config, *manifestUrl, *etcdServers, *address, *dockerEndpoint, *port) + my_kubelet.RunKubelet(*dockerEndpoint, *config, *manifestUrl, *etcdServers, *address, *port) } diff --git a/pkg/api/types.go b/pkg/api/types.go index 2df6fe12cd7..9e11e52714a 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -16,6 +16,10 @@ limitations under the License. package api +import ( + "github.com/fsouza/go-dockerclient" +) + // Common string formats // --------------------- // Many fields in this API have formatting requirements. The commonly used @@ -153,13 +157,22 @@ const ( PodStopped PodStatus = "Stopped" ) +// PodInfo contains one entry for every container with available info. +type PodInfo map[string]docker.Container + // PodState is the state of a pod, used as either input (desired state) or output (current state) type PodState struct { Manifest ContainerManifest `json:"manifest,omitempty" yaml:"manifest,omitempty"` Status PodStatus `json:"status,omitempty" yaml:"status,omitempty"` Host string `json:"host,omitempty" yaml:"host,omitempty"` HostIP string `json:"hostIP,omitempty" yaml:"hostIP,omitempty"` - Info interface{} `json:"info,omitempty" yaml:"info,omitempty"` + + // The key of this map is the *name* of the container within the manifest; it has one + // entry per container in the manifest. The value of this map is currently the output + // of `docker inspect`. This output format is *not* final and should not be relied + // upon. + // TODO: Make real decisions about what our info should look like. + Info PodInfo `json:"info,omitempty" yaml:"info,omitempty"` } type PodList struct { diff --git a/pkg/client/container_info.go b/pkg/client/podinfo.go similarity index 53% rename from pkg/client/container_info.go rename to pkg/client/podinfo.go index ce408206f0a..e216662f5fe 100644 --- a/pkg/client/container_info.go +++ b/pkg/client/podinfo.go @@ -20,25 +20,35 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net" "net/http" + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -// ContainerInfo is an interface for things that can get information about a container. +// PodInfoGetter is an interface for things that can get information about a pod's containers. // Injectable for easy testing. -type ContainerInfo interface { - // GetContainerInfo returns information about container 'name' on 'host' - // Returns an untyped interface, and an error, if one occurs - GetContainerInfo(host, name string) (interface{}, error) +type PodInfoGetter interface { + // GetPodInfo returns information about all containers which are part + // Returns an api.PodInfo, or an error if one occurs. + GetPodInfo(host, podID string) (api.PodInfo, error) } // The default implementation, accesses the kubelet over HTTP -type HTTPContainerInfo struct { +type HTTPPodInfoGetter struct { Client *http.Client Port uint } -func (c *HTTPContainerInfo) GetContainerInfo(host, name string) (interface{}, error) { - request, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/containerInfo?container=%s", host, c.Port, name), nil) +func (c *HTTPPodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) { + request, err := http.NewRequest( + "GET", + fmt.Sprintf( + "http://%s/podInfo?podID=%s", + net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)), + podID), + nil) if err != nil { return nil, err } @@ -51,17 +61,21 @@ func (c *HTTPContainerInfo) GetContainerInfo(host, name string) (interface{}, er if err != nil { return nil, err } - var data interface{} - err = json.Unmarshal(body, &data) - return data, err + // Check that this data can be unmarshalled + info := api.PodInfo{} + err = json.Unmarshal(body, &info) + if err != nil { + return nil, err + } + return info, nil } // Useful for testing. -type FakeContainerInfo struct { - data interface{} +type FakePodInfoGetter struct { + data api.PodInfo err error } -func (c *FakeContainerInfo) GetContainerInfo(host, name string) (interface{}, error) { +func (c *FakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) { return c.data, c.err } diff --git a/pkg/client/container_info_test.go b/pkg/client/podinfo_test.go similarity index 67% rename from pkg/client/container_info_test.go rename to pkg/client/podinfo_test.go index 1b0904f9596..7748b7b3996 100644 --- a/pkg/client/container_info_test.go +++ b/pkg/client/podinfo_test.go @@ -25,7 +25,9 @@ import ( "strings" "testing" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/fsouza/go-dockerclient" ) // TODO: This doesn't reduce typing enough to make it worth the less readable errors. Remove. @@ -35,11 +37,15 @@ func expectNoError(t *testing.T, err error) { } } -func TestHTTPContainerInfo(t *testing.T) { - body := `{"items":[]}` +func TestHTTPPodInfoGetter(t *testing.T) { + expectObj := api.PodInfo{ + "myID": docker.Container{ID: "myID"}, + } + body, err := json.Marshal(expectObj) + expectNoError(t, err) fakeHandler := util.FakeHandler{ StatusCode: 200, - ResponseBody: body, + ResponseBody: string(body), } testServer := httptest.NewServer(&fakeHandler) @@ -49,14 +55,15 @@ func TestHTTPContainerInfo(t *testing.T) { port, err := strconv.Atoi(parts[1]) expectNoError(t, err) - containerInfo := &HTTPContainerInfo{ + podInfoGetter := &HTTPPodInfoGetter{ Client: http.DefaultClient, Port: uint(port), } - data, err := containerInfo.GetContainerInfo(parts[0], "foo") + gotObj, err := podInfoGetter.GetPodInfo(parts[0], "foo") expectNoError(t, err) - dataString, _ := json.Marshal(data) - if string(dataString) != body { - t.Errorf("Unexpected response. Expected: %s, received %s", body, string(dataString)) + + // reflect.DeepEqual(expectObj, gotObj) doesn't handle blank times well + if len(gotObj) != len(expectObj) || expectObj["myID"].ID != gotObj["myID"].ID { + t.Errorf("Unexpected response. Expected: %#v, received %#v", expectObj, gotObj) } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7bdb48cb4fe..f33585183f1 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -101,9 +101,9 @@ const ( // Starts background goroutines. If config_path, manifest_url, or address are empty, // they are not watched. Never returns. -func (kl *Kubelet) RunKubelet(config_path, manifest_url, etcd_servers, address, endpoint string, port uint) { +func (kl *Kubelet) RunKubelet(dockerEndpoint, config_path, manifest_url, etcd_servers, address string, port uint) { if kl.DockerPuller == nil { - kl.DockerPuller = MakeDockerPuller(endpoint) + kl.DockerPuller = MakeDockerPuller(dockerEndpoint) } updateChannel := make(chan manifestUpdate) if config_path != "" { @@ -783,18 +783,32 @@ func (kl *Kubelet) getContainerIdFromName(name string) (DockerId, bool, error) { return "", false, nil } -// Returns docker info for a container -func (kl *Kubelet) GetContainerInfo(name string) (string, error) { - dockerId, found, err := kl.getContainerIdFromName(name) - if err != nil || !found { - return "{}", err - } - info, err := kl.DockerClient.InspectContainer(string(dockerId)) +// Returns docker info for all containers in the pod/manifest +func (kl *Kubelet) GetPodInfo(podID string) (api.PodInfo, error) { + info := api.PodInfo{} + + containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{}) if err != nil { - return "{}", err + return nil, err } - data, err := json.Marshal(info) - return string(data), err + + for _, value := range containerList { + manifestID, containerName := parseDockerName(value.Names[0]) + if manifestID != podID { + continue + } + inspectResult, err := kl.DockerClient.InspectContainer(value.ID) + if err != nil { + return nil, err + } + if inspectResult == nil { + // Why did we not get an error? + info[containerName] = docker.Container{} + } else { + info[containerName] = *inspectResult + } + } + return info, nil } //Returns stats (from Cadvisor) for a container diff --git a/pkg/kubelet/kubelet_server.go b/pkg/kubelet/kubelet_server.go index 4c7f45ea45c..0883dc61454 100644 --- a/pkg/kubelet/kubelet_server.go +++ b/pkg/kubelet/kubelet_server.go @@ -24,6 +24,7 @@ import ( "net/url" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "gopkg.in/v1/yaml" ) @@ -36,7 +37,7 @@ type KubeletServer struct { // For testablitiy. type kubeletInterface interface { GetContainerStats(name string) (*api.ContainerStats, error) - GetContainerInfo(name string) (string, error) + GetPodInfo(name string) (api.PodInfo, error) } func (s *KubeletServer) error(w http.ResponseWriter, err error) { @@ -45,6 +46,10 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) { } func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + logger := apiserver.MakeLogged(req, w) + w = logger + defer logger.Log() + u, err := url.ParseRequestURI(req.RequestURI) if err != nil { s.error(w, err) @@ -104,16 +109,20 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) w.Header().Add("Content-type", "application/json") w.Write(data) - case u.Path == "/containerInfo": - // NOTE: The master appears to pass a Pod.ID - // The server appears to pass a Pod.ID - container := u.Query().Get("container") - if len(container) == 0 { + case u.Path == "/podInfo": + podID := u.Query().Get("podID") + if len(podID) == 0 { w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, "Missing container selector arg.") + fmt.Fprint(w, "Missing 'podID=' query entry.") return } - data, err := s.Kubelet.GetContainerInfo(container) + info, err := s.Kubelet.GetPodInfo(podID) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Internal Error: %v", err) + return + } + data, err := json.Marshal(info) if err != nil { w.WriteHeader(http.StatusInternalServerError) fmt.Fprintf(w, "Internal Error: %v", err) @@ -121,7 +130,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } w.WriteHeader(http.StatusOK) w.Header().Add("Content-type", "application/json") - fmt.Fprint(w, data) + w.Write(data) default: w.WriteHeader(http.StatusNotFound) fmt.Fprint(w, "Not found.") diff --git a/pkg/kubelet/kubelet_server_test.go b/pkg/kubelet/kubelet_server_test.go index 22891e2a42f..6b2b14f86f0 100644 --- a/pkg/kubelet/kubelet_server_test.go +++ b/pkg/kubelet/kubelet_server_test.go @@ -28,14 +28,15 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/fsouza/go-dockerclient" ) type fakeKubelet struct { - infoFunc func(name string) (string, error) + infoFunc func(name string) (api.PodInfo, error) statsFunc func(name string) (*api.ContainerStats, error) } -func (fk *fakeKubelet) GetContainerInfo(name string) (string, error) { +func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) { return fk.infoFunc(name) } @@ -114,16 +115,16 @@ func TestContainers(t *testing.T) { } } -func TestContainerInfo(t *testing.T) { +func TestPodInfo(t *testing.T) { fw := makeServerTest() - expected := "good container info string" - fw.fakeKubelet.infoFunc = func(name string) (string, error) { - if name == "goodcontainer" { + expected := api.PodInfo{"goodpod": docker.Container{ID: "myContainerID"}} + fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) { + if name == "goodpod" { return expected, nil } - return "", fmt.Errorf("bad container") + return nil, fmt.Errorf("bad pod") } - resp, err := http.Get(fw.testHttpServer.URL + "/containerInfo?container=goodcontainer") + resp, err := http.Get(fw.testHttpServer.URL + "/podInfo?podID=goodpod") if err != nil { t.Errorf("Got error GETing: %v", err) } @@ -131,7 +132,11 @@ func TestContainerInfo(t *testing.T) { if err != nil { t.Errorf("Error reading body: %v", err) } - if got != expected { + expectedBytes, err := json.Marshal(expected) + if err != nil { + t.Fatalf("Unexpected marshal error %v", err) + } + if got != string(expectedBytes) { t.Errorf("Expected: '%v', got: '%v'", expected, got) } } diff --git a/pkg/master/master.go b/pkg/master/master.go index 1c60eaba5a3..5b17457fcb3 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -44,53 +44,49 @@ type Master struct { } // Returns a memory (not etcd) backed apiserver. -func NewMemoryServer(minions []string, cloud cloudprovider.Interface) *Master { +func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master { m := &Master{ podRegistry: registry.MakeMemoryRegistry(), controllerRegistry: registry.MakeMemoryRegistry(), serviceRegistry: registry.MakeMemoryRegistry(), minionRegistry: registry.MakeMinionRegistry(minions), } - m.init(cloud) + m.init(cloud, podInfoGetter) return m } // Returns a new apiserver. -func New(etcdServers, minions []string, cloud cloudprovider.Interface, minionRegexp string) *Master { +func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master { etcdClient := etcd.NewClient(etcdServers) - var minionRegistry registry.MinionRegistry - if cloud != nil && len(minionRegexp) > 0 { - var err error - minionRegistry, err = registry.MakeCloudMinionRegistry(cloud, minionRegexp) - if err != nil { - glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err) - minionRegistry = registry.MakeMinionRegistry(minions) - } - } else { - minionRegistry = registry.MakeMinionRegistry(minions) - } + minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp) m := &Master{ podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), minionRegistry: minionRegistry, } - m.init(cloud) + m.init(cloud, podInfoGetter) return m } -func (m *Master) init(cloud cloudprovider.Interface) { - containerInfo := &client.HTTPContainerInfo{ - Client: http.DefaultClient, - Port: 10250, +func minionRegistryMaker(minions []string, cloud cloudprovider.Interface, minionRegexp string) registry.MinionRegistry { + if cloud != nil && len(minionRegexp) > 0 { + minionRegistry, err := registry.MakeCloudMinionRegistry(cloud, minionRegexp) + if err != nil { + glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err) + } + return minionRegistry } + return registry.MakeMinionRegistry(minions) +} +func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInfoGetter) { m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) - podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30) + podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30) go podCache.Loop() s := scheduler.MakeFirstFitScheduler(m.podRegistry, m.random) m.storage = map[string]apiserver.RESTStorage{ - "pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, s, m.minionRegistry, cloud, podCache), + "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache), "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry, m.podRegistry), "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), "minions": registry.MakeMinionRegistryStorage(m.minionRegistry), diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index 33b6da00819..d075e6cde57 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -17,9 +17,11 @@ limitations under the License. package master import ( + "errors" "sync" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" @@ -30,37 +32,38 @@ import ( // PodCache contains both a cache of container information, as well as the mechanism for keeping // that cache up to date. type PodCache struct { - containerInfo client.ContainerInfo + containerInfo client.PodInfoGetter pods registry.PodRegistry - podInfo map[string]interface{} - period time.Duration - podLock sync.Mutex + // This is a map of pod id to a map of container name to the + podInfo map[string]api.PodInfo + period time.Duration + podLock sync.Mutex } -func NewPodCache(info client.ContainerInfo, pods registry.PodRegistry, period time.Duration) *PodCache { +func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache { return &PodCache{ containerInfo: info, pods: pods, - podInfo: map[string]interface{}{}, + podInfo: map[string]api.PodInfo{}, period: period, } } -// Implements the ContainerInfo interface -// The returned value should be treated as read-only -func (p *PodCache) GetContainerInfo(host, id string) (interface{}, error) { +// Implements the PodInfoGetter interface. +// The returned value should be treated as read-only. +func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { p.podLock.Lock() defer p.podLock.Unlock() - value, ok := p.podInfo[id] + value, ok := p.podInfo[podID] if !ok { - return nil, nil + return nil, errors.New("No cached pod info") } else { return value, nil } } -func (p *PodCache) updateContainerInfo(host, id string) error { - info, err := p.containerInfo.GetContainerInfo(host, id) +func (p *PodCache) updatePodInfo(host, id string) error { + info, err := p.containerInfo.GetPodInfo(host, id) if err != nil { return err } @@ -78,7 +81,7 @@ func (p *PodCache) UpdateAllContainers() { return } for _, pod := range pods { - err := p.updateContainerInfo(pod.CurrentState.Host, pod.ID) + err := p.updatePodInfo(pod.CurrentState.Host, pod.ID) if err != nil { glog.Errorf("Error synchronizing container: %#v", err) } diff --git a/pkg/master/pod_cache_test.go b/pkg/master/pod_cache_test.go index 9db9d8db4bb..f2eb86c1d8e 100644 --- a/pkg/master/pod_cache_test.go +++ b/pkg/master/pod_cache_test.go @@ -23,16 +23,17 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry" + "github.com/fsouza/go-dockerclient" ) -type FakeContainerInfo struct { +type FakePodInfoGetter struct { host string id string - data interface{} + data api.PodInfo err error } -func (f *FakeContainerInfo) GetContainerInfo(host, id string) (interface{}, error) { +func (f *FakePodInfoGetter) GetPodInfo(host, id string) (api.PodInfo, error) { f.host = host f.id = id return f.data, f.err @@ -41,53 +42,49 @@ func (f *FakeContainerInfo) GetContainerInfo(host, id string) (interface{}, erro func TestPodCacheGet(t *testing.T) { cache := NewPodCache(nil, nil, time.Second*1) - pod := api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - } - cache.podInfo["foo"] = pod + expected := api.PodInfo{"foo": docker.Container{ID: "foo"}} + cache.podInfo["foo"] = expected - info, err := cache.GetContainerInfo("host", "foo") + info, err := cache.GetPodInfo("host", "foo") if err != nil { t.Errorf("Unexpected error: %#v", err) } - if !reflect.DeepEqual(info, pod) { - t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) + if !reflect.DeepEqual(info, expected) { + t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info) } } func TestPodCacheGetMissing(t *testing.T) { cache := NewPodCache(nil, nil, time.Second*1) - info, err := cache.GetContainerInfo("host", "foo") - if err != nil { - t.Errorf("Unexpected error: %#v", err) + info, err := cache.GetPodInfo("host", "foo") + if err == nil { + t.Errorf("Unexpected non-error: %#v", err) } if info != nil { t.Errorf("Unexpected info: %#v", info) } } -func TestPodGetContainerInfo(t *testing.T) { - pod := api.Pod{ - JSONBase: api.JSONBase{ID: "foo"}, - } - fake := FakeContainerInfo{ - data: pod, +func TestPodGetPodInfoGetter(t *testing.T) { + expected := api.PodInfo{"foo": docker.Container{ID: "foo"}} + fake := FakePodInfoGetter{ + data: expected, } cache := NewPodCache(&fake, nil, time.Second*1) - cache.updateContainerInfo("host", "foo") + cache.updatePodInfo("host", "foo") if fake.host != "host" || fake.id != "foo" { t.Errorf("Unexpected access: %#v", fake) } - info, err := cache.GetContainerInfo("host", "foo") + info, err := cache.GetPodInfo("host", "foo") if err != nil { t.Errorf("Unexpected error: %#v", err) } - if !reflect.DeepEqual(info, pod) { - t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) + if !reflect.DeepEqual(info, expected) { + t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info) } } @@ -98,10 +95,13 @@ func TestPodUpdateAllContainers(t *testing.T) { Host: "machine", }, } + pods := []api.Pod{pod} mockRegistry := registry.MakeMockPodRegistry(pods) - fake := FakeContainerInfo{ - data: pod, + + expected := api.PodInfo{"foo": docker.Container{ID: "foo"}} + fake := FakePodInfoGetter{ + data: expected, } cache := NewPodCache(&fake, mockRegistry, time.Second*1) @@ -111,11 +111,11 @@ func TestPodUpdateAllContainers(t *testing.T) { t.Errorf("Unexpected access: %#v", fake) } - info, err := cache.GetContainerInfo("machine", "foo") + info, err := cache.GetPodInfo("machine", "foo") if err != nil { t.Errorf("Unexpected error: %#v", err) } - if !reflect.DeepEqual(info, pod) { - t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) + if !reflect.DeepEqual(info, expected) { + t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info) } } diff --git a/pkg/registry/pod_registry.go b/pkg/registry/pod_registry.go index 6b1321366e4..bc6415bc294 100644 --- a/pkg/registry/pod_registry.go +++ b/pkg/registry/pod_registry.go @@ -33,8 +33,8 @@ import ( // PodRegistryStorage implements the RESTStorage interface in terms of a PodRegistry type PodRegistryStorage struct { registry PodRegistry - containerInfo client.ContainerInfo - podCache client.ContainerInfo + podInfoGetter client.PodInfoGetter + podCache client.PodInfoGetter scheduler scheduler.Scheduler minionLister scheduler.MinionLister cloud cloudprovider.Interface @@ -44,20 +44,20 @@ type PodRegistryStorage struct { // MakePodRegistryStorage makes a RESTStorage object for a pod registry. // Parameters: // registry: The pod registry -// containerInfo: Source of fresh container info +// podInfoGetter: Source of fresh container info // scheduler: The scheduler for assigning pods to machines // minionLister: Object which can list available minions for the scheduler // cloud: Interface to a cloud provider (may be null) // podCache: Source of cached container info func MakePodRegistryStorage(registry PodRegistry, - containerInfo client.ContainerInfo, + podInfoGetter client.PodInfoGetter, scheduler scheduler.Scheduler, minionLister scheduler.MinionLister, cloud cloudprovider.Interface, - podCache client.ContainerInfo) apiserver.RESTStorage { + podCache client.PodInfoGetter) apiserver.RESTStorage { return &PodRegistryStorage{ registry: registry, - containerInfo: containerInfo, + podInfoGetter: podInfoGetter, scheduler: scheduler, minionLister: minionLister, cloud: cloud, @@ -71,34 +71,56 @@ func (storage *PodRegistryStorage) List(selector labels.Selector) (interface{}, pods, err := storage.registry.ListPods(selector) if err == nil { result.Items = pods - // Get cached info for the list currently. - // TODO: Optionally use fresh info - if storage.podCache != nil { - for ix, pod := range pods { - info, err := storage.podCache.GetContainerInfo(pod.CurrentState.Host, pod.ID) - if err != nil { - glog.Errorf("Error getting container info: %#v", err) - continue - } - result.Items[ix].CurrentState.Info = info - } + for i := range result.Items { + storage.fillPodInfo(&result.Items[i]) } } return result, err } -func makePodStatus(info interface{}) api.PodStatus { - if state, ok := info.(map[string]interface{})["State"]; ok { - if running, ok := state.(map[string]interface{})["Running"]; ok { - if running.(bool) { - return api.PodRunning +func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) { + // Get cached info for the list currently. + // TODO: Optionally use fresh info + if storage.podCache != nil { + info, err := storage.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID) + if err != nil { + glog.Errorf("Error getting container info: %#v", err) + return + } + pod.CurrentState.Info = info + } +} + +func makePodStatus(pod *api.Pod) api.PodStatus { + if pod.CurrentState.Info == nil { + return api.PodPending + } + running := 0 + stopped := 0 + unknown := 0 + for _, container := range pod.DesiredState.Manifest.Containers { + if info, ok := pod.CurrentState.Info[container.Name]; ok { + if info.State.Running { + running++ } else { - return api.PodStopped + stopped++ } + } else { + unknown++ } } - return api.PodPending + + switch { + case running > 0 && stopped == 0 && unknown == 0: + return api.PodRunning + case running == 0 && stopped > 0 && unknown == 0: + return api.PodStopped + case running == 0 && stopped == 0 && unknown > 0: + return api.PodPending + default: + return api.PodPending + } } func getInstanceIP(cloud cloudprovider.Interface, host string) string { @@ -129,13 +151,9 @@ func (storage *PodRegistryStorage) Get(id string) (interface{}, error) { if pod == nil { return pod, nil } - if storage.containerInfo != nil { - info, err := storage.containerInfo.GetContainerInfo(pod.CurrentState.Host, id) - if err != nil { - return pod, err - } - pod.CurrentState.Info = info - pod.CurrentState.Status = makePodStatus(info) + if storage.podCache != nil || storage.podInfoGetter != nil { + storage.fillPodInfo(pod) + pod.CurrentState.Status = makePodStatus(pod) } pod.CurrentState.HostIP = getInstanceIP(storage.cloud, pod.CurrentState.Host) diff --git a/pkg/registry/pod_registry_test.go b/pkg/registry/pod_registry_test.go index 42505ca876c..c5a42d081ba 100644 --- a/pkg/registry/pod_registry_test.go +++ b/pkg/registry/pod_registry_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/fsouza/go-dockerclient" ) func expectNoError(t *testing.T, err error) { @@ -153,29 +154,88 @@ func TestGetPodCloud(t *testing.T) { } func TestMakePodStatus(t *testing.T) { - status := makePodStatus(map[string]interface{}{}) + desiredState := api.PodState{ + Manifest: api.ContainerManifest{ + Containers: []api.Container{ + {Name: "containerA"}, + {Name: "containerB"}, + }, + }, + } + pod := &api.Pod{DesiredState: desiredState} + status := makePodStatus(pod) if status != api.PodPending { t.Errorf("Expected 'Pending', got '%s'", status) } - status = makePodStatus(map[string]interface{}{ - "State": map[string]interface{}{ - "Running": false, + runningState := docker.Container{ + State: docker.State{ + Running: true, }, - }) + } + stoppedState := docker.Container{ + State: docker.State{ + Running: false, + }, + } + // All running. + pod = &api.Pod{ + DesiredState: desiredState, + CurrentState: api.PodState{ + Info: map[string]docker.Container{ + "containerA": runningState, + "containerB": runningState, + }, + }, + } + status = makePodStatus(pod) + if status != api.PodRunning { + t.Errorf("Expected 'Running', got '%s'", status) + } + + // All stopped. + pod = &api.Pod{ + DesiredState: desiredState, + CurrentState: api.PodState{ + Info: map[string]docker.Container{ + "containerA": stoppedState, + "containerB": stoppedState, + }, + }, + } + status = makePodStatus(pod) if status != api.PodStopped { t.Errorf("Expected 'Stopped', got '%s'", status) } - status = makePodStatus(map[string]interface{}{ - "State": map[string]interface{}{ - "Running": true, + // Mixed state. + pod = &api.Pod{ + DesiredState: desiredState, + CurrentState: api.PodState{ + Info: map[string]docker.Container{ + "containerA": runningState, + "containerB": stoppedState, + }, }, - }) + } + status = makePodStatus(pod) + if status != api.PodPending { + t.Errorf("Expected 'Pending', got '%s'", status) + } - if status != api.PodRunning { - t.Errorf("Expected 'Running', got '%s'", status) + // Mixed state. + pod = &api.Pod{ + DesiredState: desiredState, + CurrentState: api.PodState{ + Info: map[string]docker.Container{ + "containerA": runningState, + }, + }, + } + status = makePodStatus(pod) + if status != api.PodPending { + t.Errorf("Expected 'Pending', got '%s'", status) } }