Merge pull request #87696 from liggitt/node2

Switch node authorizer indexes to reference counts, add fastpath edge removal
This commit is contained in:
Kubernetes Prow Robot 2020-02-10 12:45:54 -08:00 committed by GitHub
commit 7a506ff342
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 100 deletions

View File

@ -175,7 +175,7 @@ func (g *Graph) deleteVertex_locked(vertexType vertexType, namespace, name strin
// find existing neighbors with a single edge (meaning we are their only neighbor) // find existing neighbors with a single edge (meaning we are their only neighbor)
neighborsToRemove := []graph.Node{} neighborsToRemove := []graph.Node{}
neighborsToRecompute := []graph.Node{} edgesToRemoveFromIndexes := []graph.Edge{}
g.graph.VisitFrom(vertex, func(neighbor graph.Node) bool { g.graph.VisitFrom(vertex, func(neighbor graph.Node) bool {
// this downstream neighbor has only one edge (which must be from us), so remove them as well // this downstream neighbor has only one edge (which must be from us), so remove them as well
if g.graph.Degree(neighbor) == 1 { if g.graph.Degree(neighbor) == 1 {
@ -188,8 +188,8 @@ func (g *Graph) deleteVertex_locked(vertexType vertexType, namespace, name strin
// this upstream neighbor has only one edge (which must be to us), so remove them as well // this upstream neighbor has only one edge (which must be to us), so remove them as well
neighborsToRemove = append(neighborsToRemove, neighbor) neighborsToRemove = append(neighborsToRemove, neighbor)
} else { } else {
// recompute the destination edge index on this neighbor // decrement the destination edge index on this neighbor if the edge between us was a destination edge
neighborsToRecompute = append(neighborsToRecompute, neighbor) edgesToRemoveFromIndexes = append(edgesToRemoveFromIndexes, g.graph.EdgeBetween(vertex, neighbor))
} }
return true return true
}) })
@ -202,9 +202,9 @@ func (g *Graph) deleteVertex_locked(vertexType vertexType, namespace, name strin
g.removeVertex_locked(neighbor.(*namedVertex)) g.removeVertex_locked(neighbor.(*namedVertex))
} }
// recompute destination indexes for neighbors that dropped outbound edges // remove edges from destination indexes for neighbors that dropped outbound edges
for _, neighbor := range neighborsToRecompute { for _, edge := range edgesToRemoveFromIndexes {
g.recomputeDestinationIndex_locked(neighbor) g.removeEdgeFromDestinationIndex_locked(edge)
} }
} }
@ -220,19 +220,17 @@ func (g *Graph) deleteEdges_locked(fromType, toType vertexType, toNamespace, toN
// delete all edges between vertices of fromType and toVert // delete all edges between vertices of fromType and toVert
neighborsToRemove := []*namedVertex{} neighborsToRemove := []*namedVertex{}
neighborsToRecompute := []*namedVertex{} edgesToRemove := []graph.Edge{}
g.graph.VisitTo(toVert, func(from graph.Node) bool { g.graph.VisitTo(toVert, func(from graph.Node) bool {
fromVert := from.(*namedVertex) fromVert := from.(*namedVertex)
if fromVert.vertexType != fromType { if fromVert.vertexType != fromType {
return true return true
} }
// remove the edge // this neighbor has only one edge (which must be to us), so remove them as well
g.graph.RemoveEdge(simple.Edge{F: fromVert, T: toVert}) if g.graph.Degree(fromVert) == 1 {
// track vertexes that changed edges
if g.graph.Degree(fromVert) == 0 {
neighborsToRemove = append(neighborsToRemove, fromVert) neighborsToRemove = append(neighborsToRemove, fromVert)
} else { } else {
neighborsToRecompute = append(neighborsToRecompute, fromVert) edgesToRemove = append(edgesToRemove, g.graph.EdgeBetween(from, toVert))
} }
return true return true
}) })
@ -242,9 +240,30 @@ func (g *Graph) deleteEdges_locked(fromType, toType vertexType, toNamespace, toN
g.removeVertex_locked(v) g.removeVertex_locked(v)
} }
// recompute destination indexes for neighbors that dropped outbound edges // remove edges and decrement destination indexes for neighbors that dropped outbound edges
for _, v := range neighborsToRecompute { for _, edge := range edgesToRemove {
g.recomputeDestinationIndex_locked(v) g.graph.RemoveEdge(edge)
g.removeEdgeFromDestinationIndex_locked(edge)
}
}
// A fastpath for recomputeDestinationIndex_locked for "removing edge" case.
func (g *Graph) removeEdgeFromDestinationIndex_locked(e graph.Edge) {
n := e.From()
// don't maintain indices for nodes with few edges
edgeCount := g.graph.Degree(n)
if edgeCount < g.destinationEdgeThreshold {
delete(g.destinationEdgeIndex, n.ID())
return
}
// decrement the nodeID->destinationID refcount in the index, if the index exists
index := g.destinationEdgeIndex[n.ID()]
if index == nil {
return
}
if destinationEdge, ok := e.(*destinationEdge); ok {
index.decrement(destinationEdge.DestinationID())
} }
} }
@ -259,7 +278,7 @@ func (g *Graph) addEdgeToDestinationIndex_locked(e graph.Edge) {
} }
// fast-add the new edge to an existing index // fast-add the new edge to an existing index
if destinationEdge, ok := e.(*destinationEdge); ok { if destinationEdge, ok := e.(*destinationEdge); ok {
index.mark(destinationEdge.DestinationID()) index.increment(destinationEdge.DestinationID())
} }
} }
@ -290,25 +309,17 @@ func (g *Graph) recomputeDestinationIndex_locked(n graph.Node) {
if index == nil { if index == nil {
index = newIntSet() index = newIntSet()
} else { } else {
index.startNewGeneration() index.reset()
} }
// populate the index // populate the index
g.graph.VisitFrom(n, func(dest graph.Node) bool { g.graph.VisitFrom(n, func(dest graph.Node) bool {
if destinationEdge, ok := g.graph.EdgeBetween(n, dest).(*destinationEdge); ok { if destinationEdge, ok := g.graph.EdgeBetween(n, dest).(*destinationEdge); ok {
index.mark(destinationEdge.DestinationID()) index.increment(destinationEdge.DestinationID())
} }
return true return true
}) })
g.destinationEdgeIndex[n.ID()] = index
// remove existing items no longer in the list
index.sweep()
if len(index.members) < g.destinationEdgeThreshold {
delete(g.destinationEdgeIndex, n.ID())
} else {
g.destinationEdgeIndex[n.ID()] = index
}
} }
// AddPod should only be called once spec.NodeName is populated. // AddPod should only be called once spec.NodeName is populated.
@ -451,7 +462,9 @@ func (g *Graph) SetNodeConfigMap(nodeName, configMapName, configMapNamespace str
if len(configMapName) > 0 && len(configMapNamespace) > 0 { if len(configMapName) > 0 && len(configMapNamespace) > 0 {
configmapVertex := g.getOrCreateVertex_locked(configMapVertexType, configMapNamespace, configMapName) configmapVertex := g.getOrCreateVertex_locked(configMapVertexType, configMapNamespace, configMapName)
nodeVertex := g.getOrCreateVertex_locked(nodeVertexType, "", nodeName) nodeVertex := g.getOrCreateVertex_locked(nodeVertexType, "", nodeName)
g.graph.SetEdge(newDestinationEdge(configmapVertex, nodeVertex, nodeVertex)) e := newDestinationEdge(configmapVertex, nodeVertex, nodeVertex)
g.graph.SetEdge(e)
g.addEdgeToDestinationIndex_locked(e)
} }
} }

