Merge pull request #71895 from DataDog/lbernail/svc-graceful-deletion

Support IPVS graceful termination when deleting a service
This commit is contained in:
Kubernetes Prow Robot 2018-12-09 21:57:00 -08:00 committed by GitHub
commit 27fca554e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 98 deletions

View File

@ -1190,7 +1190,15 @@ func (proxier *Proxier) syncProxyRules() {
}
proxier.portsMap = replacementPortsMap
// Clean up legacy IPVS services
// Get legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to get bind address, err: %v", err)
}
legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
// Clean up legacy IPVS services and unbind addresses
appliedSvcs, err := proxier.ipvs.GetVirtualServers()
if err == nil {
for _, appliedSvc := range appliedSvcs {
@ -1199,15 +1207,7 @@ func (proxier *Proxier) syncProxyRules() {
} else {
klog.Errorf("Failed to get ipvs service, err: %v", err)
}
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
// Clean up legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to get bind address, err: %v", err)
}
proxier.cleanLegacyBindAddr(activeBindAddrs, currentBindAddrs)
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
// Update healthz timestamp
if proxier.healthzServer != nil {
@ -1605,29 +1605,34 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
if err != nil {
klog.Errorf("Failed to delete destination: %v, error: %v", delDest, err)
klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
continue
}
}
return nil
}
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
for cs := range currentServices {
svc := currentServices[cs]
if _, ok := activeServices[cs]; !ok {
// This service was not processed in the latest sync loop so before deleting it,
// make sure it does not fall within an excluded CIDR range.
okayToDelete := true
rsList, _ := proxier.ipvs.GetRealServers(svc)
// If we still have real servers graceful termination is not done
if len(rsList) > 0 {
okayToDelete = false
}
// Applying graceful termination to all real servers
for _, rs := range rsList {
uniqueRS := GetUniqueRSName(svc, rs)
// if there are in terminating real server in this service, then handle it later
if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
okayToDelete = false
break
klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
if err := proxier.gracefuldeleteManager.GracefulDeleteRS(svc, rs); err != nil {
klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
}
}
// make sure it does not fall within an excluded CIDR range.
for _, excludedCIDR := range proxier.excludeCIDRs {
// Any validation of this CIDR already should have occurred.
_, n, _ := net.ParseCIDR(excludedCIDR)
@ -1637,26 +1642,33 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre
}
}
if okayToDelete {
klog.V(4).Infof("Delete service %s", svc.String())
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
klog.Errorf("Failed to delete service, error: %v", err)
klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
}
addr := svc.Address.String()
if _, ok := legacyBindAddrs[addr]; ok {
klog.V(4).Infof("Unbinding address %s", addr)
if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
} else {
// In case we delete a multi-port service, avoid trying to unbind multiple times
delete(legacyBindAddrs, addr)
}
}
}
}
}
}
func (proxier *Proxier) cleanLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) {
func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool {
legacyAddrs := make(map[string]bool)
for _, addr := range currentBindAddrs {
if _, ok := activeBindAddrs[addr]; !ok {
// This address was not processed in the latest sync loop
klog.V(4).Infof("Unbind addr %s", addr)
err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice)
// Ignore no such address error when try to unbind address
if err != nil {
klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
}
legacyAddrs[addr] = true
}
}
return legacyAddrs
}
// Join all words with spaces, terminate with newline and write to buff.

View File

