From a0546bba889525574b066fe17f49f01aea4cfaa9 Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Mon, 10 Apr 2017 22:35:44 +0200 Subject: [PATCH] miragesdk: use cap-n-proto instead of custom binary protocol for calf/priv API Signed-off-by: Thomas Gazagnaire --- projects/miragesdk/examples/mirage-dhcp.yml | 2 +- projects/miragesdk/src/Dockerfile.build | 22 +- projects/miragesdk/src/Dockerfile.dev | 22 +- projects/miragesdk/src/Makefile | 4 +- .../src/dhcp-client/calf/unikernel.ml | 11 +- projects/miragesdk/src/dhcp-client/main.ml | 6 +- projects/miragesdk/src/sdk/ctl.ml | 480 +++++++++--------- projects/miragesdk/src/sdk/ctl.mli | 157 +++--- projects/miragesdk/src/sdk/jbuild | 12 +- projects/miragesdk/src/sdk/proto.capnp | 19 + projects/miragesdk/src/test/test.ml | 111 ++-- 11 files changed, 435 insertions(+), 411 deletions(-) create mode 100644 projects/miragesdk/src/sdk/proto.capnp diff --git a/projects/miragesdk/examples/mirage-dhcp.yml b/projects/miragesdk/examples/mirage-dhcp.yml index 0e1d06db1..b159c620b 100644 --- a/projects/miragesdk/examples/mirage-dhcp.yml +++ b/projects/miragesdk/examples/mirage-dhcp.yml @@ -28,7 +28,7 @@ services: oomScoreAdj: -800 readonly: true - name: dhcp-client - image: mobylinux/dhcp-client:882ad65d1ef89a9a307b019c61f5f69301f59214 + image: mobylinux/dhcp-client:a7a6b49b0ff51ffa2f44ac848cd649e29f946e0c net: host capabilities: - CAP_NET_ADMIN # to bring eth0 up diff --git a/projects/miragesdk/src/Dockerfile.build b/projects/miragesdk/src/Dockerfile.build index 2fd882440..ee8017da3 100644 --- a/projects/miragesdk/src/Dockerfile.build +++ b/projects/miragesdk/src/Dockerfile.build @@ -1,20 +1,22 @@ FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 RUN cd /home/opam/opam-repository && git pull && opam update -u -# to be able to use cstruct.ppx + jbuilder -RUN opam pin add cstruct 2.4.0 -n +## pins for priv + # to bring eth0 up RUN opam pin add tuntap 1.0.0 -n -RUN opam pin add mirage-net-fd --dev -n +RUN opam pin add mirage-net-fd 0.2.0 -n -RUN opam depext -iy mirage-net-unix logs-syslog irmin-unix cohttp decompress -RUN opam depext -iy rawlink tuntap jbuilder irmin-watcher inotify rresult +## pins for calf -# TMP: to compile the calf -RUN opam pin add -n charrua-client https://github.com/yomimono/charrua-client.git#state-halfway -RUN opam depext -uiy ocamlfind topkg-care ocamlbuild lwt mirage-types-lwt mirage -RUN opam depext -uiy charrua-client cohttp conduit mirage-unix -RUN opam depext -uiy mirage-net-fd ptime mirage-logs +RUN opam pin add charrua-client https://github.com/yomimono/charrua-client.git#state-halfway -n + +## depdendencies + +RUN opam depext -iy \ + irmin-unix cohttp decompress rawlink tuntap jbuilder irmin-watcher inotify \ + rresult lwt capnp charrua-client mirage-net-fd ptime bos \ + mirage-flow-lwt mirage-channel-lwt mirage-types-lwt RUN sudo mkdir -p /src COPY ./sdk /src/sdk diff --git a/projects/miragesdk/src/Dockerfile.dev b/projects/miragesdk/src/Dockerfile.dev index 8e55e5084..651fe74f8 100644 --- a/projects/miragesdk/src/Dockerfile.dev +++ b/projects/miragesdk/src/Dockerfile.dev @@ -1,20 +1,22 @@ FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 RUN cd /home/opam/opam-repository && git pull && opam update -u -# to be able to use cstruct.ppx + jbuilder -RUN opam pin add cstruct 2.4.0 -n +## pins for priv + # to bring eth0 up RUN opam pin add tuntap 1.0.0 -n -RUN opam pin add mirage-net-fd --dev -n +RUN opam pin add mirage-net-fd 0.2.0 -n -RUN opam depext -iy mirage-net-unix logs-syslog irmin-unix cohttp decompress -RUN opam depext -iy rawlink tuntap jbuilder irmin-watcher inotify rresult +## pins for calf -# TMP: to compile the calf -RUN opam pin add -n charrua-client https://github.com/yomimono/charrua-client.git#state-halfway -RUN opam depext -uiy ocamlfind topkg-care ocamlbuild lwt mirage-types-lwt mirage -RUN opam depext -uiy charrua-client cohttp conduit mirage-unix -RUN opam depext -uiy mirage-net-fd ptime mirage-logs +RUN opam pin add charrua-client https://github.com/yomimono/charrua-client.git#state-halfway -n + +## depdendencies + +RUN opam depext -iy \ + irmin-unix cohttp decompress rawlink tuntap jbuilder irmin-watcher inotify \ + rresult lwt capnp charrua-client mirage-net-fd ptime bos \ + mirage-flow-lwt mirage-channel-lwt mirage-types-lwt RUN sudo mkdir -p /src /bin COPY . /src diff --git a/projects/miragesdk/src/Makefile b/projects/miragesdk/src/Makefile index 93dc3e834..4e13f2935 100644 --- a/projects/miragesdk/src/Makefile +++ b/projects/miragesdk/src/Makefile @@ -15,7 +15,7 @@ default: push @ .build: Dockerfile.build $(FILES) - docker build -t $(IMAGE):build -f Dockerfile.build -q . > .build || \ + docker build $(NO_CACHE) -t $(IMAGE):build -f Dockerfile.build -q . > .build || \ (rm -f $@ && exit 1) .pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) obj/config.json @@ -23,7 +23,7 @@ default: push (rm -f $@ && exit 1) .dev: Dockerfile.dev init-dev.sh - docker build -t $(IMAGE):dev -f Dockerfile.dev -q . > .dev || \ + docker build $(NO_CACHE) -t $(IMAGE):dev -f Dockerfile.dev -q . > .dev || \ (rm -f $@ && exit 1) enter-pkg: .pkg diff --git a/projects/miragesdk/src/dhcp-client/calf/unikernel.ml b/projects/miragesdk/src/dhcp-client/calf/unikernel.ml index 49fa75bae..322a854da 100644 --- a/projects/miragesdk/src/dhcp-client/calf/unikernel.ml +++ b/projects/miragesdk/src/dhcp-client/calf/unikernel.ml @@ -175,18 +175,21 @@ let setup_log = module Dhcp_client = Dhcp_client_mirage.Make(Time)(Net) +let pp_path = Fmt.(list ~sep:(unit "/") string) + let set_ip ctl k ip = let str = Ipaddr.V4.to_string ip ^ "\n" in Sdk.Ctl.Client.write ctl k str >>= function | Ok () -> Lwt.return_unit - | Error e -> failf "error while writing %s: %a" k Sdk.Ctl.Client.pp_error e + | Error e -> + failf "error while writing %a: %a" pp_path k Sdk.Ctl.Client.pp_error e let set_ip_opt ctl k = function | None -> Lwt.return_unit | Some ip -> set_ip ctl k ip let get_mac ctl = - Sdk.Ctl.Client.read ctl "/mac" >>= function + Sdk.Ctl.Client.read ctl ["mac"] >>= function | Ok None -> Lwt.return None | Ok Some s -> Lwt.return @@ Macaddr.of_string (String.trim s) | Error e -> failf "get_mac: %a" Sdk.Ctl.Client.pp_error e @@ -208,8 +211,8 @@ let start () dhcp_codes net ctl = Lwt_stream.last_new stream >>= fun result -> let result = of_ipv4_config result in Log.info (fun l -> l "found lease: %a" pp result); - set_ip ctl "/ip" result.address >>= fun () -> - set_ip_opt ctl "/gateway" result.gateway + set_ip ctl ["ip"] result.address >>= fun () -> + set_ip_opt ctl ["gateway"] result.gateway (* FIXME: Main end *) diff --git a/projects/miragesdk/src/dhcp-client/main.ml b/projects/miragesdk/src/dhcp-client/main.ml index 717da27b2..6ae838926 100644 --- a/projects/miragesdk/src/dhcp-client/main.ml +++ b/projects/miragesdk/src/dhcp-client/main.ml @@ -84,9 +84,9 @@ let run () cmd ethif path = in Lwt_main.run ( let routes = [ - "/ip" , [`Write]; - "/mac" , [`Read ]; - "/gateway", [`Write]; + ["ip"] , [`Write]; + ["mac"] , [`Read ]; + ["gateway"], [`Write]; ] in Ctl.v path >>= fun db -> let ctl fd = Ctl.Server.listen ~routes db fd in diff --git a/projects/miragesdk/src/sdk/ctl.ml b/projects/miragesdk/src/sdk/ctl.ml index bb911df4d..11a339137 100644 --- a/projects/miragesdk/src/sdk/ctl.ml +++ b/projects/miragesdk/src/sdk/ctl.ml @@ -1,11 +1,8 @@ open Lwt.Infix -open Astring let src = Logs.Src.create "init" ~doc:"Init steps" module Log = (val Logs.src_log src : Logs.LOG) -let failf fmt = Fmt.kstrf Lwt.fail_with fmt - (* FIXME: to avoid linking with gmp *) module No_IO = struct type ic = unit @@ -36,214 +33,199 @@ let () = module C = Mirage_channel_lwt.Make(IO) -module Query = struct +module P = Proto.Make(Capnp.BytesMessage) - (* FIXME: this should probably be replaced by protobuf *) +exception Undefined_field of int - [%%cenum - type operation = - | Write - | Read - | Delete - [@@uint8_t] - ] +module Endpoint = struct + + let compression = `None type t = { - version : int32; - id : int32; - operation: operation; - path : string; - payload : string; + output : IO.t; + input : C.t; (* reads are buffered *) + decoder: Capnp.Codecs.FramedStream.t; } - [%%cstruct type msg = { - version : uint32_t; (* protocol version *) - id : uint32_t; (* session identifier *) - operation : uint8_t; (* = type operation *) - path : uint16_t; - payload : uint32_t; - } [@@little_endian] + type error = [ + | `IO of IO.write_error + | `Channel of C.error + | `Msg of string + | `Undefined_field of int ] - type error = [ `Eof | `Msg of string ] - let pp_error ppf t = Fmt.string ppf (match t with `Eof -> "EOF" | `Msg s -> s) + let pp_error ppf (e:error) = match e with + | `IO e -> Fmt.pf ppf "IO: %a" IO.pp_write_error e + | `Channel e -> Fmt.pf ppf "channel: %a" C.pp_error e + | `Msg e -> Fmt.string ppf e + | `Undefined_field i -> Fmt.pf ppf "undefined field %d" i - (* to avoid warning 32 *) - let _ = hexdump_msg - let _ = string_to_operation + let err_io e = Error (`IO e) + let err_channel e = Error (`Channel e) + let err_msg fmt = Fmt.kstrf (fun s -> Error (`Msg s)) fmt + let err_frame = err_msg "Unsupported Cap'n'Proto frame received" + let err_undefined_field i = Error (`Undefined_field i) - let pp ppf t = - Fmt.pf ppf "%ld:%s:%S:%S" - t.id (operation_to_string t.operation) t.path t.payload + let v fd = + let output = fd in + let input = C.create fd in + let decoder = Capnp.Codecs.FramedStream.empty compression in + { output; input; decoder } - (* FIXME: allocate less ... *) + let send t msg = + let buf = Capnp.Codecs.serialize ~compression msg in + (* FIXME: avoid copying *) + IO.write t.output (Cstruct.of_string buf) >|= function + | Error e -> err_io e + | Ok () -> Ok () - let of_cstruct buf = - let open Rresult.R in - Log.debug (fun l -> l "Query.of_cstruct %S" @@ Cstruct.to_string buf); - let version = get_msg_version buf in - let id = get_msg_id buf in - (match int_to_operation (get_msg_operation buf) with - | None -> Error (`Msg "invalid operation") - | Some o -> Ok o) - >>= fun operation -> - let path_len = get_msg_path buf in - let payload_len = get_msg_payload buf in - let path = - Cstruct.sub buf sizeof_msg path_len - |> Cstruct.to_string - in - let payload = - Cstruct.sub buf (sizeof_msg + path_len) (Int32.to_int payload_len) - |> Cstruct.to_string - in - if String.Ascii.is_valid path then Ok { version; id; operation; path; payload } - else Error (`Msg "invalid path") - - let to_cstruct msg = - Log.debug (fun l -> l "Query.to_cstruct %a" pp msg); - let operation = operation_to_int msg.operation in - let path = String.length msg.path in - let payload = String.length msg.payload in - let len = sizeof_msg + path + payload in - let buf = Cstruct.create len in - set_msg_version buf msg.version; - set_msg_id buf msg.id; - set_msg_operation buf operation; - set_msg_path buf path; - set_msg_payload buf (Int32.of_int payload); - Cstruct.blit_from_bytes msg.path 0 buf sizeof_msg path; - Cstruct.blit_from_bytes msg.payload 0 buf (sizeof_msg+path) payload; - buf - - let err e = Lwt.return (Error (`Msg (Fmt.to_to_string C.pp_error e))) - let err_eof = Lwt.return (Error `Eof) - - let read fd = - let fd = C.create fd in - C.read_exactly fd ~len:4 >>= function - | Ok `Eof -> err_eof - | Error e -> err e - | Ok (`Data buf) -> - let buf = Cstruct.concat buf in - Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf); - let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in - C.read_exactly fd ~len >>= function - | Ok `Eof -> err_eof - | Error e -> err e - | Ok (`Data buf) -> - let buf = Cstruct.concat buf in - Lwt.return (of_cstruct buf) - - let write fd msg = - let buf = to_cstruct msg in - let len = - let len = Cstruct.create 4 in - Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf); - len - in - IO.write fd len >>= function - | Error e -> failf "Query.write(len) %a" IO.pp_write_error e - | Ok () -> IO.write fd buf >>= function - | Ok () -> Lwt.return_unit - | Error e -> failf "Query.write(buf) %a" IO.pp_write_error e + let rec recv t = + match Capnp.Codecs.FramedStream.get_next_frame t.decoder with + | Ok msg -> Lwt.return (Ok (`Data msg)) + | Error Capnp.Codecs.FramingError.Unsupported -> Lwt.return err_frame + | Error Capnp.Codecs.FramingError.Incomplete -> + Log.info (fun f -> f "Endpoint.recv: incomplete; waiting for more data"); + C.read_some ~len:4096 t.input >>= function + | Ok `Eof -> Lwt.return (Ok `Eof) + | Error e -> Lwt.return (err_channel e) + | Ok (`Data data) -> + (* FIXME: avoid copying *) + let data = Cstruct.to_string data in + Log.info (fun f -> f "Got %S" data); + Capnp.Codecs.FramedStream.add_fragment t.decoder data; + recv t end -module Reply = struct +module Request = struct - (* FIXME: this should probably be replaced by protobuf *) + type action = + | Write of string + | Read + | Delete - [%%cenum - type status = - | Ok - | Error - [@@uint8_t] - ] + let pp_action ppf = function + | Write s -> Fmt.pf ppf "write[%S]" s + | Read -> Fmt.pf ppf "read" + | Delete -> Fmt.pf ppf "delete" type t = { - id : int32; - status : status; - payload: string; + id : int32 Lazy.t; + path : string list Lazy.t; + action: action Lazy.t; } - [%%cstruct type msg = { - id : uint32_t; (* session identifier *) - status : uint8_t; (* = type operation *) - payload: uint32_t; - } [@@little_endian] - ] + let id t = Lazy.force t.id + let path t = Lazy.force t.path + let action t = Lazy.force t.action - type error = [ `Eof | `Msg of string ] - let pp_error ppf t = Fmt.string ppf (match t with `Eof -> "EOF" | `Msg s -> s) - - (* to avoid warning 32 *) - let _ = hexdump_msg - let _ = string_to_status + let pp_path = Fmt.(list ~sep:(unit "/") string) let pp ppf t = - Fmt.pf ppf "%ld:%s:%S" t.id (status_to_string t.status) t.payload + let id = id t and path = path t and action = action t in + match action with + | exception Undefined_field i -> Fmt.pf ppf "" i + | action -> Fmt.pf ppf "%ld:%a:%a" id pp_path path pp_action action - (* FIXME: allocate less ... *) + let equal x y = + id x = id y && path x = path y && match action x = action y with + | exception Undefined_field _ -> false + | b -> b - let of_cstruct buf = - let open Rresult.R in - Log.debug (fun l -> l "Message.of_cstruct %S" @@ Cstruct.to_string buf); - let id = get_msg_id buf in - (match int_to_status (get_msg_status buf) with - | None -> Error (`Msg "invalid operation") - | Some o -> Ok o) - >>= fun status -> - let payload_len = Int32.to_int (get_msg_payload buf) in - let payload = - Cstruct.sub buf sizeof_msg payload_len - |> Cstruct.to_string - in - Ok { id; status; payload } + let v ~id ~path action = + { id = lazy id; action = lazy action; path = lazy path } - let to_cstruct msg = - Log.debug (fun l -> l "Message.to_cstruct %a" pp msg); - let status = status_to_int msg.status in - let payload = String.length msg.payload in - let len = sizeof_msg + payload in - let buf = Cstruct.create len in - set_msg_id buf msg.id; - set_msg_status buf status; - set_msg_payload buf (Int32.of_int payload); - Cstruct.blit_from_bytes msg.payload 0 buf sizeof_msg payload; - buf + let read e: (t, Endpoint.error) result Lwt.t = + Endpoint.recv e >|= function + | Error e -> Error e + | Ok `Eof -> Error (`IO `Closed) + | Ok (`Data x) -> + let open P.Reader in + let msg = Request.of_message x in + let id = lazy (Request.id_get msg) in + let path = lazy (Request.path_get_list msg) in + let action = lazy (match Request.get msg with + | Request.Write x -> Write x + | Request.Read -> Read + | Request.Delete -> Delete + | Request.Undefined i -> raise (Undefined_field i) + ) in + Ok { id; path; action } - let err e = Lwt.return (Result.Error (`Msg (Fmt.to_to_string C.pp_error e))) - let err_eof = Lwt.return (Result.Error `Eof) + let write e t = + let open P.Builder in + match action t with + | exception Undefined_field i -> Lwt.return (Endpoint.err_undefined_field i) + | action -> + let msg = + let b = Request.init_root () in + Request.id_set b (id t); + ignore (Request.path_set_list b (path t)); + (match action with + | Write x -> Request.write_set b x + | Read -> Request.read_set b + | Delete -> Request.delete_set b); + b + in + Endpoint.send e (Request.to_message msg) - let read fd = - let fd = C.create fd in - C.read_exactly fd ~len:4 >>= function - | Ok `Eof -> err_eof - | Error e -> err e - | Ok (`Data buf) -> - let buf = Cstruct.concat buf in - Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf); - let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in - C.read_exactly fd ~len >>= function - | Ok `Eof -> err_eof - | Error e -> err e - | Ok (`Data buf) -> - let buf = Cstruct.concat buf in - Lwt.return (of_cstruct buf) +end - let write fd msg = - let buf = to_cstruct msg in - let len = - let len = Cstruct.create 4 in - Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf); - len - in - IO.write fd len >>= function - | Error e -> failf "Reply.write(len) %a" IO.pp_write_error e - | Ok () -> IO.write fd buf >>= function - | Ok () -> Lwt.return_unit - | Error e -> failf "Reply.write(buf) %a" IO.pp_write_error e +module Response = struct + + type status = (string, string) result + + let pp_status ppf = function + | Ok ok -> Fmt.pf ppf "ok:%S" ok + | Error e -> Fmt.pf ppf "error:%S" e + + type t = { + id : int32 Lazy.t; + status: status Lazy.t; + } + + let v ~id status = { id = lazy id; status = lazy status } + let id t = Lazy.force t.id + let status t = Lazy.force t.status + + let pp ppf t = match status t with + | exception Undefined_field i -> Fmt.pf ppf "" i + | s -> Fmt.pf ppf "%ld:%a" (id t) pp_status s + + let equal x y = + id x = id y && match status x = status y with + | exception Undefined_field _ -> false + | b -> b + + let read e: (t, Endpoint.error) result Lwt.t = + Endpoint.recv e >|= function + | Error e -> Error e + | Ok `Eof -> Error (`IO `Closed) + | Ok (`Data x) -> + let open P.Reader in + let msg = Response.of_message x in + let id = lazy (Response.id_get msg) in + let status = lazy (match Response.get msg with + | Response.Ok x -> Ok x + | Response.Error x -> Error x + | Response.Undefined i -> raise (Undefined_field i) + ) in + Ok { id; status } + + let write e t = + let open P.Builder in + match status t with + | exception Undefined_field i -> Lwt.return (Endpoint.err_undefined_field i) + | s -> + let msg = + let b = Response.init_root () in + Response.id_set b (id t); + (match s with + | Error s -> Response.error_set b s + | Ok s -> Response.ok_set b s); + b + in + Endpoint.send e (Response.to_message msg) end @@ -255,8 +237,6 @@ module Client = struct let n = ref 0l in fun () -> n := Int32.succ !n; !n - let version = 0l - type error = [`Msg of string] let pp_error ppf (`Msg s) = Fmt.string ppf s @@ -268,60 +248,64 @@ module Client = struct module Cache = Hashtbl.Make(K) type t = { - fd : IO.t; - replies: Reply.t Cache.t; + e : Endpoint.t; + replies: Response.t Cache.t; } - let v fd = { fd; replies = Cache.create 12 } + let v fd = { e = Endpoint.v fd; replies = Cache.create 12 } + let err e = Fmt.kstrf (fun e -> Error (`Msg e)) "%a" Endpoint.pp_error e - let call t query = - let id = query.Query.id in - Query.write t.fd query >>= fun () -> - let rec loop () = - try - let r = Cache.find t.replies id in - Cache.remove t.replies id; - Lwt.return r - with Not_found -> - Reply.read t.fd >>= function - | Error e -> - Log.err (fun l -> l "Got %a while waiting for a reply to %ld" - Query.pp_error e id); - loop () - | Ok r -> - if r.id = id then Lwt.return r - else ( - (* FIXME: maybe we want to check if id is not already - allocated *) - Cache.add t.replies r.id r; + let call t r = + let id = Request.id r in + Request.write t.e r >>= function + | Error e -> Lwt.return (err e) + | Ok () -> + let rec loop () = + try + let r = Cache.find t.replies id in + Cache.remove t.replies id; + Lwt.return r + with Not_found -> + Response.read t.e >>= function + | Error e -> + Log.err (fun l -> l "Got %a while waiting for a reply to %ld" + Endpoint.pp_error e id); loop () - ) - in - loop () >|= fun r -> - assert (r.Reply.id = id); - match r.Reply.status with - | Ok -> Ok r.Reply.payload - | Error -> Error (`Msg r.Reply.payload) + | Ok r -> + let rid = Response.id r in + if rid = id then Lwt.return r + else ( + (* FIXME: maybe we want to check if id is not already + allocated *) + Cache.add t.replies rid r; + loop () + ) + in + loop () >|= fun r -> + assert (Response.id r = id); + match Response.status r with + | Ok s -> Ok s + | Error s -> Error (`Msg s) - let query operation path payload = + let request path action = let id = new_id () in - { Query.version; id; operation; path; payload } + Request.v ~id ~path action let read t path = - call t (query Read path "") >|= function - | Ok x -> Ok (Some x) - | Error (`Msg e) -> - if e = err_not_found then Ok None - else Error (`Msg e) + call t (request path Read) >|= function + | Ok x -> Ok (Some x) + | Error e -> + if e = `Msg err_not_found then Ok None + else Error e let write t path v = - call t (query Write path v) >|= function + call t (request path @@ Write v) >|= function | Ok "" -> Ok () | Ok _ -> Error (`Msg "invalid return") | Error _ as e -> e let delete t path = - call t (query Delete path "") >|= function + call t (request path Delete) >|= function | Ok "" -> Ok () | Ok _ -> Error (`Msg "invalid return") | Error _ as e -> e @@ -332,17 +316,9 @@ module Server = struct type op = [ `Read | `Write | `Delete ] - let ok q payload = - { Reply.id = q.Query.id; status = Reply.Ok; payload } - - let error q payload = - { Reply.id = q.Query.id; status = Reply.Error; payload } - - let with_key q f = - match KV.Key.of_string q.Query.path with - | Ok x -> f x - | Error (`Msg e) -> - Fmt.kstrf (fun msg -> Lwt.return (error q msg)) "invalid key: %s" e + let ok q s = Response.v ~id:(Request.id q) (Ok s) + let error q s = Response.v ~id:(Request.id q) (Error s) + let with_key q f = f (Request.path q) let infof fmt = Fmt.kstrf (fun msg () -> @@ -351,18 +327,20 @@ module Server = struct ) fmt let not_allowed q = - let path = q.Query.path in - let err = Fmt.strf "%s is not an allowed path" path in - Log.err (fun l -> l "%ld: %s" q.Query.id path); + let path = Request.path q in + let err = Fmt.strf "%a is not an allowed path" Request.pp_path path in + Log.err (fun l -> l "%ld: %a" (Request.id q) Request.pp_path path); error q err let dispatch db op q = with_key q (fun key -> let can x = List.mem x op in - match q.Query.operation with - | Write when can `Write -> + match Request.action q with + | exception Undefined_field i -> + Fmt.kstrf (fun e -> Lwt.return (error q e)) "undefined field %i" i + | Write s when can `Write -> let info = infof "Updating %a" KV.Key.pp key in - KV.set db ~info key q.payload >|= fun () -> + KV.set db ~info key s >|= fun () -> ok q "" | Delete when can `Delete -> let info = infof "Removing %a" KV.Key.pp key in @@ -379,11 +357,14 @@ module Server = struct Log.debug (fun l -> l "Serving the control state over %a" IO.pp fd); let queries = Queue.create () in let cond = Lwt_condition.create () in + let e = Endpoint.v fd in let rec listen () = - Query.read fd >>= function - | Error `Eof -> Lwt.return_unit - | Error (`Msg e) -> - Log.err (fun l -> l "received invalid message: %s" e); + Request.read e >>= function + | Error (`Channel _ | `IO _ as e) -> + Log.err (fun l -> l "fatal error: %a" Endpoint.pp_error e); + Lwt.return_unit + | Error (`Msg _ | `Undefined_field _ as e) -> + Log.err (fun l -> l "transient error: %a" Endpoint.pp_error e); listen () | Ok q -> Queue.add q queries; @@ -393,15 +374,18 @@ module Server = struct let rec process () = Lwt_condition.wait cond >>= fun () -> let q = Queue.pop queries in - let path = q.Query.path in + let path = Request.path q in (if List.mem_assoc path routes then ( let op = List.assoc path routes in dispatch db op q >>= fun r -> - Reply.write fd r + Response.write e r ) else ( - Reply.write fd (not_allowed q) - )) >>= fun () -> - process () + Response.write e (not_allowed q) + )) >>= function + | Ok () -> process () + | Error e -> + Log.err (fun l -> l "%a" Endpoint.pp_error e); + process () in Lwt.pick [ listen (); diff --git a/projects/miragesdk/src/sdk/ctl.mli b/projects/miragesdk/src/sdk/ctl.mli index 92f80dcd5..896dfefdd 100644 --- a/projects/miragesdk/src/sdk/ctl.mli +++ b/projects/miragesdk/src/sdk/ctl.mli @@ -1,81 +1,102 @@ (** [Control] handle the server part of the control path, running in the privileged container. *) -module Query: sig - (** The type for operations. *) - type operation = - | Write - | Read - | Delete +exception Undefined_field of int - (** The type for control plane queries. *) - type t = { - version : int32; (** Protocol version. *) - id : int32; (** Session identifier. *) - operation: operation; - path : string; (** Should be only valid ASCII. *) - payload : string; (** Arbitrary payload. *) - } +module Endpoint: sig - type error = [ `Eof | `Msg of string ] - (** The type of errors. *) + type t + (** The type for SDK endpoints. *) - val pp_error: error Fmt.t - (** [pp_error] is the pretty-printer for query errors. *) + val v: IO.t ->t + (** [v f] is a fresh endpoint state built on top of the flow [f]. *) - val pp: t Fmt.t - (** [pp] is the pretty-printer for queries. *) - - val of_cstruct: Cstruct.t -> (t, [`Msg of string]) result - (** [of_cstruct buf] is the query [t] such that the serialization of - [t] is [buf]. *) - - val to_cstruct: t -> Cstruct.t - (** [to_cstruct t] is the serialization of [t]. *) - - val write: IO.flow -> t -> unit Lwt.t - (** [write fd t] writes a query message. *) - - val read: IO.flow -> (t, error) result Lwt.t - (** [read fd] reads a query message. *) - -end - -module Reply: sig - - (** The type for status. *) - type status = - | Ok - | Error - - (** The type for control plane replies. *) - type t = { - id : int32; (** Session identifier. *) - status : status; (** Status of the operation. *) - payload: string; (** Arbitrary payload. *) - } - - val pp: t Fmt.t - (** [pp] is the pretty-printer for replies. *) - - val of_cstruct: Cstruct.t -> (t, [`Msg of string]) result - (** [of_cstruct buf] is the reply [t] such that the serialization of - [t] is [buf]. *) - - val to_cstruct: t -> Cstruct.t - (** [to_cstruct t] is the serialization of [t]. *) - - type error = [`Eof | `Msg of string ] - (** The type for reply errors. *) + (** The type for endpoint errors. *) + type error = private [> + | `IO of IO.write_error + | `Msg of string + | `Undefined_field of int + ] val pp_error: error Fmt.t (** [pp_error] is the pretty-printer for errors. *) - val write: IO.flow -> t -> unit Lwt.t +end + +module Request: sig + + type t + (** The type for SDK requests. *) + + (** The type for request actions. *) + type action = + | Write of string + | Read + | Delete + + val pp: t Fmt.t + (** [pp] is the pretty-printer for requests. *) + + val equal: t -> t -> bool + (** [equal] is the equality function for requests. *) + + val pp_action: action Fmt.t + (** [pp_action] is the pretty-printer for request actions. *) + + val action: t -> action + (** [action t] is [t]'s requested operation. Can raise + [Endpoint.Undefined_field]. *) + + val path: t -> string list + (** [path t] is the [t]'s request path. *) + + val id: t -> int32 + (** [id t] it [t]'s request id. *) + + val v: id:int32 -> path:string list -> action -> t + (** [v ~id ~path action] is a new request. *) + + val write: Endpoint.t -> t -> (unit, Endpoint.error) result Lwt.t + (** [write e t] writes a request message for the + action [action] and the path [path] using the unique ID [id]. *) + + val read: Endpoint.t -> (t, Endpoint.error) result Lwt.t + (** [read e] reads a query message. *) + +end + +module Response: sig + + type t + (** The type for responses. *) + + (** The type for response status. *) + type status = (string, string) result + + val pp: t Fmt.t + (** [pp] is the pretty-printer for responses. *) + + val equal: t -> t -> bool + (** [equal] is the equality function for responses. *) + + val pp_status: status Fmt.t + (** [pp_status] is the pretty-printer for response statuses. *) + + val status: t -> status + (** [status t] is [t]'s response status. Can raise + [Endpoint.Undefined_field]. *) + + val id: t -> int32 + (** [id t] is [t]'s response ID. *) + + val v: id:int32 -> status -> t + (** [v ~id status] is a new response. *) + + val write: Endpoint.t -> t -> (unit, Endpoint.error) result Lwt.t (** [write fd t] writes a reply message. *) - val read: IO.flow -> (t, error) result Lwt.t + val read: Endpoint.t -> (t, Endpoint.error) result Lwt.t (** [read fd] reads a reply message. *) end @@ -103,16 +124,16 @@ module Client: sig server. A client state also stores some state for all the incomplete client queries. *) - val read: t -> string -> (string option, error) result Lwt.t + val read: t -> string list -> (string option, error) result Lwt.t (** [read t k] is the value associated with the key [k] in the control plane state. Return [None] if no value is associated to [k]. *) - val write: t -> string -> string -> (unit, error) result Lwt.t + val write: t -> string list -> string -> (unit, error) result Lwt.t (** [write t p v] associates [v] to the key [k] in the control plane state. *) - val delete: t -> string -> (unit, error) result Lwt.t + val delete: t -> string list -> (unit, error) result Lwt.t (** [delete t k] remove [k]'s binding in the control plane state. *) end @@ -129,7 +150,7 @@ module Server: sig type op = [ `Read | `Write | `Delete ] (** The type for operations to perform on routes. *) - val listen: routes:(string * op list) list -> KV.t -> IO.t -> unit Lwt.t + val listen: routes:(string list * op list) list -> KV.t -> IO.t -> unit Lwt.t (** [listen ~routes kv fd] is the thread exposing the KV store [kv], holding control plane state, running inside the privileged container. [routes] are the routes exposed by the server to the diff --git a/projects/miragesdk/src/sdk/jbuild b/projects/miragesdk/src/sdk/jbuild index 8fb46a101..ffaecc244 100644 --- a/projects/miragesdk/src/sdk/jbuild +++ b/projects/miragesdk/src/sdk/jbuild @@ -5,6 +5,14 @@ (libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty decompress irmin irmin-git lwt.unix rawlink tuntap dispatch irmin-watcher inotify astring rresult mirage-flow-lwt - mirage-channel-lwt io-page.unix ipaddr)) - (preprocess (per_file ((pps (cstruct.ppx)) (ctl)))) + mirage-channel-lwt io-page.unix ipaddr capnp)) )) + +(rule + ((targets (proto.ml proto.mli)) + (deps (proto.capnp)) + (action (progn + (run capnp compile -o ocaml ${<}) + (system "mv proto.ml proto.ml.in") + (system "echo '[@@@ocaml.warning \"-A\"]\n' > proto.ml") + (system "cat proto.ml.in >> proto.ml"))))) diff --git a/projects/miragesdk/src/sdk/proto.capnp b/projects/miragesdk/src/sdk/proto.capnp new file mode 100644 index 000000000..b204d1478 --- /dev/null +++ b/projects/miragesdk/src/sdk/proto.capnp @@ -0,0 +1,19 @@ +@0x9e83562906de8259; + +struct Request { + id @0 :Int32; + path @1 :List(Text); + union { + write @2 :Data; + read @3 :Void; + delete @4 :Void; + } +} + +struct Response { + id @0: Int32; + union { + ok @1 :Data; + error @2 :Data; + } +} diff --git a/projects/miragesdk/src/test/test.ml b/projects/miragesdk/src/test/test.ml index ac279b9ad..637148770 100644 --- a/projects/miragesdk/src/test/test.ml +++ b/projects/miragesdk/src/test/test.ml @@ -93,119 +93,107 @@ let test_socketpair pipe () = Lwt.return_unit -let query = Alcotest.testable Ctl.Query.pp (=) -let reply = Alcotest.testable Ctl.Reply.pp (=) +let request = Alcotest.testable Ctl.Request.pp Ctl.Request.equal +let response = Alcotest.testable Ctl.Response.pp Ctl.Response.equal let queries = - let open Ctl.Query in + let open Ctl.Request in [ - { version = 0l; id = 0l; operation = Read; path = "/foo/bar"; payload = "" }; - { version = Int32.max_int; id = Int32.max_int; operation = Write ; path = ""; payload = "foo" }; - { version = 1l;id = 0l; operation = Delete; path = ""; payload = "" }; - { version = -2l; id = -3l; operation = Delete; path = "foo"; payload = "foo" }; + v ~id:0l ~path:["foo";"bar"] Read; + v ~id:Int32.max_int ~path:[] (Write "foo"); + v ~id:0l ~path:[] Delete; + v ~id:(-3l) ~path:["foo"] Delete; ] let replies = - let open Ctl.Reply in + let open Ctl.Response in [ - { id = 0l; status = Ok; payload = "" }; - { id = Int32.max_int; status = Ok; payload = "foo" }; - { id = 0l; status = Error; payload = "" }; - { id = -3l; status = Error; payload = "foo" }; + v ~id:0l (Ok ""); + v ~id:Int32.max_int (Ok "foo"); + v ~id:0l (Error ""); + v ~id:(-3l) (Error "foo"); ] -let test_serialization to_cstruct of_cstruct message messages = - let test m = - let buf = to_cstruct m in - match of_cstruct buf with - | Ok m' -> Alcotest.(check message) "to_cstruct/of_cstruct" m m' - | Error (`Msg e) -> Alcotest.fail ("Message.of_cstruct: " ^ e) - in - List.iter test messages +let failf fmt = Fmt.kstrf Alcotest.fail fmt -let test_send t write read message pp_error messages = - let calf = calf Init.Pipe.(ctl t) in - let priv = priv Init.Pipe.(ctl t) in +let test_send t write read message messages = + let calf = Ctl.Endpoint.v @@ calf Init.Pipe.(ctl t) in + let priv = Ctl.Endpoint.v @@ priv Init.Pipe.(ctl t) in let test m = - write calf m >>= fun () -> - read priv >|= function - | Ok m' -> Alcotest.(check message) "write/read" m m' - | Error e -> Fmt.kstrf Alcotest.fail "Message.read: %a" pp_error e + write calf m >>= function + | Error e -> failf "Message.write: %a" Ctl.Endpoint.pp_error e + | Ok () -> + read priv >|= function + | Ok m' -> Alcotest.(check message) "write/read" m m' + | Error e -> failf "Message.read: %a" Ctl.Endpoint.pp_error e in Lwt_list.iter_s test messages -let test_query_serialization () = - let open Ctl.Query in - test_serialization to_cstruct of_cstruct query queries +let test_request_send t () = + let open Ctl.Request in + test_send t write read request queries -let test_reply_serialization () = - let open Ctl.Reply in - test_serialization to_cstruct of_cstruct reply replies - -let test_query_send t () = - let open Ctl.Query in - test_send t write read query pp_error queries - -let test_reply_send t () = - let open Ctl.Reply in - test_send t write read reply pp_error replies +let test_response_send t () = + let open Ctl.Response in + test_send t write read response replies let failf fmt = Fmt.kstrf Alcotest.fail fmt (* read ops *) let pp_error = Ctl.Client.pp_error +let pp_path = Fmt.(Dump.list string) let read_should_err t k = Ctl.Client.read t k >|= function | Error _ -> () - | Ok None -> failf "read(%s) -> got: none, expected: err" k - | Ok Some v -> failf "read(%s) -> got: found:%S, expected: err" k v + | Ok None -> failf "read(%a) -> got: none, expected: err" pp_path k + | Ok Some v -> failf "read(%a) -> got: found:%S, expected: err" pp_path k v let read_should_none t k = Ctl.Client.read t k >|= function - | Error e -> failf "read(%s) -> got: error:%a, expected none" k pp_error e + | Error e -> failf "read(%a) -> got: error:%a, expected none" pp_path k pp_error e | Ok None -> () - | Ok Some v -> failf "read(%s) -> got: found:%S, expected none" k v + | Ok Some v -> failf "read(%a) -> got: found:%S, expected none" pp_path k v let read_should_work t k v = Ctl.Client.read t k >|= function - | Error e -> failf "read(%s) -> got: error:%a, expected ok" k pp_error e - | Ok None -> failf "read(%s) -> got: none, expected ok" k + | Error e -> failf "read(%a) -> got: error:%a, expected ok" pp_path k pp_error e + | Ok None -> failf "read(%a) -> got: none, expected ok" pp_path k | Ok Some v' -> - if v <> v' then failf "read(%s) -> got: ok:%S, expected: ok:%S" k v' v + if v <> v' then failf "read(%a) -> got: ok:%S, expected: ok:%S" pp_path k v' v (* write ops *) let write_should_err t k v = Ctl.Client.write t k v >|= function - | Ok () -> failf "write(%s) -> ok" k + | Ok () -> failf "write(%a) -> ok" pp_path k | Error _ -> () let write_should_work t k v = Ctl.Client.write t k v >|= function | Ok () -> () - | Error e -> failf "write(%s) -> error: %a" k pp_error e + | Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e (* del ops *) let delete_should_err t k = Ctl.Client.delete t k >|= function - | Ok () -> failf "del(%s) -> ok" k + | Ok () -> failf "del(%a) -> ok" pp_path k | Error _ -> () let delete_should_work t k = Ctl.Client.delete t k >|= function | Ok () -> () - | Error e -> failf "write(%s) -> error: %a" k pp_error e + | Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e let test_ctl t () = let calf = calf Init.Pipe.(ctl t) in let priv = priv Init.Pipe.(ctl t) in - let k1 = "/foo/bar" in - let k2 = "a" in - let k3 = "b/c" in - let k4 = "xxxxxx" in + let k1 = ["foo"; "bar"] in + let k2 = ["a"] in + let k3 = ["b"; "c"] in + let k4 = ["xxxxxx"] in let all = [`Read; `Write; `Delete] in let routes = [k1,all; k2,all; k3,all ] in let git_root = "/tmp/sdk/ctl" in @@ -215,12 +203,11 @@ let test_ctl t () = let client () = let t = Ctl.Client.v calf in let allowed k v = - delete_should_work t k >>= fun () -> + delete_should_work t k >>= fun () -> read_should_none t k >>= fun () -> write_should_work t k v >>= fun () -> read_should_work t k v >>= fun () -> - let path = String.cuts ~empty:false ~sep:"/" k in - Ctl.KV.get ctl path >|= fun v' -> + Ctl.KV.get ctl k >|= fun v' -> Alcotest.(check string) "in the db" v v' in let disallowed k v = @@ -281,10 +268,8 @@ let test = [ "stdout is a pipe" , `Quick, run (test_pipe Init.Pipe.(stderr t)); "net is a socket pair", `Quick, run (test_socketpair Init.Pipe.(net t)); "ctl is a socket pair", `Quick, run (test_socketpair Init.Pipe.(ctl t)); - "seralize queries" , `Quick, test_query_serialization; - "seralize replies" , `Quick, test_reply_serialization; - "send queries" , `Quick, run (test_query_send t); - "send replies" , `Quick, run (test_reply_send t); + "send requests" , `Quick, run (test_request_send t); + "send responses" , `Quick, run (test_response_send t); "ctl" , `Quick, run (test_ctl t); "exec" , `Quick, run test_exec; ]