Compare commits

...

32 Commits

Author SHA1 Message Date
Kubernetes Publisher
82608c6a81 Update dependencies to v0.26.9 tag 2023-09-13 22:37:52 +00:00
Kubernetes Publisher
7eeeef0f16 Merge pull request #120066 from HirazawaUi/automated-cherry-pick-of-#116506-upstream-release-1.26
Automated cherry pick of #116506: generate ReportingInstance and ReportingController in Event

Kubernetes-commit: dbe3960db953446e03877cda9f4d9e83590ebc30
2023-08-28 04:55:42 -07:00
HirazawaUi
b625a191b9 generate ReportingInstance and ReportingController in Event
Kubernetes-commit: 062aac6deed5c7d8316d0223f610710ac152f305
2023-03-27 22:23:45 +08:00
Kubernetes Publisher
0d6350fa4e Merge pull request #119871 from liggitt/automated-cherry-pick-of-#119835-upstream-release-1.26
Automated cherry pick of #119835: Avoid returning nil responseKind in v1beta1 aggregated

Kubernetes-commit: 81c519fc099227707ac2eb73df0fa34759e08c5d
2023-08-10 16:37:46 +00:00
Jordan Liggitt
7b6e8d8480 Avoid returning nil responseKind in v1beta1 aggregated discovery
Kubernetes-commit: fc529b6d0c93caa5fb5c94dcab80bc8943216f6b
2023-08-08 14:25:56 -04:00
Kubernetes Publisher
ee23718387 Merge pull request #119375 from dgrisonnet/automated-cherry-pick-of-#114237-#114236-#112334-upstream-release-1.26
Automated cherry pick of #114237: tools/events: retry on AlreadyExist for Series
#114236: tools/events: fix data race when emitting series
#112334: events: fix EventSeries starting count discrepancy

Kubernetes-commit: 694c7d3710afaafae8754356d86b35e93bb87658
2023-08-02 15:56:05 +00:00
Kubernetes Publisher
8429124260 Merge pull request #119114 from champtar/automated-cherry-pick-of-#118922-upstream-release-1.26
Automated cherry pick of #118922: kubeadm: backdate generated CAs

Kubernetes-commit: 4cf40e5617f8f368fef7835f6a41e14aa4f91ea2
2023-08-02 12:06:49 +00:00
Etienne Champetier
5d715fe8c7 client-go: allow to set NotBefore in NewSelfSignedCACert()
Signed-off-by: Etienne Champetier <e.champetier@ateme.com>

Kubernetes-commit: a85d04f86172369202efabd86d7b815f8b79c3ff
2023-06-28 00:01:34 -04:00
Kubernetes Publisher
cea5ee961b Merge pull request #118970 from champtar/automated-cherry-pick-of-#117791-upstream-release-1.26
Automated cherry pick of #117791: update serial number to a valid non-zero number in ca

Kubernetes-commit: 76fa65bdfbb2fa4d38606c1c4cf524124240b359
2023-07-05 09:40:58 -07:00
Min Ni
c4c506ff2e update serial number to a valid non-zero number in ca certificate
Kubernetes-commit: 26285c1d6685720002464ce3660b44094568c21d
2023-05-08 16:39:35 -07:00
Kubernetes Publisher
cb4594adb3 Merge pull request #118555 from puerco/bump-1.26-go-1.19.10
[release-1.26] releng/go: Update images, deps and version to go 1.19.10

Kubernetes-commit: afe3fae16cb7c3625bc61796e03d6b3eeced889b
2023-06-12 17:41:53 +00:00
Adolfo García Veytia (Puerco)
5023c62056 update-vendor: update vendored go.sums
Run of ./hack/update-vendor.sh

Signed-off-by: Adolfo García Veytia (Puerco) <adolfo.garcia@uservers.net>

Kubernetes-commit: b204a2b57f12800767b5c06069d518513363dbce
2023-06-08 00:14:59 -06:00
Kubernetes Publisher
a3a549a55a Merge pull request #115051 from MadhavJivrajani/release-1.26
[1.26] Cherry Pick of #114766: [Prepare for go1.20] *: Bump versions and fix tests

Kubernetes-commit: 4dfe380379bc9b3c39f8b041a95d12b4e6ac0cf3
2023-05-23 18:45:31 +00:00
Kubernetes Publisher
038b381bf6 Merge pull request #117691 from dims/re-do-of-117242-on-release-1.26
[1.26] Bump runc/libcontainer to 1.1.6

Kubernetes-commit: 7970b8a1efee7a08e64bb272438871dca021166a
2023-05-04 16:32:59 +00:00
Davanum Srinivas
cd83e43d17 Bump runc go module v1.1.4 -> v1.1.6
Signed-off-by: Davanum Srinivas <davanum@gmail.com>

Kubernetes-commit: 808ce27cae46bd31b2978875069eb7c77af21886
2023-04-29 22:27:41 -04:00
Kubernetes Publisher
dbfbc039f8 Merge pull request #117686 from ardaguclu/automated-cherry-pick-of-#117495-upstream-release-1.26
Automated cherry pick of #117495: Use absolute path instead requestURI in openapiv3 discovery

Kubernetes-commit: 8a1ec6f79b3621043412d227b251e94fcd315493
2023-04-29 18:07:49 +00:00
Arda Güçlü
d72dec4966 Use absolute path instead requestURI in openapiv3 discovery
Currently, openapiv3 discovery uses requestURI to discover resources.
However, that does not work when the rest endpoint contains prefixes
(e.g. `http://localhost/test-endpoint/`).
Because requestURI overwrites prefixes also in rest endpoint
(e.g. `http://localhost/openapiv3/apis/apps/v1`).

Since `absPath` keeps the prefixes in the rest endpoint,
this PR changes to absPath instead requestURI.

Kubernetes-commit: e2bfa0db4441c58f53c64c4e3d9eb1dc494abfd2
2023-04-20 09:53:28 +03:00
Kubernetes Publisher
a5144d412c Merge pull request #117638 from seans3/automated-cherry-pick-of-#117571-origin-release-1.26
Automated cherry pick of #117571: Refactors discovery content-type and helper functions

Kubernetes-commit: 7041b95474bcbf8c612fe2d9f648c1ebd437afc1
2023-04-27 00:14:15 -07:00
Sean Sullivan
d6f8d0460a Refactors discovery content-type and helper functions
Kubernetes-commit: 2a17b5518c6ab9a1da749c930705b9512a480f02
2023-04-24 17:40:07 -07:00
Kubernetes Publisher
2dd0093cac Merge pull request #115899 from odinuge/automated-cherry-pick-of-#115620-upstream-release-1.26
Automated cherry pick of #115620: client-go/cache: fix missing delete event on replace  (+ #116623)

