Merge pull request #161 from brendandburns/next

Add population of the host IP address when on GCE.
This commit is contained in:
Daniel Smith 2014-06-18 15:04:05 -07:00
commit 28f9dfafb3
8 changed files with 125 additions and 26 deletions

View File

@ -44,7 +44,7 @@ func main() {
reg := registry.MakeEtcdRegistry(etcdClient, machineList) reg := registry.MakeEtcdRegistry(etcdClient, machineList)
apiserver := apiserver.New(map[string]apiserver.RESTStorage{ apiserver := apiserver.New(map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(reg, &client.FakeContainerInfo{}, registry.MakeRoundRobinScheduler(machineList)), "pods": registry.MakePodRegistryStorage(reg, &client.FakeContainerInfo{}, registry.MakeRoundRobinScheduler(machineList), nil),
"replicationControllers": registry.MakeControllerRegistryStorage(reg), "replicationControllers": registry.MakeControllerRegistryStorage(reg),
}, "/api/v1beta1") }, "/api/v1beta1")
server := httptest.NewServer(apiserver) server := httptest.NewServer(apiserver)

View File

@ -16,10 +16,16 @@ limitations under the License.
package cloudprovider package cloudprovider
import (
"net"
)
// CloudInterface is an abstract, pluggable interface for cloud providers // CloudInterface is an abstract, pluggable interface for cloud providers
type Interface interface { type Interface interface {
// TCPLoadBalancer returns a balancer interface, or nil if none is supported. Returns an error if one occurs. // TCPLoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
TCPLoadBalancer() (TCPLoadBalancer, error) TCPLoadBalancer() (TCPLoadBalancer, bool)
// Instances returns an instances interface. Also returns true if the interface is supported, false otherwise.
Instances() (Instances, bool)
} }
type TCPLoadBalancer interface { type TCPLoadBalancer interface {
@ -29,3 +35,7 @@ type TCPLoadBalancer interface {
UpdateTCPLoadBalancer(name, region string, hosts []string) error UpdateTCPLoadBalancer(name, region string, hosts []string) error
DeleteTCPLoadBalancer(name, region string) error DeleteTCPLoadBalancer(name, region string) error
} }
type Instances interface {
IPAddress(name string) (net.IP, error)
}

View File

@ -35,8 +35,12 @@ func (f *FakeCloud) ClearCalls() {
f.Calls = []string{} f.Calls = []string{}
} }
func (f *FakeCloud) TCPLoadBalancer() (TCPLoadBalancer, error) { func (f *FakeCloud) TCPLoadBalancer() (TCPLoadBalancer, bool) {
return f, nil return f, true
}
func (f *FakeCloud) Instances() (Instances, bool) {
return f, true
} }
func (f *FakeCloud) TCPLoadBalancerExists(name, region string) (bool, error) { func (f *FakeCloud) TCPLoadBalancerExists(name, region string) (bool, error) {

View File

@ -19,6 +19,7 @@ package cloudprovider
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
@ -78,8 +79,12 @@ func NewGCECloud() (*GCECloud, error) {
}, nil }, nil
} }
func (gce *GCECloud) TCPLoadBalancer() (TCPLoadBalancer, error) { func (gce *GCECloud) TCPLoadBalancer() (TCPLoadBalancer, bool) {
return gce, nil return gce, true
}
func (gce *GCECloud) Instances() (Instances, bool) {
return gce, true
} }
func makeHostLink(projectID, zone, host string) string { func makeHostLink(projectID, zone, host string) string {
@ -162,3 +167,15 @@ func (gce *GCECloud) DeleteTCPLoadBalancer(name, region string) error {
_, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do() _, err = gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
return err return err
} }
func (gce *GCECloud) IPAddress(instance string) (net.IP, error) {
res, err := gce.service.Instances.Get(gce.projectID, gce.zone, instance).Do()
if err != nil {
return nil, err
}
ip := net.ParseIP(res.NetworkInterfaces[0].AccessConfigs[0].NatIP)
if ip == nil {
return nil, fmt.Errorf("Invalid network IP: %s", res.NetworkInterfaces[0].AccessConfigs[0].NatIP)
}
return ip, nil
}

View File

@ -72,7 +72,7 @@ func (m *Master) init(minions []string, cloud cloudprovider.Interface) {
m.minions = minions m.minions = minions
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
m.storage = map[string]apiserver.RESTStorage{ m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random)), "pods": registry.MakePodRegistryStorage(m.podRegistry, containerInfo, registry.MakeFirstFitScheduler(m.minions, m.podRegistry, m.random), cloud),
"replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry), "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry),
"services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions), "services": registry.MakeServiceRegistryStorage(m.serviceRegistry, cloud, m.minions),
} }

View File

