mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Clean up iptables rules, add nodeport support
This commit is contained in:
parent
d14c98f6cc
commit
731d5e5191
@ -82,18 +82,15 @@ func ShouldUseIptablesProxier() (bool, error) {
|
||||
return !version.LessThan(*minVersion), nil
|
||||
}
|
||||
|
||||
// portal groups the IP address, port and protocol under which a service is
// reached. serviceInfo embeds one as its "portal" field.
type portal struct {
|
||||
// ip is the service IP; OnServiceUpdate fills it from the parsed
// service.Spec.ClusterIP.
ip net.IP
|
||||
// port is the service port number (copied from ServicePort.Port).
port int
|
||||
// protocol is the service port's protocol (copied from ServicePort.Protocol).
protocol api.Protocol
|
||||
}
|
||||
|
||||
// internal struct for storing service information
|
||||
type serviceInfo struct {
|
||||
portal portal
|
||||
clusterIP net.IP
|
||||
port int
|
||||
protocol api.Protocol
|
||||
nodePort int
|
||||
loadBalancerStatus api.LoadBalancerStatus
|
||||
sessionAffinityType api.ServiceAffinity
|
||||
stickyMaxAgeMinutes int
|
||||
stickyMaxAgeSeconds int
|
||||
endpoints []string
|
||||
// Deprecated, but required for back-compat (including e2e)
|
||||
deprecatedPublicIPs []string
|
||||
@ -103,7 +100,7 @@ type serviceInfo struct {
|
||||
func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
|
||||
return &serviceInfo{
|
||||
sessionAffinityType: api.ServiceAffinityNone, // default
|
||||
stickyMaxAgeMinutes: 180, // TODO: paramaterize this in the API.
|
||||
stickyMaxAgeSeconds: 180, // TODO: paramaterize this in the API.
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,10 +178,10 @@ func tearDownUserspaceIptables(ipt utiliptables.Interface) {
|
||||
}
|
||||
|
||||
func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
|
||||
if info.portal.protocol != port.Protocol || info.portal.port != port.Port {
|
||||
if info.protocol != port.Protocol || info.port != port.Port || info.nodePort != port.NodePort {
|
||||
return false
|
||||
}
|
||||
if !info.portal.ip.Equal(net.ParseIP(service.Spec.ClusterIP)) {
|
||||
if !info.clusterIP.Equal(net.ParseIP(service.Spec.ClusterIP)) {
|
||||
return false
|
||||
}
|
||||
if !ipsEqual(info.deprecatedPublicIPs, service.Spec.DeprecatedPublicIPs) {
|
||||
@ -243,7 +240,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
||||
|
||||
// if ClusterIP is "None" or empty, skip proxying
|
||||
if !api.IsServiceIPSet(service) {
|
||||
glog.V(3).Infof("Skipping service %s due to portal IP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
|
||||
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -266,9 +263,10 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
||||
serviceIP := net.ParseIP(service.Spec.ClusterIP)
|
||||
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
|
||||
info = newServiceInfo(serviceName)
|
||||
info.portal.ip = serviceIP
|
||||
info.portal.port = servicePort.Port
|
||||
info.portal.protocol = servicePort.Protocol
|
||||
info.clusterIP = serviceIP
|
||||
info.port = servicePort.Port
|
||||
info.protocol = servicePort.Protocol
|
||||
info.nodePort = servicePort.NodePort
|
||||
info.deprecatedPublicIPs = service.Spec.DeprecatedPublicIPs
|
||||
// Deep-copy in case the service instance changes
|
||||
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
||||
@ -413,7 +411,7 @@ func (proxier *Proxier) syncProxyRules() error {
|
||||
glog.V(2).Info("not syncing iptables until Services and Endpoints have been received from master")
|
||||
return nil
|
||||
}
|
||||
glog.V(4).Infof("Syncing iptables rules.")
|
||||
glog.V(4).Infof("Syncing iptables rules")
|
||||
|
||||
// Ensure main chains and rules are installed.
|
||||
inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting}
|
||||
@ -451,7 +449,6 @@ func (proxier *Proxier) syncProxyRules() error {
|
||||
// Get iptables-save output so we can check for existing chains and rules.
|
||||
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
|
||||
existingChains := make(map[utiliptables.Chain]string)
|
||||
// run iptables-save
|
||||
iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableNAT)
|
||||
if err != nil { // if we failed to get any rules
|
||||
glog.Errorf("Failed to execute iptable-save, syncing all rules. %s", err.Error())
|
||||
@ -459,126 +456,213 @@ func (proxier *Proxier) syncProxyRules() error {
|
||||
existingChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
|
||||
}
|
||||
|
||||
// for first line and chains
|
||||
var chainsLines bytes.Buffer
|
||||
// for the actual rules (which should be after the list of chains)
|
||||
var rulesLines bytes.Buffer
|
||||
chainsLines := bytes.NewBuffer(nil)
|
||||
rulesLines := bytes.NewBuffer(nil)
|
||||
|
||||
// write table header
|
||||
chainsLines.WriteString("*nat\n")
|
||||
// Write table header.
|
||||
writeLine(chainsLines, "*nat")
|
||||
|
||||
// Make sure we keep stats for the top-level chains, if they existed
|
||||
// (which they should have because we created them above).
|
||||
if chain, ok := existingChains[iptablesServicesChain]; ok {
|
||||
chainsLines.WriteString(fmt.Sprintf("%s\n", chain))
|
||||
writeLine(chainsLines, chain)
|
||||
} else {
|
||||
chainsLines.WriteString(makeChainLine(iptablesServicesChain))
|
||||
writeLine(chainsLines, makeChainLine(iptablesServicesChain))
|
||||
}
|
||||
if chain, ok := existingChains[iptablesNodePortsChain]; ok {
|
||||
writeLine(chainsLines, chain)
|
||||
} else {
|
||||
writeLine(chainsLines, makeChainLine(iptablesNodePortsChain))
|
||||
}
|
||||
|
||||
newHostChains := []utiliptables.Chain{}
|
||||
newServiceChains := []utiliptables.Chain{}
|
||||
// Accumulate chains to keep.
|
||||
activeChains := make(map[utiliptables.Chain]bool) // use a map as a set
|
||||
|
||||
//Build rules for services
|
||||
// Build rules for each service.
|
||||
for name, info := range proxier.serviceMap {
|
||||
protocol := strings.ToLower((string)(info.portal.protocol))
|
||||
// get chain name
|
||||
protocol := strings.ToLower((string)(info.protocol))
|
||||
|
||||
// Create the per-service chain, retaining counters if possible.
|
||||
svcChain := servicePortToServiceChain(name)
|
||||
// Create chain
|
||||
if chain, ok := existingChains[svcChain]; ok {
|
||||
chainsLines.WriteString(fmt.Sprintf("%s\n", chain))
|
||||
writeLine(chainsLines, chain)
|
||||
} else {
|
||||
chainsLines.WriteString(makeChainLine(svcChain))
|
||||
}
|
||||
// get hosts and host-Chains
|
||||
hosts := make([]string, 0)
|
||||
hostChains := make([]utiliptables.Chain, 0)
|
||||
for _, ep := range info.endpoints {
|
||||
hosts = append(hosts, ep)
|
||||
hostChains = append(hostChains, servicePortAndEndpointToServiceChain(name, ep))
|
||||
writeLine(chainsLines, makeChainLine(svcChain))
|
||||
}
|
||||
activeChains[svcChain] = true
|
||||
|
||||
// Ensure we know what chains to flush/remove next time we generate the rules
|
||||
newHostChains = append(newHostChains, hostChains...)
|
||||
newServiceChains = append(newServiceChains, svcChain)
|
||||
// Capture the clusterIP.
|
||||
writeLine(rulesLines,
|
||||
"-A", string(iptablesServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", name.String()),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", info.clusterIP.String()),
|
||||
"--dport", fmt.Sprintf("%d", info.port),
|
||||
"-j", string(svcChain))
|
||||
|
||||
// write chain and sticky session rule
|
||||
for _, hostChain := range hostChains {
|
||||
// Create chain
|
||||
if chain, ok := existingChains[utiliptables.Chain(hostChain)]; ok {
|
||||
chainsLines.WriteString(fmt.Sprintf("%s\n", chain))
|
||||
} else {
|
||||
chainsLines.WriteString(makeChainLine(hostChain))
|
||||
}
|
||||
|
||||
// Sticky session
|
||||
if info.sessionAffinityType == api.ServiceAffinityClientIP {
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --rcheck --seconds %d --reap -j %s\n", svcChain, name.String(), hostChain, info.stickyMaxAgeMinutes*60, hostChain))
|
||||
// Capture externalIPs.
|
||||
for _, externalIP := range info.deprecatedPublicIPs {
|
||||
args := []string{
|
||||
"-A", string(iptablesServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", name.String()),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", externalIP),
|
||||
"--dport", fmt.Sprintf("%d", info.port),
|
||||
}
|
||||
// We have to SNAT packets from external IPs.
|
||||
writeLine(rulesLines, append(args,
|
||||
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
|
||||
writeLine(rulesLines, append(args,
|
||||
"-j", string(svcChain))...)
|
||||
}
|
||||
|
||||
// write proxy/load-balancing rules
|
||||
n := len(hostChains)
|
||||
for i, hostChain := range hostChains {
|
||||
// Roughly round robin statistically if we have more than one host
|
||||
if i < (n - 1) {
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m statistic --mode random --probability %f -j %s\n", svcChain, name.String(), 1.0/float64(n-i), hostChain))
|
||||
} else {
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j %s\n", svcChain, name.String(), hostChain))
|
||||
}
|
||||
// proxy
|
||||
if info.sessionAffinityType == api.ServiceAffinityClientIP {
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -m recent --name %s --set -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), hostChain, protocol, hosts[i]))
|
||||
} else {
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"%s\" -j DNAT -p %s --to-destination %s\n", hostChain, name.String(), protocol, hosts[i]))
|
||||
}
|
||||
}
|
||||
|
||||
// proxy
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), info.portal.ip.String(), protocol, protocol, info.portal.port, svcChain))
|
||||
|
||||
for _, publicIP := range info.deprecatedPublicIPs {
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"deprecated-PublicIP portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), publicIP, protocol, protocol, info.portal.port, svcChain))
|
||||
}
|
||||
|
||||
// Capture load-balancer ingress.
|
||||
for _, ingress := range info.loadBalancerStatus.Ingress {
|
||||
if ingress.IP != "" {
|
||||
rulesLines.WriteString(fmt.Sprintf("-A %s -m comment --comment \"load-balancer portal for %s\" -d %s/32 -m state --state NEW -p %s -m %s --dport %d -j %s\n", iptablesServicesChain, name.String(), ingress.IP, protocol, protocol, info.portal.port, svcChain))
|
||||
args := []string{
|
||||
"-A", string(iptablesServicesChain),
|
||||
"-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", name.String()),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", ingress.IP),
|
||||
"--dport", fmt.Sprintf("%d", info.port),
|
||||
}
|
||||
// We have to SNAT packets from external IPs.
|
||||
writeLine(rulesLines, append(args,
|
||||
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
|
||||
writeLine(rulesLines, append(args,
|
||||
"-j", string(svcChain))...)
|
||||
}
|
||||
}
|
||||
|
||||
// Capture nodeports. If we had more than 2 rules it might be
|
||||
// worthwhile to make a new per-service chain for nodeport rules, but
|
||||
// with just 2 rules it ends up being a waste and a cognitive burden.
|
||||
if info.nodePort != 0 {
|
||||
// Nodeports need SNAT.
|
||||
writeLine(rulesLines,
|
||||
"-A", string(iptablesNodePortsChain),
|
||||
"-m", "comment", "--comment", name.String(),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"--dport", fmt.Sprintf("%d", info.nodePort),
|
||||
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))
|
||||
// Jump to the service chain.
|
||||
writeLine(rulesLines,
|
||||
"-A", string(iptablesNodePortsChain),
|
||||
"-m", "comment", "--comment", name.String(),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"--dport", fmt.Sprintf("%d", info.nodePort),
|
||||
"-j", string(svcChain))
|
||||
}
|
||||
|
||||
// Generate the per-endpoint chains. We do this in multiple passes so we
|
||||
// can group rules together.
|
||||
endpoints := make([]string, 0)
|
||||
endpointChains := make([]utiliptables.Chain, 0)
|
||||
for _, ep := range info.endpoints {
|
||||
endpoints = append(endpoints, ep)
|
||||
endpointChain := servicePortAndEndpointToServiceChain(name, ep)
|
||||
endpointChains = append(endpointChains, endpointChain)
|
||||
|
||||
// Create the endpoint chain, retaining counters if possible.
|
||||
if chain, ok := existingChains[utiliptables.Chain(endpointChain)]; ok {
|
||||
writeLine(chainsLines, chain)
|
||||
} else {
|
||||
writeLine(chainsLines, makeChainLine(endpointChain))
|
||||
}
|
||||
activeChains[endpointChain] = true
|
||||
}
|
||||
|
||||
// First write session affinity rules, if applicable.
|
||||
if info.sessionAffinityType == api.ServiceAffinityClientIP {
|
||||
for _, endpointChain := range endpointChains {
|
||||
writeLine(rulesLines,
|
||||
"-A", string(svcChain),
|
||||
"-m", "comment", "--comment", name.String(),
|
||||
"-m", "recent", "--name", string(endpointChain),
|
||||
"--rcheck", "--seconds", fmt.Sprintf("%d", info.stickyMaxAgeSeconds), "--reap",
|
||||
"-j", string(endpointChain))
|
||||
}
|
||||
}
|
||||
|
||||
// Now write loadbalancing & DNAT rules.
|
||||
n := len(endpointChains)
|
||||
for i, endpointChain := range endpointChains {
|
||||
// Balancing rules in the per-service chain.
|
||||
args := []string{
|
||||
"-A", string(svcChain),
|
||||
"-m", "comment", "--comment", name.String(),
|
||||
}
|
||||
if i < (n - 1) {
|
||||
// Each rule is a probabilistic match.
|
||||
args = append(args,
|
||||
"-m", "statistic",
|
||||
"--mode", "random",
|
||||
"--probability", fmt.Sprintf("%f", 1.0/float64(n-i)))
|
||||
}
|
||||
// The final (or only if n == 1) rule is a guaranteed match.
|
||||
args = append(args, "-j", string(endpointChain))
|
||||
writeLine(rulesLines, args...)
|
||||
|
||||
// Rules in the per-endpoint chain.
|
||||
args = []string{
|
||||
"-A", string(endpointChain),
|
||||
"-m", "comment", "--comment", name.String(),
|
||||
}
|
||||
// Handle traffic that loops back to the originator with SNAT.
|
||||
// Technically we only need to do this if the endpoint is on this
|
||||
// host, but we don't have that information, so we just do this for
|
||||
// all endpoints.
|
||||
// TODO: if we grow logic to get this node's pod CIDR, we can use it.
|
||||
writeLine(rulesLines, append(args,
|
||||
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]),
|
||||
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
|
||||
|
||||
// Update client-affinity lists.
|
||||
if info.sessionAffinityType == api.ServiceAffinityClientIP {
|
||||
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
|
||||
}
|
||||
// DNAT to final destination.
|
||||
args = append(args,
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-j", "DNAT", "--to-destination", endpoints[i])
|
||||
writeLine(rulesLines, args...)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete chains no longer in use:
|
||||
activeChains := make(map[utiliptables.Chain]bool) // use a map as a set
|
||||
for _, chain := range newHostChains {
|
||||
activeChains[chain] = true
|
||||
}
|
||||
for _, chain := range newServiceChains {
|
||||
activeChains[chain] = true
|
||||
}
|
||||
// Delete chains no longer in use.
|
||||
for chain := range existingChains {
|
||||
if !activeChains[chain] {
|
||||
chainString := string(chain)
|
||||
// Ignore chains that aren't ours.
|
||||
if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") {
|
||||
// Ignore chains that aren't ours.
|
||||
continue
|
||||
}
|
||||
rulesLines.WriteString(fmt.Sprintf("-F %s\n-X %s\n", chain, chain))
|
||||
// We must (as per iptables) write a chain-line for it, which has
|
||||
// the nice effect of flushing the chain. Then we can remove the
|
||||
// chain.
|
||||
writeLine(chainsLines, existingChains[chain])
|
||||
writeLine(rulesLines, "-X", chainString)
|
||||
}
|
||||
}
|
||||
|
||||
// write end of table
|
||||
rulesLines.WriteString("COMMIT\n")
|
||||
// combine parts
|
||||
lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
|
||||
// Write the end-of-table marker.
|
||||
writeLine(rulesLines, "COMMIT")
|
||||
|
||||
// sync rules and return error
|
||||
// Sync rules.
|
||||
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
|
||||
lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
|
||||
glog.V(3).Infof("Syncing rules: %s", lines)
|
||||
// NOTE: flush=false is used so we don't flush non-kubernetes chains in the table.
|
||||
err = proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
||||
return err
|
||||
return proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
||||
}
|
||||
|
||||
// writeLine writes words to buf separated by single spaces and terminated by
// a newline. With no words it writes only the newline.
//
// The words are written straight into the buffer, so no intermediate joined
// string is allocated.
func writeLine(buf *bytes.Buffer, words ...string) {
	for i, word := range words {
		if i > 0 {
			buf.WriteByte(' ')
		}
		buf.WriteString(word)
	}
	buf.WriteByte('\n')
}
|
||||
|
||||
// return an iptables-save/restore formatted chain line given a Chain
|
||||
func makeChainLine(chain utiliptables.Chain) string {
|
||||
return fmt.Sprintf(":%s - [0:0]\n", chain)
|
||||
return fmt.Sprintf(":%s - [0:0]", chain)
|
||||
}
|
||||
|
||||
// getChainLines parses a table's iptables-save data to find chains in the table.
|
||||
|
Loading…
Reference in New Issue
Block a user