Adding initial EndpointSlice metrics.

This commit is contained in:
Rob Scott
2019-09-12 17:05:34 -07:00
parent cd274ff270
commit 724b142f07
13 changed files with 657 additions and 56 deletions

View File

@@ -24,10 +24,12 @@ import (
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)
@@ -37,6 +39,7 @@ type reconciler struct {
client clientset.Interface
nodeLister corelisters.NodeLister
maxEndpointsPerSlice int32
metricsCache *metrics.Cache
}
// endpointMeta includes the attributes we group slices on, this type helps with
@@ -52,20 +55,23 @@ type endpointMeta struct {
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
// Build data structures for existing state.
existingSlicesByPortMap := map[portMapKey][]*discovery.EndpointSlice{}
existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
numExistingEndpoints := 0
for _, existingSlice := range existingSlices {
epHash := newPortMapKey(existingSlice.Ports)
epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
numExistingEndpoints += len(existingSlice.Endpoints)
}
// Build data structures for desired state.
desiredMetaByPortMap := map[portMapKey]*endpointMeta{}
desiredEndpointsByPortMap := map[portMapKey]endpointSet{}
desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointSet{}
numDesiredEndpoints := 0
for _, pod := range pods {
if endpointutil.ShouldPodBeInEndpoints(pod) {
endpointPorts := getEndpointPorts(service, pod)
epHash := newPortMapKey(endpointPorts)
epHash := endpointutil.NewPortMapKey(endpointPorts)
if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
desiredEndpointsByPortMap[epHash] = endpointSet{}
}
@@ -85,17 +91,31 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
}
endpoint := podToEndpoint(pod, node)
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
numDesiredEndpoints++
}
}
slicesToCreate := []*discovery.EndpointSlice{}
slicesToUpdate := []*discovery.EndpointSlice{}
sliceNamesToDelete := sets.String{}
spMetrics := metrics.NewServicePortCache()
totalAdded := 0
totalRemoved := 0
// Determine changes necessary for each group of slices by port map.
for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
pmSlicesToCreate, pmSlicesToUpdate, pmSliceNamesToDelete := r.reconcileByPortMapping(
numEndpoints := len(desiredEndpoints)
pmSlicesToCreate, pmSlicesToUpdate, pmSliceNamesToDelete, added, removed := r.reconcileByPortMapping(
service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])
totalAdded += added
totalRemoved += removed
spMetrics.Set(portMap, metrics.EfficiencyInfo{
Endpoints: numEndpoints,
Slices: len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSliceNamesToDelete),
})
if len(pmSlicesToCreate) > 0 {
slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
}
@@ -121,8 +141,18 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
if len(existingSlices) == sliceNamesToDelete.Len() && len(slicesToCreate) < 1 {
placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}})
slicesToCreate = append(slicesToCreate, placeholderSlice)
spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
Endpoints: 0,
Slices: 1,
})
}
metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))
serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)
return r.finalize(service, slicesToCreate, slicesToUpdate, sliceNamesToDelete, triggerTime)
}
@@ -151,6 +181,8 @@ func (r *reconciler) finalize(
_, err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Create(endpointSlice)
if err != nil {
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
} else {
metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
}
}
@@ -159,6 +191,8 @@ func (r *reconciler) finalize(
_, err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Update(endpointSlice)
if err != nil {
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
} else {
metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
}
}
@@ -167,6 +201,8 @@ func (r *reconciler) finalize(
err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Delete(sliceName, &metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", sliceName, service.Namespace, service.Name, err))
} else {
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
}
}
@@ -187,11 +223,12 @@ func (r *reconciler) reconcileByPortMapping(
existingSlices []*discovery.EndpointSlice,
desiredSet endpointSet,
endpointMeta *endpointMeta,
) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, sets.String) {
) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, sets.String, int, int) {
slicesByName := map[string]*discovery.EndpointSlice{}
sliceNamesUnchanged := sets.String{}
sliceNamesToUpdate := sets.String{}
sliceNamesToDelete := sets.String{}
numRemoved := 0
// 1. Iterate through existing slices to delete endpoints no longer desired
// and update endpoints that have changed
@@ -217,6 +254,9 @@ func (r *reconciler) reconcileByPortMapping(
// If an endpoint was updated or removed, mark for update or delete
if endpointUpdated || len(existingSlice.Endpoints) != len(newEndpoints) {
if len(existingSlice.Endpoints) > len(newEndpoints) {
numRemoved += len(existingSlice.Endpoints) - len(newEndpoints)
}
if len(newEndpoints) == 0 {
// if no endpoints desired in this slice, mark for deletion
sliceNamesToDelete.Insert(existingSlice.Name)
@@ -231,6 +271,8 @@ func (r *reconciler) reconcileByPortMapping(
}
}
numAdded := desiredSet.Len()
// 2. If we still have desired endpoints to add and slices marked for update,
// iterate through the slices and fill them up with the desired endpoints.
if desiredSet.Len() > 0 && sliceNamesToUpdate.Len() > 0 {
@@ -297,5 +339,9 @@ func (r *reconciler) reconcileByPortMapping(
slicesToUpdate = append(slicesToUpdate, slicesByName[sliceName])
}
return slicesToCreate, slicesToUpdate, sliceNamesToDelete
return slicesToCreate, slicesToUpdate, sliceNamesToDelete, numAdded, numRemoved
}
func (r *reconciler) deleteService(namespace, name string) {
r.metricsCache.DeleteService(types.NamespacedName{Namespace: namespace, Name: name})
}