mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Merge pull request #38996 from dcbw/proxy-sync-fewer-services
Automatic merge from submit-queue proxy/iptables: don't sync proxy rules if services map didn't change Build the service map in a separate testable function. Return that map instead of changing proxier.serviceMap directly. Use reflect.DeepEqual() to skip syncing proxy rules if nothing actually changed. @thockin @kubernetes/rh-networking @kubernetes/sig-network-misc @timothysc @wojtek-t @jeremyeder @caseydavenport
This commit is contained in:
commit
3371766d0a
@ -40,8 +40,10 @@ go_test(
|
|||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api:go_default_library",
|
"//pkg/api:go_default_library",
|
||||||
|
"//pkg/api/service:go_default_library",
|
||||||
"//pkg/proxy:go_default_library",
|
"//pkg/proxy:go_default_library",
|
||||||
"//pkg/util/exec:go_default_library",
|
"//pkg/util/exec:go_default_library",
|
||||||
|
"//pkg/util/intstr:go_default_library",
|
||||||
"//pkg/util/iptables:go_default_library",
|
"//pkg/util/iptables:go_default_library",
|
||||||
"//pkg/util/iptables/testing:go_default_library",
|
"//pkg/util/iptables/testing:go_default_library",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/types",
|
"//vendor:k8s.io/apimachinery/pkg/types",
|
||||||
|
@ -153,18 +153,44 @@ type endpointsInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// returns a new serviceInfo struct
|
// returns a new serviceInfo struct
|
||||||
func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
|
func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
|
||||||
return &serviceInfo{
|
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
|
||||||
sessionAffinityType: api.ServiceAffinityNone, // default
|
info := &serviceInfo{
|
||||||
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
|
clusterIP: net.ParseIP(service.Spec.ClusterIP),
|
||||||
|
port: int(port.Port),
|
||||||
|
protocol: port.Protocol,
|
||||||
|
nodePort: int(port.NodePort),
|
||||||
|
// Deep-copy in case the service instance changes
|
||||||
|
loadBalancerStatus: *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer),
|
||||||
|
sessionAffinityType: service.Spec.SessionAffinity,
|
||||||
|
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
|
||||||
|
externalIPs: make([]string, len(service.Spec.ExternalIPs)),
|
||||||
|
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
|
||||||
|
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
||||||
}
|
}
|
||||||
|
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
|
||||||
|
copy(info.externalIPs, service.Spec.ExternalIPs)
|
||||||
|
|
||||||
|
if info.onlyNodeLocalEndpoints {
|
||||||
|
p := apiservice.GetServiceHealthCheckNodePort(service)
|
||||||
|
if p == 0 {
|
||||||
|
glog.Errorf("Service does not contain necessary annotation %v",
|
||||||
|
apiservice.BetaAnnotationHealthCheckNodePort)
|
||||||
|
} else {
|
||||||
|
info.healthCheckNodePort = int(p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return info
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
|
||||||
|
|
||||||
// Proxier is an iptables based proxy for connections between a localhost:lport
|
// Proxier is an iptables based proxy for connections between a localhost:lport
|
||||||
// and services that provide the actual backends.
|
// and services that provide the actual backends.
|
||||||
type Proxier struct {
|
type Proxier struct {
|
||||||
mu sync.Mutex // protects the following fields
|
mu sync.Mutex // protects the following fields
|
||||||
serviceMap map[proxy.ServicePortName]*serviceInfo
|
serviceMap proxyServiceMap
|
||||||
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
|
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
|
||||||
portsMap map[localPort]closeable
|
portsMap map[localPort]closeable
|
||||||
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
||||||
@ -278,7 +304,7 @@ func NewProxier(ipt utiliptables.Interface,
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &Proxier{
|
return &Proxier{
|
||||||
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
serviceMap: make(proxyServiceMap),
|
||||||
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
|
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
|
||||||
portsMap: make(map[localPort]closeable),
|
portsMap: make(map[localPort]closeable),
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
@ -382,44 +408,6 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
|
|||||||
return encounteredError
|
return encounteredError
|
||||||
}
|
}
|
||||||
|
|
||||||
func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
|
|
||||||
if info.protocol != port.Protocol || info.port != int(port.Port) || info.nodePort != int(port.NodePort) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if info.sessionAffinityType != service.Spec.SessionAffinity {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly()
|
|
||||||
if info.onlyNodeLocalEndpoints != onlyNodeLocalEndpoints {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func ipsEqual(lhs, rhs []string) bool {
|
|
||||||
if len(lhs) != len(rhs) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
for i := range lhs {
|
|
||||||
if lhs[i] != rhs[i] {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync is called to immediately synchronize the proxier state to iptables
|
// Sync is called to immediately synchronize the proxier state to iptables
|
||||||
func (proxier *Proxier) Sync() {
|
func (proxier *Proxier) Sync() {
|
||||||
proxier.mu.Lock()
|
proxier.mu.Lock()
|
||||||
@ -438,18 +426,18 @@ func (proxier *Proxier) SyncLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnServiceUpdate tracks the active set of service proxies.
|
type healthCheckPort struct {
|
||||||
// They will be synchronized using syncProxyRules()
|
namespace types.NamespacedName
|
||||||
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
nodeport int
|
||||||
start := time.Now()
|
}
|
||||||
defer func() {
|
|
||||||
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
|
|
||||||
}()
|
|
||||||
proxier.mu.Lock()
|
|
||||||
defer proxier.mu.Unlock()
|
|
||||||
proxier.haveReceivedServiceUpdate = true
|
|
||||||
|
|
||||||
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
|
// Accepts a list of Services and the existing service map. Returns the new
|
||||||
|
// service map, a list of healthcheck ports to add to or remove from the health
|
||||||
|
// checking listener service, and a set of stale UDP services.
|
||||||
|
func buildServiceMap(allServices []api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) {
|
||||||
|
newServiceMap := make(proxyServiceMap)
|
||||||
|
healthCheckAdd := make([]healthCheckPort, 0)
|
||||||
|
healthCheckDel := make([]healthCheckPort, 0)
|
||||||
|
|
||||||
for i := range allServices {
|
for i := range allServices {
|
||||||
service := &allServices[i]
|
service := &allServices[i]
|
||||||
@ -463,6 +451,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
|||||||
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
|
||||||
|
if service.Spec.Type == api.ServiceTypeExternalName {
|
||||||
|
glog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
servicePort := &service.Spec.Ports[i]
|
servicePort := &service.Spec.Ports[i]
|
||||||
@ -471,72 +464,80 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
|||||||
NamespacedName: svcName,
|
NamespacedName: svcName,
|
||||||
Port: servicePort.Name,
|
Port: servicePort.Name,
|
||||||
}
|
}
|
||||||
activeServices[serviceName] = true
|
|
||||||
info, exists := proxier.serviceMap[serviceName]
|
|
||||||
if exists && proxier.sameConfig(info, service, servicePort) {
|
|
||||||
// Nothing changed.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if exists {
|
|
||||||
// Something changed.
|
|
||||||
glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
|
|
||||||
delete(proxier.serviceMap, serviceName)
|
|
||||||
}
|
|
||||||
serviceIP := net.ParseIP(service.Spec.ClusterIP)
|
|
||||||
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
|
|
||||||
info = newServiceInfo(serviceName)
|
|
||||||
info.clusterIP = serviceIP
|
|
||||||
info.port = int(servicePort.Port)
|
|
||||||
info.protocol = servicePort.Protocol
|
|
||||||
info.nodePort = int(servicePort.NodePort)
|
|
||||||
info.externalIPs = service.Spec.ExternalIPs
|
|
||||||
// Deep-copy in case the service instance changes
|
|
||||||
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
|
||||||
info.sessionAffinityType = service.Spec.SessionAffinity
|
|
||||||
info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
|
|
||||||
info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
|
|
||||||
if info.onlyNodeLocalEndpoints {
|
|
||||||
p := apiservice.GetServiceHealthCheckNodePort(service)
|
|
||||||
if p == 0 {
|
|
||||||
glog.Errorf("Service does not contain necessary annotation %v",
|
|
||||||
apiservice.BetaAnnotationHealthCheckNodePort)
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Adding health check for %+v, port %v", serviceName.NamespacedName, p)
|
|
||||||
info.healthCheckNodePort = int(p)
|
|
||||||
// Turn on healthcheck responder to listen on the health check nodePort
|
|
||||||
healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.V(4).Infof("Deleting health check for %+v", serviceName.NamespacedName)
|
|
||||||
// Delete healthcheck responders, if any, previously listening for this service
|
|
||||||
healthcheck.DeleteServiceListener(serviceName.NamespacedName, 0)
|
|
||||||
}
|
|
||||||
proxier.serviceMap[serviceName] = info
|
|
||||||
|
|
||||||
|
info := newServiceInfo(serviceName, servicePort, service)
|
||||||
|
oldInfo, exists := oldServiceMap[serviceName]
|
||||||
|
equal := reflect.DeepEqual(info, oldInfo)
|
||||||
|
if !exists {
|
||||||
|
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
|
||||||
|
} else if !equal {
|
||||||
|
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !exists || !equal {
|
||||||
|
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
|
||||||
|
healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort})
|
||||||
|
} else {
|
||||||
|
healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newServiceMap[serviceName] = info
|
||||||
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
|
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
staleUDPServices := sets.NewString()
|
staleUDPServices := sets.NewString()
|
||||||
// Remove serviceports missing from the update.
|
// Remove serviceports missing from the update.
|
||||||
for name, info := range proxier.serviceMap {
|
for name, info := range oldServiceMap {
|
||||||
if !activeServices[name] {
|
if _, exists := newServiceMap[name]; !exists {
|
||||||
glog.V(1).Infof("Removing service %q", name)
|
glog.V(1).Infof("Removing service %q", name)
|
||||||
if info.protocol == api.ProtocolUDP {
|
if info.protocol == api.ProtocolUDP {
|
||||||
staleUDPServices.Insert(info.clusterIP.String())
|
staleUDPServices.Insert(info.clusterIP.String())
|
||||||
}
|
}
|
||||||
delete(proxier.serviceMap, name)
|
|
||||||
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
|
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
|
||||||
// Remove ServiceListener health check nodePorts from the health checker
|
healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort})
|
||||||
// TODO - Stats
|
|
||||||
glog.V(4).Infof("Deleting health check for %+v, port %v", name.NamespacedName, info.healthCheckNodePort)
|
|
||||||
healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
proxier.syncProxyRules()
|
|
||||||
proxier.deleteServiceConnections(staleUDPServices.List())
|
|
||||||
|
|
||||||
|
return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnServiceUpdate tracks the active set of service proxies.
|
||||||
|
// They will be synchronized using syncProxyRules()
|
||||||
|
func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
|
||||||
|
}()
|
||||||
|
proxier.mu.Lock()
|
||||||
|
defer proxier.mu.Unlock()
|
||||||
|
proxier.haveReceivedServiceUpdate = true
|
||||||
|
|
||||||
|
newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap)
|
||||||
|
for _, hc := range hcAdd {
|
||||||
|
glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport)
|
||||||
|
// Turn on healthcheck responder to listen on the health check nodePort
|
||||||
|
// FIXME: handle failures from adding the service
|
||||||
|
healthcheck.AddServiceListener(hc.namespace, hc.nodeport)
|
||||||
|
}
|
||||||
|
for _, hc := range hcDel {
|
||||||
|
// Remove ServiceListener health check nodePorts from the health checker
|
||||||
|
// TODO - Stats
|
||||||
|
glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport)
|
||||||
|
// FIXME: handle failures from deleting the service
|
||||||
|
healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) {
|
||||||
|
proxier.serviceMap = newServiceMap
|
||||||
|
proxier.syncProxyRules()
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed")
|
||||||
|
}
|
||||||
|
|
||||||
|
proxier.deleteServiceConnections(staleUDPServices.List())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate a list of ip strings from the list of endpoint infos
|
// Generate a list of ip strings from the list of endpoint infos
|
||||||
|
@ -25,8 +25,10 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/service"
|
||||||
"k8s.io/kubernetes/pkg/proxy"
|
"k8s.io/kubernetes/pkg/proxy"
|
||||||
"k8s.io/kubernetes/pkg/util/exec"
|
"k8s.io/kubernetes/pkg/util/exec"
|
||||||
|
"k8s.io/kubernetes/pkg/util/intstr"
|
||||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||||
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
|
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
|
||||||
)
|
)
|
||||||
@ -883,4 +885,290 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func makeTestService(namespace, name string, svcFunc func(*api.Service)) api.Service {
|
||||||
|
svc := api.Service{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
Namespace: namespace,
|
||||||
|
},
|
||||||
|
Spec: api.ServiceSpec{},
|
||||||
|
Status: api.ServiceStatus{},
|
||||||
|
}
|
||||||
|
svcFunc(&svc)
|
||||||
|
return svc
|
||||||
|
}
|
||||||
|
|
||||||
|
func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort {
|
||||||
|
svcPort := api.ServicePort{
|
||||||
|
Name: name,
|
||||||
|
Protocol: protocol,
|
||||||
|
Port: port,
|
||||||
|
NodePort: nodeport,
|
||||||
|
TargetPort: intstr.FromInt(targetPort),
|
||||||
|
}
|
||||||
|
return append(array, svcPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||||
|
services := []api.Service{
|
||||||
|
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
|
||||||
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||||
|
svc.Spec.ClusterIP = "172.16.55.4"
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "UDP", 1235, 5321, 0)
|
||||||
|
}),
|
||||||
|
makeTestService("somewhere-else", "node-port", func(svc *api.Service) {
|
||||||
|
svc.Spec.Type = api.ServiceTypeNodePort
|
||||||
|
svc.Spec.ClusterIP = "172.16.55.10"
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blahblah", "UDP", 345, 678, 0)
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "moreblahblah", "TCP", 344, 677, 0)
|
||||||
|
}),
|
||||||
|
makeTestService("somewhere", "load-balancer", func(svc *api.Service) {
|
||||||
|
svc.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
|
svc.Spec.ClusterIP = "172.16.55.11"
|
||||||
|
svc.Spec.LoadBalancerIP = "5.6.7.8"
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
|
||||||
|
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
||||||
|
Ingress: []api.LoadBalancerIngress{
|
||||||
|
{IP: "10.1.2.4"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
makeTestService("somewhere", "only-local-load-balancer", func(svc *api.Service) {
|
||||||
|
svc.ObjectMeta.Annotations = map[string]string{
|
||||||
|
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
|
||||||
|
service.BetaAnnotationHealthCheckNodePort: "345",
|
||||||
|
}
|
||||||
|
svc.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
|
svc.Spec.ClusterIP = "172.16.55.12"
|
||||||
|
svc.Spec.LoadBalancerIP = "5.6.7.8"
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
|
||||||
|
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
||||||
|
Ingress: []api.LoadBalancerIngress{
|
||||||
|
{IP: "10.1.2.3"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap))
|
||||||
|
if len(serviceMap) != 8 {
|
||||||
|
t.Errorf("expected service map length 8, got %v", serviceMap)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The only-local-loadbalancer ones get added
|
||||||
|
if len(hcAdd) != 2 {
|
||||||
|
t.Errorf("expected healthcheck add length 2, got %v", hcAdd)
|
||||||
|
} else {
|
||||||
|
for _, hc := range hcAdd {
|
||||||
|
if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" {
|
||||||
|
t.Errorf("unexpected healthcheck listener added: %v", hc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// All the rest get deleted
|
||||||
|
if len(hcDel) != 6 {
|
||||||
|
t.Errorf("expected healthcheck del length 6, got %v", hcDel)
|
||||||
|
} else {
|
||||||
|
for _, hc := range hcDel {
|
||||||
|
if hc.namespace.Namespace == "somewhere" && hc.namespace.Name == "only-local-load-balancer" {
|
||||||
|
t.Errorf("unexpected healthcheck listener deleted: %v", hc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(staleUDPServices) != 0 {
|
||||||
|
// Services only added, so nothing stale yet
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove some stuff
|
||||||
|
services = []api.Service{services[0]}
|
||||||
|
services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]}
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(services, serviceMap)
|
||||||
|
if len(serviceMap) != 1 {
|
||||||
|
t.Errorf("expected service map length 1, got %v", serviceMap)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(hcAdd) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 1, got %v", hcAdd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The only OnlyLocal annotation was removed above, so we expect a delete now.
|
||||||
|
// FIXME: Since the BetaAnnotationHealthCheckNodePort is the same for all
|
||||||
|
// ServicePorts, we'll get one delete per ServicePort, even though they all
|
||||||
|
// contain the same information
|
||||||
|
if len(hcDel) != 2 {
|
||||||
|
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
|
||||||
|
} else {
|
||||||
|
for _, hc := range hcDel {
|
||||||
|
if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" {
|
||||||
|
t.Errorf("unexpected healthcheck listener deleted: %v", hc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||||
|
// from the three deleted services here, we still have the ClusterIP for
|
||||||
|
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||||
|
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
||||||
|
if len(staleUDPServices) != len(expectedStaleUDPServices) {
|
||||||
|
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), staleUDPServices.List())
|
||||||
|
}
|
||||||
|
for _, ip := range expectedStaleUDPServices {
|
||||||
|
if !staleUDPServices.Has(ip) {
|
||||||
|
t.Errorf("expected stale UDP service service %s", ip)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||||
|
services := []api.Service{
|
||||||
|
makeTestService("somewhere-else", "headless", func(svc *api.Service) {
|
||||||
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||||
|
svc.Spec.ClusterIP = api.ClusterIPNone
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Headless service should be ignored
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap))
|
||||||
|
if len(serviceMap) != 0 {
|
||||||
|
t.Errorf("expected service map length 0, got %d", len(serviceMap))
|
||||||
|
}
|
||||||
|
|
||||||
|
// No proxied services, so no healthchecks
|
||||||
|
if len(hcAdd) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 0, got %d", len(hcAdd))
|
||||||
|
}
|
||||||
|
if len(hcDel) != 0 {
|
||||||
|
t.Errorf("expected healthcheck del length 0, got %d", len(hcDel))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(staleUDPServices) != 0 {
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||||
|
services := []api.Service{
|
||||||
|
makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
|
||||||
|
svc.Spec.Type = api.ServiceTypeExternalName
|
||||||
|
svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
|
||||||
|
svc.Spec.ExternalName = "foo2.bar.com"
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(services, make(proxyServiceMap))
|
||||||
|
if len(serviceMap) != 0 {
|
||||||
|
t.Errorf("expected service map length 0, got %v", serviceMap)
|
||||||
|
}
|
||||||
|
// No proxied services, so no healthchecks
|
||||||
|
if len(hcAdd) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
|
||||||
|
}
|
||||||
|
if len(hcDel) != 0 {
|
||||||
|
t.Errorf("expected healthcheck del length 0, got %v", hcDel)
|
||||||
|
}
|
||||||
|
if len(staleUDPServices) != 0 {
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||||
|
first := []api.Service{
|
||||||
|
makeTestService("somewhere", "some-service", func(svc *api.Service) {
|
||||||
|
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||||
|
svc.Spec.ClusterIP = "172.16.55.4"
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
second := []api.Service{
|
||||||
|
makeTestService("somewhere", "some-service", func(svc *api.Service) {
|
||||||
|
svc.ObjectMeta.Annotations = map[string]string{
|
||||||
|
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
|
||||||
|
service.BetaAnnotationHealthCheckNodePort: "345",
|
||||||
|
}
|
||||||
|
svc.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
|
svc.Spec.ClusterIP = "172.16.55.4"
|
||||||
|
svc.Spec.LoadBalancerIP = "5.6.7.8"
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 7002)
|
||||||
|
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 7003)
|
||||||
|
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
||||||
|
Ingress: []api.LoadBalancerIngress{
|
||||||
|
{IP: "10.1.2.3"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(first, make(proxyServiceMap))
|
||||||
|
if len(serviceMap) != 2 {
|
||||||
|
t.Errorf("expected service map length 2, got %v", serviceMap)
|
||||||
|
}
|
||||||
|
if len(hcAdd) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
|
||||||
|
}
|
||||||
|
if len(hcDel) != 2 {
|
||||||
|
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
|
||||||
|
}
|
||||||
|
if len(staleUDPServices) != 0 {
|
||||||
|
// Services only added, so nothing stale yet
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change service to load-balancer
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap)
|
||||||
|
if len(serviceMap) != 2 {
|
||||||
|
t.Errorf("expected service map length 2, got %v", serviceMap)
|
||||||
|
}
|
||||||
|
if len(hcAdd) != 2 {
|
||||||
|
t.Errorf("expected healthcheck add length 2, got %v", hcAdd)
|
||||||
|
}
|
||||||
|
if len(hcDel) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
|
||||||
|
}
|
||||||
|
if len(staleUDPServices) != 0 {
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
|
||||||
|
}
|
||||||
|
|
||||||
|
// No change; make sure the service map stays the same and there are
|
||||||
|
// no health-check changes
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(second, serviceMap)
|
||||||
|
if len(serviceMap) != 2 {
|
||||||
|
t.Errorf("expected service map length 2, got %v", serviceMap)
|
||||||
|
}
|
||||||
|
if len(hcAdd) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
|
||||||
|
}
|
||||||
|
if len(hcDel) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 2, got %v", hcDel)
|
||||||
|
}
|
||||||
|
if len(staleUDPServices) != 0 {
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
|
||||||
|
}
|
||||||
|
|
||||||
|
// And back to ClusterIP
|
||||||
|
serviceMap, hcAdd, hcDel, staleUDPServices = buildServiceMap(first, serviceMap)
|
||||||
|
if len(serviceMap) != 2 {
|
||||||
|
t.Errorf("expected service map length 2, got %v", serviceMap)
|
||||||
|
}
|
||||||
|
if len(hcAdd) != 0 {
|
||||||
|
t.Errorf("expected healthcheck add length 0, got %v", hcAdd)
|
||||||
|
}
|
||||||
|
if len(hcDel) != 2 {
|
||||||
|
t.Errorf("expected healthcheck del length 2, got %v", hcDel)
|
||||||
|
}
|
||||||
|
if len(staleUDPServices) != 0 {
|
||||||
|
// Services only added, so nothing stale yet
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
|
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
|
||||||
|
Loading…
Reference in New Issue
Block a user