1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-16 06:59:25 +00:00

Merge pull request #179 from moelsayed/refactor_retries

Create a general retry function for k8s requests
This commit is contained in:
Alena Prokharchyk
2018-01-02 13:33:53 -08:00
committed by GitHub
6 changed files with 97 additions and 119 deletions

View File

@@ -1,61 +1,48 @@
package k8s package k8s
import ( import (
"bytes"
"time"
rbacv1 "k8s.io/api/rbac/v1" rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
func UpdateClusterRoleBindingFromYaml(k8sClient *kubernetes.Clientset, clusterRoleBindingYaml string) error { func UpdateClusterRoleBindingFromYaml(k8sClient *kubernetes.Clientset, clusterRoleBindingYaml string) error {
clusterRoleBinding := rbacv1.ClusterRoleBinding{} clusterRoleBinding := rbacv1.ClusterRoleBinding{}
decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(clusterRoleBindingYaml))) if err := decodeYamlResource(&clusterRoleBinding, clusterRoleBindingYaml); err != nil {
err := decoder.Decode(&clusterRoleBinding)
if err != nil {
return err return err
} }
return retryTo(updateClusterRoleBinding, k8sClient, clusterRoleBinding, DefaultRetries, DefaultSleepSeconds)
for retries := 0; retries <= 5; retries++ {
if _, err = k8sClient.RbacV1().ClusterRoleBindings().Create(&clusterRoleBinding); err != nil {
if apierrors.IsAlreadyExists(err) {
if _, err = k8sClient.RbacV1().ClusterRoleBindings().Update(&clusterRoleBinding); err == nil {
return nil
}
}
} else {
return nil
}
time.Sleep(time.Second * 5)
}
return err
} }
func UpdateClusterRoleFromYaml(k8sClient *kubernetes.Clientset, clusterRoleYaml string) error { func updateClusterRoleBinding(k8sClient *kubernetes.Clientset, crb interface{}) error {
clusterRole := rbacv1.ClusterRole{} clusterRoleBinding := crb.(rbacv1.ClusterRoleBinding)
err := decodeYamlResource(&clusterRole, clusterRoleYaml) if _, err := k8sClient.RbacV1().ClusterRoleBindings().Create(&clusterRoleBinding); err != nil {
if err != nil {
return err
}
for retries := 0; retries <= 5; retries++ {
if err = updateClusterRole(k8sClient, clusterRole); err != nil {
time.Sleep(time.Second * 5)
continue
}
return nil
}
return err
}
func updateClusterRole(k8sClient *kubernetes.Clientset, cr rbacv1.ClusterRole) error {
if _, err := k8sClient.RbacV1().ClusterRoles().Create(&cr); err != nil {
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
return err return err
} }
if _, err := k8sClient.RbacV1().ClusterRoles().Update(&cr); err != nil { if _, err := k8sClient.RbacV1().ClusterRoleBindings().Update(&clusterRoleBinding); err != nil {
return err
}
}
return nil
}
func UpdateClusterRoleFromYaml(k8sClient *kubernetes.Clientset, clusterRoleYaml string) error {
clusterRole := rbacv1.ClusterRole{}
if err := decodeYamlResource(&clusterRole, clusterRoleYaml); err != nil {
return err
}
return retryTo(updateClusterRole, k8sClient, clusterRole, DefaultRetries, DefaultSleepSeconds)
}
func updateClusterRole(k8sClient *kubernetes.Clientset, cr interface{}) error {
clusterRole := cr.(rbacv1.ClusterRole)
if _, err := k8sClient.RbacV1().ClusterRoles().Create(&clusterRole); err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
if _, err := k8sClient.RbacV1().ClusterRoles().Update(&clusterRole); err != nil {
return err return err
} }
} }

View File

