From 883d0c3b717987c19e5083c48f0ae939cbade38a Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Thu, 6 Jul 2023 14:31:00 -0400 Subject: [PATCH] Add a dummy implementation of proxyutil.LineBuffer Rather than actually assembling all of the rules we aren't going to use, just count them and throw them away. --- pkg/proxy/iptables/proxier.go | 28 ++--- pkg/proxy/iptables/proxier_test.go | 8 +- pkg/proxy/ipvs/proxier.go | 8 +- pkg/proxy/ipvs/proxier_test.go | 8 +- pkg/proxy/util/linebuffer.go | 150 ++++++++++++++++++++++++++ pkg/proxy/util/linebuffer_test.go | 168 +++++++++++++++++++++++++++++ pkg/proxy/util/utils.go | 62 ----------- pkg/proxy/util/utils_test.go | 141 ------------------------ 8 files changed, 344 insertions(+), 229 deletions(-) create mode 100644 pkg/proxy/util/linebuffer.go create mode 100644 pkg/proxy/util/linebuffer_test.go diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index fd4886d3283..8054946e980 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -288,10 +288,10 @@ func NewProxier(ipFamily v1.IPFamily, precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), existingFilterChainsData: bytes.NewBuffer(nil), - filterChains: proxyutil.LineBuffer{}, - filterRules: proxyutil.LineBuffer{}, - natChains: proxyutil.LineBuffer{}, - natRules: proxyutil.LineBuffer{}, + filterChains: proxyutil.NewLineBuffer(), + filterRules: proxyutil.NewLineBuffer(), + natChains: proxyutil.NewLineBuffer(), + natRules: proxyutil.NewLineBuffer(), localhostNodePorts: localhostNodePorts, nodePortAddresses: nodePortAddresses, networkInterfacer: proxyutil.RealNetwork{}, @@ -411,8 +411,8 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { encounteredError = true } else { existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes()) - natChains := &proxyutil.LineBuffer{} - natRules := &proxyutil.LineBuffer{} + natChains := proxyutil.NewLineBuffer() + natRules := proxyutil.NewLineBuffer() natChains.Write("*nat") // Start with chains we know we need to remove. for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} { @@ -448,8 +448,8 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { encounteredError = true } else { existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes()) - filterChains := &proxyutil.LineBuffer{} - filterRules := &proxyutil.LineBuffer{} + filterChains := proxyutil.NewLineBuffer() + filterRules := proxyutil.NewLineBuffer() filterChains.Write("*filter") for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} { if _, found := existingFilterChains[chain]; found { @@ -852,8 +852,8 @@ func (proxier *Proxier) syncProxyRules() { proxier.natChains.Reset() proxier.natRules.Reset() - skippedNatChains := &proxyutil.LineBuffer{} - skippedNatRules := &proxyutil.LineBuffer{} + skippedNatChains := proxyutil.NewDiscardLineBuffer() + skippedNatRules := proxyutil.NewDiscardLineBuffer() // Write chain lines for all the "top-level" chains we'll be filling in for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} { @@ -1069,9 +1069,9 @@ func (proxier *Proxier) syncProxyRules() { } } - filterRules := &proxier.filterRules - natChains := &proxier.natChains - natRules := &proxier.natRules + filterRules := proxier.filterRules + natChains := proxier.natChains + natRules := proxier.natRules // Capture the clusterIP. if hasInternalEndpoints { @@ -1562,7 +1562,7 @@ func (proxier *Proxier) syncProxyRules() { conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } -func (proxier *Proxier) writeServiceToEndpointRules(natRules *proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { +func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { // First write session affinity rules, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { for _, ep := range endpoints { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index d02ca9cad94..5691ea0feac 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -324,10 +324,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), existingFilterChainsData: bytes.NewBuffer(nil), - filterChains: proxyutil.LineBuffer{}, - filterRules: proxyutil.LineBuffer{}, - natChains: proxyutil.LineBuffer{}, - natRules: proxyutil.LineBuffer{}, + filterChains: proxyutil.NewLineBuffer(), + filterRules: proxyutil.NewLineBuffer(), + natChains: proxyutil.NewLineBuffer(), + natRules: proxyutil.NewLineBuffer(), nodeIP: netutils.ParseIPSloppy(testNodeIP), localhostNodePorts: true, nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil), diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index cf3236bf1a5..1194ce84b4f 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -440,10 +440,10 @@ func NewProxier(ipFamily v1.IPFamily, ipvsScheduler: scheduler, iptablesData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil), - natChains: proxyutil.LineBuffer{}, - natRules: proxyutil.LineBuffer{}, - filterChains: proxyutil.LineBuffer{}, - filterRules: proxyutil.LineBuffer{}, + natChains: proxyutil.NewLineBuffer(), + natRules: proxyutil.NewLineBuffer(), + filterChains: proxyutil.NewLineBuffer(), + filterRules: proxyutil.NewLineBuffer(), netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol), ipset: ipset, nodePortAddresses: nodePortAddresses, diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 134276f9aec..22a1b84e638 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -163,10 +163,10 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u ipvsScheduler: defaultScheduler, iptablesData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil), - natChains: proxyutil.LineBuffer{}, - natRules: proxyutil.LineBuffer{}, - filterChains: proxyutil.LineBuffer{}, - filterRules: proxyutil.LineBuffer{}, + natChains: proxyutil.NewLineBuffer(), + natRules: proxyutil.NewLineBuffer(), + filterChains: proxyutil.NewLineBuffer(), + filterRules: proxyutil.NewLineBuffer(), netlinkHandle: netlinkHandle, ipsetList: ipsetList, nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil), diff --git a/pkg/proxy/util/linebuffer.go b/pkg/proxy/util/linebuffer.go new file mode 100644 index 00000000000..2309d93c807 --- /dev/null +++ b/pkg/proxy/util/linebuffer.go @@ -0,0 +1,150 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "bytes" + "fmt" +) + +// LineBuffer is an interface for writing lines of input to a bytes.Buffer +type LineBuffer interface { + // Write takes a list of arguments, each a string or []string, joins all the + // individual strings with spaces, terminates with newline, and writes them to the + // buffer. Any other argument type will panic. + Write(args ...interface{}) + + // WriteBytes writes bytes to the buffer, and terminates with newline. + WriteBytes(bytes []byte) + + // Reset clears the buffer + Reset() + + // Bytes returns the contents of the buffer as a []byte + Bytes() []byte + + // String returns the contents of the buffer as a string + String() string + + // Lines returns the number of lines in the buffer. Note that more precisely, this + // returns the number of times Write() or WriteBytes() was called; it assumes that + // you never wrote any newlines to the buffer yourself. + Lines() int +} + +type realLineBuffer struct { + b bytes.Buffer + lines int +} + +// NewLineBuffer returns a new "real" LineBuffer +func NewLineBuffer() LineBuffer { + return &realLineBuffer{} +} + +// Write is part of LineBuffer +func (buf *realLineBuffer) Write(args ...interface{}) { + for i, arg := range args { + if i > 0 { + buf.b.WriteByte(' ') + } + switch x := arg.(type) { + case string: + buf.b.WriteString(x) + case []string: + for j, s := range x { + if j > 0 { + buf.b.WriteByte(' ') + } + buf.b.WriteString(s) + } + default: + panic(fmt.Sprintf("unknown argument type: %T", x)) + } + } + buf.b.WriteByte('\n') + buf.lines++ +} + +// WriteBytes is part of LineBuffer +func (buf *realLineBuffer) WriteBytes(bytes []byte) { + buf.b.Write(bytes) + buf.b.WriteByte('\n') + buf.lines++ +} + +// Reset is part of LineBuffer +func (buf *realLineBuffer) Reset() { + buf.b.Reset() + buf.lines = 0 +} + +// Bytes is part of LineBuffer +func (buf *realLineBuffer) Bytes() []byte { + return buf.b.Bytes() +} + +// String is part of LineBuffer +func (buf *realLineBuffer) String() string { + return buf.b.String() +} + +// Lines is part of LineBuffer +func (buf *realLineBuffer) Lines() int { + return buf.lines +} + +type discardLineBuffer struct { + lines int +} + +// NewDiscardLineBuffer returns a dummy LineBuffer that counts the number of writes but +// throws away the data. (This is used for iptables proxy partial syncs, to keep track of +// how many rules we managed to avoid having to sync.) +func NewDiscardLineBuffer() LineBuffer { + return &discardLineBuffer{} +} + +// Write is part of LineBuffer +func (buf *discardLineBuffer) Write(args ...interface{}) { + buf.lines++ +} + +// WriteBytes is part of LineBuffer +func (buf *discardLineBuffer) WriteBytes(bytes []byte) { + buf.lines++ +} + +// Reset is part of LineBuffer +func (buf *discardLineBuffer) Reset() { + buf.lines = 0 +} + +// Bytes is part of LineBuffer +func (buf *discardLineBuffer) Bytes() []byte { + return []byte{} +} + +// String is part of LineBuffer +func (buf *discardLineBuffer) String() string { + return "" +} + +// Lines is part of LineBuffer +func (buf *discardLineBuffer) Lines() int { + return buf.lines +} diff --git a/pkg/proxy/util/linebuffer_test.go b/pkg/proxy/util/linebuffer_test.go new file mode 100644 index 00000000000..312c85ebf1b --- /dev/null +++ b/pkg/proxy/util/linebuffer_test.go @@ -0,0 +1,168 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "math/rand" + "strings" + "testing" +) + +func TestLineBufferWrite(t *testing.T) { + testCases := []struct { + name string + input []interface{} + expected string + }{ + { + name: "none", + input: []interface{}{}, + expected: "\n", + }, + { + name: "one string", + input: []interface{}{"test1"}, + expected: "test1\n", + }, + { + name: "one slice", + input: []interface{}{[]string{"test1", "test2"}}, + expected: "test1 test2\n", + }, + { + name: "mixed", + input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"}, + expected: "s1 s2 s3 s4 s5 s6 s7\n", + }, + } + testBuffer := NewLineBuffer() + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + testBuffer.Reset() + testBuffer.Write(testCase.input...) + if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) { + t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got) + } + if testBuffer.Lines() != 1 { + t.Fatalf("expected 1 line, got: %d", testBuffer.Lines()) + } + }) + } +} + +func TestLineBufferWritePanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("did not panic") + } + }() + testBuffer := NewLineBuffer() + testBuffer.Write("string", []string{"a", "slice"}, 1234) +} + +func TestLineBufferWriteBytes(t *testing.T) { + testCases := []struct { + name string + bytes []byte + expected string + }{ + { + name: "empty bytes", + bytes: []byte{}, + expected: "\n", + }, + { + name: "test bytes", + bytes: []byte("test write bytes line"), + expected: "test write bytes line\n", + }, + } + + testBuffer := NewLineBuffer() + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + testBuffer.Reset() + testBuffer.WriteBytes(testCase.bytes) + if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) { + t.Fatalf("write bytes is %v\n expected: %s, got: %s", testCase.bytes, want, got) + } + }) + } +} + +// obtained from https://stackoverflow.com/a/22892986 +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randSeq() string { + b := make([]rune, 30) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} + +func TestWriteCountLines(t *testing.T) { + testCases := []struct { + name string + expected int + }{ + { + name: "write no line", + expected: 0, + }, + { + name: "write one line", + expected: 1, + }, + { + name: "write 100 lines", + expected: 100, + }, + { + name: "write 1000 lines", + expected: 1000, + }, + { + name: "write 10000 lines", + expected: 10000, + }, + { + name: "write 100000 lines", + expected: 100000, + }, + } + testBuffer := NewLineBuffer() + discardBuffer := NewDiscardLineBuffer() + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + testBuffer.Reset() + discardBuffer.Reset() + for i := 0; i < testCase.expected; i++ { + testBuffer.Write(randSeq()) + discardBuffer.Write(randSeq()) + } + n := testBuffer.Lines() + if n != testCase.expected { + t.Fatalf("lines expected: %d, got: %d", testCase.expected, n) + } + n = discardBuffer.Lines() + if n != testCase.expected { + t.Fatalf("discardBuffer lines expected: %d, got: %d", testCase.expected, n) + } + }) + } +} diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index bda706ce657..3f56bb6dbf5 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -17,7 +17,6 @@ limitations under the License. package util import ( - "bytes" "context" "fmt" "net" @@ -321,67 +320,6 @@ func GetClusterIPByFamily(ipFamily v1.IPFamily, service *v1.Service) string { return "" } -type LineBuffer struct { - b bytes.Buffer - lines int -} - -// Write takes a list of arguments, each a string or []string, joins all the -// individual strings with spaces, terminates with newline, and writes to buf. -// Any other argument type will panic. -func (buf *LineBuffer) Write(args ...interface{}) { - for i, arg := range args { - if i > 0 { - buf.b.WriteByte(' ') - } - switch x := arg.(type) { - case string: - buf.b.WriteString(x) - case []string: - for j, s := range x { - if j > 0 { - buf.b.WriteByte(' ') - } - buf.b.WriteString(s) - } - default: - panic(fmt.Sprintf("unknown argument type: %T", x)) - } - } - buf.b.WriteByte('\n') - buf.lines++ -} - -// WriteBytes writes bytes to buffer, and terminates with newline. -func (buf *LineBuffer) WriteBytes(bytes []byte) { - buf.b.Write(bytes) - buf.b.WriteByte('\n') - buf.lines++ -} - -// Reset clears buf -func (buf *LineBuffer) Reset() { - buf.b.Reset() - buf.lines = 0 -} - -// Bytes returns the contents of buf as a []byte -func (buf *LineBuffer) Bytes() []byte { - return buf.b.Bytes() -} - -// String returns the contents of buf as a string -func (buf *LineBuffer) String() string { - return buf.b.String() -} - -// Lines returns the number of lines in buf. Note that more precisely, this returns the -// number of times Write() or WriteBytes() was called; it assumes that you never wrote -// any newlines to the buffer yourself. -func (buf *LineBuffer) Lines() int { - return buf.lines -} - // RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only // closes the ports opened in this sync. func RevertPorts(replacementPortsMap, originalPortsMap map[netutils.LocalPort]netutils.Closeable) { diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index 4209d113130..aa3a32dd2c5 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -17,10 +17,8 @@ limitations under the License. package util import ( - "math/rand" "net" "reflect" - "strings" "testing" v1 "k8s.io/api/core/v1" @@ -707,145 +705,6 @@ func TestRevertPorts(t *testing.T) { } } -func TestLineBufferWrite(t *testing.T) { - testCases := []struct { - name string - input []interface{} - expected string - }{ - { - name: "none", - input: []interface{}{}, - expected: "\n", - }, - { - name: "one string", - input: []interface{}{"test1"}, - expected: "test1\n", - }, - { - name: "one slice", - input: []interface{}{[]string{"test1", "test2"}}, - expected: "test1 test2\n", - }, - { - name: "mixed", - input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"}, - expected: "s1 s2 s3 s4 s5 s6 s7\n", - }, - } - testBuffer := LineBuffer{} - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - testBuffer.Reset() - testBuffer.Write(testCase.input...) - if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) { - t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got) - } - if testBuffer.Lines() != 1 { - t.Fatalf("expected 1 line, got: %d", testBuffer.Lines()) - } - }) - } -} - -func TestLineBufferWritePanic(t *testing.T) { - defer func() { - if r := recover(); r == nil { - t.Errorf("did not panic") - } - }() - testBuffer := LineBuffer{} - testBuffer.Write("string", []string{"a", "slice"}, 1234) -} - -func TestLineBufferWriteBytes(t *testing.T) { - testCases := []struct { - name string - bytes []byte - expected string - }{ - { - name: "empty bytes", - bytes: []byte{}, - expected: "\n", - }, - { - name: "test bytes", - bytes: []byte("test write bytes line"), - expected: "test write bytes line\n", - }, - } - - testBuffer := LineBuffer{} - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - testBuffer.Reset() - testBuffer.WriteBytes(testCase.bytes) - if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) { - t.Fatalf("write bytes is %v\n expected: %s, got: %s", testCase.bytes, want, got) - } - }) - } -} - -func TestWriteCountLines(t *testing.T) { - - testCases := []struct { - name string - expected int - }{ - { - name: "write no line", - expected: 0, - }, - { - name: "write one line", - expected: 1, - }, - { - name: "write 100 lines", - expected: 100, - }, - { - name: "write 1000 lines", - expected: 1000, - }, - { - name: "write 10000 lines", - expected: 10000, - }, - { - name: "write 100000 lines", - expected: 100000, - }, - } - testBuffer := LineBuffer{} - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - testBuffer.Reset() - for i := 0; i < testCase.expected; i++ { - testBuffer.Write(randSeq()) - } - n := testBuffer.Lines() - if n != testCase.expected { - t.Fatalf("lines expected: %d, got: %d", testCase.expected, n) - } - }) - } -} - -// obtained from https://stackoverflow.com/a/22892986 -var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - -func randSeq() string { - b := make([]rune, 30) - for i := range b { - b[i] = letters[rand.Intn(len(letters))] - } - return string(b) -} - func mustParseIPAddr(str string) net.Addr { a, err := net.ResolveIPAddr("ip", str) if err != nil {