Merge pull request #11666 from wojtek-t/refactor_etcd_helper

Extract EtcdHelper interface
This commit is contained in:
Mike Danese 2015-07-24 11:07:46 -07:00
commit 859f440f74
7 changed files with 470 additions and 506 deletions

View File

@ -86,7 +86,6 @@ type APIServer struct {
EtcdServerList util.StringList EtcdServerList util.StringList
EtcdConfigFile string EtcdConfigFile string
EtcdPathPrefix string EtcdPathPrefix string
OldEtcdPathPrefix string
CorsAllowedOriginList util.StringList CorsAllowedOriginList util.StringList
AllowPrivileged bool AllowPrivileged bool
ServiceClusterIPRange util.IPNet // TODO: make this a list ServiceClusterIPRange util.IPNet // TODO: make this a list
@ -187,7 +186,6 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.Var(&s.EtcdServerList, "etcd-servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") fs.Var(&s.EtcdServerList, "etcd-servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.") fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.") fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.StringVar(&s.OldEtcdPathPrefix, "old-etcd-prefix", s.OldEtcdPathPrefix, "The previous prefix for all resource paths in etcd, if any.")
fs.Var(&s.CorsAllowedOriginList, "cors-allowed-origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.Var(&s.CorsAllowedOriginList, "cors-allowed-origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.Var(&s.ServiceClusterIPRange, "service-cluster-ip-range", "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.") fs.Var(&s.ServiceClusterIPRange, "service-cluster-ip-range", "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
@ -305,14 +303,6 @@ func (s *APIServer) Run(_ []string) error {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
} }
// TODO Is this the right place for migration to happen? Must *both* old and
// new etcd prefix params be supplied for this to be valid?
if s.OldEtcdPathPrefix != "" {
if err = helper.MigrateKeys(s.OldEtcdPathPrefix); err != nil {
glog.Fatalf("Migration of old etcd keys failed: %v", err)
}
}
n := net.IPNet(s.ServiceClusterIPRange) n := net.IPNet(s.ServiceClusterIPRange)
// Default to the private server key for service account token signing // Default to the private server key for service account token signing

View File

@ -17,12 +17,8 @@ limitations under the License.
package tools package tools
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"net/http"
"os/exec"
"path" "path"
"reflect" "reflect"
"strings" "strings"
@ -38,22 +34,24 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
const maxEtcdCacheEntries int = 50000 func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper {
return EtcdHelper{
func init() { Client: client,
metrics.Register() Codec: codec,
Versioner: APIObjectVersioner{},
Copier: api.Scheme,
PathPrefix: prefix,
cache: util.NewCache(maxEtcdCacheEntries),
}
} }
func getTypeName(obj interface{}) string { // EtcdHelper is the reference implementation of StorageInterface.
return reflect.TypeOf(obj).String() // TODO(wojtekt): Make it private and expose only StorageInterface to outside world.
}
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct { type EtcdHelper struct {
Client EtcdClient Client EtcdClient
Codec runtime.Codec Codec runtime.Codec
Copier runtime.ObjectCopier Copier runtime.ObjectCopier
// optional, no atomic operations can be performed without this interface // optional, has to be set to perform any atomic operations
Versioner EtcdVersioner Versioner EtcdVersioner
// prefix for all etcd keys // prefix for all etcd keys
PathPrefix string PathPrefix string
@ -68,227 +66,110 @@ type EtcdHelper struct {
cache util.Cache cache util.Cache
} }
// NewEtcdHelper creates a helper that works against objects that use the internal func init() {
// Kubernetes API objects. metrics.Register()
// TODO: Refactor to take a runtiem.ObjectCopier
func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper {
return EtcdHelper{
Client: client,
Codec: codec,
Versioner: APIObjectVersioner{},
Copier: api.Scheme,
PathPrefix: prefix,
cache: util.NewCache(maxEtcdCacheEntries),
}
} }
// IsEtcdNotFound returns true iff err is an etcd not found error. // Implements StorageInterface.
func IsEtcdNotFound(err error) bool { func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
return isEtcdErrorNum(err, EtcdErrorCodeNotFound) key = h.prefixEtcdKey(key)
} data, err := h.Codec.Encode(obj)
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}
// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*etcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
}
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
result, err := h.Client.Get(key, true, true)
if err != nil { if err != nil {
index, ok := etcdErrorIndex(err)
if !ok {
index = 0
}
nodes := make([]*etcd.Node, 0)
if IsEtcdNotFound(err) {
return nodes, index, nil
} else {
return nodes, index, err
}
}
return result.Node.Nodes, result.EtcdIndex, nil
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
if err != nil || v.Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
for _, node := range nodes {
if node.Dir {
trace.Step("Decoding dir " + node.Key + " START")
if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil {
return err
}
trace.Step("Decoding dir " + node.Key + " END")
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex); found {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} else {
obj := reflect.New(v.Type().Elem())
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err return err
} }
if h.Versioner != nil { if h.Versioner != nil {
// being unable to set the version does not prevent the object from being extracted if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex) return errors.New("resourceVersion may not be set on objects to be created")
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
} }
} }
}
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
return nil
}
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64) (runtime.Object, bool)
addToCache(index uint64, obj runtime.Object)
}
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now() startTime := time.Now()
defer func() { response, err := h.Client.Create(key, string(data), ttl)
metrics.ObserveGetCache(startTime) metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}()
obj, found := h.cache.Get(index)
if found {
// We should not return the object itself to avoid poluting the cache if someone
// modifies returned values.
objCopy, err := h.Copier.Copy(obj.(runtime.Object))
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return nil, false
}
metrics.ObserveCacheHit()
return objCopy.(runtime.Object), true
}
metrics.ObserveCacheMiss()
return nil, false
}
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
metrics.ObserveAddCache(startTime)
}()
objCopy, err := h.Copier.Copy(obj)
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return
}
isOverwrite := h.cache.Add(index, objCopy)
if !isOverwrite {
metrics.ObserveNewEntry()
}
}
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
// definition) and extracts a go object per etcd node into a slice with the resource version.
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractToList " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil { if err != nil {
return err return err
} }
key = h.PrefixEtcdKey(key) if out != nil {
startTime := time.Now() if _, err := conversion.EnforcePtr(out); err != nil {
trace.Step("About to list etcd node") panic("unable to convert output object to pointer")
nodes, index, err := h.listEtcdNode(key) }
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) _, _, err = h.extractObj(response, err, out, false, false)
trace.Step("Etcd node listed") }
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.Codec.Encode(obj)
if err != nil { if err != nil {
return err return err
} }
if err := h.decodeNodeList(nodes, listPtr); err != nil { key = h.prefixEtcdKey(key)
return err
} create := true
trace.Step("Node list decoded")
if h.Versioner != nil { if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, index); err != nil { if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return err create = false
}
}
return nil
}
// ExtractObjToList unmarshals json found at key and opaques it into a *List api object
// (an object that satisfies the runtime.IsList definition).
func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.PrefixEtcdKey(key)
startTime := time.Now() startTime := time.Now()
trace.Step("About to read etcd node") response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
response, err := h.Client.Get(key, false, false) metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil { if err != nil {
if IsEtcdNotFound(err) {
return nil
}
return err return err
} }
}
}
if create {
// Create will fail if a key already exists.
startTime := time.Now()
response, err = h.Client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}
nodes := make([]*etcd.Node, 0) if err != nil {
nodes = append(nodes, response.Node) return err
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err return err
}
trace.Step("Object decoded")
if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
return err
}
}
return nil
} }
// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return // Implements StorageInterface.
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
// empty responses and nil response nodes exactly like a not found error. key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
startTime := time.Now()
response, err := h.Client.Delete(key, false)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
key = h.prefixEtcdKey(key)
startTime := time.Now()
_, err := h.Client.Delete(key, recursive)
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
return err
}
// Implements StorageInterface.
func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error { func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
key = h.PrefixEtcdKey(key) key = h.prefixEtcdKey(key)
_, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) _, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err return err
} }
@ -337,164 +218,144 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
return body, node, err return body, node, err
} }
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds, // Implements StorageInterface.
// and 0 means forever. If no error is returned and out is not nil, out will be set to the read value func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
// from etcd. trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj))
func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error { listPtr, err := runtime.GetItemsPtr(listObj)
key = h.PrefixEtcdKey(key)
data, err := h.Codec.Encode(obj)
if err != nil { if err != nil {
return err return err
} }
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.Client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
if IsEtcdNotFound(err) {
return nil
}
return err
}
nodes := make([]*etcd.Node, 0)
nodes = append(nodes, response.Node)
if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
}
trace.Step("Object decoded")
if h.Versioner != nil {
if err := h.Versioner.UpdateList(listObj, response.EtcdIndex); err != nil {
return err
}
}
return nil
}
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) error {
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(slicePtr)
if err != nil || v.Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
for _, node := range nodes {
if node.Dir {
trace.Step("Decoding dir " + node.Key + " START")
if err := h.decodeNodeList(node.Nodes, slicePtr); err != nil {
return err
}
trace.Step("Decoding dir " + node.Key + " END")
continue
}
if obj, found := h.getFromCache(node.ModifiedIndex); found {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} else {
obj := reflect.New(v.Type().Elem())
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err
}
if h.Versioner != nil { if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 { // being unable to set the version does not prevent the object from being extracted
return errors.New("resourceVersion may not be set on objects to be created") _ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj.Interface().(runtime.Object))
} }
} }
}
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
return nil
}
startTime := time.Now() // Implements StorageInterface.
response, err := h.Client.Create(key, string(data), ttl) func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) trace := util.NewTrace("ExtractToList " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil { if err != nil {
return err return err
} }
if out != nil { key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return err
}
// Delete removes the specified key.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
key = h.PrefixEtcdKey(key)
startTime := time.Now() startTime := time.Now()
_, err := h.Client.Delete(key, recursive) trace.Step("About to list etcd node")
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime) nodes, index, err := h.listEtcdNode(key)
return err metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
} trace.Step("Etcd node listed")
// DeleteObj removes the specified key and returns the value that existed at that spot.
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
key = h.PrefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
startTime := time.Now()
response, err := h.Client.Delete(key, false)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
}
}
return err
}
// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion
// field is set. 'ttl' is time-to-live in seconds, and 0 means forever. If no error is returned and out is
// not nil, out will be set to the read value from etcd.
func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.Codec.Encode(obj)
if err != nil { if err != nil {
return err return err
} }
key = h.PrefixEtcdKey(key) if err := h.decodeNodeList(nodes, listPtr); err != nil {
return err
create := true }
trace.Step("Node list decoded")
if h.Versioner != nil { if h.Versioner != nil {
if version, err := h.Versioner.ObjectResourceVersion(obj); err == nil && version != 0 { if err := h.Versioner.UpdateList(listObj, index); err != nil {
create = false
startTime := time.Now()
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err return err
} }
} }
} return nil
if create {
// Create will fail if a key already exists.
startTime := time.Now()
response, err = h.Client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}
if err != nil {
return err
}
if out != nil {
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
_, _, err = h.extractObj(response, err, out, false, false)
}
return err
} }
// ResponseMeta contains information about the etcd metadata that is associated with func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
// an object. It abstracts the actual underlying objects to prevent coupling with etcd result, err := h.Client.Get(key, true, true)
// and to improve testability. if err != nil {
type ResponseMeta struct { index, ok := etcdErrorIndex(err)
// TTL is the time to live of the node that contained the returned object. It may be if !ok {
// zero or negative in some cases (objects may be expired after the requested index = 0
// expiration time due to server lag). }
TTL int64 nodes := make([]*etcd.Node, 0)
// Expiration is the time at which the node that contained the returned object will expire and be deleted. if IsEtcdNotFound(err) {
// This can be nil if there is no expiration time set for the node. return nodes, index, nil
Expiration *time.Time } else {
// The resource version of the node that contained the returned object. return nodes, index, err
ResourceVersion uint64 }
}
return result.Node.Nodes, result.EtcdIndex, nil
} }
// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more detail.
type EtcdUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error) type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error)
// SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc // SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc
func SimpleUpdate(fn SimpleEtcdUpdateFunc) EtcdUpdateFunc { func SimpleUpdate(fn SimpleEtcdUpdateFunc) StorageUpdateFunc {
return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) { return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
out, err := fn(input) out, err := fn(input)
return out, nil, err return out, nil, err
} }
} }
// GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps // Implements StorageInterface.
// calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error {
// passed to tryUpdate() may change across invocations of tryUpdate() if other writers are simultaneously
// updating it, so tryUpdate() needs to take into account the current contents of the object when
// deciding how the updated object (that it returns) should look.
//
// Example:
//
// h := &util.EtcdHelper{client, encoding, versioning}
// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
// // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey".
//
// cur := input.(*MyType) // Guaranteed to succeed.
//
// // Make a *modification*.
// cur.Counter++
//
// // Return the modified object. Return an error to stop iterating. Return a uint64 to alter
// // the TTL on the object, or nil to keep it the same value.
// return cur, nil, nil
// })
//
func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
v, err := conversion.EnforcePtr(ptrToType) v, err := conversion.EnforcePtr(ptrToType)
if err != nil { if err != nil {
// Panic is appropriate, because this is a programming error. // Panic is appropriate, because this is a programming error.
panic("need ptr to type") panic("need ptr to type")
} }
key = h.PrefixEtcdKey(key) key = h.prefixEtcdKey(key)
for { for {
obj := reflect.New(v.Type()).Interface().(runtime.Object) obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound) origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
@ -564,132 +425,60 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
} }
} }
func (h *EtcdHelper) PrefixEtcdKey(key string) string { func (h *EtcdHelper) prefixEtcdKey(key string) string {
if strings.HasPrefix(key, path.Join("/", h.PathPrefix)) { if strings.HasPrefix(key, path.Join("/", h.PathPrefix)) {
return key return key
} }
return path.Join("/", h.PathPrefix, key) return path.Join("/", h.PathPrefix, key)
} }
// Copies the key-value pairs from their old location to a new location based // etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// on this helper's etcd prefix. All old keys without the prefix are then deleted. // their Node.ModifiedIndex, which is unique across all types.
func (h *EtcdHelper) MigrateKeys(oldPathPrefix string) error { // All implementations must be thread-safe.
// Check to see if a migration is necessary, i.e. is the oldPrefix different type etcdCache interface {
// from the newPrefix? getFromCache(index uint64) (runtime.Object, bool)
if h.PathPrefix == oldPathPrefix { addToCache(index uint64, obj runtime.Object)
return nil }
}
// Get the root node const maxEtcdCacheEntries int = 50000
response, err := h.Client.Get(oldPathPrefix, false, true)
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
// We should not return the object itself to avoid poluting the cache if someone
// modifies returned values.
objCopy, err := h.Copier.Copy(obj.(runtime.Object))
if err != nil { if err != nil {
glog.Infof("Couldn't get the existing etcd root node.") glog.Errorf("Error during DeepCopy of cached object: %q", err)
return err return nil, false
} }
metrics.ObserveCacheHit()
// Perform the migration return objCopy.(runtime.Object), true
if err = h.migrateChildren(response.Node, oldPathPrefix); err != nil {
glog.Infof("Error performing the migration.")
return err
} }
metrics.ObserveCacheMiss()
// Delete the old top-level entry recursively return nil, false
// Quick sanity check: Did the process at least create a new top-level entry?
if _, err = h.Client.Get(h.PathPrefix, false, false); err != nil {
glog.Infof("Couldn't get the new etcd root node.")
return err
} else {
if _, err = h.Client.Delete(oldPathPrefix, true); err != nil {
glog.Infof("Couldn't delete the old etcd root node.")
return err
}
}
return nil
} }
// This recurses through the etcd registry. Each key-value pair is copied with func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
// to a new pair with a prefixed key. startTime := time.Now()
func (h *EtcdHelper) migrateChildren(parent *etcd.Node, oldPathPrefix string) error { defer func() {
for _, child := range parent.Nodes { metrics.ObserveAddCache(startTime)
if child.Dir && len(child.Nodes) > 0 { }()
// Descend into this directory objCopy, err := h.Copier.Copy(obj)
h.migrateChildren(child, oldPathPrefix)
// All children have been migrated, so this directory has
// already been automatically added.
continue
}
// Check if already prefixed (maybe we got interrupted in last attempt)
if strings.HasPrefix(child.Key, h.PathPrefix) {
// Skip this iteration
continue
}
// Create new entry
newKey := path.Join("/", h.PathPrefix, strings.TrimPrefix(child.Key, oldPathPrefix))
if _, err := h.Client.Create(newKey, child.Value, 0); err != nil {
// Assuming etcd is still available, this is due to the key
// already existing, in which case we can skip.
continue
}
}
return nil
}
// GetEtcdVersion performs a version check against the provided Etcd server,
// returning the string response, and error (if any).
func GetEtcdVersion(host string) (string, error) {
response, err := http.Get(host + "/version")
if err != nil { if err != nil {
return "", err glog.Errorf("Error during DeepCopy of cached object: %q", err)
return
} }
defer response.Body.Close() isOverwrite := h.cache.Add(index, objCopy)
if response.StatusCode != http.StatusOK { if !isOverwrite {
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err) metrics.ObserveNewEntry()
} }
versionBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(versionBytes), nil
}
func startEtcd() (*exec.Cmd, error) {
cmd := exec.Command("etcd")
err := cmd.Start()
if err != nil {
return nil, err
}
return cmd, nil
}
func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) {
_, err := GetEtcdVersion(server)
if err != nil {
glog.Infof("Failed to find etcd, attempting to start.")
_, err := startEtcd()
if err != nil {
return nil, err
}
}
servers := []string{server}
return etcd.NewClient(servers), nil
}
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:health`
}
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
if obj.Health != "true" {
return fmt.Errorf("Unhealthy status: %s", obj.Health)
}
return nil
} }

View File

@ -846,13 +846,13 @@ func TestPrefixEtcdKey(t *testing.T) {
// Verify prefix is added // Verify prefix is added
keyBefore := baseKey keyBefore := baseKey
keyAfter := helper.PrefixEtcdKey(keyBefore) keyAfter := helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyAfter, path.Join(prefix, baseKey), "Prefix incorrectly added by EtcdHelper") assert.Equal(t, keyAfter, path.Join(prefix, baseKey), "Prefix incorrectly added by EtcdHelper")
// Verify prefix is not added // Verify prefix is not added
keyBefore = path.Join(prefix, baseKey) keyBefore = path.Join(prefix, baseKey)
keyAfter = helper.PrefixEtcdKey(keyBefore) keyAfter = helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper") assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper")
} }

View File

@ -71,7 +71,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
// watch.Interface. resourceVersion may be used to specify what version to begin // watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates). // watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.PrefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h) w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion) go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil return w, nil
@ -81,7 +81,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
// API objects and sent down the returned watch.Interface. // API objects and sent down the returned watch.Interface.
// Errors will be sent down the channel. // Errors will be sent down the channel.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.PrefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h) w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion) go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil return w, nil
@ -103,12 +103,12 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc
// }) // })
// //
// Errors will be sent down the channel. // Errors will be sent down the channel.
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { /*func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
key = h.PrefixEtcdKey(key) key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h) w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h)
go w.etcdWatch(h.Client, key, resourceVersion) go w.etcdWatch(h.Client, key, resourceVersion)
return w return w
} }*/
// TransformFunc attempts to convert an object to another object for use with a watcher. // TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error) type TransformFunc func(runtime.Object) (runtime.Object, error)

121
pkg/tools/etcd_util.go Normal file
View File

@ -0,0 +1,121 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tools
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
// IsEtcdNotFound returns true iff err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNotFound)
}
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}
// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*etcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
}
// GetEtcdVersion performs a version check against the provided Etcd server,
// returning the string response, and error (if any).
func GetEtcdVersion(host string) (string, error) {
response, err := http.Get(host + "/version")
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
}
versionBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(versionBytes), nil
}
func startEtcd() (*exec.Cmd, error) {
cmd := exec.Command("etcd")
err := cmd.Start()
if err != nil {
return nil, err
}
return cmd, nil
}
func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) {
_, err := GetEtcdVersion(server)
if err != nil {
glog.Infof("Failed to find etcd, attempting to start.")
_, err := startEtcd()
if err != nil {
return nil, err
}
}
servers := []string{server}
return etcd.NewClient(servers), nil
}
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:health`
}
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
if obj.Health != "true" {
return fmt.Errorf("Unhealthy status: %s", obj.Health)
}
return nil
}

