mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-26 11:07:45 +00:00 
			
		
		
		
	Supports additional options for CreateDataset and DeleteDataset for dynamic provisioning. Bugfix for timeouts during CreateDataset
		
			
				
	
	
		
			377 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package flocker
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| // From https://github.com/ClusterHQ/flocker-docker-plugin/blob/master/flockerdockerplugin/adapter.py#L18
 | |
| const defaultVolumeSize = json.Number("107374182400")
 | |
| 
 | |
| var (
 | |
| 	// A volume can take a long time to be available, if we don't want
 | |
| 	// Kubernetes to wait forever we need to stop trying after some time, that
 | |
| 	// time is defined here
 | |
| 	timeoutWaitingForVolume = 2 * time.Minute
 | |
| 	tickerWaitingForVolume  = 5 * time.Second
 | |
| 
 | |
| 	errStateNotFound         = errors.New("State not found by Dataset ID")
 | |
| 	errConfigurationNotFound = errors.New("Configuration not found by Name")
 | |
| 
 | |
| 	errFlockerControlServiceHost = errors.New("The volume config must have a key CONTROL_SERVICE_HOST defined in the OtherAttributes field")
 | |
| 	errFlockerControlServicePort = errors.New("The volume config must have a key CONTROL_SERVICE_PORT defined in the OtherAttributes field")
 | |
| 
 | |
| 	errVolumeAlreadyExists = errors.New("The volume already exists")
 | |
| 	errVolumeDoesNotExist  = errors.New("The volume does not exist")
 | |
| 
 | |
| 	errUpdatingDataset = errors.New("It was impossible to update the dataset")
 | |
| )
 | |
| 
 | |
| // Clientable exposes the needed methods to implement your own Flocker Client.
 | |
| type Clientable interface {
 | |
| 	CreateDataset(options *CreateDatasetOptions) (*DatasetState, error)
 | |
| 	DeleteDataset(datasetID string) error
 | |
| 
 | |
| 	GetDatasetState(datasetID string) (*DatasetState, error)
 | |
| 	GetDatasetID(metaName string) (datasetID string, err error)
 | |
| 	GetPrimaryUUID() (primaryUUID string, err error)
 | |
| 
 | |
| 	ListNodes() (nodes []NodeState, err error)
 | |
| 
 | |
| 	UpdatePrimaryForDataset(primaryUUID, datasetID string) (*DatasetState, error)
 | |
| }
 | |
| 
 | |
| // Client is a default Flocker Client.
 | |
| type Client struct {
 | |
| 	*http.Client
 | |
| 
 | |
| 	schema  string
 | |
| 	host    string
 | |
| 	port    int
 | |
| 	version string
 | |
| 
 | |
| 	clientIP string
 | |
| 
 | |
| 	maximumSize json.Number
 | |
| }
 | |
| 
 | |
| var _ Clientable = &Client{}
 | |
| 
 | |
| // NewClient creates a wrapper over http.Client to communicate with the flocker control service.
 | |
