diff --git a/generator/k8s_client_template.go b/generator/k8s_client_template.go index 323823b3..05311829 100644 --- a/generator/k8s_client_template.go +++ b/generator/k8s_client_template.go @@ -7,9 +7,9 @@ import ( "context" "github.com/rancher/norman/objectclient" + "github.com/rancher/norman/objectclient/dynamic" "github.com/rancher/norman/controller" "github.com/rancher/norman/restwatch" - "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" ) @@ -30,8 +30,7 @@ type Client struct { func NewForConfig(config rest.Config) (Interface, error) { if config.NegotiatedSerializer == nil { - configConfig := dynamic.ContentConfig() - config.NegotiatedSerializer = configConfig.NegotiatedSerializer + config.NegotiatedSerializer = dynamic.NegotiatedSerializer } restClient, err := restwatch.UnversionedRESTClientFor(&config) diff --git a/leader12/leader.go b/leader12/leader.go new file mode 100644 index 00000000..357e7b34 --- /dev/null +++ b/leader12/leader.go @@ -0,0 +1,70 @@ +package leader + +import ( + "context" + "os" + "time" + + "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/api/legacyscheme" +) + +type Callback func(cb context.Context) + +func RunOrDie(ctx context.Context, name string, client kubernetes.Interface, cb Callback) { + err := run(ctx, name, client, cb) + if err != nil { + logrus.Fatalf("Failed to start leader election for %s", name) + } + panic("Failed to start leader election for " + name) +} + +func run(ctx context.Context, name string, client kubernetes.Interface, cb Callback) error { + id, err := os.Hostname() + if err != nil { + return err + } + + recorder := createRecorder(name, client) + + rl, err := resourcelock.New(resourcelock.ConfigMapsResourceLock, + "kube-system", + name, + client.CoreV1(), + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }) + if err != nil { + logrus.Fatalf("error creating leader lock for %s: %v", name, err) + } + + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + go cb(ctx) + }, + OnStoppedLeading: func() { + logrus.Fatalf("leaderelection lost for %s", name) + }, + }, + }) + panic("unreachable") +} + +func createRecorder(name string, kubeClient kubernetes.Interface) record.EventRecorder { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(logrus.Debugf) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) + return eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: name}) +} diff --git a/objectclient/dynamic/content.go b/objectclient/dynamic/content.go new file mode 100644 index 00000000..794f15b4 --- /dev/null +++ b/objectclient/dynamic/content.go @@ -0,0 +1,58 @@ +package dynamic + +import ( + ejson "encoding/json" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var ( + NegotiatedSerializer = negotiatedSerializer{} +) + +type negotiatedSerializer struct{} + +func (s negotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo { + return []runtime.SerializerInfo{ + { + MediaType: "application/json", + EncodesAsText: true, + Serializer: dynamicCodec{ + Encoder: unstructured.UnstructuredJSONScheme, + }, + }, + } +} + +func (s negotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder { + return encoder +} + +func (s negotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder { + return decoder +} + +type dynamicCodec struct { + runtime.Encoder +} + +func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj) + if err != nil { + return nil, nil, err + } + + if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" { + obj = &metav1.Status{} + err := ejson.Unmarshal(data, obj) + if err != nil { + return nil, nil, err + } + } + + return obj, gvk, nil +} diff --git a/objectclient/object_client.go b/objectclient/object_client.go index 29b85811..d75896a2 100644 --- a/objectclient/object_client.go +++ b/objectclient/object_client.go @@ -16,7 +16,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" restclientwatch "k8s.io/client-go/rest/watch" ) @@ -138,7 +137,7 @@ func (p *ObjectClient) GetNamespaced(namespace, name string, opts metav1.GetOpti } err := req. Resource(p.resource.Name). - VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback). + VersionedParams(&opts, metav1.ParameterCodec). Name(name). Do(). Into(result) @@ -153,7 +152,7 @@ func (p *ObjectClient) Get(name string, opts metav1.GetOptions) (runtime.Object, Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). NamespaceIfScoped(p.ns, p.resource.Namespaced). Resource(p.resource.Name). - VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback). + VersionedParams(&opts, metav1.ParameterCodec). Name(name). Do(). Into(result) @@ -215,7 +214,7 @@ func (p *ObjectClient) List(opts metav1.ListOptions) (runtime.Object, error) { Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). NamespaceIfScoped(p.ns, p.resource.Namespaced). Resource(p.resource.Name). - VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback). + VersionedParams(&opts, metav1.ParameterCodec). Do(). Into(result) } @@ -231,7 +230,7 @@ func (p *ObjectClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { Prefix("watch"). NamespaceIfScoped(p.ns, p.resource.Namespaced). Resource(p.resource.Name). - VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback). + VersionedParams(&opts, metav1.ParameterCodec). Stream() if err != nil { return nil, err @@ -279,7 +278,7 @@ func (p *ObjectClient) DeleteCollection(deleteOptions *metav1.DeleteOptions, lis Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). NamespaceIfScoped(p.ns, p.resource.Namespaced). Resource(p.resource.Name). - VersionedParams(&listOptions, dynamic.VersionedParameterEncoderWithV1Fallback). + VersionedParams(&listOptions, metav1.ParameterCodec). Body(deleteOptions). Do(). Error() diff --git a/scripts/validate b/scripts/validate index ff9e078e..a3b265c7 100755 --- a/scripts/validate +++ b/scripts/validate @@ -5,7 +5,7 @@ cd $(dirname $0)/.. echo Running validation -PACKAGES="$(find -name '*.go' | xargs -I{} dirname {} | cut -f2 -d/ | sort -u | grep -Ev '(^\.$|.git|.trash-cache|vendor|bin)' | sed -e 's!^!./!' -e 's!$!/...!')" +PACKAGES="$(find -name '*.go' | xargs -I{} dirname {} | cut -f2 -d/ | sort -u | grep -Ev '(^\.$|.git|.trash-cache|vendor|bin|leader12)' | sed -e 's!^!./!' -e 's!$!/...!')" echo Running: go vet go vet ${PACKAGES} diff --git a/store/proxy/proxy_store.go b/store/proxy/proxy_store.go index 54f66fb7..54cf1053 100644 --- a/store/proxy/proxy_store.go +++ b/store/proxy/proxy_store.go @@ -9,6 +9,7 @@ import ( "time" "github.com/rancher/norman/httperror" + "github.com/rancher/norman/objectclient/dynamic" "github.com/rancher/norman/pkg/broadcast" "github.com/rancher/norman/restwatch" "github.com/rancher/norman/types" @@ -25,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" restclientwatch "k8s.io/client-go/rest/watch" ) @@ -52,8 +52,7 @@ type simpleClientGetter struct { func NewClientGetterFromConfig(config rest.Config) (ClientGetter, error) { dynamicConfig := config if dynamicConfig.NegotiatedSerializer == nil { - configConfig := dynamic.ContentConfig() - dynamicConfig.NegotiatedSerializer = configConfig.NegotiatedSerializer + dynamicConfig.NegotiatedSerializer = dynamic.NegotiatedSerializer } unversionedClient, err := rest.UnversionedRESTClientFor(&dynamicConfig) @@ -232,7 +231,7 @@ func (s *Store) realWatch(apiContext *types.APIContext, schema *types.Schema, op Watch: true, TimeoutSeconds: &timeout, ResourceVersion: "0", - }, dynamic.VersionedParameterEncoderWithV1Fallback) + }, metav1.ParameterCodec) body, err := req.Stream() if err != nil {