Drop broken/no-op proxyconfig.EndpointsHandler implementations

Because the proxy.Provider interface included
proxyconfig.EndpointsHandler, all the backends needed to
implement its methods. But iptables, ipvs, and winkernel implemented
them as no-ops, and metaproxier had an implementation that wouldn't
actually work (because it couldn't handle Services with no active
Endpoints).

Since Endpoints processing in kube-proxy is deprecated (and can't be
re-enabled unless you're using a backend that doesn't support
EndpointSlice), remove proxyconfig.EndpointsHandler from the
definition of proxy.Provider and drop all the useless implementations.
This commit is contained in:
Dan Winship 2021-09-06 15:51:50 -04:00
parent eb729620c5
commit 7f6fbc4482
9 changed files with 8 additions and 240 deletions

View File

@ -751,14 +751,14 @@ func (s *ProxyServer) Run() error {
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
if s.UseEndpointSlices {
if endpointsHandler, ok := s.Proxier.(config.EndpointsHandler); ok && !s.UseEndpointSlices {
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(endpointsHandler)
go endpointsConfig.Run(wait.NeverStop)
} else {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
} else {
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.Proxier)
go endpointsConfig.Run(wait.NeverStop)
}
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those

View File

@ -47,7 +47,9 @@ type ServiceHandler interface {
}
// EndpointsHandler is an abstract interface of objects which receive
// notifications about endpoints object changes.
// notifications about endpoints object changes. This is not a required
// sub-interface of proxy.Provider, and proxy implementations should
// not implement it unless they can't handle EndpointSlices.
type EndpointsHandler interface {
// OnEndpointsAdd is called whenever creation of new endpoints object
// is observed.

View File

@ -579,25 +579,6 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.syncProxyRules()
}
// iptables proxier only uses EndpointSlice, the following methods
// exist to implement the Proxier interface but are noops
// OnEndpointsAdd is called whenever creation of new endpoints object
// is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {}
// OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {}
// OnEndpointsDelete is called whenever deletion of an existing endpoints
// object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {}
// OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointsSynced() {}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {

View File

@ -2883,7 +2883,6 @@ COMMIT
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
@ -2992,7 +2991,6 @@ COMMIT
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
@ -3056,7 +3054,6 @@ func Test_HealthCheckNodePortWhenTerminating(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
@ -3551,7 +3548,6 @@ COMMIT
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
serviceName := "svc1"
@ -4139,7 +4135,6 @@ COMMIT
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.OnServiceSynced()
fp.OnEndpointsSynced()
fp.OnEndpointSlicesSynced()
fp.OnServiceAdd(testcase.service)

View File

@ -891,21 +891,6 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.syncProxyRules()
}
// The following methods exist to implement the Proxier interface however
// ipvs proxier only uses EndpointSlices so the following are noops
// OnEndpointsAdd is called whenever creation of new endpoints object is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {}
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {}
// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {}
// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointsSynced() {}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {

View File

@ -17,15 +17,11 @@ limitations under the License.
package metaproxier
import (
"fmt"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/config"
utilnet "k8s.io/utils/net"
)
type metaProxier struct {
@ -89,60 +85,6 @@ func (proxier *metaProxier) OnServiceSynced() {
proxier.ipv6Proxier.OnServiceSynced()
}
// OnEndpointsAdd is called whenever creation of new endpoints object
// is observed.
func (proxier *metaProxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
ipFamily, err := endpointsIPFamily(endpoints)
if err != nil {
klog.V(4).Infof("failed to add endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
return
}
if *ipFamily == v1.IPv4Protocol {
proxier.ipv4Proxier.OnEndpointsAdd(endpoints)
return
}
proxier.ipv6Proxier.OnEndpointsAdd(endpoints)
}
// OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed.
func (proxier *metaProxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
ipFamily, err := endpointsIPFamily(endpoints)
if err != nil {
klog.V(4).Infof("failed to update endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
return
}
if *ipFamily == v1.IPv4Protocol {
proxier.ipv4Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
return
}
proxier.ipv6Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
}
// OnEndpointsDelete is called whenever deletion of an existing
// endpoints object is observed.
func (proxier *metaProxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
ipFamily, err := endpointsIPFamily(endpoints)
if err != nil {
klog.V(4).Infof("failed to delete endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
return
}
if *ipFamily == v1.IPv4Protocol {
proxier.ipv4Proxier.OnEndpointsDelete(endpoints)
return
}
proxier.ipv6Proxier.OnEndpointsDelete(endpoints)
}
// OnEndpointsSynced is called once all the initial event handlers
// were called and the state is fully propagated to local cache.
func (proxier *metaProxier) OnEndpointsSynced() {
proxier.ipv4Proxier.OnEndpointsSynced()
proxier.ipv6Proxier.OnEndpointsSynced()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *metaProxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
@ -189,34 +131,6 @@ func (proxier *metaProxier) OnEndpointSlicesSynced() {
proxier.ipv6Proxier.OnEndpointSlicesSynced()
}
// endpointsIPFamily that returns IPFamily of endpoints or error if
// failed to identify the IP family.
func endpointsIPFamily(endpoints *v1.Endpoints) (*v1.IPFamily, error) {
if len(endpoints.Subsets) == 0 {
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no subsets)")
}
// we only need to work with subset [0],endpoint controller
// ensures that endpoints selected are of the same family.
subset := endpoints.Subsets[0]
if len(subset.Addresses) == 0 {
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no addresses)")
}
// same apply on addresses
address := subset.Addresses[0]
if len(address.IP) == 0 {
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (address has no ip)")
}
ipv4 := v1.IPv4Protocol
ipv6 := v1.IPv6Protocol
if utilnet.IsIPv6String(address.IP) {
return &ipv6, nil
}
return &ipv4, nil
}
// OnNodeAdd is called whenever creation of new node object is observed.
func (proxier *metaProxier) OnNodeAdd(node *v1.Node) {
proxier.ipv4Proxier.OnNodeAdd(node)

View File

@ -1,89 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metaproxier
import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
)
func Test_endpointsIPFamily(t *testing.T) {
ipv4 := v1.IPv4Protocol
ipv6 := v1.IPv6Protocol
tests := []struct {
name string
endpoints *v1.Endpoints
want *v1.IPFamily
wantErr bool
errorMsg string
}{
{
name: "Endpoints No Subsets",
endpoints: &v1.Endpoints{},
want: nil,
wantErr: true,
errorMsg: "failed to identify ipfamily for endpoints (no subsets)",
},
{
name: "Endpoints No Addresses",
endpoints: &v1.Endpoints{Subsets: []v1.EndpointSubset{{NotReadyAddresses: []v1.EndpointAddress{}}}},
want: nil,
wantErr: true,
errorMsg: "failed to identify ipfamily for endpoints (no addresses)",
},
{
name: "Endpoints Address Has No IP",
endpoints: &v1.Endpoints{Subsets: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{{Hostname: "testhost", IP: ""}}}}},
want: nil,
wantErr: true,
errorMsg: "failed to identify ipfamily for endpoints (address has no ip)",
},
{
name: "Endpoints Address IPv4",
endpoints: &v1.Endpoints{Subsets: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}}}},
want: &ipv4,
wantErr: false,
},
{
name: "Endpoints Address IPv6",
endpoints: &v1.Endpoints{Subsets: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{{IP: "2001:db9::2"}}}}},
want: &ipv6,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := endpointsIPFamily(tt.endpoints)
if (err != nil) != tt.wantErr {
t.Errorf("endpointsIPFamily() error = %v, wantErr %v", err, tt.wantErr)
return
}
if err != nil && err.Error() != tt.errorMsg {
t.Errorf("endpointsIPFamily() error = %v, wantErr %v", err, tt.errorMsg)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("endpointsIPFamily() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -28,7 +28,6 @@ import (
// Provider is the interface provided by proxier implementations.
type Provider interface {
config.EndpointsHandler
config.EndpointSliceHandler
config.ServiceHandler
config.NodeHandler

View File

@ -942,25 +942,6 @@ func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
return false
}
// The following methods exist to implement the proxier interface, however
// winkernel proxier only uses EndpointSlice, so the following are noops.
// OnEndpointsAdd is called whenever creation of new endpoints object
// is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {}
// OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {}
// OnEndpointsDelete is called whenever deletion of an existing endpoints
// object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {}
// OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointsSynced() {}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {