From 24a8cceb5cf31827640c691c64cddfd46a73b057 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 16 Apr 2015 14:45:57 -0700 Subject: [PATCH] add work queue; test coverage 100% --- pkg/util/workqueue/doc.go | 26 +++++++ pkg/util/workqueue/queue.go | 128 +++++++++++++++++++++++++++++++ pkg/util/workqueue/queue_test.go | 115 +++++++++++++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 pkg/util/workqueue/doc.go create mode 100644 pkg/util/workqueue/queue.go create mode 100644 pkg/util/workqueue/queue_test.go diff --git a/pkg/util/workqueue/doc.go b/pkg/util/workqueue/doc.go new file mode 100644 index 00000000000..53000cfba75 --- /dev/null +++ b/pkg/util/workqueue/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package workqueue provides a simple queue that supports the following +// features: +// * Fair: items processed in the order in which they are added. +// * Stingy: a single item will not be processed multiple times concurrently, +// and if an item is added multiple times before it can be processed, it +// will only be processed once. +// * Multiple consumers and producers. In particular, it is allowed for an +// item to be reenqueued while it is being processed. +// * Shutdown notifications. +package workqueue diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go new file mode 100644 index 00000000000..794e4488f41 --- /dev/null +++ b/pkg/util/workqueue/queue.go @@ -0,0 +1,128 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" +) + +// New constructs a new workqueue (see the package comment). +func New() *Type { + return &Type{ + dirty: set{}, + processing: set{}, + cond: sync.NewCond(&sync.Mutex{}), + } +} + +// Type is a work queue (see the package comment). +type Type struct { + // queue defines the order in which we will work on items. Every + // element of queue should be in the dirty set and not in the + // processing set. + queue []t + + // dirty defines all of the items that need to be processed. + dirty set + + // Things that are currently being processed are in the processing set. + // These things may be simultaneously in the dirty set. When we finish + // processing something and remove it from this set, we'll check if + // it's in the dirty set, and if so, add it to the queue. + processing set + + cond *sync.Cond + + shuttingDown bool +} + +type empty struct{} +type t interface{} +type set map[t]empty + +func (s set) has(item t) bool { + _, exists := s[item] + return exists +} + +func (s set) insert(item t) { + s[item] = empty{} +} + +func (s set) delete(item t) { + delete(s, item) +} + +// Add marks item as needing processing. +func (q *Type) Add(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown { + return + } + if q.dirty.has(item) { + return + } + q.dirty.insert(item) + if q.processing.has(item) { + return + } + q.queue = append(q.queue, item) + q.cond.Signal() +} + +// Get blocks until it can return an item to be processed. If shutdown = true, +// the caller should end their goroutine. You must call Done with item when you +// have finished processing it. +func (q *Type) Get() (item interface{}, shutdown bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + for len(q.queue) == 0 && !q.shuttingDown { + q.cond.Wait() + } + if len(q.queue) == 0 { + // We must be shutting down. + return nil, true + } + item, q.queue = q.queue[0], q.queue[1:] + q.processing.insert(item) + q.dirty.delete(item) + return item, false +} + +// Done marks item as done processing, and if it has been marked as dirty again +// while it was being processed, it will be re-added to the queue for +// re-processing. +func (q *Type) Done(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.processing.delete(item) + if q.dirty.has(item) { + q.queue = append(q.queue, item) + q.cond.Signal() + } +} + +// Shutdown will cause q to ignore all new items added to it. As soon as the +// worker goroutines have drained the existing items in the queue, they will be +// instructed to exit. +func (q *Type) ShutDown() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.shuttingDown = true + q.cond.Broadcast() +} diff --git a/pkg/util/workqueue/queue_test.go b/pkg/util/workqueue/queue_test.go new file mode 100644 index 00000000000..f040764a1ae --- /dev/null +++ b/pkg/util/workqueue/queue_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue_test + +import ( + "sync" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue" +) + +func TestBasic(t *testing.T) { + // If something is seriously wrong this test will never complete. + q := workqueue.New() + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + for j := 0; j < 50; j++ { + q.Add(i) + time.Sleep(time.Millisecond) + } + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + for { + item, quit := q.Get() + if item == "added after shutdown!" { + t.Errorf("Got an item added after shutdown.") + } + if quit { + return + } + t.Logf("Worker %v: begin processing %v", i, item) + time.Sleep(3 * time.Millisecond) + t.Logf("Worker %v: done processing %v", i, item) + q.Done(item) + } + }(i) + } + + producerWG.Wait() + q.ShutDown() + q.Add("added after shutdown!") + consumerWG.Wait() +} + +func TestAddWhileProcessing(t *testing.T) { + q := workqueue.New() + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + q.Add(i) + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + // Every worker will re-add every item up to two times. + // This tests the dirty-while-processing case. + counters := map[interface{}]int{} + for { + item, quit := q.Get() + if quit { + return + } + counters[item]++ + if counters[item] < 2 { + q.Add(item) + } + q.Done(item) + } + }(i) + } + + producerWG.Wait() + q.ShutDown() + consumerWG.Wait() +}