Merge pull request #31005 from simonswine/feature-flocker-dyn-provisioning

Automatic merge from submit-queue

Dynamic provisioning for flocker volume plugin

Refactor flocker volume plugin
* [x] Support provisioning beta (#29006)
* [x] Support deletion
* [x] Use bind mounts instead of /flocker in containers

* [x] support ownership management or SELinux relabeling.
* [x] adds volume specification via datasetUUID (this is guranted to be unique)

I based my refactor work to replicate pretty much GCE-PD behaviour 

**Related issues**: #29006 #26908

@jsafrane @mattbates @wallrj @wallnerryan
This commit is contained in:
Kubernetes Submit Queue
2016-09-28 01:46:43 -07:00
committed by GitHub
37 changed files with 38059 additions and 37053 deletions

16
vendor/github.com/clusterhq/flocker-go/circle.yml generated vendored Normal file
View File

@@ -0,0 +1,16 @@
machine:
timezone:
America/Los_Angeles
# Output the test output to circle.
test:
pre:
- go get -u github.com/jstemmer/go-junit-report
override:
- go test -coverprofile=coverage.out -v -race ./... > test.out
- cat test.out | go-junit-report > report.xml
- go tool cover -func=coverage.out
post:
- mv test.out $CIRCLE_ARTIFACTS/
- mv report.xml $CIRCLE_TEST_REPORTS/

View File

@@ -34,12 +34,15 @@ var (
// Clientable exposes the needed methods to implement your own Flocker Client.
type Clientable interface {
CreateDataset(metaName string) (*DatasetState, error)
CreateDataset(options *CreateDatasetOptions) (*DatasetState, error)
DeleteDataset(datasetID string) error
GetDatasetState(datasetID string) (*DatasetState, error)
GetDatasetID(metaName string) (datasetID string, err error)
GetPrimaryUUID() (primaryUUID string, err error)
ListNodes() (nodes []NodeState, err error)
UpdatePrimaryForDataset(primaryUUID, datasetID string) (*DatasetState, error)
}
@@ -57,6 +60,8 @@ type Client struct {
maximumSize json.Number
}
var _ Clientable = &Client{}
// NewClient creates a wrapper over http.Client to communicate with the flocker control service.
func NewClient(host string, port int, clientIP string, caCertPath, keyPath, certPath string) (*Client, error) {
client, err := newTLSClient(caCertPath, keyPath, certPath)
@@ -110,6 +115,11 @@ func (c Client) post(url string, payload interface{}) (*http.Response, error) {
return c.request("POST", url, payload)
}
// delete performs a delete request with the indicated payload
func (c Client) delete(url string, payload interface{}) (*http.Response, error) {
return c.request("DELETE", url, payload)
}
// get performs a get request
func (c Client) get(url string) (*http.Response, error) {
return c.request("GET", url, nil)
@@ -128,6 +138,13 @@ type configurationPayload struct {
Metadata metadataPayload `json:"metadata,omitempty"`
}
type CreateDatasetOptions struct {
Primary string `json:"primary"`
DatasetID string `json:"dataset_id,omitempty"`
MaximumSize int64 `json:"maximum_size,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
type metadataPayload struct {
Name string `json:"name,omitempty"`
}
@@ -143,7 +160,7 @@ type datasetStatePayload struct {
*DatasetState
}
type nodeStatePayload struct {
type NodeState struct {
UUID string `json:"uuid"`
Host string `json:"host"`
}
@@ -163,25 +180,54 @@ func (c Client) findIDInConfigurationsPayload(body io.ReadCloser, name string) (
return "", err
}
// ListNodes returns a list of dataset agent nodes from Flocker Control Service
func (c *Client) ListNodes() (nodes []NodeState, err error) {
resp, err := c.get(c.getURL("state/nodes"))
if err != nil {
return []NodeState{}, err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return []NodeState{}, fmt.Errorf("Expected: {1,2}xx listing nodes, got: %d", resp.StatusCode)
}
err = json.NewDecoder(resp.Body).Decode(&nodes)
if err != nil {
return []NodeState{}, err
}
return nodes, err
}
// GetPrimaryUUID returns the UUID of the primary Flocker Control Service for
// the given host.
func (c Client) GetPrimaryUUID() (uuid string, err error) {
resp, err := c.get(c.getURL("state/nodes"))
states, err := c.ListNodes()
if err != nil {
return "", err
}
for _, s := range states {
if s.Host == c.clientIP {
return s.UUID, nil
}
}
return "", fmt.Errorf("No node found with IP '%s', available nodes %+v", c.clientIP, states)
}
// DeleteDataset performs a delete request to the given datasetID
func (c *Client) DeleteDataset(datasetID string) error {
url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID))
resp, err := c.delete(url, nil)
if err != nil {
return err
}
defer resp.Body.Close()
var states []nodeStatePayload
if err = json.NewDecoder(resp.Body).Decode(&states); err == nil {
for _, s := range states {
if s.Host == c.clientIP {
return s.UUID, nil
}
}
return "", errStateNotFound
if resp.StatusCode >= 300 {
return fmt.Errorf("Expected: {1,2}xx deleting the dataset %s, got: %d", datasetID, resp.StatusCode)
}
return "", err
return nil
}
// GetDatasetState performs a get request to get the state of the given datasetID, if
@@ -213,34 +259,30 @@ returns the dataset id.
This process is a little bit complex but follows this flow:
1. Find the Flocker Control Service UUID
2. Try to create the dataset
3. If it already exists an error is returned
4. If it didn't previously exist, wait for it to be ready
2. If it already exists an error is returned
3. If it didn't previously exist, wait for it to be ready
*/
func (c Client) CreateDataset(metaName string) (*DatasetState, error) {
func (c *Client) CreateDataset(options *CreateDatasetOptions) (datasetState *DatasetState, err error) {
// 1) Find the primary Flocker UUID
// Note: it could be cached, but doing this query we health check it
primary, err := c.GetPrimaryUUID()
if err != nil {
return nil, err
if options.Primary == "" {
options.Primary, err = c.GetPrimaryUUID()
if err != nil {
return nil, err
}
}
// 2) Try to create the dataset in the given Primary
payload := configurationPayload{
Primary: primary,
MaximumSize: json.Number(c.maximumSize),
Metadata: metadataPayload{
Name: metaName,
},
if options.MaximumSize == 0 {
options.MaximumSize, _ = c.maximumSize.Int64()
}
resp, err := c.post(c.getURL("configuration/datasets"), payload)
resp, err := c.post(c.getURL("configuration/datasets"), options)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// 3) Return if the dataset was previously created
// 2) Return if the dataset was previously created
if resp.StatusCode == http.StatusConflict {
return nil, errVolumeAlreadyExists
}
@@ -254,21 +296,31 @@ func (c Client) CreateDataset(metaName string) (*DatasetState, error) {
return nil, err
}
// 4) Wait until the dataset is ready for usage. In case it never gets
// 3) Wait until the dataset is ready for usage. In case it never gets
// ready there is a timeoutChan that will return an error
timeoutChan := time.NewTimer(timeoutWaitingForVolume).C
tickChan := time.NewTicker(tickerWaitingForVolume).C
for {
if s, err := c.GetDatasetState(p.DatasetID); err == nil {
var strErrDel string
s, err := c.GetDatasetState(p.DatasetID)
if err == nil {
return s, nil
} else if err != errStateNotFound {
return nil, err
errDel := c.DeleteDataset(p.DatasetID)
if errDel != nil {
strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
}
return nil, fmt.Errorf("Flocker API error during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
}
select {
case <-timeoutChan:
return nil, err
errDel := c.DeleteDataset(p.DatasetID)
if errDel != nil {
strErrDel = fmt.Sprintf(", deletion of dataset failed with %s", errDel)
}
return nil, fmt.Errorf("Flocker API timeout during dataset creation (datasetID %s): %s%s", p.DatasetID, err, strErrDel)
case <-tickChan:
break
}