miragesdk: rework the control plane protocol

Previously, the control plane was using HTTP client/server, that various people
found way too complex to run in a privileged container (for very good reasons).

So switching to a simpler binary protocol, using c-like structures. Will
probably switch to an other serialization protocol later (eg. protobuf
or cap-n-proto).

Signed-off-by: Thomas Gazagnaire <thomas@gazagnaire.org>
This commit is contained in:
Thomas Gazagnaire 2017-03-29 19:25:37 +02:00
parent df71c0f299
commit b5a3d4b2aa
7 changed files with 194 additions and 138 deletions

View File

@ -0,0 +1,46 @@
open Lwt.Infix
let src = Logs.Src.create "IO" ~doc:"IO helpers"
module Log = (val Logs.src_log src : Logs.LOG)
let rec really_write fd buf off len =
Log.debug (fun l -> l "really_write");
match len with
| 0 -> Lwt.return_unit
| len ->
Lwt_unix.write fd buf off len >>= fun n ->
really_write fd buf (off+n) (len-n)
let rec really_read fd buf off len =
Log.debug (fun l -> l "really_read");
match len with
| 0 -> Lwt.return_unit
| len ->
Lwt_unix.read fd buf off len >>= fun n ->
really_read fd buf (off+n) (len-n)
let read_all fd =
Log.debug (fun l -> l "read_all");
let len = 16 * 1024 in
let buf = Bytes.create len in
let rec loop acc =
Lwt_unix.read fd buf 0 len >>= fun n ->
let acc = String.sub buf 0 n :: acc in
if n <= len then Lwt.return (List.rev acc)
else loop acc
in
loop [] >|= fun bufs ->
String.concat "" bufs
let read_n fd len =
Log.debug (fun l -> l "read_n");
let buf = Bytes.create len in
let rec loop acc len =
Lwt_unix.read fd buf 0 len >>= fun n ->
let acc = String.sub buf 0 n :: acc in
match len - n with
| 0 -> Lwt.return (List.rev acc)
| r -> loop acc r
in
loop [] len >|= fun bufs ->
String.concat "" bufs

View File

@ -0,0 +1,13 @@
(** IO helpers *)
val really_write: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t
(** [really_write fd buf off len] writes exactly [len] bytes to [fd]. *)
val really_read: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t
(** [really_read fd buf off len] reads exactly [len] bytes from [fd]. *)
val read_all: Lwt_unix.file_descr -> string Lwt.t
(** [read_all fd] reads as much data as it is available in [fd]. *)
val read_n: Lwt_unix.file_descr -> int -> string Lwt.t
(** [read_n fd n] reads exactly [n] bytes from [fd]. *)

View File