| func NewClient(host string, port int, clientIP string, caCertPath, keyPath, certPath string) (*Client, error) {
 | |
| 	client, err := newTLSClient(caCertPath, keyPath, certPath)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &Client{
 | |
| 		Client:      client,
 | |
| 		schema:      "https",
 | |
| 		host:        host,
 | |
| 		port:        port,
 | |
| 		version:     "v1",
 | |
| 		maximumSize: defaultVolumeSize,
 | |
| 		clientIP:    clientIP,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| /*
 | |
| request do a request using the http.Client embedded to the control service
 | |
| and returns the response or an error in case it happens.
 | |
| 
 | |
| Note: you will need to deal with the response body call to Close if you
 | |
| don't want to deal with problems later.
 | |
| */
 | |
| func (c Client) request(method, url string, payload interface{}) (*http.Response, error) {
 | |
| 	var (
 | |
| 		b   []byte
 | |
| 		err error
 | |
| 	)
 | |
| 
 | |
| 	if method == "POST" { // Just allow payload on POST
 | |
| 		b, err = json.Marshal(payload)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	req, err := http.NewRequest(method, url, bytes.NewBuffer(b))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	req.Header.Set("Content-Type", "application/json")
 | |
| 
 | |
| 	// REMEMBER TO CLOSE THE BODY IN THE OUTSIDE FUNCTION
 | |
| 	return c.Do(req)
 | |
| }
 | |
| 
 | |
| // post performs a post request with the indicated payload
 | |
| func (c Client) post(url string, payload interface{}) (*http.Response, error) {
 | |
| 	return c.request("POST", url, payload)
 | |
| }
 | |
| 
 | |
| // delete performs a delete request with the indicated payload
 | |
| func (c Client) delete(url string, payload interface{}) (*http.Response, error) {
 | |
| 	return c.request("DELETE", url, payload)
 | |
| }
 | |
| 
 | |
| // get performs a get request
 | |
| func (c Client) get(url string) (*http.Response, error) {
 | |
| 	return c.request("GET", url, nil)
 | |
| }
 | |
| 
 | |
| // getURL returns a full URI to the control service
 | |
| func (c Client) getURL(path string) string {
 | |
| 	return fmt.Sprintf("%s://%s:%d/%s/%s", c.schema, c.host, c.port, c.version, path)
 | |
| }
 | |
| 
 | |
| type configurationPayload struct {
 | |
| 	Deleted     bool            `json:"deleted"`
 | |
| 	Primary     string          `json:"primary"`
 | |
| 	DatasetID   string          `json:"dataset_id,omitempty"`
 | |
| 	MaximumSize json.Number     `json:"maximum_size,omitempty"`
 | |
| 	Metadata    metadataPayload `json:"metadata,omitempty"`
 | |
| }
 | |
| 
 | |
| type CreateDatasetOptions struct {
 | |
| 	Primary     string            `json:"primary"`
 | |
| 	DatasetID   string            `json:"dataset_id,omitempty"`
 | |
| 	MaximumSize int64             `json:"maximum_size,omitempty"`
 | |
| 	Metadata    map[string]string `json:"metadata,omitempty"`
 | |
| }
 | |
| 
 | |
| type metadataPayload struct {
 | |
| 	Name string `json:"name,omitempty"`
 | |
| }
 | |
| 
 | |
| type DatasetState struct {
 | |
| 	Path        string      `json:"path"`
 | |
| 	DatasetID   string      `json:"dataset_id"`
 | |
| 	Primary     string      `json:"primary,omitempty"`
 | |
| 	MaximumSize json.Number `json:"maximum_size,omitempty"`
 | |
| }
 | |
| 
 | |
| type datasetStatePayload struct {
 | |
| 	*DatasetState
 | |
| }
 | |
| 
 | |
| type NodeState struct {
 | |
| 	UUID string `json:"uuid"`
 | |
| 	Host string `json:"host"`
 | |
| }
 | |
| 
 | |
| // findIDInConfigurationsPayload returns the datasetID if it was found in the
 | |
| // configurations payload, otherwise it will return an error.
 | |
| func (c Client) findIDInConfigurationsPayload(body io.ReadCloser, name string) (datasetID string, err error) {
 | |
| 	var configurations []configurationPayload
 | |
| 	if err = json.NewDecoder(body).Decode(&configurations); err == nil {
 | |
| 		for _, r := range configurations {
 | |
| 			if r.Metadata.Name == name {
 | |
| 				return r.DatasetID, nil
 | |
| 			}
 | |
| 		}
 | |
| 		return "", errConfigurationNotFound
 | |
| 	}
 | |
| 	return "", err
 | |
| }
 | |
| 
 | |
| // ListNodes returns a list of dataset agent nodes from Flocker Control Service
 | |
| func (c *Client) ListNodes() (nodes []NodeState, err error) {
 | |
| 	resp, err := c.get(c.getURL("state/nodes"))
 | |
| 	if err != nil {
 | |
| 		return []NodeState{}, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 	if resp.StatusCode >= 300 {
 | |
| 		return []NodeState{}, fmt.Errorf("Expected: {1,2}xx listing nodes, got: %d", resp.StatusCode)
 | |
| 	}
 | |
| 
 | |
| 	err = json.NewDecoder(resp.Body).Decode(&nodes)
 | |
| 	if err != nil {
 | |
| 		return []NodeState{}, err
 | |
| 	}
 | |
| 	return nodes, err
 | |
| }
 | |
| 
 | |
| // GetPrimaryUUID returns the UUID of the primary Flocker Control Service for
 | |
| // the given host.
 | |
| func (c Client) GetPrimaryUUID() (uuid string, err error) {
 | |
| 	states, err := c.ListNodes()
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	for _, s := range states {
 | |
| 		if s.Host == c.clientIP {
 | |
| 			return s.UUID, nil
 | |
| 		}
 | |
| 	}
 | |
| 	return "", fmt.Errorf("No node found with IP '%s', available nodes %+v", c.clientIP, states)
 | |
| }
 | |
| 
 | |
| // DeleteDataset performs a delete request to the given datasetID
 | |
| func (c *Client) DeleteDataset(datasetID string) error {
 | |
| 	url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
 | |
| 	resp, err := c.delete(url, nil)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	if resp.StatusCode >= 300 {
 | |
| 		return fmt.Errorf("Expected: {1,2}xx deleting the dataset %s, got: %d", datasetID, resp.StatusCode)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetDatasetState performs a get request to get the state of the given datasetID, if
 | |
| // something goes wrong or the datasetID was not found it returns an error.
 | |
| func (c Client) GetDatasetState(datasetID string) (*DatasetState, error) {
 | |
| 	resp, err := c.get(c.getURL("state/datasets"))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	var states []datasetStatePayload
 | |
| 	if err = json.NewDecoder(resp.Body).Decode(&states); err == nil {
 | |
| 		for _, s := range states {
 | |
| 			if s.DatasetID == datasetID {
 | |
| 				return s.DatasetState, nil
 | |
| 			}
 | |
| 		}
 | |
| 		return nil, errStateNotFound
 | |
| 	}
 | |
| 
 | |
| 	return nil, err
 | |
| }
 | |
| 
 | |
| /*
 | |
| CreateDataset creates a volume in Flocker, waits for it to be ready and
 | |
| returns the dataset id.
 | |
| 
 | |
| This process is a little bit complex but follows this flow:
 | |
| 
 | |
| 1. Find the Flocker Control Service UUID
 | |
| 2. If it already exists an error is returned
 | |
| 3. If it didn't previously exist, wait for it to be ready
 | |
| */
 | |
| func (c *Client) CreateDataset(options *CreateDatasetOptions) (datasetState *DatasetState, err error) {
 | |
| 	// 1) Find the primary Flocker UUID
 | |
| 	// Note: it could be cached, but doing this query we health check it
 | |
| 	if options.Primary == "" {
 | |
| 		options.Primary, err = c.GetPrimaryUUID()
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if options.MaximumSize == 0 {
 | |
| 		options.MaximumSize, _ = c.maximumSize.Int64()
 | |
| 	}
 | |
| 
 | |
| 	resp, err := c.post(c.getURL("configuration/datasets"), options)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	// 2) Return if the dataset was previously created
 | |
| 	if resp.StatusCode == http.StatusConflict {
 | |
| 		return nil, errVolumeAlreadyExists
 | |
| 	}
 | |
| 
 | |
| 	if resp.StatusCode >= 300 {
 | |
| 		return nil, fmt.Errorf("Expected: {1,2}xx creating the volume, got: %d", resp.StatusCode)
 | |
| 	}
 | |
| 
 | |
| 	var p configurationPayload
 | |
| 	if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// 3) Wait until the dataset is ready for usage. In case it never gets
 | |
| 	// ready there is a timeoutChan that will return an error
 | |
| 	timeoutChan := time.NewTimer(timeoutWaitingForVolume).C
 | |
| 	tickChan := time.NewTicker(tickerWaitingForVolume).C
 | |
| 
 | |
| 	for {
 | |
| 		var strErrDel string
 | |
| 		s, err := c.GetDatasetState(p.DatasetID)
 | |
| 		if err == nil {
 | |
| 			return s, nil
 | |
| 		} else if err != errStateNotFound {
 | |
| 			errDel := c.DeleteDataset(p.DatasetID)
 | |
| 			if errDel != nil {
 | |
| 				strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
 | |
| 			}
 | |
| 			return nil, fmt.Errorf("Flocker API error during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case <-timeoutChan:
 | |
| 			errDel := c.DeleteDataset(p.DatasetID)
 | |
| 			if errDel != nil {
 | |
| 				strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
 | |
| 			}
 | |
| 			return nil, fmt.Errorf("Flocker API timeout during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
 | |
| 		case <-tickChan:
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // UpdatePrimaryForDataset will update the Primary for the given dataset
 | |
| // returning the current DatasetState.
 | |
| func (c Client) UpdatePrimaryForDataset(newPrimaryUUID, datasetID string) (*DatasetState, error) {
 | |
| 	payload := struct {
 | |
| 		Primary string `json:"primary"`
 | |
| 	}{
 | |
| 		Primary: newPrimaryUUID,
 | |
| 	}
 | |
| 
 | |
| 	url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
 | |
| 	resp, err := c.post(url, payload)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	if resp.StatusCode >= 300 {
 | |
| 		return nil, errUpdatingDataset
 | |
| 	}
 | |
| 
 | |
| 	var s DatasetState
 | |
| 	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &s, nil
 | |
| }
 | |
| 
 | |
| // GetDatasetID will return the DatasetID found for the given metadata name.
 | |
| func (c Client) GetDatasetID(metaName string) (datasetID string, err error) {
 | |
| 	resp, err := c.get(c.getURL("configuration/datasets"))
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	defer resp.Body.Close()
 | |
| 
 | |
| 	var configurations []configurationPayload
 | |
| 	if err = json.NewDecoder(resp.Body).Decode(&configurations); err == nil {
 | |
| 		for _, c := range configurations {
 | |
| 			if c.Metadata.Name == metaName && c.Deleted == false {
 | |
| 				return c.DatasetID, nil
 | |
| 			}
 | |
| 		}
 | |
| 		return "", errConfigurationNotFound
 | |
| 	}
 | |
| 	return "", err
 | |
| }
 |