diff --git a/projects/miragesdk/examples/mirage-dhcp.yml b/projects/miragesdk/examples/mirage-dhcp.yml index 349339866..7a7fc4528 100644 --- a/projects/miragesdk/examples/mirage-dhcp.yml +++ b/projects/miragesdk/examples/mirage-dhcp.yml @@ -1,7 +1,7 @@ kernel: image: "mobylinux/kernel:4.9.x" cmdline: "console=ttyS0 page_poison=1" -init: "mobylinux/init:3024f1eaf8779691229d661791607aade4df855d" +init: "mobylinux/init:5770b8f1c72d3b9da43951d4ce3b53d473e3dc8b" system: - name: sysctl image: "mobylinux/sysctl:2cf2f9d5b4d314ba1bfc22b2fe931924af666d8c" @@ -24,14 +24,16 @@ daemon: oomScoreAdj: -800 readonly: true - name: dhcp-client + image: "mobylinux/dhcp-client:30fddee3bf4a17945065dffb6f6bbef0d729f60d@sha256:32d5d93a96253928268ddd03a900f9404afb6267d60ba1e551859ca92d35663c" net: host - image: "mobylinux/dhcp-client:f40cafe2ade4b115704750a85d21eb35b1116b91" capabilities: - CAP_NET_ADMIN # to bring eth0 up - CAP_NET_RAW # to read /dev/eth0 binds: - /var/run/dhcp-client:/data - command: [/dhcp-client, -vv] + - /sbin:/sbin # for ifconfig + - /bin:/bin # for ifconfig + - /lib:/lib # for ifconfig readonly: true files: - path: /var/run/dhcp-client/README diff --git a/projects/miragesdk/pkg/init/Makefile b/projects/miragesdk/pkg/init/Makefile index cc0353dca..363940806 100644 --- a/projects/miragesdk/pkg/init/Makefile +++ b/projects/miragesdk/pkg/init/Makefile @@ -1,7 +1,7 @@ CONTAINERD_IMAGE=mobylinux/containerd:a688df6aee1e3700eb8d54dbc81070361df397a2@sha256:59ee3da05fe4dad4fbecff582c86fc30ce75e19a225eeeb07e203c9cc36fe34f CONTAINERD_BINARIES=usr/bin/containerd usr/bin/containerd-shim usr/bin/ctr usr/bin/dist -RUNC_IMAGE=mobylinux/runc:94c54debf9a3ebb6d31311bdddb881ea07486dcd@sha256:13cabc1017c6448498e74bae9892ebc9dbad9e5d68f7df6b3855a15522e3a86b +RUNC_IMAGE=mobylinux/runc:45884ad2bfad045cbf35f2b78b4c82f75fb19854@sha256:d7c4576405f2318d329f538f847927018d4e8497d7968bd3323ff047e2ffe257 RUNC_BINARY=usr/bin/runc C_COMPILE=mobylinux/c-compile:81a6bd8ff45d769b60a2ee1acdaccda11ab835c8@sha256:eac250997a3b9784d3285a03c0c8311d4ca6fb63dc75164c987411ba93006487 @@ -9,11 +9,11 @@ START_STOP_DAEMON=sbin/start-stop-daemon default: push -$(RUNC_BINARY): +$(RUNC_BINARY): Makefile mkdir -p $(dir $@) docker run --rm --net=none $(RUNC_IMAGE) tar cf - $@ | tar xf - -$(CONTAINERD_BINARIES): +$(CONTAINERD_BINARIES): Makefile mkdir -p $(dir $@) docker run --rm --net=none $(CONTAINERD_IMAGE) tar cf - $@ | tar xf - diff --git a/projects/miragesdk/pkg/init/init b/projects/miragesdk/pkg/init/init index 46c52bceb..f27b647b0 100755 --- a/projects/miragesdk/pkg/init/init +++ b/projects/miragesdk/pkg/init/init @@ -14,9 +14,10 @@ setup_console() { line="-L" term="vt100" ;; - tty0) - # skip current console - return 0 + tty?) + line="" + speed="38400" + term="" ;; esac # skip consoles already in inittab diff --git a/projects/miragesdk/src/Dockerfile.build b/projects/miragesdk/src/Dockerfile.build index fe1d25f83..8b8a38015 100644 --- a/projects/miragesdk/src/Dockerfile.build +++ b/projects/miragesdk/src/Dockerfile.build @@ -1,13 +1,11 @@ FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 RUN git -C /home/opam/opam-repository pull origin master && opam update -u -RUN opam pin -n add conduit https://github.com/samoht/ocaml-conduit.git#fd -RUN opam pin -n add mirage-net-unix https://github.com/samoht/mirage-net-unix.git#fd +RUN opam info mirage-net-fd RUN opam depext -iy mirage-net-unix logs-syslog irmin-unix cohttp decompress RUN opam depext -iy rawlink tuntap.1.0.0 jbuilder irmin-watcher inotify RUN opam install rresult -RUN opam pin add cstruct --dev # for ppx/jbuilder RUN opam pin add tuntap 1.0.0 # TMP: to compile the calf @@ -28,4 +26,5 @@ RUN opam config exec -- jbuilder build dhcp-client/main.exe RUN sudo cp /src/_build/default/dhcp-client/main.exe /dhcp-client RUN opam config exec -- jbuilder build dhcp-client/calf/unikernel.exe -RUN sudo cp /src/_build/default/dhcp-client/calf/unikernel.exe /dhcp-client-calf +RUN sudo mkdir -p /calf +RUN sudo cp /src/_build/default/dhcp-client/calf/unikernel.exe /calf/dhcp-client-calf diff --git a/projects/miragesdk/src/Dockerfile.pkg b/projects/miragesdk/src/Dockerfile.pkg index d1f06492e..8ec2f56f2 100644 --- a/projects/miragesdk/src/Dockerfile.pkg +++ b/projects/miragesdk/src/Dockerfile.pkg @@ -1,4 +1,4 @@ #FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 FROM scratch COPY obj ./ -CMD ["/dhcp-client"] +CMD ["/dhcp-client", "-vv"] diff --git a/projects/miragesdk/src/Makefile b/projects/miragesdk/src/Makefile index a7cd8a225..93dc3e834 100644 --- a/projects/miragesdk/src/Makefile +++ b/projects/miragesdk/src/Makefile @@ -7,7 +7,7 @@ IMAGE=dhcp-client OBJS=obj/dhcp-client MIRAGE_COMPILE=mobylinux/mirage-compile:f903b0e1b4328271364cc63f123ac49d56739cef@sha256:a54d9ca84d3f5998dba92ce83d60d49289cee8908a8b0f6ec280d30ab8edf46c -CALF_OBJS=obj/dhcp-client-calf +CALF_OBJS=obj/calf/dhcp-client-calf CALF_FILES=dhcp-client/calf/config.ml dhcp-client/calf/unikernel.ml \ dhcp-client/calf/jbuild @@ -18,8 +18,8 @@ default: push docker build -t $(IMAGE):build -f Dockerfile.build -q . > .build || \ (rm -f $@ && exit 1) -.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) - docker build -t $(IMAGE):pkg -f Dockerfile.pkg -q . > .pkg || \ +.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) obj/config.json + docker build --no-cache -t $(IMAGE):pkg -f Dockerfile.pkg -q . > .pkg || \ (rm -f $@ && exit 1) .dev: Dockerfile.dev init-dev.sh @@ -43,13 +43,23 @@ enter-dev: .dev # tar xf - || exit 1) && \ # touch $@ -$(OBJS) $(CALF_OBJS): .build $(FILES) $(CALF_FILES) - mkdir -p obj/usr/lib obj/bin +$(OBJS): .build $(FILES) + mkdir -p obj/calf ( cd obj && \ - docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(OBJS:obj/%=/%) $(CALF_OBJS:obj/%=/%) | tar xf - ) && \ + docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(OBJS:obj/%=/%) | tar xf - ) && \ touch $@ -hash: Makefile Dockerfile.build Dockerfile.pkg $(FILES) $(CALF_FILES) .build +$(CALF_OBJS): .build $(CALF_FILES) + mkdir -p obj/calf + ( cd obj && \ + docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(CALF_OBJS:obj/%=/%) | tar xf - ) && \ + touch $@ + +obj/config.json: dhcp-client/calf/config.json + mkdir -p obj/calf + cp $^ $@ + +hash: Makefile Dockerfile.build Dockerfile.pkg $(FILES) $(CALF_FILES) .build obj/config.json { cat $^; \ docker run --rm --entrypoint sh $(IMAGE):build -c 'cat /lib/apk/db/installed'; \ docker run --rm --entrypoint sh $(IMAGE):build -c 'opam list'; } \ diff --git a/projects/miragesdk/src/dhcp-client/calf/config.json b/projects/miragesdk/src/dhcp-client/calf/config.json new file mode 100644 index 000000000..212e28393 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/calf/config.json @@ -0,0 +1,38 @@ +{ + "ociVersion": "1.0.0-rc5-dev", + "platform": { + "os": "linux", + "arch": "amd64" + }, + "process": { + "terminal": false, + "user": {}, + "args": ["/dhcp-client-calf", "-vv", "--net=3", "--ctl=4"], + "cwd": "/", + "capabilities": { + "bounding": [], + "effective": [], + "inheritable": [], + "permitted": [] + } + }, + "root": { + "path": "calf", + "readonly": true + }, + "mounts": [ + { "destination": "/proc", "type": "proc", "source": "proc"} + ], + "linux": { + "resources": { + "disableOOMKiller": false + }, + "namespaces": [ + { "type": "pid" }, + { "type": "ipc" }, + { "type": "uts" }, + { "type": "network" }, + { "type": "mount" } + ] + } +} diff --git a/projects/miragesdk/src/dhcp-client/calf/unikernel.ml b/projects/miragesdk/src/dhcp-client/calf/unikernel.ml index b59692da6..b023a2970 100644 --- a/projects/miragesdk/src/dhcp-client/calf/unikernel.ml +++ b/projects/miragesdk/src/dhcp-client/calf/unikernel.ml @@ -3,8 +3,11 @@ open Lwt.Infix let src = Logs.Src.create "charrua" module Log = (val Logs.src_log src : Logs.LOG) +let failf fmt = Fmt.kstrf Lwt.fail_with fmt + type t = { address: Ipaddr.V4.t; + gateway: Ipaddr.V4.t option; domain: string option; search: string option; nameservers: Ipaddr.V4.t list; @@ -13,8 +16,9 @@ type t = { (* FIXME: we loose lots of info here *) let of_ipv4_config (t: Mirage_protocols_lwt.ipv4_config) = { address = t.Mirage_protocols_lwt.address; - domain = None; - search = None; + gateway = t.Mirage_protocols_lwt.gateway; + domain = None; + search = None; nameservers = [] } let pp ppf t = @@ -33,10 +37,14 @@ let of_pkt lease = (* ipv4_config expects a single IP address and the information * needed to construct a prefix. It can optionally use one router. *) let address = lease.yiaddr in + let gateway = match Dhcp_wire.collect_routers lease.options with + | [] -> None + | h::_ -> Some h + in let domain = Dhcp_wire.find_domain_name lease.options in let search = Dhcp_wire.find_domain_search lease.options in let nameservers = Dhcp_wire.collect_name_servers lease.options in - { address; domain; search; nameservers } + { address; gateway; domain; search; nameservers } let of_pkt_opt = function | None -> None @@ -167,9 +175,18 @@ let setup_log = module Dhcp_client = Dhcp_client_mirage.Make(Time)(Net) +let set_ip ctl k ip = + let str = Ipaddr.V4.to_string ip ^ "\n" in + Sdk.Ctl.Client.write ctl k str >>= function + | Ok () -> Lwt.return_unit + | Error (`Msg e) -> failf "error while writing %s: %s" k e + +let set_ip_opt ctl k = function + | None -> Lwt.return_unit + | Some ip -> set_ip ctl k ip + let start () dhcp_codes net ctl = Netif_fd.connect net >>= fun net -> - let ctl = Sdk.Ctl.Client.v (Lwt_unix.of_unix_file_descr ctl) in let requests = match dhcp_codes with | [] -> default_options | l -> @@ -184,15 +201,19 @@ let start () dhcp_codes net ctl = Lwt_stream.last_new stream >>= fun result -> let result = of_ipv4_config result in Log.info (fun l -> l "found lease: %a" pp result); - Sdk.Ctl.Client.write ctl "/ip" (Ipaddr.V4.to_string result.address ^ "\n") + set_ip ctl "/ip" result.address >>= fun () -> + set_ip_opt ctl "/gateway" result.gateway (* FIXME: Main end *) -let magic (x: int) = (Obj.magic x: Unix.file_descr) + +let fd (x: int) = (Obj.magic x: Unix.file_descr) + +let flow (x: int) = Sdk.Init.file_descr (Lwt_unix.of_unix_file_descr @@ fd x) let start () dhcp_codes net ctl = Lwt_main.run ( - let net = magic net in - let ctl = magic ctl in + let net = fd net in + let ctl = Sdk.Ctl.Client.v (flow ctl) in start () dhcp_codes net ctl ) @@ -202,7 +223,4 @@ let run = let () = match Term.eval run with | `Error _ -> exit 1 - | `Ok (Ok ()) |`Help |`Version -> exit 0 - | `Ok (Error (`Msg e)) -> - Printf.eprintf "%s\n%!" e; - exit 1 + | `Ok () |`Help |`Version -> exit 0 diff --git a/projects/miragesdk/src/dhcp-client/jbuild b/projects/miragesdk/src/dhcp-client/jbuild index cece90bb7..37c01d523 100644 --- a/projects/miragesdk/src/dhcp-client/jbuild +++ b/projects/miragesdk/src/dhcp-client/jbuild @@ -2,6 +2,6 @@ (executables ((names (main)) - (libraries (sdk bpf_dhcp)) + (libraries (sdk bpf_dhcp bos)) (flags (-cclib -static)) )) diff --git a/projects/miragesdk/src/dhcp-client/main.ml b/projects/miragesdk/src/dhcp-client/main.ml index 22f590ac8..e8b796963 100644 --- a/projects/miragesdk/src/dhcp-client/main.ml +++ b/projects/miragesdk/src/dhcp-client/main.ml @@ -7,6 +7,12 @@ module Log = (val Logs.src_log src : Logs.LOG) let failf fmt = Fmt.kstrf Lwt.fail_with fmt +let run fmt = + Fmt.kstrf (fun str -> + match Sys.command str with + | 0 -> Lwt.return () + | i -> Fmt.kstrf Lwt.fail_with "%S exited with code %d" str i + ) fmt module Handlers = struct @@ -17,45 +23,55 @@ module Handlers = struct | `Updated (_, (_, `Contents (v, _))) -> Some v | _ -> None - let ip t = + let ip ~ethif t = Ctl.KV.watch_key t ["ip"] (fun diff -> match contents_of_diff diff with + | None -> Lwt.return_unit | Some ip -> + let ip = String.trim ip in Log.info (fun l -> l "SET IP to %s" ip); - Lwt.return () - | _ -> - Lwt.return () + (* FIXME: use language bindings to netlink instead *) + run "ifconfig %s %s netmask 255.255.255.0" ethif ip + (* run "ip addr add %s/24 dev %s" ip ethif *) ) - let handlers = [ - ip; + let gateway t = + Ctl.KV.watch_key t ["gateway"] (fun diff -> + match contents_of_diff diff with + | None -> Lwt.return_unit + | Some gw -> + let gw = String.trim gw in + Log.info (fun l -> l "SET GATEWAY to %s" gw); + (* FIXME: use language bindings to netlink instead *) + run "ip route add default via %s" gw + ) + + let handlers ~ethif = [ + ip ~ethif; + gateway; ] - let watch path = - Ctl.v path >>= fun db -> - Lwt_list.map_p (fun f -> f db) handlers >>= fun _ -> + let watch ~ethif db = + Lwt_list.map_p (fun f -> f db) (handlers ethif) >>= fun _ -> let t, _ = Lwt.task () in t end -external bpf_filter: unit -> string = "bpf_filter" +external dhcp_filter: unit -> string = "bpf_filter" let t = Init.Pipe.v () -let ctl = string_of_int Init.(Fd.to_int Pipe.(calf @@ ctl t)) -let net = string_of_int Init.(Fd.to_int Pipe.(calf @@ net t)) let default_cmd = [ - "/dhcp-client-calf"; "--ctl="^ctl; "--net="^net + "/calf/dhcp-client-calf"; "--net=3"; "--ctl=4"; "-vv"; ] -(* FIXME: use runc isolation - let default_cmd = [ - "/usr/bin/runc"; "--"; "run"; - "--bundle"; "/containers/images/000-dhcp-client"; - "dhcp-client" - ] in - *) +(* +let default_cmd = [ + "/usr/bin/runc"; "run"; "--preserve-fds"; "2"; "--bundle"; "."; "calf +" +] +*) let read_cmd file = if Sys.file_exists file then @@ -71,18 +87,18 @@ let read_cmd file = | Some f -> read_cmd f in Lwt_main.run ( - let net = Init.rawlink ~filter:(bpf_filter ()) ethif in let routes = [ "/ip"; + "/gateway"; "/domain"; "/search"; "/mtu"; "/nameservers/*" ] in - Ctl.v "/data" >>= fun ctl -> - let fd = Init.(Fd.fd @@ Pipe.(priv @@ ctl t)) in - let ctl () = Ctl.Server.listen ~routes ctl fd in - let handlers () = Handlers.watch path in + Ctl.v path >>= fun db -> + let ctl fd = Ctl.Server.listen ~routes db fd in + let handlers () = Handlers.watch ~ethif db in + let net = Init.rawlink ~filter:(dhcp_filter ()) ethif in Init.run t ~net ~ctl ~handlers cmd ) diff --git a/projects/miragesdk/src/sdk/IO.ml b/projects/miragesdk/src/sdk/IO.ml index a03189370..4e8a2d023 100644 --- a/projects/miragesdk/src/sdk/IO.ml +++ b/projects/miragesdk/src/sdk/IO.ml @@ -3,46 +3,61 @@ open Lwt.Infix let src = Logs.Src.create "IO" ~doc:"IO helpers" module Log = (val Logs.src_log src : Logs.LOG) -let rec really_write fd buf off len = - match len with - | 0 -> Lwt.return_unit - | len -> - Log.debug (fun l -> l "really_write off=%d len=%d" off len); - Lwt_unix.write fd buf off len >>= fun n -> - really_write fd buf (off+n) (len-n) +(* from mirage-conduit. FIXME: move to mirage-flow *) +type 'a io = 'a Lwt.t +type buffer = Cstruct.t +type error = [`Msg of string] +type write_error = [ Mirage_flow.write_error | error ] +let pp_error ppf (`Msg s) = Fmt.string ppf s -let write fd buf = really_write fd buf 0 (String.length buf) +let pp_write_error ppf = function + | #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e + | #error as e -> pp_error ppf e -let rec really_read fd buf off len = - match len with - | 0 -> Lwt.return_unit - | len -> - Log.debug (fun l -> l "really_read off=%d len=%d" off len); - Lwt_unix.read fd buf off len >>= fun n -> - really_read fd buf (off+n) (len-n) +type flow = + | Flow: string + * (module Mirage_flow_lwt.CONCRETE with type flow = 'a) + * 'a + -> flow -let read_all fd = - Log.debug (fun l -> l "read_all"); - let len = 16 * 1024 in - let buf = Bytes.create len in - let rec loop acc = - Lwt_unix.read fd buf 0 len >>= fun n -> - let acc = String.sub buf 0 n :: acc in - if n <= len then Lwt.return (List.rev acc) - else loop acc +let create (type a) (module M: Mirage_flow_lwt.S with type flow = a) t name = + let m = + (module Mirage_flow_lwt.Concrete(M): + Mirage_flow_lwt.CONCRETE with type flow = a) in - loop [] >|= fun bufs -> - String.concat "" bufs + Flow (name, m , t) -let read_n fd len = - Log.debug (fun l -> l "read_n len=%d" len); - let buf = Bytes.create len in - let rec loop acc len = - Lwt_unix.read fd buf 0 len >>= fun n -> - let acc = String.sub buf 0 n :: acc in - match len - n with - | 0 -> Lwt.return (List.rev acc) - | r -> loop acc r +let read (Flow (_, (module F), flow)) = F.read flow +let write (Flow (_, (module F), flow)) b = F.write flow b +let writev (Flow (_, (module F), flow)) b = F.writev flow b +let close (Flow (_, (module F), flow)) = F.close flow +let pp ppf (Flow (name, _, _)) = Fmt.string ppf name + +type t = flow + +let forward ~src ~dst = + let rec loop () = + read src >>= function + | Ok `Eof -> + Log.err (fun l -> l "forward[%a => %a] EOF" pp src pp dst); + Lwt.return_unit + | Error e -> + Log.err (fun l -> l "forward[%a => %a] %a" pp src pp dst pp_error e); + Lwt.return_unit + | Ok (`Data buf) -> + Log.debug (fun l -> l "forward[%a => %a] %a" + pp src pp dst Cstruct.hexdump_pp buf); + write dst buf >>= function + | Ok () -> loop () + | Error e -> + Log.err (fun l -> l "forward[%a => %a] %a" + pp src pp dst pp_write_error e); + Lwt.return_unit in - loop [] len >|= fun bufs -> - String.concat "" bufs + loop () + +let proxy f1 f2 = + Lwt.join [ + forward ~src:f1 ~dst:f2; + forward ~src:f2 ~dst:f1; + ] diff --git a/projects/miragesdk/src/sdk/IO.mli b/projects/miragesdk/src/sdk/IO.mli index b07954d18..2883e34d2 100644 --- a/projects/miragesdk/src/sdk/IO.mli +++ b/projects/miragesdk/src/sdk/IO.mli @@ -1,16 +1,21 @@ (** IO helpers *) -val really_write: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t -(** [really_write fd buf off len] writes exactly [len] bytes to [fd]. *) +type t +(** The type for IO flows *) -val write: Lwt_unix.file_descr -> string -> unit Lwt.t -(** [write fd buf] writes all the buffer [buf] in [fd]. *) +include Mirage_flow_lwt.S with type flow = t -val really_read: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t -(** [really_read fd buf off len] reads exactly [len] bytes from [fd]. *) +val create: (module Mirage_flow_lwt.S with type flow = 'a) -> 'a -> string -> flow +(** [create (module M) t name] is the flow representing [t] using the + function defined in [M]. *) -val read_all: Lwt_unix.file_descr -> string Lwt.t -(** [read_all fd] reads as much data as it is available in [fd]. *) +val pp: flow Fmt.t +(** [pp] is the pretty-printer for IO flows. *) -val read_n: Lwt_unix.file_descr -> int -> string Lwt.t -(** [read_n fd n] reads exactly [n] bytes from [fd]. *) +val forward: src:t -> dst:t -> unit Lwt.t +(** [forward ~src ~dst] forwards writes from [src] to [dst]. Block + until either [src] or [dst] is closed. *) + +val proxy: t -> t -> unit Lwt.t +(** [proxy x y] is the same as [forward x y <*> forward y x]. Block + until both flows are closed. *) diff --git a/projects/miragesdk/src/sdk/ctl.ml b/projects/miragesdk/src/sdk/ctl.ml index d097664bd..909ae8be0 100644 --- a/projects/miragesdk/src/sdk/ctl.ml +++ b/projects/miragesdk/src/sdk/ctl.ml @@ -4,6 +4,8 @@ open Astring let src = Logs.Src.create "init" ~doc:"Init steps" module Log = (val Logs.src_log src : Logs.LOG) +let failf fmt = Fmt.kstrf Lwt.fail_with fmt + (* FIXME: to avoid linking with gmp *) module No_IO = struct type ic = unit @@ -32,6 +34,8 @@ let () = (* FIXME: inotify need some unknown massaging. *) (* Irmin_watcher.hook *) +module C = Mirage_channel_lwt.Make(IO) + module Query = struct (* FIXME: this should probably be replaced by protobuf *) @@ -61,6 +65,9 @@ module Query = struct } [@@little_endian] ] + type error = [ `Eof | `Msg of string ] + let pp_error ppf t = Fmt.string ppf (match t with `Eof -> "EOF" | `Msg s -> s) + (* to avoid warning 32 *) let _ = hexdump_msg let _ = string_to_operation @@ -109,25 +116,37 @@ module Query = struct Cstruct.blit_from_bytes msg.payload 0 buf (sizeof_msg+path) payload; buf + let err e = Lwt.return (Error (`Msg (Fmt.to_to_string C.pp_error e))) + let err_eof = Lwt.return (Error `Eof) + let read fd = - IO.read_n fd 4 >>= fun buf -> - Log.debug (fun l -> l "Message.read len=%S" buf); - let len = - Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0 - |> Int32.to_int - in - IO.read_n fd len >|= fun buf -> - of_cstruct (Cstruct.of_string buf) + let fd = C.create fd in + C.read_exactly fd ~len:4 >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf); + let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in + C.read_exactly fd ~len >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Lwt.return (of_cstruct buf) let write fd msg = - let buf = to_cstruct msg |> Cstruct.to_string in + let buf = to_cstruct msg in let len = let len = Cstruct.create 4 in - Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ String.length buf); - Cstruct.to_string len + Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf); + len in - IO.write fd len >>= fun () -> - IO.write fd buf + IO.write fd len >>= function + | Error e -> failf "Query.write(len) %a" IO.pp_write_error e + | Ok () -> IO.write fd buf >>= function + | Ok () -> Lwt.return_unit + | Error e -> failf "Query.write(buf) %a" IO.pp_write_error e end @@ -155,6 +174,9 @@ module Reply = struct } [@@little_endian] ] + type error = [ `Eof | `Msg of string ] + let pp_error ppf t = Fmt.string ppf (match t with `Eof -> "EOF" | `Msg s -> s) + (* to avoid warning 32 *) let _ = hexdump_msg let _ = string_to_status @@ -191,25 +213,37 @@ module Reply = struct Cstruct.blit_from_bytes msg.payload 0 buf sizeof_msg payload; buf + let err e = Lwt.return (Result.Error (`Msg (Fmt.to_to_string C.pp_error e))) + let err_eof = Lwt.return (Result.Error `Eof) + let read fd = - IO.read_n fd 4 >>= fun buf -> - Log.debug (fun l -> l "Message.read len=%S" buf); - let len = - Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0 - |> Int32.to_int - in - IO.read_n fd len >|= fun buf -> - of_cstruct (Cstruct.of_string buf) + let fd = C.create fd in + C.read_exactly fd ~len:4 >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf); + let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in + C.read_exactly fd ~len >>= function + | Ok `Eof -> err_eof + | Error e -> err e + | Ok (`Data buf) -> + let buf = Cstruct.concat buf in + Lwt.return (of_cstruct buf) let write fd msg = - let buf = to_cstruct msg |> Cstruct.to_string in + let buf = to_cstruct msg in let len = let len = Cstruct.create 4 in - Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ String.length buf); - Cstruct.to_string len + Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf); + len in - IO.write fd len >>= fun () -> - IO.write fd buf + IO.write fd len >>= function + | Error e -> failf "Reply.write(len) %a" IO.pp_write_error e + | Ok () -> IO.write fd buf >>= function + | Ok () -> Lwt.return_unit + | Error e -> failf "Reply.write(buf) %a" IO.pp_write_error e end @@ -231,7 +265,7 @@ module Client = struct module Cache = Hashtbl.Make(K) type t = { - fd : Lwt_unix.file_descr; + fd : IO.t; replies: Reply.t Cache.t; } @@ -247,8 +281,9 @@ module Client = struct Lwt.return r with Not_found -> Reply.read t.fd >>= function - | Error (`Msg e) -> - Log.err (fun l -> l "Got %s while waiting for a reply to %ld" e id); + | Error e -> + Log.err (fun l -> l "Got %a while waiting for a reply to %ld" + Query.pp_error e id); loop () | Ok r -> if r.id = id then Lwt.return r @@ -327,20 +362,13 @@ module Server = struct | Some v -> ok q v ) - - let int_of_fd (t:Lwt_unix.file_descr) = - (Obj.magic (Lwt_unix.unix_file_descr t): int) - let listen ~routes db fd = - Lwt_unix.blocking fd >>= fun blocking -> - Log.debug (fun l -> - l "Serving the control state over fd:%d (blocking=%b)" - (int_of_fd fd) blocking - ); + Log.debug (fun l -> l "Serving the control state over %a" IO.pp fd); let queries = Queue.create () in let cond = Lwt_condition.create () in let rec listen () = Query.read fd >>= function + | Error `Eof -> Lwt.return_unit | Error (`Msg e) -> Log.err (fun l -> l "received invalid message: %s" e); listen () diff --git a/projects/miragesdk/src/sdk/ctl.mli b/projects/miragesdk/src/sdk/ctl.mli index 0dad5aa8e..6974e211a 100644 --- a/projects/miragesdk/src/sdk/ctl.mli +++ b/projects/miragesdk/src/sdk/ctl.mli @@ -18,6 +18,12 @@ module Query: sig payload : string; (** Arbitrary payload. *) } + type error = [ `Eof | `Msg of string ] + (** The type of errors. *) + + val pp_error: error Fmt.t + (** [pp_error] is the pretty-printer for query errors. *) + val pp: t Fmt.t (** [pp] is the pretty-printer for queries. *) @@ -28,10 +34,10 @@ module Query: sig val to_cstruct: t -> Cstruct.t (** [to_cstruct t] is the serialization of [t]. *) - val write: Lwt_unix.file_descr -> t -> unit Lwt.t + val write: IO.flow -> t -> unit Lwt.t (** [write fd t] writes a query message. *) - val read: Lwt_unix.file_descr -> (t, [`Msg of string]) result Lwt.t + val read: IO.flow -> (t, error) result Lwt.t (** [read fd] reads a query message. *) end @@ -60,10 +66,16 @@ module Reply: sig val to_cstruct: t -> Cstruct.t (** [to_cstruct t] is the serialization of [t]. *) - val write: Lwt_unix.file_descr -> t -> unit Lwt.t + type error = [`Eof | `Msg of string ] + (** The type for reply errors. *) + + val pp_error: error Fmt.t + (** [pp_error] is the pretty-printer for errors. *) + + val write: IO.flow -> t -> unit Lwt.t (** [write fd t] writes a reply message. *) - val read: Lwt_unix.file_descr -> (t, [`Msg of string]) result Lwt.t + val read: IO.flow -> (t, error) result Lwt.t (** [read fd] reads a reply message. *) end @@ -80,7 +92,7 @@ module Client: sig type t (** The type for client state. *) - val v: Lwt_unix.file_descr -> t + val v: IO.t -> t (** [v fd] is the client state using [fd] to send requests to the server. A client state also stores some state for all the incomplete client queries. *) @@ -108,7 +120,7 @@ val v: string -> KV.t Lwt.t module Server: sig - val listen: routes:string list -> KV.t -> Lwt_unix.file_descr -> unit Lwt.t + val listen: routes:string list -> KV.t -> IO.t -> unit Lwt.t (** [listen ~routes kv fd] is the thread exposing the KV store [kv], holding control plane state, running inside the privileged container. [routes] are the routes exposed by the server to the diff --git a/projects/miragesdk/src/sdk/init.ml b/projects/miragesdk/src/sdk/init.ml index 5f6559893..1ef40cb05 100644 --- a/projects/miragesdk/src/sdk/init.ml +++ b/projects/miragesdk/src/sdk/init.ml @@ -5,6 +5,127 @@ module Log = (val Logs.src_log src : Logs.LOG) let failf fmt = Fmt.kstrf Lwt.fail_with fmt +let pp_fd ppf (t:Lwt_unix.file_descr) = + Fmt.int ppf (Obj.magic (Lwt_unix.unix_file_descr t): int) + +let rec really_write fd buf off len = + match len with + | 0 -> Lwt.return_unit + | len -> + Log.debug (fun l -> l "really_write %a off=%d len=%d" pp_fd fd off len); + Lwt_unix.write fd buf off len >>= fun n -> + if n = 0 then Lwt.fail_with "write 0" + else really_write fd buf (off+n) (len-n) + +let write_all fd buf = really_write fd buf 0 (String.length buf) + +let read_all fd = + Log.debug (fun l -> l "read_all %a" pp_fd fd); + let len = 16 * 1024 in + let buf = Bytes.create len in + let rec loop acc = + Lwt_unix.read fd buf 0 len >>= fun n -> + if n = 0 then failf "read %a: 0" pp_fd fd + else + let acc = String.sub buf 0 n :: acc in + if n <= len then Lwt.return (List.rev acc) + else loop acc + in + loop [] >|= fun bufs -> + String.concat "" bufs + +module Flow = struct + + (* build a flow from Lwt_unix.file_descr *) + module Fd: Mirage_flow_lwt.CONCRETE with type flow = Lwt_unix.file_descr = struct + type 'a io = 'a Lwt.t + type buffer = Cstruct.t + type error = [`Msg of string] + type write_error = [ Mirage_flow.write_error | error ] + + let pp_error ppf (`Msg s) = Fmt.string ppf s + + let pp_write_error ppf = function + | #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e + | #error as e -> pp_error ppf e + + type flow = Lwt_unix.file_descr + + let err e = Lwt.return (Error (`Msg (Printexc.to_string e))) + + let read t = + Lwt.catch (fun () -> + read_all t >|= fun buf -> Ok (`Data (Cstruct.of_string buf)) + ) (function Failure _ -> Lwt.return (Ok `Eof) | e -> err e) + + let write t b = + Lwt.catch (fun () -> + write_all t (Cstruct.to_string b) >|= fun () -> Ok () + ) (fun e -> err e) + + let close t = Lwt_unix.close t + + let writev t bs = + Lwt.catch (fun () -> + Lwt_list.iter_s (fun b -> write_all t (Cstruct.to_string b)) bs + >|= fun () -> Ok () + ) (fun e -> err e) + end + + (* build a flow from rawlink *) + module Rawlink: Mirage_flow_lwt.CONCRETE with type flow = Lwt_rawlink.t = struct + type 'a io = 'a Lwt.t + type buffer = Cstruct.t + type error = [`Msg of string] + type write_error = [ Mirage_flow.write_error | error ] + + let pp_error ppf (`Msg s) = Fmt.string ppf s + + let pp_write_error ppf = function + | #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e + | #error as e -> pp_error ppf e + + type flow = Lwt_rawlink.t + + let err e = Lwt.return (Error (`Msg (Printexc.to_string e))) + + let read t = + Lwt.catch (fun () -> + Lwt_rawlink.read_packet t >|= fun buf -> Ok (`Data buf) + ) (function Failure _ -> Lwt.return (Ok `Eof) | e -> err e) + + let write t b = + Lwt.catch (fun () -> + Lwt_rawlink.send_packet t b >|= fun () -> Ok () + ) (fun e -> err e) + + let close t = Lwt_rawlink.close_link t + + let writev t bs = + Lwt.catch (fun () -> + Lwt_list.iter_s (Lwt_rawlink.send_packet t) bs >|= fun () -> Ok () + ) (fun e -> err e) + end + + let int_of_fd t = + (Obj.magic (Lwt_unix.unix_file_descr t): int) + + let fd ?name t = + IO.create (module Fd) t (match name with + | None -> string_of_int (int_of_fd t) + | Some n -> n) + +end + +let file_descr ?name t = Flow.fd ?name t + +let rawlink ?filter ethif = + Log.debug (fun l -> l "bringing up %s" ethif); + (try Tuntap.set_up_and_running ethif + with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); + let t = Lwt_rawlink.open_link ?filter ethif in + IO.create (module Flow.Rawlink) t ethif + module Fd = struct type t = { @@ -12,11 +133,15 @@ module Fd = struct fd : Lwt_unix.file_descr; } - let fd t = t.fd let stdout = { name = "stdout"; fd = Lwt_unix.stdout } let stderr = { name = "stderr"; fd = Lwt_unix.stderr } let stdin = { name = "stdin" ; fd = Lwt_unix.stdin } + let of_int name (i:int) = + let fd : Unix.file_descr = Obj.magic i in + let fd = Lwt_unix.of_unix_file_descr fd in + { name; fd } + let to_int t = (Obj.magic (Lwt_unix.unix_file_descr t.fd): int) @@ -24,7 +149,7 @@ module Fd = struct let close fd = Log.debug (fun l -> l "close %a" pp fd); - Lwt_unix.close fd.fd + Unix.close (Lwt_unix.unix_file_descr fd.fd) let dev_null = Lwt_unix.of_unix_file_descr ~blocking:false @@ -32,62 +157,16 @@ module Fd = struct let redirect_to_dev_null fd = Log.debug (fun l -> l "redirect-stdin-to-dev-null"); - Lwt_unix.close fd.fd >>= fun () -> + Unix.close (Lwt_unix.unix_file_descr fd.fd); Lwt_unix.dup2 dev_null fd.fd; - Lwt_unix.close dev_null + Unix.close (Lwt_unix.unix_file_descr dev_null) let dup2 ~src ~dst = Log.debug (fun l -> l "dup2 %a => %a" pp src pp dst); Lwt_unix.dup2 src.fd dst.fd; close src - let proxy_net ~net fd = - Log.debug (fun l -> l "proxy-net eth0 <=> %a" pp fd); - let rec listen_rawlink () = - Lwt_rawlink.read_packet net >>= fun buf -> - Log.debug (fun l -> l "PROXY-NET: => %a" Cstruct.hexdump_pp buf); - Log.debug (fun l -> l "PROXY-NET: => %S" (Cstruct.to_string buf)); - let rec write buf = - Lwt_cstruct.write fd.fd buf >>= function - | 0 -> Lwt.return_unit - | n -> write (Cstruct.shift buf n) - in - write buf >>= fun () -> - listen_rawlink () - in - let listen_socket () = - let len = 16 * 1024 in - let buf = Cstruct.create len in - let rec loop () = - Lwt_cstruct.read fd.fd buf >>= fun len -> - let buf = Cstruct.sub buf 0 len in - Log.debug (fun l -> l "PROXY-NET: <= %a" Cstruct.hexdump_pp buf); - Lwt_rawlink.send_packet net buf >>= fun () -> - loop () - in - loop () - in - Lwt.pick [ - listen_rawlink (); - listen_socket (); - ] - - let forward ~src ~dst = - Log.debug (fun l -> l "forward %a => %a" pp src pp dst); - let len = 16 * 1024 in - let buf = Bytes.create len in - let rec loop () = - Lwt_unix.read src.fd buf 0 len >>= fun len -> - if len = 0 then Lwt.return_unit (* EOF *) - else ( - Log.debug (fun l -> - l "FORWARD[%a => %a]: %S (%d)" - pp src pp dst (Bytes.sub buf 0 len) len); - IO.really_write dst.fd buf 0 len >>= fun () -> - loop () - ) - in - loop () + let flow t = Flow.fd t.fd ~name:(Fmt.to_to_string pp t) end @@ -130,10 +209,10 @@ module Pipe = struct (* logs pipe *) let stdout = pipe "stdout" in let stderr = pipe "stderr" in - (* store pipe *) - let ctl = socketpair "ctl" in (* network pipe *) let net = socketpair "net" in + (* store pipe *) + let ctl = socketpair "ctl" in (* metrics pipe *) let metrics = pipe "metrics" in { stdout; stderr; ctl; net; metrics } @@ -141,37 +220,34 @@ module Pipe = struct end let exec_calf t cmd = - Fd.(redirect_to_dev_null stdin) >>= fun () -> + Log.info (fun l -> l "child pid is %d" Unix.(getpid ())); + Fd.(redirect_to_dev_null stdin); (* close parent fds *) - Fd.close Pipe.(priv t.stdout) >>= fun () -> - Fd.close Pipe.(priv t.stderr) >>= fun () -> - Fd.close Pipe.(priv t.ctl) >>= fun () -> - Fd.close Pipe.(priv t.net) >>= fun () -> - Fd.close Pipe.(priv t.metrics) >>= fun () -> + Fd.close Pipe.(priv t.stdout); + Fd.close Pipe.(priv t.stderr); + Fd.close Pipe.(priv t.ctl); + Fd.close Pipe.(priv t.net); + Fd.close Pipe.(priv t.metrics); let cmds = String.concat " " cmd in - let calf_net = Pipe.(calf t.net) in - let calf_ctl = Pipe.(calf t.ctl) in - let calf_stdout = Pipe.(calf t.stdout) in - let calf_stderr = Pipe.(calf t.stderr) in + let calf_stdout = Fd.of_int "stdout" 1 in + let calf_stderr = Fd.of_int "stderr" 2 in + let calf_net = Fd.of_int "net" 3 in + let calf_ctl = Fd.of_int "ctl" 4 in Log.info (fun l -> l "Executing %s" cmds); - Log.debug (fun l -> l "net-fd=%a store-fd=%a" Fd.pp calf_net Fd.pp calf_ctl); - Fd.dup2 ~src:calf_stdout ~dst:Fd.stdout >>= fun () -> - Fd.dup2 ~src:calf_stderr ~dst:Fd.stderr >>= fun () -> + (* Move all open fds at the top *) + Fd.dup2 ~src:Pipe.(calf t.net) ~dst:calf_net; + Fd.dup2 ~src:Pipe.(calf t.ctl) ~dst:calf_ctl; + Fd.dup2 ~src:Pipe.(calf t.stderr) ~dst:calf_stderr; + Fd.dup2 ~src:Pipe.(calf t.stdout) ~dst:calf_stdout; (* exec the calf *) Unix.execve (List.hd cmd) (Array.of_list cmd) [||] -let rawlink ?filter ethif = - Log.debug (fun l -> l "bringing up %s" ethif); - (try Tuntap.set_up_and_running ethif - with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); - Lwt_rawlink.open_link ?filter ethif - let check_exit_status cmd status = let cmds = String.concat " " cmd in match status with @@ -180,16 +256,16 @@ let check_exit_status cmd status = | Unix.WSIGNALED i -> failf "%s: signal %d" cmds i | Unix.WSTOPPED i -> failf "%s: stopped %d" cmds i -let exec_priv t ~pid ~cmd ~net ~ctl ~handlers = +let exec_priv t ~pid cmd = - Fd.(redirect_to_dev_null stdin) >>= fun () -> + Fd.(redirect_to_dev_null stdin); (* close child fds *) - Fd.close Pipe.(calf t.stdout) >>= fun () -> - Fd.close Pipe.(calf t.stderr) >>= fun () -> - Fd.close Pipe.(calf t.net) >>= fun () -> - Fd.close Pipe.(calf t.ctl) >>= fun () -> - Fd.close Pipe.(calf t.metrics) >>= fun () -> + Fd.close Pipe.(calf t.stdout); + Fd.close Pipe.(calf t.stderr); + Fd.close Pipe.(calf t.net); + Fd.close Pipe.(calf t.ctl); + Fd.close Pipe.(calf t.metrics); let wait () = Lwt_unix.waitpid [] pid >>= fun (_pid, w) -> @@ -197,21 +273,40 @@ let exec_priv t ~pid ~cmd ~net ~ctl ~handlers = check_exit_status cmd w in + Lwt.return wait + +let block_for_ever = + let t, _ = Lwt.task () in + fun () -> t + +let exec_and_forward ?(handlers=block_for_ever) ~pid ~cmd ~net ~ctl t = + + exec_priv t ~pid cmd >>= fun wait -> + + let priv_net = Fd.flow Pipe.(priv t.net) in + let priv_ctl = Fd.flow Pipe.(priv t.ctl) in + let priv_stdout = Fd.flow Pipe.(priv t.stdout) in + let priv_stderr = Fd.flow Pipe.(priv t.stderr) in + Lwt.pick ([ wait (); (* data *) - Fd.proxy_net ~net Pipe.(priv t.net); + IO.proxy net priv_net; (* redirect the calf stdout to the shim stdout *) - Fd.forward ~src:Pipe.(priv t.stdout) ~dst:Fd.stdout; - Fd.forward ~src:Pipe.(priv t.stderr) ~dst:Fd.stderr; - (* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *) - ctl (); + IO.forward ~src:priv_stdout ~dst:Fd.(flow stdout); + IO.forward ~src:priv_stderr ~dst:Fd.(flow stderr); + (* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) + ~dst:Init.Fd.metric; *) + ctl priv_ctl; handlers (); ]) -let run t ~net ~ctl ~handlers cmd = +let exec t cmd fn = Lwt_io.flush_all () >>= fun () -> match Lwt_unix.fork () with | 0 -> exec_calf t cmd - | pid -> exec_priv t ~pid ~cmd ~net ~ctl ~handlers + | pid -> fn pid + +let run t ~net ~ctl ?handlers cmd = + exec t cmd (fun pid -> exec_and_forward ?handlers ~pid ~cmd ~net ~ctl t) diff --git a/projects/miragesdk/src/sdk/init.mli b/projects/miragesdk/src/sdk/init.mli index 4733b43b5..f3e225496 100644 --- a/projects/miragesdk/src/sdk/init.mli +++ b/projects/miragesdk/src/sdk/init.mli @@ -14,6 +14,7 @@ data, e.g. the IP address once a DHCP lease is obtained.} }*) + module Fd: sig type t @@ -22,28 +23,15 @@ module Fd: sig val pp: t Fmt.t (** [pp_fd] pretty prints a file descriptor. *) - val fd: t -> Lwt_unix.file_descr - (** [fd t] is [t]'s underlying unix file descriptor. *) - - val to_int: t -> int -(** [to_int fd] is [fd]'s number. *) - - val redirect_to_dev_null: t -> unit Lwt.t + val redirect_to_dev_null: t -> unit (** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *) - val close: t -> unit Lwt.t + val close: t -> unit (** [close fd] closes [fd]. *) - val dup2: src:t -> dst:t -> unit Lwt.t + val dup2: src:t -> dst:t -> unit (** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *) - val proxy_net: net:Lwt_rawlink.t -> t -> unit Lwt.t - (** [proxy_net ~net fd] proxies the traffic between the raw net link - [net] and [fd]. *) - - val forward: src:t -> dst:t -> unit Lwt.t - (** [forward ~src ~dst] forwards the flow from [src] to [dst]. *) - (** {1 Usefull File Descriptors} *) val stdin: t @@ -55,8 +43,14 @@ module Fd: sig val stderr: t (** [stderr] is the standard error. *) + val flow: t -> IO.t + (** [flow t] is the flow representing [t]. *) + end +val file_descr: ?name:string -> Lwt_unix.file_descr -> IO.t +(** [file_descr ?name fd] is the flow for the file-descripor [fd]. *) + module Pipe: sig type t @@ -102,18 +96,25 @@ module Pipe: sig end -val rawlink: ?filter:string -> string -> Lwt_rawlink.t -(** [rawlink ?filter i] is the net raw link to the interface [i] using - the (optional) BPF filter [filter]. *) +val rawlink: ?filter:string -> string -> IO.t +(** [rawlink ?filter x] is the flow using the network interface + [x]. The packets can be filtered using the BPF filter + [filter]. See the documentation of + {{:https://github.com/haesbaert/rawlink}rawlink} for more details + on how to build that filter. *) +val exec: Pipe.monitor -> string list -> (int -> unit Lwt.t) -> unit Lwt.t +(** [exec t cmd k] executes [cmd] in an unprivileged calf process and + call [k] with the pid of the parent process. The child and parents + are connected using [t]. *) + +(* FIXME(samoht): not very happy with that signatue *) val run: Pipe.monitor -> - net:Lwt_rawlink.t -> - ctl:(unit -> unit Lwt.t) -> - handlers:(unit -> unit Lwt.t) -> + net:IO.t -> ctl:(IO.t -> unit Lwt.t) -> + ?handlers:(unit -> unit Lwt.t) -> string list -> unit Lwt.t -(** [run m ~net ~ctl ~handlers cmd] runs [cmd] in a unprivileged calf - process. [ctl] is the control thread connected to the {Pipe.ctl} - pipe. [net] is the net raw link which will be connected to the - calf via the {!Pipe.net} socket pair. [handlers] are the system +(** [run m ~net ~ctl ?handlers cmd] runs [cmd] in a unprivileged calf + process. [net] is the network interface flow. [ctl] is the control + thread connected to the {Pipe.ctl} pipe. [handlers] are the system handler thread which will react to control data to perform privileged system actions. *) diff --git a/projects/miragesdk/src/sdk/jbuild b/projects/miragesdk/src/sdk/jbuild index e357caa71..12c0bce23 100644 --- a/projects/miragesdk/src/sdk/jbuild +++ b/projects/miragesdk/src/sdk/jbuild @@ -4,6 +4,7 @@ ((name sdk) (libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty decompress irmin irmin-git lwt.unix rawlink tuntap dispatch - irmin-watcher inotify astring rresult)) + irmin-watcher inotify astring rresult mirage-flow-lwt + mirage-channel-lwt io-page.unix)) (preprocess (per_file ((pps (cstruct.ppx)) (ctl)))) )) diff --git a/projects/miragesdk/src/test/jbuild b/projects/miragesdk/src/test/jbuild index ef137b2c4..c1b135f79 100644 --- a/projects/miragesdk/src/test/jbuild +++ b/projects/miragesdk/src/test/jbuild @@ -2,7 +2,7 @@ (executables ((names (test)) - (libraries (sdk alcotest astring mtime.os)))) + (libraries (sdk alcotest astring mtime.os mirage-flow-lwt)))) (alias ((name runtest) diff --git a/projects/miragesdk/src/test/test.ml b/projects/miragesdk/src/test/test.ml index c03543ef3..b90fa511d 100644 --- a/projects/miragesdk/src/test/test.ml +++ b/projects/miragesdk/src/test/test.ml @@ -8,44 +8,50 @@ let random_string n = (* workaround https://github.com/mirage/alcotest/issues/88 *) exception Check_error of string -let check_raises msg exn f = +let check_raises msg f = Lwt.catch (fun () -> f () >>= fun () -> Lwt.fail (Check_error msg) ) (function - | Check_error e -> Alcotest.fail e - | e -> - if exn e then Lwt.return_unit - else Fmt.kstrf Alcotest.fail "%s raised %a" msg Fmt.exn e) - -let is_unix_error = function - | Unix.Unix_error _ -> true - | _ -> false + | Check_error e -> Alcotest.fail e + | _ -> Lwt.return_unit + ) let escape = String.Ascii.escape let write fd strs = Lwt_list.iter_s (fun str -> - IO.really_write fd str 0 (String.length str) + IO.write fd (Cstruct.of_string str) >>= function + | Ok () -> Lwt.return_unit + | Error e -> Fmt.kstrf Lwt.fail_with "write: %a" IO.pp_write_error e ) strs +let read fd = + IO.read fd >>= function + | Ok (`Data x) -> Lwt.return (Cstruct.to_string x) + | Ok `Eof -> Lwt.fail_with "read: EOF" + | Error e -> Fmt.kstrf Lwt.fail_with "read: %a" IO.pp_error e + +let calf pipe = Init.(Fd.flow Pipe.(calf pipe)) +let priv pipe = Init.(Fd.flow Pipe.(priv pipe)) + let test_pipe pipe () = - let calf = Init.Fd.fd @@ Init.Pipe.(calf pipe) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv pipe) in + let calf = calf pipe in + let priv = priv pipe in let name = Init.Pipe.name pipe in let test strs = let escape_strs = String.concat ~sep:"" @@ List.map escape strs in (* pipes are unidirectional *) (* calf -> priv works *) write calf strs >>= fun () -> - IO.read_all priv >>= fun buf -> + read priv >>= fun buf -> let msg = Fmt.strf "%s: calf -> priv" name in Alcotest.(check string) msg escape_strs (escape buf); (* priv -> calf don't *) - check_raises (Fmt.strf "%s: priv side is writable!" name) is_unix_error + check_raises (Fmt.strf "%s: priv side is writable!" name) (fun () -> write priv strs) >>= fun () -> - check_raises (Fmt.strf "%s: calf sid is readable!" name) is_unix_error - (fun () -> IO.read_all calf >|= ignore) >>= fun () -> + check_raises (Fmt.strf "%s: calf sid is readable!" name) + (fun () -> read calf >|= ignore) >>= fun () -> Lwt.return_unit in test [random_string 1] >>= fun () -> @@ -56,19 +62,19 @@ let test_pipe pipe () = Lwt.return_unit let test_socketpair pipe () = - let calf = Init.Fd.fd @@ Init.Pipe.(calf pipe) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv pipe) in + let calf = calf pipe in + let priv = priv pipe in let name = Init.Pipe.name pipe in let test strs = let escape_strs = String.concat ~sep:"" @@ List.map escape strs in (* socket pairs are bi-directional *) (* calf -> priv works *) write calf strs >>= fun () -> - IO.read_all priv >>= fun buf -> + read priv >>= fun buf -> Alcotest.(check string) (name ^ " calf -> priv") escape_strs (escape buf); (* priv -> cal works *) write priv strs >>= fun () -> - IO.read_all calf >>= fun buf -> + read calf >>= fun buf -> Alcotest.(check string) (name ^ " priv -> calf") escape_strs (escape buf); Lwt.return_unit in @@ -117,14 +123,14 @@ let test_serialization to_cstruct of_cstruct message messages = in List.iter test messages -let test_send t write read message messages = - let calf = Init.Fd.fd @@ Init.Pipe.(calf @@ ctl t) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv @@ ctl t) in +let test_send t write read message pp_error messages = + let calf = calf Init.Pipe.(ctl t) in + let priv = priv Init.Pipe.(ctl t) in let test m = write calf m >>= fun () -> read priv >|= function - | Ok m' -> Alcotest.(check message) "write/read" m m' - | Error (`Msg e) -> Alcotest.fail ("Message.read: " ^ e) + | Ok m' -> Alcotest.(check message) "write/read" m m' + | Error e -> Fmt.kstrf Alcotest.fail "Message.read: %a" pp_error e in Lwt_list.iter_s test messages @@ -138,11 +144,11 @@ let test_reply_serialization () = let test_query_send t () = let open Ctl.Query in - test_send t write read query queries + test_send t write read query pp_error queries let test_reply_send t () = let open Ctl.Reply in - test_send t write read reply replies + test_send t write read reply pp_error replies let failf fmt = Fmt.kstrf Alcotest.fail fmt @@ -192,8 +198,8 @@ let delete_should_work t k = | Error (`Msg e) -> failf "write(%s) -> error: %s" k e let test_ctl t () = - let calf = Init.Fd.fd @@ Init.Pipe.(calf @@ ctl t) in - let priv = Init.Fd.fd @@ Init.Pipe.(priv @@ ctl t) in + let calf = calf Init.Pipe.(ctl t) in + let priv = priv Init.Pipe.(ctl t) in let k1 = "/foo/bar" in let k2 = "a" in let k3 = "b/c" in @@ -230,6 +236,33 @@ let test_ctl t () = server (); ] +let in_memory_flow () = + let flow = Mirage_flow_lwt.F.string () in + IO.create (module Mirage_flow_lwt.F) flow "mem" + +let test_exec () = + let test () = + let check n pipe = + let t = Init.Pipe.v () in + let pipe = pipe t in + Init.exec t ["/bin/sh"; "-c"; "echo foo >& " ^ string_of_int n] @@ fun _pid -> + read @@ priv pipe >>= fun foo -> + let name = Fmt.strf "fork %s" Init.Pipe.(name pipe) in + Alcotest.(check string) name "foo\n" foo; + Lwt.return_unit + in + check 1 Init.Pipe.stdout >>= fun () -> + (* avoid logging interference *) + let level = Logs.level () in + Logs.set_level None; + check 2 Init.Pipe.stderr >>= fun () -> + Logs.set_level level; + check 3 Init.Pipe.net >>= fun () -> + check 4 Init.Pipe.ctl >>= fun () -> + Lwt.return_unit + in + test () + let run f () = try Lwt_main.run (f ()) with e -> @@ -250,6 +283,7 @@ let test = [ "send queries" , `Quick, run (test_query_send t); "send replies" , `Quick, run (test_reply_send t); "ctl" , `Quick, run (test_ctl t); + "exec" , `Quick, run test_exec; ] let reporter ?(prefix="") () =