add hostport support for kubenet

This commit is contained in:
Minhan Xia
2016-05-11 13:25:14 -07:00
parent 6528a4a6a3
commit 6a3ad1d66d
4 changed files with 500 additions and 115 deletions

View File

@@ -65,7 +65,8 @@ const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
// the mark-for-masquerade chain
const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// TODO: let kubelet manage this chain. Other component should just assume it exists and use it.
const KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// the mark we apply to traffic needing SNAT
// TODO(thockin): Remove this for v1.3 or v1.4.
@@ -270,12 +271,12 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err)
encounteredError = true
} else {
existingNATChains := getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
// Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} {
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
if _, found := existingNATChains[chain]; found {
chainString := string(chain)
writeLine(natChains, existingNATChains[chain]) // flush
@@ -698,7 +699,7 @@ func (proxier *Proxier) syncProxyRules() {
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingFilterChains = getChainLines(utiliptables.TableFilter, iptablesSaveRaw)
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw)
}
existingNATChains := make(map[utiliptables.Chain]string)
@@ -706,7 +707,7 @@ func (proxier *Proxier) syncProxyRules() {
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingNATChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
}
filterChains := bytes.NewBuffer(nil)
@@ -723,27 +724,27 @@ func (proxier *Proxier) syncProxyRules() {
if chain, ok := existingFilterChains[kubeServicesChain]; ok {
writeLine(filterChains, chain)
} else {
writeLine(filterChains, makeChainLine(kubeServicesChain))
writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[kubeServicesChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeServicesChain))
writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeNodePortsChain))
writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain))
}
if chain, ok := existingNATChains[kubePostroutingChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubePostroutingChain))
writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain))
}
if chain, ok := existingNATChains[kubeMarkMasqChain]; ok {
if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeMarkMasqChain))
writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
}
// Install the kubernetes-specific postrouting rules. We use a whole chain for
@@ -760,7 +761,7 @@ func (proxier *Proxier) syncProxyRules() {
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
"-A", string(kubeMarkMasqChain),
"-A", string(KubeMarkMasqChain),
"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
}...)
@@ -779,7 +780,7 @@ func (proxier *Proxier) syncProxyRules() {
if chain, ok := existingNATChains[svcChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(svcChain))
writeLine(natChains, utiliptables.MakeChainLine(svcChain))
}
activeNATChains[svcChain] = true
@@ -792,10 +793,10 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
if proxier.masqueradeAll {
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
if len(proxier.clusterCIDR) > 0 {
writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
}
writeLine(natRules, append(args, "-j", string(svcChain))...)
@@ -833,7 +834,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
// We have to SNAT packets to external IPs.
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
// nor from a local process to be forwarded to the service.
@@ -860,7 +861,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
// We have to SNAT packets from external IPs.
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(svcChain))...)
}
}
@@ -896,7 +897,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
}
// Nodeports need SNAT.
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Jump to the service chain.
writeLine(natRules, append(args, "-j", string(svcChain))...)
}
@@ -927,7 +928,7 @@ func (proxier *Proxier) syncProxyRules() {
if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(endpointChain))
writeLine(natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true
}
@@ -975,7 +976,7 @@ func (proxier *Proxier) syncProxyRules() {
// TODO: if we grow logic to get this node's pod CIDR, we can use it.
writeLine(natRules, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]),
"-j", string(kubeMarkMasqChain))...)
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
@@ -1057,92 +1058,6 @@ func writeLine(buf *bytes.Buffer, words ...string) {
buf.WriteString(strings.Join(words, " ") + "\n")
}
// return an iptables-save/restore formatted chain line given a Chain
func makeChainLine(chain utiliptables.Chain) string {
return fmt.Sprintf(":%s - [0:0]", chain)
}
// getChainLines parses a table's iptables-save data to find chains in the table.
// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc).
func getChainLines(table utiliptables.Table, save []byte) map[utiliptables.Chain]string {
chainsMap := make(map[utiliptables.Chain]string)
tablePrefix := "*" + string(table)
readIndex := 0
// find beginning of table
for readIndex < len(save) {
line, n := readLine(readIndex, save)
readIndex = n
if strings.HasPrefix(line, tablePrefix) {
break
}
}
// parse table lines
for readIndex < len(save) {
line, n := readLine(readIndex, save)
readIndex = n
if len(line) == 0 {
continue
}
if strings.HasPrefix(line, "COMMIT") || strings.HasPrefix(line, "*") {
break
} else if strings.HasPrefix(line, "#") {
continue
} else if strings.HasPrefix(line, ":") && len(line) > 1 {
chain := utiliptables.Chain(strings.SplitN(line[1:], " ", 2)[0])
chainsMap[chain] = line
}
}
return chainsMap
}
func readLine(readIndex int, byteArray []byte) (string, int) {
currentReadIndex := readIndex
// consume left spaces
for currentReadIndex < len(byteArray) {
if byteArray[currentReadIndex] == ' ' {
currentReadIndex++
} else {
break
}
}
// leftTrimIndex stores the left index of the line after the line is left-trimmed
leftTrimIndex := currentReadIndex
// rightTrimIndex stores the right index of the line after the line is right-trimmed
// it is set to -1 since the correct value has not yet been determined.
rightTrimIndex := -1
for ; currentReadIndex < len(byteArray); currentReadIndex++ {
if byteArray[currentReadIndex] == ' ' {
// set rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
}
} else if (byteArray[currentReadIndex] == '\n') || (currentReadIndex == (len(byteArray) - 1)) {
// end of line or byte buffer is reached
if currentReadIndex <= leftTrimIndex {
return "", currentReadIndex + 1
}
// set the rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
if currentReadIndex == (len(byteArray)-1) && (byteArray[currentReadIndex] != '\n') {
// ensure that the last character is part of the returned string,
// unless the last character is '\n'
rightTrimIndex = currentReadIndex + 1
}
}
return string(byteArray[leftTrimIndex:rightTrimIndex]), currentReadIndex + 1
} else {
// unset rightTrimIndex
rightTrimIndex = -1
}
}
return "", currentReadIndex
}
func isLocalIP(ip string) (bool, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {