From 7fdad4bcf9477e337cacb57905df242c7d9737ca Mon Sep 17 00:00:00 2001 From: moelsayed Date: Thu, 21 Dec 2017 01:11:50 +0200 Subject: [PATCH] Create a general retry function for k8s requests --- k8s/clusterrole.go | 69 ++++++++++++++++++------------------------- k8s/job.go | 35 ++++++++++------------ k8s/k8s.go | 26 ++++++++++++++++ k8s/psp.go | 17 +++-------- k8s/role.go | 34 +++++---------------- k8s/serviceaccount.go | 35 ++++++++++------------ 6 files changed, 97 insertions(+), 119 deletions(-) diff --git a/k8s/clusterrole.go b/k8s/clusterrole.go index cc6e03a9..ae231c51 100644 --- a/k8s/clusterrole.go +++ b/k8s/clusterrole.go @@ -1,61 +1,48 @@ package k8s import ( - "bytes" - "time" - rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - yamlutil "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" ) func UpdateClusterRoleBindingFromYaml(k8sClient *kubernetes.Clientset, clusterRoleBindingYaml string) error { clusterRoleBinding := rbacv1.ClusterRoleBinding{} - decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(clusterRoleBindingYaml))) - err := decoder.Decode(&clusterRoleBinding) - if err != nil { + if err := decodeYamlResource(&clusterRoleBinding, clusterRoleBindingYaml); err != nil { return err } - - for retries := 0; retries <= 5; retries++ { - if _, err = k8sClient.RbacV1().ClusterRoleBindings().Create(&clusterRoleBinding); err != nil { - if apierrors.IsAlreadyExists(err) { - if _, err = k8sClient.RbacV1().ClusterRoleBindings().Update(&clusterRoleBinding); err == nil { - return nil - } - } - } else { - return nil - } - time.Sleep(time.Second * 5) - } - return err + return retryTo(updateClusterRoleBinding, k8sClient, clusterRoleBinding, DefaultRetries, DefaultSleepSeconds) } -func UpdateClusterRoleFromYaml(k8sClient *kubernetes.Clientset, clusterRoleYaml string) error { - clusterRole := rbacv1.ClusterRole{} - err := decodeYamlResource(&clusterRole, clusterRoleYaml) - if err != nil { - return err - } - - for retries := 0; retries <= 5; retries++ { - if err = updateClusterRole(k8sClient, clusterRole); err != nil { - time.Sleep(time.Second * 5) - continue - } - return nil - } - return err -} - -func updateClusterRole(k8sClient *kubernetes.Clientset, cr rbacv1.ClusterRole) error { - if _, err := k8sClient.RbacV1().ClusterRoles().Create(&cr); err != nil { +func updateClusterRoleBinding(k8sClient *kubernetes.Clientset, crb interface{}) error { + clusterRoleBinding := crb.(rbacv1.ClusterRoleBinding) + if _, err := k8sClient.RbacV1().ClusterRoleBindings().Create(&clusterRoleBinding); err != nil { if !apierrors.IsAlreadyExists(err) { return err } - if _, err := k8sClient.RbacV1().ClusterRoles().Update(&cr); err != nil { + if _, err := k8sClient.RbacV1().ClusterRoleBindings().Update(&clusterRoleBinding); err != nil { + return err + } + } + return nil +} + +func UpdateClusterRoleFromYaml(k8sClient *kubernetes.Clientset, clusterRoleYaml string) error { + clusterRole := rbacv1.ClusterRole{} + if err := decodeYamlResource(&clusterRole, clusterRoleYaml); err != nil { + return err + } + + return retryTo(updateClusterRole, k8sClient, clusterRole, DefaultRetries, DefaultSleepSeconds) +} + +func updateClusterRole(k8sClient *kubernetes.Clientset, cr interface{}) error { + clusterRole := cr.(rbacv1.ClusterRole) + if _, err := k8sClient.RbacV1().ClusterRoles().Create(&clusterRole); err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + if _, err := k8sClient.RbacV1().ClusterRoles().Update(&clusterRole); err != nil { return err } } diff --git a/k8s/job.go b/k8s/job.go index b9eae0f4..80586c96 100644 --- a/k8s/job.go +++ b/k8s/job.go @@ -1,24 +1,20 @@ package k8s import ( - "bytes" "fmt" - "time" "github.com/sirupsen/logrus" "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - yamlutil "k8s.io/apimachinery/pkg/util/yaml" ) func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error { job := v1.Job{} - decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(jobYaml))) - err := decoder.Decode(&job) - if err != nil { + if err := decodeYamlResource(&job, jobYaml); err != nil { return err } if job.Namespace == metav1.NamespaceNone { @@ -35,22 +31,23 @@ func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error { } return err } - existingJob := &v1.Job{} logrus.Debugf("[k8s] waiting for job %s to complete..", job.Name) - for retries := 0; retries <= 5; retries++ { - time.Sleep(time.Second * 5) - existingJob, err = k8sClient.BatchV1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("Failed to update job status: %v", err) + return retryTo(ensureJobCompleted, k8sClient, job, DefaultRetries, DefaultSleepSeconds) +} - } - for _, condition := range existingJob.Status.Conditions { - if condition.Type == v1.JobComplete && condition.Status == corev1.ConditionTrue { - logrus.Debugf("[k8s] Job %s completed successfully..", job.Name) - return nil - } - } +func ensureJobCompleted(k8sClient *kubernetes.Clientset, j interface{}) error { + job := j.(v1.Job) + existingJob := &v1.Job{} + existingJob, err := k8sClient.BatchV1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("Failed to update job status: %v", err) } + for _, condition := range existingJob.Status.Conditions { + if condition.Type == v1.JobComplete && condition.Status == corev1.ConditionTrue { + logrus.Debugf("[k8s] Job %s completed successfully..", job.Name) + return nil + } + } return fmt.Errorf("Failed to get job complete status: %v", err) } diff --git a/k8s/k8s.go b/k8s/k8s.go index 7831cd1a..b68bd188 100644 --- a/k8s/k8s.go +++ b/k8s/k8s.go @@ -2,12 +2,20 @@ package k8s import ( "bytes" + "time" yamlutil "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) +const ( + DefaultRetries = 5 + DefaultSleepSeconds = 5 +) + +type k8sCall func(*kubernetes.Clientset, interface{}) error + func NewClient(kubeConfigPath string) (*kubernetes.Clientset, error) { // use the current admin kubeconfig config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) @@ -25,3 +33,21 @@ func decodeYamlResource(resource interface{}, yamlManifest string) error { decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(yamlManifest))) return decoder.Decode(&resource) } + +func retryTo(runFunc k8sCall, k8sClient *kubernetes.Clientset, resource interface{}, retries, sleepSeconds int) error { + var err error + if retries == 0 { + retries = DefaultRetries + } + if sleepSeconds == 0 { + sleepSeconds = DefaultSleepSeconds + } + for i := 0; i < retries; i++ { + if err = runFunc(k8sClient, resource); err != nil { + time.Sleep(time.Second * time.Duration(sleepSeconds)) + continue + } + return nil + } + return err +} diff --git a/k8s/psp.go b/k8s/psp.go index 0f368189..a41d7cd2 100644 --- a/k8s/psp.go +++ b/k8s/psp.go @@ -1,8 +1,6 @@ package k8s import ( - "time" - "k8s.io/api/extensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" @@ -10,21 +8,14 @@ import ( func UpdatePodSecurityPolicyFromYaml(k8sClient *kubernetes.Clientset, pspYaml string) error { psp := v1beta1.PodSecurityPolicy{} - err := decodeYamlResource(&psp, pspYaml) - if err != nil { + if err := decodeYamlResource(&psp, pspYaml); err != nil { return err } - for retries := 0; retries <= 5; retries++ { - if err = updatePodSecurityPolicy(k8sClient, psp); err != nil { - time.Sleep(time.Second * 5) - continue - } - return nil - } - return err + return retryTo(updatePodSecurityPolicy, k8sClient, psp, DefaultRetries, DefaultSleepSeconds) } -func updatePodSecurityPolicy(k8sClient *kubernetes.Clientset, psp v1beta1.PodSecurityPolicy) error { +func updatePodSecurityPolicy(k8sClient *kubernetes.Clientset, p interface{}) error { + psp := p.(v1beta1.PodSecurityPolicy) if _, err := k8sClient.ExtensionsV1beta1().PodSecurityPolicies().Create(&psp); err != nil { if !apierrors.IsAlreadyExists(err) { return err diff --git a/k8s/role.go b/k8s/role.go index 5da857c7..4fb696ca 100644 --- a/k8s/role.go +++ b/k8s/role.go @@ -1,8 +1,6 @@ package k8s import ( - "time" - rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" @@ -10,22 +8,14 @@ import ( func UpdateRoleBindingFromYaml(k8sClient *kubernetes.Clientset, roleBindingYaml string) error { roleBinding := rbacv1.RoleBinding{} - err := decodeYamlResource(&roleBinding, roleBindingYaml) - if err != nil { + if err := decodeYamlResource(&roleBinding, roleBindingYaml); err != nil { return err } - - for retries := 0; retries <= 5; retries++ { - if err = updateRoleBinding(k8sClient, roleBinding); err != nil { - time.Sleep(time.Second * 5) - continue - } - return nil - } - return err + return retryTo(updateRoleBinding, k8sClient, roleBinding, DefaultRetries, DefaultSleepSeconds) } -func updateRoleBinding(k8sClient *kubernetes.Clientset, roleBinding rbacv1.RoleBinding) error { +func updateRoleBinding(k8sClient *kubernetes.Clientset, rb interface{}) error { + roleBinding := rb.(rbacv1.RoleBinding) if _, err := k8sClient.RbacV1().RoleBindings(roleBinding.Namespace).Create(&roleBinding); err != nil { if !apierrors.IsAlreadyExists(err) { return err @@ -39,22 +29,14 @@ func updateRoleBinding(k8sClient *kubernetes.Clientset, roleBinding rbacv1.RoleB func UpdateRoleFromYaml(k8sClient *kubernetes.Clientset, roleYaml string) error { role := rbacv1.Role{} - err := decodeYamlResource(&role, roleYaml) - if err != nil { + if err := decodeYamlResource(&role, roleYaml); err != nil { return err } - - for retries := 0; retries <= 5; retries++ { - if err = updateRole(k8sClient, role); err != nil { - time.Sleep(time.Second * 5) - continue - } - return nil - } - return err + return retryTo(updateRole, k8sClient, role, DefaultRetries, DefaultSleepSeconds) } -func updateRole(k8sClient *kubernetes.Clientset, role rbacv1.Role) error { +func updateRole(k8sClient *kubernetes.Clientset, r interface{}) error { + role := r.(rbacv1.Role) if _, err := k8sClient.RbacV1().Roles(role.Namespace).Create(&role); err != nil { if !apierrors.IsAlreadyExists(err) { return err diff --git a/k8s/serviceaccount.go b/k8s/serviceaccount.go index ded90b72..8a433d39 100644 --- a/k8s/serviceaccount.go +++ b/k8s/serviceaccount.go @@ -1,35 +1,30 @@ package k8s import ( - "bytes" - "time" - "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - yamlutil "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" ) func UpdateServiceAccountFromYaml(k8sClient *kubernetes.Clientset, serviceAccountYaml string) error { serviceAccount := v1.ServiceAccount{} - decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(serviceAccountYaml))) - err := decoder.Decode(&serviceAccount) - if err != nil { + if err := decodeYamlResource(&serviceAccount, serviceAccountYaml); err != nil { return err } - for retries := 0; retries <= 5; retries++ { - if _, err = k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(&serviceAccount); err != nil { - if apierrors.IsAlreadyExists(err) { - if _, err = k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Update(&serviceAccount); err == nil { - return nil - } - } - } else { - return nil - } - time.Sleep(time.Second * 5) - } - return err + return retryTo(updateServiceAccount, k8sClient, serviceAccount, DefaultRetries, DefaultSleepSeconds) +} + +func updateServiceAccount(k8sClient *kubernetes.Clientset, s interface{}) error { + serviceAccount := s.(v1.ServiceAccount) + if _, err := k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(&serviceAccount); err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + if _, err := k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Update(&serviceAccount); err != nil { + return err + } + } + return nil }