Adding EndpointSlice support for kube-proxy ipvs and iptables proxiers

This commit is contained in:
Rob Scott 2019-08-06 17:17:16 -07:00
parent 550fb1bfc3
commit 9665c590c7
No known key found for this signature in database
GPG Key ID: 53504C654CF4B3EE
29 changed files with 1401 additions and 133 deletions

View File

@ -19,6 +19,7 @@ go_library(
importpath = "k8s.io/kubernetes/cmd/kube-proxy/app",
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/qos:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/proxy:go_default_library",

View File

@ -29,7 +29,7 @@ import (
"time"
v1 "k8s.io/api/core/v1"
v1meta "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
@ -50,6 +50,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/kube-proxy/config/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/proxy"
@ -477,6 +478,7 @@ type ProxyServer struct {
CleanupIPVS bool
MetricsBindAddress string
EnableProfiling bool
UseEndpointSlices bool
OOMScoreAdj *int32
ConfigSyncPeriod time.Duration
HealthzServer *healthcheck.HealthzServer
@ -619,11 +621,11 @@ func (s *ProxyServer) Run() error {
labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)
informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector.String()
}))
// Create configs (i.e. Watches for Services and Endpoints)
// Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
@ -631,9 +633,15 @@ func (s *ProxyServer) Run() error {
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.Proxier)
go endpointsConfig.Run(wait.NeverStop)
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1alpha1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
} else {
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.Proxier)
go endpointsConfig.Run(wait.NeverStop)
}
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
// functions must configure their shared informer event handlers first.

View File