Kubernetes-commit: d9a7f46fc5cac57240eaf531e15eabcacf2a2dc3
2023-04-04 18:29:50 +00:00
Kubernetes Publisher
f3ae5cbd83 Merge pull request #116666 from seans3/automated-cherry-pick-of-#116603-origin-release-1.26
Automated cherry pick of #116603: Aggregated discovery resilient to nil GVK

Kubernetes-commit: 9936142a8268ba1545a258405b7fc50fa728876d
2023-03-30 22:30:52 +00:00
Daniel Smith
fffc68d58e Change where transformers are called.
odinuge: sorted out some function signature changes during
cherry-picking that caused conflicts.

(cherry picked from commit e76dff38cf74c3c8ad9ed4d3bc6e3641d9b64565)
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: a8d2bc0ff7537bcb17e0b85333615dafd7c1e9a9
2023-03-14 23:05:20 +00:00
Sean Sullivan
5ebee1886e Aggregated discovery resilient to nil GVK
Kubernetes-commit: 67e6297764bdfc1377919b14175c3d20d97e639a
2023-03-14 17:59:19 +00:00
Odin Ugedal
8190aa4d37 client-go/cache: update Replace comment to be more clear
Since the behavior is now changed, and the old behavior leaked objects,
this adds a new comment about how Replace works.

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: cd7deae436c328085bcb50681b06e1cc275801db
2023-02-13 11:23:50 +00:00
Odin Ugedal
b667227efd client-go/cache: rewrite Replace to check queue first
This is useful to both reduce the code complexity, and to ensure clients
get the "newest" version of an object known when its deleted. This is
all best-effort, but for clients it makes more sense giving them the
newest object they observed rather than an old one.

This is especially useful when an object is recreated. eg.

Object A with key K is in the KnownObjects store;
- DELETE delta for A is queued with key K
- CREATE delta for B is queued with key K
- Replace without any object with key K in it.

In this situation its better to create a DELETE delta with
DeletedFinalStateUnknown with B (with this patch), than it is to give
the client an DeletedFinalStateUnknown with A (without this patch).

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: 4f55d416f2e6b566eb397670b451d96712e638f1
2023-02-13 11:12:37 +00:00
Odin Ugedal
30215cd5a1 client-go/cache: merge ReplaceMakesDeletionsForObjectsInQueue tests
Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: d7878cdf2d6a7ec82b589aa95fd83770ba3edf2d
2023-02-10 14:30:10 +00:00
Odin Ugedal
ba3596940d client-go/cache: fix missing delete event on replace without knownObjects
This fixes an issue where a relist could result in a DELETED delta
with an object wrapped in a DeletedFinalStateUnknown object; and then on
the next relist, it would wrap that object inside another
DeletedFinalStateUnknown, leaving the user with a "double" layer
of DeletedFinalStateUnknown's.

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: 8509d70d3c33a038f0b5111a5e5696c833f6685b
2023-02-10 14:16:26 +00:00
Odin Ugedal
97cf9cb9c2 client-go/cache: fix missing delete event on replace
This fixes a race condition when a "short lived" object
is created and the create event is still present on the queue
when a relist replaces the state. Previously that would lead in the
object being leaked.

The way this could happen is roughly;

1. new Object is added O, agent gets CREATED event for it
2. watch is terminated, and the agent runs a new list, L
3. CREATE event for O is still on the queue to be processed.
4. informer replaces the old data in store with L, and O is not in L
  - Since O is not in the store, and not in the list L, no DELETED event
    is queued
5. CREATE event for O is still on the queue to be processed.
6. CREATE event for O is processed
7. O is <leaked>; its present in the cache but not in k8s.

With this patch, on step 4. above it would create a DELETED event
ensuring that the object will be removed.

Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>

Kubernetes-commit: bd4ec0acec8844bddc7780d322f8fc215d045046
2023-02-08 14:57:23 +00:00
Madhav Jivrajani
fd7800c967 *: Bump version of vmware/govmomi
Bumping version to include changes that
better handle TLS errors. Bump nescessary
to prepare for when the version of Go is
bumped to 1.20

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>

Kubernetes-commit: 3ac70147ec3de3752b360a06bcb7d7c418619da2
2023-01-13 12:06:22 +05:30
Damien Grisonnet
e7876b900b events: fix EventSeries starting count discrepancy
The kube-apiserver validation expects the Count of an EventSeries to be
at least 2, otherwise it rejects the Event. There was is discrepancy
between the client and the server since the client was iniatizing an
EventSeries to a count of 1.

According to the original KEP, the first event emitted should have an
EventSeries set to nil and the second isomorphic event should have an
EventSeries with a count of 2. Thus, we should matcht the behavior
define by the KEP and update the client.

Also, as an effort to make the old clients compatible with the servers,
we should allow Events with an EventSeries count of 1 to prevent any
unexpected rejections.

Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com>

Kubernetes-commit: 62c9fa8fe6c2e04b1a40970e93055c2e92259b12
2022-09-08 23:50:41 +02:00
Damien Grisonnet
08d548e149 tools/events: fix data race when emitting series
There was a data race in the recordToSink function that caused changes
to the events cache to be overriden if events were emitted
simultaneously via Eventf calls.

The race lies in the fact that when recording an Event, there might be
multiple calls updating the cache simultaneously. The lock period is
optimized so that after updating the cache with the new Event, the lock
is unlocked until the Event is recorded on the apiserver side and then
the cache is locked again to be updated with the new value returned by
the apiserver.

The are a few problem with the approach:

1. If two identical Events are emitted successively the changes of the
   second Event will override the first one. In code the following
   happen:
   1. Eventf(ev1)
   2. Eventf(ev2)
   3. Lock cache
   4. Set cache[getKey(ev1)] = &ev1
   5. Unlock cache
   6. Lock cache
   7. Update cache[getKey(ev2)] = &ev1 + Series{Count: 1}
   8. Unlock cache
   9. Start attempting to record the first event &ev1 on the apiserver side.

   This can be mitigated by recording a copy of the Event stored in
   cache instead of reusing the pointer from the cache.

2. When the Event has been recorded on the apiserver the cache is
   updated again with the value of the Event returned by the server.
   This update will override any changes made to the cache entry when
   attempting to record the new Event since the cache was unlocked at
   that time. This might lead to some inconsistencies when dealing with
   EventSeries since the count may be overriden or the client might even
   try to record the first isomorphic Event multiple time.

   This could be mitigated with a lock that has a larger scope, but we
   shouldn't want to reflect Event returned by the apiserver in the
   cache in the first place since mutation could mess with the
   aggregation by either allowing users to manipulate values to update
   a different cache entry or even having two cache entries for the same
   Events.

Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com>