View File

@ -17,9 +17,12 @@ limitations under the License.
package tools package tools
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/coreos/go-etcd/etcd"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
) )
const ( const (
@ -64,3 +67,108 @@ type EtcdVersioner interface {
// Should return an error if the specified object does not have a persistable version. // Should return an error if the specified object does not have a persistable version.
ObjectResourceVersion(obj runtime.Object) (uint64, error) ObjectResourceVersion(obj runtime.Object) (uint64, error)
} }
// ResponseMeta contains information about the etcd metadata that is associated with
// an object. It abstracts the actual underlying objects to prevent coupling with etcd
// and to improve testability.
type ResponseMeta struct {
// TTL is the time to live of the node that contained the returned object. It may be
// zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag).
TTL int64
// Expiration is the time at which the node that contained the returned object will expire and be deleted.
// This can be nil if there is no expiration time set for the node.
Expiration *time.Time
// The resource version of the node that contained the returned object.
ResourceVersion uint64
}
// Pass an StorageUpdateFunc to StorageInterface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.
type StorageUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
// StorageInterface offers a common interface for object marshaling/unmarshling operations and
// hids all the storage-related operations behind it.
type StorageInterface interface {
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from etcd.
//
// TODO(wojtekt): Rename to Create().
CreateObj(key string, obj, out runtime.Object, ttl uint64) error
// SetObj marshals obj via json and stores in etcd under key. Will do an atomic update
// if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever).
// If no error is returned and out is not nil, out will be set to the read value from etcd.
//
// TODO(wojtekt): Rename to Set() (or Update?).
SetObj(key string, obj, out runtime.Object, ttl uint64) error
// DeleteObj removes the specified key and returns the value that existed at that spot.
//
// TODO(wojtekt): Rename to Delete().
DeleteObj(key string, out runtime.Object) error
// Delete removes the specified key.
//
// TODO(wojtekt): Unify it with DeleteObj().
Delete(key string, recursive bool) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// Treats empty responses and nil response nodes exactly like a not found error.
//
// TODO(wojtekt): Rename to Get().
ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error
// ExtractObjToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
//
// TODO(wojtekt): Rename to GetToList().
ExtractObjToList(key string, listObj runtime.Object) error
// ExtractToList unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
//
// TODO(wojtekt): Rename to List().
ExtractToList(key string, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is etcd index conflict.
// Note that object passed to tryUpdate may change acress incovations of tryUpdate() if
// other writers are simultanously updateing it, to tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
//
// Exmaple:
//
// s := /* implementation of StorageInterface */
// err := s.GuaranteedUpdate(
// "myKey", &MyType{}, true,
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
// // Before each incovation of the user defined function, "input" is reset to
// // etcd's current contents for "myKey".
// curr := input.(*MyType) // Guaranteed to succeed.
//
// // Make the modification
// curr.Counter++
//
// // Return the modified object - return an error to stop iterating. Return
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
// return cur, nil, nil
// }
// })
GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error
}

View File

@ -142,47 +142,3 @@ func TestWatch(t *testing.T) {
} }
}) })
} }
func TestMigrateKeys(t *testing.T) {
withEtcdKey(func(oldPrefix string) {
client := newEtcdClient()
helper := tools.NewEtcdHelper(client, testapi.Codec(), oldPrefix)
key1 := oldPrefix + "/obj1"
key2 := oldPrefix + "/foo/obj2"
key3 := oldPrefix + "/foo/bar/obj3"
// Create a new entres - these are the 'existing' entries with old prefix
_, _ = helper.Client.Create(key1, "foo", 0)
_, _ = helper.Client.Create(key2, "foo", 0)
_, _ = helper.Client.Create(key3, "foo", 0)
// Change the helper to a new prefix
newPrefix := "/kubernetes.io"
helper = tools.NewEtcdHelper(client, testapi.Codec(), newPrefix)
// Migrate the keys
err := helper.MigrateKeys(oldPrefix)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Check the resources are at the correct new location
newNames := []string{
newPrefix + "/obj1",
newPrefix + "/foo/obj2",
newPrefix + "/foo/bar/obj3",
}
for _, name := range newNames {
_, err := helper.Client.Get(name, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Check the old locations are removed
if _, err := helper.Client.Get(oldPrefix, false, false); err == nil {
t.Fatalf("Old directory still exists.")
}
})
}