Maintain index of high-cardinality edges in node authorizer graph

This commit is contained in:
Jordan Liggitt 2018-04-19 22:22:25 -04:00
parent ad7d5505b9
commit ff8cdabfd4
No known key found for this signature in database
GPG Key ID: 39928704103C7229
6 changed files with 261 additions and 2 deletions

View File

@ -10,6 +10,7 @@ go_test(
name = "go_default_test",
srcs = [
"graph_test.go",
"intset_test.go",
"node_authorizer_test.go",
],
embed = [":go_default_library"],
@ -33,6 +34,7 @@ go_library(
srcs = [
"graph.go",
"graph_populator.go",
"intset.go",
"node_authorizer.go",
],
importpath = "k8s.io/kubernetes/plugin/pkg/auth/authorizer/node",

View File

@ -80,6 +80,11 @@ type Graph struct {
graph *simple.DirectedAcyclicGraph
// vertices is a map of type -> namespace -> name -> vertex
vertices map[vertexType]namespaceVertexMapping
// destinationEdgeIndex is a map of vertex -> set of destination IDs
destinationEdgeIndex map[int]*intSet
// destinationEdgeThreshold is the minimum number of distinct destination IDs at which to maintain an index
destinationEdgeThreshold int
}
// namespaceVertexMapping is a map of namespace -> name -> vertex
@ -92,6 +97,11 @@ func NewGraph() *Graph {
return &Graph{
vertices: map[vertexType]namespaceVertexMapping{},
graph: simple.NewDirectedAcyclicGraph(0, 0),
destinationEdgeIndex: map[int]*intSet{},
// experimentally determined to be the point at which iteration adds an order of magnitude to the authz check.
// since maintaining indexes costs time/memory while processing graph changes, we don't want to make this too low.
destinationEdgeThreshold: 200,
}
}
@ -165,6 +175,7 @@ func (g *Graph) deleteVertex_locked(vertexType vertexType, namespace, name strin
// find existing neighbors with a single edge (meaning we are their only neighbor)
neighborsToRemove := []graph.Node{}
neighborsToRecompute := []graph.Node{}
g.graph.VisitFrom(vertex, func(neighbor graph.Node) bool {
// this downstream neighbor has only one edge (which must be from us), so remove them as well
if g.graph.Degree(neighbor) == 1 {
@ -176,6 +187,9 @@ func (g *Graph) deleteVertex_locked(vertexType vertexType, namespace, name strin
if g.graph.Degree(neighbor) == 1 {
// this upstream neighbor has only one edge (which must be to us), so remove them as well
neighborsToRemove = append(neighborsToRemove, neighbor)
} else {
// recompute the destination edge index on this neighbor
neighborsToRecompute = append(neighborsToRemove, neighbor)
}
return true
})
@ -187,6 +201,11 @@ func (g *Graph) deleteVertex_locked(vertexType vertexType, namespace, name strin
for _, neighbor := range neighborsToRemove {
g.removeVertex_locked(neighbor.(*namedVertex))
}
// recompute destination indexes for neighbors that dropped outbound edges
for _, neighbor := range neighborsToRecompute {
g.recomputeDestinationIndex_locked(neighbor)
}
}
// must be called under write lock
@ -201,6 +220,7 @@ func (g *Graph) deleteEdges_locked(fromType, toType vertexType, toNamespace, toN
// delete all edges between vertices of fromType and toVert
neighborsToRemove := []*namedVertex{}
neighborsToRecompute := []*namedVertex{}
g.graph.VisitTo(toVert, func(from graph.Node) bool {
fromVert := from.(*namedVertex)
if fromVert.vertexType != fromType {
@ -211,6 +231,8 @@ func (g *Graph) deleteEdges_locked(fromType, toType vertexType, toNamespace, toN
// track vertexes that changed edges
if g.graph.Degree(fromVert) == 0 {
neighborsToRemove = append(neighborsToRemove, fromVert)
} else {
neighborsToRecompute = append(neighborsToRecompute, fromVert)
}
return true
})
@ -219,6 +241,11 @@ func (g *Graph) deleteEdges_locked(fromType, toType vertexType, toNamespace, toN
for _, v := range neighborsToRemove {
g.removeVertex_locked(v)
}
// recompute destination indexes for neighbors that dropped outbound edges
for _, v := range neighborsToRecompute {
g.recomputeDestinationIndex_locked(v)
}
}
// must be called under write lock
@ -226,12 +253,49 @@ func (g *Graph) deleteEdges_locked(fromType, toType vertexType, toNamespace, toN
// It does nothing to indexes of neighbor vertices.
func (g *Graph) removeVertex_locked(v *namedVertex) {
g.graph.RemoveNode(v)
delete(g.destinationEdgeIndex, v.ID())
delete(g.vertices[v.vertexType][v.namespace], v.name)
if len(g.vertices[v.vertexType][v.namespace]) == 0 {
delete(g.vertices[v.vertexType], v.namespace)
}
}
// must be called under write lock
// recomputeDestinationIndex_locked recomputes the index of destination ids for the specified vertex
func (g *Graph) recomputeDestinationIndex_locked(n graph.Node) {
// don't maintain indices for nodes with few edges
edgeCount := g.graph.Degree(n)
if edgeCount < g.destinationEdgeThreshold {
delete(g.destinationEdgeIndex, n.ID())
return
}
// get or create the index
index := g.destinationEdgeIndex[n.ID()]
if index == nil {
index = newIntSet()
} else {
index.startNewGeneration()
}
// populate the index
g.graph.VisitFrom(n, func(dest graph.Node) bool {
if destinationEdge, ok := g.graph.EdgeBetween(n, dest).(*destinationEdge); ok {
index.mark(destinationEdge.DestinationID())
}
return true
})
// remove existing items no longer in the list
index.sweep()
if len(index.members) < g.destinationEdgeThreshold {
delete(g.destinationEdgeIndex, n.ID())
} else {
g.destinationEdgeIndex[n.ID()] = index
}
}
// AddPod should only be called once spec.NodeName is populated.
// It sets up edges for the following relationships (which are immutable for a pod once bound to a node):
//
@ -257,17 +321,20 @@ func (g *Graph) AddPod(pod *api.Pod) {
if len(pod.Spec.ServiceAccountName) > 0 {
serviceAccountVertex := g.getOrCreateVertex_locked(serviceAccountVertexType, pod.Namespace, pod.Spec.ServiceAccountName)
g.graph.SetEdge(newDestinationEdge(serviceAccountVertex, podVertex, nodeVertex))
g.recomputeDestinationIndex_locked(serviceAccountVertex)
}
podutil.VisitPodSecretNames(pod, func(secret string) bool {
secretVertex := g.getOrCreateVertex_locked(secretVertexType, pod.Namespace, secret)
g.graph.SetEdge(newDestinationEdge(secretVertex, podVertex, nodeVertex))
g.recomputeDestinationIndex_locked(secretVertex)
return true
})
podutil.VisitPodConfigmapNames(pod, func(configmap string) bool {
configmapVertex := g.getOrCreateVertex_locked(configMapVertexType, pod.Namespace, configmap)
g.graph.SetEdge(newDestinationEdge(configmapVertex, podVertex, nodeVertex))
g.recomputeDestinationIndex_locked(configmapVertex)
return true
})
@ -275,6 +342,7 @@ func (g *Graph) AddPod(pod *api.Pod) {
if v.PersistentVolumeClaim != nil {
pvcVertex := g.getOrCreateVertex_locked(pvcVertexType, pod.Namespace, v.PersistentVolumeClaim.ClaimName)
g.graph.SetEdge(newDestinationEdge(pvcVertex, podVertex, nodeVertex))
g.recomputeDestinationIndex_locked(pvcVertex)
}
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
// intSet maintains a set of ints, and supports promoting and culling the previous generation.
// this allows tracking a large, mostly-stable set without constantly reallocating the entire set.
type intSet struct {
currentGeneration byte
members map[int]byte
}
func newIntSet() *intSet {
return &intSet{members: map[int]byte{}}
}
// has returns true if the specified int is in the set.
// it is safe to call concurrently, but must not be called concurrently with any of the other methods.
func (s *intSet) has(i int) bool {
if s == nil {
return false
}
_, present := s.members[i]
return present
}
// startNewGeneration begins a new generation.
// it must be followed by a call to mark() for every member of the generation,
// then a call to sweep() to remove members not present in the generation.
// it is not thread-safe.
func (s *intSet) startNewGeneration() {
s.currentGeneration++
}
// mark indicates the specified int belongs to the current generation.
// it is not thread-safe.
func (s *intSet) mark(i int) {
s.members[i] = s.currentGeneration
}
// sweep removes items not in the current generation.
// it is not thread-safe.
func (s *intSet) sweep() {
for k, v := range s.members {
if v != s.currentGeneration {
delete(s.members, k)
}
}
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package node
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestIntSet(t *testing.T) {
i := newIntSet()
assert.False(t, i.has(1))
assert.False(t, i.has(2))
assert.False(t, i.has(3))
assert.False(t, i.has(4))
i.startNewGeneration()
i.mark(1)
i.mark(2)
i.sweep()
assert.True(t, i.has(1))
assert.True(t, i.has(2))
assert.False(t, i.has(3))
assert.False(t, i.has(4))
i.startNewGeneration()
i.mark(2)
i.mark(3)
i.sweep()
assert.False(t, i.has(1))
assert.True(t, i.has(2))
assert.True(t, i.has(3))
assert.False(t, i.has(4))
i.startNewGeneration()
i.mark(3)
i.mark(4)
i.sweep()
assert.False(t, i.has(1))
assert.False(t, i.has(2))
assert.True(t, i.has(3))
assert.True(t, i.has(4))
}

View File

@ -212,6 +212,11 @@ func (r *NodeAuthorizer) hasPathFrom(nodeName string, startingType vertexType, s
return false, fmt.Errorf("node %q cannot get unknown %s %s/%s", nodeName, vertexTypes[startingType], startingNamespace, startingName)
}
// Fast check to see if we know of a destination edge
if r.graph.destinationEdgeIndex[startingVertex.ID()].has(nodeVertex.ID()) {
return true, nil
}
found := false
traversal := &traverse.VisitingDepthFirst{
EdgeFilter: func(edge graph.Edge) bool {

View File

@ -222,6 +222,7 @@ func TestAuthorizer(t *testing.T) {
func TestAuthorizerSharedResources(t *testing.T) {
g := NewGraph()
g.destinationEdgeThreshold = 1
identifier := nodeidentifier.NewDefaultNodeIdentifier()
authz := NewAuthorizer(g, identifier, bootstrappolicy.NodeRules())
@ -250,7 +251,8 @@ func TestAuthorizerSharedResources(t *testing.T) {
},
},
})
g.AddPod(&api.Pod{
pod3 := &api.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pod3-node3", Namespace: "ns1"},
Spec: api.PodSpec{
NodeName: "node3",
@ -258,7 +260,8 @@ func TestAuthorizerSharedResources(t *testing.T) {
{VolumeSource: api.VolumeSource{Secret: &api.SecretVolumeSource{SecretName: "shared-all"}}},
},
},
})
}
g.AddPod(pod3)
g.SetNodeConfigMap("node1", "shared-configmap", "ns1")
g.SetNodeConfigMap("node2", "shared-configmap", "ns1")
@ -318,6 +321,30 @@ func TestAuthorizerSharedResources(t *testing.T) {
t.Errorf("%d: expected %v, got %v", i, tc.ExpectAllowed, decision)
}
}
{
node3SharedSecretGet := authorizer.AttributesRecord{User: node3, ResourceRequest: true, Verb: "get", Resource: "secrets", Namespace: "ns1", Name: "shared-all"}
decision, _, err := authz.Authorize(node3SharedSecretGet)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if decision != authorizer.DecisionAllow {
t.Error("expected allowed")
}
// should trigger recalculation of the shared secret index
pod3.Spec.Volumes = nil
g.AddPod(pod3)
decision, _, err = authz.Authorize(node3SharedSecretGet)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if decision == authorizer.DecisionAllow {
t.Errorf("unexpectedly allowed")
}
}
}
type sampleDataOpts struct {
@ -403,6 +430,39 @@ func BenchmarkPopulationRetention(b *testing.B) {
}
}
func BenchmarkWriteIndexMaintenance(b *testing.B) {
// Run with:
// go test ./plugin/pkg/auth/authorizer/node -benchmem -bench BenchmarkWriteIndexMaintenance -run None
opts := sampleDataOpts{
// simulate high replication in a small number of namespaces:
nodes: 5000,
namespaces: 1,
podsPerNode: 1,
attachmentsPerNode: 20,
sharedConfigMapsPerPod: 0,
uniqueConfigMapsPerPod: 1,
sharedSecretsPerPod: 1,
uniqueSecretsPerPod: 1,
sharedPVCsPerPod: 0,
uniquePVCsPerPod: 1,
}
nodes, pods, pvs, attachments := generate(opts)
g := NewGraph()
populate(g, nodes, pods, pvs, attachments)
// Garbage collect before the first iteration
runtime.GC()
b.ResetTimer()
b.SetParallelism(100)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
g.AddPod(pods[0])
}
})
}
func BenchmarkAuthorization(b *testing.B) {
g := NewGraph()