From 3cec2b1f5e10af02176f1d3002abe69a8b30de0e Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Thu, 6 Apr 2017 11:07:44 +0200 Subject: [PATCH] miragesdk: refactor the SDK Expose a non-unix dependent flow-like API, so it is easier to test/use in a unikernel. Signed-off-by: Thomas Gazagnaire --- projects/miragesdk/examples/mirage-dhcp.yml | 3 +- projects/miragesdk/src/Dockerfile.build | 3 +- projects/miragesdk/src/Dockerfile.pkg | 2 +- projects/miragesdk/src/Makefile | 24 +- .../src/dhcp-client/calf/unikernel.ml | 10 +- projects/miragesdk/src/dhcp-client/main.ml | 30 +-- projects/miragesdk/src/sdk/IO.ml | 95 ++++---- projects/miragesdk/src/sdk/IO.mli | 25 +- projects/miragesdk/src/sdk/ctl.ml | 92 +++++--- projects/miragesdk/src/sdk/ctl.mli | 12 +- projects/miragesdk/src/sdk/init.ml | 215 ++++++++++++------ projects/miragesdk/src/sdk/init.mli | 39 ++-- projects/miragesdk/src/sdk/jbuild | 3 +- projects/miragesdk/src/test/test.ml | 54 +++-- 14 files changed, 367 insertions(+), 240 deletions(-) diff --git a/projects/miragesdk/examples/mirage-dhcp.yml b/projects/miragesdk/examples/mirage-dhcp.yml index a843fc8d1..71b7f522a 100644 --- a/projects/miragesdk/examples/mirage-dhcp.yml +++ b/projects/miragesdk/examples/mirage-dhcp.yml @@ -24,13 +24,14 @@ daemon: oomScoreAdj: -800 readonly: true - name: dhcp-client - image: "mobylinux/dhcp-client:99ecd3304172eb7570aa5c7f527cec2577b48a84" + image: "mobylinux/dhcp-client:6478d616909eee58bfda46ba742b5b286965fb03" net: host capabilities: - CAP_NET_ADMIN # to bring eth0 up - CAP_NET_RAW # to read /dev/eth0 binds: - /var/run/dhcp-client:/data + - /usr/bin/runc:/usr/bin/runc - /sbin:/sbin # for ifconfig - /bin:/bin # for ifconfig - /lib:/lib # for ifconfig diff --git a/projects/miragesdk/src/Dockerfile.build b/projects/miragesdk/src/Dockerfile.build index 556f09aa8..8b8a38015 100644 --- a/projects/miragesdk/src/Dockerfile.build +++ b/projects/miragesdk/src/Dockerfile.build @@ -26,4 +26,5 @@ RUN opam config exec -- jbuilder build dhcp-client/main.exe RUN sudo cp /src/_build/default/dhcp-client/main.exe /dhcp-client RUN opam config exec -- jbuilder build dhcp-client/calf/unikernel.exe -RUN sudo cp /src/_build/default/dhcp-client/calf/unikernel.exe /dhcp-client-calf +RUN sudo mkdir -p /calf +RUN sudo cp /src/_build/default/dhcp-client/calf/unikernel.exe /calf/dhcp-client-calf diff --git a/projects/miragesdk/src/Dockerfile.pkg b/projects/miragesdk/src/Dockerfile.pkg index d1f06492e..8ec2f56f2 100644 --- a/projects/miragesdk/src/Dockerfile.pkg +++ b/projects/miragesdk/src/Dockerfile.pkg @@ -1,4 +1,4 @@ #FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 FROM scratch COPY obj ./ -CMD ["/dhcp-client"] +CMD ["/dhcp-client", "-vv"] diff --git a/projects/miragesdk/src/Makefile b/projects/miragesdk/src/Makefile index a7cd8a225..93dc3e834 100644 --- a/projects/miragesdk/src/Makefile +++ b/projects/miragesdk/src/Makefile @@ -7,7 +7,7 @@ IMAGE=dhcp-client OBJS=obj/dhcp-client MIRAGE_COMPILE=mobylinux/mirage-compile:f903b0e1b4328271364cc63f123ac49d56739cef@sha256:a54d9ca84d3f5998dba92ce83d60d49289cee8908a8b0f6ec280d30ab8edf46c -CALF_OBJS=obj/dhcp-client-calf +CALF_OBJS=obj/calf/dhcp-client-calf CALF_FILES=dhcp-client/calf/config.ml dhcp-client/calf/unikernel.ml \ dhcp-client/calf/jbuild @@ -18,8 +18,8 @@ default: push docker build -t $(IMAGE):build -f Dockerfile.build -q . > .build || \ (rm -f $@ && exit 1) -.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) - docker build -t $(IMAGE):pkg -f Dockerfile.pkg -q . > .pkg || \ +.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) obj/config.json + docker build --no-cache -t $(IMAGE):pkg -f Dockerfile.pkg -q . > .pkg || \ (rm -f $@ && exit 1) .dev: Dockerfile.dev init-dev.sh @@ -43,13 +43,23 @@ enter-dev: .dev # tar xf - || exit 1) && \ # touch $@ -$(OBJS) $(CALF_OBJS): .build $(FILES) $(CALF_FILES) - mkdir -p obj/usr/lib obj/bin +$(OBJS): .build $(FILES) + mkdir -p obj/calf ( cd obj && \ - docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(OBJS:obj/%=/%) $(CALF_OBJS:obj/%=/%) | tar xf - ) && \ + docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(OBJS:obj/%=/%) | tar xf - ) && \ touch $@ -hash: Makefile Dockerfile.build Dockerfile.pkg $(FILES) $(CALF_FILES) .build +$(CALF_OBJS): .build $(CALF_FILES) + mkdir -p obj/calf + ( cd obj && \ + docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(CALF_OBJS:obj/%=/%) | tar xf - ) && \ + touch $@ + +obj/config.json: dhcp-client/calf/config.json + mkdir -p obj/calf + cp $^ $@ + +hash: Makefile Dockerfile.build Dockerfile.pkg $(FILES) $(CALF_FILES) .build obj/config.json { cat $^; \ docker run --rm --entrypoint sh $(IMAGE):build -c 'cat /lib/apk/db/installed'; \ docker run --rm --entrypoint sh $(IMAGE):build -c 'opam list'; } \ diff --git a/projects/miragesdk/src/dhcp-client/calf/unikernel.ml b/projects/miragesdk/src/dhcp-client/calf/unikernel.ml index 4bf52574b..b023a2970 100644 --- a/projects/miragesdk/src/dhcp-client/calf/unikernel.ml +++ b/projects/miragesdk/src/dhcp-client/calf/unikernel.ml @@ -187,7 +187,6 @@ let set_ip_opt ctl k = function let start () dhcp_codes net ctl = Netif_fd.connect net >>= fun net -> - let ctl = Sdk.Ctl.Client.v (Lwt_unix.of_unix_file_descr ctl) in let requests = match dhcp_codes with | [] -> default_options | l -> @@ -206,12 +205,15 @@ let start () dhcp_codes net ctl = set_ip_opt ctl "/gateway" result.gateway (* FIXME: Main end *) -let magic (x: int) = (Obj.magic x: Unix.file_descr) + +let fd (x: int) = (Obj.magic x: Unix.file_descr) + +let flow (x: int) = Sdk.Init.file_descr (Lwt_unix.of_unix_file_descr @@ fd x) let start () dhcp_codes net ctl = Lwt_main.run ( - let net = magic net in - let ctl = magic ctl in + let net = fd net in + let ctl = Sdk.Ctl.Client.v (flow ctl) in start () dhcp_codes net ctl ) diff --git a/projects/miragesdk/src/dhcp-client/main.ml b/projects/miragesdk/src/dhcp-client/main.ml index 3a72c2429..fe4feae0b 100644 --- a/projects/miragesdk/src/dhcp-client/main.ml +++ b/projects/miragesdk/src/dhcp-client/main.ml @@ -51,31 +51,26 @@ module Handlers = struct gateway; ] - let watch ~ethif path = - Ctl.v path >>= fun db -> + let watch ~ethif db = Lwt_list.map_p (fun f -> f db) (handlers ethif) >>= fun _ -> let t, _ = Lwt.task () in t end -external bpf_filter: unit -> string = "bpf_filter" +external dhcp_filter: unit -> string = "bpf_filter" let t = Init.Pipe.v () -let ctl = string_of_int Init.(Fd.to_int Pipe.(calf @@ ctl t)) -let net = string_of_int Init.(Fd.to_int Pipe.(calf @@ net t)) let default_cmd = [ - "/dhcp-client-calf"; "--ctl="^ctl; "--net="^net + "/calf/dhcp-client-calf"; "--net=3"; "--ctl=4"; "-vv"; ] -(* FIXME: use runc isolation - let default_cmd = [ - "/usr/bin/runc"; "--"; "run"; - "--bundle"; "/containers/images/000-dhcp-client"; - "dhcp-client" - ] in - *) +(* +let default_cmd = [ + "/usr/bin/runc"; "run"; "--preserve-fds"; "2"; "--bundle"; "."; "dhcp-client" +] +*) let read_cmd file = if Sys.file_exists file then @@ -91,7 +86,6 @@ let read_cmd file = | Some f -> read_cmd f in Lwt_main.run ( - let net = Init.rawlink ~filter:(bpf_filter ()) ethif in let routes = [ "/ip"; "/gateway"; @@ -100,10 +94,10 @@ let read_cmd file = "/mtu"; "/nameservers/*" ] in - Ctl.v "/data" >>= fun ctl -> - let fd = Init.(Fd.fd @@ Pipe.(priv @@ ctl t)) in - let ctl () = Ctl.Server.listen ~routes ctl fd in - let handlers () = Handlers.watch ~ethif path in + Ctl.v "/data" >>= fun db -> + let ctl fd = Ctl.Server.listen ~routes db fd in + let handlers () = Handlers.watch ~ethif db in + let net = Init.rawlink ~filter:(dhcp_filter ()) ethif in Init.run t ~net ~ctl ~handlers cmd ) diff --git a/projects/miragesdk/src/sdk/IO.ml b/projects/miragesdk/src/sdk/IO.ml index 44c318344..4e8a2d023 100644 --- a/projects/miragesdk/src/sdk/IO.ml +++ b/projects/miragesdk/src/sdk/IO.ml @@ -3,52 +3,61 @@ open Lwt.Infix let src = Logs.Src.create "IO" ~doc:"IO helpers" module Log = (val Logs.src_log src : Logs.LOG) -let rec really_write fd buf off len = - match len with - | 0 -> Lwt.return_unit - | len -> - Log.debug (fun l -> l "really_write off=%d len=%d" off len); - Lwt_unix.write fd buf off len >>= fun n -> - if n = 0 then Lwt.fail_with "write 0" - else really_write fd buf (off+n) (len-n) +(* from mirage-conduit. FIXME: move to mirage-flow *) +type 'a io = 'a Lwt.t +type buffer = Cstruct.t +type error = [`Msg of string] +type write_error = [ Mirage_flow.write_error | error ] +let pp_error ppf (`Msg s) = Fmt.string ppf s -let write fd buf = really_write fd buf 0 (String.length buf) +let pp_write_error ppf = function + | #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e + | #error as e -> pp_error ppf e -let rec really_read fd buf off len = - match len with - | 0 -> Lwt.return_unit - | len -> - Log.debug (fun l -> l "really_read off=%d len=%d" off len); - Lwt_unix.read fd buf off len >>= fun n -> - if n = 0 then Lwt.fail_with "read 0" - else really_read fd buf (off+n) (len-n) +type flow = + | Flow: string + * (module Mirage_flow_lwt.CONCRETE with type flow = 'a) + * 'a + -> flow -let read_all fd = - Log.debug (fun l -> l "read_all"); - let len = 16 * 1024 in - let buf = Bytes.create len in - let rec loop acc = - Lwt_unix.read fd buf 0 len >>= fun n -> - if n = 0 then Lwt.fail_with "read 0" - else - let acc = String.sub buf 0 n :: acc in - if n <= len then Lwt.return (List.rev acc) - else loop acc +let create (type a) (module M: Mirage_flow_lwt.S with type flow = a) t name = + let m = + (module Mirage_flow_lwt.Concrete(M): + Mirage_flow_lwt.CONCRETE with type flow = a) in - loop [] >|= fun bufs -> - String.concat "" bufs + Flow (name, m , t) -let read_n fd len = - Log.debug (fun l -> l "read_n len=%d" len); - let buf = Bytes.create len in - let rec loop acc len = - Lwt_unix.read fd buf 0 len >>= fun n -> - if n = 0 then Lwt.fail_with "read 0" - else - let acc = String.sub buf 0 n :: acc in - match len - n with - | 0 -> Lwt.return (List.rev acc) - | r -> loop acc r +let read (Flow (_, (module F), flow)) = F.read flow +let write (Flow (_, (module F), flow)) b = F.write flow b +let writev (Flow (_, (module F), flow)) b = F.writev flow b +let close (Flow (_, (module F), flow)) = F.close flow +let pp ppf (Flow (name, _, _)) = Fmt.string ppf name + +type t = flow + +let forward ~src ~dst = + let rec loop () = + read src >>= function + | Ok `Eof -> + Log.err (fun l -> l "forward[%a => %a] EOF" pp src pp dst); + Lwt.return_unit + | Error e -> + Log.err (fun l -> l "forward[%a => %a] %a" pp src pp dst pp_error e); + Lwt.return_unit + | Ok (`Data buf) -> + Log.debug (fun l -> l "forward[%a => %a] %a" + pp src pp dst Cstruct.hexdump_pp buf); + write dst buf >>= function + | Ok () -> loop () + | Error e -> + Log.err (fun l -> l "forward[%a => %a] %a" + pp src pp dst pp_write_error e); + Lwt.return_unit in - loop [] len >|= fun bufs -> - String.concat "" bufs + loop () + +let proxy f1 f2 = + Lwt.join [ + forward ~src:f1 ~dst:f2; + forward ~src:f2 ~dst:f1; + ] diff --git a/projects/miragesdk/src/sdk/IO.mli b/projects/miragesdk/src/sdk/IO.mli index b07954d18..2883e34d2 100644 --- a/projects/miragesdk/src/sdk/IO.mli +++ b/projects/miragesdk/src/sdk/IO.mli @@ -1,16 +1,21 @@ (** IO helpers *) -val really_write: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t -(** [really_write fd buf off len] writes exactly [len] bytes to [fd]. *) +type t +(** The type for IO flows *) -val write: Lwt_unix.file_descr -> string -> unit Lwt.t -(** [write fd buf] writes all the buffer [buf] in [fd]. *) +include Mirage_flow_lwt.S with type flow = t -val really_read: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t -(** [really_read fd buf off len] reads exactly [len] bytes from [fd]. *) +val create: (module Mirage_flow_lwt.S with type flow = 'a) -> 'a -> string -> flow +(** [create (module M) t name] is the flow representing [t] using the + function defined in [M]. *) -val read_all: Lwt_unix.file_descr -> string Lwt.t -(** [read_all fd] reads as much data as it is available in [fd]. *) +val pp: flow Fmt.t +(** [pp] is the pretty-printer for IO flows. *) -val read_n: Lwt_unix.file_descr -> int -> string Lwt.t -(** [read_n fd n] reads exactly [n] bytes from [fd]. *) +val forward: src:t -> dst:t -> unit Lwt.t +(** [forward ~src ~dst] forwards writes from [src] to [dst]. Block + until either [src] or [dst] is closed. *) + +val proxy: t -> t -> unit Lwt.t +(** [proxy x y] is the same as [forward x y <*> forward y x]. Block + until both flows are closed. *) diff --git a/projects/miragesdk/src/sdk/ctl.ml b/projects/miragesdk/src/sdk/ctl.ml index d097664bd..2d18f23f9 100644 --- a/projects/miragesdk/src/sdk/ctl.ml +++ b/projects/miragesdk/src/sdk/ctl.ml @@ -4,6 +4,8 @@ open Astring let src = Logs.Src.create "init" ~doc:"Init steps" module Log = (val Logs.src_log src : Logs.LOG) +let failf fmt = Fmt.kstrf Lwt.fail_with fmt + (* FIXME: to avoid linking with gmp *) module No_IO = struct type ic = unit @@ -32,6 +34,8 @@ let () = (* FIXME: inotify need some unknown massaging. *) (* Irmin_watcher.hook *) +module C = Mirage_channel_lwt.Make(IO) + module Query = struct (* FIXME: this should probably be replaced by protobuf *) @@ -109,25 +113,37 @@ module Query = struct Cstruct.blit_from_bytes msg.payload 0 buf (sizeof_msg+path) payload; buf + let err e = Lwt.return (Error (`Msg (Fmt.to_to_string C.pp_error e))) + let err_eof = Lwt.return (Error (`Msg "EOF")) + let read fd = - IO.read_n fd 4 >>= fun buf -> - Log.debug (fun l -> l "Message.read len=%S" buf); - let len = - Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0 - |> Int32.to_int - in - IO.read_n fd len >|= fun buf -> - of_cstruct (Cstruct.of_string buf) + let fd = C.create fd in + C.read_exactly fd ~len:4 >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf); + let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in + C.read_exactly fd ~len >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Lwt.return (of_cstruct buf) let write fd msg = - let buf = to_cstruct msg |> Cstruct.to_string in + let buf = to_cstruct msg in let len = let len = Cstruct.create 4 in - Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ String.length buf); - Cstruct.to_string len + Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf); + len in - IO.write fd len >>= fun () -> - IO.write fd buf + IO.write fd len >>= function + | Error e -> failf "Query.write(len) %a" IO.pp_write_error e + | Ok () -> IO.write fd buf >>= function + | Ok () -> Lwt.return_unit + | Error e -> failf "Query.write(buf) %a" IO.pp_write_error e end @@ -191,25 +207,37 @@ module Reply = struct Cstruct.blit_from_bytes msg.payload 0 buf sizeof_msg payload; buf + let err e = Lwt.return (Result.Error (`Msg (Fmt.to_to_string C.pp_error e))) + let err_eof = Lwt.return (Result.Error (`Msg "EOF")) + let read fd = - IO.read_n fd 4 >>= fun buf -> - Log.debug (fun l -> l "Message.read len=%S" buf); - let len = - Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0 - |> Int32.to_int - in - IO.read_n fd len >|= fun buf -> - of_cstruct (Cstruct.of_string buf) + let fd = C.create fd in + C.read_exactly fd ~len:4 >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf); + let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in + C.read_exactly fd ~len >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Lwt.return (of_cstruct buf) let write fd msg = - let buf = to_cstruct msg |> Cstruct.to_string in + let buf = to_cstruct msg in let len = let len = Cstruct.create 4 in - Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ String.length buf); - Cstruct.to_string len + Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf); + len in - IO.write fd len >>= fun () -> - IO.write fd buf + IO.write fd len >>= function + | Error e -> failf "Reply.write(len) %a" IO.pp_write_error e + | Ok () -> IO.write fd buf >>= function + | Ok () -> Lwt.return_unit + | Error e -> failf "Reply.write(buf) %a" IO.pp_write_error e end @@ -231,7 +259,7 @@ module Client = struct module Cache = Hashtbl.Make(K) type t = { - fd : Lwt_unix.file_descr; + fd : IO.t; replies: Reply.t Cache.t; } @@ -327,16 +355,8 @@ module Server = struct | Some v -> ok q v ) - - let int_of_fd (t:Lwt_unix.file_descr) = - (Obj.magic (Lwt_unix.unix_file_descr t): int) - let listen ~routes db fd = - Lwt_unix.blocking fd >>= fun blocking -> - Log.debug (fun l -> - l "Serving the control state over fd:%d (blocking=%b)" - (int_of_fd fd) blocking - ); + Log.debug (fun l -> l "Serving the control state over %a" IO.pp fd); let queries = Queue.create () in let cond = Lwt_condition.create () in let rec listen () = diff --git a/projects/miragesdk/src/sdk/ctl.mli b/projects/miragesdk/src/sdk/ctl.mli index 0dad5aa8e..2472e9897 100644 --- a/projects/miragesdk/src/sdk/ctl.mli +++ b/projects/miragesdk/src/sdk/ctl.mli @@ -28,10 +28,10 @@ module Query: sig val to_cstruct: t -> Cstruct.t (** [to_cstruct t] is the serialization of [t]. *) - val write: Lwt_unix.file_descr -> t -> unit Lwt.t + val write: IO.flow -> t -> unit Lwt.t (** [write fd t] writes a query message. *) - val read: Lwt_unix.file_descr -> (t, [`Msg of string]) result Lwt.t + val read: IO.flow -> (t, [`Msg of string]) result Lwt.t (** [read fd] reads a query message. *) end @@ -60,10 +60,10 @@ module Reply: sig val to_cstruct: t -> Cstruct.t (** [to_cstruct t] is the serialization of [t]. *) - val write: Lwt_unix.file_descr -> t -> unit Lwt.t + val write: IO.flow -> t -> unit Lwt.t (** [write fd t] writes a reply message. *) - val read: Lwt_unix.file_descr -> (t, [`Msg of string]) result Lwt.t + val read: IO.flow -> (t, [`Msg of string]) result Lwt.t (** [read fd] reads a reply message. *) end @@ -80,7 +80,7 @@ module Client: sig type t (** The type for client state. *) - val v: Lwt_unix.file_descr -> t + val v: IO.t -> t (** [v fd] is the client state using [fd] to send requests to the server. A client state also stores some state for all the incomplete client queries. *) @@ -108,7 +108,7 @@ val v: string -> KV.t Lwt.t module Server: sig - val listen: routes:string list -> KV.t -> Lwt_unix.file_descr -> unit Lwt.t + val listen: routes:string list -> KV.t -> IO.t -> unit Lwt.t (** [listen ~routes kv fd] is the thread exposing the KV store [kv], holding control plane state, running inside the privileged container. [routes] are the routes exposed by the server to the diff --git a/projects/miragesdk/src/sdk/init.ml b/projects/miragesdk/src/sdk/init.ml index 5f6559893..6817c1e20 100644 --- a/projects/miragesdk/src/sdk/init.ml +++ b/projects/miragesdk/src/sdk/init.ml @@ -5,6 +5,127 @@ module Log = (val Logs.src_log src : Logs.LOG) let failf fmt = Fmt.kstrf Lwt.fail_with fmt +let pp_fd ppf (t:Lwt_unix.file_descr) = + Fmt.int ppf (Obj.magic (Lwt_unix.unix_file_descr t): int) + +let rec really_write fd buf off len = + match len with + | 0 -> Lwt.return_unit + | len -> + Log.debug (fun l -> l "really_write %a off=%d len=%d" pp_fd fd off len); + Lwt_unix.write fd buf off len >>= fun n -> + if n = 0 then Lwt.fail_with "write 0" + else really_write fd buf (off+n) (len-n) + +let write_all fd buf = really_write fd buf 0 (String.length buf) + +let read_all fd = + Log.debug (fun l -> l "read_all %a" pp_fd fd); + let len = 16 * 1024 in + let buf = Bytes.create len in + let rec loop acc = + Lwt_unix.read fd buf 0 len >>= fun n -> + if n = 0 then failf "read %a: 0" pp_fd fd + else + let acc = String.sub buf 0 n :: acc in + if n <= len then Lwt.return (List.rev acc) + else loop acc + in + loop [] >|= fun bufs -> + String.concat "" bufs + +module Flow = struct + + (* build a flow from Lwt_unix.file_descr *) + module Fd: Mirage_flow_lwt.CONCRETE with type flow = Lwt_unix.file_descr = struct + type 'a io = 'a Lwt.t + type buffer = Cstruct.t + type error = [`Msg of string] + type write_error = [ Mirage_flow.write_error | error ] + + let pp_error ppf (`Msg s) = Fmt.string ppf s + + let pp_write_error ppf = function + | #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e + | #error as e -> pp_error ppf e + + type flow = Lwt_unix.file_descr + + let err e = Lwt.return (Error (`Msg (Printexc.to_string e))) + + let read t = + Lwt.catch (fun () -> + read_all t >|= fun buf -> Ok (`Data (Cstruct.of_string buf)) + ) (function Failure _ -> Lwt.return (Ok `Eof) | e -> err e) + + let write t b = + Lwt.catch (fun () -> + write_all t (Cstruct.to_string b) >|= fun () -> Ok () + ) (fun e -> err e) + + let close t = Lwt_unix.close t + + let writev t bs = + Lwt.catch (fun () -> + Lwt_list.iter_s (fun b -> write_all t (Cstruct.to_string b)) bs + >|= fun () -> Ok () + ) (fun e -> err e) + end + + (* build a flow from rawlink *) + module Rawlink: Mirage_flow_lwt.CONCRETE with type flow = Lwt_rawlink.t = struct + type 'a io = 'a Lwt.t + type buffer = Cstruct.t + type error = [`Msg of string] + type write_error = [ Mirage_flow.write_error | error ] + + let pp_error ppf (`Msg s) = Fmt.string ppf s + + let pp_write_error ppf = function + | #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e + | #error as e -> pp_error ppf e + + type flow = Lwt_rawlink.t + + let err e = Lwt.return (Error (`Msg (Printexc.to_string e))) + + let read t = + Lwt.catch (fun () -> + Lwt_rawlink.read_packet t >|= fun buf -> Ok (`Data buf) + ) (function Failure _ -> Lwt.return (Ok `Eof) | e -> err e) + + let write t b = + Lwt.catch (fun () -> + Lwt_rawlink.send_packet t b >|= fun () -> Ok () + ) (fun e -> err e) + + let close t = Lwt_rawlink.close_link t + + let writev t bs = + Lwt.catch (fun () -> + Lwt_list.iter_s (Lwt_rawlink.send_packet t) bs >|= fun () -> Ok () + ) (fun e -> err e) + end + + let int_of_fd t = + (Obj.magic (Lwt_unix.unix_file_descr t): int) + + let fd ?name t = + IO.create (module Fd) t (match name with + | None -> string_of_int (int_of_fd t) + | Some n -> n) + +end + +let file_descr ?name t = Flow.fd ?name t + +let rawlink ?filter ethif = + Log.debug (fun l -> l "bringing up %s" ethif); + (try Tuntap.set_up_and_running ethif + with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); + let t = Lwt_rawlink.open_link ?filter ethif in + IO.create (module Flow.Rawlink) t ethif + module Fd = struct type t = { @@ -12,11 +133,14 @@ module Fd = struct fd : Lwt_unix.file_descr; } - let fd t = t.fd let stdout = { name = "stdout"; fd = Lwt_unix.stdout } let stderr = { name = "stderr"; fd = Lwt_unix.stderr } let stdin = { name = "stdin" ; fd = Lwt_unix.stdin } + let of_int name (i:int) = + let fd = Lwt_unix.of_unix_file_descr (Obj.magic i: Unix.file_descr) in + { name; fd } + let to_int t = (Obj.magic (Lwt_unix.unix_file_descr t.fd): int) @@ -41,53 +165,7 @@ module Fd = struct Lwt_unix.dup2 src.fd dst.fd; close src - let proxy_net ~net fd = - Log.debug (fun l -> l "proxy-net eth0 <=> %a" pp fd); - let rec listen_rawlink () = - Lwt_rawlink.read_packet net >>= fun buf -> - Log.debug (fun l -> l "PROXY-NET: => %a" Cstruct.hexdump_pp buf); - Log.debug (fun l -> l "PROXY-NET: => %S" (Cstruct.to_string buf)); - let rec write buf = - Lwt_cstruct.write fd.fd buf >>= function - | 0 -> Lwt.return_unit - | n -> write (Cstruct.shift buf n) - in - write buf >>= fun () -> - listen_rawlink () - in - let listen_socket () = - let len = 16 * 1024 in - let buf = Cstruct.create len in - let rec loop () = - Lwt_cstruct.read fd.fd buf >>= fun len -> - let buf = Cstruct.sub buf 0 len in - Log.debug (fun l -> l "PROXY-NET: <= %a" Cstruct.hexdump_pp buf); - Lwt_rawlink.send_packet net buf >>= fun () -> - loop () - in - loop () - in - Lwt.pick [ - listen_rawlink (); - listen_socket (); - ] - - let forward ~src ~dst = - Log.debug (fun l -> l "forward %a => %a" pp src pp dst); - let len = 16 * 1024 in - let buf = Bytes.create len in - let rec loop () = - Lwt_unix.read src.fd buf 0 len >>= fun len -> - if len = 0 then Lwt.return_unit (* EOF *) - else ( - Log.debug (fun l -> - l "FORWARD[%a => %a]: %S (%d)" - pp src pp dst (Bytes.sub buf 0 len) len); - IO.really_write dst.fd buf 0 len >>= fun () -> - loop () - ) - in - loop () + let flow t = Flow.fd t.fd ~name:(Fmt.to_to_string pp t) end @@ -130,10 +208,10 @@ module Pipe = struct (* logs pipe *) let stdout = pipe "stdout" in let stderr = pipe "stderr" in - (* store pipe *) - let ctl = socketpair "ctl" in (* network pipe *) let net = socketpair "net" in + (* store pipe *) + let ctl = socketpair "ctl" in (* metrics pipe *) let metrics = pipe "metrics" in { stdout; stderr; ctl; net; metrics } @@ -141,6 +219,7 @@ module Pipe = struct end let exec_calf t cmd = + Log.info (fun l -> l "child pid is %d" Unix.(getpid ())); Fd.(redirect_to_dev_null stdin) >>= fun () -> (* close parent fds *) @@ -152,26 +231,22 @@ let exec_calf t cmd = let cmds = String.concat " " cmd in - let calf_net = Pipe.(calf t.net) in - let calf_ctl = Pipe.(calf t.ctl) in - let calf_stdout = Pipe.(calf t.stdout) in - let calf_stderr = Pipe.(calf t.stderr) in + let calf_stdout = Fd.of_int "stdout" 1 in + let calf_stderr = Fd.of_int "stderr" 2 in + let calf_net = Fd.of_int "net" 3 in + let calf_ctl = Fd.of_int "ctl" 4 in Log.info (fun l -> l "Executing %s" cmds); - Log.debug (fun l -> l "net-fd=%a store-fd=%a" Fd.pp calf_net Fd.pp calf_ctl); - Fd.dup2 ~src:calf_stdout ~dst:Fd.stdout >>= fun () -> - Fd.dup2 ~src:calf_stderr ~dst:Fd.stderr >>= fun () -> + (* Move all open fds at the top *) + Fd.dup2 ~src:Pipe.(calf t.stdout) ~dst:calf_stdout >>= fun () -> + Fd.dup2 ~src:Pipe.(calf t.stderr) ~dst:calf_stderr >>= fun () -> + Fd.dup2 ~src:Pipe.(calf t.net) ~dst:calf_net >>= fun () -> + Fd.dup2 ~src:Pipe.(calf t.ctl) ~dst:calf_ctl >>= fun () -> (* exec the calf *) Unix.execve (List.hd cmd) (Array.of_list cmd) [||] -let rawlink ?filter ethif = - Log.debug (fun l -> l "bringing up %s" ethif); - (try Tuntap.set_up_and_running ethif - with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); - Lwt_rawlink.open_link ?filter ethif - let check_exit_status cmd status = let cmds = String.concat " " cmd in match status with @@ -191,6 +266,12 @@ let exec_priv t ~pid ~cmd ~net ~ctl ~handlers = Fd.close Pipe.(calf t.ctl) >>= fun () -> Fd.close Pipe.(calf t.metrics) >>= fun () -> + + let priv_net = Fd.flow Pipe.(priv t.net) in + let priv_ctl = Fd.flow Pipe.(priv t.ctl) in + let priv_stdout = Fd.flow Pipe.(priv t.stdout) in + let priv_stderr = Fd.flow Pipe.(priv t.stderr) in + let wait () = Lwt_unix.waitpid [] pid >>= fun (_pid, w) -> Lwt_io.flush_all () >>= fun () -> @@ -200,13 +281,13 @@ let exec_priv t ~pid ~cmd ~net ~ctl ~handlers = Lwt.pick ([ wait (); (* data *) - Fd.proxy_net ~net Pipe.(priv t.net); + IO.proxy net priv_net; (* redirect the calf stdout to the shim stdout *) - Fd.forward ~src:Pipe.(priv t.stdout) ~dst:Fd.stdout; - Fd.forward ~src:Pipe.(priv t.stderr) ~dst:Fd.stderr; + IO.forward ~src:priv_stdout ~dst:Fd.(flow stdout); + IO.forward ~src:priv_stderr ~dst:Fd.(flow stderr); (* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *) - ctl (); + ctl priv_ctl; handlers (); ]) diff --git a/projects/miragesdk/src/sdk/init.mli b/projects/miragesdk/src/sdk/init.mli index 4733b43b5..5ec269ec0 100644 --- a/projects/miragesdk/src/sdk/init.mli +++ b/projects/miragesdk/src/sdk/init.mli @@ -14,6 +14,7 @@ data, e.g. the IP address once a DHCP lease is obtained.} }*) + module Fd: sig type t @@ -22,12 +23,6 @@ module Fd: sig val pp: t Fmt.t (** [pp_fd] pretty prints a file descriptor. *) - val fd: t -> Lwt_unix.file_descr - (** [fd t] is [t]'s underlying unix file descriptor. *) - - val to_int: t -> int -(** [to_int fd] is [fd]'s number. *) - val redirect_to_dev_null: t -> unit Lwt.t (** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *) @@ -37,13 +32,6 @@ module Fd: sig val dup2: src:t -> dst:t -> unit Lwt.t (** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *) - val proxy_net: net:Lwt_rawlink.t -> t -> unit Lwt.t - (** [proxy_net ~net fd] proxies the traffic between the raw net link - [net] and [fd]. *) - - val forward: src:t -> dst:t -> unit Lwt.t - (** [forward ~src ~dst] forwards the flow from [src] to [dst]. *) - (** {1 Usefull File Descriptors} *) val stdin: t @@ -55,8 +43,14 @@ module Fd: sig val stderr: t (** [stderr] is the standard error. *) + val flow: t -> IO.t + (** [flow t] is the flow representing [t]. *) + end +val file_descr: ?name:string -> Lwt_unix.file_descr -> IO.t +(** [file_descr ?name fd] is the flow for the file-descripor [fd]. *) + module Pipe: sig type t @@ -102,18 +96,21 @@ module Pipe: sig end -val rawlink: ?filter:string -> string -> Lwt_rawlink.t -(** [rawlink ?filter i] is the net raw link to the interface [i] using - the (optional) BPF filter [filter]. *) +val rawlink: ?filter:string -> string -> IO.t +(** [rawlink ?filter x] is the flow using the network interface + [x]. The packets can be filtered using the BPF filter + [filter]. See the documentation of + {{:https://github.com/haesbaert/rawlink}rawlink} for more details + on how to build that filter. *) +(* FIXME(samoht): not very happy with that signatue *) val run: Pipe.monitor -> - net:Lwt_rawlink.t -> - ctl:(unit -> unit Lwt.t) -> + net:IO.t -> + ctl:(IO.t -> unit Lwt.t) -> handlers:(unit -> unit Lwt.t) -> string list -> unit Lwt.t (** [run m ~net ~ctl ~handlers cmd] runs [cmd] in a unprivileged calf - process. [ctl] is the control thread connected to the {Pipe.ctl} - pipe. [net] is the net raw link which will be connected to the - calf via the {!Pipe.net} socket pair. [handlers] are the system + process. [net] is the network interface flow. [ctl] is the control + thread connected to the {Pipe.ctl} pipe. [handlers] are the system handler thread which will react to control data to perform privileged system actions. *) diff --git a/projects/miragesdk/src/sdk/jbuild b/projects/miragesdk/src/sdk/jbuild index e357caa71..12c0bce23 100644 --- a/projects/miragesdk/src/sdk/jbuild +++ b/projects/miragesdk/src/sdk/jbuild @@ -4,6 +4,7 @@ ((name sdk) (libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty decompress irmin irmin-git lwt.unix rawlink tuntap dispatch - irmin-watcher inotify astring rresult)) + irmin-watcher inotify astring rresult mirage-flow-lwt + mirage-channel-lwt io-page.unix)) (preprocess (per_file ((pps (cstruct.ppx)) (ctl)))) )) diff --git a/projects/miragesdk/src/test/test.ml b/projects/miragesdk/src/test/test.ml index c03543ef3..7af003631 100644 --- a/projects/miragesdk/src/test/test.ml +++ b/projects/miragesdk/src/test/test.ml @@ -8,44 +8,50 @@ let random_string n = (* workaround https://github.com/mirage/alcotest/issues/88 *) exception Check_error of string -let check_raises msg exn f = +let check_raises msg f = Lwt.catch (fun () -> f () >>= fun () -> Lwt.fail (Check_error msg) ) (function - | Check_error e -> Alcotest.fail e - | e -> - if exn e then Lwt.return_unit - else Fmt.kstrf Alcotest.fail "%s raised %a" msg Fmt.exn e) - -let is_unix_error = function - | Unix.Unix_error _ -> true - | _ -> false + | Check_error e -> Alcotest.fail e + | _ -> Lwt.return_unit + ) let escape = String.Ascii.escape let write fd strs = Lwt_list.iter_s (fun str -> - IO.really_write fd str 0 (String.length str) + IO.write fd (Cstruct.of_string str) >>= function + | Ok () -> Lwt.return_unit + | Error e -> Fmt.kstrf Lwt.fail_with "write: %a" IO.pp_write_error e ) strs +let read fd = + IO.read fd >>= function + | Ok (`Data x) -> Lwt.return (Cstruct.to_string x) + | Ok `Eof -> Lwt.fail_with "read: EOF" + | Error e -> Fmt.kstrf Lwt.fail_with "read: %a" IO.pp_error e + +let calf pipe = Init.(Fd.flow Pipe.(calf pipe)) +let priv pipe = Init.(Fd.flow Pipe.(priv pipe)) + let test_pipe pipe () = - let calf = Init.Fd.fd @@ Init.Pipe.(calf pipe) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv pipe) in + let calf = calf pipe in + let priv = priv pipe in let name = Init.Pipe.name pipe in let test strs = let escape_strs = String.concat ~sep:"" @@ List.map escape strs in (* pipes are unidirectional *) (* calf -> priv works *) write calf strs >>= fun () -> - IO.read_all priv >>= fun buf -> + read priv >>= fun buf -> let msg = Fmt.strf "%s: calf -> priv" name in Alcotest.(check string) msg escape_strs (escape buf); (* priv -> calf don't *) - check_raises (Fmt.strf "%s: priv side is writable!" name) is_unix_error + check_raises (Fmt.strf "%s: priv side is writable!" name) (fun () -> write priv strs) >>= fun () -> - check_raises (Fmt.strf "%s: calf sid is readable!" name) is_unix_error - (fun () -> IO.read_all calf >|= ignore) >>= fun () -> + check_raises (Fmt.strf "%s: calf sid is readable!" name) + (fun () -> read calf >|= ignore) >>= fun () -> Lwt.return_unit in test [random_string 1] >>= fun () -> @@ -56,19 +62,19 @@ let test_pipe pipe () = Lwt.return_unit let test_socketpair pipe () = - let calf = Init.Fd.fd @@ Init.Pipe.(calf pipe) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv pipe) in + let calf = calf pipe in + let priv = priv pipe in let name = Init.Pipe.name pipe in let test strs = let escape_strs = String.concat ~sep:"" @@ List.map escape strs in (* socket pairs are bi-directional *) (* calf -> priv works *) write calf strs >>= fun () -> - IO.read_all priv >>= fun buf -> + read priv >>= fun buf -> Alcotest.(check string) (name ^ " calf -> priv") escape_strs (escape buf); (* priv -> cal works *) write priv strs >>= fun () -> - IO.read_all calf >>= fun buf -> + read calf >>= fun buf -> Alcotest.(check string) (name ^ " priv -> calf") escape_strs (escape buf); Lwt.return_unit in @@ -118,8 +124,8 @@ let test_serialization to_cstruct of_cstruct message messages = List.iter test messages let test_send t write read message messages = - let calf = Init.Fd.fd @@ Init.Pipe.(calf @@ ctl t) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv @@ ctl t) in + let calf = Init.(Fd.flow Pipe.(calf @@ ctl t)) in + let priv = Init.(Fd.flow Pipe.(priv @@ ctl t)) in let test m = write calf m >>= fun () -> read priv >|= function @@ -192,8 +198,8 @@ let delete_should_work t k = | Error (`Msg e) -> failf "write(%s) -> error: %s" k e let test_ctl t () = - let calf = Init.Fd.fd @@ Init.Pipe.(calf @@ ctl t) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv @@ ctl t) in + let calf = Init.(Fd.flow Init.Pipe.(calf @@ ctl t)) in + let priv = Init.(Fd.flow Init.Pipe.(priv @@ ctl t)) in let k1 = "/foo/bar" in let k2 = "a" in let k3 = "b/c" in