mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Use etcd compare and swap to update the list of pods, to remove a race.
This commit is contained in:
parent
affaf173bf
commit
b25f950362
@ -7,6 +7,7 @@
|
|||||||
"desiredState": {
|
"desiredState": {
|
||||||
"manifest": {
|
"manifest": {
|
||||||
"containers": [{
|
"containers": [{
|
||||||
|
"name": "nginx",
|
||||||
"image": "dockerfile/nginx",
|
"image": "dockerfile/nginx",
|
||||||
"ports": [{"containerPort": 80, "hostPort": 8080}]
|
"ports": [{"containerPort": 80, "hostPort": 8080}]
|
||||||
}]
|
}]
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
@ -36,6 +37,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
runtime.GOMAXPROCS(4)
|
||||||
util.InitLogs()
|
util.InitLogs()
|
||||||
defer util.FlushLogs()
|
defer util.FlushLogs()
|
||||||
|
|
||||||
@ -51,7 +53,7 @@ func main() {
|
|||||||
|
|
||||||
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(apiserver.URL, nil))
|
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), client.New(apiserver.URL, nil))
|
||||||
|
|
||||||
controllerManager.Run(10 * time.Second)
|
controllerManager.Run(1 * time.Second)
|
||||||
|
|
||||||
// Kublet
|
// Kublet
|
||||||
fakeDocker1 := &kubelet.FakeDockerClient{}
|
fakeDocker1 := &kubelet.FakeDockerClient{}
|
||||||
@ -102,7 +104,7 @@ func main() {
|
|||||||
// Validate that they're truly up.
|
// Validate that they're truly up.
|
||||||
pods, err := kubeClient.ListPods(nil)
|
pods, err := kubeClient.ListPods(nil)
|
||||||
if err != nil || len(pods.Items) != 2 {
|
if err != nil || len(pods.Items) != 2 {
|
||||||
glog.Fatal("FAILED")
|
glog.Fatal("FAILED: %#v", pods.Items)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that kubelet tried to make the pods.
|
// Check that kubelet tried to make the pods.
|
||||||
@ -124,7 +126,7 @@ func main() {
|
|||||||
// We expect 5: 2 net containers + 2 pods from the replication controller +
|
// We expect 5: 2 net containers + 2 pods from the replication controller +
|
||||||
// 1 net container + 2 pods from the URL.
|
// 1 net container + 2 pods from the URL.
|
||||||
if len(createdPods) != 7 {
|
if len(createdPods) != 7 {
|
||||||
glog.Fatalf("Unexpected list of created pods: %#v\n", createdPods)
|
glog.Fatalf("Unexpected list of created pods: %#v %#v %#v\n", createdPods, fakeDocker1.Created, fakeDocker2.Created)
|
||||||
}
|
}
|
||||||
glog.Infof("OK")
|
glog.Infof("OK")
|
||||||
}
|
}
|
||||||
@ -146,6 +148,7 @@ const (
|
|||||||
// This is copied from, and should be kept in sync with:
|
// This is copied from, and should be kept in sync with:
|
||||||
// https://raw.githubusercontent.com/GoogleCloudPlatform/container-vm-guestbook-redis-python/master/manifest.yaml
|
// https://raw.githubusercontent.com/GoogleCloudPlatform/container-vm-guestbook-redis-python/master/manifest.yaml
|
||||||
testManifestFile = `version: v1beta1
|
testManifestFile = `version: v1beta1
|
||||||
|
id: web-test
|
||||||
containers:
|
containers:
|
||||||
- name: redis
|
- name: redis
|
||||||
image: dockerfile/redis
|
image: dockerfile/redis
|
||||||
|
@ -90,13 +90,17 @@ func makeContainerKey(machine string) string {
|
|||||||
return "/registry/hosts/" + machine + "/kubelet"
|
return "/registry/hosts/" + machine + "/kubelet"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, err error) {
|
func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) {
|
||||||
err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
|
err, index = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
|
||||||
return
|
return manifests, index, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest) error {
|
func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest, index uint64) error {
|
||||||
return registry.helper().SetObj(makeContainerKey(machine), manifests)
|
if index != 0 {
|
||||||
|
return registry.helper().CompareAndSwapObj(makeContainerKey(machine), manifests, index)
|
||||||
|
} else {
|
||||||
|
return registry.helper().SetObj(makeContainerKey(machine), manifests)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
|
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
|
||||||
@ -108,7 +112,7 @@ func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
||||||
manifests, err := registry.loadManifests(machine)
|
manifests, index, err := registry.loadManifests(machine)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -124,8 +128,18 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
manifests = append(manifests, manifest)
|
for {
|
||||||
return registry.updateManifests(machine, manifests)
|
manifests = append(manifests, manifest)
|
||||||
|
err = registry.updateManifests(machine, manifests, index)
|
||||||
|
if util.IsEtcdConflict(err) {
|
||||||
|
manifests, index, err = registry.loadManifests(machine)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
|
func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
|
||||||
@ -141,36 +155,42 @@ func (registry *EtcdRegistry) DeletePod(podID string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
|
func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
|
||||||
manifests, err := registry.loadManifests(machine)
|
for {
|
||||||
if err != nil {
|
manifests, index, err := registry.loadManifests(machine)
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
newManifests := make([]api.ContainerManifest, 0)
|
|
||||||
found := false
|
|
||||||
for _, manifest := range manifests {
|
|
||||||
if manifest.Id != podID {
|
|
||||||
newManifests = append(newManifests, manifest)
|
|
||||||
} else {
|
|
||||||
found = true
|
|
||||||
}
|
}
|
||||||
}
|
newManifests := make([]api.ContainerManifest, 0)
|
||||||
if !found {
|
found := false
|
||||||
// This really shouldn't happen, it indicates something is broken, and likely
|
for _, manifest := range manifests {
|
||||||
// there is a lost pod somewhere.
|
if manifest.Id != podID {
|
||||||
// However it is "deleted" so log it and move on
|
newManifests = append(newManifests, manifest)
|
||||||
glog.Infof("Couldn't find: %s in %#v", podID, manifests)
|
} else {
|
||||||
}
|
found = true
|
||||||
if err = registry.updateManifests(machine, newManifests); err != nil {
|
}
|
||||||
return err
|
}
|
||||||
|
if !found {
|
||||||
|
// This really shouldn't happen, it indicates something is broken, and likely
|
||||||
|
// there is a lost pod somewhere.
|
||||||
|
// However it is "deleted" so log it and move on
|
||||||
|
glog.Infof("Couldn't find: %s in %#v", podID, manifests)
|
||||||
|
}
|
||||||
|
if err = registry.updateManifests(machine, newManifests, index); err != nil {
|
||||||
|
if util.IsEtcdConflict(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
key := makePodKey(machine, podID)
|
key := makePodKey(machine, podID)
|
||||||
_, err = registry.etcdClient.Delete(key, true)
|
_, err := registry.etcdClient.Delete(key, true)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
|
||||||
key := makePodKey(machine, podID)
|
key := makePodKey(machine, podID)
|
||||||
err = registry.helper().ExtractObj(key, &pod, false)
|
err, _ = registry.helper().ExtractObj(key, &pod, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -205,7 +225,7 @@ func makeControllerKey(id string) string {
|
|||||||
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
|
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
|
||||||
var controller api.ReplicationController
|
var controller api.ReplicationController
|
||||||
key := makeControllerKey(controllerID)
|
key := makeControllerKey(controllerID)
|
||||||
err := registry.helper().ExtractObj(key, &controller, false)
|
err, _ := registry.helper().ExtractObj(key, &controller, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -244,7 +264,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error {
|
|||||||
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
|
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
|
||||||
key := makeServiceKey(name)
|
key := makeServiceKey(name)
|
||||||
var svc api.Service
|
var svc api.Service
|
||||||
err := registry.helper().ExtractObj(key, &svc, false)
|
err, _ := registry.helper().ExtractObj(key, &svc, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -640,3 +640,18 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
|
|||||||
t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints)
|
t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO We need a test for the compare and swap behavior. This basically requires two things:
|
||||||
|
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
|
||||||
|
// channel, this will enable us to orchestrate the flow of etcd requests in the test.
|
||||||
|
// 2) We need to make the map from key to (response, error) actually be a [](response, error) and pop
|
||||||
|
// our way through the responses. That will enable us to hand back multiple different responses for
|
||||||
|
// the same key.
|
||||||
|
// Once that infrastructure is in place, the test looks something like:
|
||||||
|
// Routine #1 Routine #2
|
||||||
|
// Read
|
||||||
|
// Wait for sync on update Read
|
||||||
|
// Update
|
||||||
|
// Update
|
||||||
|
// In the buggy case, this will result in lost data. In the correct case, the second update should fail
|
||||||
|
// and be retried.
|
||||||
|
@ -24,10 +24,24 @@ import (
|
|||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// EtcdClient is an injectable interface for testing.
|
||||||
|
type EtcdClient interface {
|
||||||
|
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
||||||
|
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||||
|
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
|
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
|
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||||
|
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||||
|
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
|
||||||
|
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
|
||||||
|
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Interface exposing only the etcd operations needed by EtcdHelper.
|
// Interface exposing only the etcd operations needed by EtcdHelper.
|
||||||
type EtcdGetSet interface {
|
type EtcdGetSet interface {
|
||||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||||
|
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
|
||||||
@ -37,6 +51,16 @@ type EtcdHelper struct {
|
|||||||
|
|
||||||
// Returns true iff err is an etcd not found error.
|
// Returns true iff err is an etcd not found error.
|
||||||
func IsEtcdNotFound(err error) bool {
|
func IsEtcdNotFound(err error) bool {
|
||||||
|
return isEtcdErrorNum(err, 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true iff err is an etcd write conflict.
|
||||||
|
func IsEtcdConflict(err error) bool {
|
||||||
|
return isEtcdErrorNum(err, 101)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns true iff err is an etcd error, whose errorCode matches errorCode
|
||||||
|
func isEtcdErrorNum(err error, errorCode int) bool {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -46,7 +70,7 @@ func IsEtcdNotFound(err error) bool {
|
|||||||
if etcdError == nil {
|
if etcdError == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if etcdError.ErrorCode == 100 {
|
if etcdError.ErrorCode == errorCode {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -92,23 +116,33 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
|
|||||||
// Unmarshals json found at key into objPtr. On a not found error, will either return
|
// Unmarshals json found at key into objPtr. On a not found error, will either return
|
||||||
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
|
||||||
// empty responses and nil response nodes exactly like a not found error.
|
// empty responses and nil response nodes exactly like a not found error.
|
||||||
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
|
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (error, uint64) {
|
||||||
response, err := h.Client.Get(key, false, false)
|
response, err := h.Client.Get(key, false, false)
|
||||||
|
|
||||||
if err != nil && !IsEtcdNotFound(err) {
|
if err != nil && !IsEtcdNotFound(err) {
|
||||||
return err
|
return err, 0
|
||||||
}
|
}
|
||||||
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
|
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
|
||||||
if ignoreNotFound {
|
if ignoreNotFound {
|
||||||
pv := reflect.ValueOf(objPtr)
|
pv := reflect.ValueOf(objPtr)
|
||||||
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
|
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
|
||||||
return nil
|
return nil, 0
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err, 0
|
||||||
}
|
}
|
||||||
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
|
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response), 0
|
||||||
}
|
}
|
||||||
return json.Unmarshal([]byte(response.Node.Value), objPtr)
|
return json.Unmarshal([]byte(response.Node.Value), objPtr), response.Node.ModifiedIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches the previous modified index
|
||||||
|
func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error {
|
||||||
|
data, err := json.Marshal(obj)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", index)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetObj marshals obj via json, and stores under key.
|
// SetObj marshals obj via json, and stores under key.
|
||||||
|
@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) {
|
|||||||
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
|
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
|
||||||
helper := EtcdHelper{fakeClient}
|
helper := EtcdHelper{fakeClient}
|
||||||
var got testMarshalType
|
var got testMarshalType
|
||||||
err := helper.ExtractObj("/some/key", &got, false)
|
err, _ := helper.ExtractObj("/some/key", &got, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %#v", err)
|
t.Errorf("Unexpected error %#v", err)
|
||||||
}
|
}
|
||||||
@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) {
|
|||||||
helper := EtcdHelper{fakeClient}
|
helper := EtcdHelper{fakeClient}
|
||||||
try := func(key string) {
|
try := func(key string) {
|
||||||
var got testMarshalType
|
var got testMarshalType
|
||||||
err := helper.ExtractObj(key, &got, false)
|
err, _ := helper.ExtractObj(key, &got, false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("%s: wanted error but didn't get one", key)
|
t.Errorf("%s: wanted error but didn't get one", key)
|
||||||
}
|
}
|
||||||
err = helper.ExtractObj(key, &got, true)
|
err, _ = helper.ExtractObj(key, &got, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s: didn't want error but got %#v", key, err)
|
t.Errorf("%s: didn't want error but got %#v", key, err)
|
||||||
}
|
}
|
||||||
|
@ -23,18 +23,6 @@ import (
|
|||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
)
|
)
|
||||||
|
|
||||||
// EtcdClient is an injectable interface for testing.
|
|
||||||
type EtcdClient interface {
|
|
||||||
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
|
|
||||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
|
||||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
|
||||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
|
||||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
|
||||||
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
|
|
||||||
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
|
|
||||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type EtcdResponseWithError struct {
|
type EtcdResponseWithError struct {
|
||||||
R *etcd.Response
|
R *etcd.Response
|
||||||
E error
|
E error
|
||||||
@ -87,6 +75,12 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err
|
|||||||
f.Data[key] = result
|
f.Data[key] = result
|
||||||
return result.R, f.Err
|
return result.R, f.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
|
||||||
|
// TODO: Maybe actually implement compare and swap here?
|
||||||
|
return f.Set(key, value, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
|
func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
|
||||||
return f.Set(key, value, ttl)
|
return f.Set(key, value, ttl)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user