Merge pull request #96440 from robscott/endpointslice-pre-ga

Adding NodeName to EndpointSlice API, deprecation updates
This commit is contained in:
Kubernetes Prow Robot
2020-11-12 16:03:13 -08:00
committed by GitHub
32 changed files with 771 additions and 244 deletions

View File

@@ -57,12 +57,6 @@ type EndpointSlice struct {
type AddressType string
const (
// AddressTypeIP represents an IP Address.
// This address type has been deprecated and has been replaced by the IPv4
// and IPv6 adddress types. New resources with this address type will be
// considered invalid. This will be fully removed in 1.18.
// +deprecated
AddressTypeIP = AddressType("IP")
// AddressTypeIPv4 represents an IPv4 Address.
AddressTypeIPv4 = AddressType(api.IPv4Protocol)
// AddressTypeIPv6 represents an IPv6 Address.
@@ -105,8 +99,14 @@ type Endpoint struct {
// endpoint is located. This should match the corresponding node label.
// * topology.kubernetes.io/region: the value indicates the region where the
// endpoint is located. This should match the corresponding node label.
// This field is deprecated and will be removed in future api versions.
// +optional
Topology map[string]string
// nodeName represents the name of the Node hosting this endpoint. This can
// be used to determine endpoints local to a Node. This field can be enabled
// with the EndpointSliceNodeName feature gate.
// +optional
NodeName *string
}
// EndpointConditions represents the current condition of an endpoint.
@@ -118,15 +118,18 @@ type EndpointConditions struct {
// "true" for terminating endpoints.
Ready *bool
// serving is identical to ready except that it is set regardless of the terminating
// state of endpoints. This condition should be set to true for a ready endpoint that
// is terminating. If nil, consumers should defer to the ready condition.
// serving is identical to ready except that it is set regardless of the
// terminating state of endpoints. This condition should be set to true for
// a ready endpoint that is terminating. If nil, consumers should defer to
// the ready condition. This field can be enabled with the
// EndpointSliceTerminatingCondition feature gate.
// +optional
Serving *bool
// terminating indicates that this endpoint is terminating. A nil value indicates an
// unknown state. Consumers should interpret this unknown state to mean that the
// endpoint is not terminating.
// terminating indicates that this endpoint is terminating. A nil value
// indicates an unknown state. Consumers should interpret this unknown state
// to mean that the endpoint is not terminating. This field can be enabled
// with the EndpointSliceTerminatingCondition feature gate.
// +optional
Terminating *bool
}

View File

@@ -99,6 +99,7 @@ func autoConvert_v1alpha1_Endpoint_To_discovery_Endpoint(in *v1alpha1.Endpoint,
out.Hostname = (*string)(unsafe.Pointer(in.Hostname))
out.TargetRef = (*core.ObjectReference)(unsafe.Pointer(in.TargetRef))
out.Topology = *(*map[string]string)(unsafe.Pointer(&in.Topology))
out.NodeName = (*string)(unsafe.Pointer(in.NodeName))
return nil
}
@@ -115,6 +116,7 @@ func autoConvert_discovery_Endpoint_To_v1alpha1_Endpoint(in *discovery.Endpoint,
out.Hostname = (*string)(unsafe.Pointer(in.Hostname))
out.TargetRef = (*v1.ObjectReference)(unsafe.Pointer(in.TargetRef))
out.Topology = *(*map[string]string)(unsafe.Pointer(&in.Topology))
out.NodeName = (*string)(unsafe.Pointer(in.NodeName))
return nil
}

View File

@@ -99,6 +99,7 @@ func autoConvert_v1beta1_Endpoint_To_discovery_Endpoint(in *v1beta1.Endpoint, ou
out.Hostname = (*string)(unsafe.Pointer(in.Hostname))
out.TargetRef = (*core.ObjectReference)(unsafe.Pointer(in.TargetRef))
out.Topology = *(*map[string]string)(unsafe.Pointer(&in.Topology))
out.NodeName = (*string)(unsafe.Pointer(in.NodeName))
return nil
}
@@ -115,6 +116,7 @@ func autoConvert_discovery_Endpoint_To_v1beta1_Endpoint(in *discovery.Endpoint,
out.Hostname = (*string)(unsafe.Pointer(in.Hostname))
out.TargetRef = (*v1.ObjectReference)(unsafe.Pointer(in.TargetRef))
out.Topology = *(*map[string]string)(unsafe.Pointer(&in.Topology))
out.NodeName = (*string)(unsafe.Pointer(in.NodeName))
return nil
}

View File

@@ -33,9 +33,6 @@ var (
string(discovery.AddressTypeIPv6),
string(discovery.AddressTypeFQDN),
)
deprecatedAddressTypes = sets.NewString(
string(discovery.AddressTypeIP),
)
supportedPortProtocols = sets.NewString(
string(api.ProtocolTCP),
string(api.ProtocolUDP),
@@ -53,9 +50,9 @@ var (
var ValidateEndpointSliceName = apimachineryvalidation.NameIsDNSSubdomain
// ValidateEndpointSlice validates an EndpointSlice.
func ValidateEndpointSlice(endpointSlice *discovery.EndpointSlice, validAddressTypes sets.String) field.ErrorList {
func ValidateEndpointSlice(endpointSlice *discovery.EndpointSlice) field.ErrorList {
allErrs := apivalidation.ValidateObjectMeta(&endpointSlice.ObjectMeta, true, ValidateEndpointSliceName, field.NewPath("metadata"))
allErrs = append(allErrs, validateAddressType(endpointSlice.AddressType, validAddressTypes)...)
allErrs = append(allErrs, validateAddressType(endpointSlice.AddressType)...)
allErrs = append(allErrs, validateEndpoints(endpointSlice.Endpoints, endpointSlice.AddressType, field.NewPath("endpoints"))...)
allErrs = append(allErrs, validatePorts(endpointSlice.Ports, field.NewPath("ports"))...)
@@ -64,12 +61,12 @@ func ValidateEndpointSlice(endpointSlice *discovery.EndpointSlice, validAddressT
// ValidateEndpointSliceCreate validates an EndpointSlice when it is created.
func ValidateEndpointSliceCreate(endpointSlice *discovery.EndpointSlice) field.ErrorList {
return ValidateEndpointSlice(endpointSlice, supportedAddressTypes)
return ValidateEndpointSlice(endpointSlice)
}
// ValidateEndpointSliceUpdate validates an EndpointSlice when it is updated.
func ValidateEndpointSliceUpdate(newEndpointSlice, oldEndpointSlice *discovery.EndpointSlice) field.ErrorList {
allErrs := ValidateEndpointSlice(newEndpointSlice, supportedAddressTypes.Union(deprecatedAddressTypes))
allErrs := ValidateEndpointSlice(newEndpointSlice)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newEndpointSlice.AddressType, oldEndpointSlice.AddressType, field.NewPath("addressType"))...)
return allErrs
@@ -97,10 +94,6 @@ func validateEndpoints(endpoints []discovery.Endpoint, addrType discovery.Addres
// This validates known address types, unknown types fall through
// and do not get validated.
switch addrType {
case discovery.AddressTypeIP:
for _, msg := range validation.IsValidIP(address) {
allErrs = append(allErrs, field.Invalid(addressPath.Index(i), address, msg))
}
case discovery.AddressTypeIPv4:
allErrs = append(allErrs, validation.IsValidIPv4Address(addressPath.Index(i), address)...)
case discovery.AddressTypeIPv6:
@@ -110,6 +103,13 @@ func validateEndpoints(endpoints []discovery.Endpoint, addrType discovery.Addres
}
}
if endpoint.NodeName != nil {
nnPath := idxPath.Child("nodeName")
for _, msg := range apivalidation.ValidateNodeName(*endpoint.NodeName, false) {
allErrs = append(allErrs, field.Invalid(nnPath, *endpoint.NodeName, msg))
}
}
topologyPath := idxPath.Child("topology")
if len(endpoint.Topology) > maxTopologyLabels {
allErrs = append(allErrs, field.TooMany(topologyPath, len(endpoint.Topology), maxTopologyLabels))
@@ -162,13 +162,13 @@ func validatePorts(endpointPorts []discovery.EndpointPort, fldPath *field.Path)
return allErrs
}
func validateAddressType(addressType discovery.AddressType, validAddressTypes sets.String) field.ErrorList {
func validateAddressType(addressType discovery.AddressType) field.ErrorList {
allErrs := field.ErrorList{}
if addressType == "" {
allErrs = append(allErrs, field.Required(field.NewPath("addressType"), ""))
} else if !validAddressTypes.Has(string(addressType)) {
allErrs = append(allErrs, field.NotSupported(field.NewPath("addressType"), addressType, validAddressTypes.List()))
} else if !supportedAddressTypes.Has(string(addressType)) {
allErrs = append(allErrs, field.NotSupported(field.NewPath("addressType"), addressType, supportedAddressTypes.List()))
}
return allErrs

View File

@@ -92,7 +92,7 @@ func TestValidateEndpointSlice(t *testing.T) {
expectedErrors: 0,
endpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("one"),
Protocol: protocolPtr(api.ProtocolTCP),
@@ -378,7 +378,7 @@ func TestValidateEndpointSlice(t *testing.T) {
expectedErrors: 1,
endpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Protocol: protocolPtr(api.ProtocolTCP),
@@ -438,7 +438,7 @@ func TestValidateEndpointSlice(t *testing.T) {
expectedErrors: 1,
endpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Protocol: protocolPtr(api.ProtocolTCP),
@@ -458,7 +458,7 @@ func TestValidateEndpointSlice(t *testing.T) {
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
errs := ValidateEndpointSlice(testCase.endpointSlice, supportedAddressTypes.Union(deprecatedAddressTypes))
errs := ValidateEndpointSlice(testCase.endpointSlice)
if len(errs) != testCase.expectedErrors {
t.Errorf("Expected %d errors, got %d errors: %v", testCase.expectedErrors, len(errs), errs)
}
@@ -473,8 +473,9 @@ func TestValidateEndpointSliceCreate(t *testing.T) {
}
testCases := map[string]struct {
expectedErrors int
endpointSlice *discovery.EndpointSlice
expectedErrors int
endpointSlice *discovery.EndpointSlice
nodeNameGateEnabled bool
}{
"good-slice": {
expectedErrors: 0,
@@ -491,13 +492,45 @@ func TestValidateEndpointSliceCreate(t *testing.T) {
}},
},
},
"good-slice-node-name": {
expectedErrors: 0,
endpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Protocol: protocolPtr(api.ProtocolTCP),
}},
Endpoints: []discovery.Endpoint{{
Addresses: generateIPAddresses(1),
Hostname: utilpointer.StringPtr("valid-123"),
NodeName: utilpointer.StringPtr("valid-node-name"),
}},
},
},
// expected failures
"bad-node-name": {
expectedErrors: 1,
endpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Protocol: protocolPtr(api.ProtocolTCP),
}},
Endpoints: []discovery.Endpoint{{
Addresses: generateIPAddresses(1),
Hostname: utilpointer.StringPtr("valid-123"),
NodeName: utilpointer.StringPtr("INvalid-node-name"),
}},
},
},
"deprecated-address-type": {
expectedErrors: 1,
endpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
AddressType: discovery.AddressType("IP"),
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Protocol: protocolPtr(api.ProtocolTCP),
@@ -537,56 +570,78 @@ func TestValidateEndpointSliceUpdate(t *testing.T) {
standardMeta := metav1.ObjectMeta{Name: "es1", Namespace: "test"}
testCases := map[string]struct {
expectedErrors int
newEndpointSlice *discovery.EndpointSlice
oldEndpointSlice *discovery.EndpointSlice
expectedErrors int
nodeNameGateEnabled bool
oldEndpointSlice *discovery.EndpointSlice
newEndpointSlice *discovery.EndpointSlice
}{
"valid and identical slices": {
newEndpointSlice: &discovery.EndpointSlice{
oldEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIPv6,
},
oldEndpointSlice: &discovery.EndpointSlice{
newEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIPv6,
},
expectedErrors: 0,
},
"deprecated address type": {
expectedErrors: 0,
newEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
},
// expected errors
"invalide node name set": {
oldEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
}},
},
newEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
NodeName: utilpointer.StringPtr("INVALID foo"),
}},
},
expectedErrors: 1,
},
"deprecated address type": {
expectedErrors: 1,
oldEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressType("IP"),
},
newEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressType("IP"),
},
},
"valid and identical slices with different address types": {
newEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
},
oldEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressType("other"),
},
newEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIPv4,
},
expectedErrors: 1,
},
"invalid slices with valid address types": {
oldEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIPv4,
},
newEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),
Protocol: protocolPtr(api.Protocol("invalid")),
}},
},
oldEndpointSlice: &discovery.EndpointSlice{
ObjectMeta: standardMeta,
AddressType: discovery.AddressTypeIP,
},
expectedErrors: 1,
},
}