@@ -1,24 +1,20 @@
package k8s package k8s
import ( import (
"bytes"
"fmt" "fmt"
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/api/batch/v1" "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
) )
func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error { func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error {
job := v1.Job{} job := v1.Job{}
decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(jobYaml))) if err := decodeYamlResource(&job, jobYaml); err != nil {
err := decoder.Decode(&job)
if err != nil {
return err return err
} }
if job.Namespace == metav1.NamespaceNone { if job.Namespace == metav1.NamespaceNone {
@@ -35,22 +31,23 @@ func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error {
} }
return err return err
} }
existingJob := &v1.Job{}
logrus.Debugf("[k8s] waiting for job %s to complete..", job.Name) logrus.Debugf("[k8s] waiting for job %s to complete..", job.Name)
for retries := 0; retries <= 5; retries++ { return retryTo(ensureJobCompleted, k8sClient, job, DefaultRetries, DefaultSleepSeconds)
time.Sleep(time.Second * 5) }
existingJob, err = k8sClient.BatchV1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("Failed to update job status: %v", err)
} func ensureJobCompleted(k8sClient *kubernetes.Clientset, j interface{}) error {
for _, condition := range existingJob.Status.Conditions { job := j.(v1.Job)
if condition.Type == v1.JobComplete && condition.Status == corev1.ConditionTrue { existingJob := &v1.Job{}
logrus.Debugf("[k8s] Job %s completed successfully..", job.Name) existingJob, err := k8sClient.BatchV1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
return nil if err != nil {
} return fmt.Errorf("Failed to update job status: %v", err)
}
} }
for _, condition := range existingJob.Status.Conditions {
if condition.Type == v1.JobComplete && condition.Status == corev1.ConditionTrue {
logrus.Debugf("[k8s] Job %s completed successfully..", job.Name)
return nil
}
}
return fmt.Errorf("Failed to get job complete status: %v", err) return fmt.Errorf("Failed to get job complete status: %v", err)
} }

View File

@@ -2,12 +2,20 @@ package k8s
import ( import (
"bytes" "bytes"
"time"
yamlutil "k8s.io/apimachinery/pkg/util/yaml" yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
) )
const (
DefaultRetries = 5
DefaultSleepSeconds = 5
)
type k8sCall func(*kubernetes.Clientset, interface{}) error
func NewClient(kubeConfigPath string) (*kubernetes.Clientset, error) { func NewClient(kubeConfigPath string) (*kubernetes.Clientset, error) {
// use the current admin kubeconfig // use the current admin kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
@@ -25,3 +33,21 @@ func decodeYamlResource(resource interface{}, yamlManifest string) error {
decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(yamlManifest))) decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(yamlManifest)))
return decoder.Decode(&resource) return decoder.Decode(&resource)
} }
func retryTo(runFunc k8sCall, k8sClient *kubernetes.Clientset, resource interface{}, retries, sleepSeconds int) error {
var err error
if retries == 0 {
retries = DefaultRetries
}
if sleepSeconds == 0 {
sleepSeconds = DefaultSleepSeconds
}
for i := 0; i < retries; i++ {
if err = runFunc(k8sClient, resource); err != nil {
time.Sleep(time.Second * time.Duration(sleepSeconds))
continue
}
return nil
}
return err
}

View File

