diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 40bd4c7d214..71d907b549c 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -14,6 +14,10 @@ "Comment": "null-5", "Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675" }, + { + "ImportPath": "github.com/ClusterHQ/flocker-go", + "Rev": "3f33ece70f6571f0ec45bfae2f243ab11fab6c52" + }, { "ImportPath": "github.com/Sirupsen/logrus", "Comment": "v0.6.2-10-g51fe59a", diff --git a/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/README.md b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/README.md new file mode 100644 index 00000000000..7c57e11295e --- /dev/null +++ b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/README.md @@ -0,0 +1,18 @@ +flocker-go +========== + +[![circleci](https://circleci.com/gh/ClusterHQ/flocker-go.svg)](https://circleci.com/gh/ClusterHQ/flocker-go) + +flocker-go implements the package `flocker` that will let you easily interact +with a Flocker Control Service. + +What can it do? +--------------- + +You can check the package documentation here: https://godoc.org/github.com/ClusterHQ/flocker-go + +TODO +---- + +- Define a proper interface `flockerClientable` with all the needed methods for + wrapping the Flocker API. diff --git a/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/client.go b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/client.go new file mode 100644 index 00000000000..e054ee121ee --- /dev/null +++ b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/client.go @@ -0,0 +1,323 @@ +package flocker + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" +) + +// From https://github.com/ClusterHQ/flocker-docker-plugin/blob/master/flockerdockerplugin/adapter.py#L18 +const defaultVolumeSize = json.Number("107374182400") + +var ( + // A volume can take a long time to be available, if we don't want + // Kubernetes to wait forever we need to stop trying after some time, that + // time is defined here + timeoutWaitingForVolume = 2 * time.Minute + tickerWaitingForVolume = 5 * time.Second + + errStateNotFound = errors.New("State not found by Dataset ID") + errConfigurationNotFound = errors.New("Configuration not found by Name") + + errFlockerControlServiceHost = errors.New("The volume config must have a key CONTROL_SERVICE_HOST defined in the OtherAttributes field") + errFlockerControlServicePort = errors.New("The volume config must have a key CONTROL_SERVICE_PORT defined in the OtherAttributes field") + + errVolumeAlreadyExists = errors.New("The volume already exists") + errVolumeDoesNotExist = errors.New("The volume does not exist") + + errUpdatingDataset = errors.New("It was impossible to update the dataset") +) + +// Clientable exposes the needed methods to implement your own Flocker Client. +type Clientable interface { + CreateDataset(metaName string) (*DatasetState, error) + + GetDatasetState(datasetID string) (*DatasetState, error) + GetDatasetID(metaName string) (datasetID string, err error) + GetPrimaryUUID() (primaryUUID string, err error) + + UpdatePrimaryForDataset(primaryUUID, datasetID string) (*DatasetState, error) +} + +// Client is a default Flocker Client. +type Client struct { + *http.Client + + schema string + host string + port int + version string + + clientIP string + + maximumSize json.Number +} + +// NewClient creates a wrapper over http.Client to communicate with the flocker control service. +func NewClient(host string, port int, clientIP string, caCertPath, keyPath, certPath string) (*Client, error) { + client, err := newTLSClient(caCertPath, keyPath, certPath) + if err != nil { + return nil, err + } + + return &Client{ + Client: client, + schema: "https", + host: host, + port: port, + version: "v1", + maximumSize: defaultVolumeSize, + clientIP: clientIP, + }, nil +} + +/* +request do a request using the http.Client embedded to the control service +and returns the response or an error in case it happens. + +Note: you will need to deal with the response body call to Close if you +don't want to deal with problems later. +*/ +func (c Client) request(method, url string, payload interface{}) (*http.Response, error) { + var ( + b []byte + err error + ) + + if method == "POST" { // Just allow payload on POST + b, err = json.Marshal(payload) + if err != nil { + return nil, err + } + } + + req, err := http.NewRequest(method, url, bytes.NewBuffer(b)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + // REMEMBER TO CLOSE THE BODY IN THE OUTSIDE FUNCTION + return c.Do(req) +} + +// post performs a post request with the indicated payload +func (c Client) post(url string, payload interface{}) (*http.Response, error) { + return c.request("POST", url, payload) +} + +// get performs a get request +func (c Client) get(url string) (*http.Response, error) { + return c.request("GET", url, nil) +} + +// getURL returns a full URI to the control service +func (c Client) getURL(path string) string { + return fmt.Sprintf("%s://%s:%d/%s/%s", c.schema, c.host, c.port, c.version, path) +} + +type configurationPayload struct { + Primary string `json:"primary"` + DatasetID string `json:"dataset_id,omitempty"` + MaximumSize json.Number `json:"maximum_size,omitempty"` + Metadata metadataPayload `json:"metadata,omitempty"` +} + +type metadataPayload struct { + Name string `json:"name,omitempty"` +} + +type DatasetState struct { + Path string `json:"path"` + DatasetID string `json:"dataset_id"` + Primary string `json:"primary,omitempty"` + MaximumSize json.Number `json:"maximum_size,omitempty"` +} + +type datasetStatePayload struct { + *DatasetState +} + +type nodeStatePayload struct { + UUID string `json:"uuid"` + Host string `json:"host"` +} + +// findIDInConfigurationsPayload returns the datasetID if it was found in the +// configurations payload, otherwise it will return an error. +func (c Client) findIDInConfigurationsPayload(body io.ReadCloser, name string) (datasetID string, err error) { + var configurations []configurationPayload + if err = json.NewDecoder(body).Decode(&configurations); err == nil { + for _, r := range configurations { + if r.Metadata.Name == name { + return r.DatasetID, nil + } + } + return "", errConfigurationNotFound + } + return "", err +} + +// GetPrimaryUUID returns the UUID of the primary Flocker Control Service for +// the given host. +func (c Client) GetPrimaryUUID() (uuid string, err error) { + resp, err := c.get(c.getURL("state/nodes")) + if err != nil { + return "", err + } + defer resp.Body.Close() + + var states []nodeStatePayload + if err = json.NewDecoder(resp.Body).Decode(&states); err == nil { + for _, s := range states { + if s.Host == c.clientIP { + return s.UUID, nil + } + } + return "", errStateNotFound + } + return "", err +} + +// GetDatasetState performs a get request to get the state of the given datasetID, if +// something goes wrong or the datasetID was not found it returns an error. +func (c Client) GetDatasetState(datasetID string) (*DatasetState, error) { + resp, err := c.get(c.getURL("state/datasets")) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var states []datasetStatePayload + if err = json.NewDecoder(resp.Body).Decode(&states); err == nil { + for _, s := range states { + if s.DatasetID == datasetID { + return s.DatasetState, nil + } + } + return nil, errStateNotFound + } + + return nil, err +} + +/* +CreateDataset creates a volume in Flocker, waits for it to be ready and +returns the dataset id. + +This process is a little bit complex but follows this flow: + +1. Find the Flocker Control Service UUID +2. Try to create the dataset +3. If it already exists an error is returned +4. If it didn't previously exist, wait for it to be ready +*/ +func (c Client) CreateDataset(metaName string) (*DatasetState, error) { + // 1) Find the primary Flocker UUID + // Note: it could be cached, but doing this query we health check it + primary, err := c.GetPrimaryUUID() + if err != nil { + return nil, err + } + + // 2) Try to create the dataset in the given Primary + payload := configurationPayload{ + Primary: primary, + MaximumSize: json.Number(c.maximumSize), + Metadata: metadataPayload{ + Name: metaName, + }, + } + + resp, err := c.post(c.getURL("configuration/datasets"), payload) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + // 3) Return if the dataset was previously created + if resp.StatusCode == http.StatusConflict { + return nil, errVolumeAlreadyExists + } + + if resp.StatusCode >= 300 { + return nil, fmt.Errorf("Expected: {1,2}xx creating the volume, got: %d", resp.StatusCode) + } + + var p configurationPayload + if err := json.NewDecoder(resp.Body).Decode(&p); err != nil { + return nil, err + } + + // 4) Wait until the dataset is ready for usage. In case it never gets + // ready there is a timeoutChan that will return an error + timeoutChan := time.NewTimer(timeoutWaitingForVolume).C + tickChan := time.NewTicker(tickerWaitingForVolume).C + + for { + if s, err := c.GetDatasetState(p.DatasetID); err == nil { + return s, nil + } else if err != errStateNotFound { + return nil, err + } + + select { + case <-timeoutChan: + return nil, err + case <-tickChan: + break + } + } +} + +// UpdatePrimaryForDataset will update the Primary for the given dataset +// returning the current DatasetState. +func (c Client) UpdatePrimaryForDataset(newPrimaryUUID, datasetID string) (*DatasetState, error) { + payload := struct { + Primary string `json:"primary"` + }{ + Primary: newPrimaryUUID, + } + + url := c.getURL(fmt.Sprintf("configuration/datasets/%s", datasetID)) + resp, err := c.post(url, payload) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + return nil, errUpdatingDataset + } + + var s DatasetState + if err := json.NewDecoder(resp.Body).Decode(&s); err != nil { + return nil, err + } + + return &s, nil +} + +// GetDatasetID will return the DatasetID found for the given metadata name. +func (c Client) GetDatasetID(metaName string) (datasetID string, err error) { + resp, err := c.get(c.getURL("configuration/datasets")) + if err != nil { + return "", err + } + defer resp.Body.Close() + + var configurations []configurationPayload + if err = json.NewDecoder(resp.Body).Decode(&configurations); err == nil { + for _, c := range configurations { + if c.Metadata.Name == metaName { + return c.DatasetID, nil + } + } + return "", errConfigurationNotFound + } + return "", err +} diff --git a/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/client_test.go b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/client_test.go new file mode 100644 index 00000000000..ef0feaf7e97 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/client_test.go @@ -0,0 +1,316 @@ +package flocker + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "strings" + "testing" + "time" + + "k8s.io/kubernetes/pkg/volume" + + "github.com/stretchr/testify/assert" +) + +func TestMaximumSizeIs1024Multiple(t *testing.T) { + assert := assert.New(t) + + n, err := strconv.Atoi(string(defaultVolumeSize)) + assert.NoError(err) + assert.Equal(0, n%1024) +} + +func TestPost(t *testing.T) { + const ( + expectedPayload = "foobar" + expectedStatusCode = 418 + ) + + assert := assert.New(t) + + type payload struct { + Test string `json:"test"` + } + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var result payload + err := json.NewDecoder(r.Body).Decode(&result) + assert.NoError(err) + assert.Equal(expectedPayload, result.Test) + w.WriteHeader(expectedStatusCode) + })) + defer ts.Close() + + c := Client{Client: &http.Client{}} + + resp, err := c.post(ts.URL, payload{expectedPayload}) + assert.NoError(err) + assert.Equal(expectedStatusCode, resp.StatusCode) +} + +func TestGet(t *testing.T) { + const ( + expectedStatusCode = 418 + ) + + assert := assert.New(t) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(expectedStatusCode) + })) + defer ts.Close() + + c := Client{Client: &http.Client{}} + + resp, err := c.get(ts.URL) + assert.NoError(err) + assert.Equal(expectedStatusCode, resp.StatusCode) +} + +func TestFindIDInConfigurationsPayload(t *testing.T) { + const ( + searchedName = "search-for-this-name" + expected = "The-42-id" + ) + assert := assert.New(t) + + c := Client{} + + payload := fmt.Sprintf( + `[{"dataset_id": "1-2-3", "metadata": {"name": "test"}}, {"dataset_id": "The-42-id", "metadata": {"name": "%s"}}]`, + searchedName, + ) + + id, err := c.findIDInConfigurationsPayload( + ioutil.NopCloser(bytes.NewBufferString(payload)), searchedName, + ) + assert.NoError(err) + assert.Equal(expected, id) + + id, err = c.findIDInConfigurationsPayload( + ioutil.NopCloser(bytes.NewBufferString(payload)), "it will not be found", + ) + assert.Equal(errConfigurationNotFound, err) + + id, err = c.findIDInConfigurationsPayload( + ioutil.NopCloser(bytes.NewBufferString("invalid { json")), "", + ) + assert.Error(err) +} + +func TestFindPrimaryUUID(t *testing.T) { + const expectedPrimary = "primary-uuid" + assert := assert.New(t) + + var ( + mockedHost = "127.0.0.1" + mockedPrimary = expectedPrimary + ) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal("GET", r.Method) + assert.Equal("/v1/state/nodes", r.URL.Path) + w.Write([]byte(fmt.Sprintf(`[{"host": "%s", "uuid": "%s"}]`, mockedHost, mockedPrimary))) + })) + + host, port, err := getHostAndPortFromTestServer(ts) + assert.NoError(err) + + c := newFlockerTestClient(host, port) + assert.NoError(err) + + mockedPrimary = expectedPrimary + primary, err := c.GetPrimaryUUID() + assert.NoError(err) + assert.Equal(expectedPrimary, primary) + + c.clientIP = "not.found" + _, err = c.GetPrimaryUUID() + assert.Equal(errStateNotFound, err) +} + +func TestGetURL(t *testing.T) { + const ( + expectedHost = "host" + expectedPort = 42 + ) + + assert := assert.New(t) + + c := newFlockerTestClient(expectedHost, expectedPort) + var expectedURL = fmt.Sprintf("%s://%s:%d/v1/test", c.schema, expectedHost, expectedPort) + + url := c.getURL("test") + assert.Equal(expectedURL, url) +} + +func getHostAndPortFromTestServer(ts *httptest.Server) (string, int, error) { + tsURL, err := url.Parse(ts.URL) + if err != nil { + return "", 0, err + } + + hostSplits := strings.Split(tsURL.Host, ":") + + port, err := strconv.Atoi(hostSplits[1]) + if err != nil { + return "", 0, nil + } + return hostSplits[0], port, nil +} + +func getVolumeConfig(host string, port int) volume.VolumeConfig { + return volume.VolumeConfig{ + OtherAttributes: map[string]string{ + "CONTROL_SERVICE_HOST": host, + "CONTROL_SERVICE_PORT": strconv.Itoa(port), + }, + } +} + +func TestHappyPathCreateDatasetFromNonExistent(t *testing.T) { + const ( + expectedDatasetName = "dir" + expectedPrimary = "A-B-C-D" + expectedDatasetID = "datasetID" + ) + expectedPath := fmt.Sprintf("/flocker/%s", expectedDatasetID) + + assert := assert.New(t) + var ( + numCalls int + err error + ) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + numCalls++ + switch numCalls { + case 1: + assert.Equal("GET", r.Method) + assert.Equal("/v1/state/nodes", r.URL.Path) + w.Write([]byte(fmt.Sprintf(`[{"host": "127.0.0.1", "uuid": "%s"}]`, expectedPrimary))) + case 2: + assert.Equal("POST", r.Method) + assert.Equal("/v1/configuration/datasets", r.URL.Path) + + var c configurationPayload + err := json.NewDecoder(r.Body).Decode(&c) + assert.NoError(err) + assert.Equal(expectedPrimary, c.Primary) + assert.Equal(defaultVolumeSize, c.MaximumSize) + assert.Equal(expectedDatasetName, c.Metadata.Name) + + w.Write([]byte(fmt.Sprintf(`{"dataset_id": "%s"}`, expectedDatasetID))) + case 3: + assert.Equal("GET", r.Method) + assert.Equal("/v1/state/datasets", r.URL.Path) + w.Write([]byte(`[]`)) + case 4: + assert.Equal("GET", r.Method) + assert.Equal("/v1/state/datasets", r.URL.Path) + w.Write([]byte(fmt.Sprintf(`[{"dataset_id": "%s", "path": "/flocker/%s"}]`, expectedDatasetID, expectedDatasetID))) + } + })) + + host, port, err := getHostAndPortFromTestServer(ts) + assert.NoError(err) + + c := newFlockerTestClient(host, port) + assert.NoError(err) + + tickerWaitingForVolume = 1 * time.Millisecond // TODO: this is overriding globally + + s, err := c.CreateDataset(expectedDatasetName) + assert.NoError(err) + assert.Equal(expectedPath, s.Path) +} + +func TestCreateDatasetThatAlreadyExists(t *testing.T) { + const ( + datasetName = "dir" + expectedPrimary = "A-B-C-D" + ) + + assert := assert.New(t) + var numCalls int + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + numCalls++ + switch numCalls { + case 1: + assert.Equal("GET", r.Method) + assert.Equal("/v1/state/nodes", r.URL.Path) + w.Write([]byte(fmt.Sprintf(`[{"host": "127.0.0.1", "uuid": "%s"}]`, expectedPrimary))) + case 2: + assert.Equal("POST", r.Method) + assert.Equal("/v1/configuration/datasets", r.URL.Path) + w.WriteHeader(http.StatusConflict) + } + })) + + host, port, err := getHostAndPortFromTestServer(ts) + assert.NoError(err) + + c := newFlockerTestClient(host, port) + assert.NoError(err) + + _, err = c.CreateDataset(datasetName) + assert.Equal(errVolumeAlreadyExists, err) +} + +func TestUpdatePrimaryForDataset(t *testing.T) { + const ( + dir = "dir" + expectedPrimary = "the-new-primary" + expectedDatasetID = "datasetID" + ) + expectedURL := fmt.Sprintf("/v1/configuration/datasets/%s", expectedDatasetID) + + assert := assert.New(t) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal("POST", r.Method) + assert.Equal(expectedURL, r.URL.Path) + + var c configurationPayload + err := json.NewDecoder(r.Body).Decode(&c) + assert.NoError(err) + + assert.Equal(expectedPrimary, c.Primary) + + w.Write([]byte(fmt.Sprintf(`{"dataset_id": "%s", "path": "just-to-double-check"}`, expectedDatasetID))) + })) + + host, port, err := getHostAndPortFromTestServer(ts) + assert.NoError(err) + + c := newFlockerTestClient(host, port) + assert.NoError(err) + + s, err := c.UpdatePrimaryForDataset(expectedPrimary, expectedDatasetID) + assert.NoError(err) + assert.Equal(expectedDatasetID, s.DatasetID) + assert.NotEqual("", s.Path) +} + +func TestInterfaceIsImplemented(t *testing.T) { + assert.Implements(t, (*Clientable)(nil), Client{}) +} + +func newFlockerTestClient(host string, port int) *Client { + return &Client{ + Client: &http.Client{}, + host: host, + port: port, + version: "v1", + schema: "http", + maximumSize: defaultVolumeSize, + clientIP: "127.0.0.1", + } +} diff --git a/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/doc.go b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/doc.go new file mode 100644 index 00000000000..f3cd05b0196 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/doc.go @@ -0,0 +1,2 @@ +// flocker package allows you to easily interact with a Flocker Control Service. +package flocker diff --git a/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/util.go b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/util.go new file mode 100644 index 00000000000..8322ea8cd78 --- /dev/null +++ b/Godeps/_workspace/src/github.com/ClusterHQ/flocker-go/util.go @@ -0,0 +1,34 @@ +package flocker + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net/http" +) + +// newTLSClient returns a new TLS http client +func newTLSClient(caCertPath, keyPath, certPath string) (*http.Client, error) { + // Client certificate + cert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + return nil, err + } + + // CA certificate + caCert, err := ioutil.ReadFile(caCertPath) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + tlsConfig.BuildNameToCertificate() + transport := &http.Transport{TLSClientConfig: tlsConfig} + + return &http.Client{Transport: transport}, nil +} diff --git a/api/swagger-spec/v1.json b/api/swagger-spec/v1.json index 6657c31eb61..32de6904ade 100644 --- a/api/swagger-spec/v1.json +++ b/api/swagger-spec/v1.json @@ -12186,6 +12186,10 @@ "$ref": "v1.FCVolumeSource", "description": "FC represents a Fibre Channel resource that is attached to a kubelet's host machine and then exposed to the pod." }, + "flocker": { + "$ref": "v1.FlockerVolumeSource", + "description": "Flocker represents a Flocker volume attached to a kubelet's host machine and exposed to the pod for its usage. This depends on the Flocker control service being running" + }, "accessModes": { "type": "array", "items": { @@ -12490,6 +12494,19 @@ } } }, + "v1.FlockerVolumeSource": { + "id": "v1.FlockerVolumeSource", + "description": "FlockerVolumeSource represents a Flocker volume mounted by the Flocker agent.", + "required": [ + "datasetName" + ], + "properties": { + "datasetName": { + "type": "string", + "description": "Required: the volume name. This is going to be store on metadata -\u003e name on the payload for Flocker" + } + } + }, "v1.PersistentVolumeStatus": { "id": "v1.PersistentVolumeStatus", "description": "PersistentVolumeStatus is the current status of a persistent volume.", @@ -12701,6 +12718,10 @@ "$ref": "v1.CephFSVolumeSource", "description": "CephFS represents a Ceph FS mount on the host that shares a pod's lifetime" }, + "flocker": { + "$ref": "v1.FlockerVolumeSource", + "description": "Flocker represents a Flocker volume attached to a kubelet's host machine. This depends on the Flocker control service being running" + }, "downwardAPI": { "$ref": "v1.DownwardAPIVolumeSource", "description": "DownwardAPI represents downward API about the pod that should populate this volume" diff --git a/cmd/kubelet/app/plugins.go b/cmd/kubelet/app/plugins.go index d2be67656ae..de41804db28 100644 --- a/cmd/kubelet/app/plugins.go +++ b/cmd/kubelet/app/plugins.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/volume/downwardapi" "k8s.io/kubernetes/pkg/volume/empty_dir" "k8s.io/kubernetes/pkg/volume/fc" + "k8s.io/kubernetes/pkg/volume/flocker" "k8s.io/kubernetes/pkg/volume/gce_pd" "k8s.io/kubernetes/pkg/volume/git_repo" "k8s.io/kubernetes/pkg/volume/glusterfs" @@ -70,6 +71,7 @@ func ProbeVolumePlugins() []volume.VolumePlugin { allPlugins = append(allPlugins, cephfs.ProbeVolumePlugins()...) allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...) allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...) return allPlugins } diff --git a/docs/user-guide/volumes.md b/docs/user-guide/volumes.md index 38398fc48b5..447e0e6cbde 100644 --- a/docs/user-guide/volumes.md +++ b/docs/user-guide/volumes.md @@ -58,6 +58,7 @@ Familiarity with [pods](pods.md) is suggested. - [AWS EBS Example configuration](#aws-ebs-example-configuration) - [nfs](#nfs) - [iscsi](#iscsi) + - [flocker](#flocker) - [glusterfs](#glusterfs) - [rbd](#rbd) - [gitRepo](#gitrepo) @@ -114,6 +115,7 @@ Kubernetes supports several types of Volumes: * `awsElasticBlockStore` * `nfs` * `iscsi` + * `flocker` * `glusterfs` * `rbd` * `gitRepo` @@ -317,6 +319,21 @@ simultaneous readers allowed. See the [iSCSI example](../../examples/iscsi/) for more details. +### flocker + +[Flocker](https://clusterhq.com/flocker) is an open-source clustered container data volume manager. It provides management +and orchestration of data volumes backed by a variety of storage backends. + +A `flocker` volume allows a Flocker dataset to be mounted into a pod. If the +dataset does not already exist in Flocker, it needs to be created with Flocker +CLI or the using the Flocker API. If the dataset already exists it will +reattached by Flocker to the node that the pod is scheduled. This means data +can be "handed off" between pods as required. + +__Important: You must have your own Flocker installation running before you can use it__ + +See the [Flocker example](../../examples/flocker/) for more details. + ### glusterfs A `glusterfs` volume allows a [Glusterfs](http://www.gluster.org) (an open diff --git a/examples/flocker/README.md b/examples/flocker/README.md new file mode 100644 index 00000000000..3e91b65b54e --- /dev/null +++ b/examples/flocker/README.md @@ -0,0 +1,128 @@ + + + + +WARNING +WARNING +WARNING +WARNING +WARNING + +

PLEASE NOTE: This document applies to the HEAD of the source tree

+ +If you are using a released version of Kubernetes, you should +refer to the docs that go with that version. + + +The latest 1.0.x release of this document can be found +[here](http://releases.k8s.io/release-1.0/examples/flocker/README.md). + +Documentation for other releases can be found at +[releases.k8s.io](http://releases.k8s.io). + +-- + + + + + +## Using Flocker volumes + +[Flocker](https://clusterhq.com/flocker) is an open-source clustered container data volume manager. It provides management +and orchestration of data volumes backed by a variety of storage backends. + +This example provides information about how to set-up a Flocker installation and configure it in Kubernetes, as well as how to use the plugin to use Flocker datasets as volumes in Kubernetes. + +### Prerequisites + +A Flocker cluster is required to use Flocker with Kubernetes. A Flocker cluster comprises: + +- *Flocker Control Service*: provides a REST over HTTP API to modify the desired configuration of the cluster; +- *Flocker Dataset Agent(s)*: a convergence agent that modifies the cluster state to match the desired configuration; +- *Flocker Container Agent(s)*: a convergence agent that modifies the cluster state to match the desired configuration (unused in this configuration but still required in the cluster). + +Read more about the [Flocker Cluster Architecture](https://docs.clusterhq.com/en/latest/concepts/architecture.html) at the [Flocker Documentation](https://docs.clusterhq.com/). + +It is recommended to follow [Installing Flocker](https://docs.clusterhq.com/en/latest/install/index.html) and the instructions below to set-up the Flocker cluster to be used with Kubernetes. + +#### Flocker Control Service + +The Flocker Control Service should be installed manually on a host, . In the future, this may be deployed in pod(s) and exposed as a Kubernetes service. + +#### Flocker Agent(s) + +The Flocker Agents should be manually installed on *all* Kubernetes nodes. These agents are responsible for (de)attachment and (un)mounting and are therefore services that should be run with appropriate privileges on these hosts. + +In order for the plugin to connect to Flocker (via REST API), several environment variables must be specified on *all* Kubernetes nodes. This may be specified in an init script for the node's Kubelet service, for example. + +- `FLOCKER_CONTROL_SERVICE_HOST` should refer to the hostname of the Control Service +- `FLOCKER_CONTROL_SERVICE_PORT` should refer to the port of the Control Service (the API service defaults to 4523 but this must still be specified) + +The following environment variables should refer to keys and certificates on the host that are specific to that host. + +- `FLOCKER_CONTROL_SERVICE_CA_FILE` should refer to the full path to the cluster certificate file +- `FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE` should refer to the full path to the key file for the API user +- `FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE` should refer to the full path to the certificate file for the API user + +More details regarding cluster authentication can be found at the documentation: [Flocker Cluster Security & Authentication](https://docs.clusterhq.com/en/latest/concepts/security.html) and [Configuring Cluster Authentication](https://docs.clusterhq.com/en/latest/config/configuring-authentication.html). + +### Create a pod with a Flocker volume + +**Note**: A new dataset must first be provisioned using the Flocker tools. For example, using the [Volumes CLI](https://docs.clusterhq.com/en/latest/labs/volumes-cli.html)), create a new dataset called 'my-flocker-vol' of size 10GB: + +```sh +flocker-volumes create -m name=my-flocker-vol -s 10G +``` + +The following *volume* spec from the [example pod](flocker-pod.yml) illustrates how to use this Flocker dataset as a volume. + +```yaml + volumes: + - name: www-root + flocker: + datasetName: my-flocker-vol +``` + +- **datasetName** is the unique name for the Flocker dataset and should match the *name* in the metadata. + +Use `kubetctl` to create the pod. + +```sh +$ kubectl create -f examples/flocker/flocker-pod.yml +``` + +You should now verify that the pod is running and determine it's IP address: + +```sh +$ kubectl get pods +NAME READY STATUS RESTARTS AGE +flocker 1/1 Running 0 3m +$ kubectl get pods flocker -t '{{.status.hostIP}}{{"\n"}}' +172.31.25.62 +``` + +An `ls` of the `/flocker` directory on the host (identified by the IP as above) will show the mount point for the volume. + +```sh +$ ls /flocker +0cf8789f-00da-4da0-976a-b6b1dc831159 +``` + +Add an index.html inside this directory and use `curl` to see this HTML file served up by nginx. + +```sh + +$ curl ip + +``` + + + + +[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/flocker/README.md?pixel)]() + diff --git a/examples/flocker/flocker-pod.yml b/examples/flocker/flocker-pod.yml new file mode 100644 index 00000000000..fb923cd49fb --- /dev/null +++ b/examples/flocker/flocker-pod.yml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Pod +metadata: + name: flocker-web +spec: + containers: + - name: web + image: nginx + ports: + - name: web + containerPort: 80 + volumeMounts: + # name must match the volume name below + - name: www-root + mountPath: "/usr/share/nginx/html" + volumes: + - name: www-root + flocker: + datasetName: my-flocker-vol diff --git a/pkg/api/deep_copy_generated.go b/pkg/api/deep_copy_generated.go index 41f8f4ea0a3..64acf98541f 100644 --- a/pkg/api/deep_copy_generated.go +++ b/pkg/api/deep_copy_generated.go @@ -571,6 +571,11 @@ func deepCopy_api_FCVolumeSource(in FCVolumeSource, out *FCVolumeSource, c *conv return nil } +func deepCopy_api_FlockerVolumeSource(in FlockerVolumeSource, out *FlockerVolumeSource, c *conversion.Cloner) error { + out.DatasetName = in.DatasetName + return nil +} + func deepCopy_api_GCEPersistentDiskVolumeSource(in GCEPersistentDiskVolumeSource, out *GCEPersistentDiskVolumeSource, c *conversion.Cloner) error { out.PDName = in.PDName out.FSType = in.FSType @@ -1295,6 +1300,14 @@ func deepCopy_api_PersistentVolumeSource(in PersistentVolumeSource, out *Persist } else { out.FC = nil } + if in.Flocker != nil { + out.Flocker = new(FlockerVolumeSource) + if err := deepCopy_api_FlockerVolumeSource(*in.Flocker, out.Flocker, c); err != nil { + return err + } + } else { + out.Flocker = nil + } return nil } @@ -2211,6 +2224,14 @@ func deepCopy_api_VolumeSource(in VolumeSource, out *VolumeSource, c *conversion } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(FlockerVolumeSource) + if err := deepCopy_api_FlockerVolumeSource(*in.Flocker, out.Flocker, c); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(DownwardAPIVolumeSource) if err := deepCopy_api_DownwardAPIVolumeSource(*in.DownwardAPI, out.DownwardAPI, c); err != nil { @@ -2308,6 +2329,7 @@ func init() { deepCopy_api_EventSource, deepCopy_api_ExecAction, deepCopy_api_FCVolumeSource, + deepCopy_api_FlockerVolumeSource, deepCopy_api_GCEPersistentDiskVolumeSource, deepCopy_api_GitRepoVolumeSource, deepCopy_api_GlusterfsVolumeSource, diff --git a/pkg/api/types.go b/pkg/api/types.go index 858feb2d50d..274ac5614b0 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -204,6 +204,9 @@ type VolumeSource struct { // CephFS represents a Cephfs mount on the host that shares a pod's lifetime CephFS *CephFSVolumeSource `json:"cephfs,omitempty"` + // Flocker represents a Flocker volume attached to a kubelet's host machine. This depends on the Flocker control service being running + Flocker *FlockerVolumeSource `json:"flocker,omitempty"` + // DownwardAPI represents metadata about the pod that should populate this volume DownwardAPI *DownwardAPIVolumeSource `json:"downwardAPI,omitempty"` // FC represents a Fibre Channel resource that is attached to a kubelet's host machine and then exposed to the pod. @@ -239,6 +242,8 @@ type PersistentVolumeSource struct { CephFS *CephFSVolumeSource `json:"cephfs,omitempty"` // FC represents a Fibre Channel resource that is attached to a kubelet's host machine and then exposed to the pod. FC *FCVolumeSource `json:"fc,omitempty"` + // Flocker represents a Flocker volume attached to a kubelet's host machine. This depends on the Flocker control service being running + Flocker *FlockerVolumeSource `json:"flocker,omitempty"` } type PersistentVolumeClaimVolumeSource struct { @@ -592,6 +597,12 @@ type CephFSVolumeSource struct { ReadOnly bool `json:"readOnly,omitempty"` } +// FlockerVolumeSource represents a Flocker volume mounted by the Flocker agent. +type FlockerVolumeSource struct { + // Required: the volume name. This is going to be store on metadata -> name on the payload for Flocker + DatasetName string `json:"datasetName"` +} + // DownwardAPIVolumeSource represents a volume containing downward API info type DownwardAPIVolumeSource struct { // Items is a list of DownwardAPIVolume file diff --git a/pkg/api/v1/conversion_generated.go b/pkg/api/v1/conversion_generated.go index 34549d564af..2969a4cb639 100644 --- a/pkg/api/v1/conversion_generated.go +++ b/pkg/api/v1/conversion_generated.go @@ -790,6 +790,18 @@ func convert_api_FCVolumeSource_To_v1_FCVolumeSource(in *api.FCVolumeSource, out return autoconvert_api_FCVolumeSource_To_v1_FCVolumeSource(in, out, s) } +func autoconvert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in *api.FlockerVolumeSource, out *FlockerVolumeSource, s conversion.Scope) error { + if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { + defaulting.(func(*api.FlockerVolumeSource))(in) + } + out.DatasetName = in.DatasetName + return nil +} + +func convert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in *api.FlockerVolumeSource, out *FlockerVolumeSource, s conversion.Scope) error { + return autoconvert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in, out, s) +} + func autoconvert_api_GCEPersistentDiskVolumeSource_To_v1_GCEPersistentDiskVolumeSource(in *api.GCEPersistentDiskVolumeSource, out *GCEPersistentDiskVolumeSource, s conversion.Scope) error { if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { defaulting.(func(*api.GCEPersistentDiskVolumeSource))(in) @@ -1777,6 +1789,14 @@ func autoconvert_api_PersistentVolumeSource_To_v1_PersistentVolumeSource(in *api } else { out.FC = nil } + if in.Flocker != nil { + out.Flocker = new(FlockerVolumeSource) + if err := convert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in.Flocker, out.Flocker, s); err != nil { + return err + } + } else { + out.Flocker = nil + } return nil } @@ -2984,6 +3004,14 @@ func autoconvert_api_VolumeSource_To_v1_VolumeSource(in *api.VolumeSource, out * } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(FlockerVolumeSource) + if err := convert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in.Flocker, out.Flocker, s); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(DownwardAPIVolumeSource) if err := convert_api_DownwardAPIVolumeSource_To_v1_DownwardAPIVolumeSource(in.DownwardAPI, out.DownwardAPI, s); err != nil { @@ -3771,6 +3799,18 @@ func convert_v1_FCVolumeSource_To_api_FCVolumeSource(in *FCVolumeSource, out *ap return autoconvert_v1_FCVolumeSource_To_api_FCVolumeSource(in, out, s) } +func autoconvert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in *FlockerVolumeSource, out *api.FlockerVolumeSource, s conversion.Scope) error { + if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { + defaulting.(func(*FlockerVolumeSource))(in) + } + out.DatasetName = in.DatasetName + return nil +} + +func convert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in *FlockerVolumeSource, out *api.FlockerVolumeSource, s conversion.Scope) error { + return autoconvert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in, out, s) +} + func autoconvert_v1_GCEPersistentDiskVolumeSource_To_api_GCEPersistentDiskVolumeSource(in *GCEPersistentDiskVolumeSource, out *api.GCEPersistentDiskVolumeSource, s conversion.Scope) error { if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { defaulting.(func(*GCEPersistentDiskVolumeSource))(in) @@ -4758,6 +4798,14 @@ func autoconvert_v1_PersistentVolumeSource_To_api_PersistentVolumeSource(in *Per } else { out.FC = nil } + if in.Flocker != nil { + out.Flocker = new(api.FlockerVolumeSource) + if err := convert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in.Flocker, out.Flocker, s); err != nil { + return err + } + } else { + out.Flocker = nil + } return nil } @@ -5965,6 +6013,14 @@ func autoconvert_v1_VolumeSource_To_api_VolumeSource(in *VolumeSource, out *api. } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(api.FlockerVolumeSource) + if err := convert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in.Flocker, out.Flocker, s); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(api.DownwardAPIVolumeSource) if err := convert_v1_DownwardAPIVolumeSource_To_api_DownwardAPIVolumeSource(in.DownwardAPI, out.DownwardAPI, s); err != nil { @@ -6022,6 +6078,7 @@ func init() { autoconvert_api_Event_To_v1_Event, autoconvert_api_ExecAction_To_v1_ExecAction, autoconvert_api_FCVolumeSource_To_v1_FCVolumeSource, + autoconvert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource, autoconvert_api_GCEPersistentDiskVolumeSource_To_v1_GCEPersistentDiskVolumeSource, autoconvert_api_GitRepoVolumeSource_To_v1_GitRepoVolumeSource, autoconvert_api_GlusterfsVolumeSource_To_v1_GlusterfsVolumeSource, @@ -6139,6 +6196,7 @@ func init() { autoconvert_v1_Event_To_api_Event, autoconvert_v1_ExecAction_To_api_ExecAction, autoconvert_v1_FCVolumeSource_To_api_FCVolumeSource, + autoconvert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource, autoconvert_v1_GCEPersistentDiskVolumeSource_To_api_GCEPersistentDiskVolumeSource, autoconvert_v1_GitRepoVolumeSource_To_api_GitRepoVolumeSource, autoconvert_v1_GlusterfsVolumeSource_To_api_GlusterfsVolumeSource, diff --git a/pkg/api/v1/deep_copy_generated.go b/pkg/api/v1/deep_copy_generated.go index 0d440d4d856..b49972d845b 100644 --- a/pkg/api/v1/deep_copy_generated.go +++ b/pkg/api/v1/deep_copy_generated.go @@ -607,6 +607,11 @@ func deepCopy_v1_FCVolumeSource(in FCVolumeSource, out *FCVolumeSource, c *conve return nil } +func deepCopy_v1_FlockerVolumeSource(in FlockerVolumeSource, out *FlockerVolumeSource, c *conversion.Cloner) error { + out.DatasetName = in.DatasetName + return nil +} + func deepCopy_v1_GCEPersistentDiskVolumeSource(in GCEPersistentDiskVolumeSource, out *GCEPersistentDiskVolumeSource, c *conversion.Cloner) error { out.PDName = in.PDName out.FSType = in.FSType @@ -1315,6 +1320,14 @@ func deepCopy_v1_PersistentVolumeSource(in PersistentVolumeSource, out *Persiste } else { out.FC = nil } + if in.Flocker != nil { + out.Flocker = new(FlockerVolumeSource) + if err := deepCopy_v1_FlockerVolumeSource(*in.Flocker, out.Flocker, c); err != nil { + return err + } + } else { + out.Flocker = nil + } return nil } @@ -2245,6 +2258,14 @@ func deepCopy_v1_VolumeSource(in VolumeSource, out *VolumeSource, c *conversion. } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(FlockerVolumeSource) + if err := deepCopy_v1_FlockerVolumeSource(*in.Flocker, out.Flocker, c); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(DownwardAPIVolumeSource) if err := deepCopy_v1_DownwardAPIVolumeSource(*in.DownwardAPI, out.DownwardAPI, c); err != nil { @@ -2321,6 +2342,7 @@ func init() { deepCopy_v1_EventSource, deepCopy_v1_ExecAction, deepCopy_v1_FCVolumeSource, + deepCopy_v1_FlockerVolumeSource, deepCopy_v1_GCEPersistentDiskVolumeSource, deepCopy_v1_GitRepoVolumeSource, deepCopy_v1_GlusterfsVolumeSource, diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index b177ae78b16..906682be3a5 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -250,6 +250,9 @@ type VolumeSource struct { // CephFS represents a Ceph FS mount on the host that shares a pod's lifetime CephFS *CephFSVolumeSource `json:"cephfs,omitempty"` + // Flocker represents a Flocker volume attached to a kubelet's host machine. This depends on the Flocker control service being running + Flocker *FlockerVolumeSource `json:"flocker,omitempty"` + // DownwardAPI represents downward API about the pod that should populate this volume DownwardAPI *DownwardAPIVolumeSource `json:"downwardAPI,omitempty"` // FC represents a Fibre Channel resource that is attached to a kubelet's host machine and then exposed to the pod. @@ -306,6 +309,8 @@ type PersistentVolumeSource struct { CephFS *CephFSVolumeSource `json:"cephfs,omitempty"` // FC represents a Fibre Channel resource that is attached to a kubelet's host machine and then exposed to the pod. FC *FCVolumeSource `json:"fc,omitempty"` + // Flocker represents a Flocker volume attached to a kubelet's host machine and exposed to the pod for its usage. This depends on the Flocker control service being running + Flocker *FlockerVolumeSource `json:"flocker,omitempty"` } // PersistentVolume (PV) is a storage resource provisioned by an administrator. @@ -589,6 +594,12 @@ type CephFSVolumeSource struct { ReadOnly bool `json:"readOnly,omitempty"` } +// FlockerVolumeSource represents a Flocker volume mounted by the Flocker agent. +type FlockerVolumeSource struct { + // Required: the volume name. This is going to be store on metadata -> name on the payload for Flocker + DatasetName string `json:"datasetName"` +} + const ( StorageMediumDefault StorageMedium = "" // use whatever the default is for the node StorageMediumMemory StorageMedium = "Memory" // use memory (tmpfs) diff --git a/pkg/api/v1/types_swagger_doc_generated.go b/pkg/api/v1/types_swagger_doc_generated.go index 92d4eeaf863..f694d049fe5 100644 --- a/pkg/api/v1/types_swagger_doc_generated.go +++ b/pkg/api/v1/types_swagger_doc_generated.go @@ -389,6 +389,15 @@ func (FCVolumeSource) SwaggerDoc() map[string]string { return map_FCVolumeSource } +var map_FlockerVolumeSource = map[string]string{ + "": "FlockerVolumeSource represents a Flocker volume mounted by the Flocker agent.", + "datasetName": "Required: the volume name. This is going to be store on metadata -> name on the payload for Flocker", +} + +func (FlockerVolumeSource) SwaggerDoc() map[string]string { + return map_FlockerVolumeSource +} + var map_GCEPersistentDiskVolumeSource = map[string]string{ "": "GCEPersistentDiskVolumeSource represents a Persistent Disk resource in Google Compute Engine.\n\nA GCE PD must exist and be formatted before mounting to a container. The disk must also be in the same GCE project and zone as the kubelet. A GCE PD can only be mounted as read/write once.", "pdName": "Unique name of the PD resource in GCE. Used to identify the disk in GCE. More info: http://releases.k8s.io/HEAD/docs/user-guide/volumes.md#gcepersistentdisk", @@ -847,6 +856,7 @@ var map_PersistentVolumeSource = map[string]string{ "cinder": "Cinder represents a cinder volume attached and mounted on kubelets host machine More info: http://releases.k8s.io/HEAD/examples/mysql-cinder-pd/README.md", "cephfs": "CephFS represents a Ceph FS mount on the host that shares a pod's lifetime", "fc": "FC represents a Fibre Channel resource that is attached to a kubelet's host machine and then exposed to the pod.", + "flocker": "Flocker represents a Flocker volume attached to a kubelet's host machine and exposed to the pod for its usage. This depends on the Flocker control service being running", } func (PersistentVolumeSource) SwaggerDoc() map[string]string { @@ -1358,6 +1368,7 @@ var map_VolumeSource = map[string]string{ "rbd": "RBD represents a Rados Block Device mount on the host that shares a pod's lifetime. More info: http://releases.k8s.io/HEAD/examples/rbd/README.md", "cinder": "Cinder represents a cinder volume attached and mounted on kubelets host machine More info: http://releases.k8s.io/HEAD/examples/mysql-cinder-pd/README.md", "cephfs": "CephFS represents a Ceph FS mount on the host that shares a pod's lifetime", + "flocker": "Flocker represents a Flocker volume attached to a kubelet's host machine. This depends on the Flocker control service being running", "downwardAPI": "DownwardAPI represents downward API about the pod that should populate this volume", "fc": "FC represents a Fibre Channel resource that is attached to a kubelet's host machine and then exposed to the pod.", } diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 867fd87a4e1..8c720630db7 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -379,6 +379,10 @@ func validateSource(source *api.VolumeSource) errs.ValidationErrorList { numVolumes++ allErrs = append(allErrs, validateGlusterfs(source.Glusterfs).Prefix("glusterfs")...) } + if source.Flocker != nil { + numVolumes++ + allErrs = append(allErrs, validateFlocker(source.Flocker).Prefix("flocker")...) + } if source.PersistentVolumeClaim != nil { numVolumes++ allErrs = append(allErrs, validatePersistentClaimVolumeSource(source.PersistentVolumeClaim).Prefix("persistentVolumeClaim")...) @@ -531,6 +535,17 @@ func validateGlusterfs(glusterfs *api.GlusterfsVolumeSource) errs.ValidationErro return allErrs } +func validateFlocker(flocker *api.FlockerVolumeSource) errs.ValidationErrorList { + allErrs := errs.ValidationErrorList{} + if flocker.DatasetName == "" { + allErrs = append(allErrs, errs.NewFieldRequired("datasetName")) + } + if strings.Contains(flocker.DatasetName, "/") { + allErrs = append(allErrs, errs.NewFieldInvalid("datasetName", flocker.DatasetName, "must not contain '/'")) + } + return allErrs +} + var validDownwardAPIFieldPathExpressions = sets.NewString("metadata.name", "metadata.namespace", "metadata.labels", "metadata.annotations") func validateDownwardAPIVolumeSource(downwardAPIVolume *api.DownwardAPIVolumeSource) errs.ValidationErrorList { @@ -636,6 +651,10 @@ func ValidatePersistentVolume(pv *api.PersistentVolume) errs.ValidationErrorList numVolumes++ allErrs = append(allErrs, validateGlusterfs(pv.Spec.Glusterfs).Prefix("glusterfs")...) } + if pv.Spec.Flocker != nil { + numVolumes++ + allErrs = append(allErrs, validateFlocker(pv.Spec.Flocker).Prefix("flocker")...) + } if pv.Spec.NFS != nil { numVolumes++ allErrs = append(allErrs, validateNFS(pv.Spec.NFS).Prefix("nfs")...) diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 9b542673b3c..7604cec7821 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -458,6 +458,7 @@ func TestValidateVolumes(t *testing.T) { {Name: "iscsidisk", VolumeSource: api.VolumeSource{ISCSI: &api.ISCSIVolumeSource{TargetPortal: "127.0.0.1", IQN: "iqn.2015-02.example.com:test", Lun: 1, FSType: "ext4", ReadOnly: false}}}, {Name: "secret", VolumeSource: api.VolumeSource{Secret: &api.SecretVolumeSource{SecretName: "my-secret"}}}, {Name: "glusterfs", VolumeSource: api.VolumeSource{Glusterfs: &api.GlusterfsVolumeSource{EndpointsName: "host1", Path: "path", ReadOnly: false}}}, + {Name: "flocker", VolumeSource: api.VolumeSource{Flocker: &api.FlockerVolumeSource{DatasetName: "datasetName"}}}, {Name: "rbd", VolumeSource: api.VolumeSource{RBD: &api.RBDVolumeSource{CephMonitors: []string{"foo"}, RBDImage: "bar", FSType: "ext4"}}}, {Name: "cinder", VolumeSource: api.VolumeSource{Cinder: &api.CinderVolumeSource{"29ea5088-4f60-4757-962e-dba678767887", "ext4", false}}}, {Name: "cephfs", VolumeSource: api.VolumeSource{CephFS: &api.CephFSVolumeSource{Monitors: []string{"foo"}}}}, @@ -501,6 +502,7 @@ func TestValidateVolumes(t *testing.T) { emptyIQN := api.VolumeSource{ISCSI: &api.ISCSIVolumeSource{TargetPortal: "127.0.0.1", IQN: "", Lun: 1, FSType: "ext4", ReadOnly: false}} emptyHosts := api.VolumeSource{Glusterfs: &api.GlusterfsVolumeSource{EndpointsName: "", Path: "path", ReadOnly: false}} emptyPath := api.VolumeSource{Glusterfs: &api.GlusterfsVolumeSource{EndpointsName: "host", Path: "", ReadOnly: false}} + emptyName := api.VolumeSource{Flocker: &api.FlockerVolumeSource{DatasetName: ""}} emptyMon := api.VolumeSource{RBD: &api.RBDVolumeSource{CephMonitors: []string{}, RBDImage: "bar", FSType: "ext4"}} emptyImage := api.VolumeSource{RBD: &api.RBDVolumeSource{CephMonitors: []string{"foo"}, RBDImage: "", FSType: "ext4"}} emptyCephFSMon := api.VolumeSource{CephFS: &api.CephFSVolumeSource{Monitors: []string{}}} @@ -531,30 +533,33 @@ func TestValidateVolumes(t *testing.T) { }} zeroWWN := api.VolumeSource{FC: &api.FCVolumeSource{[]string{}, &lun, "ext4", false}} emptyLun := api.VolumeSource{FC: &api.FCVolumeSource{[]string{"wwn"}, nil, "ext4", false}} + slashInName := api.VolumeSource{Flocker: &api.FlockerVolumeSource{DatasetName: "foo/bar"}} errorCases := map[string]struct { V []api.Volume T errors.ValidationErrorType F string D string }{ - "zero-length name": {[]api.Volume{{Name: "", VolumeSource: emptyVS}}, errors.ValidationErrorTypeRequired, "[0].name", ""}, - "name > 63 characters": {[]api.Volume{{Name: strings.Repeat("a", 64), VolumeSource: emptyVS}}, errors.ValidationErrorTypeInvalid, "[0].name", "must be a DNS label (at most 63 characters, matching regex [a-z0-9]([-a-z0-9]*[a-z0-9])?): e.g. \"my-name\""}, - "name not a DNS label": {[]api.Volume{{Name: "a.b.c", VolumeSource: emptyVS}}, errors.ValidationErrorTypeInvalid, "[0].name", "must be a DNS label (at most 63 characters, matching regex [a-z0-9]([-a-z0-9]*[a-z0-9])?): e.g. \"my-name\""}, - "name not unique": {[]api.Volume{{Name: "abc", VolumeSource: emptyVS}, {Name: "abc", VolumeSource: emptyVS}}, errors.ValidationErrorTypeDuplicate, "[1].name", ""}, - "empty portal": {[]api.Volume{{Name: "badportal", VolumeSource: emptyPortal}}, errors.ValidationErrorTypeRequired, "[0].source.iscsi.targetPortal", ""}, - "empty iqn": {[]api.Volume{{Name: "badiqn", VolumeSource: emptyIQN}}, errors.ValidationErrorTypeRequired, "[0].source.iscsi.iqn", ""}, - "empty hosts": {[]api.Volume{{Name: "badhost", VolumeSource: emptyHosts}}, errors.ValidationErrorTypeRequired, "[0].source.glusterfs.endpoints", ""}, - "empty path": {[]api.Volume{{Name: "badpath", VolumeSource: emptyPath}}, errors.ValidationErrorTypeRequired, "[0].source.glusterfs.path", ""}, - "empty mon": {[]api.Volume{{Name: "badmon", VolumeSource: emptyMon}}, errors.ValidationErrorTypeRequired, "[0].source.rbd.monitors", ""}, - "empty image": {[]api.Volume{{Name: "badimage", VolumeSource: emptyImage}}, errors.ValidationErrorTypeRequired, "[0].source.rbd.image", ""}, - "empty cephfs mon": {[]api.Volume{{Name: "badmon", VolumeSource: emptyCephFSMon}}, errors.ValidationErrorTypeRequired, "[0].source.cephfs.monitors", ""}, - "empty metatada path": {[]api.Volume{{Name: "emptyname", VolumeSource: emptyPathName}}, errors.ValidationErrorTypeRequired, "[0].source.downwardApi.path", ""}, - "absolute path": {[]api.Volume{{Name: "absolutepath", VolumeSource: absolutePathName}}, errors.ValidationErrorTypeForbidden, "[0].source.downwardApi.path", ""}, - "dot dot path": {[]api.Volume{{Name: "dotdotpath", VolumeSource: dotDotInPath}}, errors.ValidationErrorTypeInvalid, "[0].source.downwardApi.path", "must not contain \"..\"."}, - "dot dot file name": {[]api.Volume{{Name: "dotdotfilename", VolumeSource: dotDotPathName}}, errors.ValidationErrorTypeInvalid, "[0].source.downwardApi.path", "must not start with \"..\"."}, - "dot dot first level dirent ": {[]api.Volume{{Name: "dotdotdirfilename", VolumeSource: dotDotFirstLevelDirent}}, errors.ValidationErrorTypeInvalid, "[0].source.downwardApi.path", "must not start with \"..\"."}, - "empty wwn": {[]api.Volume{{Name: "badimage", VolumeSource: zeroWWN}}, errors.ValidationErrorTypeRequired, "[0].source.fc.targetWWNs", ""}, - "empty lun": {[]api.Volume{{Name: "badimage", VolumeSource: emptyLun}}, errors.ValidationErrorTypeRequired, "[0].source.fc.lun", ""}, + "zero-length name": {[]api.Volume{{Name: "", VolumeSource: emptyVS}}, errors.ValidationErrorTypeRequired, "[0].name", ""}, + "name > 63 characters": {[]api.Volume{{Name: strings.Repeat("a", 64), VolumeSource: emptyVS}}, errors.ValidationErrorTypeInvalid, "[0].name", "must be a DNS label (at most 63 characters, matching regex [a-z0-9]([-a-z0-9]*[a-z0-9])?): e.g. \"my-name\""}, + "name not a DNS label": {[]api.Volume{{Name: "a.b.c", VolumeSource: emptyVS}}, errors.ValidationErrorTypeInvalid, "[0].name", "must be a DNS label (at most 63 characters, matching regex [a-z0-9]([-a-z0-9]*[a-z0-9])?): e.g. \"my-name\""}, + "name not unique": {[]api.Volume{{Name: "abc", VolumeSource: emptyVS}, {Name: "abc", VolumeSource: emptyVS}}, errors.ValidationErrorTypeDuplicate, "[1].name", ""}, + "empty portal": {[]api.Volume{{Name: "badportal", VolumeSource: emptyPortal}}, errors.ValidationErrorTypeRequired, "[0].source.iscsi.targetPortal", ""}, + "empty iqn": {[]api.Volume{{Name: "badiqn", VolumeSource: emptyIQN}}, errors.ValidationErrorTypeRequired, "[0].source.iscsi.iqn", ""}, + "empty hosts": {[]api.Volume{{Name: "badhost", VolumeSource: emptyHosts}}, errors.ValidationErrorTypeRequired, "[0].source.glusterfs.endpoints", ""}, + "empty path": {[]api.Volume{{Name: "badpath", VolumeSource: emptyPath}}, errors.ValidationErrorTypeRequired, "[0].source.glusterfs.path", ""}, + "empty datasetName": {[]api.Volume{{Name: "badname", VolumeSource: emptyName}}, errors.ValidationErrorTypeRequired, "[0].source.flocker.datasetName", ""}, + "empty mon": {[]api.Volume{{Name: "badmon", VolumeSource: emptyMon}}, errors.ValidationErrorTypeRequired, "[0].source.rbd.monitors", ""}, + "empty image": {[]api.Volume{{Name: "badimage", VolumeSource: emptyImage}}, errors.ValidationErrorTypeRequired, "[0].source.rbd.image", ""}, + "empty cephfs mon": {[]api.Volume{{Name: "badmon", VolumeSource: emptyCephFSMon}}, errors.ValidationErrorTypeRequired, "[0].source.cephfs.monitors", ""}, + "empty metatada path": {[]api.Volume{{Name: "emptyname", VolumeSource: emptyPathName}}, errors.ValidationErrorTypeRequired, "[0].source.downwardApi.path", ""}, + "absolute path": {[]api.Volume{{Name: "absolutepath", VolumeSource: absolutePathName}}, errors.ValidationErrorTypeForbidden, "[0].source.downwardApi.path", ""}, + "dot dot path": {[]api.Volume{{Name: "dotdotpath", VolumeSource: dotDotInPath}}, errors.ValidationErrorTypeInvalid, "[0].source.downwardApi.path", "must not contain \"..\"."}, + "dot dot file name": {[]api.Volume{{Name: "dotdotfilename", VolumeSource: dotDotPathName}}, errors.ValidationErrorTypeInvalid, "[0].source.downwardApi.path", "must not start with \"..\"."}, + "dot dot first level dirent": {[]api.Volume{{Name: "dotdotdirfilename", VolumeSource: dotDotFirstLevelDirent}}, errors.ValidationErrorTypeInvalid, "[0].source.downwardApi.path", "must not start with \"..\"."}, + "empty wwn": {[]api.Volume{{Name: "badimage", VolumeSource: zeroWWN}}, errors.ValidationErrorTypeRequired, "[0].source.fc.targetWWNs", ""}, + "empty lun": {[]api.Volume{{Name: "badimage", VolumeSource: emptyLun}}, errors.ValidationErrorTypeRequired, "[0].source.fc.lun", ""}, + "slash in datasetName": {[]api.Volume{{Name: "slashinname", VolumeSource: slashInName}}, errors.ValidationErrorTypeInvalid, "[0].source.flocker.datasetName", "must not contain '/'"}, } for k, v := range errorCases { _, errs := validateVolumes(v.V) diff --git a/pkg/apis/experimental/deep_copy_generated.go b/pkg/apis/experimental/deep_copy_generated.go index 147b393d0b4..056784d5384 100644 --- a/pkg/apis/experimental/deep_copy_generated.go +++ b/pkg/apis/experimental/deep_copy_generated.go @@ -273,6 +273,11 @@ func deepCopy_api_FCVolumeSource(in api.FCVolumeSource, out *api.FCVolumeSource, return nil } +func deepCopy_api_FlockerVolumeSource(in api.FlockerVolumeSource, out *api.FlockerVolumeSource, c *conversion.Cloner) error { + out.DatasetName = in.DatasetName + return nil +} + func deepCopy_api_GCEPersistentDiskVolumeSource(in api.GCEPersistentDiskVolumeSource, out *api.GCEPersistentDiskVolumeSource, c *conversion.Cloner) error { out.PDName = in.PDName out.FSType = in.FSType @@ -761,6 +766,14 @@ func deepCopy_api_VolumeSource(in api.VolumeSource, out *api.VolumeSource, c *co } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(api.FlockerVolumeSource) + if err := deepCopy_api_FlockerVolumeSource(*in.Flocker, out.Flocker, c); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(api.DownwardAPIVolumeSource) if err := deepCopy_api_DownwardAPIVolumeSource(*in.DownwardAPI, out.DownwardAPI, c); err != nil { @@ -1441,6 +1454,7 @@ func init() { deepCopy_api_EnvVarSource, deepCopy_api_ExecAction, deepCopy_api_FCVolumeSource, + deepCopy_api_FlockerVolumeSource, deepCopy_api_GCEPersistentDiskVolumeSource, deepCopy_api_GitRepoVolumeSource, deepCopy_api_GlusterfsVolumeSource, diff --git a/pkg/apis/experimental/v1alpha1/conversion_generated.go b/pkg/apis/experimental/v1alpha1/conversion_generated.go index e6824ac3298..463c91237aa 100644 --- a/pkg/apis/experimental/v1alpha1/conversion_generated.go +++ b/pkg/apis/experimental/v1alpha1/conversion_generated.go @@ -363,6 +363,18 @@ func convert_api_FCVolumeSource_To_v1_FCVolumeSource(in *api.FCVolumeSource, out return autoconvert_api_FCVolumeSource_To_v1_FCVolumeSource(in, out, s) } +func autoconvert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in *api.FlockerVolumeSource, out *v1.FlockerVolumeSource, s conversion.Scope) error { + if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { + defaulting.(func(*api.FlockerVolumeSource))(in) + } + out.DatasetName = in.DatasetName + return nil +} + +func convert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in *api.FlockerVolumeSource, out *v1.FlockerVolumeSource, s conversion.Scope) error { + return autoconvert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in, out, s) +} + func autoconvert_api_GCEPersistentDiskVolumeSource_To_v1_GCEPersistentDiskVolumeSource(in *api.GCEPersistentDiskVolumeSource, out *v1.GCEPersistentDiskVolumeSource, s conversion.Scope) error { if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { defaulting.(func(*api.GCEPersistentDiskVolumeSource))(in) @@ -1031,6 +1043,14 @@ func autoconvert_api_VolumeSource_To_v1_VolumeSource(in *api.VolumeSource, out * } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(v1.FlockerVolumeSource) + if err := convert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource(in.Flocker, out.Flocker, s); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(v1.DownwardAPIVolumeSource) if err := convert_api_DownwardAPIVolumeSource_To_v1_DownwardAPIVolumeSource(in.DownwardAPI, out.DownwardAPI, s); err != nil { @@ -1389,6 +1409,18 @@ func convert_v1_FCVolumeSource_To_api_FCVolumeSource(in *v1.FCVolumeSource, out return autoconvert_v1_FCVolumeSource_To_api_FCVolumeSource(in, out, s) } +func autoconvert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in *v1.FlockerVolumeSource, out *api.FlockerVolumeSource, s conversion.Scope) error { + if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { + defaulting.(func(*v1.FlockerVolumeSource))(in) + } + out.DatasetName = in.DatasetName + return nil +} + +func convert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in *v1.FlockerVolumeSource, out *api.FlockerVolumeSource, s conversion.Scope) error { + return autoconvert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in, out, s) +} + func autoconvert_v1_GCEPersistentDiskVolumeSource_To_api_GCEPersistentDiskVolumeSource(in *v1.GCEPersistentDiskVolumeSource, out *api.GCEPersistentDiskVolumeSource, s conversion.Scope) error { if defaulting, found := s.DefaultingInterface(reflect.TypeOf(*in)); found { defaulting.(func(*v1.GCEPersistentDiskVolumeSource))(in) @@ -2058,6 +2090,14 @@ func autoconvert_v1_VolumeSource_To_api_VolumeSource(in *v1.VolumeSource, out *a } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(api.FlockerVolumeSource) + if err := convert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource(in.Flocker, out.Flocker, s); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(api.DownwardAPIVolumeSource) if err := convert_v1_DownwardAPIVolumeSource_To_api_DownwardAPIVolumeSource(in.DownwardAPI, out.DownwardAPI, s); err != nil { @@ -3790,6 +3830,7 @@ func init() { autoconvert_api_EnvVar_To_v1_EnvVar, autoconvert_api_ExecAction_To_v1_ExecAction, autoconvert_api_FCVolumeSource_To_v1_FCVolumeSource, + autoconvert_api_FlockerVolumeSource_To_v1_FlockerVolumeSource, autoconvert_api_GCEPersistentDiskVolumeSource_To_v1_GCEPersistentDiskVolumeSource, autoconvert_api_GitRepoVolumeSource_To_v1_GitRepoVolumeSource, autoconvert_api_GlusterfsVolumeSource_To_v1_GlusterfsVolumeSource, @@ -3869,6 +3910,7 @@ func init() { autoconvert_v1_EnvVar_To_api_EnvVar, autoconvert_v1_ExecAction_To_api_ExecAction, autoconvert_v1_FCVolumeSource_To_api_FCVolumeSource, + autoconvert_v1_FlockerVolumeSource_To_api_FlockerVolumeSource, autoconvert_v1_GCEPersistentDiskVolumeSource_To_api_GCEPersistentDiskVolumeSource, autoconvert_v1_GitRepoVolumeSource_To_api_GitRepoVolumeSource, autoconvert_v1_GlusterfsVolumeSource_To_api_GlusterfsVolumeSource, diff --git a/pkg/apis/experimental/v1alpha1/deep_copy_generated.go b/pkg/apis/experimental/v1alpha1/deep_copy_generated.go index 6752729d7b4..cfb2033a3d1 100644 --- a/pkg/apis/experimental/v1alpha1/deep_copy_generated.go +++ b/pkg/apis/experimental/v1alpha1/deep_copy_generated.go @@ -311,6 +311,11 @@ func deepCopy_v1_FCVolumeSource(in v1.FCVolumeSource, out *v1.FCVolumeSource, c return nil } +func deepCopy_v1_FlockerVolumeSource(in v1.FlockerVolumeSource, out *v1.FlockerVolumeSource, c *conversion.Cloner) error { + out.DatasetName = in.DatasetName + return nil +} + func deepCopy_v1_GCEPersistentDiskVolumeSource(in v1.GCEPersistentDiskVolumeSource, out *v1.GCEPersistentDiskVolumeSource, c *conversion.Cloner) error { out.PDName = in.PDName out.FSType = in.FSType @@ -800,6 +805,14 @@ func deepCopy_v1_VolumeSource(in v1.VolumeSource, out *v1.VolumeSource, c *conve } else { out.CephFS = nil } + if in.Flocker != nil { + out.Flocker = new(v1.FlockerVolumeSource) + if err := deepCopy_v1_FlockerVolumeSource(*in.Flocker, out.Flocker, c); err != nil { + return err + } + } else { + out.Flocker = nil + } if in.DownwardAPI != nil { out.DownwardAPI = new(v1.DownwardAPIVolumeSource) if err := deepCopy_v1_DownwardAPIVolumeSource(*in.DownwardAPI, out.DownwardAPI, c); err != nil { @@ -1467,6 +1480,7 @@ func init() { deepCopy_v1_EnvVarSource, deepCopy_v1_ExecAction, deepCopy_v1_FCVolumeSource, + deepCopy_v1_FlockerVolumeSource, deepCopy_v1_GCEPersistentDiskVolumeSource, deepCopy_v1_GitRepoVolumeSource, deepCopy_v1_GlusterfsVolumeSource, diff --git a/pkg/volume/flocker/doc.go b/pkg/volume/flocker/doc.go new file mode 100644 index 00000000000..e07a0d519aa --- /dev/null +++ b/pkg/volume/flocker/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package flocker contains the internal representation of Flocker volumes +package flocker diff --git a/pkg/volume/flocker/plugin.go b/pkg/volume/flocker/plugin.go new file mode 100644 index 00000000000..c6dbca868a7 --- /dev/null +++ b/pkg/volume/flocker/plugin.go @@ -0,0 +1,232 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "fmt" + "path" + "strconv" + "time" + + flockerClient "github.com/ClusterHQ/flocker-go" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/mount" + "k8s.io/kubernetes/pkg/volume" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +const ( + flockerPluginName = "kubernetes.io/flocker" + + defaultHost = "localhost" + defaultPort = 4523 + defaultCACertFile = "/etc/flocker/cluster.crt" + defaultClientKeyFile = "/etc/flocker/apiuser.key" + defaultClientCertFile = "/etc/flocker/apiuser.crt" + + timeoutWaitingForVolume = 2 * time.Minute + tickerWaitingForVolume = 5 * time.Second +) + +func ProbeVolumePlugins() []volume.VolumePlugin { + return []volume.VolumePlugin{&flockerPlugin{}} +} + +type flockerPlugin struct { + host volume.VolumeHost +} + +type flocker struct { + datasetName string + path string + pod *api.Pod + mounter mount.Interface + plugin *flockerPlugin +} + +func (p *flockerPlugin) Init(host volume.VolumeHost) { + p.host = host +} + +func (p flockerPlugin) Name() string { + return flockerPluginName +} + +func (p flockerPlugin) CanSupport(spec *volume.Spec) bool { + return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) || + (spec.Volume != nil && spec.Volume.Flocker != nil) +} + +func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*api.FlockerVolumeSource, bool) { + // AFAIK this will always be r/w, but perhaps for the future it will be needed + readOnly := false + + if spec.Volume != nil && spec.Volume.Flocker != nil { + return spec.Volume.Flocker, readOnly + } + return spec.PersistentVolume.Spec.Flocker, readOnly +} + +func (p *flockerPlugin) NewBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) { + source, readOnly := p.getFlockerVolumeSource(spec) + builder := flockerBuilder{ + flocker: &flocker{ + datasetName: source.DatasetName, + pod: pod, + mounter: p.host.GetMounter(), + plugin: p, + }, + exe: exec.New(), + opts: opts, + readOnly: readOnly, + } + return &builder, nil +} + +func (p *flockerPlugin) NewCleaner(datasetName string, podUID types.UID) (volume.Cleaner, error) { + // Flocker agent will take care of this, there is nothing we can do here + return nil, nil +} + +type flockerBuilder struct { + *flocker + client flockerClient.Clientable + exe exec.Interface + opts volume.VolumeOptions + readOnly bool +} + +func (b flockerBuilder) GetPath() string { + return b.flocker.path +} + +func (b flockerBuilder) SetUp() error { + return b.SetUpAt(b.flocker.datasetName) +} + +// newFlockerClient uses environment variables and pod attributes to return a +// flocker client capable of talking with the Flocker control service. +func (b flockerBuilder) newFlockerClient() (*flockerClient.Client, error) { + host := getenvOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost) + portConfig := getenvOrFallback("FLOCKER_CONTROL_SERVICE_PORT", strconv.Itoa(defaultPort)) + port, err := strconv.Atoi(portConfig) + if err != nil { + return nil, err + } + caCertPath := getenvOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile) + keyPath := getenvOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile) + certPath := getenvOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile) + + c, err := flockerClient.NewClient(host, port, b.flocker.pod.Status.HostIP, caCertPath, keyPath, certPath) + return c, err +} + +func (b *flockerBuilder) getMetaDir() string { + return path.Join( + b.plugin.host.GetPodPluginDir( + b.flocker.pod.UID, util.EscapeQualifiedNameForDisk(flockerPluginName), + ), + b.datasetName, + ) +} + +/* +SetUpAt will setup a Flocker volume following this flow of calls to the Flocker +control service: + +1. Get the dataset id for the given volume name/dir +2. It should already be there, if it's not the user needs to manually create it +3. Check the current Primary UUID +4. If it doesn't match with the Primary UUID that we got on 2, then we will + need to update the Primary UUID for this volume. +5. Wait until the Primary UUID was updated or timeout. +*/ +func (b flockerBuilder) SetUpAt(dir string) error { + if volumeutil.IsReady(b.getMetaDir()) { + return nil + } + + if b.client == nil { + c, err := b.newFlockerClient() + if err != nil { + return err + } + b.client = c + } + + datasetID, err := b.client.GetDatasetID(dir) + if err != nil { + return err + } + + s, err := b.client.GetDatasetState(datasetID) + if err != nil { + return fmt.Errorf("The volume '%s' is not available in Flocker. You need to create this manually with Flocker CLI before using it.", dir) + } + + primaryUUID, err := b.client.GetPrimaryUUID() + if err != nil { + return err + } + + if s.Primary != primaryUUID { + if err := b.updateDatasetPrimary(datasetID, primaryUUID); err != nil { + return err + } + } + + b.flocker.path = s.Path + volumeutil.SetReady(b.getMetaDir()) + return nil +} + +func (b flockerBuilder) IsReadOnly() bool { + return b.readOnly +} + +// updateDatasetPrimary will update the primary in Flocker and wait for it to +// be ready. If it never gets to ready state it will timeout and error. +func (b flockerBuilder) updateDatasetPrimary(datasetID, primaryUUID string) error { + // We need to update the primary and wait for it to be ready + _, err := b.client.UpdatePrimaryForDataset(primaryUUID, datasetID) + if err != nil { + return err + } + + timeoutChan := time.NewTimer(timeoutWaitingForVolume).C + tickChan := time.NewTicker(tickerWaitingForVolume).C + + for { + if s, err := b.client.GetDatasetState(datasetID); err == nil && s.Primary == primaryUUID { + return nil + } + + select { + case <-timeoutChan: + return fmt.Errorf( + "Timed out waiting for the dataset_id: '%s' to be moved to the primary: '%s'\n%v", + datasetID, primaryUUID, err, + ) + case <-tickChan: + break + } + } + +} diff --git a/pkg/volume/flocker/plugin_test.go b/pkg/volume/flocker/plugin_test.go new file mode 100644 index 00000000000..9fd24096937 --- /dev/null +++ b/pkg/volume/flocker/plugin_test.go @@ -0,0 +1,209 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "testing" + + flockerClient "github.com/ClusterHQ/flocker-go" + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/volume" +) + +const pluginName = "kubernetes.io/flocker" + +func newInitializedVolumePlugMgr() volume.VolumePluginMgr { + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("/foo/bar", nil, nil)) + return plugMgr +} + +func TestGetByName(t *testing.T) { + assert := assert.New(t) + plugMgr := newInitializedVolumePlugMgr() + + plug, err := plugMgr.FindPluginByName(pluginName) + assert.NotNil(plug, "Can't find the plugin by name") + assert.NoError(err) +} + +func TestCanSupport(t *testing.T) { + assert := assert.New(t) + plugMgr := newInitializedVolumePlugMgr() + + plug, err := plugMgr.FindPluginByName(pluginName) + assert.NoError(err) + + specs := map[*volume.Spec]bool{ + &volume.Spec{ + Volume: &api.Volume{ + VolumeSource: api.VolumeSource{ + Flocker: &api.FlockerVolumeSource{}, + }, + }, + }: true, + &volume.Spec{ + PersistentVolume: &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + Flocker: &api.FlockerVolumeSource{}, + }, + }, + }, + }: true, + &volume.Spec{ + Volume: &api.Volume{ + VolumeSource: api.VolumeSource{}, + }, + }: false, + } + + for spec, expected := range specs { + actual := plug.CanSupport(spec) + assert.Equal(expected, actual) + } +} + +func TestGetFlockerVolumeSource(t *testing.T) { + assert := assert.New(t) + + p := flockerPlugin{} + + spec := &volume.Spec{ + Volume: &api.Volume{ + VolumeSource: api.VolumeSource{ + Flocker: &api.FlockerVolumeSource{}, + }, + }, + } + vs, ro := p.getFlockerVolumeSource(spec) + assert.False(ro) + assert.Equal(spec.Volume.Flocker, vs) + + spec = &volume.Spec{ + PersistentVolume: &api.PersistentVolume{ + Spec: api.PersistentVolumeSpec{ + PersistentVolumeSource: api.PersistentVolumeSource{ + Flocker: &api.FlockerVolumeSource{}, + }, + }, + }, + } + vs, ro = p.getFlockerVolumeSource(spec) + assert.False(ro) + assert.Equal(spec.PersistentVolume.Spec.Flocker, vs) +} + +func TestNewBuilder(t *testing.T) { + assert := assert.New(t) + + plugMgr := newInitializedVolumePlugMgr() + plug, err := plugMgr.FindPluginByName(pluginName) + assert.NoError(err) + + spec := &volume.Spec{ + Volume: &api.Volume{ + VolumeSource: api.VolumeSource{ + Flocker: &api.FlockerVolumeSource{ + DatasetName: "something", + }, + }, + }, + } + + _, err = plug.NewBuilder(spec, &api.Pod{}, volume.VolumeOptions{}) + assert.NoError(err) +} + +func TestNewCleaner(t *testing.T) { + assert := assert.New(t) + + p := flockerPlugin{} + + cleaner, err := p.NewCleaner("", types.UID("")) + assert.Nil(cleaner) + assert.NoError(err) +} + +func TestIsReadOnly(t *testing.T) { + b := flockerBuilder{readOnly: true} + assert.True(t, b.IsReadOnly()) +} + +func TestGetPath(t *testing.T) { + const expectedPath = "/flocker/expected" + + assert := assert.New(t) + + b := flockerBuilder{flocker: &flocker{path: expectedPath}} + assert.Equal(expectedPath, b.GetPath()) +} + +type mockFlockerClient struct { + datasetID, primaryUUID, path string + datasetState *flockerClient.DatasetState +} + +func newMockFlockerClient(mockDatasetID, mockPrimaryUUID, mockPath string) *mockFlockerClient { + return &mockFlockerClient{ + datasetID: mockDatasetID, + primaryUUID: mockPrimaryUUID, + path: mockPath, + datasetState: &flockerClient.DatasetState{ + Path: mockPath, + DatasetID: mockDatasetID, + Primary: mockPrimaryUUID, + }, + } +} + +func (m mockFlockerClient) CreateDataset(metaName string) (*flockerClient.DatasetState, error) { + return m.datasetState, nil +} +func (m mockFlockerClient) GetDatasetState(datasetID string) (*flockerClient.DatasetState, error) { + return m.datasetState, nil +} +func (m mockFlockerClient) GetDatasetID(metaName string) (string, error) { + return m.datasetID, nil +} +func (m mockFlockerClient) GetPrimaryUUID() (string, error) { + return m.primaryUUID, nil +} +func (m mockFlockerClient) UpdatePrimaryForDataset(primaryUUID, datasetID string) (*flockerClient.DatasetState, error) { + return m.datasetState, nil +} + +func TestSetUpAtInternal(t *testing.T) { + const dir = "dir" + mockPath := "expected-to-be-set-properly" // package var + expectedPath := mockPath + + assert := assert.New(t) + + plugMgr := newInitializedVolumePlugMgr() + plug, err := plugMgr.FindPluginByName(flockerPluginName) + assert.NoError(err) + + pod := &api.Pod{ObjectMeta: api.ObjectMeta{UID: types.UID("poduid")}} + b := flockerBuilder{flocker: &flocker{pod: pod, plugin: plug.(*flockerPlugin)}} + b.client = newMockFlockerClient("dataset-id", "primary-uid", mockPath) + + assert.NoError(b.SetUpAt(dir)) + assert.Equal(expectedPath, b.flocker.path) +} diff --git a/pkg/volume/flocker/util.go b/pkg/volume/flocker/util.go new file mode 100644 index 00000000000..25e54f7b1dc --- /dev/null +++ b/pkg/volume/flocker/util.go @@ -0,0 +1,28 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import "os" + +// getenvOrDefault returns the value of the enviroment variable if it's set, +// otherwise it return the default value provided. +func getenvOrFallback(key, defaultValue string) string { + if v := os.Getenv(key); v != "" { + return v + } + return defaultValue +} diff --git a/pkg/volume/flocker/util_test.go b/pkg/volume/flocker/util_test.go new file mode 100644 index 00000000000..72812f2b06b --- /dev/null +++ b/pkg/volume/flocker/util_test.go @@ -0,0 +1,37 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flocker + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetenvOrFallback(t *testing.T) { + const expected = "foo" + + assert := assert.New(t) + + key := "FLOCKER_SET_VAR" + os.Setenv(key, expected) + assert.Equal(expected, getenvOrFallback(key, "~"+expected)) + + key = "FLOCKER_UNSET_VAR" + assert.Equal(expected, getenvOrFallback(key, expected)) +}