@ -18,10 +18,13 @@ package registry
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
) )
@ -30,13 +33,15 @@ type PodRegistryStorage struct {
registry PodRegistry registry PodRegistry
containerInfo client.ContainerInfo containerInfo client.ContainerInfo
scheduler Scheduler scheduler Scheduler
cloud cloudprovider.Interface
} }
func MakePodRegistryStorage(registry PodRegistry, containerInfo client.ContainerInfo, scheduler Scheduler) apiserver.RESTStorage { func MakePodRegistryStorage(registry PodRegistry, containerInfo client.ContainerInfo, scheduler Scheduler, cloud cloudprovider.Interface) apiserver.RESTStorage {
return &PodRegistryStorage{ return &PodRegistryStorage{
registry: registry, registry: registry,
containerInfo: containerInfo, containerInfo: containerInfo,
scheduler: scheduler, scheduler: scheduler,
cloud: cloud,
} }
} }
@ -63,17 +68,44 @@ func makePodStatus(info interface{}) string {
return "Pending" return "Pending"
} }
func getInstanceIP(cloud cloudprovider.Interface, host string) string {
if cloud == nil {
return ""
}
instances, ok := cloud.Instances()
if instances == nil || !ok {
return ""
}
ix := strings.Index(host, ".")
if ix != -1 {
host = host[:ix]
}
addr, err := instances.IPAddress(host)
if err != nil {
log.Printf("Error getting instance IP: %#v", err)
return ""
}
return addr.String()
}
func (storage *PodRegistryStorage) Get(id string) (interface{}, error) { func (storage *PodRegistryStorage) Get(id string) (interface{}, error) {
pod, err := storage.registry.GetPod(id) pod, err := storage.registry.GetPod(id)
if err != nil { if err != nil {
return pod, err return pod, err
} }
info, err := storage.containerInfo.GetContainerInfo(pod.CurrentState.Host, id) if pod == nil {
if err != nil { return pod, nil
return pod, err
} }
pod.CurrentState.Info = info if storage.containerInfo != nil {
pod.CurrentState.Status = makePodStatus(info) info, err := storage.containerInfo.GetContainerInfo(pod.CurrentState.Host, id)
if err != nil {
return pod, err
}
pod.CurrentState.Info = info
pod.CurrentState.Status = makePodStatus(info)
}
pod.CurrentState.HostIP = getInstanceIP(storage.cloud, pod.CurrentState.Host)
pod.Kind = "cluster#pod" pod.Kind = "cluster#pod"
return pod, err return pod, err
} }

View File

@ -22,11 +22,13 @@ import (
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
) )
type MockPodRegistry struct { type MockPodRegistry struct {
err error err error
pod *api.Pod
pods []api.Pod pods []api.Pod
} }
@ -50,7 +52,7 @@ func (registry *MockPodRegistry) ListPods(query labels.Query) ([]api.Pod, error)
} }
func (registry *MockPodRegistry) GetPod(podId string) (*api.Pod, error) { func (registry *MockPodRegistry) GetPod(podId string) (*api.Pod, error) {
return &api.Pod{}, registry.err return registry.pod, registry.err
} }
func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error { func (registry *MockPodRegistry) CreatePod(machine string, pod api.Pod) error {
@ -145,6 +147,45 @@ func TestExtractJson(t *testing.T) {
} }
} }
func TestGetPod(t *testing.T) {
mockRegistry := MockPodRegistry{
pod: &api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
},
}
storage := PodRegistryStorage{
registry: &mockRegistry,
}
obj, err := storage.Get("foo")
pod := obj.(*api.Pod)
expectNoError(t, err)
if !reflect.DeepEqual(*mockRegistry.pod, *pod) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod)
}
}
func TestGetPodCloud(t *testing.T) {
fakeCloud := &cloudprovider.FakeCloud{}
mockRegistry := MockPodRegistry{
pod: &api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
},
}
storage := PodRegistryStorage{
registry: &mockRegistry,
cloud: fakeCloud,
}
obj, err := storage.Get("foo")
pod := obj.(*api.Pod)
expectNoError(t, err)
if !reflect.DeepEqual(*mockRegistry.pod, *pod) {
t.Errorf("Unexpected pod. Expected %#v, Got %#v", *mockRegistry.pod, *pod)
}
if len(fakeCloud.Calls) != 1 || fakeCloud.Calls[0] != "ip-address" {
t.Errorf("Unexpected calls: %#v", fakeCloud.Calls)
}
}
func TestMakePodStatus(t *testing.T) { func TestMakePodStatus(t *testing.T) {
status := makePodStatus(map[string]interface{}{}) status := makePodStatus(map[string]interface{}{})
if status != "Pending" { if status != "Pending" {

View File

@ -91,13 +91,11 @@ func (sr *ServiceRegistryStorage) Delete(id string) error {
} }
if svc.(*api.Service).CreateExternalLoadBalancer { if svc.(*api.Service).CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil { if sr.cloud != nil {
balancer, err = sr.cloud.TCPLoadBalancer() balancer, ok = sr.cloud.TCPLoadBalancer()
if err != nil {
return err
}
} }
if balancer != nil { if ok && balancer != nil {
err = balancer.DeleteTCPLoadBalancer(id, "us-central1") err = balancer.DeleteTCPLoadBalancer(id, "us-central1")
if err != nil { if err != nil {
return err return err
@ -118,14 +116,11 @@ func (sr *ServiceRegistryStorage) Create(obj interface{}) error {
srv := obj.(api.Service) srv := obj.(api.Service)
if srv.CreateExternalLoadBalancer { if srv.CreateExternalLoadBalancer {
var balancer cloudprovider.TCPLoadBalancer var balancer cloudprovider.TCPLoadBalancer
var ok bool
if sr.cloud != nil { if sr.cloud != nil {
var err error balancer, ok = sr.cloud.TCPLoadBalancer()
balancer, err = sr.cloud.TCPLoadBalancer()
if err != nil {
return err
}
} }
if balancer != nil { if ok && balancer != nil {
err := balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, sr.hosts) err := balancer.CreateTCPLoadBalancer(srv.ID, "us-central1", srv.Port, sr.hosts)
if err != nil { if err != nil {
return err return err