View File

@@ -51,6 +51,11 @@ func (in *Endpoint) DeepCopyInto(out *Endpoint) {
(*out)[key] = val
}
}
if in.NodeName != nil {
in, out := &in.NodeName, &out.NodeName
*out = new(string)
**out = **in
}
return
}

View File

@@ -917,7 +917,8 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
namespace := "test"
svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
endpointMeta.AddressType = discovery.AddressTypeIP
// "IP" is a deprecated address type, ensuring that it is handled properly.
endpointMeta.AddressType = discovery.AddressType("IP")
existingSlices := []*discovery.EndpointSlice{}
pods := []*corev1.Pod{}

View File

@@ -87,6 +87,10 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service,
ep.Conditions.Terminating = &terminating
}
if pod.Spec.NodeName != "" && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceNodeName) {
ep.NodeName = &pod.Spec.NodeName
}
if endpointutil.ShouldSetHostname(pod, service) {
ep.Hostname = &pod.Spec.Hostname
}

View File

@@ -252,6 +252,7 @@ func TestPodToEndpoint(t *testing.T) {
expectedEndpoint discovery.Endpoint
publishNotReadyAddresses bool
terminatingGateEnabled bool
nodeNameGateEnabled bool
}{
{
name: "Ready pod",
@@ -321,6 +322,25 @@ func TestPodToEndpoint(t *testing.T) {
},
},
},
{
name: "Ready pod + node name gate enabled",
pod: readyPod,
svc: &svc,
nodeNameGateEnabled: true,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
NodeName: utilpointer.StringPtr("node-1"),
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: readyPod.Name,
UID: readyPod.UID,
ResourceVersion: readyPod.ResourceVersion,
},
},
},
{
name: "Ready pod + node labels",
pod: readyPod,
@@ -499,6 +519,7 @@ func TestPodToEndpoint(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testCase.terminatingGateEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceNodeName, testCase.nodeNameGateEnabled)()
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.svc, discovery.AddressTypeIPv4)
if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) {

View File

@@ -18,6 +18,7 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/controller/endpointslicemirroring/metrics:go_default_library",
"//pkg/controller/util/endpoint:go_default_library",
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
@@ -28,6 +29,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
@@ -58,6 +60,7 @@ go_test(
deps = [
"//pkg/controller:go_default_library",
"//pkg/controller/endpointslicemirroring/metrics:go_default_library",
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -66,6 +69,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
@@ -73,6 +77,7 @@ go_test(
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",

View File

@@ -27,10 +27,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/apis/discovery/validation"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
"k8s.io/kubernetes/pkg/features"
)
// addrTypePortMapKey is used to uniquely identify groups of endpoint ports and
@@ -138,6 +140,9 @@ func addressToEndpoint(address corev1.EndpointAddress, ready bool) *discovery.En
endpoint.Topology = map[string]string{
"kubernetes.io/hostname": *address.NodeName,
}
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceNodeName) {
endpoint.NodeName = address.NodeName
}
}
if address.Hostname != "" {
endpoint.Hostname = &address.Hostname

View File

@@ -27,8 +27,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/rand"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
)
func TestNewEndpointSlice(t *testing.T) {
@@ -76,6 +80,86 @@ func TestNewEndpointSlice(t *testing.T) {
}
}
func TestAddressToEndpoint(t *testing.T) {
testCases := []struct {
name string
epAddress v1.EndpointAddress
expectedEndpoint discovery.Endpoint
ready bool
nodeNameGateEnabled bool
}{{
name: "simple + gate enabled",
epAddress: v1.EndpointAddress{
IP: "10.1.2.3",
Hostname: "foo",
NodeName: utilpointer.StringPtr("node-abc"),
TargetRef: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Namespace: "default",
Name: "foo",
},
},
ready: true,
nodeNameGateEnabled: true,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"10.1.2.3"},
Hostname: utilpointer.StringPtr("foo"),
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
},
Topology: map[string]string{
"kubernetes.io/hostname": "node-abc",
},
TargetRef: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Namespace: "default",
Name: "foo",
},
NodeName: utilpointer.StringPtr("node-abc"),
},
}, {
name: "simple + gate disabled",
epAddress: v1.EndpointAddress{
IP: "10.1.2.3",
Hostname: "foo",
NodeName: utilpointer.StringPtr("node-abc"),
TargetRef: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Namespace: "default",
Name: "foo",
},
},
ready: true,
nodeNameGateEnabled: false,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"10.1.2.3"},
Hostname: utilpointer.StringPtr("foo"),
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
},
Topology: map[string]string{
"kubernetes.io/hostname": "node-abc",
},
TargetRef: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Namespace: "default",
Name: "foo",
},
},
}}
for _, tc := range testCases {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceNodeName, tc.nodeNameGateEnabled)()
ep := addressToEndpoint(tc.epAddress, tc.ready)
assert.EqualValues(t, tc.expectedEndpoint, *ep)
}
}
// Test helpers
func newClientset() *fake.Clientset {

View File

@@ -14,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/endpoints:go_default_library",
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
@@ -23,6 +24,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",

View File

@@ -23,8 +23,10 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1beta1"
"k8s.io/kubernetes/pkg/features"
utilnet "k8s.io/utils/net"
)
@@ -166,17 +168,22 @@ func getEndpointsFromAddresses(addresses []corev1.EndpointAddress, addressType d
// endpointFromAddress generates an Endpoint from an EndpointAddress resource.
func endpointFromAddress(address corev1.EndpointAddress, ready bool) discovery.Endpoint {
topology := map[string]string{}
if address.NodeName != nil {
topology["kubernetes.io/hostname"] = *address.NodeName
}
return discovery.Endpoint{
ep := discovery.Endpoint{
Addresses: []string{address.IP},
Conditions: discovery.EndpointConditions{Ready: &ready},
TargetRef: address.TargetRef,
Topology: topology,
}
if address.NodeName != nil {
ep.Topology = map[string]string{
"kubernetes.io/hostname": *address.NodeName,
}
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceNodeName) {
ep.NodeName = address.NodeName
}
}
return ep
}
// allAddressesIPv6 returns true if all provided addresses are IPv6.

View File

@@ -232,7 +232,8 @@ func TestEndpointsAdapterUpdate(t *testing.T) {
// with one that has an IPv4 address type.
endpoints4, _ := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.7", "10.1.2.8"})
_, epSlice4IP := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.7", "10.1.2.8"})
epSlice4IP.AddressType = discovery.AddressTypeIP
// "IP" is a deprecated address type, ensuring that it is handled properly.
epSlice4IP.AddressType = discovery.AddressType("IP")
_, epSlice4IPv4 := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.7", "10.1.2.8"})
testCases := map[string]struct {

View File

@@ -670,6 +670,12 @@ const (
// Enable Terminating condition in Endpoint Slices.
EndpointSliceTerminatingCondition featuregate.Feature = "EndpointSliceTerminatingCondition"
// owner: @robscott
// alpha: v1.20
//
// Enable NodeName field on Endpoint Slices.
EndpointSliceNodeName featuregate.Feature = "EndpointSliceNodeName"
// owner: @derekwaynecarr
// alpha: v1.20
//
@@ -787,6 +793,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
EndpointSlice: {Default: true, PreRelease: featuregate.Beta},
EndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta},
EndpointSliceTerminatingCondition: {Default: false, PreRelease: featuregate.Alpha},
EndpointSliceNodeName: {Default: false, PreRelease: featuregate.Alpha},
WindowsEndpointSliceProxying: {Default: false, PreRelease: featuregate.Alpha},
EvenPodsSpread: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.21
StartupProbe: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23

View File

@@ -19,6 +19,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/proxy",
deps = [
"//pkg/api/v1/service:go_default_library",
"//pkg/features:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library",
@@ -26,6 +27,7 @@ go_library(
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/net:go_default_library",

View File

@@ -36,7 +36,6 @@ import (
)
var supportedEndpointSliceAddressTypes = sets.NewString(
string(discovery.AddressTypeIP), // IP is a deprecated address type
string(discovery.AddressTypeIPv4),
string(discovery.AddressTypeIPv6),
)

View File

@@ -26,8 +26,10 @@ import (
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilnet "k8s.io/utils/net"
)
@@ -76,6 +78,7 @@ type endpointSliceInfo struct {
// Addresses and Topology are copied from EndpointSlice Endpoints.
type endpointInfo struct {
Addresses []string
NodeName *string
Topology map[string]string
}
@@ -120,10 +123,14 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *
if !remove {
for _, endpoint := range endpointSlice.Endpoints {
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready {
esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
eInfo := endpointInfo{
Addresses: endpoint.Addresses,
Topology: endpoint.Topology,
})
}
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceNodeName) {
eInfo.NodeName = endpoint.NodeName
}
esInfo.Endpoints = append(esInfo.Endpoints, &eInfo)
}
}
@@ -255,7 +262,13 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName
continue
}
isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname])
isLocal := false
if endpoint.NodeName != nil {
isLocal = cache.isLocal(*endpoint.NodeName)
} else {
isLocal = cache.isLocal(endpoint.Topology[v1.LabelHostname])
}
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology)
// This logic ensures we're deduping potential overlapping endpoints

