diff --git a/contrib/mesos/pkg/scheduler/components/framework/framework.go b/contrib/mesos/pkg/scheduler/components/framework/framework.go index 97eaaf3be90..1498b8f7892 100644 --- a/contrib/mesos/pkg/scheduler/components/framework/framework.go +++ b/contrib/mesos/pkg/scheduler/components/framework/framework.go @@ -28,6 +28,7 @@ import ( mesos "github.com/mesos/mesos-go/mesosproto" mutil "github.com/mesos/mesos-go/mesosutil" bindings "github.com/mesos/mesos-go/scheduler" + "golang.org/x/net/context" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/contrib/mesos/pkg/offers" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/proc" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/tasksreconciler" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" @@ -71,7 +73,7 @@ type framework struct { failoverTimeout float64 // in seconds reconcileInterval int64 nodeRegistrator node.Registrator - storeFrameworkId func(id string) + storeFrameworkId frameworkid.StoreFunc lookupNode node.LookupFunc executorId *mesos.ExecutorID @@ -97,7 +99,7 @@ type Config struct { SchedulerConfig schedcfg.Config ExecutorId *mesos.ExecutorID Client *client.Client - StoreFrameworkId func(id string) + StoreFrameworkId frameworkid.StoreFunc FailoverTimeout float64 ReconcileInterval int64 ReconcileCooldown time.Duration @@ -334,9 +336,41 @@ func (k *framework) onInitialRegistration(driver bindings.SchedulerDriver) { if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() { refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second } + + // wait until we've written the framework ID at least once before proceeding + 
firstStore := make(chan struct{}) go runtime.Until(func() { - k.storeFrameworkId(k.frameworkId.GetValue()) + // only close firstStore once + select { + case <-firstStore: + default: + defer close(firstStore) + } + err := k.storeFrameworkId(context.TODO(), k.frameworkId.GetValue()) + if err != nil { + log.Errorf("failed to store framework ID: %v", err) + if err == frameworkid.ErrMismatch { + // we detected a framework ID in storage that doesn't match what we're trying + // to save. this is a dangerous state: + // (1) perhaps we failed to initially recover the framework ID and so mesos + // issued us a new one. now that we're trying to save it there's a mismatch. + // (2) we've somehow bungled the framework ID and we're out of alignment with + // what mesos is expecting. + // (3) multiple schedulers were launched at the same time, and both have + // registered with mesos (because when they each checked, there was no ID in + // storage, so they asked for a new one). one of them has already written the + // ID to storage -- we lose. + log.Error("aborting due to framework ID mismatch") + driver.Abort() + } + } }, refreshInterval, k.terminate) + + // wait for the first store attempt of the framework ID + select { + case <-firstStore: + case <-k.terminate: + } } r1 := k.makeTaskRegistryReconciler() diff --git a/contrib/mesos/pkg/scheduler/components/framework/frameworkid/etcd/etcd.go b/contrib/mesos/pkg/scheduler/components/framework/frameworkid/etcd/etcd.go new file mode 100644 index 00000000000..c6e09f754eb --- /dev/null +++ b/contrib/mesos/pkg/scheduler/components/framework/frameworkid/etcd/etcd.go @@ -0,0 +1,61 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd
+
+import (
+	"fmt"
+	"time"
+
+	etcd "github.com/coreos/etcd/client"
+	"golang.org/x/net/context"
+	"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid"
+	etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
+)
+
+type storage struct {
+	frameworkid.LookupFunc
+	frameworkid.StoreFunc
+	frameworkid.RemoveFunc
+}
+
+// Store returns a frameworkid.Storage that keeps the framework ID in the etcd key at path: writes carry the given ttl, a missing key reads as "" and removing a missing key succeeds.
+func Store(api etcd.KeysAPI, path string, ttl time.Duration) frameworkid.Storage {
+	// TODO(jdef) validate Config
+	return &storage{
+		LookupFunc: func(ctx context.Context) (string, error) {
+			if response, err := api.Get(ctx, path, nil); err != nil {
+				if !etcdutil.IsEtcdNotFound(err) {
+					return "", fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err)
+				}
+			} else {
+				return response.Node.Value, nil
+			}
+			return "", nil // key absent: report an empty ID rather than an error
+		},
+		RemoveFunc: func(ctx context.Context) error {
+			_, err := api.Delete(ctx, path, &etcd.DeleteOptions{Recursive: true})
+			if err != nil && !etcdutil.IsEtcdNotFound(err) {
+				return fmt.Errorf("failed to delete framework ID from etcd: %v", err)
+			}
+			return nil // explicit nil: a NotFound from Delete means there was nothing to remove and must not reach the caller
+		},
+		StoreFunc: func(ctx context.Context, id string) error {
+			_, err := api.Set(ctx, path, id, &etcd.SetOptions{TTL: ttl})
+			return err
+		},
+	}
+}
diff --git a/contrib/mesos/pkg/scheduler/components/framework/frameworkid/frameworkid.go b/contrib/mesos/pkg/scheduler/components/framework/frameworkid/frameworkid.go
new file mode 100644
index 00000000000..f7803c89516
--- /dev/null
+++ b/contrib/mesos/pkg/scheduler/components/framework/frameworkid/frameworkid.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2015
The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package frameworkid + +import ( + "errors" + + "golang.org/x/net/context" +) + +type ( + // LookupFunc retrieves a framework ID from persistent storage + LookupFunc func(context.Context) (string, error) + + // StoreFunc stores a framework ID in persistent storage + StoreFunc func(context.Context, string) error + + // RemoveFunc removes a framework ID from persistent storage + RemoveFunc func(context.Context) error + + Getter interface { + Get(context.Context) (string, error) + } + + Setter interface { + Set(context.Context, string) error + } + + Remover interface { + Remove(context.Context) error + } + + Storage interface { + Getter + Setter + Remover + } +) + +var ErrMismatch = errors.New("framework ID mismatch") + +func (f LookupFunc) Get(c context.Context) (string, error) { return f(c) } +func (f StoreFunc) Set(c context.Context, id string) error { return f(c, id) } +func (f RemoveFunc) Remove(c context.Context) error { return f(c) } diff --git a/contrib/mesos/pkg/scheduler/components/framework/frameworkid/zk/zk.go b/contrib/mesos/pkg/scheduler/components/framework/frameworkid/zk/zk.go new file mode 100644 index 00000000000..f37862691aa --- /dev/null +++ b/contrib/mesos/pkg/scheduler/components/framework/frameworkid/zk/zk.go @@ -0,0 +1,157 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package zk + +import ( + "fmt" + "net/url" + "path" + "strings" + "time" + + "github.com/samuel/go-zookeeper/zk" + "golang.org/x/net/context" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid" +) + +const RPC_TIMEOUT = time.Second * 5 + +type storage struct { + frameworkid.LookupFunc + frameworkid.StoreFunc + frameworkid.RemoveFunc +} + +func Store(zkurl, frameworkName string) frameworkid.Storage { + // TODO(jdef) validate Config + zkServers, zkChroot, parseErr := parseZk(zkurl) + withConnection := func(ctx context.Context, f func(c *zk.Conn) error) error { + if parseErr != nil { + return parseErr + } + timeout, err := timeout(ctx) + if err != nil { + return err + } + c, _, err := zk.Connect(zkServers, timeout) + if err != nil { + return err + } + defer c.Close() + return f(c) + } + return &storage{ + LookupFunc: func(ctx context.Context) (rawData string, lookupErr error) { + lookupErr = withConnection(ctx, func(c *zk.Conn) error { + data, _, err := c.Get(path.Join(zkChroot, frameworkName)) + if err == nil { + rawData = string(data) + } else if err != zk.ErrNoNode { + return err + } + return nil + }) + return + }, + RemoveFunc: func(ctx context.Context) error { + return withConnection(ctx, func(c *zk.Conn) error { + err := c.Delete(path.Join(zkChroot, frameworkName), -1) + if err != zk.ErrNoNode { + return err + } + return nil + }) + }, + StoreFunc: func(ctx context.Context, id string) error { + return 
withConnection(ctx, func(c *zk.Conn) error { + // attempt to create the path + _, err := c.Create( + zkChroot, + []byte(""), + 0, + zk.WorldACL(zk.PermAll), + ) + if err != nil && err != zk.ErrNodeExists { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // attempt to write framework ID to / + fpath := path.Join(zkChroot, frameworkName) + _, err = c.Create(fpath, []byte(id), 0, zk.WorldACL(zk.PermAll)) + if err != nil && err == zk.ErrNodeExists { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // cross-check value + data, _, err := c.Get(fpath) + if err != nil { + return err + } + if string(data) != id { + return frameworkid.ErrMismatch + } + return nil + } + return err + }) + }, + } +} + +func parseZk(zkurl string) ([]string, string, error) { + u, err := url.Parse(zkurl) + if err != nil { + return nil, "", fmt.Errorf("bad zk url: %v", err) + } + if u.Scheme != "zk" { + return nil, "", fmt.Errorf("invalid url scheme for zk url: '%v'", u.Scheme) + } + return strings.Split(u.Host, ","), u.Path, nil +} + +func timeout(ctx context.Context) (time.Duration, error) { + deadline, ok := ctx.Deadline() + if !ok { + // no deadline set + return RPC_TIMEOUT, nil + } + if now := time.Now(); now.Before(deadline) { + d := deadline.Sub(now) + if d > RPC_TIMEOUT { + // deadline is too far out, use our built-in + return RPC_TIMEOUT, nil + } + return d, nil + } + + // deadline has expired.. + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + // this should never happen because Done() should be closed + // according to the contract of context. but we have this here + // just in case. + return 0, context.DeadlineExceeded + } +} diff --git a/contrib/mesos/pkg/scheduler/meta/store.go b/contrib/mesos/pkg/scheduler/meta/store.go index 7203a12c948..82545edb44e 100644 --- a/contrib/mesos/pkg/scheduler/meta/store.go +++ b/contrib/mesos/pkg/scheduler/meta/store.go @@ -16,9 +16,8 @@ limitations under the License. 
package meta -// keys for things that we store -const ( - //TODO(jdef) this should also be a format instead of a fixed path - FrameworkIDKey = "/mesos/k8sm/frameworkid" - DefaultElectionFormat = "/mesos/k8sm/framework/%s/leader" -) +const StoreChroot = "/k8sm" + +func ElectionPath(frameworkName string) string { + return StoreChroot + "/" + frameworkName + "/leader" +} diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 9b556f52017..1ceb05cc975 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -23,9 +23,11 @@ import ( "io/ioutil" "net" "net/http" + "net/url" "os" "os/exec" "os/user" + "path" "path/filepath" "strconv" "strings" @@ -56,6 +58,9 @@ import ( "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid" + frameworkidEtcd "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid/etcd" + frameworkidZk "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid/zk" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" @@ -74,7 +79,6 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/healthz" "k8s.io/kubernetes/pkg/master/ports" - etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util/sets" // lock to this API version, compilation will fail when this becomes unsupported @@ -117,6 +121,7 @@ type SchedulerServer struct { checkpoint bool failoverTimeout float64 generateTaskDiscovery bool + frameworkStoreURI string executorLogV int executorBindall bool @@ -184,9 +189,10 @@ type schedulerProcessInterface 
interface { // NewSchedulerServer creates a new SchedulerServer with default parameters func NewSchedulerServer() *SchedulerServer { s := SchedulerServer{ - port: ports.SchedulerPort, - address: net.ParseIP("127.0.0.1"), - failoverTimeout: time.Duration((1 << 62) - 1).Seconds(), + port: ports.SchedulerPort, + address: net.ParseIP("127.0.0.1"), + failoverTimeout: time.Duration((1 << 62) - 1).Seconds(), + frameworkStoreURI: "etcd://", runProxy: true, executorSuicideTimeout: execcfg.DefaultSuicideTimeout, @@ -272,6 +278,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.graceful, "graceful", s.graceful, "Indicator of a graceful failover, intended for internal use only.") fs.BoolVar(&s.ha, "ha", s.ha, "Run the scheduler in high availability mode with leader election. All peers should be configured exactly the same.") fs.StringVar(&s.frameworkName, "framework-name", s.frameworkName, "The framework name to register with Mesos.") + fs.StringVar(&s.frameworkStoreURI, "framework-store-uri", s.frameworkStoreURI, "Where the framework should store metadata, either in Zookeeper (zk://host:port/path) or in etcd (etcd://path).") fs.StringVar(&s.frameworkWebURI, "framework-weburi", s.frameworkWebURI, "A URI that points to a web-based interface for interacting with the framework.") fs.StringVar(&s.advertisedAddress, "advertised-address", s.advertisedAddress, "host:port address that is advertised to clients. 
May be used to construct artifact download URIs.") fs.IPVar(&s.serviceAddress, "service-address", s.serviceAddress, "The service portal IP address that the scheduler should register with (if unset, chooses randomly)") @@ -593,7 +600,7 @@ func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error { if s.ha { validation := ha.ValidationFunc(validateLeadershipTransition) srv := ha.NewCandidate(schedulerProcess, driverFactory, validation) - path := fmt.Sprintf(meta.DefaultElectionFormat, s.frameworkName) + path := meta.ElectionPath(s.frameworkName) log.Infof("registering for election at %v with id %v", path, eid.GetValue()) go election.Notify(election.NewEtcdMasterElector(etcdClient), path, eid.GetValue(), srv, nil) } else { @@ -746,7 +753,10 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config pr := podtask.NewDefaultProcurement(eiPrototype, eiRegistry) fcfs := podschedulers.NewFCFSPodScheduler(pr, lookupNode) - + frameworkIDStorage, err := s.frameworkIDStorage(keysAPI) + if err != nil { + log.Fatalf("cannot init framework ID storage: %v", err) + } framework := framework.New(framework.Config{ SchedulerConfig: *sc, Client: client, @@ -754,16 +764,9 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config ReconcileInterval: s.reconcileInterval, ReconcileCooldown: s.reconcileCooldown, LookupNode: lookupNode, - StoreFrameworkId: func(id string) { - // TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available - _, err := keysAPI.Set(context.TODO(), meta.FrameworkIDKey, id, &etcd.SetOptions{TTL: time.Duration(s.failoverTimeout) * time.Second}) - if err != nil { - log.Errorf("failed to renew frameworkId TTL: %v", err) - } - }, - ExecutorId: eiPrototype.GetExecutorId(), + StoreFrameworkId: frameworkIDStorage.Set, + ExecutorId: eiPrototype.GetExecutorId(), }) - masterUri := s.mesosMaster info, cred, err := s.buildFrameworkInfo() if err != nil { @@ -819,18 +822,33 @@ func (s 
*SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config if err = framework.Init(sched, schedulerProcess.Master(), s.mux); err != nil { return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err) } + log.V(1).Infoln("deferred init complete") - // defer obtaining framework ID to prevent multiple schedulers - // from overwriting each other's framework IDs - dconfig.Framework.Id, err = s.fetchFrameworkID(keysAPI) - if err != nil { - return nil, fmt.Errorf("failed to fetch framework ID from etcd: %v", err) + if s.failoverTimeout > 0 { + // defer obtaining framework ID to prevent multiple schedulers + // from overwriting each other's framework IDs + var frameworkID string + frameworkID, err = frameworkIDStorage.Get(context.TODO()) + if err != nil { + return nil, fmt.Errorf("failed to fetch framework ID from storage: %v", err) + } + if frameworkID != "" { + log.Infof("configuring FrameworkInfo with ID found in storage: %q", frameworkID) + dconfig.Framework.Id = &mesos.FrameworkID{Value: &frameworkID} + } else { + log.V(1).Infof("did not find framework ID in storage") + } + } else { + // TODO(jdef) this is a hack, really for development, to simplify clean up of old framework IDs + frameworkIDStorage.Remove(context.TODO()) } + log.V(1).Infoln("constructing mesos scheduler driver") drv, err = bindings.NewMesosSchedulerDriver(*dconfig) if err != nil { return nil, fmt.Errorf("failed to construct scheduler driver: %v", err) } + log.V(1).Infoln("constructed mesos scheduler driver:", drv) s.setDriver(drv) return drv, nil @@ -944,29 +962,6 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred return } -func (s *SchedulerServer) fetchFrameworkID(client etcd.KeysAPI) (*mesos.FrameworkID, error) { - if s.failoverTimeout > 0 { - if response, err := client.Get(context.TODO(), meta.FrameworkIDKey, nil); err != nil { - if !etcdutil.IsEtcdNotFound(err) { - return nil, fmt.Errorf("unexpected failure attempting to load framework ID 
from etcd: %v", err) - } - log.V(1).Infof("did not find framework ID in etcd") - } else if response.Node.Value != "" { - log.Infof("configuring FrameworkInfo with Id found in etcd: '%s'", response.Node.Value) - return mutil.NewFrameworkID(response.Node.Value), nil - } - } else { - //TODO(jdef) this seems like a totally hackish way to clean up the framework ID - if _, err := client.Delete(context.TODO(), meta.FrameworkIDKey, &etcd.DeleteOptions{Recursive: true}); err != nil { - if !etcdutil.IsEtcdNotFound(err) { - return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err) - } - log.V(1).Infof("nothing to delete: did not find framework ID in etcd") - } - } - return nil, nil -} - func (s *SchedulerServer) getUsername() (username string, err error) { username = s.mesosUser if username == "" { @@ -979,3 +974,24 @@ func (s *SchedulerServer) getUsername() (username string, err error) { } return } + +func (s *SchedulerServer) frameworkIDStorage(keysAPI etcd.KeysAPI) (frameworkid.Storage, error) { + u, err := url.Parse(s.frameworkStoreURI) + if err != nil { + return nil, fmt.Errorf("cannot parse framework store URI: %v", err) + } + + switch u.Scheme { + case "etcd": + idpath := meta.StoreChroot + if u.Path != "" { + idpath = path.Join("/", u.Path) + } + idpath = path.Join(idpath, s.frameworkName, "frameworkid") + return frameworkidEtcd.Store(keysAPI, idpath, time.Duration(s.failoverTimeout)*time.Second), nil + case "zk": + return frameworkidZk.Store(s.frameworkStoreURI, s.frameworkName), nil + default: + return nil, fmt.Errorf("unsupported framework storage scheme: %q", u.Scheme) + } +} diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 3451f871a5e..272c2a988a0 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -111,6 +111,7 @@ file-suffix file_content_in_loop forward-services framework-name +framework-store-uri framework-weburi from-file from-literal