mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-13 05:46:16 +00:00
Add ServiceType = NodePort; wire everything up
This commit is contained in:
parent
03cdc077c3
commit
7346cc8042
@ -187,7 +187,7 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
|
|||||||
*p = types[c.Rand.Intn(len(types))]
|
*p = types[c.Rand.Intn(len(types))]
|
||||||
},
|
},
|
||||||
func(p *api.ServiceType, c fuzz.Continue) {
|
func(p *api.ServiceType, c fuzz.Continue) {
|
||||||
types := []api.ServiceType{api.ServiceTypeClusterIP, api.ServiceTypeLoadBalancer}
|
types := []api.ServiceType{api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer}
|
||||||
*p = types[c.Rand.Intn(len(types))]
|
*p = types[c.Rand.Intn(len(types))]
|
||||||
},
|
},
|
||||||
func(ct *api.Container, c fuzz.Continue) {
|
func(ct *api.Container, c fuzz.Continue) {
|
||||||
|
@ -1019,6 +1019,10 @@ const (
|
|||||||
// cluster, via the portal IP.
|
// cluster, via the portal IP.
|
||||||
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
||||||
|
|
||||||
|
// ServiceTypeNodePort means a service will be exposed on one port of
|
||||||
|
// every node, in addition to 'ClusterIP' visibility.
|
||||||
|
ServiceTypeNodePort ServiceType = "NodePort"
|
||||||
|
|
||||||
// ServiceTypeLoadBalancer means a service will be exposed via an
|
// ServiceTypeLoadBalancer means a service will be exposed via an
|
||||||
// external load balancer (if the cloud provider supports it), in addition
|
// external load balancer (if the cloud provider supports it), in addition
|
||||||
// to 'NodePort' type.
|
// to 'NodePort' type.
|
||||||
|
@ -1001,6 +1001,10 @@ const (
|
|||||||
// cluster, via the portal IP.
|
// cluster, via the portal IP.
|
||||||
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
||||||
|
|
||||||
|
// ServiceTypeNodePort means a service will be exposed on one port of
|
||||||
|
// every node, in addition to 'ClusterIP' visibility.
|
||||||
|
ServiceTypeNodePort ServiceType = "NodePort"
|
||||||
|
|
||||||
// ServiceTypeLoadBalancer means a service will be exposed via an
|
// ServiceTypeLoadBalancer means a service will be exposed via an
|
||||||
// external load balancer (if the cloud provider supports it), in addition
|
// external load balancer (if the cloud provider supports it), in addition
|
||||||
// to 'NodePort' type.
|
// to 'NodePort' type.
|
||||||
|
@ -843,6 +843,10 @@ const (
|
|||||||
// cluster, via the portal IP.
|
// cluster, via the portal IP.
|
||||||
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
||||||
|
|
||||||
|
// ServiceTypeNodePort means a service will be exposed on one port of
|
||||||
|
// every node, in addition to 'ClusterIP' visibility.
|
||||||
|
ServiceTypeNodePort ServiceType = "NodePort"
|
||||||
|
|
||||||
// ServiceTypeLoadBalancer means a service will be exposed via an
|
// ServiceTypeLoadBalancer means a service will be exposed via an
|
||||||
// external load balancer (if the cloud provider supports it), in addition
|
// external load balancer (if the cloud provider supports it), in addition
|
||||||
// to 'NodePort' type.
|
// to 'NodePort' type.
|
||||||
|
@ -845,6 +845,10 @@ const (
|
|||||||
// cluster, via the portal IP.
|
// cluster, via the portal IP.
|
||||||
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
||||||
|
|
||||||
|
// ServiceTypeNodePort means a service will be exposed on one port of
|
||||||
|
// every node, in addition to 'ClusterIP' visibility.
|
||||||
|
ServiceTypeNodePort ServiceType = "NodePort"
|
||||||
|
|
||||||
// ServiceTypeLoadBalancer means a service will be exposed via an
|
// ServiceTypeLoadBalancer means a service will be exposed via an
|
||||||
// external load balancer (if the cloud provider supports it), in addition
|
// external load balancer (if the cloud provider supports it), in addition
|
||||||
// to 'NodePort' type.
|
// to 'NodePort' type.
|
||||||
|
@ -1005,6 +1005,10 @@ const (
|
|||||||
// cluster, via the portal IP.
|
// cluster, via the portal IP.
|
||||||
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
ServiceTypeClusterIP ServiceType = "ClusterIP"
|
||||||
|
|
||||||
|
// ServiceTypeNodePort means a service will be exposed on one port of
|
||||||
|
// every node, in addition to 'ClusterIP' visibility.
|
||||||
|
ServiceTypeNodePort ServiceType = "NodePort"
|
||||||
|
|
||||||
// ServiceTypeLoadBalancer means a service will be exposed via an
|
// ServiceTypeLoadBalancer means a service will be exposed via an
|
||||||
// external load balancer (if the cloud provider supports it), in addition
|
// external load balancer (if the cloud provider supports it), in addition
|
||||||
// to 'NodePort' type.
|
// to 'NodePort' type.
|
||||||
|
@ -1032,7 +1032,7 @@ func ValidatePodTemplateUpdate(newPod, oldPod *api.PodTemplate) errs.ValidationE
|
|||||||
}
|
}
|
||||||
|
|
||||||
var supportedSessionAffinityType = util.NewStringSet(string(api.ServiceAffinityClientIP), string(api.ServiceAffinityNone))
|
var supportedSessionAffinityType = util.NewStringSet(string(api.ServiceAffinityClientIP), string(api.ServiceAffinityNone))
|
||||||
var supportedServiceType = util.NewStringSet(string(api.ServiceTypeClusterIP),
|
var supportedServiceType = util.NewStringSet(string(api.ServiceTypeClusterIP), string(api.ServiceTypeNodePort),
|
||||||
string(api.ServiceTypeLoadBalancer))
|
string(api.ServiceTypeLoadBalancer))
|
||||||
|
|
||||||
// ValidateService tests if required fields in the service are set.
|
// ValidateService tests if required fields in the service are set.
|
||||||
@ -1086,6 +1086,14 @@ func ValidateService(service *api.Service) errs.ValidationErrorList {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if service.Spec.Type == api.ServiceTypeClusterIP {
|
||||||
|
for i := range service.Spec.Ports {
|
||||||
|
if service.Spec.Ports[i].NodePort != 0 {
|
||||||
|
allErrs = append(allErrs, errs.NewFieldInvalid("spec.ports", service.Spec.Ports[i], "cannot specify a node port with cluster-visibility services"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Check for duplicate NodePorts, considering (protocol,port) pairs
|
// Check for duplicate NodePorts, considering (protocol,port) pairs
|
||||||
nodePorts := make(map[api.ServicePort]bool)
|
nodePorts := make(map[api.ServicePort]bool)
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
@ -1103,11 +1111,6 @@ func ValidateService(service *api.Service) errs.ValidationErrorList {
|
|||||||
nodePorts[key] = true
|
nodePorts[key] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Temporary validation to prevent people creating NodePorts before we have the full infrastructure in place
|
|
||||||
if len(nodePorts) > 0 {
|
|
||||||
allErrs = append(allErrs, errs.NewFieldInvalid("spec.ports", service.Spec.Ports[0], "nodePorts not (yet) enabled"))
|
|
||||||
}
|
|
||||||
|
|
||||||
return allErrs
|
return allErrs
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1373,7 +1376,7 @@ func ValidateSecret(secret *api.Secret) errs.ValidationErrorList {
|
|||||||
allErrs = append(allErrs, errs.NewFieldRequired(fmt.Sprintf("metadata.annotations[%s]", api.ServiceAccountNameKey)))
|
allErrs = append(allErrs, errs.NewFieldRequired(fmt.Sprintf("metadata.annotations[%s]", api.ServiceAccountNameKey)))
|
||||||
}
|
}
|
||||||
case api.SecretTypeOpaque, "":
|
case api.SecretTypeOpaque, "":
|
||||||
// no-op
|
// no-op
|
||||||
case api.SecretTypeDockercfg:
|
case api.SecretTypeDockercfg:
|
||||||
dockercfgBytes, exists := secret.Data[api.DockerConfigKey]
|
dockercfgBytes, exists := secret.Data[api.DockerConfigKey]
|
||||||
if !exists {
|
if !exists {
|
||||||
|
@ -1691,14 +1691,14 @@ func TestValidateService(t *testing.T) {
|
|||||||
numErrs: 0,
|
numErrs: 0,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "valid visbility - cluster",
|
name: "valid type - cluster",
|
||||||
tweakSvc: func(s *api.Service) {
|
tweakSvc: func(s *api.Service) {
|
||||||
s.Spec.Type = api.ServiceTypeClusterIP
|
s.Spec.Type = api.ServiceTypeClusterIP
|
||||||
},
|
},
|
||||||
numErrs: 0,
|
numErrs: 0,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "valid visbility - loadbalancer",
|
name: "valid type - loadbalancer",
|
||||||
tweakSvc: func(s *api.Service) {
|
tweakSvc: func(s *api.Service) {
|
||||||
s.Spec.Type = api.ServiceTypeLoadBalancer
|
s.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
},
|
},
|
||||||
@ -1723,18 +1723,106 @@ func TestValidateService(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "duplicate nodeports",
|
name: "duplicate nodeports",
|
||||||
tweakSvc: func(s *api.Service) {
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeNodePort
|
||||||
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1})
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1})
|
||||||
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "r", Port: 2, Protocol: "TCP", NodePort: 1})
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "r", Port: 2, Protocol: "TCP", NodePort: 1})
|
||||||
},
|
},
|
||||||
numErrs: 2, // TODO(justinsb): change to 1 when NodePorts enabled
|
numErrs: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "duplicate nodeports (different protocols)",
|
name: "duplicate nodeports (different protocols)",
|
||||||
tweakSvc: func(s *api.Service) {
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeNodePort
|
||||||
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1})
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 1, Protocol: "TCP", NodePort: 1})
|
||||||
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "r", Port: 2, Protocol: "UDP", NodePort: 1})
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "r", Port: 2, Protocol: "UDP", NodePort: 1})
|
||||||
},
|
},
|
||||||
numErrs: 1, // TODO(justinsb): change to 0 when NodePorts enabled
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type - cluster",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeClusterIP
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type - nodeport",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeNodePort
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type - loadbalancer",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type loadbalancer 2 ports",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"})
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type loadbalancer with NodePort",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345})
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type=NodePort service with NodePort",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeNodePort
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345})
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type=NodePort service without NodePort",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeNodePort
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"})
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid cluster service without NodePort",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeClusterIP
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"})
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid cluster service with NodePort",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeClusterIP
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP", NodePort: 12345})
|
||||||
|
},
|
||||||
|
numErrs: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid public service with duplicate NodePort",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeNodePort
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p1", Port: 1, Protocol: "TCP", NodePort: 1})
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "p2", Port: 2, Protocol: "TCP", NodePort: 1})
|
||||||
|
},
|
||||||
|
numErrs: 1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid type=LoadBalancer",
|
||||||
|
tweakSvc: func(s *api.Service) {
|
||||||
|
s.Spec.Type = api.ServiceTypeLoadBalancer
|
||||||
|
s.Spec.Ports = append(s.Spec.Ports, api.ServicePort{Name: "q", Port: 12345, Protocol: "TCP"})
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2511,6 +2599,13 @@ func TestValidateServiceUpdate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
numErrs: 1,
|
numErrs: 1,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "change type -> nodeport",
|
||||||
|
tweakSvc: func(oldSvc, newSvc *api.Service) {
|
||||||
|
newSvc.Spec.Type = api.ServiceTypeNodePort
|
||||||
|
},
|
||||||
|
numErrs: 0,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
@ -145,7 +145,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
|||||||
// Services do not have external load balancers: no calls should be made.
|
// Services do not have external load balancers: no calls should be made.
|
||||||
services: []*api.Service{
|
services: []*api.Service{
|
||||||
newService("s0", "111", api.ServiceTypeClusterIP),
|
newService("s0", "111", api.ServiceTypeClusterIP),
|
||||||
newService("s1", "222", api.ServiceTypeClusterIP),
|
newService("s1", "222", api.ServiceTypeNodePort),
|
||||||
},
|
},
|
||||||
expectedUpdateCalls: nil,
|
expectedUpdateCalls: nil,
|
||||||
},
|
},
|
||||||
@ -174,7 +174,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
|
|||||||
{
|
{
|
||||||
// Two services have an external load balancer and two don't: two calls.
|
// Two services have an external load balancer and two don't: two calls.
|
||||||
services: []*api.Service{
|
services: []*api.Service{
|
||||||
newService("s0", "777", api.ServiceTypeClusterIP),
|
newService("s0", "777", api.ServiceTypeNodePort),
|
||||||
newService("s1", "888", api.ServiceTypeLoadBalancer),
|
newService("s1", "888", api.ServiceTypeLoadBalancer),
|
||||||
newService("s3", "999", api.ServiceTypeLoadBalancer),
|
newService("s3", "999", api.ServiceTypeLoadBalancer),
|
||||||
newService("s4", "123", api.ServiceTypeClusterIP),
|
newService("s4", "123", api.ServiceTypeClusterIP),
|
||||||
|
@ -180,6 +180,67 @@ func TestGenerateService(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
params: map[string]string{
|
||||||
|
"selector": "foo=bar,baz=blah",
|
||||||
|
"name": "test",
|
||||||
|
"port": "80",
|
||||||
|
"protocol": "UDP",
|
||||||
|
"container-port": "foobar",
|
||||||
|
"type": string(api.ServiceTypeNodePort),
|
||||||
|
},
|
||||||
|
expected: api.Service{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "test",
|
||||||
|
},
|
||||||
|
Spec: api.ServiceSpec{
|
||||||
|
Selector: map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
"baz": "blah",
|
||||||
|
},
|
||||||
|
Ports: []api.ServicePort{
|
||||||
|
{
|
||||||
|
Name: "default",
|
||||||
|
Port: 80,
|
||||||
|
Protocol: "UDP",
|
||||||
|
TargetPort: util.NewIntOrStringFromString("foobar"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Type: api.ServiceTypeNodePort,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
params: map[string]string{
|
||||||
|
"selector": "foo=bar,baz=blah",
|
||||||
|
"name": "test",
|
||||||
|
"port": "80",
|
||||||
|
"protocol": "UDP",
|
||||||
|
"container-port": "foobar",
|
||||||
|
"create-external-load-balancer": "true", // ignored when type is present
|
||||||
|
"type": string(api.ServiceTypeNodePort),
|
||||||
|
},
|
||||||
|
expected: api.Service{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "test",
|
||||||
|
},
|
||||||
|
Spec: api.ServiceSpec{
|
||||||
|
Selector: map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
"baz": "blah",
|
||||||
|
},
|
||||||
|
Ports: []api.ServicePort{
|
||||||
|
{
|
||||||
|
Name: "default",
|
||||||
|
Port: 80,
|
||||||
|
Protocol: "UDP",
|
||||||
|
TargetPort: util.NewIntOrStringFromString("foobar"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Type: api.ServiceTypeNodePort,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
generator := ServiceGenerator{}
|
generator := ServiceGenerator{}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
@ -287,10 +287,9 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
|
|||||||
}
|
}
|
||||||
info.portalIP = serviceIP
|
info.portalIP = serviceIP
|
||||||
info.portalPort = servicePort.Port
|
info.portalPort = servicePort.Port
|
||||||
// TODO(justinsb): switch to servicePort.NodePort when that lands
|
|
||||||
info.nodePort = 0
|
|
||||||
// Deep-copy in case the service instance changes
|
// Deep-copy in case the service instance changes
|
||||||
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
|
||||||
|
info.nodePort = servicePort.NodePort
|
||||||
info.sessionAffinityType = service.Spec.SessionAffinity
|
info.sessionAffinityType = service.Spec.SessionAffinity
|
||||||
glog.V(4).Infof("info: %+v", info)
|
glog.V(4).Infof("info: %+v", info)
|
||||||
|
|
||||||
@ -319,8 +318,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
|
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
|
||||||
// TODO(justinsb): switch to port.NodePort when that lands
|
if info.protocol != port.Protocol || info.portalPort != port.Port || info.nodePort != port.NodePort {
|
||||||
if info.protocol != port.Protocol || info.portalPort != port.Port || info.nodePort != 0 /*port.NodePort*/ {
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if !info.portalIP.Equal(net.ParseIP(service.Spec.PortalIP)) {
|
if !info.portalIP.Equal(net.ParseIP(service.Spec.PortalIP)) {
|
||||||
|
@ -75,10 +75,9 @@ func (c *Repair) RunOnce() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
r := portallocator.NewPortAllocator(c.portRange)
|
r := portallocator.NewPortAllocator(c.portRange)
|
||||||
for _, svc := range list.Items {
|
for i := range list.Items {
|
||||||
ports := []int{}
|
svc := &list.Items[i]
|
||||||
|
ports := service.CollectServiceNodePorts(svc)
|
||||||
// TODO(justinsb): Collect NodePorts
|
|
||||||
if len(ports) == 0 {
|
if len(ports) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -153,7 +153,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
|
|||||||
rs.portals.Release(net.ParseIP(service.Spec.PortalIP))
|
rs.portals.Release(net.ParseIP(service.Spec.PortalIP))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, nodePort := range collectServiceNodePorts(service) {
|
for _, nodePort := range CollectServiceNodePorts(service) {
|
||||||
err := rs.serviceNodePorts.Release(nodePort)
|
err := rs.serviceNodePorts.Release(nodePort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// these should be caught by an eventual reconciliation / restart
|
// these should be caught by an eventual reconciliation / restart
|
||||||
@ -223,7 +223,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
|
|||||||
|
|
||||||
assignNodePorts := shouldAssignNodePorts(service)
|
assignNodePorts := shouldAssignNodePorts(service)
|
||||||
|
|
||||||
oldNodePorts := collectServiceNodePorts(oldService)
|
oldNodePorts := CollectServiceNodePorts(oldService)
|
||||||
|
|
||||||
newNodePorts := []int{}
|
newNodePorts := []int{}
|
||||||
if assignNodePorts {
|
if assignNodePorts {
|
||||||
@ -328,7 +328,7 @@ func contains(haystack []int, needle int) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func collectServiceNodePorts(service *api.Service) []int {
|
func CollectServiceNodePorts(service *api.Service) []int {
|
||||||
servicePorts := []int{}
|
servicePorts := []int{}
|
||||||
for i := range service.Spec.Ports {
|
for i := range service.Spec.Ports {
|
||||||
servicePort := &service.Spec.Ports[i]
|
servicePort := &service.Spec.Ports[i]
|
||||||
@ -340,17 +340,15 @@ func collectServiceNodePorts(service *api.Service) []int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func shouldAssignNodePorts(service *api.Service) bool {
|
func shouldAssignNodePorts(service *api.Service) bool {
|
||||||
// TODO(justinsb): Switch on service.Spec.Type
|
switch service.Spec.Type {
|
||||||
// switch service.Spec.Type {
|
case api.ServiceTypeLoadBalancer:
|
||||||
// case api.ServiceVisibilityLoadBalancer:
|
return true
|
||||||
// return true
|
case api.ServiceTypeNodePort:
|
||||||
// case api.ServiceVisibilityNodePort:
|
return true
|
||||||
// return true
|
case api.ServiceTypeClusterIP:
|
||||||
// case api.ServiceVisibilityCluster:
|
return false
|
||||||
// return false
|
default:
|
||||||
// default:
|
glog.Errorf("Unknown service type: %v", service.Spec.Type)
|
||||||
// glog.Errorf("Unknown visibility value: %v", service.Spec.Visibility)
|
return false
|
||||||
// return false
|
}
|
||||||
// }
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,8 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
@ -351,6 +353,103 @@ var _ = Describe("Services", func() {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should be able to create a functioning NodePort service", func() {
|
||||||
|
serviceName := "nodeportservice-test"
|
||||||
|
ns := namespaces[0]
|
||||||
|
labels := map[string]string{
|
||||||
|
"testid": "nodeportservice-test",
|
||||||
|
}
|
||||||
|
service := &api.Service{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: serviceName,
|
||||||
|
},
|
||||||
|
Spec: api.ServiceSpec{
|
||||||
|
Selector: labels,
|
||||||
|
Ports: []api.ServicePort{{
|
||||||
|
Port: 80,
|
||||||
|
TargetPort: util.NewIntOrStringFromInt(80),
|
||||||
|
}},
|
||||||
|
Type: api.ServiceTypeNodePort,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
By("creating service " + serviceName + " with visibility=NodePort in namespace " + ns)
|
||||||
|
result, err := c.Services(ns).Create(service)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
defer func(ns, serviceName string) { // clean up when we're done
|
||||||
|
By("deleting service " + serviceName + " in namespace " + ns)
|
||||||
|
err := c.Services(ns).Delete(serviceName)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
}(ns, serviceName)
|
||||||
|
|
||||||
|
if len(result.Spec.Ports) != 1 {
|
||||||
|
Failf("got unexpected number (%d) of Ports for NodePort service: %v", len(result.Spec.Ports), result)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodePort := result.Spec.Ports[0].NodePort
|
||||||
|
if nodePort == 0 {
|
||||||
|
Failf("got unexpected nodePort (%d) on Ports[0] for NodePort service: %v", nodePort, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
publicIps, err := getMinionPublicIps(c)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if len(publicIps) == 0 {
|
||||||
|
Failf("got unexpected number (%d) of public IPs", len(publicIps))
|
||||||
|
}
|
||||||
|
ip := publicIps[0]
|
||||||
|
|
||||||
|
pod := &api.Pod{
|
||||||
|
TypeMeta: api.TypeMeta{
|
||||||
|
Kind: "Pod",
|
||||||
|
APIVersion: latest.Version,
|
||||||
|
},
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "publicservice-test-" + string(util.NewUUID()),
|
||||||
|
Labels: labels,
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{
|
||||||
|
Name: "webserver",
|
||||||
|
Image: "gcr.io/google_containers/test-webserver",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
By("creating pod to be part of service " + serviceName)
|
||||||
|
podClient := c.Pods(ns)
|
||||||
|
defer func() {
|
||||||
|
By("deleting pod " + pod.Name)
|
||||||
|
defer GinkgoRecover()
|
||||||
|
podClient.Delete(pod.Name, nil)
|
||||||
|
}()
|
||||||
|
if _, err := podClient.Create(pod); err != nil {
|
||||||
|
Failf("Failed to create pod %s: %v", pod.Name, err)
|
||||||
|
}
|
||||||
|
expectNoError(waitForPodRunningInNamespace(c, pod.Name, ns))
|
||||||
|
|
||||||
|
By("hitting the pod through the service's external IPs")
|
||||||
|
var resp *http.Response
|
||||||
|
for t := time.Now(); time.Since(t) < podStartTimeout; time.Sleep(5 * time.Second) {
|
||||||
|
resp, err = http.Get(fmt.Sprintf("http://%s:%d", ip, nodePort))
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
Failf("received non-success return status %q trying to access pod through public port; got body: %s", resp.Status, string(body))
|
||||||
|
}
|
||||||
|
if !strings.Contains(string(body), "test-webserver") {
|
||||||
|
Failf("received response body without expected substring 'test-webserver': %s", string(body))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
It("should correctly serve identically named services in different namespaces on different external IP addresses", func() {
|
It("should correctly serve identically named services in different namespaces on different external IP addresses", func() {
|
||||||
if !providerIs("gce", "gke") {
|
if !providerIs("gce", "gke") {
|
||||||
By(fmt.Sprintf("Skipping service namespace collision test; uses ServiceTypeLoadBalancer, a (gce|gke) feature"))
|
By(fmt.Sprintf("Skipping service namespace collision test; uses ServiceTypeLoadBalancer, a (gce|gke) feature"))
|
||||||
@ -534,3 +633,30 @@ func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]s
|
|||||||
_, err := c.Pods(ns).Create(pod)
|
_, err := c.Pods(ns).Create(pod)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func collectAddresses(nodes *api.NodeList, addressType api.NodeAddressType) []string {
|
||||||
|
ips := []string{}
|
||||||
|
for i := range nodes.Items {
|
||||||
|
item := &nodes.Items[i]
|
||||||
|
for j := range item.Status.Addresses {
|
||||||
|
nodeAddress := &item.Status.Addresses[j]
|
||||||
|
if nodeAddress.Type == addressType {
|
||||||
|
ips = append(ips, nodeAddress.Address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ips
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMinionPublicIps(c *client.Client) ([]string, error) {
|
||||||
|
nodes, err := c.Nodes().List(labels.Everything(), fields.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ips := collectAddresses(nodes, api.NodeExternalIP)
|
||||||
|
if len(ips) == 0 {
|
||||||
|
ips = collectAddresses(nodes, api.NodeLegacyHostIP)
|
||||||
|
}
|
||||||
|
return ips, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user