Check whether service changed
This commit is contained in:
parent 05ffcccdc1
commit c0c41aa083
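
In short: serviceChange now carries materialized proxyServiceMap snapshots rather than raw *api.Service pointers, and serviceChangeMap.update returns a bool so callers can skip an iptables sync when an event turns out to be a no-op (for example, an update that reverts a still-pending change). Below is a minimal, self-contained sketch of that pattern with simplified stand-in types (changeTracker and the string-keyed maps are illustrative, not the kube-proxy code):

package main

import (
	"fmt"
	"reflect"
	"sync"
)

// change records the state before the first pending update (previous)
// and the most recent state (current), like serviceChange does with
// proxyServiceMap snapshots.
type change struct {
	previous map[string]string
	current  map[string]string
}

type changeTracker struct {
	sync.Mutex
	items map[string]*change
}

// update mirrors the bool return added to serviceChangeMap.update:
// true means there is a real difference left to sync.
func (t *changeTracker) update(name string, previous, current map[string]string) bool {
	t.Lock()
	defer t.Unlock()
	c, exists := t.items[name]
	if !exists {
		c = &change{previous: previous}
		t.items[name] = c
	}
	c.current = current
	if reflect.DeepEqual(c.previous, c.current) {
		delete(t.items, name) // the object reverted to its original state
		return false
	}
	return true
}

func main() {
	t := &changeTracker{items: map[string]*change{}}
	base := map[string]string{"port": "80"}
	fmt.Println(t.update("svc", base, map[string]string{"port": "8080"})) // true: real change
	fmt.Println(t.update("svc", map[string]string{"port": "8080"}, base)) // false: reverted
}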
@@ -205,8 +205,8 @@ type endpointsChangeMap struct {
 }
 
 type serviceChange struct {
-	previous *api.Service
-	current  *api.Service
+	previous proxyServiceMap
+	current  proxyServiceMap
 }
 
 type serviceChangeMap struct {
@@ -253,17 +253,60 @@ func newServiceChangeMap() serviceChangeMap {
 	}
 }
 
-func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
+func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
 	scm.Lock()
 	defer scm.Unlock()
 
 	change, exists := scm.items[*namespacedName]
 	if !exists {
 		change = &serviceChange{}
-		change.previous = previous
+		change.previous = serviceToServiceMap(previous)
 		scm.items[*namespacedName] = change
 	}
-	change.current = current
+	change.current = serviceToServiceMap(current)
+	if reflect.DeepEqual(change.previous, change.current) {
+		delete(scm.items, *namespacedName)
+		return false
+	}
+	// TODO: Instead of returning true/false, we should consider returning whether
+	// the map contains some element or not. Currently, if the change is
+	// "reverting" some previous endpoints update, but there are still some other
+	// modified endpoints, we will return false, even though there are some change
+	// to apply.
+	return true
 }
 
+func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
+	existingPorts := sets.NewString()
+	for serviceName, info := range other {
+		existingPorts.Insert(serviceName.Port)
+		_, exists := (*sm)[serviceName]
+		if !exists {
+			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
+		} else {
+			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, info.port, info.protocol)
+		}
+		(*sm)[serviceName] = info
+	}
+	return existingPorts
+}
+
+func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts, staleServices sets.String) {
+	for serviceName := range other {
+		if existingPorts.Has(serviceName.Port) {
+			continue
+		}
+		info, exists := (*sm)[serviceName]
+		if exists {
+			glog.V(1).Infof("Removing service %q", serviceName)
+			if info.protocol == api.ProtocolUDP {
+				staleServices.Insert(info.clusterIP.String())
+			}
+			delete(*sm, serviceName)
+		} else {
+			glog.Errorf("Service %q removed, but doesn't exists", serviceName)
+		}
+	}
+}
+
 func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
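
The new merge/unmerge pair above replaces the older per-service mergeService/unmergeService helpers: merge overwrites entries from a per-change map and reports the port names it saw, and unmerge then deletes only entries whose port was not re-added. Here is a standalone sketch of the same contract, where portName and plain string values stand in for proxy.ServicePortName and *serviceInfo (illustrative types, not the real ones):

package main

import "fmt"

// portName stands in for proxy.ServicePortName.
type portName struct{ service, port string }

// merge copies entries from other into sm and reports the port names it saw.
func merge(sm, other map[portName]string) map[string]bool {
	existingPorts := map[string]bool{} // stand-in for sets.String
	for name, info := range other {
		existingPorts[name.port] = true
		sm[name] = info
	}
	return existingPorts
}

// unmerge removes entries from sm for ports that merge did not re-add.
func unmerge(sm, other map[portName]string, existingPorts map[string]bool) {
	for name := range other {
		if existingPorts[name.port] {
			continue // re-added by merge; keep it
		}
		delete(sm, name)
	}
}

func main() {
	sm := map[portName]string{}
	previous := map[portName]string{
		{"default/svc", "http"}:    "10.0.0.1:80/TCP",
		{"default/svc", "metrics"}: "10.0.0.1:9090/TCP",
	}
	merge(sm, previous)

	// The service drops its metrics port: merge the new state, then
	// unmerge the old one, guarded by the surviving port names.
	current := map[portName]string{{"default/svc", "http"}: "10.0.0.1:80/TCP"}
	existingPorts := merge(sm, current)
	unmerge(sm, previous, existingPorts)
	fmt.Println(sm) // only the http entry remains
}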
@@ -554,30 +597,48 @@ func (proxier *Proxier) SyncLoop() {
 
 func (proxier *Proxier) OnServiceAdd(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, nil, service)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, nil, service) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, oldService, service)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, oldService, service) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceDelete(service *api.Service) {
 	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	proxier.serviceChanges.update(&namespacedName, service, nil)
-
-	proxier.syncProxyRules(syncReasonServices)
+	if proxier.serviceChanges.update(&namespacedName, service, nil) {
+		// TODO(wojtek-t): If the initial sync of informer either for endpoints or
+		// services is not finished, it doesn't make sense to call syncProxyRules
+		// because it will early-return (to avoid resyncing iptables with partial
+		// state right after kube-proxy restart). This can eat a token for calling
+		// syncProxyRules, but is not that critical since it can happen only
+		// after kube-proxy was (re)started.
+		proxier.syncProxyRules(syncReasonServices)
+	}
 }
 
 func (proxier *Proxier) OnServiceSynced() {
	proxier.mu.Lock()
 	proxier.servicesSynced = true
 	proxier.mu.Unlock()
 
 	// Call it unconditionally - this is called once per lifetime.
 	proxier.syncProxyRules(syncReasonServices)
 }
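
Each handler above now gates syncProxyRules on update's return value, so an informer event that carries no effective change never reaches the expensive sync path. A hedged, self-contained illustration of that wiring (svcState and proxier here are simplified stand-ins for proxyServiceMap and the real Proxier):

package main

import (
	"fmt"
	"reflect"
)

type svcState map[string]string // stand-in for proxyServiceMap

type proxier struct {
	pending map[string]*[2]svcState // [previous, current] per service
	syncs   int
}

// update records the change, drops it if it reverts to the original
// state, and reports whether a sync is needed.
func (p *proxier) update(name string, prev, cur svcState) bool {
	c, ok := p.pending[name]
	if !ok {
		c = &[2]svcState{prev, nil}
		p.pending[name] = c
	}
	c[1] = cur
	if reflect.DeepEqual(c[0], c[1]) {
		delete(p.pending, name)
		return false
	}
	return true
}

func (p *proxier) syncProxyRules() { p.syncs++ }

// onServiceUpdate mirrors the guarded shape of OnServiceUpdate above.
func (p *proxier) onServiceUpdate(name string, oldState, newState svcState) {
	if p.update(name, oldState, newState) {
		p.syncProxyRules()
	}
}

func main() {
	p := &proxier{pending: map[string]*[2]svcState{}}
	s := svcState{"http": "10.0.0.1:80"}
	p.onServiceUpdate("web", s, svcState{"http": "10.0.0.1:80"}) // no effective change
	fmt.Println(p.syncs)                                         // 0: no sync for an unchanged service
}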
@@ -595,67 +656,6 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
 	return false
 }
 
-func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String) {
-	if service == nil {
-		return false, nil
-	}
-	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if shouldSkipService(svcName, service) {
-		return false, nil
-	}
-	syncRequired := false
-	existingPorts := sets.NewString()
-	for i := range service.Spec.Ports {
-		servicePort := &service.Spec.Ports[i]
-		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
-		existingPorts.Insert(servicePort.Name)
-		info := newServiceInfo(serviceName, servicePort, service)
-		oldInfo, exists := (*sm)[serviceName]
-		equal := reflect.DeepEqual(info, oldInfo)
-		if !exists {
-			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
-		} else if !equal {
-			glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
-		}
-		if !equal {
-			(*sm)[serviceName] = info
-			syncRequired = true
-		}
-	}
-	return syncRequired, existingPorts
-}
-
-// <staleServices> are modified by this function with detected stale services.
-func (sm *proxyServiceMap) unmergeService(service *api.Service, existingPorts, staleServices sets.String) bool {
-	if service == nil {
-		return false
-	}
-	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if shouldSkipService(svcName, service) {
-		return false
-	}
-	syncRequired := false
-	for i := range service.Spec.Ports {
-		servicePort := &service.Spec.Ports[i]
-		if existingPorts.Has(servicePort.Name) {
-			continue
-		}
-		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
-		info, exists := (*sm)[serviceName]
-		if exists {
-			glog.V(1).Infof("Removing service %q", serviceName)
-			if info.protocol == api.ProtocolUDP {
-				staleServices.Insert(info.clusterIP.String())
-			}
-			delete(*sm, serviceName)
-			syncRequired = true
-		} else {
-			glog.Errorf("Service %q removed, but doesn't exists", serviceName)
-		}
-	}
-	return syncRequired
-}
-
 // <serviceMap> is updated by this function (based on the given changes).
 // <changes> map is cleared after applying them.
 func updateServiceMap(
@@ -668,9 +668,9 @@ func updateServiceMap(
 		changes.Lock()
 		defer changes.Unlock()
 		for _, change := range changes.items {
-			mergeSyncRequired, existingPorts := serviceMap.mergeService(change.current)
-			unmergeSyncRequired := serviceMap.unmergeService(change.previous, existingPorts, staleServices)
-			syncRequired = syncRequired || mergeSyncRequired || unmergeSyncRequired
+			existingPorts := serviceMap.merge(change.current)
+			serviceMap.unmerge(change.previous, existingPorts, staleServices)
+			syncRequired = true
 		}
 		changes.items = make(map[types.NamespacedName]*serviceChange)
 	}()
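
Because update already discards entries that revert to their original state, every entry still in changes.items is a real difference, which is why the loop above can set syncRequired = true unconditionally. The following is a simplified sketch of that apply-and-clear pass (plain string maps stand in for proxyServiceMap, and the unmerge guard is approximated by key membership rather than port names):

package main

import "fmt"

// change mirrors serviceChange with plain maps in place of proxyServiceMap.
type change struct {
	previous map[string]string
	current  map[string]string
}

// apply is a simplified updateServiceMap: merge each pending change's
// current state, unmerge its previous one, then clear the pending map.
func apply(serviceMap map[string]string, pending map[string]*change) {
	for _, c := range pending {
		for k, v := range c.current { // merge: add or overwrite entries
			serviceMap[k] = v
		}
		for k := range c.previous { // unmerge: drop entries not re-added
			if _, ok := c.current[k]; !ok {
				delete(serviceMap, k)
			}
		}
	}
	for k := range pending { // all changes are consumed in one pass
		delete(pending, k)
	}
}

func main() {
	serviceMap := map[string]string{"svc/http": "10.0.0.1:80"}
	pending := map[string]*change{
		"default/svc": {
			previous: map[string]string{"svc/http": "10.0.0.1:80"},
			current:  map[string]string{"svc/http": "10.0.0.2:80"},
		},
	}
	apply(serviceMap, pending)
	fmt.Println(serviceMap, len(pending)) // map[svc/http:10.0.0.2:80] 0
}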
@@ -855,6 +855,27 @@ func endpointsToEndpointsMap(endpoints *api.Endpoints, hostname string) proxyEndpointsMap {
 	return endpointsMap
 }
 
+// Translates single Service object to proxyServiceMap.
+//
+// NOTE: service object should NOT be modified.
+func serviceToServiceMap(service *api.Service) proxyServiceMap {
+	if service == nil {
+		return nil
+	}
+	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
+	if shouldSkipService(svcName, service) {
+		return nil
+	}
+
+	serviceMap := make(proxyServiceMap)
+	for i := range service.Spec.Ports {
+		servicePort := &service.Spec.Ports[i]
+		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
+		serviceMap[serviceName] = newServiceInfo(serviceName, servicePort, service)
+	}
+	return serviceMap
+}
+
 // portProtoHash takes the ServicePortName and protocol for a service
 // returns the associated 16 character hash. This is computed by hashing (sha256)
 // then encoding to base32 and truncating to 16 chars. We do this because IPTables
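
serviceToServiceMap is the helper that makes the snapshot comparison possible: it expands one Service with N ports into N map entries keyed per port, so two snapshots can be compared with reflect.DeepEqual. A simplified standalone sketch of that expansion (svc, port, and portKey are illustrative types, not the api package):

package main

import "fmt"

// Simplified stand-ins: svc for api.Service, portKey for proxy.ServicePortName.
type port struct {
	name     string
	number   int
	protocol string
}

type svc struct {
	namespace, name, clusterIP string
	ports                      []port
}

type portKey struct{ svc, port string }

// toServiceMap mirrors the shape of serviceToServiceMap: one object in,
// one map entry out per declared port.
func toServiceMap(s *svc) map[portKey]string {
	if s == nil {
		return nil // same nil guard as the real helper
	}
	m := make(map[portKey]string)
	for i := range s.ports {
		p := &s.ports[i]
		m[portKey{s.namespace + "/" + s.name, p.name}] = fmt.Sprintf("%s:%d/%s", s.clusterIP, p.number, p.protocol)
	}
	return m
}

func main() {
	s := &svc{namespace: "default", name: "web", clusterIP: "10.0.0.1",
		ports: []port{{"http", 80, "TCP"}, {"https", 443, "TCP"}}}
	for k, v := range toServiceMap(s) {
		fmt.Println(k, v)
	}
}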