Files
linuxkit/vendor/github.com/docker/infrakit/pkg/rpc/server/server.go
Rolf Neugebauer 2ab909fcbd vendor: Update to a new version of InfraKit
This pulls in another slew of other packages.

Signed-off-by: Rolf Neugebauer <rolf.neugebauer@docker.com>
2017-04-04 16:07:53 +01:00

183 lines
4.4 KiB
Go

package server
import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/http/httputil"
"time"
log "github.com/Sirupsen/logrus"
broker "github.com/docker/infrakit/pkg/broker/server"
rpc_server "github.com/docker/infrakit/pkg/rpc"
"github.com/docker/infrakit/pkg/spi"
"github.com/docker/infrakit/pkg/spi/event"
"github.com/docker/infrakit/pkg/types"
"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2"
"github.com/gorilla/rpc/v2/json2"
"gopkg.in/tylerb/graceful.v1"
)
// Stoppable support proactive stopping, and blocking until stopped.
type Stoppable interface {
Stop()
AwaitStopped()
Wait() <-chan struct{}
}
type stoppableServer struct {
server *graceful.Server
}
func (s *stoppableServer) Stop() {
s.server.Stop(10 * time.Second)
}
func (s *stoppableServer) Wait() <-chan struct{} {
return s.server.StopChan()
}
func (s *stoppableServer) AwaitStopped() {
<-s.server.StopChan()
}
type loggingHandler struct {
handler http.Handler
}
func (h loggingHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
requestData, err := httputil.DumpRequest(req, true)
if err == nil {
log.Debugf("Received request %s", string(requestData))
} else {
log.Error(err)
}
recorder := httptest.NewRecorder()
h.handler.ServeHTTP(recorder, req)
responseData, err := httputil.DumpResponse(recorder.Result(), true)
if err == nil {
log.Debugf("Sending response %s", string(responseData))
} else {
log.Error(err)
}
w.WriteHeader(recorder.Code)
recorder.Body.WriteTo(w)
}
// A VersionedInterface identifies which Interfaces a plugin supports.
type VersionedInterface interface {
// ImplementedInterface returns the interface being provided.
ImplementedInterface() spi.InterfaceSpec
}
// StartPluginAtPath starts an HTTP server listening on a unix socket at the specified path.
// Returns a Stoppable that can be used to stop or block on the server.
func StartPluginAtPath(socketPath string, receiver VersionedInterface, more ...VersionedInterface) (Stoppable, error) {
server := rpc.NewServer()
server.RegisterCodec(json2.NewCodec(), "application/json")
targets := append([]VersionedInterface{receiver}, more...)
interfaces := []spi.InterfaceSpec{}
for _, t := range targets {
interfaces = append(interfaces, t.ImplementedInterface())
if err := server.RegisterService(t, ""); err != nil {
return nil, err
}
}
// handshake service that can exchange interface versions with client
if err := server.RegisterService(rpc_server.Handshake(interfaces), ""); err != nil {
return nil, err
}
// events handler
events := broker.NewBroker()
// wire up the publish event source channel to the plugin implementations
for _, t := range targets {
pub, is := t.(event.Publisher)
if !is {
continue
}
// We give one channel per source to provide some isolation. This we won't have the
// whole event bus stop just because one plugin closes the channel.
eventChan := make(chan *event.Event)
pub.PublishOn(eventChan)
go func() {
for {
event, ok := <-eventChan
if !ok {
return
}
events.Publish(event.Topic.String(), event, 1*time.Second)
}
}()
}
// info handler
info, err := NewPluginInfo(receiver)
if err != nil {
return nil, err
}
httpLog := log.New()
httpLog.Level = log.GetLevel()
router := mux.NewRouter()
router.HandleFunc(rpc_server.URLAPI, info.ShowAPI)
router.HandleFunc(rpc_server.URLFunctions, info.ShowTemplateFunctions)
intercept := broker.Interceptor{
Pre: func(topic string, headers map[string][]string) error {
for _, target := range targets {
if v, is := target.(event.Validator); is {
if err := v.Validate(types.PathFromString(topic)); err == nil {
return nil
}
}
}
return broker.ErrInvalidTopic(topic)
},
Do: events.ServeHTTP,
Post: func(topic string) {
log.Infoln("Client left", topic)
},
}
router.HandleFunc(rpc_server.URLEventsPrefix, intercept.ServeHTTP)
logger := loggingHandler{handler: server}
router.Handle("/", logger)
gracefulServer := graceful.Server{
Timeout: 10 * time.Second,
Server: &http.Server{Addr: fmt.Sprintf("unix://%s", socketPath), Handler: router},
}
listener, err := net.Listen("unix", socketPath)
if err != nil {
return nil, err
}
log.Infof("Listening at: %s", socketPath)
go func() {
err := gracefulServer.Serve(listener)
if err != nil {
log.Warn(err)
}
events.Stop()
}()
return &stoppableServer{server: &gracefulServer}, nil
}