Kubernetes-commit: cfdd40b569d7630b9b31ddbe0557159b1f8b0f9e
2022-12-01 15:39:34 +01:00
Damien Grisonnet
b1a83534de tools/events: retry on AlreadyExist for Series
When attempting to record a new Event and a new Serie on the apiserver
at the same time, the patch of the Serie might happen before the Event
is actually created. In that case, we handle the error and try to create
the Event. But the Event might be created during that period of time and
it is treated as an error today. So in order to handle that scenario, we
need to retry when a Create call for a Serie results in an AlreadyExist
error.

Signed-off-by: Damien Grisonnet <dgrisonn@redhat.com>

Kubernetes-commit: 92c7042e640e148f54aa112591a4550d5450a132
2022-12-01 15:40:01 +01:00
22 changed files with 1102 additions and 246 deletions

View File

@@ -92,12 +92,18 @@ func convertAPIGroup(g apidiscovery.APIGroupDiscovery) (
resourceList := &metav1.APIResourceList{}
resourceList.GroupVersion = gv.String()
for _, r := range v.Resources {
resource := convertAPIResource(r)
resourceList.APIResources = append(resourceList.APIResources, resource)
resource, err := convertAPIResource(r)
if err == nil {
resourceList.APIResources = append(resourceList.APIResources, resource)
}
// Subresources field in new format get transformed into full APIResources.
// It is possible a partial result with an error was returned to be used
// as the parent resource for the subresource.
for _, subresource := range r.Subresources {
sr := convertAPISubresource(resource, subresource)
resourceList.APIResources = append(resourceList.APIResources, sr)
sr, err := convertAPISubresource(resource, subresource)
if err == nil {
resourceList.APIResources = append(resourceList.APIResources, sr)
}
}
}
gvResources[gv] = resourceList
@@ -105,30 +111,46 @@ func convertAPIGroup(g apidiscovery.APIGroupDiscovery) (
return group, gvResources, failedGVs
}
// convertAPIResource tranforms a APIResourceDiscovery to an APIResource.
func convertAPIResource(in apidiscovery.APIResourceDiscovery) metav1.APIResource {
return metav1.APIResource{
var emptyKind = metav1.GroupVersionKind{}
// convertAPIResource tranforms a APIResourceDiscovery to an APIResource. We are
// resilient to missing GVK, since this resource might be the parent resource
// for a subresource. If the parent is missing a GVK, it is not returned in
// discovery, and the subresource MUST have the GVK.
func convertAPIResource(in apidiscovery.APIResourceDiscovery) (metav1.APIResource, error) {
result := metav1.APIResource{
Name: in.Resource,
SingularName: in.SingularResource,
Namespaced: in.Scope == apidiscovery.ScopeNamespace,
Group: in.ResponseKind.Group,
Version: in.ResponseKind.Version,
Kind: in.ResponseKind.Kind,
Verbs: in.Verbs,
ShortNames: in.ShortNames,
Categories: in.Categories,
}
var err error
if in.ResponseKind != nil && (*in.ResponseKind) != emptyKind {
result.Group = in.ResponseKind.Group
result.Version = in.ResponseKind.Version
result.Kind = in.ResponseKind.Kind
} else {
err = fmt.Errorf("discovery resource %s missing GVK", in.Resource)
}
// Can return partial result with error, which can be the parent for a
// subresource. Do not add this result to the returned discovery resources.
return result, err
}
// convertAPISubresource tranforms a APISubresourceDiscovery to an APIResource.
func convertAPISubresource(parent metav1.APIResource, in apidiscovery.APISubresourceDiscovery) metav1.APIResource {
return metav1.APIResource{
Name: fmt.Sprintf("%s/%s", parent.Name, in.Subresource),
SingularName: parent.SingularName,
Namespaced: parent.Namespaced,
Group: in.ResponseKind.Group,
Version: in.ResponseKind.Version,
Kind: in.ResponseKind.Kind,
Verbs: in.Verbs,
func convertAPISubresource(parent metav1.APIResource, in apidiscovery.APISubresourceDiscovery) (metav1.APIResource, error) {
result := metav1.APIResource{}
if in.ResponseKind == nil || (*in.ResponseKind) == emptyKind {
return result, fmt.Errorf("subresource %s/%s missing GVK", parent.Name, in.Subresource)
}
result.Name = fmt.Sprintf("%s/%s", parent.Name, in.Subresource)
result.SingularName = parent.SingularName
result.Namespaced = parent.Namespaced
result.Group = in.ResponseKind.Group
result.Version = in.ResponseKind.Version
result.Kind = in.ResponseKind.Kind
result.Verbs = in.Verbs
return result, nil
}

View File

@@ -541,6 +541,145 @@ func TestSplitGroupsAndResources(t *testing.T) {
},
expectedFailedGVs: map[schema.GroupVersion]error{},
},
{
name: "Aggregated discovery with single subresource and parent missing GVK",
agg: apidiscovery.APIGroupDiscoveryList{
Items: []apidiscovery.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{
Name: "external.metrics.k8s.io",
},
Versions: []apidiscovery.APIVersionDiscovery{
{
Version: "v1beta1",
Resources: []apidiscovery.APIResourceDiscovery{
{
// resilient to nil GVK for parent
Resource: "*",
Scope: apidiscovery.ScopeNamespace,
SingularResource: "",
Subresources: []apidiscovery.APISubresourceDiscovery{
{
Subresource: "other-external-metric",
ResponseKind: &metav1.GroupVersionKind{
Kind: "MetricValueList",
},
Verbs: []string{"get"},
},
},
},
},
},
},
},
},
},
expectedGroups: metav1.APIGroupList{
Groups: []metav1.APIGroup{
{
Name: "external.metrics.k8s.io",
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: "external.metrics.k8s.io/v1beta1",
Version: "v1beta1",
},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: "external.metrics.k8s.io/v1beta1",
Version: "v1beta1",
},
},
},
},
expectedGVResources: map[schema.GroupVersion]*metav1.APIResourceList{
{Group: "external.metrics.k8s.io", Version: "v1beta1"}: {
GroupVersion: "external.metrics.k8s.io/v1beta1",
APIResources: []metav1.APIResource{
// Since parent GVK was nil, it is NOT returned--only the subresource.
{
Name: "*/other-external-metric",
SingularName: "",
Namespaced: true,
Group: "",
Version: "",
Kind: "MetricValueList",
Verbs: []string{"get"},
},
},
},
},
expectedFailedGVs: map[schema.GroupVersion]error{},
},
{
name: "Aggregated discovery with single subresource and parent empty GVK",
agg: apidiscovery.APIGroupDiscoveryList{
Items: []apidiscovery.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{
Name: "external.metrics.k8s.io",
},
Versions: []apidiscovery.APIVersionDiscovery{
{
Version: "v1beta1",
Resources: []apidiscovery.APIResourceDiscovery{
{
// resilient to empty GVK for parent
Resource: "*",
Scope: apidiscovery.ScopeNamespace,
SingularResource: "",
ResponseKind: &metav1.GroupVersionKind{},
Subresources: []apidiscovery.APISubresourceDiscovery{
{
Subresource: "other-external-metric",
ResponseKind: &metav1.GroupVersionKind{
Kind: "MetricValueList",
},
Verbs: []string{"get"},
},
},
},
},
},
},
},
},
},
expectedGroups: metav1.APIGroupList{
Groups: []metav1.APIGroup{
{
Name: "external.metrics.k8s.io",
Versions: []metav1.GroupVersionForDiscovery{
{
GroupVersion: "external.metrics.k8s.io/v1beta1",
Version: "v1beta1",
},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: "external.metrics.k8s.io/v1beta1",
Version: "v1beta1",
},
},
},
},
expectedGVResources: map[schema.GroupVersion]*metav1.APIResourceList{
{Group: "external.metrics.k8s.io", Version: "v1beta1"}: {
GroupVersion: "external.metrics.k8s.io/v1beta1",
APIResources: []metav1.APIResource{
// Since parent GVK was nil, it is NOT returned--only the subresource.
{
Name: "*/other-external-metric",
SingularName: "",
Namespaced: true,
Group: "",
Version: "",
Kind: "MetricValueList",
Verbs: []string{"get"},
},
},
},
},
expectedFailedGVs: map[schema.GroupVersion]error{},
},
{
name: "Aggregated discovery with multiple subresources",
agg: apidiscovery.APIGroupDiscoveryList{

View File

@@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"mime"
"net/http"
"net/url"
"sort"
@@ -58,8 +59,9 @@ const (
defaultBurst = 300
AcceptV1 = runtime.ContentTypeJSON
// Aggregated discovery content-type (currently v2beta1). NOTE: Currently, we are assuming the order
// for "g", "v", and "as" from the server. We can only compare this string if we can make that assumption.
// Aggregated discovery content-type (v2beta1). NOTE: content-type parameters
// MUST be ordered (g, v, as) for server in "Accept" header (BUT we are resilient
// to ordering when comparing returned values in "Content-Type" header).
AcceptV2Beta1 = runtime.ContentTypeJSON + ";" + "g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList"
// Prioritize aggregated discovery by placing first in the order of discovery accept types.
acceptDiscoveryFormats = AcceptV2Beta1 + "," + AcceptV1
@@ -259,8 +261,16 @@ func (d *DiscoveryClient) downloadLegacy() (
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch responseContentType {
case AcceptV1:
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, nil, err
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
// Default is unaggregated discovery v1.
var v metav1.APIVersions
err = json.Unmarshal(body, &v)
if err != nil {
@@ -271,15 +281,6 @@ func (d *DiscoveryClient) downloadLegacy() (
apiGroup = apiVersionsToAPIGroup(&v)
}
apiGroupList.Groups = []metav1.APIGroup{apiGroup}
case AcceptV2Beta1:
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, nil, err
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
return nil, nil, nil, fmt.Errorf("Unknown discovery response content-type: %s", responseContentType)
}
return apiGroupList, resourcesByGV, failedGVs, nil
@@ -313,13 +314,8 @@ func (d *DiscoveryClient) downloadAPIs() (
failedGVs := map[schema.GroupVersion]error{}
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch responseContentType {
case AcceptV1:
err = json.Unmarshal(body, apiGroupList)
if err != nil {
return nil, nil, nil, err
}
case AcceptV2Beta1:
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
@@ -327,12 +323,38 @@ func (d *DiscoveryClient) downloadAPIs() (
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
return nil, nil, nil, fmt.Errorf("Unknown discovery response content-type: %s", responseContentType)
// Default is unaggregated discovery v1.
err = json.Unmarshal(body, apiGroupList)
if err != nil {
return nil, nil, nil, err
}
}
return apiGroupList, resourcesByGV, failedGVs, nil
}
// isV2Beta1ContentType checks of the content-type string is both
// "application/json" and contains the v2beta1 content-type params.
// NOTE: This function is resilient to the ordering of the
// content-type parameters, as well as parameters added by
// intermediaries such as proxies or gateways. Examples:
//
// "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList" = true
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io" = true
// "application/json; as=APIGroupDiscoveryList;v=v2beta1;g=apidiscovery.k8s.io;charset=utf-8" = true
// "application/json" = false
// "application/json; charset=UTF-8" = false
func isV2Beta1ContentType(contentType string) bool {
base, params, err := mime.ParseMediaType(contentType)
if err != nil {
return false
}
return runtime.ContentTypeJSON == base &&
params["g"] == "apidiscovery.k8s.io" &&
params["v"] == "v2beta1" &&
params["as"] == "APIGroupDiscoveryList"
}
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
func (d *DiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {

View File

@@ -1400,8 +1400,9 @@ func TestAggregatedServerGroups(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-Type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(output)
}))
@@ -1990,8 +1991,9 @@ func TestAggregatedServerGroupsAndResources(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(output)
}))
@@ -2130,8 +2132,9 @@ func TestAggregatedServerGroupsAndResourcesWithErrors(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(status)
w.Write(output)
}))
@@ -2738,8 +2741,9 @@ func TestAggregatedServerPreferredResources(t *testing.T) {
}
output, err := json.Marshal(agg)
require.NoError(t, err)
// Content-type is "aggregated" discovery format.
w.Header().Set("Content-Type", AcceptV2Beta1)
// Content-type is "aggregated" discovery format. Add extra parameter
// to ensure we are resilient to these extra parameters.
w.Header().Set("Content-Type", AcceptV2Beta1+"; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write(output)
}))
@@ -2763,6 +2767,58 @@ func TestAggregatedServerPreferredResources(t *testing.T) {
}
}
func TestDiscoveryContentTypeVersion(t *testing.T) {
tests := []struct {
contentType string
isV2Beta1 bool
}{
{
contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList",
isV2Beta1: true,
},
{
// content-type parameters are not in correct order, but comparison ignores order.
contentType: "application/json; v=v2beta1;as=APIGroupDiscoveryList;g=apidiscovery.k8s.io",
isV2Beta1: true,
},
{
// content-type parameters are not in correct order, but comparison ignores order.
contentType: "application/json; as=APIGroupDiscoveryList;g=apidiscovery.k8s.io;v=v2beta1",
isV2Beta1: true,
},
{
// Ignores extra parameter "charset=utf-8"
contentType: "application/json; g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList;charset=utf-8",
isV2Beta1: true,
},
{
contentType: "application/json",
isV2Beta1: false,
},
{
contentType: "application/json; charset=UTF-8",
isV2Beta1: false,
},
{
contentType: "text/json",
isV2Beta1: false,
},
{
contentType: "text/html",
isV2Beta1: false,
},
{
contentType: "",
isV2Beta1: false,
},
}
for _, test := range tests {
isV2Beta1 := isV2Beta1ContentType(test.contentType)
assert.Equal(t, test.isV2Beta1, isV2Beta1)
}
}
func TestUseLegacyDiscovery(t *testing.T) {
// Default client sends aggregated discovery accept format (first) as well as legacy format.
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

18
go.mod
View File

@@ -13,19 +13,19 @@ require (
github.com/google/gnostic v0.5.7-v3refs
github.com/google/go-cmp v0.5.9
github.com/google/gofuzz v1.1.0
github.com/google/uuid v1.1.2
github.com/google/uuid v1.3.0
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
github.com/imdario/mergo v0.3.6
github.com/peterbourgon/diskv v2.0.1+incompatible
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
golang.org/x/net v0.7.0
golang.org/x/net v0.8.0
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b
golang.org/x/term v0.5.0
golang.org/x/term v0.6.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/protobuf v1.28.1
k8s.io/api v0.0.0-20230215103918-5fd8a44fa3de
k8s.io/apimachinery v0.0.0-20230215102121-53ecdf01b997
k8s.io/api v0.26.9
k8s.io/apimachinery v0.26.9
k8s.io/klog/v2 v2.80.1
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
k8s.io/utils v0.0.0-20221107191617-1a15be271d1d
@@ -49,8 +49,8 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
@@ -59,6 +59,6 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20230215103918-5fd8a44fa3de
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20230215102121-53ecdf01b997
k8s.io/api => k8s.io/api v0.26.9
k8s.io/apimachinery => k8s.io/apimachinery v0.26.9
)

28
go.sum
View File

@@ -128,8 +128,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@@ -263,8 +263,8 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -309,19 +309,19 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -476,10 +476,10 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20230215103918-5fd8a44fa3de h1:qSZiw9Qed4asmzSO2BUK7AC+ZvLJYvnJMPzYyXcg8Gw=
k8s.io/api v0.0.0-20230215103918-5fd8a44fa3de/go.mod h1:hf9uiKf2s1FXvyGkg/nAkwMb35yQaTZrvdCtquvBpAE=
k8s.io/apimachinery v0.0.0-20230215102121-53ecdf01b997 h1:ZbxFKMGLvUrVMRxcfkGr9rVsPCERc+NhrdqpGY2XaCo=
k8s.io/apimachinery v0.0.0-20230215102121-53ecdf01b997/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I=
k8s.io/api v0.26.9 h1:s8Y+G1u2JM55b90+Yo2RVb3PGT/hkWNVPN4idPERxJg=
k8s.io/api v0.26.9/go.mod h1:W/W4fEWRVzPD36820LlVUQfNBiSbiq0VPWRFJKwzmUg=
k8s.io/apimachinery v0.26.9 h1:5yAV9cFR7Z4gIorKcAjWnx4uxtxiFsERwq4Pvmx0CCg=
k8s.io/apimachinery v0.26.9/go.mod h1:qYzLkrQ9lhrZRh0jNKo2cfvf/R1/kQONnSiyB7NUJU0=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E=

View File

@@ -19,6 +19,7 @@ package openapi
import (
"context"
"encoding/json"
"strings"
"k8s.io/client-go/rest"
"k8s.io/kube-openapi/pkg/handler3"
@@ -58,7 +59,11 @@ func (c *client) Paths() (map[string]GroupVersion, error) {
// Create GroupVersions for each element of the result
result := map[string]GroupVersion{}
for k, v := range discoMap.Paths {
result[k] = newGroupVersion(c, v)
// If the server returned a URL rooted at /openapi/v3, preserve any additional client-side prefix.
// If the server returned a URL not rooted at /openapi/v3, treat it as an actual server-relative URL.
// See https://github.com/kubernetes/kubernetes/issues/117463 for details
useClientPrefix := strings.HasPrefix(v.ServerRelativeURL, "/openapi/v3")
result[k] = newGroupVersion(c, v, useClientPrefix)
}
return result, nil
}

View File

@@ -18,6 +18,7 @@ package openapi
import (
"context"
"net/url"
"k8s.io/kube-openapi/pkg/handler3"
)
@@ -29,18 +30,41 @@ type GroupVersion interface {
}
type groupversion struct {
client *client
item handler3.OpenAPIV3DiscoveryGroupVersion
client *client
item handler3.OpenAPIV3DiscoveryGroupVersion
useClientPrefix bool
}
func newGroupVersion(client *client, item handler3.OpenAPIV3DiscoveryGroupVersion) *groupversion {
return &groupversion{client: client, item: item}
func newGroupVersion(client *client, item handler3.OpenAPIV3DiscoveryGroupVersion, useClientPrefix bool) *groupversion {
return &groupversion{client: client, item: item, useClientPrefix: useClientPrefix}
}
func (g *groupversion) Schema(contentType string) ([]byte, error) {
return g.client.restClient.Get().
RequestURI(g.item.ServerRelativeURL).
SetHeader("Accept", contentType).
Do(context.TODO()).
Raw()
if !g.useClientPrefix {
return g.client.restClient.Get().
RequestURI(g.item.ServerRelativeURL).
SetHeader("Accept", contentType).
Do(context.TODO()).
Raw()
}
locator, err := url.Parse(g.item.ServerRelativeURL)
if err != nil {
return nil, err
}
path := g.client.restClient.Get().
AbsPath(locator.Path).
SetHeader("Accept", contentType)
// Other than root endpoints(openapiv3/apis), resources have hash query parameter to support etags.
// However, absPath does not support handling query parameters internally,
// so that hash query parameter is added manually
for k, value := range locator.Query() {
for _, v := range value {
path.Param(k, v)
}
}
return path.Do(context.TODO()).Raw()
}

View File

@@ -0,0 +1,106 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openapi
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
func TestGroupVersion(t *testing.T) {
tests := []struct {
name string
prefix string
serverReturnsPrefix bool
}{
{
name: "no prefix",
prefix: "",
serverReturnsPrefix: false,
},
{
name: "prefix not in discovery",
prefix: "/test-endpoint",
serverReturnsPrefix: false,
},
{
name: "prefix in discovery",
prefix: "/test-endpoint",
serverReturnsPrefix: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.URL.Path == test.prefix+"/openapi/v3/apis/apps/v1" && r.URL.RawQuery == "hash=014fbff9a07c":
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"}}`))
case r.URL.Path == test.prefix+"/openapi/v3":
// return root content
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if test.serverReturnsPrefix {
w.Write([]byte(fmt.Sprintf(`{"paths":{"apis/apps/v1":{"serverRelativeURL":"%s/openapi/v3/apis/apps/v1?hash=014fbff9a07c"}}}`, test.prefix)))
} else {
w.Write([]byte(`{"paths":{"apis/apps/v1":{"serverRelativeURL":"/openapi/v3/apis/apps/v1?hash=014fbff9a07c"}}}`))
}
default:
t.Errorf("unexpected request: %s", r.URL.String())
w.WriteHeader(http.StatusNotFound)
return
}
}))
defer server.Close()
c, err := rest.RESTClientFor(&rest.Config{
Host: server.URL + test.prefix,
ContentConfig: rest.ContentConfig{
NegotiatedSerializer: scheme.Codecs,
GroupVersion: &appsv1.SchemeGroupVersion,
},
})
if err != nil {
t.Fatalf("unexpected error occurred: %v", err)
}
openapiClient := NewClient(c)
paths, err := openapiClient.Paths()
if err != nil {
t.Fatalf("unexpected error occurred: %v", err)
}
schema, err := paths["apis/apps/v1"].Schema(runtime.ContentTypeJSON)
if err != nil {
t.Fatalf("unexpected error occurred: %v", err)
}
expectedResult := `{"openapi":"3.0.0","info":{"title":"Kubernetes","version":"unversioned"}}`
if string(schema) != expectedResult {
t.Fatalf("unexpected result actual: %s expected: %s", string(schema), expectedResult)
}
})
}
}

