mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 04:27:54 +00:00
replace iptree on the repairip controller
This commit is contained in:
parent
55c9b58e48
commit
f06b355daf
118
pkg/api/servicecidr/servicecidr.go
Normal file
118
pkg/api/servicecidr/servicecidr.go
Normal file
@ -0,0 +1,118 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package servicecidr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
|
||||
networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
|
||||
)
|
||||
|
||||
// ContainsIP return the list of ServiceCIDR that contains the IP address passed as argument
|
||||
func ContainsIP(serviceCIDRLister networkinglisters.ServiceCIDRLister, ip net.IP) []*networkingv1alpha1.ServiceCIDR {
|
||||
address := IPToAddr(ip)
|
||||
return ContainsAddress(serviceCIDRLister, address)
|
||||
}
|
||||
|
||||
// ContainsAddress return the list of ServiceCIDR that contains the address passed as argument
|
||||
func ContainsAddress(serviceCIDRLister networkinglisters.ServiceCIDRLister, address netip.Addr) []*networkingv1alpha1.ServiceCIDR {
|
||||
result := []*networkingv1alpha1.ServiceCIDR{}
|
||||
serviceCIDRList, err := serviceCIDRLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
return result
|
||||
}
|
||||
|
||||
for _, serviceCIDR := range serviceCIDRList {
|
||||
for _, cidr := range serviceCIDR.Spec.CIDRs {
|
||||
if prefix, err := netip.ParsePrefix(cidr); err == nil { // it can not fail since is already validated
|
||||
if prefixContainsIP(prefix, address) {
|
||||
result = append(result, serviceCIDR)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// prefixContainsIP returns true if the given IP is contained with the prefix,
|
||||
// is not the network address and also, if IPv4, is not the broadcast address.
|
||||
// This is required (rather than just `prefix.Contains(ip)`) because a ServiceCIDR
|
||||
// covering prefix will not allocate those IPs, so a service with one of those IPs
|
||||
// can't belong to that ServiceCIDR.
|
||||
func prefixContainsIP(prefix netip.Prefix, ip netip.Addr) bool {
|
||||
// if the IP is the network address is not contained
|
||||
if prefix.Masked().Addr() == ip {
|
||||
return false
|
||||
}
|
||||
// the broadcast address is not considered contained for IPv4
|
||||
if ip.Is4() {
|
||||
ipLast, err := broadcastAddress(prefix)
|
||||
if err != nil || ipLast == ip {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return prefix.Contains(ip)
|
||||
}
|
||||
|
||||
// broadcastAddress returns the broadcast address of the subnet
|
||||
// The broadcast address is obtained by setting all the host bits
|
||||
// in a subnet to 1.
|
||||
// network 192.168.0.0/24 : subnet bits 24 host bits 32 - 24 = 8
|
||||
// broadcast address 192.168.0.255
|
||||
func broadcastAddress(subnet netip.Prefix) (netip.Addr, error) {
|
||||
base := subnet.Masked().Addr()
|
||||
bytes := base.AsSlice()
|
||||
// get all the host bits from the subnet
|
||||
n := 8*len(bytes) - subnet.Bits()
|
||||
// set all the host bits to 1
|
||||
for i := len(bytes) - 1; i >= 0 && n > 0; i-- {
|
||||
if n >= 8 {
|
||||
bytes[i] = 0xff
|
||||
n -= 8
|
||||
} else {
|
||||
mask := ^uint8(0) >> (8 - n)
|
||||
bytes[i] |= mask
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
addr, ok := netip.AddrFromSlice(bytes)
|
||||
if !ok {
|
||||
return netip.Addr{}, fmt.Errorf("invalid address %v", bytes)
|
||||
}
|
||||
return addr, nil
|
||||
}
|
||||
|
||||
// IPToAddr converts a net.IP to a netip.Addr
|
||||
// if the net.IP is not valid it returns an empty netip.Addr{}
|
||||
func IPToAddr(ip net.IP) netip.Addr {
|
||||
// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format
|
||||
// so we have to check the IP family to return exactly the format that we want
|
||||
// address, _ := netip.AddrFromSlice(net.ParseIPSloppy(192.168.0.1)) returns
|
||||
// an address like ::ffff:192.168.0.1/32
|
||||
bytes := ip.To4()
|
||||
if bytes == nil {
|
||||
bytes = ip.To16()
|
||||
}
|
||||
// AddrFromSlice returns Addr{}, false if the input is invalid.
|
||||
address, _ := netip.AddrFromSlice(bytes)
|
||||
return address
|
||||
}
|
365
pkg/api/servicecidr/servicecidr_test.go
Normal file
365
pkg/api/servicecidr/servicecidr_test.go
Normal file
@ -0,0 +1,365 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package servicecidr
|
||||
|
||||
import (
|
||||
"net/netip"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
func newServiceCIDR(name, primary, secondary string) *networkingv1alpha1.ServiceCIDR {
|
||||
serviceCIDR := &networkingv1alpha1.ServiceCIDR{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
},
|
||||
Spec: networkingv1alpha1.ServiceCIDRSpec{},
|
||||
}
|
||||
serviceCIDR.Spec.CIDRs = append(serviceCIDR.Spec.CIDRs, primary)
|
||||
if secondary != "" {
|
||||
serviceCIDR.Spec.CIDRs = append(serviceCIDR.Spec.CIDRs, secondary)
|
||||
}
|
||||
return serviceCIDR
|
||||
}
|
||||
|
||||
func TestContainsAddress(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
serviceCIDRs []*networkingv1alpha1.ServiceCIDR
|
||||
address netip.Addr
|
||||
want []string
|
||||
}{
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv4 address contained",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("10.0.0.1"),
|
||||
want: []string{"kubernetes"},
|
||||
},
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv4 address broadcast",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("10.0.0.255"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv4 address base",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("10.0.0.0"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv4 address out of range",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("192.0.0.1"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv6 address contained",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("2001:db8::2:3"),
|
||||
want: []string{"kubernetes"},
|
||||
},
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv6 address broadcast",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("2001:db8::ffff:ffff"),
|
||||
want: []string{"kubernetes"},
|
||||
},
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv6 address base",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("2001:db8::"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "only one ServiceCIDR and IPv6 address out of range",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
},
|
||||
address: netip.MustParseAddr("2002:1:2:3::2"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and IPv4 address contained",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("10.0.0.1"),
|
||||
want: []string{"kubernetes", "secondary"},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and IPv4 address broadcast",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("10.0.0.255"),
|
||||
want: []string{"secondary"},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and IPv4 address base",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("10.0.0.0"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and IPv4 address out of range",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("192.0.0.1"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and IPv6 address contained",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("2001:db8::2:3"),
|
||||
want: []string{"kubernetes", "secondary"},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and address broadcast",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("2001:db8::ffff:ffff"),
|
||||
want: []string{"kubernetes", "secondary"},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and address base",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("2001:db8::"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "two ServiceCIDR and address out of range",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("2002:1:2:3::2"),
|
||||
want: []string{},
|
||||
},
|
||||
{
|
||||
name: "multiple ServiceCIDR match with overlap",
|
||||
serviceCIDRs: []*networkingv1alpha1.ServiceCIDR{
|
||||
newServiceCIDR("kubernetes", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("kubernetes2", "10.0.0.0/24", "2001:db8::/96"),
|
||||
newServiceCIDR("secondary", "10.0.0.0/16", "2001:db8::/64"),
|
||||
},
|
||||
address: netip.MustParseAddr("10.0.0.2"),
|
||||
want: []string{"kubernetes", "kubernetes2", "secondary"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
for _, serviceCIDR := range tt.serviceCIDRs {
|
||||
err := indexer.Add(serviceCIDR)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
}
|
||||
lister := networkinglisters.NewServiceCIDRLister(indexer)
|
||||
got := []string{}
|
||||
for _, serviceCIDR := range ContainsAddress(lister, tt.address) {
|
||||
got = append(got, serviceCIDR.Name)
|
||||
}
|
||||
// sort slices to make the order predictable and avoid flakiness
|
||||
sort.Strings(got)
|
||||
sort.Strings(tt.want)
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("ContainsAddress() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_PrefixContainIP(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
prefix netip.Prefix
|
||||
ip netip.Addr
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "IPv4 contains",
|
||||
prefix: netip.MustParsePrefix("192.168.0.0/24"),
|
||||
ip: netip.MustParseAddr("192.168.0.1"),
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "IPv4 network address",
|
||||
prefix: netip.MustParsePrefix("192.168.0.0/24"),
|
||||
ip: netip.MustParseAddr("192.168.0.0"),
|
||||
},
|
||||
{
|
||||
name: "IPv4 broadcast address",
|
||||
prefix: netip.MustParsePrefix("192.168.0.0/24"),
|
||||
ip: netip.MustParseAddr("192.168.0.255"),
|
||||
},
|
||||
{
|
||||
name: "IPv4 does not contain",
|
||||
prefix: netip.MustParsePrefix("192.168.0.0/24"),
|
||||
ip: netip.MustParseAddr("192.168.1.2"),
|
||||
},
|
||||
{
|
||||
name: "IPv6 contains",
|
||||
prefix: netip.MustParsePrefix("2001:db2::/96"),
|
||||
ip: netip.MustParseAddr("2001:db2::1"),
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "IPv6 network address",
|
||||
prefix: netip.MustParsePrefix("2001:db2::/96"),
|
||||
ip: netip.MustParseAddr("2001:db2::"),
|
||||
},
|
||||
{
|
||||
name: "IPv6 broadcast address",
|
||||
prefix: netip.MustParsePrefix("2001:db2::/96"),
|
||||
ip: netip.MustParseAddr("2001:db2::ffff:ffff"),
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "IPv6 does not contain",
|
||||
prefix: netip.MustParsePrefix("2001:db2::/96"),
|
||||
ip: netip.MustParseAddr("2001:db2:1:2:3::1"),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := prefixContainsIP(tt.prefix, tt.ip); got != tt.want {
|
||||
t.Errorf("prefixContainIP() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIPToAddr(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
ip string
|
||||
want netip.Addr
|
||||
}{
|
||||
{
|
||||
name: "IPv4",
|
||||
ip: "192.168.2.2",
|
||||
want: netip.MustParseAddr("192.168.2.2"),
|
||||
},
|
||||
{
|
||||
name: "IPv6",
|
||||
ip: "2001:db8::2",
|
||||
want: netip.MustParseAddr("2001:db8::2"),
|
||||
},
|
||||
{
|
||||
name: "IPv4 in IPv6",
|
||||
ip: "::ffff:192.168.0.1",
|
||||
want: netip.MustParseAddr("192.168.0.1"),
|
||||
},
|
||||
{
|
||||
name: "invalid",
|
||||
ip: "invalid_ip",
|
||||
want: netip.Addr{},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ip := netutils.ParseIPSloppy(tt.ip)
|
||||
if got := IPToAddr(ip); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("IPToAddr() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcastAddress(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
subnet netip.Prefix
|
||||
want netip.Addr
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "emty subnet",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "IPv4 even mask",
|
||||
subnet: netip.MustParsePrefix("192.168.0.0/24"),
|
||||
want: netip.MustParseAddr("192.168.0.255"),
|
||||
},
|
||||
{
|
||||
name: "IPv4 odd mask",
|
||||
subnet: netip.MustParsePrefix("192.168.0.0/23"),
|
||||
want: netip.MustParseAddr("192.168.1.255"),
|
||||
},
|
||||
{
|
||||
name: "IPv6 even mask",
|
||||
subnet: netip.MustParsePrefix("fd00:1:2:3::/64"),
|
||||
want: netip.MustParseAddr("fd00:1:2:3:ffff:ffff:ffff:ffff"),
|
||||
},
|
||||
{
|
||||
name: "IPv6 odd mask",
|
||||
subnet: netip.MustParsePrefix("fd00:1:2:3::/57"),
|
||||
want: netip.MustParseAddr("fd00:1:2:007f:ffff:ffff:ffff:ffff"),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := broadcastAddress(tt.subnet)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("BroadcastAddress() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("BroadcastAddress() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -20,8 +20,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
@ -42,9 +40,9 @@ import (
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/api/servicecidr"
|
||||
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
|
||||
"k8s.io/kubernetes/pkg/util/iptree"
|
||||
"k8s.io/utils/clock"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
@ -100,14 +98,10 @@ type RepairIPAddress struct {
|
||||
ipAddressLister networkinglisters.IPAddressLister
|
||||
ipAddressSynced cache.InformerSynced
|
||||
|
||||
cidrQueue workqueue.TypedRateLimitingInterface[string]
|
||||
svcQueue workqueue.TypedRateLimitingInterface[string]
|
||||
ipQueue workqueue.TypedRateLimitingInterface[string]
|
||||
workerLoopPeriod time.Duration
|
||||
|
||||
muTree sync.Mutex
|
||||
tree *iptree.Tree[string]
|
||||
|
||||
broadcaster events.EventBroadcaster
|
||||
recorder events.EventRecorder
|
||||
clock clock.Clock
|
||||
@ -132,10 +126,6 @@ func NewRepairIPAddress(interval time.Duration,
|
||||
serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
|
||||
ipAddressLister: ipAddressInformer.Lister(),
|
||||
ipAddressSynced: ipAddressInformer.Informer().HasSynced,
|
||||
cidrQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[string](),
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "servicecidrs"},
|
||||
),
|
||||
svcQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
|
||||
workqueue.DefaultTypedControllerRateLimiter[string](),
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "services"},
|
||||
@ -144,7 +134,6 @@ func NewRepairIPAddress(interval time.Duration,
|
||||
workqueue.DefaultTypedControllerRateLimiter[string](),
|
||||
workqueue.TypedRateLimitingQueueConfig[string]{Name: "ipaddresses"},
|
||||
),
|
||||
tree: iptree.New[string](),
|
||||
workerLoopPeriod: time.Second,
|
||||
broadcaster: eventBroadcaster,
|
||||
recorder: recorder,
|
||||
@ -174,29 +163,6 @@ func NewRepairIPAddress(interval time.Duration,
|
||||
},
|
||||
}, interval)
|
||||
|
||||
_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
key, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err == nil {
|
||||
r.cidrQueue.Add(key)
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old interface{}, new interface{}) {
|
||||
key, err := cache.MetaNamespaceKeyFunc(new)
|
||||
if err == nil {
|
||||
r.cidrQueue.Add(key)
|
||||
}
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
|
||||
// key function.
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err == nil {
|
||||
r.cidrQueue.Add(key)
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
key, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
@ -225,7 +191,6 @@ func NewRepairIPAddress(interval time.Duration,
|
||||
|
||||
// RunUntil starts the controller until the provided ch is closed.
|
||||
func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
|
||||
defer r.cidrQueue.ShutDown()
|
||||
defer r.ipQueue.ShutDown()
|
||||
defer r.svcQueue.ShutDown()
|
||||
r.broadcaster.StartRecordingToSink(stopCh)
|
||||
@ -247,9 +212,6 @@ func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{})
|
||||
}
|
||||
onFirstSuccess()
|
||||
|
||||
// serialize the operations on ServiceCIDRs
|
||||
go wait.Until(r.cidrWorker, r.workerLoopPeriod, stopCh)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh)
|
||||
go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh)
|
||||
@ -370,11 +332,7 @@ func (r *RepairIPAddress) syncService(key string) error {
|
||||
}
|
||||
// TODO(aojea) Refactor to abstract the IPs checks
|
||||
family := getFamilyByIP(ip)
|
||||
|
||||
r.muTree.Lock()
|
||||
prefixes := r.tree.GetHostIPPrefixMatches(ipToAddr(ip))
|
||||
r.muTree.Unlock()
|
||||
if len(prefixes) == 0 {
|
||||
if r.isIPOutOfRange(ip) {
|
||||
// ClusterIP is out of range
|
||||
r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within any configured Service CIDR; please recreate service", family, ip)
|
||||
runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within any service CIDR; please recreate", family, ip, svc.Namespace, svc.Name))
|
||||
@ -557,60 +515,9 @@ func (r *RepairIPAddress) syncIPAddress(key string) error {
|
||||
|
||||
}
|
||||
|
||||
func (r *RepairIPAddress) cidrWorker() {
|
||||
for r.processNextWorkCIDR() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RepairIPAddress) processNextWorkCIDR() bool {
|
||||
eKey, quit := r.cidrQueue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer r.cidrQueue.Done(eKey)
|
||||
|
||||
err := r.syncCIDRs()
|
||||
r.handleCIDRErr(err, eKey)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *RepairIPAddress) handleCIDRErr(err error, key string) {
|
||||
if err == nil {
|
||||
r.cidrQueue.Forget(key)
|
||||
return
|
||||
}
|
||||
|
||||
if r.cidrQueue.NumRequeues(key) < maxRetries {
|
||||
klog.V(2).InfoS("Error syncing ServiceCIDR, retrying", "serviceCIDR", key, "err", err)
|
||||
r.cidrQueue.AddRateLimited(key)
|
||||
return
|
||||
}
|
||||
|
||||
klog.Warningf("Dropping ServiceCIDR %q out of the queue: %v", key, err)
|
||||
r.cidrQueue.Forget(key)
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
|
||||
// syncCIDRs rebuilds the radix tree based from the informers cache
|
||||
func (r *RepairIPAddress) syncCIDRs() error {
|
||||
serviceCIDRList, err := r.serviceCIDRLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tree := iptree.New[string]()
|
||||
for _, serviceCIDR := range serviceCIDRList {
|
||||
for _, cidr := range serviceCIDR.Spec.CIDRs {
|
||||
if prefix, err := netip.ParsePrefix(cidr); err == nil { // it can not fail since is already validated
|
||||
tree.InsertPrefix(prefix, serviceCIDR.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
r.muTree.Lock()
|
||||
defer r.muTree.Unlock()
|
||||
r.tree = tree
|
||||
return nil
|
||||
// isIPOutOfRange returns false if the IP is not contained in any of the ServiceCIDRs
|
||||
func (r *RepairIPAddress) isIPOutOfRange(ip net.IP) bool {
|
||||
return len(servicecidr.ContainsIP(r.serviceCIDRLister, ip)) == 0
|
||||
}
|
||||
|
||||
func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress {
|
||||
@ -677,20 +584,3 @@ func verifyIPAddressLabels(ip *networkingv1alpha1.IPAddress) bool {
|
||||
}
|
||||
return managedByController(ip)
|
||||
}
|
||||
|
||||
// TODO(aojea) move to utils, already in pkg/registry/core/service/ipallocator/cidrallocator.go
|
||||
// ipToAddr converts a net.IP to a netip.Addr
|
||||
// if the net.IP is not valid it returns an empty netip.Addr{}
|
||||
func ipToAddr(ip net.IP) netip.Addr {
|
||||
// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format
|
||||
// so we have to check the IP family to return exactly the format that we want
|
||||
// address, _ := netip.AddrFromSlice(net.ParseIPSloppy(192.168.0.1)) returns
|
||||
// an address like ::ffff:192.168.0.1/32
|
||||
bytes := ip.To4()
|
||||
if bytes == nil {
|
||||
bytes = ip.To16()
|
||||
}
|
||||
// AddrFromSlice returns Addr{}, false if the input is invalid.
|
||||
address, _ := netip.AddrFromSlice(bytes)
|
||||
return address
|
||||
}
|
||||
|
@ -327,10 +327,7 @@ func TestRepairServiceIP(t *testing.T) {
|
||||
t.Errorf("Unexpected error trying to add Service %v object: %v", cidr, err)
|
||||
}
|
||||
}
|
||||
err := r.syncCIDRs()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// override for testing
|
||||
r.servicesSynced = func() bool { return true }
|
||||
r.ipAddressSynced = func() bool { return true }
|
||||
@ -352,7 +349,7 @@ func TestRepairServiceIP(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
err = r.runOnce()
|
||||
err := r.runOnce()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user