@ -23,7 +23,6 @@ import (
"reflect"
"strings"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -126,7 +125,7 @@ func (fakeSysctl *FakeSysctl) SetSysctl(sysctl string, newVal int) error {
return nil
}
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP) *Proxier {
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []string) *Proxier {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("dummy device have been created"), nil },
@ -151,7 +150,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
excludeCIDRs: make([]string, 0),
excludeCIDRs: excludeCIDRs,
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
@ -228,7 +227,7 @@ func TestCleanupLeftovers(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -418,7 +417,7 @@ func TestNodePortUDP(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil)
svcIP := "10.20.30.41"
svcPort := 80
@ -495,7 +494,7 @@ func TestNodePort(t *testing.T) {
nodeIPv4 := net.ParseIP("100.101.102.103")
nodeIPv6 := net.ParseIP("2001:db8::1:1")
nodeIPs := sets.NewString(nodeIPv4.String(), nodeIPv6.String())
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIPv4, nodeIPv6})
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIPv4, nodeIPv6}, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -573,7 +572,7 @@ func TestNodePortNoEndpoint(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
nodeIP := net.ParseIP("100.101.102.103")
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -628,7 +627,7 @@ func TestClusterIPNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcPortName := proxy.ServicePortName{
@ -672,7 +671,7 @@ func TestClusterIP(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
svcIPv4 := "10.20.30.41"
svcPortV4 := 80
@ -779,7 +778,7 @@ func TestExternalIPsNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := "50.60.70.81"
@ -834,7 +833,7 @@ func TestExternalIPs(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := sets.NewString("50.60.70.81", "2012::51", "127.0.0.1")
@ -1338,7 +1337,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
services := []*v1.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
@ -1448,7 +1447,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
makeServiceMap(fp,
makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
@ -1487,7 +1486,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
makeServiceMap(fp,
makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
@ -1515,7 +1514,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@ -1599,7 +1598,7 @@ func TestSessionAffinity(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
nodeIP := net.ParseIP("100.101.102.103")
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -2462,7 +2461,7 @@ func Test_updateEndpointsMap(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp.hostname = nodeName
// First check that after adding all previous versions of endpoints,
@ -2706,7 +2705,7 @@ func Test_syncService(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
proxier := NewFakeProxier(ipt, ipvs, ipset, nil)
proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
if testCases[i].oldVirtualServer != nil {
@ -2736,7 +2735,7 @@ func buildFakeProxier() (*iptablestest.FakeIPTables, *Proxier) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
return ipt, NewFakeProxier(ipt, ipvs, ipset, nil)
return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil)
}
func hasJump(rules []iptablestest.Rule, destChain, ipSet string) bool {
@ -2806,33 +2805,10 @@ func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer)
}
func TestCleanLegacyService(t *testing.T) {
execer := exec.New()
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
excludeCIDRs := []string{"3.3.3.0/24", "4.4.4.0/24"}
proxier, err := NewProxier(
ipt,
ipvs,
ipset,
NewFakeSysctl(),
execer,
250*time.Millisecond,
100*time.Millisecond,
excludeCIDRs,
false,
0,
"10.0.0.0/24",
testHostname,
net.ParseIP("127.0.0.1"),
nil,
nil,
DefaultScheduler,
make([]string, 0),
)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
fp := NewFakeProxier(ipt, ipvs, ipset, nil, []string{"3.3.3.0/24", "4.4.4.0/24"})
// All ipvs services that were processed in the latest sync loop.
activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
@ -2888,15 +2864,22 @@ func TestCleanLegacyService(t *testing.T) {
},
}
for v := range currentServices {
proxier.ipvs.AddVirtualServer(currentServices[v])
fp.ipvs.AddVirtualServer(currentServices[v])
}
proxier.cleanLegacyService(activeServices, currentServices)
fp.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
activeBindAddrs := map[string]bool{"1.1.1.1": true, "2.2.2.2": true, "3.3.3.3": true, "4.4.4.4": true}
currentBindAddrs := []string{"1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", "5.5.5.5", "6.6.6.6"}
for i := range currentBindAddrs {
fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], DefaultDummyDevice)
}
fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"5.5.5.5": true, "6.6.6.6": true})
// ipvs4 and ipvs5 should have been cleaned.
remainingVirtualServers, _ := proxier.ipvs.GetVirtualServers()
remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
if len(remainingVirtualServers) != 4 {
t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 4, len(remainingVirtualServers))
}
for _, vs := range remainingVirtualServers {
// Checking that ipvs4 and ipvs5 were removed.
if vs.Port == 57 {
@ -2906,33 +2889,13 @@ func TestCleanLegacyService(t *testing.T) {
t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
}
}
}
func TestCleanLegacyBindAddr(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
// All ipvs service addresses that were bound to ipvs0 in the latest sync loop.
activeBindAddrs := map[string]bool{"1.2.3.4": true, "1002:ab8::2:1": true}
// All service addresses that were bound to ipvs0 in system
currentBindAddrs := []string{"1.2.3.4", "1.2.3.5", "1.2.3.6", "1002:ab8::2:1", "1002:ab8::2:2"}
fp.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
for i := range currentBindAddrs {
fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], DefaultDummyDevice)
}
fp.cleanLegacyBindAddr(activeBindAddrs, currentBindAddrs)
// Addresses 5.5.5.5 and 6.6.6.6 should not be bound any more
remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(DefaultDummyDevice)
// should only remain "1.2.3.4" and "1002:ab8::2:1"
if len(remainingAddrs) != 2 {
t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 2, len(remainingAddrs))
if len(remainingAddrs) != 4 {
t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 4, len(remainingAddrs))
}
// check that address "1.2.3.4" and "1002:ab8::2:1" remain
// check that address "1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4" are still bound
remainingAddrsMap := make(map[string]bool)
for i := range remainingAddrs {
remainingAddrsMap[remainingAddrs[i]] = true
@ -2940,13 +2903,14 @@ func TestCleanLegacyBindAddr(t *testing.T) {
if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) {
t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap)
}
}
func TestMultiPortServiceBindAddr(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
service1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP