mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #20387 from mesosphere/jdef_frameworkid_storage
Auto commit by PR queue bot
This commit is contained in:
commit
3af79b09ca
@ -28,6 +28,7 @@ import (
|
||||
mesos "github.com/mesos/mesos-go/mesosproto"
|
||||
mutil "github.com/mesos/mesos-go/mesosutil"
|
||||
bindings "github.com/mesos/mesos-go/scheduler"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/node"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
|
||||
@ -35,6 +36,7 @@ import (
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/proc"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/tasksreconciler"
|
||||
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
|
||||
merrors "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
|
||||
@ -71,7 +73,7 @@ type framework struct {
|
||||
failoverTimeout float64 // in seconds
|
||||
reconcileInterval int64
|
||||
nodeRegistrator node.Registrator
|
||||
storeFrameworkId func(id string)
|
||||
storeFrameworkId frameworkid.StoreFunc
|
||||
lookupNode node.LookupFunc
|
||||
executorId *mesos.ExecutorID
|
||||
|
||||
@ -97,7 +99,7 @@ type Config struct {
|
||||
SchedulerConfig schedcfg.Config
|
||||
ExecutorId *mesos.ExecutorID
|
||||
Client *client.Client
|
||||
StoreFrameworkId func(id string)
|
||||
StoreFrameworkId frameworkid.StoreFunc
|
||||
FailoverTimeout float64
|
||||
ReconcileInterval int64
|
||||
ReconcileCooldown time.Duration
|
||||
@ -334,9 +336,41 @@ func (k *framework) onInitialRegistration(driver bindings.SchedulerDriver) {
|
||||
if k.failoverTimeout < k.schedulerConfig.FrameworkIdRefreshInterval.Duration.Seconds() {
|
||||
refreshInterval = time.Duration(math.Max(1, k.failoverTimeout/2)) * time.Second
|
||||
}
|
||||
|
||||
// wait until we've written the framework ID at least once before proceeding
|
||||
firstStore := make(chan struct{})
|
||||
go runtime.Until(func() {
|
||||
k.storeFrameworkId(k.frameworkId.GetValue())
|
||||
// only close firstStore once
|
||||
select {
|
||||
case <-firstStore:
|
||||
default:
|
||||
defer close(firstStore)
|
||||
}
|
||||
err := k.storeFrameworkId(context.TODO(), k.frameworkId.GetValue())
|
||||
if err != nil {
|
||||
log.Errorf("failed to store framework ID: %v", err)
|
||||
if err == frameworkid.ErrMismatch {
|
||||
// we detected a framework ID in storage that doesn't match what we're trying
|
||||
// to save. this is a dangerous state:
|
||||
// (1) perhaps we failed to initially recover the framework ID and so mesos
|
||||
// issued us a new one. now that we're trying to save it there's a mismatch.
|
||||
// (2) we've somehow bungled the framework ID and we're out of alignment with
|
||||
// what mesos is expecting.
|
||||
// (3) multiple schedulers were launched at the same time, and both have
|
||||
// registered with mesos (because when they each checked, there was no ID in
|
||||
// storage, so they asked for a new one). one of them has already written the
|
||||
// ID to storage -- we lose.
|
||||
log.Error("aborting due to framework ID mismatch")
|
||||
driver.Abort()
|
||||
}
|
||||
}
|
||||
}, refreshInterval, k.terminate)
|
||||
|
||||
// wait for the first store attempt of the framework ID
|
||||
select {
|
||||
case <-firstStore:
|
||||
case <-k.terminate:
|
||||
}
|
||||
}
|
||||
|
||||
r1 := k.makeTaskRegistryReconciler()
|
||||
|
@ -0,0 +1,61 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
etcd "github.com/coreos/etcd/client"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
)
|
||||
|
||||
type storage struct {
|
||||
frameworkid.LookupFunc
|
||||
frameworkid.StoreFunc
|
||||
frameworkid.RemoveFunc
|
||||
}
|
||||
|
||||
func Store(api etcd.KeysAPI, path string, ttl time.Duration) frameworkid.Storage {
|
||||
// TODO(jdef) validate Config
|
||||
return &storage{
|
||||
LookupFunc: func(ctx context.Context) (string, error) {
|
||||
if response, err := api.Get(ctx, path, nil); err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return "", fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err)
|
||||
}
|
||||
} else {
|
||||
return response.Node.Value, nil
|
||||
}
|
||||
return "", nil
|
||||
},
|
||||
RemoveFunc: func(ctx context.Context) (err error) {
|
||||
if _, err = api.Delete(ctx, path, &etcd.DeleteOptions{Recursive: true}); err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return fmt.Errorf("failed to delete framework ID from etcd: %v", err)
|
||||
}
|
||||
}
|
||||
return
|
||||
},
|
||||
StoreFunc: func(ctx context.Context, id string) (err error) {
|
||||
_, err = api.Set(ctx, path, id, &etcd.SetOptions{TTL: ttl})
|
||||
return
|
||||
},
|
||||
}
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package frameworkid
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type (
|
||||
// LookupFunc retrieves a framework ID from persistent storage
|
||||
LookupFunc func(context.Context) (string, error)
|
||||
|
||||
// StoreFunc stores a framework ID in persistent storage
|
||||
StoreFunc func(context.Context, string) error
|
||||
|
||||
// RemoveFunc removes a framework ID from persistent storage
|
||||
RemoveFunc func(context.Context) error
|
||||
|
||||
Getter interface {
|
||||
Get(context.Context) (string, error)
|
||||
}
|
||||
|
||||
Setter interface {
|
||||
Set(context.Context, string) error
|
||||
}
|
||||
|
||||
Remover interface {
|
||||
Remove(context.Context) error
|
||||
}
|
||||
|
||||
Storage interface {
|
||||
Getter
|
||||
Setter
|
||||
Remover
|
||||
}
|
||||
)
|
||||
|
||||
var ErrMismatch = errors.New("framework ID mismatch")
|
||||
|
||||
func (f LookupFunc) Get(c context.Context) (string, error) { return f(c) }
|
||||
func (f StoreFunc) Set(c context.Context, id string) error { return f(c, id) }
|
||||
func (f RemoveFunc) Remove(c context.Context) error { return f(c) }
|
@ -0,0 +1,157 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package zk
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/samuel/go-zookeeper/zk"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid"
|
||||
)
|
||||
|
||||
const RPC_TIMEOUT = time.Second * 5
|
||||
|
||||
type storage struct {
|
||||
frameworkid.LookupFunc
|
||||
frameworkid.StoreFunc
|
||||
frameworkid.RemoveFunc
|
||||
}
|
||||
|
||||
func Store(zkurl, frameworkName string) frameworkid.Storage {
|
||||
// TODO(jdef) validate Config
|
||||
zkServers, zkChroot, parseErr := parseZk(zkurl)
|
||||
withConnection := func(ctx context.Context, f func(c *zk.Conn) error) error {
|
||||
if parseErr != nil {
|
||||
return parseErr
|
||||
}
|
||||
timeout, err := timeout(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c, _, err := zk.Connect(zkServers, timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer c.Close()
|
||||
return f(c)
|
||||
}
|
||||
return &storage{
|
||||
LookupFunc: func(ctx context.Context) (rawData string, lookupErr error) {
|
||||
lookupErr = withConnection(ctx, func(c *zk.Conn) error {
|
||||
data, _, err := c.Get(path.Join(zkChroot, frameworkName))
|
||||
if err == nil {
|
||||
rawData = string(data)
|
||||
} else if err != zk.ErrNoNode {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
},
|
||||
RemoveFunc: func(ctx context.Context) error {
|
||||
return withConnection(ctx, func(c *zk.Conn) error {
|
||||
err := c.Delete(path.Join(zkChroot, frameworkName), -1)
|
||||
if err != zk.ErrNoNode {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
},
|
||||
StoreFunc: func(ctx context.Context, id string) error {
|
||||
return withConnection(ctx, func(c *zk.Conn) error {
|
||||
// attempt to create the path
|
||||
_, err := c.Create(
|
||||
zkChroot,
|
||||
[]byte(""),
|
||||
0,
|
||||
zk.WorldACL(zk.PermAll),
|
||||
)
|
||||
if err != nil && err != zk.ErrNodeExists {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
// attempt to write framework ID to <path> / <frameworkName>
|
||||
fpath := path.Join(zkChroot, frameworkName)
|
||||
_, err = c.Create(fpath, []byte(id), 0, zk.WorldACL(zk.PermAll))
|
||||
if err != nil && err == zk.ErrNodeExists {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
// cross-check value
|
||||
data, _, err := c.Get(fpath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if string(data) != id {
|
||||
return frameworkid.ErrMismatch
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func parseZk(zkurl string) ([]string, string, error) {
|
||||
u, err := url.Parse(zkurl)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("bad zk url: %v", err)
|
||||
}
|
||||
if u.Scheme != "zk" {
|
||||
return nil, "", fmt.Errorf("invalid url scheme for zk url: '%v'", u.Scheme)
|
||||
}
|
||||
return strings.Split(u.Host, ","), u.Path, nil
|
||||
}
|
||||
|
||||
func timeout(ctx context.Context) (time.Duration, error) {
|
||||
deadline, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
// no deadline set
|
||||
return RPC_TIMEOUT, nil
|
||||
}
|
||||
if now := time.Now(); now.Before(deadline) {
|
||||
d := deadline.Sub(now)
|
||||
if d > RPC_TIMEOUT {
|
||||
// deadline is too far out, use our built-in
|
||||
return RPC_TIMEOUT, nil
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// deadline has expired..
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
default:
|
||||
// this should never happen because Done() should be closed
|
||||
// according to the contract of context. but we have this here
|
||||
// just in case.
|
||||
return 0, context.DeadlineExceeded
|
||||
}
|
||||
}
|
@ -16,9 +16,8 @@ limitations under the License.
|
||||
|
||||
package meta
|
||||
|
||||
// keys for things that we store
|
||||
const (
|
||||
//TODO(jdef) this should also be a format instead of a fixed path
|
||||
FrameworkIDKey = "/mesos/k8sm/frameworkid"
|
||||
DefaultElectionFormat = "/mesos/k8sm/framework/%s/leader"
|
||||
)
|
||||
const StoreChroot = "/k8sm"
|
||||
|
||||
func ElectionPath(frameworkName string) string {
|
||||
return StoreChroot + "/" + frameworkName + "/leader"
|
||||
}
|
||||
|
@ -23,9 +23,11 @@ import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/user"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -56,6 +58,9 @@ import (
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid"
|
||||
frameworkidEtcd "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid/etcd"
|
||||
frameworkidZk "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/framework/frameworkid/zk"
|
||||
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/executorinfo"
|
||||
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
|
||||
@ -74,7 +79,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
// lock to this API version, compilation will fail when this becomes unsupported
|
||||
@ -117,6 +121,7 @@ type SchedulerServer struct {
|
||||
checkpoint bool
|
||||
failoverTimeout float64
|
||||
generateTaskDiscovery bool
|
||||
frameworkStoreURI string
|
||||
|
||||
executorLogV int
|
||||
executorBindall bool
|
||||
@ -184,9 +189,10 @@ type schedulerProcessInterface interface {
|
||||
// NewSchedulerServer creates a new SchedulerServer with default parameters
|
||||
func NewSchedulerServer() *SchedulerServer {
|
||||
s := SchedulerServer{
|
||||
port: ports.SchedulerPort,
|
||||
address: net.ParseIP("127.0.0.1"),
|
||||
failoverTimeout: time.Duration((1 << 62) - 1).Seconds(),
|
||||
port: ports.SchedulerPort,
|
||||
address: net.ParseIP("127.0.0.1"),
|
||||
failoverTimeout: time.Duration((1 << 62) - 1).Seconds(),
|
||||
frameworkStoreURI: "etcd://",
|
||||
|
||||
runProxy: true,
|
||||
executorSuicideTimeout: execcfg.DefaultSuicideTimeout,
|
||||
@ -272,6 +278,7 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) {
|
||||
fs.BoolVar(&s.graceful, "graceful", s.graceful, "Indicator of a graceful failover, intended for internal use only.")
|
||||
fs.BoolVar(&s.ha, "ha", s.ha, "Run the scheduler in high availability mode with leader election. All peers should be configured exactly the same.")
|
||||
fs.StringVar(&s.frameworkName, "framework-name", s.frameworkName, "The framework name to register with Mesos.")
|
||||
fs.StringVar(&s.frameworkStoreURI, "framework-store-uri", s.frameworkStoreURI, "Where the framework should store metadata, either in Zookeeper (zk://host:port/path) or in etcd (etcd://path).")
|
||||
fs.StringVar(&s.frameworkWebURI, "framework-weburi", s.frameworkWebURI, "A URI that points to a web-based interface for interacting with the framework.")
|
||||
fs.StringVar(&s.advertisedAddress, "advertised-address", s.advertisedAddress, "host:port address that is advertised to clients. May be used to construct artifact download URIs.")
|
||||
fs.IPVar(&s.serviceAddress, "service-address", s.serviceAddress, "The service portal IP address that the scheduler should register with (if unset, chooses randomly)")
|
||||
@ -593,7 +600,7 @@ func (s *SchedulerServer) Run(hks hyperkube.Interface, _ []string) error {
|
||||
if s.ha {
|
||||
validation := ha.ValidationFunc(validateLeadershipTransition)
|
||||
srv := ha.NewCandidate(schedulerProcess, driverFactory, validation)
|
||||
path := fmt.Sprintf(meta.DefaultElectionFormat, s.frameworkName)
|
||||
path := meta.ElectionPath(s.frameworkName)
|
||||
log.Infof("registering for election at %v with id %v", path, eid.GetValue())
|
||||
go election.Notify(election.NewEtcdMasterElector(etcdClient), path, eid.GetValue(), srv, nil)
|
||||
} else {
|
||||
@ -746,7 +753,10 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
||||
|
||||
pr := podtask.NewDefaultProcurement(eiPrototype, eiRegistry)
|
||||
fcfs := podschedulers.NewFCFSPodScheduler(pr, lookupNode)
|
||||
|
||||
frameworkIDStorage, err := s.frameworkIDStorage(keysAPI)
|
||||
if err != nil {
|
||||
log.Fatalf("cannot init framework ID storage: %v", err)
|
||||
}
|
||||
framework := framework.New(framework.Config{
|
||||
SchedulerConfig: *sc,
|
||||
Client: client,
|
||||
@ -754,16 +764,9 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
||||
ReconcileInterval: s.reconcileInterval,
|
||||
ReconcileCooldown: s.reconcileCooldown,
|
||||
LookupNode: lookupNode,
|
||||
StoreFrameworkId: func(id string) {
|
||||
// TODO(jdef): port FrameworkId store to generic Kubernetes config store as soon as available
|
||||
_, err := keysAPI.Set(context.TODO(), meta.FrameworkIDKey, id, &etcd.SetOptions{TTL: time.Duration(s.failoverTimeout) * time.Second})
|
||||
if err != nil {
|
||||
log.Errorf("failed to renew frameworkId TTL: %v", err)
|
||||
}
|
||||
},
|
||||
ExecutorId: eiPrototype.GetExecutorId(),
|
||||
StoreFrameworkId: frameworkIDStorage.Set,
|
||||
ExecutorId: eiPrototype.GetExecutorId(),
|
||||
})
|
||||
|
||||
masterUri := s.mesosMaster
|
||||
info, cred, err := s.buildFrameworkInfo()
|
||||
if err != nil {
|
||||
@ -819,18 +822,33 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
|
||||
if err = framework.Init(sched, schedulerProcess.Master(), s.mux); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize pod scheduler: %v", err)
|
||||
}
|
||||
|
||||
log.V(1).Infoln("deferred init complete")
|
||||
// defer obtaining framework ID to prevent multiple schedulers
|
||||
// from overwriting each other's framework IDs
|
||||
dconfig.Framework.Id, err = s.fetchFrameworkID(keysAPI)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch framework ID from etcd: %v", err)
|
||||
if s.failoverTimeout > 0 {
|
||||
// defer obtaining framework ID to prevent multiple schedulers
|
||||
// from overwriting each other's framework IDs
|
||||
var frameworkID string
|
||||
frameworkID, err = frameworkIDStorage.Get(context.TODO())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch framework ID from storage: %v", err)
|
||||
}
|
||||
if frameworkID != "" {
|
||||
log.Infof("configuring FrameworkInfo with ID found in storage: %q", frameworkID)
|
||||
dconfig.Framework.Id = &mesos.FrameworkID{Value: &frameworkID}
|
||||
} else {
|
||||
log.V(1).Infof("did not find framework ID in storage")
|
||||
}
|
||||
} else {
|
||||
// TODO(jdef) this is a hack, really for development, to simplify clean up of old framework IDs
|
||||
frameworkIDStorage.Remove(context.TODO())
|
||||
}
|
||||
|
||||
log.V(1).Infoln("constructing mesos scheduler driver")
|
||||
drv, err = bindings.NewMesosSchedulerDriver(*dconfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to construct scheduler driver: %v", err)
|
||||
}
|
||||
|
||||
log.V(1).Infoln("constructed mesos scheduler driver:", drv)
|
||||
s.setDriver(drv)
|
||||
return drv, nil
|
||||
@ -944,29 +962,6 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SchedulerServer) fetchFrameworkID(client etcd.KeysAPI) (*mesos.FrameworkID, error) {
|
||||
if s.failoverTimeout > 0 {
|
||||
if response, err := client.Get(context.TODO(), meta.FrameworkIDKey, nil); err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return nil, fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err)
|
||||
}
|
||||
log.V(1).Infof("did not find framework ID in etcd")
|
||||
} else if response.Node.Value != "" {
|
||||
log.Infof("configuring FrameworkInfo with Id found in etcd: '%s'", response.Node.Value)
|
||||
return mutil.NewFrameworkID(response.Node.Value), nil
|
||||
}
|
||||
} else {
|
||||
//TODO(jdef) this seems like a totally hackish way to clean up the framework ID
|
||||
if _, err := client.Delete(context.TODO(), meta.FrameworkIDKey, &etcd.DeleteOptions{Recursive: true}); err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err)
|
||||
}
|
||||
log.V(1).Infof("nothing to delete: did not find framework ID in etcd")
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *SchedulerServer) getUsername() (username string, err error) {
|
||||
username = s.mesosUser
|
||||
if username == "" {
|
||||
@ -979,3 +974,24 @@ func (s *SchedulerServer) getUsername() (username string, err error) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *SchedulerServer) frameworkIDStorage(keysAPI etcd.KeysAPI) (frameworkid.Storage, error) {
|
||||
u, err := url.Parse(s.frameworkStoreURI)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse framework store URI: %v", err)
|
||||
}
|
||||
|
||||
switch u.Scheme {
|
||||
case "etcd":
|
||||
idpath := meta.StoreChroot
|
||||
if u.Path != "" {
|
||||
idpath = path.Join("/", u.Path)
|
||||
}
|
||||
idpath = path.Join(idpath, s.frameworkName, "frameworkid")
|
||||
return frameworkidEtcd.Store(keysAPI, idpath, time.Duration(s.failoverTimeout)*time.Second), nil
|
||||
case "zk":
|
||||
return frameworkidZk.Store(s.frameworkStoreURI, s.frameworkName), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported framework storage scheme: %q", u.Scheme)
|
||||
}
|
||||
}
|
||||
|
@ -111,6 +111,7 @@ file-suffix
|
||||
file_content_in_loop
|
||||
forward-services
|
||||
framework-name
|
||||
framework-store-uri
|
||||
framework-weburi
|
||||
from-file
|
||||
from-literal
|
||||
|
Loading…
Reference in New Issue
Block a user