Add a dummy implementation of proxyutil.LineBuffer

Rather than actually assembling all of the rules we aren't going to
use, just count them and throw them away.
This commit is contained in:
Dan Winship 2023-07-06 14:31:00 -04:00
parent 68ed020b2a
commit 883d0c3b71
8 changed files with 344 additions and 229 deletions

View File

@ -288,10 +288,10 @@ func NewProxier(ipFamily v1.IPFamily,
precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil),
existingFilterChainsData: bytes.NewBuffer(nil),
filterChains: proxyutil.LineBuffer{},
filterRules: proxyutil.LineBuffer{},
natChains: proxyutil.LineBuffer{},
natRules: proxyutil.LineBuffer{},
filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.NewLineBuffer(),
natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.NewLineBuffer(),
localhostNodePorts: localhostNodePorts,
nodePortAddresses: nodePortAddresses,
networkInterfacer: proxyutil.RealNetwork{},
@ -411,8 +411,8 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
encounteredError = true
} else {
existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
natChains := &proxyutil.LineBuffer{}
natRules := &proxyutil.LineBuffer{}
natChains := proxyutil.NewLineBuffer()
natRules := proxyutil.NewLineBuffer()
natChains.Write("*nat")
// Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
@ -448,8 +448,8 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
encounteredError = true
} else {
existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
filterChains := &proxyutil.LineBuffer{}
filterRules := &proxyutil.LineBuffer{}
filterChains := proxyutil.NewLineBuffer()
filterRules := proxyutil.NewLineBuffer()
filterChains.Write("*filter")
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} {
if _, found := existingFilterChains[chain]; found {
@ -852,8 +852,8 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natChains.Reset()
proxier.natRules.Reset()
skippedNatChains := &proxyutil.LineBuffer{}
skippedNatRules := &proxyutil.LineBuffer{}
skippedNatChains := proxyutil.NewDiscardLineBuffer()
skippedNatRules := proxyutil.NewDiscardLineBuffer()
// Write chain lines for all the "top-level" chains we'll be filling in
for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeProxyFirewallChain} {
@ -1069,9 +1069,9 @@ func (proxier *Proxier) syncProxyRules() {
}
}
filterRules := &proxier.filterRules
natChains := &proxier.natChains
natRules := &proxier.natRules
filterRules := proxier.filterRules
natChains := proxier.natChains
natRules := proxier.natRules
// Capture the clusterIP.
if hasInternalEndpoints {
@ -1562,7 +1562,7 @@ func (proxier *Proxier) syncProxyRules() {
conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
func (proxier *Proxier) writeServiceToEndpointRules(natRules *proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, ep := range endpoints {

View File

@ -324,10 +324,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil),
existingFilterChainsData: bytes.NewBuffer(nil),
filterChains: proxyutil.LineBuffer{},
filterRules: proxyutil.LineBuffer{},
natChains: proxyutil.LineBuffer{},
natRules: proxyutil.LineBuffer{},
filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.NewLineBuffer(),
natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.NewLineBuffer(),
nodeIP: netutils.ParseIPSloppy(testNodeIP),
localhostNodePorts: true,
nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil),

View File

@ -440,10 +440,10 @@ func NewProxier(ipFamily v1.IPFamily,
ipvsScheduler: scheduler,
iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil),
natChains: proxyutil.LineBuffer{},
natRules: proxyutil.LineBuffer{},
filterChains: proxyutil.LineBuffer{},
filterRules: proxyutil.LineBuffer{},
natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.NewLineBuffer(),
filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.NewLineBuffer(),
netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
ipset: ipset,
nodePortAddresses: nodePortAddresses,

View File

@ -163,10 +163,10 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
ipvsScheduler: defaultScheduler,
iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil),
natChains: proxyutil.LineBuffer{},
natRules: proxyutil.LineBuffer{},
filterChains: proxyutil.LineBuffer{},
filterRules: proxyutil.LineBuffer{},
natChains: proxyutil.NewLineBuffer(),
natRules: proxyutil.NewLineBuffer(),
filterChains: proxyutil.NewLineBuffer(),
filterRules: proxyutil.NewLineBuffer(),
netlinkHandle: netlinkHandle,
ipsetList: ipsetList,
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil),

View File

@ -0,0 +1,150 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"bytes"
"fmt"
)
// LineBuffer is an interface for writing lines of input to a bytes.Buffer
type LineBuffer interface {
// Write takes a list of arguments, each a string or []string, joins all the
// individual strings with spaces, terminates with newline, and writes them to the
// buffer. Any other argument type will panic.
Write(args ...interface{})
// WriteBytes writes bytes to the buffer, and terminates with newline.
WriteBytes(bytes []byte)
// Reset clears the buffer
Reset()
// Bytes returns the contents of the buffer as a []byte
Bytes() []byte
// String returns the contents of the buffer as a string
String() string
// Lines returns the number of lines in the buffer. Note that more precisely, this
// returns the number of times Write() or WriteBytes() was called; it assumes that
// you never wrote any newlines to the buffer yourself.
Lines() int
}
type realLineBuffer struct {
b bytes.Buffer
lines int
}
// NewLineBuffer returns a new "real" LineBuffer
func NewLineBuffer() LineBuffer {
return &realLineBuffer{}
}
// Write is part of LineBuffer
func (buf *realLineBuffer) Write(args ...interface{}) {
for i, arg := range args {
if i > 0 {
buf.b.WriteByte(' ')
}
switch x := arg.(type) {
case string:
buf.b.WriteString(x)
case []string:
for j, s := range x {
if j > 0 {
buf.b.WriteByte(' ')
}
buf.b.WriteString(s)
}
default:
panic(fmt.Sprintf("unknown argument type: %T", x))
}
}
buf.b.WriteByte('\n')
buf.lines++
}
// WriteBytes is part of LineBuffer
func (buf *realLineBuffer) WriteBytes(bytes []byte) {
buf.b.Write(bytes)
buf.b.WriteByte('\n')
buf.lines++
}
// Reset is part of LineBuffer
func (buf *realLineBuffer) Reset() {
buf.b.Reset()
buf.lines = 0
}
// Bytes is part of LineBuffer
func (buf *realLineBuffer) Bytes() []byte {
return buf.b.Bytes()
}
// String is part of LineBuffer
func (buf *realLineBuffer) String() string {
return buf.b.String()
}
// Lines is part of LineBuffer
func (buf *realLineBuffer) Lines() int {
return buf.lines
}
type discardLineBuffer struct {
lines int
}
// NewDiscardLineBuffer returns a dummy LineBuffer that counts the number of writes but
// throws away the data. (This is used for iptables proxy partial syncs, to keep track of
// how many rules we managed to avoid having to sync.)
func NewDiscardLineBuffer() LineBuffer {
return &discardLineBuffer{}
}
// Write is part of LineBuffer
func (buf *discardLineBuffer) Write(args ...interface{}) {
buf.lines++
}
// WriteBytes is part of LineBuffer
func (buf *discardLineBuffer) WriteBytes(bytes []byte) {
buf.lines++
}
// Reset is part of LineBuffer
func (buf *discardLineBuffer) Reset() {
buf.lines = 0
}
// Bytes is part of LineBuffer
func (buf *discardLineBuffer) Bytes() []byte {
return []byte{}
}
// String is part of LineBuffer
func (buf *discardLineBuffer) String() string {
return ""
}
// Lines is part of LineBuffer
func (buf *discardLineBuffer) Lines() int {
return buf.lines
}

