diff --git a/staging/src/k8s.io/apiextensions-apiserver/go.sum b/staging/src/k8s.io/apiextensions-apiserver/go.sum index 6f8bd4918ce..ade8ae4f15d 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/go.sum +++ b/staging/src/k8s.io/apiextensions-apiserver/go.sum @@ -393,6 +393,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/staging/src/k8s.io/apiserver/go.mod b/staging/src/k8s.io/apiserver/go.mod index 01b925bab48..483962858a3 100644 --- a/staging/src/k8s.io/apiserver/go.mod +++ b/staging/src/k8s.io/apiserver/go.mod @@ -38,6 +38,7 @@ require ( go.uber.org/zap v1.10.0 golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873 // indirect google.golang.org/grpc v1.23.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/staging/src/k8s.io/apiserver/go.sum b/staging/src/k8s.io/apiserver/go.sum index a9014243b34..5500f75f466 100644 --- a/staging/src/k8s.io/apiserver/go.sum +++ b/staging/src/k8s.io/apiserver/go.sum @@ -295,7 +295,10 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTm golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD index 1f08a3a050f..771634d747a 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/BUILD @@ -18,6 +18,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", "//vendor/github.com/google/uuid:go_default_library", ], ) @@ -32,9 +33,12 @@ go_library( importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/authentication/token/cache", importpath = "k8s.io/apiserver/pkg/authentication/token/cache", deps = [ + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/cache:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", + "//vendor/golang.org/x/sync/singleflight:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go index ef0a8c87215..66a9fc577f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go @@ -22,16 +22,26 @@ import ( "crypto/rand" "crypto/sha256" "encoding/binary" + "errors" "hash" "io" + "runtime" "sync" "time" "unsafe" + "golang.org/x/sync/singleflight" + + apierrors "k8s.io/apimachinery/pkg/api/errors" utilclock "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apiserver/pkg/authentication/authenticator" + "k8s.io/klog" ) +var errAuthnCrash = apierrors.NewInternalError(errors.New("authentication failed unexpectedly")) + +const sharedLookupTimeout = 30 * time.Second + // cacheRecord holds the three return values of the authenticator.Token AuthenticateToken method type cacheRecord struct { resp *authenticator.Response @@ -47,6 +57,7 @@ type cachedTokenAuthenticator struct { failureTTL time.Duration cache cache + group singleflight.Group // hashPool is a per authenticator pool of hash.Hash (to avoid allocations from building the Hash) // HMAC with SHA-256 and a random key is used to prevent precomputation and length extension attacks @@ -98,26 +109,71 @@ func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, // AuthenticateToken implements authenticator.Token func (a *cachedTokenAuthenticator) AuthenticateToken(ctx context.Context, token string) (*authenticator.Response, bool, error) { - auds, _ := authenticator.AudiencesFrom(ctx) + auds, audsOk := authenticator.AudiencesFrom(ctx) key := keyFunc(a.hashPool, auds, token) if record, ok := a.cache.get(key); ok { return record.resp, record.ok, record.err } - resp, ok, err := a.authenticator.AuthenticateToken(ctx, token) - if !a.cacheErrs && err != nil { - return resp, ok, err + type lookup struct { + resp *authenticator.Response + ok bool } - switch { - case ok && a.successTTL > 0: - a.cache.set(key, &cacheRecord{resp: resp, ok: ok, err: err}, a.successTTL) - case !ok && a.failureTTL > 0: - a.cache.set(key, &cacheRecord{resp: resp, ok: ok, err: err}, a.failureTTL) - } + c := a.group.DoChan(key, func() (val interface{}, err error) { + // We're leaving the request handling stack so we need to handle crashes + // ourselves. Log a stack trace and return a 500 if something panics. + defer func() { + if r := recover(); r != nil { + err = errAuthnCrash + // Same as stdlib http server code. Manually allocate stack + // trace buffer size to prevent excessively large logs + const size = 64 << 10 + buf := make([]byte, size) + buf = buf[:runtime.Stack(buf, false)] + klog.Errorf("%v\n%s", r, buf) + } + }() - return resp, ok, err + // Check again for a cached record. We may have raced with a fetch. + if record, ok := a.cache.get(key); ok { + return lookup{record.resp, record.ok}, record.err + } + + // Detach the context because the lookup may be shared by multiple callers, + // however propagate the audience. + ctx, cancel := context.WithTimeout(context.Background(), sharedLookupTimeout) + defer cancel() + + if audsOk { + ctx = authenticator.WithAudiences(ctx, auds) + } + + resp, ok, err := a.authenticator.AuthenticateToken(ctx, token) + if !a.cacheErrs && err != nil { + return nil, err + } + + switch { + case ok && a.successTTL > 0: + a.cache.set(key, &cacheRecord{resp: resp, ok: ok, err: err}, a.successTTL) + case !ok && a.failureTTL > 0: + a.cache.set(key, &cacheRecord{resp: resp, ok: ok, err: err}, a.failureTTL) + } + return lookup{resp, ok}, err + }) + + select { + case result := <-c: + if result.Err != nil { + return nil, false, result.Err + } + lookup := result.Val.(lookup) + return lookup.resp, lookup.ok, nil + case <-ctx.Done(): + return nil, false, ctx.Err() + } } // keyFunc generates a string key by hashing the inputs. diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go index c6fb207e3a7..7252a0a24d4 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator_test.go @@ -30,6 +30,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" utilclock "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/authentication/authenticator" @@ -173,6 +174,106 @@ func BenchmarkKeyFunc(b *testing.B) { }) } +func TestSharedLookup(t *testing.T) { + var chewie = &authenticator.Response{User: &user.DefaultInfo{Name: "chewbacca"}} + + t.Run("actually shared", func(t *testing.T) { + var lookups uint32 + c := make(chan struct{}) + a := New(authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { + <-c + atomic.AddUint32(&lookups, 1) + return chewie, true, nil + }), true, time.Minute, 0) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + a.AuthenticateToken(context.Background(), "") + }() + } + + // no good way to make sure that all the callers are queued so we sleep. + time.Sleep(1 * time.Second) + close(c) + wg.Wait() + + if lookups > 3 { + t.Fatalf("unexpected number of lookups: got=%d, wanted less than 3", lookups) + } + }) + + t.Run("first caller bails, second caller gets result", func(t *testing.T) { + c := make(chan struct{}) + a := New(authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { + <-c + return chewie, true, nil + }), true, time.Minute, 0) + + var wg sync.WaitGroup + wg.Add(2) + + ctx1, cancel1 := context.WithCancel(context.Background()) + go func() { + defer wg.Done() + a.AuthenticateToken(ctx1, "") + }() + + ctx2 := context.Background() + + var ( + resp *authenticator.Response + ok bool + err error + ) + go func() { + defer wg.Done() + resp, ok, err = a.AuthenticateToken(ctx2, "") + }() + + time.Sleep(1 * time.Second) + cancel1() + close(c) + wg.Wait() + + if want := chewie; !cmp.Equal(resp, want) { + t.Errorf("Unexpected diff: %v", cmp.Diff(resp, want)) + } + if !ok { + t.Errorf("Expected ok response") + } + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + }) + + t.Run("lookup panics", func(t *testing.T) { + a := New(authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { + panic("uh oh") + }), true, time.Minute, 0) + + _, _, err := a.AuthenticateToken(context.Background(), "") + if err != errAuthnCrash { + t.Errorf("expected error: %v", err) + } + }) + + t.Run("audiences are forwarded", func(t *testing.T) { + ctx := authenticator.WithAudiences(context.Background(), []string{"a"}) + a := New(authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { + auds, _ := authenticator.AudiencesFrom(ctx) + if got, want := auds, []string{"a"}; cmp.Equal(got, want) { + t.Fatalf("unexpeced audiences: %v", cmp.Diff(got, want)) + } + return nil, false, nil + }), true, time.Minute, 0) + + a.AuthenticateToken(ctx, "") + }) +} + func BenchmarkCachedTokenAuthenticator(b *testing.B) { tokenCount := []int{100, 500, 2500, 12500, 62500} threadCount := []int{1, 16, 256} diff --git a/staging/src/k8s.io/kube-aggregator/go.sum b/staging/src/k8s.io/kube-aggregator/go.sum index bfcd7b4aa8a..66856c451af 100644 --- a/staging/src/k8s.io/kube-aggregator/go.sum +++ b/staging/src/k8s.io/kube-aggregator/go.sum @@ -324,6 +324,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/staging/src/k8s.io/legacy-cloud-providers/go.sum b/staging/src/k8s.io/legacy-cloud-providers/go.sum index 99f76dac0cc..c7582129917 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/go.sum +++ b/staging/src/k8s.io/legacy-cloud-providers/go.sum @@ -286,6 +286,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/staging/src/k8s.io/sample-apiserver/go.sum b/staging/src/k8s.io/sample-apiserver/go.sum index 3ea72ac63e9..de313c3d1c5 100644 --- a/staging/src/k8s.io/sample-apiserver/go.sum +++ b/staging/src/k8s.io/sample-apiserver/go.sum @@ -321,6 +321,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/vendor/BUILD b/vendor/BUILD index 123d9dc10e9..b382ec1a4ac 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -372,6 +372,7 @@ filegroup( "//vendor/golang.org/x/net/websocket:all-srcs", "//vendor/golang.org/x/oauth2:all-srcs", "//vendor/golang.org/x/sync/errgroup:all-srcs", + "//vendor/golang.org/x/sync/singleflight:all-srcs", "//vendor/golang.org/x/sys/cpu:all-srcs", "//vendor/golang.org/x/sys/unix:all-srcs", "//vendor/golang.org/x/sys/windows:all-srcs", diff --git a/vendor/golang.org/x/sync/singleflight/BUILD b/vendor/golang.org/x/sync/singleflight/BUILD new file mode 100644 index 00000000000..036740f471f --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/BUILD @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["singleflight.go"], + importmap = "k8s.io/kubernetes/vendor/golang.org/x/sync/singleflight", + importpath = "golang.org/x/sync/singleflight", + visibility = ["//visibility:public"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 00000000000..97a1aa4bb30 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,120 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import "sync" + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // forgotten indicates whether Forget was called with this call's key + // while the call was still in flight. + forgotten bool + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + c.val, c.err = fn() + c.wg.Done() + + g.mu.Lock() + if !c.forgotten { + delete(g.m, key) + } + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + g.mu.Unlock() +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + if c, ok := g.m[key]; ok { + c.forgotten = true + } + delete(g.m, key) + g.mu.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c9edf8655bd..a890aa0d251 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -886,6 +886,7 @@ golang.org/x/oauth2/jws golang.org/x/oauth2/jwt # golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e => golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sync/errgroup +golang.org/x/sync/singleflight # golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 => golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a golang.org/x/sys/cpu golang.org/x/sys/unix