mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 21:17:23 +00:00
Merge pull request #9999 from thockin/low-node-ports
Actually hold NodePorts open in kube-proxy
This commit is contained in:
commit
8fed897f78
@ -144,13 +144,31 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
|
|||||||
// Proxier is an iptables based proxy for connections between a localhost:lport
|
// Proxier is an iptables based proxy for connections between a localhost:lport
|
||||||
// and services that provide the actual backends.
|
// and services that provide the actual backends.
|
||||||
type Proxier struct {
|
type Proxier struct {
|
||||||
mu sync.Mutex // protects serviceMap
|
mu sync.Mutex // protects the following fields
|
||||||
serviceMap map[proxy.ServicePortName]*serviceInfo
|
serviceMap map[proxy.ServicePortName]*serviceInfo
|
||||||
syncPeriod time.Duration
|
portsMap map[localPort]closeable
|
||||||
iptables utiliptables.Interface
|
|
||||||
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
|
||||||
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
|
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
|
||||||
MasqueradeAll bool
|
|
||||||
|
// These are effectively const and do not need the mutex to be held.
|
||||||
|
syncPeriod time.Duration
|
||||||
|
iptables utiliptables.Interface
|
||||||
|
masqueradeAll bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type localPort struct {
|
||||||
|
desc string
|
||||||
|
ip string
|
||||||
|
port int
|
||||||
|
protocol string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lp *localPort) String() string {
|
||||||
|
return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
type closeable interface {
|
||||||
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxier implements ProxyProvider
|
// Proxier implements ProxyProvider
|
||||||
@ -161,8 +179,7 @@ var _ proxy.ProxyProvider = &Proxier{}
|
|||||||
// An error will be returned if iptables fails to update or acquire the initial lock.
|
// An error will be returned if iptables fails to update or acquire the initial lock.
|
||||||
// Once a proxier is created, it will keep iptables up to date in the background and
|
// Once a proxier is created, it will keep iptables up to date in the background and
|
||||||
// will not terminate if a particular iptables call fails.
|
// will not terminate if a particular iptables call fails.
|
||||||
func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, MasqueradeAll bool) (*Proxier, error) {
|
func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool) (*Proxier, error) {
|
||||||
|
|
||||||
// Set the route_localnet sysctl we need for
|
// Set the route_localnet sysctl we need for
|
||||||
if err := setSysctl(sysctlRouteLocalnet, 1); err != nil {
|
if err := setSysctl(sysctlRouteLocalnet, 1); err != nil {
|
||||||
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
|
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
|
||||||
@ -178,9 +195,10 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
|
|||||||
|
|
||||||
return &Proxier{
|
return &Proxier{
|
||||||
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
||||||
|
portsMap: make(map[localPort]closeable),
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
iptables: ipt,
|
iptables: ipt,
|
||||||
MasqueradeAll: MasqueradeAll,
|
masqueradeAll: masqueradeAll,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -241,9 +259,7 @@ func (proxier *Proxier) SyncLoop() {
|
|||||||
func() {
|
func() {
|
||||||
proxier.mu.Lock()
|
proxier.mu.Lock()
|
||||||
defer proxier.mu.Unlock()
|
defer proxier.mu.Unlock()
|
||||||
if err := proxier.syncProxyRules(); err != nil {
|
proxier.syncProxyRules()
|
||||||
glog.Errorf("Failed to sync iptables rules: %v", err)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -259,17 +275,24 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
|||||||
|
|
||||||
for i := range allServices {
|
for i := range allServices {
|
||||||
service := &allServices[i]
|
service := &allServices[i]
|
||||||
|
svcName := types.NamespacedName{
|
||||||
|
Namespace: service.Namespace,
|
||||||
|
Name: service.Name,
|
||||||
|
}
|
||||||
|
|
||||||
// if ClusterIP is "None" or empty, skip proxying
|
// if ClusterIP is "None" or empty, skip proxying
|
||||||
if !api.IsServiceIPSet(service) {
|
if !api.IsServiceIPSet(service) {
|
||||||
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
|
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
servicePort := &service.Spec.Ports[i]
|
servicePort := &service.Spec.Ports[i]
|
||||||
|
|
||||||
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
|
serviceName := proxy.ServicePortName{
|
||||||
|
NamespacedName: svcName,
|
||||||
|
Port: servicePort.Name,
|
||||||
|
}
|
||||||
activeServices[serviceName] = true
|
activeServices[serviceName] = true
|
||||||
info, exists := proxier.serviceMap[serviceName]
|
info, exists := proxier.serviceMap[serviceName]
|
||||||
if exists && proxier.sameConfig(info, service, servicePort) {
|
if exists && proxier.sameConfig(info, service, servicePort) {
|
||||||
@ -308,9 +331,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := proxier.syncProxyRules(); err != nil {
|
proxier.syncProxyRules()
|
||||||
glog.Errorf("Failed to sync iptables rules: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEndpointsUpdate takes in a slice of updated endpoints.
|
// OnEndpointsUpdate takes in a slice of updated endpoints.
|
||||||
@ -371,9 +392,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := proxier.syncProxyRules(); err != nil {
|
proxier.syncProxyRules()
|
||||||
glog.Errorf("Failed to sync iptables rules: %v", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// used in OnEndpointsUpdate
|
// used in OnEndpointsUpdate
|
||||||
@ -404,25 +423,26 @@ func flattenValidEndpoints(endpoints []hostPortPair) []string {
|
|||||||
hpp := &endpoints[i]
|
hpp := &endpoints[i]
|
||||||
if isValidEndpoint(hpp) {
|
if isValidEndpoint(hpp) {
|
||||||
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
|
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
|
||||||
|
} else {
|
||||||
|
glog.Warningf("got invalid endpoint: %+v", *hpp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// servicePortToServiceChain takes the ServicePortName for a service and
|
// servicePortChainName takes the ServicePortName for a service and
|
||||||
// returns the associated iptables chain. This is computed by hashing (sha256)
|
// returns the associated iptables chain. This is computed by hashing (sha256)
|
||||||
// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
|
// then encoding to base32 and truncating with the prefix "KUBE-SVC-". We do
|
||||||
// this because Iptables Chain Names must be <= 28 chars long, and the longer
|
// this because Iptables Chain Names must be <= 28 chars long, and the longer
|
||||||
// they are the harder they are to read.
|
// they are the harder they are to read.
|
||||||
func servicePortToServiceChain(s proxy.ServicePortName, protocol string) utiliptables.Chain {
|
func servicePortChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain {
|
||||||
hash := sha256.Sum256([]byte(s.String() + protocol))
|
hash := sha256.Sum256([]byte(s.String() + protocol))
|
||||||
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
||||||
return utiliptables.Chain("KUBE-SVC-" + encoded[:16])
|
return utiliptables.Chain("KUBE-SVC-" + encoded[:16])
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is the same as servicePortToServiceChain but with the endpoint
|
// This is the same as servicePortChainName but with the endpoint included.
|
||||||
// included.
|
func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain {
|
||||||
func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain {
|
|
||||||
hash := sha256.Sum256([]byte(s.String() + protocol + endpoint))
|
hash := sha256.Sum256([]byte(s.String() + protocol + endpoint))
|
||||||
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
||||||
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
|
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
|
||||||
@ -431,11 +451,11 @@ func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol stri
|
|||||||
// This is where all of the iptables-save/restore calls happen.
|
// This is where all of the iptables-save/restore calls happen.
|
||||||
// The only other iptables rules are those that are setup in iptablesInit()
|
// The only other iptables rules are those that are setup in iptablesInit()
|
||||||
// assumes proxier.mu is held
|
// assumes proxier.mu is held
|
||||||
func (proxier *Proxier) syncProxyRules() error {
|
func (proxier *Proxier) syncProxyRules() {
|
||||||
// don't sync rules till we've received services and endpoints
|
// don't sync rules till we've received services and endpoints
|
||||||
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
|
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
|
||||||
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
|
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
glog.V(3).Infof("Syncing iptables rules")
|
glog.V(3).Infof("Syncing iptables rules")
|
||||||
|
|
||||||
@ -444,12 +464,14 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
// Link the services chain.
|
// Link the services chain.
|
||||||
for _, chain := range inputChains {
|
for _, chain := range inputChains {
|
||||||
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil {
|
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, iptablesServicesChain); err != nil {
|
||||||
return err
|
glog.Errorf("Failed to ensure that chain %s exists: %v", iptablesServicesChain, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
comment := "kubernetes service portals"
|
comment := "kubernetes service portals"
|
||||||
args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)}
|
args := []string{"-m", "comment", "--comment", comment, "-j", string(iptablesServicesChain)}
|
||||||
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, chain, args...); err != nil {
|
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, chain, args...); err != nil {
|
||||||
return err
|
glog.Errorf("Failed to ensure that chain %s jumps to %s: %v", chain, iptablesServicesChain, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Link the output rules.
|
// Link the output rules.
|
||||||
@ -457,7 +479,8 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
comment := "kubernetes service traffic requiring SNAT"
|
comment := "kubernetes service traffic requiring SNAT"
|
||||||
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"}
|
args := []string{"-m", "comment", "--comment", comment, "-m", "mark", "--mark", iptablesMasqueradeMark, "-j", "MASQUERADE"}
|
||||||
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
|
if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
|
||||||
return err
|
glog.Errorf("Failed to ensure that chain %s obeys MASQUERADE mark: %v", utiliptables.ChainPostrouting, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -491,14 +514,17 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Accumulate chains to keep.
|
// Accumulate chains to keep.
|
||||||
activeChains := make(map[utiliptables.Chain]bool) // use a map as a set
|
activeChains := map[utiliptables.Chain]bool{} // use a map as a set
|
||||||
|
|
||||||
|
// Accumulate new local ports that we have opened.
|
||||||
|
newLocalPorts := map[localPort]closeable{}
|
||||||
|
|
||||||
// Build rules for each service.
|
// Build rules for each service.
|
||||||
for name, info := range proxier.serviceMap {
|
for svcName, svcInfo := range proxier.serviceMap {
|
||||||
protocol := strings.ToLower(string(info.protocol))
|
protocol := strings.ToLower(string(svcInfo.protocol))
|
||||||
|
|
||||||
// Create the per-service chain, retaining counters if possible.
|
// Create the per-service chain, retaining counters if possible.
|
||||||
svcChain := servicePortToServiceChain(name, protocol)
|
svcChain := servicePortChainName(svcName, protocol)
|
||||||
if chain, ok := existingChains[svcChain]; ok {
|
if chain, ok := existingChains[svcChain]; ok {
|
||||||
writeLine(chainsLines, chain)
|
writeLine(chainsLines, chain)
|
||||||
} else {
|
} else {
|
||||||
@ -509,12 +535,12 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
// Capture the clusterIP.
|
// Capture the clusterIP.
|
||||||
args := []string{
|
args := []string{
|
||||||
"-A", string(iptablesServicesChain),
|
"-A", string(iptablesServicesChain),
|
||||||
"-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", name.String()),
|
"-m", "comment", "--comment", fmt.Sprintf("\"%s cluster IP\"", svcName.String()),
|
||||||
"-m", protocol, "-p", protocol,
|
"-m", protocol, "-p", protocol,
|
||||||
"-d", fmt.Sprintf("%s/32", info.clusterIP.String()),
|
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
|
||||||
"--dport", fmt.Sprintf("%d", info.port),
|
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||||
}
|
}
|
||||||
if proxier.MasqueradeAll {
|
if proxier.masqueradeAll {
|
||||||
writeLine(rulesLines, append(args,
|
writeLine(rulesLines, append(args,
|
||||||
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
|
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
|
||||||
}
|
}
|
||||||
@ -522,13 +548,36 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
"-j", string(svcChain))...)
|
"-j", string(svcChain))...)
|
||||||
|
|
||||||
// Capture externalIPs.
|
// Capture externalIPs.
|
||||||
for _, externalIP := range info.externalIPs {
|
for _, externalIP := range svcInfo.externalIPs {
|
||||||
|
// If the "external" IP happens to be an IP that is local to this
|
||||||
|
// machine, hold the local port open so no other process can open it
|
||||||
|
// (because the socket might open but it would never work).
|
||||||
|
if local, err := isLocalIP(externalIP); err != nil {
|
||||||
|
glog.Errorf("can't determine if IP is local, assuming not: %v", err)
|
||||||
|
} else if local {
|
||||||
|
lp := localPort{
|
||||||
|
desc: "externalIP for " + svcName.String(),
|
||||||
|
ip: externalIP,
|
||||||
|
port: svcInfo.port,
|
||||||
|
protocol: protocol,
|
||||||
|
}
|
||||||
|
if proxier.portsMap[lp] != nil {
|
||||||
|
newLocalPorts[lp] = proxier.portsMap[lp]
|
||||||
|
} else {
|
||||||
|
socket, err := openLocalPort(&lp)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newLocalPorts[lp] = socket
|
||||||
|
}
|
||||||
|
} // We're holding the port, so it's OK to install iptables rules.
|
||||||
args := []string{
|
args := []string{
|
||||||
"-A", string(iptablesServicesChain),
|
"-A", string(iptablesServicesChain),
|
||||||
"-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", name.String()),
|
"-m", "comment", "--comment", fmt.Sprintf("\"%s external IP\"", svcName.String()),
|
||||||
"-m", protocol, "-p", protocol,
|
"-m", protocol, "-p", protocol,
|
||||||
"-d", fmt.Sprintf("%s/32", externalIP),
|
"-d", fmt.Sprintf("%s/32", externalIP),
|
||||||
"--dport", fmt.Sprintf("%d", info.port),
|
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||||
}
|
}
|
||||||
// We have to SNAT packets to external IPs.
|
// We have to SNAT packets to external IPs.
|
||||||
writeLine(rulesLines, append(args,
|
writeLine(rulesLines, append(args,
|
||||||
@ -551,14 +600,14 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Capture load-balancer ingress.
|
// Capture load-balancer ingress.
|
||||||
for _, ingress := range info.loadBalancerStatus.Ingress {
|
for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
|
||||||
if ingress.IP != "" {
|
if ingress.IP != "" {
|
||||||
args := []string{
|
args := []string{
|
||||||
"-A", string(iptablesServicesChain),
|
"-A", string(iptablesServicesChain),
|
||||||
"-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", name.String()),
|
"-m", "comment", "--comment", fmt.Sprintf("\"%s loadbalancer IP\"", svcName.String()),
|
||||||
"-m", protocol, "-p", protocol,
|
"-m", protocol, "-p", protocol,
|
||||||
"-d", fmt.Sprintf("%s/32", ingress.IP),
|
"-d", fmt.Sprintf("%s/32", ingress.IP),
|
||||||
"--dport", fmt.Sprintf("%d", info.port),
|
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||||
}
|
}
|
||||||
// We have to SNAT packets from external IPs.
|
// We have to SNAT packets from external IPs.
|
||||||
writeLine(rulesLines, append(args,
|
writeLine(rulesLines, append(args,
|
||||||
@ -571,20 +620,38 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
// Capture nodeports. If we had more than 2 rules it might be
|
// Capture nodeports. If we had more than 2 rules it might be
|
||||||
// worthwhile to make a new per-service chain for nodeport rules, but
|
// worthwhile to make a new per-service chain for nodeport rules, but
|
||||||
// with just 2 rules it ends up being a waste and a cognitive burden.
|
// with just 2 rules it ends up being a waste and a cognitive burden.
|
||||||
if info.nodePort != 0 {
|
if svcInfo.nodePort != 0 {
|
||||||
|
// Hold the local port open so no other process can open it
|
||||||
|
// (because the socket might open but it would never work).
|
||||||
|
lp := localPort{
|
||||||
|
desc: "nodePort for " + svcName.String(),
|
||||||
|
ip: "",
|
||||||
|
port: svcInfo.nodePort,
|
||||||
|
protocol: protocol,
|
||||||
|
}
|
||||||
|
if proxier.portsMap[lp] != nil {
|
||||||
|
newLocalPorts[lp] = proxier.portsMap[lp]
|
||||||
|
} else {
|
||||||
|
socket, err := openLocalPort(&lp)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newLocalPorts[lp] = socket
|
||||||
|
} // We're holding the port, so it's OK to install iptables rules.
|
||||||
// Nodeports need SNAT.
|
// Nodeports need SNAT.
|
||||||
writeLine(rulesLines,
|
writeLine(rulesLines,
|
||||||
"-A", string(iptablesNodePortsChain),
|
"-A", string(iptablesNodePortsChain),
|
||||||
"-m", "comment", "--comment", name.String(),
|
"-m", "comment", "--comment", svcName.String(),
|
||||||
"-m", protocol, "-p", protocol,
|
"-m", protocol, "-p", protocol,
|
||||||
"--dport", fmt.Sprintf("%d", info.nodePort),
|
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
|
||||||
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))
|
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))
|
||||||
// Jump to the service chain.
|
// Jump to the service chain.
|
||||||
writeLine(rulesLines,
|
writeLine(rulesLines,
|
||||||
"-A", string(iptablesNodePortsChain),
|
"-A", string(iptablesNodePortsChain),
|
||||||
"-m", "comment", "--comment", name.String(),
|
"-m", "comment", "--comment", svcName.String(),
|
||||||
"-m", protocol, "-p", protocol,
|
"-m", protocol, "-p", protocol,
|
||||||
"--dport", fmt.Sprintf("%d", info.nodePort),
|
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
|
||||||
"-j", string(svcChain))
|
"-j", string(svcChain))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -592,9 +659,9 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
// can group rules together.
|
// can group rules together.
|
||||||
endpoints := make([]string, 0)
|
endpoints := make([]string, 0)
|
||||||
endpointChains := make([]utiliptables.Chain, 0)
|
endpointChains := make([]utiliptables.Chain, 0)
|
||||||
for _, ep := range info.endpoints {
|
for _, ep := range svcInfo.endpoints {
|
||||||
endpoints = append(endpoints, ep)
|
endpoints = append(endpoints, ep)
|
||||||
endpointChain := servicePortAndEndpointToServiceChain(name, protocol, ep)
|
endpointChain := servicePortEndpointChainName(svcName, protocol, ep)
|
||||||
endpointChains = append(endpointChains, endpointChain)
|
endpointChains = append(endpointChains, endpointChain)
|
||||||
|
|
||||||
// Create the endpoint chain, retaining counters if possible.
|
// Create the endpoint chain, retaining counters if possible.
|
||||||
@ -607,13 +674,13 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// First write session affinity rules, if applicable.
|
// First write session affinity rules, if applicable.
|
||||||
if info.sessionAffinityType == api.ServiceAffinityClientIP {
|
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
|
||||||
for _, endpointChain := range endpointChains {
|
for _, endpointChain := range endpointChains {
|
||||||
writeLine(rulesLines,
|
writeLine(rulesLines,
|
||||||
"-A", string(svcChain),
|
"-A", string(svcChain),
|
||||||
"-m", "comment", "--comment", name.String(),
|
"-m", "comment", "--comment", svcName.String(),
|
||||||
"-m", "recent", "--name", string(endpointChain),
|
"-m", "recent", "--name", string(endpointChain),
|
||||||
"--rcheck", "--seconds", fmt.Sprintf("%d", info.stickyMaxAgeSeconds), "--reap",
|
"--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeSeconds), "--reap",
|
||||||
"-j", string(endpointChain))
|
"-j", string(endpointChain))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -624,7 +691,7 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
// Balancing rules in the per-service chain.
|
// Balancing rules in the per-service chain.
|
||||||
args := []string{
|
args := []string{
|
||||||
"-A", string(svcChain),
|
"-A", string(svcChain),
|
||||||
"-m", "comment", "--comment", name.String(),
|
"-m", "comment", "--comment", svcName.String(),
|
||||||
}
|
}
|
||||||
if i < (n - 1) {
|
if i < (n - 1) {
|
||||||
// Each rule is a probabilistic match.
|
// Each rule is a probabilistic match.
|
||||||
@ -640,7 +707,7 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
// Rules in the per-endpoint chain.
|
// Rules in the per-endpoint chain.
|
||||||
args = []string{
|
args = []string{
|
||||||
"-A", string(endpointChain),
|
"-A", string(endpointChain),
|
||||||
"-m", "comment", "--comment", name.String(),
|
"-m", "comment", "--comment", svcName.String(),
|
||||||
}
|
}
|
||||||
// Handle traffic that loops back to the originator with SNAT.
|
// Handle traffic that loops back to the originator with SNAT.
|
||||||
// Technically we only need to do this if the endpoint is on this
|
// Technically we only need to do this if the endpoint is on this
|
||||||
@ -652,7 +719,7 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
|
"-j", "MARK", "--set-xmark", fmt.Sprintf("%s/0xffffffff", iptablesMasqueradeMark))...)
|
||||||
|
|
||||||
// Update client-affinity lists.
|
// Update client-affinity lists.
|
||||||
if info.sessionAffinityType == api.ServiceAffinityClientIP {
|
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
|
||||||
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
|
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
|
||||||
}
|
}
|
||||||
// DNAT to final destination.
|
// DNAT to final destination.
|
||||||
@ -694,7 +761,23 @@ func (proxier *Proxier) syncProxyRules() error {
|
|||||||
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
|
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
|
||||||
lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
|
lines := append(chainsLines.Bytes(), rulesLines.Bytes()...)
|
||||||
glog.V(3).Infof("Syncing rules: %s", lines)
|
glog.V(3).Infof("Syncing rules: %s", lines)
|
||||||
return proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
err = proxier.iptables.Restore(utiliptables.TableNAT, lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to sync iptables rules: %v", err)
|
||||||
|
// Revert new local ports.
|
||||||
|
for k, v := range newLocalPorts {
|
||||||
|
glog.Errorf("Closing local port %s", k.String())
|
||||||
|
v.Close()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Close old local ports and save new ones.
|
||||||
|
for k, v := range proxier.portsMap {
|
||||||
|
if newLocalPorts[k] == nil {
|
||||||
|
v.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
proxier.portsMap = newLocalPorts
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Join all words with spaces, terminate with newline and write to buf.
|
// Join all words with spaces, terminate with newline and write to buf.
|
||||||
@ -736,3 +819,58 @@ func getChainLines(table utiliptables.Table, save []byte) map[utiliptables.Chain
|
|||||||
}
|
}
|
||||||
return chainsMap
|
return chainsMap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isLocalIP(ip string) (bool, error) {
|
||||||
|
addrs, err := net.InterfaceAddrs()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
for i := range addrs {
|
||||||
|
intf, _, err := net.ParseCIDR(addrs[i].String())
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if net.ParseIP(ip).Equal(intf) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func openLocalPort(lp *localPort) (closeable, error) {
|
||||||
|
// For ports on node IPs, open the actual port and hold it, even though we
|
||||||
|
// use iptables to redirect traffic.
|
||||||
|
// This ensures a) that it's safe to use that port and b) that (a) stays
|
||||||
|
// true. The risk is that some process on the node (e.g. sshd or kubelet)
|
||||||
|
// is using a port and we give that same port out to a Service. That would
|
||||||
|
// be bad because iptables would silently claim the traffic but the process
|
||||||
|
// would never know.
|
||||||
|
// NOTE: We should not need to have a real listen()ing socket - bind()
|
||||||
|
// should be enough, but I can't figure out a way to e2e test without
|
||||||
|
// it. Tools like 'ss' and 'netstat' do not show sockets that are
|
||||||
|
// bind()ed but not listen()ed, and at least the default debian netcat
|
||||||
|
// has no way to avoid about 10 seconds of retries.
|
||||||
|
var socket closeable
|
||||||
|
switch lp.protocol {
|
||||||
|
case "tcp":
|
||||||
|
listener, err := net.Listen("tcp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
socket = listener
|
||||||
|
case "udp":
|
||||||
|
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(lp.ip, strconv.Itoa(lp.port)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
conn, err := net.ListenUDP("udp", addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
socket = conn
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown protocol %q", lp.protocol)
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("Opened local port %s", lp.String())
|
||||||
|
return socket, nil
|
||||||
|
}
|
||||||
|
@ -73,7 +73,7 @@ type Proxier struct {
|
|||||||
serviceMap map[proxy.ServicePortName]*serviceInfo
|
serviceMap map[proxy.ServicePortName]*serviceInfo
|
||||||
syncPeriod time.Duration
|
syncPeriod time.Duration
|
||||||
portMapMutex sync.Mutex
|
portMapMutex sync.Mutex
|
||||||
portMap map[portMapKey]proxy.ServicePortName
|
portMap map[portMapKey]*portMapValue
|
||||||
numProxyLoops int32 // use atomic ops to access this; mostly for testing
|
numProxyLoops int32 // use atomic ops to access this; mostly for testing
|
||||||
listenIP net.IP
|
listenIP net.IP
|
||||||
iptables iptables.Interface
|
iptables iptables.Interface
|
||||||
@ -84,14 +84,24 @@ type Proxier struct {
|
|||||||
// assert Proxier is a ProxyProvider
|
// assert Proxier is a ProxyProvider
|
||||||
var _ proxy.ProxyProvider = &Proxier{}
|
var _ proxy.ProxyProvider = &Proxier{}
|
||||||
|
|
||||||
// A key for the portMap
|
// A key for the portMap. The ip has to be a tring because slices can't be map
|
||||||
|
// keys.
|
||||||
type portMapKey struct {
|
type portMapKey struct {
|
||||||
|
ip string
|
||||||
port int
|
port int
|
||||||
protocol api.Protocol
|
protocol api.Protocol
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *portMapKey) String() string {
|
func (k *portMapKey) String() string {
|
||||||
return fmt.Sprintf("%s/%d", k.protocol, k.port)
|
return fmt.Sprintf("%s:%d/%s", k.ip, k.port, k.protocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
// A value for the portMap
|
||||||
|
type portMapValue struct {
|
||||||
|
owner proxy.ServicePortName
|
||||||
|
socket interface {
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -146,7 +156,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
|
|||||||
return &Proxier{
|
return &Proxier{
|
||||||
loadBalancer: loadBalancer,
|
loadBalancer: loadBalancer,
|
||||||
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
||||||
portMap: make(map[portMapKey]proxy.ServicePortName),
|
portMap: make(map[portMapKey]*portMapValue),
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
listenIP: listenIP,
|
listenIP: listenIP,
|
||||||
iptables: iptables,
|
iptables: iptables,
|
||||||
@ -451,6 +461,15 @@ func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceI
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
|
func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) error {
|
||||||
|
if local, err := isLocalIP(portal.ip); err != nil {
|
||||||
|
return fmt.Errorf("can't determine if IP is local, assuming not: %v", err)
|
||||||
|
} else if local {
|
||||||
|
err := proxier.claimNodePort(portal.ip, portal.port, protocol, name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Handle traffic from containers.
|
// Handle traffic from containers.
|
||||||
args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
|
args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
|
||||||
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
|
existed, err := proxier.iptables.EnsureRule(iptables.Append, iptables.TableNAT, iptablesContainerPortalChain, args...)
|
||||||
@ -499,42 +518,55 @@ func (proxier *Proxier) openOnePortal(portal portal, protocol api.Protocol, prox
|
|||||||
|
|
||||||
// Marks a port as being owned by a particular service, or returns error if already claimed.
|
// Marks a port as being owned by a particular service, or returns error if already claimed.
|
||||||
// Idempotent: reclaiming with the same owner is not an error
|
// Idempotent: reclaiming with the same owner is not an error
|
||||||
func (proxier *Proxier) claimPort(port int, protocol api.Protocol, owner proxy.ServicePortName) error {
|
func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
|
||||||
proxier.portMapMutex.Lock()
|
proxier.portMapMutex.Lock()
|
||||||
defer proxier.portMapMutex.Unlock()
|
defer proxier.portMapMutex.Unlock()
|
||||||
|
|
||||||
// TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
|
// TODO: We could pre-populate some reserved ports into portMap and/or blacklist some well-known ports
|
||||||
|
|
||||||
key := portMapKey{port: port, protocol: protocol}
|
key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
|
||||||
existing, found := proxier.portMap[key]
|
existing, found := proxier.portMap[key]
|
||||||
if !found {
|
if !found {
|
||||||
proxier.portMap[key] = owner
|
// Hold the actual port open, even though we use iptables to redirect
|
||||||
|
// it. This ensures that a) it's safe to take and b) that stays true.
|
||||||
|
// NOTE: We should not need to have a real listen()ing socket - bind()
|
||||||
|
// should be enough, but I can't figure out a way to e2e test without
|
||||||
|
// it. Tools like 'ss' and 'netstat' do not show sockets that are
|
||||||
|
// bind()ed but not listen()ed, and at least the default debian netcat
|
||||||
|
// has no way to avoid about 10 seconds of retries.
|
||||||
|
socket, err := newProxySocket(protocol, ip, port)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
|
||||||
|
}
|
||||||
|
proxier.portMap[key] = &portMapValue{owner: owner, socket: socket}
|
||||||
|
glog.V(2).Infof("Claimed local port %s", key.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if existing == owner {
|
if existing.owner == owner {
|
||||||
// We are idempotent
|
// We are idempotent
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("Port conflict detected on port %v. %v vs %v", key, owner, existing)
|
return fmt.Errorf("Port conflict detected on port %s. %v vs %v", key.String(), owner, existing)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release a claim on a port. Returns an error if the owner does not match the claim.
|
// Release a claim on a port. Returns an error if the owner does not match the claim.
|
||||||
// Tolerates release on an unclaimed port, to simplify .
|
// Tolerates release on an unclaimed port, to simplify .
|
||||||
func (proxier *Proxier) releasePort(port int, protocol api.Protocol, owner proxy.ServicePortName) error {
|
func (proxier *Proxier) releaseNodePort(ip net.IP, port int, protocol api.Protocol, owner proxy.ServicePortName) error {
|
||||||
proxier.portMapMutex.Lock()
|
proxier.portMapMutex.Lock()
|
||||||
defer proxier.portMapMutex.Unlock()
|
defer proxier.portMapMutex.Unlock()
|
||||||
|
|
||||||
key := portMapKey{port: port, protocol: protocol}
|
key := portMapKey{ip: ip.String(), port: port, protocol: protocol}
|
||||||
existing, found := proxier.portMap[key]
|
existing, found := proxier.portMap[key]
|
||||||
if !found {
|
if !found {
|
||||||
// We tolerate this, it happens if we are cleaning up a failed allocation
|
// We tolerate this, it happens if we are cleaning up a failed allocation
|
||||||
glog.Infof("Ignoring release on unowned port: %v", key)
|
glog.Infof("Ignoring release on unowned port: %v", key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if existing != owner {
|
if existing.owner != owner {
|
||||||
return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
|
return fmt.Errorf("Port conflict detected on port %v (unowned unlock). %v vs %v", key, owner, existing)
|
||||||
}
|
}
|
||||||
delete(proxier.portMap, key)
|
delete(proxier.portMap, key)
|
||||||
|
existing.socket.Close()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -542,7 +574,7 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI
|
|||||||
// TODO: Do we want to allow containers to access public services? Probably yes.
|
// TODO: Do we want to allow containers to access public services? Probably yes.
|
||||||
// TODO: We could refactor this to be the same code as portal, but with IP == nil
|
// TODO: We could refactor this to be the same code as portal, but with IP == nil
|
||||||
|
|
||||||
err := proxier.claimPort(nodePort, protocol, name)
|
err := proxier.claimNodePort(nil, nodePort, protocol, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -596,6 +628,14 @@ func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *service
|
|||||||
func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
|
func (proxier *Proxier) closeOnePortal(portal portal, protocol api.Protocol, proxyIP net.IP, proxyPort int, name proxy.ServicePortName) []error {
|
||||||
el := []error{}
|
el := []error{}
|
||||||
|
|
||||||
|
if local, err := isLocalIP(portal.ip); err != nil {
|
||||||
|
el = append(el, fmt.Errorf("can't determine if IP is local, assuming not: %v", err))
|
||||||
|
} else if local {
|
||||||
|
if err := proxier.releaseNodePort(nil, portal.port, protocol, name); err != nil {
|
||||||
|
el = append(el, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Handle traffic from containers.
|
// Handle traffic from containers.
|
||||||
args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
|
args := proxier.iptablesContainerPortalArgs(portal.ip, portal.isExternal, false, portal.port, protocol, proxyIP, proxyPort, name)
|
||||||
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerPortalChain, args...); err != nil {
|
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesContainerPortalChain, args...); err != nil {
|
||||||
@ -645,13 +685,30 @@ func (proxier *Proxier) closeNodePort(nodePort int, protocol api.Protocol, proxy
|
|||||||
el = append(el, err)
|
el = append(el, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := proxier.releasePort(nodePort, protocol, name); err != nil {
|
if err := proxier.releaseNodePort(nil, nodePort, protocol, name); err != nil {
|
||||||
el = append(el, err)
|
el = append(el, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return el
|
return el
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isLocalIP(ip net.IP) (bool, error) {
|
||||||
|
addrs, err := net.InterfaceAddrs()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
for i := range addrs {
|
||||||
|
intf, _, err := net.ParseCIDR(addrs[i].String())
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if ip.Equal(intf) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// See comments in the *PortalArgs() functions for some details about why we
|
// See comments in the *PortalArgs() functions for some details about why we
|
||||||
// use two chains for portals.
|
// use two chains for portals.
|
||||||
var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER"
|
var iptablesContainerPortalChain iptables.Chain = "KUBE-PORTALS-CONTAINER"
|
||||||
|
@ -46,7 +46,11 @@ type proxySocket interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
|
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
|
||||||
host := ip.String()
|
host := ""
|
||||||
|
if ip != nil {
|
||||||
|
host = ip.String()
|
||||||
|
}
|
||||||
|
|
||||||
switch strings.ToUpper(string(protocol)) {
|
switch strings.ToUpper(string(protocol)) {
|
||||||
case "TCP":
|
case "TCP":
|
||||||
listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
|
listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
|
||||||
|
@ -69,6 +69,7 @@ var _ = Describe("Services", func() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// TODO: We get coverage of TCP/UDP and multi-port services through the DNS test. We should have a simpler test for multi-port TCP here.
|
// TODO: We get coverage of TCP/UDP and multi-port services through the DNS test. We should have a simpler test for multi-port TCP here.
|
||||||
It("should provide secure master service", func() {
|
It("should provide secure master service", func() {
|
||||||
_, err := c.Services(api.NamespaceDefault).Get("kubernetes")
|
_, err := c.Services(api.NamespaceDefault).Get("kubernetes")
|
||||||
@ -460,6 +461,16 @@ var _ = Describe("Services", func() {
|
|||||||
By("hitting the pod through the service's NodePort")
|
By("hitting the pod through the service's NodePort")
|
||||||
ip := pickMinionIP(c)
|
ip := pickMinionIP(c)
|
||||||
testReachable(ip, nodePort)
|
testReachable(ip, nodePort)
|
||||||
|
|
||||||
|
hosts, err := NodeSSHHosts(c)
|
||||||
|
if err != nil {
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort)
|
||||||
|
_, _, code, err := SSH(cmd, hosts[0], testContext.Provider)
|
||||||
|
if code != 0 {
|
||||||
|
Failf("expected node port (%d) to be in use", nodePort)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
It("should be able to change the type and nodeport settings of a service", func() {
|
It("should be able to change the type and nodeport settings of a service", func() {
|
||||||
@ -878,16 +889,26 @@ var _ = Describe("Services", func() {
|
|||||||
if !ServiceNodePortRange.Contains(port.NodePort) {
|
if !ServiceNodePortRange.Contains(port.NodePort) {
|
||||||
Failf("got unexpected (out-of-range) port for new service: %v", service)
|
Failf("got unexpected (out-of-range) port for new service: %v", service)
|
||||||
}
|
}
|
||||||
port1 := port.NodePort
|
nodePort := port.NodePort
|
||||||
|
|
||||||
By("deleting original service " + serviceName)
|
By("deleting original service " + serviceName)
|
||||||
err = t.DeleteService(serviceName)
|
err = t.DeleteService(serviceName)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", port1))
|
hosts, err := NodeSSHHosts(c)
|
||||||
|
if err != nil {
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}
|
||||||
|
cmd := fmt.Sprintf(`test -n "$(ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN)"`, nodePort)
|
||||||
|
_, _, code, err := SSH(cmd, hosts[0], testContext.Provider)
|
||||||
|
if code == 0 {
|
||||||
|
Failf("expected node port (%d) to not be in use", nodePort)
|
||||||
|
}
|
||||||
|
|
||||||
|
By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))
|
||||||
service = t.BuildServiceSpec()
|
service = t.BuildServiceSpec()
|
||||||
service.Spec.Type = api.ServiceTypeNodePort
|
service.Spec.Type = api.ServiceTypeNodePort
|
||||||
service.Spec.Ports[0].NodePort = port1
|
service.Spec.Ports[0].NodePort = nodePort
|
||||||
service, err = t.CreateService(service)
|
service, err = t.CreateService(service)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user