Merge pull request #28655 from freehan/kubeproxyfix

Automatic merge from submit-queue

Don't delete affinity when endpoints are empty

closes: #25316
This commit is contained in:
k8s-merge-robot 2016-07-08 11:28:43 -07:00 committed by GitHub
commit 04602bb9e5
4 changed files with 44 additions and 28 deletions

View File

@ -29,5 +29,6 @@ type LoadBalancer interface {
// service-port and source address. // service-port and source address.
NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
DeleteService(service proxy.ServicePortName)
CleanupStaleStickySessions(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName)
} }

View File

@ -447,6 +447,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
if err != nil { if err != nil {
glog.Errorf("Failed to stop service %q: %v", name, err) glog.Errorf("Failed to stop service %q: %v", name, err)
} }
proxier.loadBalancer.DeleteService(name)
} }
} }
} }

View File

@ -536,15 +536,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]api.Endpoints{ endpoint := api.Endpoints{
{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{
Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }},
}}, }
}, lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil { if err != nil {
@ -569,6 +568,8 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
// need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
@ -588,15 +589,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]api.Endpoints{ endpoint := api.Endpoints{
{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{
Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, }},
}}, }
}, lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil { if err != nil {
@ -621,6 +621,8 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
// need to add endpoint here because it got clean up during service delete
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p.OnServiceUpdate([]api.Service{{ p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
@ -785,15 +787,14 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
func TestProxyUpdatePortal(t *testing.T) { func TestProxyUpdatePortal(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsUpdate([]api.Endpoints{ endpoint := api.Endpoints{
{ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{
Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }},
}}, }
}, lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
if err != nil { if err != nil {
@ -842,6 +843,7 @@ func TestProxyUpdatePortal(t *testing.T) {
Protocol: "TCP", Protocol: "TCP",
}}}, }}},
}}) }})
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
svcInfo, exists = p.getServiceInfo(service) svcInfo, exists = p.getServiceInfo(service)
if !exists { if !exists {
t.Fatalf("service with ClusterIP set not found in the proxy") t.Fatalf("service with ClusterIP set not found in the proxy")

View File

@ -82,6 +82,7 @@ func NewLoadBalancerRR() *LoadBalancerRR {
} }
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error { func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error {
glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
lb.newServiceInternal(svcPort, affinityType, ttlMinutes) lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
@ -103,6 +104,13 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi
return lb.services[svcPort] return lb.services[svcPort]
} }
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
glog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
lb.lock.Lock()
defer lb.lock.Unlock()
delete(lb.services, svcPort)
}
// return true if this service is using some form of session affinity. // return true if this service is using some form of session affinity.
func isSessionAffinity(affinity *affinityPolicy) bool { func isSessionAffinity(affinity *affinityPolicy) bool {
// Should never be empty string, but checking for it to be safe. // Should never be empty string, but checking for it to be safe.
@ -281,7 +289,11 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
for k := range lb.services { for k := range lb.services {
if _, exists := registeredEndpoints[k]; !exists { if _, exists := registeredEndpoints[k]; !exists {
glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k) glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k)
delete(lb.services, k) // Reset but don't delete.
state := lb.services[k]
state.endpoints = []string{}
state.index = 0
state.affinity.affinityMap = map[string]*affinityState{}
} }
} }
} }