mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-02 01:42:19 +00:00
Merge pull request #73030 from tnozicka/fix-csr-list-watch
Switch WaitForCertificate to informers to avoid broken watches Kubernetes-commit: 1b26097e1e623e8e58ec6d5d1aa5a479957f39c9
This commit is contained in:
commit
2dda7ceeec
@ -40,9 +40,9 @@ import (
|
|||||||
"k8s.io/client-go/util/certificate/csr"
|
"k8s.io/client-go/util/certificate/csr"
|
||||||
)
|
)
|
||||||
|
|
||||||
// certificateWaitBackoff controls the amount and timing of retries when the
|
// certificateWaitTimeout controls the amount of time we wait for certificate
|
||||||
// watch for certificate approval is interrupted.
|
// approval in one iteration.
|
||||||
var certificateWaitBackoff = wait.Backoff{Duration: 30 * time.Second, Steps: 4, Factor: 1.5, Jitter: 0.1}
|
var certificateWaitTimeout = 15 * time.Minute
|
||||||
|
|
||||||
// Manager maintains and updates the certificates in use by this certificate
|
// Manager maintains and updates the certificates in use by this certificate
|
||||||
// manager. In the background it communicates with the API server to get new
|
// manager. In the background it communicates with the API server to get new
|
||||||
@ -388,29 +388,10 @@ func (m *manager) rotateCerts() (bool, error) {
|
|||||||
// Once we've successfully submitted a CSR for this template, record that we did so
|
// Once we've successfully submitted a CSR for this template, record that we did so
|
||||||
m.setLastRequest(template)
|
m.setLastRequest(template)
|
||||||
|
|
||||||
// Wait for the certificate to be signed. Instead of one long watch, we retry with slightly longer
|
// Wait for the certificate to be signed. This interface and internal timout
|
||||||
// intervals each time in order to tolerate failures from the server AND to preserve the liveliness
|
// is a remainder after the old design using raw watch wrapped with backoff.
|
||||||
// of the cert manager loop. This creates slightly more traffic against the API server in return
|
crtPEM, err := csr.WaitForCertificate(client, req, certificateWaitTimeout)
|
||||||
// for bounding the amount of time we wait when a certificate expires.
|
if err != nil {
|
||||||
var crtPEM []byte
|
|
||||||
watchDuration := time.Minute
|
|
||||||
if err := wait.ExponentialBackoff(certificateWaitBackoff, func() (bool, error) {
|
|
||||||
data, err := csr.WaitForCertificate(client, req, watchDuration)
|
|
||||||
switch {
|
|
||||||
case err == nil:
|
|
||||||
crtPEM = data
|
|
||||||
return true, nil
|
|
||||||
case err == wait.ErrWaitTimeout:
|
|
||||||
watchDuration += time.Minute
|
|
||||||
if watchDuration > 5*time.Minute {
|
|
||||||
watchDuration = 5 * time.Minute
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
default:
|
|
||||||
utilruntime.HandleError(fmt.Errorf("Unable to check certificate signing status: %v", err))
|
|
||||||
return false, m.updateServerError(err)
|
|
||||||
}
|
|
||||||
}); err != nil {
|
|
||||||
utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
|
utilruntime.HandleError(fmt.Errorf("Certificate request was not signed: %v", err))
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,6 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
watch "k8s.io/apimachinery/pkg/watch"
|
watch "k8s.io/apimachinery/pkg/watch"
|
||||||
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
|
||||||
)
|
)
|
||||||
@ -433,7 +432,8 @@ func TestRotateCertWaitingForResultError(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
certificateWaitBackoff = wait.Backoff{Steps: 1}
|
defer func(t time.Duration) { certificateWaitTimeout = t }(certificateWaitTimeout)
|
||||||
|
certificateWaitTimeout = 1 * time.Millisecond
|
||||||
if success, err := m.rotateCerts(); success {
|
if success, err := m.rotateCerts(); success {
|
||||||
t.Errorf("Got success from 'rotateCerts', wanted failure.")
|
t.Errorf("Got success from 'rotateCerts', wanted failure.")
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -880,15 +880,6 @@ func TestServerHealth(t *testing.T) {
|
|||||||
expectRotateFail: true,
|
expectRotateFail: true,
|
||||||
expectHealthy: true,
|
expectHealthy: true,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
description: "Conflict error on watch",
|
|
||||||
certs: currentCerts,
|
|
||||||
|
|
||||||
failureType: watchError,
|
|
||||||
clientErr: errors.NewGenericServerResponse(409, "POST", schema.GroupResource{}, "", "", 0, false),
|
|
||||||
expectRotateFail: true,
|
|
||||||
expectHealthy: false,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package csr
|
package csr
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto"
|
"crypto"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
@ -83,19 +84,23 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
|
|||||||
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
|
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
|
||||||
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
|
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
|
||||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
|
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
|
||||||
|
lw := &cache.ListWatch{
|
||||||
event, err := watchtools.ListWatchUntil(
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
timeout,
|
options.FieldSelector = fieldSelector
|
||||||
&cache.ListWatch{
|
return client.List(options)
|
||||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
|
||||||
options.FieldSelector = fieldSelector
|
|
||||||
return client.List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
||||||
options.FieldSelector = fieldSelector
|
|
||||||
return client.Watch(options)
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
options.FieldSelector = fieldSelector
|
||||||
|
return client.Watch(options)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||||
|
defer cancel()
|
||||||
|
event, err := watchtools.UntilWithSync(
|
||||||
|
ctx,
|
||||||
|
lw,
|
||||||
|
&certificates.CertificateSigningRequest{},
|
||||||
|
nil,
|
||||||
func(event watch.Event) (bool, error) {
|
func(event watch.Event) (bool, error) {
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
case watch.Modified, watch.Added:
|
case watch.Modified, watch.Added:
|
||||||
|
Loading…
Reference in New Issue
Block a user