mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-10-04 09:36:16 +00:00
Add StorageOS volume plugin
This commit is contained in:
189
vendor/github.com/storageos/go-api/event.go
generated
vendored
Normal file
189
vendor/github.com/storageos/go-api/event.go
generated
vendored
Normal file
@@ -0,0 +1,189 @@
|
||||
package storageos
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"github.com/storageos/go-api/types"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
// EventAPIPrefix is a partial path to the HTTP endpoint.
|
||||
EventAPIPrefix = "event"
|
||||
|
||||
// ErrNoSuchEvent is the error returned when the event does not exist.
|
||||
ErrNoSuchEvent = errors.New("no such event")
|
||||
)
|
||||
|
||||
// EventList returns the list of available events.
|
||||
func (c *Client) EventList(opts types.ListOptions) ([]*types.Event, error) {
|
||||
listOpts := doOptions{
|
||||
fieldSelector: opts.FieldSelector,
|
||||
labelSelector: opts.LabelSelector,
|
||||
context: opts.Context,
|
||||
}
|
||||
resp, err := c.do("GET", EventAPIPrefix, listOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
var events []*types.Event
|
||||
if err := json.NewDecoder(resp.Body).Decode(&events); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// Events returns a stream of events in the daemon. It's up to the caller to close the stream
|
||||
// by cancelling the context. Once the stream has been completely read an io.EOF error will
|
||||
// be sent over the error channel. If an error is sent all processing will be stopped. It's up
|
||||
// to the caller to reopen the stream in the event of an error by reinvoking this method.
|
||||
func (c *Client) Events(ctx context.Context, opts types.ListOptions) (<-chan types.Request, <-chan error) {
|
||||
|
||||
// listOpts := doOptions{
|
||||
// fieldSelector: opts.FieldSelector,
|
||||
// labelSelector: opts.LabelSelector,
|
||||
// context: ctx,
|
||||
// }
|
||||
|
||||
messages := make(chan types.Request)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
// started := make(chan struct{})
|
||||
ws, _, err := websocket.DefaultDialer.Dial("ws://10.245.103.2:8000/v1/ws/event", nil)
|
||||
if err != nil {
|
||||
// close(started)
|
||||
// errs <- err
|
||||
log.Fatal(err)
|
||||
}
|
||||
// defer ws.Close()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer ws.Close()
|
||||
defer close(done)
|
||||
for {
|
||||
_, message, err := ws.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println("read:", err)
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
// log.Printf("recv: %s", message)
|
||||
var request types.Request
|
||||
if err := json.Unmarshal(message, &request); err != nil {
|
||||
log.Printf("decode error: %s", message)
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
messages <- request
|
||||
}
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case t := <-ticker.C:
|
||||
log.Printf("tick: %s\n", t.String())
|
||||
err := ws.WriteMessage(websocket.TextMessage, []byte(t.String()))
|
||||
if err != nil {
|
||||
log.Println("write:", err)
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
log.Println("done")
|
||||
err := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||
if err != nil {
|
||||
log.Println("write close:", err)
|
||||
return
|
||||
}
|
||||
errs <- ctx.Err()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
ws.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// go func() {
|
||||
// defer ws.Close()
|
||||
// defer close(errs)
|
||||
//
|
||||
// // query, err := buildEventsQueryParams(cli.version, options)
|
||||
// // if err != nil {
|
||||
// // close(started)
|
||||
// // errs <- err
|
||||
// // return
|
||||
// // }
|
||||
//
|
||||
// // resp, err := cli.get(ctx, "/events", query, nil)
|
||||
//
|
||||
// // decoder := json.NewDecoder(resp.Body)
|
||||
//
|
||||
// close(started)
|
||||
// for {
|
||||
// select {
|
||||
// case <-ctx.Done():
|
||||
// log.Println("done")
|
||||
// errs <- ctx.Err()
|
||||
// return
|
||||
// default:
|
||||
// log.Println("default")
|
||||
// _, message, err := ws.ReadMessage()
|
||||
// if err != nil {
|
||||
// log.Println("read:", err)
|
||||
// return
|
||||
// }
|
||||
// log.Printf("recv: %s", message)
|
||||
// var event types.Event
|
||||
// if err := json.Unmarshal(message, &event); err != nil {
|
||||
// log.Printf("decode error: %s", message)
|
||||
// errs <- err
|
||||
// return
|
||||
// }
|
||||
// log.Printf("sent: %v", event)
|
||||
// messages <- event
|
||||
//
|
||||
// // select {
|
||||
// // case messages <- event:
|
||||
// // case <-ctx.Done():
|
||||
// // errs <- ctx.Err()
|
||||
// // return
|
||||
// // }
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
// <-started
|
||||
log.Println("returning")
|
||||
return messages, errs
|
||||
}
|
||||
|
||||
// Event returns a event by its reference.
|
||||
func (c *Client) Event(ref string) (*types.Event, error) {
|
||||
resp, err := c.do("GET", EventAPIPrefix+"/"+ref, doOptions{})
|
||||
if err != nil {
|
||||
if e, ok := err.(*Error); ok && e.Status == http.StatusNotFound {
|
||||
return nil, ErrNoSuchEvent
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
var event types.Event
|
||||
if err := json.NewDecoder(resp.Body).Decode(&event); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &event, nil
|
||||
}
|
Reference in New Issue
Block a user