Merge pull request #407 from yugui/fix/golint

Fixes Go lint errors.
This commit is contained in:
Tim Hockin 2014-07-15 08:44:57 -07:00
commit 3a9295add0
15 changed files with 164 additions and 123 deletions

View File

@ -23,14 +23,15 @@ import (
// Labels allows you to present labels independently from their storage. // Labels allows you to present labels independently from their storage.
type Labels interface { type Labels interface {
// Get returns the value for the provided label.
Get(label string) (value string) Get(label string) (value string)
} }
// A map of label:value. Implements Labels. // Set is a map of label:value. It implements Labels.
type Set map[string]string type Set map[string]string
// All labels listed as a human readable string. Conveniently, exactly the format // String returns all labels listed as a human readable string.
// that ParseSelector takes. // Conveniently, exactly the format that ParseSelector takes.
func (ls Set) String() string { func (ls Set) String() string {
selector := make([]string, 0, len(ls)) selector := make([]string, 0, len(ls))
for key, value := range ls { for key, value := range ls {
@ -41,12 +42,12 @@ func (ls Set) String() string {
return strings.Join(selector, ",") return strings.Join(selector, ",")
} }
// Implement Labels interface. // Get returns the value in the map for the provided label.
func (ls Set) Get(label string) string { func (ls Set) Get(label string) string {
return ls[label] return ls[label]
} }
// Convenience function: convert these labels to a selector. // AsSelector converts labels into a selectors.
func (ls Set) AsSelector() Selector { func (ls Set) AsSelector() Selector {
return SelectorFromSet(ls) return SelectorFromSet(ls)
} }

View File

@ -22,12 +22,12 @@ import (
"strings" "strings"
) )
// Represents a selector. // Selector represents a label selector.
type Selector interface { type Selector interface {
// Returns true if this selector matches the given set of labels. // Matches returns true if this selector matches the given set of labels.
Matches(Labels) bool Matches(Labels) bool
// Prints a human readable version of this selector. // String returns a human readable string that represents this selector.
String() string String() string
} }
@ -87,7 +87,7 @@ func try(selectorPiece, op string) (lhs, rhs string, ok bool) {
return "", "", false return "", "", false
} }
// Given a Set, return a Selector which will match exactly that Set. // SelectorFromSet returns a Selector which will match exactly the given Set.
func SelectorFromSet(ls Set) Selector { func SelectorFromSet(ls Set) Selector {
items := make([]Selector, 0, len(ls)) items := make([]Selector, 0, len(ls))
for label, value := range ls { for label, value := range ls {
@ -99,7 +99,8 @@ func SelectorFromSet(ls Set) Selector {
return andTerm(items) return andTerm(items)
} }
// Takes a string repsenting a selector and returns an object suitable for matching, or an error. // ParseSelector takes a string repsenting a selector and returns an
// object suitable for matching, or an error.
func ParseSelector(selector string) (Selector, error) { func ParseSelector(selector string) (Selector, error) {
parts := strings.Split(selector, ",") parts := strings.Split(selector, ",")
sort.StringSlice(parts).Sort() sort.StringSlice(parts).Sort()

View File

@ -43,7 +43,7 @@ type Master struct {
storage map[string]apiserver.RESTStorage storage map[string]apiserver.RESTStorage
} }
// Returns a memory (not etcd) backed apiserver. // NewMemoryServer returns a new instance of Master backed with memory (not etcd).
func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master { func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master {
m := &Master{ m := &Master{
podRegistry: registry.MakeMemoryRegistry(), podRegistry: registry.MakeMemoryRegistry(),
@ -55,7 +55,7 @@ func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud
return m return m
} }
// Returns a new apiserver. // New returns a new instance of Master connected to the given etcdServer.
func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master { func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master {
etcdClient := etcd.NewClient(etcdServers) etcdClient := etcd.NewClient(etcdServers)
minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp) minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp)
@ -84,7 +84,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30) podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30)
go podCache.Loop() go podCache.Loop()
s := scheduler.MakeRandomFitScheduler(m.podRegistry, m.random) s := scheduler.NewRandomFitScheduler(m.podRegistry, m.random)
m.storage = map[string]apiserver.RESTStorage{ m.storage = map[string]apiserver.RESTStorage{
"pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache), "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache),
"replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry), "replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry),
@ -94,7 +94,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
} }
// Runs master. Never returns. // Run begins serving the Kubernetes API. It never returns.
func (m *Master) Run(myAddress, apiPrefix string) error { func (m *Master) Run(myAddress, apiPrefix string) error {
endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
@ -109,8 +109,9 @@ func (m *Master) Run(myAddress, apiPrefix string) error {
return s.ListenAndServe() return s.ListenAndServe()
} }
// Instead of calling Run, call ConstructHandler to get a handler for your own // ConstructHandler returns an http.Handler which serves the Kubernetes API.
// server. Intended for testing. Only call once. // Instead of calling Run, you can call this function to get a handler for your own server.
// It is intended for testing. Only call once.
func (m *Master) ConstructHandler(apiPrefix string) http.Handler { func (m *Master) ConstructHandler(apiPrefix string) http.Handler {
endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)

View File

@ -40,6 +40,7 @@ type PodCache struct {
podLock sync.Mutex podLock sync.Mutex
} }
// NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry.
func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache { func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache {
return &PodCache{ return &PodCache{
containerInfo: info, containerInfo: info,
@ -49,7 +50,7 @@ func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period ti
} }
} }
// Implements the PodInfoGetter interface. // GetPodInfo Implements the PodInfoGetter.GetPodInfo.
// The returned value should be treated as read-only. // The returned value should be treated as read-only.
func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) {
p.podLock.Lock() p.podLock.Lock()
@ -57,9 +58,8 @@ func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) {
value, ok := p.podInfo[podID] value, ok := p.podInfo[podID]
if !ok { if !ok {
return nil, errors.New("no cached pod info") return nil, errors.New("no cached pod info")
} else {
return value, nil
} }
return value, nil
} }
func (p *PodCache) updatePodInfo(host, id string) error { func (p *PodCache) updatePodInfo(host, id string) error {
@ -73,7 +73,7 @@ func (p *PodCache) updatePodInfo(host, id string) error {
return nil return nil
} }
// Update information about all containers. Either called by Loop() below, or one-off. // UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off.
func (p *PodCache) UpdateAllContainers() { func (p *PodCache) UpdateAllContainers() {
pods, err := p.pods.ListPods(labels.Everything()) pods, err := p.pods.ListPods(labels.Everything())
if err != nil { if err != nil {
@ -88,7 +88,8 @@ func (p *PodCache) UpdateAllContainers() {
} }
} }
// Loop runs forever, it is expected to be placed in a go routine. // Loop begins watching updates of container information.
// It runs forever, and is expected to be placed in a go routine.
func (p *PodCache) Loop() { func (p *PodCache) Loop() {
util.Forever(func() { p.UpdateAllContainers() }, p.period) util.Forever(func() { p.UpdateAllContainers() }, p.period)
} }

View File

@ -24,40 +24,44 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// Operation is a type of operation of services or endpoints.
type Operation int type Operation int
// These are the available operation types.
const ( const (
SET Operation = iota SET Operation = iota
ADD ADD
REMOVE REMOVE
) )
// Defines an operation sent on the channel. You can add or remove single services by // ServiceUpdate describes an operation of services, sent on the channel.
// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system // You can add or remove single services by sending an array of size one and Op == ADD|REMOVE.
// to a given state for this source configuration, set Services as desired and Op to SET, // For setting the state of the system to a given state for this source configuration, set Services as desired and Op to SET,
// which will reset the system state to that specified in this operation for this source // which will reset the system state to that specified in this operation for this source channel.
// channel. To remove all services, set Services to empty array and Op to SET // To remove all services, set Services to empty array and Op to SET
type ServiceUpdate struct { type ServiceUpdate struct {
Services []api.Service Services []api.Service
Op Operation Op Operation
} }
// Defines an operation sent on the channel. You can add or remove single endpoints by // EndpointsUpdate describes an operation of endpoints, sent on the channel.
// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system // You can add or remove single endpoints by sending an array of size one and Op == ADD|REMOVE.
// to a given state for this source configuration, set Endpoints as desired and Op to SET, // For setting the state of the system to a given state for this source configuration, set Endpoints as desired and Op to SET,
// which will reset the system state to that specified in this operation for this source // which will reset the system state to that specified in this operation for this source channel.
// channel. To remove all endpoints, set Endpoints to empty array and Op to SET // To remove all endpoints, set Endpoints to empty array and Op to SET
type EndpointsUpdate struct { type EndpointsUpdate struct {
Endpoints []api.Endpoints Endpoints []api.Endpoints
Op Operation Op Operation
} }
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
type ServiceConfigHandler interface { type ServiceConfigHandler interface {
// Sent when a configuration has been changed by one of the sources. This is the // OnUpdate gets called when a configuration has been changed by one of the sources.
// union of all the configuration sources. // This is the union of all the configuration sources.
OnUpdate(services []api.Service) OnUpdate(services []api.Service)
} }
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
type EndpointsConfigHandler interface { type EndpointsConfigHandler interface {
// OnUpdate gets called when endpoints configuration is changed for a given // OnUpdate gets called when endpoints configuration is changed for a given
// service on any of the configuration sources. An example is when a new // service on any of the configuration sources. An example is when a new
@ -65,6 +69,8 @@ type EndpointsConfigHandler interface {
OnUpdate(endpoints []api.Endpoints) OnUpdate(endpoints []api.Endpoints)
} }
// ServiceConfig tracks a set of service configurations and their endpoint configurations.
// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change.
type ServiceConfig struct { type ServiceConfig struct {
// Configuration sources and their lock. // Configuration sources and their lock.
configSourceLock sync.RWMutex configSourceLock sync.RWMutex
@ -94,6 +100,8 @@ type ServiceConfig struct {
endpointsNotifyChannel chan string endpointsNotifyChannel chan string
} }
// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig { func NewServiceConfig() *ServiceConfig {
config := &ServiceConfig{ config := &ServiceConfig{
serviceConfigSources: make(map[string]chan ServiceUpdate), serviceConfigSources: make(map[string]chan ServiceUpdate),
@ -109,22 +117,26 @@ func NewServiceConfig() *ServiceConfig {
return config return config
} }
// Run begins a loop to accept new service configurations and new endpoint configurations.
// It never returns.
func (impl *ServiceConfig) Run() { func (impl *ServiceConfig) Run() {
glog.Infof("Starting the config Run loop") glog.Infof("Starting the config Run loop")
for { for {
select { select {
case source := <-impl.serviceNotifyChannel: case source := <-impl.serviceNotifyChannel:
glog.Infof("Got new service configuration from source %s", source) glog.Infof("Got new service configuration from source %s", source)
impl.NotifyServiceUpdate() impl.notifyServiceUpdate()
case source := <-impl.endpointsNotifyChannel: case source := <-impl.endpointsNotifyChannel:
glog.Infof("Got new endpoint configuration from source %s", source) glog.Infof("Got new endpoint configuration from source %s", source)
impl.NotifyEndpointsUpdate() impl.notifyEndpointsUpdate()
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
} }
} }
} }
func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel chan ServiceUpdate) { // serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel.
// It never returns.
func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) {
// Represents the current services configuration for this channel. // Represents the current services configuration for this channel.
serviceMap := make(map[string]api.Service) serviceMap := make(map[string]api.Service)
for { for {
@ -160,7 +172,9 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c
} }
} }
func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { // endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel.
// It never returns.
func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) {
endpointMap := make(map[string]api.Endpoints) endpointMap := make(map[string]api.Endpoints)
for { for {
select { select {
@ -214,11 +228,11 @@ func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan Se
} }
newChannel := make(chan ServiceUpdate) newChannel := make(chan ServiceUpdate)
impl.serviceConfigSources[source] = newChannel impl.serviceConfigSources[source] = newChannel
go impl.ServiceChannelListener(source, newChannel) go impl.serviceChannelListener(source, newChannel)
return newChannel return newChannel
} }
// GetEndpointConfigurationChannel returns a channel where a configuration source // GetEndpointsConfigurationChannel returns a channel where a configuration source
// can send updates of new endpoint configurations. Multiple calls with the same // can send updates of new endpoint configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources // source will return the same channel. This allows change and state based sources
// to use the same channel. Difference source names however will be treated as a // to use the same channel. Difference source names however will be treated as a
@ -235,11 +249,11 @@ func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan
} }
newChannel := make(chan EndpointsUpdate) newChannel := make(chan EndpointsUpdate)
impl.endpointsConfigSources[source] = newChannel impl.endpointsConfigSources[source] = newChannel
go impl.EndpointsChannelListener(source, newChannel) go impl.endpointsChannelListener(source, newChannel)
return newChannel return newChannel
} }
// Register ServiceConfigHandler to receive updates of changes to services. // RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services.
func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) { func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) {
impl.handlerLock.Lock() impl.handlerLock.Lock()
defer impl.handlerLock.Unlock() defer impl.handlerLock.Unlock()
@ -255,7 +269,7 @@ func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler)
panic("Only up to 10 service handlers supported for now") panic("Only up to 10 service handlers supported for now")
} }
// Register ServiceConfigHandler to receive updates of changes to services. // RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services.
func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) { func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) {
impl.handlerLock.Lock() impl.handlerLock.Lock()
defer impl.handlerLock.Unlock() defer impl.handlerLock.Unlock()
@ -271,8 +285,9 @@ func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandl
panic("Only up to 10 endpoint handlers supported for now") panic("Only up to 10 endpoint handlers supported for now")
} }
func (impl *ServiceConfig) NotifyServiceUpdate() { // notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services.
services := make([]api.Service, 0) func (impl *ServiceConfig) notifyServiceUpdate() {
services := []api.Service{}
impl.configLock.RLock() impl.configLock.RLock()
for _, sourceServices := range impl.serviceConfig { for _, sourceServices := range impl.serviceConfig {
for _, value := range sourceServices { for _, value := range sourceServices {
@ -291,8 +306,9 @@ func (impl *ServiceConfig) NotifyServiceUpdate() {
} }
} }
func (impl *ServiceConfig) NotifyEndpointsUpdate() { // notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints.
endpoints := make([]api.Endpoints, 0) func (impl *ServiceConfig) notifyEndpointsUpdate() {
endpoints := []api.Endpoints{}
impl.configLock.RLock() impl.configLock.RLock()
for _, sourceEndpoints := range impl.endpointConfig { for _, sourceEndpoints := range impl.endpointConfig {
for _, value := range sourceEndpoints { for _, value := range sourceEndpoints {

View File

@ -15,7 +15,7 @@ limitations under the License.
*/ */
// Watches etcd and gets the full configuration on preset intervals. // Watches etcd and gets the full configuration on preset intervals.
// Expects the list of exposed services to live under: // It expects the list of exposed services to live under:
// registry/services // registry/services
// which in etcd is exposed like so: // which in etcd is exposed like so:
// http://<etcd server>/v2/keys/registry/services // http://<etcd server>/v2/keys/registry/services
@ -30,7 +30,7 @@ limitations under the License.
// '[ { "machine": <host>, "name": <name", "port": <port> }, // '[ { "machine": <host>, "name": <name", "port": <port> },
// { "machine": <host2>, "name": <name2", "port": <port2> } // { "machine": <host2>, "name": <name2", "port": <port2> }
// ]', // ]',
//
package config package config
import ( import (
@ -44,14 +44,17 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
const RegistryRoot = "registry/services" // registryRoot is the key prefix for service configs in etcd.
const registryRoot = "registry/services"
// ConfigSourceEtcd communicates with a etcd via the client, and sends the change notification of services and endpoints to the specified channels.
type ConfigSourceEtcd struct { type ConfigSourceEtcd struct {
client *etcd.Client client *etcd.Client
serviceChannel chan ServiceUpdate serviceChannel chan ServiceUpdate
endpointsChannel chan EndpointsUpdate endpointsChannel chan EndpointsUpdate
} }
// NewConfigSourceEtcd creates a new ConfigSourceEtcd and immediately runs the created ConfigSourceEtcd in a goroutine.
func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd { func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd {
config := ConfigSourceEtcd{ config := ConfigSourceEtcd{
client: client, client: client,
@ -62,13 +65,14 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate,
return config return config
} }
// Run begins watching for new services and their endpoints on etcd.
func (impl ConfigSourceEtcd) Run() { func (impl ConfigSourceEtcd) Run() {
// Initially, just wait for the etcd to come up before doing anything more complicated. // Initially, just wait for the etcd to come up before doing anything more complicated.
var services []api.Service var services []api.Service
var endpoints []api.Endpoints var endpoints []api.Endpoints
var err error var err error
for { for {
services, endpoints, err = impl.GetServices() services, endpoints, err = impl.getServices()
if err == nil { if err == nil {
break break
} }
@ -87,10 +91,10 @@ func (impl ConfigSourceEtcd) Run() {
// Ok, so we got something back from etcd. Let's set up a watch for new services, and // Ok, so we got something back from etcd. Let's set up a watch for new services, and
// their endpoints // their endpoints
go impl.WatchForChanges() go impl.watchForChanges()
for { for {
services, endpoints, err = impl.GetServices() services, endpoints, err = impl.getServices()
if err != nil { if err != nil {
glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err) glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err)
} else { } else {
@ -107,12 +111,12 @@ func (impl ConfigSourceEtcd) Run() {
} }
} }
// Finds the list of services and their endpoints from etcd. // getServices finds the list of services and their endpoints from etcd.
// This operation is akin to a set a known good at regular intervals. // This operation is akin to a set a known good at regular intervals.
func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) { func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) {
response, err := impl.client.Get(RegistryRoot+"/specs", true, false) response, err := impl.client.Get(registryRoot+"/specs", true, false)
if err != nil { if err != nil {
glog.Errorf("Failed to get the key %s: %v", RegistryRoot, err) glog.Errorf("Failed to get the key %s: %v", registryRoot, err)
return make([]api.Service, 0), make([]api.Endpoints, 0), err return make([]api.Service, 0), make([]api.Endpoints, 0), err
} }
if response.Node.Dir == true { if response.Node.Dir == true {
@ -129,7 +133,7 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro
continue continue
} }
retServices[i] = svc retServices[i] = svc
endpoints, err := impl.GetEndpoints(svc.ID) endpoints, err := impl.getEndpoints(svc.ID)
if err != nil { if err != nil {
glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err) glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
} }
@ -138,23 +142,23 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro
} }
return retServices, retEndpoints, err return retServices, retEndpoints, err
} }
return nil, nil, fmt.Errorf("did not get the root of the registry %s", RegistryRoot) return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot)
} }
func (impl ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) { // getEndpoints finds the list of endpoints of the service from etcd.
key := fmt.Sprintf(RegistryRoot + "/endpoints/" + service) func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) {
key := fmt.Sprintf(registryRoot + "/endpoints/" + service)
response, err := impl.client.Get(key, true, false) response, err := impl.client.Get(key, true, false)
if err != nil { if err != nil {
glog.Errorf("Failed to get the key: %s %v", key, err) glog.Errorf("Failed to get the key: %s %v", key, err)
return api.Endpoints{}, err return api.Endpoints{}, err
} }
// Parse all the endpoint specifications in this value. // Parse all the endpoint specifications in this value.
return ParseEndpoints(response.Node.Value) return parseEndpoints(response.Node.Value)
} }
// EtcdResponseToServiceAndLocalport takes an etcd response and pulls it apart to find // etcdResponseToService takes an etcd response and pulls it apart to find service.
// service func etcdResponseToService(response *etcd.Response) (*api.Service, error) {
func EtcdResponseToService(response *etcd.Response) (*api.Service, error) {
if response.Node == nil { if response.Node == nil {
return nil, fmt.Errorf("invalid response from etcd: %#v", response) return nil, fmt.Errorf("invalid response from etcd: %#v", response)
} }
@ -166,31 +170,31 @@ func EtcdResponseToService(response *etcd.Response) (*api.Service, error) {
return &svc, err return &svc, err
} }
func ParseEndpoints(jsonString string) (api.Endpoints, error) { func parseEndpoints(jsonString string) (api.Endpoints, error) {
var e api.Endpoints var e api.Endpoints
err := json.Unmarshal([]byte(jsonString), &e) err := json.Unmarshal([]byte(jsonString), &e)
return e, err return e, err
} }
func (impl ConfigSourceEtcd) WatchForChanges() { func (impl ConfigSourceEtcd) watchForChanges() {
glog.Info("Setting up a watch for new services") glog.Info("Setting up a watch for new services")
watchChannel := make(chan *etcd.Response) watchChannel := make(chan *etcd.Response)
go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil) go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil)
for { for {
watchResponse := <-watchChannel watchResponse := <-watchChannel
impl.ProcessChange(watchResponse) impl.processChange(watchResponse)
} }
} }
func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) { func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
glog.Infof("Processing a change in service configuration... %s", *response) glog.Infof("Processing a change in service configuration... %s", *response)
// If it's a new service being added (signified by a localport being added) // If it's a new service being added (signified by a localport being added)
// then process it as such // then process it as such
if strings.Contains(response.Node.Key, "/endpoints/") { if strings.Contains(response.Node.Key, "/endpoints/") {
impl.ProcessEndpointResponse(response) impl.processEndpointResponse(response)
} else if response.Action == "set" { } else if response.Action == "set" {
service, err := EtcdResponseToService(response) service, err := etcdResponseToService(response)
if err != nil { if err != nil {
glog.Errorf("Failed to parse %s Port: %s", response, err) glog.Errorf("Failed to parse %s Port: %s", response, err)
return return
@ -208,13 +212,12 @@ func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}} serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}}
impl.serviceChannel <- serviceUpdate impl.serviceChannel <- serviceUpdate
return return
} else {
glog.Infof("Unknown service delete: %#v", parts)
} }
glog.Infof("Unknown service delete: %#v", parts)
} }
} }
func (impl ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) {
glog.Infof("Processing a change in endpoint configuration... %s", *response) glog.Infof("Processing a change in endpoint configuration... %s", *response)
var endpoints api.Endpoints var endpoints api.Endpoints
err := json.Unmarshal([]byte(response.Node.Value), &endpoints) err := json.Unmarshal([]byte(response.Node.Value), &endpoints)

View File

@ -26,15 +26,15 @@ import (
const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34" const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34"
const TomcatService = "tomcat" const TomcatService = "tomcat"
const TomcatContainerId = "tomcat-3bd5af34" const TomcatContainerID = "tomcat-3bd5af34"
func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { func validateJSONParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) {
endpoints, err := ParseEndpoints(jsonString) endpoints, err := parseEndpoints(jsonString)
if err == nil && expectError { if err == nil && expectError {
t.Errorf("ValidateJsonParsing did not get expected error when parsing %s", jsonString) t.Errorf("validateJSONParsing did not get expected error when parsing %s", jsonString)
} }
if err != nil && !expectError { if err != nil && !expectError {
t.Errorf("ValidateJsonParsing got unexpected error %+v when parsing %s", err, jsonString) t.Errorf("validateJSONParsing got unexpected error %+v when parsing %s", err, jsonString)
} }
if !reflect.DeepEqual(expectedEndpoints, endpoints) { if !reflect.DeepEqual(expectedEndpoints, endpoints) {
t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints) t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints)
@ -42,7 +42,7 @@ func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api.
} }
func TestParseJsonEndpoints(t *testing.T) { func TestParseJsonEndpoints(t *testing.T) {
ValidateJsonParsing(t, "", api.Endpoints{}, true) validateJSONParsing(t, "", api.Endpoints{}, true)
endpoints := api.Endpoints{ endpoints := api.Endpoints{
Name: "foo", Name: "foo",
Endpoints: []string{"foo", "bar", "baz"}, Endpoints: []string{"foo", "bar", "baz"},
@ -51,6 +51,6 @@ func TestParseJsonEndpoints(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("Unexpected error: %#v", err) t.Errorf("Unexpected error: %#v", err)
} }
ValidateJsonParsing(t, string(data), endpoints, false) validateJSONParsing(t, string(data), endpoints, false)
// ValidateJsonParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) // validateJSONParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false)
} }

View File

@ -28,6 +28,7 @@ limitations under the License.
// } // }
//] //]
//} //}
package config package config
import ( import (
@ -41,22 +42,23 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// TODO: kill this struct. // serviceConfig is a deserialized form of the config file format which ConfigSourceFile accepts.
type ServiceJSON struct { type serviceConfig struct {
Name string Services []struct {
Port int Name string `json: "name"`
Endpoints []string Port int `json: "port"`
} Endpoints []string `json: "endpoints"`
type ConfigFile struct { } `json: "service"`
Services []ServiceJSON
} }
// ConfigSourceFile periodically reads service configurations in JSON from a file, and sends the services and endpoints defined in the file to the specified channels.
type ConfigSourceFile struct { type ConfigSourceFile struct {
serviceChannel chan ServiceUpdate serviceChannel chan ServiceUpdate
endpointsChannel chan EndpointsUpdate endpointsChannel chan EndpointsUpdate
filename string filename string
} }
// NewConfigSourceFile creates a new ConfigSourceFile and let it immediately runs the created ConfigSourceFile in a goroutine.
func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile { func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile {
config := ConfigSourceFile{ config := ConfigSourceFile{
filename: filename, filename: filename,
@ -67,6 +69,7 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end
return config return config
} }
// Run begins watching the config file.
func (impl ConfigSourceFile) Run() { func (impl ConfigSourceFile) Run() {
glog.Infof("Watching file %s", impl.filename) glog.Infof("Watching file %s", impl.filename)
var lastData []byte var lastData []byte
@ -85,7 +88,7 @@ func (impl ConfigSourceFile) Run() {
} }
lastData = data lastData = data
config := new(ConfigFile) config := &serviceConfig{}
if err = json.Unmarshal(data, config); err != nil { if err = json.Unmarshal(data, config); err != nil {
glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err) glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err)
continue continue

View File

@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
// Package proxy implements the layer-3 network proxy // Package proxy implements the layer-3 network proxy.
package proxy package proxy

View File

@ -22,6 +22,8 @@ import (
"net" "net"
) )
// LoadBalancer represents a load balancer that decides where to route
// the incoming services for a particular service to.
type LoadBalancer interface { type LoadBalancer interface {
// LoadBalance takes an incoming request and figures out where to route it to. // LoadBalance takes an incoming request and figures out where to route it to.
// Determination is based on destination service (for example, 'mysql') as // Determination is based on destination service (for example, 'mysql') as

View File

@ -33,11 +33,12 @@ type Proxier struct {
serviceMap map[string]int serviceMap map[string]int
} }
// NewProxier returns a newly created and correctly initialized instance of Proxier.
func NewProxier(loadBalancer LoadBalancer) *Proxier { func NewProxier(loadBalancer LoadBalancer) *Proxier {
return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)}
} }
func CopyBytes(in, out *net.TCPConn) { func copyBytes(in, out *net.TCPConn) {
glog.Infof("Copying from %v <-> %v <-> %v <-> %v", glog.Infof("Copying from %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
_, err := io.Copy(in, out) _, err := io.Copy(in, out)
@ -49,14 +50,17 @@ func CopyBytes(in, out *net.TCPConn) {
out.CloseWrite() out.CloseWrite()
} }
// Create a bidirectional byte shuffler. Copies bytes to/from each connection. // proxyConnection creates a bidirectional byte shuffler.
func ProxyConnection(in, out *net.TCPConn) { // It copies bytes to/from each connection.
func proxyConnection(in, out *net.TCPConn) {
glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v", glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
go CopyBytes(in, out) go copyBytes(in, out)
go CopyBytes(out, in) go copyBytes(out, in)
} }
// AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints.
// It never returns.
func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
for { for {
inConn, err := listener.Accept() inConn, err := listener.Accept()
@ -83,12 +87,12 @@ func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
inConn.Close() inConn.Close()
continue continue
} }
ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
} }
} }
// AddService starts listening for a new service on a given port. // addService starts listening for a new service on a given port.
func (proxier Proxier) AddService(service string, port int) error { func (proxier Proxier) addService(service string, port int) error {
// Make sure we can start listening on the port before saying all's well. // Make sure we can start listening on the port before saying all's well.
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil { if err != nil {
@ -117,18 +121,21 @@ func (proxier Proxier) addServiceCommon(service string, l net.Listener) {
go proxier.AcceptHandler(service, l) go proxier.AcceptHandler(service, l)
} }
// OnUpdate recieves update notices for the updated services and start listening newly added services.
// It implements "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config".ServiceConfigHandler.OnUpdate.
func (proxier Proxier) OnUpdate(services []api.Service) { func (proxier Proxier) OnUpdate(services []api.Service) {
glog.Infof("Received update notice: %+v", services) glog.Infof("Received update notice: %+v", services)
for _, service := range services { for _, service := range services {
port, exists := proxier.serviceMap[service.ID] port, exists := proxier.serviceMap[service.ID]
if !exists || port != service.Port { if exists && port == service.Port {
glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) continue
err := proxier.AddService(service.ID, service.Port)
if err == nil {
proxier.serviceMap[service.ID] = service.Port
} else {
glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port)
}
} }
glog.Infof("Adding a new service %s on port %d", service.ID, service.Port)
err := proxier.addService(service.ID, service.Port)
if err != nil {
glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port)
continue
}
proxier.serviceMap[service.ID] = service.Port
} }
} }

View File

@ -29,22 +29,25 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// LoadBalancerRR is a round-robin load balancer. It implements LoadBalancer.
type LoadBalancerRR struct { type LoadBalancerRR struct {
lock sync.RWMutex lock sync.RWMutex
endpointsMap map[string][]string endpointsMap map[string][]string
rrIndex map[string]int rrIndex map[string]int
} }
// NewLoadBalancerRR returns a newly created and correctly initialized instance of LoadBalancerRR.
func NewLoadBalancerRR() *LoadBalancerRR { func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)} return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)}
} }
// LoadBalance selects an endpoint of the service by round-robin algorithm.
func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) { func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) {
impl.lock.RLock() impl.lock.RLock()
endpoints, exists := impl.endpointsMap[service] endpoints, exists := impl.endpointsMap[service]
index := impl.rrIndex[service] index := impl.rrIndex[service]
impl.lock.RUnlock() impl.lock.RUnlock()
if exists == false { if !exists {
return "", errors.New("no service entry for:" + service) return "", errors.New("no service entry for:" + service)
} }
if len(endpoints) == 0 { if len(endpoints) == 0 {
@ -55,7 +58,7 @@ func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string
return endpoint, nil return endpoint, nil
} }
func (impl LoadBalancerRR) IsValid(spec string) bool { func (impl LoadBalancerRR) isValid(spec string) bool {
_, port, err := net.SplitHostPort(spec) _, port, err := net.SplitHostPort(spec)
if err != nil { if err != nil {
return false return false
@ -67,16 +70,19 @@ func (impl LoadBalancerRR) IsValid(spec string) bool {
return value > 0 return value > 0
} }
func (impl LoadBalancerRR) FilterValidEndpoints(endpoints []string) []string { func (impl LoadBalancerRR) filterValidEndpoints(endpoints []string) []string {
var result []string var result []string
for _, spec := range endpoints { for _, spec := range endpoints {
if impl.IsValid(spec) { if impl.isValid(spec) {
result = append(result, spec) result = append(result, spec)
} }
} }
return result return result
} }
// OnUpdate updates the registered endpoints with the new
// endpoint information, removes the registered endpoints
// no longer present in the provided endpoints.
func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
tmp := make(map[string]bool) tmp := make(map[string]bool)
impl.lock.Lock() impl.lock.Lock()
@ -84,7 +90,7 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
// First update / add all new endpoints for services. // First update / add all new endpoints for services.
for _, value := range endpoints { for _, value := range endpoints {
existingEndpoints, exists := impl.endpointsMap[value.Name] existingEndpoints, exists := impl.endpointsMap[value.Name]
validEndpoints := impl.FilterValidEndpoints(value.Endpoints) validEndpoints := impl.filterValidEndpoints(value.Endpoints)
if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) { if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) {
glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints) glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints)
impl.endpointsMap[value.Name] = validEndpoints impl.endpointsMap[value.Name] = validEndpoints

View File

@ -24,16 +24,16 @@ import (
func TestLoadBalanceValidateWorks(t *testing.T) { func TestLoadBalanceValidateWorks(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
if loadBalancer.IsValid("") { if loadBalancer.isValid("") {
t.Errorf("Didn't fail for empty string") t.Errorf("Didn't fail for empty string")
} }
if loadBalancer.IsValid("foobar") { if loadBalancer.isValid("foobar") {
t.Errorf("Didn't fail with no port") t.Errorf("Didn't fail with no port")
} }
if loadBalancer.IsValid("foobar:-1") { if loadBalancer.isValid("foobar:-1") {
t.Errorf("Didn't fail with a negative port") t.Errorf("Didn't fail with a negative port")
} }
if !loadBalancer.IsValid("foobar:8080") { if !loadBalancer.isValid("foobar:8080") {
t.Errorf("Failed a valid config.") t.Errorf("Failed a valid config.")
} }
} }
@ -41,7 +41,7 @@ func TestLoadBalanceValidateWorks(t *testing.T) {
func TestLoadBalanceFilterWorks(t *testing.T) { func TestLoadBalanceFilterWorks(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"} endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"}
filtered := loadBalancer.FilterValidEndpoints(endpoints) filtered := loadBalancer.filterValidEndpoints(endpoints)
if len(filtered) != 3 { if len(filtered) != 3 {
t.Errorf("Failed to filter to the correct size") t.Errorf("Failed to filter to the correct size")
@ -59,7 +59,7 @@ func TestLoadBalanceFilterWorks(t *testing.T) {
func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR() loadBalancer := NewLoadBalancerRR()
endpoints := make([]api.Endpoints, 0) var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
endpoint, err := loadBalancer.LoadBalance("foo", nil) endpoint, err := loadBalancer.LoadBalance("foo", nil)
if err == nil { if err == nil {

View File

@ -32,7 +32,7 @@ type RandomFitScheduler struct {
randomLock sync.Mutex randomLock sync.Mutex
} }
func MakeRandomFitScheduler(podLister PodLister, random *rand.Rand) Scheduler { func NewRandomFitScheduler(podLister PodLister, random *rand.Rand) Scheduler {
return &RandomFitScheduler{ return &RandomFitScheduler{
podLister: podLister, podLister: podLister,
random: random, random: random,

View File

@ -28,7 +28,7 @@ func TestRandomFitSchedulerNothingScheduled(t *testing.T) {
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeRandomFitScheduler(&fakeRegistry, r), scheduler: NewRandomFitScheduler(&fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectSchedule(api.Pod{}, "m3") st.expectSchedule(api.Pod{}, "m3")
@ -41,7 +41,7 @@ func TestRandomFitSchedulerFirstScheduled(t *testing.T) {
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeRandomFitScheduler(fakeRegistry, r), scheduler: NewRandomFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectSchedule(makePod("", 8080), "m3") st.expectSchedule(makePod("", 8080), "m3")
@ -56,7 +56,7 @@ func TestRandomFitSchedulerFirstScheduledComplicated(t *testing.T) {
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeRandomFitScheduler(fakeRegistry, r), scheduler: NewRandomFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectSchedule(makePod("", 8080, 8081), "m3") st.expectSchedule(makePod("", 8080, 8081), "m3")
@ -71,7 +71,7 @@ func TestRandomFitSchedulerFirstScheduledImpossible(t *testing.T) {
r := rand.New(rand.NewSource(0)) r := rand.New(rand.NewSource(0))
st := schedulerTester{ st := schedulerTester{
t: t, t: t,
scheduler: MakeRandomFitScheduler(fakeRegistry, r), scheduler: NewRandomFitScheduler(fakeRegistry, r),
minionLister: FakeMinionLister{"m1", "m2", "m3"}, minionLister: FakeMinionLister{"m1", "m2", "m3"},
} }
st.expectFailure(makePod("", 8080, 8081)) st.expectFailure(makePod("", 8080, 8081))