View File

@@ -50,7 +50,7 @@ func (endpointSliceStrategy) PrepareForCreate(ctx context.Context, obj runtime.O
endpointSlice := obj.(*discovery.EndpointSlice)
endpointSlice.Generation = 1
dropDisabledConditionsOnCreate(endpointSlice)
dropDisabledFieldsOnCreate(endpointSlice)
}
// PrepareForUpdate clears fields that are not allowed to be set by end users on update.
@@ -72,7 +72,7 @@ func (endpointSliceStrategy) PrepareForUpdate(ctx context.Context, obj, old runt
newEPS.ObjectMeta = ogNewMeta
oldEPS.ObjectMeta = ogOldMeta
dropDisabledConditionsOnUpdate(oldEPS, newEPS)
dropDisabledFieldsOnUpdate(oldEPS, newEPS)
}
// Validate validates a new EndpointSlice.
@@ -103,41 +103,56 @@ func (endpointSliceStrategy) AllowUnconditionalUpdate() bool {
return true
}
// dropDisabledConditionsOnCreate will drop the terminating condition if the
// EndpointSliceTerminatingCondition is disabled. Otherwise the field is left untouched.
func dropDisabledConditionsOnCreate(endpointSlice *discovery.EndpointSlice) {
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) {
return
}
// dropDisabledConditionsOnCreate will drop any fields that are disabled.
func dropDisabledFieldsOnCreate(endpointSlice *discovery.EndpointSlice) {
dropNodeName := !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceNodeName)
dropTerminating := !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition)
// Always drop the serving/terminating conditions on create when feature gate is disabled.
for i := range endpointSlice.Endpoints {
endpointSlice.Endpoints[i].Conditions.Serving = nil
endpointSlice.Endpoints[i].Conditions.Terminating = nil
if dropNodeName || dropTerminating {
for i := range endpointSlice.Endpoints {
if dropNodeName {
endpointSlice.Endpoints[i].NodeName = nil
}
if dropTerminating {
endpointSlice.Endpoints[i].Conditions.Serving = nil
endpointSlice.Endpoints[i].Conditions.Terminating = nil
}
}
}
}
// dropDisabledConditionsOnUpdate will drop the terminating condition field if the EndpointSliceTerminatingCondition
// feature gate is disabled unless an existing EndpointSlice object has the field already set. This ensures
// the field is not dropped on rollback.
func dropDisabledConditionsOnUpdate(oldEPS, newEPS *discovery.EndpointSlice) {
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) {
return
}
// Only drop the serving/terminating condition if the existing EndpointSlice doesn't have it set.
dropConditions := true
for _, ep := range oldEPS.Endpoints {
if ep.Conditions.Serving != nil || ep.Conditions.Terminating != nil {
dropConditions = false
break
// dropDisabledFieldsOnUpdate will drop any disable fields that have not already
// been set on the EndpointSlice.
func dropDisabledFieldsOnUpdate(oldEPS, newEPS *discovery.EndpointSlice) {
dropNodeName := !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceNodeName)
if dropNodeName {
for _, ep := range oldEPS.Endpoints {
if ep.NodeName != nil {
dropNodeName = false
break
}
}
}
if dropConditions {
dropTerminating := !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition)
if dropTerminating {
for _, ep := range oldEPS.Endpoints {
if ep.Conditions.Serving != nil || ep.Conditions.Terminating != nil {
dropTerminating = false
break
}
}
}
if dropNodeName || dropTerminating {
for i := range newEPS.Endpoints {
newEPS.Endpoints[i].Conditions.Serving = nil
newEPS.Endpoints[i].Conditions.Terminating = nil
if dropNodeName {
newEPS.Endpoints[i].NodeName = nil
}
if dropTerminating {
newEPS.Endpoints[i].Conditions.Serving = nil
newEPS.Endpoints[i].Conditions.Terminating = nil
}
}
}
}

View File

@@ -27,15 +27,16 @@ import (
utilpointer "k8s.io/utils/pointer"
)
func Test_dropConditionsOnCreate(t *testing.T) {
func Test_dropDisabledFieldsOnCreate(t *testing.T) {
testcases := []struct {
name string
terminatingGateEnabled bool
nodeNameGateEnabled bool
eps *discovery.EndpointSlice
expectedEPS *discovery.EndpointSlice
}{
{
name: "gate enabled, field should be allowed",
name: "terminating gate enabled, field should be allowed",
terminatingGateEnabled: true,
eps: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
@@ -83,7 +84,7 @@ func Test_dropConditionsOnCreate(t *testing.T) {
},
},
{
name: "gate disabled, field should be set to nil",
name: "terminating gate disabled, field should be set to nil",
terminatingGateEnabled: false,
eps: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
@@ -130,13 +131,62 @@ func Test_dropConditionsOnCreate(t *testing.T) {
},
},
},
{
name: "node name gate enabled, field should be allowed",
nodeNameGateEnabled: true,
eps: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
expectedEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
},
{
name: "node name gate disabled, field should be allowed",
nodeNameGateEnabled: false,
eps: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
expectedEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: nil,
},
{
NodeName: nil,
},
},
},
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testcase.terminatingGateEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceNodeName, testcase.nodeNameGateEnabled)()
dropDisabledConditionsOnCreate(testcase.eps)
dropDisabledFieldsOnCreate(testcase.eps)
if !apiequality.Semantic.DeepEqual(testcase.eps, testcase.expectedEPS) {
t.Logf("actual endpointslice: %v", testcase.eps)
t.Logf("expected endpointslice: %v", testcase.expectedEPS)
@@ -146,16 +196,17 @@ func Test_dropConditionsOnCreate(t *testing.T) {
}
}
func Test_dropTerminatingConditionOnUpdate(t *testing.T) {
func Test_dropDisabledFieldsOnUpdate(t *testing.T) {
testcases := []struct {
name string
terminatingGateEnabled bool
nodeNameGateEnabled bool
oldEPS *discovery.EndpointSlice
newEPS *discovery.EndpointSlice
expectedEPS *discovery.EndpointSlice
}{
{
name: "gate enabled, field should be allowed",
name: "terminating gate enabled, field should be allowed",
terminatingGateEnabled: true,
oldEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
@@ -225,7 +276,7 @@ func Test_dropTerminatingConditionOnUpdate(t *testing.T) {
},
},
{
name: "gate disabled, and not set on existing EPS",
name: "terminating gate disabled, and not set on existing EPS",
terminatingGateEnabled: false,
oldEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
@@ -295,7 +346,7 @@ func Test_dropTerminatingConditionOnUpdate(t *testing.T) {
},
},
{
name: "gate disabled, and set on existing EPS",
name: "terminating gate disabled, and set on existing EPS",
terminatingGateEnabled: false,
oldEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
@@ -365,7 +416,7 @@ func Test_dropTerminatingConditionOnUpdate(t *testing.T) {
},
},
{
name: "gate disabled, and set on existing EPS with new values",
name: "terminating gate disabled, and set on existing EPS with new values",
terminatingGateEnabled: false,
oldEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
@@ -431,13 +482,116 @@ func Test_dropTerminatingConditionOnUpdate(t *testing.T) {
},
},
},
{
name: "node name gate enabled, set on new EPS",
nodeNameGateEnabled: true,
oldEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: nil,
},
{
NodeName: nil,
},
},
},
newEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
expectedEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
},
{
name: "node name gate disabled, set on new EPS",
nodeNameGateEnabled: false,
oldEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: nil,
},
{
NodeName: nil,
},
},
},
newEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
expectedEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: nil,
},
{
NodeName: nil,
},
},
},
},
{
name: "node name gate disabled, set on old and updated EPS",
nodeNameGateEnabled: false,
oldEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1-old"),
},
{
NodeName: utilpointer.StringPtr("node-2-old"),
},
},
},
newEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
expectedEPS: &discovery.EndpointSlice{
Endpoints: []discovery.Endpoint{
{
NodeName: utilpointer.StringPtr("node-1"),
},
{
NodeName: utilpointer.StringPtr("node-2"),
},
},
},
},
}
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testcase.terminatingGateEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceNodeName, testcase.nodeNameGateEnabled)()
dropDisabledConditionsOnUpdate(testcase.oldEPS, testcase.newEPS)
dropDisabledFieldsOnUpdate(testcase.oldEPS, testcase.newEPS)
if !apiequality.Semantic.DeepEqual(testcase.newEPS, testcase.expectedEPS) {
t.Logf("actual endpointslice: %v", testcase.newEPS)
t.Logf("expected endpointslice: %v", testcase.expectedEPS)