Merge pull request #318 from lavalamp/test_fix

Fix interface{} in api/types
This commit is contained in:
brendandburns 2014-07-01 22:13:38 -07:00
commit e268b73596
15 changed files with 345 additions and 173 deletions

View File

@ -21,8 +21,10 @@ package main
import ( import (
"flag" "flag"
"net" "net"
"net/http"
"strconv" "strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -35,6 +37,7 @@ var (
apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'") apiPrefix = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'")
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs") minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs")
minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.")
etcdServerList, machineList util.StringList etcdServerList, machineList util.StringList
) )
@ -68,11 +71,16 @@ func main() {
} }
} }
podInfoGetter := &client.HTTPPodInfoGetter{
Client: http.DefaultClient,
Port: *minionPort,
}
var m *master.Master var m *master.Master
if len(etcdServerList) > 0 { if len(etcdServerList) > 0 {
m = master.New(etcdServerList, machineList, cloud, *minionRegexp) m = master.New(etcdServerList, machineList, podInfoGetter, cloud, *minionRegexp)
} else { } else {
m = master.NewMemoryServer(machineList, cloud) m = master.NewMemoryServer(machineList, podInfoGetter, cloud)
} }
glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix)) glog.Fatal(m.Run(net.JoinHostPort(*address, strconv.Itoa(int(*port))), *apiPrefix))

View File

@ -41,6 +41,29 @@ var (
fakeDocker1, fakeDocker2 kubelet.FakeDockerClient fakeDocker1, fakeDocker2 kubelet.FakeDockerClient
) )
type fakePodInfoGetter struct{}
func (fakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
// This is a horrible hack to get around the fact that we can't provide
// different port numbers per kubelet...
var c client.PodInfoGetter
switch host {
case "localhost":
c = &client.HTTPPodInfoGetter{
Client: http.DefaultClient,
Port: 10250,
}
case "machine":
c = &client.HTTPPodInfoGetter{
Client: http.DefaultClient,
Port: 10251,
}
default:
glog.Fatalf("Can't get info for: %v, %v", host, podID)
}
return c.GetPodInfo("localhost", podID)
}
func startComponents(manifestURL string) (apiServerURL string) { func startComponents(manifestURL string) (apiServerURL string) {
// Setup // Setup
servers := []string{"http://localhost:4001"} servers := []string{"http://localhost:4001"}
@ -48,7 +71,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
machineList := []string{"localhost", "machine"} machineList := []string{"localhost", "machine"}
// Master // Master
m := master.New(servers, machineList, nil, "") m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "")
apiserver := httptest.NewServer(m.ConstructHandler("/api/v1beta1")) apiserver := httptest.NewServer(m.ConstructHandler("/api/v1beta1"))
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(apiserver.URL, nil)) controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(apiserver.URL, nil))
@ -64,7 +87,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
SyncFrequency: 5 * time.Second, SyncFrequency: 5 * time.Second,
HTTPCheckFrequency: 5 * time.Second, HTTPCheckFrequency: 5 * time.Second,
} }
go myKubelet.RunKubelet("", manifestURL, servers[0], "localhost", "", 0) go myKubelet.RunKubelet("", "", manifestURL, servers[0], "localhost", 10250)
// Create a second kubelet so that the guestbook example's two redis slaves both // Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule. // have a place they can schedule.
@ -76,7 +99,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
SyncFrequency: 5 * time.Second, SyncFrequency: 5 * time.Second,
HTTPCheckFrequency: 5 * time.Second, HTTPCheckFrequency: 5 * time.Second,
} }
go otherKubelet.RunKubelet("", "", servers[0], "localhost", "", 0) go otherKubelet.RunKubelet("", "", "", servers[0], "localhost", 10251)
return apiserver.URL return apiserver.URL
} }
@ -150,7 +173,7 @@ func main() {
if len(createdPods) != 7 { if len(createdPods) != 7 {
glog.Fatalf("Unexpected list of created pods:\n\n%#v\n\n%#v\n\n%#v\n\n", createdPods.List(), fakeDocker1.Created, fakeDocker2.Created) glog.Fatalf("Unexpected list of created pods:\n\n%#v\n\n%#v\n\n%#v\n\n", createdPods.List(), fakeDocker1.Created, fakeDocker2.Created)
} }
glog.Infof("OK") glog.Infof("OK - found created pods: %#v", createdPods.List())
} }
// Serve a file for kubelet to read. // Serve a file for kubelet to read.

View File