@@ -1,8 +1,6 @@
package k8s package k8s
import ( import (
"time"
"k8s.io/api/extensions/v1beta1" "k8s.io/api/extensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@@ -10,21 +8,14 @@ import (
func UpdatePodSecurityPolicyFromYaml(k8sClient *kubernetes.Clientset, pspYaml string) error { func UpdatePodSecurityPolicyFromYaml(k8sClient *kubernetes.Clientset, pspYaml string) error {
psp := v1beta1.PodSecurityPolicy{} psp := v1beta1.PodSecurityPolicy{}
err := decodeYamlResource(&psp, pspYaml) if err := decodeYamlResource(&psp, pspYaml); err != nil {
if err != nil {
return err return err
} }
for retries := 0; retries <= 5; retries++ { return retryTo(updatePodSecurityPolicy, k8sClient, psp, DefaultRetries, DefaultSleepSeconds)
if err = updatePodSecurityPolicy(k8sClient, psp); err != nil {
time.Sleep(time.Second * 5)
continue
}
return nil
}
return err
} }
func updatePodSecurityPolicy(k8sClient *kubernetes.Clientset, psp v1beta1.PodSecurityPolicy) error { func updatePodSecurityPolicy(k8sClient *kubernetes.Clientset, p interface{}) error {
psp := p.(v1beta1.PodSecurityPolicy)
if _, err := k8sClient.ExtensionsV1beta1().PodSecurityPolicies().Create(&psp); err != nil { if _, err := k8sClient.ExtensionsV1beta1().PodSecurityPolicies().Create(&psp); err != nil {
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
return err return err

View File

@@ -1,8 +1,6 @@
package k8s package k8s
import ( import (
"time"
rbacv1 "k8s.io/api/rbac/v1" rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@@ -10,22 +8,14 @@ import (
func UpdateRoleBindingFromYaml(k8sClient *kubernetes.Clientset, roleBindingYaml string) error { func UpdateRoleBindingFromYaml(k8sClient *kubernetes.Clientset, roleBindingYaml string) error {
roleBinding := rbacv1.RoleBinding{} roleBinding := rbacv1.RoleBinding{}
err := decodeYamlResource(&roleBinding, roleBindingYaml) if err := decodeYamlResource(&roleBinding, roleBindingYaml); err != nil {
if err != nil {
return err return err
} }
return retryTo(updateRoleBinding, k8sClient, roleBinding, DefaultRetries, DefaultSleepSeconds)
for retries := 0; retries <= 5; retries++ {
if err = updateRoleBinding(k8sClient, roleBinding); err != nil {
time.Sleep(time.Second * 5)
continue
}
return nil
}
return err
} }
func updateRoleBinding(k8sClient *kubernetes.Clientset, roleBinding rbacv1.RoleBinding) error { func updateRoleBinding(k8sClient *kubernetes.Clientset, rb interface{}) error {
roleBinding := rb.(rbacv1.RoleBinding)
if _, err := k8sClient.RbacV1().RoleBindings(roleBinding.Namespace).Create(&roleBinding); err != nil { if _, err := k8sClient.RbacV1().RoleBindings(roleBinding.Namespace).Create(&roleBinding); err != nil {
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
return err return err
@@ -39,22 +29,14 @@ func updateRoleBinding(k8sClient *kubernetes.Clientset, roleBinding rbacv1.RoleB
func UpdateRoleFromYaml(k8sClient *kubernetes.Clientset, roleYaml string) error { func UpdateRoleFromYaml(k8sClient *kubernetes.Clientset, roleYaml string) error {
role := rbacv1.Role{} role := rbacv1.Role{}
err := decodeYamlResource(&role, roleYaml) if err := decodeYamlResource(&role, roleYaml); err != nil {
if err != nil {
return err return err
} }
return retryTo(updateRole, k8sClient, role, DefaultRetries, DefaultSleepSeconds)
for retries := 0; retries <= 5; retries++ {
if err = updateRole(k8sClient, role); err != nil {
time.Sleep(time.Second * 5)
continue
}
return nil
}
return err
} }
func updateRole(k8sClient *kubernetes.Clientset, role rbacv1.Role) error { func updateRole(k8sClient *kubernetes.Clientset, r interface{}) error {
role := r.(rbacv1.Role)
if _, err := k8sClient.RbacV1().Roles(role.Namespace).Create(&role); err != nil { if _, err := k8sClient.RbacV1().Roles(role.Namespace).Create(&role); err != nil {
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
return err return err

View File

@@ -1,35 +1,30 @@
package k8s package k8s
import ( import (
"bytes"
"time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )
func UpdateServiceAccountFromYaml(k8sClient *kubernetes.Clientset, serviceAccountYaml string) error { func UpdateServiceAccountFromYaml(k8sClient *kubernetes.Clientset, serviceAccountYaml string) error {
serviceAccount := v1.ServiceAccount{} serviceAccount := v1.ServiceAccount{}
decoder := yamlutil.NewYAMLToJSONDecoder(bytes.NewReader([]byte(serviceAccountYaml))) if err := decodeYamlResource(&serviceAccount, serviceAccountYaml); err != nil {
err := decoder.Decode(&serviceAccount)
if err != nil {
return err return err
} }
for retries := 0; retries <= 5; retries++ { return retryTo(updateServiceAccount, k8sClient, serviceAccount, DefaultRetries, DefaultSleepSeconds)
if _, err = k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(&serviceAccount); err != nil { }
if apierrors.IsAlreadyExists(err) {
if _, err = k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Update(&serviceAccount); err == nil { func updateServiceAccount(k8sClient *kubernetes.Clientset, s interface{}) error {
return nil serviceAccount := s.(v1.ServiceAccount)
} if _, err := k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(&serviceAccount); err != nil {
} if !apierrors.IsAlreadyExists(err) {
} else { return err
return nil }
} if _, err := k8sClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Update(&serviceAccount); err != nil {
time.Sleep(time.Second * 5) return err
} }
return err }
return nil
} }