Merge pull request #82289 from robscott/endpointslice-fixes

Fixing bugs related to Endpoint Slices
This commit is contained in:
Kubernetes Prow Robot
2019-09-05 09:03:10 -07:00
committed by GitHub
22 changed files with 132 additions and 78 deletions

View File

@@ -179,7 +179,11 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
return false
}
namespacedName, _ := endpointSliceCacheKeys(endpointSlice)
namespacedName, _, err := endpointSliceCacheKeys(endpointSlice)
if err != nil {
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
return false
}
metrics.EndpointChangesTotal.Inc()

View File

@@ -17,6 +17,7 @@ limitations under the License.
package proxy
import (
"fmt"
"sort"
"k8s.io/api/core/v1"
@@ -79,12 +80,12 @@ func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
// Update a slice in the cache.
func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) {
serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
// This should never actually happen
if serviceKey.Name == "" || serviceKey.Namespace == "" || sliceKey == "" {
klog.Errorf("Invalid endpoint slice, name and owner reference required %v", endpointSlice)
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
if err != nil {
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
return
}
esInfo := &endpointSliceInfo{
Ports: endpointSlice.Ports,
Endpoints: []*endpointInfo{},
@@ -105,7 +106,11 @@ func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice)
// Delete a slice from the cache.
func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) {
serviceKey, sliceKey := endpointSliceCacheKeys(endpointSlice)
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
if err != nil {
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
return
}
delete(cache.sliceByServiceMap[serviceKey], sliceKey)
}
@@ -214,16 +219,15 @@ func formatEndpointsList(endpoints []Endpoint) []string {
}
// endpointSliceCacheKeys returns cache keys used for a given EndpointSlice.
func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string) {
if len(endpointSlice.OwnerReferences) == 0 {
klog.Errorf("No owner reference set on endpoint slice: %s", endpointSlice.Name)
return types.NamespacedName{}, endpointSlice.Name
func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string, error) {
var err error
serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
if !ok || serviceName == "" {
err = fmt.Errorf("No %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name)
} else if endpointSlice.Namespace == "" || endpointSlice.Name == "" {
err = fmt.Errorf("Expected EndpointSlice name and namespace to be set: %v", endpointSlice)
}
if len(endpointSlice.OwnerReferences) > 1 {
klog.Errorf("More than 1 owner reference set on endpoint slice: %s", endpointSlice.Name)
}
ownerRef := endpointSlice.OwnerReferences[0]
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: ownerRef.Name}, endpointSlice.Name
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
}
// byIP helps sort endpoints by IP

View File

@@ -200,9 +200,9 @@ func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, of
ipAddressType := discovery.AddressTypeIP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", serviceName, sliceNum),
Namespace: namespace,
OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}},
Name: fmt.Sprintf("%s-%d", serviceName, sliceNum),
Namespace: namespace,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{},
AddressType: &ipAddressType,

View File

@@ -505,7 +505,11 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced)
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
proxier.setInitialized(proxier.endpointSlicesSynced)
} else {
proxier.setInitialized(proxier.endpointsSynced)
}
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@@ -537,7 +541,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.setInitialized(proxier.servicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@@ -573,7 +577,7 @@ func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointS
func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.mu.Lock()
proxier.endpointSlicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced)
proxier.setInitialized(proxier.servicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@@ -675,7 +679,7 @@ func (proxier *Proxier) syncProxyRules() {
defer proxier.mu.Unlock()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || !proxier.servicesSynced {
if !proxier.isInitialized() {
klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}

View File

@@ -390,6 +390,7 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
nodePortAddresses: make([]string, 0),
networkInterfacer: utilproxytest.NewFakeNetwork(),
}
p.setInitialized(true)
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
return p
}
@@ -2363,9 +2364,9 @@ COMMIT
ipAddressType := discovery.AddressTypeIP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}},
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),

View File

@@ -19,6 +19,7 @@ go_test(
"//pkg/proxy/ipvs/testing:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/proxy/util/testing:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/ipset:go_default_library",
"//pkg/util/ipset/testing:go_default_library",
"//pkg/util/iptables:go_default_library",

View File

@@ -820,7 +820,11 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
proxier.setInitialized(proxier.endpointsSynced || proxier.endpointSlicesSynced)
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
proxier.setInitialized(proxier.endpointSlicesSynced)
} else {
proxier.setInitialized(proxier.endpointsSynced)
}
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@@ -848,7 +852,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.setInitialized(proxier.servicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@@ -884,7 +888,7 @@ func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointS
func (proxier *Proxier) OnEndpointSlicesSynced() {
proxier.mu.Lock()
proxier.endpointSlicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointSlicesSynced)
proxier.setInitialized(proxier.servicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
@@ -901,7 +905,7 @@ func (proxier *Proxier) syncProxyRules() {
defer proxier.mu.Unlock()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || !proxier.servicesSynced {
if !proxier.isInitialized() {
klog.V(2).Info("Not syncing ipvs rules until Services and Endpoints have been received from master")
return
}

View File

@@ -24,6 +24,7 @@ import (
"sort"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
@@ -36,6 +37,7 @@ import (
netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
"k8s.io/kubernetes/pkg/util/async"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
@@ -134,7 +136,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
for _, is := range ipsetInfo {
ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
}
return &Proxier{
p := &Proxier{
exec: fexec,
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
@@ -164,6 +166,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
networkInterfacer: proxyutiltest.NewFakeNetwork(),
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
}
p.setInitialized(true)
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
return p
}
func makeNSN(namespace, name string) types.NamespacedName {
@@ -3690,9 +3695,9 @@ func TestEndpointSliceE2E(t *testing.T) {
ipAddressType := discovery.AddressTypeIP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
OwnerReferences: []metav1.OwnerReference{{Kind: "Service", Name: serviceName}},
Name: fmt.Sprintf("%s-1", serviceName),
Namespace: namespaceName,
Labels: map[string]string{discovery.LabelServiceName: serviceName},
},
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr(""),