@ -74,11 +74,11 @@ func readConfig(storage string) []byte {
} }
data, err := ioutil.ReadFile(*config) data, err := ioutil.ReadFile(*config)
if err != nil { if err != nil {
glog.Fatalf("Unable to read %v: %#v\n", *config, err) glog.Fatalf("Unable to read %v: %v\n", *config, err)
} }
data, err = kubecfg.ToWireFormat(data, storage) data, err = kubecfg.ToWireFormat(data, storage)
if err != nil { if err != nil {
glog.Fatalf("Error parsing %v as an object for %v: %#v\n", *config, storage, err) glog.Fatalf("Error parsing %v as an object for %v: %v\n", *config, storage, err)
} }
if *verbose { if *verbose {
glog.Infof("Parsed config file successfully; sending:\n%v\n", string(data)) glog.Infof("Parsed config file successfully; sending:\n%v\n", string(data))
@ -122,7 +122,7 @@ func main() {
if secure { if secure {
auth, err = kubecfg.LoadAuthInfo(*authConfig) auth, err = kubecfg.LoadAuthInfo(*authConfig)
if err != nil { if err != nil {
glog.Fatalf("Error loading auth: %#v", err) glog.Fatalf("Error loading auth: %v", err)
} }
} }
@ -175,7 +175,8 @@ func executeAPIRequest(method string, s *kube_client.Client) bool {
if method == "create" || method == "update" { if method == "create" || method == "update" {
r.Body(readConfig(parseStorage())) r.Body(readConfig(parseStorage()))
} }
obj, err := r.Do().Get() result := r.Do()
obj, err := result.Get()
if err != nil { if err != nil {
glog.Fatalf("Got request error: %v\n", err) glog.Fatalf("Got request error: %v\n", err)
return false return false
@ -191,7 +192,8 @@ func executeAPIRequest(method string, s *kube_client.Client) bool {
} }
if err = printer.PrintObj(obj, os.Stdout); err != nil { if err = printer.PrintObj(obj, os.Stdout); err != nil {
glog.Fatalf("Failed to print: %#v\nRaw received object:\n%#v\n", err, obj) body, _ := result.Raw()
glog.Fatalf("Failed to print: %v\nRaw received object:\n%#v\n\nBody received: %v", err, obj, string(body))
} }
fmt.Print("\n") fmt.Print("\n")
@ -223,7 +225,7 @@ func executeControllerRequest(method string, c *kube_client.Client) bool {
replicas, err := strconv.Atoi(flag.Arg(2)) replicas, err := strconv.Atoi(flag.Arg(2))
name := flag.Arg(3) name := flag.Arg(3)
if err != nil { if err != nil {
glog.Fatalf("Error parsing replicas: %#v", err) glog.Fatalf("Error parsing replicas: %v", err)
} }
err = kubecfg.RunController(image, name, replicas, c, *portSpec, *servicePort) err = kubecfg.RunController(image, name, replicas, c, *portSpec, *servicePort)
case "resize": case "resize":
@ -234,14 +236,14 @@ func executeControllerRequest(method string, c *kube_client.Client) bool {
name := args[1] name := args[1]
replicas, err := strconv.Atoi(args[2]) replicas, err := strconv.Atoi(args[2])
if err != nil { if err != nil {
glog.Fatalf("Error parsing replicas: %#v", err) glog.Fatalf("Error parsing replicas: %v", err)
} }
err = kubecfg.ResizeController(name, replicas, c) err = kubecfg.ResizeController(name, replicas, c)
default: default:
return false return false
} }
if err != nil { if err != nil {
glog.Fatalf("Error: %#v", err) glog.Fatalf("Error: %v", err)
} }
return true return true
} }

View File

@ -89,5 +89,5 @@ func main() {
SyncFrequency: *syncFrequency, SyncFrequency: *syncFrequency,
HTTPCheckFrequency: *httpCheckFrequency, HTTPCheckFrequency: *httpCheckFrequency,
} }
my_kubelet.RunKubelet(*config, *manifestUrl, *etcdServers, *address, *dockerEndpoint, *port) my_kubelet.RunKubelet(*dockerEndpoint, *config, *manifestUrl, *etcdServers, *address, *port)
} }

View File

