1
0
mirror of https://github.com/rancher/norman.git synced 2025-09-02 15:54:32 +00:00

Add offspring controller

This commit is contained in:
Darren Shepherd
2017-12-19 21:39:57 -07:00
parent 2825fdb9c3
commit 2550f12a9a
11 changed files with 629 additions and 9 deletions

View File

@@ -65,8 +65,8 @@ var (
APIRoot = types.Schema{
ID: "apiRoot",
Version: Version,
ResourceMethods: []string{},
CollectionMethods: []string{},
CollectionMethods: []string{"GET"},
ResourceMethods: []string{"GET"},
ResourceFields: map[string]types.Field{
"apiVersion": {Type: "map[json]"},
"path": {Type: "string"},

View File

@@ -58,6 +58,10 @@ func (p *ObjectClient) UnstructuredClient() *ObjectClient {
}
}
func (p *ObjectClient) GroupVersionKind() schema.GroupVersionKind {
return p.gvk
}
func (p *ObjectClient) getAPIPrefix() string {
if p.gvk.Group == "" {
return "api"
@@ -90,6 +94,23 @@ func (p *ObjectClient) Create(o runtime.Object) (runtime.Object, error) {
return result, err
}
func (p *ObjectClient) GetNamespace(name, namespace string, opts metav1.GetOptions) (runtime.Object, error) {
result := p.Factory.Object()
req := p.restClient.Get().
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version)
if namespace != "" {
req = req.Namespace(namespace)
}
err := req.NamespaceIfScoped(p.ns, p.resource.Namespaced).
Resource(p.resource.Name).
VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback).
Name(name).
Do().
Into(result)
return result, err
}
func (p *ObjectClient) Get(name string, opts metav1.GetOptions) (runtime.Object, error) {
result := p.Factory.Object()
err := p.restClient.Get().
@@ -123,6 +144,19 @@ func (p *ObjectClient) Update(name string, o runtime.Object) (runtime.Object, er
return result, err
}
func (p *ObjectClient) DeleteNamespace(name, namespace string, opts *metav1.DeleteOptions) error {
req := p.restClient.Delete().
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version)
if namespace != "" {
req = req.Namespace(namespace)
}
return req.Resource(p.resource.Name).
Name(name).
Body(opts).
Do().
Error()
}
func (p *ObjectClient) Delete(name string, opts *metav1.DeleteOptions) error {
return p.restClient.Delete().
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version).

View File

@@ -11,7 +11,7 @@ type Starter interface {
Start(ctx context.Context, threadiness int) error
}
func SyncThenSync(ctx context.Context, threadiness int, starters ...Starter) error {
func SyncThenStart(ctx context.Context, threadiness int, starters ...Starter) error {
if err := Sync(ctx, starters...); err != nil {
return err
}

View File

@@ -26,7 +26,7 @@ type Baz struct {
var (
version = types.APIVersion{
Version: "v1",
Group: "io.cattle.core.example",
Group: "example.core.cattle.io",
Path: "/example/v1",
}

View File

@@ -267,7 +267,7 @@ func generateClient(outputDir string, schemas []*types.Schema) error {
})
}
func GenerateControllerForTypes(version *types.APIVersion, k8sOutputPackage string, objs ...interface{}) error {
func GenerateControllerForTypes(version *types.APIVersion, k8sOutputPackage string, nsObjs []interface{}, objs []interface{}) error {
baseDir := args.DefaultSourceTree()
k8sDir := path.Join(baseDir, k8sOutputPackage)
@@ -294,6 +294,23 @@ func GenerateControllerForTypes(version *types.APIVersion, k8sOutputPackage stri
}
}
for _, obj := range nsObjs {
schema, err := schemas.Import(version, obj)
if err != nil {
return err
}
schema.Scope = types.NamespaceScope
controllers = append(controllers, schema)
if err := generateController(true, k8sDir, schema, schemas); err != nil {
return err
}
if err := generateLifecycle(true, k8sDir, schema, schemas); err != nil {
return err
}
}
if err := deepCopyGen(baseDir, k8sOutputPackage); err != nil {
return err
}

View File

@@ -9,7 +9,7 @@ import (
)
var (
created = "io.cattle.lifecycle.create"
created = "lifecycle.cattle.io/create"
)
type ObjectLifecycle interface {

520
offspring/offspring.go Normal file
View File

@@ -0,0 +1,520 @@
package offspring
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"github.com/rancher/norman/clientbase"
"github.com/sirupsen/logrus"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
)
type ParentLookup func(obj runtime.Object) *ParentReference
type Generator func(obj runtime.Object) (ObjectSet, error)
type Enqueue func(namespace, name string)
type ObjectReference struct {
Kind string
Namespace string
Name string
APIVersion string
}
type ParentReference struct {
Namespace string
Name string
}
type ObjectSet struct {
Parent runtime.Object
Children []runtime.Object
Complete bool
}
type KnownObjectSet struct {
Children map[ObjectReference]runtime.Object
}
func (k KnownObjectSet) clone() KnownObjectSet {
newMap := map[ObjectReference]runtime.Object{}
for k, v := range k.Children {
newMap[k] = v
}
return KnownObjectSet{
Children: newMap,
}
}
type ChildWatch struct {
ObjectClient clientbase.ObjectClient
Informer cache.SharedInformer
}
type change struct {
parent ParentReference
childRef ObjectReference
child runtime.Object
delete bool
}
type Reconciliation struct {
sync.Mutex
Generator Generator
Enqueue Enqueue
ObjectClient *clientbase.ObjectClient
Children []ChildWatcher
running bool
changes chan change
children map[ParentReference]KnownObjectSet
childWatchers map[schema.GroupVersionKind]*ChildWatcher
keys map[string]bool
}
type ChildWatcher struct {
ObjectClient *clientbase.ObjectClient
Informer cache.SharedInformer
Scheme runtime.Scheme
// optional
CompareKeys []string
// optional
ParentLookup ParentLookup
watcher *Reconciliation
keys map[string]bool
}
func NewReconciliation(ctx context.Context, generator Generator, enqueue Enqueue, client *clientbase.ObjectClient, children ...ChildWatcher) *Reconciliation {
r := &Reconciliation{
Generator: generator,
Enqueue: enqueue,
ObjectClient: client,
running: true,
changes: make(chan change, 10),
children: map[ParentReference]KnownObjectSet{},
childWatchers: map[schema.GroupVersionKind]*ChildWatcher{},
keys: getKeys(client.Factory.Object(), nil),
}
for _, child := range children {
if child.ParentLookup == nil {
child.ParentLookup = OwnerReferenceLookup(r.ObjectClient.GroupVersionKind())
}
child.watcher = r
if len(child.CompareKeys) == 0 {
child.keys = getKeys(child.ObjectClient.Factory.Object(), map[string]bool{"Status": true})
} else {
child.keys = map[string]bool{}
for _, key := range child.CompareKeys {
child.keys[key] = true
}
}
childCopy := child
child.Informer.AddEventHandler(&childCopy)
r.childWatchers[child.ObjectClient.GroupVersionKind()] = &childCopy
}
go r.run()
go func() {
<-ctx.Done()
r.Lock()
r.running = false
close(r.changes)
r.Unlock()
}()
return r
}
func OwnerReferenceLookup(gvk schema.GroupVersionKind) ParentLookup {
return func(obj runtime.Object) *ParentReference {
meta, err := apimeta.Accessor(obj)
if err != nil {
logrus.Errorf("Failed to look up parent for %v", obj)
return nil
}
var ownerRef *metav1.OwnerReference
for i, owner := range meta.GetOwnerReferences() {
if owner.Controller != nil && *owner.Controller {
ownerRef = &meta.GetOwnerReferences()[i]
break
}
}
if ownerRef == nil {
return nil
}
apiVersion, kind := gvk.ToAPIVersionAndKind()
if ownerRef.APIVersion != apiVersion || ownerRef.Kind != kind {
return nil
}
return &ParentReference{
Name: ownerRef.Name,
Namespace: meta.GetNamespace(),
}
}
}
func getKeys(obj interface{}, ignore map[string]bool) map[string]bool {
keys := map[string]bool{}
keys["metadata"] = true
value := reflect.ValueOf(obj)
t := value.Type()
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
numFields := t.NumField()
for i := 0; i < numFields; i++ {
field := t.Field(i)
if field.Name != "" && !field.Anonymous && !ignore[field.Name] {
keys[field.Name] = true
}
}
return keys
}
func (w *ChildWatcher) OnAdd(obj interface{}) {
w.changed(obj, false)
}
func (w *ChildWatcher) OnUpdate(oldObj, newObj interface{}) {
w.changed(newObj, false)
}
func (w *ChildWatcher) OnDelete(obj interface{}) {
w.changed(obj, true)
}
func (w *ChildWatcher) changed(obj interface{}, deleted bool) {
ro, ok := obj.(runtime.Object)
if !ok {
logrus.Errorf("Failed to cast %s to runtime.Object", reflect.TypeOf(obj))
return
}
parent := w.ParentLookup(ro)
if parent == nil {
return
}
meta, err := apimeta.Accessor(ro)
if err != nil {
logrus.Errorf("Failed to access metadata of runtime.Object: %v", err)
return
}
w.watcher.Lock()
if w.watcher.running {
gvk := w.ObjectClient.GroupVersionKind()
apiVersion, kind := gvk.ToAPIVersionAndKind()
w.watcher.changes <- change{
parent: *parent,
childRef: ObjectReference{
Namespace: meta.GetNamespace(),
Name: meta.GetName(),
Kind: kind,
APIVersion: apiVersion,
},
child: ro,
delete: deleted,
}
}
w.watcher.Unlock()
}
func (w *Reconciliation) Changed(key string, obj runtime.Object) (runtime.Object, error) {
var (
err error
objectSet ObjectSet
)
if obj == nil {
objectSet.Complete = true
} else {
objectSet, err = w.Generator(obj)
if err != nil {
return obj, err
}
}
parentRef := keyToParentReference(key)
existingSet := w.children[parentRef]
if objectSet.Parent != nil {
if newObj, err := w.updateParent(parentRef, obj, objectSet.Parent); err != nil {
return obj, err
}
obj = newObj
objectSet.Parent = obj
}
var lastErr error
newChildRefs := map[ObjectReference]bool{}
for _, child := range objectSet.Children {
childRef, err := createRef(child)
if err != nil {
return obj, err
}
newChildRefs[childRef] = true
existingChild, ok := existingSet.Children[childRef]
if ok {
if _, err := w.updateChild(childRef, existingChild, child); err != nil {
lastErr = err
}
} else {
if _, err := w.createChild(obj, childRef, child); err != nil {
lastErr = err
}
}
}
if objectSet.Complete {
for childRef, child := range existingSet.Children {
if !newChildRefs[childRef] {
if err := w.deleteChild(childRef, child); err != nil {
lastErr = err
}
}
}
}
return obj, lastErr
}
func createRef(obj runtime.Object) (ObjectReference, error) {
gvk := obj.GetObjectKind().GroupVersionKind()
ref := ObjectReference{}
ref.APIVersion, ref.Kind = gvk.ToAPIVersionAndKind()
meta, err := apimeta.Accessor(obj)
if err != nil {
return ref, err
}
ref.Name = meta.GetName()
ref.Namespace = meta.GetNamespace()
if ref.Name == "" || ref.Kind == "" || ref.APIVersion == "" {
return ref, fmt.Errorf("name, kind, or apiVersion is blank %v", ref)
}
return ref, nil
}
func (w *Reconciliation) createChild(parent runtime.Object, reference ObjectReference, object runtime.Object) (runtime.Object, error) {
childWatcher, err := w.getChildWatcher(reference)
if err != nil {
return object, err
}
parentMeta, err := apimeta.Accessor(parent)
if err != nil {
return object, err
}
meta, err := apimeta.Accessor(object)
if err != nil {
return object, err
}
if meta.GetNamespace() == parentMeta.GetNamespace() {
trueValue := true
ownerRef := metav1.OwnerReference{
Name: parentMeta.GetName(),
UID: parentMeta.GetUID(),
BlockOwnerDeletion: &trueValue,
Controller: &trueValue,
}
gvk := parent.GetObjectKind().GroupVersionKind()
ownerRef.APIVersion, ownerRef.Kind = gvk.ToAPIVersionAndKind()
meta.SetOwnerReferences(append(meta.GetOwnerReferences(), ownerRef))
}
return childWatcher.ObjectClient.Create(object)
}
func (w *Reconciliation) deleteChild(reference ObjectReference, object runtime.Object) error {
childWatcher, err := w.getChildWatcher(reference)
if err != nil {
return err
}
policy := metav1.DeletePropagationForeground
return childWatcher.ObjectClient.DeleteNamespace(reference.Name, reference.Namespace, &metav1.DeleteOptions{
PropagationPolicy: &policy,
})
}
func (w *Reconciliation) getChildWatcher(reference ObjectReference) (*ChildWatcher, error) {
gvk := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind)
childWatcher, ok := w.childWatchers[gvk]
if !ok {
return nil, fmt.Errorf("failed to find childWatcher for %s", gvk)
}
return childWatcher, nil
}
func keyToParentReference(key string) ParentReference {
parts := strings.SplitN(key, "/", 2)
if len(parts) == 1 {
return ParentReference{
Name: parts[0],
}
}
return ParentReference{
Namespace: parts[0],
Name: parts[1],
}
}
func (w *Reconciliation) run() {
for change := range w.changes {
w.Lock()
children := w.children[change.parent]
w.Unlock()
children = children.clone()
if change.delete {
delete(children.Children, change.childRef)
} else {
children.Children[change.childRef] = change.child
}
w.Lock()
if len(children.Children) == 0 {
delete(w.children, change.parent)
} else {
w.children[change.parent] = children
}
w.Unlock()
w.Enqueue(change.parent.Namespace, change.parent.Name)
}
}
func (w *Reconciliation) updateParent(parentRef ParentReference, oldObj runtime.Object, newObj runtime.Object) (runtime.Object, error) {
reference := ObjectReference{
Name: parentRef.Name,
Namespace: parentRef.Namespace,
}
gvk := w.ObjectClient.GroupVersionKind()
reference.APIVersion, reference.Kind = gvk.ToAPIVersionAndKind()
return w.updateObject(reference, oldObj, newObj, w.ObjectClient, w.keys)
}
func (w *Reconciliation) updateChild(reference ObjectReference, oldObj runtime.Object, newObj runtime.Object) (runtime.Object, error) {
childWatcher, err := w.getChildWatcher(reference)
if err != nil {
return nil, err
}
return w.updateObject(reference, oldObj, newObj, childWatcher.ObjectClient, childWatcher.keys)
}
func (w *Reconciliation) updateObject(reference ObjectReference, oldObj runtime.Object, newObj runtime.Object, client *clientbase.ObjectClient, keys map[string]bool) (runtime.Object, error) {
changes := map[string]interface{}{}
oldObj = oldObj.DeepCopyObject()
oldValue := reflect.ValueOf(oldObj).Elem()
newValue := reflect.ValueOf(newObj).Elem()
for key := range keys {
if key == "metadata" {
oldMeta, err := apimeta.Accessor(oldObj)
if err != nil {
return nil, err
}
newMeta, err := apimeta.Accessor(newObj)
if err != nil {
return nil, err
}
if data, changed := compareMaps(oldMeta.GetLabels(), newMeta.GetLabels()); changed {
changes["labels"] = data
}
if data, changed := compareMaps(oldMeta.GetAnnotations(), newMeta.GetAnnotations()); changed {
changes["annotations"] = data
}
} else {
oldField := oldValue.FieldByName(key)
oldIValue := oldField.Interface()
newField := newValue.FieldByName(key)
newIValue := newField.Interface()
if !reflect.DeepEqual(oldIValue, newIValue) {
oldField.Set(newField)
changeName := jsonName(newValue, key)
if changeName != "-" {
changes[changeName] = newValue
}
}
}
}
if len(changes) > 0 {
//newObj := &unstructured.Unstructured{}
//newObj.Object = changes
//newMeta, err := apimeta.Accessor(newObj)
//if err != nil {
// return nil, err
//}
//
//newTypeMeta, err := apimeta.TypeAccessor(newObj)
//if err != nil {
// return nil, err
//}
//
//newMeta.SetName(reference.Name)
//newMeta.SetNamespace(reference.Namespace)
//newTypeMeta.SetKind(reference.Kind)
//newTypeMeta.SetAPIVersion(reference.APIVersion)
fmt.Println("!!!!!!!!!!!!!! UPDATE! !!!!!!!!!!!!!!")
return client.Update(reference.Name, oldObj)
}
return newObj, nil
}
func jsonName(value reflect.Value, fieldName string) string {
field, _ := value.Type().FieldByName(fieldName)
name := strings.Split(field.Tag.Get("json"), ",")[0]
if name == "" {
return fieldName
}
return name
}
func compareMaps(oldValues, newValues map[string]string) (map[string]string, bool) {
changed := false
mergedValues := map[string]string{}
for k, v := range oldValues {
mergedValues[k] = v
}
for k, v := range newValues {
oldV, ok := oldValues[k]
if !ok || v != oldV {
changed = true
}
mergedValues[k] = v
}
return mergedValues, changed
}

View File

@@ -146,5 +146,5 @@ func EncodeToMap(obj interface{}) (map[string]interface{}, error) {
return nil, err
}
result := map[string]interface{}{}
return result, json.Unmarshal(bytes, result)
return result, json.Unmarshal(bytes, &result)
}

View File

@@ -0,0 +1,45 @@
package mapper
import (
"encoding/json"
"github.com/rancher/norman/types"
"github.com/rancher/norman/types/convert"
"github.com/rancher/norman/types/values"
)
type AnnotationField struct {
Field string
Object bool
}
func (e AnnotationField) FromInternal(data map[string]interface{}) {
v, ok := values.RemoveValue(data, "annotations", "field.cattle.io/"+e.Field)
if ok {
if e.Object {
data := map[string]interface{}{}
//ignore error
if err := json.Unmarshal([]byte(convert.ToString(v)), &data); err == nil {
v = data
}
}
data[e.Field] = v
}
}
func (e AnnotationField) ToInternal(data map[string]interface{}) {
v, ok := data[e.Field]
if ok {
if e.Object {
if bytes, err := json.Marshal(v); err == nil {
v = string(bytes)
}
}
values.PutValue(data, v, "annotations", "field.cattle.io/"+e.Field)
}
}
func (e AnnotationField) ModifySchema(schema *types.Schema, schemas *types.Schemas) error {
return validateField(e.Field, schema)
}

View File

@@ -26,6 +26,10 @@ func (e *Embed) FromInternal(data map[string]interface{}) {
}
func (e *Embed) ToInternal(data map[string]interface{}) {
if data == nil {
return
}
sub := map[string]interface{}{}
for _, fieldName := range e.embeddedFields {
if v, ok := data[fieldName]; ok {

View File

@@ -10,7 +10,7 @@ type LabelField struct {
}
func (e LabelField) FromInternal(data map[string]interface{}) {
v, ok := values.RemoveValue(data, "labels", "io.cattle.field."+e.Field)
v, ok := values.RemoveValue(data, "labels", "field.cattle.io/"+e.Field)
if ok {
data[e.Field] = v
}
@@ -19,7 +19,7 @@ func (e LabelField) FromInternal(data map[string]interface{}) {
func (e LabelField) ToInternal(data map[string]interface{}) {
v, ok := data[e.Field]
if ok {
values.PutValue(data, v, "labels", "io.cattle.field."+e.Field)
values.PutValue(data, v, "labels", "field.cattle.io/"+e.Field)
}
}