mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-03 18:27:49 +00:00
Merge pull request #111635 from aryan9600/ipvs-restart
Fix IPVS proxier to update stale real server after restart
This commit is contained in:
@@ -223,6 +223,13 @@ type Proxier struct {
|
|||||||
serviceMap proxy.ServiceMap
|
serviceMap proxy.ServiceMap
|
||||||
endpointsMap proxy.EndpointsMap
|
endpointsMap proxy.EndpointsMap
|
||||||
nodeLabels map[string]string
|
nodeLabels map[string]string
|
||||||
|
// initialSync is a bool indicating if the proxier is syncing for the first time.
|
||||||
|
// It is set to true when a new proxier is initialized and then set to false on all
|
||||||
|
// future syncs.
|
||||||
|
// This lets us run specific logic that's required only during proxy startup.
|
||||||
|
// For eg: it enables us to update weights of existing destinations only on startup
|
||||||
|
// saving us the cost of querying and updating real servers during every sync.
|
||||||
|
initialSync bool
|
||||||
// endpointSlicesSynced, and servicesSynced are set to true when
|
// endpointSlicesSynced, and servicesSynced are set to true when
|
||||||
// corresponding objects are synced after startup. This is used to avoid updating
|
// corresponding objects are synced after startup. This is used to avoid updating
|
||||||
// ipvs rules with some partial data after kube-proxy restart.
|
// ipvs rules with some partial data after kube-proxy restart.
|
||||||
@@ -468,6 +475,7 @@ func NewProxier(ipt utiliptables.Interface,
|
|||||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
|
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
|
||||||
endpointsMap: make(proxy.EndpointsMap),
|
endpointsMap: make(proxy.EndpointsMap),
|
||||||
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
|
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
|
||||||
|
initialSync: true,
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
minSyncPeriod: minSyncPeriod,
|
minSyncPeriod: minSyncPeriod,
|
||||||
excludeCIDRs: parsedExcludeCIDRs,
|
excludeCIDRs: parsedExcludeCIDRs,
|
||||||
@@ -1010,6 +1018,12 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// its safe to set initialSync to false as it acts as a flag for startup actions
|
||||||
|
// and the mutex is held.
|
||||||
|
defer func() {
|
||||||
|
proxier.initialSync = false
|
||||||
|
}()
|
||||||
|
|
||||||
// Keep track of how long syncs take.
|
// Keep track of how long syncs take.
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -2007,6 +2021,19 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
|||||||
}
|
}
|
||||||
|
|
||||||
if curEndpoints.Has(ep) {
|
if curEndpoints.Has(ep) {
|
||||||
|
// if we are syncing for the first time, loop through all current destinations and
|
||||||
|
// reset their weight.
|
||||||
|
if proxier.initialSync {
|
||||||
|
for _, dest := range curDests {
|
||||||
|
if dest.Weight != newDest.Weight {
|
||||||
|
err = proxier.ipvs.UpdateRealServer(appliedVirtualServer, newDest)
|
||||||
|
if err != nil {
|
||||||
|
klog.ErrorS(err, "Failed to update destination", "newDest", newDest)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
|
// check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately
|
||||||
uniqueRS := GetUniqueRSName(vs, newDest)
|
uniqueRS := GetUniqueRSName(vs, newDest)
|
||||||
if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
|
if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
|
||||||
@@ -2025,6 +2052,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete old endpoints
|
// Delete old endpoints
|
||||||
for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
|
for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
|
||||||
// if curEndpoint is in gracefulDelete, skip
|
// if curEndpoint is in gracefulDelete, skip
|
||||||
|
@@ -178,6 +178,15 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
|
|||||||
proxier.servicesSynced = true
|
proxier.servicesSynced = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeEndpointSliceMap(proxier *Proxier, allEpSlices ...*discovery.EndpointSlice) {
|
||||||
|
for i := range allEpSlices {
|
||||||
|
proxier.OnEndpointSliceAdd(allEpSlices[i])
|
||||||
|
}
|
||||||
|
proxier.mu.Lock()
|
||||||
|
defer proxier.mu.Unlock()
|
||||||
|
proxier.endpointSlicesSynced = true
|
||||||
|
}
|
||||||
|
|
||||||
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
|
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
|
||||||
svc := &v1.Service{
|
svc := &v1.Service{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
@@ -1379,6 +1388,88 @@ func TestNodePortIPv6(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_syncEndpoint_updateWeightsOnRestart(t *testing.T) {
|
||||||
|
tcpProtocol := v1.ProtocolTCP
|
||||||
|
|
||||||
|
ipt := iptablestest.NewFake()
|
||||||
|
ipvs := ipvstest.NewFake()
|
||||||
|
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||||
|
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
|
||||||
|
|
||||||
|
svc1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
|
||||||
|
svc.Spec.ClusterIP = "10.20.30.41"
|
||||||
|
svc.Spec.Ports = []v1.ServicePort{{
|
||||||
|
Name: "p80",
|
||||||
|
Port: int32(80),
|
||||||
|
Protocol: v1.ProtocolTCP,
|
||||||
|
}}
|
||||||
|
})
|
||||||
|
epSlice1 := makeTestEndpointSlice("ns1", "svc1", 1, func(eps *discovery.EndpointSlice) {
|
||||||
|
eps.AddressType = discovery.AddressTypeIPv4
|
||||||
|
eps.Endpoints = []discovery.Endpoint{{
|
||||||
|
Addresses: []string{"10.180.0.1"},
|
||||||
|
}}
|
||||||
|
eps.Ports = []discovery.EndpointPort{{
|
||||||
|
Name: pointer.StringPtr("p80"),
|
||||||
|
Port: pointer.Int32(80),
|
||||||
|
Protocol: &tcpProtocol,
|
||||||
|
}}
|
||||||
|
})
|
||||||
|
|
||||||
|
// sync proxy rules to get to the desired initial state
|
||||||
|
makeServiceMap(fp, svc1)
|
||||||
|
makeEndpointSliceMap(fp, epSlice1)
|
||||||
|
fp.syncProxyRules()
|
||||||
|
|
||||||
|
serv := &utilipvs.VirtualServer{
|
||||||
|
Address: netutils.ParseIPSloppy("10.20.30.41"),
|
||||||
|
Port: uint16(80),
|
||||||
|
Protocol: string(tcpProtocol),
|
||||||
|
Scheduler: fp.ipvsScheduler,
|
||||||
|
}
|
||||||
|
|
||||||
|
vs, err := fp.ipvs.GetVirtualServer(serv)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to get virtual server, err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rss, err := fp.ipvs.GetRealServers(vs)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to get real servers, err: %v", err)
|
||||||
|
}
|
||||||
|
for _, rs := range rss {
|
||||||
|
rs.Weight = 0
|
||||||
|
if err = fp.ipvs.UpdateRealServer(vs, rs); err != nil {
|
||||||
|
t.Errorf("failed to update real server: %v, err: %v", rs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// simulate a restart by enabling initial sync logic.
|
||||||
|
fp.initialSync = true
|
||||||
|
err = fp.syncEndpoint(proxy.ServicePortName{
|
||||||
|
NamespacedName: types.NamespacedName{
|
||||||
|
Name: "svc1",
|
||||||
|
Namespace: "ns1",
|
||||||
|
},
|
||||||
|
Port: "80",
|
||||||
|
Protocol: tcpProtocol,
|
||||||
|
}, true, vs)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to sync endpoint, err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rss, err = fp.ipvs.GetRealServers(vs)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("failed to get real server, err: %v", err)
|
||||||
|
}
|
||||||
|
for _, rs := range rss {
|
||||||
|
if rs.Weight != 1 {
|
||||||
|
t.Logf("unexpected realserver weight: %d, expected weight: 1", rs.Weight)
|
||||||
|
t.Errorf("unexpected realserver state")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestIPv4Proxier(t *testing.T) {
|
func TestIPv4Proxier(t *testing.T) {
|
||||||
tcpProtocol := v1.ProtocolTCP
|
tcpProtocol := v1.ProtocolTCP
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
Reference in New Issue
Block a user