mirror of
https://github.com/rancher/os.git
synced 2025-09-01 06:40:31 +00:00
migrate to upstream libcompose in one and a half go
This commit is contained in:
729
vendor/github.com/docker/libnetwork/store.go
generated
vendored
729
vendor/github.com/docker/libnetwork/store.go
generated
vendored
@@ -1,371 +1,414 @@
|
||||
package libnetwork
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libnetwork/datastore"
|
||||
"github.com/docker/libnetwork/types"
|
||||
)
|
||||
|
||||
func (c *controller) validateDatastoreConfig() bool {
|
||||
return c.cfg != nil && c.cfg.Datastore.Client.Provider != "" && c.cfg.Datastore.Client.Address != ""
|
||||
}
|
||||
|
||||
func (c *controller) initDataStore() error {
|
||||
func (c *controller) initStores() error {
|
||||
c.Lock()
|
||||
cfg := c.cfg
|
||||
c.Unlock()
|
||||
if !c.validateDatastoreConfig() {
|
||||
return fmt.Errorf("datastore initialization requires a valid configuration")
|
||||
}
|
||||
|
||||
store, err := datastore.NewDataStore(&cfg.Datastore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
c.store = store
|
||||
c.Unlock()
|
||||
|
||||
nws, err := c.getNetworksFromStore()
|
||||
if err == nil {
|
||||
c.processNetworkUpdate(nws, nil)
|
||||
} else if err != datastore.ErrKeyNotFound {
|
||||
log.Warnf("failed to read networks from datastore during init : %v", err)
|
||||
}
|
||||
return c.watchNetworks()
|
||||
}
|
||||
|
||||
func (c *controller) getNetworksFromStore() ([]*store.KVPair, error) {
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
|
||||
}
|
||||
|
||||
func (c *controller) newNetworkFromStore(n *network) error {
|
||||
n.Lock()
|
||||
n.ctrlr = c
|
||||
n.endpoints = endpointTable{}
|
||||
n.Unlock()
|
||||
|
||||
return c.addNetwork(n)
|
||||
}
|
||||
|
||||
func (c *controller) updateNetworkToStore(n *network) error {
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
|
||||
return nil
|
||||
}
|
||||
|
||||
return cs.PutObjectAtomic(n)
|
||||
}
|
||||
|
||||
func (c *controller) deleteNetworkFromStore(n *network) error {
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name())
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := cs.DeleteObjectAtomic(n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) getNetworkFromStore(nid types.UUID) (*network, error) {
|
||||
n := network{id: nid}
|
||||
if err := c.store.GetObject(datastore.Key(n.Key()...), &n); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &n, nil
|
||||
}
|
||||
|
||||
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
|
||||
ep.Lock()
|
||||
n := ep.network
|
||||
id := ep.id
|
||||
ep.Unlock()
|
||||
|
||||
_, err := n.EndpointByID(string(id))
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrNoSuchEndpoint); ok {
|
||||
return n.addEndpoint(ep)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *controller) updateEndpointToStore(ep *endpoint) error {
|
||||
ep.Lock()
|
||||
n := ep.network
|
||||
name := ep.name
|
||||
ep.Unlock()
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. endpoint %s is not added to the store", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
return cs.PutObjectAtomic(ep)
|
||||
}
|
||||
|
||||
func (c *controller) getEndpointFromStore(eid types.UUID) (*endpoint, error) {
|
||||
ep := endpoint{id: eid}
|
||||
if err := c.store.GetObject(datastore.Key(ep.Key()...), &ep); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ep, nil
|
||||
}
|
||||
|
||||
func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
|
||||
ep.Lock()
|
||||
n := ep.network
|
||||
ep.Unlock()
|
||||
global, err := n.isGlobalScoped()
|
||||
if err != nil || !global {
|
||||
return err
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
if cs == nil {
|
||||
log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name())
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := cs.DeleteObjectAtomic(ep); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) watchNetworks() error {
|
||||
if !c.validateDatastoreConfig() {
|
||||
return nil
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
cs := c.store
|
||||
c.Unlock()
|
||||
|
||||
nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case nws := <-nwPairs:
|
||||
c.Lock()
|
||||
tmpview := networkTable{}
|
||||
lview := c.networks
|
||||
c.Unlock()
|
||||
for k, v := range lview {
|
||||
global, _ := v.isGlobalScoped()
|
||||
if global {
|
||||
tmpview[k] = v
|
||||
}
|
||||
}
|
||||
c.processNetworkUpdate(nws, &tmpview)
|
||||
|
||||
// Delete processing
|
||||
for k := range tmpview {
|
||||
c.Lock()
|
||||
existing, ok := c.networks[k]
|
||||
c.Unlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
tmp := network{}
|
||||
if err := c.store.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
|
||||
continue
|
||||
}
|
||||
if err := existing.deleteNetwork(); err != nil {
|
||||
log.Debugf("Delete failed %s: %s", existing.name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *network) watchEndpoints() error {
|
||||
if !n.ctrlr.validateDatastoreConfig() {
|
||||
return nil
|
||||
}
|
||||
|
||||
n.Lock()
|
||||
cs := n.ctrlr.store
|
||||
tmp := endpoint{network: n}
|
||||
n.stopWatchCh = make(chan struct{})
|
||||
stopCh := n.stopWatchCh
|
||||
n.Unlock()
|
||||
|
||||
epPairs, err := cs.KVStore().WatchTree(datastore.Key(tmp.KeyPrefix()...), stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case eps := <-epPairs:
|
||||
n.Lock()
|
||||
tmpview := endpointTable{}
|
||||
lview := n.endpoints
|
||||
n.Unlock()
|
||||
for k, v := range lview {
|
||||
global, _ := v.network.isGlobalScoped()
|
||||
if global {
|
||||
tmpview[k] = v
|
||||
}
|
||||
}
|
||||
for _, epe := range eps {
|
||||
var ep endpoint
|
||||
err := json.Unmarshal(epe.Value, &ep)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
delete(tmpview, ep.id)
|
||||
ep.SetIndex(epe.LastIndex)
|
||||
ep.network = n
|
||||
if n.ctrlr.processEndpointUpdate(&ep) {
|
||||
err = n.ctrlr.newEndpointFromStore(epe.Key, &ep)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Delete processing
|
||||
for k := range tmpview {
|
||||
n.Lock()
|
||||
existing, ok := n.endpoints[k]
|
||||
n.Unlock()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
tmp := endpoint{}
|
||||
if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
|
||||
continue
|
||||
}
|
||||
if err := existing.deleteEndpoint(); err != nil {
|
||||
log.Debugf("Delete failed %s: %s", existing.name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *network) stopWatch() {
|
||||
n.Lock()
|
||||
if n.stopWatchCh != nil {
|
||||
close(n.stopWatchCh)
|
||||
n.stopWatchCh = nil
|
||||
}
|
||||
n.Unlock()
|
||||
}
|
||||
|
||||
func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) {
|
||||
for _, kve := range nws {
|
||||
var n network
|
||||
err := json.Unmarshal(kve.Value, &n)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if prune != nil {
|
||||
delete(*prune, n.id)
|
||||
}
|
||||
n.SetIndex(kve.LastIndex)
|
||||
c.Lock()
|
||||
existing, ok := c.networks[n.id]
|
||||
if c.cfg == nil {
|
||||
c.Unlock()
|
||||
if ok {
|
||||
existing.Lock()
|
||||
// Skip existing network update
|
||||
if existing.dbIndex != n.Index() {
|
||||
// Can't use SetIndex() since existing is locked.
|
||||
existing.dbIndex = n.Index()
|
||||
existing.dbExists = true
|
||||
existing.endpointCnt = n.endpointCnt
|
||||
return nil
|
||||
}
|
||||
scopeConfigs := c.cfg.Scopes
|
||||
c.Unlock()
|
||||
|
||||
for scope, scfg := range scopeConfigs {
|
||||
store, err := datastore.NewDataStore(scope, scfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Lock()
|
||||
c.stores = append(c.stores, store)
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
c.startWatch()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) closeStores() {
|
||||
for _, store := range c.getStores() {
|
||||
store.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) getStore(scope string) datastore.DataStore {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
for _, store := range c.stores {
|
||||
if store.Scope() == scope {
|
||||
return store
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) getStores() []datastore.DataStore {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
return c.stores
|
||||
}
|
||||
|
||||
func (c *controller) getNetworkFromStore(nid string) (*network, error) {
|
||||
for _, store := range c.getStores() {
|
||||
n := &network{id: nid, ctrlr: c}
|
||||
err := store.GetObject(datastore.Key(n.Key()...), n)
|
||||
// Continue searching in the next store if the key is not found in this store
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
log.Debugf("could not find network %s: %v", nid, err)
|
||||
}
|
||||
existing.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
if err = c.newNetworkFromStore(&n); err != nil {
|
||||
log.Error(err)
|
||||
ec := &endpointCnt{n: n}
|
||||
err = store.GetObject(datastore.Key(ec.Key()...), ec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err)
|
||||
}
|
||||
|
||||
n.epCnt = ec
|
||||
return n, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("network %s not found", nid)
|
||||
}
|
||||
|
||||
func (c *controller) getNetworksForScope(scope string) ([]*network, error) {
|
||||
var nl []*network
|
||||
|
||||
store := c.getStore(scope)
|
||||
if store == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
|
||||
&network{ctrlr: c})
|
||||
if err != nil && err != datastore.ErrKeyNotFound {
|
||||
return nil, fmt.Errorf("failed to get networks for scope %s: %v",
|
||||
scope, err)
|
||||
}
|
||||
|
||||
for _, kvo := range kvol {
|
||||
n := kvo.(*network)
|
||||
n.ctrlr = c
|
||||
|
||||
ec := &endpointCnt{n: n}
|
||||
err = store.GetObject(datastore.Key(ec.Key()...), ec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
|
||||
}
|
||||
|
||||
n.epCnt = ec
|
||||
nl = append(nl, n)
|
||||
}
|
||||
|
||||
return nl, nil
|
||||
}
|
||||
|
||||
func (c *controller) getNetworksFromStore() ([]*network, error) {
|
||||
var nl []*network
|
||||
|
||||
for _, store := range c.getStores() {
|
||||
kvol, err := store.List(datastore.Key(datastore.NetworkKeyPrefix),
|
||||
&network{ctrlr: c})
|
||||
// Continue searching in the next store if no keys found in this store
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
log.Debugf("failed to get networks for scope %s: %v", store.Scope(), err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, kvo := range kvol {
|
||||
n := kvo.(*network)
|
||||
n.ctrlr = c
|
||||
|
||||
ec := &endpointCnt{n: n}
|
||||
err = store.GetObject(datastore.Key(ec.Key()...), ec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err)
|
||||
}
|
||||
|
||||
n.epCnt = ec
|
||||
nl = append(nl, n)
|
||||
}
|
||||
}
|
||||
|
||||
return nl, nil
|
||||
}
|
||||
|
||||
func (n *network) getEndpointFromStore(eid string) (*endpoint, error) {
|
||||
store := n.ctrlr.getStore(n.Scope())
|
||||
if store == nil {
|
||||
return nil, fmt.Errorf("could not find endpoint %s: datastore not found for scope %s", eid, n.Scope())
|
||||
}
|
||||
|
||||
ep := &endpoint{id: eid, network: n}
|
||||
err := store.GetObject(datastore.Key(ep.Key()...), ep)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not find endpoint %s: %v", eid, err)
|
||||
}
|
||||
return ep, nil
|
||||
}
|
||||
|
||||
func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
|
||||
var epl []*endpoint
|
||||
|
||||
tmp := endpoint{network: n}
|
||||
for _, store := range n.getController().getStores() {
|
||||
kvol, err := store.List(datastore.Key(tmp.KeyPrefix()...), &endpoint{network: n})
|
||||
// Continue searching in the next store if no keys found in this store
|
||||
if err != nil {
|
||||
if err != datastore.ErrKeyNotFound {
|
||||
log.Debugf("failed to get endpoints for network %s scope %s: %v",
|
||||
n.Name(), store.Scope(), err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, kvo := range kvol {
|
||||
ep := kvo.(*endpoint)
|
||||
ep.network = n
|
||||
epl = append(epl, ep)
|
||||
}
|
||||
}
|
||||
|
||||
return epl, nil
|
||||
}
|
||||
|
||||
func (c *controller) updateToStore(kvObject datastore.KVObject) error {
|
||||
cs := c.getStore(kvObject.DataScope())
|
||||
if cs == nil {
|
||||
log.Warnf("datastore for scope %s not initialized. kv object %s is not added to the store", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := cs.PutObjectAtomic(kvObject); err != nil {
|
||||
if err == datastore.ErrKeyModified {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("failed to update store for object type %T: %v", kvObject, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
|
||||
cs := c.getStore(kvObject.DataScope())
|
||||
if cs == nil {
|
||||
log.Debugf("datastore for scope %s not initialized. kv object %s is not deleted from datastore", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
|
||||
return nil
|
||||
}
|
||||
|
||||
retry:
|
||||
if err := cs.DeleteObjectAtomic(kvObject); err != nil {
|
||||
if err == datastore.ErrKeyModified {
|
||||
if err := cs.GetObject(datastore.Key(kvObject.Key()...), kvObject); err != nil {
|
||||
return fmt.Errorf("could not update the kvobject to latest when trying to delete: %v", err)
|
||||
}
|
||||
goto retry
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type netWatch struct {
|
||||
localEps map[string]*endpoint
|
||||
remoteEps map[string]*endpoint
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func (c *controller) getLocalEps(nw *netWatch) []*endpoint {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
var epl []*endpoint
|
||||
for _, ep := range nw.localEps {
|
||||
epl = append(epl, ep)
|
||||
}
|
||||
|
||||
return epl
|
||||
}
|
||||
|
||||
func (c *controller) watchSvcRecord(ep *endpoint) {
|
||||
c.watchCh <- ep
|
||||
}
|
||||
|
||||
func (c *controller) unWatchSvcRecord(ep *endpoint) {
|
||||
c.unWatchCh <- ep
|
||||
}
|
||||
|
||||
func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) {
|
||||
for {
|
||||
select {
|
||||
case <-nw.stopCh:
|
||||
return
|
||||
case o := <-ecCh:
|
||||
ec := o.(*endpointCnt)
|
||||
|
||||
epl, err := ec.n.getEndpointsFromStore()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
var addEp []*endpoint
|
||||
|
||||
delEpMap := make(map[string]*endpoint)
|
||||
renameEpMap := make(map[string]bool)
|
||||
for k, v := range nw.remoteEps {
|
||||
delEpMap[k] = v
|
||||
}
|
||||
|
||||
for _, lEp := range epl {
|
||||
if _, ok := nw.localEps[lEp.ID()]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if ep, ok := nw.remoteEps[lEp.ID()]; ok {
|
||||
// On a container rename EP ID will remain
|
||||
// the same but the name will change. service
|
||||
// records should reflect the change.
|
||||
// Keep old EP entry in the delEpMap and add
|
||||
// EP from the store (which has the new name)
|
||||
// into the new list
|
||||
if lEp.name == ep.name {
|
||||
delete(delEpMap, lEp.ID())
|
||||
continue
|
||||
}
|
||||
renameEpMap[lEp.ID()] = true
|
||||
}
|
||||
nw.remoteEps[lEp.ID()] = lEp
|
||||
addEp = append(addEp, lEp)
|
||||
}
|
||||
|
||||
// EPs whose name are to be deleted from the svc records
|
||||
// should also be removed from nw's remote EP list, except
|
||||
// the ones that are getting renamed.
|
||||
for _, lEp := range delEpMap {
|
||||
if !renameEpMap[lEp.ID()] {
|
||||
delete(nw.remoteEps, lEp.ID())
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
|
||||
for _, lEp := range delEpMap {
|
||||
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), false)
|
||||
|
||||
}
|
||||
for _, lEp := range addEp {
|
||||
ep.getNetwork().updateSvcRecord(lEp, c.getLocalEps(nw), true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
|
||||
nw := ep.network
|
||||
if nw == nil {
|
||||
return true
|
||||
func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
|
||||
c.Lock()
|
||||
nw, ok := nmap[ep.getNetwork().ID()]
|
||||
c.Unlock()
|
||||
|
||||
if ok {
|
||||
// Update the svc db for the local endpoint join right away
|
||||
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
|
||||
|
||||
c.Lock()
|
||||
nw.localEps[ep.ID()] = ep
|
||||
|
||||
// If we had learned that from the kv store remove it
|
||||
// from remote ep list now that we know that this is
|
||||
// indeed a local endpoint
|
||||
delete(nw.remoteEps, ep.ID())
|
||||
c.Unlock()
|
||||
return
|
||||
}
|
||||
nw.Lock()
|
||||
id := nw.id
|
||||
nw.Unlock()
|
||||
|
||||
nw = &netWatch{
|
||||
localEps: make(map[string]*endpoint),
|
||||
remoteEps: make(map[string]*endpoint),
|
||||
}
|
||||
|
||||
// Update the svc db for the local endpoint join right away
|
||||
// Do this before adding this ep to localEps so that we don't
|
||||
// try to update this ep's container's svc records
|
||||
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), true)
|
||||
|
||||
c.Lock()
|
||||
n, ok := c.networks[id]
|
||||
nw.localEps[ep.ID()] = ep
|
||||
nmap[ep.getNetwork().ID()] = nw
|
||||
nw.stopCh = make(chan struct{})
|
||||
c.Unlock()
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
existing, _ := n.EndpointByID(string(ep.id))
|
||||
if existing == nil {
|
||||
return true
|
||||
|
||||
store := c.getStore(ep.getNetwork().DataScope())
|
||||
if store == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ee := existing.(*endpoint)
|
||||
ee.Lock()
|
||||
if ee.dbIndex != ep.Index() {
|
||||
// Can't use SetIndex() because ee is locked.
|
||||
ee.dbIndex = ep.Index()
|
||||
ee.dbExists = true
|
||||
if ee.container != nil && ep.container != nil {
|
||||
// we care only about the container id
|
||||
ee.container.id = ep.container.id
|
||||
} else {
|
||||
// we still care only about the container id, but this is a short-cut to communicate join or leave operation
|
||||
ee.container = ep.container
|
||||
if !store.Watchable() {
|
||||
return
|
||||
}
|
||||
|
||||
ch, err := store.Watch(ep.getNetwork().getEpCnt(), nw.stopCh)
|
||||
if err != nil {
|
||||
log.Warnf("Error creating watch for network: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
go c.networkWatchLoop(nw, ep, ch)
|
||||
}
|
||||
|
||||
func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
|
||||
c.Lock()
|
||||
nw, ok := nmap[ep.getNetwork().ID()]
|
||||
|
||||
if ok {
|
||||
delete(nw.localEps, ep.ID())
|
||||
c.Unlock()
|
||||
|
||||
// Update the svc db about local endpoint leave right away
|
||||
// Do this after we remove this ep from localEps so that we
|
||||
// don't try to remove this svc record from this ep's container.
|
||||
ep.getNetwork().updateSvcRecord(ep, c.getLocalEps(nw), false)
|
||||
|
||||
c.Lock()
|
||||
if len(nw.localEps) == 0 {
|
||||
close(nw.stopCh)
|
||||
|
||||
// This is the last container going away for the network. Destroy
|
||||
// this network's svc db entry
|
||||
delete(c.svcDb, ep.getNetwork().ID())
|
||||
|
||||
delete(nmap, ep.getNetwork().ID())
|
||||
}
|
||||
}
|
||||
ee.Unlock()
|
||||
|
||||
return false
|
||||
c.Unlock()
|
||||
}
|
||||
|
||||
func (c *controller) watchLoop() {
|
||||
for {
|
||||
select {
|
||||
case ep := <-c.watchCh:
|
||||
c.processEndpointCreate(c.nmap, ep)
|
||||
case ep := <-c.unWatchCh:
|
||||
c.processEndpointDelete(c.nmap, ep)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) startWatch() {
|
||||
c.watchCh = make(chan *endpoint)
|
||||
c.unWatchCh = make(chan *endpoint)
|
||||
c.nmap = make(map[string]*netWatch)
|
||||
|
||||
go c.watchLoop()
|
||||
}
|
||||
|
Reference in New Issue
Block a user