View File

@@ -353,17 +353,6 @@ func NewIndexerInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
}
// TransformFunc allows for transforming an object before it will be processed
// and put into the controller cache and before the corresponding handlers will
// be called on it.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
// given controller doesn't care for them
type TransformFunc func(interface{}) (interface{}, error)
// NewTransformingInformer returns a Store and a controller for populating
// the store while also providing event notifications. You should only used
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
@@ -411,19 +400,11 @@ func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
@@ -475,6 +456,7 @@ func newInformer(
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transformer,
})
cfg := &Config{
@@ -486,7 +468,7 @@ func newInformer(
Process: func(obj interface{}) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
return processDeltas(h, clientState, deltas)
}
return errors.New("object given as Process argument is not Deltas")
},

View File

@@ -23,7 +23,7 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -32,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
fcache "k8s.io/client-go/tools/cache/testing"
"github.com/google/gofuzz"
fuzz "github.com/google/gofuzz"
)
func Example() {

View File

@@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
// When true, `Replaced` events will be sent for items passed to a Replace() call.
// When false, `Sync` events will be sent instead.
EmitDeltaTypeReplaced bool
// If set, will be called for objects before enqueueing them. Please
// see the comment on TransformFunc for details.
Transformer TransformFunc
}
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@@ -129,8 +133,32 @@ type DeltaFIFO struct {
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
// Called with every object if non-nil.
transformer TransformFunc
}
// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform object separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
// do not do anything that will take a long time.
type TransformFunc func(interface{}) (interface{}, error)
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string
@@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
}
f.cond.L = &f.lock
return f
@@ -411,6 +440,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
// Every object comes through this code path once, so this is a good
// place to call the transform func. If obj is a
// DeletedFinalStateUnknown tombstone, then the containted inner object
// will already have gone through the transformer, but we document that
// this can happen. In cases involving Replace(), such an object can
// come through multiple times.
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
@@ -566,12 +610,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K. If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K. Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
// object of K. The pre-existing keys are those in the union set of the keys in
// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
// the one present in the last delta in `f.items`. If there is no delta for K
// in `f.items`, it is the object in `f.knownObjects`
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
f.lock.Lock()
defer f.lock.Unlock()
@@ -595,56 +638,54 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
}
}
if f.knownObjects == nil {
// Do deletion detection against our own list.
queuedDeletions := 0
for k, oldItem := range f.items {
// Do deletion detection against objects in the queue
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
deletedObj = d.Obj
}
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if f.knownObjects != nil {
// Detect deletions for objects not present in the queue, but present in KnownObjects
knownKeys := f.knownObjects.ListKeys()
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
if len(f.items[k]) > 0 {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = keys.Len() + queuedDeletions
}
return nil
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {

View File

@@ -121,6 +121,130 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
}
}
func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) {
obj := mkFifoObj("foo", 2)
objV2 := mkFifoObj("foo", 3)
table := []struct {
name string
operations func(f *DeltaFIFO)
expectedDeltas Deltas
}{
{
name: "Added object should be deleted on Replace",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Replaced object should have only a single Delete",
operations: func(f *DeltaFIFO) {
f.emitDeltaTypeReplaced = true
f.Add(obj)
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Replaced, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Deleted object should have only a single Delete",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Delete(obj)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, obj},
},
},
{
name: "Synced objects should have a single delete",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{obj}, "0")
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Sync, obj},
{Sync, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Added objects should have a single delete on multiple Replaces",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Replace([]interface{}{}, "0")
f.Replace([]interface{}{}, "1")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
},
},
{
name: "Added and deleted and added object should be deleted",
operations: func(f *DeltaFIFO) {
f.Add(obj)
f.Delete(obj)
f.Add(objV2)
f.Replace([]interface{}{}, "0")
},
expectedDeltas: Deltas{
{Added, obj},
{Deleted, obj},
{Added, objV2},
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}},
},
},
}
for _, tt := range table {
tt := tt
t.Run(tt.name, func(t *testing.T) {
// Test with a DeltaFIFO with a backing KnownObjects
fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{}
}),
})
tt.operations(fWithKnownObjects)
actualDeltasWithKnownObjects := Pop(fWithKnownObjects)
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) {
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects)
}
if len(fWithKnownObjects.items) != 0 {
t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items)
}
// Test with a DeltaFIFO without a backing KnownObjects
fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
})
tt.operations(fWithoutKnownObjects)
actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects)
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) {
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects)
}
if len(fWithoutKnownObjects.items) != 0 {
t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items)
}
})
}
}
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
@@ -203,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
}
}
type rvAndXfrm struct {
rv int
xfrm int
}
func TestDeltaFIFO_transformer(t *testing.T) {
mk := func(name string, rv int) testFifoObject {
return mkFifoObj(name, &rvAndXfrm{rv, 0})
}
xfrm := TransformFunc(func(obj interface{}) (interface{}, error) {
switch v := obj.(type) {
case testFifoObject:
v.val.(*rvAndXfrm).xfrm++
case DeletedFinalStateUnknown:
if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 {
return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj)
}
default:
return nil, fmt.Errorf("unexpected object: %#v", obj)
}
return obj, nil
})
must := func(err error) {
if err != nil {
t.Fatal(err)
}
}
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
Transformer: xfrm,
})
must(f.Add(mk("foo", 10)))
must(f.Add(mk("bar", 11)))
must(f.Update(mk("foo", 12)))
must(f.Delete(mk("foo", 15)))
must(f.Replace([]interface{}{}, ""))
must(f.Add(mk("bar", 16)))
must(f.Replace([]interface{}{}, ""))
// Should be empty
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
for i := 0; i < 2; i++ {
obj, err := f.Pop(func(o interface{}) error { return nil })
if err != nil {
t.Fatalf("got nothing on try %v?", i)
}
obj = obj.(Deltas).Newest().Object
switch v := obj.(type) {
case testFifoObject:
if v.name != "foo" {
t.Errorf("expected regular deletion of foo, got %q", v.name)
}
rx := v.val.(*rvAndXfrm)
if rx.rv != 15 {
t.Errorf("expected last message, got %#v", obj)
}
if rx.xfrm != 1 {
t.Errorf("obj %v transformed wrong number of times.", obj)
}
case DeletedFinalStateUnknown:
tf := v.Obj.(testFifoObject)
rx := tf.val.(*rvAndXfrm)
if tf.name != "bar" {
t.Errorf("expected tombstone deletion of bar, got %q", tf.name)
}
if rx.rv != 16 {
t.Errorf("expected last message, got %#v", obj)
}
if rx.xfrm != 1 {
t.Errorf("tombstoned obj %v transformed wrong number of times.", obj)
}
default:
t.Errorf("unknown item %#v", obj)
}
}
}
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("foo", 10))
@@ -371,7 +577,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
expectedList = []Deltas{
{{Added, mkFifoObj("baz", 10)},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
{{Sync, mkFifoObj("foo", 5)}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
@@ -385,6 +591,67 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
}
}
// Now try deleting and recreating the object in the queue, then delete it by a Replace call
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
})
f.Delete(mkFifoObj("bar", 6))
f.Add(mkFifoObj("bar", 100))
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList = []Deltas{
{
{Deleted, mkFifoObj("bar", 6)},
{Added, mkFifoObj("bar", 100)},
// Since "bar" has a newer object in the queue than in the state,
// it should get a tombstone key with the latest object from the queue
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
},
{{Sync, mkFifoObj("foo", 5)}},
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// Now try syncing it first to ensure the delete use the latest version
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
})
f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0")
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList = []Deltas{
{
{Sync, mkFifoObj("bar", 100)},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
},
{
{Sync, mkFifoObj("foo", 5)},
{Sync, mkFifoObj("foo", 5)},
},
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
// Now try starting without an explicit KeyListerGetter
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("baz", 10))