@ -4,7 +4,7 @@ let src = Logs.Src.create "init" ~doc:"Init steps"
module Log = (val Logs.src_log src : Logs.LOG)
(* FIXME: to avoid linking with gmp *)
module IO = struct
module No_IO = struct
type ic = unit
type oc = unit
type ctx = unit
@ -17,7 +17,7 @@ module IO = struct
end
(* FIXME: we don't use Irmin_unix.Git.FS.KV to avoid linking with gmp *)
module Store = Irmin_git.FS.KV(IO)(Inflator)(Io_fs)
module Store = Irmin_git.FS.KV(No_IO)(Inflator)(Io_fs)
module KV = Store(Irmin.Contents.String)
let v path =
@ -28,17 +28,84 @@ let v path =
let () =
Irmin.Private.Watch.set_listen_dir_hook Irmin_watcher.hook
module Message = struct
[%%cenum
type operation =
| Write
| Read
| Delete
[@@uint8_t]
]
type t = {
operation: operation;
path : string;
payload : string option;
}
[%%cstruct type message = {
operation : uint8_t; (* = type operation *)
path : uint16_t;
payload : uint16_t;
} [@@little_endian]
]
(* to avoid warning 32 *)
let _ = hexdump_message
let _ = operation_to_string
let _ = string_to_operation
let read_message fd =
IO.read_n fd 4 >>= fun buf ->
let len =
Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0
|> Int32.to_int
in
IO.read_n fd len >>= fun buf ->
let buf = Cstruct.of_string buf in
let operation = match int_to_operation (get_message_operation buf) with
| None -> failwith "invalid operation"
| Some o -> o
in
let path_len = get_message_path buf in
let payload_len = get_message_payload buf in
IO.read_n fd path_len >>= fun path ->
(match payload_len with
| 0 -> Lwt.return None
| n -> IO.read_n fd n >|= fun x -> Some x)
>|= fun payload ->
{ operation; path; payload }
let write_message fd msg =
let operation = operation_to_int msg.operation in
let path = String.length msg.path in
let payload = match msg.payload with
| None -> 0
| Some x -> String.length x
in
let len = sizeof_message + path + payload in
let buf = Cstruct.create len in
set_message_operation buf operation;
set_message_path buf path;
set_message_payload buf path;
Cstruct.blit_from_bytes msg.path 0 buf sizeof_message path;
let () = match msg.payload with
| None -> ()
| Some x -> Cstruct.blit_from_bytes x 0 buf (sizeof_message+path) payload
in
IO.really_write fd (Cstruct.to_string buf) 0 len
end
module Dispatch = struct
module Wm = struct
module Rd = Webmachine.Rd
include Webmachine.Make(Cohttp_lwt_unix.Server.IO)
end
open Message
let with_key rd f =
match KV.Key.of_string rd.Wm.Rd.dispatch_path with
| Ok x -> f x
| Error _ -> Wm.respond 404 rd
let with_key msg f =
match KV.Key.of_string msg.path with
| Ok x -> f x
| Error (`Msg e) -> Fmt.kstrf Lwt.fail_with "invalid key: %s" e
let infof fmt =
Fmt.kstrf (fun msg () ->
@ -46,91 +113,50 @@ module Dispatch = struct
Irmin.Info.v ~date ~author:"calf" msg
) fmt
let ok = "{\"status\": \"ok\"}"
class item db = object(self)
inherit [Cohttp_lwt_body.t] Wm.resource
method private of_string rd =
Cohttp_lwt_body.to_string rd.Wm.Rd.req_body >>= fun value ->
with_key rd (fun key ->
let dispatch db msg =
with_key msg (fun key ->
match msg.operation with
| Write ->
let info = infof "Updating %a" KV.Key.pp key in
KV.set db ~info key value >>= fun () ->
let resp_body = `String ok in
let rd = { rd with Wm.Rd.resp_body } in
Wm.continue true rd
)
(match msg.payload with
| None -> Fmt.kstrf Lwt.fail_with "dispatch: missing payload"
| Some v -> KV.set db ~info key v)
| _ -> failwith "TODO"
)
method private to_string rd =
with_key rd (fun key ->
KV.find db key >>= function
| Some value -> Wm.continue (`String value) rd
| None -> assert false
)
method resource_exists rd =
with_key rd (fun key ->
KV.mem db key >>= fun mem ->
Wm.continue mem rd
)
method allowed_methods rd =
Wm.continue [`GET; `HEAD; `PUT; `DELETE] rd
method content_types_provided rd =
Wm.continue [
"plain", self#to_string
] rd
method content_types_accepted rd =
Wm.continue [
"plain", self#of_string
] rd
method delete_resource rd =
with_key rd (fun key ->
let info = infof "Deleting %a" KV.Key.pp key in
KV.remove db ~info key >>= fun () ->
let resp_body = `String ok in
Wm.continue true { rd with Wm.Rd.resp_body }
)
end
let v db routes =
let routes = List.map (fun r -> r, fun () -> new item db) routes in
let callback (_ch, _conn) request body =
let open Cohttp in
(Wm.dispatch' routes ~body ~request >|= function
| None -> (`Not_found, Header.init (), `String "Not found", [])
| Some result -> result)
>>= fun (status, headers, body, path) ->
Log.info (fun l ->
l "%d - %s %s"
(Code.code_of_status status)
(Code.string_of_method (Request.meth request))
(Uri.path (Request.uri request)));
Log.debug (fun l -> l "path=%a" Fmt.(Dump.list string) path);
(* Finally, send the response to the client *)
Cohttp_lwt_unix.Server.respond ~headers ~body ~status ()
let serve fd db ~routes =
let msgs = Queue.create () in
let cond = Lwt_condition.create () in
let rec listen () =
read_message fd >>= fun msg ->
Queue.add msg msgs;
Lwt_condition.signal cond ();
listen ()
in
(* create the server and handle requests with the function defined above *)
let conn_closed (_, conn) =
Log.info (fun l ->
l "connection %s closed\n%!" (Cohttp.Connection.to_string conn))
let rec process () =
Lwt_condition.wait cond >>= fun () ->
let msg = Queue.pop msgs in
(if List.mem msg.path routes then dispatch db msg
else (
Log.err (fun l -> l "%s is not an allowed path" msg.path);
Lwt.return_unit;
)) >>= fun () ->
process ()
in
Cohttp_lwt_unix.Server.make ~callback ~conn_closed ()
Lwt.pick [
listen ();
process ();
]
end
let int_of_fd (t:Lwt_unix.file_descr) =
(Obj.magic (Lwt_unix.unix_file_descr t): int)
let serve ~routes db fd =
let http = Dispatch.v db routes in
let on_exn e = Log.err (fun l -> l "ERROR: %a" Fmt.exn e) in
Lwt_unix.blocking fd >>= fun blocking ->
Log.debug (fun l ->
l "Serving the control state over fd:%d (blocking=%b)"
(int_of_fd fd) blocking
);
Cohttp_lwt_unix.Server.create ~on_exn ~mode:(`Fd fd) http
Dispatch.serve fd db ~routes

View File

@ -3,6 +3,29 @@
module KV: Irmin.KV with type contents = string
module Message: sig
(** The type for operations. *)
type operation =
| Write
| Read
| Delete
(** The type for control messages. *)
type t = {
operation: operation;
path : string;
payload : string option;
}
val write_message: Lwt_unix.file_descr -> t -> unit Lwt.t
(** [write_message fd t] writes a control message. *)
val read_message: Lwt_unix.file_descr -> t Lwt.t
(** [read_message fd] reads a control message. *)
end
val v: string -> KV.t Lwt.t
(** [v p] is the KV store storing the control state, located at path
[p] in the filesystem of the privileged container. *)

View File

@ -5,35 +5,6 @@ module Log = (val Logs.src_log src : Logs.LOG)
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
module IO = struct
let rec really_write fd buf off len =
match len with
| 0 -> Lwt.return_unit
| len ->
Lwt_unix.write fd buf off len >>= fun n ->
really_write fd buf (off+n) (len-n)
let rec really_read fd buf off len =
match len with
| 0 -> Lwt.return_unit
| len ->
Lwt_unix.read fd buf off len >>= fun n ->
really_write fd buf (off+n) (len-n)
let read_all fd =
let len = 16 * 1024 in
let buf = Bytes.create len in
let rec loop acc =
Lwt_unix.read fd buf 0 len >>= fun len ->
let res = String.sub buf 0 len in
loop (res :: acc)
in
loop [] >|= fun bufs ->
String.concat "" (List.rev bufs)
end
module Fd = struct
type t = {
@ -120,12 +91,6 @@ module Fd = struct
in
loop ()
let proxy x y =
Lwt.pick [
forward ~src:x ~dst:y;
forward ~src:y ~dst:x;
]
end
module Pipe = struct
@ -229,7 +194,7 @@ let exec_priv ~pid ~cmd ~net ~ctl ~handlers =
Fd.forward ~src:Pipe.(priv stderr) ~dst:Fd.stderr;
(* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *)
ctl ();
(* handlers (); *)
handlers ();
])
let run ~net ~ctl ~handlers cmd =

View File

@ -14,22 +14,6 @@
data, e.g. the IP address once a DHCP lease is obtained.}
}*)
module IO: sig
(** {IO helpers} *)
val really_write: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t
(** [really_write fd buf off len] writes exactly [len] bytes. *)
val really_read: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t
(** [really_read fd buf off len] reads exactly [len] bytes. *)
val read_all: Lwt_unix.file_descr -> string Lwt.t
(** [read_all fd] reads all the contents of [fd] bytes. *)
end
module Fd: sig
type t

View File

@ -2,9 +2,8 @@
(library
((name sdk)
(libraries (threads cohttp.lwt cstruct.lwt
cmdliner fmt.cli logs.fmt logs.cli fmt.tty decompress
irmin irmin-git irmin-http lwt.unix rawlink tuntap
(libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty
decompress irmin irmin-git lwt.unix rawlink tuntap dispatch
irmin-watcher inotify))
(preprocess (per_file ((pps (cstruct.ppx)) (ctl))))
))