mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 05:57:25 +00:00
Generic atomic update code
This commit is contained in:
parent
4102abe11c
commit
529870d121
@ -91,7 +91,7 @@ func makeContainerKey(machine string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) {
|
func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) {
|
||||||
err, index = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
|
index, err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
|
||||||
return manifests, index, err
|
return manifests, index, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,7 +190,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
|||||||
|
|
||||||
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
||||||
key := makePodKey(machine, podID)
|
key := makePodKey(machine, podID)
|
||||||
err, _ = registry.helper().ExtractObj(key, &pod, false)
|
_, err = registry.helper().ExtractObj(key, &pod, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -225,7 +225,7 @@ func makeControllerKey(id string) string {
|
|||||||
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
|
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
|
||||||
var controller api.ReplicationController
|
var controller api.ReplicationController
|
||||||
key := makeControllerKey(controllerID)
|
key := makeControllerKey(controllerID)
|
||||||
err, _ := registry.helper().ExtractObj(key, &controller, false)
|
_, err := registry.helper().ExtractObj(key, &controller, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -264,7 +264,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error {
|
|||||||
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
|
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
|
||||||
key := makeServiceKey(name)
|
key := makeServiceKey(name)
|
||||||
var svc api.Service
|
var svc api.Service
|
||||||
err, _ := registry.helper().ExtractObj(key, &svc, false)
|
_, err := registry.helper().ExtractObj(key, &svc, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -116,26 +116,13 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
|
|||||||
// Unmarshals json found at key into objPtr. On a not found error, will either return
|
// Unmarshals json found at key into objPtr. On a not found error, will either return
|
||||||
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
||||||
// empty responses and nil response nodes exactly like a not found error.
|
// empty responses and nil response nodes exactly like a not found error.
|
||||||
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (error, uint64) {
|
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (modifiedIndex uint64, err error) {
|
||||||
response, err := h.Client.Get(key, false, false)
|
_, modifiedIndex, err = h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
||||||
|
return modifiedIndex, err
|
||||||
if err != nil && !IsEtcdNotFound(err) {
|
|
||||||
return err, 0
|
|
||||||
}
|
|
||||||
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
|
|
||||||
if ignoreNotFound {
|
|
||||||
pv := reflect.ValueOf(objPtr)
|
|
||||||
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
|
|
||||||
return nil, 0
|
|
||||||
} else if err != nil {
|
|
||||||
return err, 0
|
|
||||||
}
|
|
||||||
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response), 0
|
|
||||||
}
|
|
||||||
return json.Unmarshal([]byte(response.Node.Value), objPtr), response.Node.ModifiedIndex
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches the previous modified index
|
// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches
|
||||||
|
// the previous modified index.
|
||||||
func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error {
|
func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error {
|
||||||
data, err := json.Marshal(obj)
|
data, err := json.Marshal(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -145,6 +132,32 @@ func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
|
||||||
|
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
||||||
|
return err
|
||||||
|
}*/
|
||||||
|
|
||||||
|
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
|
||||||
|
response, err := h.Client.Get(key, false, false)
|
||||||
|
|
||||||
|
if err != nil && !IsEtcdNotFound(err) {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
|
||||||
|
if ignoreNotFound {
|
||||||
|
pv := reflect.ValueOf(objPtr)
|
||||||
|
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
|
||||||
|
return "", 0, nil
|
||||||
|
} else if err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
|
||||||
|
}
|
||||||
|
body = response.Node.Value
|
||||||
|
return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr)
|
||||||
|
}
|
||||||
|
|
||||||
// SetObj marshals obj via json, and stores under key.
|
// SetObj marshals obj via json, and stores under key.
|
||||||
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
|
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
|
||||||
data, err := json.Marshal(obj)
|
data, err := json.Marshal(obj)
|
||||||
@ -154,3 +167,46 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
|
|||||||
_, err = h.Client.Set(key, string(data), 0)
|
_, err = h.Client.Set(key, string(data), 0)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
|
||||||
|
// See the comment for AtomicUpdate for more detail.
|
||||||
|
type EtcdUpdateFunc func() (interface{}, error)
|
||||||
|
|
||||||
|
// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
|
||||||
|
// Note, tryUpdate may be called more than once.
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// h := &util.EtcdHelper{client}
|
||||||
|
// var currentObj MyType
|
||||||
|
// err := h.AtomicUpdate("myKey", ¤tObj, func() (interface{}, error) {
|
||||||
|
// // Before this function is called, currentObj has been reset to etcd's current
|
||||||
|
// // contents for "myKey".
|
||||||
|
//
|
||||||
|
// // Make a *modification*.
|
||||||
|
// currentObj.Counter++
|
||||||
|
//
|
||||||
|
// // Return the modified object. Return an error to stop iterating.
|
||||||
|
// return currentObj, nil
|
||||||
|
// })
|
||||||
|
//
|
||||||
|
func (h *EtcdHelper) AtomicUpdate(key string, objPtr interface{}, tryUpdate EtcdUpdateFunc) error {
|
||||||
|
for {
|
||||||
|
origBody, index, err := h.bodyAndExtractObj(key, objPtr, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
ret, err := tryUpdate()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(ret)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) {
|
|||||||
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
|
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
|
||||||
helper := EtcdHelper{fakeClient}
|
helper := EtcdHelper{fakeClient}
|
||||||
var got testMarshalType
|
var got testMarshalType
|
||||||
err, _ := helper.ExtractObj("/some/key", &got, false)
|
_, err := helper.ExtractObj("/some/key", &got, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
|||||||
helper := EtcdHelper{fakeClient}
|
helper := EtcdHelper{fakeClient}
|
||||||
try := func(key string) {
|
try := func(key string) {
|
||||||
var got testMarshalType
|
var got testMarshalType
|
||||||
err, _ := helper.ExtractObj(key, &got, false)
|
_, err := helper.ExtractObj(key, &got, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("%s: wanted error but didn't get one", key)
|
t.Errorf("%s: wanted error but didn't get one", key)
|
||||||
}
|
}
|
||||||
err, _ = helper.ExtractObj(key, &got, true)
|
_, err = helper.ExtractObj(key, &got, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s: didn't want error but got %#v", key, err)
|
t.Errorf("%s: didn't want error but got %#v", key, err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user