mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Cleanup to use AtomicUpdate.
This commit is contained in:
parent
529870d121
commit
999ea50c2a
@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package registry
|
package registry
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
@ -90,56 +89,35 @@ func makeContainerKey(machine string) string {
|
|||||||
return "/registry/hosts/" + machine + "/kubelet"
|
return "/registry/hosts/" + machine + "/kubelet"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) {
|
|
||||||
index, err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
|
|
||||||
return manifests, index, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest, index uint64) error {
|
|
||||||
if index != 0 {
|
|
||||||
return registry.helper().CompareAndSwapObj(makeContainerKey(machine), manifests, index)
|
|
||||||
} else {
|
|
||||||
return registry.helper().SetObj(makeContainerKey(machine), manifests)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
|
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
|
||||||
podOut, machine, err := registry.findPod(pod.ID)
|
podOut, machine, err := registry.findPod(pod.ID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
// TODO: this error message looks racy.
|
||||||
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
|
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
|
||||||
}
|
}
|
||||||
return registry.runPod(pod, machineIn)
|
return registry.runPod(pod, machineIn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
||||||
manifests, index, err := registry.loadManifests(machine)
|
podKey := makePodKey(machine, pod.ID)
|
||||||
if err != nil {
|
err := registry.helper().SetObj(podKey, pod)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
key := makePodKey(machine, pod.ID)
|
|
||||||
data, err := json.Marshal(pod)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = registry.etcdClient.Create(key, string(data), 0)
|
|
||||||
|
|
||||||
manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
|
manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for {
|
|
||||||
manifests = append(manifests, manifest)
|
contKey := makeContainerKey(machine)
|
||||||
err = registry.updateManifests(machine, manifests, index)
|
var manifests []api.ContainerManifest
|
||||||
if util.IsEtcdConflict(err) {
|
err = registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) {
|
||||||
manifests, index, err = registry.loadManifests(machine)
|
return append(manifests, manifest), nil
|
||||||
if err != nil {
|
})
|
||||||
return err
|
if err != nil {
|
||||||
}
|
// Don't strand stuff.
|
||||||
continue
|
registry.etcdClient.Delete(podKey, false)
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
|
func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
|
||||||
@ -155,12 +133,19 @@ func (registry *EtcdRegistry) DeletePod(podID string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
|
func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
|
||||||
for {
|
// First delete the pod, so a scheduler doesn't notice it getting removed from the
|
||||||
manifests, index, err := registry.loadManifests(machine)
|
// machine and attempt to put it somewhere.
|
||||||
if err != nil {
|
podKey := makePodKey(machine, podID)
|
||||||
return err
|
_, err := registry.etcdClient.Delete(podKey, true)
|
||||||
}
|
if err != nil {
|
||||||
newManifests := make([]api.ContainerManifest, 0)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next, remove the pod from the machine atomically.
|
||||||
|
contKey := makeContainerKey(machine)
|
||||||
|
var manifests []api.ContainerManifest
|
||||||
|
return registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) {
|
||||||
|
newManifests := make([]api.ContainerManifest, 0, len(manifests))
|
||||||
found := false
|
found := false
|
||||||
for _, manifest := range manifests {
|
for _, manifest := range manifests {
|
||||||
if manifest.Id != podID {
|
if manifest.Id != podID {
|
||||||
@ -175,22 +160,13 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
|
|||||||
// However it is "deleted" so log it and move on
|
// However it is "deleted" so log it and move on
|
||||||
glog.Infof("Couldn't find: %s in %#v", podID, manifests)
|
glog.Infof("Couldn't find: %s in %#v", podID, manifests)
|
||||||
}
|
}
|
||||||
if err = registry.updateManifests(machine, newManifests, index); err != nil {
|
return newManifests, nil
|
||||||
if util.IsEtcdConflict(err) {
|
})
|
||||||
continue
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
key := makePodKey(machine, podID)
|
|
||||||
_, err := registry.etcdClient.Delete(key, true)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
||||||
key := makePodKey(machine, podID)
|
key := makePodKey(machine, podID)
|
||||||
_, err = registry.helper().ExtractObj(key, &pod, false)
|
err = registry.helper().ExtractObj(key, &pod, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -225,7 +201,7 @@ func makeControllerKey(id string) string {
|
|||||||
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
|
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
|
||||||
var controller api.ReplicationController
|
var controller api.ReplicationController
|
||||||
key := makeControllerKey(controllerID)
|
key := makeControllerKey(controllerID)
|
||||||
_, err := registry.helper().ExtractObj(key, &controller, false)
|
err := registry.helper().ExtractObj(key, &controller, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -264,7 +240,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error {
|
|||||||
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
|
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
|
||||||
key := makeServiceKey(name)
|
key := makeServiceKey(name)
|
||||||
var svc api.Service
|
var svc api.Service
|
||||||
_, err := registry.helper().ExtractObj(key, &svc, false)
|
err := registry.helper().ExtractObj(key, &svc, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -267,8 +267,7 @@ func TestEtcdDeletePod(t *testing.T) {
|
|||||||
expectNoError(t, err)
|
expectNoError(t, err)
|
||||||
if len(fakeClient.DeletedKeys) != 1 {
|
if len(fakeClient.DeletedKeys) != 1 {
|
||||||
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
|
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
|
||||||
}
|
} else if fakeClient.DeletedKeys[0] != key {
|
||||||
if fakeClient.DeletedKeys[0] != key {
|
|
||||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||||
}
|
}
|
||||||
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
|
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
|
||||||
|
@ -116,27 +116,10 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
|
|||||||
// Unmarshals json found at key into objPtr. On a not found error, will either return
|
// Unmarshals json found at key into objPtr. On a not found error, will either return
|
||||||
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
||||||
// empty responses and nil response nodes exactly like a not found error.
|
// empty responses and nil response nodes exactly like a not found error.
|
||||||
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (modifiedIndex uint64, err error) {
|
|
||||||
_, modifiedIndex, err = h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
|
||||||
return modifiedIndex, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches
|
|
||||||
// the previous modified index.
|
|
||||||
func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error {
|
|
||||||
data, err := json.Marshal(obj)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", index)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
|
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
|
||||||
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
|
||||||
return err
|
return err
|
||||||
}*/
|
}
|
||||||
|
|
||||||
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
|
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
|
||||||
response, err := h.Client.Get(key, false, false)
|
response, err := h.Client.Get(key, false, false)
|
||||||
@ -202,11 +185,19 @@ func (h *EtcdHelper) AtomicUpdate(key string, objPtr interface{}, tryUpdate Etcd
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First time this key has been used, just set.
|
||||||
|
if index == 0 {
|
||||||
|
//return h.SetObj(key, ret)
|
||||||
|
}
|
||||||
|
|
||||||
data, err := json.Marshal(ret)
|
data, err := json.Marshal(ret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
|
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
|
||||||
|
if IsEtcdConflict(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) {
|
|||||||
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
|
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
|
||||||
helper := EtcdHelper{fakeClient}
|
helper := EtcdHelper{fakeClient}
|
||||||
var got testMarshalType
|
var got testMarshalType
|
||||||
_, err := helper.ExtractObj("/some/key", &got, false)
|
err := helper.ExtractObj("/some/key", &got, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
|||||||
helper := EtcdHelper{fakeClient}
|
helper := EtcdHelper{fakeClient}
|
||||||
try := func(key string) {
|
try := func(key string) {
|
||||||
var got testMarshalType
|
var got testMarshalType
|
||||||
_, err := helper.ExtractObj(key, &got, false)
|
err := helper.ExtractObj(key, &got, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("%s: wanted error but didn't get one", key)
|
t.Errorf("%s: wanted error but didn't get one", key)
|
||||||
}
|
}
|
||||||
_, err = helper.ExtractObj(key, &got, true)
|
err = helper.ExtractObj(key, &got, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s: didn't want error but got %#v", key, err)
|
t.Errorf("%s: didn't want error but got %#v", key, err)
|
||||||
}
|
}
|
||||||
|
@ -61,6 +61,7 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response,
|
|||||||
f.t.Errorf("Unexpected get for %s", key)
|
f.t.Errorf("Unexpected get for %s", key)
|
||||||
return &etcd.Response{}, &etcd.EtcdError{ErrorCode: 100} // Key not found
|
return &etcd.Response{}, &etcd.EtcdError{ErrorCode: 100} // Key not found
|
||||||
}
|
}
|
||||||
|
f.t.Logf("returning %v: %v %#v", key, result.R, result.E)
|
||||||
return result.R, result.E
|
return result.R, result.E
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,6 +86,13 @@ func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response,
|
|||||||
return f.Set(key, value, ttl)
|
return f.Set(key, value, ttl)
|
||||||
}
|
}
|
||||||
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
|
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
|
||||||
|
f.Data[key] = EtcdResponseWithError{
|
||||||
|
R: &etcd.Response{
|
||||||
|
Node: nil,
|
||||||
|
},
|
||||||
|
E: &etcd.EtcdError{ErrorCode: 100},
|
||||||
|
}
|
||||||
|
|
||||||
f.DeletedKeys = append(f.DeletedKeys, key)
|
f.DeletedKeys = append(f.DeletedKeys, key)
|
||||||
return &etcd.Response{}, f.Err
|
return &etcd.Response{}, f.Err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user