@ -476,6 +476,12 @@ const (
// Enables ipv6 dual stack
IPv6DualStack featuregate.Feature = "IPv6DualStack"
// owner: @robscott @freehan
// alpha: v1.16
//
// Enable Endpoint Slices for more scalable Service endpoints.
EndpointSlice featuregate.Feature = "EndpointSlice"
// owner: @Huang-Wei
// alpha: v1.16
//
@ -559,6 +565,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
VolumePVCDataSource: {Default: false, PreRelease: featuregate.Alpha},
PodOverhead: {Default: false, PreRelease: featuregate.Alpha},
IPv6DualStack: {Default: false, PreRelease: featuregate.Alpha},
EndpointSlice: {Default: false, PreRelease: featuregate.Alpha},
EvenPodsSpread: {Default: false, PreRelease: featuregate.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed

View File

@ -25,6 +25,7 @@ go_library(
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/iptables:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/mount:go_default_library",

View File

@ -20,13 +20,14 @@ import (
"fmt"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app"
"k8s.io/kubernetes/pkg/proxy"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/iptables"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnode "k8s.io/kubernetes/pkg/util/node"
@ -41,7 +42,9 @@ type HollowProxy struct {
ProxyServer *proxyapp.ProxyServer
}
type FakeProxier struct{}
type FakeProxier struct {
proxyconfig.NoopEndpointSliceHandler
}
func (*FakeProxier) Sync() {}
func (*FakeProxier) SyncLoop() {

View File

@ -11,6 +11,7 @@ go_library(
srcs = [
"doc.go",
"endpoints.go",
"endpointslicecache.go",
"service.go",
"types.go",
],
@ -21,6 +22,7 @@ go_library(
"//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
@ -58,15 +60,18 @@ go_test(
name = "go_default_test",
srcs = [
"endpoints_test.go",
"endpointslicecache_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -15,8 +15,10 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/proxy/config",
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -21,8 +21,10 @@ import (
"time"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)
@ -61,6 +63,40 @@ type EndpointsHandler interface {
OnEndpointsSynced()
}
// EndpointSliceHandler is an abstract interface of objects which receive
// notifications about endpoint slice object changes.
type EndpointSliceHandler interface {
// OnEndpointSliceAdd is called whenever creation of new endpoint slice
// object is observed.
OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice)
// OnEndpointSliceUpdate is called whenever modification of an existing
// endpoint slice object is observed.
OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice)
// OnEndpointSliceDelete is called whenever deletion of an existing
// endpoint slice object is observed.
OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice)
// OnEndpointSlicesSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
OnEndpointSlicesSynced()
}
// NoopEndpointSliceHandler is a noop handler for proxiers that have not yet
// implemented a full EndpointSliceHandler.
type NoopEndpointSliceHandler struct{}
// OnEndpointSliceAdd is a noop handler for EndpointSlice creates.
func (*NoopEndpointSliceHandler) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {}
// OnEndpointSliceUpdate is a noop handler for EndpointSlice updates.
func (*NoopEndpointSliceHandler) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) {
}
// OnEndpointSliceDelete is a noop handler for EndpointSlice deletes.
func (*NoopEndpointSliceHandler) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {}
// OnEndpointSlicesSynced is a noop handler for EndpointSlice syncs.
func (*NoopEndpointSliceHandler) OnEndpointSlicesSynced() {}
// EndpointsConfig tracks a set of endpoints configurations.
type EndpointsConfig struct {
listerSynced cache.InformerSynced
@ -152,6 +188,97 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
}
}
// EndpointSliceConfig tracks a set of endpoints configurations.
type EndpointSliceConfig struct {
listerSynced cache.InformerSynced
eventHandlers []EndpointSliceHandler
}
// NewEndpointSliceConfig creates a new EndpointSliceConfig.
func NewEndpointSliceConfig(endpointSliceInformer discoveryinformers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
result := &EndpointSliceConfig{
listerSynced: endpointSliceInformer.Informer().HasSynced,
}
endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddEndpointSlice,
UpdateFunc: result.handleUpdateEndpointSlice,
DeleteFunc: result.handleDeleteEndpointSlice,
},
resyncPeriod,
)
return result
}
// RegisterEventHandler registers a handler which is called on every endpoint slice change.
func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}
// Run waits for cache synced and invokes handlers after syncing.
func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
klog.Info("Starting endpoint slice config controller")
if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
return
}
for _, h := range c.eventHandlers {
klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()")
h.OnEndpointSlicesSynced()
}
}
func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) {
endpointSlice, ok := obj.(*discovery.EndpointSlice)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}
for _, h := range c.eventHandlers {
klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate %+v", endpointSlice)
h.OnEndpointSliceAdd(endpointSlice)
}
}
func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) {
oldEndpointSlice, ok := oldObj.(*discovery.EndpointSlice)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
return
}
newEndpointSlice, ok := newObj.(*discovery.EndpointSlice)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
return
}
for _, h := range c.eventHandlers {
klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate")
h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
}
}
func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
endpointSlice, ok := obj.(*discovery.EndpointSlice)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}
if endpointSlice, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}
}
for _, h := range c.eventHandlers {
klog.V(4).Infof("Calling handler.OnEndpointsDelete")
h.OnEndpointSliceDelete(endpointSlice)
}
}
// ServiceConfig tracks a set of service configurations.
type ServiceConfig struct {
listerSynced cache.InformerSynced

View File

@ -26,7 +26,7 @@ import (
"k8s.io/klog"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
@ -92,6 +92,8 @@ type EndpointChangeTracker struct {
items map[types.NamespacedName]*endpointsChange
// makeEndpointInfo allows proxier to inject customized information when processing endpoint.
makeEndpointInfo makeEndpointFunc
// endpointSliceCache holds a simplified version of endpoint slices
endpointSliceCache *EndpointSliceCache
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
isIPv6Mode *bool
recorder record.EventRecorder
@ -101,8 +103,8 @@ type EndpointChangeTracker struct {
}
// NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker {
return &EndpointChangeTracker{
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool) *EndpointChangeTracker {
ect := &EndpointChangeTracker{
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo,
@ -110,6 +112,10 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
recorder: recorder,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
}
if endpointSlicesEnabled {
ect.endpointSliceCache = NewEndpointSliceCache(hostname, isIPv6Mode, recorder, makeEndpointInfo)
}
return ect
}
// Update updates given service's endpoints change map based on the <previous, current> endpoints pair. It returns true
@ -141,9 +147,9 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
change.previous = ect.endpointsToEndpointsMap(previous)
ect.items[namespacedName] = change
}
if t := getLastChangeTriggerTime(current); !t.IsZero() {
ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t)
if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() {
ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t)
}
change.current = ect.endpointsToEndpointsMap(current)
@ -163,31 +169,83 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
return len(ect.items) > 0
}
// getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
// or was set incorrectly.
func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time {
// EndpointSliceUpdate updates given service's endpoints change map based on the <previous, current> endpoints pair.
// It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap.
// If removeSlice is true, slice will be removed, otherwise it will be added or updated.
func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
// This should never happen
if endpointSlice == nil {
klog.Error("Nil endpointSlice passed to EndpointSliceUpdate")
return false
}
namespacedName, _ := endpointSliceCacheKeys(endpointSlice)
metrics.EndpointChangesTotal.Inc()
ect.lock.Lock()
defer ect.lock.Unlock()
change, ok := ect.items[namespacedName]
if !ok {
change = &endpointsChange{}
change.previous = ect.endpointSliceCache.EndpointsMap(namespacedName)
ect.items[namespacedName] = change
}
if removeSlice {
ect.endpointSliceCache.Delete(endpointSlice)
} else {
ect.endpointSliceCache.Update(endpointSlice)
}
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() {
ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t)
}
change.current = ect.endpointSliceCache.EndpointsMap(namespacedName)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) {
delete(ect.items, namespacedName)
// Reset the lastChangeTriggerTimes for this service. Given that the network programming
// SLI is defined as the duration between a time of an event and a time when the network was
// programmed to incorporate that event, if there are events that happened between two
// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
// there will be no network programming for them and thus no network programming latency metric
// should be exported.
delete(ect.lastChangeTriggerTimes, namespacedName)
}
metrics.EndpointChangesPending.Set(float64(len(ect.items)))
return len(ect.items) > 0
}
// getLastChangeTriggerTime returns the time.Time value of the
// EndpointsLastChangeTriggerTime annotation stored in the given endpoints
// object or the "zero" time if the annotation wasn't set or was set
// incorrectly.
func getLastChangeTriggerTime(annotations map[string]string) time.Time {
// TODO(#81360): ignore case when Endpoint is deleted.
if endpoints == nil {
if _, ok := annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
// It's possible that the Endpoints object won't have the
// EndpointsLastChangeTriggerTime annotation set. In that case return
// the 'zero value', which is ignored in the upstream code.
return time.Time{}
}
if _, ok := endpoints.Annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
// It's possible that the Endpoints object won't have the EndpointsLastChangeTriggerTime
// annotation set. In that case return the 'zero value', which is ignored in the upstream code.
return time.Time{}
}
val, err := time.Parse(time.RFC3339Nano, endpoints.Annotations[v1.EndpointsLastChangeTriggerTime])
val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime])
if err != nil {
klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v",
endpoints.Annotations[v1.EndpointsLastChangeTriggerTime], err)
annotations[v1.EndpointsLastChangeTriggerTime], err)
// In case of error val = time.Zero, which is ignored in the upstream code.
}
return val
}
// endpointsChange contains all changes to endpoints that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying the changes.
// endpointsChange contains all changes to endpoints that happened since proxy
// rules were synced. For a single object, changes are accumulated, i.e.
// previous is state from before applying the changes, current is state after
// applying the changes.
type endpointsChange struct {
previous EndpointsMap
current EndpointsMap
@ -240,8 +298,8 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
}
endpointsMap := make(EndpointsMap)
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
// We need to build a map of portname -> all ip:ports for that portname.
// Explode Endpoints.Subsets[*] into this structure.
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
@ -276,13 +334,8 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo)
}
}
if klog.V(3) {
newEPList := []string{}
for _, ep := range endpointsMap[svcPortName] {
newEPList = append(newEPList, ep.String())
}
klog.Infof("Setting endpoints for %q to %+v", svcPortName, newEPList)
}
klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName]))
}
}
return endpointsMap