@ -16,6 +16,10 @@ limitations under the License.
package api package api
import (
"github.com/fsouza/go-dockerclient"
)
// Common string formats // Common string formats
// --------------------- // ---------------------
// Many fields in this API have formatting requirements. The commonly used // Many fields in this API have formatting requirements. The commonly used
@ -153,13 +157,22 @@ const (
PodStopped PodStatus = "Stopped" PodStopped PodStatus = "Stopped"
) )
// PodInfo contains one entry for every container with available info.
type PodInfo map[string]docker.Container
// PodState is the state of a pod, used as either input (desired state) or output (current state) // PodState is the state of a pod, used as either input (desired state) or output (current state)
type PodState struct { type PodState struct {
Manifest ContainerManifest `json:"manifest,omitempty" yaml:"manifest,omitempty"` Manifest ContainerManifest `json:"manifest,omitempty" yaml:"manifest,omitempty"`
Status PodStatus `json:"status,omitempty" yaml:"status,omitempty"` Status PodStatus `json:"status,omitempty" yaml:"status,omitempty"`
Host string `json:"host,omitempty" yaml:"host,omitempty"` Host string `json:"host,omitempty" yaml:"host,omitempty"`
HostIP string `json:"hostIP,omitempty" yaml:"hostIP,omitempty"` HostIP string `json:"hostIP,omitempty" yaml:"hostIP,omitempty"`
Info interface{} `json:"info,omitempty" yaml:"info,omitempty"`
// The key of this map is the *name* of the container within the manifest; it has one
// entry per container in the manifest. The value of this map is currently the output
// of `docker inspect`. This output format is *not* final and should not be relied
// upon.
// TODO: Make real decisions about what our info should look like.
Info PodInfo `json:"info,omitempty" yaml:"info,omitempty"`
} }
type PodList struct { type PodList struct {

View File

@ -20,25 +20,35 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
) )
// ContainerInfo is an interface for things that can get information about a container. // PodInfoGetter is an interface for things that can get information about a pod's containers.
// Injectable for easy testing. // Injectable for easy testing.
type ContainerInfo interface { type PodInfoGetter interface {
// GetContainerInfo returns information about container 'name' on 'host' // GetPodInfo returns information about all containers which are part
// Returns an untyped interface, and an error, if one occurs // Returns an api.PodInfo, or an error if one occurs.
GetContainerInfo(host, name string) (interface{}, error) GetPodInfo(host, podID string) (api.PodInfo, error)
} }
// The default implementation, accesses the kubelet over HTTP // The default implementation, accesses the kubelet over HTTP
type HTTPContainerInfo struct { type HTTPPodInfoGetter struct {
Client *http.Client Client *http.Client
Port uint Port uint
} }
func (c *HTTPContainerInfo) GetContainerInfo(host, name string) (interface{}, error) { func (c *HTTPPodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
request, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/containerInfo?container=%s", host, c.Port, name), nil) request, err := http.NewRequest(
"GET",
fmt.Sprintf(
"http://%s/podInfo?podID=%s",
net.JoinHostPort(host, strconv.FormatUint(uint64(c.Port), 10)),
podID),
nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -51,17 +61,21 @@ func (c *HTTPContainerInfo) GetContainerInfo(host, name string) (interface{}, er
if err != nil { if err != nil {
return nil, err return nil, err
} }
var data interface{} // Check that this data can be unmarshalled
err = json.Unmarshal(body, &data) info := api.PodInfo{}
return data, err err = json.Unmarshal(body, &info)
if err != nil {
return nil, err
}
return info, nil
} }
// Useful for testing. // Useful for testing.
type FakeContainerInfo struct { type FakePodInfoGetter struct {
data interface{} data api.PodInfo
err error err error
} }
func (c *FakeContainerInfo) GetContainerInfo(host, name string) (interface{}, error) { func (c *FakePodInfoGetter) GetPodInfo(host, podID string) (api.PodInfo, error) {
return c.data, c.err return c.data, c.err
} }

View File

@ -25,7 +25,9 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
) )
// TODO: This doesn't reduce typing enough to make it worth the less readable errors. Remove. // TODO: This doesn't reduce typing enough to make it worth the less readable errors. Remove.
@ -35,11 +37,15 @@ func expectNoError(t *testing.T, err error) {
} }
} }
func TestHTTPContainerInfo(t *testing.T) { func TestHTTPPodInfoGetter(t *testing.T) {
body := `{"items":[]}` expectObj := api.PodInfo{
"myID": docker.Container{ID: "myID"},
}
body, err := json.Marshal(expectObj)
expectNoError(t, err)
fakeHandler := util.FakeHandler{ fakeHandler := util.FakeHandler{
StatusCode: 200, StatusCode: 200,
ResponseBody: body, ResponseBody: string(body),
} }
testServer := httptest.NewServer(&fakeHandler) testServer := httptest.NewServer(&fakeHandler)
@ -49,14 +55,15 @@ func TestHTTPContainerInfo(t *testing.T) {
port, err := strconv.Atoi(parts[1]) port, err := strconv.Atoi(parts[1])
expectNoError(t, err) expectNoError(t, err)
containerInfo := &HTTPContainerInfo{ podInfoGetter := &HTTPPodInfoGetter{
Client: http.DefaultClient, Client: http.DefaultClient,
Port: uint(port), Port: uint(port),
} }
data, err := containerInfo.GetContainerInfo(parts[0], "foo") gotObj, err := podInfoGetter.GetPodInfo(parts[0], "foo")
expectNoError(t, err) expectNoError(t, err)
dataString, _ := json.Marshal(data)
if string(dataString) != body { // reflect.DeepEqual(expectObj, gotObj) doesn't handle blank times well
t.Errorf("Unexpected response. Expected: %s, received %s", body, string(dataString)) if len(gotObj) != len(expectObj) || expectObj["myID"].ID != gotObj["myID"].ID {
t.Errorf("Unexpected response. Expected: %#v, received %#v", expectObj, gotObj)
} }
} }

View File

@ -101,9 +101,9 @@ const (
// Starts background goroutines. If config_path, manifest_url, or address are empty, // Starts background goroutines. If config_path, manifest_url, or address are empty,
// they are not watched. Never returns. // they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(config_path, manifest_url, etcd_servers, address, endpoint string, port uint) { func (kl *Kubelet) RunKubelet(dockerEndpoint, config_path, manifest_url, etcd_servers, address string, port uint) {
if kl.DockerPuller == nil { if kl.DockerPuller == nil {
kl.DockerPuller = MakeDockerPuller(endpoint) kl.DockerPuller = MakeDockerPuller(dockerEndpoint)
} }
updateChannel := make(chan manifestUpdate) updateChannel := make(chan manifestUpdate)
if config_path != "" { if config_path != "" {
@ -783,18 +783,32 @@ func (kl *Kubelet) getContainerIdFromName(name string) (DockerId, bool, error) {
return "", false, nil return "", false, nil
} }
// Returns docker info for a container // Returns docker info for all containers in the pod/manifest
func (kl *Kubelet) GetContainerInfo(name string) (string, error) { func (kl *Kubelet) GetPodInfo(podID string) (api.PodInfo, error) {
dockerId, found, err := kl.getContainerIdFromName(name) info := api.PodInfo{}
if err != nil || !found {
return "{}", err containerList, err := kl.DockerClient.ListContainers(docker.ListContainersOptions{})
}
info, err := kl.DockerClient.InspectContainer(string(dockerId))
if err != nil { if err != nil {
return "{}", err return nil, err
} }
data, err := json.Marshal(info)
return string(data), err for _, value := range containerList {
manifestID, containerName := parseDockerName(value.Names[0])
if manifestID != podID {
continue
}
inspectResult, err := kl.DockerClient.InspectContainer(value.ID)
if err != nil {
return nil, err
}
if inspectResult == nil {
// Why did we not get an error?
info[containerName] = docker.Container{}
} else {
info[containerName] = *inspectResult
}
}
return info, nil
} }
//Returns stats (from Cadvisor) for a container //Returns stats (from Cadvisor) for a container

View File

@ -24,6 +24,7 @@ import (
"net/url" "net/url"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"gopkg.in/v1/yaml" "gopkg.in/v1/yaml"
) )
@ -36,7 +37,7 @@ type KubeletServer struct {
// For testablitiy. // For testablitiy.
type kubeletInterface interface { type kubeletInterface interface {
GetContainerStats(name string) (*api.ContainerStats, error) GetContainerStats(name string) (*api.ContainerStats, error)
GetContainerInfo(name string) (string, error) GetPodInfo(name string) (api.PodInfo, error)
} }
func (s *KubeletServer) error(w http.ResponseWriter, err error) { func (s *KubeletServer) error(w http.ResponseWriter, err error) {
@ -45,6 +46,10 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) {
} }
func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
logger := apiserver.MakeLogged(req, w)
w = logger
defer logger.Log()
u, err := url.ParseRequestURI(req.RequestURI) u, err := url.ParseRequestURI(req.RequestURI)
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
@ -104,16 +109,20 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Header().Add("Content-type", "application/json") w.Header().Add("Content-type", "application/json")
w.Write(data) w.Write(data)
case u.Path == "/containerInfo": case u.Path == "/podInfo":
// NOTE: The master appears to pass a Pod.ID podID := u.Query().Get("podID")
// The server appears to pass a Pod.ID if len(podID) == 0 {
container := u.Query().Get("container")
if len(container) == 0 {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "Missing container selector arg.") fmt.Fprint(w, "Missing 'podID=' query entry.")
return return
} }
data, err := s.Kubelet.GetContainerInfo(container) info, err := s.Kubelet.GetPodInfo(podID)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Error: %v", err)
return
}
data, err := json.Marshal(info)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Internal Error: %v", err) fmt.Fprintf(w, "Internal Error: %v", err)
@ -121,7 +130,7 @@ func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Header().Add("Content-type", "application/json") w.Header().Add("Content-type", "application/json")
fmt.Fprint(w, data) w.Write(data)
default: default:
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
fmt.Fprint(w, "Not found.") fmt.Fprint(w, "Not found.")