View File

@ -247,8 +247,8 @@ func TestIndex(t *testing.T) {
actual := map[string][]string{} actual := map[string][]string{}
for from, to := range g.destinationEdgeIndex { for from, to := range g.destinationEdgeIndex {
sortedValues := []string{} sortedValues := []string{}
for member := range to.members { for member, count := range to.members {
sortedValues = append(sortedValues, toString(member)) sortedValues = append(sortedValues, fmt.Sprintf("%s=%d", toString(member), count))
} }
sort.Strings(sortedValues) sort.Strings(sortedValues)
actual[toString(from)] = sortedValues actual[toString(from)] = sortedValues
@ -280,10 +280,10 @@ func TestIndex(t *testing.T) {
"serviceAccount:ns/sa1": {"pod:ns/pod1", "pod:ns/pod2", "pod:ns/pod3"}, "serviceAccount:ns/sa1": {"pod:ns/pod1", "pod:ns/pod2", "pod:ns/pod3"},
}) })
expectIndex(map[string][]string{ expectIndex(map[string][]string{
"configmap:ns/cm1": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm1": {"node:node1=1", "node:node2=1", "node:node3=1"},
"configmap:ns/cm2": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm2": {"node:node1=1", "node:node2=1", "node:node3=1"},
"configmap:ns/cm3": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm3": {"node:node1=1", "node:node2=1", "node:node3=1"},
"serviceAccount:ns/sa1": {"node:node1", "node:node2", "node:node3"}, "serviceAccount:ns/sa1": {"node:node1=1", "node:node2=1", "node:node3=1"},
}) })
// delete one to drop below the threshold // delete one to drop below the threshold
@ -317,10 +317,10 @@ func TestIndex(t *testing.T) {
"serviceAccount:ns/sa1": {"pod:ns/pod1", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "serviceAccount:ns/sa1": {"pod:ns/pod1", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
}) })
expectIndex(map[string][]string{ expectIndex(map[string][]string{
"configmap:ns/cm1": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm1": {"node:node1=2", "node:node2=1", "node:node3=1"},
"configmap:ns/cm2": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm2": {"node:node1=2", "node:node2=1", "node:node3=1"},
"configmap:ns/cm3": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm3": {"node:node1=2", "node:node2=1", "node:node3=1"},
"serviceAccount:ns/sa1": {"node:node1", "node:node2", "node:node3"}, "serviceAccount:ns/sa1": {"node:node1=2", "node:node2=1", "node:node3=1"},
}) })
// delete one to remain above the threshold // delete one to remain above the threshold
@ -338,33 +338,35 @@ func TestIndex(t *testing.T) {
"serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
}) })
expectIndex(map[string][]string{ expectIndex(map[string][]string{
"configmap:ns/cm1": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm1": {"node:node1=1", "node:node2=1", "node:node3=1"},
"configmap:ns/cm2": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm2": {"node:node1=1", "node:node2=1", "node:node3=1"},
"configmap:ns/cm3": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm3": {"node:node1=1", "node:node2=1", "node:node3=1"},
"serviceAccount:ns/sa1": {"node:node1", "node:node2", "node:node3"}, "serviceAccount:ns/sa1": {"node:node1=1", "node:node2=1", "node:node3=1"},
}) })
// Set node->configmap references // Set node->configmap references
g.SetNodeConfigMap("node1", "cm1", "ns") g.SetNodeConfigMap("node1", "cm1", "ns")
g.SetNodeConfigMap("node2", "cm1", "ns") g.SetNodeConfigMap("node2", "cm1", "ns")
g.SetNodeConfigMap("node3", "cm1", "ns") g.SetNodeConfigMap("node3", "cm1", "ns")
g.SetNodeConfigMap("node4", "cm1", "ns")
expectGraph(map[string][]string{ expectGraph(map[string][]string{
"node:node1": {}, "node:node1": {},
"node:node2": {}, "node:node2": {},
"node:node3": {}, "node:node3": {},
"node:node4": {},
"pod:ns/pod2": {"node:node2"}, "pod:ns/pod2": {"node:node2"},
"pod:ns/pod3": {"node:node3"}, "pod:ns/pod3": {"node:node3"},
"pod:ns/pod4": {"node:node1"}, "pod:ns/pod4": {"node:node1"},
"configmap:ns/cm1": {"node:node1", "node:node2", "node:node3", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "configmap:ns/cm1": {"node:node1", "node:node2", "node:node3", "node:node4", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
"configmap:ns/cm2": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "configmap:ns/cm2": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
"configmap:ns/cm3": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "configmap:ns/cm3": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
"serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
}) })
expectIndex(map[string][]string{ expectIndex(map[string][]string{
"configmap:ns/cm1": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm1": {"node:node1=2", "node:node2=2", "node:node3=2", "node:node4=1"},
"configmap:ns/cm2": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm2": {"node:node1=1", "node:node2=1", "node:node3=1"},
"configmap:ns/cm3": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm3": {"node:node1=1", "node:node2=1", "node:node3=1"},
"serviceAccount:ns/sa1": {"node:node1", "node:node2", "node:node3"}, "serviceAccount:ns/sa1": {"node:node1=1", "node:node2=1", "node:node3=1"},
}) })
// Update node->configmap reference // Update node->configmap reference
@ -373,27 +375,30 @@ func TestIndex(t *testing.T) {
"node:node1": {}, "node:node1": {},
"node:node2": {}, "node:node2": {},
"node:node3": {}, "node:node3": {},
"node:node4": {},
"pod:ns/pod2": {"node:node2"}, "pod:ns/pod2": {"node:node2"},
"pod:ns/pod3": {"node:node3"}, "pod:ns/pod3": {"node:node3"},
"pod:ns/pod4": {"node:node1"}, "pod:ns/pod4": {"node:node1"},
"configmap:ns/cm1": {"node:node2", "node:node3", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "configmap:ns/cm1": {"node:node2", "node:node3", "node:node4", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
"configmap:ns/cm2": {"node:node1", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "configmap:ns/cm2": {"node:node1", "pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
"configmap:ns/cm3": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "configmap:ns/cm3": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
"serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
}) })
expectIndex(map[string][]string{ expectIndex(map[string][]string{
"configmap:ns/cm1": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm1": {"node:node1=1", "node:node2=2", "node:node3=2", "node:node4=1"},
"configmap:ns/cm2": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm2": {"node:node1=2", "node:node2=1", "node:node3=1"},
"configmap:ns/cm3": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm3": {"node:node1=1", "node:node2=1", "node:node3=1"},
"serviceAccount:ns/sa1": {"node:node1", "node:node2", "node:node3"}, "serviceAccount:ns/sa1": {"node:node1=1", "node:node2=1", "node:node3=1"},
}) })
// Remove node->configmap reference // Remove node->configmap reference
g.SetNodeConfigMap("node1", "", "") g.SetNodeConfigMap("node1", "", "")
g.SetNodeConfigMap("node4", "", "")
expectGraph(map[string][]string{ expectGraph(map[string][]string{
"node:node1": {}, "node:node1": {},
"node:node2": {}, "node:node2": {},
"node:node3": {}, "node:node3": {},
"node:node4": {},
"pod:ns/pod2": {"node:node2"}, "pod:ns/pod2": {"node:node2"},
"pod:ns/pod3": {"node:node3"}, "pod:ns/pod3": {"node:node3"},
"pod:ns/pod4": {"node:node1"}, "pod:ns/pod4": {"node:node1"},
@ -403,9 +408,9 @@ func TestIndex(t *testing.T) {
"serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"}, "serviceAccount:ns/sa1": {"pod:ns/pod2", "pod:ns/pod3", "pod:ns/pod4"},
}) })
expectIndex(map[string][]string{ expectIndex(map[string][]string{
"configmap:ns/cm1": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm1": {"node:node1=1", "node:node2=2", "node:node3=2"},
"configmap:ns/cm2": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm2": {"node:node1=1", "node:node2=1", "node:node3=1"},
"configmap:ns/cm3": {"node:node1", "node:node2", "node:node3"}, "configmap:ns/cm3": {"node:node1=1", "node:node2=1", "node:node3=1"},
"serviceAccount:ns/sa1": {"node:node1", "node:node2", "node:node3"}, "serviceAccount:ns/sa1": {"node:node1=1", "node:node2=1", "node:node3=1"},
}) })
} }

View File

@ -16,47 +16,47 @@ limitations under the License.
package node package node
// intSet maintains a set of ints, and supports promoting and culling the previous generation. // intSet maintains a map of id to refcounts
// this allows tracking a large, mostly-stable set without constantly reallocating the entire set.
type intSet struct { type intSet struct {
currentGeneration byte // members is a map of id to refcounts
members map[int]byte members map[int]int
} }
func newIntSet() *intSet { func newIntSet() *intSet {
return &intSet{members: map[int]byte{}} return &intSet{members: map[int]int{}}
} }
// has returns true if the specified int is in the set. // has returns true if the specified id has a positive refcount.
// it is safe to call concurrently, but must not be called concurrently with any of the other methods. // it is safe to call concurrently, but must not be called concurrently with any of the other methods.
func (s *intSet) has(i int) bool { func (s *intSet) has(i int) bool {
if s == nil { if s == nil {
return false return false
} }
_, present := s.members[i] return s.members[i] > 0
return present
} }
// startNewGeneration begins a new generation. // reset removes all ids, effectively setting their refcounts to 0.
// it must be followed by a call to mark() for every member of the generation,
// then a call to sweep() to remove members not present in the generation.
// it is not thread-safe. // it is not thread-safe.
func (s *intSet) startNewGeneration() { func (s *intSet) reset() {
s.currentGeneration++ for k := range s.members {
} delete(s.members, k)
}
// mark indicates the specified int belongs to the current generation. }
// it is not thread-safe.
func (s *intSet) mark(i int) { // increment adds one to the refcount of the specified id.
s.members[i] = s.currentGeneration // it is not thread-safe.
} func (s *intSet) increment(i int) {
s.members[i]++
// sweep removes items not in the current generation. }
// it is not thread-safe.
func (s *intSet) sweep() { // decrement removes one from the refcount of the specified id,
for k, v := range s.members { // and removes the id if the resulting refcount is <= 0.
if v != s.currentGeneration { // it will not track refcounts lower than zero.
delete(s.members, k) // it is not thread-safe.
} func (s *intSet) decrement(i int) {
if s.members[i] <= 1 {
delete(s.members, i)
} else {
s.members[i]--
} }
} }

View File

@ -30,33 +30,36 @@ func TestIntSet(t *testing.T) {
assert.False(t, i.has(3)) assert.False(t, i.has(3))
assert.False(t, i.has(4)) assert.False(t, i.has(4))
i.startNewGeneration() i.reset()
i.mark(1) i.increment(1) // to 1
i.mark(2) i.increment(2) // to 1
i.sweep()
assert.True(t, i.has(1)) assert.True(t, i.has(1))
assert.True(t, i.has(2)) assert.True(t, i.has(2))
assert.False(t, i.has(3)) assert.False(t, i.has(3))
assert.False(t, i.has(4)) assert.False(t, i.has(4))
i.startNewGeneration() i.decrement(1) // to 0
i.mark(2) i.increment(3) // to 1
i.mark(3)
i.sweep()
assert.False(t, i.has(1)) assert.False(t, i.has(1)) // removed
assert.True(t, i.has(2)) assert.True(t, i.has(2)) // still present
assert.True(t, i.has(3)) assert.True(t, i.has(3)) // added
assert.False(t, i.has(4)) assert.False(t, i.has(4)) // not yet present
i.startNewGeneration() i.decrement(2) // to 0
i.mark(3) i.increment(3) // to 2
i.mark(4) i.decrement(3) // to 1
i.sweep() i.increment(4) // to 1
assert.False(t, i.has(1)) assert.False(t, i.has(1))
assert.False(t, i.has(2)) assert.False(t, i.has(2))
assert.True(t, i.has(3)) assert.True(t, i.has(3))
assert.True(t, i.has(4)) assert.True(t, i.has(4))
i.reset()
assert.False(t, i.has(1))
assert.False(t, i.has(2))
assert.False(t, i.has(3))
assert.False(t, i.has(4))
} }