View File

@ -23,10 +23,12 @@ import (
"github.com/davecgh/go-spew/spew"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilpointer "k8s.io/utils/pointer"
)
func (proxier *FakeProxier) addEndpoints(endpoints *v1.Endpoints) {
@ -133,7 +135,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.
// This is a coarse test, but it offers some modicum of confidence as the code is evolved.
func TestEndpointsToEndpointsMap(t *testing.T) {
epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil)
epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false)
trueVal := true
falseVal := false
@ -1382,13 +1384,246 @@ func TestLastChangeTriggerTime(t *testing.T) {
}
}
func TestEndpointSliceUpdate(t *testing.T) {
testCases := map[string]struct {
startingSlices []*discovery.EndpointSlice
endpointChangeTracker *EndpointChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
}{
// test starting from an empty state
"add a simple slice that doesn't already exist": {
startingSlices: []*discovery.EndpointSlice{},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
expectedReturnVal: true,
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
},
makeServicePortName("ns1", "svc1", "port-1"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:443"},
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:443"},
},
},
},
// test no modification to state - current change should be nil as nothing changes
"add the same slice that already exists": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
},
// test additions to existing state
"add a slice that overlaps with existing state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
expectedReturnVal: true,
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.4:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.5:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.2.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.2.2:80", IsLocal: true},
},
makeServicePortName("ns1", "svc1", "port-1"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.4:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.5:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.2.1:443"},
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true},
},
},
},
// test additions to existing state with partially overlapping slices and ports
"add a slice that overlaps with existing state and partial ports": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
paramRemoveSlice: false,
expectedReturnVal: true,
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.4:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.5:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.2.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.2.2:80", IsLocal: true},
},
makeServicePortName("ns1", "svc1", "port-1"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:443"},
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:443"},
&BaseEndpointInfo{Endpoint: "10.0.2.1:443"},
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true},
},
},
},
// test deletions from existing state with partially overlapping slices and ports
"remove a slice that overlaps with existing state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true,
expectedReturnVal: true,
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.2.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.2.2:80", IsLocal: true},
},
makeServicePortName("ns1", "svc1", "port-1"): {
&BaseEndpointInfo{Endpoint: "10.0.2.1:443"},
&BaseEndpointInfo{Endpoint: "10.0.2.2:443", IsLocal: true},
},
},
},
// ensure a removal that has no effect turns into a no-op
"remove a slice that doesn't even exist in current state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
},
// start with all endpoints ready, transition to no endpoints ready
"transition all endpoints to unready state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
expectedReturnVal: true,
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{},
},
// start with no endpoints ready, transition to all endpoints ready
"transition all endpoints to ready state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 2, 1, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
expectedReturnVal: true,
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
},
makeServicePortName("ns1", "svc1", "port-1"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true},
},
},
},
// start with some endpoints ready, transition to more endpoints ready
"transition some endpoints to ready state": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false,
expectedReturnVal: true,
expectedCurrentChange: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.2.1:80", IsLocal: true},
},
makeServicePortName("ns1", "svc1", "port-1"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.2.1:443", IsLocal: true},
},
},
},
}
for name, tc := range testCases {
for _, startingSlice := range tc.startingSlices {
tc.endpointChangeTracker.endpointSliceCache.Update(startingSlice)
}
got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
t.Errorf("[%s] EndpointSliceUpdate return value got: %v, want %v", name, got, tc.expectedReturnVal)
}
if tc.endpointChangeTracker.items == nil {
t.Errorf("[%s] Expected ect.items to not be nil", name)
}
if tc.expectedCurrentChange == nil {
if tc.endpointChangeTracker.items[tc.namespacedName] != nil {
t.Errorf("[%s] Expected ect.items[%s] to be nil", name, tc.namespacedName)
}
} else {
if tc.endpointChangeTracker.items[tc.namespacedName] == nil {
t.Errorf("[%s] Expected ect.items[%s] to not be nil", name, tc.namespacedName)
}
compareEndpointsMapsStr(t, name, tc.endpointChangeTracker.items[tc.namespacedName].current, tc.expectedCurrentChange)
}
}
}
// Test helpers
func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected map[ServicePortName][]*BaseEndpointInfo) {
t.Helper()
compareEndpointsMapsStr(t, string(tci), newMap, expected)
}
func compareEndpointsMapsStr(t *testing.T, testName string, newMap EndpointsMap, expected map[ServicePortName][]*BaseEndpointInfo) {
t.Helper()
if len(newMap) != len(expected) {
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
t.Errorf("[%s] expected %d results, got %d: %v", testName, len(expected), len(newMap), newMap)
}
for x := range expected {
if len(newMap[x]) != len(expected[x]) {
t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(expected[x]), x, len(newMap[x]))
t.Errorf("[%s] expected %d endpoints for %v, got %d", testName, len(expected[x]), x, len(newMap[x]))
t.Logf("Endpoints %+v", newMap[x])
} else {
for i := range expected[x] {
newEp, ok := newMap[x][i].(*BaseEndpointInfo)
@ -1397,7 +1632,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected m
continue
}
if *newEp != *(expected[x][i]) {
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
t.Errorf("[%s] expected new[%v][%d] to be %v, got %v (IsLocal expected %v, got %v)", testName, x, i, expected[x][i], newEp, expected[x][i].IsLocal, newEp.IsLocal)
}
}
}

View File

