diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index de0bc4032c5..48b61d636dc 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -359,7 +359,7 @@ func NewProxier(ipt utiliptables.Interface, // are connected to a Linux bridge (but not SDN bridges). Until most // plugins handle this, log when config is missing if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 { - klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended") + klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended") } // Set the conntrack sysctl we need for @@ -376,7 +376,7 @@ func NewProxier(ipt utiliptables.Interface, return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err) } if kernelVersion.LessThan(version.MustParseGeneric(connReuseMinSupportedKernelVersion)) { - klog.Errorf("can't set sysctl %s, kernel version must be at least %s", sysctlConnReuse, connReuseMinSupportedKernelVersion) + klog.ErrorS(nil, fmt.Sprintf("can't set sysctl %s, kernel version must be at least %s", sysctlConnReuse, connReuseMinSupportedKernelVersion)) } else { // Set the connection reuse mode if err := utilproxy.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil { @@ -416,7 +416,7 @@ func NewProxier(ipt utiliptables.Interface, // current system timeout should be preserved if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 { if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil { - klog.Warningf("failed to configure IPVS timeouts: %v", err) + klog.ErrorS(err, "failed to configure IPVS timeouts") } } @@ -429,10 +429,10 @@ func NewProxier(ipt utiliptables.Interface, ipFamily = v1.IPv6Protocol } - klog.V(2).Infof("nodeIP: %v, family: %v", nodeIP, ipFamily) + klog.V(2).InfoS("record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily) if len(scheduler) == 0 { - klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler) + klog.InfoS("IPVS scheduler not specified, use rr by default") scheduler = DefaultScheduler } @@ -444,7 +444,7 @@ func NewProxier(ipt utiliptables.Interface, nodePortAddresses = ipFamilyMap[ipFamily] // Log the IPs not matching the ipFamily if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 { - klog.Warningf("IP Family: %s, NodePortAddresses of wrong family; %s", ipFamily, strings.Join(ips, ",")) + klog.InfoS("found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ",")) } // excludeCIDRs has been validated before, here we just parse it to IPNet list @@ -492,8 +492,7 @@ func NewProxier(ipt utiliptables.Interface, proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment) } burstSyncs := 2 - klog.V(2).Infof("ipvs(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d", - ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs) + klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs) proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) proxier.gracefuldeleteManager.Run() return proxier, nil @@ -618,7 +617,7 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) { // Find out loaded kernel modules. If this is a full static kernel it will try to verify if the module is compiled using /boot/config-KERNELVERSION modulesFile, err := os.Open("/proc/modules") if err == os.ErrNotExist { - klog.Warningf("Failed to read file /proc/modules with error %v. Assuming this is a kernel without loadable modules support enabled", err) + klog.ErrorS(err, "Failed to read file /proc/modules. Assuming this is a kernel without loadable modules support enabled") kernelConfigFile := fmt.Sprintf("/boot/config-%s", kernelVersionStr) kConfig, err := ioutil.ReadFile(kernelConfigFile) if err != nil { @@ -643,7 +642,7 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) { builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersionStr) b, err := ioutil.ReadFile(builtinModsFilePath) if err != nil { - klog.Warningf("Failed to read file %s with error %v. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", builtinModsFilePath, err) + klog.ErrorS(err, "Failed to read builtin modules file. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "filePath", builtinModsFilePath) } for _, module := range ipvsModules { @@ -653,8 +652,8 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) { // Try to load the required IPVS kernel modules if not built in err := handle.executor.Command("modprobe", "--", module).Run() if err != nil { - klog.Warningf("Failed to load kernel module %v with modprobe. "+ - "You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", module) + klog.InfoS("Failed to load kernel module with modprobe. "+ + "You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "moduleName", module) } else { lmods = append(lmods, module) } @@ -766,7 +765,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool } if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil { if !utiliptables.IsNotFoundError(err) { - klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err) + klog.ErrorS(err, "Error removing iptables rules in ipvs proxier") encounteredError = true } } @@ -776,7 +775,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool for _, ch := range iptablesCleanupChains { if err := ipt.FlushChain(ch.table, ch.chain); err != nil { if !utiliptables.IsNotFoundError(err) { - klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err) + klog.ErrorS(err, "Error removing iptables rules in ipvs proxier") encounteredError = true } } @@ -786,7 +785,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool for _, ch := range iptablesCleanupChains { if err := ipt.DeleteChain(ch.table, ch.chain); err != nil { if !utiliptables.IsNotFoundError(err) { - klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err) + klog.ErrorS(err, "Error removing iptables rules in ipvs proxier") encounteredError = true } } @@ -801,7 +800,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset if ipvs != nil { err := ipvs.Flush() if err != nil { - klog.Errorf("Error flushing IPVS rules: %v", err) + klog.ErrorS(err, "Error flushing IPVS rules") encounteredError = true } } @@ -809,7 +808,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset nl := NewNetLinkHandle(false) err := nl.DeleteDummyDevice(DefaultDummyDevice) if err != nil { - klog.Errorf("Error deleting dummy device %s created by IPVS proxier: %v", DefaultDummyDevice, err) + klog.ErrorS(err, "Error deleting dummy device created by IPVS proxier", "device", DefaultDummyDevice) encounteredError = true } // Clear iptables created by ipvs Proxier. @@ -820,7 +819,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset err = ipset.DestroySet(set.name) if err != nil { if !utilipset.IsNotFoundError(err) { - klog.Errorf("Error removing ipset %s, error: %v", set.name, err) + klog.ErrorS(err, "Error removing ipset", "ipset", set.name) encounteredError = true } } @@ -960,7 +959,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() { // is observed. func (proxier *Proxier) OnNodeAdd(node *v1.Node) { if node.Name != proxier.hostname { - klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname) return } @@ -983,7 +982,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) { // node object is observed. func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { if node.Name != proxier.hostname { - klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname) return } @@ -1006,7 +1005,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) { // object is observed. func (proxier *Proxier) OnNodeDelete(node *v1.Node) { if node.Name != proxier.hostname { - klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname) + klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname) return } proxier.mu.Lock() @@ -1021,9 +1020,6 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) { func (proxier *Proxier) OnNodeSynced() { } -// EntryInvalidErr indicates if an ipset entry is invalid or not -const EntryInvalidErr = "error adding entry %s to ipset %s" - // This is where all of the ipvs calls happen. // assumes proxier.mu is held func (proxier *Proxier) syncProxyRules() { @@ -1032,7 +1028,7 @@ func (proxier *Proxier) syncProxyRules() { // don't sync rules till we've received services and endpoints if !proxier.isInitialized() { - klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master") + klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master") return } @@ -1040,7 +1036,7 @@ func (proxier *Proxier) syncProxyRules() { start := time.Now() defer func() { metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start)) - klog.V(4).Infof("syncProxyRules took %v", time.Since(start)) + klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start)) }() // We assume that if this was called, we really want to sync them, @@ -1053,7 +1049,7 @@ func (proxier *Proxier) syncProxyRules() { // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String()) + klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String()) staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { staleServices.Insert(extIP) @@ -1061,7 +1057,7 @@ func (proxier *Proxier) syncProxyRules() { } } - klog.V(3).Infof("Syncing ipvs Proxier rules") + klog.V(3).InfoS("Syncing ipvs Proxier rules") // Begin install iptables @@ -1081,7 +1077,7 @@ func (proxier *Proxier) syncProxyRules() { // make sure dummy interface exists in the system where ipvs Proxier will bind service address on it _, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice) if err != nil { - klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err) + klog.ErrorS(err, "Failed to create dummy interface", "interface", DefaultDummyDevice) return } @@ -1104,7 +1100,7 @@ func (proxier *Proxier) syncProxyRules() { bindedAddresses, err := proxier.ipGetter.BindedIPs() if err != nil { - klog.Errorf("error listing addresses binded to dummy interface, error: %v", err) + klog.ErrorS(err, "error listing addresses binded to dummy interface") } hasNodePort := false @@ -1128,7 +1124,7 @@ func (proxier *Proxier) syncProxyRules() { if hasNodePort { nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) if err != nil { - klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err) + klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr") } else { nodeAddresses = nodeAddrSet.List() for _, address := range nodeAddresses { @@ -1139,7 +1135,7 @@ func (proxier *Proxier) syncProxyRules() { if utilproxy.IsZeroCIDR(address) { nodeIPs, err = proxier.ipGetter.NodeIPs() if err != nil { - klog.Errorf("Failed to list all node IPs from host, err: %v", err) + klog.ErrorS(err, "Failed to list all node IPs from host") } break } @@ -1165,7 +1161,7 @@ func (proxier *Proxier) syncProxyRules() { for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) if !ok { - klog.Errorf("Failed to cast serviceInfo %q", svcName.String()) + klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String()) continue } isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP()) @@ -1182,7 +1178,7 @@ func (proxier *Proxier) syncProxyRules() { for _, e := range proxier.endpointsMap[svcName] { ep, ok := e.(*proxy.BaseEndpointInfo) if !ok { - klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String()) + klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String()) continue } if !ep.IsLocal { @@ -1202,7 +1198,7 @@ func (proxier *Proxier) syncProxyRules() { SetType: utilipset.HashIPPortIP, } if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name) continue } proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String()) @@ -1219,7 +1215,7 @@ func (proxier *Proxier) syncProxyRules() { // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name) continue } proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) @@ -1242,10 +1238,10 @@ func (proxier *Proxier) syncProxyRules() { // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. if err := proxier.syncEndpoint(svcName, false, svcInfo.NodeLocalInternal(), serv); err != nil { - klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) } } else { - klog.Errorf("Failed to sync service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync service", "service", serv.String()) } // Capture externalIPs. @@ -1263,7 +1259,7 @@ func (proxier *Proxier) syncProxyRules() { Protocol: utilnet.Protocol(svcInfo.Protocol()), } if proxier.portsMap[lp] != nil { - klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := proxier.portMapper.OpenLocalPort(&lp) @@ -1280,7 +1276,7 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "can't open port, skipping it", "port", lp.String()) continue } - klog.V(2).Infof("Opened local port %s", lp.String()) + klog.V(2).InfoS("Opened local port", "port", lp.String()) replacementPortsMap[lp] = socket } } // We're holding the port, so it's OK to install IPVS rules. @@ -1295,14 +1291,14 @@ func (proxier *Proxier) syncProxyRules() { if svcInfo.NodeLocalExternal() { if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPLocalSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name) continue } proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String()) } else { // We have to SNAT packets to external IPs. if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name) continue } proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String()) @@ -1326,10 +1322,10 @@ func (proxier *Proxier) syncProxyRules() { onlyNodeLocalEndpoints := svcInfo.NodeLocalExternal() onlyNodeLocalEndpointsForInternal := svcInfo.NodeLocalInternal() if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, onlyNodeLocalEndpointsForInternal, serv); err != nil { - klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String()) } } else { - klog.Errorf("Failed to sync service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync service", "service", serv.String()) } } @@ -1348,14 +1344,14 @@ func (proxier *Proxier) syncProxyRules() { // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name) continue } proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String()) // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local if svcInfo.NodeLocalExternal() { if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name) continue } proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String()) @@ -1365,7 +1361,7 @@ func (proxier *Proxier) syncProxyRules() { // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name) continue } proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String()) @@ -1381,7 +1377,7 @@ func (proxier *Proxier) syncProxyRules() { } // enumerate all white list source cidr if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name) continue } proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String()) @@ -1405,7 +1401,7 @@ func (proxier *Proxier) syncProxyRules() { } // enumerate all white list source ip if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name) continue } proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String()) @@ -1427,10 +1423,10 @@ func (proxier *Proxier) syncProxyRules() { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { - klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) } } else { - klog.Errorf("Failed to sync service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync service", "service", serv) } } } @@ -1464,7 +1460,7 @@ func (proxier *Proxier) syncProxyRules() { // For ports on node IPs, open the actual port and hold it. for _, lp := range lps { if proxier.portsMap[lp] != nil { - klog.V(4).Infof("Port %s was open before and is still needed", lp.String()) + klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] // We do not start listening on SCTP ports, according to our agreement in the // SCTP support KEP @@ -1483,7 +1479,7 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "can't open port, skipping it", "port", lp.String()) continue } - klog.V(2).Infof("Opened local port %s", lp.String()) + klog.V(2).InfoS("Opened local port", "port", lp.String()) if lp.Protocol == utilnet.UDP { conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) @@ -1531,13 +1527,13 @@ func (proxier *Proxier) syncProxyRules() { } default: // It should never hit - klog.Errorf("Unsupported protocol type: %s", protocol) + klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol) } if nodePortSet != nil { entryInvalidErr := false for _, entry := range entries { if valid := nodePortSet.validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name) entryInvalidErr = true break } @@ -1560,13 +1556,13 @@ func (proxier *Proxier) syncProxyRules() { nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP] default: // It should never hit - klog.Errorf("Unsupported protocol type: %s", protocol) + klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol) } if nodePortLocalSet != nil { entryInvalidErr := false for _, entry := range entries { if valid := nodePortLocalSet.validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortLocalSet.Name) entryInvalidErr = true break } @@ -1595,10 +1591,10 @@ func (proxier *Proxier) syncProxyRules() { if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil { - klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv) } } else { - klog.Errorf("Failed to sync service: %v, err: %v", serv, err) + klog.ErrorS(err, "Failed to sync service", "service", serv) } } } @@ -1613,7 +1609,7 @@ func (proxier *Proxier) syncProxyRules() { } if valid := nodePortSet.validateEntry(entry); !valid { - klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name)) + klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name) continue } nodePortSet.activeEntries.Insert(entry.String()) @@ -1637,10 +1633,10 @@ func (proxier *Proxier) syncProxyRules() { proxier.iptablesData.Write(proxier.filterChains.Bytes()) proxier.iptablesData.Write(proxier.filterRules.Bytes()) - klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes()) + klog.V(5).InfoS("Restoring iptables", "rules", string(proxier.iptablesData.Bytes())) err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { - klog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes()) + klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes())) metrics.IptablesRestoreFailuresTotal.Inc() // Revert new local ports. utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) @@ -1650,7 +1646,7 @@ func (proxier *Proxier) syncProxyRules() { for _, lastChangeTriggerTime := range lastChangeTriggerTimes { latency := metrics.SinceInSeconds(lastChangeTriggerTime) metrics.NetworkProgrammingLatency.Observe(latency) - klog.V(4).Infof("Network programming of %s took %f seconds", name, latency) + klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency) } } @@ -1666,7 +1662,7 @@ func (proxier *Proxier) syncProxyRules() { // currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice) if err != nil { - klog.Errorf("Failed to get bind address, err: %v", err) + klog.ErrorS(err, "Failed to get bind address") } legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs) @@ -1677,7 +1673,7 @@ func (proxier *Proxier) syncProxyRules() { currentIPVSServices[appliedSvc.String()] = appliedSvc } } else { - klog.Errorf("Failed to get ipvs service, err: %v", err) + klog.ErrorS(err, "Failed to get ipvs service") } proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs) @@ -1690,17 +1686,17 @@ func (proxier *Proxier) syncProxyRules() { // not "OnlyLocal", but the services list will not, and the serviceHealthServer // will just drop those endpoints. if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { - klog.Errorf("Error syncing healthcheck services: %v", err) + klog.ErrorS(err, "Error syncing healthcheck services") } if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil { - klog.Errorf("Error syncing healthcheck endpoints: %v", err) + klog.ErrorS(err, "Error syncing healthcheck endpoints") } // Finish housekeeping. // TODO: these could be made more consistent. for _, svcIP := range staleServices.UnsortedList() { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { - klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err) + klog.ErrorS(err, "Failed to delete stale service IP connections", "ip", svcIP) } } proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) @@ -1928,7 +1924,7 @@ func (proxier *Proxier) createAndLinkKubeChain() { // ensure KUBE-MARK-DROP chain exist but do not change any rules for _, ch := range iptablesEnsureChains { if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil { - klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err) + klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain) return } } @@ -1936,7 +1932,7 @@ func (proxier *Proxier) createAndLinkKubeChain() { // Make sure we keep stats for the top-level chains for _, ch := range iptablesChains { if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil { - klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err) + klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain) return } if ch.table == utiliptables.TableNAT { @@ -1957,7 +1953,7 @@ func (proxier *Proxier) createAndLinkKubeChain() { for _, jc := range iptablesJumpChain { args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)} if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil { - klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err) + klog.ErrorS(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to) } } @@ -1970,7 +1966,7 @@ func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptabl buffer.Reset() err := proxier.iptables.SaveInto(table, buffer) if err != nil { // if we failed to get any rules - klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) + klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules") } else { // otherwise parse the output return utiliptables.GetChainLines(table, buffer.Bytes()) } @@ -1987,18 +1983,18 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE svcProto := svcInfo.Protocol() err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) if err != nil { - klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) + klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName.String()) } for _, extIP := range svcInfo.ExternalIPStrings() { err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) if err != nil { - klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err) + klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) if err != nil { - klog.Errorf("Failed to delete %s endpoint connections for LoadBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err) + klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", lbIP) } } } @@ -2010,17 +2006,17 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) { if appliedVirtualServer == nil { // IPVS service is not found, create a new service - klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol) + klog.V(3).InfoS("Adding new service", "svcName", svcName, "address", fmt.Sprintf("%s:%d/%s", vs.Address, vs.Port, vs.Protocol)) if err := proxier.ipvs.AddVirtualServer(vs); err != nil { - klog.Errorf("Failed to add IPVS service %q: %v", svcName, err) + klog.ErrorS(err, "Failed to add IPVS service", "svcName", svcName) return err } } else { // IPVS service was changed, update the existing one // During updates, service VIP will not go down - klog.V(3).Infof("IPVS service %s was changed", svcName) + klog.V(3).InfoS("IPVS service was changed", "svcName", svcName) if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil { - klog.Errorf("Failed to update IPVS service, err:%v", err) + klog.ErrorS(err, "Failed to update IPVS service") return err } } @@ -2034,10 +2030,10 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, return nil } - klog.V(4).Infof("Bind addr %s", vs.Address.String()) + klog.V(4).InfoS("Bind addr", "address", vs.Address.String()) _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice) if err != nil { - klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err) + klog.ErrorS(err, "Failed to bind service address to dummy device", "svcName", svcName) return err } } @@ -2062,7 +2058,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer) if err != nil { - klog.Errorf("Failed to list IPVS destinations, error: %v", err) + klog.ErrorS(err, "Failed to list IPVS destinations") return err } for _, des := range curDests { @@ -2077,7 +2073,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // externalTrafficPolicy=Local. svcInfo, ok := proxier.serviceMap[svcPortName] if !ok { - klog.Warningf("Unable to filter endpoints due to missing Service info for %s", svcPortName) + klog.InfoS("Unable to filter endpoints due to missing Service info", "svcPortName", svcPortName) } else { endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels) } @@ -2098,12 +2094,12 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode for _, ep := range newEndpoints.List() { ip, port, err := net.SplitHostPort(ep) if err != nil { - klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err) + klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep) continue } portNum, err := strconv.Atoi(port) if err != nil { - klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err) + klog.ErrorS(err, "Failed to parse endpoint port", "port", port) continue } @@ -2119,16 +2115,16 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) { continue } - klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS) + klog.V(5).InfoS("new ep is in graceful delete list", "uniqueRS", uniqueRS) err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS) if err != nil { - klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err) + klog.ErrorS(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep) continue } } err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest) if err != nil { - klog.Errorf("Failed to add destination: %v, error: %v", newDest, err) + klog.ErrorS(err, "Failed to add destination", "newDest", newDest) continue } } @@ -2141,12 +2137,12 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode } ip, port, err := net.SplitHostPort(ep) if err != nil { - klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err) + klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep) continue } portNum, err := strconv.Atoi(port) if err != nil { - klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err) + klog.ErrorS(err, "Failed to parse endpoint port", "port", port) continue } @@ -2155,10 +2151,10 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode Port: uint16(portNum), } - klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS) + klog.V(5).InfoS("Using graceful delete", "uniqueRS", uniqueRS) err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest) if err != nil { - klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err) + klog.ErrorS(err, "Failed to delete destination", "uniqueRS", uniqueRS) continue } } @@ -2177,15 +2173,15 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre continue } if _, ok := activeServices[cs]; !ok { - klog.V(4).Infof("Delete service %s", svc.String()) + klog.V(4).InfoS("Delete service", "service", svc.String()) if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil { - klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err) + klog.ErrorS(err, "Failed to delete service", "service", svc.String()) } addr := svc.Address.String() if _, ok := legacyBindAddrs[addr]; ok { - klog.V(4).Infof("Unbinding address %s", addr) + klog.V(4).InfoS("Unbinding address", "address", addr) if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil { - klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err) + klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", DefaultDummyDevice, "address", addr) } else { // In case we delete a multi-port service, avoid trying to unbind multiple times delete(legacyBindAddrs, addr)