mirror of
https://github.com/linuxkit/linuxkit.git
synced 2025-07-20 17:49:10 +00:00
miragesdk: refactor the SDK
Expose a non-unix dependent flow-like API, so it is easier to test/use in a unikernel. Signed-off-by: Thomas Gazagnaire <thomas@gazagnaire.org>
This commit is contained in:
parent
a07952d4e6
commit
3cec2b1f5e
@ -24,13 +24,14 @@ daemon:
|
|||||||
oomScoreAdj: -800
|
oomScoreAdj: -800
|
||||||
readonly: true
|
readonly: true
|
||||||
- name: dhcp-client
|
- name: dhcp-client
|
||||||
image: "mobylinux/dhcp-client:99ecd3304172eb7570aa5c7f527cec2577b48a84"
|
image: "mobylinux/dhcp-client:6478d616909eee58bfda46ba742b5b286965fb03"
|
||||||
net: host
|
net: host
|
||||||
capabilities:
|
capabilities:
|
||||||
- CAP_NET_ADMIN # to bring eth0 up
|
- CAP_NET_ADMIN # to bring eth0 up
|
||||||
- CAP_NET_RAW # to read /dev/eth0
|
- CAP_NET_RAW # to read /dev/eth0
|
||||||
binds:
|
binds:
|
||||||
- /var/run/dhcp-client:/data
|
- /var/run/dhcp-client:/data
|
||||||
|
- /usr/bin/runc:/usr/bin/runc
|
||||||
- /sbin:/sbin # for ifconfig
|
- /sbin:/sbin # for ifconfig
|
||||||
- /bin:/bin # for ifconfig
|
- /bin:/bin # for ifconfig
|
||||||
- /lib:/lib # for ifconfig
|
- /lib:/lib # for ifconfig
|
||||||
|
@ -26,4 +26,5 @@ RUN opam config exec -- jbuilder build dhcp-client/main.exe
|
|||||||
RUN sudo cp /src/_build/default/dhcp-client/main.exe /dhcp-client
|
RUN sudo cp /src/_build/default/dhcp-client/main.exe /dhcp-client
|
||||||
|
|
||||||
RUN opam config exec -- jbuilder build dhcp-client/calf/unikernel.exe
|
RUN opam config exec -- jbuilder build dhcp-client/calf/unikernel.exe
|
||||||
RUN sudo cp /src/_build/default/dhcp-client/calf/unikernel.exe /dhcp-client-calf
|
RUN sudo mkdir -p /calf
|
||||||
|
RUN sudo cp /src/_build/default/dhcp-client/calf/unikernel.exe /calf/dhcp-client-calf
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
#FROM ocaml/opam:alpine-3.5_ocaml-4.04.0
|
#FROM ocaml/opam:alpine-3.5_ocaml-4.04.0
|
||||||
FROM scratch
|
FROM scratch
|
||||||
COPY obj ./
|
COPY obj ./
|
||||||
CMD ["/dhcp-client"]
|
CMD ["/dhcp-client", "-vv"]
|
||||||
|
@ -7,7 +7,7 @@ IMAGE=dhcp-client
|
|||||||
OBJS=obj/dhcp-client
|
OBJS=obj/dhcp-client
|
||||||
|
|
||||||
MIRAGE_COMPILE=mobylinux/mirage-compile:f903b0e1b4328271364cc63f123ac49d56739cef@sha256:a54d9ca84d3f5998dba92ce83d60d49289cee8908a8b0f6ec280d30ab8edf46c
|
MIRAGE_COMPILE=mobylinux/mirage-compile:f903b0e1b4328271364cc63f123ac49d56739cef@sha256:a54d9ca84d3f5998dba92ce83d60d49289cee8908a8b0f6ec280d30ab8edf46c
|
||||||
CALF_OBJS=obj/dhcp-client-calf
|
CALF_OBJS=obj/calf/dhcp-client-calf
|
||||||
CALF_FILES=dhcp-client/calf/config.ml dhcp-client/calf/unikernel.ml \
|
CALF_FILES=dhcp-client/calf/config.ml dhcp-client/calf/unikernel.ml \
|
||||||
dhcp-client/calf/jbuild
|
dhcp-client/calf/jbuild
|
||||||
|
|
||||||
@ -18,8 +18,8 @@ default: push
|
|||||||
docker build -t $(IMAGE):build -f Dockerfile.build -q . > .build || \
|
docker build -t $(IMAGE):build -f Dockerfile.build -q . > .build || \
|
||||||
(rm -f $@ && exit 1)
|
(rm -f $@ && exit 1)
|
||||||
|
|
||||||
.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS)
|
.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) obj/config.json
|
||||||
docker build -t $(IMAGE):pkg -f Dockerfile.pkg -q . > .pkg || \
|
docker build --no-cache -t $(IMAGE):pkg -f Dockerfile.pkg -q . > .pkg || \
|
||||||
(rm -f $@ && exit 1)
|
(rm -f $@ && exit 1)
|
||||||
|
|
||||||
.dev: Dockerfile.dev init-dev.sh
|
.dev: Dockerfile.dev init-dev.sh
|
||||||
@ -43,13 +43,23 @@ enter-dev: .dev
|
|||||||
# tar xf - || exit 1) && \
|
# tar xf - || exit 1) && \
|
||||||
# touch $@
|
# touch $@
|
||||||
|
|
||||||
$(OBJS) $(CALF_OBJS): .build $(FILES) $(CALF_FILES)
|
$(OBJS): .build $(FILES)
|
||||||
mkdir -p obj/usr/lib obj/bin
|
mkdir -p obj/calf
|
||||||
( cd obj && \
|
( cd obj && \
|
||||||
docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(OBJS:obj/%=/%) $(CALF_OBJS:obj/%=/%) | tar xf - ) && \
|
docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(OBJS:obj/%=/%) | tar xf - ) && \
|
||||||
touch $@
|
touch $@
|
||||||
|
|
||||||
hash: Makefile Dockerfile.build Dockerfile.pkg $(FILES) $(CALF_FILES) .build
|
$(CALF_OBJS): .build $(CALF_FILES)
|
||||||
|
mkdir -p obj/calf
|
||||||
|
( cd obj && \
|
||||||
|
docker run --rm --net=none --log-driver=none -i $(IMAGE):build tar -cf - $(CALF_OBJS:obj/%=/%) | tar xf - ) && \
|
||||||
|
touch $@
|
||||||
|
|
||||||
|
obj/config.json: dhcp-client/calf/config.json
|
||||||
|
mkdir -p obj/calf
|
||||||
|
cp $^ $@
|
||||||
|
|
||||||
|
hash: Makefile Dockerfile.build Dockerfile.pkg $(FILES) $(CALF_FILES) .build obj/config.json
|
||||||
{ cat $^; \
|
{ cat $^; \
|
||||||
docker run --rm --entrypoint sh $(IMAGE):build -c 'cat /lib/apk/db/installed'; \
|
docker run --rm --entrypoint sh $(IMAGE):build -c 'cat /lib/apk/db/installed'; \
|
||||||
docker run --rm --entrypoint sh $(IMAGE):build -c 'opam list'; } \
|
docker run --rm --entrypoint sh $(IMAGE):build -c 'opam list'; } \
|
||||||
|
@ -187,7 +187,6 @@ let set_ip_opt ctl k = function
|
|||||||
|
|
||||||
let start () dhcp_codes net ctl =
|
let start () dhcp_codes net ctl =
|
||||||
Netif_fd.connect net >>= fun net ->
|
Netif_fd.connect net >>= fun net ->
|
||||||
let ctl = Sdk.Ctl.Client.v (Lwt_unix.of_unix_file_descr ctl) in
|
|
||||||
let requests = match dhcp_codes with
|
let requests = match dhcp_codes with
|
||||||
| [] -> default_options
|
| [] -> default_options
|
||||||
| l ->
|
| l ->
|
||||||
@ -206,12 +205,15 @@ let start () dhcp_codes net ctl =
|
|||||||
set_ip_opt ctl "/gateway" result.gateway
|
set_ip_opt ctl "/gateway" result.gateway
|
||||||
|
|
||||||
(* FIXME: Main end *)
|
(* FIXME: Main end *)
|
||||||
let magic (x: int) = (Obj.magic x: Unix.file_descr)
|
|
||||||
|
let fd (x: int) = (Obj.magic x: Unix.file_descr)
|
||||||
|
|
||||||
|
let flow (x: int) = Sdk.Init.file_descr (Lwt_unix.of_unix_file_descr @@ fd x)
|
||||||
|
|
||||||
let start () dhcp_codes net ctl =
|
let start () dhcp_codes net ctl =
|
||||||
Lwt_main.run (
|
Lwt_main.run (
|
||||||
let net = magic net in
|
let net = fd net in
|
||||||
let ctl = magic ctl in
|
let ctl = Sdk.Ctl.Client.v (flow ctl) in
|
||||||
start () dhcp_codes net ctl
|
start () dhcp_codes net ctl
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,31 +51,26 @@ module Handlers = struct
|
|||||||
gateway;
|
gateway;
|
||||||
]
|
]
|
||||||
|
|
||||||
let watch ~ethif path =
|
let watch ~ethif db =
|
||||||
Ctl.v path >>= fun db ->
|
|
||||||
Lwt_list.map_p (fun f -> f db) (handlers ethif) >>= fun _ ->
|
Lwt_list.map_p (fun f -> f db) (handlers ethif) >>= fun _ ->
|
||||||
let t, _ = Lwt.task () in
|
let t, _ = Lwt.task () in
|
||||||
t
|
t
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
external bpf_filter: unit -> string = "bpf_filter"
|
external dhcp_filter: unit -> string = "bpf_filter"
|
||||||
|
|
||||||
let t = Init.Pipe.v ()
|
let t = Init.Pipe.v ()
|
||||||
|
|
||||||
let ctl = string_of_int Init.(Fd.to_int Pipe.(calf @@ ctl t))
|
|
||||||
let net = string_of_int Init.(Fd.to_int Pipe.(calf @@ net t))
|
|
||||||
let default_cmd = [
|
let default_cmd = [
|
||||||
"/dhcp-client-calf"; "--ctl="^ctl; "--net="^net
|
"/calf/dhcp-client-calf"; "--net=3"; "--ctl=4"; "-vv";
|
||||||
]
|
]
|
||||||
|
|
||||||
(* FIXME: use runc isolation
|
(*
|
||||||
let default_cmd = [
|
let default_cmd = [
|
||||||
"/usr/bin/runc"; "--"; "run";
|
"/usr/bin/runc"; "run"; "--preserve-fds"; "2"; "--bundle"; "."; "dhcp-client"
|
||||||
"--bundle"; "/containers/images/000-dhcp-client";
|
]
|
||||||
"dhcp-client"
|
*)
|
||||||
] in
|
|
||||||
*)
|
|
||||||
|
|
||||||
let read_cmd file =
|
let read_cmd file =
|
||||||
if Sys.file_exists file then
|
if Sys.file_exists file then
|
||||||
@ -91,7 +86,6 @@ let read_cmd file =
|
|||||||
| Some f -> read_cmd f
|
| Some f -> read_cmd f
|
||||||
in
|
in
|
||||||
Lwt_main.run (
|
Lwt_main.run (
|
||||||
let net = Init.rawlink ~filter:(bpf_filter ()) ethif in
|
|
||||||
let routes = [
|
let routes = [
|
||||||
"/ip";
|
"/ip";
|
||||||
"/gateway";
|
"/gateway";
|
||||||
@ -100,10 +94,10 @@ let read_cmd file =
|
|||||||
"/mtu";
|
"/mtu";
|
||||||
"/nameservers/*"
|
"/nameservers/*"
|
||||||
] in
|
] in
|
||||||
Ctl.v "/data" >>= fun ctl ->
|
Ctl.v "/data" >>= fun db ->
|
||||||
let fd = Init.(Fd.fd @@ Pipe.(priv @@ ctl t)) in
|
let ctl fd = Ctl.Server.listen ~routes db fd in
|
||||||
let ctl () = Ctl.Server.listen ~routes ctl fd in
|
let handlers () = Handlers.watch ~ethif db in
|
||||||
let handlers () = Handlers.watch ~ethif path in
|
let net = Init.rawlink ~filter:(dhcp_filter ()) ethif in
|
||||||
Init.run t ~net ~ctl ~handlers cmd
|
Init.run t ~net ~ctl ~handlers cmd
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -3,52 +3,61 @@ open Lwt.Infix
|
|||||||
let src = Logs.Src.create "IO" ~doc:"IO helpers"
|
let src = Logs.Src.create "IO" ~doc:"IO helpers"
|
||||||
module Log = (val Logs.src_log src : Logs.LOG)
|
module Log = (val Logs.src_log src : Logs.LOG)
|
||||||
|
|
||||||
let rec really_write fd buf off len =
|
(* from mirage-conduit. FIXME: move to mirage-flow *)
|
||||||
match len with
|
type 'a io = 'a Lwt.t
|
||||||
| 0 -> Lwt.return_unit
|
type buffer = Cstruct.t
|
||||||
| len ->
|
type error = [`Msg of string]
|
||||||
Log.debug (fun l -> l "really_write off=%d len=%d" off len);
|
type write_error = [ Mirage_flow.write_error | error ]
|
||||||
Lwt_unix.write fd buf off len >>= fun n ->
|
let pp_error ppf (`Msg s) = Fmt.string ppf s
|
||||||
if n = 0 then Lwt.fail_with "write 0"
|
|
||||||
else really_write fd buf (off+n) (len-n)
|
|
||||||
|
|
||||||
let write fd buf = really_write fd buf 0 (String.length buf)
|
let pp_write_error ppf = function
|
||||||
|
| #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e
|
||||||
|
| #error as e -> pp_error ppf e
|
||||||
|
|
||||||
let rec really_read fd buf off len =
|
type flow =
|
||||||
match len with
|
| Flow: string
|
||||||
| 0 -> Lwt.return_unit
|
* (module Mirage_flow_lwt.CONCRETE with type flow = 'a)
|
||||||
| len ->
|
* 'a
|
||||||
Log.debug (fun l -> l "really_read off=%d len=%d" off len);
|
-> flow
|
||||||
Lwt_unix.read fd buf off len >>= fun n ->
|
|
||||||
if n = 0 then Lwt.fail_with "read 0"
|
|
||||||
else really_read fd buf (off+n) (len-n)
|
|
||||||
|
|
||||||
let read_all fd =
|
let create (type a) (module M: Mirage_flow_lwt.S with type flow = a) t name =
|
||||||
Log.debug (fun l -> l "read_all");
|
let m =
|
||||||
let len = 16 * 1024 in
|
(module Mirage_flow_lwt.Concrete(M):
|
||||||
let buf = Bytes.create len in
|
Mirage_flow_lwt.CONCRETE with type flow = a)
|
||||||
let rec loop acc =
|
|
||||||
Lwt_unix.read fd buf 0 len >>= fun n ->
|
|
||||||
if n = 0 then Lwt.fail_with "read 0"
|
|
||||||
else
|
|
||||||
let acc = String.sub buf 0 n :: acc in
|
|
||||||
if n <= len then Lwt.return (List.rev acc)
|
|
||||||
else loop acc
|
|
||||||
in
|
in
|
||||||
loop [] >|= fun bufs ->
|
Flow (name, m , t)
|
||||||
String.concat "" bufs
|
|
||||||
|
|
||||||
let read_n fd len =
|
let read (Flow (_, (module F), flow)) = F.read flow
|
||||||
Log.debug (fun l -> l "read_n len=%d" len);
|
let write (Flow (_, (module F), flow)) b = F.write flow b
|
||||||
let buf = Bytes.create len in
|
let writev (Flow (_, (module F), flow)) b = F.writev flow b
|
||||||
let rec loop acc len =
|
let close (Flow (_, (module F), flow)) = F.close flow
|
||||||
Lwt_unix.read fd buf 0 len >>= fun n ->
|
let pp ppf (Flow (name, _, _)) = Fmt.string ppf name
|
||||||
if n = 0 then Lwt.fail_with "read 0"
|
|
||||||
else
|
type t = flow
|
||||||
let acc = String.sub buf 0 n :: acc in
|
|
||||||
match len - n with
|
let forward ~src ~dst =
|
||||||
| 0 -> Lwt.return (List.rev acc)
|
let rec loop () =
|
||||||
| r -> loop acc r
|
read src >>= function
|
||||||
|
| Ok `Eof ->
|
||||||
|
Log.err (fun l -> l "forward[%a => %a] EOF" pp src pp dst);
|
||||||
|
Lwt.return_unit
|
||||||
|
| Error e ->
|
||||||
|
Log.err (fun l -> l "forward[%a => %a] %a" pp src pp dst pp_error e);
|
||||||
|
Lwt.return_unit
|
||||||
|
| Ok (`Data buf) ->
|
||||||
|
Log.debug (fun l -> l "forward[%a => %a] %a"
|
||||||
|
pp src pp dst Cstruct.hexdump_pp buf);
|
||||||
|
write dst buf >>= function
|
||||||
|
| Ok () -> loop ()
|
||||||
|
| Error e ->
|
||||||
|
Log.err (fun l -> l "forward[%a => %a] %a"
|
||||||
|
pp src pp dst pp_write_error e);
|
||||||
|
Lwt.return_unit
|
||||||
in
|
in
|
||||||
loop [] len >|= fun bufs ->
|
loop ()
|
||||||
String.concat "" bufs
|
|
||||||
|
let proxy f1 f2 =
|
||||||
|
Lwt.join [
|
||||||
|
forward ~src:f1 ~dst:f2;
|
||||||
|
forward ~src:f2 ~dst:f1;
|
||||||
|
]
|
||||||
|
@ -1,16 +1,21 @@
|
|||||||
(** IO helpers *)
|
(** IO helpers *)
|
||||||
|
|
||||||
val really_write: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t
|
type t
|
||||||
(** [really_write fd buf off len] writes exactly [len] bytes to [fd]. *)
|
(** The type for IO flows *)
|
||||||
|
|
||||||
val write: Lwt_unix.file_descr -> string -> unit Lwt.t
|
include Mirage_flow_lwt.S with type flow = t
|
||||||
(** [write fd buf] writes all the buffer [buf] in [fd]. *)
|
|
||||||
|
|
||||||
val really_read: Lwt_unix.file_descr -> string -> int -> int -> unit Lwt.t
|
val create: (module Mirage_flow_lwt.S with type flow = 'a) -> 'a -> string -> flow
|
||||||
(** [really_read fd buf off len] reads exactly [len] bytes from [fd]. *)
|
(** [create (module M) t name] is the flow representing [t] using the
|
||||||
|
function defined in [M]. *)
|
||||||
|
|
||||||
val read_all: Lwt_unix.file_descr -> string Lwt.t
|
val pp: flow Fmt.t
|
||||||
(** [read_all fd] reads as much data as it is available in [fd]. *)
|
(** [pp] is the pretty-printer for IO flows. *)
|
||||||
|
|
||||||
val read_n: Lwt_unix.file_descr -> int -> string Lwt.t
|
val forward: src:t -> dst:t -> unit Lwt.t
|
||||||
(** [read_n fd n] reads exactly [n] bytes from [fd]. *)
|
(** [forward ~src ~dst] forwards writes from [src] to [dst]. Block
|
||||||
|
until either [src] or [dst] is closed. *)
|
||||||
|
|
||||||
|
val proxy: t -> t -> unit Lwt.t
|
||||||
|
(** [proxy x y] is the same as [forward x y <*> forward y x]. Block
|
||||||
|
until both flows are closed. *)
|
||||||
|
@ -4,6 +4,8 @@ open Astring
|
|||||||
let src = Logs.Src.create "init" ~doc:"Init steps"
|
let src = Logs.Src.create "init" ~doc:"Init steps"
|
||||||
module Log = (val Logs.src_log src : Logs.LOG)
|
module Log = (val Logs.src_log src : Logs.LOG)
|
||||||
|
|
||||||
|
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
|
||||||
|
|
||||||
(* FIXME: to avoid linking with gmp *)
|
(* FIXME: to avoid linking with gmp *)
|
||||||
module No_IO = struct
|
module No_IO = struct
|
||||||
type ic = unit
|
type ic = unit
|
||||||
@ -32,6 +34,8 @@ let () =
|
|||||||
(* FIXME: inotify need some unknown massaging. *)
|
(* FIXME: inotify need some unknown massaging. *)
|
||||||
(* Irmin_watcher.hook *)
|
(* Irmin_watcher.hook *)
|
||||||
|
|
||||||
|
module C = Mirage_channel_lwt.Make(IO)
|
||||||
|
|
||||||
module Query = struct
|
module Query = struct
|
||||||
|
|
||||||
(* FIXME: this should probably be replaced by protobuf *)
|
(* FIXME: this should probably be replaced by protobuf *)
|
||||||
@ -109,25 +113,37 @@ module Query = struct
|
|||||||
Cstruct.blit_from_bytes msg.payload 0 buf (sizeof_msg+path) payload;
|
Cstruct.blit_from_bytes msg.payload 0 buf (sizeof_msg+path) payload;
|
||||||
buf
|
buf
|
||||||
|
|
||||||
|
let err e = Lwt.return (Error (`Msg (Fmt.to_to_string C.pp_error e)))
|
||||||
|
let err_eof = Lwt.return (Error (`Msg "EOF"))
|
||||||
|
|
||||||
let read fd =
|
let read fd =
|
||||||
IO.read_n fd 4 >>= fun buf ->
|
let fd = C.create fd in
|
||||||
Log.debug (fun l -> l "Message.read len=%S" buf);
|
C.read_exactly fd ~len:4 >>= function
|
||||||
let len =
|
| Ok `Eof -> err_eof
|
||||||
Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0
|
| Error e -> err e
|
||||||
|> Int32.to_int
|
| Ok (`Data buf) ->
|
||||||
in
|
let buf = Cstruct.concat buf in
|
||||||
IO.read_n fd len >|= fun buf ->
|
Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf);
|
||||||
of_cstruct (Cstruct.of_string buf)
|
let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in
|
||||||
|
C.read_exactly fd ~len >>= function
|
||||||
|
| Ok `Eof -> err_eof
|
||||||
|
| Error e -> err e
|
||||||
|
| Ok (`Data buf) ->
|
||||||
|
let buf = Cstruct.concat buf in
|
||||||
|
Lwt.return (of_cstruct buf)
|
||||||
|
|
||||||
let write fd msg =
|
let write fd msg =
|
||||||
let buf = to_cstruct msg |> Cstruct.to_string in
|
let buf = to_cstruct msg in
|
||||||
let len =
|
let len =
|
||||||
let len = Cstruct.create 4 in
|
let len = Cstruct.create 4 in
|
||||||
Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ String.length buf);
|
Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf);
|
||||||
Cstruct.to_string len
|
len
|
||||||
in
|
in
|
||||||
IO.write fd len >>= fun () ->
|
IO.write fd len >>= function
|
||||||
IO.write fd buf
|
| Error e -> failf "Query.write(len) %a" IO.pp_write_error e
|
||||||
|
| Ok () -> IO.write fd buf >>= function
|
||||||
|
| Ok () -> Lwt.return_unit
|
||||||
|
| Error e -> failf "Query.write(buf) %a" IO.pp_write_error e
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -191,25 +207,37 @@ module Reply = struct
|
|||||||
Cstruct.blit_from_bytes msg.payload 0 buf sizeof_msg payload;
|
Cstruct.blit_from_bytes msg.payload 0 buf sizeof_msg payload;
|
||||||
buf
|
buf
|
||||||
|
|
||||||
|
let err e = Lwt.return (Result.Error (`Msg (Fmt.to_to_string C.pp_error e)))
|
||||||
|
let err_eof = Lwt.return (Result.Error (`Msg "EOF"))
|
||||||
|
|
||||||
let read fd =
|
let read fd =
|
||||||
IO.read_n fd 4 >>= fun buf ->
|
let fd = C.create fd in
|
||||||
Log.debug (fun l -> l "Message.read len=%S" buf);
|
C.read_exactly fd ~len:4 >>= function
|
||||||
let len =
|
| Ok `Eof -> err_eof
|
||||||
Cstruct.LE.get_uint32 (Cstruct.of_string buf) 0
|
| Error e -> err e
|
||||||
|> Int32.to_int
|
| Ok (`Data buf) ->
|
||||||
in
|
let buf = Cstruct.concat buf in
|
||||||
IO.read_n fd len >|= fun buf ->
|
Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf);
|
||||||
of_cstruct (Cstruct.of_string buf)
|
let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in
|
||||||
|
C.read_exactly fd ~len >>= function
|
||||||
|
| Ok `Eof -> err_eof
|
||||||
|
| Error e -> err e
|
||||||
|
| Ok (`Data buf) ->
|
||||||
|
let buf = Cstruct.concat buf in
|
||||||
|
Lwt.return (of_cstruct buf)
|
||||||
|
|
||||||
let write fd msg =
|
let write fd msg =
|
||||||
let buf = to_cstruct msg |> Cstruct.to_string in
|
let buf = to_cstruct msg in
|
||||||
let len =
|
let len =
|
||||||
let len = Cstruct.create 4 in
|
let len = Cstruct.create 4 in
|
||||||
Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ String.length buf);
|
Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf);
|
||||||
Cstruct.to_string len
|
len
|
||||||
in
|
in
|
||||||
IO.write fd len >>= fun () ->
|
IO.write fd len >>= function
|
||||||
IO.write fd buf
|
| Error e -> failf "Reply.write(len) %a" IO.pp_write_error e
|
||||||
|
| Ok () -> IO.write fd buf >>= function
|
||||||
|
| Ok () -> Lwt.return_unit
|
||||||
|
| Error e -> failf "Reply.write(buf) %a" IO.pp_write_error e
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -231,7 +259,7 @@ module Client = struct
|
|||||||
module Cache = Hashtbl.Make(K)
|
module Cache = Hashtbl.Make(K)
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
fd : Lwt_unix.file_descr;
|
fd : IO.t;
|
||||||
replies: Reply.t Cache.t;
|
replies: Reply.t Cache.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -327,16 +355,8 @@ module Server = struct
|
|||||||
| Some v -> ok q v
|
| Some v -> ok q v
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
let int_of_fd (t:Lwt_unix.file_descr) =
|
|
||||||
(Obj.magic (Lwt_unix.unix_file_descr t): int)
|
|
||||||
|
|
||||||
let listen ~routes db fd =
|
let listen ~routes db fd =
|
||||||
Lwt_unix.blocking fd >>= fun blocking ->
|
Log.debug (fun l -> l "Serving the control state over %a" IO.pp fd);
|
||||||
Log.debug (fun l ->
|
|
||||||
l "Serving the control state over fd:%d (blocking=%b)"
|
|
||||||
(int_of_fd fd) blocking
|
|
||||||
);
|
|
||||||
let queries = Queue.create () in
|
let queries = Queue.create () in
|
||||||
let cond = Lwt_condition.create () in
|
let cond = Lwt_condition.create () in
|
||||||
let rec listen () =
|
let rec listen () =
|
||||||
|
@ -28,10 +28,10 @@ module Query: sig
|
|||||||
val to_cstruct: t -> Cstruct.t
|
val to_cstruct: t -> Cstruct.t
|
||||||
(** [to_cstruct t] is the serialization of [t]. *)
|
(** [to_cstruct t] is the serialization of [t]. *)
|
||||||
|
|
||||||
val write: Lwt_unix.file_descr -> t -> unit Lwt.t
|
val write: IO.flow -> t -> unit Lwt.t
|
||||||
(** [write fd t] writes a query message. *)
|
(** [write fd t] writes a query message. *)
|
||||||
|
|
||||||
val read: Lwt_unix.file_descr -> (t, [`Msg of string]) result Lwt.t
|
val read: IO.flow -> (t, [`Msg of string]) result Lwt.t
|
||||||
(** [read fd] reads a query message. *)
|
(** [read fd] reads a query message. *)
|
||||||
|
|
||||||
end
|
end
|
||||||
@ -60,10 +60,10 @@ module Reply: sig
|
|||||||
val to_cstruct: t -> Cstruct.t
|
val to_cstruct: t -> Cstruct.t
|
||||||
(** [to_cstruct t] is the serialization of [t]. *)
|
(** [to_cstruct t] is the serialization of [t]. *)
|
||||||
|
|
||||||
val write: Lwt_unix.file_descr -> t -> unit Lwt.t
|
val write: IO.flow -> t -> unit Lwt.t
|
||||||
(** [write fd t] writes a reply message. *)
|
(** [write fd t] writes a reply message. *)
|
||||||
|
|
||||||
val read: Lwt_unix.file_descr -> (t, [`Msg of string]) result Lwt.t
|
val read: IO.flow -> (t, [`Msg of string]) result Lwt.t
|
||||||
(** [read fd] reads a reply message. *)
|
(** [read fd] reads a reply message. *)
|
||||||
|
|
||||||
end
|
end
|
||||||
@ -80,7 +80,7 @@ module Client: sig
|
|||||||
type t
|
type t
|
||||||
(** The type for client state. *)
|
(** The type for client state. *)
|
||||||
|
|
||||||
val v: Lwt_unix.file_descr -> t
|
val v: IO.t -> t
|
||||||
(** [v fd] is the client state using [fd] to send requests to the
|
(** [v fd] is the client state using [fd] to send requests to the
|
||||||
server. A client state also stores some state for all the
|
server. A client state also stores some state for all the
|
||||||
incomplete client queries. *)
|
incomplete client queries. *)
|
||||||
@ -108,7 +108,7 @@ val v: string -> KV.t Lwt.t
|
|||||||
|
|
||||||
module Server: sig
|
module Server: sig
|
||||||
|
|
||||||
val listen: routes:string list -> KV.t -> Lwt_unix.file_descr -> unit Lwt.t
|
val listen: routes:string list -> KV.t -> IO.t -> unit Lwt.t
|
||||||
(** [listen ~routes kv fd] is the thread exposing the KV store [kv],
|
(** [listen ~routes kv fd] is the thread exposing the KV store [kv],
|
||||||
holding control plane state, running inside the privileged
|
holding control plane state, running inside the privileged
|
||||||
container. [routes] are the routes exposed by the server to the
|
container. [routes] are the routes exposed by the server to the
|
||||||
|
@ -5,6 +5,127 @@ module Log = (val Logs.src_log src : Logs.LOG)
|
|||||||
|
|
||||||
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
|
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
|
||||||
|
|
||||||
|
let pp_fd ppf (t:Lwt_unix.file_descr) =
|
||||||
|
Fmt.int ppf (Obj.magic (Lwt_unix.unix_file_descr t): int)
|
||||||
|
|
||||||
|
let rec really_write fd buf off len =
|
||||||
|
match len with
|
||||||
|
| 0 -> Lwt.return_unit
|
||||||
|
| len ->
|
||||||
|
Log.debug (fun l -> l "really_write %a off=%d len=%d" pp_fd fd off len);
|
||||||
|
Lwt_unix.write fd buf off len >>= fun n ->
|
||||||
|
if n = 0 then Lwt.fail_with "write 0"
|
||||||
|
else really_write fd buf (off+n) (len-n)
|
||||||
|
|
||||||
|
let write_all fd buf = really_write fd buf 0 (String.length buf)
|
||||||
|
|
||||||
|
let read_all fd =
|
||||||
|
Log.debug (fun l -> l "read_all %a" pp_fd fd);
|
||||||
|
let len = 16 * 1024 in
|
||||||
|
let buf = Bytes.create len in
|
||||||
|
let rec loop acc =
|
||||||
|
Lwt_unix.read fd buf 0 len >>= fun n ->
|
||||||
|
if n = 0 then failf "read %a: 0" pp_fd fd
|
||||||
|
else
|
||||||
|
let acc = String.sub buf 0 n :: acc in
|
||||||
|
if n <= len then Lwt.return (List.rev acc)
|
||||||
|
else loop acc
|
||||||
|
in
|
||||||
|
loop [] >|= fun bufs ->
|
||||||
|
String.concat "" bufs
|
||||||
|
|
||||||
|
module Flow = struct
|
||||||
|
|
||||||
|
(* build a flow from Lwt_unix.file_descr *)
|
||||||
|
module Fd: Mirage_flow_lwt.CONCRETE with type flow = Lwt_unix.file_descr = struct
|
||||||
|
type 'a io = 'a Lwt.t
|
||||||
|
type buffer = Cstruct.t
|
||||||
|
type error = [`Msg of string]
|
||||||
|
type write_error = [ Mirage_flow.write_error | error ]
|
||||||
|
|
||||||
|
let pp_error ppf (`Msg s) = Fmt.string ppf s
|
||||||
|
|
||||||
|
let pp_write_error ppf = function
|
||||||
|
| #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e
|
||||||
|
| #error as e -> pp_error ppf e
|
||||||
|
|
||||||
|
type flow = Lwt_unix.file_descr
|
||||||
|
|
||||||
|
let err e = Lwt.return (Error (`Msg (Printexc.to_string e)))
|
||||||
|
|
||||||
|
let read t =
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
read_all t >|= fun buf -> Ok (`Data (Cstruct.of_string buf))
|
||||||
|
) (function Failure _ -> Lwt.return (Ok `Eof) | e -> err e)
|
||||||
|
|
||||||
|
let write t b =
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
write_all t (Cstruct.to_string b) >|= fun () -> Ok ()
|
||||||
|
) (fun e -> err e)
|
||||||
|
|
||||||
|
let close t = Lwt_unix.close t
|
||||||
|
|
||||||
|
let writev t bs =
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
Lwt_list.iter_s (fun b -> write_all t (Cstruct.to_string b)) bs
|
||||||
|
>|= fun () -> Ok ()
|
||||||
|
) (fun e -> err e)
|
||||||
|
end
|
||||||
|
|
||||||
|
(* build a flow from rawlink *)
|
||||||
|
module Rawlink: Mirage_flow_lwt.CONCRETE with type flow = Lwt_rawlink.t = struct
|
||||||
|
type 'a io = 'a Lwt.t
|
||||||
|
type buffer = Cstruct.t
|
||||||
|
type error = [`Msg of string]
|
||||||
|
type write_error = [ Mirage_flow.write_error | error ]
|
||||||
|
|
||||||
|
let pp_error ppf (`Msg s) = Fmt.string ppf s
|
||||||
|
|
||||||
|
let pp_write_error ppf = function
|
||||||
|
| #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error ppf e
|
||||||
|
| #error as e -> pp_error ppf e
|
||||||
|
|
||||||
|
type flow = Lwt_rawlink.t
|
||||||
|
|
||||||
|
let err e = Lwt.return (Error (`Msg (Printexc.to_string e)))
|
||||||
|
|
||||||
|
let read t =
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
Lwt_rawlink.read_packet t >|= fun buf -> Ok (`Data buf)
|
||||||
|
) (function Failure _ -> Lwt.return (Ok `Eof) | e -> err e)
|
||||||
|
|
||||||
|
let write t b =
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
Lwt_rawlink.send_packet t b >|= fun () -> Ok ()
|
||||||
|
) (fun e -> err e)
|
||||||
|
|
||||||
|
let close t = Lwt_rawlink.close_link t
|
||||||
|
|
||||||
|
let writev t bs =
|
||||||
|
Lwt.catch (fun () ->
|
||||||
|
Lwt_list.iter_s (Lwt_rawlink.send_packet t) bs >|= fun () -> Ok ()
|
||||||
|
) (fun e -> err e)
|
||||||
|
end
|
||||||
|
|
||||||
|
let int_of_fd t =
|
||||||
|
(Obj.magic (Lwt_unix.unix_file_descr t): int)
|
||||||
|
|
||||||
|
let fd ?name t =
|
||||||
|
IO.create (module Fd) t (match name with
|
||||||
|
| None -> string_of_int (int_of_fd t)
|
||||||
|
| Some n -> n)
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
let file_descr ?name t = Flow.fd ?name t
|
||||||
|
|
||||||
|
let rawlink ?filter ethif =
|
||||||
|
Log.debug (fun l -> l "bringing up %s" ethif);
|
||||||
|
(try Tuntap.set_up_and_running ethif
|
||||||
|
with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e));
|
||||||
|
let t = Lwt_rawlink.open_link ?filter ethif in
|
||||||
|
IO.create (module Flow.Rawlink) t ethif
|
||||||
|
|
||||||
module Fd = struct
|
module Fd = struct
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
@ -12,11 +133,14 @@ module Fd = struct
|
|||||||
fd : Lwt_unix.file_descr;
|
fd : Lwt_unix.file_descr;
|
||||||
}
|
}
|
||||||
|
|
||||||
let fd t = t.fd
|
|
||||||
let stdout = { name = "stdout"; fd = Lwt_unix.stdout }
|
let stdout = { name = "stdout"; fd = Lwt_unix.stdout }
|
||||||
let stderr = { name = "stderr"; fd = Lwt_unix.stderr }
|
let stderr = { name = "stderr"; fd = Lwt_unix.stderr }
|
||||||
let stdin = { name = "stdin" ; fd = Lwt_unix.stdin }
|
let stdin = { name = "stdin" ; fd = Lwt_unix.stdin }
|
||||||
|
|
||||||
|
let of_int name (i:int) =
|
||||||
|
let fd = Lwt_unix.of_unix_file_descr (Obj.magic i: Unix.file_descr) in
|
||||||
|
{ name; fd }
|
||||||
|
|
||||||
let to_int t =
|
let to_int t =
|
||||||
(Obj.magic (Lwt_unix.unix_file_descr t.fd): int)
|
(Obj.magic (Lwt_unix.unix_file_descr t.fd): int)
|
||||||
|
|
||||||
@ -41,53 +165,7 @@ module Fd = struct
|
|||||||
Lwt_unix.dup2 src.fd dst.fd;
|
Lwt_unix.dup2 src.fd dst.fd;
|
||||||
close src
|
close src
|
||||||
|
|
||||||
let proxy_net ~net fd =
|
let flow t = Flow.fd t.fd ~name:(Fmt.to_to_string pp t)
|
||||||
Log.debug (fun l -> l "proxy-net eth0 <=> %a" pp fd);
|
|
||||||
let rec listen_rawlink () =
|
|
||||||
Lwt_rawlink.read_packet net >>= fun buf ->
|
|
||||||
Log.debug (fun l -> l "PROXY-NET: => %a" Cstruct.hexdump_pp buf);
|
|
||||||
Log.debug (fun l -> l "PROXY-NET: => %S" (Cstruct.to_string buf));
|
|
||||||
let rec write buf =
|
|
||||||
Lwt_cstruct.write fd.fd buf >>= function
|
|
||||||
| 0 -> Lwt.return_unit
|
|
||||||
| n -> write (Cstruct.shift buf n)
|
|
||||||
in
|
|
||||||
write buf >>= fun () ->
|
|
||||||
listen_rawlink ()
|
|
||||||
in
|
|
||||||
let listen_socket () =
|
|
||||||
let len = 16 * 1024 in
|
|
||||||
let buf = Cstruct.create len in
|
|
||||||
let rec loop () =
|
|
||||||
Lwt_cstruct.read fd.fd buf >>= fun len ->
|
|
||||||
let buf = Cstruct.sub buf 0 len in
|
|
||||||
Log.debug (fun l -> l "PROXY-NET: <= %a" Cstruct.hexdump_pp buf);
|
|
||||||
Lwt_rawlink.send_packet net buf >>= fun () ->
|
|
||||||
loop ()
|
|
||||||
in
|
|
||||||
loop ()
|
|
||||||
in
|
|
||||||
Lwt.pick [
|
|
||||||
listen_rawlink ();
|
|
||||||
listen_socket ();
|
|
||||||
]
|
|
||||||
|
|
||||||
let forward ~src ~dst =
|
|
||||||
Log.debug (fun l -> l "forward %a => %a" pp src pp dst);
|
|
||||||
let len = 16 * 1024 in
|
|
||||||
let buf = Bytes.create len in
|
|
||||||
let rec loop () =
|
|
||||||
Lwt_unix.read src.fd buf 0 len >>= fun len ->
|
|
||||||
if len = 0 then Lwt.return_unit (* EOF *)
|
|
||||||
else (
|
|
||||||
Log.debug (fun l ->
|
|
||||||
l "FORWARD[%a => %a]: %S (%d)"
|
|
||||||
pp src pp dst (Bytes.sub buf 0 len) len);
|
|
||||||
IO.really_write dst.fd buf 0 len >>= fun () ->
|
|
||||||
loop ()
|
|
||||||
)
|
|
||||||
in
|
|
||||||
loop ()
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -130,10 +208,10 @@ module Pipe = struct
|
|||||||
(* logs pipe *)
|
(* logs pipe *)
|
||||||
let stdout = pipe "stdout" in
|
let stdout = pipe "stdout" in
|
||||||
let stderr = pipe "stderr" in
|
let stderr = pipe "stderr" in
|
||||||
(* store pipe *)
|
|
||||||
let ctl = socketpair "ctl" in
|
|
||||||
(* network pipe *)
|
(* network pipe *)
|
||||||
let net = socketpair "net" in
|
let net = socketpair "net" in
|
||||||
|
(* store pipe *)
|
||||||
|
let ctl = socketpair "ctl" in
|
||||||
(* metrics pipe *)
|
(* metrics pipe *)
|
||||||
let metrics = pipe "metrics" in
|
let metrics = pipe "metrics" in
|
||||||
{ stdout; stderr; ctl; net; metrics }
|
{ stdout; stderr; ctl; net; metrics }
|
||||||
@ -141,6 +219,7 @@ module Pipe = struct
|
|||||||
end
|
end
|
||||||
|
|
||||||
let exec_calf t cmd =
|
let exec_calf t cmd =
|
||||||
|
Log.info (fun l -> l "child pid is %d" Unix.(getpid ()));
|
||||||
Fd.(redirect_to_dev_null stdin) >>= fun () ->
|
Fd.(redirect_to_dev_null stdin) >>= fun () ->
|
||||||
|
|
||||||
(* close parent fds *)
|
(* close parent fds *)
|
||||||
@ -152,26 +231,22 @@ let exec_calf t cmd =
|
|||||||
|
|
||||||
let cmds = String.concat " " cmd in
|
let cmds = String.concat " " cmd in
|
||||||
|
|
||||||
let calf_net = Pipe.(calf t.net) in
|
let calf_stdout = Fd.of_int "stdout" 1 in
|
||||||
let calf_ctl = Pipe.(calf t.ctl) in
|
let calf_stderr = Fd.of_int "stderr" 2 in
|
||||||
let calf_stdout = Pipe.(calf t.stdout) in
|
let calf_net = Fd.of_int "net" 3 in
|
||||||
let calf_stderr = Pipe.(calf t.stderr) in
|
let calf_ctl = Fd.of_int "ctl" 4 in
|
||||||
|
|
||||||
Log.info (fun l -> l "Executing %s" cmds);
|
Log.info (fun l -> l "Executing %s" cmds);
|
||||||
Log.debug (fun l -> l "net-fd=%a store-fd=%a" Fd.pp calf_net Fd.pp calf_ctl);
|
|
||||||
|
|
||||||
Fd.dup2 ~src:calf_stdout ~dst:Fd.stdout >>= fun () ->
|
(* Move all open fds at the top *)
|
||||||
Fd.dup2 ~src:calf_stderr ~dst:Fd.stderr >>= fun () ->
|
Fd.dup2 ~src:Pipe.(calf t.stdout) ~dst:calf_stdout >>= fun () ->
|
||||||
|
Fd.dup2 ~src:Pipe.(calf t.stderr) ~dst:calf_stderr >>= fun () ->
|
||||||
|
Fd.dup2 ~src:Pipe.(calf t.net) ~dst:calf_net >>= fun () ->
|
||||||
|
Fd.dup2 ~src:Pipe.(calf t.ctl) ~dst:calf_ctl >>= fun () ->
|
||||||
|
|
||||||
(* exec the calf *)
|
(* exec the calf *)
|
||||||
Unix.execve (List.hd cmd) (Array.of_list cmd) [||]
|
Unix.execve (List.hd cmd) (Array.of_list cmd) [||]
|
||||||
|
|
||||||
let rawlink ?filter ethif =
|
|
||||||
Log.debug (fun l -> l "bringing up %s" ethif);
|
|
||||||
(try Tuntap.set_up_and_running ethif
|
|
||||||
with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e));
|
|
||||||
Lwt_rawlink.open_link ?filter ethif
|
|
||||||
|
|
||||||
let check_exit_status cmd status =
|
let check_exit_status cmd status =
|
||||||
let cmds = String.concat " " cmd in
|
let cmds = String.concat " " cmd in
|
||||||
match status with
|
match status with
|
||||||
@ -191,6 +266,12 @@ let exec_priv t ~pid ~cmd ~net ~ctl ~handlers =
|
|||||||
Fd.close Pipe.(calf t.ctl) >>= fun () ->
|
Fd.close Pipe.(calf t.ctl) >>= fun () ->
|
||||||
Fd.close Pipe.(calf t.metrics) >>= fun () ->
|
Fd.close Pipe.(calf t.metrics) >>= fun () ->
|
||||||
|
|
||||||
|
|
||||||
|
let priv_net = Fd.flow Pipe.(priv t.net) in
|
||||||
|
let priv_ctl = Fd.flow Pipe.(priv t.ctl) in
|
||||||
|
let priv_stdout = Fd.flow Pipe.(priv t.stdout) in
|
||||||
|
let priv_stderr = Fd.flow Pipe.(priv t.stderr) in
|
||||||
|
|
||||||
let wait () =
|
let wait () =
|
||||||
Lwt_unix.waitpid [] pid >>= fun (_pid, w) ->
|
Lwt_unix.waitpid [] pid >>= fun (_pid, w) ->
|
||||||
Lwt_io.flush_all () >>= fun () ->
|
Lwt_io.flush_all () >>= fun () ->
|
||||||
@ -200,13 +281,13 @@ let exec_priv t ~pid ~cmd ~net ~ctl ~handlers =
|
|||||||
Lwt.pick ([
|
Lwt.pick ([
|
||||||
wait ();
|
wait ();
|
||||||
(* data *)
|
(* data *)
|
||||||
Fd.proxy_net ~net Pipe.(priv t.net);
|
IO.proxy net priv_net;
|
||||||
|
|
||||||
(* redirect the calf stdout to the shim stdout *)
|
(* redirect the calf stdout to the shim stdout *)
|
||||||
Fd.forward ~src:Pipe.(priv t.stdout) ~dst:Fd.stdout;
|
IO.forward ~src:priv_stdout ~dst:Fd.(flow stdout);
|
||||||
Fd.forward ~src:Pipe.(priv t.stderr) ~dst:Fd.stderr;
|
IO.forward ~src:priv_stderr ~dst:Fd.(flow stderr);
|
||||||
(* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *)
|
(* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *)
|
||||||
ctl ();
|
ctl priv_ctl;
|
||||||
handlers ();
|
handlers ();
|
||||||
])
|
])
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
data, e.g. the IP address once a DHCP lease is obtained.}
|
data, e.g. the IP address once a DHCP lease is obtained.}
|
||||||
}*)
|
}*)
|
||||||
|
|
||||||
|
|
||||||
module Fd: sig
|
module Fd: sig
|
||||||
|
|
||||||
type t
|
type t
|
||||||
@ -22,12 +23,6 @@ module Fd: sig
|
|||||||
val pp: t Fmt.t
|
val pp: t Fmt.t
|
||||||
(** [pp_fd] pretty prints a file descriptor. *)
|
(** [pp_fd] pretty prints a file descriptor. *)
|
||||||
|
|
||||||
val fd: t -> Lwt_unix.file_descr
|
|
||||||
(** [fd t] is [t]'s underlying unix file descriptor. *)
|
|
||||||
|
|
||||||
val to_int: t -> int
|
|
||||||
(** [to_int fd] is [fd]'s number. *)
|
|
||||||
|
|
||||||
val redirect_to_dev_null: t -> unit Lwt.t
|
val redirect_to_dev_null: t -> unit Lwt.t
|
||||||
(** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *)
|
(** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *)
|
||||||
|
|
||||||
@ -37,13 +32,6 @@ module Fd: sig
|
|||||||
val dup2: src:t -> dst:t -> unit Lwt.t
|
val dup2: src:t -> dst:t -> unit Lwt.t
|
||||||
(** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *)
|
(** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *)
|
||||||
|
|
||||||
val proxy_net: net:Lwt_rawlink.t -> t -> unit Lwt.t
|
|
||||||
(** [proxy_net ~net fd] proxies the traffic between the raw net link
|
|
||||||
[net] and [fd]. *)
|
|
||||||
|
|
||||||
val forward: src:t -> dst:t -> unit Lwt.t
|
|
||||||
(** [forward ~src ~dst] forwards the flow from [src] to [dst]. *)
|
|
||||||
|
|
||||||
(** {1 Usefull File Descriptors} *)
|
(** {1 Usefull File Descriptors} *)
|
||||||
|
|
||||||
val stdin: t
|
val stdin: t
|
||||||
@ -55,8 +43,14 @@ module Fd: sig
|
|||||||
val stderr: t
|
val stderr: t
|
||||||
(** [stderr] is the standard error. *)
|
(** [stderr] is the standard error. *)
|
||||||
|
|
||||||
|
val flow: t -> IO.t
|
||||||
|
(** [flow t] is the flow representing [t]. *)
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
val file_descr: ?name:string -> Lwt_unix.file_descr -> IO.t
|
||||||
|
(** [file_descr ?name fd] is the flow for the file-descripor [fd]. *)
|
||||||
|
|
||||||
module Pipe: sig
|
module Pipe: sig
|
||||||
|
|
||||||
type t
|
type t
|
||||||
@ -102,18 +96,21 @@ module Pipe: sig
|
|||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
val rawlink: ?filter:string -> string -> Lwt_rawlink.t
|
val rawlink: ?filter:string -> string -> IO.t
|
||||||
(** [rawlink ?filter i] is the net raw link to the interface [i] using
|
(** [rawlink ?filter x] is the flow using the network interface
|
||||||
the (optional) BPF filter [filter]. *)
|
[x]. The packets can be filtered using the BPF filter
|
||||||
|
[filter]. See the documentation of
|
||||||
|
{{:https://github.com/haesbaert/rawlink}rawlink} for more details
|
||||||
|
on how to build that filter. *)
|
||||||
|
|
||||||
|
(* FIXME(samoht): not very happy with that signatue *)
|
||||||
val run: Pipe.monitor ->
|
val run: Pipe.monitor ->
|
||||||
net:Lwt_rawlink.t ->
|
net:IO.t ->
|
||||||
ctl:(unit -> unit Lwt.t) ->
|
ctl:(IO.t -> unit Lwt.t) ->
|
||||||
handlers:(unit -> unit Lwt.t) ->
|
handlers:(unit -> unit Lwt.t) ->
|
||||||
string list -> unit Lwt.t
|
string list -> unit Lwt.t
|
||||||
(** [run m ~net ~ctl ~handlers cmd] runs [cmd] in a unprivileged calf
|
(** [run m ~net ~ctl ~handlers cmd] runs [cmd] in a unprivileged calf
|
||||||
process. [ctl] is the control thread connected to the {Pipe.ctl}
|
process. [net] is the network interface flow. [ctl] is the control
|
||||||
pipe. [net] is the net raw link which will be connected to the
|
thread connected to the {Pipe.ctl} pipe. [handlers] are the system
|
||||||
calf via the {!Pipe.net} socket pair. [handlers] are the system
|
|
||||||
handler thread which will react to control data to perform
|
handler thread which will react to control data to perform
|
||||||
privileged system actions. *)
|
privileged system actions. *)
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
((name sdk)
|
((name sdk)
|
||||||
(libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty
|
(libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty
|
||||||
decompress irmin irmin-git lwt.unix rawlink tuntap dispatch
|
decompress irmin irmin-git lwt.unix rawlink tuntap dispatch
|
||||||
irmin-watcher inotify astring rresult))
|
irmin-watcher inotify astring rresult mirage-flow-lwt
|
||||||
|
mirage-channel-lwt io-page.unix))
|
||||||
(preprocess (per_file ((pps (cstruct.ppx)) (ctl))))
|
(preprocess (per_file ((pps (cstruct.ppx)) (ctl))))
|
||||||
))
|
))
|
||||||
|
@ -8,44 +8,50 @@ let random_string n =
|
|||||||
(* workaround https://github.com/mirage/alcotest/issues/88 *)
|
(* workaround https://github.com/mirage/alcotest/issues/88 *)
|
||||||
exception Check_error of string
|
exception Check_error of string
|
||||||
|
|
||||||
let check_raises msg exn f =
|
let check_raises msg f =
|
||||||
Lwt.catch (fun () ->
|
Lwt.catch (fun () ->
|
||||||
f () >>= fun () ->
|
f () >>= fun () ->
|
||||||
Lwt.fail (Check_error msg)
|
Lwt.fail (Check_error msg)
|
||||||
) (function
|
) (function
|
||||||
| Check_error e -> Alcotest.fail e
|
| Check_error e -> Alcotest.fail e
|
||||||
| e ->
|
| _ -> Lwt.return_unit
|
||||||
if exn e then Lwt.return_unit
|
)
|
||||||
else Fmt.kstrf Alcotest.fail "%s raised %a" msg Fmt.exn e)
|
|
||||||
|
|
||||||
let is_unix_error = function
|
|
||||||
| Unix.Unix_error _ -> true
|
|
||||||
| _ -> false
|
|
||||||
|
|
||||||
let escape = String.Ascii.escape
|
let escape = String.Ascii.escape
|
||||||
|
|
||||||
let write fd strs =
|
let write fd strs =
|
||||||
Lwt_list.iter_s (fun str ->
|
Lwt_list.iter_s (fun str ->
|
||||||
IO.really_write fd str 0 (String.length str)
|
IO.write fd (Cstruct.of_string str) >>= function
|
||||||
|
| Ok () -> Lwt.return_unit
|
||||||
|
| Error e -> Fmt.kstrf Lwt.fail_with "write: %a" IO.pp_write_error e
|
||||||
) strs
|
) strs
|
||||||
|
|
||||||
|
let read fd =
|
||||||
|
IO.read fd >>= function
|
||||||
|
| Ok (`Data x) -> Lwt.return (Cstruct.to_string x)
|
||||||
|
| Ok `Eof -> Lwt.fail_with "read: EOF"
|
||||||
|
| Error e -> Fmt.kstrf Lwt.fail_with "read: %a" IO.pp_error e
|
||||||
|
|
||||||
|
let calf pipe = Init.(Fd.flow Pipe.(calf pipe))
|
||||||
|
let priv pipe = Init.(Fd.flow Pipe.(priv pipe))
|
||||||
|
|
||||||
let test_pipe pipe () =
|
let test_pipe pipe () =
|
||||||
let calf = Init.Fd.fd @@ Init.Pipe.(calf pipe) in
|
let calf = calf pipe in
|
||||||
let priv = Init.Fd.fd @@ Init.Pipe.(priv pipe) in
|
let priv = priv pipe in
|
||||||
let name = Init.Pipe.name pipe in
|
let name = Init.Pipe.name pipe in
|
||||||
let test strs =
|
let test strs =
|
||||||
let escape_strs = String.concat ~sep:"" @@ List.map escape strs in
|
let escape_strs = String.concat ~sep:"" @@ List.map escape strs in
|
||||||
(* pipes are unidirectional *)
|
(* pipes are unidirectional *)
|
||||||
(* calf -> priv works *)
|
(* calf -> priv works *)
|
||||||
write calf strs >>= fun () ->
|
write calf strs >>= fun () ->
|
||||||
IO.read_all priv >>= fun buf ->
|
read priv >>= fun buf ->
|
||||||
let msg = Fmt.strf "%s: calf -> priv" name in
|
let msg = Fmt.strf "%s: calf -> priv" name in
|
||||||
Alcotest.(check string) msg escape_strs (escape buf);
|
Alcotest.(check string) msg escape_strs (escape buf);
|
||||||
(* priv -> calf don't *)
|
(* priv -> calf don't *)
|
||||||
check_raises (Fmt.strf "%s: priv side is writable!" name) is_unix_error
|
check_raises (Fmt.strf "%s: priv side is writable!" name)
|
||||||
(fun () -> write priv strs) >>= fun () ->
|
(fun () -> write priv strs) >>= fun () ->
|
||||||
check_raises (Fmt.strf "%s: calf sid is readable!" name) is_unix_error
|
check_raises (Fmt.strf "%s: calf sid is readable!" name)
|
||||||
(fun () -> IO.read_all calf >|= ignore) >>= fun () ->
|
(fun () -> read calf >|= ignore) >>= fun () ->
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
in
|
in
|
||||||
test [random_string 1] >>= fun () ->
|
test [random_string 1] >>= fun () ->
|
||||||
@ -56,19 +62,19 @@ let test_pipe pipe () =
|
|||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
let test_socketpair pipe () =
|
let test_socketpair pipe () =
|
||||||
let calf = Init.Fd.fd @@ Init.Pipe.(calf pipe) in
|
let calf = calf pipe in
|
||||||
let priv = Init.Fd.fd @@ Init.Pipe.(priv pipe) in
|
let priv = priv pipe in
|
||||||
let name = Init.Pipe.name pipe in
|
let name = Init.Pipe.name pipe in
|
||||||
let test strs =
|
let test strs =
|
||||||
let escape_strs = String.concat ~sep:"" @@ List.map escape strs in
|
let escape_strs = String.concat ~sep:"" @@ List.map escape strs in
|
||||||
(* socket pairs are bi-directional *)
|
(* socket pairs are bi-directional *)
|
||||||
(* calf -> priv works *)
|
(* calf -> priv works *)
|
||||||
write calf strs >>= fun () ->
|
write calf strs >>= fun () ->
|
||||||
IO.read_all priv >>= fun buf ->
|
read priv >>= fun buf ->
|
||||||
Alcotest.(check string) (name ^ " calf -> priv") escape_strs (escape buf);
|
Alcotest.(check string) (name ^ " calf -> priv") escape_strs (escape buf);
|
||||||
(* priv -> cal works *)
|
(* priv -> cal works *)
|
||||||
write priv strs >>= fun () ->
|
write priv strs >>= fun () ->
|
||||||
IO.read_all calf >>= fun buf ->
|
read calf >>= fun buf ->
|
||||||
Alcotest.(check string) (name ^ " priv -> calf") escape_strs (escape buf);
|
Alcotest.(check string) (name ^ " priv -> calf") escape_strs (escape buf);
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
in
|
in
|
||||||
@ -118,8 +124,8 @@ let test_serialization to_cstruct of_cstruct message messages =
|
|||||||
List.iter test messages
|
List.iter test messages
|
||||||
|
|
||||||
let test_send t write read message messages =
|
let test_send t write read message messages =
|
||||||
let calf = Init.Fd.fd @@ Init.Pipe.(calf @@ ctl t) in
|
let calf = Init.(Fd.flow Pipe.(calf @@ ctl t)) in
|
||||||
let priv = Init.Fd.fd @@ Init.Pipe.(priv @@ ctl t) in
|
let priv = Init.(Fd.flow Pipe.(priv @@ ctl t)) in
|
||||||
let test m =
|
let test m =
|
||||||
write calf m >>= fun () ->
|
write calf m >>= fun () ->
|
||||||
read priv >|= function
|
read priv >|= function
|
||||||
@ -192,8 +198,8 @@ let delete_should_work t k =
|
|||||||
| Error (`Msg e) -> failf "write(%s) -> error: %s" k e
|
| Error (`Msg e) -> failf "write(%s) -> error: %s" k e
|
||||||
|
|
||||||
let test_ctl t () =
|
let test_ctl t () =
|
||||||
let calf = Init.Fd.fd @@ Init.Pipe.(calf @@ ctl t) in
|
let calf = Init.(Fd.flow Init.Pipe.(calf @@ ctl t)) in
|
||||||
let priv = Init.Fd.fd @@ Init.Pipe.(priv @@ ctl t) in
|
let priv = Init.(Fd.flow Init.Pipe.(priv @@ ctl t)) in
|
||||||
let k1 = "/foo/bar" in
|
let k1 = "/foo/bar" in
|
||||||
let k2 = "a" in
|
let k2 = "a" in
|
||||||
let k3 = "b/c" in
|
let k3 = "b/c" in
|
||||||
|
Loading…
Reference in New Issue
Block a user