diff --git a/projects/miragesdk/src/.gitignore b/projects/miragesdk/src/.gitignore index 1359271fd..c676cab6b 100644 --- a/projects/miragesdk/src/.gitignore +++ b/projects/miragesdk/src/.gitignore @@ -1,33 +1,9 @@ -.build -.pkg -.dev obj/ hash *.install +.merlin +_build -# Generated by jbuilder -dhcp-client-calf/.merlin -dhcp-client/bpf/.merlin -dhcp-client/.merlin -sdk/.merlin -test/.merlin - -# Generated by `make dev` -_build/ -main.native -dhcp-client-calf/dhcp_client -src/bpf/.merlin - -# Generated by the mirage tool -dhcp-client/calf/_build -dhcp-client/calf/Makefile -dhcp-client/calf/myocamlbuild.ml -dhcp-client/calf/*.opam -dhcp-client/calf/key_gen.ml -dhcp-client/calf/main.ml -dhcp-client/calf/.mirage.config - -# Trash files \#* .#* *~ diff --git a/projects/miragesdk/src/Dockerfile b/projects/miragesdk/src/Dockerfile index 4e31e920e..bca509aef 100644 --- a/projects/miragesdk/src/Dockerfile +++ b/projects/miragesdk/src/Dockerfile @@ -17,37 +17,36 @@ RUN which capnp ### SDK -FROM ocaml/opam@sha256:1e1d7fafbfd461bf684b5e11213c85a71fec93577455285e5d82d902ffad91d2 as sdk -#FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 as sdk +#FROM ocaml/opam@sha256:b42566186327141d715c212da3057942bd4cfa5503a87733d366835fa2ddf98d +FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 as sdk COPY --from=capnp /usr/local/bin/capnp /usr/local/bin/ COPY --from=capnp /usr/local/bin/capnpc /usr/local/bin/ COPY --from=capnp /usr/local/lib/libcapnpc-0.7-dev.so /usr/local/lib/ COPY --from=capnp /usr/local/lib/libcapnp-0.7-dev.so /usr/local/lib/ COPY --from=capnp /usr/local/lib/libkj-0.7-dev.so /usr/local/lib/ +COPY --from=capnp /usr/local/include/capnp /usr/local/include/capnp RUN sudo mkdir -p /src USER opam WORKDIR /src RUN git -C /home/opam/opam-repository fetch origin && \ - git -C /home/opam/opam-repository reset ad921dfa87c2e201ef54806d0367aaacce75bc62 --hard && \ + git -C /home/opam/opam-repository reset 092a9a66 --hard && \ opam update -u -RUN opam pin add -n capnp.dev 'https://github.com/talex5/capnp-ocaml.git#interfaces' +# capnp +RUN opam pin add -n mirage-flow.dev --dev +RUN opam pin add -n capnp.dev 'https://github.com/talex5/capnp-ocaml.git#interfaces2' RUN opam pin add -n capnp-rpc.dev 'https://github.com/mirage/capnp-rpc.git' RUN opam pin add -n capnp-rpc-lwt.dev 'https://github.com/mirage/capnp-rpc.git' -RUN opam pin add -n charrua-client.dev https://github.com/samoht/charrua-client.git#with-cdhcpc -RUN opam pin add -n mirage-flow-rawlink.dev https://github.com/mirage/mirage-flow-rawlink.git -RUN opam pin add -n mirage-flow-lwt --dev -RUN opam pin add -n mirage-flow-unix --dev COPY sdk.opam /src RUN sudo chown opam -R /src RUN opam pin add sdk.local /src -n RUN opam depext -y alcotest sdk -RUN opam install alcotest && opam install --deps sdk +RUN opam install alcotest && opam install --deps sdk -t RUN opam list @@ -56,36 +55,59 @@ RUN sudo chown opam -R /src RUN opam update sdk && opam install sdk -t -### Privileged Container +### dhcp-client -FROM sdk as priv +FROM sdk as dhcp-client + +# charrua +RUN opam pin add -n charrua-client.dev https://github.com/mirage/charrua-core.git +RUN opam pin add -n charrua-client-lwt.dev https://github.com/samoht/charrua-core.git#fix-build-and-tests +RUN opam pin add -n charrua-client-mirage.dev https://github.com/samoht/charrua-core.git#fix-build-and-tests + +COPY dhcp-client.opam /src +RUN sudo chown opam -R /src +RUN opam pin add dhcp-client /src -n + +RUN opam install dhcp-client --deps COPY ./dhcp-client /src/dhcp-client RUN sudo chown opam -R /src -RUN opam config exec -- jbuilder build --dev dhcp-client/main.exe +RUN opam config exec -- jbuilder build --dev -p dhcp-client RUN sudo mkdir -p /out RUN sudo cp /src/_build/default/dhcp-client/main.exe /out/dhcp-client +RUN sudo cp /src/_build/default/dhcp-client/main_eng.exe /out/dhcp-client-engine +RUN sudo cp /src/_build/default/dhcp-client/main_net.exe /out/dhcp-client-network +RUN sudo cp /src/_build/default/dhcp-client/main_act.exe /out/dhcp-client-actuator - -### Calf Container - -FROM sdk as calf - -COPY ./dhcp-client-calf/unikernel.ml /src/dhcp-client-calf/ -COPY ./dhcp-client-calf/jbuild /src/dhcp-client-calf/ -RUN sudo chown opam -R /src - -RUN opam config exec -- jbuilder build --dev dhcp-client-calf/unikernel.exe -RUN sudo mkdir -p /out/ -RUN sudo cp /src/_build/default/dhcp-client-calf/unikernel.exe /out/dhcp-client-calf - -### Final build +### One binary FROM scratch USER 0 -COPY --from=priv /out / -COPY --from=calf /out /calf/rootfs/ -COPY dhcp-client-calf/config.json /calf +COPY --from=dhcp-client /out/dhcp-client / CMD ["/dhcp-client", "-vv"] + +### DHCP client engine + +FROM scratch + +USER 0 +COPY --from=dhcp-client /out/dhcp-client-engine / +CMD ["/dhcp-client-engine", "-vv"] + +### DHCP network proxy + +FROM scratch + +USER 0 +COPY --from=dhcp-client /out/dhcp-client-actuator / +CMD ["/dhcp-client-actuator", "-vv"] + +### Host actuator + +FROM scratch + +USER 0 +COPY --from=dhcp-client /out/dhcp-client-actuator / +CMD ["/dhcp-client-actuator", "-vv"] diff --git a/projects/miragesdk/src/Makefile b/projects/miragesdk/src/Makefile index 44b4f321a..77943fded 100644 --- a/projects/miragesdk/src/Makefile +++ b/projects/miragesdk/src/Makefile @@ -1,7 +1,7 @@ .PHONY: tag push default: push -ORG?=miragesdk +ORG?=linuxkitprojects IMAGE=dhcp-client NOCACHE?= @@ -32,5 +32,5 @@ clean: jbuilder clean dev: - jbuilder build dhcp-client/main.exe --dev - jbuilder build dhcp-client-calf/unikernel.exe --dev + jbuilder build sdk.install dhcp-client.install --dev + jbuilder build test/test.exe --dev diff --git a/projects/miragesdk/src/dhcp-client-calf/config.json b/projects/miragesdk/src/dhcp-client-calf/config.json deleted file mode 100644 index 7a6beace2..000000000 --- a/projects/miragesdk/src/dhcp-client-calf/config.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "ociVersion": "1.0.0-rc5-dev", - "platform": { - "os": "linux", - "arch": "amd64" - }, - "process": { - "terminal": false, - "user": {}, - "args": ["/dhcp-client-calf", "-vv", "--net=3", "--ctl=4"], - "cwd": "/", - "capabilities": { - "bounding": [], - "effective": [], - "inheritable": [], - "permitted": [] - } - }, - "root": { - "path": "rootfs", - "readonly": true - }, - "mounts": [ - { "destination": "/proc", "type": "proc", "source": "proc"} - ], - "linux": { - "resources": { - "disableOOMKiller": false - }, - "namespaces": [ - { "type": "pid" }, - { "type": "ipc" }, - { "type": "uts" }, - { "type": "network" }, - { "type": "mount" } - ] - } -} diff --git a/projects/miragesdk/src/dhcp-client-calf/jbuild b/projects/miragesdk/src/dhcp-client-calf/jbuild deleted file mode 100644 index 922d9e56d..000000000 --- a/projects/miragesdk/src/dhcp-client-calf/jbuild +++ /dev/null @@ -1,8 +0,0 @@ -(jbuild_version 1) - -(executables - ((names (unikernel)) - (libraries (sdk mirage-net-fd lwt charrua-client.mirage charrua-client - lwt.unix cmdliner fmt.cli logs.fmt logs.cli fmt.tty)) - (flags (-cclib -static)) - )) diff --git a/projects/miragesdk/src/dhcp-client-calf/unikernel.ml b/projects/miragesdk/src/dhcp-client-calf/unikernel.ml deleted file mode 100644 index 184b30a32..000000000 --- a/projects/miragesdk/src/dhcp-client-calf/unikernel.ml +++ /dev/null @@ -1,158 +0,0 @@ -open Lwt.Infix - -let src = Logs.Src.create "charrua" -module Log = (val Logs.src_log src : Logs.LOG) - -let failf fmt = Fmt.kstrf Lwt.fail_with fmt - -type t = { - address: Ipaddr.V4.t; - gateway: Ipaddr.V4.t option; - domain: string option; - search: string option; - nameservers: Ipaddr.V4.t list; -} - -(* FIXME: we (still) lose lots of info here *) -let of_lease (t: Dhcp_wire.pkt) = - let gateway = match Dhcp_wire.collect_routers t.Dhcp_wire.options with - | [] -> None - | n::_ -> Some n - in - { address = t.Dhcp_wire.yiaddr; - gateway; - domain = Dhcp_wire.find_domain_name t.Dhcp_wire.options; - search = Dhcp_wire.find_domain_search t.Dhcp_wire.options; - nameservers = Dhcp_wire.collect_dns_servers t.Dhcp_wire.options } - -let pp ppf t = - Fmt.pf ppf "\n\ - address : %a\n\ - domain : %a\n\ - search : %a\n\ - nameservers: %a\n" - Ipaddr.V4.pp_hum t.address - Fmt.(option ~none:(unit "--") string) t.domain - Fmt.(option ~none:(unit "--") string) t.search - Fmt.(list ~sep:(unit " ") Ipaddr.V4.pp_hum) t.nameservers - -let parse_option_code str = - match Dhcp_wire.string_to_option_code str with - | Some x -> Ok x - | None -> Error (Fmt.strf "%s is not a valid DHCP option code" str) - -let default_options = - let open Dhcp_wire in - [ - RAPID_COMMIT; - DOMAIN_NAME; - DOMAIN_SEARCH; - HOSTNAME; - CLASSLESS_STATIC_ROUTE; - NTP_SERVERS; - INTERFACE_MTU; - ] - -(* FIXME: use the mirage tool *) - -module Time = struct - type +'a io = 'a Lwt.t - let sleep_ns x = Lwt_unix.sleep (Int64.to_float x /. 1_000_000_000.) -end -module Net = Netif_fd -module Ctl = Netif_fd - -open Cmdliner - -let dhcp_codes = - let doc = Arg.info ~docv:"OPT" ~doc:"DHCP options." ["c";"codes"] in - Arg.(value & opt (list string) [] doc) - -let net = - let doc = Arg.info ~docv:"FD" ~doc:"Network interface" ["net"] in - Arg.(value & opt int 3 doc) - -let ctl = - let doc = Arg.info ~docv:"FD" ~doc:"Control interface" ["ctl"] in - Arg.(value & opt int 4 doc) - -let setup_log style_renderer level = - Fmt_tty.setup_std_outputs ?style_renderer (); - Logs.set_level level; - let pp_header ppf x = - Fmt.pf ppf "%5d: %a " (Unix.getpid ()) Logs_fmt.pp_header x - in - Logs.set_reporter (Logs_fmt.reporter ~pp_header ()); - () - -let setup_log = - Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ()) - -(* FIXME: module Main ... *) - -module Dhcp_client = Dhcp_client_lwt.Make(Time)(Net) - -let pp_path = Fmt.(list ~sep:(unit "/") string) - -let set_ip ctl k ip = - let str = Ipaddr.V4.to_string ip ^ "\n" in - Sdk.Ctl.Client.write ctl k str >>= function - | Ok () -> Lwt.return_unit - | Error e -> - failf "error while writing %a: %a" pp_path k Sdk.Ctl.Client.pp_error e - -let set_ip_opt ctl k = function - | None -> Lwt.return_unit - | Some ip -> set_ip ctl k ip - -let get_mac ctl = - Sdk.Ctl.Client.read ctl ["mac"] >>= function - | Ok None -> Lwt.return None - | Ok Some s -> Lwt.return @@ Macaddr.of_string (String.trim s) - | Error e -> failf "get_mac: %a" Sdk.Ctl.Client.pp_error e - -let start () dhcp_codes net ctl = - get_mac ctl >>= fun mac -> - Netif_fd.connect ?mac net >>= fun net -> - let requests = match dhcp_codes with - | [] -> default_options - | l -> - List.fold_left (fun acc c -> match parse_option_code c with - | Ok x -> x :: acc - | Error e -> - Log.err (fun l -> l "error: %s" e); - acc - ) [] l - in - Dhcp_client.connect ~requests net >>= fun stream -> - Lwt_stream.last_new stream >>= fun result -> - let result = of_lease result in - Log.info (fun l -> l "found lease: %a" pp result); - set_ip ctl ["ip"] result.address >>= fun () -> - set_ip_opt ctl ["gateway"] result.gateway - -(* FIXME: Main end *) - -let fd (x: int) = (Obj.magic x: Unix.file_descr) - -let flow (x: int) = Sdk.Init.file_descr (Lwt_unix.of_unix_file_descr @@ fd x) - -let start () dhcp_codes net ctl = - Lwt_main.run ( - Lwt_switch.with_switch @@ fun switch -> - let net = fd net in - let flow = - Capnp_rpc_lwt.Endpoint.of_flow ~switch (module Mirage_flow_lwt) (flow ctl) - in - let client = Capnp_rpc_lwt.CapTP.of_endpoint ~switch flow in - let ctl = Capnp_rpc_lwt.CapTP.bootstrap client in - start () dhcp_codes net ctl - ) - -let run = - Term.(const start $ setup_log $ dhcp_codes $ net $ ctl), - Term.info "dhcp-client" ~version:"0.0" - -let () = match Term.eval run with - | `Error _ -> exit 1 - | `Ok () |`Help |`Version -> exit 0 diff --git a/projects/miragesdk/src/dhcp-client-calf/unikernel.mli b/projects/miragesdk/src/dhcp-client-calf/unikernel.mli deleted file mode 100644 index e790aeb70..000000000 --- a/projects/miragesdk/src/dhcp-client-calf/unikernel.mli +++ /dev/null @@ -1 +0,0 @@ -(* empty *) diff --git a/projects/miragesdk/src/dhcp-client.opam b/projects/miragesdk/src/dhcp-client.opam new file mode 100644 index 000000000..b26a46459 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client.opam @@ -0,0 +1,14 @@ +opam-version: "1.2" +maintainer: "Thomas Gazagnaire " +authors: "Thomas Gazagnaire " +homepage: "https://github.com/linuxkit/linuxkit" +bug-reports: "https://github.com/linuxkit/linuxkit/issues" +license: "Apache" +dev-repo: "https://github.com/linuxkit/linuxkit.git" +build: ["jbuilder" "build" "-p" name "-j" jobs] +depends: [ + "jbuilder" {build & >= "1.0+beta10"} + "charrua-client" + "charrua-client-mirage" + "sdk" +] diff --git a/projects/miragesdk/src/dhcp-client/bpf/jbuild b/projects/miragesdk/src/dhcp-client/bpf/jbuild deleted file mode 100644 index e4afaccb1..000000000 --- a/projects/miragesdk/src/dhcp-client/bpf/jbuild +++ /dev/null @@ -1,6 +0,0 @@ -(jbuild_version 1) - -(library - ((name bpf_dhcp) - (c_names (dhcp)) -)) diff --git a/projects/miragesdk/src/dhcp-client/bpf/dhcp.c b/projects/miragesdk/src/dhcp-client/dhcp.c similarity index 96% rename from projects/miragesdk/src/dhcp-client/bpf/dhcp.c rename to projects/miragesdk/src/dhcp-client/dhcp.c index 9382e4918..5632274da 100644 --- a/projects/miragesdk/src/dhcp-client/bpf/dhcp.c +++ b/projects/miragesdk/src/dhcp-client/dhcp.c @@ -28,15 +28,6 @@ #include -#ifdef __linux__ - -#include -#include - -#include - -#include - #include #include #include @@ -52,6 +43,15 @@ #include "caml/custom.h" #include "caml/bigarray.h" +#ifdef __linux__ + +#include +#include + +#include + +#include + #define BOOTPC 68 #define BPF_WHOLEPACKET 0x0fffffff @@ -80,6 +80,13 @@ static const struct sock_filter bootp_bpf_filter [] = { BPF_STMT(BPF_RET + BPF_K, 0), }; +#else + +struct sock_filter {}; +static const struct sock_filter bootp_bpf_filter [] = {}; + +#endif + /* Filters */ CAMLprim value bpf_filter(value vunit) { @@ -89,5 +96,3 @@ CAMLprim value bpf_filter(value vunit) memcpy(String_val(vfilter), bootp_bpf_filter, sizeof(bootp_bpf_filter)); CAMLreturn (vfilter); } - -#endif diff --git a/projects/miragesdk/src/dhcp-client/engine.ml b/projects/miragesdk/src/dhcp-client/engine.ml new file mode 100644 index 000000000..ff73016e6 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/engine.ml @@ -0,0 +1,56 @@ +open Lwt.Infix + +let src = Logs.Src.create "dhcp-client/engine" +module Log = (val Logs.src_log src : Logs.LOG) + +type t = { + address: Ipaddr.V4.t; + gateway: Ipaddr.V4.t option; + domain: string option; + search: string option; + nameservers: Ipaddr.V4.t list; +} + +(* FIXME: we (still) lose lots of info here *) +let of_lease (t: Dhcp_wire.pkt) = + let gateway = match Dhcp_wire.collect_routers t.Dhcp_wire.options with + | [] -> None + | n::_ -> Some n + in + { address = t.Dhcp_wire.yiaddr; + gateway; + domain = Dhcp_wire.find_domain_name t.Dhcp_wire.options; + search = Dhcp_wire.find_domain_search t.Dhcp_wire.options; + nameservers = Dhcp_wire.collect_dns_servers t.Dhcp_wire.options } + +let pp ppf t = + Fmt.pf ppf "\n\ + address : %a\n\ + domain : %a\n\ + search : %a\n\ + nameservers: %a\n" + Ipaddr.V4.pp_hum t.address + Fmt.(option ~none:(unit "--") string) t.domain + Fmt.(option ~none:(unit "--") string) t.search + Fmt.(list ~sep:(unit " ") Ipaddr.V4.pp_hum) t.nameservers + +module Make + (Time: Sdk.Time.S) + (Net : Sdk.Net.S) + (Host: Sdk.Host.S) = +struct + + module Dhcp_client = Dhcp_client_lwt.Make(Time)(Net) + + let start _ net host = + Host.dhcp_options host >>= fun requests -> + Dhcp_client.connect ~requests net >>= fun stream -> + Lwt_stream.last_new stream >>= fun result -> + let result = of_lease result in + Log.info (fun l -> l "found lease: %a" pp result); + Host.set_ip host result.address >>= fun () -> + (match result.gateway with + | None -> Lwt.return_unit + | Some ip -> Host.set_gateway host ip) + +end diff --git a/projects/miragesdk/src/dhcp-client/engine.mli b/projects/miragesdk/src/dhcp-client/engine.mli new file mode 100644 index 000000000..e3bb932c8 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/engine.mli @@ -0,0 +1,11 @@ +(** [Engine] is the DHCP client engine. It access network traffic via + the [Net] MirageOS's network interface, and use [Act] to modify IP + tables and other low-level caches. *) + +module Make + (Time: Sdk.Time.S) + (Net : Sdk.Net.S) + (Act : Sdk.Host.S): +sig + val start: Time.t -> Net.t -> Act.t -> unit Lwt.t +end diff --git a/projects/miragesdk/src/dhcp-client/jbuild b/projects/miragesdk/src/dhcp-client/jbuild index 50fc50383..c0054db78 100644 --- a/projects/miragesdk/src/dhcp-client/jbuild +++ b/projects/miragesdk/src/dhcp-client/jbuild @@ -1,7 +1,13 @@ (jbuild_version 1) +(library + ((name dhcp_filter) + (modules ()) + (c_names (dhcp)))) + (executables - ((names (main)) - (libraries (sdk bpf_dhcp bos cmdliner fmt.cli logs.fmt logs.cli fmt.tty)) - (flags (-cclib -static)) - )) + ((names (main main_eng main_act main_net)) + (public_names (dhcp-client dhcp-client-eng dhcp-client-act dhcp-client-net)) + (package dhcp-client) + (libraries (sdk charrua-client charrua-client-mirage dhcp_filter + cmdliner fmt.cli logs.fmt logs.cli fmt.tty)))) diff --git a/projects/miragesdk/src/dhcp-client/main.ml b/projects/miragesdk/src/dhcp-client/main.ml index 67c8488a7..b520ffcde 100644 --- a/projects/miragesdk/src/dhcp-client/main.ml +++ b/projects/miragesdk/src/dhcp-client/main.ml @@ -1,112 +1,26 @@ open Lwt.Infix -open Sdk -open Astring -let src = Logs.Src.create "dhcp-client" ~doc:"DHCP client" -module Log = (val Logs.src_log src : Logs.LOG) +module Act = Sdk.Host.Local +module Net = Network.Make(Act) +module Eng = Engine.Make(Sdk.Time.Local)(Net)(Act) -module Handlers = struct +let main intf = + Act.connect intf >>= fun act -> + Net.connect act >>= fun net -> + Eng.start () net act - (* System handlers *) - - let contents_of_diff = function - | `Added (_, `Contents (v, _)) - | `Updated (_, (_, `Contents (v, _))) -> Some v - | _ -> None - - let with_ip str f = - match Ipaddr.V4.of_string (String.trim str) with - | Some ip -> - Log.info (fun l -> l "SET IP to %a" Ipaddr.V4.pp_hum ip); - f ip - | None -> - Log.err (fun l -> l "%s is not a valid IP" str); - Lwt.return_unit - - let ip ~ethif t = - Ctl.KV.watch_key t ["ip"] (fun diff -> - match contents_of_diff diff with - | None -> Lwt.return_unit - | Some ip -> with_ip ip (fun ip -> Net.set_ip ethif ip) - ) - - let gateway t = - Ctl.KV.watch_key t ["gateway"] (fun diff -> - match contents_of_diff diff with - | None -> Lwt.return_unit - | Some gw -> with_ip gw (fun gw -> Net.set_gateway gw) - ) - - let handlers ~ethif = [ - ip ~ethif; - gateway; - ] - - let watch ~ethif db = - Lwt_list.map_p (fun f -> f db) (handlers ~ethif) >>= fun _ -> - let t, _ = Lwt.task () in - t - -end - -external dhcp_filter: unit -> string = "bpf_filter" - -let t = Init.Pipe.v () - -(* -let default_cmd = [ - "/calf/dhcp-client-calf"; "--net=3"; "--ctl=4"; "-vv"; -] -*) - -let default_cmd = [ - "/usr/bin/runc"; "run"; "--preserve-fds"; "2"; "--bundle"; "calf"; "calf" -] - -let read_cmd file = - if Sys.file_exists file then - let ic = open_in_bin file in - let line = input_line ic in - String.cuts ~sep:" " line - else - failwith ("Cannot read " ^ file) - -let infof fmt = - Fmt.kstrf (fun msg () -> - let date = Int64.of_float (Unix.gettimeofday ()) in - Irmin.Info.v ~date ~author:"priv" msg - ) fmt - -let run () cmd ethif path = - let cmd = match cmd with - | None -> default_cmd - | Some f -> read_cmd f - in - Lwt_main.run ( - Lwt_switch.with_switch @@ fun switch -> - let routes = [ - ["ip"] , [`Write]; - ["mac"] , [`Read ]; - ["gateway"], [`Write]; - ] in - Ctl.v path >>= fun db -> - let ctl fd = - let service = Ctl.Server.service ~routes db in - let endpoint = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module Mirage_flow_lwt) fd in - ignore (Capnp_rpc_lwt.CapTP.of_endpoint ~switch ~offer:service endpoint) - in - let handlers () = Handlers.watch ~ethif db in - let net = Init.rawlink ~filter:(dhcp_filter ()) ethif in - Net.mac ethif >>= fun mac -> - let mac = Macaddr.to_string mac ^ "\n" in - Ctl.KV.set db ~info:(infof "Add mac") ["mac"] mac >>= fun () -> - Init.run t ~net ~ctl ~handlers cmd - ) - -(* CLI *) +let run () intf = Lwt_main.run (main intf) open Cmdliner +let intf = + let doc = + Arg.info ~docv:"INTF" ~doc:"The interface to listen too." + ["e"; "ethif"] + in + Arg.(value & opt string "eth0" doc) + +(* FIXME: use SDK to write logs *) let setup_log style_renderer level = Fmt_tty.setup_std_outputs ?style_renderer (); Logs.set_level level; @@ -119,28 +33,9 @@ let setup_log style_renderer level = let setup_log = Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ()) -let cmd = - let doc = - Arg.info ~docv:"CMD" ~doc:"Command to run the calf process." ["cmd"] - in - Arg.(value & opt (some string) None & doc) - -let ethif = - let doc = - Arg.info ~docv:"NAME" ~doc:"The interface to listen too." ["ethif"] - in - Arg.(value & opt string "eth0" & doc) - -let path = - let doc = - Arg.info ~docv:"DIR" - ~doc:"The directory where control state will be stored." ["path"] - in - Arg.(value & opt string "/data" & doc) - let run = - Term.(const run $ setup_log $ cmd $ ethif $ path), - Term.info "dhcp-client" ~version:"0.0" + Term.(const run $ setup_log $ intf), + Term.info "dhcp-client" ~version:"%%VERSION%%" let () = match Term.eval run with | `Error _ -> exit 1 diff --git a/projects/miragesdk/src/dhcp-client/main.mli b/projects/miragesdk/src/dhcp-client/main.mli index e790aeb70..0d81ac122 100644 --- a/projects/miragesdk/src/dhcp-client/main.mli +++ b/projects/miragesdk/src/dhcp-client/main.mli @@ -1 +1,3 @@ -(* empty *) +(* FIXME *) + +(* Link everything together! *) diff --git a/projects/miragesdk/src/dhcp-client/main_act.ml b/projects/miragesdk/src/dhcp-client/main_act.ml new file mode 100644 index 000000000..03c017676 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/main_act.ml @@ -0,0 +1,67 @@ +open Lwt.Infix + +let src = Logs.Src.create "dhcp-client/actuator" +module Log = (val Logs.src_log src : Logs.LOG) + +module Flow = Sdk.Flow.Fd +module Host = Sdk.Host.Local +module N = Sdk.Host.Server(Flow)(Host) +module E = Sdk.Host.Server(Flow)(Host) + +let start ~intf ~net ~eng = + Lwt_switch.with_switch @@ fun switch -> + Flow.connect net >>= fun net -> + Flow.connect eng >>= fun eng -> + Host.connect intf >>= fun host -> + N.listen ~switch (N.service host) net; + E.listen ~switch (E.service host) eng; + fst (Lwt.task ()) + +let run () intf net eng = Lwt_main.run (start ~intf ~net ~eng) + +open Cmdliner + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + let pp_header ppf x = + Fmt.pf ppf "%5d: %a " (Unix.getpid ()) Logs_fmt.pp_header x + in + Logs.set_reporter (Logs_fmt.reporter ~pp_header ()); + () + +let setup_log = + Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ()) + +let intf = + let doc = + Arg.info ~docv:"INTF" ~doc:"The interface to listen too." + ["e"; "ethif"] + in + Arg.(value & opt string "eth0" doc) + +let eng = + let doc = + Arg.info + ~docv:"FD" + ~doc:"The file descriptor to use to connect to the DHCP client engine." + ["e"; "engine"] + in + Arg.(value & opt int 3 & doc) + +let net = + let doc = + Arg.info + ~docv:"FD" + ~doc:"The file descriptor to use to connect to the network proxy." + ["n"; "network"] + in + Arg.(value & opt int 4 & doc) + +let run = + Term.(const run $ setup_log $ intf $ net $ eng), + Term.info "dhcp-client-actuator" ~version:"%%VERSION%%" + +let () = match Term.eval run with + | `Error _ -> exit 1 + | `Ok () |`Help |`Version -> exit 0 diff --git a/projects/miragesdk/src/dhcp-client/main_act.mli b/projects/miragesdk/src/dhcp-client/main_act.mli new file mode 100644 index 000000000..e69de29bb diff --git a/projects/miragesdk/src/dhcp-client/main_eng.ml b/projects/miragesdk/src/dhcp-client/main_eng.ml new file mode 100644 index 000000000..15ab50ed9 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/main_eng.ml @@ -0,0 +1,58 @@ +open Lwt.Infix + +module Flow = Sdk.Flow.Fd +module Time = Sdk.Time.Local +module Net = Sdk.Net.Client(Flow) +module Act = Sdk.Host.Client(Flow) + +module Main = Engine.Make(Time)(Net)(Act) + +let start ~net ~act = + Lwt_switch.with_switch @@ fun switch -> + Flow.connect net >>= fun net -> + Net.connect ~switch net >>= fun net -> + Flow.connect act >>= fun act -> + Act.connect ~switch act >>= fun act -> + Main.start () net act + +let run () net act = Lwt_main.run (start ~net ~act) + +open Cmdliner + +let net = + let doc = + Arg.info + ~docv:"FD" + ~doc:"The file descriptor to use to connect to the network proxy." + ["e"; "engine"] + in + Arg.(value & opt int 3 & doc) + +let act = + let doc = + Arg.info + ~docv:"FD" + ~doc:"The file descriptor to use to connect to the host actuator." + ["a"; "actuator"] + in + Arg.(value & opt int 4 & doc) + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + let pp_header ppf x = + Fmt.pf ppf "%5d: %a " (Unix.getpid ()) Logs_fmt.pp_header x + in + Logs.set_reporter (Logs_fmt.reporter ~pp_header ()); + () + +let setup_log = + Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ()) + +let run = + Term.(const run $ setup_log $ net $ act), + Term.info "dhcp-client-engine" ~version:"%%VERSION%%" + +let () = match Term.eval run with + | `Error _ -> exit 1 + | `Ok () |`Help |`Version -> exit 0 diff --git a/projects/miragesdk/src/dhcp-client/main_eng.mli b/projects/miragesdk/src/dhcp-client/main_eng.mli new file mode 100644 index 000000000..e69de29bb diff --git a/projects/miragesdk/src/dhcp-client/main_net.ml b/projects/miragesdk/src/dhcp-client/main_net.ml new file mode 100644 index 000000000..bbb3f08c4 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/main_net.ml @@ -0,0 +1,58 @@ +open Lwt.Infix + +module Flow = Sdk.Flow.Fd +module Act = Sdk.Host.Client(Flow) +module Net = Network.Make(Act) + +module Main = Sdk.Net.Server(Flow)(Net) + +let start ~eng ~act = + Lwt_switch.with_switch @@ fun switch -> + Flow.connect act >>= fun act -> + Act.connect ~switch act >>= fun act -> + Flow.connect eng >>= fun eng -> + Net.connect act >>= fun net -> + Main.listen ~switch (Main.service net) eng; + fst (Lwt.task ()) + +let run () eng act = Lwt_main.run (start ~eng ~act) + +open Cmdliner + +let eng = + let doc = + Arg.info + ~docv:"FD" + ~doc:"The file descriptor to use to connect to the DHCP client engine." + ["e"; "engine"] + in + Arg.(value & opt int 3 & doc) + +let act = + let doc = + Arg.info + ~docv:"FD" + ~doc:"The file descriptor to use to connect to the host actuator." + ["a"; "actuator"] + in + Arg.(value & opt int 4 & doc) + +let setup_log style_renderer level = + Fmt_tty.setup_std_outputs ?style_renderer (); + Logs.set_level level; + let pp_header ppf x = + Fmt.pf ppf "%5d: %a " (Unix.getpid ()) Logs_fmt.pp_header x + in + Logs.set_reporter (Logs_fmt.reporter ~pp_header ()); + () + +let setup_log = + Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ()) + +let run = + Term.(const run $ setup_log $ eng $ act), + Term.info "dhcp-client-network" ~version:"%%VERSION%%" + +let () = match Term.eval run with + | `Error _ -> exit 1 + | `Ok () |`Help |`Version -> exit 0 diff --git a/projects/miragesdk/src/dhcp-client/main_net.mli b/projects/miragesdk/src/dhcp-client/main_net.mli new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/main_net.mli @@ -0,0 +1 @@ + diff --git a/projects/miragesdk/src/dhcp-client/network.ml b/projects/miragesdk/src/dhcp-client/network.ml new file mode 100644 index 000000000..e74d6de48 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/network.ml @@ -0,0 +1,15 @@ +open Lwt.Infix + +external dhcp_filter: unit -> string = "bpf_filter" + +module Make (Act: Sdk.Host.S) = struct + + include Sdk.Net.Rawlink + + let connect act = + let filter = dhcp_filter () in + Act.mac act >>= fun mac -> + Act.interface act >>= fun intf -> + Sdk.Net.Rawlink.connect ~filter ~mac intf + +end diff --git a/projects/miragesdk/src/dhcp-client/network.mli b/projects/miragesdk/src/dhcp-client/network.mli new file mode 100644 index 000000000..93ded5537 --- /dev/null +++ b/projects/miragesdk/src/dhcp-client/network.mli @@ -0,0 +1,7 @@ +(** [Network] provides a MirageOS's network interface with only DHCP + traffic. It uses [Act] to get the host's MAC address. *) + +module Make (Act: Sdk.Host.S): sig + include Sdk.Net.S + val connect: Act.t -> t Lwt.t +end diff --git a/projects/miragesdk/src/init-dev.sh b/projects/miragesdk/src/init-dev.sh deleted file mode 100644 index 079892bb9..000000000 --- a/projects/miragesdk/src/init-dev.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh - -set -eu - -opam config exec -- /bin/sh diff --git a/projects/miragesdk/src/sdk.opam b/projects/miragesdk/src/sdk.opam index 701aba191..197d6c5a5 100644 --- a/projects/miragesdk/src/sdk.opam +++ b/projects/miragesdk/src/sdk.opam @@ -1,33 +1,28 @@ opam-version: "1.2" -name: "sdk" -version: "dev" maintainer: "Thomas Gazagnaire " authors: "Thomas Gazagnaire " homepage: "https://github.com/linuxkit/linuxkit" bug-reports: "https://github.com/linuxkit/linuxkit/issues" license: "Apache" dev-repo: "https://github.com/linuxkit/linuxkit.git" -build: ["jbuilder" "build" "-p" name "-j" jobs "@install"] +build: ["jbuilder" "build" "-p" name "-j" jobs] depends: [ - "jbuilder" {build & >= "1.0+beta7"} - "ocamlfind" {build} + "jbuilder" {build & >= "1.0+beta10"} "cstruct" - "cstruct-lwt" "lwt" - "logs" "astring" "rresult" - "mirage-flow-lwt" - "mirage-channel-lwt" - "io-page" - "irmin-git" - "decompress" + "logs" + "astring" + "mirage-time-lwt" + "mirage-flow-lwt" {>= "1.4.0"} + "mirage-flow-unix" {>= "1.4.0"} + "mirage-flow-rawlink" + "irmin-mem" "capnp-rpc-lwt" "rawlink" "tuntap" {= "1.0.0"} "ipaddr" - "bos" - "mirage-flow-rawlink" + "mirage-net-flow" "mirage-net-fd" - "charrua-client" - "mirage-types-lwt" + "charrua-core" "alcotest" {test} ] diff --git a/projects/miragesdk/src/sdk/conf.ml b/projects/miragesdk/src/sdk/conf.ml new file mode 100644 index 000000000..0a1e672df --- /dev/null +++ b/projects/miragesdk/src/sdk/conf.ml @@ -0,0 +1,225 @@ +open Lwt.Infix +open Capnp_rpc_lwt + +let src = Logs.Src.create "init" ~doc:"Init steps" +module Log = (val Logs.src_log src : Logs.LOG) + +let pp_path = Fmt.(brackets (list ~sep:(const string "/") string)) + +let () = + Irmin.Private.Watch.set_listen_dir_hook + (fun _ _ _ -> Lwt.return (fun () -> Lwt.return_unit)) + (* FIXME: inotify need some unknown massaging. *) + (* Irmin_watcher.hook *) + +exception Undefined_field of int + +let err_not_found fmt = Fmt.kstrf (fun x -> Lwt.fail_invalid_arg x) fmt +let failf fmt = Fmt.kstrf (fun x -> Lwt.fail_with x) fmt + + +module R = Api.Reader.Conf +module B = Api.Builder.Conf + +module Callback = struct + + let service f = + B.Callback.local @@ object (_: B.Callback.service) + inherit B.Callback.service + method f_impl req = + let module P = R.Callback.F_params in + let params = P.of_payload req in + let change = P.change_get params in + Service.return_lwt (fun () -> + f change >|= fun () -> + Ok (Service.Response.create_empty ()) + ) + end + + let client t change = + let module P = B.Callback.F_params in + let req, p = Capability.Request.create P.init_pointer in + P.change_set p change; + Capability.call_for_value t R.Callback.f_method req >>= function + | Ok _ -> Lwt.return () + | Error e -> failf "error: f(%s) -> %a" change Capnp_rpc.Error.pp e + +end + + +module Client (F: Flow.S) = struct + + type t = R.t Capability.t + + let pp_error = Capnp_rpc.Error.pp + + let connect ~switch ?tags f = + let ep = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) f in + let client = Capnp_rpc_lwt.CapTP.connect ~switch ?tags ep in + Capnp_rpc_lwt.CapTP.bootstrap client |> Lwt.return + + let find t path = + let module P = B.Read_params in + let req, p = Capability.Request.create P.init_pointer in + P.path_set_list p path |> ignore; + Capability.call_for_value t R.read_method req >>= function + | Error e -> failf "error read(%a): %a" pp_path path pp_error e + | Ok r -> + let module R = R.Response in + let r = R.of_payload r in + match R.get r with + | R.Ok data -> Lwt.return (Some data) + | R.NotFound -> Lwt.return None + | R.Undefined _ -> failf "invalid return" + + let get t path = + find t path >>= function + | Some v -> Lwt.return v + | None -> err_not_found "get %a" pp_path path + + let set t path data = + let module P = B.Write_params in + let req, p = Capability.Request.create P.init_pointer in + P.path_set_list p path |> ignore; + P.data_set p data; + Capability.call_for_value t R.write_method req >>= function + | Ok _ -> Lwt.return () + | Error e -> failf "error write(%a): %a" pp_path path pp_error e + + let delete t path = + let module P = B.Delete_params in + let req, p = Capability.Request.create P.init_pointer in + P.path_set_list p path |> ignore; + Capability.call_for_value t R.delete_method req >>= function + | Ok _ -> Lwt.return () + | Error e -> failf "error delete(%a): %a" pp_path path pp_error e + + let watch t path f = + let module P = B.Watch_params in + let req, p = Capability.Request.create P.init_pointer in + P.path_set_list p path |> ignore; + let callback = Capability.Request.export req (Callback.service f) in + P.callback_set p (Some callback); + Capability.call_for_value t R.watch_method req >>= function + | Ok _ -> Lwt.return () + | Error e -> failf "error watch(%a): %a" pp_path path pp_error e + +end + +module Server (F: Flow.S) = struct + + module KV = struct + + module Store = Irmin_mem.KV + + include Store(Irmin.Contents.String) + + let v () = + let config = Irmin_mem.config () in + Repo.v config >>= fun repo -> + of_branch repo "calf" + + end + + type op = [ `Read | `Write | `Delete ] + + type t = R.t Capability.t + + let infof fmt = + Fmt.kstrf (fun msg () -> + let date = Int64.of_float (Unix.gettimeofday ()) in + Irmin.Info.v ~date ~author:"calf" msg + ) fmt + + let not_allowed path = + let err = Fmt.strf "%a is not an allowed path" pp_path path in + Log.err (fun l -> l "%s" err); + err + + let write db key value = + let info = infof "Updating %a" KV.Key.pp key in + KV.set db ~info key value + + let delete db key = + let info = infof "Removing %a" KV.Key.pp key in + KV.remove db ~info key + + let contents_of_diff = function + | `Added (_, `Contents (v, _)) + | `Updated (_, (_, `Contents (v, _))) -> Some v + | _ -> None + + let watch ~switch db key f = + KV.watch_key db key (fun diff -> + match contents_of_diff diff with + | Some v -> f v + | None -> Lwt.return () + ) >|= fun w -> + Lwt_switch.add_hook (Some switch) (fun () -> KV.unwatch w) + + let with_permission_check ~routes op key fn = + match List.assoc key routes with + | perms when List.mem op perms -> fn () + | _ -> Service.fail "%s" (not_allowed key) + | exception Not_found -> Service.fail "%s" (not_allowed key) + + let service ~switch ~routes db = + B.local @@ object (_ : B.service) + inherit B.service + method read_impl req = + let module P = R.Read_params in + let params = P.of_payload req in + let key = P.path_get_list params in + with_permission_check ~routes `Read key @@ fun () -> + Service.return_lwt (fun () -> + let module R = B.Response in + let resp, r = Service.Response.create R.init_pointer in + (KV.find db key >|= function + | None -> R.not_found_set r + | Some x -> R.ok_set r x + ) >|= fun () -> + Ok resp + ) + + method write_impl req = + let module P = R.Write_params in + let params = P.of_payload req in + let key = P.path_get_list params in + let value = P.data_get params in + with_permission_check ~routes `Write key @@ fun () -> + Service.return_lwt (fun () -> + write db key value >|= fun () -> + Ok (Service.Response.create_empty ()) + ) + + method delete_impl req = + let module P = R.Delete_params in + let params = P.of_payload req in + let key = P.path_get_list params in + with_permission_check ~routes `Delete key @@ fun () -> + Service.return_lwt (fun () -> + delete db key >|= fun () -> + Ok (Service.Response.create_empty ()) + ) + + method watch_impl req = + let module P = R.Watch_params in + let params = P.of_payload req in + let key = P.path_get_list params in + match P.callback_get params with + | None -> failwith "No watcher callback given" + | Some i -> + let callback = Payload.import req i in + with_permission_check ~routes `Read key @@ fun () -> + Service.return_lwt (fun () -> + watch ~switch db key (Callback.client callback) >|= fun () -> + Ok (Service.Response.create_empty ()) + ) + end + + let listen ~switch ?tags service fd = + let endpoint = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) fd in + Capnp_rpc_lwt.CapTP.connect ~switch ?tags ~offer:service endpoint + |> ignore + +end diff --git a/projects/miragesdk/src/sdk/conf.mli b/projects/miragesdk/src/sdk/conf.mli new file mode 100644 index 000000000..d916f36f1 --- /dev/null +++ b/projects/miragesdk/src/sdk/conf.mli @@ -0,0 +1,73 @@ +(** [Conf] exposes functions to to manipulate configuration data. *) + +exception Undefined_field of int + +module Client (F: Flow.S): sig + + (** [Client] exposes functions to read, write and watch + configuration data. The configuration data is organized as a + simple Irmin's KV store. + + {e TODO: decide if we want to support test_and_set (instead of + write).} *) + + type t + (** The type for client state. *) + + val connect: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> F.t -> t Lwt.t + (** [connect f] connects to the flow [f]. *) + + val find: t -> string list -> string option Lwt.t + (** [find t k] is the value associated with the key [k] in the + control plane state. Return [None] if no value is associated to + [k]. *) + + val get: t -> string list -> string Lwt.t + (** [get t k] is similar to [fint t k] but raise `Invalid_argument` + if the [k] is not a valid path. *) + + val set: t -> string list -> string -> unit Lwt.t + (** [set t p v] associates [v] to the key [k] in the control plane + state. *) + + val delete: t -> string list -> unit Lwt.t + (** [delete t k] remove [k]'s binding in the control plane state. *) + + val watch: t -> string list -> (string -> unit Lwt.t) -> unit Lwt.t + (** [watch t k f] calls [f] on every change of the key [k]. *) + +end + +module Server (F: Flow.S): sig + + (** [Server] exposes functions to serve configuration data over + MirageOS flows. *) + + (** [KV] is the Irmin store storing configuration data. *) + module KV: sig + + include Irmin.KV with type contents = string + + val v: unit -> t Lwt.t + (** [v ()] is the KV store storing the control plane state. *) + + end + + type t + (** The type for server state. *) + + type op = [ `Read | `Write | `Delete ] + (** The type for operations to perform on routes. *) + + val service: switch:Lwt_switch.t -> + routes:(string list * op list) list -> KV.t -> t + (** [service ~switch ~routes kv] is the thread exposing the KV store [kv], + holding control plane state, running inside the privileged + container. [routes] are the routes exposed by the server to the + calf and [kv] is the control plane state. *) + + val listen: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> t -> F.t -> unit + (** [listen ~switch s m f] exposes service [s] on the flow + [f]. [switch] can be used to stop the server. *) + +end diff --git a/projects/miragesdk/src/sdk/ctl.ml b/projects/miragesdk/src/sdk/ctl.ml deleted file mode 100644 index 57aee6f0f..000000000 --- a/projects/miragesdk/src/sdk/ctl.ml +++ /dev/null @@ -1,156 +0,0 @@ -open Lwt.Infix -open Capnp_rpc_lwt - -let src = Logs.Src.create "init" ~doc:"Init steps" -module Log = (val Logs.src_log src : Logs.LOG) - -(* FIXME: to avoid linking with gmp *) -module No_IO = struct - type ic = unit - type oc = unit - type ctx = unit - let with_connection ?ctx:_ _uri ?init:_ _f = Lwt.fail_with "not allowed" - let read_all _ic = Lwt.fail_with "not allowed" - let read_exactly _ic _n = Lwt.fail_with "not allowed" - let write _oc _buf = Lwt.fail_with "not allowed" - let flush _oc = Lwt.fail_with "not allowed" - let ctx () = Lwt.return_none -end - -(* FIXME: we don't use Irmin_unix.Git.FS.KV to avoid linking with gmp *) -module Store = Irmin_git.FS.KV(No_IO)(Inflator)(Io_fs) -module KV = Store(Irmin.Contents.String) - -let pp_path = Fmt.(brackets (list ~sep:(const string "/") string)) - -let v path = - let config = Irmin_git.config path in - KV.Repo.v config >>= fun repo -> - KV.of_branch repo "calf" - -let () = - Irmin.Private.Watch.set_listen_dir_hook - (fun _ _ _ -> Lwt.return (fun () -> Lwt.return_unit)) - (* FIXME: inotify need some unknown massaging. *) - (* Irmin_watcher.hook *) - -module C = Mirage_channel_lwt.Make(Mirage_flow_lwt) - -exception Undefined_field of int - -let errorf fmt = - Fmt.kstrf (fun x -> Error (`Msg x)) fmt - -module Client = struct - - module C = Api.Reader.Ctl - - type error = [`Msg of string] - let pp_error ppf (`Msg s) = Fmt.string ppf s - - type t = C.t Capability.t - - let read t path = - let module P = Api.Builder.Ctl.Read_params in - let module R = Api.Reader.Response in - let req, p = Capability.Request.create P.init_pointer in - P.path_set_list p path |> ignore; - Capability.call_for_value t C.read_method req >|= function - | Error e -> errorf "error: read(%a) -> %a" pp_path path Capnp_rpc.Error.pp e - | Ok r -> - let r = R.of_payload r in - match R.get r with - | R.Ok data -> Ok (Some data) - | R.NotFound -> Ok None - | R.Undefined _ -> Error (`Msg "invalid return") - - let write t path data = - let module P = Api.Builder.Ctl.Write_params in - let req, p = Capability.Request.create P.init_pointer in - P.path_set_list p path |> ignore; - P.data_set p data; - Capability.call_for_value t C.write_method req >|= function - | Ok _ -> Ok () - | Error e -> errorf "error: write(%a) -> %a" pp_path path Capnp_rpc.Error.pp e - - let delete t path = - let module P = Api.Builder.Ctl.Delete_params in - let req, p = Capability.Request.create P.init_pointer in - P.path_set_list p path |> ignore; - Capability.call_for_value t C.delete_method req >|= function - | Ok _ -> Ok () - | Error e -> errorf "error: delete(%a) -> %a" pp_path path Capnp_rpc.Error.pp e - -end - -module Server = struct - - type op = [ `Read | `Write | `Delete ] - - type t = Api.Reader.Ctl.t Capability.t - - let infof fmt = - Fmt.kstrf (fun msg () -> - let date = Int64.of_float (Unix.gettimeofday ()) in - Irmin.Info.v ~date ~author:"calf" msg - ) fmt - - let not_allowed path = - let err = Fmt.strf "%a is not an allowed path" pp_path path in - Log.err (fun l -> l "%s" err); - err - - let write db key value = - let info = infof "Updating %a" KV.Key.pp key in - KV.set db ~info key value - - let delete db key = - let info = infof "Removing %a" KV.Key.pp key in - KV.remove db ~info key - - let with_permission_check ~routes op key fn = - match List.assoc key routes with - | perms when List.mem op perms -> fn () - | _ -> Service.fail "%s" (not_allowed key) - | exception Not_found -> Service.fail "%s" (not_allowed key) - - let service ~routes db = - Api.Builder.Ctl.local @@ - object (_ : Api.Builder.Ctl.service) - method read req = - let module P = Api.Reader.Ctl.Read_params in - let module R = Api.Builder.Response in - let params = P.of_payload req in - let key = P.path_get_list params in - with_permission_check ~routes `Read key @@ fun () -> - Service.return_lwt (fun () -> - let resp, r = Service.Response.create R.init_pointer in - (KV.find db key >|= function - | None -> R.not_found_set r - | Some x -> R.ok_set r x - ) >|= fun () -> - Ok resp - ) - - method write req = - let module P = Api.Reader.Ctl.Write_params in - let params = P.of_payload req in - let key = P.path_get_list params in - let value = P.data_get params in - with_permission_check ~routes `Write key @@ fun () -> - Service.return_lwt (fun () -> - write db key value >|= fun () -> - Ok (Service.Response.create_empty ()) - ) - - method delete req = - let module P = Api.Reader.Ctl.Delete_params in - let params = P.of_payload req in - let key = P.path_get_list params in - with_permission_check ~routes `Delete key @@ fun () -> - Service.return_lwt (fun () -> - delete db key >|= fun () -> - Ok (Service.Response.create_empty ()) - ) - end -end diff --git a/projects/miragesdk/src/sdk/ctl.mli b/projects/miragesdk/src/sdk/ctl.mli deleted file mode 100644 index c93878dd0..000000000 --- a/projects/miragesdk/src/sdk/ctl.mli +++ /dev/null @@ -1,60 +0,0 @@ -(** [Control] handle the server part of the control path, running in - the privileged container. *) - - -exception Undefined_field of int - -module Client: sig - - (** Client-side of the control plane. The control plane state is a - simple KV store that the client can query with read/write/delete - operations. - - TODO: decide if we want to support test_and_set (instead of - write) and some kind of watches. *) - - type t = Api.Reader.Ctl.t Capnp_rpc_lwt.Capability.t - (** The type for client state. *) - - type error - (** The type for client errors. *) - - val pp_error: error Fmt.t - (** [pp_error] is the pretty-printer for client errors. *) - - val read: t -> string list -> (string option, error) result Lwt.t - (** [read t k] is the value associated with the key [k] in the - control plane state. Return [None] if no value is associated to - [k]. *) - - val write: t -> string list -> string -> (unit, error) result Lwt.t - (** [write t p v] associates [v] to the key [k] in the control plane - state. *) - - val delete: t -> string list -> (unit, error) result Lwt.t - (** [delete t k] remove [k]'s binding in the control plane state. *) - -end - -(** [KV] stores tje control plane state. *) -module KV: Irmin.KV with type contents = string - -val v: string -> KV.t Lwt.t -(** [v p] is the KV store storing the control plane state, located at - path [p] in the filesystem of the privileged container. *) - -module Server: sig - - type t = Api.Reader.Ctl.t Capnp_rpc_lwt.Capability.t - (** The type for server state. *) - - type op = [ `Read | `Write | `Delete ] - (** The type for operations to perform on routes. *) - - val service: routes:(string list * op list) list -> KV.t -> t - (** [service ~routes kv] is the thread exposing the KV store [kv], - holding control plane state, running inside the privileged - container. [routes] are the routes exposed by the server to the - calf and [kv] is the control plane state. *) - -end diff --git a/projects/miragesdk/src/sdk/flow.ml b/projects/miragesdk/src/sdk/flow.ml new file mode 100644 index 000000000..e1a2cea28 --- /dev/null +++ b/projects/miragesdk/src/sdk/flow.ml @@ -0,0 +1,228 @@ +open Lwt.Infix +open Capnp_rpc_lwt + +module type S = sig + type t + include Mirage_flow_lwt.S with type flow = t +end + +module Client (F: S) = struct + + type 'a io = 'a Lwt.t + + module R = Api.Reader.Flow + module B = Api.Builder.Flow + + type t = R.t Capability.t + type flow = t + + type buffer = Cstruct.t + + type error = [ + | `Msg of string + | `Undefined of int + | `Capnp of Capnp_rpc.Error.t + ] + + type write_error = [ + | `Closed + | error + ] + + let pp_error: error Fmt.t = fun ppf -> function + | `Msg s -> Fmt.pf ppf "error %s" s + | `Undefined i -> Fmt.pf ppf "undefined %d" i + | `Capnp e -> Fmt.pf ppf "capnp: %a" Capnp_rpc.Error.pp e + + let pp_write_error: write_error Fmt.t = fun ppf -> function + | `Closed -> Fmt.string ppf "closed" + | #error as e -> pp_error ppf e + + let connect ~switch ?tags f = + let ep = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) f in + let client = Capnp_rpc_lwt.CapTP.connect ~switch ?tags ep in + Capnp_rpc_lwt.CapTP.bootstrap client |> Lwt.return + + let read_result r = + let module R = R.ReadResult in + match R.get (R.of_payload r) with + | R.Data data -> Ok (`Data (Cstruct.of_string data)) + | R.Eof -> Ok `Eof + | R.Error s -> Error (`Msg s) + | R.Undefined i -> Error (`Undefined i) + + let read t = + let module P = B.Read_params in + let req, _ = Capability.Request.create P.init_pointer in + Capability.call_for_value t R.read_method req >|= function + | Error e -> Error (`Capnp e) + | Ok r -> read_result r + + let write_result r = + let module R = R.WriteResult in + match R.get (R.of_payload r) with + | R.Ok -> Ok () + | R.Closed -> Error `Closed + | R.Error s -> Error (`Msg s) + | R.Undefined i -> Error (`Undefined i) + + let write t buf = + let module P = B.Write_params in + let req, p = Capability.Request.create P.init_pointer in + P.buffer_set p (Cstruct.to_string buf); + Capability.call_for_value t R.write_method req >|= function + | Error e -> Error (`Capnp e) + | Ok r -> write_result r + + let writev t bufs = + let module P = B.Writev_params in + let req, p = Capability.Request.create P.init_pointer in + ignore @@ P.buffers_set_list p (List.map Cstruct.to_string bufs); + Capability.call_for_value t R.writev_method req >|= function + | Error e -> Error (`Capnp e) + | Ok r -> write_result r + + let close t = + let module P = B.Close_params in + let req, _ = Capability.Request.create P.init_pointer in + Capability.call_for_value t R.close_method req >|= fun _ -> + () + +end + +module Server (F: S) (Local: S) = struct + + module R = Api.Reader.Flow + module B = Api.Builder.Flow + + let read_result result = + let module R = B.ReadResult in + let resp, r = Service.Response.create R.init_pointer in + let () = match result with + | Ok (`Data buf) -> R.data_set r (Cstruct.to_string buf) + | Ok `Eof -> R.eof_set r + | Error e -> Fmt.kstrf (R.error_set r) "%a" Local.pp_error e + in + Ok resp + + let write_result result = + let module R = B.WriteResult in + let resp, r = Service.Response.create R.init_pointer in + let () = match result with + | Ok () -> R.ok_set r + | Error `Closed -> R.closed_set r + | Error e -> Fmt.kstrf (R.error_set r) "%a" Local.pp_write_error e + in + Ok resp + + let close_result () = + let module R = B.Close_results in + let resp, _ = Service.Response.create R.init_pointer in + Ok resp + + let service t = + B.local @@ + object (_ : B.service) + inherit B.service + + method read_impl _req = + Service.return_lwt (fun () -> Local.read t >|= read_result) + + method write_impl req = + let module P = R.Write_params in + let params = P.of_payload req in + let buf = P.buffer_get params |> Cstruct.of_string in + Service.return_lwt (fun () -> Local.write t buf >|= write_result) + + method writev_impl req = + let module P = R.Writev_params in + let params = P.of_payload req in + let bufs = P.buffers_get_list params |> List.map Cstruct.of_string in + Service.return_lwt (fun () -> Local.writev t bufs >|= write_result) + + method close_impl _req = + Service.return_lwt (fun () -> Local.close t >|= close_result) + + end + + type t = R.t Capability.t + + let listen ~switch ?tags service fd = + let endpoint = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) fd in + Capnp_rpc_lwt.CapTP.connect ~switch ?tags ~offer:service endpoint + |> ignore + +end + +let src = Logs.Src.create "sdk/flow" +module Log = (val Logs.src_log src : Logs.LOG) + +module FIFO = struct + + include Mirage_flow_unix.Fd + type t = flow + + let mkfifo path = + if not (Sys.file_exists path) then + Lwt.catch (fun () -> + Lwt_unix.mkfifo path 0o644 + ) (function + | Unix.Unix_error(Unix.EEXIST, _, _) -> Lwt.return_unit + | e -> Lwt.fail e) + else + Lwt.return_unit + + let of_fd x = x + + let connect path = + Log.debug (fun l -> l "opening FIFO: %s\n%!" path); + mkfifo path >>= fun () -> + Lwt_unix.openfile path [Lwt_unix.O_RDWR] 0o644 + +end + +module Socket = struct + + include Mirage_flow_unix.Fd + type t = flow + + let connect path = + let fd = Lwt_unix.socket Lwt_unix.PF_UNIX Lwt_unix.SOCK_STREAM 0 in + Lwt_unix.connect fd (Lwt_unix.ADDR_UNIX path) >|= fun () -> + fd + +end + +module Rawlink = struct + + include Mirage_flow_rawlink + type t = flow + + let connect ~filter ethif = + Log.debug (fun l -> l "bringing up %s" ethif); + (try Tuntap.set_up_and_running ethif + with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); + Lwt_rawlink.open_link ~filter ethif + |> Lwt.return + +end + +module Fd = struct + + include Mirage_flow_unix.Fd + type t = flow + + let of_fd x = x + + let connect (i:int) = + let fd : Unix.file_descr = Obj.magic i in + Lwt_unix.of_unix_file_descr fd + |> Lwt.return + +end + +module Mem = struct + include Mirage_flow_lwt.F + type t = flow + let connect () = make () +end diff --git a/projects/miragesdk/src/sdk/flow.mli b/projects/miragesdk/src/sdk/flow.mli new file mode 100644 index 000000000..6f8a930e1 --- /dev/null +++ b/projects/miragesdk/src/sdk/flow.mli @@ -0,0 +1,55 @@ +(** MirageOS's flow interface over RPC *) + +module type S = sig + type t + include Mirage_flow_lwt.S with type flow = t +end + +(** {1 Remote Flows} *) + +(** [Client(F)] a an implementation of MirageOS's flow interface over + the flow [F]. Once connected, to the other side of the flow, + behave just as a normal local flow, althought all the calls are + now sent to the remote end. *) +module Client (F: S): sig + include S + val connect: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> F.t -> t Lwt.t +end + +(** [Server(F)(Local)] exposes the flow [Local] as a Cap-n-p RPC + endpoint over the flow [F]. Clients calls done on the other side + of the flow [F] will be executed on the server-side. *) +module Server (F: S) (Local: S): sig + type t + val service: Local.t -> t + val listen: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> t -> F.t -> unit +end + +(** {1 Local Flows} *) + +module FIFO: sig + include S + val of_fd: Lwt_unix.file_descr -> t + val connect: string -> t Lwt.t +end + +module Socket: sig + include S + val connect: string -> t Lwt.t +end + +module Rawlink: sig + include S + val connect: filter:string -> string -> t Lwt.t +end + +module Fd: sig + include S + val of_fd: Lwt_unix.file_descr -> t + val connect: int -> t Lwt.t +end + +module Mem: sig + include S + val connect: unit -> t +end diff --git a/projects/miragesdk/src/sdk/host.ml b/projects/miragesdk/src/sdk/host.ml new file mode 100644 index 000000000..d1652a3a4 --- /dev/null +++ b/projects/miragesdk/src/sdk/host.ml @@ -0,0 +1,212 @@ +(* This file is a big hack and should be replaced ASAP with proper bindings *) + +open Lwt.Infix + +let src = Logs.Src.create "net" ~doc:"Network Configuration" +module Log = (val Logs.src_log src : Logs.LOG) + +module type S = sig + type t + val interface: t -> string Lwt.t + val mac: t -> Macaddr.t Lwt.t + val dhcp_options: t -> Dhcp_wire.option_code list Lwt.t + val set_ip: t -> Ipaddr.V4.t -> unit Lwt.t + val set_gateway: t -> Ipaddr.V4.t -> unit Lwt.t +end + +module Local = struct + + type t = { + intf: string + } + + let connect intf = Lwt.return {intf} + let interface {intf} = Lwt.return intf + + let run fmt = + Fmt.kstrf (fun str -> + Log.info (fun l -> l "run: %S" str); + match Sys.command str with + | 0 -> Lwt.return () + | i -> Fmt.kstrf Lwt.fail_with "%S exited with code %d" str i + ) fmt + + let read fmt = + Fmt.kstrf (fun str -> + Lwt_process.pread ("/bin/sh", [|"/bin/sh"; "-c"; str|]) + ) fmt + + let mac t = + read "ifconfig -a %s | grep -o -E '([[:xdigit:]]{1,2}:){5}[[:xdigit:]]{1,2}'" + t.intf >|= fun mac -> + Macaddr.of_string_exn (String.trim mac) + + let dhcp_options _t = + (* FIXME: read /etc/dhcpc.conf *) + let open Dhcp_wire in + [ + RAPID_COMMIT; + DOMAIN_NAME; + DOMAIN_SEARCH; + HOSTNAME; + CLASSLESS_STATIC_ROUTE; + NTP_SERVERS; + INTERFACE_MTU; + ] + |> Lwt.return + + let set_ip t ip = + (* FIXME: use language bindings to netlink instead *) + (* run "ip addr add %s/24 dev %s" ip ethif *) + run "ifconfig %s %a netmask 255.255.255.0" t.intf Ipaddr.V4.pp_hum ip + + let set_gateway _t gw = + run "ip route add default via %a" Ipaddr.V4.pp_hum gw + +end + +open Lwt.Infix +open Capnp_rpc_lwt + +module R = Api.Reader.Host +module B = Api.Builder.Host + +module Client (F: Flow.S) = struct + + let pp_error = Capnp_rpc.Error.pp + + type t = R.t Capability.t + + let error e = Fmt.kstrf Lwt.fail_with "%a" pp_error e + + let connect ~switch ?tags f = + let ep = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) f in + let client = Capnp_rpc_lwt.CapTP.connect ~switch ?tags ep in + Capnp_rpc_lwt.CapTP.bootstrap client |> Lwt.return + + let intf_result r = + let module R = R.Intf_results in + R.intf_get (R.of_payload r) + + let interface t = + let module P = B.Intf_params in + let req, _ = Capability.Request.create P.init_pointer in + Capability.call_for_value t R.intf_method req >>= function + | Error e -> error e + | Ok r -> Lwt.return (intf_result r) + + let mac_result r = + let module R = R.Mac_results in + let mac = R.mac_get (R.of_payload r) in + Macaddr.of_string_exn mac + + let mac t = + let module P = B.Mac_params in + let req, _ = Capability.Request.create P.init_pointer in + Capability.call_for_value t R.mac_method req >>= function + | Error e -> error e + | Ok r -> Lwt.return (mac_result r) + + let dhcp_options_result r = + let module R = R.DhcpOptions_results in + let options = R.options_get_list (R.of_payload r) in + List.fold_left (fun acc o -> + match Dhcp_wire.string_to_option_code o with + | None -> acc + | Some o -> o :: acc + ) [] options + + let dhcp_options t = + let module P = B.DhcpOptions_params in + let req, _ = Capability.Request.create P.init_pointer in + Capability.call_for_value t R.dhcp_options_method req >>= function + | Error e -> error e + | Ok r -> Lwt.return (dhcp_options_result r) + + let set_ip t ip = + let module P = B.SetIp_params in + let req, p = Capability.Request.create P.init_pointer in + P.ip_set p (Ipaddr.V4.to_string ip); + Capability.call_for_value t R.set_ip_method req >>= function + | Error e -> error e + | Ok _ -> Lwt.return () + + let set_gateway t ip = + let module P = B.SetGateway_params in + let req, p = Capability.Request.create P.init_pointer in + P.ip_set p (Ipaddr.V4.to_string ip); + Capability.call_for_value t R.set_gateway_method req >>= function + | Error e -> error e + | Ok _r -> Lwt.return () + +end + +module Server (F: Flow.S) (N: S) = struct + + type t = B.t Capability.t + + let mac_result result = + let module R = B.Mac_results in + let resp, r = Service.Response.create R.init_pointer in + R.mac_set r (Macaddr.to_string result); + Ok resp + + let intf_result result = + let module R = B.Intf_results in + let resp, r = Service.Response.create R.init_pointer in + R.intf_set r result; + Ok resp + + let dhcp_options_result result = + let module R = B.DhcpOptions_results in + let resp, r = Service.Response.create R.init_pointer in + let result = List.map Dhcp_wire.option_code_to_string result in + let _ = R.options_set_list r result in + Ok resp + + let service t = + B.local @@ + object (_ : B.service) + inherit B.service + + method intf_impl _req = + Service.return_lwt (fun () -> N.interface t >|= intf_result) + + method mac_impl _req = + Service.return_lwt (fun () -> N.mac t >|= mac_result) + + method dhcp_options_impl _req = + Service.return_lwt (fun () -> N.dhcp_options t >|= dhcp_options_result) + + method set_ip_impl req = + let module P = R.SetIp_params in + let params = P.of_payload req in + let ip = P.ip_get params in + Service.return_lwt (fun () -> + let module R = B.SetIp_results in + let resp, _ = Service.Response.create R.init_pointer in + match Ipaddr.V4.of_string ip with + | None ->Lwt.fail_invalid_arg "invalid ip" + | Some ip -> N.set_ip t ip >|= fun () -> Ok resp + ) + + method set_gateway_impl req = + let module P = R.SetGateway_params in + let params = P.of_payload req in + let ip = P.ip_get params in + Service.return_lwt (fun () -> + let module R = B.SetGateway_results in + let resp, _ = Service.Response.create R.init_pointer in + match Ipaddr.V4.of_string ip with + | None ->Lwt.fail_invalid_arg "invalid ip" + | Some ip -> N.set_ip t ip >|= fun () -> Ok resp + ) + + end + + let listen ~switch ?tags service fd = + let endpoint = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) fd in + Capnp_rpc_lwt.CapTP.connect ~switch ?tags ~offer:service endpoint + |> ignore + +end diff --git a/projects/miragesdk/src/sdk/host.mli b/projects/miragesdk/src/sdk/host.mli new file mode 100644 index 000000000..cc21e7148 --- /dev/null +++ b/projects/miragesdk/src/sdk/host.mli @@ -0,0 +1,50 @@ +(** [Net] exposes low-level system functions related to network. *) + +module type S = sig + + type t + (** The type for network host actuator. *) + + val interface: t -> string Lwt.t + (** [interface t] is [t]'s interface. *) + + val mac: t -> Macaddr.t Lwt.t + (** [mac t] is the MAC address of the interface [e]. *) + + val dhcp_options: t -> Dhcp_wire.option_code list Lwt.t + (** [dhcp_options] are the DHCP client options associted with the + [t]'s interface. *) + + val set_ip: t -> Ipaddr.V4.t -> unit Lwt.t + (** [set_ip t ip] sets [t]'s IP address to [ip]. *) + + val set_gateway: t -> Ipaddr.V4.t -> unit Lwt.t + (** [set_gateway ip] set the default host gateway to [ip]. *) + +end + +(** [Client(F)] a an implementation of S interface over the flow + [F]. Once connected, to the other side of the flow, behave just as + a normal local net actuator, althought all the calls are now sent + to the remote end. *) +module Client (F: Flow.S): sig + include S + val connect: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> F.t -> t Lwt.t +end + +(** [Server(F)(Local)] exposes the host networking actuator [Local] as + a Cap-n-p RPC endpoint over the flow [F]. Clients calls executed + on the other end of the flow [F] will be executed on the + server-side. *) +module Server (F: Flow.S) (Local: S): sig + type t + val service: Local.t -> t + val listen: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> t -> F.t -> unit +end + +(** Local network actuactor. At the moment uses a lot of very bad + hacks, should be cleaned up. *) +module Local: sig + include S + val connect: string -> t Lwt.t +end diff --git a/projects/miragesdk/src/sdk/inflator.ml b/projects/miragesdk/src/sdk/inflator.ml deleted file mode 100644 index 4554f32a4..000000000 --- a/projects/miragesdk/src/sdk/inflator.ml +++ /dev/null @@ -1,47 +0,0 @@ -(* https://github.com/Engil/Canopy/blob/3b5573ad0be0fa729b6d4e1ca8b9bb348e164960/inflator.ml *) - -let input_buffer = Bytes.create 0xFFFF -let output_buffer = Bytes.create 0xFFFF -let window = Decompress.Window.create ~proof:Decompress.B.proof_bytes - -let deflate ?(level = 4) buff = - let pos = ref 0 in - let res = Buffer.create (Cstruct.len buff) in - Decompress.Deflate.bytes input_buffer output_buffer - (fun input_buffer -> function - | Some _ -> - let n = min 0xFFFF (Cstruct.len buff - !pos) in - Cstruct.blit_to_bytes buff !pos input_buffer 0 n; - pos := !pos + n; - n - | None -> - let n = min 0xFFFF (Cstruct.len buff - !pos) in - Cstruct.blit_to_bytes buff !pos input_buffer 0 n; - pos := !pos + n; - n) - (fun output_buffer len -> - Buffer.add_subbytes res output_buffer 0 len; - 0xFFFF) - (Decompress.Deflate.default ~proof:Decompress.B.proof_bytes level) - |> function - | Ok _ -> Cstruct.of_string (Buffer.contents res) - | Error _ -> failwith "Deflate.deflate" - -let inflate ?output_size orig = - let res = Buffer.create (match output_size with - | Some len -> len - | None -> Mstruct.length orig) - in - Decompress.Inflate.bytes input_buffer output_buffer - (fun input_buffer -> - let n = min 0xFFFF (Mstruct.length orig) in - let s = Mstruct.get_string orig n in - Bytes.blit_string s 0 input_buffer 0 n; - n) - (fun output_buffer len -> - Buffer.add_subbytes res output_buffer 0 len; - 0xFFFF) - (Decompress.Inflate.default (Decompress.Window.reset window)) - |> function - | Ok _ -> Some (Mstruct.of_string (Buffer.contents res)) - | Error _ -> None diff --git a/projects/miragesdk/src/sdk/init.ml b/projects/miragesdk/src/sdk/init.ml deleted file mode 100644 index 7376bbd24..000000000 --- a/projects/miragesdk/src/sdk/init.ml +++ /dev/null @@ -1,213 +0,0 @@ -open Lwt.Infix - -let src = Logs.Src.create "init" ~doc:"Init steps" -module Log = (val Logs.src_log src : Logs.LOG) - -let failf fmt = Fmt.kstrf Lwt.fail_with fmt - -module Flow = struct - - let int_of_fd t = - (Obj.magic (Lwt_unix.unix_file_descr t): int) - - let fd ?name t = - Mirage_flow_lwt.create (module Mirage_flow_unix.Fd) t (match name with - | None -> string_of_int (int_of_fd t) - | Some n -> n) - -end - -let file_descr ?name t = Flow.fd ?name t - -let rawlink ?filter ethif = - Log.debug (fun l -> l "bringing up %s" ethif); - (try Tuntap.set_up_and_running ethif - with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); - let t = Lwt_rawlink.open_link ?filter ethif in - Mirage_flow_lwt.create (module Mirage_flow_rawlink) t ethif - -module Fd = struct - - type t = { - name: string; - fd : Lwt_unix.file_descr; - } - - let stdout = { name = "stdout"; fd = Lwt_unix.stdout } - let stderr = { name = "stderr"; fd = Lwt_unix.stderr } - let stdin = { name = "stdin" ; fd = Lwt_unix.stdin } - - let of_int name (i:int) = - let fd : Unix.file_descr = Obj.magic i in - let fd = Lwt_unix.of_unix_file_descr fd in - { name; fd } - - let to_int t = - (Obj.magic (Lwt_unix.unix_file_descr t.fd): int) - - let pp ppf fd = Fmt.pf ppf "%s:%d" fd.name (to_int fd) - - let close fd = - Log.debug (fun l -> l "close %a" pp fd); - Unix.close (Lwt_unix.unix_file_descr fd.fd) - - let dev_null = - Lwt_unix.of_unix_file_descr ~blocking:false - (Unix.openfile "/dev/null" [Unix.O_RDWR] 0) - - let redirect_to_dev_null fd = - Log.debug (fun l -> l "redirect-stdin-to-dev-null"); - Unix.close (Lwt_unix.unix_file_descr fd.fd); - Lwt_unix.dup2 dev_null fd.fd; - Unix.close (Lwt_unix.unix_file_descr dev_null) - - let dup2 ~src ~dst = - Log.debug (fun l -> l "dup2 %a => %a" pp src pp dst); - Lwt_unix.dup2 src.fd dst.fd; - close src - - let flow t = Flow.fd t.fd ~name:(Fmt.to_to_string pp t) - -end - -module Pipe = struct - - type t = Fd.t * Fd.t - - type monitor = { - stdout: t; - stderr: t; - metrics: t; - ctl: t; - net: t; - } - - let stdout t = t.stdout - let stderr t = t.stderr - let metrics t = t.metrics - let ctl t = t.ctl - let net t = t.net - - let name (x, _) = x.Fd.name - - let priv = fst - let calf = snd - - let socketpair name = - let priv, calf = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in - Lwt_unix.clear_close_on_exec priv; - Lwt_unix.clear_close_on_exec calf; - { Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf } - - let pipe name = - let priv, calf = Lwt_unix.pipe () in - Lwt_unix.clear_close_on_exec priv; - Lwt_unix.clear_close_on_exec calf; - { Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf } - - let v () = - (* logs pipe *) - let stdout = pipe "stdout" in - let stderr = pipe "stderr" in - (* network pipe *) - let net = socketpair "net" in - (* store pipe *) - let ctl = socketpair "ctl" in - (* metrics pipe *) - let metrics = pipe "metrics" in - { stdout; stderr; ctl; net; metrics } - -end - -let exec_calf t cmd = - Log.info (fun l -> l "child pid is %d" Unix.(getpid ())); - Fd.(redirect_to_dev_null stdin); - - (* close parent fds *) - Fd.close Pipe.(priv t.stdout); - Fd.close Pipe.(priv t.stderr); - Fd.close Pipe.(priv t.ctl); - Fd.close Pipe.(priv t.net); - Fd.close Pipe.(priv t.metrics); - - let cmds = String.concat " " cmd in - - let calf_stdout = Fd.of_int "stdout" 1 in - let calf_stderr = Fd.of_int "stderr" 2 in - let calf_net = Fd.of_int "net" 3 in - let calf_ctl = Fd.of_int "ctl" 4 in - - Log.info (fun l -> l "Executing %s" cmds); - - (* Move all open fds at the top *) - Fd.dup2 ~src:Pipe.(calf t.net) ~dst:calf_net; - Fd.dup2 ~src:Pipe.(calf t.ctl) ~dst:calf_ctl; - Fd.dup2 ~src:Pipe.(calf t.stderr) ~dst:calf_stderr; - Fd.dup2 ~src:Pipe.(calf t.stdout) ~dst:calf_stdout; - - (* exec the calf *) - Unix.execve (List.hd cmd) (Array.of_list cmd) [||] - -let check_exit_status cmd status = - let cmds = String.concat " " cmd in - match status with - | Unix.WEXITED 0 -> Lwt.return_unit - | Unix.WEXITED i -> failf "%s: exit %d" cmds i - | Unix.WSIGNALED i -> failf "%s: signal %d" cmds i - | Unix.WSTOPPED i -> failf "%s: stopped %d" cmds i - -let exec_priv t ~pid cmd = - - Fd.(redirect_to_dev_null stdin); - - (* close child fds *) - Fd.close Pipe.(calf t.stdout); - Fd.close Pipe.(calf t.stderr); - Fd.close Pipe.(calf t.net); - Fd.close Pipe.(calf t.ctl); - Fd.close Pipe.(calf t.metrics); - - let wait () = - Lwt_unix.waitpid [] pid >>= fun (_pid, w) -> - Lwt_io.flush_all () >>= fun () -> - - check_exit_status cmd w - in - Lwt.return wait - -let block_for_ever = - let t, _ = Lwt.task () in - fun () -> t - -let exec_and_forward ?(handlers=block_for_ever) ~pid ~cmd ~net ~ctl t = - - exec_priv t ~pid cmd >>= fun wait -> - - let priv_net = Fd.flow Pipe.(priv t.net) in - let priv_ctl = Fd.flow Pipe.(priv t.ctl) in - let priv_stdout = Fd.flow Pipe.(priv t.stdout) in - let priv_stderr = Fd.flow Pipe.(priv t.stderr) in - - ctl priv_ctl; - - Lwt.pick ([ - wait (); - (* data *) - Mirage_flow_lwt.proxy ~verbose:true net priv_net; - - (* redirect the calf stdout to the shim stdout *) - Mirage_flow_lwt.forward ~verbose:false ~src:priv_stdout ~dst:Fd.(flow stdout); - Mirage_flow_lwt.forward ~verbose:false ~src:priv_stderr ~dst:Fd.(flow stderr); - (* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) - ~dst:Init.Fd.metric; *) - handlers (); - ]) - -let exec t cmd fn = - Lwt_io.flush_all () >>= fun () -> - match Lwt_unix.fork () with - | 0 -> exec_calf t cmd - | pid -> fn pid - -let run t ~net ~ctl ?handlers cmd = - exec t cmd (fun pid -> exec_and_forward ?handlers ~pid ~cmd ~net ~ctl t) diff --git a/projects/miragesdk/src/sdk/init.mli b/projects/miragesdk/src/sdk/init.mli deleted file mode 100644 index eecaa3504..000000000 --- a/projects/miragesdk/src/sdk/init.mli +++ /dev/null @@ -1,120 +0,0 @@ -(** Init functions. - - [Init] contains funcitons to initialise the state of the - privileged container. - - {ul - - {- fowrard and filter the network traffic using BPF (for instance - to allow only DHCP traffic).} - {- open pipes for forwarding the calf's stdout and stderr - to the privileged container's ones.} - {- open a pipe to forward the metrics.} - {- open a socket pair with the calf to be able to transmit control - data, e.g. the IP address once a DHCP lease is obtained.} - }*) - - -module Fd: sig - - type t - (** The type for file descriptors. *) - - val pp: t Fmt.t - (** [pp_fd] pretty prints a file descriptor. *) - - val redirect_to_dev_null: t -> unit - (** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *) - - val close: t -> unit - (** [close fd] closes [fd]. *) - - val dup2: src:t -> dst:t -> unit - (** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *) - - (** {1 Usefull File Descriptors} *) - - val stdin: t - (** [stdin] is the standart input. *) - - val stdout: t - (** [stdout] is the standard output. *) - - val stderr: t - (** [stderr] is the standard error. *) - - val flow: t -> Mirage_flow_lwt.t - (** [flow t] is the flow representing [t]. *) - -end - -val file_descr: ?name:string -> Lwt_unix.file_descr -> Mirage_flow_lwt.t -(** [file_descr ?name fd] is the flow for the file-descripor [fd]. *) - -module Pipe: sig - - type t - (** The type for pipes. Could be either uni-directional (normal - pipes) or a bi-directional (socket pairs). *) - - type monitor - (** The type for pipe monitors. *) - - val v: unit -> monitor - - val name: t -> string - (** [name t] is [t]'s name. *) - - val priv: t -> Fd.t - (** [priv p] is the private side of the pipe [p]. *) - - val calf: t -> Fd.t - (** [calf p] is the calf side of the pipe [p]. *) - - (** {1 Useful Pipes} *) - - val stdout: monitor -> t - (** [stdout m] is the uni-directional pipe from the calf's stdout - monitored by [m]. *) - - val stderr: monitor -> t - (** [stderr m] is the uni-directional pipe from the calf's stderr - monitored by [m]. *) - - val metrics: monitor -> t - (** [metrics m] is the uni-directional pipe from the calf's metric - endpoint monitored by [m]. *) - - val ctl: monitor -> t - (** [ctl m] is the bi-directional pipe used to exchange control data - between the calf and the priv containers monitored by [m]. *) - - val net: monitor -> t - (** [net m] is the bi-directional pipe used to exchange network - traffic between the calf and the priv containers monitored by - [m]. *) - -end - -val rawlink: ?filter:string -> string -> Mirage_flow_lwt.t -(** [rawlink ?filter x] is the flow using the network interface - [x]. The packets can be filtered using the BPF filter - [filter]. See the documentation of - {{:https://github.com/haesbaert/rawlink}rawlink} for more details - on how to build that filter. *) - -val exec: Pipe.monitor -> string list -> (int -> unit Lwt.t) -> unit Lwt.t -(** [exec t cmd k] executes [cmd] in an unprivileged calf process and - call [k] with the pid of the parent process. The child and parents - are connected using [t]. *) - -(* FIXME(samoht): not very happy with that signatue *) -val run: Pipe.monitor -> - net:Mirage_flow_lwt.t -> ctl:(Mirage_flow_lwt.t -> unit) -> - ?handlers:(unit -> unit Lwt.t) -> - string list -> unit Lwt.t -(** [run m ~net ~ctl ?handlers cmd] runs [cmd] in a unprivileged calf - process. [net] is the network interface flow. [ctl] runs the control - thread connected to the {Pipe.ctl} pipe. [handlers] are the system - handler thread which will react to control data to perform - privileged system actions. *) diff --git a/projects/miragesdk/src/sdk/io_fs.ml b/projects/miragesdk/src/sdk/io_fs.ml deleted file mode 100644 index 6e2ac43c0..000000000 --- a/projects/miragesdk/src/sdk/io_fs.ml +++ /dev/null @@ -1,325 +0,0 @@ -(* from irmin-unix, to avoid linking with gmp ... *) - -module Log = struct - let src = Logs.Src.create "git.unix" ~doc:"logs git's unix events" - include (val Logs.src_log src : Logs.LOG) -end - -open Lwt.Infix - -let mkdir_pool = Lwt_pool.create 1 (fun () -> Lwt.return_unit) - -let protect_unix_exn = function - | Unix.Unix_error _ as e -> Lwt.fail (Failure (Printexc.to_string e)) - | e -> Lwt.fail e - -let ignore_enoent = function - | Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return_unit - | e -> Lwt.fail e - -let protect f x = Lwt.catch (fun () -> f x) protect_unix_exn -let safe f x = Lwt.catch (fun () -> f x) ignore_enoent - -let mkdir dirname = - let rec aux dir = - if Sys.file_exists dir && Sys.is_directory dir then Lwt.return_unit - else ( - let clear = - if Sys.file_exists dir then ( - Log.debug (fun l -> - l "%s already exists but is a file, removing." dir); - safe Lwt_unix.unlink dir - ) else - Lwt.return_unit - in - clear >>= fun () -> - aux (Filename.dirname dir) >>= fun () -> - Log.debug (fun l -> l "mkdir %s" dir); - protect (Lwt_unix.mkdir dir) 0o755; - ) in - Lwt_pool.use mkdir_pool (fun () -> aux dirname) - -let file_exists f = - Lwt.catch (fun () -> Lwt_unix.file_exists f) (function - (* See https://github.com/ocsigen/lwt/issues/316 *) - | Unix.Unix_error (Unix.ENOTDIR, _, _) -> Lwt.return_false - | e -> Lwt.fail e) - -module Lock = struct - - let is_stale max_age file = - file_exists file >>= fun exists -> - if exists then ( - Lwt.catch (fun () -> - Lwt_unix.stat file >>= fun s -> - let stale = Unix.gettimeofday () -. s.Unix.st_mtime > max_age in - Lwt.return stale) - (function - | Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return false - | e -> Lwt.fail e) - ) else - Lwt.return false - - let unlock file = - Lwt_unix.unlink file - - let lock ?(max_age = 10. *. 60. (* 10 minutes *)) ?(sleep = 0.001) file = - let rec aux i = - Log.debug (fun f -> f "lock %s %d" file i); - is_stale max_age file >>= fun is_stale -> - if is_stale then ( - Log.err (fun f -> f "%s is stale, removing it." file); - unlock file >>= fun () -> - aux 1 - ) else - let create () = - let pid = Unix.getpid () in - mkdir (Filename.dirname file) >>= fun () -> - Lwt_unix.openfile file [Unix.O_CREAT; Unix.O_RDWR; Unix.O_EXCL] 0o600 - >>= fun fd -> - let oc = Lwt_io.of_fd ~mode:Lwt_io.Output fd in - Lwt_io.write_int oc pid >>= fun () -> - Lwt_unix.close fd - in - Lwt.catch create (function - | Unix.Unix_error(Unix.EEXIST, _, _) -> - let backoff = 1. +. Random.float (let i = float i in i *. i) in - Lwt_unix.sleep (sleep *. backoff) >>= fun () -> - aux (i+1) - | e -> Lwt.fail e) - in - aux 1 - - let with_lock file fn = - match file with - | None -> fn () - | Some f -> lock f >>= fun () -> Lwt.finalize fn (fun () -> unlock f) - -end - -let mmap_threshold = 4096 -let openfile_pool = Lwt_pool.create 200 (fun () -> Lwt.return_unit) - -let mkdir = mkdir - -type path = string - -(* we use file locking *) -type lock = path -let lock_file x = x - -let file_exists = file_exists - -let list_files kind dir = - if Sys.file_exists dir && Sys.is_directory dir then - let d = Sys.readdir dir in - let d = Array.to_list d in - let d = List.map (Filename.concat dir) d in - let d = List.filter kind d in - let d = List.sort String.compare d in - Lwt.return d - else - Lwt.return_nil - -let directories dir = - list_files (fun f -> - try Sys.is_directory f with Sys_error _ -> false - ) dir - -let files dir = - list_files (fun f -> - try not (Sys.is_directory f) with Sys_error _ -> false - ) dir - -let write_cstruct fd b = - let rec rwrite fd buf ofs len = - Lwt_bytes.write fd buf ofs len >>= fun n -> - if len = 0 then Lwt.fail End_of_file - else if n < len then rwrite fd buf (ofs + n) (len - n) - else Lwt.return_unit in - match Cstruct.len b with - | 0 -> Lwt.return_unit - | len -> rwrite fd (Cstruct.to_bigarray b) 0 len - -let delays = Array.init 20 (fun i -> 0.1 *. (float i) ** 2.) - -let command fmt = - Printf.ksprintf (fun str -> - Log.debug (fun l -> l "[exec] %s" str); - let i = Sys.command str in - if i <> 0 then Log.debug (fun l -> l "[exec] error %d" i); - Lwt.return_unit - ) fmt - -let remove_dir dir = - if Sys.os_type = "Win32" then - command "cmd /d /v:off /c rd /s /q %S" dir - else - command "rm -rf %S" dir - -let remove_file ?lock file = - Lock.with_lock lock (fun () -> - Lwt.catch - (fun () -> Lwt_unix.unlink file) - (function - (* On Windows, [EACCES] can also occur in an attempt to - rename a file or directory or to remove an existing - directory. *) - | Unix.Unix_error (Unix.EACCES, _, _) - | Unix.Unix_error (Unix.EISDIR, _, _) -> remove_dir file - | Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return_unit - | e -> Lwt.fail e) - ) - -let rename = - if Sys.os_type <> "Win32" then Lwt_unix.rename - else - fun tmp file -> - let rec aux i = - Lwt.catch - (fun () -> Lwt_unix.rename tmp file) - (function - (* On Windows, [EACCES] can also occur in an attempt to - rename a file or directory or to remove an existing - directory. *) - | Unix.Unix_error (Unix.EACCES, _, _) as e -> - if i >= Array.length delays then Lwt.fail e - else ( - file_exists file >>= fun exists -> - if exists && Sys.is_directory file then ( - remove_dir file >>= fun () -> aux (i+1) - ) else ( - Log.debug (fun l -> - l "Got EACCES, retrying in %.1fs" delays.(i)); - Lwt_unix.sleep delays.(i) >>= fun () -> aux (i+1) - )) - | e -> Lwt.fail e) - in - aux 0 - -let with_write_file ?temp_dir file fn = - begin match temp_dir with - | None -> Lwt.return_unit - | Some d -> mkdir d - end >>= fun () -> - let dir = Filename.dirname file in - mkdir dir >>= fun () -> - let tmp = Filename.temp_file ?temp_dir (Filename.basename file) "write" in - Lwt_pool.use openfile_pool (fun () -> - Log.debug (fun l -> l "Writing %s (%s)" file tmp); - Lwt_unix.(openfile tmp [O_WRONLY; O_NONBLOCK; O_CREAT; O_TRUNC] 0o644) - >>= fun fd -> - Lwt.finalize (fun () -> protect fn fd) (fun () -> Lwt_unix.close fd) - >>= fun () -> - rename tmp file - ) - -let read_file_with_read file size = - let chunk_size = max 4096 (min size 0x100000) in - let buf = Cstruct.create size in - let flags = [Unix.O_RDONLY] in - let perm = 0o0 in - Lwt_unix.openfile file flags perm >>= fun fd -> - let rec aux off = - let read_size = min chunk_size (size - off) in - Lwt_bytes.read fd buf.Cstruct.buffer off read_size >>= fun read -> - (* It should test for read = 0 in case size is larger than the - real size of the file. This may happen for instance if the - file was truncated while reading. *) - let off = off + read in - if off >= size then - Lwt.return buf - else - aux off - in - Lwt.finalize (fun () -> aux 0) - (fun () -> Lwt_unix.close fd) - -let read_file_with_mmap file = - let fd = Unix.(openfile file [O_RDONLY; O_NONBLOCK] 0o644) in - let ba = Lwt_bytes.map_file ~fd ~shared:false () in - Unix.close fd; - Lwt.return (Cstruct.of_bigarray ba) - -let read_file file = - Lwt.catch (fun () -> - Lwt_pool.use openfile_pool (fun () -> - Log.debug (fun l -> l "Reading %s" file); - Lwt_unix.stat file >>= fun stats -> - let size = stats.Lwt_unix.st_size in - (if size >= mmap_threshold then read_file_with_mmap file - else read_file_with_read file size - ) >|= fun buf -> - Some buf - ) - ) (function - | Unix.Unix_error _ | Sys_error _ -> Lwt.return_none - | e -> Lwt.fail e) - -let stat_info_unsafe path = - let open Git.Index in - let stats = Unix.stat path in - let ctime = { lsb32 = Int32.of_float stats.Unix.st_ctime; nsec = 0l } in - let mtime = { lsb32 = Int32.of_float stats.Unix.st_mtime; nsec = 0l } in - let dev = Int32.of_int stats.Unix.st_dev in - let inode = Int32.of_int stats.Unix.st_ino in - let mode = match stats.Unix.st_kind, stats.Unix.st_perm with - | Unix.S_REG, p -> if p land 0o100 = 0o100 then `Exec else `Normal - | Unix.S_LNK, _ -> `Link - | k, p -> - let kind = match k with - | Unix.S_REG -> "REG" - | Unix.S_DIR -> "DIR" - | Unix.S_CHR -> "CHR" - | Unix.S_BLK -> "BLK" - | Unix.S_LNK -> "LNK" - | Unix.S_FIFO -> "FIFO" - | Unix.S_SOCK -> "SOCK" - in - let perm = Printf.sprintf "%o" p in - let error = - Printf.sprintf "%s: not supported kind of file [%s, %s]." - path kind perm - in - failwith error - in - let uid = Int32.of_int stats.Unix.st_uid in - let gid = Int32.of_int stats.Unix.st_gid in - let size = Int32.of_int stats.Unix.st_size in - { ctime; mtime; dev; inode; uid; gid; mode; size } - -let stat_info path = - Lwt.catch (fun () -> Lwt.return (Some (stat_info_unsafe path))) (function - | Sys_error _ | Unix.Unix_error _ -> Lwt.return_none - | e -> Lwt.fail e) - -let chmod ?lock f `Exec = - Lock.with_lock lock (fun () -> Lwt_unix.chmod f 0o755) - -let write_file ?temp_dir ?lock file b = - let write () = - with_write_file file ?temp_dir (fun fd -> write_cstruct fd b) - in - Lock.with_lock lock (fun () -> - Lwt.catch write (function - | Unix.Unix_error (Unix.EISDIR, _, _) -> remove_dir file >>= write - | e -> Lwt.fail e - ) - ) - -let test_and_set_file ?temp_dir ~lock file ~test ~set = - Lock.with_lock (Some lock) (fun () -> - read_file file >>= fun v -> - let equal = match test, v with - | None , None -> true - | Some x, Some y -> Cstruct.equal x y - | _ -> false - in - if not equal then Lwt.return false - else - (match set with - | None -> remove_file file - | Some v -> write_file ?temp_dir file v) - >|= fun () -> - true - ) diff --git a/projects/miragesdk/src/sdk/jbuild b/projects/miragesdk/src/sdk/jbuild index 03acbe050..c2b9febb9 100644 --- a/projects/miragesdk/src/sdk/jbuild +++ b/projects/miragesdk/src/sdk/jbuild @@ -4,10 +4,9 @@ ((name sdk) (public_name sdk) (flags (:standard -w -53-55)) - (libraries (cstruct.lwt decompress irmin irmin-git lwt.unix rawlink - tuntap astring rresult mirage-flow-lwt capnp capnp-rpc-lwt - mirage-channel-lwt io-page.unix ipaddr mirage-flow-unix - mirage-flow-rawlink)))) + (libraries (irmin irmin-mem lwt.unix rawlink charrua-core.wire + tuntap astring mirage-flow-lwt mirage-net-flow + mirage-time-lwt mirage-flow-rawlink capnp capnp-rpc-lwt)))) (rule ((targets (proto.ml proto.mli)) diff --git a/projects/miragesdk/src/sdk/net.ml b/projects/miragesdk/src/sdk/net.ml index 2a99e60ce..0fe380322 100644 --- a/projects/miragesdk/src/sdk/net.ml +++ b/projects/miragesdk/src/sdk/net.ml @@ -1,32 +1,232 @@ -(* This file is a big hack and should be replaced ASAP with proper bindings *) - open Lwt.Infix +open Capnp_rpc_lwt -let src = Logs.Src.create "net" ~doc:"Network Configuration" +module B = Api.Builder.Net +module R = Api.Reader.Net + +module type S = Mirage_net_lwt.S + +module Callback = struct + + let service f = + B.Callback.local @@ object (_: B.Callback.service) + inherit B.Callback.service + method f_impl req = + let module P = R.Callback.F_params in + let params = P.of_payload req in + let change = P.buffer_get params in + Service.return_lwt (fun () -> + f (Cstruct.of_string change) >|= fun () -> + Ok (Service.Response.create_empty ()) + ) + end + + module F = Api.Reader.Conf.Callback + + let client t change = + let module P = B.Callback.F_params in + let req, p = Capability.Request.create P.init_pointer in + let change = Cstruct.to_string change in + P.buffer_set p change; + Capability.call_for_value t R.Callback.f_method req >>= function + | Ok _ -> Lwt.return () + | Error e -> + Fmt.kstrf Lwt.fail_with "error: f(%s) -> %a" change Capnp_rpc.Error.pp e + +end + +module Client (F: Flow.S) = struct + + type 'a io = 'a Lwt.t + + type t = { + cap : R.t Capability.t; + mac : Macaddr.t; + stats: Mirage_net.stats; + } + + type page_aligned_buffer = Io_page.t + type buffer = Cstruct.t + type macaddr = Macaddr.t + + type error = [ + | `Msg of string + | `Undefined of int + | `Capnp of Capnp_rpc.Error.t + | Mirage_device.error + ] + + let pp_error: error Fmt.t = fun ppf -> function + | `Msg s -> Fmt.pf ppf "error %s" s + | `Undefined i -> Fmt.pf ppf "undefined %d" i + | `Capnp e -> Fmt.pf ppf "capnp: %a" Capnp_rpc.Error.pp e + | #Mirage_device.error as e -> Mirage_device.pp_error ppf e + + let result r = + let module R = R.Result in + match R.get (R.of_payload r) with + | R.Ok -> Ok () + | R.Unimplemented -> Error `Unimplemented + | R.Disconnected -> Error `Disconnected + | R.Error s -> Error (`Msg s) + | R.Undefined i -> Error (`Undefined i) + + let write t buf = + let module P = B.Write_params in + let req, p = Capability.Request.create P.init_pointer in + P.buffer_set p (Cstruct.to_string buf); + Capability.call_for_value t.cap R.write_method req >|= function + | Error e -> Error (`Capnp e) + | Ok r -> + Mirage_net.Stats.tx t.stats (Int64.of_int @@ Cstruct.len buf); + result r + + let writev t bufs = + let module P = B.Writev_params in + let req, p = Capability.Request.create P.init_pointer in + ignore @@ P.buffers_set_list p (List.map Cstruct.to_string bufs); + Capability.call_for_value t.cap R.writev_method req >|= function + | Error e -> Error (`Capnp e) + | Ok r -> + Mirage_net.Stats.tx t.stats (Int64.of_int @@ Cstruct.lenv bufs); + result r + + let listen t f = + let module P = B.Listen_params in + let req, p = Capability.Request.create P.init_pointer in + let callback = Capability.Request.export req (Callback.service f) in + P.callback_set p (Some callback); + Capability.call_for_value t.cap R.listen_method req >|= function + | Ok _ -> Ok () + | Error e -> Error (`Capnp e) + + let disconnect { cap; _ } = + let module P = B.Disconnect_params in + let req, _ = Capability.Request.create P.init_pointer in + Capability.call_for_value cap R.disconnect_method req >|= fun _ -> + () + + let mac t = t.mac + + let capability ~switch ?tags f = + let ep = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) f in + let client = Capnp_rpc_lwt.CapTP.connect ~switch ?tags ep in + Capnp_rpc_lwt.CapTP.bootstrap client |> Lwt.return + + let connect ~switch ?tags f = + capability ~switch ?tags f >>= fun cap -> + let module P = B.Mac_params in + let req, _ = Capability.Request.create P.init_pointer in + Capability.call_for_value cap R.mac_method req >>= function + | Error e -> Fmt.kstrf Lwt.fail_with "%a" Capnp_rpc.Error.pp e + | Ok r -> + let module R = R.Mac_results in + let mac = R.mac_get (R.of_payload r) |> Macaddr.of_string_exn in + let stats = Mirage_net.Stats.create () in + Lwt.return { cap; mac; stats } + + let reset_stats_counters t = Mirage_net.Stats.reset t.stats + let get_stats_counters t = t.stats +end + +module Server (F: Flow.S) (Local: Mirage_net_lwt.S) = struct + + let result x = + let module R = B.Result in + let resp, r = Service.Response.create R.init_pointer in + let () = match x with + | Ok () -> R.ok_set r + | Error `Disconnected -> R.disconnected_set r + | Error `Unimplemented -> R.unimplemented_set r + | Error e -> Fmt.kstrf (R.error_set r) "%a" Local.pp_error e + in + Ok resp + + let mac_result x = + let module R = B.Mac_results in + let resp, r = Service.Response.create R.init_pointer in + R.mac_set r (Macaddr.to_string x); + Ok resp + + let disconnect_result () = + let module R = B.Disconnect_results in + let resp, _ = Service.Response.create R.init_pointer in + Ok resp + + let service t = + B.local @@ + object (_ : B.service) + inherit B.service + + method disconnect_impl _req = + Service.return_lwt (fun () -> Local.disconnect t >|= disconnect_result) + + method write_impl req = + let module P = R.Write_params in + let params = P.of_payload req in + let buf = P.buffer_get params |> Cstruct.of_string in + Service.return_lwt (fun () -> Local.write t buf >|= result) + + method writev_impl req = + let module P = R.Writev_params in + let params = P.of_payload req in + let bufs = P.buffers_get_list params |> List.map Cstruct.of_string in + Service.return_lwt (fun () -> Local.writev t bufs >|= result) + + method listen_impl req = + let module P = R.Listen_params in + let params = P.of_payload req in + match P.callback_get params with + | None -> failwith "No watcher callback given" + | Some i -> + let callback = Payload.import req i in + Service.return_lwt (fun () -> + Local.listen t (Callback.client callback) >|= result + ) + + method mac_impl req = + let module P = R.Mac_params in + let _params = P.of_payload req in + Service.return_lwt (fun () -> Lwt.return (mac_result (Local.mac t))) + + end + + type t = R.t Capability.t + + let listen ~switch ?tags service fd = + let endpoint = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module F) fd in + Capnp_rpc_lwt.CapTP.connect ~switch ?tags ~offer:service endpoint + |> ignore + +end + +let src = Logs.Src.create "sdk/net" module Log = (val Logs.src_log src : Logs.LOG) -let run fmt = - Fmt.kstrf (fun str -> - Log.info (fun l -> l "run: %S" str); - match Sys.command str with - | 0 -> Lwt.return () - | i -> Fmt.kstrf Lwt.fail_with "%S exited with code %d" str i - ) fmt +module Fd = struct -let read fmt = - Fmt.kstrf (fun str -> - Lwt_process.pread ("/bin/sh", [|"/bin/sh"; "-c"; str|]) - ) fmt + module Net = Mirage_net_flow.Make(Flow.Fd) -let mac ethif = - read "ifconfig -a %s | grep -o -E '([[:xdigit:]]{1,2}:){5}[[:xdigit:]]{1,2}'" - ethif >|= fun mac -> - Macaddr.of_string_exn (String.trim mac) + include Net -let set_ip ethif ip = - (* FIXME: use language bindings to netlink instead *) - (* run "ip addr add %s/24 dev %s" ip ethif *) - run "ifconfig %s %a netmask 255.255.255.0" ethif Ipaddr.V4.pp_hum ip + let connect ?mac (i:int) = + let fd : Unix.file_descr = Obj.magic i in + let fd = Lwt_unix.of_unix_file_descr fd in + Net.connect ?mac (Flow.Fd.of_fd fd) -let set_gateway gw = - run "ip route add default via %a" Ipaddr.V4.pp_hum gw +end + +module Rawlink = struct + + module R = Mirage_flow_rawlink + module Net = Mirage_net_flow.Make(R) + include Net + + let connect ~filter ?mac ethif = + Log.debug (fun l -> l "bringing up %s" ethif); + (try Tuntap.set_up_and_running ethif + with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); + let flow = Lwt_rawlink.open_link ~filter ethif in + connect ?mac flow + +end diff --git a/projects/miragesdk/src/sdk/net.mli b/projects/miragesdk/src/sdk/net.mli index 58ebdffe4..52b8ff5a2 100644 --- a/projects/miragesdk/src/sdk/net.mli +++ b/projects/miragesdk/src/sdk/net.mli @@ -1,10 +1,35 @@ -(** [Net] exposes low-level system functions related to network. *) +(** MirageOS's net interface over RPC *) -val mac: string -> Macaddr.t Lwt.t -(** [mac e] is the MAC address of the interface [e]. *) +(** {1 Remote Networks} *) -val set_ip: string -> Ipaddr.V4.t -> unit Lwt.t -(** [set_ip e ip] sets [e]'s IP address to [ip]. *) +module type S = Mirage_net_lwt.S -val set_gateway: Ipaddr.V4.t -> unit Lwt.t -(** [set_gateway ip] set the default host gateway to [ip]. *) +(** [Client(F)] a an implementation of MirageOS's net interface over + the flow [F]. Once connected, to the other side of the net, behave + just as a normal local net, althought all the calls are now sent + to the remote end. *) +module Client (F: Flow.S): sig + include S + val connect: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> F.t -> t Lwt.t +end + +(** [Server(F)(Local)] exposes the MirageOS's network [Local] as a + Cap-n-p RPC endpoint over the flow [F]. Clients calls done on the + other end of [F] will be executed on the server-side. *) +module Server (F: Flow.S) (Local: S): sig + type t + val service: Local.t -> t + val listen: switch:Lwt_switch.t -> ?tags:Logs.Tag.set -> t -> F.t -> unit +end + +(** {1 Local Networks} *) + +module Fd: sig + include S + val connect: ?mac:Macaddr.t -> int -> t Lwt.t +end + +module Rawlink: sig + include S + val connect: filter:string -> ?mac:Macaddr.t -> string -> t Lwt.t +end diff --git a/projects/miragesdk/src/sdk/proto.capnp b/projects/miragesdk/src/sdk/proto.capnp index 3b94b3705..81d69dd9c 100644 --- a/projects/miragesdk/src/sdk/proto.capnp +++ b/projects/miragesdk/src/sdk/proto.capnp @@ -1,14 +1,75 @@ @0x9e83562906de8259; -struct Response { - union { - ok @0 :Data; - notFound @1 :Void; +interface Flow { + + struct ReadResult { + union { + data @0: Data; + eof @1: Void; + error @2: Text; + } } + + struct WriteResult { + union { + ok @0: Void; + closed @1: Void; + error @2: Text; + } + } + + read @0 () -> ReadResult; + write @1 (buffer: Data) -> WriteResult; + writev @2 (buffers: List(Data)) -> WriteResult; + close @3 () -> (); } -interface Ctl { +interface Net { + + interface Callback { + f @0 (buffer :Data) -> (); + } + + struct Result { + union { + ok @0: Void; + disconnected @1: Void; + unimplemented @2: Void; + error @3: Text; + } + } + + disconnect @0 () -> (); + write @1 (buffer: Data) -> Result; + writev @2 (buffers: List(Data)) -> Result; + listen @3 (callback: Callback) -> Result; + mac @4 () -> (mac: Text); # FIXME: better type +} + +# FIXME: replace ip and mac by proper types for Mac and IP adresses +interface Host { + intf @0 () -> (intf: Text); + mac @1 () -> (mac: Text); + dhcpOptions @2 () -> (options: List(Text)); + setIp @3 (ip: Text) -> (); + setGateway @4 (ip: Text) -> (); +} + +interface Conf { + + interface Callback { + f @0 (change :Data) -> (); + } + + struct Response { + union { + ok @0 :Data; + notFound @1 :Void; + } + } + write @0 (path :List(Text), data: Data) -> (); read @1 (path :List(Text)) -> Response; delete @2 (path :List(Text)) -> (); + watch @3 (path :List(Text), callback :Callback) -> (); } diff --git a/projects/miragesdk/src/sdk/time.ml b/projects/miragesdk/src/sdk/time.ml new file mode 100644 index 000000000..61d696ce1 --- /dev/null +++ b/projects/miragesdk/src/sdk/time.ml @@ -0,0 +1,10 @@ +module type S = sig + type t + include Mirage_time_lwt.S +end + +module Local = struct + type +'a io = 'a Lwt.t + type t = unit + let sleep_ns x = Lwt_unix.sleep (Int64.to_float x /. 1_000_000_000.) +end diff --git a/projects/miragesdk/src/sdk/time.mli b/projects/miragesdk/src/sdk/time.mli new file mode 100644 index 000000000..83b33758b --- /dev/null +++ b/projects/miragesdk/src/sdk/time.mli @@ -0,0 +1,8 @@ +(** MirageOS's time interface over RPC. *) + +module type S = sig + type t + include Mirage_time_lwt.S +end + +module Local: S with type t = unit diff --git a/projects/miragesdk/src/test/jbuild b/projects/miragesdk/src/test/jbuild index 9f0a93d11..84fd4fa60 100644 --- a/projects/miragesdk/src/test/jbuild +++ b/projects/miragesdk/src/test/jbuild @@ -2,8 +2,7 @@ (executables ((names (test)) - (libraries (sdk alcotest astring mtime mtime.clock.os mirage-flow-lwt - logs.fmt)))) + (libraries (sdk alcotest astring mtime mtime.clock.os logs.fmt)))) (alias ((name runtest) diff --git a/projects/miragesdk/src/test/test.ml b/projects/miragesdk/src/test/test.ml index 128c9b5f1..0bf4946ee 100644 --- a/projects/miragesdk/src/test/test.ml +++ b/projects/miragesdk/src/test/test.ml @@ -1,6 +1,5 @@ open Astring open Lwt.Infix -open Sdk let random_string n = Bytes.init n (fun _ -> char_of_int (Random.int 255)) @@ -19,6 +18,8 @@ let check_raises msg f = let escape = String.Ascii.escape +module IO = Sdk.Flow.FIFO + let write fd strs = Lwt_list.iter_s (fun str -> IO.write fd (Cstruct.of_string str) >>= function @@ -32,50 +33,46 @@ let read fd = | Ok `Eof -> Lwt.fail_with "read: EOF" | Error e -> Fmt.kstrf Lwt.fail_with "read: %a" IO.pp_error e -let calf pipe = Init.(Fd.flow Pipe.(calf pipe)) -let priv pipe = Init.(Fd.flow Pipe.(priv pipe)) - -let test_pipe pipe () = - let calf = calf pipe in - let priv = priv pipe in - let name = Init.Pipe.name pipe in +(* +let test_socketpair c s () = + IO.connect path >>= fun c -> + IO.connect path >>= fun s -> let test strs = let escape_strs = String.concat ~sep:"" @@ List.map escape strs in - (* pipes are unidirectional *) - (* calf -> priv works *) - write calf strs >>= fun () -> - read priv >>= fun buf -> - let msg = Fmt.strf "%s: calf -> priv" name in - Alcotest.(check string) msg escape_strs (escape buf); - (* priv -> calf don't *) - check_raises (Fmt.strf "%s: priv side is writable!" name) - (fun () -> write priv strs) >>= fun () -> - check_raises (Fmt.strf "%s: calf sid is readable!" name) - (fun () -> read calf >|= ignore) >>= fun () -> + (* socket pairs are bi-directional *) + (* c -> s works *) + write c strs >>= fun () -> + read s >>= fun buf -> + Alcotest.(check string) "c -> s" escape_strs (escape buf); + (* s -> c works *) + write s strs >>= fun () -> + read c >>= fun buf -> + Alcotest.(check string) "s -> c" escape_strs (escape buf); Lwt.return_unit in test [random_string 1] >>= fun () -> test [random_string 1; random_string 1; random_string 10] >>= fun () -> test [random_string 100] >>= fun () -> - test [random_string 10241] >>= fun () -> + (* note: if size(writes) > 8192 then the next writes will block (as + we are using SOCK_STREAM *) + let n = 8182 / 4 in + test [ + random_string n; + random_string n; + random_string n; + random_string n; + ] >>= fun () -> Lwt.return_unit +*) -let test_socketpair pipe () = - let calf = calf pipe in - let priv = priv pipe in - let name = Init.Pipe.name pipe in +let test_fifo path () = + IO.connect path >>= fun t -> let test strs = let escape_strs = String.concat ~sep:"" @@ List.map escape strs in - (* socket pairs are bi-directional *) - (* calf -> priv works *) - write calf strs >>= fun () -> - read priv >>= fun buf -> - Alcotest.(check string) (name ^ " calf -> priv") escape_strs (escape buf); - (* priv -> cal works *) - write priv strs >>= fun () -> - read calf >>= fun buf -> - Alcotest.(check string) (name ^ " priv -> calf") escape_strs (escape buf); + write t strs >>= fun () -> + read t >>= fun buf -> + Alcotest.(check string) "fifo" escape_strs (escape buf); Lwt.return_unit in test [random_string 1] >>= fun () -> @@ -97,77 +94,87 @@ let failf fmt = Fmt.kstrf Alcotest.fail fmt (* read ops *) -let pp_error = Ctl.Client.pp_error +module Client = Sdk.Conf.Client(IO) +module Server = Sdk.Conf.Server(IO) + let pp_path = Fmt.(Dump.list string) let read_should_err t k = - Ctl.Client.read t k >|= function - | Error _ -> () - | Ok None -> failf "read(%a) -> got: none, expected: err" pp_path k - | Ok Some v -> failf "read(%a) -> got: found:%S, expected: err" pp_path k v + Lwt.catch (fun () -> + Client.find t k >|= function + | None -> failf "read(%a) -> got: none, expected: err" pp_path k + | Some v -> failf "read(%a) -> got: found:%S, expected: err" pp_path k v + ) (fun _ -> Lwt.return ()) let read_should_none t k = - Ctl.Client.read t k >|= function - | Error e -> failf "read(%a) -> got: error:%a, expected none" pp_path k pp_error e - | Ok None -> () - | Ok Some v -> failf "read(%a) -> got: found:%S, expected none" pp_path k v + Lwt.catch (fun () -> + Client.find t k >|= function + | None -> () + | Some v -> failf "read(%a) -> got: found:%S, expected none" pp_path k v + ) (fun e -> + failf "read(%a) -> got: error:%a, expected none" pp_path k Fmt.exn e) let read_should_work t k v = - Ctl.Client.read t k >|= function - | Error e -> failf "read(%a) -> got: error:%a, expected ok" pp_path k pp_error e - | Ok None -> failf "read(%a) -> got: none, expected ok" pp_path k - | Ok Some v' -> - if v <> v' then failf "read(%a) -> got: ok:%S, expected: ok:%S" pp_path k v' v + Lwt.catch (fun () -> + Client.find t k >|= function + | None -> failf "read(%a) -> got: none, expected ok" pp_path k + | Some v' -> + if v <> v' then + failf "read(%a) -> got: ok:%S, expected: ok:%S" pp_path k v' v + ) (fun e -> + failf "read(%a) -> got: error:%a, expected ok" pp_path k Fmt.exn e) (* write ops *) let write_should_err t k v = - Ctl.Client.write t k v >|= function - | Ok () -> failf "write(%a) -> ok" pp_path k - | Error _ -> () + Lwt.catch + (fun () -> Client.set t k v >|= fun () -> failf "write(%a) -> ok" pp_path k) + (fun _ -> Lwt.return ()) let write_should_work t k v = - Ctl.Client.write t k v >|= function - | Ok () -> () - | Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e + Lwt.catch + (fun () -> Client.set t k v) + (fun e -> failf "write(%a) -> error: %a" pp_path k Fmt.exn e) (* del ops *) let delete_should_err t k = - Ctl.Client.delete t k >|= function - | Ok () -> failf "del(%a) -> ok" pp_path k - | Error _ -> () + Lwt.catch + (fun () -> Client.delete t k >|= fun () -> failf "del(%a) -> ok" pp_path k) + (fun _ -> Lwt.return ()) let delete_should_work t k = - Ctl.Client.delete t k >|= function - | Ok () -> () - | Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e + Lwt.catch + (fun () -> Client.delete t k) + (fun e -> failf "write(%a) -> error: %a" pp_path k Fmt.exn e) -let test_ctl t () = +let pp_actor f (style, name) = Fmt.(styled style (const string name)) f () +let unknown = `Black, "------" +let actor_tag = Logs.Tag.def "actor" pp_actor + +let server_tags = Logs.Tag.(empty |> add actor_tag (`Red, "server")) +let client_tags = Logs.Tag.(empty |> add actor_tag (`Green, "client")) + +let test_ctl c s () = Lwt_switch.with_switch @@ fun switch -> - let calf = calf Init.Pipe.(ctl t) in - let priv = priv Init.Pipe.(ctl t) in let k1 = ["foo"; "bar"] in let k2 = ["a"] in let k3 = ["b"; "c"] in let k4 = ["xxxxxx"] in let all = [`Read; `Write; `Delete] in let routes = [k1,all; k2,all; k3,all ] in - let git_root = "/tmp/sdk/ctl" in - let _ = Sys.command (Fmt.strf "rm -rf %s" git_root) in - Ctl.v git_root >>= fun ctl -> + Server.KV.v () >>= fun kv -> let _server = - let service = Ctl.Server.service ~routes ctl in - Capnp_rpc_lwt.CapTP.of_endpoint ~switch ~offer:service (Capnp_rpc_lwt.Endpoint.of_flow ~switch (module IO) priv) + let service = Server.service ~switch ~routes kv in + Server.listen ~switch ~tags:server_tags service s in - let client = Capnp_rpc_lwt.CapTP.of_endpoint ~switch (Capnp_rpc_lwt.Endpoint.of_flow ~switch (module IO) calf) in - let t = Capnp_rpc_lwt.CapTP.bootstrap client in + Client.connect ~switch ~tags:client_tags c >>= fun t -> let allowed k v = delete_should_work t k >>= fun () -> read_should_none t k >>= fun () -> write_should_work t k v >>= fun () -> read_should_work t k v >>= fun () -> - Ctl.KV.get ctl k >|= fun v' -> + Server.KV.get kv k >|= fun v' -> Alcotest.(check string) "in the db" v v' in let disallowed k v = @@ -181,33 +188,6 @@ let test_ctl t () = disallowed k4 "" >>= fun () -> Lwt.return_unit -let in_memory_flow () = - let flow = Mirage_flow_lwt.F.string () in - IO.create (module Mirage_flow_lwt.F) flow "mem" - -let test_exec () = - let test () = - let check n pipe = - let t = Init.Pipe.v () in - let pipe = pipe t in - Init.exec t ["/bin/sh"; "-c"; "echo foo >& " ^ string_of_int n] @@ fun _pid -> - read @@ priv pipe >>= fun foo -> - let name = Fmt.strf "fork %s" Init.Pipe.(name pipe) in - Alcotest.(check string) name "foo\n" foo; - Lwt.return_unit - in - check 1 Init.Pipe.stdout >>= fun () -> - (* avoid logging interference *) - let level = Logs.level () in - Logs.set_level None; - check 2 Init.Pipe.stderr >>= fun () -> - Logs.set_level level; - check 3 Init.Pipe.net >>= fun () -> - check 4 Init.Pipe.ctl >>= fun () -> - Lwt.return_unit - in - test () - let run f () = try Lwt_main.run (f ()) with e -> @@ -216,35 +196,49 @@ let run f () = let test_stderr () = () -let t = Init.Pipe.v () +let fifo = "/tmp/sdk-fifo" +let c, s = + let c, s = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in + IO.of_fd c, IO.of_fd s let test = [ - "stdout is a pipe" , `Quick, run (test_pipe Init.Pipe.(stdout t)); - "stdout is a pipe" , `Quick, run (test_pipe Init.Pipe.(stderr t)); - "net is a socket pair", `Quick, run (test_socketpair Init.Pipe.(net t)); - "ctl is a socket pair", `Quick, run (test_socketpair Init.Pipe.(ctl t)); - "ctl" , `Quick, run (test_ctl t); - "exec" , `Quick, run test_exec; + "FIFO flows", `Quick, run (test_fifo fifo); + "conf" , `Quick, run (test_ctl c s); ] +let pp_qid f = function + | None -> () + | Some x -> + let s = Uint32.to_string x in + Fmt.(styled `Magenta (fun f x -> Fmt.pf f " (qid=%s)" x)) f s + let reporter ?(prefix="") () = let pad n x = if String.length x > n then x else x ^ String.v ~len:(n - String.length x) (fun _ -> ' ') in let report src level ~over k msgf = - let k _ = over (); k () in let ppf = match level with Logs.App -> Fmt.stdout | _ -> Fmt.stderr in - let with_stamp h _tags k fmt = + let with_stamp h ?(tags=Logs.Tag.empty) k fmt = + let actor = + match Logs.Tag.find actor_tag tags with + | Some x -> x + | None -> unknown + in + let qid = Logs.Tag.find Capnp_rpc.Debug.qid_tag tags in let dt = Mtime.Span.to_us (Mtime_clock.elapsed ()) in - Fmt.kpf k ppf ("%s%+04.0fus %a %a @[" ^^ fmt ^^ "@]@.") + let k _ = + Fmt.(pf ppf) "%a@." pp_qid qid; + over (); k () in + Fmt.kpf k ppf ("%s%+04.0fus %a %a %a @[" ^^ fmt ^^ "@]") prefix dt Fmt.(styled `Magenta string) (pad 10 @@ Logs.Src.name src) Logs_fmt.pp_header (level, h) + pp_actor actor in msgf @@ fun ?header ?tags fmt -> - with_stamp header tags k fmt + with_stamp header ?tags k fmt in { Logs.report = report }