Merge pull request #97922 from liggitt/heketi

Update gluster client, configure with filtered dialer
This commit is contained in:
Kubernetes Prow Robot 2021-01-11 16:40:45 -08:00 committed by GitHub
commit 0b75828dfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 259 additions and 18 deletions

4
go.mod
View File

@ -60,7 +60,7 @@ require (
github.com/googleapis/gnostic v0.4.1 github.com/googleapis/gnostic v0.4.1
github.com/gorilla/context v1.1.1 // indirect github.com/gorilla/context v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.1 github.com/hashicorp/golang-lru v0.5.1
github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible github.com/heketi/heketi v10.2.0+incompatible
github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6 // indirect github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6 // indirect
github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5 github.com/ishidawataru/sctp v0.0.0-20190723014705-7c296d48a2b5
github.com/json-iterator/go v1.1.10 github.com/json-iterator/go v1.1.10
@ -317,7 +317,7 @@ replace (
github.com/hashicorp/mdns => github.com/hashicorp/mdns v1.0.0 github.com/hashicorp/mdns => github.com/hashicorp/mdns v1.0.0
github.com/hashicorp/memberlist => github.com/hashicorp/memberlist v0.1.3 github.com/hashicorp/memberlist => github.com/hashicorp/memberlist v0.1.3
github.com/hashicorp/serf => github.com/hashicorp/serf v0.8.2 github.com/hashicorp/serf => github.com/hashicorp/serf v0.8.2
github.com/heketi/heketi => github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible github.com/heketi/heketi => github.com/heketi/heketi v10.2.0+incompatible
github.com/heketi/tests => github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6 github.com/heketi/tests => github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6
github.com/hpcloud/tail => github.com/hpcloud/tail v1.0.0 github.com/hpcloud/tail => github.com/hpcloud/tail v1.0.0
github.com/ianlancetaylor/demangle => github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 github.com/ianlancetaylor/demangle => github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6

4
go.sum
View File

@ -287,8 +287,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible h1:ysqc8k973k1lLJ4BOOHAkx14K2nt4cLjsIm+hwWDZDE= github.com/heketi/heketi v10.2.0+incompatible h1:kw0rXzWGCXZP5XMP07426kKiz4hGFgR9ok+GTg+wDS8=
github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible/go.mod h1:bB9ly3RchcQqsQ9CpyaQwvva7RS5ytVoSoholZQON6o= github.com/heketi/heketi v10.2.0+incompatible/go.mod h1:bB9ly3RchcQqsQ9CpyaQwvva7RS5ytVoSoholZQON6o=
github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6 h1:oJ/NLadJn5HoxvonA6VxG31lg0d6XOURNA09BTtM4fY= github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6 h1:oJ/NLadJn5HoxvonA6VxG31lg0d6XOURNA09BTtM4fY=
github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6/go.mod h1:xGMAM8JLi7UkZt1i4FQeQy0R2T8GLUwQhOP5M1gBhy4= github.com/heketi/tests v0.0.0-20151005000721-f3775cbcefd6/go.mod h1:xGMAM8JLi7UkZt1i4FQeQy0R2T8GLUwQhOP5M1gBhy4=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=

View File

@ -17,6 +17,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/volume/glusterfs", importpath = "k8s.io/kubernetes/pkg/volume/glusterfs",
deps = [ deps = [
"//pkg/apis/core/v1/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library", "//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -18,10 +18,12 @@ package glusterfs
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"math" "math"
"math/rand" "math/rand"
"net" "net"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
@ -46,6 +48,7 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
volumehelpers "k8s.io/cloud-provider/volume/helpers" volumehelpers "k8s.io/cloud-provider/volume/helpers"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util" volutil "k8s.io/kubernetes/pkg/volume/util"
) )
@ -662,7 +665,7 @@ func (d *glusterfsVolumeDeleter) Delete() error {
return fmt.Errorf("failed to release gid %v: %v", gid, err) return fmt.Errorf("failed to release gid %v: %v", gid, err)
} }
} }
cli := gcli.NewClient(d.url, d.user, d.secretValue) cli := filterClient(gcli.NewClient(d.url, d.user, d.secretValue), d.plugin.host.GetFilteredDialOptions())
if cli == nil { if cli == nil {
klog.Errorf("failed to create glusterfs REST client") klog.Errorf("failed to create glusterfs REST client")
return fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed") return fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed")
@ -703,6 +706,20 @@ func (d *glusterfsVolumeDeleter) Delete() error {
return nil return nil
} }
func filterClient(client *gcli.Client, opts *proxyutil.FilteredDialOptions) *gcli.Client {
if opts == nil {
return client
}
dialer := proxyutil.NewFilteredDialContext(nil, nil, opts)
client.SetClientFunc(func(tlsConfig *tls.Config, checkRedirect gcli.CheckRedirectFunc) (gcli.HttpPerformer, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.DialContext = dialer
transport.TLSClientConfig = tlsConfig
return &http.Client{Transport: transport, CheckRedirect: checkRedirect}, nil
})
return client
}
func (p *glusterfsVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) { func (p *glusterfsVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
if !volutil.AccessModesContainedInAll(p.plugin.GetAccessModes(), p.options.PVC.Spec.AccessModes) { if !volutil.AccessModesContainedInAll(p.plugin.GetAccessModes(), p.options.PVC.Spec.AccessModes) {
return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", p.options.PVC.Spec.AccessModes, p.plugin.GetAccessModes()) return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", p.options.PVC.Spec.AccessModes, p.plugin.GetAccessModes())
@ -794,7 +811,7 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersi
if p.url == "" { if p.url == "" {
return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST URL is empty") return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST URL is empty")
} }
cli := gcli.NewClient(p.url, p.user, p.secretValue) cli := filterClient(gcli.NewClient(p.url, p.user, p.secretValue), p.plugin.host.GetFilteredDialOptions())
if cli == nil { if cli == nil {
return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed") return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed")
} }
@ -1205,7 +1222,7 @@ func (plugin *glusterfsPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize res
klog.V(4).Infof("expanding volume: %q", volumeID) klog.V(4).Infof("expanding volume: %q", volumeID)
//Create REST server connection //Create REST server connection
cli := gcli.NewClient(cfg.url, cfg.user, cfg.secretValue) cli := filterClient(gcli.NewClient(cfg.url, cfg.user, cfg.secretValue), plugin.host.GetFilteredDialOptions())
if cli == nil { if cli == nil {
klog.Errorf("failed to create glusterfs REST client") klog.Errorf("failed to create glusterfs REST client")
return oldSize, fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed") return oldSize, fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed")

View File

@ -6,6 +6,7 @@ go_library(
"admin.go", "admin.go",
"backup.go", "backup.go",
"block_volume.go", "block_volume.go",
"brick.go",
"client.go", "client.go",
"cluster.go", "cluster.go",
"db.go", "db.go",

View File

@ -157,3 +157,56 @@ func (c *Client) BlockVolumeDelete(id string) error {
return nil return nil
} }
func (c *Client) BlockVolumeExpand(id string, request *api.BlockVolumeExpandRequest) (
*api.BlockVolumeInfoResponse, error) {
// Marshal request to JSON
buffer, err := json.Marshal(request)
if err != nil {
return nil, err
}
// Create a request
req, err := http.NewRequest("POST",
c.host+"/blockvolumes/"+id+"/expand",
bytes.NewBuffer(buffer))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
// Set token
err = c.setToken(req)
if err != nil {
return nil, err
}
// Send request
r, err := c.do(req)
if err != nil {
return nil, err
}
defer r.Body.Close()
if r.StatusCode != http.StatusAccepted {
return nil, utils.GetErrorFromResponse(r)
}
// Wait for response
r, err = c.pollResponse(r)
if err != nil {
return nil, err
}
if r.StatusCode != http.StatusOK {
return nil, utils.GetErrorFromResponse(r)
}
// Read JSON response
var blockvolume api.BlockVolumeInfoResponse
err = utils.GetJsonFromResponse(r, &blockvolume)
if err != nil {
return nil, err
}
return &blockvolume, nil
}

View File

@ -0,0 +1,72 @@
//
// Copyright (c) 2019 The heketi Authors
//
// This file is licensed to you under your choice of the GNU Lesser
// General Public License, version 3 or any later version (LGPLv3 or
// later), as published by the Free Software Foundation,
// or under the Apache License, Version 2.0 <LICENSE-APACHE2 or
// http://www.apache.org/licenses/LICENSE-2.0>.
//
// You may not use this file except in compliance with those terms.
//
package client
import (
"bytes"
"encoding/json"
"io"
"net/http"
"github.com/heketi/heketi/pkg/glusterfs/api"
"github.com/heketi/heketi/pkg/utils"
)
// BrickEvict requests that Heketi evict the given brick from the
// underlying gluster volume, automatically replacing it with a new brick.
//
// NOTE: options is currently empty but reserved for future extensions
// to the api.
func (c *Client) BrickEvict(id string, request *api.BrickEvictOptions) error {
var buf io.Reader
if request != nil {
b, err := json.Marshal(request)
if err != nil {
return err
}
buf = bytes.NewBuffer(b)
}
// Create a request
req, err := http.NewRequest("POST", c.host+"/bricks/to-evict/"+id, buf)
if err != nil {
return err
}
// Set token
err = c.setToken(req)
if err != nil {
return err
}
// Send request
r, err := c.do(req)
if err != nil {
return err
}
defer r.Body.Close()
if r.StatusCode != http.StatusAccepted {
return utils.GetErrorFromResponse(r)
}
// Wait for response
r, err = c.pollResponse(r)
if err != nil {
return err
}
if r.StatusCode != http.StatusNoContent {
return utils.GetErrorFromResponse(r)
}
return nil
}

View File

@ -71,6 +71,9 @@ type Client struct {
// allow plugging in custom do wrappers // allow plugging in custom do wrappers
do func(*http.Request) (*http.Response, error) do func(*http.Request) (*http.Response, error)
// allow plugging in custom http client fetcher
getClient ClientFunc
} }
var defaultClientOptions = ClientOptions{ var defaultClientOptions = ClientOptions{
@ -154,6 +157,10 @@ func (c *Client) SetTLSOptions(o *ClientTLSOptions) error {
return nil return nil
} }
func (c *Client) SetClientFunc(f ClientFunc) {
c.getClient = f
}
// Simple Hello test to check if the server is up // Simple Hello test to check if the server is up
func (c *Client) Hello() error { func (c *Client) Hello() error {
// Create request // Create request
@ -189,13 +196,14 @@ func (c *Client) doBasic(req *http.Request) (*http.Response, error) {
<-c.throttle <-c.throttle
}() }()
httpClient := &http.Client{} getClient := c.getClient
if c.tlsClientConfig != nil { if getClient == nil {
httpClient.Transport = &http.Transport{ getClient = HeketiHttpClient
TLSClientConfig: c.tlsClientConfig, }
} httpClient, err := getClient(c.tlsClientConfig, c.checkRedirect)
if err != nil {
return nil, err
} }
httpClient.CheckRedirect = c.checkRedirect
return httpClient.Do(req) return httpClient.Do(req)
} }
@ -355,3 +363,30 @@ func (c *ClientOptions) retryDelay(r *http.Response) time.Duration {
s := rand.Intn(max-min) + min s := rand.Intn(max-min) + min
return time.Second * time.Duration(s) return time.Second * time.Duration(s)
} }
// CheckRedirectFunc is an alias for the somewhat complex function signature
// of the CheckRedirect function of the http.Client.
type CheckRedirectFunc func(*http.Request, []*http.Request) error
// ClientFunc is an alias for the function signature needed to create custom
// http clients.
type ClientFunc func(*tls.Config, CheckRedirectFunc) (HttpPerformer, error)
// HttpPerformer is an interface that the heketi api client needs from the http
// client.
type HttpPerformer interface {
Do(req *http.Request) (*http.Response, error)
}
// HeketiHttpClient constructs a new http client for use by the heketi
// api client, using the traditional heketi approach.
func HeketiHttpClient(tlsConfig *tls.Config, checkRedirect CheckRedirectFunc) (HttpPerformer, error) {
httpClient := &http.Client{}
if tlsConfig != nil {
httpClient.Transport = &http.Transport{
TLSClientConfig: tlsConfig,
}
}
httpClient.CheckRedirect = checkRedirect
return httpClient, nil
}

View File

@ -86,12 +86,35 @@ func ValidateDurabilityType(value interface{}) error {
return nil return nil
} }
type HealInfoCheck string
const (
HealCheckUnknown HealInfoCheck = ""
HealCheckEnable HealInfoCheck = "enable"
HealCheckDisable HealInfoCheck = "disable"
)
func ValidateHealCheck(value interface{}) error {
h, _ := value.(HealInfoCheck)
err := validation.Validate(h, validation.In(HealCheckUnknown, HealCheckEnable, HealCheckDisable))
if err != nil {
return fmt.Errorf("%v is not valid heal info check", h)
}
return nil
}
// Common // Common
type StateRequest struct { type StateRequest struct {
State EntryState `json:"state"` State EntryState `json:"state"`
HealCheck HealInfoCheck `json:"healcheck"`
} }
func (statereq StateRequest) Validate() error { func (statereq StateRequest) Validate() error {
if err := validation.ValidateStruct(&statereq,
validation.Field(&statereq.HealCheck, validation.By(ValidateHealCheck))); err != nil {
return err
}
return validation.ValidateStruct(&statereq, return validation.ValidateStruct(&statereq,
validation.Field(&statereq.State, validation.Required, validation.By(ValidateEntryState)), validation.Field(&statereq.State, validation.Required, validation.By(ValidateEntryState)),
) )
@ -422,6 +445,7 @@ type BlockVolumeInfo struct {
} `json:"blockvolume"` } `json:"blockvolume"`
Cluster string `json:"cluster,omitempty"` Cluster string `json:"cluster,omitempty"`
BlockHostingVolume string `json:"blockhostingvolume,omitempty"` BlockHostingVolume string `json:"blockhostingvolume,omitempty"`
UsableSize int `json:"usablesize,omitempty"`
} }
type BlockVolumeInfoResponse struct { type BlockVolumeInfoResponse struct {
@ -432,6 +456,16 @@ type BlockVolumeListResponse struct {
BlockVolumes []string `json:"blockvolumes"` BlockVolumes []string `json:"blockvolumes"`
} }
type BlockVolumeExpandRequest struct {
Size int `json:"new_size"`
}
func (blockVolExpandReq BlockVolumeExpandRequest) Validate() error {
return validation.ValidateStruct(&blockVolExpandReq,
validation.Field(&blockVolExpandReq.Size, validation.Required, validation.Min(1)),
)
}
type LogLevelInfo struct { type LogLevelInfo struct {
// should contain one or more logger to log-level-name mapping // should contain one or more logger to log-level-name mapping
LogLevel map[string]string `json:"loglevel"` LogLevel map[string]string `json:"loglevel"`
@ -554,6 +588,7 @@ func NewBlockVolumeInfoResponse() *BlockVolumeInfoResponse {
func (v *BlockVolumeInfoResponse) String() string { func (v *BlockVolumeInfoResponse) String() string {
s := fmt.Sprintf("Name: %v\n"+ s := fmt.Sprintf("Name: %v\n"+
"Size: %v\n"+ "Size: %v\n"+
"UsableSize: %v\n"+
"Volume Id: %v\n"+ "Volume Id: %v\n"+
"Cluster Id: %v\n"+ "Cluster Id: %v\n"+
"Hosts: %v\n"+ "Hosts: %v\n"+
@ -565,6 +600,7 @@ func (v *BlockVolumeInfoResponse) String() string {
"Block Hosting Volume: %v\n", "Block Hosting Volume: %v\n",
v.Name, v.Name,
v.Size, v.Size,
v.UsableSize,
v.Id, v.Id,
v.Cluster, v.Cluster,
v.BlockVolume.Hosts, v.BlockVolume.Hosts,
@ -680,3 +716,13 @@ func ValidateIds(v interface{}) error {
} }
return nil return nil
} }
// reserving a type for future options for brick evict
type BrickEvictOptions struct {
HealCheck HealInfoCheck `json:"healcheck"`
}
func (brickops BrickEvictOptions) Validate() error {
return validation.ValidateStruct(&brickops,
validation.Field(&brickops.HealCheck, validation.By(ValidateHealCheck)))
}

View File

@ -21,9 +21,22 @@ import (
"strings" "strings"
) )
var (
errMax = int64(4096)
strMax = int64(8192)
)
// Return the body from a response as a string // Return the body from a response as a string
func GetStringFromResponse(r *http.Response) (string, error) { func GetStringFromResponse(r *http.Response) (string, error) {
body, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength)) // If the content length is not set, limit reading to 8K worth of data.
return getResponse(r, strMax)
}
func getResponse(r *http.Response, max int64) (string, error) {
if r.ContentLength >= 0 {
max = r.ContentLength
}
body, err := ioutil.ReadAll(io.LimitReader(r.Body, max))
defer r.Body.Close() defer r.Body.Close()
if err != nil { if err != nil {
return "", err return "", err
@ -33,7 +46,10 @@ func GetStringFromResponse(r *http.Response) (string, error) {
// Return the body from a response as an error // Return the body from a response as an error
func GetErrorFromResponse(r *http.Response) error { func GetErrorFromResponse(r *http.Response) error {
s, err := GetStringFromResponse(r) // If the content length is not set, limit reading to 4K worth of data.
// It is probably way more than needed because an error that long is
// very unusual. Plus it will only cut it off rather than show nothing.
s, err := getResponse(r, errMax)
if err != nil { if err != nil {
return err return err
} }

4
vendor/modules.txt vendored
View File

@ -686,9 +686,9 @@ github.com/hashicorp/hcl/json/token
# github.com/hashicorp/mdns => github.com/hashicorp/mdns v1.0.0 # github.com/hashicorp/mdns => github.com/hashicorp/mdns v1.0.0
# github.com/hashicorp/memberlist => github.com/hashicorp/memberlist v0.1.3 # github.com/hashicorp/memberlist => github.com/hashicorp/memberlist v0.1.3
# github.com/hashicorp/serf => github.com/hashicorp/serf v0.8.2 # github.com/hashicorp/serf => github.com/hashicorp/serf v0.8.2
# github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible => github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible # github.com/heketi/heketi v10.2.0+incompatible => github.com/heketi/heketi v10.2.0+incompatible
## explicit ## explicit
# github.com/heketi/heketi => github.com/heketi/heketi v9.0.1-0.20190917153846-c2e2a4ab7ab9+incompatible # github.com/heketi/heketi => github.com/heketi/heketi v10.2.0+incompatible
github.com/heketi/heketi/client/api/go-client github.com/heketi/heketi/client/api/go-client
github.com/heketi/heketi/pkg/glusterfs/api github.com/heketi/heketi/pkg/glusterfs/api
github.com/heketi/heketi/pkg/utils github.com/heketi/heketi/pkg/utils