Merge pull request #34319 from mwielgus/daemonset-controller

Automatic merge from submit-queue

Federated DaemonSet controller

Based on the secrets controller. E2e tests will come in the next PR.

**Release note**:
```release-note
Federated DaemonSet controller. Supports all the API that regular DaemonSet has.
```

cc: @quinton-hoole @kubernetes/sig-cluster-federation
This commit is contained in:
Kubernetes Submit Queue 2016-10-12 18:24:55 -07:00 committed by GitHub
commit eed41cd6d4
3 changed files with 507 additions and 0 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
daemonset "k8s.io/kubernetes/federation/pkg/federation-controller/daemonset"
deploymentcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/deployment"
ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress"
namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace"
@ -157,6 +158,10 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
secretcontroller := secretcontroller.NewSecretController(secretcontrollerClientset)
secretcontroller.Run(wait.NeverStop)
daemonsetcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "daemonset-controller"))
daemonsetcontroller := daemonset.NewDaemonSetController(daemonsetcontrollerClientset)
daemonsetcontroller.Run(wait.NeverStop)
replicaSetClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, replicasetcontroller.UserAgentName))
replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset)
go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop)

View File

@ -0,0 +1,356 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package daemonset
import (
"reflect"
"time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (
allClustersKey = "ALL_CLUSTERS"
)
type DaemonSetController struct {
// For triggering single daemonset reconciliation. This is used when there is an
// add/update/delete operation on a daemonset in either federated API server or
// in some member of the federation.
daemonsetDeliverer *util.DelayingDeliverer
// For triggering all daemonsets reconciliation. This is used when
// a new cluster becomes available.
clusterDeliverer *util.DelayingDeliverer
// Contains daemonsets present in members of federation.
daemonsetFederatedInformer util.FederatedInformer
// For updating members of federation.
federatedUpdater util.FederatedUpdater
// Definitions of daemonsets that should be federated.
daemonsetInformerStore cache.Store
// Informer controller for daemonsets that should be federated.
daemonsetInformerController cache.ControllerInterface
// Client to federated api server.
federatedApiClient federationclientset.Interface
// Backoff manager for daemonsets
daemonsetBackoff *flowcontrol.Backoff
// For events
eventRecorder record.EventRecorder
daemonsetReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
updateTimeout time.Duration
}
// NewDaemonSetController returns a new daemonset controller
func NewDaemonSetController(client federationclientset.Interface) *DaemonSetController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-daemonset-controller"})
daemonsetcontroller := &DaemonSetController{
federatedApiClient: client,
daemonsetReviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
daemonsetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
}
// Build deliverers for triggering reconciliations.
daemonsetcontroller.daemonsetDeliverer = util.NewDelayingDeliverer()
daemonsetcontroller.clusterDeliverer = util.NewDelayingDeliverer()
// Start informer in federated API servers on daemonsets that should be federated.
daemonsetcontroller.daemonsetInformerStore, daemonsetcontroller.daemonsetInformerController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
versionedOptions := util.VersionizeV1ListOptions(options)
return client.Extensions().DaemonSets(api_v1.NamespaceAll).List(versionedOptions)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
versionedOptions := util.VersionizeV1ListOptions(options)
return client.Extensions().DaemonSets(api_v1.NamespaceAll).Watch(versionedOptions)
},
},
&extensionsv1.DaemonSet{},
controller.NoResyncPeriodFunc(),
util.NewTriggerOnAllChanges(func(obj pkg_runtime.Object) { daemonsetcontroller.deliverDaemonSetObj(obj, 0, false) }))
// Federated informer on daemonsets in members of federation.
daemonsetcontroller.daemonsetFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federation_api.Cluster, targetClient kubeclientset.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
versionedOptions := util.VersionizeV1ListOptions(options)
return targetClient.Extensions().DaemonSets(api_v1.NamespaceAll).List(versionedOptions)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
versionedOptions := util.VersionizeV1ListOptions(options)
return targetClient.Extensions().DaemonSets(api_v1.NamespaceAll).Watch(versionedOptions)
},
},
&extensionsv1.DaemonSet{},
controller.NoResyncPeriodFunc(),
// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
// would be just confirmation that some daemonset opration succeeded.
util.NewTriggerOnAllChanges(
func(obj pkg_runtime.Object) {
daemonsetcontroller.deliverDaemonSetObj(obj, daemonsetcontroller.daemonsetReviewDelay, false)
},
))
},
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
// When new cluster becomes available process all the daemonsets again.
daemonsetcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(daemonsetcontroller.clusterAvailableDelay))
},
},
)
// Federated updater along with Create/Update/Delete operations.
daemonsetcontroller.federatedUpdater = util.NewFederatedUpdater(daemonsetcontroller.daemonsetFederatedInformer,
func(client kubeclientset.Interface, obj pkg_runtime.Object) error {
daemonset := obj.(*extensionsv1.DaemonSet)
glog.V(4).Infof("Attempting to create daemonset: %s/%s", daemonset.Namespace, daemonset.Name)
_, err := client.Extensions().DaemonSets(daemonset.Namespace).Create(daemonset)
if err != nil {
glog.Errorf("Error creating daemonset %s/%s/: %v", daemonset.Namespace, daemonset.Name, err)
} else {
glog.V(4).Infof("Successfully created deamonset %s/%s", daemonset.Namespace, daemonset.Name)
}
return err
},
func(client kubeclientset.Interface, obj pkg_runtime.Object) error {
daemonset := obj.(*extensionsv1.DaemonSet)
glog.V(4).Infof("Attempting to update daemonset: %s/%s", daemonset.Namespace, daemonset.Name)
_, err := client.Extensions().DaemonSets(daemonset.Namespace).Update(daemonset)
if err != nil {
glog.Errorf("Error updating daemonset %s/%s/: %v", daemonset.Namespace, daemonset.Name, err)
} else {
glog.V(4).Infof("Successfully updating deamonset %s/%s", daemonset.Namespace, daemonset.Name)
}
return err
},
func(client kubeclientset.Interface, obj pkg_runtime.Object) error {
daemonset := obj.(*extensionsv1.DaemonSet)
glog.V(4).Infof("Attempting to delete daemonset: %s/%s", daemonset.Namespace, daemonset.Name)
err := client.Extensions().DaemonSets(daemonset.Namespace).Delete(daemonset.Name, &api_v1.DeleteOptions{})
if err != nil {
glog.Errorf("Error deleting daemonset %s/%s/: %v", daemonset.Namespace, daemonset.Name, err)
} else {
glog.V(4).Infof("Successfully deleting deamonset %s/%s", daemonset.Namespace, daemonset.Name)
}
return err
})
return daemonsetcontroller
}
func (daemonsetcontroller *DaemonSetController) Run(stopChan <-chan struct{}) {
glog.V(1).Infof("Starting daemonset controllr")
go daemonsetcontroller.daemonsetInformerController.Run(stopChan)
glog.V(1).Infof("Starting daemonset federated informer")
daemonsetcontroller.daemonsetFederatedInformer.Start()
go func() {
<-stopChan
daemonsetcontroller.daemonsetFederatedInformer.Stop()
}()
glog.V(1).Infof("Starting daemonset deliverers")
daemonsetcontroller.daemonsetDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
daemonset := item.Value.(*types.NamespacedName)
glog.V(4).Infof("Trigerring reconciliation of daemonset %s", daemonset.String())
daemonsetcontroller.reconcileDaemonSet(daemonset.Namespace, daemonset.Name)
})
daemonsetcontroller.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
glog.V(4).Infof("Triggering reconciliation of all daemonsets")
daemonsetcontroller.reconcileDaemonSetsOnClusterChange()
})
util.StartBackoffGC(daemonsetcontroller.daemonsetBackoff, stopChan)
}
func getDaemonSetKey(namespace, name string) string {
return types.NamespacedName{
Namespace: namespace,
Name: name,
}.String()
}
func (daemonsetcontroller *DaemonSetController) deliverDaemonSetObj(obj interface{}, delay time.Duration, failed bool) {
daemonset := obj.(*extensionsv1.DaemonSet)
daemonsetcontroller.deliverDaemonSet(daemonset.Namespace, daemonset.Name, delay, failed)
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
func (daemonsetcontroller *DaemonSetController) deliverDaemonSet(namespace string, name string, delay time.Duration, failed bool) {
key := getDaemonSetKey(namespace, name)
if failed {
daemonsetcontroller.daemonsetBackoff.Next(key, time.Now())
delay = delay + daemonsetcontroller.daemonsetBackoff.Get(key)
} else {
daemonsetcontroller.daemonsetBackoff.Reset(key)
}
daemonsetcontroller.daemonsetDeliverer.DeliverAfter(key,
&types.NamespacedName{Namespace: namespace, Name: name}, delay)
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server.
func (daemonsetcontroller *DaemonSetController) isSynced() bool {
if !daemonsetcontroller.daemonsetFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced")
return false
}
clusters, err := daemonsetcontroller.daemonsetFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err)
return false
}
if !daemonsetcontroller.daemonsetFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
return false
}
return true
}
// The function triggers reconciliation of all federated daemonsets.
func (daemonsetcontroller *DaemonSetController) reconcileDaemonSetsOnClusterChange() {
if !daemonsetcontroller.isSynced() {
daemonsetcontroller.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(daemonsetcontroller.clusterAvailableDelay))
}
for _, obj := range daemonsetcontroller.daemonsetInformerStore.List() {
daemonset := obj.(*extensionsv1.DaemonSet)
daemonsetcontroller.deliverDaemonSet(daemonset.Namespace, daemonset.Name, daemonsetcontroller.smallDelay, false)
}
}
func (daemonsetcontroller *DaemonSetController) reconcileDaemonSet(namespace string, daemonsetName string) {
glog.V(4).Infof("Reconciling daemonset %s/%s", namespace, daemonsetName)
if !daemonsetcontroller.isSynced() {
glog.V(4).Infof("Daemonset controller is not synced")
daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, daemonsetcontroller.clusterAvailableDelay, false)
return
}
key := getDaemonSetKey(namespace, daemonsetName)
baseDaemonSetObj, exist, err := daemonsetcontroller.daemonsetInformerStore.GetByKey(key)
if err != nil {
glog.Errorf("Failed to query main daemonset store for %v: %v", key, err)
daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true)
return
}
if !exist {
glog.V(4).Infof("Skipping daemonset %s/%s - not federated", namespace, daemonsetName)
// Not federated daemonset, ignoring.
return
}
baseDaemonSet := baseDaemonSetObj.(*extensionsv1.DaemonSet)
clusters, err := daemonsetcontroller.daemonsetFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, daemonsetcontroller.clusterAvailableDelay, false)
return
}
operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterDaemonSetObj, found, err := daemonsetcontroller.daemonsetFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err)
daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true)
return
}
desiredDaemonSet := &extensionsv1.DaemonSet{
ObjectMeta: util.CopyObjectMeta(baseDaemonSet.ObjectMeta),
Spec: baseDaemonSet.Spec,
}
if !found {
glog.V(4).Infof("Creating daemonset %s/%s in cluster %s", namespace, daemonsetName, cluster.Name)
daemonsetcontroller.eventRecorder.Eventf(baseDaemonSet, api.EventTypeNormal, "CreateInCluster",
"Creating daemonset in cluster %s", cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd,
Obj: desiredDaemonSet,
ClusterName: cluster.Name,
})
} else {
clusterDaemonSet := clusterDaemonSetObj.(*extensionsv1.DaemonSet)
// Update existing daemonset, if needed.
if !util.ObjectMetaEquivalent(desiredDaemonSet.ObjectMeta, clusterDaemonSet.ObjectMeta) ||
!reflect.DeepEqual(desiredDaemonSet.Spec, clusterDaemonSet.Spec) {
glog.V(4).Infof("Upadting daemonset %s/%s in cluster %s", namespace, daemonsetName, cluster.Name)
daemonsetcontroller.eventRecorder.Eventf(baseDaemonSet, api.EventTypeNormal, "UpdateInCluster",
"Updating daemonset in cluster %s", cluster.Name)
operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeUpdate,
Obj: desiredDaemonSet,
ClusterName: cluster.Name,
})
}
}
}
if len(operations) == 0 {
glog.V(4).Infof("No operation needed for %s/%s", namespace, daemonsetName)
// Everything is in order
return
}
err = daemonsetcontroller.federatedUpdater.UpdateWithOnError(operations, daemonsetcontroller.updateTimeout,
func(op util.FederatedOperation, operror error) {
daemonsetcontroller.eventRecorder.Eventf(baseDaemonSet, api.EventTypeNormal, "UpdateInClusterFailed",
"DaemonSet update in cluster %s failed: %v", op.ClusterName, operror)
})
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v, retrying shortly", key, err)
daemonsetcontroller.deliverDaemonSet(namespace, daemonsetName, 0, true)
return
}
}

