diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 3eebffdea4b..0be36be1639 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -18,10 +18,10 @@ package master import ( "net" - "reflect" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" @@ -38,7 +38,7 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) { glog.Errorf("Can't create master namespace: %v", err) } if m.serviceReadWriteIP != nil { - if err := m.createMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil { + if err := m.createMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil && !errors.IsAlreadyExists(err) { glog.Errorf("Can't create rw service: %v", err) } if err := m.setEndpoints("kubernetes", m.clusterIP, m.publicReadWritePort); err != nil { @@ -63,7 +63,7 @@ func (m *Master) roServiceWriterLoop(stop chan struct{}) { glog.Errorf("Can't create master namespace: %v", err) } if m.serviceReadOnlyIP != nil { - if err := m.createMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil { + if err := m.createMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil && !errors.IsAlreadyExists(err) { glog.Errorf("Can't create ro service: %v", err) } if err := m.setEndpoints("kubernetes-ro", m.clusterIP, m.publicReadOnlyPort); err != nil { @@ -128,15 +128,20 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I return err } -// setEndpoints sets the endpoints for the given service. -// TODO: in a multi-master scenario this needs to consider all masters. +// setEndpoints sets the endpoints for the given apiserver service (ro or rw). +// setEndpoints expects that the endpoints objects it manages will all be +// managed only by setEndpoints; therefore, to understand this, you need only +// understand the requirements and the body of this function. +// +// Requirements: +// * All apiservers MUST use the same ports for their {rw, ro} services. +// * All apiservers MUST use setEndpoints and only setEndpoints to manage the +// endpoints for their {rw, ro} services. +// * All apiservers MUST know and agree on the number of apiservers expected +// to be running (m.masterCount). +// * setEndpoints is called periodically from all apiservers. +// func (m *Master) setEndpoints(serviceName string, ip net.IP, port int) error { - // The setting we want to find. - want := []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: ip.String()}}, - Ports: []api.EndpointPort{{Port: port, Protocol: api.ProtocolTCP}}, - }} - ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) if err != nil { @@ -147,11 +152,63 @@ func (m *Master) setEndpoints(serviceName string, ip net.IP, port int) error { }, } } - if !reflect.DeepEqual(e.Subsets, want) { - e.Subsets = want - glog.Infof("setting endpoints for master service %q to %v", serviceName, e) + + // First, determine if the endpoint is in the format we expect (one + // subset, one port, N IP addresses). + formatCorrect, ipCorrect := m.checkEndpointSubsetFormat(e, ip.String(), port) + if !formatCorrect { + // Something is egregiously wrong, just re-make the endpoints record. + e.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: ip.String()}}, + Ports: []api.EndpointPort{{Port: port, Protocol: api.ProtocolTCP}}, + }} + glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) + return m.endpointRegistry.UpdateEndpoints(ctx, e) + } else if !ipCorrect { + // We *always* add our own IP address; if there are too many IP + // addresses, we remove the ones lexicographically after our + // own IP address. Given the requirements stated at the top of + // this function, this should cause the list of IP addresses to + // become eventually correct. + e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()}) + e.Subsets = endpoints.RepackSubsets(e.Subsets) + if addrs := &e.Subsets[0].Addresses; len(*addrs) > m.masterCount { + // addrs is a pointer because we're going to mutate it. + for i, addr := range *addrs { + if addr.IP == ip.String() { + for len(*addrs) > m.masterCount { + remove := (i + 1) % len(*addrs) + *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...) + } + break + } + } + } return m.endpointRegistry.UpdateEndpoints(ctx, e) } // We didn't make any changes, no need to actually call update. return nil } + +// Determine if the endpoint is in the format setEndpoints expect (one subset, +// one port, N IP addresses); and if the specified IP address is present and +// the correct number of ip addresses are found. +func (m *Master) checkEndpointSubsetFormat(e *api.Endpoints, ip string, port int) (formatCorrect, ipCorrect bool) { + if len(e.Subsets) != 1 { + return false, false + } + sub := &e.Subsets[0] + if len(sub.Ports) != 1 { + return false, false + } + p := &sub.Ports[0] + if p.Port != port || p.Protocol != api.ProtocolTCP { + return false, false + } + for _, addr := range sub.Addresses { + if addr.IP == ip { + return true, len(sub.Addresses) == m.masterCount + } + } + return true, false +} diff --git a/pkg/master/publish_test.go b/pkg/master/publish_test.go index 38cab3b4bd0..bdda8642833 100644 --- a/pkg/master/publish_test.go +++ b/pkg/master/publish_test.go @@ -26,21 +26,32 @@ import ( ) func TestSetEndpoints(t *testing.T) { + ns := api.NamespaceDefault + om := func(name string) api.ObjectMeta { + return api.ObjectMeta{Namespace: ns, Name: name} + } tests := []struct { - testName string - serviceName string - ip string - port int - endpoints *api.EndpointsList - expectUpdate bool + testName string + serviceName string + ip string + port int + additionalMasters int + endpoints *api.EndpointsList + expectUpdate *api.Endpoints // nil means none expected }{ { - testName: "no existing endpoints", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, - endpoints: nil, - expectUpdate: true, + testName: "no existing endpoints", + serviceName: "foo", + ip: "1.2.3.4", + port: 8080, + endpoints: nil, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, }, { testName: "existing endpoints satisfy", @@ -49,14 +60,13 @@ func TestSetEndpoints(t *testing.T) { port: 8080, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: false, }, { testName: "existing endpoints satisfy but too many", @@ -65,14 +75,88 @@ func TestSetEndpoints(t *testing.T) { port: 8080, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: true, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + { + testName: "existing endpoints satisfy but too many + extra masters", + serviceName: "foo", + ip: "1.2.3.4", + port: 8080, + additionalMasters: 3, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.1"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + { + testName: "existing endpoints satisfy but too many + extra masters + delete first", + serviceName: "foo", + ip: "4.3.2.4", + port: 8080, + additionalMasters: 3, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "4.3.2.1"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "4.3.2.1"}, + {IP: "4.3.2.2"}, + {IP: "4.3.2.3"}, + {IP: "4.3.2.4"}, + }, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, }, { testName: "existing endpoints wrong name", @@ -81,14 +165,20 @@ func TestSetEndpoints(t *testing.T) { port: 8080, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ - ObjectMeta: api.ObjectMeta{Name: "bar"}, + ObjectMeta: om("bar"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: true, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, }, { testName: "existing endpoints wrong IP", @@ -97,14 +187,20 @@ func TestSetEndpoints(t *testing.T) { port: 8080, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: true, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, }, { testName: "existing endpoints wrong port", @@ -113,14 +209,20 @@ func TestSetEndpoints(t *testing.T) { port: 8080, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{{Port: 9090, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: true, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, }, { testName: "existing endpoints wrong protocol", @@ -129,18 +231,24 @@ func TestSetEndpoints(t *testing.T) { port: 8080, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "UDP"}}, }}, }}, }, - expectUpdate: true, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }, }, } for _, test := range tests { - master := Master{} + master := Master{masterCount: test.additionalMasters + 1} registry := ®istrytest.EndpointRegistry{ Endpoints: test.endpoints, } @@ -149,18 +257,14 @@ func TestSetEndpoints(t *testing.T) { if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } - if test.expectUpdate { - expectedSubsets := []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, - }} + if test.expectUpdate != nil { if len(registry.Updates) != 1 { t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) - } else if !reflect.DeepEqual(expectedSubsets, registry.Updates[0].Subsets) { - t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, expectedSubsets, registry.Updates[0].Subsets) + } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } - if !test.expectUpdate && len(registry.Updates) > 0 { + if test.expectUpdate == nil && len(registry.Updates) > 0 { t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) } }