Merge pull request #25604 from freehan/kubenethostport

Kubenet host-port support through iptables
Alex Mohr 2016-05-26 15:49:12 -07:00
commit aab6c43a33
4 changed files with 500 additions and 115 deletions
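For context, the feature being merged lets kubenet honor hostPort declarations on container ports by opening the port on the node and DNAT-ing traffic to the pod via iptables. A minimal sketch of a pod that would exercise this path, using made-up names, namespace and ports, and the same pkg/api types the plugin consumes below:

package kubenetexample

import "k8s.io/kubernetes/pkg/api"

// examplePod returns a pod asking kubenet to expose container port 80 on node
// port 8080. Everything here (names, namespace, ports) is illustrative only.
func examplePod() *api.Pod {
	return &api.Pod{
		ObjectMeta: api.ObjectMeta{Name: "nginx", Namespace: "default"},
		Spec: api.PodSpec{
			Containers: []api.Container{{
				Name:  "nginx",
				Image: "nginx",
				Ports: []api.ContainerPort{{
					ContainerPort: 80,
					HostPort:      8080, // triggers openPodHostports and the KUBENET-HP- rules below
					Protocol:      api.ProtocolTCP,
				}},
			}},
		},
	}
}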


@@ -19,25 +19,31 @@ limitations under the License.
package kubenet
import (
"bytes"
"crypto/sha256"
"encoding/base32"
"fmt"
"net"
"strings"
"sync"
"syscall"
"time"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netlink/nl"
"github.com/appc/cni/libcni"
cnitypes "github.com/appc/cni/pkg/types"
"github.com/golang/glog"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netlink/nl"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/util/bandwidth"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilsets "k8s.io/kubernetes/pkg/util/sets"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)
@@ -48,6 +54,11 @@ const (
DefaultCNIDir = "/opt/cni/bin"
sysctlBridgeCallIptables = "net/bridge/bridge-nf-call-iptables"
// the hostport chain
kubenetHostportsChain utiliptables.Chain = "KUBENET-HOSTPORTS"
// prefix for kubenet hostport chains
kubenetHostportChainPrefix string = "KUBENET-HP-"
)
type kubenetNetworkPlugin struct {
@@ -64,13 +75,22 @@ type kubenetNetworkPlugin struct {
execer utilexec.Interface
nsenterPath string
hairpinMode componentconfig.HairpinMode
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
}
func NewPlugin() network.NetworkPlugin {
protocol := utiliptables.ProtocolIpv4
execer := utilexec.New()
dbus := utildbus.New()
iptInterface := utiliptables.New(execer, dbus, protocol)
return &kubenetNetworkPlugin{
podCIDRs: make(map[kubecontainer.ContainerID]string),
MTU: 1460,
execer: utilexec.New(),
podCIDRs: make(map[kubecontainer.ContainerID]string),
hostPortMap: make(map[hostport]closeable),
MTU: 1460, //TODO: don't hardcode this
execer: utilexec.New(),
iptables: iptInterface,
}
}
@@ -268,6 +288,16 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
if !ok {
return fmt.Errorf("pod %q cannot be found", name)
}
// try to open the pod's host ports if specified
hostportMap, err := plugin.openPodHostports(pod)
if err != nil {
return err
}
if len(hostportMap) > 0 {
// defer syncHostportMap so we can decide whether to keep the host ports open based on the result of SetUpPod
defer plugin.syncHostportMap(id, hostportMap)
}
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
@@ -322,6 +352,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
}
}
plugin.syncHostportsRules()
return nil
}
@@ -353,6 +384,7 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
}
delete(plugin.podCIDRs, id)
plugin.syncHostportsRules()
return nil
}
@@ -453,3 +485,333 @@ func (plugin *kubenetNetworkPlugin) getNsenterPath() (string, error) {
}
return plugin.nsenterPath, nil
}
type closeable interface {
Close() error
}
type hostport struct {
port int32
protocol string
}
type targetPod struct {
podFullName string
podIP string
}
func (hp *hostport) String() string {
return fmt.Sprintf("%s:%d", hp.protocol, hp.port)
}
// openPodHostports opens all hostports for the pod and returns a map of hostport to the opened socket
func (plugin *kubenetNetworkPlugin) openPodHostports(pod *api.Pod) (map[hostport]closeable, error) {
var retErr error
hostportMap := make(map[hostport]closeable)
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort <= 0 {
// Ignore
continue
}
hp := hostport{
port: port.HostPort,
protocol: strings.ToLower(string(port.Protocol)),
}
socket, err := openLocalPort(&hp)
if err != nil {
retErr = fmt.Errorf("Cannot open hostport %d for pod %s: %v", port.HostPort, kubecontainer.GetPodFullName(pod), err)
break
}
hostportMap[hp] = socket
}
if retErr != nil {
break
}
}
// If we encountered any error, close all hostports that were just opened.
if retErr != nil {
for hp, socket := range hostportMap {
if err := socket.Close(); err != nil {
glog.Errorf("Cannot clean up hostport %d for pod %s: %v", hp.port, kubecontainer.GetPodFullName(pod), err)
}
}
}
return hostportMap, retErr
}
// syncHostportMap syncs newly opened hostports into the plugin's hostPortMap on successful pod setup; if pod setup failed, it closes them instead.
func (plugin *kubenetNetworkPlugin) syncHostportMap(id kubecontainer.ContainerID, hostportMap map[hostport]closeable) {
// if the pod IP cannot be retrieved from podCIDRs, assume pod setup failed and close the sockets
if _, ok := plugin.podCIDRs[id]; !ok {
for hp, socket := range hostportMap {
err := socket.Close()
if err != nil {
glog.Errorf("Failed to close socket for hostport %v", hp)
}
}
return
}
// add newly opened hostports
for hp, socket := range hostportMap {
plugin.hostPortMap[hp] = socket
}
}
// gatherAllHostports returns all hostports that should be present on the node
func (plugin *kubenetNetworkPlugin) gatherAllHostports() (map[api.ContainerPort]targetPod, error) {
podHostportMap := make(map[api.ContainerPort]targetPod)
pods, err := plugin.host.GetRuntime().GetPods(false)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
}
for _, p := range pods {
var podInfraContainerId kubecontainer.ContainerID
for _, c := range p.Containers {
if c.Name == dockertools.PodInfraContainerName {
podInfraContainerId = c.ID
break
}
}
// Assume that if kubenet has the pod's IP, the pod is alive and its host ports should be present.
cidr, ok := plugin.podCIDRs[podInfraContainerId]
if !ok {
// The pod has been deleted. Ignore.
continue
}
podIP, _, err := net.ParseCIDR(strings.Trim(cidr, "\n"))
if err != nil {
glog.V(3).Infof("Failed to retrieve pod ip for %s-%s: %v", p.Namespace, p.Name, err)
continue
}
// Need the complete api.Pod object
pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name)
if ok {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort != 0 {
podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP.String()}
}
}
}
}
}
return podHostportMap, nil
}
// Join all words with spaces, terminate with newline and write to buf.
func writeLine(buf *bytes.Buffer, words ...string) {
buf.WriteString(strings.Join(words, " ") + "\n")
}
// hostportChainName takes a containerPort for a pod and returns the name of the associated iptables chain.
// The name is computed by hashing (sha256) the host port, protocol and pod full name,
// then encoding to base32 and truncating to 16 characters with the prefix "KUBENET-HP-". We do
// this because iptables chain names must be <= 28 chars long, and the longer
// they are the harder they are to read.
func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Chain {
hash := sha256.Sum256([]byte(string(cp.HostPort) + string(cp.Protocol) + podFullName))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain(kubenetHostportChainPrefix + encoded[:16])
}
// syncHostportsRules gathers all hostports on the node, sets up iptables rules to enable them, and finally cleans up stale hostports
func (plugin *kubenetNetworkPlugin) syncHostportsRules() {
start := time.Now()
defer func() {
glog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
}()
containerPortMap, err := plugin.gatherAllHostports()
if err != nil {
glog.Errorf("Fail to get hostports: %v", err)
return
}
glog.V(4).Info("Ensuring kubenet hostport chains")
// Ensure kubenetHostportsChain
if _, err := plugin.iptables.EnsureChain(utiliptables.TableNAT, kubenetHostportsChain); err != nil {
glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubenetHostportsChain, err)
return
}
tableChainsNeedJumpServices := []struct {
table utiliptables.Table
chain utiliptables.Chain
}{
{utiliptables.TableNAT, utiliptables.ChainOutput},
{utiliptables.TableNAT, utiliptables.ChainPrerouting},
}
args := []string{"-m", "comment", "--comment", "kubenet hostport portals",
"-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(kubenetHostportsChain)}
for _, tc := range tableChainsNeedJumpServices {
if _, err := plugin.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubenetHostportsChain, err)
return
}
}
// Need to SNAT traffic from localhost
args = []string{"-m", "comment", "--comment", "SNAT for localhost access to hostports", "-o", BridgeName, "-s", "127.0.0.0/8", "-j", "MASQUERADE"}
if _, err := plugin.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
glog.Errorf("Failed to ensure that %s chain %s jumps to MASQUERADE: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, err)
return
}
// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingNATChains := make(map[utiliptables.Chain]string)
iptablesSaveRaw, err := plugin.iptables.Save(utiliptables.TableNAT)
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
}
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
// Make sure we keep stats for the top-level chains, if they existed
// (which most should have because we created them above).
if chain, ok := existingNATChains[kubenetHostportsChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(kubenetHostportsChain))
}
// Assuming the node is running kube-proxy in iptables mode
// Reusing kube-proxy's KubeMarkMasqChain for SNAT
// TODO: let kubelet manage KubeMarkMasqChain. Other components should just be able to use it
if chain, ok := existingNATChains[iptablesproxy.KubeMarkMasqChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(iptablesproxy.KubeMarkMasqChain))
}
// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
for containerPort, target := range containerPortMap {
protocol := strings.ToLower(string(containerPort.Protocol))
hostportChain := hostportChainName(containerPort, target.podFullName)
if chain, ok := existingNATChains[hostportChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(hostportChain))
}
activeNATChains[hostportChain] = true
// Redirect to hostport chain
args := []string{
"-A", string(kubenetHostportsChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
"-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", containerPort.HostPort),
"-j", string(hostportChain),
}
writeLine(natRules, args...)
// If the request comes from the pod that is serving the hostport, then SNAT
args = []string{
"-A", string(hostportChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
"-s", target.podIP, "-j", string(iptablesproxy.KubeMarkMasqChain),
}
writeLine(natRules, args...)
// Create hostport chain to DNAT traffic to its final destination.
// iptables will maintain the stats for this chain
args = []string{
"-A", string(hostportChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s hostport %d"`, target.podFullName, containerPort.HostPort),
"-m", protocol, "-p", protocol,
"-j", "DNAT", fmt.Sprintf("--to-destination=%s:%d", target.podIP, containerPort.ContainerPort),
}
writeLine(natRules, args...)
}
// Delete chains no longer in use.
for chain := range existingNATChains {
if !activeNATChains[chain] {
chainString := string(chain)
if !strings.HasPrefix(chainString, kubenetHostportChainPrefix) {
// Ignore chains that aren't ours.
continue
}
// We must (as per iptables) write a chain-line for it, which has
// the nice effect of flushing the chain. Then we can remove the
// chain.
writeLine(natChains, existingNATChains[chain])
writeLine(natRules, "-X", chainString)
}
}
writeLine(natRules, "COMMIT")
natLines := append(natChains.Bytes(), natRules.Bytes()...)
glog.V(3).Infof("Restoring iptables rules: %s", natLines)
err = plugin.iptables.RestoreAll(natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
glog.Errorf("Failed to execute iptables-restore: %v", err)
return
}
plugin.cleanupHostportMap(containerPortMap)
}
func openLocalPort(hp *hostport) (closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket closeable
switch hp.protocol {
case "tcp":
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
socket = listener
case "udp":
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", hp.port))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", hp.protocol)
}
glog.V(2).Infof("Opened local port %s", hp.String())
return socket, nil
}
// cleanupHostportMap closes obsolete hostports
func (plugin *kubenetNetworkPlugin) cleanupHostportMap(containerPortMap map[api.ContainerPort]targetPod) {
// compute hostports that are supposed to be open
currentHostports := make(map[hostport]bool)
for containerPort := range containerPortMap {
hp := hostport{
port: containerPort.HostPort,
// protocols are stored lower-cased in plugin.hostPortMap (see openPodHostports), so normalize here
protocol: strings.ToLower(string(containerPort.Protocol)),
}
currentHostports[hp] = true
}
// close and delete obsolete hostports
for hp, socket := range plugin.hostPortMap {
if _, ok := currentHostports[hp]; !ok {
socket.Close()
delete(plugin.hostPortMap, hp)
}
}
}
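To make the generated rules concrete: for the illustrative pod above (full name nginx_default, hostPort 8080 → containerPort 80, and an assumed pod IP of 10.244.1.5), syncHostportsRules would assemble an iptables-restore payload for the nat table roughly like the sketch below. The KUBENET-HP- suffix is the first 16 base32 characters of a sha256 hash (see hostportChainName) and is shown here as a placeholder, not a real hash.

*nat
:KUBENET-HOSTPORTS - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBENET-HP-ABCDEF0123456789 - [0:0]
-A KUBENET-HOSTPORTS -m comment --comment "nginx_default hostport 8080" -m tcp -p tcp --dport 8080 -j KUBENET-HP-ABCDEF0123456789
-A KUBENET-HP-ABCDEF0123456789 -m comment --comment "nginx_default hostport 8080" -s 10.244.1.5 -j KUBE-MARK-MASQ
-A KUBENET-HP-ABCDEF0123456789 -m comment --comment "nginx_default hostport 8080" -m tcp -p tcp -j DNAT --to-destination=10.244.1.5:80
COMMIT

The jumps from PREROUTING and OUTPUT into KUBENET-HOSTPORTS, and the MASQUERADE rule for 127.0.0.0/8 traffic leaving the bridge, are ensured with individual EnsureRule calls rather than through this payload, and the restore is applied with NoFlushTables and RestoreCounters so unrelated nat rules are left alone.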


@@ -65,7 +65,8 @@ const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
// the mark-for-masquerade chain
const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// TODO: let kubelet manage this chain. Other components should just assume it exists and use it.
const KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// the mark we apply to traffic needing SNAT
// TODO(thockin): Remove this for v1.3 or v1.4.
@@ -270,12 +271,12 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
glog.Errorf("Failed to execute iptables-save for %s: %v", utiliptables.TableNAT, err)
encounteredError = true
} else {
existingNATChains := getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)
writeLine(natChains, "*nat")
// Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} {
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
if _, found := existingNATChains[chain]; found {
chainString := string(chain)
writeLine(natChains, existingNATChains[chain]) // flush
@@ -698,7 +699,7 @@ func (proxier *Proxier) syncProxyRules() {
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingFilterChains = getChainLines(utiliptables.TableFilter, iptablesSaveRaw)
existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw)
}
existingNATChains := make(map[utiliptables.Chain]string)
@@ -706,7 +707,7 @@ func (proxier *Proxier) syncProxyRules() {
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingNATChains = getChainLines(utiliptables.TableNAT, iptablesSaveRaw)
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
}
filterChains := bytes.NewBuffer(nil)
@@ -723,27 +724,27 @@ func (proxier *Proxier) syncProxyRules() {
if chain, ok := existingFilterChains[kubeServicesChain]; ok {
writeLine(filterChains, chain)
} else {
writeLine(filterChains, makeChainLine(kubeServicesChain))
writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[kubeServicesChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeServicesChain))
writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeNodePortsChain))
writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain))
}
if chain, ok := existingNATChains[kubePostroutingChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubePostroutingChain))
writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain))
}
if chain, ok := existingNATChains[kubeMarkMasqChain]; ok {
if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(kubeMarkMasqChain))
writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
}
// Install the kubernetes-specific postrouting rules. We use a whole chain for
@@ -760,7 +761,7 @@ func (proxier *Proxier) syncProxyRules() {
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
"-A", string(kubeMarkMasqChain),
"-A", string(KubeMarkMasqChain),
"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
}...)
@@ -779,7 +780,7 @@ func (proxier *Proxier) syncProxyRules() {
if chain, ok := existingNATChains[svcChain]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(svcChain))
writeLine(natChains, utiliptables.MakeChainLine(svcChain))
}
activeNATChains[svcChain] = true
@@ -792,10 +793,10 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
if proxier.masqueradeAll {
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
if len(proxier.clusterCIDR) > 0 {
writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
}
writeLine(natRules, append(args, "-j", string(svcChain))...)
@@ -833,7 +834,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
// We have to SNAT packets to external IPs.
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
// nor from a local process to be forwarded to the service.
@@ -860,7 +861,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
// We have to SNAT packets from external IPs.
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(svcChain))...)
}
}
@@ -896,7 +897,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
}
// Nodeports need SNAT.
writeLine(natRules, append(args, "-j", string(kubeMarkMasqChain))...)
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Jump to the service chain.
writeLine(natRules, append(args, "-j", string(svcChain))...)
}
@@ -927,7 +928,7 @@ func (proxier *Proxier) syncProxyRules() {
if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
writeLine(natChains, chain)
} else {
writeLine(natChains, makeChainLine(endpointChain))
writeLine(natChains, utiliptables.MakeChainLine(endpointChain))
}
activeNATChains[endpointChain] = true
}
@@ -975,7 +976,7 @@ func (proxier *Proxier) syncProxyRules() {
// TODO: if we grow logic to get this node's pod CIDR, we can use it.
writeLine(natRules, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]),
"-j", string(kubeMarkMasqChain))...)
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
@@ -1057,92 +1058,6 @@ func writeLine(buf *bytes.Buffer, words ...string) {
buf.WriteString(strings.Join(words, " ") + "\n")
}
// return an iptables-save/restore formatted chain line given a Chain
func makeChainLine(chain utiliptables.Chain) string {
return fmt.Sprintf(":%s - [0:0]", chain)
}
// getChainLines parses a table's iptables-save data to find chains in the table.
// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc).
func getChainLines(table utiliptables.Table, save []byte) map[utiliptables.Chain]string {
chainsMap := make(map[utiliptables.Chain]string)
tablePrefix := "*" + string(table)
readIndex := 0
// find beginning of table
for readIndex < len(save) {
line, n := readLine(readIndex, save)
readIndex = n
if strings.HasPrefix(line, tablePrefix) {
break
}
}
// parse table lines
for readIndex < len(save) {
line, n := readLine(readIndex, save)
readIndex = n
if len(line) == 0 {
continue
}
if strings.HasPrefix(line, "COMMIT") || strings.HasPrefix(line, "*") {
break
} else if strings.HasPrefix(line, "#") {
continue
} else if strings.HasPrefix(line, ":") && len(line) > 1 {
chain := utiliptables.Chain(strings.SplitN(line[1:], " ", 2)[0])
chainsMap[chain] = line
}
}
return chainsMap
}
func readLine(readIndex int, byteArray []byte) (string, int) {
currentReadIndex := readIndex
// consume left spaces
for currentReadIndex < len(byteArray) {
if byteArray[currentReadIndex] == ' ' {
currentReadIndex++
} else {
break
}
}
// leftTrimIndex stores the left index of the line after the line is left-trimmed
leftTrimIndex := currentReadIndex
// rightTrimIndex stores the right index of the line after the line is right-trimmed
// it is set to -1 since the correct value has not yet been determined.
rightTrimIndex := -1
for ; currentReadIndex < len(byteArray); currentReadIndex++ {
if byteArray[currentReadIndex] == ' ' {
// set rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
}
} else if (byteArray[currentReadIndex] == '\n') || (currentReadIndex == (len(byteArray) - 1)) {
// end of line or byte buffer is reached
if currentReadIndex <= leftTrimIndex {
return "", currentReadIndex + 1
}
// set the rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
if currentReadIndex == (len(byteArray)-1) && (byteArray[currentReadIndex] != '\n') {
// ensure that the last character is part of the returned string,
// unless the last character is '\n'
rightTrimIndex = currentReadIndex + 1
}
}
return string(byteArray[leftTrimIndex:rightTrimIndex]), currentReadIndex + 1
} else {
// unset rightTrimIndex
rightTrimIndex = -1
}
}
return "", currentReadIndex
}
func isLocalIP(ip string) (bool, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {

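For readers tracing the SNAT path: kubenet only marks packets by jumping to the newly exported KubeMarkMasqChain; the rule installed above sets proxier.masqueradeMark, and kube-proxy's KUBE-POSTROUTING chain later matches that mark and MASQUERADEs. With the default masquerade bit of 14 the pair of rules looks roughly like this (a sketch of the intent, not the literal output of any particular node):

-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE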

@@ -30,7 +30,7 @@ import (
)
func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
chainLines := getChainLines(table, save)
chainLines := utiliptables.GetChainLines(table, save)
for chain, line := range chainLines {
if expected, exists := expectedLines[chain]; exists {
if expected != line {
@@ -47,7 +47,7 @@ func TestReadLinesFromByteBuffer(t *testing.T) {
index := 0
readIndex := 0
for ; readIndex < len(byteArray); index++ {
line, n := readLine(readIndex, byteArray)
line, n := utiliptables.ReadLine(readIndex, byteArray)
readIndex = n
if expected[index] != line {
t.Errorf("expected:%q, actual:%q", expected[index], line)


@@ -0,0 +1,108 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package iptables
import (
"fmt"
"strings"
)
// MakeChainLine returns an iptables-save/restore formatted chain line given a Chain
func MakeChainLine(chain Chain) string {
return fmt.Sprintf(":%s - [0:0]", chain)
}
// GetChainLines parses a table's iptables-save data to find chains in the table.
// It returns a map of iptables.Chain to string where the string is the chain line from the save (with counters etc).
func GetChainLines(table Table, save []byte) map[Chain]string {
chainsMap := make(map[Chain]string)
tablePrefix := "*" + string(table)
readIndex := 0
// find beginning of table
for readIndex < len(save) {
line, n := ReadLine(readIndex, save)
readIndex = n
if strings.HasPrefix(line, tablePrefix) {
break
}
}
// parse table lines
for readIndex < len(save) {
line, n := ReadLine(readIndex, save)
readIndex = n
if len(line) == 0 {
continue
}
if strings.HasPrefix(line, "COMMIT") || strings.HasPrefix(line, "*") {
break
} else if strings.HasPrefix(line, "#") {
continue
} else if strings.HasPrefix(line, ":") && len(line) > 1 {
chain := Chain(strings.SplitN(line[1:], " ", 2)[0])
chainsMap[chain] = line
}
}
return chainsMap
}
// ReadLine reads a line from the byte buffer starting at readIndex, trimming leading and
// trailing spaces; it returns the trimmed line and the index at which the next read should start.
func ReadLine(readIndex int, byteArray []byte) (string, int) {
currentReadIndex := readIndex
// consume left spaces
for currentReadIndex < len(byteArray) {
if byteArray[currentReadIndex] == ' ' {
currentReadIndex++
} else {
break
}
}
// leftTrimIndex stores the left index of the line after the line is left-trimmed
leftTrimIndex := currentReadIndex
// rightTrimIndex stores the right index of the line after the line is right-trimmed
// it is set to -1 since the correct value has not yet been determined.
rightTrimIndex := -1
for ; currentReadIndex < len(byteArray); currentReadIndex++ {
if byteArray[currentReadIndex] == ' ' {
// set rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
}
} else if (byteArray[currentReadIndex] == '\n') || (currentReadIndex == (len(byteArray) - 1)) {
// end of line or byte buffer is reached
if currentReadIndex <= leftTrimIndex {
return "", currentReadIndex + 1
}
// set the rightTrimIndex
if rightTrimIndex == -1 {
rightTrimIndex = currentReadIndex
if currentReadIndex == (len(byteArray)-1) && (byteArray[currentReadIndex] != '\n') {
// ensure that the last character is part of the returned string,
// unless the last character is '\n'
rightTrimIndex = currentReadIndex + 1
}
}
return string(byteArray[leftTrimIndex:rightTrimIndex]), currentReadIndex + 1
} else {
// unset rightTrimIndex
rightTrimIndex = -1
}
}
return "", currentReadIndex
}