View File

@ -28,14 +28,15 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
) )
type fakeKubelet struct { type fakeKubelet struct {
infoFunc func(name string) (string, error) infoFunc func(name string) (api.PodInfo, error)
statsFunc func(name string) (*api.ContainerStats, error) statsFunc func(name string) (*api.ContainerStats, error)
} }
func (fk *fakeKubelet) GetContainerInfo(name string) (string, error) { func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) {
return fk.infoFunc(name) return fk.infoFunc(name)
} }
@ -114,16 +115,16 @@ func TestContainers(t *testing.T) {
} }
} }
func TestContainerInfo(t *testing.T) { func TestPodInfo(t *testing.T) {
fw := makeServerTest() fw := makeServerTest()
expected := "good container info string" expected := api.PodInfo{"goodpod": docker.Container{ID: "myContainerID"}}
fw.fakeKubelet.infoFunc = func(name string) (string, error) { fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) {
if name == "goodcontainer" { if name == "goodpod" {
return expected, nil return expected, nil
} }
return "", fmt.Errorf("bad container") return nil, fmt.Errorf("bad pod")
} }
resp, err := http.Get(fw.testHttpServer.URL + "/containerInfo?container=goodcontainer") resp, err := http.Get(fw.testHttpServer.URL + "/podInfo?podID=goodpod")
if err != nil { if err != nil {
t.Errorf("Got error GETing: %v", err) t.Errorf("Got error GETing: %v", err)
} }
@ -131,7 +132,11 @@ func TestContainerInfo(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Error reading body: %v", err) t.Errorf("Error reading body: %v", err)
} }
if got != expected { expectedBytes, err := json.Marshal(expected)
if err != nil {
t.Fatalf("Unexpected marshal error %v", err)
}
if got != string(expectedBytes) {
t.Errorf("Expected: '%v', got: '%v'", expected, got) t.Errorf("Expected: '%v', got: '%v'", expected, got)
} }
} }

