Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #110255 from robscott/fix-pod-eviction-ip
Endpoints and EndpointSlices should not publish IPs for terminal pods
This commit is contained in commit 8b6dd065d7.
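
At a high level, this change adds pod-phase helpers (IsPodTerminal, IsPodPhaseTerminal) and routes both the Endpoints and EndpointSlice controllers through a shared ShouldPodBeInEndpoints check, so pods in a terminal phase (Succeeded or Failed, which includes evicted pods) never publish addresses. The sketch below condenses that logic for orientation only; it is a simplified illustration based on the hunks that follow, not the upstream source, and the standalone package main wrapper is an assumption made so the snippet compiles on its own.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// isPodPhaseTerminal mirrors the new IsPodPhaseTerminal helper: a Failed or
// Succeeded pod can never regress to a running state.
func isPodPhaseTerminal(phase v1.PodPhase) bool {
    return phase == v1.PodFailed || phase == v1.PodSucceeded
}

// shouldPodBeInEndpoints is a condensed version of the shared helper the
// controllers now call: terminal pods and pods without an IP are excluded,
// and terminating pods are kept only when includeTerminating is set.
func shouldPodBeInEndpoints(pod *v1.Pod, includeTerminating bool) bool {
    if isPodPhaseTerminal(pod.Status.Phase) {
        return false
    }
    if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
        return false
    }
    if !includeTerminating && pod.DeletionTimestamp != nil {
        return false
    }
    return true
}

func main() {
    evicted := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodFailed, PodIP: "10.1.1.1"}}
    fmt.Println(shouldPodBeInEndpoints(evicted, false)) // false: terminal pods publish no IPs
}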
@@ -301,6 +301,16 @@ func IsPodReady(pod *v1.Pod) bool {
 	return IsPodReadyConditionTrue(pod.Status)
 }
 
+// IsPodTerminal returns true if a pod is terminal, all containers are stopped and cannot ever regress.
+func IsPodTerminal(pod *v1.Pod) bool {
+	return IsPodPhaseTerminal(pod.Status.Phase)
+}
+
+// IsPodPhaseTerminal returns true if the pod's phase is terminal.
+func IsPodPhaseTerminal(phase v1.PodPhase) bool {
+	return phase == v1.PodFailed || phase == v1.PodSucceeded
+}
+
 // IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
 func IsPodReadyConditionTrue(status v1.PodStatus) bool {
 	condition := GetPodReadyCondition(status)
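
Other parts of this commit consume these helpers directly (a later hunk switches the kubelet from a private isPhaseTerminal to podutil.IsPodPhaseTerminal). A minimal usage sketch follows; the podutil import path is assumed from the usual k8s.io/kubernetes package layout and does not appear in this diff.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod" // assumed import path for the helpers added above
)

func main() {
    succeeded := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodSucceeded}}
    running := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}

    // IsPodTerminal reports whether the whole pod has reached a terminal phase.
    fmt.Println(podutil.IsPodTerminal(succeeded)) // true
    fmt.Println(podutil.IsPodTerminal(running))   // false
}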
@@ -749,6 +749,48 @@ func TestIsPodAvailable(t *testing.T) {
 	}
 }
 
+func TestIsPodTerminal(t *testing.T) {
+	now := metav1.Now()
+
+	tests := []struct {
+		podPhase v1.PodPhase
+		expected bool
+	}{
+		{
+			podPhase: v1.PodFailed,
+			expected: true,
+		},
+		{
+			podPhase: v1.PodSucceeded,
+			expected: true,
+		},
+		{
+			podPhase: v1.PodUnknown,
+			expected: false,
+		},
+		{
+			podPhase: v1.PodPending,
+			expected: false,
+		},
+		{
+			podPhase: v1.PodRunning,
+			expected: false,
+		},
+		{
+			expected: false,
+		},
+	}
+
+	for i, test := range tests {
+		pod := newPod(now, true, 0)
+		pod.Status.Phase = test.podPhase
+		isTerminal := IsPodTerminal(pod)
+		if isTerminal != test.expected {
+			t.Errorf("[tc #%d] expected terminal pod: %t, got: %t", i, test.expected, isTerminal)
+		}
+	}
+}
+
 func TestGetContainerStatus(t *testing.T) {
 	type ExpectedStruct struct {
 		status v1.ContainerStatus
@@ -402,9 +402,6 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 		return err
 	}
 
-	// If the user specified the older (deprecated) annotation, we have to respect it.
-	tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
-
 	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
 	// state of the trigger time tracker gets updated even if the sync turns out
 	// to be no-op and we don't update the endpoints object.
@@ -416,12 +413,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	var totalNotReadyEps int
 
 	for _, pod := range pods {
-		if len(pod.Status.PodIP) == 0 {
-			klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
-			continue
-		}
-		if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
-			klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
+		if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
+			klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
 			continue
 		}
 
@@ -441,7 +434,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 		// Allow headless service not to have ports.
 		if len(service.Spec.Ports) == 0 {
 			if service.Spec.ClusterIP == api.ClusterIPNone {
-				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
+				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
 				// No need to repack subsets for headless service without ports.
 			}
 		} else {
@@ -455,7 +448,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 				epp := endpointPortFromServicePort(servicePort, portNum)
 
 				var readyEps, notReadyEps int
-				subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
+				subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
 				totalReadyEps = totalReadyEps + readyEps
 				totalNotReadyEps = totalNotReadyEps + notReadyEps
 			}
@@ -591,6 +584,10 @@ func (e *Controller) checkLeftoverEndpoints() {
 	}
 }
 
+// addEndpointSubset add the endpoints addresses and ports to the EndpointSubset.
+// The addresses are added to the corresponding field, ready or not ready, depending
+// on the pod status and the Service PublishNotReadyAddresses field value.
+// The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints.
 func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
 	epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
 	var readyEps int
@@ -605,7 +602,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
 			Ports: ports,
 		})
 		readyEps++
-	} else if shouldPodBeInEndpoints(pod) {
+	} else { // if it is not a ready address it has to be not ready
 		klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
 		subsets = append(subsets, v1.EndpointSubset{
 			NotReadyAddresses: []v1.EndpointAddress{epa},
@@ -616,17 +613,6 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
 	return subsets, readyEps, notReadyEps
 }
 
-func shouldPodBeInEndpoints(pod *v1.Pod) bool {
-	switch pod.Spec.RestartPolicy {
-	case v1.RestartPolicyNever:
-		return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
-	case v1.RestartPolicyOnFailure:
-		return pod.Status.Phase != v1.PodSucceeded
-	default:
-		return true
-	}
-}
-
 func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
 	epp := &v1.EndpointPort{
 		Name: servicePort.Name,
@@ -1155,97 +1155,6 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
 	endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
 }
 
-// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
-// Just list all of the 3 false cases and 3 of the 12 true cases.
-func TestShouldPodBeInEndpoints(t *testing.T) {
-	testCases := []struct {
-		name     string
-		pod      *v1.Pod
-		expected bool
-	}{
-		// Pod should not be in endpoints cases:
-		{
-			name: "Failed pod with Never RestartPolicy",
-			pod: &v1.Pod{
-				Spec: v1.PodSpec{
-					RestartPolicy: v1.RestartPolicyNever,
-				},
-				Status: v1.PodStatus{
-					Phase: v1.PodFailed,
-				},
-			},
-			expected: false,
-		},
-		{
-			name: "Succeeded pod with Never RestartPolicy",
-			pod: &v1.Pod{
-				Spec: v1.PodSpec{
-					RestartPolicy: v1.RestartPolicyNever,
-				},
-				Status: v1.PodStatus{
-					Phase: v1.PodSucceeded,
-				},
-			},
-			expected: false,
-		},
-		{
-			name: "Succeeded pod with OnFailure RestartPolicy",
-			pod: &v1.Pod{
-				Spec: v1.PodSpec{
-					RestartPolicy: v1.RestartPolicyOnFailure,
-				},
-				Status: v1.PodStatus{
-					Phase: v1.PodSucceeded,
-				},
-			},
-			expected: false,
-		},
-		// Pod should be in endpoints cases:
-		{
-			name: "Failed pod with Always RestartPolicy",
-			pod: &v1.Pod{
-				Spec: v1.PodSpec{
-					RestartPolicy: v1.RestartPolicyAlways,
-				},
-				Status: v1.PodStatus{
-					Phase: v1.PodFailed,
-				},
-			},
-			expected: true,
-		},
-		{
-			name: "Pending pod with Never RestartPolicy",
-			pod: &v1.Pod{
-				Spec: v1.PodSpec{
-					RestartPolicy: v1.RestartPolicyNever,
-				},
-				Status: v1.PodStatus{
-					Phase: v1.PodPending,
-				},
-			},
-			expected: true,
-		},
-		{
-			name: "Unknown pod with OnFailure RestartPolicy",
-			pod: &v1.Pod{
-				Spec: v1.PodSpec{
-					RestartPolicy: v1.RestartPolicyOnFailure,
-				},
-				Status: v1.PodStatus{
-					Phase: v1.PodUnknown,
-				},
-			},
-			expected: true,
-		},
-	}
-	for _, test := range testCases {
-		result := shouldPodBeInEndpoints(test.pod)
-		if result != test.expected {
-			t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
-		}
-	}
-}
-
 func TestPodToEndpointAddressForService(t *testing.T) {
 	ipv4 := v1.IPv4Protocol
 	ipv6 := v1.IPv6Protocol
@@ -2313,6 +2222,235 @@ func TestMultipleServiceChanges(t *testing.T) {
 	close(stopChan)
 }
 
+func TestSyncServiceAddresses(t *testing.T) {
+	makeService := func(tolerateUnready bool) *v1.Service {
+		return &v1.Service{
+			ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"},
+			Spec: v1.ServiceSpec{
+				Selector:                 map[string]string{"foo": "bar"},
+				PublishNotReadyAddresses: tolerateUnready,
+				Type:                     v1.ServiceTypeClusterIP,
+				ClusterIP:                "1.1.1.1",
+				Ports:                    []v1.ServicePort{{Port: 80}},
+			},
+		}
+	}
+
+	makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod {
+		statusCondition := v1.ConditionFalse
+		if isReady {
+			statusCondition = v1.ConditionTrue
+		}
+
+		now := metav1.Now()
+		deletionTimestamp := &now
+		if !terminating {
+			deletionTimestamp = nil
+		}
+		return &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace:         "ns",
+				Name:              "fakepod",
+				DeletionTimestamp: deletionTimestamp,
+				Labels:            map[string]string{"foo": "bar"},
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{{Ports: []v1.ContainerPort{
+					{Name: "port1", ContainerPort: int32(8080)},
+				}}},
+			},
+			Status: v1.PodStatus{
+				Phase: phase,
+				Conditions: []v1.PodCondition{
+					{
+						Type:   v1.PodReady,
+						Status: statusCondition,
+					},
+				},
+				PodIP: "10.1.1.1",
+				PodIPs: []v1.PodIP{
+					{IP: "10.1.1.1"},
+				},
+			},
+		}
+	}
+
+	testCases := []struct {
+		name            string
+		pod             *v1.Pod
+		service         *v1.Service
+		expectedReady   int
+		expectedUnready int
+	}{
+		{
+			name:            "pod running phase",
+			pod:             makePod(v1.PodRunning, true, false),
+			service:         makeService(false),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod running phase being deleted",
+			pod:             makePod(v1.PodRunning, true, true),
+			service:         makeService(false),
+			expectedReady:   0,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod unknown phase container ready",
+			pod:             makePod(v1.PodUnknown, true, false),
+			service:         makeService(false),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod unknown phase container ready being deleted",
+			pod:             makePod(v1.PodUnknown, true, true),
+			service:         makeService(false),
+			expectedReady:   0,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod pending phase container ready",
+			pod:             makePod(v1.PodPending, true, false),
+			service:         makeService(false),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod pending phase container ready being deleted",
+			pod:             makePod(v1.PodPending, true, true),
+			service:         makeService(false),
+			expectedReady:   0,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod unknown phase container not ready",
+			pod:             makePod(v1.PodUnknown, false, false),
+			service:         makeService(false),
+			expectedReady:   0,
+			expectedUnready: 1,
+		},
+		{
+			name:            "pod pending phase container not ready",
+			pod:             makePod(v1.PodPending, false, false),
+			service:         makeService(false),
+			expectedReady:   0,
+			expectedUnready: 1,
+		},
+		{
+			name:            "pod failed phase",
+			pod:             makePod(v1.PodFailed, false, false),
+			service:         makeService(false),
+			expectedReady:   0,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod succeeded phase",
+			pod:             makePod(v1.PodSucceeded, false, false),
+			service:         makeService(false),
+			expectedReady:   0,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod running phase and tolerate unready",
+			pod:             makePod(v1.PodRunning, false, false),
+			service:         makeService(true),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod running phase and tolerate unready being deleted",
+			pod:             makePod(v1.PodRunning, false, true),
+			service:         makeService(true),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod unknown phase and tolerate unready",
+			pod:             makePod(v1.PodUnknown, false, false),
+			service:         makeService(true),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod unknown phase and tolerate unready being deleted",
+			pod:             makePod(v1.PodUnknown, false, true),
+			service:         makeService(true),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod pending phase and tolerate unready",
+			pod:             makePod(v1.PodPending, false, false),
+			service:         makeService(true),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod pending phase and tolerate unready being deleted",
+			pod:             makePod(v1.PodPending, false, true),
+			service:         makeService(true),
+			expectedReady:   1,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod failed phase and tolerate unready",
+			pod:             makePod(v1.PodFailed, false, false),
+			service:         makeService(true),
+			expectedReady:   0,
+			expectedUnready: 0,
+		},
+		{
+			name:            "pod succeeded phase and tolerate unready endpoints",
+			pod:             makePod(v1.PodSucceeded, false, false),
+			service:         makeService(true),
+			expectedReady:   0,
+			expectedUnready: 0,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ns := tc.service.Namespace
+			client, c := newFakeController(0 * time.Second)
+
+			err := c.podStore.Add(tc.pod)
+			if err != nil {
+				t.Errorf("Unexpected error adding pod %v", err)
+			}
+			err = c.serviceStore.Add(tc.service)
+			if err != nil {
+				t.Errorf("Unexpected error adding service %v", err)
+			}
+			err = c.syncService(context.TODO(), fmt.Sprintf("%s/%s", ns, tc.service.Name))
+			if err != nil {
+				t.Errorf("Unexpected error syncing service %v", err)
+			}
+
+			endpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), tc.service.Name, metav1.GetOptions{})
+			if err != nil {
+				t.Errorf("Unexpected error %v", err)
+			}
+
+			readyEndpoints := 0
+			unreadyEndpoints := 0
+			for _, subset := range endpoints.Subsets {
+				readyEndpoints += len(subset.Addresses)
+				unreadyEndpoints += len(subset.NotReadyAddresses)
+			}
+
+			if tc.expectedReady != readyEndpoints {
+				t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints)
+			}
+
+			if tc.expectedUnready != unreadyEndpoints {
+				t.Errorf("Expected %d ready endpoints, got %d", tc.expectedUnready, unreadyEndpoints)
+			}
+		})
+	}
+}
+
 func TestEndpointsDeletionEvents(t *testing.T) {
 	ns := metav1.NamespaceDefault
 	testServer, _ := makeTestServer(t, ns)
@@ -154,7 +154,7 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 
 	for _, pod := range pods {
 		includeTerminating := service.Spec.PublishNotReadyAddresses || utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition)
-		if !endpointutil.ShouldPodBeInEndpointSlice(pod, includeTerminating) {
+		if !endpointutil.ShouldPodBeInEndpoints(pod, includeTerminating) {
			continue
 		}
 
@@ -135,9 +135,17 @@ func DeepHashObjectToString(objectToWrite interface{}) string {
 	return hex.EncodeToString(hasher.Sum(nil)[0:])
 }
 
-// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an EndpointSlice object.
-// Terminating pods are only included if includeTerminating is true
-func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool {
+// ShouldPodBeInEndpoints returns true if a specified pod should be in an
+// Endpoints or EndpointSlice resource. Terminating pods are only included if
+// includeTerminating is true.
+func ShouldPodBeInEndpoints(pod *v1.Pod, includeTerminating bool) bool {
+	// "Terminal" describes when a Pod is complete (in a succeeded or failed phase).
+	// This is distinct from the "Terminating" condition which represents when a Pod
+	// is being terminated (metadata.deletionTimestamp is non nil).
+	if podutil.IsPodTerminal(pod) {
+		return false
+	}
+
 	if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
 		return false
 	}
@@ -146,14 +154,6 @@ func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool {
 		return false
 	}
 
-	if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
-		return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
-	}
-
-	if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
-		return pod.Status.Phase != v1.PodSucceeded
-	}
-
 	return true
 }
 
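The practical effect of the deletions just above: inclusion no longer depends on RestartPolicy, so a pod that reaches a terminal phase (for example an evicted pod whose RestartPolicy is Always) is always excluded. A small before/after sketch; the helper names here are illustrative only and do not exist upstream.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// oldShouldInclude reproduces the removed RestartPolicy-based rule (see the
// deleted lines above); it kept Failed pods whose RestartPolicy was Always.
func oldShouldInclude(pod *v1.Pod) bool {
    switch pod.Spec.RestartPolicy {
    case v1.RestartPolicyNever:
        return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
    case v1.RestartPolicyOnFailure:
        return pod.Status.Phase != v1.PodSucceeded
    default:
        return true
    }
}

// newShouldInclude reflects the new rule: any terminal phase is excluded.
func newShouldInclude(pod *v1.Pod) bool {
    return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
}

func main() {
    evicted := &v1.Pod{
        Spec:   v1.PodSpec{RestartPolicy: v1.RestartPolicyAlways},
        Status: v1.PodStatus{Phase: v1.PodFailed},
    }
    fmt.Println(oldShouldInclude(evicted), newShouldInclude(evicted)) // true false
}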
@@ -99,10 +99,7 @@ func TestDetermineNeededServiceUpdates(t *testing.T) {
 	}
 }
 
-// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase).
-// Not listing them all here. Just listing all of the 3 false cases and 3 of the
-// 12 true cases.
-func TestShouldPodBeInEndpointSlice(t *testing.T) {
+func TestShouldPodBeInEndpoints(t *testing.T) {
 	testCases := []struct {
 		name     string
 		pod      *v1.Pod
@@ -179,7 +176,6 @@ func TestShouldPodBeInEndpointSlice(t *testing.T) {
 			},
 			expected: false,
 		},
-		// Pod should be in endpoints:
 		{
 			name: "Failed pod with Always RestartPolicy",
 			pod: &v1.Pod{
@@ -191,8 +187,9 @@ func TestShouldPodBeInEndpointSlice(t *testing.T) {
 					PodIP: "1.2.3.4",
 				},
 			},
-			expected: true,
+			expected: false,
 		},
+		// Pod should be in endpoints:
 		{
 			name: "Pending pod with Never RestartPolicy",
 			pod: &v1.Pod{
@@ -266,7 +263,7 @@ func TestShouldPodBeInEndpointSlice(t *testing.T) {
 
 	for _, test := range testCases {
 		t.Run(test.name, func(t *testing.T) {
-			result := ShouldPodBeInEndpointSlice(test.pod, test.includeTerminating)
+			result := ShouldPodBeInEndpoints(test.pod, test.includeTerminating)
 			if result != test.expected {
 				t.Errorf("expected: %t, got: %t", test.expected, result)
 			}
@@ -859,7 +859,7 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon
 	// the Kubelet exclusively owns must be released prior to a pod being reported terminal,
 	// while resources that have participanting components above the API use the pod's
 	// transition to a terminal phase (or full deletion) to release those resources.
-	if !isPhaseTerminal(oldPodStatus.Phase) && isPhaseTerminal(newPodStatus.Phase) {
+	if !podutil.IsPodPhaseTerminal(oldPodStatus.Phase) && podutil.IsPodPhaseTerminal(newPodStatus.Phase) {
 		if couldHaveRunningContainers {
 			newPodStatus.Phase = oldPodStatus.Phase
 			newPodStatus.Reason = oldPodStatus.Reason
@@ -870,11 +870,6 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon
 	return newPodStatus
 }
 
-// isPhaseTerminal returns true if the pod's phase is terminal.
-func isPhaseTerminal(phase v1.PodPhase) bool {
-	return phase == v1.PodFailed || phase == v1.PodSucceeded
-}
-
 // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile
 func NeedToReconcilePodReadiness(pod *v1.Pod) bool {
 	if len(pod.Spec.ReadinessGates) == 0 {
@@ -33,6 +33,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	discoveryv1 "k8s.io/api/discovery/v1"
 
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -2040,6 +2041,97 @@ var _ = common.SIGDescribe("Services", func() {
 		}
 	})
 
+	// regression test for https://issues.k8s.io/109414 and https://issues.k8s.io/109718
+	ginkgo.It("should be rejected for evicted pods (no endpoints exist)", func() {
+		namespace := f.Namespace.Name
+		serviceName := "evicted-pods"
+		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, e2eservice.MaxNodesForEndpointsTests)
+		framework.ExpectNoError(err)
+		nodeName := nodes.Items[0].Name
+
+		port := 80
+
+		ginkgo.By("creating a service with no endpoints")
+		_, err = jig.CreateTCPServiceWithPort(func(s *v1.Service) {
+			// set publish not ready addresses to cover edge cases too
+			s.Spec.PublishNotReadyAddresses = true
+		}, int32(port))
+		framework.ExpectNoError(err)
+
+		// Create a pod in one node to get evicted
+		ginkgo.By("creating a client pod that is going to be evicted for the service " + serviceName)
+		evictedPod := e2epod.NewAgnhostPod(namespace, "evicted-pod", nil, nil, nil)
+		evictedPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "sleep 10; fallocate -l 10M file; sleep 10000"}
+		evictedPod.Spec.Containers[0].Name = "evicted-pod"
+		evictedPod.Spec.Containers[0].Resources = v1.ResourceRequirements{
+			Limits: v1.ResourceList{"ephemeral-storage": resource.MustParse("5Mi")},
+		}
+		f.PodClient().Create(evictedPod)
+		err = e2epod.WaitForPodTerminatedInNamespace(f.ClientSet, evictedPod.Name, "Evicted", f.Namespace.Name)
+		if err != nil {
+			framework.Failf("error waiting for pod to be evicted: %v", err)
+		}
+
+		podName := "execpod-evictedpods"
+		ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName))
+		execPod := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) {
+			nodeSelection := e2epod.NodeSelection{Name: nodeName}
+			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
+		})
+
+		if epErr := wait.PollImmediate(framework.Poll, e2eservice.ServiceEndpointsTimeout, func() (bool, error) {
+			endpoints, err := cs.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
+			if err != nil {
+				framework.Logf("error fetching '%s/%s' Endpoints: %s", namespace, serviceName, err.Error())
+				return false, err
+			}
+			if len(endpoints.Subsets) > 0 {
+				framework.Logf("expected '%s/%s' Endpoints to be empty, got: %v", namespace, serviceName, endpoints.Subsets)
+				return false, nil
+			}
+			epsList, err := cs.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
+			if err != nil {
+				framework.Logf("error fetching '%s/%s' EndpointSlices: %s", namespace, serviceName, err.Error())
+				return false, err
+			}
+			if len(epsList.Items) != 1 {
+				framework.Logf("expected exactly 1 EndpointSlice, got: %d", len(epsList.Items))
+				return false, nil
+			}
+			endpointSlice := epsList.Items[0]
+			if len(endpointSlice.Endpoints) > 0 {
+				framework.Logf("expected EndpointSlice to be empty, got %d endpoints", len(endpointSlice.Endpoints))
+				return false, nil
+			}
+			return true, nil
+		}); epErr != nil {
+			framework.ExpectNoError(epErr)
+		}
+
+		serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port))
+		framework.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress)
+		cmd := fmt.Sprintf("/agnhost connect --timeout=3s %s", serviceAddress)
+
+		ginkgo.By(fmt.Sprintf("hitting service %v from pod %v on node %v expected to be refused", serviceAddress, podName, nodeName))
+		expectedErr := "REFUSED"
+		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
+			_, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
+
+			if err != nil {
+				if strings.Contains(err.Error(), expectedErr) {
+					framework.Logf("error contained '%s', as expected: %s", expectedErr, err.Error())
+					return true, nil
+				}
+				framework.Logf("error didn't contain '%s', keep trying: %s", expectedErr, err.Error())
+				return false, nil
+			}
+			return true, errors.New("expected connect call to fail")
+		}); pollErr != nil {
+			framework.ExpectNoError(pollErr)
+		}
+	})
+
 	ginkgo.It("should respect internalTrafficPolicy=Local Pod to Pod [Feature:ServiceInternalTrafficPolicy]", func() {
 		// windows kube-proxy does not support this feature yet
 		// TODO: remove this skip when windows-based proxies implement internalTrafficPolicy
@@ -298,7 +298,49 @@ var _ = SIGDescribe("Pods Extended", func() {
 				}
 			}
 		})
+
+		ginkgo.It("evicted pods should be terminal", func() {
+			ginkgo.By("creating the pod that should be evicted")
+
+			name := "pod-should-be-evicted" + string(uuid.NewUUID())
+			image := imageutils.GetE2EImage(imageutils.BusyBox)
+			pod := &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: name,
+				},
+				Spec: v1.PodSpec{
+					RestartPolicy: v1.RestartPolicyOnFailure,
+					Containers: []v1.Container{
+						{
+							Name:  "bar",
+							Image: image,
+							Command: []string{
+								"/bin/sh", "-c", "sleep 10; fallocate -l 10M file; sleep 10000",
+							},
+							Resources: v1.ResourceRequirements{
+								Limits: v1.ResourceList{
+									"ephemeral-storage": resource.MustParse("5Mi"),
+								},
+							}},
+					},
+				},
+			}
+
+			ginkgo.By("submitting the pod to kubernetes")
+			podClient.Create(pod)
+			defer func() {
+				ginkgo.By("deleting the pod")
+				podClient.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+			}()
+
+			err := e2epod.WaitForPodTerminatedInNamespace(f.ClientSet, pod.Name, "Evicted", f.Namespace.Name)
+			if err != nil {
+				framework.Failf("error waiting for pod to be evicted: %v", err)
+			}
+
+		})
 	})
 
 })
 
 func createAndTestPodRepeatedly(workers, iterations int, scenario podScenario, podClient v1core.PodInterface) {