mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Changes according to the approved KEP. SCTP is supported for HostPort and LoadBalancer. Alpha feature flag SCTPSupport controls the support of SCTP. Kube-proxy config parameter is removed.
This commit is contained in:
parent
a6da2b1472
commit
e466bdc67e
@ -166,7 +166,6 @@ func newProxyServer(
|
||||
recorder,
|
||||
healthzUpdater,
|
||||
config.NodePortAddresses,
|
||||
config.SCTPUserSpaceNode,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
@ -206,7 +205,6 @@ func newProxyServer(
|
||||
healthzServer,
|
||||
config.IPVS.Scheduler,
|
||||
config.NodePortAddresses,
|
||||
config.SCTPUserSpaceNode,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
|
@ -211,7 +211,6 @@ udpIdleTimeout: 123ms
|
||||
nodePortAddresses:
|
||||
- "10.20.30.40/16"
|
||||
- "fd00:1::0/64"
|
||||
sctpUserSpaceNode: false
|
||||
`
|
||||
|
||||
testCases := []struct {
|
||||
@ -326,7 +325,6 @@ sctpUserSpaceNode: false
|
||||
ResourceContainer: "/foo",
|
||||
UDPIdleTimeout: metav1.Duration{Duration: 123 * time.Millisecond},
|
||||
NodePortAddresses: []string{"10.20.30.40/16", "fd00:1::0/64"},
|
||||
SCTPUserSpaceNode: false,
|
||||
}
|
||||
|
||||
options := NewOptions()
|
||||
|
@ -61,7 +61,6 @@ ComponentConfigs:
|
||||
OOMScoreAdj: -999
|
||||
PortRange: ""
|
||||
ResourceContainer: /kube-proxy
|
||||
SCTPUserSpaceNode: false
|
||||
UDPIdleTimeout: 250ms
|
||||
Kubelet:
|
||||
Address: 1.2.3.4
|
||||
|
@ -64,7 +64,6 @@ kubeProxy:
|
||||
oomScoreAdj: -999
|
||||
portRange: ""
|
||||
resourceContainer: /kube-proxy
|
||||
sctpUserSpaceNode: false
|
||||
udpIdleTimeout: 250ms
|
||||
kubeletConfiguration:
|
||||
baseConfig:
|
||||
|
@ -80,7 +80,6 @@ nodePortAddresses: null
|
||||
oomScoreAdj: -999
|
||||
portRange: ""
|
||||
resourceContainer: /kube-proxy
|
||||
sctpUserSpaceNode: false
|
||||
udpIdleTimeout: 250ms
|
||||
---
|
||||
address: 1.2.3.4
|
||||
|
@ -75,7 +75,6 @@ nodePortAddresses: null
|
||||
oomScoreAdj: -999
|
||||
portRange: ""
|
||||
resourceContainer: /kube-proxy
|
||||
sctpUserSpaceNode: false
|
||||
udpIdleTimeout: 250ms
|
||||
---
|
||||
address: 0.0.0.0
|
||||
|
@ -1572,7 +1572,7 @@ type ContainerPort struct {
|
||||
HostPort int32
|
||||
// Required: This must be a valid port number, 0 < x < 65536.
|
||||
ContainerPort int32
|
||||
// Required: Supports "TCP" and "UDP". "SCTP" is supported only if no HostPort is specified.
|
||||
// Required: Supports "TCP", "UDP" and "SCTP"
|
||||
// +optional
|
||||
Protocol Protocol
|
||||
// Optional: What host IP to bind the external port to.
|
||||
@ -3177,7 +3177,7 @@ type ServicePort struct {
|
||||
// the 'Name' field in EndpointPort objects.
|
||||
Name string
|
||||
|
||||
// The IP protocol for this port. Supports "TCP", "UDP", and SCTP.
|
||||
// The IP protocol for this port. Supports "TCP", "UDP", and "SCTP".
|
||||
Protocol Protocol
|
||||
|
||||
// The port that will be exposed on the service.
|
||||
|
@ -1918,10 +1918,7 @@ func ValidatePersistentVolumeClaimStatusUpdate(newPvc, oldPvc *core.PersistentVo
|
||||
return allErrs
|
||||
}
|
||||
|
||||
var supportedServicePortProtocols = sets.NewString(string(core.ProtocolTCP), string(core.ProtocolUDP), string(core.ProtocolSCTP))
|
||||
var supportedLoadBalancerProtocols = sets.NewString(string(core.ProtocolTCP), string(core.ProtocolUDP))
|
||||
var supportedContainerPortProtocols = sets.NewString(string(core.ProtocolTCP), string(core.ProtocolUDP), string(core.ProtocolSCTP))
|
||||
var supportedHostPortProtocols = sets.NewString(string(core.ProtocolTCP), string(core.ProtocolUDP))
|
||||
var supportedPortProtocols = sets.NewString(string(core.ProtocolTCP), string(core.ProtocolUDP), string(core.ProtocolSCTP))
|
||||
|
||||
func validateContainerPorts(ports []core.ContainerPort, fldPath *field.Path) field.ErrorList {
|
||||
allErrs := field.ErrorList{}
|
||||
@ -1954,12 +1951,10 @@ func validateContainerPorts(ports []core.ContainerPort, fldPath *field.Path) fie
|
||||
}
|
||||
if len(port.Protocol) == 0 {
|
||||
allErrs = append(allErrs, field.Required(idxPath.Child("protocol"), ""))
|
||||
} else if port.HostPort != 0 {
|
||||
if !supportedHostPortProtocols.Has(string(port.Protocol)) {
|
||||
allErrs = append(allErrs, field.NotSupported(idxPath.Child("protocol"), port.Protocol, supportedHostPortProtocols.List()))
|
||||
}
|
||||
} else if !supportedContainerPortProtocols.Has(string(port.Protocol)) {
|
||||
allErrs = append(allErrs, field.NotSupported(idxPath.Child("protocol"), port.Protocol, supportedContainerPortProtocols.List()))
|
||||
} else if !utilfeature.DefaultFeatureGate.Enabled(features.SCTPSupport) && port.Protocol == core.ProtocolSCTP {
|
||||
allErrs = append(allErrs, field.NotSupported(idxPath.Child("protocol"), port.Protocol, []string{string(core.ProtocolTCP), string(core.ProtocolUDP)}))
|
||||
} else if !supportedPortProtocols.Has(string(port.Protocol)) {
|
||||
allErrs = append(allErrs, field.NotSupported(idxPath.Child("protocol"), port.Protocol, supportedPortProtocols.List()))
|
||||
}
|
||||
}
|
||||
return allErrs
|
||||
@ -3733,8 +3728,10 @@ func ValidateService(service *core.Service) field.ErrorList {
|
||||
includeProtocols := sets.NewString()
|
||||
for i := range service.Spec.Ports {
|
||||
portPath := portsPath.Index(i)
|
||||
if !supportedLoadBalancerProtocols.Has(string(service.Spec.Ports[i].Protocol)) {
|
||||
allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports"))
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.SCTPSupport) && service.Spec.Ports[i].Protocol == core.ProtocolSCTP {
|
||||
allErrs = append(allErrs, field.NotSupported(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, []string{string(core.ProtocolTCP), string(core.ProtocolUDP)}))
|
||||
} else if !supportedPortProtocols.Has(string(service.Spec.Ports[i].Protocol)) {
|
||||
allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP/SCTP ports"))
|
||||
} else {
|
||||
includeProtocols.Insert(string(service.Spec.Ports[i].Protocol))
|
||||
}
|
||||
@ -3829,11 +3826,13 @@ func validateServicePort(sp *core.ServicePort, requireName, isHeadlessService bo
|
||||
for _, msg := range validation.IsValidPortNum(int(sp.Port)) {
|
||||
allErrs = append(allErrs, field.Invalid(fldPath.Child("port"), sp.Port, msg))
|
||||
}
|
||||
|
||||
|
||||
if len(sp.Protocol) == 0 {
|
||||
allErrs = append(allErrs, field.Required(fldPath.Child("protocol"), ""))
|
||||
} else if !supportedServicePortProtocols.Has(string(sp.Protocol)) {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("protocol"), sp.Protocol, supportedServicePortProtocols.List()))
|
||||
} else if !utilfeature.DefaultFeatureGate.Enabled(features.SCTPSupport) && sp.Protocol == core.ProtocolSCTP {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("protocol"), sp.Protocol, []string{string(core.ProtocolTCP), string(core.ProtocolUDP)}))
|
||||
} else if !supportedPortProtocols.Has(string(sp.Protocol)) {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("protocol"), sp.Protocol, supportedPortProtocols.List()))
|
||||
}
|
||||
|
||||
allErrs = append(allErrs, ValidatePortNumOrName(sp.TargetPort, fldPath.Child("targetPort"))...)
|
||||
@ -5201,8 +5200,10 @@ func validateEndpointPort(port *core.EndpointPort, requireName bool, fldPath *fi
|
||||
}
|
||||
if len(port.Protocol) == 0 {
|
||||
allErrs = append(allErrs, field.Required(fldPath.Child("protocol"), ""))
|
||||
} else if !supportedServicePortProtocols.Has(string(port.Protocol)) {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("protocol"), port.Protocol, supportedServicePortProtocols.List()))
|
||||
} else if !utilfeature.DefaultFeatureGate.Enabled(features.SCTPSupport) && port.Protocol == core.ProtocolSCTP {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("protocol"), port.Protocol, []string{string(core.ProtocolTCP), string(core.ProtocolUDP)}))
|
||||
} else if !supportedPortProtocols.Has(string(port.Protocol)) {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("protocol"), port.Protocol, supportedPortProtocols.List()))
|
||||
}
|
||||
return allErrs
|
||||
}
|
||||
|
@ -4060,6 +4060,7 @@ func TestValidateResourceQuotaWithAlphaLocalStorageCapacityIsolation(t *testing.
|
||||
}
|
||||
|
||||
func TestValidatePorts(t *testing.T) {
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SCTPSupport, true)()
|
||||
successCase := []core.ContainerPort{
|
||||
{Name: "abc", ContainerPort: 80, HostPort: 80, Protocol: "TCP"},
|
||||
{Name: "easy", ContainerPort: 82, Protocol: "TCP"},
|
||||
@ -8590,6 +8591,8 @@ func makeValidService() core.Service {
|
||||
}
|
||||
|
||||
func TestValidateService(t *testing.T) {
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SCTPSupport, true)()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tweakSvc func(svc *core.Service) // given a basic valid service, each test case can customize it
|
||||
|
@ -26,6 +26,8 @@ import (
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
apivalidation "k8s.io/kubernetes/pkg/apis/core/validation"
|
||||
"k8s.io/kubernetes/pkg/apis/networking"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
// ValidateNetworkPolicyName can be used to check whether the given networkpolicy
|
||||
@ -37,9 +39,12 @@ func ValidateNetworkPolicyName(name string, prefix bool) []string {
|
||||
// ValidateNetworkPolicyPort validates a NetworkPolicyPort
|
||||
func ValidateNetworkPolicyPort(port *networking.NetworkPolicyPort, portPath *field.Path) field.ErrorList {
|
||||
allErrs := field.ErrorList{}
|
||||
|
||||
if port.Protocol != nil && *port.Protocol != api.ProtocolTCP && *port.Protocol != api.ProtocolUDP && *port.Protocol != api.ProtocolSCTP {
|
||||
allErrs = append(allErrs, field.NotSupported(portPath.Child("protocol"), *port.Protocol, []string{string(api.ProtocolTCP), string(api.ProtocolUDP), string(api.ProtocolSCTP)}))
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.SCTPSupport) {
|
||||
if port.Protocol != nil && *port.Protocol != api.ProtocolTCP && *port.Protocol != api.ProtocolUDP && *port.Protocol != api.ProtocolSCTP {
|
||||
allErrs = append(allErrs, field.NotSupported(portPath.Child("protocol"), *port.Protocol, []string{string(api.ProtocolTCP), string(api.ProtocolUDP), string(api.ProtocolSCTP)}))
|
||||
}
|
||||
} else if port.Protocol != nil && *port.Protocol != api.ProtocolTCP && *port.Protocol != api.ProtocolUDP {
|
||||
allErrs = append(allErrs, field.NotSupported(portPath.Child("protocol"), *port.Protocol, []string{string(api.ProtocolTCP), string(api.ProtocolUDP)}))
|
||||
}
|
||||
if port.Port != nil {
|
||||
if port.Port.Type == intstr.Int {
|
||||
|
@ -23,6 +23,9 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/apis/networking"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
func TestValidateNetworkPolicy(t *testing.T) {
|
||||
@ -30,6 +33,8 @@ func TestValidateNetworkPolicy(t *testing.T) {
|
||||
protocolUDP := api.ProtocolUDP
|
||||
protocolICMP := api.Protocol("ICMP")
|
||||
protocolSCTP := api.ProtocolSCTP
|
||||
|
||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SCTPSupport, true)()
|
||||
|
||||
successCases := []networking.NetworkPolicy{
|
||||
{
|
||||
@ -279,6 +284,7 @@ func TestValidateNetworkPolicy(t *testing.T) {
|
||||
}
|
||||
|
||||
// Success cases are expected to pass validation.
|
||||
|
||||
for k, v := range successCases {
|
||||
if errs := ValidateNetworkPolicy(&v); len(errs) != 0 {
|
||||
t.Errorf("Expected success for %d, got %v", k, errs)
|
||||
|
@ -357,6 +357,12 @@ const (
|
||||
// Kubelet uses the new Lease API to report node heartbeats,
|
||||
// (Kube) Node Lifecycle Controller uses these heartbeats as a node health signal.
|
||||
NodeLease utilfeature.Feature = "NodeLease"
|
||||
|
||||
// owner: @janosi
|
||||
// alpha: v1.12
|
||||
//
|
||||
// Enables SCTP as new protocol for Service ports, NetworkPolicy, and ContainerPort in Pod/Containers definition
|
||||
SCTPSupport utilfeature.Feature = "SCTPSupport"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -416,7 +422,11 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
|
||||
ResourceQuotaScopeSelectors: {Default: true, PreRelease: utilfeature.Beta},
|
||||
CSIBlockVolume: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
RuntimeClass: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
<<<<<<< HEAD
|
||||
NodeLease: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
=======
|
||||
SCTPSupport: {Default: false, PreRelease: utilfeature.Alpha},
|
||||
>>>>>>> Changes according to the approved KEP. SCTP is supported for HostPort and LoadBalancer. Alpha feature flag SCTPSupport controls the support of SCTP. Kube-proxy config parameter is removed.
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
// unintentionally on either side:
|
||||
|
@ -264,6 +264,12 @@ func (hm *hostportManager) openHostports(podPortMapping *PodPortMapping) (map[ho
|
||||
if pm.HostPort <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// We do not open host ports for SCTP ports, as we agreed in the Support of SCTP KEP
|
||||
if pm.Protocol == v1.ProtocolSCTP {
|
||||
continue
|
||||
}
|
||||
|
||||
hp := portMappingToHostport(pm)
|
||||
socket, err := hm.portOpener(&hp)
|
||||
if err != nil {
|
||||
|
@ -152,6 +152,11 @@ func TestOpenCloseHostports(t *testing.T) {
|
||||
{HostPort: 7070, Protocol: v1.Protocol("TCP")},
|
||||
},
|
||||
},
|
||||
{
|
||||
portMappings: []*PortMapping{
|
||||
{HostPort: 7777, Protocol: v1.Protocol("SCTP")},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range closePortCases {
|
||||
@ -197,6 +202,11 @@ func TestHostportManager(t *testing.T) {
|
||||
ContainerPort: 81,
|
||||
Protocol: v1.ProtocolUDP,
|
||||
},
|
||||
{
|
||||
HostPort: 8083,
|
||||
ContainerPort: 83,
|
||||
Protocol: v1.ProtocolSCTP,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: false,
|
||||
@ -218,6 +228,11 @@ func TestHostportManager(t *testing.T) {
|
||||
ContainerPort: 81,
|
||||
Protocol: v1.ProtocolUDP,
|
||||
},
|
||||
{
|
||||
HostPort: 8083,
|
||||
ContainerPort: 83,
|
||||
Protocol: v1.ProtocolSCTP,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectError: true,
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
"k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// HostportSyncer takes a list of PodPortMappings and implements hostport all at once
|
||||
@ -74,6 +75,12 @@ func (h *hostportSyncer) openHostports(podHostportMapping *PodPortMapping) error
|
||||
// Assume hostport is not specified in this portmapping. So skip
|
||||
continue
|
||||
}
|
||||
|
||||
// We do not open host ports for SCTP ports, as we agreed in the Support of SCTP KEP
|
||||
if port.Protocol == v1.ProtocolSCTP {
|
||||
continue
|
||||
}
|
||||
|
||||
hp := hostport{
|
||||
port: port.HostPort,
|
||||
protocol: strings.ToLower(string(port.Protocol)),
|
||||
|
@ -94,7 +94,6 @@ func NewHollowProxyOrDie(
|
||||
recorder,
|
||||
nil,
|
||||
[]string{},
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
|
@ -155,12 +155,6 @@ type KubeProxyConfiguration struct {
|
||||
// If set it to a non-zero IP block, kube-proxy will filter that down to just the IPs that applied to the node.
|
||||
// An empty string slice is meant to select all network interfaces.
|
||||
NodePortAddresses []string
|
||||
// sctpUserSpaceNode is to enable the deployment of applications that use a userspace SCTP protocol implementation.
|
||||
// If set to "true" the kube-proxy does not start listening on the host node on the SCTP ports of Services
|
||||
// specified with externalIP or type=NodePort.
|
||||
// If set to "false" kube-proxy listens on the SCTP ports of Services specified with externalIP or type=NodePort.
|
||||
// Default value is "false".
|
||||
SCTPUserSpaceNode bool
|
||||
}
|
||||
|
||||
// Currently, three modes of proxy are available in Linux platform: 'userspace' (older, going to be EOL), 'iptables'
|
||||
|
@ -151,12 +151,6 @@ type KubeProxyConfiguration struct {
|
||||
// If set it to a non-zero IP block, kube-proxy will filter that down to just the IPs that applied to the node.
|
||||
// An empty string slice is meant to select all network interfaces.
|
||||
NodePortAddresses []string `json:"nodePortAddresses"`
|
||||
// sctpUserSpaceNode is to enable the deployment of applications that use a userspace SCTP protocol implementation.
|
||||
// If set to "true" the kube-proxy does not start listening on the host node on the SCTP ports of Services
|
||||
// specified with externalIP or type=NodePort.
|
||||
// If set to "false" kube-proxy listens on the SCTP ports of Services specified with externalIP or type=NodePort.
|
||||
// Default value is "false".
|
||||
SCTPUserSpaceNode bool `json:"sctpUserSpaceNode"`
|
||||
}
|
||||
|
||||
// Currently, three modes of proxy are available in Linux platform: 'userspace' (older, going to be EOL), 'iptables'
|
||||
|
@ -144,7 +144,6 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_kubeproxyconfig_KubeProxyCon
|
||||
}
|
||||
out.ConfigSyncPeriod = in.ConfigSyncPeriod
|
||||
out.NodePortAddresses = *(*[]string)(unsafe.Pointer(&in.NodePortAddresses))
|
||||
out.SCTPUserSpaceNode = in.SCTPUserSpaceNode
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -180,7 +179,6 @@ func autoConvert_kubeproxyconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
|
||||
}
|
||||
out.ConfigSyncPeriod = in.ConfigSyncPeriod
|
||||
out.NodePortAddresses = *(*[]string)(unsafe.Pointer(&in.NodePortAddresses))
|
||||
out.SCTPUserSpaceNode = in.SCTPUserSpaceNode
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/ishidawataru/sctp"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -260,10 +259,6 @@ type Proxier struct {
|
||||
// networkInterfacer defines an interface for several net library functions.
|
||||
// Inject for test purpose.
|
||||
networkInterfacer utilproxy.NetworkInterfacer
|
||||
|
||||
// Indicates whether a node is dedicated for applications that use userspace
|
||||
// SCTP stack.
|
||||
sctpUserSpaceNode bool
|
||||
}
|
||||
|
||||
// listenPortOpener opens ports by calling bind() and listen().
|
||||
@ -295,7 +290,6 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
recorder record.EventRecorder,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
nodePortAddresses []string,
|
||||
sctpUserSpaceNode bool,
|
||||
) (*Proxier, error) {
|
||||
// Set the route_localnet sysctl we need for
|
||||
if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
|
||||
@ -352,7 +346,6 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
nodePortAddresses: nodePortAddresses,
|
||||
networkInterfacer: utilproxy.RealNetwork{},
|
||||
sctpUserSpaceNode: sctpUserSpaceNode,
|
||||
}
|
||||
burstSyncs := 2
|
||||
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
|
||||
@ -852,15 +845,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// If the "external" IP happens to be an IP that is local to this
|
||||
// machine, hold the local port open so no other process can open it
|
||||
// (because the socket might open but it would never work).
|
||||
// Exception: if the node is dedicated for applications that use
|
||||
// userspace SCTP stack we must not start listening on the local port
|
||||
// in the kernel because that would load the SCTP kernel module, and
|
||||
// there are interworking issues between the SCTP kernel module and
|
||||
// userspace SCTP stacks.
|
||||
if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
|
||||
glog.Errorf("can't determine if IP is local, assuming not: %v", err)
|
||||
} else if local && (svcInfo.GetProtocol() != api.ProtocolSCTP || !proxier.sctpUserSpaceNode) {
|
||||
|
||||
} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) {
|
||||
lp := utilproxy.LocalPort{
|
||||
Description: "externalIP for " + svcNameString,
|
||||
IP: externalIP,
|
||||
@ -1029,7 +1016,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
if proxier.portsMap[lp] != nil {
|
||||
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
|
||||
replacementPortsMap[lp] = proxier.portsMap[lp]
|
||||
} else if svcInfo.GetProtocol() != api.ProtocolSCTP || !proxier.sctpUserSpaceNode {
|
||||
} else if svcInfo.GetProtocol() != v1.ProtocolSCTP {
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp)
|
||||
if err != nil {
|
||||
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
|
||||
@ -1432,18 +1419,6 @@ func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
|
||||
return nil, err
|
||||
}
|
||||
socket = conn
|
||||
case "sctp":
|
||||
// There is not any golang/net way to bind or listen on an SCTP socket.
|
||||
// We have to use a 3rd part lib - the same which is used in Docker
|
||||
addr, err := sctp.ResolveSCTPAddr("sctp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := sctp.ListenSCTP("sctp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
socket = conn
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
|
||||
}
|
||||
|
@ -161,9 +161,9 @@ func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, prot
|
||||
|
||||
func TestDeleteEndpointConnections(t *testing.T) {
|
||||
const (
|
||||
UDP = api.ProtocolUDP
|
||||
TCP = api.ProtocolTCP
|
||||
SCTP = api.ProtocolSCTP
|
||||
UDP = v1.ProtocolUDP
|
||||
TCP = v1.ProtocolTCP
|
||||
SCTP = v1.ProtocolSCTP
|
||||
)
|
||||
testCases := []struct {
|
||||
description string
|
||||
@ -394,7 +394,6 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
nodePortAddresses: make([]string, 0),
|
||||
networkInterfacer: utilproxytest.NewFakeNetwork(),
|
||||
sctpUserSpaceNode: false,
|
||||
}
|
||||
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
|
||||
return p
|
||||
|
@ -29,7 +29,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/ishidawataru/sctp"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -234,7 +233,6 @@ type Proxier struct {
|
||||
// networkInterfacer defines an interface for several net library functions.
|
||||
// Inject for test purpose.
|
||||
networkInterfacer utilproxy.NetworkInterfacer
|
||||
sctpUserSpaceNode bool
|
||||
}
|
||||
|
||||
// IPGetter helps get node network interface IP
|
||||
@ -300,7 +298,6 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
healthzServer healthcheck.HealthzUpdater,
|
||||
scheduler string,
|
||||
nodePortAddresses []string,
|
||||
sctpUserSpaceNode bool,
|
||||
) (*Proxier, error) {
|
||||
// Set the route_localnet sysctl we need for
|
||||
if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
|
||||
@ -382,7 +379,6 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
ipset: ipset,
|
||||
nodePortAddresses: nodePortAddresses,
|
||||
networkInterfacer: utilproxy.RealNetwork{},
|
||||
sctpUserSpaceNode: sctpUserSpaceNode,
|
||||
}
|
||||
// initialize ipsetList with all sets we needed
|
||||
proxier.ipsetList = make(map[string]*IPSet)
|
||||
@ -815,7 +811,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
for _, externalIP := range svcInfo.ExternalIPs {
|
||||
if local, err := utilproxy.IsLocalIP(externalIP); err != nil {
|
||||
glog.Errorf("can't determine if IP is local, assuming not: %v", err)
|
||||
} else if local && (svcInfo.GetProtocol() != api.ProtocolSCTP || !proxier.sctpUserSpaceNode) {
|
||||
// We do not start listening on SCTP ports, according to our agreement in the
|
||||
// SCTP support KEP
|
||||
} else if local && (svcInfo.GetProtocol() != v1.ProtocolSCTP) {
|
||||
lp := utilproxy.LocalPort{
|
||||
Description: "externalIP for " + svcNameString,
|
||||
IP: externalIP,
|
||||
@ -1014,7 +1012,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
if proxier.portsMap[lp] != nil {
|
||||
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
|
||||
replacementPortsMap[lp] = proxier.portsMap[lp]
|
||||
} else if svcInfo.GetProtocol() != api.ProtocolSCTP || !proxier.sctpUserSpaceNode {
|
||||
// We do not start listening on SCTP ports, according to our agreement in the
|
||||
// SCTP support KEP
|
||||
} else if svcInfo.GetProtocol() != v1.ProtocolSCTP {
|
||||
socket, err := proxier.portMapper.OpenLocalPort(&lp)
|
||||
if err != nil {
|
||||
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
|
||||
@ -1650,18 +1650,6 @@ func openLocalPort(lp *utilproxy.LocalPort) (utilproxy.Closeable, error) {
|
||||
return nil, err
|
||||
}
|
||||
socket = conn
|
||||
case "sctp":
|
||||
// SCTP is not supported by golang/net, or any other built-in lib,
|
||||
// so we have to manage SCTP via a 3rd party lib.
|
||||
sctpAddr, err := sctp.ResolveSCTPAddr("sctp", net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := sctp.ListenSCTP("sctp", sctpAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
socket = conn
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
|
||||
}
|
||||
|
@ -171,7 +171,6 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
||||
ipsetList: ipsetList,
|
||||
nodePortAddresses: make([]string, 0),
|
||||
networkInterfacer: proxyutiltest.NewFakeNetwork(),
|
||||
sctpUserSpaceNode: false,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1362,8 +1361,8 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar", "UDP", 8675, 30061, 7000)
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8676, 30062, 7001)
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpfoo", "SCTP", 8677, 30063, 7002)
|
||||
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
||||
Ingress: []api.LoadBalancerIngress{
|
||||
svc.Status.LoadBalancer = v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{
|
||||
{IP: "10.1.2.4"},
|
||||
},
|
||||
}
|
||||
@ -1375,8 +1374,8 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "foobar2", "UDP", 8677, 30063, 7002)
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "baz", "UDP", 8678, 30064, 7003)
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sctpbaz", "SCTP", 8679, 30065, 7004)
|
||||
svc.Status.LoadBalancer = api.LoadBalancerStatus{
|
||||
Ingress: []api.LoadBalancerIngress{
|
||||
svc.Status.LoadBalancer = v1.LoadBalancerStatus{
|
||||
Ingress: []v1.LoadBalancerIngress{
|
||||
{IP: "10.1.2.3"},
|
||||
},
|
||||
}
|
||||
@ -1460,9 +1459,9 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
svc.Spec.Type = v1.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = v1.ClusterIPNone
|
||||
}),
|
||||
makeTestService("somewhere-else", "headless-sctp", func(svc *api.Service) {
|
||||
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = api.ClusterIPNone
|
||||
makeTestService("somewhere-else", "headless-sctp", func(svc *v1.Service) {
|
||||
svc.Spec.Type = v1.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = v1.ClusterIPNone
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 1235, 0, 0)
|
||||
}),
|
||||
)
|
||||
@ -2634,7 +2633,7 @@ func Test_syncService(t *testing.T) {
|
||||
// case 4, SCTP, old virtual server is same as new virtual server
|
||||
oldVirtualServer: &utilipvs.VirtualServer{
|
||||
Address: net.ParseIP("1.2.3.4"),
|
||||
Protocol: string(api.ProtocolSCTP),
|
||||
Protocol: string(v1.ProtocolSCTP),
|
||||
Port: 80,
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
@ -2642,7 +2641,7 @@ func Test_syncService(t *testing.T) {
|
||||
svcName: "foo",
|
||||
newVirtualServer: &utilipvs.VirtualServer{
|
||||
Address: net.ParseIP("1.2.3.4"),
|
||||
Protocol: string(api.ProtocolSCTP),
|
||||
Protocol: string(v1.ProtocolSCTP),
|
||||
Port: 80,
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
@ -2653,7 +2652,7 @@ func Test_syncService(t *testing.T) {
|
||||
// case 5, old virtual server is different from new virtual server
|
||||
oldVirtualServer: &utilipvs.VirtualServer{
|
||||
Address: net.ParseIP("1.2.3.4"),
|
||||
Protocol: string(api.ProtocolSCTP),
|
||||
Protocol: string(v1.ProtocolSCTP),
|
||||
Port: 8080,
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
@ -2661,7 +2660,7 @@ func Test_syncService(t *testing.T) {
|
||||
svcName: "bar",
|
||||
newVirtualServer: &utilipvs.VirtualServer{
|
||||
Address: net.ParseIP("1.2.3.4"),
|
||||
Protocol: string(api.ProtocolSCTP),
|
||||
Protocol: string(v1.ProtocolSCTP),
|
||||
Port: 8080,
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagPersistent,
|
||||
@ -2672,7 +2671,7 @@ func Test_syncService(t *testing.T) {
|
||||
// case 6, old virtual server is different from new virtual server
|
||||
oldVirtualServer: &utilipvs.VirtualServer{
|
||||
Address: net.ParseIP("1.2.3.4"),
|
||||
Protocol: string(api.ProtocolSCTP),
|
||||
Protocol: string(v1.ProtocolSCTP),
|
||||
Port: 8080,
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
@ -2680,7 +2679,7 @@ func Test_syncService(t *testing.T) {
|
||||
svcName: "bar",
|
||||
newVirtualServer: &utilipvs.VirtualServer{
|
||||
Address: net.ParseIP("1.2.3.4"),
|
||||
Protocol: string(api.ProtocolSCTP),
|
||||
Protocol: string(v1.ProtocolSCTP),
|
||||
Port: 8080,
|
||||
Scheduler: "wlc",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
@ -2693,7 +2692,7 @@ func Test_syncService(t *testing.T) {
|
||||
svcName: "baz",
|
||||
newVirtualServer: &utilipvs.VirtualServer{
|
||||
Address: net.ParseIP("1.2.3.4"),
|
||||
Protocol: string(api.ProtocolSCTP),
|
||||
Protocol: string(v1.ProtocolSCTP),
|
||||
Port: 53,
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
@ -2828,7 +2827,6 @@ func TestCleanLegacyService(t *testing.T) {
|
||||
nil,
|
||||
DefaultScheduler,
|
||||
make([]string, 0),
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
|
@ -116,9 +116,9 @@ func TestServiceToServiceMap(t *testing.T) {
|
||||
},
|
||||
{
|
||||
desc: "headless sctp service",
|
||||
service: makeTestService("ns2", "headless", func(svc *api.Service) {
|
||||
svc.Spec.Type = api.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = api.ClusterIPNone
|
||||
service: makeTestService("ns2", "headless", func(svc *v1.Service) {
|
||||
svc.Spec.Type = v1.ServiceTypeClusterIP
|
||||
svc.Spec.ClusterIP = v1.ClusterIPNone
|
||||
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "sip", "SCTP", 7777, 0, 0)
|
||||
}),
|
||||
expected: map[ServicePortName]*BaseServiceInfo{},
|
||||
|
@ -1664,7 +1664,7 @@ type ContainerPort struct {
|
||||
// Number of port to expose on the pod's IP address.
|
||||
// This must be a valid port number, 0 < x < 65536.
|
||||
ContainerPort int32 `json:"containerPort" protobuf:"varint,3,opt,name=containerPort"`
|
||||
// Protocol for port. Must be UDP, TCP, or SCTP. "SCTP" is supported only if "HostPort" is not specified.
|
||||
// Protocol for port. Must be UDP, TCP, or SCTP.
|
||||
// Defaults to "TCP".
|
||||
// +optional
|
||||
Protocol Protocol `json:"protocol,omitempty" protobuf:"bytes,4,opt,name=protocol,casttype=Protocol"`
|
||||
|
Loading…
Reference in New Issue
Block a user