mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-03 23:40:03 +00:00 
			
		
		
		
	Supports additional options for CreateDataset and DeleteDataset for dynamic provisioning. Bugfix for timeouts during CreateDataset
		
			
				
	
	
		
			377 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package flocker
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"encoding/json"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net/http"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
// From https://github.com/ClusterHQ/flocker-docker-plugin/blob/master/flockerdockerplugin/adapter.py#L18
 | 
						|
const defaultVolumeSize = json.Number("107374182400")
 | 
						|
 | 
						|
var (
 | 
						|
	// A volume can take a long time to be available, if we don't want
 | 
						|
	// Kubernetes to wait forever we need to stop trying after some time, that
 | 
						|
	// time is defined here
 | 
						|
	timeoutWaitingForVolume = 2 * time.Minute
 | 
						|
	tickerWaitingForVolume  = 5 * time.Second
 | 
						|
 | 
						|
	errStateNotFound         = errors.New("State not found by Dataset ID")
 | 
						|
	errConfigurationNotFound = errors.New("Configuration not found by Name")
 | 
						|
 | 
						|
	errFlockerControlServiceHost = errors.New("The volume config must have a key CONTROL_SERVICE_HOST defined in the OtherAttributes field")
 | 
						|
	errFlockerControlServicePort = errors.New("The volume config must have a key CONTROL_SERVICE_PORT defined in the OtherAttributes field")
 | 
						|
 | 
						|
	errVolumeAlreadyExists = errors.New("The volume already exists")
 | 
						|
	errVolumeDoesNotExist  = errors.New("The volume does not exist")
 | 
						|
 | 
						|
	errUpdatingDataset = errors.New("It was impossible to update the dataset")
 | 
						|
)
 | 
						|
 | 
						|
// Clientable exposes the needed methods to implement your own Flocker Client.
 | 
						|
type Clientable interface {
 | 
						|
	CreateDataset(options *CreateDatasetOptions) (*DatasetState, error)
 | 
						|
	DeleteDataset(datasetID string) error
 | 
						|
 | 
						|
	GetDatasetState(datasetID string) (*DatasetState, error)
 | 
						|
	GetDatasetID(metaName string) (datasetID string, err error)
 | 
						|
	GetPrimaryUUID() (primaryUUID string, err error)
 | 
						|
 | 
						|
	ListNodes() (nodes []NodeState, err error)
 | 
						|
 | 
						|
	UpdatePrimaryForDataset(primaryUUID, datasetID string) (*DatasetState, error)
 | 
						|
}
 | 
						|
 | 
						|
// Client is a default Flocker Client.
 | 
						|
type Client struct {
 | 
						|
	*http.Client
 | 
						|
 | 
						|
	schema  string
 | 
						|
	host    string
 | 
						|
	port    int
 | 
						|
	version string
 | 
						|
 | 
						|
	clientIP string
 | 
						|
 | 
						|
	maximumSize json.Number
 | 
						|
}
 | 
						|
 | 
						|
var _ Clientable = &Client{}
 | 
						|
 | 
						|
// NewClient creates a wrapper over http.Client to communicate with the flocker control service.
 | 
						|
