Merge pull request #3171 from anguslees/openstack-provider

OpenStack updates
This commit is contained in:
Brendan Burns 2015-03-17 07:30:08 -07:00
commit b2ed6b3255
2 changed files with 108 additions and 40 deletions

View File

@ -195,7 +195,7 @@ func (s *NodeController) SyncCloud() error {
glog.Infof("Create node in registry: %s", node.Name) glog.Infof("Create node in registry: %s", node.Name)
_, err = s.kubeClient.Nodes().Create(&node) _, err = s.kubeClient.Nodes().Create(&node)
if err != nil { if err != nil {
glog.Errorf("Create node error: %s", node.Name) glog.Errorf("Create node %s error: %v", node.Name, err)
} }
} }
delete(nodeMap, node.Name) delete(nodeMap, node.Name)
@ -206,7 +206,7 @@ func (s *NodeController) SyncCloud() error {
glog.Infof("Delete node from registry: %s", nodeID) glog.Infof("Delete node from registry: %s", nodeID)
err = s.kubeClient.Nodes().Delete(nodeID) err = s.kubeClient.Nodes().Delete(nodeID)
if err != nil { if err != nil {
glog.Errorf("Delete node error: %s", nodeID) glog.Errorf("Delete node %s error: %v", nodeID, err)
} }
s.deletePods(nodeID) s.deletePods(nodeID)
} }

View File

