add firewall chain to filter requests based on loadbalancer source ranges

Minhan Xia 2016-08-11 16:06:22 -07:00
parent 3bf8679232
commit 643fc3803b
2 changed files with 123 additions and 39 deletions

cmd/kube-proxy/app/server.go

@@ -204,8 +204,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// IPTablesMasqueradeBit must be specified or defaulted.
return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
}
-proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname)
+proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
@@ -409,3 +408,18 @@ func tryIptablesProxy(iptver iptables.IptablesVersioner, kcompat iptables.Kernel
func (s *ProxyServer) birthCry() {
s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")
}
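+// getNodeIP fetches this node's object from the API server and returns its
+// host IP as reported there, or nil if it cannot be determined.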
+func getNodeIP(client *kubeclient.Client, hostname string) net.IP {
+var nodeIP net.IP
+node, err := client.Nodes().Get(hostname)
+if err != nil {
+glog.Warningf("Failed to retrieve node info: %v", err)
+return nil
+}
+nodeIP, err = nodeutil.GetNodeHostIP(node)
+if err != nil {
+glog.Warningf("Failed to retrieve node IP: %v", err)
+return nil
+}
+return nodeIP
+}

pkg/proxy/iptables/proxier.go

@@ -45,6 +45,7 @@ import (
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)
+const (
// iptablesMinVersion is the minimum version of iptables for which we will use the Proxier
// from this package instead of the userspace Proxier. While most of the
// features we need were available earlier, the '-C' flag was added more
@@ -52,24 +53,27 @@ import (
// have it, we have to be extra careful about the exact args we feed in being
// the same as the args we read back (iptables itself normalizes some args).
// This is the "new" Proxier, so we require "new" versions of tools.
-const iptablesMinVersion = utiliptables.MinCheckVersion
+iptablesMinVersion = utiliptables.MinCheckVersion
// the services chain
-const kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
+kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
// the nodeports chain
-const kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
+kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
// the kubernetes postrouting chain
-const kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
+kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
// the mark-for-masquerade chain
// TODO: let kubelet manage this chain. Other components should just assume it exists and use it.
-const KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
+KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// the mark we apply to traffic needing SNAT
// TODO(thockin): Remove this for v1.3 or v1.4.
-const oldIptablesMasqueradeMark = "0x4d415351"
+oldIptablesMasqueradeMark = "0x4d415351"
+// the mark-for-drop chain
+KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
+)
// IptablesVersioner can query the current iptables version.
type IptablesVersioner interface {
@@ -136,6 +140,7 @@ type serviceInfo struct {
sessionAffinityType api.ServiceAffinity
stickyMaxAgeSeconds int
externalIPs []string
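+// loadBalancerSourceRanges mirrors ServiceSpec.LoadBalancerSourceRanges: the client CIDRs allowed to reach this service through its load balancer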
+loadBalancerSourceRanges []string
}
// returns a new serviceInfo struct
@@ -164,6 +169,7 @@ type Proxier struct {
exec utilexec.Interface
clusterCIDR string
hostname string
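+// nodeIP is consulted when applying loadBalancerSourceRanges, to decide whether traffic originating on this node must also be allowed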
+nodeIP net.IP
}
type localPort struct {
@@ -189,7 +195,7 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails.
-func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string) (*Proxier, error) {
+func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string, hostname string, nodeIP net.IP) (*Proxier, error) {
// Set the route_localnet sysctl we need so traffic addressed to loopback can be routed instead of being dropped as martian
if err := utilsysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
@@ -209,6 +215,10 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
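// e.g. with kube-proxy's default masqueradeBit of 14, masqueradeMark renders as "0x004000/0x004000"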
+if nodeIP == nil {
+glog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
+nodeIP = net.ParseIP("127.0.0.1")
+}
return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
endpointsMap: make(map[proxy.ServicePortName][]string),
@@ -220,6 +230,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
+nodeIP: nodeIP,
}, nil
}
@@ -428,6 +439,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
// Deep-copy in case the service instance changes
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
info.sessionAffinityType = service.Spec.SessionAffinity
+info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
proxier.serviceMap[serviceName] = info
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
@@ -552,15 +564,28 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string {
return result
}
-// servicePortChainName takes the ServicePortName for a service and
-// returns the associated iptables chain. This is computed by hashing (sha256)
-// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
-// this because Iptables Chain Names must be <= 28 chars long, and the longer
-// they are the harder they are to read.
-func servicePortChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain {
+// portProtoHash takes the ServicePortName and protocol for a service and
+// returns the associated 16-character hash. This is computed by hashing (sha256),
+// then encoding to base32 and truncating to 16 chars. We do this because iptables
+// chain names must be <= 28 chars long, and the longer they are the harder they are to read.
+func portProtoHash(s proxy.ServicePortName, protocol string) string {
hash := sha256.Sum256([]byte(s.String() + protocol))
encoded := base32.StdEncoding.EncodeToString(hash[:])
-return utiliptables.Chain("KUBE-SVC-" + encoded[:16])
+return encoded[:16]
+}
+// servicePortChainName takes the ServicePortName for a service and
+// returns the associated iptables chain. This is computed by hashing (sha256)
+// then encoding to base32 and truncating with the prefix "KUBE-SVC-".
+func servicePortChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain {
+return utiliptables.Chain("KUBE-SVC-" + portProtoHash(s, protocol))
+}
+// serviceFirewallChainName takes the ServicePortName for a service and
+// returns the associated iptables firewall chain. This is computed by hashing (sha256)
+// then encoding to base32 and truncating with the prefix "KUBE-FW-".
+func serviceFirewallChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain {
+return utiliptables.Chain("KUBE-FW-" + portProtoHash(s, protocol))
+}
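As a worked illustration of the naming scheme above, here is a minimal standalone sketch; the service-port string "default/nginx:http" is hypothetical (the real code derives it from proxy.ServicePortName.String()):

package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

// portProtoHash mirrors the scheme above: sha256 the service-port string
// plus protocol, base32-encode the digest, and keep the first 16 characters.
func portProtoHash(servicePortName, protocol string) string {
	hash := sha256.Sum256([]byte(servicePortName + protocol))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return encoded[:16]
}

func main() {
	// Both chain names share one hash and differ only in prefix, which keeps
	// a service's KUBE-FW- chain visually paired with its KUBE-SVC- chain in
	// rule dumps, and both stay within the 28-character chain-name limit.
	fmt.Println("KUBE-SVC-" + portProtoHash("default/nginx:http", "tcp"))
	fmt.Println("KUBE-FW-" + portProtoHash("default/nginx:http", "tcp"))
}

"KUBE-SVC-" (9 chars) plus the 16-character hash comes to 25 characters, safely under the 28-character limit; "KUBE-FW-" comes to 24.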
// This is the same as servicePortChainName but with the endpoint included.
@@ -854,9 +879,54 @@ func (proxier *Proxier) syncProxyRules() {
"-d", fmt.Sprintf("%s/32", ingress.IP),
"--dport", fmt.Sprintf("%d", svcInfo.port),
}
+// create service firewall chain
+fwChain := serviceFirewallChainName(svcName, protocol)
+if chain, ok := existingNATChains[fwChain]; ok {
+writeLine(natChains, chain)
+} else {
+writeLine(natChains, utiliptables.MakeChainLine(fwChain))
+}
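+// (reusing the chain line captured from iptables-save preserves the chain's packet counters; utiliptables.MakeChainLine renders a fresh ":<chain> - [0:0]" declaration for iptables-restore)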
+// jump to service firewall chain
+// The service firewall rules are created based on the ServiceSpec.loadBalancerSourceRanges field.
+// This currently works only for load balancers that preserve source IPs.
+// For load balancers that direct traffic to the service's NodePort, the firewall rules do not apply.
+writeLine(natRules, append(args, "-j", string(fwChain))...)
+args = []string{
+"-A", string(fwChain),
+"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
+"-m", protocol, "-p", protocol,
+"-d", fmt.Sprintf("%s/32", ingress.IP),
+"--dport", fmt.Sprintf("%d", svcInfo.port),
+}
+// We have to SNAT packets from external IPs.
+writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+if len(svcInfo.loadBalancerSourceRanges) == 0 {
+// allow all sources, so jump directly to the KUBE-SVC chain
+writeLine(natRules, append(args, "-j", string(svcChain))...)
+} else {
+// firewall filter based on each source range
+allowFromNode := false
+for _, src := range svcInfo.loadBalancerSourceRanges {
+writeLine(natRules, append(args, "-s", src, "-j", string(svcChain))...)
+// ignore the error: the CIDR has already been validated
+_, cidr, _ := net.ParseCIDR(src)
+if cidr.Contains(proxier.nodeIP) {
+allowFromNode = true
+}
+}
+// Generally, an IP route rule is added to intercept requests to the load balancer VIP
+// coming from the load balancer's own backend hosts. In that case the request loops back
+// directly instead of traversing the load balancer, so we need the following rule to
+// allow requests that originate on the host itself.
+if allowFromNode {
+writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(svcChain))...)
+}
+}
+// If the packet was able to reach the end of the firewall chain, then it was not DNATed;
+// no source range matched, so it may not pass the firewall. Mark it for DROP.
+writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...)
}
}
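For illustration only: given a hypothetical service port default/nginx:http whose load balancer VIP is 203.0.113.10, whose port is 80, and whose loadBalancerSourceRanges is ["10.0.0.0/8"] (chain hashes abbreviated to XXXXXXXXXXXXXXXX), the lines emitted above would render roughly as:

:KUBE-FW-XXXXXXXXXXXXXXXX - [0:0]
-A KUBE-SERVICES -m comment --comment "default/nginx:http loadbalancer IP" -m tcp -p tcp -d 203.0.113.10/32 --dport 80 -j KUBE-FW-XXXXXXXXXXXXXXXX
-A KUBE-FW-XXXXXXXXXXXXXXXX -m comment --comment "default/nginx:http loadbalancer IP" -m tcp -p tcp -d 203.0.113.10/32 --dport 80 -j KUBE-MARK-MASQ
-A KUBE-FW-XXXXXXXXXXXXXXXX -m comment --comment "default/nginx:http loadbalancer IP" -m tcp -p tcp -d 203.0.113.10/32 --dport 80 -s 10.0.0.0/8 -j KUBE-SVC-XXXXXXXXXXXXXXXX
-A KUBE-FW-XXXXXXXXXXXXXXXX -m comment --comment "default/nginx:http loadbalancer IP" -m tcp -p tcp -d 203.0.113.10/32 --dport 80 -s 203.0.113.10/32 -j KUBE-SVC-XXXXXXXXXXXXXXXX
-A KUBE-FW-XXXXXXXXXXXXXXXX -m comment --comment "default/nginx:http loadbalancer IP" -m tcp -p tcp -d 203.0.113.10/32 --dport 80 -j KUBE-MARK-DROP

The fourth rule (matching the VIP itself as source) appears only when the node's own IP falls inside one of the source ranges; it admits requests that originate on the host and loop back to the VIP without traversing the load balancer. Anything that reaches the final rule matched no allowed source and is marked for DROP.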