diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9408df23a16..615baa8ed76 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -154,7 +154,7 @@ func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint { func (e *endpointsInfo) Equal(other proxy.Endpoint) bool { o, ok := other.(*endpointsInfo) if !ok { - klog.Error("Failed to cast endpointsInfo") + klog.ErrorS(nil, "Failed to cast endpointsInfo") return false } return e.Endpoint == o.Endpoint && @@ -277,13 +277,13 @@ func NewProxier(ipt utiliptables.Interface, // are connected to a Linux bridge (but not SDN bridges). Until most // plugins handle this, log when config is missing if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 { - klog.Warning("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended") + klog.InfoS("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended") } // Generate the masquerade mark to use for SNAT rules. masqueradeValue := 1 << uint(masqueradeBit) masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue) - klog.V(2).Infof("iptables(%s) masquerade mark: %s", ipt.Protocol(), masqueradeMark) + klog.V(2).InfoS("using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark) endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) @@ -298,7 +298,7 @@ func NewProxier(ipt utiliptables.Interface, nodePortAddresses = ipFamilyMap[ipFamily] // Log the IPs not matching the ipFamily if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 { - klog.Warningf("IP Family: %s, NodePortAddresses of wrong family; %s", ipFamily, strings.Join(ips, ",")) + klog.InfoS("found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ",")) } proxier := &Proxier{ @@ -331,8 +331,7 @@ func NewProxier(ipt utiliptables.Interface, } burstSyncs := 2 - klog.V(2).Infof("iptables(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d", - ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs) + klog.V(2).InfoS("iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to. // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though. // time.Hour is arbitrary. @@ -343,9 +342,9 @@ func NewProxier(ipt utiliptables.Interface, proxier.syncProxyRules, syncPeriod, wait.NeverStop) if ipt.HasRandomFully() { - klog.V(2).Infof("iptables(%s) supports --random-fully", ipt.Protocol()) + klog.V(2).InfoS("iptables supports --random-fully", "ipFamily", ipt.Protocol()) } else { - klog.V(2).Infof("iptables(%s) does not support --random-fully", ipt.Protocol()) + klog.V(2).InfoS("iptables does not support --random-fully", "ipFamily", ipt.Protocol()) } return proxier, nil @@ -427,7 +426,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { ) if err := ipt.DeleteRule(jump.table, jump.srcChain, args...); err != nil { if !utiliptables.IsNotFoundError(err) { - klog.Errorf("Error removing pure-iptables proxy rule: %v", err) + klog.ErrorS(err, "Error removing pure-iptables proxy rule") encounteredError = true } } @@ -436,7 +435,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Flush and remove all of our "-t nat" chains. iptablesData := bytes.NewBuffer(nil) if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil { - klog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err) + klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT) encounteredError = true } else { existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) @@ -464,7 +463,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Write it. err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { - klog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err) + klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableNAT) metrics.IptablesRestoreFailuresTotal.Inc() encounteredError = true } @@ -473,7 +472,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Flush and remove all of our "-t filter" chains. iptablesData.Reset() if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil { - klog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableFilter, err) + klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter) encounteredError = true } else { existingFilterChains := utiliptables.GetChainLines(utiliptables.TableFilter, iptablesData.Bytes()) @@ -491,7 +490,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { filterLines := append(filterChains.Bytes(), filterRules.Bytes()...) // Write it. if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil { - klog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err) + klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableFilter) metrics.IptablesRestoreFailuresTotal.Inc() encounteredError = true } @@ -663,7 +662,8 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { // is observed. func (proxier *Proxier) OnNodeAdd(node *v1.Node) { if node.Name != proxier.hostname { - klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", + "eventNode", node.Name, "currentNode", proxier.hostname) return } @@ -682,7 +682,8 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) { // node object is observed. func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { if node.Name != proxier.hostname { - klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", + "eventNode", node.Name, "currentNode", proxier.hostname) return } @@ -701,7 +702,8 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { // object is observed. func (proxier *Proxier) OnNodeDelete(node *v1.Node) { if node.Name != proxier.hostname { - klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", + "eventNode", node.Name, "currentNode", proxier.hostname) return } proxier.mu.Lock() @@ -770,23 +772,23 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE if nodePort != 0 { err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) if err != nil { - klog.Errorf("Failed to delete nodeport-related %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) + klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName.String()) } } err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) if err != nil { - klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) + klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName.String()) } for _, extIP := range svcInfo.ExternalIPStrings() { err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) if err != nil { - klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err) + klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName.String(), "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) if err != nil { - klog.Errorf("Failed to delete %s endpoint connections for LoabBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err) + klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName.String(), "loadBalancerIP", lbIP) } } } @@ -815,7 +817,7 @@ func (proxier *Proxier) syncProxyRules() { // don't sync rules till we've received services and endpoints if !proxier.isInitialized() { - klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") + klog.V(2).InfoS("Not syncing iptables until Services and Endpoints have been received from master") return } @@ -823,14 +825,14 @@ func (proxier *Proxier) syncProxyRules() { start := time.Now() defer func() { metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start)) - klog.V(2).Infof("syncProxyRules took %v", time.Since(start)) + klog.V(2).InfoS("syncProxyRules complete", "elapsed", time.Since(start)) }() localAddrs, err := utilproxy.GetLocalAddrs() if err != nil { - klog.Errorf("Failed to get local addresses during proxy sync: %v, assuming external IPs are not local", err) + klog.ErrorS(err, "Failed to get local addresses during proxy sync, assuming external IPs are not local") } else if len(localAddrs) == 0 { - klog.Warning("No local addresses found, assuming all external IPs are not local") + klog.InfoS("No local addresses found, assuming all external IPs are not local") } localAddrSet := utilnet.IPSet{} @@ -838,7 +840,7 @@ func (proxier *Proxier) syncProxyRules() { nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) if err != nil { - klog.Errorf("Failed to get node ip address matching nodeport cidrs %v, services with nodeport may not work as intended: %v", proxier.nodePortAddresses, err) + klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses) } // We assume that if this was called, we really want to sync them, @@ -851,7 +853,7 @@ func (proxier *Proxier) syncProxyRules() { // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String()) + klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String()) staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { staleServices.Insert(extIP) @@ -859,12 +861,12 @@ func (proxier *Proxier) syncProxyRules() { } } - klog.V(2).Info("Syncing iptables rules") + klog.V(2).InfoS("Syncing iptables rules") success := false defer func() { if !success { - klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod) + klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod) proxier.syncRunner.RetryAfter(proxier.syncPeriod) } }() @@ -872,7 +874,7 @@ func (proxier *Proxier) syncProxyRules() { // Create and link the kube chains. for _, jump := range iptablesJumpChains { if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil { - klog.Errorf("Failed to ensure that %s chain %s exists: %v", jump.table, jump.dstChain, err) + klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain) return } args := append(jump.extraArgs, @@ -880,7 +882,7 @@ func (proxier *Proxier) syncProxyRules() { "-j", string(jump.dstChain), ) if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil { - klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jump.table, jump.srcChain, jump.dstChain, err) + klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain) return } } @@ -888,7 +890,7 @@ func (proxier *Proxier) syncProxyRules() { // ensure KUBE-MARK-DROP chain exist but do not change any rules for _, ch := range iptablesEnsureChains { if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil { - klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err) + klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain) return } } @@ -903,7 +905,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.existingFilterChainsData.Reset() err = proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData) if err != nil { // if we failed to get any rules - klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) + klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules") } else { // otherwise parse the output existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes()) } @@ -913,7 +915,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.iptablesData.Reset() err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) if err != nil { // if we failed to get any rules - klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) + klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules") } else { // otherwise parse the output existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes()) } @@ -1009,7 +1011,7 @@ func (proxier *Proxier) syncProxyRules() { for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) if !ok { - klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) + klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String()) continue } isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) @@ -1098,7 +1100,7 @@ func (proxier *Proxier) syncProxyRules() { Protocol: protocol, } if proxier.portsMap[lp] != nil { - klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) @@ -1112,7 +1114,7 @@ func (proxier *Proxier) syncProxyRules() { UID: types.UID(proxier.hostname), Namespace: "", }, v1.EventTypeWarning, err.Error(), msg) - klog.Error(msg) + klog.ErrorS(err, "can't open port, skipping externalIP", "port", lp.String()) continue } replacementPortsMap[lp] = socket @@ -1208,7 +1210,7 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) _, cidr, err := net.ParseCIDR(src) if err != nil { - klog.Errorf("Error parsing %s CIDR in LoadBalancerSourceRanges, dropping: %v", cidr, err) + klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) } else if cidr.Contains(proxier.nodeIP) { allowFromNode = true } @@ -1269,12 +1271,12 @@ func (proxier *Proxier) syncProxyRules() { // For ports on node IPs, open the actual port and hold it. for _, lp := range lps { if proxier.portsMap[lp] != nil { - klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else if svcInfo.Protocol() != v1.ProtocolSCTP { socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6) if err != nil { - klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) + klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String()) continue } if lp.Protocol == "udp" { @@ -1284,7 +1286,7 @@ func (proxier *Proxier) syncProxyRules() { // See issue: https://github.com/kubernetes/kubernetes/issues/49881 err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) if err != nil { - klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err) + klog.ErrorS(err, "Failed to clear udp conntrack", "port", lp.Port) } } replacementPortsMap[lp] = socket @@ -1341,7 +1343,7 @@ func (proxier *Proxier) syncProxyRules() { for _, ep := range allEndpoints { epInfo, ok := ep.(*endpointsInfo) if !ok { - klog.Errorf("Failed to cast endpointsInfo %q", ep.String()) + klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String()) continue } @@ -1526,7 +1528,7 @@ func (proxier *Proxier) syncProxyRules() { } // Ignore IP addresses with incorrect version if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) { - klog.Errorf("IP address %s has incorrect IP version", address) + klog.ErrorS(nil, "IP has incorrect IP version", "ip", address) continue } // create nodeport rules for each IP one by one @@ -1588,13 +1590,13 @@ func (proxier *Proxier) syncProxyRules() { proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) - klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes()) + klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes()) err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { - klog.Errorf("Failed to execute iptables-restore: %v", err) + klog.ErrorS(err, "Failed to execute iptables-restore") metrics.IptablesRestoreFailuresTotal.Inc() // Revert new local ports. - klog.V(2).Infof("Closing local ports after iptables-restore failure") + klog.V(2).InfoS("Closing local ports after iptables-restore failure") utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } @@ -1604,7 +1606,7 @@ func (proxier *Proxier) syncProxyRules() { for _, lastChangeTriggerTime := range lastChangeTriggerTimes { latency := metrics.SinceInSeconds(lastChangeTriggerTime) metrics.NetworkProgrammingLatency.Observe(latency) - klog.V(4).Infof("Network programming of %s took %f seconds", name, latency) + klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency) } } @@ -1625,21 +1627,21 @@ func (proxier *Proxier) syncProxyRules() { // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { - klog.Errorf("Error syncing healthcheck services: %v", err) + klog.ErrorS(err, "Error syncing healthcheck services") } if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { - klog.Errorf("Error syncing healthcheck endpoints: %v", err) + klog.ErrorS(err, "Error syncing healthcheck endpoints") } // Finish housekeeping. // TODO: these could be made more consistent. - klog.V(4).Infof("Deleting stale services IPs: %v", staleServices.UnsortedList()) + klog.V(4).InfoS("Deleting stale services", "ips", staleServices.UnsortedList()) for _, svcIP := range staleServices.UnsortedList() { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { - klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) + klog.ErrorS(err, "Failed to delete stale service connections", "ip", svcIP) } } - klog.V(4).Infof("Deleting stale endpoint connections: %v", endpointUpdateResult.StaleEndpoints) + klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } @@ -1703,6 +1705,6 @@ func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, e default: return nil, fmt.Errorf("unknown protocol %q", lp.Protocol) } - klog.V(2).Infof("Opened local port %s", lp.String()) + klog.V(2).InfoS("Opened local port", "port", lp.String()) return socket, nil }