Merge pull request #97796 from JornShen/proxier_ipvs_structured_logging

migrate proxy/ipvs/proxier.go logs to structured logging
This commit is contained in:
Kubernetes Prow Robot 2021-05-16 20:05:59 -07:00 committed by GitHub
commit 1d38084930
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -359,7 +359,7 @@ func NewProxier(ipt utiliptables.Interface,
// are connected to a Linux bridge (but not SDN bridges). Until most
// plugins handle this, log when config is missing
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
}
// Set the conntrack sysctl we need for
@ -376,7 +376,7 @@ func NewProxier(ipt utiliptables.Interface,
return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
}
if kernelVersion.LessThan(version.MustParseGeneric(connReuseMinSupportedKernelVersion)) {
klog.Errorf("can't set sysctl %s, kernel version must be at least %s", sysctlConnReuse, connReuseMinSupportedKernelVersion)
klog.ErrorS(nil, fmt.Sprintf("can't set sysctl %s, kernel version must be at least %s", sysctlConnReuse, connReuseMinSupportedKernelVersion))
} else {
// Set the connection reuse mode
if err := utilproxy.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil {
@ -416,7 +416,7 @@ func NewProxier(ipt utiliptables.Interface,
// current system timeout should be preserved
if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
klog.Warningf("failed to configure IPVS timeouts: %v", err)
klog.ErrorS(err, "failed to configure IPVS timeouts")
}
}
@ -429,10 +429,10 @@ func NewProxier(ipt utiliptables.Interface,
ipFamily = v1.IPv6Protocol
}
klog.V(2).Infof("nodeIP: %v, family: %v", nodeIP, ipFamily)
klog.V(2).InfoS("record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)
if len(scheduler) == 0 {
klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
klog.InfoS("IPVS scheduler not specified, use rr by default")
scheduler = DefaultScheduler
}
@ -444,7 +444,7 @@ func NewProxier(ipt utiliptables.Interface,
nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.Warningf("IP Family: %s, NodePortAddresses of wrong family; %s", ipFamily, strings.Join(ips, ","))
klog.InfoS("found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ","))
}
// excludeCIDRs has been validated before, here we just parse it to IPNet list
@ -492,8 +492,7 @@ func NewProxier(ipt utiliptables.Interface,
proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment)
}
burstSyncs := 2
klog.V(2).Infof("ipvs(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d",
ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs)
klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
proxier.gracefuldeleteManager.Run()
return proxier, nil
@ -618,7 +617,7 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
// Find out loaded kernel modules. If this is a full static kernel it will try to verify if the module is compiled using /boot/config-KERNELVERSION
modulesFile, err := os.Open("/proc/modules")
if err == os.ErrNotExist {
klog.Warningf("Failed to read file /proc/modules with error %v. Assuming this is a kernel without loadable modules support enabled", err)
klog.ErrorS(err, "Failed to read file /proc/modules. Assuming this is a kernel without loadable modules support enabled")
kernelConfigFile := fmt.Sprintf("/boot/config-%s", kernelVersionStr)
kConfig, err := ioutil.ReadFile(kernelConfigFile)
if err != nil {
@ -643,7 +642,7 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersionStr)
b, err := ioutil.ReadFile(builtinModsFilePath)
if err != nil {
klog.Warningf("Failed to read file %s with error %v. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", builtinModsFilePath, err)
klog.ErrorS(err, "Failed to read builtin modules file. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "filePath", builtinModsFilePath)
}
for _, module := range ipvsModules {
@ -653,8 +652,8 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
// Try to load the required IPVS kernel modules if not built in
err := handle.executor.Command("modprobe", "--", module).Run()
if err != nil {
klog.Warningf("Failed to load kernel module %v with modprobe. "+
"You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", module)
klog.InfoS("Failed to load kernel module with modprobe. "+
"You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "moduleName", module)
} else {
lmods = append(lmods, module)
}
@ -766,7 +765,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
}
if err := ipt.DeleteRule(jc.table, jc.from, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
encounteredError = true
}
}
@ -776,7 +775,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
for _, ch := range iptablesCleanupChains {
if err := ipt.FlushChain(ch.table, ch.chain); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
encounteredError = true
}
}
@ -786,7 +785,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
for _, ch := range iptablesCleanupChains {
if err := ipt.DeleteChain(ch.table, ch.chain); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
klog.ErrorS(err, "Error removing iptables rules in ipvs proxier")
encounteredError = true
}
}
@ -801,7 +800,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
if ipvs != nil {
err := ipvs.Flush()
if err != nil {
klog.Errorf("Error flushing IPVS rules: %v", err)
klog.ErrorS(err, "Error flushing IPVS rules")
encounteredError = true
}
}
@ -809,7 +808,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
nl := NewNetLinkHandle(false)
err := nl.DeleteDummyDevice(DefaultDummyDevice)
if err != nil {
klog.Errorf("Error deleting dummy device %s created by IPVS proxier: %v", DefaultDummyDevice, err)
klog.ErrorS(err, "Error deleting dummy device created by IPVS proxier", "device", DefaultDummyDevice)
encounteredError = true
}
// Clear iptables created by ipvs Proxier.
@ -820,7 +819,7 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
err = ipset.DestroySet(set.name)
if err != nil {
if !utilipset.IsNotFoundError(err) {
klog.Errorf("Error removing ipset %s, error: %v", set.name, err)
klog.ErrorS(err, "Error removing ipset", "ipset", set.name)
encounteredError = true
}
}
@ -960,7 +959,7 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@ -983,7 +982,7 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@ -1006,7 +1005,7 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node", "eventNode", node.Name, "currentNode", proxier.hostname)
return
}
proxier.mu.Lock()
@ -1021,9 +1020,6 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
func (proxier *Proxier) OnNodeSynced() {
}
// EntryInvalidErr indicates if an ipset entry is invalid or not
const EntryInvalidErr = "error adding entry %s to ipset %s"
// This is where all of the ipvs calls happen.
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
@ -1032,7 +1028,7 @@ func (proxier *Proxier) syncProxyRules() {
// don't sync rules till we've received services and endpoints
if !proxier.isInitialized() {
klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master")
klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
return
}
@ -1040,7 +1036,7 @@ func (proxier *Proxier) syncProxyRules() {
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
}()
// We assume that if this was called, we really want to sync them,
@ -1053,7 +1049,7 @@ func (proxier *Proxier) syncProxyRules() {
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
@ -1061,7 +1057,7 @@ func (proxier *Proxier) syncProxyRules() {
}
}
klog.V(3).Infof("Syncing ipvs Proxier rules")
klog.V(3).InfoS("Syncing ipvs Proxier rules")
// Begin install iptables
@ -1081,7 +1077,7 @@ func (proxier *Proxier) syncProxyRules() {
// make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
_, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
klog.ErrorS(err, "Failed to create dummy interface", "interface", DefaultDummyDevice)
return
}
@ -1104,7 +1100,7 @@ func (proxier *Proxier) syncProxyRules() {
bindedAddresses, err := proxier.ipGetter.BindedIPs()
if err != nil {
klog.Errorf("error listing addresses binded to dummy interface, error: %v", err)
klog.ErrorS(err, "error listing addresses binded to dummy interface")
}
hasNodePort := false
@ -1128,7 +1124,7 @@ func (proxier *Proxier) syncProxyRules() {
if hasNodePort {
nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
klog.Errorf("Failed to get node ip address matching nodeport cidr: %v", err)
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr")
} else {
nodeAddresses = nodeAddrSet.List()
for _, address := range nodeAddresses {
@ -1139,7 +1135,7 @@ func (proxier *Proxier) syncProxyRules() {
if utilproxy.IsZeroCIDR(address) {
nodeIPs, err = proxier.ipGetter.NodeIPs()
if err != nil {
klog.Errorf("Failed to list all node IPs from host, err: %v", err)
klog.ErrorS(err, "Failed to list all node IPs from host")
}
break
}
@ -1165,7 +1161,7 @@ func (proxier *Proxier) syncProxyRules() {
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
continue
}
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
@ -1182,7 +1178,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, e := range proxier.endpointsMap[svcName] {
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String())
continue
}
if !ep.IsLocal {
@ -1202,7 +1198,7 @@ func (proxier *Proxier) syncProxyRules() {
SetType: utilipset.HashIPPortIP,
}
if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
continue
}
proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
@ -1219,7 +1215,7 @@ func (proxier *Proxier) syncProxyRules() {
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
continue
}
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
@ -1242,10 +1238,10 @@ func (proxier *Proxier) syncProxyRules() {
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
if err := proxier.syncEndpoint(svcName, false, svcInfo.NodeLocalInternal(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
}
} else {
klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync service", "service", serv.String())
}
// Capture externalIPs.
@ -1263,7 +1259,7 @@ func (proxier *Proxier) syncProxyRules() {
Protocol: utilnet.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
@ -1280,7 +1276,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
continue
}
klog.V(2).Infof("Opened local port %s", lp.String())
klog.V(2).InfoS("Opened local port", "port", lp.String())
replacementPortsMap[lp] = socket
}
} // We're holding the port, so it's OK to install IPVS rules.
@ -1295,14 +1291,14 @@ func (proxier *Proxier) syncProxyRules() {
if svcInfo.NodeLocalExternal() {
if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPLocalSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
continue
}
proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
} else {
// We have to SNAT packets to external IPs.
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeExternalIPSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
continue
}
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
@ -1326,10 +1322,10 @@ func (proxier *Proxier) syncProxyRules() {
onlyNodeLocalEndpoints := svcInfo.NodeLocalExternal()
onlyNodeLocalEndpointsForInternal := svcInfo.NodeLocalInternal()
if err := proxier.syncEndpoint(svcName, onlyNodeLocalEndpoints, onlyNodeLocalEndpointsForInternal, serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
}
} else {
klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync service", "service", serv.String())
}
}
@ -1348,14 +1344,14 @@ func (proxier *Proxier) syncProxyRules() {
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
if svcInfo.NodeLocalExternal() {
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
@ -1365,7 +1361,7 @@ func (proxier *Proxier) syncProxyRules() {
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
continue
}
proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
@ -1381,7 +1377,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// enumerate all white list source cidr
if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
@ -1405,7 +1401,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// enumerate all white list source ip
if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
@ -1427,10 +1423,10 @@ func (proxier *Proxier) syncProxyRules() {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
}
} else {
klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync service", "service", serv)
}
}
}
@ -1464,7 +1460,7 @@ func (proxier *Proxier) syncProxyRules() {
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
// We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP
@ -1483,7 +1479,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
continue
}
klog.V(2).Infof("Opened local port %s", lp.String())
klog.V(2).InfoS("Opened local port", "port", lp.String())
if lp.Protocol == utilnet.UDP {
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
@ -1531,13 +1527,13 @@ func (proxier *Proxier) syncProxyRules() {
}
default:
// It should never hit
klog.Errorf("Unsupported protocol type: %s", protocol)
klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
}
if nodePortSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
entryInvalidErr = true
break
}
@ -1560,13 +1556,13 @@ func (proxier *Proxier) syncProxyRules() {
nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
default:
// It should never hit
klog.Errorf("Unsupported protocol type: %s", protocol)
klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
}
if nodePortLocalSet != nil {
entryInvalidErr := false
for _, entry := range entries {
if valid := nodePortLocalSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortLocalSet.Name)
entryInvalidErr = true
break
}
@ -1595,10 +1591,10 @@ func (proxier *Proxier) syncProxyRules() {
if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), svcInfo.NodeLocalInternal(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
}
} else {
klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
klog.ErrorS(err, "Failed to sync service", "service", serv)
}
}
}
@ -1613,7 +1609,7 @@ func (proxier *Proxier) syncProxyRules() {
}
if valid := nodePortSet.validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
continue
}
nodePortSet.activeEntries.Insert(entry.String())
@ -1637,10 +1633,10 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
klog.V(5).InfoS("Restoring iptables", "rules", string(proxier.iptablesData.Bytes()))
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
klog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
metrics.IptablesRestoreFailuresTotal.Inc()
// Revert new local ports.
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
@ -1650,7 +1646,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
}
}
@ -1666,7 +1662,7 @@ func (proxier *Proxier) syncProxyRules() {
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to get bind address, err: %v", err)
klog.ErrorS(err, "Failed to get bind address")
}
legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
@ -1677,7 +1673,7 @@ func (proxier *Proxier) syncProxyRules() {
currentIPVSServices[appliedSvc.String()] = appliedSvc
}
} else {
klog.Errorf("Failed to get ipvs service, err: %v", err)
klog.ErrorS(err, "Failed to get ipvs service")
}
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
@ -1690,17 +1686,17 @@ func (proxier *Proxier) syncProxyRules() {
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.Errorf("Error syncing healthcheck services: %v", err)
klog.ErrorS(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
klog.ErrorS(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping.
// TODO: these could be made more consistent.
for _, svcIP := range staleServices.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
klog.ErrorS(err, "Failed to delete stale service IP connections", "ip", svcIP)
}
}
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
@ -1928,7 +1924,7 @@ func (proxier *Proxier) createAndLinkKubeChain() {
// ensure KUBE-MARK-DROP chain exist but do not change any rules
for _, ch := range iptablesEnsureChains {
if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
return
}
}
@ -1936,7 +1932,7 @@ func (proxier *Proxier) createAndLinkKubeChain() {
// Make sure we keep stats for the top-level chains
for _, ch := range iptablesChains {
if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
return
}
if ch.table == utiliptables.TableNAT {
@ -1957,7 +1953,7 @@ func (proxier *Proxier) createAndLinkKubeChain() {
for _, jc := range iptablesJumpChain {
args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
klog.ErrorS(err, "Failed to ensure chain jumps", "table", jc.table, "srcChain", jc.from, "dstChain", jc.to)
}
}
@ -1970,7 +1966,7 @@ func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptabl
buffer.Reset()
err := proxier.iptables.SaveInto(table, buffer)
if err != nil { // if we failed to get any rules
klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else { // otherwise parse the output
return utiliptables.GetChainLines(table, buffer.Bytes())
}
@ -1987,18 +1983,18 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
svcProto := svcInfo.Protocol()
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName.String())
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for LoadBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName.String(), "ip", lbIP)
}
}
}
@ -2010,17 +2006,17 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
if appliedVirtualServer == nil {
// IPVS service is not found, create a new service
klog.V(3).Infof("Adding new service %q %s:%d/%s", svcName, vs.Address, vs.Port, vs.Protocol)
klog.V(3).InfoS("Adding new service", "svcName", svcName, "address", fmt.Sprintf("%s:%d/%s", vs.Address, vs.Port, vs.Protocol))
if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
klog.Errorf("Failed to add IPVS service %q: %v", svcName, err)
klog.ErrorS(err, "Failed to add IPVS service", "svcName", svcName)
return err
}
} else {
// IPVS service was changed, update the existing one
// During updates, service VIP will not go down
klog.V(3).Infof("IPVS service %s was changed", svcName)
klog.V(3).InfoS("IPVS service was changed", "svcName", svcName)
if err := proxier.ipvs.UpdateVirtualServer(vs); err != nil {
klog.Errorf("Failed to update IPVS service, err:%v", err)
klog.ErrorS(err, "Failed to update IPVS service")
return err
}
}
@ -2034,10 +2030,10 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
return nil
}
klog.V(4).Infof("Bind addr %s", vs.Address.String())
klog.V(4).InfoS("Bind addr", "address", vs.Address.String())
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
klog.ErrorS(err, "Failed to bind service address to dummy device", "svcName", svcName)
return err
}
}
@ -2062,7 +2058,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
if err != nil {
klog.Errorf("Failed to list IPVS destinations, error: %v", err)
klog.ErrorS(err, "Failed to list IPVS destinations")
return err
}
for _, des := range curDests {
@ -2077,7 +2073,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// externalTrafficPolicy=Local.
svcInfo, ok := proxier.serviceMap[svcPortName]
if !ok {
klog.Warningf("Unable to filter endpoints due to missing Service info for %s", svcPortName)
klog.InfoS("Unable to filter endpoints due to missing Service info", "svcPortName", svcPortName)
} else {
endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels)
}
@ -2098,12 +2094,12 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
for _, ep := range newEndpoints.List() {
ip, port, err := net.SplitHostPort(ep)
if err != nil {
klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
continue
}
@ -2119,16 +2115,16 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
continue
}
klog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
klog.V(5).InfoS("new ep is in graceful delete list", "uniqueRS", uniqueRS)
err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
if err != nil {
klog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
klog.ErrorS(err, "Failed to delete endpoint in gracefulDeleteQueue", "endpoint", ep)
continue
}
}
err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
if err != nil {
klog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
klog.ErrorS(err, "Failed to add destination", "newDest", newDest)
continue
}
}
@ -2141,12 +2137,12 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
}
ip, port, err := net.SplitHostPort(ep)
if err != nil {
klog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
klog.ErrorS(err, "Failed to parse endpoint", "endpoint", ep)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
klog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
klog.ErrorS(err, "Failed to parse endpoint port", "port", port)
continue
}
@ -2155,10 +2151,10 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
Port: uint16(portNum),
}
klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
klog.V(5).InfoS("Using graceful delete", "uniqueRS", uniqueRS)
err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
if err != nil {
klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
klog.ErrorS(err, "Failed to delete destination", "uniqueRS", uniqueRS)
continue
}
}
@ -2177,15 +2173,15 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre
continue
}
if _, ok := activeServices[cs]; !ok {
klog.V(4).Infof("Delete service %s", svc.String())
klog.V(4).InfoS("Delete service", "service", svc.String())
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
klog.ErrorS(err, "Failed to delete service", "service", svc.String())
}
addr := svc.Address.String()
if _, ok := legacyBindAddrs[addr]; ok {
klog.V(4).Infof("Unbinding address %s", addr)
klog.V(4).InfoS("Unbinding address", "address", addr)
if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", DefaultDummyDevice, "address", addr)
} else {
// In case we delete a multi-port service, avoid trying to unbind multiple times
delete(legacyBindAddrs, addr)