From 465255425e7f1ab58e312641876f0382167728f7 Mon Sep 17 00:00:00 2001 From: "Khaled Henidak(Kal)" Date: Thu, 29 Aug 2019 23:25:30 +0000 Subject: [PATCH 1/2] create meta-proxy for proxy-mode=ipvs (dualstack) co-authored-by: Lars Ekman --- cmd/kube-proxy/app/server_others.go | 119 +++++++++++++--- pkg/proxy/ipvs/ipset.go | 15 ++ pkg/proxy/ipvs/meta_proxier.go | 203 ++++++++++++++++++++++++++++ pkg/proxy/ipvs/proxier.go | 69 +++++++++- pkg/proxy/ipvs/proxier_test.go | 29 ++++ pkg/proxy/ipvs/safe_ipset.go | 104 ++++++++++++++ 6 files changed, 515 insertions(+), 24 deletions(-) create mode 100644 pkg/proxy/ipvs/meta_proxier.go create mode 100644 pkg/proxy/ipvs/safe_ipset.go diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index ecd34278131..e233d42e7a9 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -24,12 +24,15 @@ import ( "errors" "fmt" "net" + "strings" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilnet "k8s.io/apimachinery/pkg/util/net" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme" @@ -46,6 +49,7 @@ import ( utilnode "k8s.io/kubernetes/pkg/util/node" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" "k8s.io/utils/exec" + utilsnet "k8s.io/utils/net" "k8s.io/klog" ) @@ -170,26 +174,61 @@ func newProxyServer( metrics.RegisterMetrics() } else if proxyMode == proxyModeIPVS { klog.V(0).Info("Using ipvs Proxier.") - proxier, err = ipvs.NewProxier( - iptInterface, - ipvsInterface, - ipsetInterface, - utilsysctl.New(), - execer, - config.IPVS.SyncPeriod.Duration, - config.IPVS.MinSyncPeriod.Duration, - config.IPVS.ExcludeCIDRs, - config.IPVS.StrictARP, - config.IPTables.MasqueradeAll, - int(*config.IPTables.MasqueradeBit), - config.ClusterCIDR, - hostname, - nodeIP, - recorder, - healthzServer, - config.IPVS.Scheduler, - config.NodePortAddresses, - ) + if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { + klog.V(0).Info("creating dualStackProxier for ipvs.") + + // Create iptables handlers for both families, one is already created + var ipt [2]utiliptables.Interface + if iptInterface.IsIpv6() { + ipt[1] = iptInterface + ipt[0] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4) + } else { + ipt[0] = iptInterface + ipt[1] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6) + } + + proxier, err = ipvs.NewDualStackProxier( + ipt, + ipvsInterface, + ipsetInterface, + utilsysctl.New(), + execer, + config.IPVS.SyncPeriod.Duration, + config.IPVS.MinSyncPeriod.Duration, + config.IPVS.ExcludeCIDRs, + config.IPVS.StrictARP, + config.IPTables.MasqueradeAll, + int(*config.IPTables.MasqueradeBit), + cidrTuple(config.ClusterCIDR), + hostname, + nodeIPTuple(config.BindAddress), + recorder, + healthzServer, + config.IPVS.Scheduler, + config.NodePortAddresses, + ) + } else { + proxier, err = ipvs.NewProxier( + iptInterface, + ipvsInterface, + ipsetInterface, + utilsysctl.New(), + execer, + config.IPVS.SyncPeriod.Duration, + config.IPVS.MinSyncPeriod.Duration, + config.IPVS.ExcludeCIDRs, + config.IPVS.StrictARP, + config.IPTables.MasqueradeAll, + int(*config.IPTables.MasqueradeBit), + config.ClusterCIDR, + hostname, + nodeIP, + recorder, + healthzServer, + config.IPVS.Scheduler, + config.NodePortAddresses, + ) + } if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } @@ -238,6 +277,46 @@ func newProxyServer( }, nil } +// cidrTuple takes a comma separated list of CIDRs and return a tuple (ipv4cidr,ipv6cidr) +// The returned tuple is guaranteed to have the order (ipv4,ipv6) and if no cidr from a family is found an +// empty string "" is inserted. +func cidrTuple(cidrList string) [2]string { + cidrs := [2]string{"", ""} + foundIPv4 := false + foundIPv6 := false + + for _, cidr := range strings.Split(cidrList, ",") { + if utilsnet.IsIPv6CIDRString(cidr) && !foundIPv6 { + cidrs[1] = cidr + foundIPv6 = true + } else if !foundIPv4 { + cidrs[0] = cidr + foundIPv4 = true + } + if foundIPv6 && foundIPv4 { + break + } + } + + return cidrs +} + +// nodeIPTuple takes an addresses and return a tuple (ipv4,ipv6) +// The returned tuple is guaranteed to have the order (ipv4,ipv6). The address NOT of the passed address +// will have "any" address (0.0.0.0 or ::) inserted. +func nodeIPTuple(bindAddress string) [2]net.IP { + nodes := [2]net.IP{net.IPv4zero, net.IPv6zero} + + adr := net.ParseIP(bindAddress) + if utilsnet.IsIPv6(adr) { + nodes[1] = adr + } else { + nodes[0] = adr + } + + return nodes +} + func getProxyMode(proxyMode string, khandle ipvs.KernelHandler, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string { switch proxyMode { case proxyModeUserspace: diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go index 636c7c95282..bed64ff75c0 100644 --- a/pkg/proxy/ipvs/ipset.go +++ b/pkg/proxy/ipvs/ipset.go @@ -23,6 +23,7 @@ import ( "fmt" "k8s.io/klog" + "strings" ) const ( @@ -102,6 +103,20 @@ func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, i hashFamily := utilipset.ProtocolFamilyIPV4 if isIPv6 { hashFamily = utilipset.ProtocolFamilyIPV6 + // In dual-stack both ipv4 and ipv6 ipset's can co-exist. To + // ensure unique names the prefix for ipv6 is changed from + // "KUBE-" to "KUBE-6-". The "KUBE-" prefix is kept for + // backward compatibility. The maximum name length of an ipset + // is 31 characters which must be taken into account. The + // ipv4 names are not altered to minimize the risk for + // problems on upgrades. + if strings.HasPrefix(name, "KUBE-") { + name = strings.Replace(name, "KUBE-", "KUBE-6-", 1) + if len(name) > 31 { + klog.Warningf("ipset name truncated; [%s] -> [%s]", name, name[:31]) + name = name[:31] + } + } } set := &IPSet{ IPSet: utilipset.IPSet{ diff --git a/pkg/proxy/ipvs/meta_proxier.go b/pkg/proxy/ipvs/meta_proxier.go new file mode 100644 index 00000000000..45083c61909 --- /dev/null +++ b/pkg/proxy/ipvs/meta_proxier.go @@ -0,0 +1,203 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "fmt" + + "k8s.io/api/core/v1" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/proxy" + utilnet "k8s.io/utils/net" + + discovery "k8s.io/api/discovery/v1alpha1" +) + +type metaProxier struct { + ipv4Proxier proxy.Provider + ipv6Proxier proxy.Provider +} + +// NewMetaProxier returns a dual-stack "meta-proxier". Proxier API +// calls will be dispatched to the ProxyProvider instances depending +// on address family. +func NewMetaProxier(ipv4Proxier, ipv6Proxier proxy.Provider) proxy.Provider { + return proxy.Provider(&metaProxier{ + ipv4Proxier: ipv4Proxier, + ipv6Proxier: ipv6Proxier, + }) +} + +// Sync immediately synchronizes the ProxyProvider's current state to +// proxy rules. +func (proxier *metaProxier) Sync() { + proxier.ipv4Proxier.Sync() + proxier.ipv6Proxier.Sync() +} + +// SyncLoop runs periodic work. This is expected to run as a +// goroutine or as the main loop of the app. It does not return. +func (proxier *metaProxier) SyncLoop() { + go proxier.ipv6Proxier.SyncLoop() // Use go-routine here! + proxier.ipv4Proxier.SyncLoop() // never returns +} + +// OnServiceAdd is called whenever creation of new service object is observed. +func (proxier *metaProxier) OnServiceAdd(service *v1.Service) { + if *(service.Spec.IPFamily) == v1.IPv4Protocol { + proxier.ipv4Proxier.OnServiceAdd(service) + return + } + proxier.ipv6Proxier.OnServiceAdd(service) +} + +// OnServiceUpdate is called whenever modification of an existing +// service object is observed. +func (proxier *metaProxier) OnServiceUpdate(oldService, service *v1.Service) { + // IPFamily is immutable, hence we only need to check on the new service + if *(service.Spec.IPFamily) == v1.IPv4Protocol { + proxier.ipv4Proxier.OnServiceUpdate(oldService, service) + return + } + + proxier.ipv6Proxier.OnServiceUpdate(oldService, service) +} + +// OnServiceDelete is called whenever deletion of an existing service +// object is observed. +func (proxier *metaProxier) OnServiceDelete(service *v1.Service) { + if *(service.Spec.IPFamily) == v1.IPv4Protocol { + proxier.ipv4Proxier.OnServiceDelete(service) + return + } + proxier.ipv6Proxier.OnServiceDelete(service) +} + +// OnServiceSynced is called once all the initial event handlers were +// called and the state is fully propagated to local cache. +func (proxier *metaProxier) OnServiceSynced() { + proxier.ipv4Proxier.OnServiceSynced() + proxier.ipv6Proxier.OnServiceSynced() +} + +// OnEndpointsAdd is called whenever creation of new endpoints object +// is observed. +func (proxier *metaProxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + ipFamily, err := endpointsIPFamily(endpoints) + if err != nil { + klog.Warningf("failed to add endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err) + return + } + if *ipFamily == v1.IPv4Protocol { + proxier.ipv4Proxier.OnEndpointsAdd(endpoints) + return + } + proxier.ipv6Proxier.OnEndpointsAdd(endpoints) +} + +// OnEndpointsUpdate is called whenever modification of an existing +// endpoints object is observed. +func (proxier *metaProxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + ipFamily, err := endpointsIPFamily(endpoints) + if err != nil { + klog.Warningf("failed to update endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err) + return + } + + if *ipFamily == v1.IPv4Protocol { + proxier.ipv4Proxier.OnEndpointsUpdate(oldEndpoints, endpoints) + return + } + proxier.ipv6Proxier.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +// OnEndpointsDelete is called whenever deletion of an existing +// endpoints object is observed. +func (proxier *metaProxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + ipFamily, err := endpointsIPFamily(endpoints) + if err != nil { + klog.Warningf("failed to delete endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err) + return + } + + if *ipFamily == v1.IPv4Protocol { + proxier.ipv4Proxier.OnEndpointsDelete(endpoints) + return + } + proxier.ipv6Proxier.OnEndpointsDelete(endpoints) +} + +// OnEndpointsSynced is called once all the initial event handlers +// were called and the state is fully propagated to local cache. +func (proxier *metaProxier) OnEndpointsSynced() { + proxier.ipv4Proxier.OnEndpointsSynced() + proxier.ipv6Proxier.OnEndpointsSynced() +} + +// TODO: (khenidak) implement EndpointSlice handling + +// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object +// is observed. +func (proxier *metaProxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) { + // noop +} + +// OnEndpointSliceUpdate is called whenever modification of an existing endpoint +// slice object is observed. +func (proxier *metaProxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) { + //noop +} + +// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice +// object is observed. +func (proxier *metaProxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) { + //noop +} + +// OnEndpointSlicesSynced is called once all the initial event handlers were +// called and the state is fully propagated to local cache. +func (proxier *metaProxier) OnEndpointSlicesSynced() { + //noop +} + +// endpointsIPFamily that returns IPFamily of endpoints or error if +// failed to identify the IP family. +func endpointsIPFamily(endpoints *v1.Endpoints) (*v1.IPFamily, error) { + if len(endpoints.Subsets) == 0 { + return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no subsets)") + } + + // we only need to work with subset [0],endpoint controller + // ensures that endpoints selected are of the same family. + subset := endpoints.Subsets[0] + if len(subset.Addresses) == 0 { + return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no addresses)") + } + // same apply on addresses + address := subset.Addresses[0] + if len(address.IP) == 0 { + return nil, fmt.Errorf("failed to identify ipfamily for endpoints (address has no ip)") + } + + ipv4 := v1.IPv4Protocol + ipv6 := v1.IPv6Protocol + if utilnet.IsIPv6String(address.IP) { + return &ipv6, nil + } + + return &ipv4, nil +} diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 8bd805b8b87..c344806c3fc 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -471,6 +471,62 @@ func NewProxier(ipt utiliptables.Interface, return proxier, nil } +// NewDualStackProxier returns a new Proxier for dual-stack operation +func NewDualStackProxier( + ipt [2]utiliptables.Interface, + ipvs utilipvs.Interface, + ipset utilipset.Interface, + sysctl utilsysctl.Interface, + exec utilexec.Interface, + syncPeriod time.Duration, + minSyncPeriod time.Duration, + excludeCIDRs []string, + strictARP bool, + masqueradeAll bool, + masqueradeBit int, + clusterCIDR [2]string, + hostname string, + nodeIP [2]net.IP, + recorder record.EventRecorder, + healthzServer healthcheck.HealthzUpdater, + scheduler string, + nodePortAddresses []string, +) (proxy.Provider, error) { + + safeIpset := newSafeIpset(ipset) + + // Create an ipv4 instance of the single-stack proxier + ipv4Proxier, err := NewProxier(ipt[0], ipvs, safeIpset, sysctl, + exec, syncPeriod, minSyncPeriod, filterCIDRs(false, excludeCIDRs), strictARP, + masqueradeAll, masqueradeBit, clusterCIDR[0], hostname, nodeIP[0], + recorder, healthzServer, scheduler, nodePortAddresses) + if err != nil { + return nil, fmt.Errorf("unable to create ipv4 proxier: %v", err) + } + + ipv6Proxier, err := NewProxier(ipt[1], ipvs, safeIpset, sysctl, + exec, syncPeriod, minSyncPeriod, filterCIDRs(true, excludeCIDRs), strictARP, + masqueradeAll, masqueradeBit, clusterCIDR[1], hostname, nodeIP[1], + nil, nil, scheduler, nodePortAddresses) + if err != nil { + return nil, fmt.Errorf("unable to create ipv6 proxier: %v", err) + } + + // Return a meta-proxier that dispatch calls between the two + // single-stack proxier instances + return NewMetaProxier(ipv4Proxier, ipv6Proxier), nil +} + +func filterCIDRs(wantIPv6 bool, cidrs []string) []string { + var filteredCIDRs []string + for _, cidr := range cidrs { + if utilnet.IsIPv6CIDRString(cidr) == wantIPv6 { + filteredCIDRs = append(filteredCIDRs, cidr) + } + } + return filteredCIDRs +} + // internal struct for string service information type serviceInfo struct { *proxy.BaseServiceInfo @@ -1478,7 +1534,7 @@ func (proxier *Proxier) writeIptablesRules() { } args = append(args, "-m", "comment", "--comment", proxier.ipsetList[set.name].getComment(), - "-m", "set", "--match-set", set.name, + "-m", "set", "--match-set", proxier.ipsetList[set.name].Name, set.matchType, ) writeLine(proxier.natRules, append(args, "-j", set.to)...) @@ -1489,7 +1545,7 @@ func (proxier *Proxier) writeIptablesRules() { args = append(args[:0], "-A", string(kubeServicesChain), "-m", "comment", "--comment", proxier.ipsetList[kubeClusterIPSet].getComment(), - "-m", "set", "--match-set", kubeClusterIPSet, + "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, ) if proxier.masqueradeAll { writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) @@ -1517,7 +1573,7 @@ func (proxier *Proxier) writeIptablesRules() { args = append(args[:0], "-A", string(kubeServicesChain), "-m", "comment", "--comment", proxier.ipsetList[kubeExternalIPSet].getComment(), - "-m", "set", "--match-set", kubeExternalIPSet, + "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, "dst,dst", ) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) @@ -1611,7 +1667,7 @@ func (proxier *Proxier) acceptIPVSTraffic() { } writeLine(proxier.natRules, []string{ "-A", string(kubeServicesChain), - "-m", "set", "--match-set", set, matchType, + "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType, "-j", "ACCEPT", }...) } @@ -1845,11 +1901,16 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) { + isIPv6 := utilnet.IsIPv6(proxier.nodeIP) for cs := range currentServices { svc := currentServices[cs] if proxier.isIPInExcludeCIDRs(svc.Address) { continue } + if utilnet.IsIPv6(svc.Address) != isIPv6 { + // Not our family + continue + } if _, ok := activeServices[cs]; !ok { klog.V(4).Infof("Delete service %s", svc.String()) if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil { diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 7b5600c09f9..324bcf26fc2 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -3716,3 +3716,32 @@ func TestEndpointSliceE2E(t *testing.T) { assert.Nil(t, rsErr2, "Expected no error getting real servers") assert.Len(t, realServers2, 0, "Expected 0 real servers") } +func TestFilterCIDRs(t *testing.T) { + var cidrList []string + var cidrs []string + var expected []string + cidrs = filterCIDRs(true, []string{}) + if len(cidrs) > 0 { + t.Errorf("An empty list produces a non-empty return %v", cidrs) + } + + cidrList = []string{"1000::/64", "10.0.0.0/16", "11.0.0.0/16", "2000::/64"} + expected = []string{"1000::/64", "2000::/64"} + cidrs = filterCIDRs(true, cidrList) + if !reflect.DeepEqual(cidrs, expected) { + t.Errorf("cidrs %v is not expected %v", cidrs, expected) + } + + expected = []string{"10.0.0.0/16", "11.0.0.0/16"} + cidrs = filterCIDRs(false, cidrList) + if !reflect.DeepEqual(cidrs, expected) { + t.Errorf("cidrs %v is not expected %v", cidrs, expected) + } + + cidrList = []string{"1000::/64", "2000::/64"} + expected = []string{} + cidrs = filterCIDRs(false, cidrList) + if len(cidrs) > 0 { + t.Errorf("cidrs %v is not expected %v", cidrs, expected) + } +} diff --git a/pkg/proxy/ipvs/safe_ipset.go b/pkg/proxy/ipvs/safe_ipset.go new file mode 100644 index 00000000000..2be9900d96d --- /dev/null +++ b/pkg/proxy/ipvs/safe_ipset.go @@ -0,0 +1,104 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "sync" + + "k8s.io/kubernetes/pkg/util/ipset" +) + +type safeIpset struct { + ipset ipset.Interface + mu sync.Mutex +} + +func newSafeIpset(ipset ipset.Interface) ipset.Interface { + return &safeIpset{ + ipset: ipset, + } +} + +// FlushSet deletes all entries from a named set. +func (s *safeIpset) FlushSet(set string) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.FlushSet(set) +} + +// DestroySet deletes a named set. +func (s *safeIpset) DestroySet(set string) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.DestroySet(set) +} + +// DestroyAllSets deletes all sets. +func (s *safeIpset) DestroyAllSets() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.DestroyAllSets() +} + +// CreateSet creates a new set. It will ignore error when the set already exists if ignoreExistErr=true. +func (s *safeIpset) CreateSet(set *ipset.IPSet, ignoreExistErr bool) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.CreateSet(set, ignoreExistErr) +} + +// AddEntry adds a new entry to the named set. It will ignore error when the entry already exists if ignoreExistErr=true. +func (s *safeIpset) AddEntry(entry string, set *ipset.IPSet, ignoreExistErr bool) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.AddEntry(entry, set, ignoreExistErr) +} + +// DelEntry deletes one entry from the named set +func (s *safeIpset) DelEntry(entry string, set string) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.DelEntry(entry, set) +} + +// Test test if an entry exists in the named set +func (s *safeIpset) TestEntry(entry string, set string) (bool, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.TestEntry(entry, set) +} + +// ListEntries lists all the entries from a named set +func (s *safeIpset) ListEntries(set string) ([]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.ListEntries(set) +} + +// ListSets list all set names from kernel +func (s *safeIpset) ListSets() ([]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.ListSets() +} + +// GetVersion returns the "X.Y" version string for ipset. +func (s *safeIpset) GetVersion() (string, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.ipset.GetVersion() +} From ef75723564397f7c3bf423553f1c98eff72f58df Mon Sep 17 00:00:00 2001 From: "Khaled Henidak(Kal)" Date: Thu, 29 Aug 2019 23:37:25 +0000 Subject: [PATCH 2/2] ipvs dualstack: generated items co-authored-by: Lars Ekman --- cmd/kube-proxy/app/BUILD | 10 ++++++++++ pkg/proxy/ipvs/BUILD | 2 ++ 2 files changed, 12 insertions(+) diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index d9b3759341d..f81a1dee004 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -79,6 +79,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:darwin": [ "//pkg/proxy/metrics:go_default_library", @@ -86,6 +87,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:dragonfly": [ "//pkg/proxy/metrics:go_default_library", @@ -93,6 +95,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:freebsd": [ "//pkg/proxy/metrics:go_default_library", @@ -100,6 +103,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:linux": [ "//pkg/proxy/metrics:go_default_library", @@ -107,6 +111,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:nacl": [ "//pkg/proxy/metrics:go_default_library", @@ -114,6 +119,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:netbsd": [ "//pkg/proxy/metrics:go_default_library", @@ -121,6 +127,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:openbsd": [ "//pkg/proxy/metrics:go_default_library", @@ -128,6 +135,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:plan9": [ "//pkg/proxy/metrics:go_default_library", @@ -135,6 +143,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:solaris": [ "//pkg/proxy/metrics:go_default_library", @@ -142,6 +151,7 @@ go_library( "//pkg/util/node:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//vendor/k8s.io/utils/net:go_default_library", ], "@io_bazel_rules_go//go/platform:windows": [ "//pkg/proxy/winkernel:go_default_library", diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 741dbc07af7..0da921e54c6 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -43,10 +43,12 @@ go_library( srcs = [ "graceful_termination.go", "ipset.go", + "meta_proxier.go", "netlink.go", "netlink_linux.go", "netlink_unsupported.go", "proxier.go", + "safe_ipset.go", ], importpath = "k8s.io/kubernetes/pkg/proxy/ipvs", deps = [