mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
fixing dead endpoint black hole udp traffic
This commit is contained in:
parent
bf0e6e0047
commit
4fa6f3841a
@ -103,28 +103,28 @@ kube::build::get_docker_wrapped_binaries() {
|
||||
kube-apiserver,busybox
|
||||
kube-controller-manager,busybox
|
||||
kube-scheduler,busybox
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v2
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v3
|
||||
);;
|
||||
"arm")
|
||||
local targets=(
|
||||
kube-apiserver,armel/busybox
|
||||
kube-controller-manager,armel/busybox
|
||||
kube-scheduler,armel/busybox
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-arm:v2
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-arm:v3
|
||||
);;
|
||||
"arm64")
|
||||
local targets=(
|
||||
kube-apiserver,aarch64/busybox
|
||||
kube-controller-manager,aarch64/busybox
|
||||
kube-scheduler,aarch64/busybox
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v2
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v3
|
||||
);;
|
||||
"ppc64le")
|
||||
local targets=(
|
||||
kube-apiserver,ppc64le/busybox
|
||||
kube-controller-manager,ppc64le/busybox
|
||||
kube-scheduler,ppc64le/busybox
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v2
|
||||
kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v3
|
||||
);;
|
||||
esac
|
||||
|
||||
|
@ -22,4 +22,5 @@ CROSS_BUILD_COPY qemu-ARCH-static /usr/bin/
|
||||
# cleanup has no effect.
|
||||
RUN DEBIAN_FRONTEND=noninteractive apt-get update \
|
||||
&& DEBIAN_FRONTEND=noninteractive apt-get install -y iptables \
|
||||
&& DEBIAN_FRONTEND=noninteractive apt-get install -y conntrack \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
REGISTRY?="gcr.io/google_containers"
|
||||
IMAGE=debian-iptables
|
||||
TAG=v2
|
||||
TAG=v3
|
||||
ARCH?=amd64
|
||||
TEMP_DIR:=$(shell mktemp -d)
|
||||
|
||||
|
@ -41,6 +41,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/slice"
|
||||
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
|
||||
)
|
||||
@ -70,6 +71,8 @@ const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
|
||||
// TODO(thockin): Remove this for v1.3 or v1.4.
|
||||
const oldIptablesMasqueradeMark = "0x4d415351"
|
||||
|
||||
const noConnectionToDelete = "0 flow entries have been deleted"
|
||||
|
||||
// IptablesVersioner can query the current iptables version.
|
||||
type IptablesVersioner interface {
|
||||
// returns "X.Y.Z"
|
||||
@ -160,6 +163,7 @@ type Proxier struct {
|
||||
iptables utiliptables.Interface
|
||||
masqueradeAll bool
|
||||
masqueradeMark string
|
||||
exec utilexec.Interface
|
||||
}
|
||||
|
||||
type localPort struct {
|
||||
@ -220,6 +224,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
|
||||
iptables: ipt,
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
exec: exec,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -434,15 +439,21 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
||||
}
|
||||
}
|
||||
|
||||
staleUDPService := sets.NewString()
|
||||
// Remove services missing from the update.
|
||||
for name := range proxier.serviceMap {
|
||||
if !activeServices[name] {
|
||||
glog.V(1).Infof("Removing service %q", name)
|
||||
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
|
||||
staleUDPService.Insert(proxier.serviceMap[name].clusterIP.String())
|
||||
}
|
||||
delete(proxier.serviceMap, name)
|
||||
}
|
||||
}
|
||||
|
||||
proxier.syncProxyRules()
|
||||
proxier.deleteServiceConnection(staleUDPService.List())
|
||||
|
||||
}
|
||||
|
||||
// OnEndpointsUpdate takes in a slice of updated endpoints.
|
||||
@ -457,6 +468,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
proxier.haveReceivedEndpointsUpdate = true
|
||||
|
||||
activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
|
||||
staleConnections := make(map[endpointServicePair]bool)
|
||||
|
||||
// Update endpoints for services.
|
||||
for i := range allEndpoints {
|
||||
@ -480,7 +492,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
|
||||
curEndpoints := proxier.endpointsMap[svcPort]
|
||||
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
|
||||
|
||||
if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
|
||||
removedEndpoints := getRemovedEndpoints(curEndpoints, newEndpoints)
|
||||
for _, ep := range removedEndpoints {
|
||||
staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
|
||||
}
|
||||
glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
|
||||
proxier.endpointsMap[svcPort] = newEndpoints
|
||||
}
|
||||
@ -491,12 +508,18 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
// Remove endpoints missing from the update.
|
||||
for name := range proxier.endpointsMap {
|
||||
if !activeEndpoints[name] {
|
||||
// record endpoints of unactive service to stale connections
|
||||
for _, ep := range proxier.endpointsMap[name] {
|
||||
staleConnections[endpointServicePair{endpoint: ep, servicePortName: name}] = true
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Removing endpoints for %q", name)
|
||||
delete(proxier.endpointsMap, name)
|
||||
}
|
||||
}
|
||||
|
||||
proxier.syncProxyRules()
|
||||
proxier.deleteEndpointConnection(staleConnections)
|
||||
}
|
||||
|
||||
// used in OnEndpointsUpdate
|
||||
@ -552,6 +575,62 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
|
||||
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
|
||||
}
|
||||
|
||||
// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints
|
||||
func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string {
|
||||
return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List()
|
||||
}
|
||||
|
||||
type endpointServicePair struct {
|
||||
endpoint string
|
||||
servicePortName proxy.ServicePortName
|
||||
}
|
||||
|
||||
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
|
||||
// risk sending more traffic to it, all of which will be lost (because UDP).
|
||||
// This assumes the proxier mutex is held
|
||||
func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServicePair]bool) {
|
||||
for epSvcPair := range connectionMap {
|
||||
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
|
||||
endpointIP := strings.Split(epSvcPair.endpoint, ":")[0]
|
||||
glog.V(2).Infof("Deleting connection to service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
|
||||
err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
// TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
// is expensive to baby sit all udp connections to kubernetes services.
|
||||
glog.Errorf("conntrack return with error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// deleteServiceConnection use conntrack-tool to delete udp connection specified by service ip
|
||||
func (proxier *Proxier) deleteServiceConnection(svcIPs []string) {
|
||||
for _, ip := range svcIPs {
|
||||
glog.V(2).Infof("Deleting udp connection to service IP %s", ip)
|
||||
err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp")
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
// TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
// is expensive to baby sit all udp connections to kubernetes services.
|
||||
glog.Errorf("conntrack return with error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//execConntrackTool executes conntrack tool using given paramters
|
||||
func (proxier *Proxier) execConntrackTool(parameters ...string) error {
|
||||
conntrackPath, err := proxier.exec.LookPath("conntrack")
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error looking for path of conntrack: %v", err)
|
||||
}
|
||||
output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Conntrack command returns: %s, error message: %s", string(output), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is where all of the iptables-save/restore calls happen.
|
||||
// The only other iptables rules are those that are setup in iptablesInit()
|
||||
// assumes proxier.mu is held
|
||||
|
Loading…
Reference in New Issue
Block a user