Mirror of https://github.com/rancher/dynamiclistener.git (synced 2025-12-07 13:15:04 +00:00)

Compare commits: v0.6.2-rc. ... v0.6.4 (9 commits)
Commits in this range (SHA1):

- 79b54f39dc
- 0e2161b34b
- 2ee4a16846
- 12ccb5f265
- 737b624f6b
- 246fdcf607
- 1be223c6b0
- 389ebca7c5
- 3f4c7c1f3a
.github/renovate.json (vendored, 19 changed lines)

@@ -5,5 +5,22 @@
   "baseBranches": [
     "main", "release/v0.3", "release/v0.4", "release/v0.5"
   ],
-  "prHourlyLimit": 2
+  "prHourlyLimit": 2,
+  "packageRules": [
+    {
+      "matchPackagePatterns": [
+        "k8s.io/*",
+        "sigs.k8s.io/*",
+        "github.com/prometheus/*"
+      ],
+      "enabled": false
+    },
+    {
+      "matchPackagePatterns": [
+        "github.com/rancher/wrangler/*"
+      ],
+      "matchUpdateTypes": ["major", "minor"],
+      "enabled": false
+    }
+  ]
 }

.github/workflows/ci.yaml (vendored, 10 changed lines)

@@ -10,11 +10,11 @@ jobs:
     runs-on: ubuntu-latest
     steps:
     - name: Checkout code
-      # https://github.com/actions/checkout/releases/tag/v4.1.1
-      uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
+      # https://github.com/actions/checkout/releases/tag/VERSION
+      uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
     - name: Install Go
-      # https://github.com/actions/setup-go/releases/tag/v5.0.0
-      uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0
+      # https://github.com/actions/setup-go/releases/tag/VERSION
+      uses: actions/setup-go@0aaccfd150d50ccaeb58ebd88d36e91967a5f35b # v5.4.0
       with:
         go-version-file: 'go.mod'
-    - run: go test -race -cover ./...
+    - run: go test -v -race -cover ./...

.github/workflows/release.yaml (vendored, 2 changed lines)

@@ -12,7 +12,7 @@ jobs:
       contents: write
     steps:
     - name : Checkout repository
-      uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
+      uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2

     - name: Create release on Github
       run: |

@@ -16,9 +16,13 @@ on:
   schedule:
     - cron: '30 4,6 * * *'

+permissions:
+  contents: read
+  id-token: write
+
 jobs:
   call-workflow:
-    uses: rancher/renovate-config/.github/workflows/renovate.yml@release
+    uses: rancher/renovate-config/.github/workflows/renovate-vault.yml@release
     with:
       logLevel: ${{ inputs.logLevel || 'info' }}
       overrideSchedule: ${{ github.event.inputs.overrideSchedule == 'true' && '{''schedule'':null}' || '' }}

VERSION.md (12 changed lines)

@@ -2,9 +2,9 @@ DynamicListener follows a pre-release (v0.x) strategy of semver. There is limite

 The current supported release lines are:

-| DynamicListener Branch | DynamicListener Minor version | Kubernetes Version Range |
-|------------------------|-------------------------------|--------------------------|
-| main                   | v0.6                          | v1.27+                   |
-| release/v0.5           | v0.5                          | v1.26 - v1.30            |
-| release/v0.4           | v0.4                          | v1.25 - v1.28            |
-| release/v0.3           | v0.3                          | v1.23 - v1.27            |
+| DynamicListener Branch | DynamicListener Minor version | Kubernetes Version Range | Wrangler Version |
+|------------------------|-------------------------------|--------------------------|------------------|
+| main                   | v0.6                          | v1.27+                   | v3               |
+| release/v0.5           | v0.5                          | v1.26 - v1.30            | v3               |
+| release/v0.4           | v0.4                          | v1.25 - v1.28            | v2               |
+| release/v0.3           | v0.3                          | v1.23 - v1.27            | v2               |

cert/cert.go (23 changed lines)

@@ -73,15 +73,15 @@ func NewPrivateKey() (*rsa.PrivateKey, error) {

 // NewSelfSignedCACert creates a CA certificate
 func NewSelfSignedCACert(cfg Config, key crypto.Signer) (*x509.Certificate, error) {
-	now := time.Now()
+	notBefore := CalculateNotBefore(nil)
 	tmpl := x509.Certificate{
 		SerialNumber: new(big.Int).SetInt64(0),
 		Subject: pkix.Name{
 			CommonName:   cfg.CommonName,
 			Organization: cfg.Organization,
 		},
-		NotBefore: now.UTC(),
-		NotAfter:  now.Add(duration365d * 10).UTC(),
+		NotBefore: notBefore,
+		NotAfter:  notBefore.Add(duration365d * 10),
 		KeyUsage:              x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
 		BasicConstraintsValid: true,
 		IsCA:                  true,

@@ -125,6 +125,7 @@ func NewSignedCert(cfg Config, key crypto.Signer, caCert *x509.Certificate, caKe
 		}
 	}

+	notBefore := CalculateNotBefore(caCert)
 	certTmpl := x509.Certificate{
 		Subject: pkix.Name{
 			CommonName:   cfg.CommonName,

@@ -133,8 +134,8 @@ func NewSignedCert(cfg Config, key crypto.Signer, caCert *x509.Certificate, caKe
 		DNSNames:     cfg.AltNames.DNSNames,
 		IPAddresses:  cfg.AltNames.IPs,
 		SerialNumber: serial,
-		NotBefore:    caCert.NotBefore,
-		NotAfter:     time.Now().Add(expiresAt).UTC(),
+		NotBefore:    notBefore,
+		NotAfter:     notBefore.Add(expiresAt),
 		KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
 		ExtKeyUsage:  cfg.Usages,
 	}

@@ -186,8 +187,8 @@ func GenerateSelfSignedCertKey(host string, alternateIPs []net.IP, alternateDNS
 // <host>_<ip>-<ip>_<alternateDNS>-<alternateDNS>.key
 // Certs/keys not existing in that directory are created.
 func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, alternateDNS []string, fixtureDirectory string) ([]byte, []byte, error) {
-	validFrom := time.Now().Add(-time.Hour) // valid an hour earlier to avoid flakes due to clock skew
-	maxAge := time.Hour * 24 * 365          // one year self-signed certs
+	notBefore := CalculateNotBefore(nil)
+	maxAge := time.Hour * 24 * 365 // one year self-signed certs

 	baseName := fmt.Sprintf("%s_%s_%s", host, strings.Join(ipsToStrings(alternateIPs), "-"), strings.Join(alternateDNS, "-"))
 	certFixturePath := filepath.Join(fixtureDirectory, baseName+".crt")

@@ -214,8 +215,8 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
 		Subject: pkix.Name{
 			CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
 		},
-		NotBefore: validFrom,
-		NotAfter:  validFrom.Add(maxAge),
+		NotBefore: notBefore,
+		NotAfter:  notBefore.Add(maxAge),

 		KeyUsage:              x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
 		BasicConstraintsValid: true,

@@ -242,8 +243,8 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
 		Subject: pkix.Name{
 			CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
 		},
-		NotBefore: validFrom,
-		NotAfter:  validFrom.Add(maxAge),
+		NotBefore: notBefore,
+		NotAfter:  notBefore.Add(maxAge),

 		KeyUsage:    x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
 		ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},

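The pattern above replaces ad-hoc time.Now() handling with CalculateNotBefore. Below is a minimal sketch of how a caller outside this diff might build a certificate template the same way; buildLeafTemplate, the lifetime argument, and the fixed serial number are illustrative assumptions, not part of the change.

package example

import (
	"crypto/x509"
	"crypto/x509/pkix"
	"math/big"
	"time"

	"github.com/rancher/dynamiclistener/cert"
)

// buildLeafTemplate is a hypothetical helper showing the NotBefore/NotAfter pattern
// used throughout the diff: both fields are derived from the same backdated instant.
func buildLeafTemplate(caCert *x509.Certificate, cn string, lifetime time.Duration) x509.Certificate {
	// One hour in the past to tolerate clock skew, but never earlier than the CA's own NotBefore.
	notBefore := cert.CalculateNotBefore(caCert)
	return x509.Certificate{
		SerialNumber: big.NewInt(1), // illustrative; production code would use a random serial
		Subject:      pkix.Name{CommonName: cn},
		NotBefore:    notBefore,
		NotAfter:     notBefore.Add(lifetime),
		KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
	}
}
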
cert/validity.go (new file, 39 lines)

@@ -0,0 +1,39 @@
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cert

import (
	"crypto/x509"
	"time"

	clockutil "k8s.io/utils/clock"
)

var clock clockutil.PassiveClock = &clockutil.RealClock{}

// CalculateNotBefore calculates a NotBefore time of 1 hour in the past, or the
// NotBefore time of the optionally provided *x509.Certificate, whichever is greater.
func CalculateNotBefore(ca *x509.Certificate) time.Time {
	// Subtract 1 hour for clock skew
	now := clock.Now().UTC().Add(-time.Hour)

	// It makes no sense to return a time before the CA itself is valid.
	if ca != nil && now.Before(ca.NotBefore) {
		return ca.NotBefore
	}
	return now
}

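A small, hedged demonstration of the clamping behaviour that validity.go introduces (the printed values depend on the wall clock; this program is not part of the repository):

package main

import (
	"crypto/x509"
	"fmt"
	"time"

	"github.com/rancher/dynamiclistener/cert"
)

func main() {
	// With no CA, NotBefore is simply one hour in the past (UTC).
	fmt.Println("self-signed:", cert.CalculateNotBefore(nil))

	// With a CA whose own NotBefore lies in the future, the result is clamped to the
	// CA's NotBefore so a leaf certificate is never valid before its issuer.
	ca := &x509.Certificate{NotBefore: time.Now().UTC().Add(2 * time.Hour)}
	fmt.Println("clamped to CA:", cert.CalculateNotBefore(ca))
}
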
cert/validity_test.go (new file, 77 lines)

@@ -0,0 +1,77 @@
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cert

import (
	"crypto/x509"
	"testing"
	"time"

	clocktest "k8s.io/utils/clock/testing"
)

func TestCalculateNotBefore(t *testing.T) {
	baseTime := time.Date(2025, 9, 29, 12, 0, 0, 0, time.UTC)

	tests := []struct {
		name     string
		ca       *x509.Certificate
		now      time.Time
		expected time.Time
	}{
		{
			name:     "nil CA returns 1h ago",
			ca:       nil,
			now:      baseTime,
			expected: baseTime.Add(-time.Hour),
		},
		{
			name: "CA notBefore before now returns 1h ago",
			ca: &x509.Certificate{
				NotBefore: baseTime.Add(-2 * time.Hour),
			},
			now:      baseTime,
			expected: baseTime.Add(-time.Hour),
		},
		{
			name: "CA notBefore after now returns CA.NotBefore",
			ca: &x509.Certificate{
				NotBefore: baseTime.Add(2 * time.Hour),
			},
			now:      baseTime,
			expected: baseTime.Add(2 * time.Hour),
		},
		{
			name: "CA notBefore equal to now returns now",
			ca: &x509.Certificate{
				NotBefore: baseTime,
			},
			now:      baseTime,
			expected: baseTime,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			clock = clocktest.NewFakePassiveClock(tt.now)
			result := CalculateNotBefore(tt.ca)
			if !result.Equal(tt.expected) {
				t.Errorf("Expected %v, got %v", tt.expected, result)
			}
		})
	}
}

@@ -15,6 +15,7 @@ import (
 	"strings"
 	"time"

+	"github.com/rancher/dynamiclistener/cert"
 	"github.com/sirupsen/logrus"
 )

@@ -24,13 +25,13 @@
 )

 func NewSelfSignedCACert(key crypto.Signer, cn string, org ...string) (*x509.Certificate, error) {
-	now := time.Now()
+	notBefore := cert.CalculateNotBefore(nil)
 	tmpl := x509.Certificate{
 		BasicConstraintsValid: true,
 		IsCA:                  true,
 		KeyUsage:              x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
-		NotAfter:              now.Add(time.Hour * 24 * 365 * 10).UTC(),
-		NotBefore:             now.UTC(),
+		NotBefore:             notBefore,
+		NotAfter:              notBefore.Add(time.Hour * 24 * 365 * 10),
 		SerialNumber:          new(big.Int).SetInt64(0),
 		Subject: pkix.Name{
 			CommonName: cn,

@@ -55,11 +56,12 @@ func NewSignedClientCert(signer crypto.Signer, caCert *x509.Certificate, caKey c
 		return nil, err
 	}

+	notBefore := cert.CalculateNotBefore(caCert)
 	parent := x509.Certificate{
 		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
 		KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
-		NotAfter:     time.Now().Add(time.Hour * 24 * 365).UTC(),
-		NotBefore:    caCert.NotBefore,
+		NotBefore:    notBefore,
+		NotAfter:     notBefore.Add(time.Hour * 24 * 365),
 		SerialNumber: serialNumber,
 		Subject: pkix.Name{
 			CommonName: cn,

@@ -98,13 +100,14 @@ func NewSignedCert(signer crypto.Signer, caCert *x509.Certificate, caKey crypto.
 		}
 	}

+	notBefore := cert.CalculateNotBefore(caCert)
 	parent := x509.Certificate{
 		DNSNames:     domains,
 		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
 		IPAddresses:  ips,
 		KeyUsage:     x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
-		NotAfter:     time.Now().Add(time.Hour * 24 * time.Duration(expirationDays)).UTC(),
-		NotBefore:    caCert.NotBefore,
+		NotBefore:    notBefore,
+		NotAfter:     notBefore.Add(time.Hour * 24 * time.Duration(expirationDays)),
 		SerialNumber: serialNumber,
 		Subject: pkix.Name{
 			CommonName: cn,

@@ -25,7 +25,7 @@ import (
 const (
 	cnPrefix    = "listener.cattle.io/cn-"
 	Static      = "listener.cattle.io/static"
-	fingerprint = "listener.cattle.io/fingerprint"
+	Fingerprint = "listener.cattle.io/fingerprint"
 )

 var (

@@ -189,12 +189,16 @@ func (t *TLS) generateCert(secret *v1.Secret, cn ...string) (*v1.Secret, bool, e
 	secret.Type = v1.SecretTypeTLS
 	secret.Data[v1.TLSCertKey] = certBytes
 	secret.Data[v1.TLSPrivateKeyKey] = keyBytes
-	secret.Annotations[fingerprint] = fmt.Sprintf("SHA1=%X", sha1.Sum(newCert.Raw))
+	secret.Annotations[Fingerprint] = fmt.Sprintf("SHA1=%X", sha1.Sum(newCert.Raw))

 	return secret, true, nil
 }

 func (t *TLS) IsExpired(secret *v1.Secret) bool {
+	if secret == nil {
+		return false
+	}
+
 	certsPem := secret.Data[v1.TLSCertKey]
 	if len(certsPem) == 0 {
 		return false

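Exporting the annotation key lets other packages (such as the memory store further down) compare fingerprints for change detection. A hedged sketch of reading it follows; the secret and its annotation value are fabricated for illustration only.

package main

import (
	"fmt"

	"github.com/rancher/dynamiclistener/factory"
	v1 "k8s.io/api/core/v1"
)

func main() {
	// The listener stamps generated TLS secrets with a SHA1 fingerprint annotation;
	// comparing that single value is a cheap way to notice certificate changes.
	secret := &v1.Secret{}
	secret.Annotations = map[string]string{
		factory.Fingerprint: "SHA1=0123456789ABCDEF", // illustrative value only
	}
	fmt.Println("certificate fingerprint:", secret.Annotations[factory.Fingerprint])
}
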
go.mod (2 changed lines)

@@ -12,6 +12,7 @@ require (
 	k8s.io/api v0.32.1
 	k8s.io/apimachinery v0.32.1
 	k8s.io/client-go v0.32.1
+	k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
 )

 require (

@@ -58,7 +59,6 @@ require (
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	k8s.io/klog/v2 v2.130.1 // indirect
 	k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
-	k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
 	sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
 	sigs.k8s.io/yaml v1.4.0 // indirect

@@ -1,11 +1,13 @@
 package server

 import (
+	"bytes"
 	"context"
 	"crypto"
 	"crypto/tls"
 	"crypto/x509"
 	"fmt"
+	"io"
 	"log"
 	"net"
 	"net/http"

@@ -40,7 +42,24 @@ type ListenOpts struct {
 	// Override legacy behavior where server logs written to the application's logrus object
 	// were dropped unless logrus was set to debug-level (such as by launching steve with '--debug').
 	// Setting this to true results in server logs appearing at an ERROR level.
-	DisplayServerLogs bool
+	DisplayServerLogs       bool
+	IgnoreTLSHandshakeError bool
 }

+var TLSHandshakeError = []byte("http: TLS handshake error")
+
+var _ io.Writer = &TLSErrorDebugger{}
+
+type TLSErrorDebugger struct{}
+
+func (t *TLSErrorDebugger) Write(p []byte) (n int, err error) {
+	p = bytes.TrimSpace(p)
+	if bytes.HasPrefix(p, TLSHandshakeError) {
+		logrus.Debug(string(p))
+	} else {
+		logrus.Error(string(p))
+	}
+	return len(p), err
+}
+
 func ListenAndServe(ctx context.Context, httpsPort, httpPort int, handler http.Handler, opts *ListenOpts) error {

@@ -52,9 +71,15 @@ func ListenAndServe(ctx context.Context, httpsPort, httpPort int, handler http.H
 	if opts.DisplayServerLogs {
 		writer = logger.WriterLevel(logrus.ErrorLevel)
 	}
-	// Otherwise preserve legacy behaviour of displaying server logs only in debug mode.

-	errorLog := log.New(writer, "", log.LstdFlags)
+	var errorLog *log.Logger
+	if opts.IgnoreTLSHandshakeError {
+		debugWriter := &TLSErrorDebugger{}
+		errorLog = log.New(debugWriter, "", 0)
+	} else {
+		// Otherwise preserve legacy behaviour of displaying server logs only in debug mode.
+		errorLog = log.New(writer, "", 0)
+	}

 	if opts.TLSListenerConfig.TLSConfig == nil {
 		opts.TLSListenerConfig.TLSConfig = &tls.Config{}

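A hedged sketch of how a consumer might opt in to the new option; the ports and handler here are placeholders, and only ListenOpts.IgnoreTLSHandshakeError comes from the diff above.

package main

import (
	"context"
	"net/http"

	"github.com/rancher/dynamiclistener/server"
)

func main() {
	// With IgnoreTLSHandshakeError set, "http: TLS handshake error" lines emitted by the
	// embedded http.Server are logged at debug level; everything else stays at error level.
	opts := &server.ListenOpts{
		IgnoreTLSHandshakeError: true,
	}
	if err := server.ListenAndServe(context.Background(), 8443, 8080, http.NewServeMux(), opts); err != nil {
		panic(err)
	}
	select {} // placeholder: keep the process alive while the listener runs
}
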
@@ -39,6 +39,47 @@ func (s *safeWriter) Write(p []byte) (n int, err error) {
 	return s.writer.Write(p)
 }

+func TestTLSHandshakeErrorWriter(t *testing.T) {
+	tests := []struct {
+		name                    string
+		ignoreTLSHandshakeError bool
+		message                 []byte
+		expectedLevel           logrus.Level
+	}{
+		{
+			name:          "TLS handshake error is logged as debug",
+			message:       []byte("http: TLS handshake error: EOF"),
+			expectedLevel: logrus.DebugLevel,
+		},
+		{
+			name:          "other errors are logged as error",
+			message:       []byte("some other server error"),
+			expectedLevel: logrus.ErrorLevel,
+		},
+	}
+	var baseLogLevel = logrus.GetLevel()
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert := assertPkg.New(t)
+
+			var buf bytes.Buffer
+			logrus.SetOutput(&buf)
+			logrus.SetLevel(logrus.DebugLevel)
+
+			debugger := &TLSErrorDebugger{}
+			n, err := debugger.Write(tt.message)
+
+			assert.Nil(err)
+			assert.Equal(len(tt.message), n)
+
+			logOutput := buf.String()
+			assert.Contains(logOutput, "level="+tt.expectedLevel.String())
+			assert.Contains(logOutput, string(tt.message))
+		})
+	}
+	logrus.SetLevel(baseLogLevel)
+}
+
 func TestHttpServerLogWithLogrus(t *testing.T) {
 	assert := assertPkg.New(t)
 	message := "debug-level writer"

@@ -84,7 +125,7 @@ func doRequest(safeWriter *safeWriter, message string, logLevel logrus.Level) er
 	msg := fmt.Sprintf("panicking context: %s", message)
 	handler := alwaysPanicHandler{msg: msg}
 	listenOpts := &ListenOpts{
-		BindHost: host,
+		BindHost:          host,
 		DisplayServerLogs: logLevel == logrus.ErrorLevel,
 	}

@@ -32,7 +32,7 @@ func (s *storage) Get() (*v1.Secret, error) {
 }

 func (s *storage) Update(secret *v1.Secret) error {
-	f, err := os.Create(s.file)
+	f, err := os.OpenFile(s.file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
 	if err != nil {
 		return err
 	}

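For context on the one-line change above: os.Create opens with mode 0666 (before umask), while the replacement restricts newly created files to 0600 so the serialized TLS secret is readable only by the owning user. A small, hedged equivalent outside the repository (the filename is illustrative):

package main

import "os"

func main() {
	// os.Create(name) is shorthand for:
	//   os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
	// Passing 0600 instead means a freshly created secret file is owner-read/write only.
	// The mode only applies at creation time; an existing file keeps its current mode.
	f, err := os.OpenFile("tls-secret.json", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	if _, err := f.WriteString("{}"); err != nil { // placeholder payload
		panic(err)
	}
}
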
@@ -2,33 +2,47 @@ package kubernetes

 import (
 	"context"
-	"sync"
+	"maps"
 	"time"

 	"github.com/rancher/dynamiclistener"
 	"github.com/rancher/dynamiclistener/cert"
 	"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
 	v1controller "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1"
-	"github.com/rancher/wrangler/v3/pkg/start"
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	toolswatch "k8s.io/client-go/tools/watch"
 	"k8s.io/client-go/util/retry"
+	"k8s.io/client-go/util/workqueue"
 )

 type CoreGetter func() *core.Factory

+type storage struct {
+	namespace, name string
+	storage         dynamiclistener.TLSStorage
+	secrets         v1controller.SecretController
+	tls             dynamiclistener.TLSFactory
+	queue           workqueue.TypedInterface[string]
+	queuedSecret    *v1.Secret
+}
+
 func Load(ctx context.Context, secrets v1controller.SecretController, namespace, name string, backing dynamiclistener.TLSStorage) dynamiclistener.TLSStorage {
 	storage := &storage{
 		name:      name,
 		namespace: namespace,
 		storage:   backing,
-		ctx:       ctx,
-		initSync:  &sync.Once{},
+		queue:     workqueue.NewTyped[string](),
 	}
-	storage.init(secrets)
+	storage.runQueue()
+	storage.init(ctx, secrets)
 	return storage
 }

@@ -37,16 +51,16 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 		name:      name,
 		namespace: namespace,
 		storage:   backing,
-		ctx:       ctx,
-		initSync:  &sync.Once{},
+		queue:     workqueue.NewTyped[string](),
 	}
+	storage.runQueue()

 	// lazy init
 	go func() {
 		wait.PollImmediateUntilWithContext(ctx, time.Second, func(cxt context.Context) (bool, error) {
 			if coreFactory := core(); coreFactory != nil {
-				storage.init(coreFactory.Core().V1().Secret())
-				return true, start.All(ctx, 5, coreFactory)
+				storage.init(ctx, coreFactory.Core().V1().Secret())
+				return true, nil
 			}
 			return false, nil
 		})

@@ -55,100 +69,94 @@ func New(ctx context.Context, core CoreGetter, namespace, name string, backing d
 	return storage
 }

-type storage struct {
-	sync.RWMutex
-
-	namespace, name string
-	storage         dynamiclistener.TLSStorage
-	secrets         v1controller.SecretController
-	ctx             context.Context
-	tls             dynamiclistener.TLSFactory
-	initialized     bool
-	initSync        *sync.Once
-}
-
-func (s *storage) SetFactory(tls dynamiclistener.TLSFactory) {
-	s.Lock()
-	defer s.Unlock()
-	s.tls = tls
-}
-
-func (s *storage) init(secrets v1controller.SecretController) {
-	s.Lock()
-	defer s.Unlock()
-
-	secrets.OnChange(s.ctx, "tls-storage", func(key string, secret *v1.Secret) (*v1.Secret, error) {
-		if secret == nil {
-			return nil, nil
-		}
-		if secret.Namespace == s.namespace && secret.Name == s.name {
-			if err := s.Update(secret); err != nil {
-				return nil, err
-			}
-		}
-
-		return secret, nil
-	})
-	s.secrets = secrets
-
-	// Asynchronously sync the backing storage to the Kubernetes secret, as doing so inline may
-	// block the listener from accepting new connections if the apiserver becomes unavailable
-	// after the Secrets controller has been initialized. We're not passing around any contexts
-	// here, nor does the controller accept any, so there's no good way to soft-fail with a
-	// reasonable timeout.
-	go s.syncStorage()
-}
-
-func (s *storage) syncStorage() {
-	var updateStorage bool
-	secret, err := s.Get()
-	if err == nil && cert.IsValidTLSSecret(secret) {
-		// local storage had a cached secret, ensure that it exists in Kubernetes
-		_, err := s.secrets.Create(&v1.Secret{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:        s.name,
-				Namespace:   s.namespace,
-				Annotations: secret.Annotations,
-			},
-			Type: v1.SecretTypeTLS,
-			Data: secret.Data,
-		})
-		if err != nil && !errors.IsAlreadyExists(err) {
-			logrus.Warnf("Failed to create Kubernetes secret: %v", err)
-		}
-	} else {
-		// local storage was empty, try to populate it
-		secret, err = s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
-		if err != nil {
-			if !errors.IsNotFound(err) {
-				logrus.Warnf("Failed to init Kubernetes secret: %v", err)
-			}
-		} else {
-			updateStorage = true
-		}
-	}
-
-	s.Lock()
-	defer s.Unlock()
-	s.initialized = true
-	if updateStorage {
-		if err := s.storage.Update(secret); err != nil {
-			logrus.Warnf("Failed to init backing storage secret: %v", err)
-		}
-	}
-}
-
 // always return secret from backing storage
 func (s *storage) Get() (*v1.Secret, error) {
-	s.RLock()
-	defer s.RUnlock()
-
 	return s.storage.Get()
 }

-func (s *storage) targetSecret() (*v1.Secret, error) {
-	s.RLock()
-	defer s.RUnlock()
+// sync secret to Kubernetes and backing storage via workqueue
+func (s *storage) Update(secret *v1.Secret) error {
+	// Asynchronously update the Kubernetes secret, as doing so inline may block the listener from
+	// accepting new connections if the apiserver becomes unavailable after the Secrets controller
+	// has been initialized.
+	s.queuedSecret = secret
+	s.queue.Add(s.name)
+	return nil
+}
+
+func (s *storage) SetFactory(tls dynamiclistener.TLSFactory) {
+	s.tls = tls
+}
+
+func (s *storage) init(ctx context.Context, secrets v1controller.SecretController) {
+	s.secrets = secrets
+
+	// Watch just the target secret, instead of using a wrangler OnChange handler
+	// which watches all secrets in all namespaces. Changes to the secret
+	// will be sent through the workqueue.
+	go func() {
+		fieldSelector := fields.Set{"metadata.name": s.name}.String()
+		lw := &cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+				options.FieldSelector = fieldSelector
+				return secrets.List(s.namespace, options)
+			},
+			WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
+				options.FieldSelector = fieldSelector
+				return secrets.Watch(s.namespace, options)
+			},
+		}
+		_, _, watch, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Secret{})
+
+		defer func() {
+			s.queue.ShutDown()
+			watch.Stop()
+			<-done
+		}()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case ev := <-watch.ResultChan():
+				if secret, ok := ev.Object.(*v1.Secret); ok {
+					s.queuedSecret = secret
+					s.queue.Add(secret.Name)
+				}
+			}
+		}
+	}()
+
+	// enqueue initial sync of the backing secret
+	s.queuedSecret, _ = s.Get()
+	s.queue.Add(s.name)
+}
+
+// runQueue starts a goroutine to process secrets updates from the workqueue
+func (s *storage) runQueue() {
+	go func() {
+		for s.processQueue() {
+		}
+	}()
+}
+
+// processQueue processes the secret update queue.
+// The key doesn't actually matter, as we are only handling a single secret with a single worker.
+func (s *storage) processQueue() bool {
+	key, shutdown := s.queue.Get()
+	if shutdown {
+		return false
+	}
+
+	defer s.queue.Done(key)
+	if err := s.update(); err != nil {
+		logrus.Errorf("Failed to update Secret %s/%s: %v", s.namespace, s.name, err)
+	}
+
+	return true
+}
+
+func (s *storage) targetSecret() (*v1.Secret, error) {
 	existingSecret, err := s.secrets.Get(s.namespace, s.name, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
 		return &v1.Secret{

@@ -162,22 +170,16 @@ func (s *storage) targetSecret() (*v1.Secret, error) {
 	return existingSecret, err
 }

+// saveInK8s handles merging the provided secret with the kubernetes secret.
+// This includes calling the tls factory to sign a new certificate with the
+// merged SAN entries, if possible. Note that the provided secret could be
+// either from Kubernetes due to the secret being changed by another client, or
+// from the listener trying to add SANs or regenerate the cert.
 func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {
-	if !s.initComplete() {
-		// Start a goroutine to attempt to save the secret later, once init is complete.
-		// If this was already handled by initComplete, it should be a no-op, or at worst get
-		// merged with the Kubernetes secret.
-		go s.initSync.Do(func() {
-			if err := wait.Poll(100*time.Millisecond, 15*time.Minute, func() (bool, error) {
-				if !s.initComplete() {
-					return false, nil
-				}
-				_, err := s.saveInK8s(secret)
-				return true, err
-			}); err != nil {
-				logrus.Errorf("Failed to save TLS secret after controller init: %v", err)
-			}
-		})
+	// secret controller not initialized yet, just return the current secret.
+	// if there is an existing secret in Kubernetes, that will get synced by the
+	// list/watch once the controller is initialized.
+	if s.secrets == nil {
 		return secret, nil
 	}

@@ -210,58 +212,42 @@ func (s *storage) saveInK8s(secret *v1.Secret) (*v1.Secret, error) {

 	// ensure that the merged secret actually contains data before overwriting the existing fields
 	if !cert.IsValidTLSSecret(secret) {
-		logrus.Warnf("Skipping save of TLS secret for %s/%s due to missing certificate data", secret.Namespace, secret.Name)
+		logrus.Warnf("Skipping save of TLS secret for %s/%s due to missing certificate data", s.namespace, s.name)
 		return targetSecret, nil
 	}

-	targetSecret.Annotations = secret.Annotations
+	// Any changes to the cert will change the fingerprint annotation, so we can use that
+	// for change detection, and skip updating an existing secret if it has not changed.
+	changed := !maps.Equal(targetSecret.Annotations, secret.Annotations)
+
 	targetSecret.Type = v1.SecretTypeTLS
+	targetSecret.Annotations = secret.Annotations
 	targetSecret.Data = secret.Data

 	if targetSecret.UID == "" {
 		logrus.Infof("Creating new TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
 		return s.secrets.Create(targetSecret)
+	} else if changed {
+		logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
+		return s.secrets.Update(targetSecret)
 	}
-	logrus.Infof("Updating TLS secret for %s/%s (count: %d): %v", targetSecret.Namespace, targetSecret.Name, len(targetSecret.Annotations)-1, targetSecret.Annotations)
-	return s.secrets.Update(targetSecret)
-}
-
-func (s *storage) Update(secret *v1.Secret) error {
-	// Asynchronously update the Kubernetes secret, as doing so inline may block the listener from
-	// accepting new connections if the apiserver becomes unavailable after the Secrets controller
-	// has been initialized. We're not passing around any contexts here, nor does the controller
-	// accept any, so there's no good way to soft-fail with a reasonable timeout.
-	go func() {
-		if err := s.update(secret); err != nil {
-			logrus.Errorf("Failed to save TLS secret for %s/%s: %v", secret.Namespace, secret.Name, err)
-		}
-	}()
-	return nil
+	return targetSecret, nil
 }

 func isConflictOrAlreadyExists(err error) bool {
 	return errors.IsConflict(err) || errors.IsAlreadyExists(err)
 }

-func (s *storage) update(secret *v1.Secret) (err error) {
+// update wraps a conflict retry around saveInK8s, which handles merging the
+// queued secret with the Kubernetes secret. Only after successfully
+// updating the Kubernetes secret will the backing storage be updated.
+func (s *storage) update() (err error) {
 	var newSecret *v1.Secret
-	err = retry.OnError(retry.DefaultRetry, isConflictOrAlreadyExists, func() error {
-		newSecret, err = s.saveInK8s(secret)
+	if err := retry.OnError(retry.DefaultRetry, isConflictOrAlreadyExists, func() error {
+		newSecret, err = s.saveInK8s(s.queuedSecret)
 		return err
-	})
-
-	if err != nil {
+	}); err != nil {
 		return err
 	}

-	// Only hold the lock while updating underlying storage
-	s.Lock()
-	defer s.Unlock()
 	return s.storage.Update(newSecret)
 }
-
-func (s *storage) initComplete() bool {
-	s.RLock()
-	defer s.RUnlock()
-	return s.initialized
-}

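The refactor above swaps a cluster-wide wrangler OnChange handler for a list/watch scoped to a single secret, feeding a typed workqueue drained by one worker. Below is a standalone sketch of that pattern using client-go directly rather than the wrangler controller; all names are illustrative and this is not the code shipped in the diff.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	toolswatch "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/workqueue"
)

// watchOneSecret enqueues the name of a single secret every time it changes,
// instead of watching every secret in every namespace.
func watchOneSecret(ctx context.Context, client kubernetes.Interface, namespace, name string, queue workqueue.TypedInterface[string]) {
	fieldSelector := fields.Set{"metadata.name": name}.String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fieldSelector
			return client.CoreV1().Secrets(namespace).List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fieldSelector
			return client.CoreV1().Secrets(namespace).Watch(ctx, options)
		},
	}
	_, _, w, done := toolswatch.NewIndexerInformerWatcher(lw, &v1.Secret{})
	defer func() {
		queue.ShutDown()
		w.Stop()
		<-done
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-w.ResultChan():
			if secret, ok := ev.Object.(*v1.Secret); ok {
				queue.Add(secret.Name) // a worker goroutine drains the queue and syncs storage
			}
		}
	}
}
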
@@ -2,6 +2,7 @@ package memory

 import (
 	"github.com/rancher/dynamiclistener"
+	"github.com/rancher/dynamiclistener/factory"
 	"github.com/sirupsen/logrus"
 	v1 "k8s.io/api/core/v1"
 )

@@ -32,7 +33,7 @@ func (m *memory) Get() (*v1.Secret, error) {
 }

 func (m *memory) Update(secret *v1.Secret) error {
-	if m.secret == nil || m.secret.ResourceVersion == "" || m.secret.ResourceVersion != secret.ResourceVersion {
+	if isChanged(m.secret, secret) {
 		if m.storage != nil {
 			if err := m.storage.Update(secret); err != nil {
 				return err

@@ -44,3 +45,22 @@ func (m *memory) Update(secret *v1.Secret) error {
 	}
 	return nil
 }
+
+func isChanged(old, new *v1.Secret) bool {
+	if new == nil {
+		return false
+	}
+	if old == nil {
+		return true
+	}
+	if old.ResourceVersion == "" {
+		return true
+	}
+	if old.ResourceVersion != new.ResourceVersion {
+		return true
+	}
+	if old.Annotations[factory.Fingerprint] != new.Annotations[factory.Fingerprint] {
+		return true
+	}
+	return false
+}