From b5a3d4b2aa9dc0774bdd958b9b0351c15662e031 Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Wed, 29 Mar 2017 19:25:37 +0200 Subject: [PATCH] miragesdk: rework the control plane protocol Previously, the control plane was using HTTP client/server, that various people found way too complex to run in a privileged container (for very good reasons). So switching to a simpler binary protocol, using c-like structures. Will probably switch to an other serialization protocol later (eg. protobuf or cap-n-proto). Signed-off-by: Thomas Gazagnaire --- projects/miragesdk/src/sdk/IO.ml | 46 +++++++ projects/miragesdk/src/sdk/IO.mli | 13 ++ projects/miragesdk/src/sdk/ctl.ml | 192 ++++++++++++++++------------ projects/miragesdk/src/sdk/ctl.mli | 23 ++++ projects/miragesdk/src/sdk/init.ml | 37 +----- projects/miragesdk/src/sdk/init.mli | 16 --- projects/miragesdk/src/sdk/jbuild | 5 +- 7 files changed, 194 insertions(+), 138 deletions(-) create mode 100644 projects/miragesdk/src/sdk/IO.ml create mode 100644 projects/miragesdk/src/sdk/IO.mli diff --git a/projects/miragesdk/src/sdk/IO.ml b/projects/miragesdk/src/sdk/IO.ml new file mode 100644 index 000000000..2d9345729 --- /dev/null +++ b/projects/miragesdk/src/sdk/IO.ml @@ -0,0 +1,46 @@ +open Lwt.Infix + +let src = Logs.Src.create "IO" ~doc:"IO helpers" +module Log = (val Logs.src_log src : Logs.LOG) + +let rec really_write fd buf off len = + Log.debug (fun l -> l "really_write"); + match len with + | 0 -> Lwt.return_unit + | len -> + Lwt_unix.write fd buf off len >>= fun n -> + really_write fd buf (off+n) (len-n) + +let rec really_read fd buf off len = + Log.debug (fun l -> l "really_read"); + match len with + | 0 -> Lwt.return_unit + | len -> + Lwt_unix.read fd buf off len >>= fun n -> + really_read fd buf (off+n) (len-n) + +let read_all fd = + Log.debug (fun l -> l "read_all"); + let len = 16 * 1024 in + let buf = Bytes.create len in + let rec loop acc = + Lwt_unix.read fd buf 0 len >>= fun n -> + let acc = String.sub buf 0 n :: acc in + if n <= len then Lwt.return (List.rev acc) + else loop acc + in + loop [] >|= fun bufs -> + String.concat "" bufs + +let read_n fd len = + Log.debug (fun l -> l "read_n"); + let buf = Bytes.create len in + let rec loop acc len = + Lwt_unix.read fd buf 0 len >>= fun n -> + let acc = String.sub buf 0 n :: acc in + match len - n with + | 0 -> Lwt.return (List.rev acc) + | r -> loop acc r + in + loop [] len >|= fun bufs -> + String.concat "" bufs diff --git a/projects/miragesdk/src/sdk/IO.mli b/projects/miragesdk/src/sdk/IO.mli new file mode 100644 index 000000000..121ba33c7 --- /dev/null +++ b/projects/miragesdk/src/sdk/IO.mli @@ -0,0 +1,13 @@ +(** IO helpers *) + +val really_write: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t +(** [really_write fd buf off len] writes exactly [len] bytes to [fd]. *) + +val really_read: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t +(** [really_read fd buf off len] reads exactly [len] bytes from [fd]. *) + +val read_all: Lwt_unix.file_descr -> string Lwt.t +(** [read_all fd] reads as much data as it is available in [fd]. *) + +val read_n: Lwt_unix.file_descr -> int -> string Lwt.t +(** [read_n fd n] reads exactly [n] bytes from [fd]. *) diff --git a/projects/miragesdk/src/sdk/ctl.ml b/projects/miragesdk/src/sdk/ctl.ml index feadedf35..d561e067b 100644 --- a/projects/miragesdk/src/sdk/ctl.ml +++ b/projects/miragesdk/src/sdk/ctl.ml @@ -4,7 +4,7 @@ let src = Logs.Src.create "init" ~doc:"Init steps" module Log = (val Logs.src_log src : Logs.LOG) (* FIXME: to avoid linking with gmp *) -module IO = struct +module No_IO = struct type ic = unit type oc = unit type ctx = unit @@ -17,7 +17,7 @@ module IO = struct end (* FIXME: we don't use Irmin_unix.Git.FS.KV to avoid linking with gmp *) -module Store = Irmin_git.FS.KV(IO)(Inflator)(Io_fs) +module Store = Irmin_git.FS.KV(No_IO)(Inflator)(Io_fs) module KV = Store(Irmin.Contents.String) let v path = @@ -28,17 +28,84 @@ let v path = let () = Irmin.Private.Watch.set_listen_dir_hook Irmin_watcher.hook +module Message = struct + + [%%cenum + type operation = + | Write + | Read + | Delete + [@@uint8_t] + ] + + type t = { + operation: operation; + path : string; + payload : string option; + } + + [%%cstruct type message = { + operation : uint8_t; (* = type operation *) + path : uint16_t; + payload : uint16_t; + } [@@little_endian] + ] + + (* to avoid warning 32 *) + let _ = hexdump_message + let _ = operation_to_string + let _ = string_to_operation + + let read_message fd = + IO.read_n fd 4 >>= fun buf -> + let len = + Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0 + |> Int32.to_int + in + IO.read_n fd len >>= fun buf -> + let buf = Cstruct.of_string buf in + let operation = match int_to_operation (get_message_operation buf) with + | None -> failwith "invalid operation" + | Some o -> o + in + let path_len = get_message_path buf in + let payload_len = get_message_payload buf in + IO.read_n fd path_len >>= fun path -> + (match payload_len with + | 0 -> Lwt.return None + | n -> IO.read_n fd n >|= fun x -> Some x) + >|= fun payload -> + { operation; path; payload } + + let write_message fd msg = + let operation = operation_to_int msg.operation in + let path = String.length msg.path in + let payload = match msg.payload with + | None -> 0 + | Some x -> String.length x + in + let len = sizeof_message + path + payload in + let buf = Cstruct.create len in + set_message_operation buf operation; + set_message_path buf path; + set_message_payload buf path; + Cstruct.blit_from_bytes msg.path 0 buf sizeof_message path; + let () = match msg.payload with + | None -> () + | Some x -> Cstruct.blit_from_bytes x 0 buf (sizeof_message+path) payload + in + IO.really_write fd (Cstruct.to_string buf) 0 len + +end + module Dispatch = struct - module Wm = struct - module Rd = Webmachine.Rd - include Webmachine.Make(Cohttp_lwt_unix.Server.IO) - end + open Message - let with_key rd f = - match KV.Key.of_string rd.Wm.Rd.dispatch_path with - | Ok x -> f x - | Error _ -> Wm.respond 404 rd + let with_key msg f = + match KV.Key.of_string msg.path with + | Ok x -> f x + | Error (`Msg e) -> Fmt.kstrf Lwt.fail_with "invalid key: %s" e let infof fmt = Fmt.kstrf (fun msg () -> @@ -46,91 +113,50 @@ module Dispatch = struct Irmin.Info.v ~date ~author:"calf" msg ) fmt - let ok = "{\"status\": \"ok\"}" - - class item db = object(self) - - inherit [Cohttp_lwt_body.t] Wm.resource - - method private of_string rd = - Cohttp_lwt_body.to_string rd.Wm.Rd.req_body >>= fun value -> - with_key rd (fun key -> + let dispatch db msg = + with_key msg (fun key -> + match msg.operation with + | Write -> let info = infof "Updating %a" KV.Key.pp key in - KV.set db ~info key value >>= fun () -> - let resp_body = `String ok in - let rd = { rd with Wm.Rd.resp_body } in - Wm.continue true rd - ) + (match msg.payload with + | None -> Fmt.kstrf Lwt.fail_with "dispatch: missing payload" + | Some v -> KV.set db ~info key v) + | _ -> failwith "TODO" + ) - method private to_string rd = - with_key rd (fun key -> - KV.find db key >>= function - | Some value -> Wm.continue (`String value) rd - | None -> assert false - ) - - method resource_exists rd = - with_key rd (fun key -> - KV.mem db key >>= fun mem -> - Wm.continue mem rd - ) - - method allowed_methods rd = - Wm.continue [`GET; `HEAD; `PUT; `DELETE] rd - - method content_types_provided rd = - Wm.continue [ - "plain", self#to_string - ] rd - - method content_types_accepted rd = - Wm.continue [ - "plain", self#of_string - ] rd - - method delete_resource rd = - with_key rd (fun key -> - let info = infof "Deleting %a" KV.Key.pp key in - KV.remove db ~info key >>= fun () -> - let resp_body = `String ok in - Wm.continue true { rd with Wm.Rd.resp_body } - ) - end - - let v db routes = - let routes = List.map (fun r -> r, fun () -> new item db) routes in - let callback (_ch, _conn) request body = - let open Cohttp in - (Wm.dispatch' routes ~body ~request >|= function - | None -> (`Not_found, Header.init (), `String "Not found", []) - | Some result -> result) - >>= fun (status, headers, body, path) -> - Log.info (fun l -> - l "%d - %s %s" - (Code.code_of_status status) - (Code.string_of_method (Request.meth request)) - (Uri.path (Request.uri request))); - Log.debug (fun l -> l "path=%a" Fmt.(Dump.list string) path); - (* Finally, send the response to the client *) - Cohttp_lwt_unix.Server.respond ~headers ~body ~status () + let serve fd db ~routes = + let msgs = Queue.create () in + let cond = Lwt_condition.create () in + let rec listen () = + read_message fd >>= fun msg -> + Queue.add msg msgs; + Lwt_condition.signal cond (); + listen () in - (* create the server and handle requests with the function defined above *) - let conn_closed (_, conn) = - Log.info (fun l -> - l "connection %s closed\n%!" (Cohttp.Connection.to_string conn)) + let rec process () = + Lwt_condition.wait cond >>= fun () -> + let msg = Queue.pop msgs in + (if List.mem msg.path routes then dispatch db msg + else ( + Log.err (fun l -> l "%s is not an allowed path" msg.path); + Lwt.return_unit; + )) >>= fun () -> + process () in - Cohttp_lwt_unix.Server.make ~callback ~conn_closed () + Lwt.pick [ + listen (); + process (); + ] + end let int_of_fd (t:Lwt_unix.file_descr) = (Obj.magic (Lwt_unix.unix_file_descr t): int) let serve ~routes db fd = - let http = Dispatch.v db routes in - let on_exn e = Log.err (fun l -> l "ERROR: %a" Fmt.exn e) in Lwt_unix.blocking fd >>= fun blocking -> Log.debug (fun l -> l "Serving the control state over fd:%d (blocking=%b)" (int_of_fd fd) blocking ); - Cohttp_lwt_unix.Server.create ~on_exn ~mode:(`Fd fd) http + Dispatch.serve fd db ~routes diff --git a/projects/miragesdk/src/sdk/ctl.mli b/projects/miragesdk/src/sdk/ctl.mli index b88e4a7cf..5af101be8 100644 --- a/projects/miragesdk/src/sdk/ctl.mli +++ b/projects/miragesdk/src/sdk/ctl.mli @@ -3,6 +3,29 @@ module KV: Irmin.KV with type contents = string +module Message: sig + + (** The type for operations. *) + type operation = + | Write + | Read + | Delete + + (** The type for control messages. *) + type t = { + operation: operation; + path : string; + payload : string option; + } + + val write_message: Lwt_unix.file_descr -> t -> unit Lwt.t + (** [write_message fd t] writes a control message. *) + + val read_message: Lwt_unix.file_descr -> t Lwt.t + (** [read_message fd] reads a control message. *) + +end + val v: string -> KV.t Lwt.t (** [v p] is the KV store storing the control state, located at path [p] in the filesystem of the privileged container. *) diff --git a/projects/miragesdk/src/sdk/init.ml b/projects/miragesdk/src/sdk/init.ml index 6bd657361..b9e2cd7be 100644 --- a/projects/miragesdk/src/sdk/init.ml +++ b/projects/miragesdk/src/sdk/init.ml @@ -5,35 +5,6 @@ module Log = (val Logs.src_log src : Logs.LOG) let failf fmt = Fmt.kstrf Lwt.fail_with fmt -module IO = struct - - let rec really_write fd buf off len = - match len with - | 0 -> Lwt.return_unit - | len -> - Lwt_unix.write fd buf off len >>= fun n -> - really_write fd buf (off+n) (len-n) - - let rec really_read fd buf off len = - match len with - | 0 -> Lwt.return_unit - | len -> - Lwt_unix.read fd buf off len >>= fun n -> - really_write fd buf (off+n) (len-n) - - let read_all fd = - let len = 16 * 1024 in - let buf = Bytes.create len in - let rec loop acc = - Lwt_unix.read fd buf 0 len >>= fun len -> - let res = String.sub buf 0 len in - loop (res :: acc) - in - loop [] >|= fun bufs -> - String.concat "" (List.rev bufs) - -end - module Fd = struct type t = { @@ -120,12 +91,6 @@ module Fd = struct in loop () - let proxy x y = - Lwt.pick [ - forward ~src:x ~dst:y; - forward ~src:y ~dst:x; - ] - end module Pipe = struct @@ -229,7 +194,7 @@ let exec_priv ~pid ~cmd ~net ~ctl ~handlers = Fd.forward ~src:Pipe.(priv stderr) ~dst:Fd.stderr; (* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *) ctl (); -(* handlers (); *) + handlers (); ]) let run ~net ~ctl ~handlers cmd = diff --git a/projects/miragesdk/src/sdk/init.mli b/projects/miragesdk/src/sdk/init.mli index 3cc3108cc..fc50863cd 100644 --- a/projects/miragesdk/src/sdk/init.mli +++ b/projects/miragesdk/src/sdk/init.mli @@ -14,22 +14,6 @@ data, e.g. the IP address once a DHCP lease is obtained.} }*) - -module IO: sig - - (** {IO helpers} *) - - val really_write: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t - (** [really_write fd buf off len] writes exactly [len] bytes. *) - - val really_read: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t - (** [really_read fd buf off len] reads exactly [len] bytes. *) - - val read_all: Lwt_unix.file_descr -> string Lwt.t - (** [read_all fd] reads all the contents of [fd] bytes. *) - -end - module Fd: sig type t diff --git a/projects/miragesdk/src/sdk/jbuild b/projects/miragesdk/src/sdk/jbuild index a6e199b3a..53403e2ad 100644 --- a/projects/miragesdk/src/sdk/jbuild +++ b/projects/miragesdk/src/sdk/jbuild @@ -2,9 +2,8 @@ (library ((name sdk) - (libraries (threads cohttp.lwt cstruct.lwt - cmdliner fmt.cli logs.fmt logs.cli fmt.tty decompress - irmin irmin-git irmin-http lwt.unix rawlink tuntap + (libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty + decompress irmin irmin-git lwt.unix rawlink tuntap dispatch irmin-watcher inotify)) (preprocess (per_file ((pps (cstruct.ppx)) (ctl)))) ))