mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #113521 from daschott/user/daschott/winkernel-stale-vips
resolve winkernel proxier treating stale VIPs as valid
This commit is contained in:
commit
b60b0c74c9
@ -20,6 +20,7 @@ limitations under the License.
|
|||||||
package winkernel
|
package winkernel
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/sha1"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
@ -250,11 +251,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
|
|||||||
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
|
loadBalancers := make(map[loadBalancerIdentifier]*(loadBalancerInfo))
|
||||||
for _, lb := range lbs {
|
for _, lb := range lbs {
|
||||||
portMap := lb.PortMappings[0]
|
portMap := lb.PortMappings[0]
|
||||||
|
// Compute hash from backends (endpoint IDs)
|
||||||
|
hash, err := hashEndpoints(lb.HostComputeEndpoints)
|
||||||
|
if err != nil {
|
||||||
|
klog.V(2).ErrorS(err, "Error hashing endpoints", "policy", lb)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if len(lb.FrontendVIPs) == 0 {
|
if len(lb.FrontendVIPs) == 0 {
|
||||||
// Leave VIP uninitialized
|
// Leave VIP uninitialized
|
||||||
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsCount: len(lb.HostComputeEndpoints)}
|
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, endpointsHash: hash}
|
||||||
} else {
|
} else {
|
||||||
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsCount: len(lb.HostComputeEndpoints)}
|
id = loadBalancerIdentifier{protocol: uint16(portMap.Protocol), internalPort: portMap.InternalPort, externalPort: portMap.ExternalPort, vip: lb.FrontendVIPs[0], endpointsHash: hash}
|
||||||
}
|
}
|
||||||
loadBalancers[id] = &loadBalancerInfo{
|
loadBalancers[id] = &loadBalancerInfo{
|
||||||
hnsID: lb.Id,
|
hnsID: lb.Id,
|
||||||
@ -267,11 +274,17 @@ func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerIn
|
|||||||
func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
|
func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlags, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16, previousLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) (*loadBalancerInfo, error) {
|
||||||
var id loadBalancerIdentifier
|
var id loadBalancerIdentifier
|
||||||
vips := []string{}
|
vips := []string{}
|
||||||
|
// Compute hash from backends (endpoint IDs)
|
||||||
|
hash, err := hashEndpoints(endpoints)
|
||||||
|
if err != nil {
|
||||||
|
klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if len(vip) > 0 {
|
if len(vip) > 0 {
|
||||||
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsCount: len(endpoints)}
|
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}
|
||||||
vips = append(vips, vip)
|
vips = append(vips, vip)
|
||||||
} else {
|
} else {
|
||||||
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsCount: len(endpoints)}
|
id = loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}
|
||||||
}
|
}
|
||||||
|
|
||||||
if lb, found := previousLoadBalancers[id]; found {
|
if lb, found := previousLoadBalancers[id]; found {
|
||||||
@ -354,5 +367,46 @@ func (hns hns) deleteLoadBalancer(hnsID string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = lb.Delete()
|
err = lb.Delete()
|
||||||
|
if err != nil {
|
||||||
|
// There is a bug in Windows Server 2019, that can cause the delete call to fail sometimes. We retry one more time.
|
||||||
|
// TODO: The logic in syncProxyRules should be rewritten in the future to better stage and handle a call like this failing using the policyApplied fields.
|
||||||
|
klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource. Attempting one more time...", "loadBalancer", lb)
|
||||||
|
return lb.Delete()
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calculates a hash from the given endpoint IDs.
|
||||||
|
func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err error) {
|
||||||
|
var id string
|
||||||
|
// Recover in case something goes wrong. Return error and null byte array.
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
err = r.(error)
|
||||||
|
hash = [20]byte{}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Iterate over endpoints, compute hash
|
||||||
|
for _, ep := range endpoints {
|
||||||
|
switch x := any(ep).(type) {
|
||||||
|
case endpointsInfo:
|
||||||
|
id = x.hnsID
|
||||||
|
case string:
|
||||||
|
id = x
|
||||||
|
}
|
||||||
|
if len(id) > 0 {
|
||||||
|
// We XOR the hashes of endpoints, since they are an unordered set.
|
||||||
|
// This can cause collisions, but is sufficient since we are using other keys to identify the load balancer.
|
||||||
|
hash = xor(hash, sha1.Sum(([]byte(id))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func xor(b1 [20]byte, b2 [20]byte) (xorbytes [20]byte) {
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
xorbytes[i] = b1[i] ^ b2[i]
|
||||||
|
}
|
||||||
|
return xorbytes
|
||||||
|
}
|
||||||
|
@ -37,6 +37,7 @@ const (
|
|||||||
gatewayAddress = "192.168.1.1"
|
gatewayAddress = "192.168.1.1"
|
||||||
epMacAddress = "00-11-22-33-44-55"
|
epMacAddress = "00-11-22-33-44-55"
|
||||||
epIpAddress = "192.168.1.3"
|
epIpAddress = "192.168.1.3"
|
||||||
|
epIpAddressB = "192.168.1.4"
|
||||||
epIpAddressRemote = "192.168.2.3"
|
epIpAddressRemote = "192.168.2.3"
|
||||||
epPaAddress = "10.0.0.3"
|
epPaAddress = "10.0.0.3"
|
||||||
protocol = 6
|
protocol = 6
|
||||||
@ -302,15 +303,20 @@ func TestGetLoadBalancerExisting(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
// We populate this to ensure we test for getting existing load balancer
|
|
||||||
id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsCount: len(Endpoints)}
|
|
||||||
lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
|
|
||||||
|
|
||||||
endpoint := &endpointsInfo{
|
endpoint := &endpointsInfo{
|
||||||
ip: Endpoint.IpConfigurations[0].IpAddress,
|
ip: Endpoint.IpConfigurations[0].IpAddress,
|
||||||
hnsID: Endpoint.Id,
|
hnsID: Endpoint.Id,
|
||||||
}
|
}
|
||||||
endpoints := []endpointsInfo{*endpoint}
|
endpoints := []endpointsInfo{*endpoint}
|
||||||
|
hash, err := hashEndpoints(endpoints)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We populate this to ensure we test for getting existing load balancer
|
||||||
|
id := loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: serviceVip, endpointsHash: hash}
|
||||||
|
lbs[id] = &loadBalancerInfo{hnsID: LoadBalancer.Id}
|
||||||
|
|
||||||
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
|
lb, err := hns.getLoadBalancer(endpoints, loadBalancerFlags{}, sourceVip, serviceVip, protocol, internalPort, externalPort, lbs)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -452,6 +458,80 @@ func mustTestNetwork(t *testing.T) *hcn.HostComputeNetwork {
|
|||||||
return network
|
return network
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHashEndpoints(t *testing.T) {
|
||||||
|
Network := mustTestNetwork(t)
|
||||||
|
// Create endpoint A
|
||||||
|
ipConfigA := &hcn.IpConfig{
|
||||||
|
IpAddress: epIpAddress,
|
||||||
|
}
|
||||||
|
endpointASpec := &hcn.HostComputeEndpoint{
|
||||||
|
IpConfigurations: []hcn.IpConfig{*ipConfigA},
|
||||||
|
MacAddress: epMacAddress,
|
||||||
|
SchemaVersion: hcn.SchemaVersion{
|
||||||
|
Major: 2,
|
||||||
|
Minor: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
endpointA, err := Network.CreateEndpoint(endpointASpec)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
endpointInfoA := &endpointsInfo{
|
||||||
|
ip: endpointA.IpConfigurations[0].IpAddress,
|
||||||
|
hnsID: endpointA.Id,
|
||||||
|
}
|
||||||
|
// Create Endpoint B
|
||||||
|
ipConfigB := &hcn.IpConfig{
|
||||||
|
IpAddress: epIpAddressB,
|
||||||
|
}
|
||||||
|
endpointBSpec := &hcn.HostComputeEndpoint{
|
||||||
|
IpConfigurations: []hcn.IpConfig{*ipConfigB},
|
||||||
|
MacAddress: epMacAddress,
|
||||||
|
SchemaVersion: hcn.SchemaVersion{
|
||||||
|
Major: 2,
|
||||||
|
Minor: 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
endpointB, err := Network.CreateEndpoint(endpointBSpec)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
endpointInfoB := &endpointsInfo{
|
||||||
|
ip: endpointB.IpConfigurations[0].IpAddress,
|
||||||
|
hnsID: endpointB.Id,
|
||||||
|
}
|
||||||
|
endpoints := []endpointsInfo{*endpointInfoA, *endpointInfoB}
|
||||||
|
endpointsReverse := []endpointsInfo{*endpointInfoB, *endpointInfoA}
|
||||||
|
h1, err := hashEndpoints(endpoints)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
} else if len(h1) < 1 {
|
||||||
|
t.Error("HashEndpoints failed for endpoints", endpoints)
|
||||||
|
}
|
||||||
|
|
||||||
|
h2, err := hashEndpoints(endpointsReverse)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if h1 != h2 {
|
||||||
|
t.Errorf("%x does not match %x", h1, h2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
err = endpointA.Delete()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
err = endpointB.Delete()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
err = Network.Delete()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func createTestNetwork() (*hcn.HostComputeNetwork, error) {
|
func createTestNetwork() (*hcn.HostComputeNetwork, error) {
|
||||||
network := &hcn.HostComputeNetwork{
|
network := &hcn.HostComputeNetwork{
|
||||||
Type: NETWORK_TYPE_OVERLAY,
|
Type: NETWORK_TYPE_OVERLAY,
|
||||||
|
@ -101,7 +101,7 @@ type loadBalancerIdentifier struct {
|
|||||||
internalPort uint16
|
internalPort uint16
|
||||||
externalPort uint16
|
externalPort uint16
|
||||||
vip string
|
vip string
|
||||||
endpointsCount int
|
endpointsHash [20]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type loadBalancerFlags struct {
|
type loadBalancerFlags struct {
|
||||||
@ -153,7 +153,6 @@ func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) {
|
|||||||
} else {
|
} else {
|
||||||
panic("Windows HNS Api V2 required. This version of windows does not support API V2")
|
panic("Windows HNS Api V2 required. This version of windows does not support API V2")
|
||||||
}
|
}
|
||||||
|
|
||||||
return h, supportedFeatures
|
return h, supportedFeatures
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -784,7 +783,7 @@ func CleanupLeftovers() (encounteredError bool) {
|
|||||||
func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
|
func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
|
||||||
klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo)
|
klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo)
|
||||||
// Skip the svcInfo.policyApplied check to remove all the policies
|
// Skip the svcInfo.policyApplied check to remove all the policies
|
||||||
svcInfo.deleteAllHnsLoadBalancerPolicy()
|
svcInfo.deleteLoadBalancerPolicy()
|
||||||
// Cleanup Endpoints references
|
// Cleanup Endpoints references
|
||||||
for _, ep := range endpoints {
|
for _, ep := range endpoints {
|
||||||
epInfo, ok := ep.(*endpointsInfo)
|
epInfo, ok := ep.(*endpointsInfo)
|
||||||
@ -799,28 +798,49 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
|
|||||||
svcInfo.policyApplied = false
|
svcInfo.policyApplied = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
|
func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() {
|
||||||
// Remove the Hns Policy corresponding to this service
|
// Remove the Hns Policy corresponding to this service
|
||||||
hns := svcInfo.hns
|
hns := svcInfo.hns
|
||||||
hns.deleteLoadBalancer(svcInfo.hnsID)
|
if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil {
|
||||||
|
klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP())
|
||||||
|
} else {
|
||||||
|
// On successful delete, remove hnsId
|
||||||
svcInfo.hnsID = ""
|
svcInfo.hnsID = ""
|
||||||
|
}
|
||||||
|
|
||||||
hns.deleteLoadBalancer(svcInfo.nodePorthnsID)
|
if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil {
|
||||||
|
klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort())
|
||||||
|
} else {
|
||||||
|
// On successful delete, remove hnsId
|
||||||
svcInfo.nodePorthnsID = ""
|
svcInfo.nodePorthnsID = ""
|
||||||
|
}
|
||||||
|
|
||||||
for _, externalIP := range svcInfo.externalIPs {
|
for _, externalIP := range svcInfo.externalIPs {
|
||||||
hns.deleteLoadBalancer(externalIP.hnsID)
|
if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil {
|
||||||
|
klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip)
|
||||||
|
} else {
|
||||||
|
// On successful delete, remove hnsId
|
||||||
externalIP.hnsID = ""
|
externalIP.hnsID = ""
|
||||||
}
|
}
|
||||||
|
}
|
||||||
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
|
||||||
hns.deleteLoadBalancer(lbIngressIP.hnsID)
|
if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil {
|
||||||
|
klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip)
|
||||||
|
} else {
|
||||||
|
// On successful delete, remove hnsId
|
||||||
lbIngressIP.hnsID = ""
|
lbIngressIP.hnsID = ""
|
||||||
|
}
|
||||||
|
|
||||||
if lbIngressIP.healthCheckHnsID != "" {
|
if lbIngressIP.healthCheckHnsID != "" {
|
||||||
hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID)
|
if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil {
|
||||||
|
klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip)
|
||||||
|
} else {
|
||||||
|
// On successful delete, remove hnsId
|
||||||
lbIngressIP.healthCheckHnsID = ""
|
lbIngressIP.healthCheckHnsID = ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func deleteAllHnsLoadBalancerPolicy() {
|
func deleteAllHnsLoadBalancerPolicy() {
|
||||||
plists, err := hcsshim.HNSListPolicyListRequest()
|
plists, err := hcsshim.HNSListPolicyListRequest()
|
||||||
|
Loading…
Reference in New Issue
Block a user