View File

@ -44,53 +44,49 @@ type Master struct {
} }
// Returns a memory (not etcd) backed apiserver. // Returns a memory (not etcd) backed apiserver.
func NewMemoryServer(minions []string, cloud cloudprovider.Interface) *Master { func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master {
m := &Master{ m := &Master{
podRegistry: registry.MakeMemoryRegistry(), podRegistry: registry.MakeMemoryRegistry(),
controllerRegistry: registry.MakeMemoryRegistry(), controllerRegistry: registry.MakeMemoryRegistry(),
serviceRegistry: registry.MakeMemoryRegistry(), serviceRegistry: registry.MakeMemoryRegistry(),
minionRegistry: registry.MakeMinionRegistry(minions), minionRegistry: registry.MakeMinionRegistry(minions),
} }
m.init(cloud) m.init(cloud, podInfoGetter)
return m return m
} }
// Returns a new apiserver. // Returns a new apiserver.
func New(etcdServers, minions []string, cloud cloudprovider.Interface, minionRegexp string) *Master { func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master {
etcdClient := etcd.NewClient(etcdServers) etcdClient := etcd.NewClient(etcdServers)
var minionRegistry registry.MinionRegistry minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp)
if cloud != nil && len(minionRegexp) > 0 {
var err error
minionRegistry, err = registry.MakeCloudMinionRegistry(cloud, minionRegexp)
if err != nil {
glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err)
minionRegistry = registry.MakeMinionRegistry(minions)
}
} else {
minionRegistry = registry.MakeMinionRegistry(minions)
}
m := &Master{ m := &Master{
podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), podRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), controllerRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry), serviceRegistry: registry.MakeEtcdRegistry(etcdClient, minionRegistry),
minionRegistry: minionRegistry, minionRegistry: minionRegistry,
} }
m.init(cloud) m.init(cloud, podInfoGetter)
return m return m
} }
func (m *Master) init(cloud cloudprovider.Interface) { func minionRegistryMaker(minions []string, cloud cloudprovider.Interface, minionRegexp string) registry.MinionRegistry {
containerInfo := &client.HTTPContainerInfo{ if cloud != nil && len(minionRegexp) > 0 {
Client: http.DefaultClient, minionRegistry, err := registry.MakeCloudMinionRegistry(cloud, minionRegexp)
Port: 10250, if err != nil {
glog.Errorf("Failed to initalize cloud minion registry reverting to static registry (%#v)", err)
}
return minionRegistry
} }
return registry.MakeMinionRegistry(minions)
}
func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInfoGetter) {
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
podCache := NewPodCache(containerInfo, m.podRegistry, time.Second*30) podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30)
go podCache.Loop() go podCache.Loop()
s := scheduler.MakeFirstFitScheduler(m.podRegistry, m.random) s := scheduler.MakeFirstFitScheduler(m.podRegistry, m.random)
m.storage = map[string]apiserver.RESTStorage{ m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, s, m.minionRegistry, cloud, podCache), "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache),
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry, m.podRegistry), "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"minions": registry.MakeMinionRegistryStorage(m.minionRegistry), "minions": registry.MakeMinionRegistryStorage(m.minionRegistry),

View File

