diff --git a/apis/core/v1/zz_generated_deepcopy.go b/apis/core/v1/zz_generated_deepcopy.go index 9344ebbc..43ef00f7 100644 --- a/apis/core/v1/zz_generated_deepcopy.go +++ b/apis/core/v1/zz_generated_deepcopy.go @@ -5,6 +5,40 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeList) DeepCopyInto(out *NodeList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]core_v1.Node, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeList. +func (in *NodeList) DeepCopy() *NodeList { + if in == nil { + return nil + } + out := new(NodeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } else { + return nil + } +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodList) DeepCopyInto(out *PodList) { *out = *in diff --git a/apis/core/v1/zz_generated_k8s_client.go b/apis/core/v1/zz_generated_k8s_client.go index a371f0df..bf8d8eec 100644 --- a/apis/core/v1/zz_generated_k8s_client.go +++ b/apis/core/v1/zz_generated_k8s_client.go @@ -15,6 +15,7 @@ type Interface interface { controller.Starter PodsGetter + NodesGetter } type Client struct { @@ -22,7 +23,8 @@ type Client struct { restClient rest.Interface starters []controller.Starter - podControllers map[string]PodController + podControllers map[string]PodController + nodeControllers map[string]NodeController } func NewForConfig(config rest.Config) (Interface, error) { @@ -39,7 +41,8 @@ func NewForConfig(config rest.Config) (Interface, error) { return &Client{ restClient: restClient, - podControllers: map[string]PodController{}, + podControllers: map[string]PodController{}, + nodeControllers: map[string]NodeController{}, }, nil } @@ -67,3 +70,16 @@ func (c *Client) Pods(namespace string) PodInterface { objectClient: objectClient, } } + +type NodesGetter interface { + Nodes(namespace string) NodeInterface +} + +func (c *Client) Nodes(namespace string) NodeInterface { + objectClient := clientbase.NewObjectClient(namespace, c.restClient, &NodeResource, NodeGroupVersionKind, nodeFactory{}) + return &nodeClient{ + ns: namespace, + client: c, + objectClient: objectClient, + } +} diff --git a/apis/core/v1/zz_generated_node_controller.go b/apis/core/v1/zz_generated_node_controller.go new file mode 100644 index 00000000..264c2681 --- /dev/null +++ b/apis/core/v1/zz_generated_node_controller.go @@ -0,0 +1,188 @@ +package v1 + +import ( + "context" + + "github.com/rancher/norman/clientbase" + "github.com/rancher/norman/controller" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +var ( + NodeGroupVersionKind = schema.GroupVersionKind{ + Version: "v1", + Group: "", + Kind: "Node", + } + NodeResource = metav1.APIResource{ + Name: "nodes", + SingularName: "node", + Namespaced: false, + Kind: NodeGroupVersionKind.Kind, + } +) + +type NodeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []v1.Node +} + +type NodeHandlerFunc func(key string, obj *v1.Node) error + +type NodeLister interface { + List(namespace string, selector labels.Selector) (ret []*v1.Node, err error) + Get(namespace, name string) (*v1.Node, error) +} + +type NodeController interface { + Informer() cache.SharedIndexInformer + Lister() NodeLister + AddHandler(handler NodeHandlerFunc) + Enqueue(namespace, name string) + Sync(ctx context.Context) error + Start(ctx context.Context, threadiness int) error +} + +type NodeInterface interface { + ObjectClient() *clientbase.ObjectClient + Create(*v1.Node) (*v1.Node, error) + Get(name string, opts metav1.GetOptions) (*v1.Node, error) + Update(*v1.Node) (*v1.Node, error) + Delete(name string, options *metav1.DeleteOptions) error + List(opts metav1.ListOptions) (*v1.NodeList, error) + Watch(opts metav1.ListOptions) (watch.Interface, error) + DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error + Controller() NodeController +} + +type nodeLister struct { + controller *nodeController +} + +func (l *nodeLister) List(namespace string, selector labels.Selector) (ret []*v1.Node, err error) { + err = cache.ListAllByNamespace(l.controller.Informer().GetIndexer(), namespace, selector, func(obj interface{}) { + ret = append(ret, obj.(*v1.Node)) + }) + return +} + +func (l *nodeLister) Get(namespace, name string) (*v1.Node, error) { + obj, exists, err := l.controller.Informer().GetIndexer().GetByKey(namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(schema.GroupResource{ + Group: NodeGroupVersionKind.Group, + Resource: "node", + }, name) + } + return obj.(*v1.Node), nil +} + +type nodeController struct { + controller.GenericController +} + +func (c *nodeController) Lister() NodeLister { + return &nodeLister{ + controller: c, + } +} + +func (c *nodeController) AddHandler(handler NodeHandlerFunc) { + c.GenericController.AddHandler(func(key string) error { + obj, exists, err := c.Informer().GetStore().GetByKey(key) + if err != nil { + return err + } + if !exists { + return handler(key, nil) + } + return handler(key, obj.(*v1.Node)) + }) +} + +type nodeFactory struct { +} + +func (c nodeFactory) Object() runtime.Object { + return &v1.Node{} +} + +func (c nodeFactory) List() runtime.Object { + return &NodeList{} +} + +func (s *nodeClient) Controller() NodeController { + s.client.Lock() + defer s.client.Unlock() + + c, ok := s.client.nodeControllers[s.ns] + if ok { + return c + } + + genericController := controller.NewGenericController(NodeGroupVersionKind.Kind+"Controller", + s.objectClient) + + c = &nodeController{ + GenericController: genericController, + } + + s.client.nodeControllers[s.ns] = c + s.client.starters = append(s.client.starters, c) + + return c +} + +type nodeClient struct { + client *Client + ns string + objectClient *clientbase.ObjectClient + controller NodeController +} + +func (s *nodeClient) ObjectClient() *clientbase.ObjectClient { + return s.objectClient +} + +func (s *nodeClient) Create(o *v1.Node) (*v1.Node, error) { + obj, err := s.objectClient.Create(o) + return obj.(*v1.Node), err +} + +func (s *nodeClient) Get(name string, opts metav1.GetOptions) (*v1.Node, error) { + obj, err := s.objectClient.Get(name, opts) + return obj.(*v1.Node), err +} + +func (s *nodeClient) Update(o *v1.Node) (*v1.Node, error) { + obj, err := s.objectClient.Update(o.Name, o) + return obj.(*v1.Node), err +} + +func (s *nodeClient) Delete(name string, options *metav1.DeleteOptions) error { + return s.objectClient.Delete(name, options) +} + +func (s *nodeClient) List(opts metav1.ListOptions) (*v1.NodeList, error) { + obj, err := s.objectClient.List(opts) + return obj.(*v1.NodeList), err +} + +func (s *nodeClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { + return s.objectClient.Watch(opts) +} + +func (s *nodeClient) DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error { + return s.objectClient.DeleteCollection(deleteOpts, listOpts) +} diff --git a/config/context.go b/config/context.go index 56a63c00..bce53f26 100644 --- a/config/context.go +++ b/config/context.go @@ -18,17 +18,35 @@ import ( type ClusterContext struct { RESTConfig rest.Config UnversionedClient rest.Interface - Cluster clusterv1.Interface - Authorization authzv1.Interface + + Cluster clusterv1.Interface + Authorization authzv1.Interface +} + +func (c *ClusterContext) controllers() []controller.Starter { + return []controller.Starter{ + c.Cluster, + c.Authorization, + } } type WorkloadContext struct { Cluster *ClusterContext + ClusterName string RESTConfig rest.Config UnversionedClient rest.Interface - Apps appsv1beta2.Interface - Workload workloadv1.Interface - Core corev1.Interface + + Apps appsv1beta2.Interface + Workload workloadv1.Interface + Core corev1.Interface +} + +func (w *WorkloadContext) controllers() []controller.Starter { + return []controller.Starter{ + w.Apps, + w.Workload, + w.Core, + } } func NewClusterContext(config rest.Config) (*ClusterContext, error) { @@ -63,25 +81,8 @@ func NewClusterContext(config rest.Config) (*ClusterContext, error) { } func (c *ClusterContext) Start(ctx context.Context) error { - logrus.Info("Syncing cluster controllers") - err := controller.Sync(ctx, - c.Cluster, - c.Authorization) - if err != nil { - return err - } - logrus.Info("Starting cluster controllers") - if err := c.Cluster.Start(ctx, 5); err != nil { - return err - } - - if err := c.Authorization.Start(ctx, 5); err != nil { - return err - } - - logrus.Info("Cluster context started") - return nil + return controller.SyncThenSync(ctx, 5, c.controllers()...) } func (c *ClusterContext) StartAndWait() error { @@ -91,7 +92,7 @@ func (c *ClusterContext) StartAndWait() error { return ctx.Err() } -func NewWorkloadContext(clusterConfig, config rest.Config) (*WorkloadContext, error) { +func NewWorkloadContext(clusterConfig, config rest.Config, clusterName string) (*WorkloadContext, error) { var err error context := &WorkloadContext{ RESTConfig: config, @@ -130,3 +131,17 @@ func NewWorkloadContext(clusterConfig, config rest.Config) (*WorkloadContext, er return context, err } + +func (w *WorkloadContext) Start(ctx context.Context) error { + logrus.Info("Starting workload controllers") + controllers := w.Cluster.controllers() + controllers = append(controllers, w.controllers()...) + return controller.SyncThenSync(ctx, 5, controllers...) +} + +func (w *WorkloadContext) StartAndWait() error { + ctx := signal.SigTermCancelContext(context.Background()) + w.Start(ctx) + <-ctx.Done() + return ctx.Err() +} diff --git a/main.go b/main.go index bd98e178..3696ae2c 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,6 @@ func main() { generator.Generate(workloadSchema.Schemas) generator.Generate(authzSchema.Schemas) // Group by API group - generator.GenerateNativeTypes(v1.Pod{}) + generator.GenerateNativeTypes(v1.Pod{}, v1.Node{}) generator.GenerateNativeTypes(v1beta2.Deployment{}) } diff --git a/vendor.conf b/vendor.conf index 011529e2..96961e5a 100644 --- a/vendor.conf +++ b/vendor.conf @@ -3,5 +3,5 @@ github.com/rancher/types k8s.io/kubernetes v1.8.3 transitive=true,staging=true bitbucket.org/ww/goautoneg a547fc61f48d567d5b4ec6f8aee5573d8efce11d https://github.com/rancher/goautoneg.git -github.com/rancher/norman 391a96f33dd0b1f893f8b194e2d7cad3e26d1351 +github.com/rancher/norman ee148b4d18535ee26bce4b90380d91a1d9e4ac15 golang.org/x/sync fd80eb99c8f653c847d294a001bdf2a3a6f768f5 diff --git a/vendor/github.com/rancher/norman/controller/generic_controller.go b/vendor/github.com/rancher/norman/controller/generic_controller.go index a1f4816f..dde151c0 100644 --- a/vendor/github.com/rancher/norman/controller/generic_controller.go +++ b/vendor/github.com/rancher/norman/controller/generic_controller.go @@ -98,13 +98,14 @@ func (g *genericController) sync(ctx context.Context) error { DeleteFunc: g.queueObject, }) - logrus.Infof("Starting %s Controller", g.name) + logrus.Infof("Syncing %s Controller", g.name) go g.informer.Run(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) { return fmt.Errorf("failed to sync controller %s", g.name) } + logrus.Infof("Syncing %s Controller Done", g.name) g.synced = true return nil diff --git a/vendor/github.com/rancher/norman/controller/starter.go b/vendor/github.com/rancher/norman/controller/starter.go index 3a735de9..2c488a17 100644 --- a/vendor/github.com/rancher/norman/controller/starter.go +++ b/vendor/github.com/rancher/norman/controller/starter.go @@ -11,8 +11,15 @@ type Starter interface { Start(ctx context.Context, threadiness int) error } +func SyncThenSync(ctx context.Context, threadiness int, starters ...Starter) error { + if err := Sync(ctx, starters...); err != nil { + return err + } + return Start(ctx, threadiness, starters...) +} + func Sync(ctx context.Context, starters ...Starter) error { - eg, ctx := errgroup.WithContext(ctx) + eg, _ := errgroup.WithContext(ctx) for _, starter := range starters { func(starter Starter) { eg.Go(func() error {