@ -0,0 +1,240 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"sort"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilnet "k8s.io/utils/net"
)
// EndpointSliceCache is used as a cache of EndpointSlice information.
type EndpointSliceCache struct {
// sliceByServiceMap is the basis of this cache. It contains endpoint slice
// info grouped by service name and endpoint slice name. The first key
// represents a namespaced service name while the second key represents
// an endpoint slice name. Since endpoints can move between slices, we
// require slice specific caching to prevent endpoints being removed from
// the cache when they may have just moved to a different slice.
sliceByServiceMap map[types.NamespacedName]map[string]*endpointSliceInfo
makeEndpointInfo makeEndpointFunc
hostname string
isIPv6Mode *bool
recorder record.EventRecorder
}
// endpointSliceInfo contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
type endpointSliceInfo struct {
Ports []discovery.EndpointPort
Endpoints []*endpointInfo
}
// endpointInfo contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
// Addresses and Topology are copied from EndpointSlice Endpoints.
type endpointInfo struct {
Addresses []string
Topology map[string]string
}
// NewEndpointSliceCache initializes an EndpointSliceCache.
func NewEndpointSliceCache(hostname string, isIPv6Mode *bool, recorder record.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
if makeEndpointInfo == nil {
makeEndpointInfo = standardEndpointInfo
}
return &EndpointSliceCache{
sliceByServiceMap: map[types.NamespacedName]map[string]*endpointSliceInfo{},
hostname: hostname,
isIPv6Mode: isIPv6Mode,
makeEndpointInfo: makeEndpointInfo,
recorder: recorder,
}
}
// standardEndpointInfo is the default makeEndpointFunc.
func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
return ep
}
// Update a slice in the cache.
func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) {
serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
// This should never actually happen
if serviceKey.Name == "" || serviceKey.Namespace == "" || sliceKey == "" {
klog.Errorf("Invalid endpoint slice, name and owner reference required %v", endpointSlice)
return
}
esInfo := &endpointSliceInfo{
Ports: endpointSlice.Ports,
Endpoints: []*endpointInfo{},
}
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
Addresses: endpoint.Addresses,
Topology: endpoint.Topology,
})
}
}
if _, exists := cache.sliceByServiceMap[serviceKey]; !exists {
cache.sliceByServiceMap[serviceKey] = map[string]*endpointSliceInfo{}
}
cache.sliceByServiceMap[serviceKey][sliceKey] = esInfo
}
// Delete a slice from the cache.
func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) {
serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
delete(cache.sliceByServiceMap[serviceKey], sliceKey)
}
// EndpointsMap computes an EndpointsMap for a given service.
func (cache *EndpointSliceCache) EndpointsMap(serviceNN types.NamespacedName) EndpointsMap {
endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN)
return endpointsMapFromEndpointInfo(endpointInfoBySP)
}
// endpointInfoByServicePort groups endpoint info by service port name and address.
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName) map[ServicePortName]map[string]Endpoint {
endpointInfoBySP := map[ServicePortName]map[string]Endpoint{}
for _, sliceInfo := range cache.sliceByServiceMap[serviceNN] {
for _, port := range sliceInfo.Ports {
if port.Name == nil {
klog.Warningf("ignoring port with nil name %v", port)
continue
}
// TODO: handle nil ports to mean "all"
if port.Port == nil || *port.Port == int32(0) {
klog.Warningf("ignoring invalid endpoint port %s", *port.Name)
continue
}
svcPortName := ServicePortName{NamespacedName: serviceNN}
svcPortName.Port = *port.Name
endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
}
}
return endpointInfoBySP
}
// addEndpointsByIP adds endpointInfo for each IP.
func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName, portNum int, endpointsByIP map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
if endpointsByIP == nil {
endpointsByIP = map[string]Endpoint{}
}
// iterate through endpoints to add them to endpointsByIP.
for _, endpoint := range endpoints {
if len(endpoint.Addresses) == 0 {
klog.Warningf("ignoring invalid endpoint port %s with empty addresses", endpoint)
continue
}
// Filter out the incorrect IP version case. Any endpoint port that
// contains incorrect IP version will be ignored.
if cache.isIPv6Mode != nil && utilnet.IsIPv6String(endpoint.Addresses[0]) != *cache.isIPv6Mode {
// Emit event on the corresponding service which had a different IP
// version than the endpoint.
utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], serviceNN.Name, serviceNN.Namespace, "")
continue
}
isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname])
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal)
// This logic ensures we're deduping potential overlapping endpoints
// isLocal should not vary between matching IPs, but if it does, we
// favor a true value here if it exists.
if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal {
endpointsByIP[endpointInfo.IP()] = cache.makeEndpointInfo(endpointInfo)
}
}
return endpointsByIP
}
func (cache *EndpointSliceCache) isLocal(hostname string) bool {
return len(cache.hostname) > 0 && hostname == cache.hostname
}
// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
// has been grouped by service port and IP.
func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
endpointsMap := EndpointsMap{}
// transform endpointInfoByServicePort into an endpointsMap with sorted IPs.
for svcPortName, endpointInfoByIP := range endpointInfoBySP {
if len(endpointInfoByIP) > 0 {
endpointsMap[svcPortName] = []Endpoint{}
for _, endpointInfo := range endpointInfoByIP {
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], endpointInfo)
}
// Ensure IPs are always returned in the same order to simplify diffing.
sort.Sort(byIP(endpointsMap[svcPortName]))
klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName]))
}
}
return endpointsMap
}
// formatEndpointsList returns a string list converted from an endpoints list.
func formatEndpointsList(endpoints []Endpoint) []string {
var formattedList []string
for _, ep := range endpoints {
formattedList = append(formattedList, ep.String())
}
return formattedList
}
// endpointSliceCacheKeys returns cache keys used for a given EndpointSlice.
func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string) {
if len(endpointSlice.OwnerReferences) == 0 {
klog.Errorf("No owner reference set on endpoint slice: %s", endpointSlice.Name)
return types.NamespacedName{}, endpointSlice.Name
}
if len(endpointSlice.OwnerReferences) > 1 {
klog.Errorf("More than 1 owner reference set on endpoint slice: %s", endpointSlice.Name)
}
ownerRef := endpointSlice.OwnerReferences[0]
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: ownerRef.Name}, endpointSlice.Name
}
// byIP helps sort endpoints by IP
type byIP []Endpoint
func (e byIP) Len() int {
return len(e)
}
func (e byIP) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
func (e byIP) Less(i, j int) bool {
return e[i].IP() < e[j].IP()
}

