From 4cf9b3c3623411ee95e008ff1c6ffc81fd865441 Mon Sep 17 00:00:00 2001
From: Jitendra Bhurat
Date: Thu, 3 Nov 2016 09:37:29 -0400
Subject: [PATCH 1/4] Added netsh util package.

---
 hack/.linted_packages          |   1 +
 pkg/util/netsh/BUILD           |  24 ++++
 pkg/util/netsh/doc.go          |  18 +++
 pkg/util/netsh/netsh.go        | 203 +++++++++++++++++++++++++++++++++
 pkg/util/netsh/testing/BUILD   |  18 +++
 pkg/util/netsh/testing/fake.go |  68 +++++++++++
 6 files changed, 332 insertions(+)
 create mode 100644 pkg/util/netsh/BUILD
 create mode 100644 pkg/util/netsh/doc.go
 create mode 100644 pkg/util/netsh/netsh.go
 create mode 100644 pkg/util/netsh/testing/BUILD
 create mode 100644 pkg/util/netsh/testing/fake.go

diff --git a/hack/.linted_packages b/hack/.linted_packages
index b9ddb25b91c..51d525debcb 100644
--- a/hack/.linted_packages
+++ b/hack/.linted_packages
@@ -220,6 +220,7 @@ pkg/util/limitwriter
 pkg/util/logs
 pkg/util/maps
 pkg/util/metrics
+pkg/util/netsh
 pkg/util/ratelimit
 pkg/util/replicaset
 pkg/util/validation/field
diff --git a/pkg/util/netsh/BUILD b/pkg/util/netsh/BUILD
new file mode 100644
index 00000000000..a73247d5d8b
--- /dev/null
+++ b/pkg/util/netsh/BUILD
@@ -0,0 +1,24 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_binary",
+    "go_library",
+    "go_test",
+    "cgo_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "doc.go",
+        "netsh.go",
+    ],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/util/exec:go_default_library",
+        "//vendor:github.com/golang/glog",
+    ],
+)
diff --git a/pkg/util/netsh/doc.go b/pkg/util/netsh/doc.go
new file mode 100644
index 00000000000..eb628bc0350
--- /dev/null
+++ b/pkg/util/netsh/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package netsh provides an interface and implementations for running Windows netsh commands.
+package netsh // import "k8s.io/kubernetes/pkg/util/netsh"
diff --git a/pkg/util/netsh/netsh.go b/pkg/util/netsh/netsh.go
new file mode 100644
index 00000000000..47cd5e42ca5
--- /dev/null
+++ b/pkg/util/netsh/netsh.go
@@ -0,0 +1,203 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package netsh
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	utilexec "k8s.io/kubernetes/pkg/util/exec"
+)
+
+// Interface is an injectable interface for running netsh commands. Implementations must be goroutine-safe.
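+//
+// A minimal usage sketch (the argument values below are hypothetical, shown in
+// the shape the winuserspace proxier builds them; exec.New is the real
+// constructor from pkg/util/exec):
+//
+//	runner := netsh.New(exec.New())
+//	args := []string{"interface", "portproxy", "set", "v4tov4",
+//		"listenPort=80", "connectaddress=10.0.0.2", "connectPort=40000"}
+//	existed, err := runner.EnsurePortProxyRule(args)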
+type Interface interface {
+	// EnsurePortProxyRule checks whether the specified portproxy redirect exists and creates it if not.
+	EnsurePortProxyRule(args []string) (bool, error)
+	// DeletePortProxyRule deletes the specified portproxy rule. An error is returned if the rule does not exist.
+	DeletePortProxyRule(args []string) error
+	// EnsureIPAddress checks whether the specified IP address has been added to the interface named by GetInterfaceToAddIP and adds it if not. It returns true if the address already existed.
+	EnsureIPAddress(args []string, ip net.IP) (bool, error)
+	// DeleteIPAddress checks if the specified IP address is present and, if so, deletes it.
+	DeleteIPAddress(args []string) error
+	// Restore runs `netsh exec` to restore portproxy or addresses using a file.
+	// TODO Check if this is required, most likely not
+	Restore(args []string) error
+
+	// GetInterfaceToAddIP returns the name of the interface to which the Service IP needs to be added
+	// (the IP address must be present for netsh portproxy to redirect traffic).
+	// It reads the environment variable INTERFACE_TO_ADD_SERVICE_IP; if that is not defined, "vEthernet (HNS Internal NIC)" is returned.
+	GetInterfaceToAddIP() string
+}
+
+const (
+	cmdNetsh string = "netsh"
+)
+
+// runner implements Interface in terms of exec("netsh").
+type runner struct {
+	mu   sync.Mutex
+	exec utilexec.Interface
+}
+
+// New returns a new Interface which will exec netsh.
+func New(exec utilexec.Interface) Interface {
+	runner := &runner{
+		exec: exec,
+	}
+	return runner
+}
+
+// EnsurePortProxyRule checks whether the specified portproxy redirect exists and creates it if not.
+func (runner *runner) EnsurePortProxyRule(args []string) (bool, error) {
+	glog.V(4).Infof("running netsh interface portproxy add v4tov4 %v", args)
+	out, err := runner.exec.Command(cmdNetsh, args...).CombinedOutput()
+
+	if err == nil {
+		return true, nil
+	}
+	if ee, ok := err.(utilexec.ExitError); ok {
+		// netsh uses exit(0) to indicate success of the operation,
+		// as opposed to, for example, a malformed command line.
+		if ee.Exited() && ee.ExitStatus() != 0 {
+			return false, nil
+		}
+	}
+	return false, fmt.Errorf("error checking portproxy rule: %v: %s", err, out)
+
+}
+
+// DeletePortProxyRule deletes the specified portproxy rule. An error is returned if the rule does not exist.
+func (runner *runner) DeletePortProxyRule(args []string) error {
+	glog.V(4).Infof("running netsh interface portproxy delete v4tov4 %v", args)
+	out, err := runner.exec.Command(cmdNetsh, args...).CombinedOutput()
+
+	if err == nil {
+		return nil
+	}
+	if ee, ok := err.(utilexec.ExitError); ok {
+		// netsh uses exit(0) to indicate success of the operation,
+		// as opposed to, for example, a malformed command line.
+		if ee.Exited() && ee.ExitStatus() == 0 {
+			return nil
+		}
+	}
+	return fmt.Errorf("error deleting portproxy rule: %v: %s", err, out)
+}
+
+// EnsureIPAddress checks whether the specified IP address has been added to the interface identified by the INTERFACE_TO_ADD_SERVICE_IP environment variable and adds it if not. It returns true if the address already existed.
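+//
+// The args slice is passed to netsh verbatim; a sketch of the expected shape
+// (the interface name and address below are hypothetical):
+//
+//	ip := net.ParseIP("10.0.0.2")
+//	args := []string{"interface", "ipv4", "add", "address",
+//		"name=vEthernet (HNS Internal NIC)", "address=10.0.0.2"}
+//	existed, err := runner.EnsureIPAddress(args, ip)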
+func (runner *runner) EnsureIPAddress(args []string, ip net.IP) (bool, error) {
+	// Check if the IP address already exists on the interface.
+	intName := runner.GetInterfaceToAddIP()
+	argsShowAddress := []string{
+		"interface", "ipv4", "show", "address",
+		"name=" + intName,
+	}
+
+	ipToCheck := ip.String()
+
+	exists, _ := checkIPExists(ipToCheck, argsShowAddress, runner)
+	if exists {
+		glog.V(4).Infof("not adding IP address %q as it already exists", ipToCheck)
+		return true, nil
+	}
+
+	// The IP address is not yet present; add it now.
+	glog.V(4).Infof("running netsh interface ipv4 add address %v", args)
+	out, err := runner.exec.Command(cmdNetsh, args...).CombinedOutput()
+
+	if err == nil {
+		// Once the IP address is added, it takes a while to initialize and show up when queried.
+		// Query all the IP addresses and see if the one we added is present.
+		// Note: we query via `netsh interface ipv4 show address` rather than net.InterfaceAddrs(),
+		// because the latter reports the address as soon as it is added, even while it is still uninitialized.
+		glog.V(3).Infof("Waiting until IP: %v is added to the network adapter", ipToCheck)
+		for {
+			if exists, _ := checkIPExists(ipToCheck, argsShowAddress, runner); exists {
+				return true, nil
+			}
+			time.Sleep(500 * time.Millisecond)
+		}
+	}
+	if ee, ok := err.(utilexec.ExitError); ok {
+		// netsh uses exit(0) to indicate success of the operation,
+		// as opposed to, for example, a malformed command line.
+		if ee.Exited() && ee.ExitStatus() != 0 {
+			return false, nil
+		}
+	}
+	return false, fmt.Errorf("error adding ipv4 address: %v: %s", err, out)
+}
+
+// DeleteIPAddress checks if the specified IP address is present and, if so, deletes it.
+func (runner *runner) DeleteIPAddress(args []string) error {
+	glog.V(4).Infof("running netsh interface ipv4 delete address %v", args)
+	out, err := runner.exec.Command(cmdNetsh, args...).CombinedOutput()
+
+	if err == nil {
+		return nil
+	}
+	if ee, ok := err.(utilexec.ExitError); ok {
+		// netsh uses exit(0) to indicate success of the operation,
+		// as opposed to, for example, a malformed command line.
+		if ee.Exited() && ee.ExitStatus() == 0 {
+			return nil
+		}
+	}
+	return fmt.Errorf("error deleting ipv4 address: %v: %s", err, out)
+}
+
+// GetInterfaceToAddIP returns the name of the interface to which the Service IP needs to be added
+// (the IP address must be present for netsh portproxy to redirect traffic).
+// It reads the environment variable INTERFACE_TO_ADD_SERVICE_IP; if that is not defined, "vEthernet (HNS Internal NIC)" is returned.
+func (runner *runner) GetInterfaceToAddIP() string {
+	if iface := os.Getenv("INTERFACE_TO_ADD_SERVICE_IP"); len(iface) > 0 {
+		return iface
+	}
+	return "vEthernet (HNS Internal NIC)"
+}
+
+// Restore is part of Interface.
+func (runner *runner) Restore(args []string) error {
+	return nil
+}
+
+// checkIPExists checks if an IP address exists in 'netsh interface ipv4 show address' output
+func checkIPExists(ipToCheck string, args []string, runner *runner) (bool, error) {
+	ipAddress, err := runner.exec.Command(cmdNetsh, args...).CombinedOutput()
+	if err != nil {
+		return false, err
+	}
+	ipAddressString := string(ipAddress)
+	glog.V(3).Infof("Searching for IP: %v in IP dump: %v", ipToCheck, ipAddressString)
+	showAddressArray := strings.Split(ipAddressString, "\n")
+	for _, showAddress := range showAddressArray {
+		if strings.Contains(showAddress, "IP Address:") {
+			// Strip the "IP Address:" label with TrimPrefix (TrimLeft treats its argument as a cutset of characters, not a prefix).
+			ipFromNetsh := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(showAddress), "IP Address:"))
+			if ipFromNetsh == ipToCheck {
+				return true, nil
+			}
+		}
+	}
+
+	return false, nil
+}
diff --git a/pkg/util/netsh/testing/BUILD b/pkg/util/netsh/testing/BUILD
new file mode 100644
index 00000000000..7dc29aeb348
--- /dev/null
+++ b/pkg/util/netsh/testing/BUILD
@@ -0,0 +1,18 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_binary",
+    "go_library",
+    "go_test",
+    "cgo_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["fake.go"],
+    tags = ["automanaged"],
+    deps = ["//pkg/util/netsh:go_default_library"],
+)
diff --git a/pkg/util/netsh/testing/fake.go b/pkg/util/netsh/testing/fake.go
new file mode 100644
index 00000000000..a0e5249611f
--- /dev/null
+++ b/pkg/util/netsh/testing/fake.go
@@ -0,0 +1,68 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testing
+
+import (
+	"net"
+
+	"k8s.io/kubernetes/pkg/util/netsh"
+)
+
+// FakeNetsh is a no-op implementation of the netsh Interface, for use in tests.
+type FakeNetsh struct {
+}
+
+func NewFake() *FakeNetsh {
+	return &FakeNetsh{}
+}
+
+func (*FakeNetsh) EnsurePortProxyRule(args []string) (bool, error) {
+	return true, nil
+}
+
+// DeletePortProxyRule deletes the specified portproxy rule. In the fake it does nothing.
+func (*FakeNetsh) DeletePortProxyRule(args []string) error {
+	// Do Nothing
+	return nil
+}
+
+// EnsureIPAddress does nothing and reports that the specified IP address already existed.
+func (*FakeNetsh) EnsureIPAddress(args []string, ip net.IP) (bool, error) {
+	return true, nil
+}
+
+// DeleteIPAddress checks if the specified IP address is present and, if so, deletes it. In the fake it does nothing.
+func (*FakeNetsh) DeleteIPAddress(args []string) error {
+	// Do Nothing
+	return nil
+}
+
+// Restore runs `netsh exec` to restore portproxy or addresses using a file.
+// TODO Check if this is required, most likely not
+func (*FakeNetsh) Restore(args []string) error {
+	// Do Nothing
+	return nil
+}
+
+// GetInterfaceToAddIP returns the name of the interface to which the Service IP
+// would be added. Unlike the real implementation, the fake does not consult the
+// INTERFACE_TO_ADD_SERVICE_IP environment variable; it always returns a fixed name.
+func (*FakeNetsh) GetInterfaceToAddIP() string {
+	return "Interface 1"
+}
+
+var _ = netsh.Interface(&FakeNetsh{})

From acf3f368bc3eaa9d43ea220879bde8b9e8717f04 Mon Sep 17 00:00:00 2001
From: Paulo Pires
Date: Thu, 3 Nov 2016 10:39:23 -0400
Subject: [PATCH 2/4] Added new userspace proxy mode specifically for Windows.

---
 pkg/proxy/winuserspace/BUILD                  |  55 ++
 pkg/proxy/winuserspace/loadbalancer.go        |  34 +
 pkg/proxy/winuserspace/port_allocator.go      | 153 ++++
 pkg/proxy/winuserspace/port_allocator_test.go | 101 +++
 pkg/proxy/winuserspace/proxier.go             | 682 ++++++++++++++
 pkg/proxy/winuserspace/proxier_test.go        | 855 ++++++++++++++++++
 pkg/proxy/winuserspace/proxysocket.go         | 300 ++++++
 pkg/proxy/winuserspace/roundrobin.go          | 326 +++++++
 pkg/proxy/winuserspace/roundrobin_test.go     | 727 +++++++++++++++
 pkg/proxy/winuserspace/udp_server.go          |  47 +
 test/test_owners.csv                          |   1 +
 11 files changed, 3281 insertions(+)
 create mode 100644 pkg/proxy/winuserspace/BUILD
 create mode 100644 pkg/proxy/winuserspace/loadbalancer.go
 create mode 100644 pkg/proxy/winuserspace/port_allocator.go
 create mode 100644 pkg/proxy/winuserspace/port_allocator_test.go
 create mode 100644 pkg/proxy/winuserspace/proxier.go
 create mode 100644 pkg/proxy/winuserspace/proxier_test.go
 create mode 100644 pkg/proxy/winuserspace/proxysocket.go
 create mode 100644 pkg/proxy/winuserspace/roundrobin.go
 create mode 100644 pkg/proxy/winuserspace/roundrobin_test.go
 create mode 100644 pkg/proxy/winuserspace/udp_server.go

diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD
new file mode 100644
index 00000000000..b9ca9108487
--- /dev/null
+++ b/pkg/proxy/winuserspace/BUILD
@@ -0,0 +1,55 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_binary",
+    "go_library",
+    "go_test",
+    "cgo_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "loadbalancer.go",
+        "port_allocator.go",
+        "proxier.go",
+        "proxysocket.go",
+        "roundrobin.go",
+        "udp_server.go",
+    ],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/proxy:go_default_library",
+        "//pkg/types:go_default_library",
+        "//pkg/util/errors:go_default_library",
+        "//pkg/util/net:go_default_library",
+        "//pkg/util/netsh:go_default_library",
+        "//pkg/util/runtime:go_default_library",
+        "//pkg/util/slice:go_default_library",
+        "//pkg/util/wait:go_default_library",
+        "//vendor:github.com/golang/glog",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = [
+        "port_allocator_test.go",
+        "proxier_test.go",
+        "roundrobin_test.go",
+    ],
+    library = "go_default_library",
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/proxy:go_default_library",
+        "//pkg/types:go_default_library",
+        "//pkg/util/net:go_default_library",
+        "//pkg/util/netsh/testing:go_default_library",
+        "//pkg/util/runtime:go_default_library",
+    ],
+)
diff --git a/pkg/proxy/winuserspace/loadbalancer.go b/pkg/proxy/winuserspace/loadbalancer.go
new file mode 100644
index 00000000000..c649369e031
--- /dev/null
+++ b/pkg/proxy/winuserspace/loadbalancer.go
@@ -0,0 +1,34 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package winuserspace
+
+import (
+	"net"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/proxy"
+)
+
+// LoadBalancer is an interface for distributing incoming requests to service endpoints.
+type LoadBalancer interface {
+	// NextEndpoint returns the endpoint to handle a request for the given
+	// service-port and source address.
+	NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
+	NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
+	DeleteService(service proxy.ServicePortName)
+	CleanupStaleStickySessions(service proxy.ServicePortName)
+}
diff --git a/pkg/proxy/winuserspace/port_allocator.go b/pkg/proxy/winuserspace/port_allocator.go
new file mode 100644
index 00000000000..baeae9c9998
--- /dev/null
+++ b/pkg/proxy/winuserspace/port_allocator.go
@@ -0,0 +1,153 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package winuserspace
+
+import (
+	"errors"
+	"math/big"
+	"math/rand"
+	"sync"
+	"time"
+
+	"k8s.io/kubernetes/pkg/util/net"
+	"k8s.io/kubernetes/pkg/util/wait"
+)
+
+var (
+	errPortRangeNoPortsRemaining = errors.New("port allocation failed; there are no remaining ports left to allocate in the accepted range")
+)
+
+type PortAllocator interface {
+	AllocateNext() (int, error)
+	Release(int)
+}
+
+// randomAllocator is a PortAllocator implementation that always returns 0 from
+// AllocateNext(), leaving the choice of an ephemeral port to the OS.
+type randomAllocator struct{}
+
+// AllocateNext always returns 0
+func (r *randomAllocator) AllocateNext() (int, error) {
+	return 0, nil
+}
+
+// Release is a noop
+func (r *randomAllocator) Release(_ int) {
+	// noop
+}
+
+// newPortAllocator builds PortAllocator for a given PortRange. If the PortRange is empty
+// then a random port allocator is returned; otherwise, a new range-based allocator
+// is returned.
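+//
+// A short sketch of both behaviors (the range values below are hypothetical):
+//
+//	pa := newPortAllocator(net.PortRange{})                       // random: AllocateNext returns 0
+//	pa = newPortAllocator(net.PortRange{Base: 36000, Size: 1000}) // range-based: ports in [36000, 36999]
+//	port, err := pa.AllocateNext()
+//	if err == nil {
+//		defer pa.Release(port)
+//	}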
+func newPortAllocator(r net.PortRange) PortAllocator {
+	if r.Base == 0 {
+		return &randomAllocator{}
+	}
+	return newPortRangeAllocator(r)
+}
+
+const (
+	portsBufSize         = 16
+	nextFreePortCooldown = 500 * time.Millisecond
+	allocateNextTimeout  = 1 * time.Second
+)
+
+type rangeAllocator struct {
+	net.PortRange
+	ports chan int
+	used  big.Int
+	lock  sync.Mutex
+	rand  *rand.Rand
+}
+
+func newPortRangeAllocator(r net.PortRange) PortAllocator {
+	if r.Base == 0 || r.Size == 0 {
+		panic("illegal argument: may not specify an empty port range")
+	}
+	ra := &rangeAllocator{
+		PortRange: r,
+		ports:     make(chan int, portsBufSize),
+		rand:      rand.New(rand.NewSource(time.Now().UnixNano())),
+	}
+	go wait.Until(func() { ra.fillPorts(wait.NeverStop) }, nextFreePortCooldown, wait.NeverStop)
+	return ra
+}
+
+// fillPorts loops, always searching for the next free port and, if found, fills the ports buffer with it.
+// this func blocks until either there are no remaining free ports, or else the stopCh chan is closed.
+func (r *rangeAllocator) fillPorts(stopCh <-chan struct{}) {
+	for {
+		port := r.nextFreePort()
+		if port == -1 {
+			return
+		}
+		select {
+		case <-stopCh:
+			return
+		case r.ports <- port:
+		}
+	}
+}
+
+// nextFreePort finds a free port, first picking a random port. if that port is already in use
+// then the port range is scanned sequentially until either a port is found or the scan completes
+// unsuccessfully. an unsuccessful scan returns a port of -1.
+func (r *rangeAllocator) nextFreePort() int {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	// choose random port
+	j := r.rand.Intn(r.Size)
+	if b := r.used.Bit(j); b == 0 {
+		r.used.SetBit(&r.used, j, 1)
+		return j + r.Base
+	}
+
+	// search sequentially
+	for i := j + 1; i < r.Size; i++ {
+		if b := r.used.Bit(i); b == 0 {
+			r.used.SetBit(&r.used, i, 1)
+			return i + r.Base
+		}
+	}
+	for i := 0; i < j; i++ {
+		if b := r.used.Bit(i); b == 0 {
+			r.used.SetBit(&r.used, i, 1)
+			return i + r.Base
+		}
+	}
+	return -1
+}
+
+func (r *rangeAllocator) AllocateNext() (port int, err error) {
+	select {
+	case port = <-r.ports:
+	case <-time.After(allocateNextTimeout):
+		err = errPortRangeNoPortsRemaining
+	}
+	return
+}
+
+func (r *rangeAllocator) Release(port int) {
+	port -= r.Base
+	if port < 0 || port >= r.Size {
+		return
+	}
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.used.SetBit(&r.used, port, 0)
+}
diff --git a/pkg/proxy/winuserspace/port_allocator_test.go b/pkg/proxy/winuserspace/port_allocator_test.go
new file mode 100644
index 00000000000..f68c2aec296
--- /dev/null
+++ b/pkg/proxy/winuserspace/port_allocator_test.go
@@ -0,0 +1,101 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package winuserspace
+
+import (
+	"reflect"
+	"testing"
+
+	"k8s.io/kubernetes/pkg/util/net"
+)
+
+func TestRangeAllocatorEmpty(t *testing.T) {
+	r := &net.PortRange{}
+	r.Set("0-0")
+	defer func() {
+		if rv := recover(); rv == nil {
+			t.Fatalf("expected panic because of empty port range: %#v", r)
+		}
+	}()
+	_ = newPortRangeAllocator(*r)
+}
+
+func TestRangeAllocatorFullyAllocated(t *testing.T) {
+	r := &net.PortRange{}
+	r.Set("1-1")
+	a := newPortRangeAllocator(*r)
+	p, err := a.AllocateNext()
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if p != 1 {
+		t.Fatalf("unexpected allocated port: %d", p)
+	}
+
+	_, err = a.AllocateNext()
+	if err == nil {
+		t.Fatalf("expected error because of fully-allocated range")
+	}
+
+	a.Release(p)
+	p, err = a.AllocateNext()
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if p != 1 {
+		t.Fatalf("unexpected allocated port: %d", p)
+	}
+
+	_, err = a.AllocateNext()
+	if err == nil {
+		t.Fatalf("expected error because of fully-allocated range")
+	}
+}
+
+func TestRangeAllocator_RandomishAllocation(t *testing.T) {
+	r := &net.PortRange{}
+	r.Set("1-100")
+	a := newPortRangeAllocator(*r)
+
+	// allocate all the ports
+	var err error
+	ports := make([]int, 100, 100)
+	for i := 0; i < 100; i++ {
+		ports[i], err = a.AllocateNext()
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+	}
+
+	// release them all
+	for i := 0; i < 100; i++ {
+		a.Release(ports[i])
+	}
+
+	// allocate the ports again
+	rports := make([]int, 100, 100)
+	for i := 0; i < 100; i++ {
+		rports[i], err = a.AllocateNext()
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+	}
+
+	if reflect.DeepEqual(ports, rports) {
+		t.Fatalf("expected re-allocated ports to be in a somewhat random order")
+	}
+}
diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go
new file mode 100644
index 00000000000..9aa0e394076
--- /dev/null
+++ b/pkg/proxy/winuserspace/proxier.go
@@ -0,0 +1,682 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package winuserspace
+
+import (
+	"fmt"
+	"net"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/glog"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/proxy"
+	"k8s.io/kubernetes/pkg/types"
+	utilerrors "k8s.io/kubernetes/pkg/util/errors"
+	utilnet "k8s.io/kubernetes/pkg/util/net"
+	"k8s.io/kubernetes/pkg/util/netsh"
+	"k8s.io/kubernetes/pkg/util/runtime"
+)
+
+type portal struct {
+	ip         net.IP
+	port       int
+	isExternal bool
+}
+
+type serviceInfo struct {
+	isAliveAtomic       int32 // Only access this with atomic ops
+	portal              portal
+	protocol            api.Protocol
+	proxyPort           int
+	socket              proxySocket
+	timeout             time.Duration
+	activeClients       *clientCache
+	nodePort            int
+	loadBalancerStatus  api.LoadBalancerStatus
+	sessionAffinityType api.ServiceAffinity
+	stickyMaxAgeMinutes int
+	// Deprecated, but required for back-compat (including e2e)
+	externalIPs []string
+}
+
+func (info *serviceInfo) setAlive(b bool) {
+	var i int32
+	if b {
+		i = 1
+	}
+	atomic.StoreInt32(&info.isAliveAtomic, i)
+}
+
+func (info *serviceInfo) isAlive() bool {
+	return atomic.LoadInt32(&info.isAliveAtomic) != 0
+}
+
+func logTimeout(err error) bool {
+	if e, ok := err.(net.Error); ok {
+		if e.Timeout() {
+			glog.V(3).Infof("connection to endpoint closed due to inactivity")
+			return true
+		}
+	}
+	return false
+}
+
+// Proxier is a simple proxy for TCP connections between a localhost:lport
+// and services that provide the actual implementations.
+type Proxier struct {
+	loadBalancer   LoadBalancer
+	mu             sync.Mutex // protects serviceMap
+	serviceMap     map[proxy.ServicePortName]*serviceInfo
+	syncPeriod     time.Duration
+	udpIdleTimeout time.Duration
+	portMapMutex   sync.Mutex
+	portMap        map[portMapKey]*portMapValue
+	numProxyLoops  int32 // use atomic ops to access this; mostly for testing
+	listenIP       net.IP
+	netsh          netsh.Interface
+	hostIP         net.IP
+	proxyPorts     PortAllocator
+}
+
+// assert Proxier is a ProxyProvider
+var _ proxy.ProxyProvider = &Proxier{}
+
+// A key for the portMap. The ip has to be a string because slices can't be map
+// keys.
+type portMapKey struct {
+	ip       string
+	port     int
+	protocol api.Protocol
+}
+
+func (k *portMapKey) String() string {
+	return fmt.Sprintf("%s:%d/%s", k.ip, k.port, k.protocol)
+}
+
+// A value for the portMap
+type portMapValue struct {
+	owner  proxy.ServicePortName
+	socket interface {
+		Close() error
+	}
+}
+
+var (
+	// ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
+	// the loopback address. May be checked for by callers of NewProxier to know whether
+	// the caller provided invalid input.
+	ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost")
+)
+
+// IsProxyLocked returns true if the proxy could not acquire the lock on iptables.
+func IsProxyLocked(err error) bool {
+	return strings.Contains(err.Error(), "holding the xtables lock")
+}
+
+// Used below.
+var localhostIPv4 = net.ParseIP("127.0.0.1")
+var localhostIPv6 = net.ParseIP("::1")
+
+// NewProxier returns a new Proxier given a LoadBalancer and an address on
+// which to listen. Because of the way netsh rules are managed, it is assumed
+// that there is only a single Proxier active on a machine. An error will be
+// returned if the proxier cannot be started due to an invalid ListenIP
+// (loopback) or if a host interface cannot be selected. Once a proxier is
+// created, it will keep netsh rules up to date in the background and will
+// not terminate if a particular netsh call fails.
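+//
+// A construction sketch (the values are hypothetical; execer is an assumed
+// utilexec.Interface such as exec.New(), and NewLoadBalancerRR comes from
+// roundrobin.go in this package):
+//
+//	lb := NewLoadBalancerRR()
+//	proxier, err := NewProxier(lb, net.ParseIP("0.0.0.0"), netsh.New(execer),
+//		utilnet.PortRange{Base: 36000, Size: 1000}, 30*time.Second, 250*time.Millisecond)
+//	if err == nil {
+//		go proxier.SyncLoop()
+//	}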
+func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, pr utilnet.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
+	if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
+		return nil, ErrProxyOnLocalhost
+	}
+
+	hostIP, err := utilnet.ChooseHostInterface()
+	if err != nil {
+		return nil, fmt.Errorf("failed to select a host interface: %v", err)
+	}
+
+	proxyPorts := newPortAllocator(pr)
+
+	glog.V(2).Infof("Setting proxy IP to %v and initializing netsh", hostIP)
+	return createProxier(loadBalancer, listenIP, netsh, hostIP, proxyPorts, syncPeriod, udpIdleTimeout)
+}
+
+func createProxier(loadBalancer LoadBalancer, listenIP net.IP, netsh netsh.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
+	// It is convenient to pass nil for tests.
+	if proxyPorts == nil {
+		proxyPorts = newPortAllocator(utilnet.PortRange{})
+	}
+	return &Proxier{
+		loadBalancer:   loadBalancer,
+		serviceMap:     make(map[proxy.ServicePortName]*serviceInfo),
+		portMap:        make(map[portMapKey]*portMapValue),
+		syncPeriod:     syncPeriod,
+		udpIdleTimeout: udpIdleTimeout,
+		listenIP:       listenIP,
+		netsh:          netsh,
+		hostIP:         hostIP,
+		proxyPorts:     proxyPorts,
+	}, nil
+}
+
+// Sync is called to immediately synchronize the proxier state with the netsh rules
+func (proxier *Proxier) Sync() {
+	proxier.ensurePortals()
+	proxier.cleanupStaleStickySessions()
+}
+
+// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
+func (proxier *Proxier) SyncLoop() {
+	t := time.NewTicker(proxier.syncPeriod)
+	defer t.Stop()
+	for {
+		<-t.C
+		glog.V(6).Infof("Periodic sync")
+		proxier.Sync()
+	}
+}
+
+// Ensure that portals exist for all services.
+func (proxier *Proxier) ensurePortals() {
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	// NB: This does not remove rules that should not be present.
+	for name, info := range proxier.serviceMap {
+		err := proxier.openPortal(name, info)
+		if err != nil {
+			glog.Errorf("Failed to ensure portal for %q: %v", name, err)
+		}
+	}
+}
+
+// cleanupStaleStickySessions cleans up any stale sticky session records in the hash map.
+func (proxier *Proxier) cleanupStaleStickySessions() {
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	for name := range proxier.serviceMap {
+		proxier.loadBalancer.CleanupStaleStickySessions(name)
+	}
+}
+
+// This assumes proxier.mu is not locked.
+func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error {
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	return proxier.stopProxyInternal(service, info)
+}
+
+// This assumes proxier.mu is locked.
+func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error {
+	delete(proxier.serviceMap, service)
+	info.setAlive(false)
+	err := info.socket.Close()
+	port := info.socket.ListenPort()
+	proxier.proxyPorts.Release(port)
+	return err
+}
+
+func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) {
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	info, ok := proxier.serviceMap[service]
+	return info, ok
+}
+
+func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) {
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	proxier.serviceMap[service] = info
+}
+
+// addServiceOnPort starts listening for a new service, returning the serviceInfo.
+// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
+// connections, for now.
+func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
+	sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
+	if err != nil {
+		return nil, err
+	}
+	_, portStr, err := net.SplitHostPort(sock.Addr().String())
+	if err != nil {
+		sock.Close()
+		return nil, err
+	}
+	portNum, err := strconv.Atoi(portStr)
+	if err != nil {
+		sock.Close()
+		return nil, err
+	}
+	si := &serviceInfo{
+		isAliveAtomic:       1,
+		proxyPort:           portNum,
+		protocol:            protocol,
+		socket:              sock,
+		timeout:             timeout,
+		activeClients:       newClientCache(),
+		sessionAffinityType: api.ServiceAffinityNone, // default
+		stickyMaxAgeMinutes: 180,                     // TODO: parameterize this in the API.
+	}
+	proxier.setServiceInfo(service, si)
+
+	glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
+	go func(service proxy.ServicePortName, proxier *Proxier) {
+		defer runtime.HandleCrash()
+		atomic.AddInt32(&proxier.numProxyLoops, 1)
+		sock.ProxyLoop(service, si, proxier)
+		atomic.AddInt32(&proxier.numProxyLoops, -1)
+	}(service, proxier)
+
+	return si, nil
+}
+
+// OnServiceUpdate manages the active set of service proxies.
+// Active service proxies are reinitialized if found in the update set or
+// shutdown if missing from the update set.
+func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
+	glog.V(4).Infof("Received update notice: %+v", services)
+	activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
+	for i := range services {
+		service := &services[i]
+
+		// if ClusterIP is "None" or empty, skip proxying
+		if !api.IsServiceIPSet(service) {
+			glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
+			continue
+		}
+
+		for i := range service.Spec.Ports {
+			servicePort := &service.Spec.Ports[i]
+			serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
+			activeServices[serviceName] = true
+			serviceIP := net.ParseIP(service.Spec.ClusterIP)
+			info, exists := proxier.getServiceInfo(serviceName)
+			// TODO: check health of the socket? What if ProxyLoop exited?
+			if exists && sameConfig(info, service, servicePort) {
+				// Nothing changed.
+				continue
+			}
+			if exists {
+				glog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
+				err := proxier.closePortal(serviceName, info)
+				if err != nil {
+					glog.Errorf("Failed to close portal for %q: %v", serviceName, err)
+				}
+				err = proxier.stopProxy(serviceName, info)
+				if err != nil {
+					glog.Errorf("Failed to stop service %q: %v", serviceName, err)
+				}
+			}
+
+			proxyPort, err := proxier.proxyPorts.AllocateNext()
+			if err != nil {
+				glog.Errorf("failed to allocate proxy port for service %q: %v", serviceName, err)
+				continue
+			}
+
+			glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
+			info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
+			if err != nil {
+				glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
+				continue
+			}
+			info.portal.ip = serviceIP
+			info.portal.port = int(servicePort.Port)
+			info.externalIPs = service.Spec.ExternalIPs
+			// Deep-copy in case the service instance changes
+			info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
+			info.nodePort = int(servicePort.NodePort)
+			info.sessionAffinityType = service.Spec.SessionAffinity
+			glog.V(4).Infof("info: %#v", info)
+
+			err = proxier.openPortal(serviceName, info)
+			if err != nil {
+				glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
+			}
+			proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
+		}
+	}
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	for name, info := range proxier.serviceMap {
+		if !activeServices[name] {
+			glog.V(1).Infof("Stopping service %q", name)
+			err := proxier.closePortal(name, info)
+			if err != nil {
+				glog.Errorf("Failed to close portal for %q: %v", name, err)
+			}
+			err = proxier.stopProxyInternal(name, info)
+			if err != nil {
+				glog.Errorf("Failed to stop service %q: %v", name, err)
+			}
+			proxier.loadBalancer.DeleteService(name)
+		}
+	}
+}
+
+func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
+	if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
+		return false
+	}
+	if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) {
+		return false
+	}
+	if !ipsEqual(info.externalIPs, service.Spec.ExternalIPs) {
+		return false
+	}
+	if !api.LoadBalancerStatusEqual(&info.loadBalancerStatus, &service.Status.LoadBalancer) {
+		return false
+	}
+	if info.sessionAffinityType != service.Spec.SessionAffinity {
+		return false
+	}
+	return true
+}
+
+func ipsEqual(lhs, rhs []string) bool {
+	if len(lhs) != len(rhs) {
+		return false
+	}
+	for i := range lhs {
+		if lhs[i] != rhs[i] {
+			return false
+		}
+	}
+	return true
+}
+
+func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
+	err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
+	if err != nil {
+		return err
+	}
+	for _, publicIP := range info.externalIPs {
+		err = proxier.openOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)
+		if err != nil {
+			return err
+		}
+	}
+	for _, ingress := range info.loadBalancerStatus.Ingress {
+		if ingress.IP != "" {
+			err = proxier.openOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	if info.nodePort != 0 {
+		err = proxier.openNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
+	if protocol == api.ProtocolUDP {
+		glog.Warningf("Not adding rule for %q on %s:%d as UDP protocol is not supported by netsh portproxy", name, portal.ip, portal.port)
+		return nil
+	}
+
+	// Add the IP address to the interface returned by GetInterfaceToAddIP so that portproxy can redirect traffic to it
+	args := proxier.netshIpv4AddressAddArgs(portal.ip)
+	existed, err := proxier.netsh.EnsureIPAddress(args, portal.ip)
+
+	if err != nil {
+		glog.Errorf("Failed to add ip address for service %q, args:%v", name, args)
+		return err
+	}
+	if !existed {
+		glog.V(3).Infof("Added ip address to local interface for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
+	}
+
+	args = proxier.netshPortProxyAddArgs(portal.ip, portal.port, proxyIP, proxyPort, name)
+	existed, err = proxier.netsh.EnsurePortProxyRule(args)
+
+	if err != nil {
+		glog.Errorf("Failed to run portproxy rule for service %q, args:%v", name, args)
+		return err
+	}
+	if !existed {
+		glog.V(3).Infof("Added portproxy rule for service %q on %s %s:%d", name, protocol, portal.ip, portal.port)
+	}
+
+	return nil
+}
+
+// claimNodePort marks a port as being owned by a particular service, or returns error if already claimed.
+// Idempotent: reclaiming with the same owner is not an error
+func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
+	proxier.portMapMutex.Lock()
+	defer proxier.portMapMutex.Unlock()
+
+	// TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
+
+	key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
+	existing, found := proxier.portMap[key]
+	if !found {
+		// Hold the actual port open, even though we use netsh portproxy to redirect
+		// it. This ensures that a) it's safe to take and b) that stays true.
+		// NOTE: We should not need to have a real listen()ing socket - bind()
+		// should be enough, but I can't figure out a way to e2e test without
+		// it. Tools like 'ss' and 'netstat' do not show sockets that are
+		// bind()ed but not listen()ed, and at least the default debian netcat
+		// has no way to avoid about 10 seconds of retries.
+		socket, err := newProxySocket(protocol, ip, port)
+		if err != nil {
+			return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
+		}
+		proxier.portMap[key] = &portMapValue{owner: owner, socket: socket}
+		glog.V(2).Infof("Claimed local port %s", key.String())
+		return nil
+	}
+	if existing.owner == owner {
+		// We are idempotent
+		return nil
+	}
+	return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing)
+}
+
+// releaseNodePort releases a claim on a port. Returns an error if the owner does not match the claim.
+// Tolerates release on an unclaimed port, to simplify cleanup of failed allocations.
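+//
+// claimNodePort and releaseNodePort are used as a pair per owner; a sketch
+// (the port, protocol, and owner values below are hypothetical):
+//
+//	if err := proxier.claimNodePort(nil, 30080, api.ProtocolTCP, name); err != nil {
+//		return err
+//	}
+//	defer proxier.releaseNodePort(nil, 30080, api.ProtocolTCP, name)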
+func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
+	proxier.portMapMutex.Lock()
+	defer proxier.portMapMutex.Unlock()
+
+	key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
+	existing, found := proxier.portMap[key]
+	if !found {
+		// We tolerate this, it happens if we are cleaning up a failed allocation
+		glog.Infof("Ignoring release on unowned port: %v", key)
+		return nil
+	}
+	if existing.owner != owner {
+		return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
+	}
+	delete(proxier.portMap, key)
+	existing.socket.Close()
+	return nil
+}
+
+func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
+	if protocol == api.ProtocolUDP {
+		glog.Warningf("Not adding node port rule for %q on port %d as UDP protocol is not supported by netsh portproxy", name, nodePort)
+		return nil
+	}
+
+	err := proxier.claimNodePort(nil, nodePort, protocol, name)
+	if err != nil {
+		return err
+	}
+
+	args := proxier.netshPortProxyAddArgs(nil, nodePort, proxyIP, proxyPort, name)
+	existed, err := proxier.netsh.EnsurePortProxyRule(args)
+
+	if err != nil {
+		glog.Errorf("Failed to run portproxy rule for service %q", name)
+		return err
+	}
+	if !existed {
+		glog.Infof("Added portproxy rule for service %q on %s port %d", name, protocol, nodePort)
+	}
+
+	return nil
+}
+
+func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error {
+	// Collect errors and report them all at the end.
+	el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
+	for _, publicIP := range info.externalIPs {
+		el = append(el, proxier.closeOnePortal(portal{net.ParseIP(publicIP), info.portal.port, true}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
+	}
+	for _, ingress := range info.loadBalancerStatus.Ingress {
+		if ingress.IP != "" {
+			el = append(el, proxier.closeOnePortal(portal{net.ParseIP(ingress.IP), info.portal.port, false}, info.protocol, proxier.listenIP, info.proxyPort, service)...)
+		}
+	}
+	if info.nodePort != 0 {
+		el = append(el, proxier.closeNodePort(info.nodePort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
+	}
+	if len(el) == 0 {
+		glog.V(3).Infof("Closed netsh portals for service %q", service)
+	} else {
+		glog.Errorf("Some errors closing netsh portals for service %q", service)
+	}
+	return utilerrors.NewAggregate(el)
+}
+
+func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
+	el := []error{}
+
+	if local, err := isLocalIP(portal.ip); err != nil {
+		el = append(el, fmt.Errorf("can't determine if IP is local, assuming not: %v", err))
+	} else if local {
+		if err := proxier.releaseNodePort(portal.ip, portal.port, protocol, name); err != nil {
+			el = append(el, err)
+		}
+	}
+
+	args := proxier.netshIpv4AddressDeleteArgs(portal.ip)
+	if err := proxier.netsh.DeleteIPAddress(args); err != nil {
+		glog.Errorf("Failed to delete IP address for service %q", name)
+		el = append(el, err)
+	}
+
+	args = proxier.netshPortProxyDeleteArgs(portal.ip, portal.port, proxyIP, proxyPort, name)
+	if err := proxier.netsh.DeletePortProxyRule(args); err != nil {
+		glog.Errorf("Failed to delete portproxy rule for service %q", name)
+		el = append(el, err)
+	}
+
+	return el
+}
+
+func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
+	el := []error{}
+
+	args := proxier.netshPortProxyDeleteArgs(nil, nodePort, proxyIP, proxyPort, name)
+	if err := proxier.netsh.DeletePortProxyRule(args); err != nil {
+		glog.Errorf("Failed to delete portproxy rule for service %q", name)
+		el = append(el, err)
+	}
+
+	if err := proxier.releaseNodePort(nil, nodePort, protocol, name); err != nil {
+		el = append(el, err)
+	}
+
+	return el
+}
+
+func isLocalIP(ip net.IP) (bool, error) {
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return false, err
+	}
+	for i := range addrs {
+		intf, _, err := net.ParseCIDR(addrs[i].String())
+		if err != nil {
+			return false, err
+		}
+		if ip.Equal(intf) {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+func isTooManyFDsError(err error) bool {
+	return strings.Contains(err.Error(), "too many open files")
+}
+
+func isClosedError(err error) bool {
+	// A brief discussion about handling closed error here:
+	// https://code.google.com/p/go/issues/detail?id=4373#c14
+	// TODO: maybe create a stoppable TCP listener that returns a StoppedError
+	return strings.HasSuffix(err.Error(), "use of closed network connection")
+}
+
+func (proxier *Proxier) netshPortProxyAddArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
+	args := []string{
+		"interface", "portproxy", "set", "v4tov4",
+		"listenPort=" + strconv.Itoa(destPort),
+		"connectaddress=" + proxyIP.String(),
+		"connectPort=" + strconv.Itoa(proxyPort),
+	}
+	if destIP != nil {
+		args = append(args, "listenaddress="+destIP.String())
+	}
+
+	return args
+}
+
+func (proxier *Proxier) netshIpv4AddressAddArgs(destIP net.IP) []string {
+	intName := proxier.netsh.GetInterfaceToAddIP()
+	args := []string{
+		"interface", "ipv4", "add", "address",
+		"name=" + intName,
+		"address=" + destIP.String(),
+	}
+
+	return args
+}
+
+func (proxier *Proxier) netshPortProxyDeleteArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service proxy.ServicePortName) []string {
+	args := []string{
+		"interface", "portproxy", "delete", "v4tov4",
+		"listenPort=" + strconv.Itoa(destPort),
+	}
+	if destIP != nil {
+		args = append(args, "listenaddress="+destIP.String())
+	}
+
+	return args
+}
+
+func (proxier *Proxier) netshIpv4AddressDeleteArgs(destIP net.IP) []string {
+	intName := proxier.netsh.GetInterfaceToAddIP()
+	args := []string{
+		"interface", "ipv4", "delete", "address",
+		"name=" + intName,
+		"address=" + destIP.String(),
+	}
+
+	return args
+}
diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go
new file mode 100644
index 00000000000..18289fb226e
--- /dev/null
+++ b/pkg/proxy/winuserspace/proxier_test.go
@@ -0,0 +1,855 @@
+/*
+Copyright 2014 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package winuserspace
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/proxy"
+	"k8s.io/kubernetes/pkg/types"
+	netshtest "k8s.io/kubernetes/pkg/util/netsh/testing"
+	"k8s.io/kubernetes/pkg/util/runtime"
+)
+
+const (
+	udpIdleTimeoutForTest = 250 * time.Millisecond
+)
+
+func joinHostPort(host string, port int) string {
+	return net.JoinHostPort(host, fmt.Sprintf("%d", port))
+}
+
+func waitForClosedPortTCP(p *Proxier, proxyPort int) error {
+	for i := 0; i < 50; i++ {
+		conn, err := net.Dial("tcp", joinHostPort("", proxyPort))
+		if err != nil {
+			return nil
+		}
+		conn.Close()
+		time.Sleep(1 * time.Millisecond)
+	}
+	return fmt.Errorf("port %d still open", proxyPort)
+}
+
+func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
+	for i := 0; i < 50; i++ {
+		conn, err := net.Dial("udp", joinHostPort("", proxyPort))
+		if err != nil {
+			return nil
+		}
+		conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
+		// To detect a closed UDP port write, then read.
+		_, err = conn.Write([]byte("x"))
+		if err != nil {
+			if e, ok := err.(net.Error); ok && !e.Timeout() {
+				return nil
+			}
+		}
+		var buf [4]byte
+		_, err = conn.Read(buf[0:])
+		if err != nil {
+			if e, ok := err.(net.Error); ok && !e.Timeout() {
+				return nil
+			}
+		}
+		conn.Close()
+		time.Sleep(1 * time.Millisecond)
+	}
+	return fmt.Errorf("port %d still open", proxyPort)
+}
+
+var tcpServerPort int32
+var udpServerPort int32
+
+func TestMain(m *testing.M) {
+	// Don't handle panics
+	runtime.ReallyCrash = true
+
+	// TCP setup.
+	tcp := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte(r.URL.Path[1:]))
+	}))
+	defer tcp.Close()
+
+	u, err := url.Parse(tcp.URL)
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse: %v", err))
+	}
+	_, port, err := net.SplitHostPort(u.Host)
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse: %v", err))
+	}
+	tcpServerPortValue, err := strconv.Atoi(port)
+	if err != nil {
+		panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
+	}
+	tcpServerPort = int32(tcpServerPortValue)
+
+	// UDP setup.
+	udp, err := newUDPEchoServer()
+	if err != nil {
+		panic(fmt.Sprintf("failed to make a UDP server: %v", err))
+	}
+	_, port, err = net.SplitHostPort(udp.LocalAddr().String())
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse: %v", err))
+	}
+	udpServerPortValue, err := strconv.Atoi(port)
+	if err != nil {
+		panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
+	}
+	udpServerPort = int32(udpServerPortValue)
+	go udp.Loop()
+
+	ret := m.Run()
+	// it should be safe to call Close() multiple times.
+	tcp.Close()
+	os.Exit(ret)
+}
+
+func testEchoTCP(t *testing.T, address string, port int) {
+	path := "aaaaa"
+	res, err := http.Get("http://" + address + ":" + fmt.Sprintf("%d", port) + "/" + path)
+	if err != nil {
+		t.Fatalf("error connecting to server: %v", err)
+	}
+	defer res.Body.Close()
+	data, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		t.Errorf("error reading data: %v %v", err, string(data))
+	}
+	if string(data) != path {
+		t.Errorf("expected: %s, got %s", path, string(data))
+	}
+}
+
+func testEchoUDP(t *testing.T, address string, port int) {
+	data := "abc123"
+
+	conn, err := net.Dial("udp", joinHostPort(address, port))
+	if err != nil {
+		t.Fatalf("error connecting to server: %v", err)
+	}
+	if _, err := conn.Write([]byte(data)); err != nil {
+		t.Fatalf("error sending to server: %v", err)
+	}
+	var resp [1024]byte
+	n, err := conn.Read(resp[0:])
+	if err != nil {
+		t.Errorf("error receiving data: %v", err)
+	}
+	if string(resp[0:n]) != data {
+		t.Errorf("expected: %s, got %s", data, string(resp[0:n]))
+	}
+}
+
+func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
+	var got int32
+	for i := 0; i < 600; i++ {
+		got = atomic.LoadInt32(&p.numProxyLoops)
+		if got == want {
+			return
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	t.Errorf("expected %d ProxyLoops running, got %d", want, got)
+}
+
+func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) {
+	var got int
+	now := time.Now()
+	deadline := now.Add(timeout)
+	for time.Now().Before(deadline) {
+		s.activeClients.mu.Lock()
+		got = len(s.activeClients.clients)
+		s.activeClients.mu.Unlock()
+		if got == want {
+			return
+		}
+		time.Sleep(500 * time.Millisecond)
+	}
+	t.Errorf("expected %d ProxyClients live, got %d", want, got)
+}
+
+func TestTCPProxy(t *testing.T) {
+	lb := NewLoadBalancerRR()
+	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
+	lb.OnEndpointsUpdate([]api.Endpoints{
+		{
+			ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+			Subsets: []api.EndpointSubset{{
+				Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
+				Ports:     []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
+			}},
+		},
+	})
+
+	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	waitForNumProxyLoops(t, p, 0)
+
+	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
+	if err != nil {
+		t.Fatalf("error adding new service: %#v", err)
+	}
+	testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
+	waitForNumProxyLoops(t, p, 1)
+}
+
+func TestUDPProxy(t *testing.T) {
+	lb := NewLoadBalancerRR()
+	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
+	lb.OnEndpointsUpdate([]api.Endpoints{
+		{
+			ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+			Subsets: []api.EndpointSubset{{
+				Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
+				Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
+			}},
+		},
+	})
+
+	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	waitForNumProxyLoops(t, p, 0)
+
+	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
+	if err != nil {
+		t.Fatalf("error adding new service: %#v", err)
+	}
+	testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
+	waitForNumProxyLoops(t, p, 1)
+}
+
+func TestUDPProxyTimeout(t *testing.T) {
+	lb := NewLoadBalancerRR()
+	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
+	lb.OnEndpointsUpdate([]api.Endpoints{
+		{
+			ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+			Subsets: []api.EndpointSubset{{
+				Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
+				Ports:     []api.EndpointPort{{Name: "p", Port: udpServerPort}},
+			}},
+		},
+	})
+
+	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	waitForNumProxyLoops(t, p, 0)
+
+	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
+	if err != nil {
+		t.Fatalf("error adding new service: %#v", err)
+	}
+	waitForNumProxyLoops(t, p, 1)
+	testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
+	// When connecting to a UDP service endpoint, there should be a Conn for proxy.
+	waitForNumProxyClients(t, svcInfo, 1, time.Second)
+	// If conn has no activity for serviceInfo.timeout since last Read/Write, it should be closed because of timeout.
+	waitForNumProxyClients(t, svcInfo, 0, 2*time.Second)
+}
+
+func TestMultiPortProxy(t *testing.T) {
+	lb := NewLoadBalancerRR()
+	serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"}
+	serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"}
+	lb.OnEndpointsUpdate([]api.Endpoints{{
+		ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
+		Subsets: []api.EndpointSubset{{
+			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
+			Ports:     []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}},
+		}},
+	}, {
+		ObjectMeta: api.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace},
+		Subsets: []api.EndpointSubset{{
+			Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
+			Ports:     []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}},
+		}},
+	}})
+
+	p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	waitForNumProxyLoops(t, p, 0)
+
+	svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
+	if err != nil {
+		t.Fatalf("error adding new service: %#v", err)
+	}
+	testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
+	waitForNumProxyLoops(t, p, 1)
+
+	svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second)
+	if err != nil {
+		t.Fatalf("error adding new service: %#v", err)
+	}
+	testEchoUDP(t, "127.0.0.1", svcInfoQ.proxyPort)
+	waitForNumProxyLoops(t, p, 2)
+}
+
+func TestMultiPortOnServiceUpdate(t *testing.T) {
+	lb := NewLoadBalancerRR()
+	serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} + serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + p.OnServiceUpdate([]api.Service{{ + ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: 80, + Protocol: "TCP", + }, { + Name: "q", + Port: 81, + Protocol: "UDP", + }}}, + }}) + waitForNumProxyLoops(t, p, 2) + svcInfo, exists := p.getServiceInfo(serviceP) + if !exists { + t.Fatalf("can't find serviceInfo for %s", serviceP) + } + if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 80 || svcInfo.protocol != "TCP" { + t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo) + } + + svcInfo, exists = p.getServiceInfo(serviceQ) + if !exists { + t.Fatalf("can't find serviceInfo for %s", serviceQ) + } + if svcInfo.portal.ip.String() != "1.2.3.4" || svcInfo.portal.port != 81 || svcInfo.protocol != "UDP" { + t.Errorf("unexpected serviceInfo for %s: %#v", serviceQ, svcInfo) + } + + svcInfo, exists = p.getServiceInfo(serviceX) + if exists { + t.Fatalf("found unwanted serviceInfo for %s: %#v", serviceX, svcInfo) + } +} + +// Helper: Stops the proxy for the named service. +func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { + info, found := proxier.getServiceInfo(service) + if !found { + return fmt.Errorf("unknown service: %s", service) + } + return proxier.stopProxy(service, info) +} + +func TestTCPProxyStop(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + if !svcInfo.isAlive() { + t.Fatalf("wrong value for isAlive(): expected true") + } + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + waitForNumProxyLoops(t, p, 1) + + stopProxyByName(p, service) + if svcInfo.isAlive() { + t.Fatalf("wrong value for isAlive(): expected false") + } + // Wait for the port to really close. 
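+ // stopProxy signals shutdown, but the listening socket may be released
+ // asynchronously, so poll for the port to be freed rather than asserting
+ // immediately.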
+ if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + t.Fatalf(err.Error()) + } + waitForNumProxyLoops(t, p, 0) +} + +func TestUDPProxyStop(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + waitForNumProxyLoops(t, p, 1) + + stopProxyByName(p, service) + // Wait for the port to really close. + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { + t.Fatalf(err.Error()) + } + waitForNumProxyLoops(t, p, 0) +} + +func TestTCPProxyUpdateDelete(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", err) + } + conn.Close() + waitForNumProxyLoops(t, p, 1) + + p.OnServiceUpdate([]api.Service{}) + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + t.Fatalf(err.Error()) + } + waitForNumProxyLoops(t, p, 0) +} + +func TestUDPProxyUpdateDelete(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) + if err != nil { + t.Fatalf("error connecting to proxy: %v", 
err)
+ }
+ conn.Close()
+ waitForNumProxyLoops(t, p, 1)
+
+ p.OnServiceUpdate([]api.Service{})
+ if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
+ t.Fatalf(err.Error())
+ }
+ waitForNumProxyLoops(t, p, 0)
+}
+
+func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
+ lb := NewLoadBalancerRR()
+ service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
+ endpoint := api.Endpoints{
+ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+ Subsets: []api.EndpointSubset{{
+ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
+ Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}},
+ }},
+ }
+ lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
+
+ p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
+ if err != nil {
+ t.Fatal(err)
+ }
+ waitForNumProxyLoops(t, p, 0)
+
+ svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
+ if err != nil {
+ t.Fatalf("error adding new service: %#v", err)
+ }
+ conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
+ if err != nil {
+ t.Fatalf("error connecting to proxy: %v", err)
+ }
+ conn.Close()
+ waitForNumProxyLoops(t, p, 1)
+
+ p.OnServiceUpdate([]api.Service{})
+ if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
+ t.Fatalf(err.Error())
+ }
+ waitForNumProxyLoops(t, p, 0)
+
+ // need to add the endpoint here because it got cleaned up during the service delete
+ lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
+ p.OnServiceUpdate([]api.Service{{
+ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+ Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
+ Name: "p",
+ Port: int32(svcInfo.proxyPort),
+ Protocol: "TCP",
+ }}},
+ }})
+ svcInfo, exists := p.getServiceInfo(service)
+ if !exists {
+ t.Fatalf("can't find serviceInfo for %s", service)
+ }
+ testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
+ waitForNumProxyLoops(t, p, 1)
+}
+
+func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
+ lb := NewLoadBalancerRR()
+ service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
+ endpoint := api.Endpoints{
+ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+ Subsets: []api.EndpointSubset{{
+ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
+ Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}},
+ }},
+ }
+ lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
+
+ p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
+ if err != nil {
+ t.Fatal(err)
+ }
+ waitForNumProxyLoops(t, p, 0)
+
+ svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
+ if err != nil {
+ t.Fatalf("error adding new service: %#v", err)
+ }
+ conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
+ if err != nil {
+ t.Fatalf("error connecting to proxy: %v", err)
+ }
+ conn.Close()
+ waitForNumProxyLoops(t, p, 1)
+
+ p.OnServiceUpdate([]api.Service{})
+ if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
+ t.Fatalf(err.Error())
+ }
+ waitForNumProxyLoops(t, p, 0)
+
+ // need to add the endpoint here because it got cleaned up during the service delete
+ lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
+ p.OnServiceUpdate([]api.Service{{
+ ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+ 
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "UDP", + }}}, + }}) + svcInfo, exists := p.getServiceInfo(service) + if !exists { + t.Fatalf("can't find serviceInfo") + } + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) +} + +func TestTCPProxyUpdatePort(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) + + p.OnServiceUpdate([]api.Service{{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: 99, + Protocol: "TCP", + }}}, + }}) + // Wait for the socket to actually get free. + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + t.Fatalf(err.Error()) + } + svcInfo, exists := p.getServiceInfo(service) + if !exists { + t.Fatalf("can't find serviceInfo") + } + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + // This is a bit async, but this should be sufficient. + time.Sleep(500 * time.Millisecond) + waitForNumProxyLoops(t, p, 1) +} + +func TestUDPProxyUpdatePort(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + waitForNumProxyLoops(t, p, 1) + + p.OnServiceUpdate([]api.Service{{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: 99, + Protocol: "UDP", + }}}, + }}) + // Wait for the socket to actually get free. 
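+ // OnServiceUpdate rebuilds the service's socket asynchronously, so the
+ // old proxyPort may linger briefly before it is released.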
+ if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { + t.Fatalf(err.Error()) + } + svcInfo, exists := p.getServiceInfo(service) + if !exists { + t.Fatalf("can't find serviceInfo") + } + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) +} + +func TestProxyUpdatePublicIPs(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + lb.OnEndpointsUpdate([]api.Endpoints{ + { + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + }, + }) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) + + p.OnServiceUpdate([]api.Service{{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.portal.port), + Protocol: "TCP", + }}, + ClusterIP: svcInfo.portal.ip.String(), + ExternalIPs: []string{"4.3.2.1"}, + }, + }}) + // Wait for the socket to actually get free. + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + t.Fatalf(err.Error()) + } + svcInfo, exists := p.getServiceInfo(service) + if !exists { + t.Fatalf("can't find serviceInfo") + } + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + // This is a bit async, but this should be sufficient. 
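+ // (A fixed sleep stands in for polling here; half a second is assumed
+ // to be enough for the proxy loop count to settle before the final check.)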
+ time.Sleep(500 * time.Millisecond) + waitForNumProxyLoops(t, p, 1) +} + +func TestProxyUpdatePortal(t *testing.T) { + lb := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + endpoint := api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, + } + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), netshtest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) + if err != nil { + t.Fatal(err) + } + waitForNumProxyLoops(t, p, 0) + + svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) + if err != nil { + t.Fatalf("error adding new service: %#v", err) + } + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) + + p.OnServiceUpdate([]api.Service{{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + }}) + _, exists := p.getServiceInfo(service) + if exists { + t.Fatalf("service with empty ClusterIP should not be included in the proxy") + } + + p.OnServiceUpdate([]api.Service{{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + }}) + _, exists = p.getServiceInfo(service) + if exists { + t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy") + } + + p.OnServiceUpdate([]api.Service{{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ + Name: "p", + Port: int32(svcInfo.proxyPort), + Protocol: "TCP", + }}}, + }}) + lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + svcInfo, exists = p.getServiceInfo(service) + if !exists { + t.Fatalf("service with ClusterIP set not found in the proxy") + } + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) + waitForNumProxyLoops(t, p, 1) +} + +// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go new file mode 100644 index 00000000000..41dcf6c1839 --- /dev/null +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -0,0 +1,300 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package winuserspace
+
+import (
+ "fmt"
+ "io"
+ "net"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/proxy"
+ "k8s.io/kubernetes/pkg/util/runtime"
+)
+
+// proxySocket is an abstraction over the TCP/UDP sockets that are proxied.
+type proxySocket interface {
+ // Addr gets the net.Addr for a proxySocket.
+ Addr() net.Addr
+ // Close stops the proxySocket from accepting incoming connections.
+ // Each implementation should comment on the impact of calling Close
+ // while sessions are active.
+ Close() error
+ // ProxyLoop proxies incoming connections for the specified service to the service endpoints.
+ ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier)
+ // ListenPort returns the host port that the proxySocket is listening on.
+ ListenPort() int
+}
+
+func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
+ host := ""
+ if ip != nil {
+ host = ip.String()
+ }
+
+ switch strings.ToUpper(string(protocol)) {
+ case "TCP":
+ listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
+ if err != nil {
+ return nil, err
+ }
+ return &tcpProxySocket{Listener: listener, port: port}, nil
+ case "UDP":
+ addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, strconv.Itoa(port)))
+ if err != nil {
+ return nil, err
+ }
+ conn, err := net.ListenUDP("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ return &udpProxySocket{UDPConn: conn, port: port}, nil
+ }
+ return nil, fmt.Errorf("unknown protocol %q", protocol)
+}
+
+// endpointDialTimeout is the sequence of timeouts used when dialing a backend,
+// one per attempt, in increasing order.
+var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
+
+// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
+// no new connections are allowed but existing connections are left untouched.
+type tcpProxySocket struct {
+ net.Listener
+ port int
+}
+
+func (tcp *tcpProxySocket) ListenPort() int {
+ return tcp.port
+}
+
+func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (net.Conn, error) {
+ sessionAffinityReset := false
+ for _, dialTimeout := range endpointDialTimeout {
+ endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
+ if err != nil {
+ glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
+ return nil, err
+ }
+ glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
+ // TODO: This could spin up a new goroutine to make the outbound connection,
+ // and keep accepting inbound traffic.
+ outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
+ if err != nil {
+ if isTooManyFDsError(err) {
+ panic("Dial failed: " + err.Error())
+ }
+ glog.Errorf("Dial failed: %v", err)
+ sessionAffinityReset = true
+ continue
+ }
+ return outConn, nil
+ }
+ return nil, fmt.Errorf("failed to connect to an endpoint")
+}
+
+func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
+ for {
+ if !myInfo.isAlive() {
+ // The service port was closed or replaced.
+ return
+ }
+ // Block until a connection is made.
+ inConn, err := tcp.Accept()
+ if err != nil {
+ if isTooManyFDsError(err) {
+ panic("Accept failed: " + err.Error())
+ }
+
+ if isClosedError(err) {
+ return
+ }
+ if !myInfo.isAlive() {
+ // The service port was just closed, so the accept failure is expected.
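+ // (isAlive is re-checked after Accept fails so that a shutdown racing
+ // with Accept is treated as a clean exit rather than logged as an error.)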
+ return + } + glog.Errorf("Accept failed: %v", err) + continue + } + glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) + outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) + if err != nil { + glog.Errorf("Failed to connect to balancer: %v", err) + inConn.Close() + continue + } + // Spin up an async copy loop. + go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) + } +} + +// proxyTCP proxies data bi-directionally between in and out. +func proxyTCP(in, out *net.TCPConn) { + var wg sync.WaitGroup + wg.Add(2) + glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", + in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) + go copyBytes("from backend", in, out, &wg) + go copyBytes("to backend", out, in, &wg) + wg.Wait() +} + +func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { + defer wg.Done() + glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) + n, err := io.Copy(dest, src) + if err != nil { + if !isClosedError(err) { + glog.Errorf("I/O error: %v", err) + } + } + glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) + dest.Close() + src.Close() +} + +// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called, +// no new connections are allowed and existing connections are broken. +// TODO: We could lame-duck this ourselves, if it becomes important. +type udpProxySocket struct { + *net.UDPConn + port int +} + +func (udp *udpProxySocket) ListenPort() int { + return udp.port +} + +func (udp *udpProxySocket) Addr() net.Addr { + return udp.LocalAddr() +} + +// Holds all the known UDP clients that have not timed out. +type clientCache struct { + mu sync.Mutex + clients map[string]net.Conn // addr string -> connection +} + +func newClientCache() *clientCache { + return &clientCache{clients: map[string]net.Conn{}} +} + +func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) { + var buffer [4096]byte // 4KiB should be enough for most whole-packets + for { + if !myInfo.isAlive() { + // The service port was closed or replaced. + break + } + + // Block until data arrives. + // TODO: Accumulate a histogram of n or something, to fine tune the buffer size. + n, cliAddr, err := udp.ReadFrom(buffer[0:]) + if err != nil { + if e, ok := err.(net.Error); ok { + if e.Temporary() { + glog.V(1).Infof("ReadFrom had a temporary failure: %v", err) + continue + } + } + glog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err) + break + } + // If this is a client we know already, reuse the connection and goroutine. + svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout) + if err != nil { + continue + } + // TODO: It would be nice to let the goroutine handle this write, but we don't + // really want to copy the buffer. We could do a pool of buffers or something. + _, err = svrConn.Write(buffer[0:n]) + if err != nil { + if !logTimeout(err) { + glog.Errorf("Write failed: %v", err) + // TODO: Maybe tear down the goroutine for this client/server pair? 
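+ // For now the per-client goroutine exits on its own once the read
+ // deadline set in proxyClient expires, and the client entry is then
+ // removed from the cache.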
+ } + continue + } + err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) + if err != nil { + glog.Errorf("SetDeadline failed: %v", err) + continue + } + } +} + +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) { + activeClients.mu.Lock() + defer activeClients.mu.Unlock() + + svrConn, found := activeClients.clients[cliAddr.String()] + if !found { + // TODO: This could spin up a new goroutine to make the outbound connection, + // and keep accepting inbound traffic. + glog.V(3).Infof("New UDP connection from %s", cliAddr) + var err error + svrConn, err = tryConnect(service, cliAddr, "udp", proxier) + if err != nil { + return nil, err + } + if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil { + glog.Errorf("SetDeadline failed: %v", err) + return nil, err + } + activeClients.clients[cliAddr.String()] = svrConn + go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { + defer runtime.HandleCrash() + udp.proxyClient(cliAddr, svrConn, activeClients, timeout) + }(cliAddr, svrConn, activeClients, timeout) + } + return svrConn, nil +} + +// This function is expected to be called as a goroutine. +// TODO: Track and log bytes copied, like TCP +func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { + defer svrConn.Close() + var buffer [4096]byte + for { + n, err := svrConn.Read(buffer[0:]) + if err != nil { + if !logTimeout(err) { + glog.Errorf("Read failed: %v", err) + } + break + } + err = svrConn.SetDeadline(time.Now().Add(timeout)) + if err != nil { + glog.Errorf("SetDeadline failed: %v", err) + break + } + n, err = udp.WriteTo(buffer[0:n], cliAddr) + if err != nil { + if !logTimeout(err) { + glog.Errorf("WriteTo failed: %v", err) + } + break + } + } + activeClients.mu.Lock() + delete(activeClients.clients, cliAddr.String()) + activeClients.mu.Unlock() +} diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go new file mode 100644 index 00000000000..851e26a7e23 --- /dev/null +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -0,0 +1,326 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package winuserspace
+
+import (
+ "errors"
+ "fmt"
+ "net"
+ "reflect"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/proxy"
+ "k8s.io/kubernetes/pkg/types"
+ "k8s.io/kubernetes/pkg/util/slice"
+)
+
+var (
+ // ErrMissingServiceEntry is returned when the load balancer has no entry for a service.
+ ErrMissingServiceEntry = errors.New("missing service entry")
+ // ErrMissingEndpoints is returned when a known service has no endpoints to balance across.
+ ErrMissingEndpoints = errors.New("missing endpoints")
+)
+
+type affinityState struct {
+ clientIP string
+ //clientProtocol api.Protocol //not yet used
+ //sessionCookie string //not yet used
+ endpoint string
+ lastUsed time.Time
+}
+
+type affinityPolicy struct {
+ affinityType api.ServiceAffinity
+ affinityMap map[string]*affinityState // map client IP -> affinity info
+ ttlMinutes int
+}
+
+// LoadBalancerRR is a round-robin load balancer.
+type LoadBalancerRR struct {
+ lock sync.RWMutex
+ services map[proxy.ServicePortName]*balancerState
+}
+
+// Ensure this implements LoadBalancer.
+var _ LoadBalancer = &LoadBalancerRR{}
+
+type balancerState struct {
+ endpoints []string // a list of "ip:port" style strings
+ index int // current index into endpoints
+ affinity affinityPolicy
+}
+
+func newAffinityPolicy(affinityType api.ServiceAffinity, ttlMinutes int) *affinityPolicy {
+ return &affinityPolicy{
+ affinityType: affinityType,
+ affinityMap: make(map[string]*affinityState),
+ ttlMinutes: ttlMinutes,
+ }
+}
+
+// NewLoadBalancerRR returns a new LoadBalancerRR.
+func NewLoadBalancerRR() *LoadBalancerRR {
+ return &LoadBalancerRR{
+ services: map[proxy.ServicePortName]*balancerState{},
+ }
+}
+
+// NewService registers a service with the load balancer, creating state for it if needed.
+func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error {
+ glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
+ lb.lock.Lock()
+ defer lb.lock.Unlock()
+ lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
+ return nil
+}
+
+// newServiceInternal assumes that lb.lock is already held.
+func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) *balancerState {
+ if ttlMinutes == 0 {
+ ttlMinutes = 180 // Default to 3 hours if not specified. TODO: should 0 mean unlimited?
+ }
+
+ if _, exists := lb.services[svcPort]; !exists {
+ lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
+ glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
+ } else if affinityType != "" {
+ lb.services[svcPort].affinity.affinityType = affinityType
+ }
+ return lb.services[svcPort]
+}
+
+// DeleteService removes a service and its state from the load balancer.
+func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
+ glog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
+ lb.lock.Lock()
+ defer lb.lock.Unlock()
+ delete(lb.services, svcPort)
+}
+
+// isSessionAffinity returns true if this service is using some form of session affinity.
+func isSessionAffinity(affinity *affinityPolicy) bool {
+ // Should never be empty string, but checking for it to be safe.
+ if affinity.affinityType == "" || affinity.affinityType == api.ServiceAffinityNone {
+ return false
+ }
+ return true
+}
+
+// NextEndpoint returns a service endpoint.
+// The service endpoint is chosen using the round-robin algorithm.
+func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {
+ // Coarse locking is simple. We can get more fine-grained if/when we
+ // can prove it matters.
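+ // NextEndpoint mutates state even on the read path (it advances the
+ // round-robin index and refreshes affinity entries), so a full write
+ // lock is taken instead of RLock.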
+ lb.lock.Lock()
+ defer lb.lock.Unlock()
+
+ state, exists := lb.services[svcPort]
+ if !exists || state == nil {
+ return "", ErrMissingServiceEntry
+ }
+ if len(state.endpoints) == 0 {
+ return "", ErrMissingEndpoints
+ }
+ glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints)
+
+ sessionAffinityEnabled := isSessionAffinity(&state.affinity)
+
+ var ipaddr string
+ if sessionAffinityEnabled {
+ // Caution: don't shadow ipaddr
+ var err error
+ ipaddr, _, err = net.SplitHostPort(srcAddr.String())
+ if err != nil {
+ return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
+ }
+ if !sessionAffinityReset {
+ sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
+ if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
+ // Affinity wins.
+ endpoint := sessionAffinity.endpoint
+ sessionAffinity.lastUsed = time.Now()
+ glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s", svcPort, ipaddr, sessionAffinity, endpoint)
+ return endpoint, nil
+ }
+ }
+ }
+ // Take the next endpoint.
+ endpoint := state.endpoints[state.index]
+ state.index = (state.index + 1) % len(state.endpoints)
+
+ if sessionAffinityEnabled {
+ affinity := state.affinity.affinityMap[ipaddr]
+ if affinity == nil {
+ affinity = new(affinityState)
+ state.affinity.affinityMap[ipaddr] = affinity
+ }
+ affinity.lastUsed = time.Now()
+ affinity.endpoint = endpoint
+ affinity.clientIP = ipaddr
+ glog.V(4).Infof("Updated affinity key %s: %#v", ipaddr, state.affinity.affinityMap[ipaddr])
+ }
+
+ return endpoint, nil
+}
+
+type hostPortPair struct {
+ host string
+ port int
+}
+
+func isValidEndpoint(hpp *hostPortPair) bool {
+ return hpp.host != "" && hpp.port > 0
+}
+
+func flattenValidEndpoints(endpoints []hostPortPair) []string {
+ // Convert Endpoint objects into strings for easier use later. Ignore
+ // the protocol field - we'll get that from the Service objects.
+ var result []string
+ for i := range endpoints {
+ hpp := &endpoints[i]
+ if isValidEndpoint(hpp) {
+ result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
+ }
+ }
+ return result
+}
+
+// removeSessionAffinityByEndpoint removes any session affinity records associated
+// with a particular endpoint (for example when a pod goes down).
+func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
+ for _, affinity := range state.affinity.affinityMap {
+ if affinity.endpoint == endpoint {
+ glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort)
+ delete(state.affinity.affinityMap, affinity.clientIP)
+ }
+ }
+}
+
+// updateAffinityMap loops through the valid endpoints and the endpoints currently
+// associated with the load balancer, then removes any session affinity records
+// that are not in both lists.
+// This assumes the lb.lock is held.
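+// For example (illustrative values): if the current endpoints are
+// ["10.0.0.1:80", "10.0.0.2:80"] and newEndpoints is ["10.0.0.2:80"],
+// then "10.0.0.1:80" appears only once in the merged count map and its
+// affinity records are removed.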
+func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEndpoints []string) { + allEndpoints := map[string]int{} + for _, newEndpoint := range newEndpoints { + allEndpoints[newEndpoint] = 1 + } + state, exists := lb.services[svcPort] + if !exists { + return + } + for _, existingEndpoint := range state.endpoints { + allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1 + } + for mKey, mVal := range allEndpoints { + if mVal == 1 { + glog.V(2).Infof("Delete endpoint %s for service %q", mKey, svcPort) + removeSessionAffinityByEndpoint(state, svcPort, mKey) + } + } +} + +// OnEndpointsUpdate manages the registered service endpoints. +// Registered endpoints are updated if found in the update set or +// unregistered if missing from the update set. +func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) { + registeredEndpoints := make(map[proxy.ServicePortName]bool) + lb.lock.Lock() + defer lb.lock.Unlock() + + // Update endpoints for services. + for i := range allEndpoints { + svcEndpoints := &allEndpoints[i] + + // We need to build a map of portname -> all ip:ports for that + // portname. Explode Endpoints.Subsets[*] into this structure. + portsToEndpoints := map[string][]hostPortPair{} + for i := range svcEndpoints.Subsets { + ss := &svcEndpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) + // Ignore the protocol field - we'll get that from the Service objects. + } + } + } + + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} + state, exists := lb.services[svcPort] + curEndpoints := []string{} + if state != nil { + curEndpoints = state.endpoints + } + newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + + if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { + glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + lb.updateAffinityMap(svcPort, newEndpoints) + // OnEndpointsUpdate can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) + + // Reset the round-robin index. + state.index = 0 + } + registeredEndpoints[svcPort] = true + } + } + // Remove endpoints missing from the update. + for k := range lb.services { + if _, exists := registeredEndpoints[k]; !exists { + glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k) + // Reset but don't delete. + state := lb.services[k] + state.endpoints = []string{} + state.index = 0 + state.affinity.affinityMap = map[string]*affinityState{} + } + } +} + +// Tests whether two slices are equivalent. This sorts both slices in-place. 
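+// For example, slicesEquiv([]string{"b", "a"}, []string{"a", "b"}) returns
+// true; both arguments come back sorted, which is why OnEndpointsUpdate
+// above compares against a copy of the current endpoints.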
+func slicesEquiv(lhs, rhs []string) bool { + if len(lhs) != len(rhs) { + return false + } + if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) { + return true + } + return false +} + +func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortName) { + lb.lock.Lock() + defer lb.lock.Unlock() + + state, exists := lb.services[svcPort] + if !exists { + return + } + for ip, affinity := range state.affinity.affinityMap { + if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes { + glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort) + delete(state.affinity.affinityMap, ip) + } + } +} diff --git a/pkg/proxy/winuserspace/roundrobin_test.go b/pkg/proxy/winuserspace/roundrobin_test.go new file mode 100644 index 00000000000..0e37fcdfb04 --- /dev/null +++ b/pkg/proxy/winuserspace/roundrobin_test.go @@ -0,0 +1,727 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package winuserspace + +import ( + "net" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/types" +) + +func TestValidateWorks(t *testing.T) { + if isValidEndpoint(&hostPortPair{}) { + t.Errorf("Didn't fail for empty set") + } + if isValidEndpoint(&hostPortPair{host: "foobar"}) { + t.Errorf("Didn't fail with invalid port") + } + if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) { + t.Errorf("Didn't fail with a negative port") + } + if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) { + t.Errorf("Failed a valid config.") + } +} + +func TestFilterWorks(t *testing.T) { + endpoints := []hostPortPair{ + {host: "foobar", port: 1}, + {host: "foobar", port: 2}, + {host: "foobar", port: -1}, + {host: "foobar", port: 3}, + {host: "foobar", port: -2}, + } + filtered := flattenValidEndpoints(endpoints) + + if len(filtered) != 3 { + t.Errorf("Failed to filter to the correct size") + } + if filtered[0] != "foobar:1" { + t.Errorf("Index zero is not foobar:1") + } + if filtered[1] != "foobar:2" { + t.Errorf("Index one is not foobar:2") + } + if filtered[2] != "foobar:3" { + t.Errorf("Index two is not foobar:3") + } +} + +func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + var endpoints []api.Endpoints + loadBalancer.OnEndpointsUpdate(endpoints) + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil { + t.Errorf("Didn't fail with non-existent service") + } + if len(endpoint) != 0 { + t.Errorf("Got an endpoint") + } +} + +func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) { + endpoint, err := loadBalancer.NextEndpoint(service, netaddr, false) + if err != nil { + t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) + } + if 
endpoint != expected { + t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint) + } +} + +func expectEndpointWithSessionAffinityReset(t *testing.T, loadBalancer *LoadBalancerRR, service proxy.ServicePortName, expected string, netaddr net.Addr) { + endpoint, err := loadBalancer.NextEndpoint(service, netaddr, true) + if err != nil { + t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) + } + if endpoint != expected { + t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint) + } +} + +func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 40}}, + }}, + } + loadBalancer.OnEndpointsUpdate(endpoints) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) +} + +func stringsInSlice(haystack []string, needles ...string) bool { + for _, needle := range needles { + found := false + for i := range haystack { + if haystack[i] == needle { + found = true + break + } + } + if found == false { + return false + } + } + return true +} + +func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}}, + }}, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + shuffledEndpoints := loadBalancer.services[service].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil) +} + +func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} + serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: 
"testnamespace", Name: "foo"}, Port: "q"} + endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 2}}, + }, + { + Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 4}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + shuffledEndpoints := loadBalancer.services[serviceP].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil) + + shuffledEndpoints = loadBalancer.services[serviceQ].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint1:2", "endpoint2:2", "endpoint3:4") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil) +} + +func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} + serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "q"} + endpoint, err := loadBalancer.NextEndpoint(serviceP, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 10}}, + }, + { + Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 2}, {Name: "q", Port: 20}}, + }, + { + Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 30}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + shuffledEndpoints := loadBalancer.services[serviceP].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil) + + shuffledEndpoints = loadBalancer.services[serviceQ].endpoints + if 
!stringsInSlice(shuffledEndpoints, "endpoint1:10", "endpoint2:20", "endpoint3:30") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil) + + // Then update the configuration with one fewer endpoints, make sure + // we start in the beginning again + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint4"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 4}, {Name: "q", Port: 40}}, + }, + { + Addresses: []api.EndpointAddress{{IP: "endpoint5"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 5}, {Name: "q", Port: 50}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + shuffledEndpoints = loadBalancer.services[serviceP].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceP, shuffledEndpoints[1], nil) + + shuffledEndpoints = loadBalancer.services[serviceQ].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint4:40", "endpoint5:50") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) + + // Clear endpoints + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpoints) + + endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } +} + +func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + fooServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "p"} + barServiceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: "p"} + endpoint, err := loadBalancer.NextEndpoint(fooServiceP, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + endpoints := make([]api.Endpoints, 2) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}, {IP: "endpoint3"}}, + Ports: []api.EndpointPort{{Name: "p", Port: 123}}, + }, + }, + } + endpoints[1] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint4"}, {IP: "endpoint5"}, {IP: "endpoint6"}}, + Ports: []api.EndpointPort{{Name: "p", 
Port: 456}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints + expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil) + expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[2], nil) + expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil) + + shuffledBarEndpoints := loadBalancer.services[barServiceP].endpoints + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil) + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil) + + // Then update the configuration by removing foo + loadBalancer.OnEndpointsUpdate(endpoints[1:]) + endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + // but bar is still there, and we continue RR from where we left off. + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil) + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[2], nil) +} + +func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + // Call NewService() before OnEndpointsUpdate() + loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + + ep1, err := loadBalancer.NextEndpoint(service, client1, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + + ep2, err := loadBalancer.NextEndpoint(service, client2, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + + ep3, err := 
loadBalancer.NextEndpoint(service, client3, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) +} + +func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + // Call OnEndpointsUpdate() before NewService() + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + + ep1, err := loadBalancer.NextEndpoint(service, client1, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep1, client1) + + ep2, err := loadBalancer.NextEndpoint(service, client2, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep2, client2) + + ep3, err := loadBalancer.NextEndpoint(service, client3, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep3, client3) + + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) + expectEndpoint(t, loadBalancer, service, ep1, client1) + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpoint(t, loadBalancer, service, ep3, client3) +} + +func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + client4 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 4), Port: 0} + client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} + client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), 
Port: 0} + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + client1Endpoint := shuffledEndpoints[0] + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + client2Endpoint := shuffledEndpoints[1] + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) + client3Endpoint := shuffledEndpoints[2] + + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Port: 1}, {Port: 2}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + shuffledEndpoints = loadBalancer.services[service].endpoints + if client1Endpoint == "endpoint:3" { + client1Endpoint = shuffledEndpoints[0] + } else if client2Endpoint == "endpoint:3" { + client2Endpoint = shuffledEndpoints[0] + } else if client3Endpoint == "endpoint:3" { + client3Endpoint = shuffledEndpoints[0] + } + expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) + expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) + expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) + + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 4}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + shuffledEndpoints = loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) + expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) + expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6) +} + +func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with 
non-existent service") + } + + loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + shuffledEndpoints := loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + // Then update the configuration with one fewer endpoints, make sure + // we start in the beginning again + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Port: 4}, {Port: 5}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + shuffledEndpoints = loadBalancer.services[service].endpoints + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) + + // Clear endpoints + endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpoints) + + endpoint, err = loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } +} + +func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + fooService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} + endpoint, err := loadBalancer.NextEndpoint(fooService, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) + endpoints := make([]api.Endpoints, 2) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}}, + }, + }, + } + barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} + loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) + endpoints[1] = api.Endpoints{ + ObjectMeta: 
api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: "endpoint"}}, + Ports: []api.EndpointPort{{Port: 4}, {Port: 5}}, + }, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + shuffledFooEndpoints := loadBalancer.services[fooService].endpoints + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3) + + shuffledBarEndpoints := loadBalancer.services[barService].endpoints + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) + + // Then update the configuration by removing foo + loadBalancer.OnEndpointsUpdate(endpoints[1:]) + endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + // but bar is still there, and we continue RR from where we left off. 
+ shuffledBarEndpoints = loadBalancer.services[barService].endpoints + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1) +} + +func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: ""} + endpoint, err := loadBalancer.NextEndpoint(service, nil, false) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + // Call NewService() before OnEndpointsUpdate() + loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{ + {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, + {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, + }, + } + loadBalancer.OnEndpointsUpdate(endpoints) + + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + + ep1, err := loadBalancer.NextEndpoint(service, client1, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + + ep2, err := loadBalancer.NextEndpoint(service, client2, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + + ep3, err := loadBalancer.NextEndpoint(service, client3, false) + if err != nil { + t.Errorf("Didn't find a service for %s: %v", service, err) + } + + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client1) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client1) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1) + + expectEndpoint(t, loadBalancer, service, ep2, client2) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep3, client1) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep1, client2) + expectEndpointWithSessionAffinityReset(t, loadBalancer, service, ep2, client3) +} diff --git a/pkg/proxy/winuserspace/udp_server.go b/pkg/proxy/winuserspace/udp_server.go new file mode 100644 index 00000000000..a54c28aa12f --- /dev/null +++ b/pkg/proxy/winuserspace/udp_server.go @@ -0,0 +1,47 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package winuserspace
+
+import (
+	"fmt"
+	"net"
+)
+
+// udpEchoServer is a simple echo server in UDP, intended for testing the proxy.
+type udpEchoServer struct {
+	net.PacketConn
+}
+
+func (r *udpEchoServer) Loop() {
+	var buffer [4096]byte
+	for {
+		n, cliAddr, err := r.ReadFrom(buffer[0:])
+		if err != nil {
+			fmt.Printf("ReadFrom failed: %v\n", err)
+			continue
+		}
+		r.WriteTo(buffer[0:n], cliAddr)
+	}
+}
+
+func newUDPEchoServer() (*udpEchoServer, error) {
+	packetconn, err := net.ListenPacket("udp", ":0")
+	if err != nil {
+		return nil, err
+	}
+	return &udpEchoServer{packetconn}, nil
+}
diff --git a/test/test_owners.csv b/test/test_owners.csv
index fbdfa9037fd..d3e156429a5 100644
--- a/test/test_owners.csv
+++ b/test/test_owners.csv
@@ -666,6 +666,7 @@ k8s.io/kubernetes/pkg/proxy/config,ixdy,1
 k8s.io/kubernetes/pkg/proxy/healthcheck,ghodss,1
 k8s.io/kubernetes/pkg/proxy/iptables,freehan,0
 k8s.io/kubernetes/pkg/proxy/userspace,luxas,1
+k8s.io/kubernetes/pkg/proxy/winuserspace,jbhurat,0
 k8s.io/kubernetes/pkg/quota,sttts,1
 k8s.io/kubernetes/pkg/quota/evaluator/core,yifan-gu,1
 k8s.io/kubernetes/pkg/registry/apps/petset,kevin-wangzefeng,1

From 23c35f24c777711cacdd06b423c9d9869f761597 Mon Sep 17 00:00:00 2001
From: Paulo Pires
Date: Thu, 3 Nov 2016 10:41:22 -0400
Subject: [PATCH 3/4] Added automatic selection of userspace proxy mode
 depending on the OS kube-proxy is running on.
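
The gist of the change: kube-proxy now picks its networking backend at
startup based on runtime.GOOS, using the new netsh-driven winuserspace
proxier on Windows and the existing iptables-based userspace proxier
everywhere else. A minimal sketch of the selection pattern, abridged from
the diff below (error handling and the surrounding ProxyServer wiring are
elided, so this is not a drop-in snippet):

	// Choose the platform backend once, up front. Downstream code uses
	// whichever interface was initialized for this platform.
	execer := exec.New()
	if runtime.GOOS == "windows" {
		// Windows: service traffic is redirected via netsh portproxy rules.
		netshInterface = utilnetsh.New(execer)
	} else {
		// Everywhere else: keep the existing dbus + iptables plumbing.
		dbus = utildbus.New()
		iptInterface = utiliptables.New(execer, dbus, protocol)
	}

The same runtime.GOOS guard is repeated around the iptables-only steps
(rule teardown, the reload hook, conntrack tuning) instead of introducing
build tags, so a single code path serves both platforms.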
---
 cmd/kube-proxy/app/BUILD     |  2 ++
 cmd/kube-proxy/app/server.go | 63 +++++++++++++++++++++++++++---------
 2 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD
index f2e6b2f1789..099c65de1dd 100644
--- a/cmd/kube-proxy/app/BUILD
+++ b/cmd/kube-proxy/app/BUILD
@@ -29,6 +29,7 @@ go_library(
         "//pkg/proxy/config:go_default_library",
         "//pkg/proxy/iptables:go_default_library",
         "//pkg/proxy/userspace:go_default_library",
+        "//pkg/proxy/winuserspace:go_default_library",
         "//pkg/types:go_default_library",
         "//pkg/util/configz:go_default_library",
         "//pkg/util/dbus:go_default_library",
@@ -36,6 +37,7 @@ go_library(
         "//pkg/util/iptables:go_default_library",
         "//pkg/util/mount:go_default_library",
         "//pkg/util/net:go_default_library",
+        "//pkg/util/netsh:go_default_library",
         "//pkg/util/node:go_default_library",
         "//pkg/util/oom:go_default_library",
         "//pkg/util/resourcecontainer:go_default_library",
diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go
index 82a5621159f..d079adff163 100644
--- a/cmd/kube-proxy/app/server.go
+++ b/cmd/kube-proxy/app/server.go
@@ -39,12 +39,14 @@ import (
 	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"k8s.io/kubernetes/pkg/proxy/iptables"
 	"k8s.io/kubernetes/pkg/proxy/userspace"
+	"k8s.io/kubernetes/pkg/proxy/winuserspace"
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/configz"
 	utildbus "k8s.io/kubernetes/pkg/util/dbus"
 	"k8s.io/kubernetes/pkg/util/exec"
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
 	utilnet "k8s.io/kubernetes/pkg/util/net"
+	utilnetsh "k8s.io/kubernetes/pkg/util/netsh"
 	nodeutil "k8s.io/kubernetes/pkg/util/node"
 	"k8s.io/kubernetes/pkg/util/oom"
 	"k8s.io/kubernetes/pkg/util/resourcecontainer"
@@ -136,10 +138,19 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
 		protocol = utiliptables.ProtocolIpv6
 	}

+	var netshInterface utilnetsh.Interface
+	var iptInterface utiliptables.Interface
+	var dbus utildbus.Interface
+	// Create the platform-appropriate networking utility: netsh on Windows, iptables elsewhere.
execer := exec.New() - dbus := utildbus.New() - iptInterface := utiliptables.New(execer, dbus, protocol) + + if runtime.GOOS == "windows" { + netshInterface = utilnetsh.New(execer) + } else { + dbus = utildbus.New() + iptInterface = utiliptables.New(execer, dbus, protocol) + } // We omit creation of pretty much everything if we run in cleanup mode if config.CleanupAndExit { @@ -223,24 +234,44 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer - proxierUserspace, err := userspace.NewProxier( - loadBalancer, - net.ParseIP(config.BindAddress), - iptInterface, - *utilnet.ParsePortRangeOrDie(config.PortRange), - config.IPTablesSyncPeriod.Duration, - config.IPTablesMinSyncPeriod.Duration, - config.UDPIdleTimeout.Duration, - ) + var proxierUserspace proxy.ProxyProvider + + if runtime.GOOS == "windows" { + proxierUserspace, err = winuserspace.NewProxier( + loadBalancer, + net.ParseIP(config.BindAddress), + netshInterface, + *utilnet.ParsePortRangeOrDie(config.PortRange), + // TODO @pires replace below with default values, if applicable + config.IPTablesSyncPeriod.Duration, + config.UDPIdleTimeout.Duration, + ) + } else { + proxierUserspace, err = userspace.NewProxier( + loadBalancer, + net.ParseIP(config.BindAddress), + iptInterface, + *utilnet.ParsePortRangeOrDie(config.PortRange), + config.IPTablesSyncPeriod.Duration, + config.IPTablesMinSyncPeriod.Duration, + config.UDPIdleTimeout.Duration, + ) + } if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierUserspace - // Remove artifacts from the pure-iptables Proxier. - glog.V(0).Info("Tearing down pure-iptables proxy rules.") - iptables.CleanupLeftovers(iptInterface) + // Remove artifacts from the pure-iptables Proxier, if not on Windows. + if runtime.GOOS != "windows" { + glog.V(0).Info("Tearing down pure-iptables proxy rules.") + iptables.CleanupLeftovers(iptInterface) + } + } + + // Add iptables reload function, if not on Windows. + if runtime.GOOS != "windows" { + iptInterface.AddReloadFunc(proxier.Sync) } - iptInterface.AddReloadFunc(proxier.Sync) // Create configs (i.e. Watches for Services and Endpoints) // Note: RegisterHandler() calls need to happen before creation of Sources because sources @@ -300,7 +331,7 @@ func (s *ProxyServer) Run() error { } // Tune conntrack, if requested - if s.Conntracker != nil { + if s.Conntracker != nil && runtime.GOOS != "windows" { max, err := getConntrackMax(s.Config) if err != nil { return err From 562d0756ef696639d6dc5209b8b708ab65290a46 Mon Sep 17 00:00:00 2001 From: Paulo Pires Date: Mon, 7 Nov 2016 09:13:58 +0000 Subject: [PATCH 4/4] Fixed copyright headers. 
--- pkg/proxy/winuserspace/loadbalancer.go | 2 +- pkg/proxy/winuserspace/port_allocator.go | 2 +- pkg/proxy/winuserspace/port_allocator_test.go | 2 +- pkg/proxy/winuserspace/proxier.go | 2 +- pkg/proxy/winuserspace/proxier_test.go | 2 +- pkg/proxy/winuserspace/proxysocket.go | 2 +- pkg/proxy/winuserspace/roundrobin.go | 2 +- pkg/proxy/winuserspace/roundrobin_test.go | 2 +- pkg/proxy/winuserspace/udp_server.go | 2 +- pkg/util/netsh/doc.go | 2 +- pkg/util/netsh/netsh.go | 2 +- pkg/util/netsh/testing/fake.go | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/proxy/winuserspace/loadbalancer.go b/pkg/proxy/winuserspace/loadbalancer.go index c649369e031..b4a5bc2c253 100644 --- a/pkg/proxy/winuserspace/loadbalancer.go +++ b/pkg/proxy/winuserspace/loadbalancer.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/port_allocator.go b/pkg/proxy/winuserspace/port_allocator.go index baeae9c9998..cbd94314edc 100644 --- a/pkg/proxy/winuserspace/port_allocator.go +++ b/pkg/proxy/winuserspace/port_allocator.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/port_allocator_test.go b/pkg/proxy/winuserspace/port_allocator_test.go index f68c2aec296..ca13b9110ce 100644 --- a/pkg/proxy/winuserspace/port_allocator_test.go +++ b/pkg/proxy/winuserspace/port_allocator_test.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 9aa0e394076..88de43b1209 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 18289fb226e..e0b3a80d522 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go index 41dcf6c1839..fcb5fd02873 100644 --- a/pkg/proxy/winuserspace/proxysocket.go +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go index 851e26a7e23..7b02d0f1fce 100644 --- a/pkg/proxy/winuserspace/roundrobin.go +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. 
+Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/roundrobin_test.go b/pkg/proxy/winuserspace/roundrobin_test.go index 0e37fcdfb04..79d00904108 100644 --- a/pkg/proxy/winuserspace/roundrobin_test.go +++ b/pkg/proxy/winuserspace/roundrobin_test.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/proxy/winuserspace/udp_server.go b/pkg/proxy/winuserspace/udp_server.go index a54c28aa12f..c95c1e05973 100644 --- a/pkg/proxy/winuserspace/udp_server.go +++ b/pkg/proxy/winuserspace/udp_server.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/util/netsh/doc.go b/pkg/util/netsh/doc.go index eb628bc0350..529d1e8f1cf 100644 --- a/pkg/util/netsh/doc.go +++ b/pkg/util/netsh/doc.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/util/netsh/netsh.go b/pkg/util/netsh/netsh.go index 47cd5e42ca5..59d1a18b61b 100644 --- a/pkg/util/netsh/netsh.go +++ b/pkg/util/netsh/netsh.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/util/netsh/testing/fake.go b/pkg/util/netsh/testing/fake.go index a0e5249611f..0322aeb1ebb 100644 --- a/pkg/util/netsh/testing/fake.go +++ b/pkg/util/netsh/testing/fake.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
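
A closing note on testing: the udpEchoServer added in
pkg/proxy/winuserspace/udp_server.go is meant to back the proxier tests. A
hypothetical test exercising it end to end could look like the sketch below;
only newUDPEchoServer, Loop, and the embedded net.PacketConn come from this
series, while the test name and body are purely illustrative:

	package winuserspace

	import (
		"net"
		"testing"
		"time"
	)

	// Illustrative only: start the echo server, send one datagram over
	// loopback, and expect the same payload back.
	func TestUDPEchoServerEchoes(t *testing.T) {
		srv, err := newUDPEchoServer()
		if err != nil {
			t.Fatalf("starting echo server: %v", err)
		}
		go srv.Loop()

		// The listener binds to ":0"; dial the chosen port on loopback.
		_, port, err := net.SplitHostPort(srv.LocalAddr().String())
		if err != nil {
			t.Fatalf("parsing server address: %v", err)
		}
		conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", port))
		if err != nil {
			t.Fatalf("dialing echo server: %v", err)
		}
		defer conn.Close()

		if _, err := conn.Write([]byte("ping")); err != nil {
			t.Fatalf("write: %v", err)
		}
		conn.SetReadDeadline(time.Now().Add(5 * time.Second))
		buf := make([]byte, 16)
		n, err := conn.Read(buf)
		if err != nil {
			t.Fatalf("read: %v", err)
		}
		if got := string(buf[:n]); got != "ping" {
			t.Errorf("echoed %q, want %q", got, "ping")
		}
	}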