@ -46,6 +46,11 @@ var ErrMultipleResults = errors.New("Multiple results where only one expected")
var ErrNoAddressFound = errors.New("No address found for host") var ErrNoAddressFound = errors.New("No address found for host")
var ErrAttrNotFound = errors.New("Expected attribute not found") var ErrAttrNotFound = errors.New("Expected attribute not found")
const (
MiB = 1024 * 1024
GB = 1000 * 1000 * 1000
)
// encoding.TextUnmarshaler interface for time.Duration // encoding.TextUnmarshaler interface for time.Duration
type MyDuration struct { type MyDuration struct {
time.Duration time.Duration
@ -71,6 +76,7 @@ type LoadBalancerOpts struct {
// OpenStack is an implementation of cloud provider Interface for OpenStack. // OpenStack is an implementation of cloud provider Interface for OpenStack.
type OpenStack struct { type OpenStack struct {
provider *gophercloud.ProviderClient provider *gophercloud.ProviderClient
authOpts gophercloud.AuthOptions
region string region string
lbOpts LoadBalancerOpts lbOpts LoadBalancerOpts
} }
@ -111,7 +117,11 @@ func (cfg Config) toAuthOptions() gophercloud.AuthOptions {
TenantID: cfg.Global.TenantId, TenantID: cfg.Global.TenantId,
TenantName: cfg.Global.TenantName, TenantName: cfg.Global.TenantName,
// Persistent service, so we need to be able to renew tokens // Persistent service, so we need to be able to renew
// tokens.
// (gophercloud doesn't appear to actually reauth yet,
// hence the explicit openstack.Authenticate() calls
// below)
AllowReauth: true, AllowReauth: true,
} }
} }
@ -128,13 +138,15 @@ func readConfig(config io.Reader) (Config, error) {
} }
func newOpenStack(cfg Config) (*OpenStack, error) { func newOpenStack(cfg Config) (*OpenStack, error) {
provider, err := openstack.AuthenticatedClient(cfg.toAuthOptions()) authOpts := cfg.toAuthOptions()
provider, err := openstack.AuthenticatedClient(authOpts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
os := OpenStack{ os := OpenStack{
provider: provider, provider: provider,
authOpts: authOpts,
region: cfg.Global.Region, region: cfg.Global.Region,
lbOpts: cfg.LoadBalancer, lbOpts: cfg.LoadBalancer,
} }
@ -148,7 +160,12 @@ type Instances struct {
// Instances returns an implementation of Instances for OpenStack. // Instances returns an implementation of Instances for OpenStack.
func (os *OpenStack) Instances() (cloudprovider.Instances, bool) { func (os *OpenStack) Instances() (cloudprovider.Instances, bool) {
glog.V(2).Info("openstack.Instances() called") glog.V(4).Info("openstack.Instances() called")
if err := openstack.Authenticate(os.provider, os.authOpts); err != nil {
glog.Warningf("Failed to reauthenticate: %v", err)
return nil, false
}
compute, err := openstack.NewComputeV2(os.provider, gophercloud.EndpointOpts{ compute, err := openstack.NewComputeV2(os.provider, gophercloud.EndpointOpts{
Region: os.region, Region: os.region,
@ -169,11 +186,11 @@ func (os *OpenStack) Instances() (cloudprovider.Instances, bool) {
for _, flavor := range flavorList { for _, flavor := range flavorList {
rsrc := api.NodeResources{ rsrc := api.NodeResources{
Capacity: api.ResourceList{ Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(int64(flavor.VCPUs*1000), resource.DecimalSI), api.ResourceCPU: *resource.NewQuantity(int64(flavor.VCPUs), resource.DecimalSI),
api.ResourceMemory: resource.MustParse(fmt.Sprintf("%dMi", flavor.RAM)), api.ResourceMemory: *resource.NewQuantity(int64(flavor.RAM)*MiB, resource.BinarySI),
"openstack.org/disk": resource.MustParse(fmt.Sprintf("%dG", flavor.Disk)), "openstack.org/disk": *resource.NewQuantity(int64(flavor.Disk)*GB, resource.DecimalSI),
"openstack.org/rxTxFactor": *resource.NewQuantity(int64(flavor.RxTxFactor*1000), resource.DecimalSI), "openstack.org/rxTxFactor": *resource.NewMilliQuantity(int64(flavor.RxTxFactor)*1000, resource.DecimalSI),
"openstack.org/swap": resource.MustParse(fmt.Sprintf("%dMi", flavor.Swap)), "openstack.org/swap": *resource.NewQuantity(int64(flavor.Swap)*MiB, resource.BinarySI),
}, },
} }
flavor_to_resource[flavor.ID] = &rsrc flavor_to_resource[flavor.ID] = &rsrc
@ -185,14 +202,14 @@ func (os *OpenStack) Instances() (cloudprovider.Instances, bool) {
return nil, false return nil, false
} }
glog.V(2).Infof("Found %v compute flavors", len(flavor_to_resource)) glog.V(3).Infof("Found %v compute flavors", len(flavor_to_resource))
glog.V(1).Info("Claiming to support Instances") glog.V(1).Info("Claiming to support Instances")
return &Instances{compute, flavor_to_resource}, true return &Instances{compute, flavor_to_resource}, true
} }
func (i *Instances) List(name_filter string) ([]string, error) { func (i *Instances) List(name_filter string) ([]string, error) {
glog.V(2).Infof("openstack List(%v) called", name_filter) glog.V(4).Infof("openstack List(%v) called", name_filter)
opts := servers.ListOpts{ opts := servers.ListOpts{
Name: name_filter, Name: name_filter,
@ -215,7 +232,8 @@ func (i *Instances) List(name_filter string) ([]string, error) {
return nil, err return nil, err
} }
glog.V(2).Infof("Found %v entries: %v", len(ret), ret) glog.V(3).Infof("Found %v instances matching %v: %v",
len(ret), name_filter, ret)
return ret, nil return ret, nil
} }
@ -253,25 +271,29 @@ func getServerByName(client *gophercloud.ServiceClient, name string) (*servers.S
return &serverList[0], nil return &serverList[0], nil
} }
func firstAddr(netblob interface{}) string { func findAddrs(netblob interface{}) []string {
// Run-time types for the win :( // Run-time types for the win :(
ret := []string{}
list, ok := netblob.([]interface{}) list, ok := netblob.([]interface{})
if !ok || len(list) < 1 {
return ""
}
props, ok := list[0].(map[string]interface{})
if !ok { if !ok {
return "" return ret
}
for _, item := range list {
props, ok := item.(map[string]interface{})
if !ok {
continue
} }
tmp, ok := props["addr"] tmp, ok := props["addr"]
if !ok { if !ok {
return "" continue
} }
addr, ok := tmp.(string) addr, ok := tmp.(string)
if !ok { if !ok {
return "" continue
} }
return addr ret = append(ret, addr)
}
return ret
} }
func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) { func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) {
@ -282,10 +304,14 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro
var s string var s string
if s == "" { if s == "" {
s = firstAddr(srv.Addresses["private"]) if tmp := findAddrs(srv.Addresses["private"]); len(tmp) >= 1 {
s = tmp[0]
}
} }
if s == "" { if s == "" {
s = firstAddr(srv.Addresses["public"]) if tmp := findAddrs(srv.Addresses["public"]); len(tmp) >= 1 {
s = tmp[0]
}
} }
if s == "" { if s == "" {
s = srv.AccessIPv4 s = srv.AccessIPv4
@ -300,17 +326,43 @@ func getAddressByName(api *gophercloud.ServiceClient, name string) (string, erro
} }
func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) { func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) {
glog.V(2).Infof("NodeAddresses(%v) called", name) glog.V(4).Infof("NodeAddresses(%v) called", name)
ip, err := getAddressByName(i.compute, name) srv, err := getServerByName(i.compute, name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
glog.V(2).Infof("NodeAddresses(%v) => %v", name, ip) addrs := []api.NodeAddress{}
// net.ParseIP().String() is to maintain compatibility with the old code for _, addr := range findAddrs(srv.Addresses["private"]) {
return []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: net.ParseIP(ip).String()}}, nil addrs = append(addrs, api.NodeAddress{
Type: api.NodeInternalIP,
Address: addr,
})
}
for _, addr := range findAddrs(srv.Addresses["public"]) {
addrs = append(addrs, api.NodeAddress{
Type: api.NodeExternalIP,
Address: addr,
})
}
// AccessIPs are usually duplicates of "public" addresses.
api.AddToNodeAddresses(&addrs,
api.NodeAddress{
Type: api.NodeExternalIP,
Address: srv.AccessIPv6,
},
api.NodeAddress{
Type: api.NodeExternalIP,
Address: srv.AccessIPv4,
},
)
glog.V(4).Infof("NodeAddresses(%v) => %v", name, addrs)
return addrs, nil
} }
// ExternalID returns the cloud provider ID of the specified instance. // ExternalID returns the cloud provider ID of the specified instance.
@ -323,7 +375,7 @@ func (i *Instances) ExternalID(name string) (string, error) {
} }
func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
glog.V(2).Infof("GetNodeResources(%v) called", name) glog.V(4).Infof("GetNodeResources(%v) called", name)
srv, err := getServerByName(i.compute, name) srv, err := getServerByName(i.compute, name)
if err != nil { if err != nil {
@ -343,7 +395,7 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, ErrNotFound return nil, ErrNotFound
} }
glog.V(2).Infof("GetNodeResources(%v) => %v", name, rsrc) glog.V(4).Infof("GetNodeResources(%v) => %v", name, rsrc)
return rsrc, nil return rsrc, nil
} }
@ -359,6 +411,13 @@ type LoadBalancer struct {
} }
func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
glog.V(4).Info("openstack.TCPLoadBalancer() called")
if err := openstack.Authenticate(os.provider, os.authOpts); err != nil {
glog.Warningf("Failed to reauthenticate: %v", err)
return nil, false
}
// TODO: Search for and support Rackspace loadbalancer API, and others. // TODO: Search for and support Rackspace loadbalancer API, and others.
network, err := openstack.NewNetworkV2(os.provider, gophercloud.EndpointOpts{ network, err := openstack.NewNetworkV2(os.provider, gophercloud.EndpointOpts{
Region: os.region, Region: os.region,
@ -427,10 +486,18 @@ func (lb *LoadBalancer) TCPLoadBalancerExists(name, region string) (bool, error)
// each region. // each region.
func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string, affinity api.AffinityType) (string, error) { func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string, affinity api.AffinityType) (string, error) {
glog.V(2).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v)", name, region, externalIP, port, hosts) glog.V(4).Infof("CreateTCPLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, externalIP, port, hosts, affinity)
if affinity != api.AffinityTypeNone {
var persistence *vips.SessionPersistence
switch affinity {
case api.AffinityTypeNone:
persistence = nil
case api.AffinityTypeClientIP:
persistence = &vips.SessionPersistence{Type: "SOURCE_IP"}
default:
return "", fmt.Errorf("unsupported load balancer affinity: %v", affinity) return "", fmt.Errorf("unsupported load balancer affinity: %v", affinity)
} }
pool, err := pools.Create(lb.network, pools.CreateOpts{ pool, err := pools.Create(lb.network, pools.CreateOpts{
Name: name, Name: name,
Protocol: pools.ProtocolTCP, Protocol: pools.ProtocolTCP,
@ -485,6 +552,7 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
Protocol: "TCP", Protocol: "TCP",
ProtocolPort: port, ProtocolPort: port,
PoolID: pool.ID, PoolID: pool.ID,
Persistence: persistence,
}).Extract() }).Extract()
if err != nil { if err != nil {
if mon != nil { if mon != nil {
@ -498,7 +566,7 @@ func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP ne
} }
func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error { func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
glog.V(2).Infof("UpdateTCPLoadBalancer(%v, %v, %v)", name, region, hosts) glog.V(4).Infof("UpdateTCPLoadBalancer(%v, %v, %v)", name, region, hosts)
vip, err := getVipByName(lb.network, name) vip, err := getVipByName(lb.network, name)
if err != nil { if err != nil {
@ -559,7 +627,7 @@ func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []strin
} }
func (lb *LoadBalancer) DeleteTCPLoadBalancer(name, region string) error { func (lb *LoadBalancer) DeleteTCPLoadBalancer(name, region string) error {
glog.V(2).Infof("DeleteTCPLoadBalancer(%v, %v)", name, region) glog.V(4).Infof("DeleteTCPLoadBalancer(%v, %v)", name, region)
vip, err := getVipByName(lb.network, name) vip, err := getVipByName(lb.network, name)
if err != nil { if err != nil {