@ -17,9 +17,11 @@ limitations under the License.
package master package master
import ( import (
"errors"
"sync" "sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
@ -30,37 +32,38 @@ import (
// PodCache contains both a cache of container information, as well as the mechanism for keeping // PodCache contains both a cache of container information, as well as the mechanism for keeping
// that cache up to date. // that cache up to date.
type PodCache struct { type PodCache struct {
containerInfo client.ContainerInfo containerInfo client.PodInfoGetter
pods registry.PodRegistry pods registry.PodRegistry
podInfo map[string]interface{} // This is a map of pod id to a map of container name to the
period time.Duration podInfo map[string]api.PodInfo
podLock sync.Mutex period time.Duration
podLock sync.Mutex
} }
func NewPodCache(info client.ContainerInfo, pods registry.PodRegistry, period time.Duration) *PodCache { func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache {
return &PodCache{ return &PodCache{
containerInfo: info, containerInfo: info,
pods: pods, pods: pods,
podInfo: map[string]interface{}{}, podInfo: map[string]api.PodInfo{},
period: period, period: period,
} }
} }
// Implements the ContainerInfo interface // Implements the PodInfoGetter interface.
// The returned value should be treated as read-only // The returned value should be treated as read-only.
func (p *PodCache) GetContainerInfo(host, id string) (interface{}, error) { func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) {
p.podLock.Lock() p.podLock.Lock()
defer p.podLock.Unlock() defer p.podLock.Unlock()
value, ok := p.podInfo[id] value, ok := p.podInfo[podID]
if !ok { if !ok {
return nil, nil return nil, errors.New("No cached pod info")
} else { } else {
return value, nil return value, nil
} }
} }
func (p *PodCache) updateContainerInfo(host, id string) error { func (p *PodCache) updatePodInfo(host, id string) error {
info, err := p.containerInfo.GetContainerInfo(host, id) info, err := p.containerInfo.GetPodInfo(host, id)
if err != nil { if err != nil {
return err return err
} }
@ -78,7 +81,7 @@ func (p *PodCache) UpdateAllContainers() {
return return
} }
for _, pod := range pods { for _, pod := range pods {
err := p.updateContainerInfo(pod.CurrentState.Host, pod.ID) err := p.updatePodInfo(pod.CurrentState.Host, pod.ID)
if err != nil { if err != nil {
glog.Errorf("Error synchronizing container: %#v", err) glog.Errorf("Error synchronizing container: %#v", err)
} }

View File

@ -23,16 +23,17 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/fsouza/go-dockerclient"
) )
type FakeContainerInfo struct { type FakePodInfoGetter struct {
host string host string
id string id string
data interface{} data api.PodInfo
err error err error
} }
func (f *FakeContainerInfo) GetContainerInfo(host, id string) (interface{}, error) { func (f *FakePodInfoGetter) GetPodInfo(host, id string) (api.PodInfo, error) {
f.host = host f.host = host
f.id = id f.id = id
return f.data, f.err return f.data, f.err
@ -41,53 +42,49 @@ func (f *FakeContainerInfo) GetContainerInfo(host, id string) (interface{}, erro
func TestPodCacheGet(t *testing.T) { func TestPodCacheGet(t *testing.T) {
cache := NewPodCache(nil, nil, time.Second*1) cache := NewPodCache(nil, nil, time.Second*1)
pod := api.Pod{ expected := api.PodInfo{"foo": docker.Container{ID: "foo"}}
JSONBase: api.JSONBase{ID: "foo"}, cache.podInfo["foo"] = expected
}
cache.podInfo["foo"] = pod
info, err := cache.GetContainerInfo("host", "foo") info, err := cache.GetPodInfo("host", "foo")
if err != nil { if err != nil {
t.Errorf("Unexpected error: %#v", err) t.Errorf("Unexpected error: %#v", err)
} }
if !reflect.DeepEqual(info, pod) { if !reflect.DeepEqual(info, expected) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info)
} }
} }
func TestPodCacheGetMissing(t *testing.T) { func TestPodCacheGetMissing(t *testing.T) {
cache := NewPodCache(nil, nil, time.Second*1) cache := NewPodCache(nil, nil, time.Second*1)
info, err := cache.GetContainerInfo("host", "foo") info, err := cache.GetPodInfo("host", "foo")
if err != nil { if err == nil {
t.Errorf("Unexpected error: %#v", err) t.Errorf("Unexpected non-error: %#v", err)
} }
if info != nil { if info != nil {
t.Errorf("Unexpected info: %#v", info) t.Errorf("Unexpected info: %#v", info)
} }
} }
func TestPodGetContainerInfo(t *testing.T) { func TestPodGetPodInfoGetter(t *testing.T) {
pod := api.Pod{ expected := api.PodInfo{"foo": docker.Container{ID: "foo"}}
JSONBase: api.JSONBase{ID: "foo"}, fake := FakePodInfoGetter{
} data: expected,
fake := FakeContainerInfo{
data: pod,
} }
cache := NewPodCache(&fake, nil, time.Second*1) cache := NewPodCache(&fake, nil, time.Second*1)
cache.updateContainerInfo("host", "foo") cache.updatePodInfo("host", "foo")
if fake.host != "host" || fake.id != "foo" { if fake.host != "host" || fake.id != "foo" {
t.Errorf("Unexpected access: %#v", fake) t.Errorf("Unexpected access: %#v", fake)
} }
info, err := cache.GetContainerInfo("host", "foo") info, err := cache.GetPodInfo("host", "foo")
if err != nil { if err != nil {
t.Errorf("Unexpected error: %#v", err) t.Errorf("Unexpected error: %#v", err)
} }
if !reflect.DeepEqual(info, pod) { if !reflect.DeepEqual(info, expected) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info)
} }
} }
@ -98,10 +95,13 @@ func TestPodUpdateAllContainers(t *testing.T) {
Host: "machine", Host: "machine",
}, },
} }
pods := []api.Pod{pod} pods := []api.Pod{pod}
mockRegistry := registry.MakeMockPodRegistry(pods) mockRegistry := registry.MakeMockPodRegistry(pods)
fake := FakeContainerInfo{
data: pod, expected := api.PodInfo{"foo": docker.Container{ID: "foo"}}
fake := FakePodInfoGetter{
data: expected,
} }
cache := NewPodCache(&fake, mockRegistry, time.Second*1) cache := NewPodCache(&fake, mockRegistry, time.Second*1)
@ -111,11 +111,11 @@ func TestPodUpdateAllContainers(t *testing.T) {
t.Errorf("Unexpected access: %#v", fake) t.Errorf("Unexpected access: %#v", fake)
} }
info, err := cache.GetContainerInfo("machine", "foo") info, err := cache.GetPodInfo("machine", "foo")
if err != nil { if err != nil {
t.Errorf("Unexpected error: %#v", err) t.Errorf("Unexpected error: %#v", err)
} }
if !reflect.DeepEqual(info, pod) { if !reflect.DeepEqual(info, expected) {
t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", pod, info) t.Errorf("Unexpected mismatch. Expected: %#v, Got: #%v", &expected, info)
} }
} }

