Merge pull request #97678 from JornShen/proxier_iptables_structured_logging

migrate proxy/iptables/proxier.go logs to structured logging
Kubernetes Prow Robot 2021-01-07 11:51:05 -08:00 committed by GitHub
commit 466e2e3751
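
For context, this change applies one mechanical pattern across proxier.go: printf-style klog calls (Infof, Errorf, Warningf) become structured InfoS/ErrorS calls that take a constant message followed by key/value pairs. Below is a minimal, self-contained sketch of that pattern; it is not part of the commit itself, and the messages and values are illustrative only.

package main

import (
	"errors"
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	// Standard klog v2 setup; enable verbosity 2 so the V(2) lines below are emitted.
	klog.InitFlags(nil)
	_ = flag.Set("v", "2")
	flag.Parse()
	defer klog.Flush()

	mark := "0x00004000"

	// Before: values are interpolated into a single format string.
	klog.V(2).Infof("iptables(%s) masquerade mark: %s", "IPv4", mark)

	// After: a fixed message plus explicit key/value pairs that log backends
	// can filter on without parsing the message text.
	klog.V(2).InfoS("using iptables mark for masquerade", "ipFamily", "IPv4", "mark", mark)

	// Errorf becomes ErrorS with the error as the first argument; nil is
	// passed when there is an error condition but no error object.
	err := errors.New("exit status 1")
	klog.ErrorS(err, "Failed to execute iptables-save", "table", "nat")
	klog.ErrorS(nil, "Failed to cast endpointsInfo")
}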


@@ -154,7 +154,7 @@ func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint {
func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
o, ok := other.(*endpointsInfo)
if !ok {
klog.Error("Failed to cast endpointsInfo")
klog.ErrorS(nil, "Failed to cast endpointsInfo")
return false
}
return e.Endpoint == o.Endpoint &&
@@ -277,13 +277,13 @@ func NewProxier(ipt utiliptables.Interface,
// are connected to a Linux bridge (but not SDN bridges). Until most
// plugins handle this, log when config is missing
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
klog.Warning("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
klog.InfoS("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
}
// Generate the masquerade mark to use for SNAT rules.
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).Infof("iptables(%s) masquerade mark: %s", ipt.Protocol(), masqueradeMark)
klog.V(2).InfoS("using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
@@ -298,7 +298,7 @@ func NewProxier(ipt utiliptables.Interface,
nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.Warningf("IP Family: %s, NodePortAddresses of wrong family; %s", ipFamily, strings.Join(ips, ","))
klog.InfoS("found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ","))
}
proxier := &Proxier{
@@ -331,8 +331,7 @@ func NewProxier(ipt utiliptables.Interface,
}
burstSyncs := 2
klog.V(2).Infof("iptables(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d",
ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs)
klog.V(2).InfoS("iptables sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
// We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
// We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
// time.Hour is arbitrary.
@@ -343,9 +342,9 @@ func NewProxier(ipt utiliptables.Interface,
proxier.syncProxyRules, syncPeriod, wait.NeverStop)
if ipt.HasRandomFully() {
klog.V(2).Infof("iptables(%s) supports --random-fully", ipt.Protocol())
klog.V(2).InfoS("iptables supports --random-fully", "ipFamily", ipt.Protocol())
} else {
klog.V(2).Infof("iptables(%s) does not support --random-fully", ipt.Protocol())
klog.V(2).InfoS("iptables does not support --random-fully", "ipFamily", ipt.Protocol())
}
return proxier, nil
@@ -427,7 +426,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
)
if err := ipt.DeleteRule(jump.table, jump.srcChain, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.Errorf("Error removing pure-iptables proxy rule: %v", err)
klog.ErrorS(err, "Error removing pure-iptables proxy rule")
encounteredError = true
}
}
@@ -436,7 +435,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Flush and remove all of our "-t nat" chains.
iptablesData := bytes.NewBuffer(nil)
if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil {
klog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err)
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT)
encounteredError = true
} else {
existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes())
@@ -464,7 +463,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Write it.
err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
klog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableNAT, err)
klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableNAT)
metrics.IptablesRestoreFailuresTotal.Inc()
encounteredError = true
}
@@ -473,7 +472,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Flush and remove all of our "-t filter" chains.
iptablesData.Reset()
if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
klog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableFilter, err)
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter)
encounteredError = true
} else {
existingFilterChains := utiliptables.GetChainLines(utiliptables.TableFilter, iptablesData.Bytes())
@@ -491,7 +490,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
// Write it.
if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
klog.Errorf("Failed to execute iptables-restore for %s: %v", utiliptables.TableFilter, err)
klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableFilter)
metrics.IptablesRestoreFailuresTotal.Inc()
encounteredError = true
}
@@ -663,7 +662,8 @@ func (proxier *Proxier) OnEndpointSlicesSynced() {
// is observed.
func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -682,7 +682,8 @@ func (proxier *Proxier) OnNodeAdd(node *v1.Node) {
// node object is observed.
func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
@@ -701,7 +702,8 @@ func (proxier *Proxier) OnNodeUpdate(oldNode, node *v1.Node) {
// object is observed.
func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
if node.Name != proxier.hostname {
klog.Errorf("Received a watch event for a node %s that doesn't match the current node %v", node.Name, proxier.hostname)
klog.ErrorS(nil, "Received a watch event for a node that doesn't match the current node",
"eventNode", node.Name, "currentNode", proxier.hostname)
return
}
proxier.mu.Lock()
@@ -770,23 +772,23 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto)
if err != nil {
klog.Errorf("Failed to delete nodeport-related %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName.String())
}
}
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName.String())
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName.String(), "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for LoabBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName.String(), "loadBalancerIP", lbIP)
}
}
}
@@ -815,7 +817,7 @@ func (proxier *Proxier) syncProxyRules() {
// don't sync rules till we've received services and endpoints
if !proxier.isInitialized() {
klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
klog.V(2).InfoS("Not syncing iptables until Services and Endpoints have been received from master")
return
}
@@ -823,14 +825,14 @@ func (proxier *Proxier) syncProxyRules() {
start := time.Now()
defer func() {
metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
klog.V(2).Infof("syncProxyRules took %v", time.Since(start))
klog.V(2).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
}()
localAddrs, err := utilproxy.GetLocalAddrs()
if err != nil {
klog.Errorf("Failed to get local addresses during proxy sync: %v, assuming external IPs are not local", err)
klog.ErrorS(err, "Failed to get local addresses during proxy sync, assuming external IPs are not local")
} else if len(localAddrs) == 0 {
klog.Warning("No local addresses found, assuming all external IPs are not local")
klog.InfoS("No local addresses found, assuming all external IPs are not local")
}
localAddrSet := utilnet.IPSet{}
@@ -838,7 +840,7 @@ func (proxier *Proxier) syncProxyRules() {
nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
klog.Errorf("Failed to get node ip address matching nodeport cidrs %v, services with nodeport may not work as intended: %v", proxier.nodePortAddresses, err)
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
}
// We assume that if this was called, we really want to sync them,
@@ -851,7 +853,7 @@ func (proxier *Proxier) syncProxyRules() {
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
@@ -859,12 +861,12 @@ func (proxier *Proxier) syncProxyRules() {
}
}
klog.V(2).Info("Syncing iptables rules")
klog.V(2).InfoS("Syncing iptables rules")
success := false
defer func() {
if !success {
klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod)
klog.InfoS("Sync failed", "retryingTime", proxier.syncPeriod)
proxier.syncRunner.RetryAfter(proxier.syncPeriod)
}
}()
@@ -872,7 +874,7 @@ func (proxier *Proxier) syncProxyRules() {
// Create and link the kube chains.
for _, jump := range iptablesJumpChains {
if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
klog.Errorf("Failed to ensure that %s chain %s exists: %v", jump.table, jump.dstChain, err)
klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain)
return
}
args := append(jump.extraArgs,
@@ -880,7 +882,7 @@ func (proxier *Proxier) syncProxyRules() {
"-j", string(jump.dstChain),
)
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jump.table, jump.srcChain, jump.dstChain, err)
klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain)
return
}
}
@@ -888,7 +890,7 @@ func (proxier *Proxier) syncProxyRules() {
// ensure KUBE-MARK-DROP chain exist but do not change any rules
for _, ch := range iptablesEnsureChains {
if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
klog.ErrorS(err, "Failed to ensure chain exists", "table", ch.table, "chain", ch.chain)
return
}
}
@@ -903,7 +905,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.existingFilterChainsData.Reset()
err = proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
if err != nil { // if we failed to get any rules
klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else { // otherwise parse the output
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
}
@@ -913,7 +915,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Reset()
err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
if err != nil { // if we failed to get any rules
klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
klog.ErrorS(err, "Failed to execute iptables-save, syncing all rules")
} else { // otherwise parse the output
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
}
@@ -1009,7 +1011,7 @@ func (proxier *Proxier) syncProxyRules() {
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
continue
}
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
@@ -1098,7 +1100,7 @@ func (proxier *Proxier) syncProxyRules() {
Protocol: protocol,
}
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
@@ -1112,7 +1114,7 @@ func (proxier *Proxier) syncProxyRules() {
UID: types.UID(proxier.hostname),
Namespace: "",
}, v1.EventTypeWarning, err.Error(), msg)
klog.Error(msg)
klog.ErrorS(err, "can't open port, skipping externalIP", "port", lp.String())
continue
}
replacementPortsMap[lp] = socket
@@ -1208,7 +1210,7 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
_, cidr, err := net.ParseCIDR(src)
if err != nil {
klog.Errorf("Error parsing %s CIDR in LoadBalancerSourceRanges, dropping: %v", cidr, err)
klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr)
} else if cidr.Contains(proxier.nodeIP) {
allowFromNode = true
}
@@ -1269,12 +1271,12 @@ func (proxier *Proxier) syncProxyRules() {
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
klog.ErrorS(err, "can't open port, skipping this nodePort", "port", lp.String())
continue
}
if lp.Protocol == "udp" {
@@ -1284,7 +1286,7 @@ func (proxier *Proxier) syncProxyRules() {
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
klog.ErrorS(err, "Failed to clear udp conntrack", "port", lp.Port)
}
}
replacementPortsMap[lp] = socket
@@ -1341,7 +1343,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String())
continue
}
@@ -1526,7 +1528,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Ignore IP addresses with incorrect version
if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) {
klog.Errorf("IP address %s has incorrect IP version", address)
klog.ErrorS(nil, "IP has incorrect IP version", "ip", address)
continue
}
// create nodeport rules for each IP one by one
@@ -1588,13 +1590,13 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
klog.Errorf("Failed to execute iptables-restore: %v", err)
klog.ErrorS(err, "Failed to execute iptables-restore")
metrics.IptablesRestoreFailuresTotal.Inc()
// Revert new local ports.
klog.V(2).Infof("Closing local ports after iptables-restore failure")
klog.V(2).InfoS("Closing local ports after iptables-restore failure")
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
}
@@ -1604,7 +1606,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
metrics.NetworkProgrammingLatency.Observe(latency)
klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
}
}
@@ -1625,21 +1627,21 @@ func (proxier *Proxier) syncProxyRules() {
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
// will just drop those endpoints.
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.Errorf("Error syncing healthcheck services: %v", err)
klog.ErrorS(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
klog.ErrorS(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping.
// TODO: these could be made more consistent.
klog.V(4).Infof("Deleting stale services IPs: %v", staleServices.UnsortedList())
klog.V(4).InfoS("Deleting stale services", "ips", staleServices.UnsortedList())
for _, svcIP := range staleServices.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
klog.ErrorS(err, "Failed to delete stale service connections", "ip", svcIP)
}
}
klog.V(4).Infof("Deleting stale endpoint connections: %v", endpointUpdateResult.StaleEndpoints)
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
@@ -1703,6 +1705,6 @@ func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, e
default:
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
}
klog.V(2).Infof("Opened local port %s", lp.String())
klog.V(2).InfoS("Opened local port", "port", lp.String())
return socket, nil
}
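
For reference, with klog's default text output the migrated calls keep the message constant and render each key/value pair separately, roughly like the lines below (timestamps, PIDs, file:line positions and the exact quoting of typed values are illustrative, not taken from a real run):

I0107 11:51:05.000000       1 proxier.go:286] "using iptables mark for masquerade" ipFamily="IPv4" mark="0x00004000"
E0107 11:51:05.000000       1 proxier.go:435] "Failed to execute iptables-save" err="exit status 1" table="nat"

This constant-message, keyed-value form is what makes the new logs easier to filter and aggregate than the old interpolated strings.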