View File

@ -0,0 +1,168 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"math/rand"
"strings"
"testing"
)
func TestLineBufferWrite(t *testing.T) {
testCases := []struct {
name string
input []interface{}
expected string
}{
{
name: "none",
input: []interface{}{},
expected: "\n",
},
{
name: "one string",
input: []interface{}{"test1"},
expected: "test1\n",
},
{
name: "one slice",
input: []interface{}{[]string{"test1", "test2"}},
expected: "test1 test2\n",
},
{
name: "mixed",
input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"},
expected: "s1 s2 s3 s4 s5 s6 s7\n",
},
}
testBuffer := NewLineBuffer()
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.Write(testCase.input...)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got)
}
if testBuffer.Lines() != 1 {
t.Fatalf("expected 1 line, got: %d", testBuffer.Lines())
}
})
}
}
func TestLineBufferWritePanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("did not panic")
}
}()
testBuffer := NewLineBuffer()
testBuffer.Write("string", []string{"a", "slice"}, 1234)
}
func TestLineBufferWriteBytes(t *testing.T) {
testCases := []struct {
name string
bytes []byte
expected string
}{
{
name: "empty bytes",
bytes: []byte{},
expected: "\n",
},
{
name: "test bytes",
bytes: []byte("test write bytes line"),
expected: "test write bytes line\n",
},
}
testBuffer := NewLineBuffer()
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.WriteBytes(testCase.bytes)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write bytes is %v\n expected: %s, got: %s", testCase.bytes, want, got)
}
})
}
}
// obtained from https://stackoverflow.com/a/22892986
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randSeq() string {
b := make([]rune, 30)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
func TestWriteCountLines(t *testing.T) {
testCases := []struct {
name string
expected int
}{
{
name: "write no line",
expected: 0,
},
{
name: "write one line",
expected: 1,
},
{
name: "write 100 lines",
expected: 100,
},
{
name: "write 1000 lines",
expected: 1000,
},
{
name: "write 10000 lines",
expected: 10000,
},
{
name: "write 100000 lines",
expected: 100000,
},
}
testBuffer := NewLineBuffer()
discardBuffer := NewDiscardLineBuffer()
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
discardBuffer.Reset()
for i := 0; i < testCase.expected; i++ {
testBuffer.Write(randSeq())
discardBuffer.Write(randSeq())
}
n := testBuffer.Lines()
if n != testCase.expected {
t.Fatalf("lines expected: %d, got: %d", testCase.expected, n)
}
n = discardBuffer.Lines()
if n != testCase.expected {
t.Fatalf("discardBuffer lines expected: %d, got: %d", testCase.expected, n)
}
})
}
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package util
import (
"bytes"
"context"
"fmt"
"net"
@ -321,67 +320,6 @@ func GetClusterIPByFamily(ipFamily v1.IPFamily, service *v1.Service) string {
return ""
}
type LineBuffer struct {
b bytes.Buffer
lines int
}
// Write takes a list of arguments, each a string or []string, joins all the
// individual strings with spaces, terminates with newline, and writes to buf.
// Any other argument type will panic.
func (buf *LineBuffer) Write(args ...interface{}) {
for i, arg := range args {
if i > 0 {
buf.b.WriteByte(' ')
}
switch x := arg.(type) {
case string:
buf.b.WriteString(x)
case []string:
for j, s := range x {
if j > 0 {
buf.b.WriteByte(' ')
}
buf.b.WriteString(s)
}
default:
panic(fmt.Sprintf("unknown argument type: %T", x))
}
}
buf.b.WriteByte('\n')
buf.lines++
}
// WriteBytes writes bytes to buffer, and terminates with newline.
func (buf *LineBuffer) WriteBytes(bytes []byte) {
buf.b.Write(bytes)
buf.b.WriteByte('\n')
buf.lines++
}
// Reset clears buf
func (buf *LineBuffer) Reset() {
buf.b.Reset()
buf.lines = 0
}
// Bytes returns the contents of buf as a []byte
func (buf *LineBuffer) Bytes() []byte {
return buf.b.Bytes()
}
// String returns the contents of buf as a string
func (buf *LineBuffer) String() string {
return buf.b.String()
}
// Lines returns the number of lines in buf. Note that more precisely, this returns the
// number of times Write() or WriteBytes() was called; it assumes that you never wrote
// any newlines to the buffer yourself.
func (buf *LineBuffer) Lines() int {
return buf.lines
}
// RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only
// closes the ports opened in this sync.
func RevertPorts(replacementPortsMap, originalPortsMap map[netutils.LocalPort]netutils.Closeable) {

View File

@ -17,10 +17,8 @@ limitations under the License.
package util
import (
"math/rand"
"net"
"reflect"
"strings"
"testing"
v1 "k8s.io/api/core/v1"
@ -707,145 +705,6 @@ func TestRevertPorts(t *testing.T) {
}
}
func TestLineBufferWrite(t *testing.T) {
testCases := []struct {
name string
input []interface{}
expected string
}{
{
name: "none",
input: []interface{}{},
expected: "\n",
},
{
name: "one string",
input: []interface{}{"test1"},
expected: "test1\n",
},
{
name: "one slice",
input: []interface{}{[]string{"test1", "test2"}},
expected: "test1 test2\n",
},
{
name: "mixed",
input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"},
expected: "s1 s2 s3 s4 s5 s6 s7\n",
},
}
testBuffer := LineBuffer{}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.Write(testCase.input...)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got)
}
if testBuffer.Lines() != 1 {
t.Fatalf("expected 1 line, got: %d", testBuffer.Lines())
}
})
}
}
func TestLineBufferWritePanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("did not panic")
}
}()
testBuffer := LineBuffer{}
testBuffer.Write("string", []string{"a", "slice"}, 1234)
}
func TestLineBufferWriteBytes(t *testing.T) {
testCases := []struct {
name string
bytes []byte
expected string
}{
{
name: "empty bytes",
bytes: []byte{},
expected: "\n",
},
{
name: "test bytes",
bytes: []byte("test write bytes line"),
expected: "test write bytes line\n",
},
}
testBuffer := LineBuffer{}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
testBuffer.WriteBytes(testCase.bytes)
if want, got := testCase.expected, testBuffer.String(); !strings.EqualFold(want, got) {
t.Fatalf("write bytes is %v\n expected: %s, got: %s", testCase.bytes, want, got)
}
})
}
}
func TestWriteCountLines(t *testing.T) {
testCases := []struct {
name string
expected int
}{
{
name: "write no line",
expected: 0,
},
{
name: "write one line",
expected: 1,
},
{
name: "write 100 lines",
expected: 100,
},
{
name: "write 1000 lines",
expected: 1000,
},
{
name: "write 10000 lines",
expected: 10000,
},
{
name: "write 100000 lines",
expected: 100000,
},
}
testBuffer := LineBuffer{}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset()
for i := 0; i < testCase.expected; i++ {
testBuffer.Write(randSeq())
}
n := testBuffer.Lines()
if n != testCase.expected {
t.Fatalf("lines expected: %d, got: %d", testCase.expected, n)
}
})
}
}
// obtained from https://stackoverflow.com/a/22892986
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randSeq() string {
b := make([]rune, 30)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
func mustParseIPAddr(str string) net.Addr {
a, err := net.ResolveIPAddr("ip", str)
if err != nil {