diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index c274e9b56b4..a0a0b55ea18 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -19,25 +19,31 @@ limitations under the License. package kubenet import ( + "bytes" + "crypto/sha256" + "encoding/base32" "fmt" "net" - "strings" "sync" "syscall" "time" - "github.com/vishvananda/netlink" - "github.com/vishvananda/netlink/nl" - "github.com/appc/cni/libcni" cnitypes "github.com/appc/cni/pkg/types" "github.com/golang/glog" + "github.com/vishvananda/netlink" + "github.com/vishvananda/netlink/nl" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/componentconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" + iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/util/bandwidth" + utildbus "k8s.io/kubernetes/pkg/util/dbus" utilexec "k8s.io/kubernetes/pkg/util/exec" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilsets "k8s.io/kubernetes/pkg/util/sets" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" ) @@ -48,6 +54,11 @@ const ( DefaultCNIDir = "/opt/cni/bin" sysctlBridgeCallIptables = "net/bridge/bridge-nf-call-iptables" + + // the hostport chain + kubenetHostportsChain utiliptables.Chain = "KUBENET-HOSTPORTS" + // prefix for kubenet hostport chains + kubenetHostportChainPrefix string = "KUBENET-HP-" ) type kubenetNetworkPlugin struct { @@ -64,13 +75,22 @@ type kubenetNetworkPlugin struct { execer utilexec.Interface nsenterPath string hairpinMode componentconfig.HairpinMode + hostPortMap map[hostport]closeable + iptables utiliptables.Interface } func NewPlugin() network.NetworkPlugin { + protocol := utiliptables.ProtocolIpv4 + execer := utilexec.New() + dbus := utildbus.New() + iptInterface := utiliptables.New(execer, dbus, protocol) + return &kubenetNetworkPlugin{ - podCIDRs: make(map[kubecontainer.ContainerID]string), - MTU: 1460, - execer: utilexec.New(), + podCIDRs: make(map[kubecontainer.ContainerID]string), + hostPortMap: make(map[hostport]closeable), + MTU: 1460, //TODO: don't hardcode this + execer: utilexec.New(), + iptables: iptInterface, } } @@ -268,6 +288,16 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k if !ok { return fmt.Errorf("pod %q cannot be found", name) } + // try to open pod host port if specified + hostportMap, err := plugin.openPodHostports(pod) + if err != nil { + return err + } + if len(hostportMap) > 0 { + // defer to decide whether to keep the host port open based on the result of SetUpPod + defer plugin.syncHostportMap(id, hostportMap) + } + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) if err != nil { return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) @@ -322,6 +352,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k } } + plugin.syncHostportsRules() return nil } @@ -353,6 +384,7 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i } delete(plugin.podCIDRs, id) + plugin.syncHostportsRules() return nil } @@ -453,3 +485,333 @@ func (plugin *kubenetNetworkPlugin) getNsenterPath() (string, error) { } return plugin.nsenterPath, nil } + +type closeable interface { + Close() error +} + +type hostport struct { + port int32 + protocol string +} + +type targetPod struct { + podFullName string + podIP string +} + +func (hp *hostport) String() string { + return fmt.Sprintf("%s:%d", hp.protocol, hp.port) +} + +//openPodHostports opens all hostport for pod and returns the map of hostport and socket +func (plugin *kubenetNetworkPlugin) openPodHostports(pod *api.Pod) (map[hostport]closeable, error) { + var retErr error + hostportMap := make(map[hostport]closeable) + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.HostPort <= 0 { + // Ignore + continue + } + hp := hostport{ + port: port.HostPort, + protocol: strings.ToLower(string(port.Protocol)), + } + socket, err := openLocalPort(&hp) + if err != nil { + retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, kubecontainer.GetPodFullName(pod), err) + break + } + hostportMap[hp] = socket + } + if retErr != nil { + break + } + } + // If encounter any error, close all hostports that just got opened. + if retErr != nil { + for hp, socket := range hostportMap { + if err := socket.Close(); err != nil { + glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, kubecontainer.GetPodFullName(pod), err) + } + } + } + return hostportMap, retErr +} + +//syncHostportMap syncs newly opened hostports to kubenet on successful pod setup. If pod setup failed, then clean up. +func (plugin *kubenetNetworkPlugin) syncHostportMap(id kubecontainer.ContainerID, hostportMap map[hostport]closeable) { + // if pod ip cannot be retrieved from podCIDR, then assume pod setup failed. + if _, ok := plugin.podCIDRs[id]; !ok { + for hp, socket := range hostportMap { + err := socket.Close() + if err != nil { + glog.Errorf("Failed to close socket for hostport %v", hp) + } + } + return + } + // add newly opened hostports + for hp, socket := range hostportMap { + plugin.hostPortMap[hp] = socket + } +} + +// gatherAllHostports returns all hostports that should be presented on node +func (plugin *kubenetNetworkPlugin) gatherAllHostports() (map[api.ContainerPort]targetPod, error) { + podHostportMap := make(map[api.ContainerPort]targetPod) + pods, err := plugin.host.GetRuntime().GetPods(false) + if err != nil { + return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err) + } + for _, p := range pods { + var podInfraContainerId kubecontainer.ContainerID + for _, c := range p.Containers { + if c.Name == dockertools.PodInfraContainerName { + podInfraContainerId = c.ID + break + } + } + // Assuming if kubenet has the pod's ip, the pod is alive and its host port should be presented. + cidr, ok := plugin.podCIDRs[podInfraContainerId] + if !ok { + // The POD has been delete. Ignore + continue + } + podIP, _, err := net.ParseCIDR(strings.Trim(cidr, "\n")) + if err != nil { + glog.V(3).Info("Failed to retrieve pod ip for %s-%s: %v", p.Namespace, p.Name, err) + continue + } + // Need the complete api.Pod object + pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name) + if ok { + for _, container := range pod.Spec.Containers { + for _, port := range container.Ports { + if port.HostPort != 0 { + podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP.String()} + } + } + } + } + } + return podHostportMap, nil +} + +// Join all words with spaces, terminate with newline and write to buf. +func writeLine(buf *bytes.Buffer, words ...string) { + buf.WriteString(strings.Join(words, " ") + "\n") +} + +//hostportChainName takes containerPort for a pod and returns associated iptables chain. +// This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do +// this because Iptables Chain Names must be <= 28 chars long, and the longer +// they are the harder they are to read. +func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Chain { + hash := sha256.Sum256([]byte(string(cp.HostPort) + string(cp.Protocol) + podFullName)) + encoded := base32.StdEncoding.EncodeToString(hash[:]) + return utiliptables.Chain(kubenetHostportChainPrefix + encoded[:16]) +} + +// syncHostportsRules gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports +func (plugin *kubenetNetworkPlugin) syncHostportsRules() { + start := time.Now() + defer func() { + glog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) + }() + + containerPortMap, err := plugin.gatherAllHostports() + if err != nil { + glog.Errorf("Fail to get hostports: %v", err) + return + } + + glog.V(4).Info("Ensuring kubenet hostport chains") + // Ensure kubenetHostportChain + if _, err := plugin.iptables.EnsureChain(utiliptables.TableNAT, kubenetHostportsChain); err != nil { + glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubenetHostportsChain, err) + return + } + tableChainsNeedJumpServices := []struct { + table utiliptables.Table + chain utiliptables.Chain + }{ + {utiliptables.TableNAT, utiliptables.ChainOutput}, + {utiliptables.TableNAT, utiliptables.ChainPrerouting}, + } + args := []string{"-m", "comment", "--comment", "kubenet hostport portals", + "-m", "addrtype", "--dst-type", "LOCAL", + "-j", string(kubenetHostportsChain)} + for _, tc := range tableChainsNeedJumpServices { + if _, err := plugin.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { + glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubenetHostportsChain, err) + return + } + } + // Need to SNAT traffic from localhost + args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", BridgeName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"} + if _, err := plugin.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { + glog.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err) + return + } + + // Get iptables-save output so we can check for existing chains and rules. + // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore + existingNATChains := make(map[utiliptables.Chain]string) + iptablesSaveRaw, err := plugin.iptables.Save(utiliptables.TableNAT) + if err != nil { // if we failed to get any rules + glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) + } else { // otherwise parse the output + existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) + } + + natChains := bytes.NewBuffer(nil) + natRules := bytes.NewBuffer(nil) + writeLine(natChains, "*nat") + // Make sure we keep stats for the top-level chains, if they existed + // (which most should have because we created them above). + if chain, ok := existingNATChains[kubenetHostportsChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(kubenetHostportsChain)) + } + // Assuming the node is running kube-proxy in iptables mode + // Reusing kube-proxy's KubeMarkMasqChain for SNAT + // TODO: let kubelet manage KubeMarkMasqChain. Other components should just be able to use it + if chain, ok := existingNATChains[iptablesproxy.KubeMarkMasqChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(iptablesproxy.KubeMarkMasqChain)) + } + + // Accumulate NAT chains to keep. + activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set + + for containerPort, target := range containerPortMap { + protocol := strings.ToLower(string(containerPort.Protocol)) + hostportChain := hostportChainName(containerPort, target.podFullName) + if chain, ok := existingNATChains[hostportChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(hostportChain)) + } + + activeNATChains[hostportChain] = true + + // Redirect to hostport chain + args := []string{ + "-A", string(kubenetHostportsChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), + "-m", protocol, "-p", protocol, + "--dport", fmt.Sprintf("%d", containerPort.HostPort), + "-j", string(hostportChain), + } + writeLine(natRules, args...) + + // If the request comes from the pod that is serving the hostport, then SNAT + args = []string{ + "-A", string(hostportChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), + "-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain), + } + writeLine(natRules, args...) + + // Create hostport chain to DNAT traffic to final destination + // Iptables will maintained the stats for this chain + args = []string{ + "-A", string(hostportChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort), + "-m", protocol, "-p", protocol, + "-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, containerPort.ContainerPort), + } + writeLine(natRules, args...) + } + + // Delete chains no longer in use. + for chain := range existingNATChains { + if !activeNATChains[chain] { + chainString := string(chain) + if !strings.HasPrefix(chainString, kubenetHostportChainPrefix) { + // Ignore chains that aren't ours. + continue + } + // We must (as per iptables) write a chain-line for it, which has + // the nice effect of flushing the chain. Then we can remove the + // chain. + writeLine(natChains, existingNATChains[chain]) + writeLine(natRules, "-X", chainString) + } + } + writeLine(natRules, "COMMIT") + + natLines := append(natChains.Bytes(), natRules.Bytes()...) + glog.V(3).Infof("Restoring iptables rules: %s", natLines) + err = plugin.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + if err != nil { + glog.Errorf("Failed to execute iptables-restore: %v", err) + return + } + + plugin.cleanupHostportMap(containerPortMap) +} + +func openLocalPort(hp *hostport) (closeable, error) { + // For ports on node IPs, open the actual port and hold it, even though we + // use iptables to redirect traffic. + // This ensures a) that it's safe to use that port and b) that (a) stays + // true. The risk is that some process on the node (e.g. sshd or kubelet) + // is using a port and we give that same port out to a Service. That would + // be bad because iptables would silently claim the traffic but the process + // would never know. + // NOTE: We should not need to have a real listen()ing socket - bind() + // should be enough, but I can't figure out a way to e2e test without + // it. Tools like 'ss' and 'netstat' do not show sockets that are + // bind()ed but not listen()ed, and at least the default debian netcat + // has no way to avoid about 10 seconds of retries. + var socket closeable + switch hp.protocol { + case "tcp": + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port)) + if err != nil { + return nil, err + } + socket = listener + case "udp": + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port)) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + socket = conn + default: + return nil, fmt.Errorf("unknown protocol %q", hp.protocol) + } + glog.V(2).Infof("Opened local port %s", hp.String()) + return socket, nil +} + +// cleanupHostportMap closes obsolete hostports +func (plugin *kubenetNetworkPlugin) cleanupHostportMap(containerPortMap map[api.ContainerPort]targetPod) { + // compute hostports that are supposed to be open + currentHostports := make(map[hostport]bool) + for containerPort := range containerPortMap { + hp := hostport{ + port: containerPort.HostPort, + protocol: string(containerPort.Protocol), + } + currentHostports[hp] = true + } + + // close and delete obsolete hostports + for hp, socket := range plugin.hostPortMap { + if _, ok := currentHostports[hp]; !ok { + socket.Close() + delete(plugin.hostPortMap, hp) + } + } +} diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 4cdd29ee610..83ee6e5babf 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -65,7 +65,8 @@ const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" // the mark-for-masquerade chain -const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" +// TODO: let kubelet manage this chain. Other component should just assume it exists and use it. +const KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" // the mark we apply to traffic needing SNAT // TODO(thockin): Remove this for v1.3 or v1.4. @@ -270,12 +271,12 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err) encounteredError = true } else { - existingNATChains := getChainLines(utiliptables.TableNAT, iptablesSaveRaw) + existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) natChains := bytes.NewBuffer(nil) natRules := bytes.NewBuffer(nil) writeLine(natChains, "*nat") // Start with chains we know we need to remove. - for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} { + for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { if _, found := existingNATChains[chain]; found { chainString := string(chain) writeLine(natChains, existingNATChains[chain]) // flush @@ -698,7 +699,7 @@ func (proxier *Proxier) syncProxyRules() { if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output - existingFilterChains = getChainLines(utiliptables.TableFilter, iptablesSaveRaw) + existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw) } existingNATChains := make(map[utiliptables.Chain]string) @@ -706,7 +707,7 @@ func (proxier *Proxier) syncProxyRules() { if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output - existingNATChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw) + existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) } filterChains := bytes.NewBuffer(nil) @@ -723,27 +724,27 @@ func (proxier *Proxier) syncProxyRules() { if chain, ok := existingFilterChains[kubeServicesChain]; ok { writeLine(filterChains, chain) } else { - writeLine(filterChains, makeChainLine(kubeServicesChain)) + writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeServicesChain]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(kubeServicesChain)) + writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeNodePortsChain]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(kubeNodePortsChain)) + writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) } if chain, ok := existingNATChains[kubePostroutingChain]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(kubePostroutingChain)) + writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain)) } - if chain, ok := existingNATChains[kubeMarkMasqChain]; ok { + if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(kubeMarkMasqChain)) + writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) } // Install the kubernetes-specific postrouting rules. We use a whole chain for @@ -760,7 +761,7 @@ func (proxier *Proxier) syncProxyRules() { // this so that it is easier to flush and change, for example if the mark // value should ever change. writeLine(natRules, []string{ - "-A", string(kubeMarkMasqChain), + "-A", string(KubeMarkMasqChain), "-j", "MARK", "--set-xmark", proxier.masqueradeMark, }...) @@ -779,7 +780,7 @@ func (proxier *Proxier) syncProxyRules() { if chain, ok := existingNATChains[svcChain]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(svcChain)) + writeLine(natChains, utiliptables.MakeChainLine(svcChain)) } activeNATChains[svcChain] = true @@ -792,10 +793,10 @@ func (proxier *Proxier) syncProxyRules() { "--dport", fmt.Sprintf("%d", svcInfo.port), } if proxier.masqueradeAll { - writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) + writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) } if len(proxier.clusterCIDR) > 0 { - writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(kubeMarkMasqChain))...) + writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) } writeLine(natRules, append(args, "-j", string(svcChain))...) @@ -833,7 +834,7 @@ func (proxier *Proxier) syncProxyRules() { "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets to external IPs. - writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) + writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // nor from a local process to be forwarded to the service. @@ -860,7 +861,7 @@ func (proxier *Proxier) syncProxyRules() { "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets from external IPs. - writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) + writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(natRules, append(args, "-j", string(svcChain))...) } } @@ -896,7 +897,7 @@ func (proxier *Proxier) syncProxyRules() { "--dport", fmt.Sprintf("%d", svcInfo.nodePort), } // Nodeports need SNAT. - writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...) + writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. writeLine(natRules, append(args, "-j", string(svcChain))...) } @@ -927,7 +928,7 @@ func (proxier *Proxier) syncProxyRules() { if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { writeLine(natChains, chain) } else { - writeLine(natChains, makeChainLine(endpointChain)) + writeLine(natChains, utiliptables.MakeChainLine(endpointChain)) } activeNATChains[endpointChain] = true } @@ -975,7 +976,7 @@ func (proxier *Proxier) syncProxyRules() { // TODO: if we grow logic to get this node's pod CIDR, we can use it. writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]), - "-j", string(kubeMarkMasqChain))...) + "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { @@ -1057,92 +1058,6 @@ func writeLine(buf *bytes.Buffer, words ...string) { buf.WriteString(strings.Join(words, " ") + "\n") } -// return an iptables-save/restore formatted chain line given a Chain -func makeChainLine(chain utiliptables.Chain) string { - return fmt.Sprintf(":%s - [0:0]", chain) -} - -// getChainLines parses a table's iptables-save data to find chains in the table. -// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc). -func getChainLines(table utiliptables.Table, save []byte) map[utiliptables.Chain]string { - chainsMap := make(map[utiliptables.Chain]string) - tablePrefix := "*" + string(table) - readIndex := 0 - // find beginning of table - for readIndex < len(save) { - line, n := readLine(readIndex, save) - readIndex = n - if strings.HasPrefix(line, tablePrefix) { - break - } - } - // parse table lines - for readIndex < len(save) { - line, n := readLine(readIndex, save) - readIndex = n - if len(line) == 0 { - continue - } - if strings.HasPrefix(line, "COMMIT") || strings.HasPrefix(line, "*") { - break - } else if strings.HasPrefix(line, "#") { - continue - } else if strings.HasPrefix(line, ":") && len(line) > 1 { - chain := utiliptables.Chain(strings.SplitN(line[1:], " ", 2)[0]) - chainsMap[chain] = line - } - } - return chainsMap -} - -func readLine(readIndex int, byteArray []byte) (string, int) { - currentReadIndex := readIndex - - // consume left spaces - for currentReadIndex < len(byteArray) { - if byteArray[currentReadIndex] == ' ' { - currentReadIndex++ - } else { - break - } - } - - // leftTrimIndex stores the left index of the line after the line is left-trimmed - leftTrimIndex := currentReadIndex - - // rightTrimIndex stores the right index of the line after the line is right-trimmed - // it is set to -1 since the correct value has not yet been determined. - rightTrimIndex := -1 - - for ; currentReadIndex < len(byteArray); currentReadIndex++ { - if byteArray[currentReadIndex] == ' ' { - // set rightTrimIndex - if rightTrimIndex == -1 { - rightTrimIndex = currentReadIndex - } - } else if (byteArray[currentReadIndex] == '\n') || (currentReadIndex == (len(byteArray) - 1)) { - // end of line or byte buffer is reached - if currentReadIndex <= leftTrimIndex { - return "", currentReadIndex + 1 - } - // set the rightTrimIndex - if rightTrimIndex == -1 { - rightTrimIndex = currentReadIndex - if currentReadIndex == (len(byteArray)-1) && (byteArray[currentReadIndex] != '\n') { - // ensure that the last character is part of the returned string, - // unless the last character is '\n' - rightTrimIndex = currentReadIndex + 1 - } - } - return string(byteArray[leftTrimIndex:rightTrimIndex]), currentReadIndex + 1 - } else { - // unset rightTrimIndex - rightTrimIndex = -1 - } - } - return "", currentReadIndex -} - func isLocalIP(ip string) (bool, error) { addrs, err := net.InterfaceAddrs() if err != nil { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 21e59590b2a..1ab1c5f7af1 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -30,7 +30,7 @@ import ( ) func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) { - chainLines := getChainLines(table, save) + chainLines := utiliptables.GetChainLines(table, save) for chain, line := range chainLines { if expected, exists := expectedLines[chain]; exists { if expected != line { @@ -47,7 +47,7 @@ func TestReadLinesFromByteBuffer(t *testing.T) { index := 0 readIndex := 0 for ; readIndex < len(byteArray); index++ { - line, n := readLine(readIndex, byteArray) + line, n := utiliptables.ReadLine(readIndex, byteArray) readIndex = n if expected[index] != line { t.Errorf("expected:%q, actual:%q", expected[index], line) diff --git a/pkg/util/iptables/save_restore.go b/pkg/util/iptables/save_restore.go new file mode 100644 index 00000000000..d86eb755ce7 --- /dev/null +++ b/pkg/util/iptables/save_restore.go @@ -0,0 +1,108 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "fmt" + "strings" +) + +// MakeChainLine return an iptables-save/restore formatted chain line given a Chain +func MakeChainLine(chain Chain) string { + return fmt.Sprintf(":%s - [0:0]", chain) +} + +// GetChainLines parses a table's iptables-save data to find chains in the table. +// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc). +func GetChainLines(table Table, save []byte) map[Chain]string { + chainsMap := make(map[Chain]string) + tablePrefix := "*" + string(table) + readIndex := 0 + // find beginning of table + for readIndex < len(save) { + line, n := ReadLine(readIndex, save) + readIndex = n + if strings.HasPrefix(line, tablePrefix) { + break + } + } + // parse table lines + for readIndex < len(save) { + line, n := ReadLine(readIndex, save) + readIndex = n + if len(line) == 0 { + continue + } + if strings.HasPrefix(line, "COMMIT") || strings.HasPrefix(line, "*") { + break + } else if strings.HasPrefix(line, "#") { + continue + } else if strings.HasPrefix(line, ":") && len(line) > 1 { + chain := Chain(strings.SplitN(line[1:], " ", 2)[0]) + chainsMap[chain] = line + } + } + return chainsMap +} + +func ReadLine(readIndex int, byteArray []byte) (string, int) { + currentReadIndex := readIndex + + // consume left spaces + for currentReadIndex < len(byteArray) { + if byteArray[currentReadIndex] == ' ' { + currentReadIndex++ + } else { + break + } + } + + // leftTrimIndex stores the left index of the line after the line is left-trimmed + leftTrimIndex := currentReadIndex + + // rightTrimIndex stores the right index of the line after the line is right-trimmed + // it is set to -1 since the correct value has not yet been determined. + rightTrimIndex := -1 + + for ; currentReadIndex < len(byteArray); currentReadIndex++ { + if byteArray[currentReadIndex] == ' ' { + // set rightTrimIndex + if rightTrimIndex == -1 { + rightTrimIndex = currentReadIndex + } + } else if (byteArray[currentReadIndex] == '\n') || (currentReadIndex == (len(byteArray) - 1)) { + // end of line or byte buffer is reached + if currentReadIndex <= leftTrimIndex { + return "", currentReadIndex + 1 + } + // set the rightTrimIndex + if rightTrimIndex == -1 { + rightTrimIndex = currentReadIndex + if currentReadIndex == (len(byteArray)-1) && (byteArray[currentReadIndex] != '\n') { + // ensure that the last character is part of the returned string, + // unless the last character is '\n' + rightTrimIndex = currentReadIndex + 1 + } + } + return string(byteArray[leftTrimIndex:rightTrimIndex]), currentReadIndex + 1 + } else { + // unset rightTrimIndex + rightTrimIndex = -1 + } + } + return "", currentReadIndex +}