View File

@@ -198,10 +198,7 @@ type SharedInformer interface {
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
// Please see the comment on TransformFunc for more details.
SetTransform(handler TransformFunc) error
// IsStopped reports whether the informer has already been stopped.
@@ -422,6 +419,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
cfg := &Config{
@@ -585,7 +583,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
return processDeltas(s, s.indexer, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}

View File

@@ -395,9 +395,8 @@ func TestSharedInformerTransformer(t *testing.T) {
name := pod.GetName()
if upper := strings.ToUpper(name); upper != name {
copied := pod.DeepCopyObject().(*v1.Pod)
copied.SetName(upper)
return copied, nil
pod.SetName(upper)
return pod, nil
}
}
return obj, nil

View File

@@ -181,22 +181,24 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
return nil
}
isomorphicEvent.Series = &eventsv1.EventSeries{
Count: 1,
Count: 2,
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
}
return isomorphicEvent
// Make a copy of the Event to make sure that recording it
// doesn't mess with the object stored in cache.
return isomorphicEvent.DeepCopy()
}
e.eventCache[eventKey] = eventCopy
return eventCopy
// Make a copy of the Event to make sure that recording it doesn't
// mess with the object stored in cache.
return eventCopy.DeepCopy()
}()
if evToRecord != nil {
recordedEvent := e.attemptRecording(evToRecord)
if recordedEvent != nil {
recordedEventKey := getKey(recordedEvent)
e.mu.Lock()
defer e.mu.Unlock()
e.eventCache[recordedEventKey] = recordedEvent
}
// TODO: Add a metric counting the number of recording attempts
e.attemptRecording(evToRecord)
// We don't want the new recorded Event to be reflected in the
// client's cache because server-side mutations could mess with the
// aggregation mechanism used by the client.
}
}()
}
@@ -248,6 +250,14 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
return nil, false
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
// If we tried to create an Event from an EventSerie, it means that
// the original Patch request failed because the Event we were
// trying to patch didn't exist. If the creation failed because the
// Event now exists, it is safe to retry. This occurs when a new
// Event is emitted twice in a very short period of time.
if isEventSeries {
return nil, true
}
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)

