mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 12:07:47 +00:00
migrate proxy/winkernel/proxier.go logs to structured logging
This commit is contained in:
parent
d72c056260
commit
f3b9e8b105
@ -192,7 +192,7 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
|
|||||||
} else if len(elbPolicy.VIPs) != 0 {
|
} else if len(elbPolicy.VIPs) != 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
LogJson(plist, "Found existing Hns loadbalancer policy resource", 1)
|
LogJson("policyList", plist, "Found existing Hns loadbalancer policy resource", 1)
|
||||||
return &loadBalancerInfo{
|
return &loadBalancerInfo{
|
||||||
hnsID: plist.ID,
|
hnsID: plist.ID,
|
||||||
}, nil
|
}, nil
|
||||||
@ -218,7 +218,7 @@ func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
|
|||||||
)
|
)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
LogJson(lb, "Hns loadbalancer policy resource", 1)
|
LogJson("policyList", lb, "Hns loadbalancer policy resource", 1)
|
||||||
} else {
|
} else {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -237,7 +237,7 @@ func (hns hnsV1) deleteLoadBalancer(hnsID string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
LogJson(hnsloadBalancer, "Removing Policy", 2)
|
LogJson("policyList", hnsloadBalancer, "Removing Policy", 2)
|
||||||
|
|
||||||
_, err = hnsloadBalancer.Delete()
|
_, err = hnsloadBalancer.Delete()
|
||||||
return err
|
return err
|
||||||
|
@ -201,7 +201,7 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
|
|||||||
} else if len(plist.FrontendVIPs) != 0 {
|
} else if len(plist.FrontendVIPs) != 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
LogJson(plist, "Found existing Hns loadbalancer policy resource", 1)
|
LogJson("policyList", plist, "Found existing Hns loadbalancer policy resource", 1)
|
||||||
return &loadBalancerInfo{
|
return &loadBalancerInfo{
|
||||||
hnsID: plist.Id,
|
hnsID: plist.Id,
|
||||||
}, nil
|
}, nil
|
||||||
@ -280,7 +280,7 @@ func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFl
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
LogJson(lb, "Hns loadbalancer policy resource", 1)
|
LogJson("hostComputeLoadBalancer", lb, "Hns loadbalancer policy resource", 1)
|
||||||
|
|
||||||
return &loadBalancerInfo{
|
return &loadBalancerInfo{
|
||||||
hnsID: lb.Id,
|
hnsID: lb.Id,
|
||||||
|
@ -139,13 +139,13 @@ type remoteSubnetInfo struct {
|
|||||||
const NETWORK_TYPE_OVERLAY = "overlay"
|
const NETWORK_TYPE_OVERLAY = "overlay"
|
||||||
|
|
||||||
func Log(v interface{}, message string, level klog.Level) {
|
func Log(v interface{}, message string, level klog.Level) {
|
||||||
klog.V(level).Infof("%s, %s", message, spewSdump(v))
|
klog.V(level).InfoS("%s", message, "spewConfig", spewSdump(v))
|
||||||
}
|
}
|
||||||
|
|
||||||
func LogJson(v interface{}, message string, level klog.Level) {
|
func LogJson(interfaceName string, v interface{}, message string, level klog.Level) {
|
||||||
jsonString, err := json.Marshal(v)
|
jsonString, err := json.Marshal(v)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
klog.V(level).Infof("%s, %s", message, string(jsonString))
|
klog.V(level).InfoS("%s", message, interfaceName, string(jsonString))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -250,15 +250,15 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName)
|
|||||||
svcInfo, ok := svc.(*serviceInfo)
|
svcInfo, ok := svc.(*serviceInfo)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
|
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcPortName", svcPortName.String())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(3).Infof("Endpoints are modified. Service [%v] is stale", *svcPortName)
|
klog.V(3).InfoS("Endpoints are modified. Service is stale", "svcPortName", svcPortName.String())
|
||||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
|
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
|
||||||
} else {
|
} else {
|
||||||
// If no service exists, just cleanup the remote endpoints
|
// If no service exists, just cleanup the remote endpoints
|
||||||
klog.V(3).Infof("Endpoints are orphaned. Cleaning up")
|
klog.V(3).InfoS("Endpoints are orphaned. Cleaning up")
|
||||||
// Cleanup Endpoints references
|
// Cleanup Endpoints references
|
||||||
epInfos, exists := proxier.endpointsMap[*svcPortName]
|
epInfos, exists := proxier.endpointsMap[*svcPortName]
|
||||||
|
|
||||||
@ -297,11 +297,11 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
|
|||||||
svcInfo, ok := svc.(*serviceInfo)
|
svcInfo, ok := svc.(*serviceInfo)
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
|
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcPortName", svcPortName.String())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(3).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, svcInfo.ClusterIP(), svcInfo.Port(), svcInfo.Protocol())
|
klog.V(3).InfoS("Updating existing service port", "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol())
|
||||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
|
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -356,12 +356,12 @@ func (ep *endpointsInfo) Cleanup() {
|
|||||||
// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
|
// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
|
||||||
// Remove only remote endpoints created by this service
|
// Remove only remote endpoints created by this service
|
||||||
if *ep.refCount <= 0 && !ep.GetIsLocal() {
|
if *ep.refCount <= 0 && !ep.GetIsLocal() {
|
||||||
klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
|
klog.V(4).InfoS("Removing endpoints, since no one is referencing it", "endpoint", ep.String())
|
||||||
err := ep.hns.deleteEndpoint(ep.hnsID)
|
err := ep.hns.deleteEndpoint(ep.hnsID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
ep.hnsID = ""
|
ep.hnsID = ""
|
||||||
} else {
|
} else {
|
||||||
klog.Errorf("Endpoint deletion failed for %v: %v", ep.IP(), err)
|
klog.ErrorS(err, "Endpoint deletion failed", "ip", ep.IP())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -415,7 +415,7 @@ func (network hnsNetworkInfo) findRemoteSubnetProviderAddress(ip string) string
|
|||||||
for _, rs := range network.remoteSubnets {
|
for _, rs := range network.remoteSubnets {
|
||||||
_, ipNet, err := net.ParseCIDR(rs.destinationPrefix)
|
_, ipNet, err := net.ParseCIDR(rs.destinationPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Fatalf("%v", err)
|
klog.ErrorS(err, "Failed to parse CIDR")
|
||||||
}
|
}
|
||||||
if ipNet.Contains(net.ParseIP(ip)) {
|
if ipNet.Contains(net.ParseIP(ip)) {
|
||||||
providerAddress = rs.providerAddress
|
providerAddress = rs.providerAddress
|
||||||
@ -528,12 +528,12 @@ func NewProxier(
|
|||||||
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
|
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
|
||||||
|
|
||||||
if nodeIP == nil {
|
if nodeIP == nil {
|
||||||
klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
|
klog.InfoS("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
|
||||||
nodeIP = net.ParseIP("127.0.0.1")
|
nodeIP = net.ParseIP("127.0.0.1")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(clusterCIDR) == 0 {
|
if len(clusterCIDR) == 0 {
|
||||||
klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
|
klog.InfoS("clusterCIDR not specified, unable to distinguish between internal and external traffic")
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
|
||||||
@ -546,20 +546,20 @@ func NewProxier(
|
|||||||
|
|
||||||
hnsNetworkName := config.NetworkName
|
hnsNetworkName := config.NetworkName
|
||||||
if len(hnsNetworkName) == 0 {
|
if len(hnsNetworkName) == 0 {
|
||||||
klog.V(3).Infof("network-name flag not set. Checking environment variable")
|
klog.V(3).InfoS("network-name flag not set. Checking environment variable")
|
||||||
hnsNetworkName = os.Getenv("KUBE_NETWORK")
|
hnsNetworkName = os.Getenv("KUBE_NETWORK")
|
||||||
if len(hnsNetworkName) == 0 {
|
if len(hnsNetworkName) == 0 {
|
||||||
return nil, fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
|
return nil, fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(3).Infof("Cleaning up old HNS policy lists")
|
klog.V(3).InfoS("Cleaning up old HNS policy lists")
|
||||||
deleteAllHnsLoadBalancerPolicy()
|
deleteAllHnsLoadBalancerPolicy()
|
||||||
|
|
||||||
// Get HNS network information
|
// Get HNS network information
|
||||||
hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
|
hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
|
||||||
for err != nil {
|
for err != nil {
|
||||||
klog.Errorf("Unable to find HNS Network specified by %s. Please check network name and CNI deployment", hnsNetworkName)
|
klog.ErrorS(err, "Unable to find HNS Network specified. Please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
|
hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
|
||||||
}
|
}
|
||||||
@ -574,7 +574,7 @@ func NewProxier(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(1).Infof("Hns Network loaded with info = %v", hnsNetworkInfo)
|
klog.V(1).InfoS("Hns Network loaded", "hnsNetworkInfo", hnsNetworkInfo)
|
||||||
isDSR := config.EnableDSR
|
isDSR := config.EnableDSR
|
||||||
if isDSR && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinDSR) {
|
if isDSR && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinDSR) {
|
||||||
return nil, fmt.Errorf("WinDSR feature gate not enabled")
|
return nil, fmt.Errorf("WinDSR feature gate not enabled")
|
||||||
@ -605,7 +605,7 @@ func NewProxier(
|
|||||||
for _, addr := range addresses {
|
for _, addr := range addresses {
|
||||||
addrIP, _, _ := net.ParseCIDR(addr.String())
|
addrIP, _, _ := net.ParseCIDR(addr.String())
|
||||||
if addrIP.String() == nodeIP.String() {
|
if addrIP.String() == nodeIP.String() {
|
||||||
klog.V(2).Infof("Host MAC address is %s", inter.HardwareAddr.String())
|
klog.V(2).InfoS("record Host MAC address", "addr", inter.HardwareAddr.String())
|
||||||
hostMac = inter.HardwareAddr.String()
|
hostMac = inter.HardwareAddr.String()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -649,7 +649,7 @@ func NewProxier(
|
|||||||
proxier.serviceChanges = serviceChanges
|
proxier.serviceChanges = serviceChanges
|
||||||
|
|
||||||
burstSyncs := 2
|
burstSyncs := 2
|
||||||
klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
|
klog.V(3).InfoS("record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
|
||||||
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
|
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
|
||||||
return proxier, nil
|
return proxier, nil
|
||||||
}
|
}
|
||||||
@ -740,10 +740,10 @@ func deleteAllHnsLoadBalancerPolicy() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, plist := range plists {
|
for _, plist := range plists {
|
||||||
LogJson(plist, "Remove Policy", 3)
|
LogJson("policyList", plist, "Remove Policy", 3)
|
||||||
_, err = plist.Delete()
|
_, err = plist.Delete()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("%v", err)
|
klog.ErrorS(err, "Failed to delete policy list")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -752,7 +752,7 @@ func deleteAllHnsLoadBalancerPolicy() {
|
|||||||
func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
|
func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
|
||||||
hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName)
|
hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("%v", err)
|
klog.ErrorS(err, "Failed to get HNS Network by name")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -834,12 +834,12 @@ func (proxier *Proxier) OnServiceSynced() {
|
|||||||
func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
|
func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
|
||||||
// if ClusterIP is "None" or empty, skip proxying
|
// if ClusterIP is "None" or empty, skip proxying
|
||||||
if !helper.IsServiceIPSet(service) {
|
if !helper.IsServiceIPSet(service) {
|
||||||
klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
klog.V(3).InfoS("Skipping service due to clusterIP", "svcName", svcName.String(), "clusterIP", service.Spec.ClusterIP)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
|
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
|
||||||
if service.Spec.Type == v1.ServiceTypeExternalName {
|
if service.Spec.Type == v1.ServiceTypeExternalName {
|
||||||
klog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName)
|
klog.V(3).InfoS("Skipping service due to Type=ExternalName", "svcName", svcName.String())
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
@ -918,7 +918,7 @@ func (proxier *Proxier) cleanupAllPolicies() {
|
|||||||
for svcName, svc := range proxier.serviceMap {
|
for svcName, svc := range proxier.serviceMap {
|
||||||
svcInfo, ok := svc.(*serviceInfo)
|
svcInfo, ok := svc.(*serviceInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
|
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName])
|
svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName])
|
||||||
@ -947,11 +947,11 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
|
SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
|
||||||
klog.V(4).Infof("syncProxyRules took %v", time.Since(start))
|
klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
|
||||||
}()
|
}()
|
||||||
// don't sync rules till we've received services and endpoints
|
// don't sync rules till we've received services and endpoints
|
||||||
if !proxier.isInitialized() {
|
if !proxier.isInitialized() {
|
||||||
klog.V(2).Info("Not syncing hns until Services and Endpoints have been received from master")
|
klog.V(2).InfoS("Not syncing hns until Services and Endpoints have been received from master")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -961,7 +961,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
prevNetworkID := proxier.network.id
|
prevNetworkID := proxier.network.id
|
||||||
updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
|
updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
|
||||||
if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
|
if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
|
||||||
klog.Infof("The HNS network %s is not present or has changed since the last sync. Please check the CNI deployment", hnsNetworkName)
|
klog.InfoS("The HNS network %s is not present or has changed since the last sync. Please check the CNI deployment", "hnsNetworkName", hnsNetworkName)
|
||||||
proxier.cleanupAllPolicies()
|
proxier.cleanupAllPolicies()
|
||||||
if updatedNetwork != nil {
|
if updatedNetwork != nil {
|
||||||
proxier.network = *updatedNetwork
|
proxier.network = *updatedNetwork
|
||||||
@ -979,7 +979,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
// merge stale services gathered from updateEndpointsMap
|
// merge stale services gathered from updateEndpointsMap
|
||||||
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
|
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
|
||||||
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
||||||
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
|
klog.V(2).InfoS("Stale udp service", "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
|
||||||
staleServices.Insert(svcInfo.ClusterIP().String())
|
staleServices.Insert(svcInfo.ClusterIP().String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -990,30 +990,30 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
|
_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Source Vip endpoint creation failed: %v", err)
|
klog.ErrorS(err, "Source Vip endpoint creation failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(3).Infof("Syncing Policies")
|
klog.V(3).InfoS("Syncing Policies")
|
||||||
|
|
||||||
// Program HNS by adding corresponding policies for each service.
|
// Program HNS by adding corresponding policies for each service.
|
||||||
for svcName, svc := range proxier.serviceMap {
|
for svcName, svc := range proxier.serviceMap {
|
||||||
svcInfo, ok := svc.(*serviceInfo)
|
svcInfo, ok := svc.(*serviceInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
|
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if svcInfo.policyApplied {
|
if svcInfo.policyApplied {
|
||||||
klog.V(4).Infof("Policy already applied for %s", spewSdump(svcInfo))
|
klog.V(4).InfoS("Policy already applied", "spewConfig", spewSdump(svcInfo))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
||||||
serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.ClusterIP().String(), hnsNetworkName)
|
serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.ClusterIP().String(), hnsNetworkName)
|
||||||
if serviceVipEndpoint == nil {
|
if serviceVipEndpoint == nil {
|
||||||
klog.V(4).Infof("No existing remote endpoint for service VIP %v", svcInfo.ClusterIP().String())
|
klog.V(4).InfoS("No existing remote endpoint", "ip", svcInfo.ClusterIP().String())
|
||||||
hnsEndpoint := &endpointsInfo{
|
hnsEndpoint := &endpointsInfo{
|
||||||
ip: svcInfo.ClusterIP().String(),
|
ip: svcInfo.ClusterIP().String(),
|
||||||
isLocal: false,
|
isLocal: false,
|
||||||
@ -1023,7 +1023,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
|
newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Remote endpoint creation failed for service VIP: %v", err)
|
klog.ErrorS(err, "Remote endpoint creation failed for service VIP")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1035,7 +1035,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
var hnsEndpoints []endpointsInfo
|
var hnsEndpoints []endpointsInfo
|
||||||
var hnsLocalEndpoints []endpointsInfo
|
var hnsLocalEndpoints []endpointsInfo
|
||||||
klog.V(4).Infof("====Applying Policy for %s====", svcName)
|
klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName.String())
|
||||||
// Create Remote endpoints for every endpoint, corresponding to the service
|
// Create Remote endpoints for every endpoint, corresponding to the service
|
||||||
containsPublicIP := false
|
containsPublicIP := false
|
||||||
containsNodeIP := false
|
containsNodeIP := false
|
||||||
@ -1043,7 +1043,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
for _, epInfo := range proxier.endpointsMap[svcName] {
|
for _, epInfo := range proxier.endpointsMap[svcName] {
|
||||||
ep, ok := epInfo.(*endpointsInfo)
|
ep, ok := epInfo.(*endpointsInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
klog.Errorf("Failed to cast endpointsInfo %q", svcName.String())
|
klog.ErrorS(nil, "Failed to cast endpointsInfo", "svcName", svcName.String())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1074,23 +1074,23 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
}
|
}
|
||||||
if newHnsEndpoint == nil {
|
if newHnsEndpoint == nil {
|
||||||
if ep.GetIsLocal() {
|
if ep.GetIsLocal() {
|
||||||
klog.Errorf("Local endpoint not found for %v: err: %v on network %s", ep.IP(), err, hnsNetworkName)
|
klog.ErrorS(err, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
|
||||||
klog.Infof("Updating network %v to check for new remote subnet policies", proxier.network.name)
|
klog.InfoS("Updating network to check for new remote subnet policies", "networkName", proxier.network.name)
|
||||||
networkName := proxier.network.name
|
networkName := proxier.network.name
|
||||||
updatedNetwork, err := hns.getNetworkByName(networkName)
|
updatedNetwork, err := hns.getNetworkByName(networkName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Unable to find HNS Network specified by %s. Please check network name and CNI deployment", hnsNetworkName)
|
klog.ErrorS(err, "Unable to find HNS Network specified. Please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
|
||||||
proxier.cleanupAllPolicies()
|
proxier.cleanupAllPolicies()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
proxier.network = *updatedNetwork
|
proxier.network = *updatedNetwork
|
||||||
providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
|
providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
|
||||||
if len(providerAddress) == 0 {
|
if len(providerAddress) == 0 {
|
||||||
klog.Infof("Could not find provider address for %s. Assuming it is a public IP", ep.IP())
|
klog.InfoS("Could not find provider address. Assuming it is a public IP", "ip", ep.IP())
|
||||||
providerAddress = proxier.nodeIP.String()
|
providerAddress = proxier.nodeIP.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1103,7 +1103,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
|
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Remote endpoint creation failed: %v, %s", err, spewSdump(hnsEndpoint))
|
klog.ErrorS(err, "Remote endpoint creation failed", "spewConfig", spewSdump(hnsEndpoint))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1116,7 +1116,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
|
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Remote endpoint creation failed: %v", err)
|
klog.ErrorS(err, "Remote endpoint creation failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1138,14 +1138,14 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
isNodeIP := (ep.IP() == providerAddress)
|
isNodeIP := (ep.IP() == providerAddress)
|
||||||
isPublicIP := (len(providerAddress) == 0)
|
isPublicIP := (len(providerAddress) == 0)
|
||||||
klog.Infof("Endpoint %s on overlay network %s is classified as NodeIp: %v, Public Ip: %v", ep.IP(), hnsNetworkName, isNodeIP, isPublicIP)
|
klog.InfoS("Endpoint on overlay network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName, "isNodeIP", isNodeIP, "isPublicIP", isPublicIP)
|
||||||
|
|
||||||
containsNodeIP = containsNodeIP || isNodeIP
|
containsNodeIP = containsNodeIP || isNodeIP
|
||||||
containsPublicIP = containsPublicIP || isPublicIP
|
containsPublicIP = containsPublicIP || isPublicIP
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the hnsId for reference
|
// Save the hnsId for reference
|
||||||
LogJson(newHnsEndpoint, "Hns Endpoint resource", 1)
|
LogJson("endpointInfo", newHnsEndpoint, "Hns Endpoint resource", 1)
|
||||||
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
|
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
|
||||||
if newHnsEndpoint.GetIsLocal() {
|
if newHnsEndpoint.GetIsLocal() {
|
||||||
hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
|
hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
|
||||||
@ -1160,19 +1160,19 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
Log(ep, "Endpoint resource found", 3)
|
Log(ep, "Endpoint resource found", 3)
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(3).Infof("Associated endpoints [%s] for service [%s]", spewSdump(hnsEndpoints), svcName)
|
klog.V(3).InfoS("Associated endpoints for service", "spewConfig", spewSdump(hnsEndpoints), "svcName", svcName.String())
|
||||||
|
|
||||||
if len(svcInfo.hnsID) > 0 {
|
if len(svcInfo.hnsID) > 0 {
|
||||||
// This should not happen
|
// This should not happen
|
||||||
klog.Warningf("Load Balancer already exists %s -- Debug ", svcInfo.hnsID)
|
klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(hnsEndpoints) == 0 {
|
if len(hnsEndpoints) == 0 {
|
||||||
klog.Errorf("Endpoint information not available for service %s. Not applying any policy", svcName)
|
klog.ErrorS(nil, "Endpoint information not available for service. Not applying any policy", "svcName", svcName.String())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("Trying to Apply Policies for service %s", spewSdump(svcInfo))
|
klog.V(4).Infof("Trying to Apply Policies for service", "spewConfig", spewSdump(svcInfo))
|
||||||
var hnsLoadBalancer *loadBalancerInfo
|
var hnsLoadBalancer *loadBalancerInfo
|
||||||
var sourceVip = proxier.sourceVip
|
var sourceVip = proxier.sourceVip
|
||||||
if containsPublicIP || containsNodeIP {
|
if containsPublicIP || containsNodeIP {
|
||||||
@ -1181,7 +1181,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
|
sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
|
||||||
if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity {
|
if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity {
|
||||||
klog.Warningf("Session Affinity is not supported on this version of Windows.")
|
klog.InfoS("Session Affinity is not supported on this version of Windows.")
|
||||||
}
|
}
|
||||||
|
|
||||||
hnsLoadBalancer, err := hns.getLoadBalancer(
|
hnsLoadBalancer, err := hns.getLoadBalancer(
|
||||||
@ -1194,12 +1194,12 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
uint16(svcInfo.Port()),
|
uint16(svcInfo.Port()),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Policy creation failed: %v", err)
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
svcInfo.hnsID = hnsLoadBalancer.hnsID
|
svcInfo.hnsID = hnsLoadBalancer.hnsID
|
||||||
klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.ClusterIP(), hnsLoadBalancer.hnsID)
|
klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
|
||||||
|
|
||||||
// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
|
// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
|
||||||
if svcInfo.NodePort() > 0 {
|
if svcInfo.NodePort() > 0 {
|
||||||
@ -1219,12 +1219,12 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
uint16(svcInfo.NodePort()),
|
uint16(svcInfo.NodePort()),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Policy creation failed: %v", err)
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
|
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
|
||||||
klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.ClusterIP(), hnsLoadBalancer.hnsID)
|
klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a Load Balancer Policy for each external IP
|
// Create a Load Balancer Policy for each external IP
|
||||||
@ -1245,11 +1245,11 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
uint16(svcInfo.Port()),
|
uint16(svcInfo.Port()),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Policy creation failed: %v", err)
|
klog.ErrorS(err, "Policy creation failed", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
externalIP.hnsID = hnsLoadBalancer.hnsID
|
externalIP.hnsID = hnsLoadBalancer.hnsID
|
||||||
klog.V(3).Infof("Hns LoadBalancer resource created for externalIP resources %v, Id[%s]", externalIP, hnsLoadBalancer.hnsID)
|
klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
|
||||||
}
|
}
|
||||||
// Create a Load Balancer Policy for each loadbalancer ingress
|
// Create a Load Balancer Policy for each loadbalancer ingress
|
||||||
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
||||||
@ -1268,11 +1268,11 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
uint16(svcInfo.Port()),
|
uint16(svcInfo.Port()),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Policy creation failed: %v", err)
|
klog.ErrorS(err, "Policy creation failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
|
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
|
||||||
klog.V(3).Infof("Hns LoadBalancer resource created for loadBalancer Ingress resources %v", lbIngressIP)
|
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
|
||||||
}
|
}
|
||||||
svcInfo.policyApplied = true
|
svcInfo.policyApplied = true
|
||||||
Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
|
Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
|
||||||
@ -1287,17 +1287,17 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
|
||||||
// will just drop those endpoints.
|
// will just drop those endpoints.
|
||||||
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
|
||||||
klog.Errorf("Error syncing healthcheck services: %v", err)
|
klog.ErrorS(err, "Error syncing healthcheck services")
|
||||||
}
|
}
|
||||||
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
|
||||||
klog.Errorf("Error syncing healthcheck endpoints: %v", err)
|
klog.ErrorS(err, "Error syncing healthcheck endpoints")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finish housekeeping.
|
// Finish housekeeping.
|
||||||
// TODO: these could be made more consistent.
|
// TODO: these could be made more consistent.
|
||||||
for _, svcIP := range staleServices.UnsortedList() {
|
for _, svcIP := range staleServices.UnsortedList() {
|
||||||
// TODO : Check if this is required to cleanup stale services here
|
// TODO : Check if this is required to cleanup stale services here
|
||||||
klog.V(5).Infof("Pending delete stale service IP %s connections", svcIP)
|
klog.V(5).InfoS("Pending delete stale service IP connections", "ip", svcIP)
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove stale endpoint refcount entries
|
// remove stale endpoint refcount entries
|
||||||
|
Loading…
Reference in New Issue
Block a user