Merge pull request #93597 from wojtek-t/validate_endpoint_slices

Wait for both endpoints and endpointslices in e2e tests
This commit is contained in:
Kubernetes Prow Robot 2020-08-01 20:22:04 -07:00 committed by GitHub
commit cf6a0868be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 108 additions and 14 deletions

View File

@ -105,6 +105,7 @@ filegroup(
"//test/e2e/framework/config:all-srcs",
"//test/e2e/framework/deployment:all-srcs",
"//test/e2e/framework/endpoints:all-srcs",
"//test/e2e/framework/endpointslice:all-srcs",
"//test/e2e/framework/events:all-srcs",
"//test/e2e/framework/ginkgowrapper:all-srcs",
"//test/e2e/framework/gpu:all-srcs",

View File

@ -14,12 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
/*
This soak tests places a specified number of pods on each node and then
repeatedly sends queries to a service running on these pods via
a serivce
*/
package endpoints
import (

View File

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["ports.go"],
importpath = "k8s.io/kubernetes/test/e2e/framework/endpointslice",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,46 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpointslice
import (
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
)
// PortsByPodUID is a map that maps pod UID to container ports.
type PortsByPodUID map[types.UID][]int
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
func GetContainerPortsByPodUID(eps []discoveryv1beta1.EndpointSlice) PortsByPodUID {
m := PortsByPodUID{}
for _, es := range eps {
for _, port := range es.Ports {
if port.Port == nil {
continue
}
for _, ep := range es.Endpoints {
containerPort := *port.Port
if _, ok := m[ep.TargetRef.UID]; !ok {
m[ep.TargetRef.UID] = make([]int, 0)
}
m[ep.TargetRef.UID] = append(m[ep.TargetRef.UID], int(containerPort))
}
}
}
return m
}

View File

@ -74,6 +74,7 @@ go_library(
"//test/e2e/framework/auth:go_default_library",
"//test/e2e/framework/deployment:go_default_library",
"//test/e2e/framework/endpoints:go_default_library",
"//test/e2e/framework/endpointslice:go_default_library",
"//test/e2e/framework/ingress:go_default_library",
"//test/e2e/framework/kubesystem:go_default_library",
"//test/e2e/framework/network:go_default_library",

View File

@ -37,6 +37,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@ -50,6 +51,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice"
e2ekubesystem "k8s.io/kubernetes/test/e2e/framework/kubesystem"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@ -104,6 +106,9 @@ var (
// portsByPodName is a map that maps pod name to container ports.
type portsByPodName map[string][]int
// portsByPodUID is a map that maps pod name to container ports.
type portsByPodUID map[types.UID][]int
// affinityCheckFromPod returns interval, timeout and function pinging the service and
// returning pinged hosts for pinging the service from execPod.
func affinityCheckFromPod(execPod *v1.Pod, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) {
@ -3696,7 +3701,7 @@ func enableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(sv
return framework.TestContext.CloudConfig.Provider.EnableAndDisableInternalLB()
}
func validatePorts(ep e2eendpoints.PortsByPodUID, expectedEndpoints e2eendpoints.PortsByPodUID) error {
func validatePorts(ep, expectedEndpoints portsByPodUID) error {
if len(ep) != len(expectedEndpoints) {
// should not happen because we check this condition before
return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
@ -3719,8 +3724,8 @@ func validatePorts(ep e2eendpoints.PortsByPodUID, expectedEndpoints e2eendpoints
return nil
}
func translatePodNameToUID(c clientset.Interface, ns string, expectedEndpoints portsByPodName) (e2eendpoints.PortsByPodUID, error) {
portsByUID := make(e2eendpoints.PortsByPodUID)
func translatePodNameToUID(c clientset.Interface, ns string, expectedEndpoints portsByPodName) (portsByPodUID, error) {
portsByUID := make(portsByPodUID)
for name, portList := range expectedEndpoints {
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
@ -3741,21 +3746,42 @@ func validateEndpointsPorts(c clientset.Interface, namespace, serviceName string
i := 0
if pollErr := wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) {
i++
ep, err := c.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
framework.Logf("Failed go get Endpoints object: %v", err)
// Retry the error
return false, nil
}
portsByPodUID := e2eendpoints.GetContainerPortsByPodUID(ep)
i++
if err := validatePorts(portsByPodUID, expectedPortsByPodUID); err != nil {
portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep))
if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByPodUID, expectedEndpoints)
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
}
return false, nil
}
// If EndpointSlice API is enabled, then validate if appropriate EndpointSlice objects
// were also create/updated/deleted.
if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1beta1.SchemeGroupVersion.String()); err == nil {
opts := metav1.ListOptions{
LabelSelector: "kubernetes.io/service-name=" + serviceName,
}
es, err := c.DiscoveryV1beta1().EndpointSlices(namespace).List(context.TODO(), opts)
if err != nil {
framework.Logf("Failed go list EndpointSlice objects: %v", err)
// Retry the error
return false, nil
}
portsByUID = portsByPodUID(e2eendpointslice.GetContainerPortsByPodUID(es.Items))
if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
}
return false, nil
}
}
framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v",
serviceName, namespace, expectedEndpoints)
return true, nil