View File

@ -0,0 +1,146 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package daemonset
import (
"fmt"
"reflect"
"testing"
"time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fake_fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_5/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
fake_kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/stretchr/testify/assert"
)
func TestDaemonSetController(t *testing.T) {
cluster1 := NewCluster("cluster1", api_v1.ConditionTrue)
cluster2 := NewCluster("cluster2", api_v1.ConditionTrue)
fakeClient := &fake_fedclientset.Clientset{}
RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}})
RegisterFakeList("daemonsets", &fakeClient.Fake, &extensionsv1.DaemonSetList{Items: []extensionsv1.DaemonSet{}})
daemonsetWatch := RegisterFakeWatch("daemonsets", &fakeClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake)
cluster1Client := &fake_kubeclientset.Clientset{}
cluster1Watch := RegisterFakeWatch("daemonsets", &cluster1Client.Fake)
RegisterFakeList("daemonsets", &cluster1Client.Fake, &extensionsv1.DaemonSetList{Items: []extensionsv1.DaemonSet{}})
cluster1CreateChan := RegisterFakeCopyOnCreate("daemonsets", &cluster1Client.Fake, cluster1Watch)
cluster1UpdateChan := RegisterFakeCopyOnUpdate("daemonsets", &cluster1Client.Fake, cluster1Watch)
cluster2Client := &fake_kubeclientset.Clientset{}
cluster2Watch := RegisterFakeWatch("daemonsets", &cluster2Client.Fake)
RegisterFakeList("daemonsets", &cluster2Client.Fake, &extensionsv1.DaemonSetList{Items: []extensionsv1.DaemonSet{}})
cluster2CreateChan := RegisterFakeCopyOnCreate("daemonsets", &cluster2Client.Fake, cluster2Watch)
daemonsetController := NewDaemonSetController(fakeClient)
informer := ToFederatedInformerForTestOnly(daemonsetController.daemonsetFederatedInformer)
informer.SetClientFactory(func(cluster *federation_api.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
return cluster1Client, nil
case cluster2.Name:
return cluster2Client, nil
default:
return nil, fmt.Errorf("Unknown cluster")
}
})
daemonsetController.clusterAvailableDelay = time.Second
daemonsetController.daemonsetReviewDelay = 50 * time.Millisecond
daemonsetController.smallDelay = 20 * time.Millisecond
daemonsetController.updateTimeout = 5 * time.Second
stop := make(chan struct{})
daemonsetController.Run(stop)
daemonset1 := extensionsv1.DaemonSet{
ObjectMeta: api_v1.ObjectMeta{
Name: "test-daemonset",
Namespace: "ns",
SelfLink: "/api/v1/namespaces/ns/daemonsets/test-daemonset",
},
Spec: extensionsv1.DaemonSetSpec{
Selector: &extensionsv1.LabelSelector{
MatchLabels: make(map[string]string),
},
},
}
// Test add federated daemonset.
daemonsetWatch.Add(&daemonset1)
createdDaemonSet := GetDaemonSetFromChan(cluster1CreateChan)
assert.NotNil(t, createdDaemonSet)
assert.Equal(t, daemonset1.Namespace, createdDaemonSet.Namespace)
assert.Equal(t, daemonset1.Name, createdDaemonSet.Name)
assert.True(t, daemonsetsEqual(daemonset1, *createdDaemonSet))
// Wait for the daemonset to appear in the informer store
err := WaitForStoreUpdate(
daemonsetController.daemonsetFederatedInformer.GetTargetStore(),
cluster1.Name, getDaemonSetKey(daemonset1.Namespace, daemonset1.Name), wait.ForeverTestTimeout)
assert.Nil(t, err, "daemonset should have appeared in the informer store")
// Test update federated daemonset.
daemonset1.Annotations = map[string]string{
"A": "B",
}
daemonsetWatch.Modify(&daemonset1)
updatedDaemonSet := GetDaemonSetFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedDaemonSet)
assert.Equal(t, daemonset1.Name, updatedDaemonSet.Name)
assert.Equal(t, daemonset1.Namespace, updatedDaemonSet.Namespace)
assert.True(t, daemonsetsEqual(daemonset1, *updatedDaemonSet))
// Test update federated daemonset.
daemonset1.Spec.Template.Name = "TEST"
daemonsetWatch.Modify(&daemonset1)
updatedDaemonSet = GetDaemonSetFromChan(cluster1UpdateChan)
assert.NotNil(t, updatedDaemonSet)
assert.Equal(t, daemonset1.Name, updatedDaemonSet.Name)
assert.Equal(t, daemonset1.Namespace, updatedDaemonSet.Namespace)
assert.True(t, daemonsetsEqual(daemonset1, *updatedDaemonSet))
// Test add cluster
clusterWatch.Add(cluster2)
createdDaemonSet2 := GetDaemonSetFromChan(cluster2CreateChan)
assert.NotNil(t, createdDaemonSet2)
assert.Equal(t, daemonset1.Name, createdDaemonSet2.Name)
assert.Equal(t, daemonset1.Namespace, createdDaemonSet2.Namespace)
assert.True(t, daemonsetsEqual(daemonset1, *createdDaemonSet2))
close(stop)
}
func daemonsetsEqual(a, b extensionsv1.DaemonSet) bool {
return util.ObjectMetaEquivalent(a.ObjectMeta, b.ObjectMeta) && reflect.DeepEqual(a.Spec, b.Spec)
}
func GetDaemonSetFromChan(c chan runtime.Object) *extensionsv1.DaemonSet {
daemonset := GetObjectFromChan(c).(*extensionsv1.DaemonSet)
return daemonset
}