1
0
mirror of https://github.com/rancher/norman.git synced 2025-09-10 19:49:01 +00:00

Add support for k8s 1.12

This commit is contained in:
Darren Shepherd
2018-10-09 19:27:02 -07:00
parent c580ac3ab0
commit 611d7cb0f1
6 changed files with 139 additions and 14 deletions

View File

@@ -7,9 +7,9 @@ import (
"context"
"github.com/rancher/norman/objectclient"
"github.com/rancher/norman/objectclient/dynamic"
"github.com/rancher/norman/controller"
"github.com/rancher/norman/restwatch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
@@ -30,8 +30,7 @@ type Client struct {
func NewForConfig(config rest.Config) (Interface, error) {
if config.NegotiatedSerializer == nil {
configConfig := dynamic.ContentConfig()
config.NegotiatedSerializer = configConfig.NegotiatedSerializer
config.NegotiatedSerializer = dynamic.NegotiatedSerializer
}
restClient, err := restwatch.UnversionedRESTClientFor(&config)

70
leader12/leader.go Normal file
View File

@@ -0,0 +1,70 @@
package leader
import (
"context"
"os"
"time"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
type Callback func(cb context.Context)
func RunOrDie(ctx context.Context, name string, client kubernetes.Interface, cb Callback) {
err := run(ctx, name, client, cb)
if err != nil {
logrus.Fatalf("Failed to start leader election for %s", name)
}
panic("Failed to start leader election for " + name)
}
func run(ctx context.Context, name string, client kubernetes.Interface, cb Callback) error {
id, err := os.Hostname()
if err != nil {
return err
}
recorder := createRecorder(name, client)
rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock,
"kube-system",
name,
client.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
logrus.Fatalf("error creating leader lock for %s: %v", name, err)
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
go cb(ctx)
},
OnStoppedLeading: func() {
logrus.Fatalf("leaderelection lost for %s", name)
},
},
})
panic("unreachable")
}
func createRecorder(name string, kubeClient kubernetes.Interface) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Debugf)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
return eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: name})
}

View File

@@ -0,0 +1,58 @@
package dynamic
import (
ejson "encoding/json"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
var (
NegotiatedSerializer = negotiatedSerializer{}
)
type negotiatedSerializer struct{}
func (s negotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{
{
MediaType: "application/json",
EncodesAsText: true,
Serializer: dynamicCodec{
Encoder: unstructured.UnstructuredJSONScheme,
},
},
}
}
func (s negotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return encoder
}
func (s negotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return decoder
}
type dynamicCodec struct {
runtime.Encoder
}
func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj)
if err != nil {
return nil, nil, err
}
if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" {
obj = &metav1.Status{}
err := ejson.Unmarshal(data, obj)
if err != nil {
return nil, nil, err
}
}
return obj, gvk, nil
}

View File

@@ -16,7 +16,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
restclientwatch "k8s.io/client-go/rest/watch"
)
@@ -138,7 +137,7 @@ func (p *ObjectClient) GetNamespaced(namespace, name string, opts metav1.GetOpti
}
err := req.
Resource(p.resource.Name).
VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback).
VersionedParams(&opts, metav1.ParameterCodec).
Name(name).
Do().
Into(result)
@@ -153,7 +152,7 @@ func (p *ObjectClient) Get(name string, opts metav1.GetOptions) (runtime.Object,
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version).
NamespaceIfScoped(p.ns, p.resource.Namespaced).
Resource(p.resource.Name).
VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback).
VersionedParams(&opts, metav1.ParameterCodec).
Name(name).
Do().
Into(result)
@@ -215,7 +214,7 @@ func (p *ObjectClient) List(opts metav1.ListOptions) (runtime.Object, error) {
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version).
NamespaceIfScoped(p.ns, p.resource.Namespaced).
Resource(p.resource.Name).
VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback).
VersionedParams(&opts, metav1.ParameterCodec).
Do().
Into(result)
}
@@ -231,7 +230,7 @@ func (p *ObjectClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
Prefix("watch").
NamespaceIfScoped(p.ns, p.resource.Namespaced).
Resource(p.resource.Name).
VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback).
VersionedParams(&opts, metav1.ParameterCodec).
Stream()
if err != nil {
return nil, err
@@ -279,7 +278,7 @@ func (p *ObjectClient) DeleteCollection(deleteOptions *metav1.DeleteOptions, lis
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version).
NamespaceIfScoped(p.ns, p.resource.Namespaced).
Resource(p.resource.Name).
VersionedParams(&listOptions, dynamic.VersionedParameterEncoderWithV1Fallback).
VersionedParams(&listOptions, metav1.ParameterCodec).
Body(deleteOptions).
Do().
Error()

View File

@@ -5,7 +5,7 @@ cd $(dirname $0)/..
echo Running validation
PACKAGES="$(find -name '*.go' | xargs -I{} dirname {} | cut -f2 -d/ | sort -u | grep -Ev '(^\.$|.git|.trash-cache|vendor|bin)' | sed -e 's!^!./!' -e 's!$!/...!')"
PACKAGES="$(find -name '*.go' | xargs -I{} dirname {} | cut -f2 -d/ | sort -u | grep -Ev '(^\.$|.git|.trash-cache|vendor|bin|leader12)' | sed -e 's!^!./!' -e 's!$!/...!')"
echo Running: go vet
go vet ${PACKAGES}

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/rancher/norman/httperror"
"github.com/rancher/norman/objectclient/dynamic"
"github.com/rancher/norman/pkg/broadcast"
"github.com/rancher/norman/restwatch"
"github.com/rancher/norman/types"
@@ -25,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
restclientwatch "k8s.io/client-go/rest/watch"
)
@@ -52,8 +52,7 @@ type simpleClientGetter struct {
func NewClientGetterFromConfig(config rest.Config) (ClientGetter, error) {
dynamicConfig := config
if dynamicConfig.NegotiatedSerializer == nil {
configConfig := dynamic.ContentConfig()
dynamicConfig.NegotiatedSerializer = configConfig.NegotiatedSerializer
dynamicConfig.NegotiatedSerializer = dynamic.NegotiatedSerializer
}
unversionedClient, err := rest.UnversionedRESTClientFor(&dynamicConfig)
@@ -232,7 +231,7 @@ func (s *Store) realWatch(apiContext *types.APIContext, schema *types.Schema, op
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: "0",
}, dynamic.VersionedParameterEncoderWithV1Fallback)
}, metav1.ParameterCodec)
body, err := req.Stream()
if err != nil {