View File

@ -0,0 +1,239 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"reflect"
"testing"
discovery "k8s.io/api/discovery/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilpointer "k8s.io/utils/pointer"
)
func TestEndpointsMapFromESC(t *testing.T) {
testCases := map[string]struct {
endpointSlices []*discovery.EndpointSlice
hostname string
namespacedName types.NamespacedName
expectedMap map[ServicePortName][]*BaseEndpointInfo
}{
"1 slice, 2 hosts, ports 80,443": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
hostname: "host1",
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false},
},
makeServicePortName("ns1", "svc1", "port-1"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:443", IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.1.2:443", IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.1.3:443", IsLocal: false},
},
},
},
"2 slices, same port": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSlice("svc1", "ns1", 2, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
&BaseEndpointInfo{Endpoint: "10.0.2.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.2.2:80"},
&BaseEndpointInfo{Endpoint: "10.0.2.3:80"},
},
},
},
// 2 slices, with some overlapping endpoints, result should be a union
// of the 2.
"2 overlapping slices, same port": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSlice("svc1", "ns1", 1, 4, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.4:80"},
},
},
},
// 2 slices with all endpoints overlapping, more unready in first than
// second. If an endpoint is marked ready, we add it to the
// EndpointsMap, even if conditions.Ready isn't true for another
// matching endpoint
"2 slices, overlapping endpoints, some endpoints unready in 1 or both": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 10, 3, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSlice("svc1", "ns1", 1, 10, 6, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.10:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.4:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.5:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.7:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.8:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.9:80"},
},
},
},
"2 slices, overlapping endpoints, all unready": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 10, 1, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSlice("svc1", "ns1", 1, 10, 1, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{},
},
"3 slices with different services and namespaces": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSlice("svc2", "ns1", 2, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSlice("svc1", "ns2", 3, 3, 999, []string{}, []*int32{utilpointer.Int32Ptr(80)}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0"): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80"},
&BaseEndpointInfo{Endpoint: "10.0.1.3:80"},
},
},
},
// Ensuring that nil port value will not break things. This will
// represent all ports in the future, but that has not been implemented
// yet.
"Nil port should not break anything": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
hostname: "host1",
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{nil}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{},
},
}
for name, tc := range testCases {
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
for _, endpointSlice := range tc.endpointSlices {
esCache.Update(endpointSlice)
}
compareEndpointsMapsStr(t, name, esCache.EndpointsMap(tc.namespacedName), tc.expectedMap)
}
}
func TestEndpointInfoByServicePort(t *testing.T) {
testCases := map[string]struct {
namespacedName types.NamespacedName
endpointSlices []*discovery.EndpointSlice
hostname string
expectedMap map[ServicePortName]map[string]Endpoint
}{
"simple use case with 3 endpoints": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
hostname: "host1",
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80)}),
},
expectedMap: map[ServicePortName]map[string]Endpoint{
{NamespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, Port: "port-0"}: {
"10.0.1.1": &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false},
"10.0.1.2": &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true},
"10.0.1.3": &BaseEndpointInfo{Endpoint: "10.0.1.3:80", IsLocal: false},
},
},
},
}
for name, tc := range testCases {
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
for _, endpointSlice := range tc.endpointSlices {
esCache.Update(endpointSlice)
}
got := esCache.endpointInfoByServicePort(tc.namespacedName)
if !reflect.DeepEqual(got, tc.expectedMap) {
t.Errorf("[%s] endpointInfoByServicePort does not match. Want: %v, Got: %v", name, tc.expectedMap, got)
}
}
}
func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, offset, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
ipAddressType := discovery.AddressTypeIP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", serviceName, sliceNum),
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}},
},
Ports: []discovery.EndpointPort{},
AddressType: &ipAddressType,
Endpoints: []discovery.Endpoint{},
}
for i, portNum := range portNums {
endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
Name: utilpointer.StringPtr(fmt.Sprintf("port-%d", i)),
Port: portNum,
})
}
for i := 1; i <= numEndpoints; i++ {
endpoint := discovery.Endpoint{
Addresses: []string{fmt.Sprintf("10.0.%d.%d", offset, i)},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(i%unreadyMod != 0)},
}
if len(hosts) > 0 {
endpoint.Topology = map[string]string{
"kubernetes.io/hostname": hosts[i%len(hosts)],
}
}
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpoint)
}
return endpointSlice
}
func generateEndpointSlice(serviceName, namespace string, sliceNum, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
return generateEndpointSliceWithOffset(serviceName, namespace, sliceNum, sliceNum, numEndpoints, unreadyMod, hosts, portNums)
}

View File

