convert k8s.io/kubernetes/pkg/proxy to contextual logging, part 1

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
This commit is contained in:
Ziqi Zhao
2024-01-26 13:10:53 +08:00
parent c1924df0a8
commit be4535bd34
19 changed files with 542 additions and 420 deletions

View File

@@ -84,18 +84,19 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
// platformSetup is called after setting up the ProxyServer, but before creating the
// Proxier. It should fill in any platform-specific fields and perform other
// platform-specific setup.
func (s *ProxyServer) platformSetup() error {
func (s *ProxyServer) platformSetup(ctx context.Context) error {
logger := klog.FromContext(ctx)
if s.Config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
s.logger.Info("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname)
node, err := waitForPodCIDR(s.Client, s.Hostname)
logger.Info("Watching for node, awaiting podCIDR allocation", "hostname", s.Hostname)
node, err := waitForPodCIDR(ctx, s.Client, s.Hostname)
if err != nil {
return err
}
s.podCIDRs = node.Spec.PodCIDRs
s.logger.Info("NodeInfo", "podCIDRs", node.Spec.PodCIDRs)
logger.Info("NodeInfo", "podCIDRs", node.Spec.PodCIDRs)
}
err := s.setupConntrack()
err := s.setupConntrack(ctx)
if err != nil {
return err
}
@@ -133,7 +134,9 @@ func getIPTables(primaryFamily v1.IPFamily) ([2]utiliptables.Interface, utilipta
// platformCheckSupported is called immediately before creating the Proxier, to check
// what IP families are supported (and whether the configuration is usable at all).
func (s *ProxyServer) platformCheckSupported() (ipv4Supported, ipv6Supported, dualStackSupported bool, err error) {
func (s *ProxyServer) platformCheckSupported(ctx context.Context) (ipv4Supported, ipv6Supported, dualStackSupported bool, err error) {
logger := klog.FromContext(ctx)
if isIPTablesBased(s.Config.Mode) {
ipt, _ := getIPTables(v1.IPFamilyUnknown)
ipv4Supported = ipt[0].Present()
@@ -142,9 +145,9 @@ func (s *ProxyServer) platformCheckSupported() (ipv4Supported, ipv6Supported, du
if !ipv4Supported && !ipv6Supported {
err = fmt.Errorf("iptables is not available on this host")
} else if !ipv4Supported {
s.logger.Info("No iptables support for family", "ipFamily", v1.IPv4Protocol)
logger.Info("No iptables support for family", "ipFamily", v1.IPv4Protocol)
} else if !ipv6Supported {
s.logger.Info("No iptables support for family", "ipFamily", v1.IPv6Protocol)
logger.Info("No iptables support for family", "ipFamily", v1.IPv6Protocol)
}
} else {
// Assume support for both families.
@@ -159,25 +162,27 @@ func (s *ProxyServer) platformCheckSupported() (ipv4Supported, ipv6Supported, du
}
// createProxier creates the proxy.Provider
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration, dualStack, initOnly bool) (proxy.Provider, error) {
func (s *ProxyServer) createProxier(ctx context.Context, config *proxyconfigapi.KubeProxyConfiguration, dualStack, initOnly bool) (proxy.Provider, error) {
logger := klog.FromContext(ctx)
var proxier proxy.Provider
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
var localDetector proxyutiliptables.LocalTrafficDetector
var err error
if config.Mode == proxyconfigapi.ProxyModeIPTables {
s.logger.Info("Using iptables Proxier")
logger.Info("Using iptables Proxier")
if dualStack {
ipt, _ := getIPTables(s.PrimaryIPFamily)
localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
localDetectors, err = getDualStackLocalDetectorTuple(logger, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewDualStackProxier(
ctx,
ipt,
utilsysctl.New(),
exec.New(),
@@ -197,13 +202,14 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
} else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
_, iptInterface := getIPTables(s.PrimaryIPFamily)
localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
localDetector, err = getLocalDetector(logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = iptables.NewProxier(
ctx,
s.PrimaryIPFamily,
iptInterface,
utilsysctl.New(),
@@ -230,21 +236,22 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
execer := exec.New()
ipsetInterface := utilipset.New(execer)
ipvsInterface := utilipvs.New()
if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil {
if err := ipvs.CanUseIPVSProxier(ctx, ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil {
return nil, fmt.Errorf("can't use the IPVS proxier: %v", err)
}
s.logger.Info("Using ipvs Proxier")
logger.Info("Using ipvs Proxier")
if dualStack {
ipt, _ := getIPTables(s.PrimaryIPFamily)
// Always ordered to match []ipt
localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
localDetectors, err = getDualStackLocalDetectorTuple(logger, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxier, err = ipvs.NewDualStackProxier(
ctx,
ipt,
ipvsInterface,
ipsetInterface,
@@ -270,12 +277,13 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
)
} else {
_, iptInterface := getIPTables(s.PrimaryIPFamily)
localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
localDetector, err = getLocalDetector(logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxier, err = ipvs.NewProxier(
ctx,
s.PrimaryIPFamily,
iptInterface,
ipvsInterface,
@@ -305,16 +313,17 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
} else if config.Mode == proxyconfigapi.ProxyModeNFTables {
s.logger.Info("Using nftables Proxier")
logger.Info("Using nftables Proxier")
if dualStack {
localDetectors, err = getDualStackLocalDetectorTuple(s.logger, config.DetectLocalMode, config, s.podCIDRs)
localDetectors, err = getDualStackLocalDetectorTuple(logger, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = nftables.NewDualStackProxier(
ctx,
utilsysctl.New(),
config.NFTables.SyncPeriod.Duration,
config.NFTables.MinSyncPeriod.Duration,
@@ -330,13 +339,14 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
)
} else {
// Create a single-stack proxier if and only if the node does not support dual-stack
localDetector, err = getLocalDetector(s.logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
localDetector, err = getLocalDetector(logger, s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
// TODO this has side effects that should only happen when Run() is invoked.
proxier, err = nftables.NewProxier(
ctx,
s.PrimaryIPFamily,
utilsysctl.New(),
config.NFTables.SyncPeriod.Duration,
@@ -361,17 +371,15 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
return proxier, nil
}
func (s *ProxyServer) setupConntrack() error {
ct := &realConntracker{
logger: s.logger,
}
func (s *ProxyServer) setupConntrack(ctx context.Context) error {
ct := &realConntracker{}
max, err := getConntrackMax(s.logger, s.Config.Conntrack)
max, err := getConntrackMax(ctx, s.Config.Conntrack)
if err != nil {
return err
}
if max > 0 {
err := ct.SetMax(max)
err := ct.SetMax(ctx, max)
if err != nil {
if err != errReadOnlySysFS {
return err
@@ -391,34 +399,34 @@ func (s *ProxyServer) setupConntrack() error {
if s.Config.Conntrack.TCPEstablishedTimeout != nil && s.Config.Conntrack.TCPEstablishedTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.TCPEstablishedTimeout.Duration / time.Second)
if err := ct.SetTCPEstablishedTimeout(timeout); err != nil {
if err := ct.SetTCPEstablishedTimeout(ctx, timeout); err != nil {
return err
}
}
if s.Config.Conntrack.TCPCloseWaitTimeout != nil && s.Config.Conntrack.TCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.TCPCloseWaitTimeout.Duration / time.Second)
if err := ct.SetTCPCloseWaitTimeout(timeout); err != nil {
if err := ct.SetTCPCloseWaitTimeout(ctx, timeout); err != nil {
return err
}
}
if s.Config.Conntrack.TCPBeLiberal {
if err := ct.SetTCPBeLiberal(1); err != nil {
if err := ct.SetTCPBeLiberal(ctx, 1); err != nil {
return err
}
}
if s.Config.Conntrack.UDPTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.UDPTimeout.Duration / time.Second)
if err := ct.SetUDPTimeout(timeout); err != nil {
if err := ct.SetUDPTimeout(ctx, timeout); err != nil {
return err
}
}
if s.Config.Conntrack.UDPStreamTimeout.Duration > 0 {
timeout := int(s.Config.Conntrack.UDPStreamTimeout.Duration / time.Second)
if err := ct.SetUDPStreamTimeout(timeout); err != nil {
if err := ct.SetUDPStreamTimeout(ctx, timeout); err != nil {
return err
}
}
@@ -426,7 +434,8 @@ func (s *ProxyServer) setupConntrack() error {
return nil
}
func getConntrackMax(logger klog.Logger, config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) {
func getConntrackMax(ctx context.Context, config proxyconfigapi.KubeProxyConntrackConfiguration) (int, error) {
logger := klog.FromContext(ctx)
if config.MaxPerCore != nil && *config.MaxPerCore > 0 {
floor := 0
if config.Min != nil {
@@ -443,10 +452,10 @@ func getConntrackMax(logger klog.Logger, config proxyconfigapi.KubeProxyConntrac
return 0, nil
}
func waitForPodCIDR(client clientset.Interface, nodeName string) (*v1.Node, error) {
func waitForPodCIDR(ctx context.Context, client clientset.Interface, nodeName string) (*v1.Node, error) {
// since allocators can assign the podCIDR after the node registers, we do a watch here to wait
// for podCIDR to be assigned, instead of assuming that the Get() on startup will have it.
ctx, cancelFunc := context.WithTimeout(context.TODO(), timeoutForNodePodCIDR)
ctx, cancelFunc := context.WithTimeout(ctx, timeoutForNodePodCIDR)
defer cancelFunc()
fieldSelector := fields.OneTermEqualSelector("metadata.name", nodeName).String()
@@ -552,7 +561,7 @@ func getDualStackLocalDetectorTuple(logger klog.Logger, mode proxyconfigapi.Loca
// cleanupAndExit is true, it will attempt to remove rules from all known kube-proxy
// modes. If it is false, it will only remove rules that are definitely not in use by the
// currently-configured mode.
func platformCleanup(mode proxyconfigapi.ProxyMode, cleanupAndExit bool) error {
func platformCleanup(ctx context.Context, mode proxyconfigapi.ProxyMode, cleanupAndExit bool) error {
var encounteredError bool
// Clean up iptables and ipvs rules if switching to nftables, or if cleanupAndExit
@@ -563,15 +572,15 @@ func platformCleanup(mode proxyconfigapi.ProxyMode, cleanupAndExit bool) error {
ipvsInterface := utilipvs.New()
for _, ipt := range ipts {
encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError
encounteredError = ipvs.CleanupLeftovers(ipvsInterface, ipt, ipsetInterface) || encounteredError
encounteredError = iptables.CleanupLeftovers(ctx, ipt) || encounteredError
encounteredError = ipvs.CleanupLeftovers(ctx, ipvsInterface, ipt, ipsetInterface) || encounteredError
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.NFTablesProxyMode) {
// Clean up nftables rules when switching to iptables or ipvs, or if cleanupAndExit
if isIPTablesBased(mode) || cleanupAndExit {
encounteredError = nftables.CleanupLeftovers() || encounteredError
encounteredError = nftables.CleanupLeftovers(ctx) || encounteredError
}
}