func NewClient(host string, port int, clientIP string, caCertPath, keyPath, certPath string) (*Client, error) {
 | 
						|
	client, err := newTLSClient(caCertPath, keyPath, certPath)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &Client{
 | 
						|
		Client:      client,
 | 
						|
		schema:      "https",
 | 
						|
		host:        host,
 | 
						|
		port:        port,
 | 
						|
		version:     "v1",
 | 
						|
		maximumSize: defaultVolumeSize,
 | 
						|
		clientIP:    clientIP,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
request do a request using the http.Client embedded to the control service
 | 
						|
and returns the response or an error in case it happens.
 | 
						|
 | 
						|
Note: you will need to deal with the response body call to Close if you
 | 
						|
don't want to deal with problems later.
 | 
						|
*/
 | 
						|
func (c Client) request(method, url string, payload interface{}) (*http.Response, error) {
 | 
						|
	var (
 | 
						|
		b   []byte
 | 
						|
		err error
 | 
						|
	)
 | 
						|
 | 
						|
	if method == "POST" { // Just allow payload on POST
 | 
						|
		b, err = json.Marshal(payload)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	req, err := http.NewRequest(method, url, bytes.NewBuffer(b))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	req.Header.Set("Content-Type", "application/json")
 | 
						|
 | 
						|
	// REMEMBER TO CLOSE THE BODY IN THE OUTSIDE FUNCTION
 | 
						|
	return c.Do(req)
 | 
						|
}
 | 
						|
 | 
						|
// post performs a post request with the indicated payload
 | 
						|
func (c Client) post(url string, payload interface{}) (*http.Response, error) {
 | 
						|
	return c.request("POST", url, payload)
 | 
						|
}
 | 
						|
 | 
						|
// delete performs a delete request with the indicated payload
 | 
						|
func (c Client) delete(url string, payload interface{}) (*http.Response, error) {
 | 
						|
	return c.request("DELETE", url, payload)
 | 
						|
}
 | 
						|
 | 
						|
// get performs a get request
 | 
						|
func (c Client) get(url string) (*http.Response, error) {
 | 
						|
	return c.request("GET", url, nil)
 | 
						|
}
 | 
						|
 | 
						|
// getURL returns a full URI to the control service
 | 
						|
func (c Client) getURL(path string) string {
 | 
						|
	return fmt.Sprintf("%s://%s:%d/%s/%s", c.schema, c.host, c.port, c.version, path)
 | 
						|
}
 | 
						|
 | 
						|
type configurationPayload struct {
 | 
						|
	Deleted     bool            `json:"deleted"`
 | 
						|
	Primary     string          `json:"primary"`
 | 
						|
	DatasetID   string          `json:"dataset_id,omitempty"`
 | 
						|
	MaximumSize json.Number     `json:"maximum_size,omitempty"`
 | 
						|
	Metadata    metadataPayload `json:"metadata,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type CreateDatasetOptions struct {
 | 
						|
	Primary     string            `json:"primary"`
 | 
						|
	DatasetID   string            `json:"dataset_id,omitempty"`
 | 
						|
	MaximumSize int64             `json:"maximum_size,omitempty"`
 | 
						|
	Metadata    map[string]string `json:"metadata,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type metadataPayload struct {
 | 
						|
	Name string `json:"name,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type DatasetState struct {
 | 
						|
	Path        string      `json:"path"`
 | 
						|
	DatasetID   string      `json:"dataset_id"`
 | 
						|
	Primary     string      `json:"primary,omitempty"`
 | 
						|
	MaximumSize json.Number `json:"maximum_size,omitempty"`
 | 
						|
}
 | 
						|
 | 
						|
type datasetStatePayload struct {
 | 
						|
	*DatasetState
 | 
						|
}
 | 
						|
 | 
						|
type NodeState struct {
 | 
						|
	UUID string `json:"uuid"`
 | 
						|
	Host string `json:"host"`
 | 
						|
}
 | 
						|
 | 
						|
// findIDInConfigurationsPayload returns the datasetID if it was found in the
 | 
						|
// configurations payload, otherwise it will return an error.
 | 
						|
func (c Client) findIDInConfigurationsPayload(body io.ReadCloser, name string) (datasetID string, err error) {
 | 
						|
	var configurations []configurationPayload
 | 
						|
	if err = json.NewDecoder(body).Decode(&configurations); err == nil {
 | 
						|
		for _, r := range configurations {
 | 
						|
			if r.Metadata.Name == name {
 | 
						|
				return r.DatasetID, nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return "", errConfigurationNotFound
 | 
						|
	}
 | 
						|
	return "", err
 | 
						|
}
 | 
						|
 | 
						|
// ListNodes returns a list of dataset agent nodes from Flocker Control Service
 | 
						|
func (c *Client) ListNodes() (nodes []NodeState, err error) {
 | 
						|
	resp, err := c.get(c.getURL("state/nodes"))
 | 
						|
	if err != nil {
 | 
						|
		return []NodeState{}, err
 | 
						|
	}
 | 
						|
	defer resp.Body.Close()
 | 
						|
	if resp.StatusCode >= 300 {
 | 
						|
		return []NodeState{}, fmt.Errorf("Expected: {1,2}xx listing nodes, got: %d", resp.StatusCode)
 | 
						|
	}
 | 
						|
 | 
						|
	err = json.NewDecoder(resp.Body).Decode(&nodes)
 | 
						|
	if err != nil {
 | 
						|
		return []NodeState{}, err
 | 
						|
	}
 | 
						|
	return nodes, err
 | 
						|
}
 | 
						|
 | 
						|
// GetPrimaryUUID returns the UUID of the primary Flocker Control Service for
 | 
						|
// the given host.
 | 
						|
func (c Client) GetPrimaryUUID() (uuid string, err error) {
 | 
						|
	states, err := c.ListNodes()
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	for _, s := range states {
 | 
						|
		if s.Host == c.clientIP {
 | 
						|
			return s.UUID, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return "", fmt.Errorf("No node found with IP '%s', available nodes %+v", c.clientIP, states)
 | 
						|
}
 | 
						|
 | 
						|
// DeleteDataset performs a delete request to the given datasetID
 | 
						|
func (c *Client) DeleteDataset(datasetID string) error {
 | 
						|
	url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
 | 
						|
	resp, err := c.delete(url, nil)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	defer resp.Body.Close()
 | 
						|
 | 
						|
	if resp.StatusCode >= 300 {
 | 
						|
		return fmt.Errorf("Expected: {1,2}xx deleting the dataset %s, got: %d", datasetID, resp.StatusCode)
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// GetDatasetState performs a get request to get the state of the given datasetID, if
 | 
						|
// something goes wrong or the datasetID was not found it returns an error.
 | 
						|
func (c Client) GetDatasetState(datasetID string) (*DatasetState, error) {
 | 
						|
	resp, err := c.get(c.getURL("state/datasets"))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	defer resp.Body.Close()
 | 
						|
 | 
						|
	var states []datasetStatePayload
 | 
						|
	if err = json.NewDecoder(resp.Body).Decode(&states); err == nil {
 | 
						|
		for _, s := range states {
 | 
						|
			if s.DatasetID == datasetID {
 | 
						|
				return s.DatasetState, nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return nil, errStateNotFound
 | 
						|
	}
 | 
						|
 | 
						|
	return nil, err
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
CreateDataset creates a volume in Flocker, waits for it to be ready and
 | 
						|
returns the dataset id.
 | 
						|
 | 
						|
This process is a little bit complex but follows this flow:
 | 
						|
 | 
						|
1. Find the Flocker Control Service UUID
 | 
						|
2. If it already exists an error is returned
 | 
						|
3. If it didn't previously exist, wait for it to be ready
 | 
						|
*/
 | 
						|
func (c *Client) CreateDataset(options *CreateDatasetOptions) (datasetState *DatasetState, err error) {
 | 
						|
	// 1) Find the primary Flocker UUID
 | 
						|
	// Note: it could be cached, but doing this query we health check it
 | 
						|
	if options.Primary == "" {
 | 
						|
		options.Primary, err = c.GetPrimaryUUID()
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if options.MaximumSize == 0 {
 | 
						|
		options.MaximumSize, _ = c.maximumSize.Int64()
 | 
						|
	}
 | 
						|
 | 
						|
	resp, err := c.post(c.getURL("configuration/datasets"), options)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	defer resp.Body.Close()
 | 
						|
 | 
						|
	// 2) Return if the dataset was previously created
 | 
						|
	if resp.StatusCode == http.StatusConflict {
 | 
						|
		return nil, errVolumeAlreadyExists
 | 
						|
	}
 | 
						|
 | 
						|
	if resp.StatusCode >= 300 {
 | 
						|
		return nil, fmt.Errorf("Expected: {1,2}xx creating the volume, got: %d", resp.StatusCode)
 | 
						|
	}
 | 
						|
 | 
						|
	var p configurationPayload
 | 
						|
	if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	// 3) Wait until the dataset is ready for usage. In case it never gets
 | 
						|
	// ready there is a timeoutChan that will return an error
 | 
						|
	timeoutChan := time.NewTimer(timeoutWaitingForVolume).C
 | 
						|
	tickChan := time.NewTicker(tickerWaitingForVolume).C
 | 
						|
 | 
						|
	for {
 | 
						|
		var strErrDel string
 | 
						|
		s, err := c.GetDatasetState(p.DatasetID)
 | 
						|
		if err == nil {
 | 
						|
			return s, nil
 | 
						|
		} else if err != errStateNotFound {
 | 
						|
			errDel := c.DeleteDataset(p.DatasetID)
 | 
						|
			if errDel != nil {
 | 
						|
				strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
 | 
						|
			}
 | 
						|
			return nil, fmt.Errorf("Flocker API error during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
 | 
						|
		}
 | 
						|
 | 
						|
		select {
 | 
						|
		case <-timeoutChan:
 | 
						|
			errDel := c.DeleteDataset(p.DatasetID)
 | 
						|
			if errDel != nil {
 | 
						|
				strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
 | 
						|
			}
 | 
						|
			return nil, fmt.Errorf("Flocker API timeout during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
 | 
						|
		case <-tickChan:
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// UpdatePrimaryForDataset will update the Primary for the given dataset
 | 
						|
// returning the current DatasetState.
 | 
						|
func (c Client) UpdatePrimaryForDataset(newPrimaryUUID, datasetID string) (*DatasetState, error) {
 | 
						|
	payload := struct {
 | 
						|
		Primary string `json:"primary"`
 | 
						|
	}{
 | 
						|
		Primary: newPrimaryUUID,
 | 
						|
	}
 | 
						|
 | 
						|
	url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
 | 
						|
	resp, err := c.post(url, payload)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	defer resp.Body.Close()
 | 
						|
 | 
						|
	if resp.StatusCode >= 300 {
 | 
						|
		return nil, errUpdatingDataset
 | 
						|
	}
 | 
						|
 | 
						|
	var s DatasetState
 | 
						|
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &s, nil
 | 
						|
}
 | 
						|
 | 
						|
// GetDatasetID will return the DatasetID found for the given metadata name.
 | 
						|
func (c Client) GetDatasetID(metaName string) (datasetID string, err error) {
 | 
						|
	resp, err := c.get(c.getURL("configuration/datasets"))
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	defer resp.Body.Close()
 | 
						|
 | 
						|
	var configurations []configurationPayload
 | 
						|
	if err = json.NewDecoder(resp.Body).Decode(&configurations); err == nil {
 | 
						|
		for _, c := range configurations {
 | 
						|
			if c.Metadata.Name == metaName && c.Deleted == false {
 | 
						|
				return c.DatasetID, nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return "", errConfigurationNotFound
 | 
						|
	}
 | 
						|
	return "", err
 | 
						|
}
 |