Switch userspace proxy to be event based for services

This commit is contained in:
Wojciech Tyczynski
2017-05-04 15:28:15 +02:00
parent 12058c6a63
commit 33a7a288a5
3 changed files with 190 additions and 123 deletions

View File

@@ -395,97 +395,124 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
return si, nil
}
// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
for _, service := range services {
// if ClusterIP is "None" or empty, skip proxying
if !helper.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
func (proxier *Proxier) mergeService(service *api.Service) sets.String {
if service == nil {
return nil
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if !helper.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
return nil
}
existingPorts := sets.NewString()
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
existingPorts.Insert(servicePort.Name)
info, exists := proxier.getServiceInfo(serviceName)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && sameConfig(info, service, servicePort) {
// Nothing changed.
continue
}
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
if err := proxier.closePortal(serviceName, info); err != nil {
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
}
if err := proxier.stopProxy(serviceName, info); err != nil {
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
}
proxyPort, err := proxier.proxyPorts.AllocateNext()
if err != nil {
glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
continue
}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
activeServices[serviceName] = true
serviceIP := net.ParseIP(service.Spec.ClusterIP)
info, exists := proxier.getServiceInfo(serviceName)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && sameConfig(info, service, servicePort) {
// Nothing changed.
continue
}
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
err := proxier.closePortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
}
err = proxier.stopProxy(serviceName, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
}
proxyPort, err := proxier.proxyPorts.AllocateNext()
if err != nil {
glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
continue
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
}
info.portal.ip = serviceIP
info.portal.port = int(servicePort.Port)
info.externalIPs = service.Spec.ExternalIPs
// Deep-copy in case the service instance changes
info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
info.nodePort = int(servicePort.NodePort)
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(4).Infof("info: %#v", info)
err = proxier.openPortal(serviceName, info)
if err != nil {
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
}
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
serviceIP := net.ParseIP(service.Spec.ClusterIP)
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
}
info.portal.ip = serviceIP
info.portal.port = int(servicePort.Port)
info.externalIPs = service.Spec.ExternalIPs
// Deep-copy in case the service instance changes
info.loadBalancerStatus = *helper.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
info.nodePort = int(servicePort.NodePort)
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(4).Infof("info: %#v", info)
if err := proxier.openPortal(serviceName, info); err != nil {
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
}
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
return existingPorts
}
func (proxier *Proxier) unmergeService(service *api.Service, existingPorts sets.String) {
if service == nil {
return
}
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if !helper.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
return
}
staleUDPServices := sets.NewString()
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1).Infof("Stopping service %q", name)
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String())
}
err := proxier.closePortal(name, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v", name, err)
}
err = proxier.stopProxyInternal(name, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %v", name, err)
}
proxier.loadBalancer.DeleteService(name)
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if existingPorts.Has(servicePort.Name) {
continue
}
}
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
glog.V(1).Infof("Stopping service %q", serviceName)
info, exists := proxier.serviceMap[serviceName]
if !exists {
glog.Errorf("Service %q is being removed but doesn't exist", serviceName)
continue
}
if proxier.serviceMap[serviceName].protocol == api.ProtocolUDP {
staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String())
}
if err := proxier.closePortal(serviceName, info); err != nil {
glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
}
if err := proxier.stopProxyInternal(serviceName, info); err != nil {
glog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
proxier.loadBalancer.DeleteService(serviceName)
}
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
}
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
_ = proxier.mergeService(service)
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
existingPorts := proxier.mergeService(service)
proxier.unmergeService(oldService, existingPorts)
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
proxier.unmergeService(service, sets.NewString())
}
func (proxier *Proxier) OnServiceSynced() {
}
func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool {
if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
return false