diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index be72cc9ce2f..1dcb1830dd1 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -223,6 +223,13 @@ type Proxier struct { serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap nodeLabels map[string]string + // initialSync is a bool indicating if the proxier is syncing for the first time. + // It is set to true when a new proxier is initialized and then set to false on all + // future syncs. + // This lets us run specific logic that's required only during proxy startup. + // For eg: it enables us to update weights of existing destinations only on startup + // saving us the cost of querying and updating real servers during every sync. + initialSync bool // endpointSlicesSynced, and servicesSynced are set to true when // corresponding objects are synced after startup. This is used to avoid updating // ipvs rules with some partial data after kube-proxy restart. @@ -468,6 +475,7 @@ func NewProxier(ipt utiliptables.Interface, serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil), + initialSync: true, syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, excludeCIDRs: parsedExcludeCIDRs, @@ -1010,6 +1018,12 @@ func (proxier *Proxier) syncProxyRules() { return } + // its safe to set initialSync to false as it acts as a flag for startup actions + // and the mutex is held. + defer func() { + proxier.initialSync = false + }() + // Keep track of how long syncs take. start := time.Now() defer func() { @@ -2007,6 +2021,19 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } if curEndpoints.Has(ep) { + // if we are syncing for the first time, loop through all current destinations and + // reset their weight. + if proxier.initialSync { + for _, dest := range curDests { + if dest.Weight != newDest.Weight { + err = proxier.ipvs.UpdateRealServer(appliedVirtualServer, newDest) + if err != nil { + klog.ErrorS(err, "Failed to update destination", "newDest", newDest) + continue + } + } + } + } // check if newEndpoint is in gracefulDelete list, if true, delete this ep immediately uniqueRS := GetUniqueRSName(vs, newDest) if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) { @@ -2025,6 +2052,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode continue } } + // Delete old endpoints for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() { // if curEndpoint is in gracefulDelete, skip