View File

@@ -0,0 +1,103 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
"reflect"
"testing"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/kubernetes/fake"
)
func TestRecordEventToSink(t *testing.T) {
nonIsomorphicEvent := eventsv1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: metav1.NamespaceDefault,
},
Series: nil,
}
isomorphicEvent := *nonIsomorphicEvent.DeepCopy()
isomorphicEvent.Series = &eventsv1.EventSeries{Count: 2}
testCases := []struct {
name string
eventsToRecord []eventsv1.Event
expectedRecordedEvent eventsv1.Event
}{
{
name: "record one Event",
eventsToRecord: []eventsv1.Event{
nonIsomorphicEvent,
},
expectedRecordedEvent: nonIsomorphicEvent,
},
{
name: "record one Event followed by an isomorphic one",
eventsToRecord: []eventsv1.Event{
nonIsomorphicEvent,
isomorphicEvent,
},
expectedRecordedEvent: isomorphicEvent,
},
{
name: "record one isomorphic Event before the original",
eventsToRecord: []eventsv1.Event{
isomorphicEvent,
nonIsomorphicEvent,
},
expectedRecordedEvent: isomorphicEvent,
},
{
name: "record one isomorphic Event without one already existing",
eventsToRecord: []eventsv1.Event{
isomorphicEvent,
},
expectedRecordedEvent: isomorphicEvent,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
for _, ev := range tc.eventsToRecord {
recordEvent(eventSink, &ev)
}
recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Errorf("expected to be able to list Events from fake client")
}
if len(recordedEvents.Items) != 1 {
t.Errorf("expected one Event to be recorded, found: %d", len(recordedEvents.Items))
}
recordedEvent := recordedEvents.Items[0]
if !reflect.DeepEqual(recordedEvent, tc.expectedRecordedEvent) {
t.Errorf("expected to have recorded Event: %#+v, got: %#+v\n diff: %s", tc.expectedRecordedEvent, recordedEvent, diff.ObjectReflectDiff(tc.expectedRecordedEvent, recordedEvent))
}
})
}
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package events
import (
"context"
"strconv"
"testing"
"time"
@@ -29,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
fake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
@@ -106,7 +108,7 @@ func TestEventSeriesf(t *testing.T) {
nonIsomorphicEvent := expectedEvent.DeepCopy()
nonIsomorphicEvent.Action = "stopped"
expectedEvent.Series = &eventsv1.EventSeries{Count: 1}
expectedEvent.Series = &eventsv1.EventSeries{Count: 2}
table := []struct {
regarding k8sruntime.Object
related k8sruntime.Object
@@ -185,6 +187,44 @@ func TestEventSeriesf(t *testing.T) {
close(stopCh)
}
// TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to
// an EventSink consecutively there is no data race. This test is meant to be
// run with the `-race` option.
func TestEventSeriesWithEventSinkImplRace(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
eventBroadcaster := NewBroadcaster(eventSink)
stopCh := make(chan struct{})
eventBroadcaster.StartRecordingToSink(stopCh)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "test")
recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
recorder.Eventf(&v1.ObjectReference{}, nil, v1.EventTypeNormal, "reason", "action", "", "")
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
events, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
if len(events.Items) != 1 {
return false, nil
}
if events.Items[0].Series == nil {
return false, nil
}
return true, nil
})
if err != nil {
t.Fatal("expected that 2 identical Eventf calls would result in the creation of an Event with a Serie")
}
}
func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) {
recvEvent := *actualEvent

View File

@@ -344,6 +344,9 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
event.ReportingInstance = recorder.source.Host
event.ReportingController = recorder.source.Component
// NOTE: events should be a non-blocking operation, but we also need to not
// put this in a goroutine, otherwise we'll race to write to a closed channel
// when we go to shut down this broadcaster. Just drop events if we get overloaded,

View File

@@ -178,11 +178,12 @@ func TestEventf(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
@@ -205,11 +206,12 @@ func TestEventf(t *testing.T) {
UID: "bar",
APIVersion: "v1",
},
Reason: "Killed",
Message: "some other verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Killed",
Message: "some other verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectUpdate: false,
@@ -233,11 +235,12 @@ func TestEventf(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 2,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
@@ -261,11 +264,12 @@ func TestEventf(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
@@ -289,11 +293,12 @@ func TestEventf(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 3,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
@@ -317,11 +322,12 @@ func TestEventf(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: false,
@@ -345,11 +351,12 @@ func TestEventf(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 2,
Type: v1.EventTypeNormal,
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: true,
@@ -697,11 +704,12 @@ func TestMultiSinkCache(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
@@ -724,11 +732,12 @@ func TestMultiSinkCache(t *testing.T) {
UID: "bar",
APIVersion: "v1",
},
Reason: "Killed",
Message: "some other verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Killed",
Message: "some other verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
expectUpdate: false,
@@ -752,11 +761,12 @@ func TestMultiSinkCache(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 2,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
@@ -780,11 +790,12 @@ func TestMultiSinkCache(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: false,
@@ -808,11 +819,12 @@ func TestMultiSinkCache(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[2]",
},
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 3,
Type: v1.EventTypeNormal,
Reason: "Started",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 3,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
expectUpdate: true,
@@ -836,11 +848,12 @@ func TestMultiSinkCache(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 1,
Type: v1.EventTypeNormal,
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 1,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: false,
@@ -864,11 +877,12 @@ func TestMultiSinkCache(t *testing.T) {
APIVersion: "v1",
FieldPath: "spec.containers[3]",
},
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
Count: 2,
Type: v1.EventTypeNormal,
Reason: "Stopped",
Message: "some verbose message: 1",
Source: v1.EventSource{Component: "eventTest"},
ReportingController: "eventTest",
Count: 2,
Type: v1.EventTypeNormal,
},
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
expectUpdate: true,

View File

@@ -115,6 +115,9 @@ func validateEvent(messagePrefix string, actualEvent *v1.Event, expectedEvent *v
// Temp clear time stamps for comparison because actual values don't matter for comparison
recvEvent.FirstTimestamp = expectedEvent.FirstTimestamp
recvEvent.LastTimestamp = expectedEvent.LastTimestamp
recvEvent.ReportingController = expectedEvent.ReportingController
// Check that name has the right prefix.
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)

View File

@@ -25,6 +25,7 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math"
"math/big"
"net"
"os"
@@ -44,6 +45,7 @@ type Config struct {
Organization []string
AltNames AltNames
Usages []x509.ExtKeyUsage
NotBefore time.Time
}
// AltNames contains the domain names and IP addresses that will be added
@@ -57,14 +59,24 @@ type AltNames struct {
// NewSelfSignedCACert creates a CA certificate
func NewSelfSignedCACert(cfg Config, key crypto.Signer) (*x509.Certificate, error) {
now := time.Now()
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
notBefore := now.UTC()
if !cfg.NotBefore.IsZero() {
notBefore = cfg.NotBefore.UTC()
}
tmpl := x509.Certificate{
SerialNumber: new(big.Int).SetInt64(0),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
},
DNSNames: []string{cfg.CommonName},
NotBefore: now.UTC(),
NotBefore: notBefore,
NotAfter: now.Add(duration365d * 10).UTC(),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
@@ -116,9 +128,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
caTemplate := x509.Certificate{
SerialNumber: big.NewInt(1),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s-ca@%d", host, time.Now().Unix()),
},
@@ -144,9 +161,14 @@ func GenerateSelfSignedCertKeyWithFixtures(host string, alternateIPs []net.IP, a
if err != nil {
return nil, nil, err
}
// returns a uniform random value in [0, max-1), then add 1 to serial to make it a uniform random value in [1, max).
serial, err = cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64-1))
if err != nil {
return nil, nil, err
}
serial = new(big.Int).Add(serial, big.NewInt(1))
template := x509.Certificate{
SerialNumber: big.NewInt(2),
SerialNumber: serial,
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s@%d", host, time.Now().Unix()),
},