View File

@ -33,8 +33,8 @@ import (
// PodRegistryStorage implements the RESTStorage interface in terms of a PodRegistry // PodRegistryStorage implements the RESTStorage interface in terms of a PodRegistry
type PodRegistryStorage struct { type PodRegistryStorage struct {
registry PodRegistry registry PodRegistry
containerInfo client.ContainerInfo podInfoGetter client.PodInfoGetter
podCache client.ContainerInfo podCache client.PodInfoGetter
scheduler scheduler.Scheduler scheduler scheduler.Scheduler
minionLister scheduler.MinionLister minionLister scheduler.MinionLister
cloud cloudprovider.Interface cloud cloudprovider.Interface
@ -44,20 +44,20 @@ type PodRegistryStorage struct {
// MakePodRegistryStorage makes a RESTStorage object for a pod registry. // MakePodRegistryStorage makes a RESTStorage object for a pod registry.
// Parameters: // Parameters:
// registry: The pod registry // registry: The pod registry
// containerInfo: Source of fresh container info // podInfoGetter: Source of fresh container info
// scheduler: The scheduler for assigning pods to machines // scheduler: The scheduler for assigning pods to machines
// minionLister: Object which can list available minions for the scheduler // minionLister: Object which can list available minions for the scheduler
// cloud: Interface to a cloud provider (may be null) // cloud: Interface to a cloud provider (may be null)
// podCache: Source of cached container info // podCache: Source of cached container info
func MakePodRegistryStorage(registry PodRegistry, func MakePodRegistryStorage(registry PodRegistry,
containerInfo client.ContainerInfo, podInfoGetter client.PodInfoGetter,
scheduler scheduler.Scheduler, scheduler scheduler.Scheduler,
minionLister scheduler.MinionLister, minionLister scheduler.MinionLister,
cloud cloudprovider.Interface, cloud cloudprovider.Interface,
podCache client.ContainerInfo) apiserver.RESTStorage { podCache client.PodInfoGetter) apiserver.RESTStorage {
return &PodRegistryStorage{ return &PodRegistryStorage{
registry: registry, registry: registry,
containerInfo: containerInfo, podInfoGetter: podInfoGetter,
scheduler: scheduler, scheduler: scheduler,
minionLister: minionLister, minionLister: minionLister,
cloud: cloud, cloud: cloud,
@ -71,34 +71,56 @@ func (storage *PodRegistryStorage) List(selector labels.Selector) (interface{},
pods, err := storage.registry.ListPods(selector) pods, err := storage.registry.ListPods(selector)
if err == nil { if err == nil {
result.Items = pods result.Items = pods
// Get cached info for the list currently. for i := range result.Items {
// TODO: Optionally use fresh info storage.fillPodInfo(&result.Items[i])
if storage.podCache != nil {
for ix, pod := range pods {
info, err := storage.podCache.GetContainerInfo(pod.CurrentState.Host, pod.ID)
if err != nil {
glog.Errorf("Error getting container info: %#v", err)
continue
}
result.Items[ix].CurrentState.Info = info
}
} }
} }
return result, err return result, err
} }
func makePodStatus(info interface{}) api.PodStatus { func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) {
if state, ok := info.(map[string]interface{})["State"]; ok { // Get cached info for the list currently.
if running, ok := state.(map[string]interface{})["Running"]; ok { // TODO: Optionally use fresh info
if running.(bool) { if storage.podCache != nil {
return api.PodRunning info, err := storage.podCache.GetPodInfo(pod.CurrentState.Host, pod.ID)
if err != nil {
glog.Errorf("Error getting container info: %#v", err)
return
}
pod.CurrentState.Info = info
}
}
func makePodStatus(pod *api.Pod) api.PodStatus {
if pod.CurrentState.Info == nil {
return api.PodPending
}
running := 0
stopped := 0
unknown := 0
for _, container := range pod.DesiredState.Manifest.Containers {
if info, ok := pod.CurrentState.Info[container.Name]; ok {
if info.State.Running {
running++
} else { } else {
return api.PodStopped stopped++
} }
} else {
unknown++
} }
} }
return api.PodPending
switch {
case running > 0 && stopped == 0 && unknown == 0:
return api.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
return api.PodStopped
case running == 0 && stopped == 0 && unknown > 0:
return api.PodPending
default:
return api.PodPending
}
} }
func getInstanceIP(cloud cloudprovider.Interface, host string) string { func getInstanceIP(cloud cloudprovider.Interface, host string) string {
@ -129,13 +151,9 @@ func (storage *PodRegistryStorage) Get(id string) (interface{}, error) {
if pod == nil { if pod == nil {
return pod, nil return pod, nil
} }
if storage.containerInfo != nil { if storage.podCache != nil || storage.podInfoGetter != nil {
info, err := storage.containerInfo.GetContainerInfo(pod.CurrentState.Host, id) storage.fillPodInfo(pod)
if err != nil { pod.CurrentState.Status = makePodStatus(pod)
return pod, err
}
pod.CurrentState.Info = info
pod.CurrentState.Status = makePodStatus(info)
} }
pod.CurrentState.HostIP = getInstanceIP(storage.cloud, pod.CurrentState.Host) pod.CurrentState.HostIP = getInstanceIP(storage.cloud, pod.CurrentState.Host)

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/fsouza/go-dockerclient"
) )
func expectNoError(t *testing.T, err error) { func expectNoError(t *testing.T, err error) {
@ -153,29 +154,88 @@ func TestGetPodCloud(t *testing.T) {
} }
func TestMakePodStatus(t *testing.T) { func TestMakePodStatus(t *testing.T) {
status := makePodStatus(map[string]interface{}{}) desiredState := api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{Name: "containerA"},
{Name: "containerB"},
},
},
}
pod := &api.Pod{DesiredState: desiredState}
status := makePodStatus(pod)
if status != api.PodPending { if status != api.PodPending {
t.Errorf("Expected 'Pending', got '%s'", status) t.Errorf("Expected 'Pending', got '%s'", status)
} }
status = makePodStatus(map[string]interface{}{ runningState := docker.Container{
"State": map[string]interface{}{ State: docker.State{
"Running": false, Running: true,
}, },
}) }
stoppedState := docker.Container{
State: docker.State{
Running: false,
},
}
// All running.
pod = &api.Pod{
DesiredState: desiredState,
CurrentState: api.PodState{
Info: map[string]docker.Container{
"containerA": runningState,
"containerB": runningState,
},
},
}
status = makePodStatus(pod)
if status != api.PodRunning {
t.Errorf("Expected 'Running', got '%s'", status)
}
// All stopped.
pod = &api.Pod{
DesiredState: desiredState,
CurrentState: api.PodState{
Info: map[string]docker.Container{
"containerA": stoppedState,
"containerB": stoppedState,
},
},
}
status = makePodStatus(pod)
if status != api.PodStopped { if status != api.PodStopped {
t.Errorf("Expected 'Stopped', got '%s'", status) t.Errorf("Expected 'Stopped', got '%s'", status)
} }
status = makePodStatus(map[string]interface{}{ // Mixed state.
"State": map[string]interface{}{ pod = &api.Pod{
"Running": true, DesiredState: desiredState,
CurrentState: api.PodState{
Info: map[string]docker.Container{
"containerA": runningState,
"containerB": stoppedState,
},
}, },
}) }
status = makePodStatus(pod)
if status != api.PodPending {
t.Errorf("Expected 'Pending', got '%s'", status)
}
if status != api.PodRunning { // Mixed state.
t.Errorf("Expected 'Running', got '%s'", status) pod = &api.Pod{
DesiredState: desiredState,
CurrentState: api.PodState{
Info: map[string]docker.Container{
"containerA": runningState,
},
},
}
status = makePodStatus(pod)
if status != api.PodPending {
t.Errorf("Expected 'Pending', got '%s'", status)
} }
} }