@ -11,6 +11,7 @@ go_library(
srcs = ["proxier.go"],
importpath = "k8s.io/kubernetes/pkg/proxy/iptables",
deps = [
"//pkg/features:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/metrics:go_default_library",
@ -20,8 +21,10 @@ go_library(
"//pkg/util/iptables:go_default_library",
"//pkg/util/sysctl:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
@ -42,9 +45,11 @@ go_test(
"//pkg/util/iptables:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",

View File

@ -32,12 +32,14 @@ import (
"sync/atomic"
"time"
"k8s.io/klog"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metrics"
@ -179,13 +181,14 @@ type Proxier struct {
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating iptables
// with some partial data after kube-proxy restart.
endpointsSynced bool
servicesSynced bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid
// updating iptables with some partial data after kube-proxy restart.
endpointsSynced bool
endpointSlicesSynced bool
servicesSynced bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held.
iptables utiliptables.Interface
@ -286,6 +289,8 @@ func NewProxier(ipt utiliptables.Interface,
return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, ipt.IsIpv6())
}
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
isIPv6 := ipt.IsIpv6()
@ -294,7 +299,7 @@ func NewProxier(ipt utiliptables.Interface,
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled),
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
@ -500,7 +505,7 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@ -517,7 +522,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
// endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.syncRunner.Run()
proxier.Sync()
}
}
@ -539,6 +544,42 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.syncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
// slice object is observed.
func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
// object is observed.
func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointSlicesSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.mu.Lock()
proxier.endpointSlicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
// portProtoHash takes the ServicePortName and protocol for a service
// returns the associated 16 character hash. This is computed by hashing (sha256)
// then encoding to base32 and truncating to 16 chars. We do this because IPTables

View File

@ -28,7 +28,9 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
@ -250,7 +252,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
}
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
fp.exec = &fexec
for _, tc := range testCases {
@ -363,7 +365,7 @@ func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedNa
const testHostname = "test-hostname"
func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method.
p := &Proxier{
@ -371,7 +373,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled),
iptables: ipt,
clusterCIDR: "10.0.0.0/24",
hostname: testHostname,
@ -575,7 +577,7 @@ func errorf(msg string, rules []iptablestest.Rule, t *testing.T) {
func TestClusterIPReject(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
svcPort := 80
svcPortName := proxy.ServicePortName{
@ -609,7 +611,7 @@ func TestClusterIPReject(t *testing.T) {
func TestClusterIPEndpointsJump(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
svcPort := 80
svcPortName := proxy.ServicePortName{
@ -666,7 +668,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
func TestLoadBalancer(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -726,7 +728,7 @@ func TestLoadBalancer(t *testing.T) {
func TestNodePort(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -784,7 +786,7 @@ func TestNodePort(t *testing.T) {
func TestExternalIPsReject(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := "50.60.70.81"
@ -818,7 +820,7 @@ func TestExternalIPsReject(t *testing.T) {
func TestNodePortReject(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -851,7 +853,7 @@ func TestNodePortReject(t *testing.T) {
func TestOnlyLocalLoadBalancing(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -941,7 +943,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
// set cluster CIDR to empty before test
fp.clusterCIDR = ""
onlyLocalNodePorts(t, fp, ipt)
@ -949,7 +951,7 @@ func TestOnlyLocalNodePortsNoClusterCIDR(t *testing.T) {
func TestOnlyLocalNodePorts(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
onlyLocalNodePorts(t, fp, ipt)
}
@ -1064,7 +1066,7 @@ func addTestPort(array []v1.ServicePort, name string, protocol v1.Protocol, port
func TestBuildServiceMapAddRemove(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
services := []*v1.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
@ -1170,7 +1172,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
func TestBuildServiceMapServiceHeadless(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
makeServiceMap(fp,
makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
@ -1202,7 +1204,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
makeServiceMap(fp,
makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
@ -1228,7 +1230,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
func TestBuildServiceMapServiceUpdate(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@ -2182,7 +2184,7 @@ func Test_updateEndpointsMap(t *testing.T) {
for tci, tc := range testCases {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp := NewFakeProxier(ipt, false)
fp.hostname = nodeName
// First check that after adding all previous versions of endpoints,
@ -2250,4 +2252,120 @@ func Test_updateEndpointsMap(t *testing.T) {
}
}
// The majority of EndpointSlice specific tests are not iptables specific and focus on
// the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
// iptables proxier supports translating EndpointSlices to iptables output.
func TestEndpointSliceE2E(t *testing.T) {
expectedIPTablesWithoutSlice := `*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1: has no endpoints" -m -p -d 172.20.1.1/32 --dport 0 -j REJECT
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/24 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -d 10.0.0.0/24 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-3WUAALNGPYZZAWAD - [0:0]
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --set-xmark
-X KUBE-SVC-3WUAALNGPYZZAWAD
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`
expectedIPTablesWithSlice := `*filter
:KUBE-SERVICES - [0:0]
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark -j ACCEPT
-A KUBE-FORWARD -s 10.0.0.0/24 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack pod destination rule" -d 10.0.0.0/24 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
*nat
:KUBE-SERVICES - [0:0]
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-3WUAALNGPYZZAWAD - [0:0]
: - [0:0]
: - [0:0]
: - [0:0]
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --set-xmark
-A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m -p -d 172.20.1.1/32 --dport 0 ! -s 10.0.0.0/24 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1: cluster IP" -m -p -d 172.20.1.1/32 --dport 0 -j KUBE-SVC-3WUAALNGPYZZAWAD
-A KUBE-SVC-3WUAALNGPYZZAWAD -m statistic --mode random --probability 0.33333 -j
-A -s 10.0.1.1/32 -j KUBE-MARK-MASQ
-A -m -p -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SVC-3WUAALNGPYZZAWAD -m statistic --mode random --probability 0.50000 -j
-A -s 10.0.1.2/32 -j KUBE-MARK-MASQ
-A -m -p -j DNAT --to-destination 10.0.1.2:80
-A KUBE-SVC-3WUAALNGPYZZAWAD -j
-A -s 10.0.1.3/32 -j KUBE-MARK-MASQ
-A -m -p -j DNAT --to-destination 10.0.1.3:80
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt, true)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80)}},
},
})
ipAddressType := discovery.AddressTypeIP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
}},
AddressType: &ipAddressType,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node2"},
}, {
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node3"},
}},
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
assert.Equal(t, expectedIPTablesWithSlice, fp.iptablesData.String())
fp.OnEndpointSliceDelete(endpointSlice)
fp.syncProxyRules()
assert.Equal(t, expectedIPTablesWithoutSlice, fp.iptablesData.String())
}
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.

View File

@ -26,12 +26,15 @@ go_test(
"//pkg/util/ipvs:go_default_library",
"//pkg/util/ipvs/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)
@ -47,6 +50,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/proxy/ipvs",
deps = [
"//pkg/features:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/metrics:go_default_library",
@ -58,10 +62,12 @@ go_library(
"//pkg/util/ipvs:go_default_library",
"//pkg/util/sysctl:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",

View File

@ -31,12 +31,15 @@ import (
"time"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metrics"
@ -184,13 +187,14 @@ type Proxier struct {
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating ipvs rules
// with some partial data after kube-proxy restart.
endpointsSynced bool
servicesSynced bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
// corresponding objects are synced after startup. This is used to avoid updating
// ipvs rules with some partial data after kube-proxy restart.
endpointsSynced bool
endpointSlicesSynced bool
servicesSynced bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
@ -406,12 +410,14 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
excludeCIDRs: parseExcludedCIDRs(excludeCIDRs),
@ -745,7 +751,7 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@ -780,6 +786,42 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.syncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
// slice object is observed.
func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
// object is observed.
func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointSlicesSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.mu.Lock()
proxier.endpointSlicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
// EntryInvalidErr indicates if an ipset entry is invalid or not
const EntryInvalidErr = "error adding entry %s to ipset %s"

View File

@ -25,7 +25,9 @@ import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
@ -42,6 +44,7 @@ import (
ipvstest "k8s.io/kubernetes/pkg/util/ipvs/testing"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
utilpointer "k8s.io/utils/pointer"
)
const testHostname = "test-hostname"
@ -112,7 +115,7 @@ func (fake *fakeIPSetVersioner) GetVersion() (string, error) {
return fake.version, fake.err
}
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []*net.IPNet) *Proxier {
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []*net.IPNet, endpointSlicesEnabled bool) *Proxier {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("dummy device have been created"), nil },
@ -132,33 +135,34 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
}
return &Proxier{
exec: fexec,
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
excludeCIDRs: excludeCIDRs,
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
clusterCIDR: "10.0.0.0/24",
strictARP: false,
hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
healthChecker: newFakeHealthChecker(),
ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
ipsetList: ipsetList,
nodePortAddresses: make([]string, 0),
networkInterfacer: proxyutiltest.NewFakeNetwork(),
exec: fexec,
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled),
excludeCIDRs: excludeCIDRs,
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
clusterCIDR: "10.0.0.0/24",
strictARP: false,
hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
healthChecker: newFakeHealthChecker(),
ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
ipsetList: ipsetList,
nodePortAddresses: make([]string, 0),
networkInterfacer: proxyutiltest.NewFakeNetwork(),
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
}
}
@ -215,7 +219,7 @@ func TestCleanupLeftovers(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -933,7 +937,7 @@ func TestNodePort(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, test.nodeIPs, nil, false)
fp.nodePortAddresses = test.nodePortAddresses
makeServiceMap(fp, test.services...)
@ -1100,7 +1104,7 @@ func TestClusterIP(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
makeServiceMap(fp, test.services...)
makeEndpointsMap(fp, test.endpoints...)
@ -1120,7 +1124,7 @@ func TestExternalIPsNoEndpoint(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := "50.60.70.81"
@ -1175,7 +1179,7 @@ func TestExternalIPs(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
svcIP := "10.20.30.41"
svcPort := 80
svcExternalIPs := sets.NewString("50.60.70.81", "2012::51", "127.0.0.1")
@ -1693,7 +1697,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
services := []*v1.Service{
makeTestService("somewhere-else", "cluster-ip", func(svc *v1.Service) {
@ -1803,7 +1807,7 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
makeServiceMap(fp,
makeTestService("somewhere-else", "headless", func(svc *v1.Service) {
@ -1842,7 +1846,7 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
makeServiceMap(fp,
makeTestService("somewhere-else", "external-name", func(svc *v1.Service) {
@ -1870,7 +1874,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
servicev1 := makeTestService("somewhere", "some-service", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@ -1954,7 +1958,7 @@ func TestSessionAffinity(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
nodeIP := net.ParseIP("100.101.102.103")
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil, false)
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
@ -2817,7 +2821,7 @@ func Test_updateEndpointsMap(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
fp.hostname = nodeName
// First check that after adding all previous versions of endpoints,
@ -3061,7 +3065,7 @@ func Test_syncService(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
proxier := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
if testCases[i].oldVirtualServer != nil {
@ -3091,7 +3095,7 @@ func buildFakeProxier() (*iptablestest.FakeIPTables, *Proxier) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil)
return ipt, NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
}
func hasJump(rules []iptablestest.Rule, destChain, ipSet string) bool {
@ -3168,7 +3172,7 @@ func TestCleanLegacyService(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"}))
fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3.3.3.0/24", "4.4.4.0/24"}), false)
// All ipvs services that were processed in the latest sync loop.
activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
@ -3274,7 +3278,7 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
// all deleted expect ipvs2
activeServices := map[string]bool{"ipvs2": true}
@ -3368,7 +3372,7 @@ func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) {
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
gtm := NewGracefulTerminationManager(ipvs)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"4.4.4.4/32"}))
fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"4.4.4.4/32"}), false)
fp.gracefuldeleteManager = gtm
vs := &utilipvs.VirtualServer{
@ -3422,7 +3426,7 @@ func TestCleanLegacyService6(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3000::/64", "4000::/64"}))
fp := NewFakeProxier(ipt, ipvs, ipset, nil, parseExcludedCIDRs([]string{"3000::/64", "4000::/64"}), false)
fp.nodeIP = net.ParseIP("::1")
// All ipvs services that were processed in the latest sync loop.
@ -3529,7 +3533,7 @@ func TestMultiPortServiceBindAddr(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, false)
service1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
@ -3626,3 +3630,89 @@ raid10 57344 0 - Live 0xffffffffc0597000`,
})
}
}
// The majority of EndpointSlice specific tests are not ipvs specific and focus on
// the shared EndpointChangeTracker and EndpointSliceCache. This test ensures that the
// ipvs proxier supports translating EndpointSlices to ipvs output.
func TestEndpointSliceE2E(t *testing.T) {
ipt := iptablestest.NewFake()
ipvs := ipvstest.NewFake()
ipset := ipsettest.NewFake(testIPSetVersion)
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, true)
fp.servicesSynced = true
fp.endpointsSynced = true
fp.endpointSlicesSynced = true
// Add initial service
serviceName := "svc1"
namespaceName := "ns1"
fp.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: namespaceName},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: "", TargetPort: intstr.FromInt(80)}},
},
})
// Add initial endpoint slice
ipAddressType := discovery.AddressTypeIP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Port: utilpointer.Int32Ptr(80),
}},
AddressType: &ipAddressType,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.1.1"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": testHostname},
}, {
Addresses: []string{"10.0.1.2"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node2"},
}, {
Addresses: []string{"10.0.1.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node3"},
}},
}
fp.OnEndpointSliceAdd(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice update
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries1 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 1, activeEntries1.Len(), "Expected 1 active entry in KUBE-LOOP-BACK")
assert.Equal(t, true, activeEntries1.Has("10.0.1.1,tcp:80,10.0.1.1"), "Expected activeEntries to reference first (local) pod")
virtualServers1, vsErr1 := ipvs.GetVirtualServers()
assert.Nil(t, vsErr1, "Expected no error getting virtual servers")
assert.Len(t, virtualServers1, 1, "Expected 1 virtual server")
realServers1, rsErr1 := ipvs.GetRealServers(virtualServers1[0])
assert.Nil(t, rsErr1, "Expected no error getting real servers")
assert.Len(t, realServers1, 3, "Expected 3 real servers")
assert.Equal(t, realServers1[0].String(), "10.0.1.1:80")
assert.Equal(t, realServers1[1].String(), "10.0.1.2:80")
assert.Equal(t, realServers1[2].String(), "10.0.1.3:80")
fp.OnEndpointSliceDelete(endpointSlice)
fp.syncProxyRules()
// Ensure that Proxier updates ipvs appropriately after EndpointSlice delete
assert.NotNil(t, fp.ipsetList["KUBE-LOOP-BACK"])
activeEntries2 := fp.ipsetList["KUBE-LOOP-BACK"].activeEntries
assert.Equal(t, 0, activeEntries2.Len(), "Expected 0 active entries in KUBE-LOOP-BACK")
virtualServers2, vsErr2 := ipvs.GetVirtualServers()
assert.Nil(t, vsErr2, "Expected no error getting virtual servers")
assert.Len(t, virtualServers2, 1, "Expected 1 virtual server")
realServers2, rsErr2 := ipvs.GetRealServers(virtualServers2[0])
assert.Nil(t, rsErr2, "Expected no error getting real servers")
assert.Len(t, realServers2, 0, "Expected 0 real servers")
}

View File

@ -396,7 +396,7 @@ func newFakeProxier() *FakeProxier {
serviceMap: make(ServiceMap),
serviceChanges: NewServiceChangeTracker(nil, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil),
endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false),
}
}

View File

@ -20,7 +20,7 @@ import (
"fmt"
"net"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/proxy/config"
)
@ -28,6 +28,7 @@ import (
// Provider is the interface provided by proxier implementations.
type Provider interface {
config.EndpointsHandler
config.EndpointSliceHandler
config.ServiceHandler
// Sync immediately synchronizes the Provider's current state to proxy rules.

View File

@ -83,6 +83,7 @@ go_test(
"//pkg/proxy:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",

View File

@ -26,7 +26,7 @@ import (
"sync/atomic"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
@ -35,6 +35,7 @@ import (
"k8s.io/klog"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/config"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
@ -109,6 +110,9 @@ type asyncRunnerInterface interface {
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
// EndpointSlice support has not been added for this proxier yet.
config.NoopEndpointSliceHandler
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortName]*ServiceInfo

View File

@ -31,6 +31,7 @@ import (
"time"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
@ -1137,6 +1138,14 @@ func TestOnServiceAddChangeMap(t *testing.T) {
}
}
func TestNoopEndpointSlice(t *testing.T) {
p := Proxier{}
p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{})
p.OnEndpointSliceDelete(&discovery.EndpointSlice{})
p.OnEndpointSlicesSynced()
}
func makeFakeExec() *fakeexec.FakeExec {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/apis/config:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/util/async:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -61,6 +62,7 @@ go_test(
"@io_bazel_rules_go//go/platform:windows": [
"//pkg/proxy:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library",

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/util/async"
)
@ -441,6 +442,9 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap, curServices proxySe
// Proxier is an hns based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
// EndpointSlice support has not been added for this proxier yet.
proxyconfig.NoopEndpointSliceHandler
// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since policies were synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,

View File

@ -20,6 +20,7 @@ package winkernel
import (
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/proxy"
@ -331,6 +332,15 @@ func TestCreateLoadBalancer(t *testing.T) {
t.Errorf("%v does not match %v", proxier.serviceMap[svcPortName].hnsID, guid)
}
}
func TestNoopEndpointSlice(t *testing.T) {
p := Proxier{}
p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{})
p.OnEndpointSliceDelete(&discovery.EndpointSlice{})
p.OnEndpointSlicesSynced()
}
func makeNSN(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}

View File

@ -45,6 +45,7 @@ go_test(
"//pkg/proxy:go_default_library",
"//pkg/util/netsh/testing:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",

View File

@ -27,12 +27,13 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/util/netsh"
)
@ -80,6 +81,9 @@ func logTimeout(err error) bool {
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
// EndpointSlice support has not been added for this proxier yet.
config.NoopEndpointSliceHandler
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[ServicePortPortalName]*serviceInfo

View File

@ -30,6 +30,7 @@ import (
"time"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
@ -950,4 +951,12 @@ func TestProxyUpdatePortal(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}
func TestNoopEndpointSlice(t *testing.T) {
p := Proxier{}
p.OnEndpointSliceAdd(&discovery.EndpointSlice{})
p.OnEndpointSliceUpdate(&discovery.EndpointSlice{}, &discovery.EndpointSlice{})
p.OnEndpointSliceDelete(&discovery.EndpointSlice{})
p.OnEndpointSlicesSynced()
}
// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in