mirror of
https://github.com/linuxkit/linuxkit.git
synced 2025-07-22 10:31:35 +00:00
commit
fb0803087c
@ -28,7 +28,7 @@ services:
|
|||||||
oomScoreAdj: -800
|
oomScoreAdj: -800
|
||||||
readonly: true
|
readonly: true
|
||||||
- name: dhcp-client
|
- name: dhcp-client
|
||||||
image: mobylinux/dhcp-client:882ad65d1ef89a9a307b019c61f5f69301f59214
|
image: mobylinux/dhcp-client:a7a6b49b0ff51ffa2f44ac848cd649e29f946e0c
|
||||||
net: host
|
net: host
|
||||||
capabilities:
|
capabilities:
|
||||||
- CAP_NET_ADMIN # to bring eth0 up
|
- CAP_NET_ADMIN # to bring eth0 up
|
||||||
|
@ -1,20 +1,22 @@
|
|||||||
FROM ocaml/opam:alpine-3.5_ocaml-4.04.0
|
FROM ocaml/opam:alpine-3.5_ocaml-4.04.0
|
||||||
RUN cd /home/opam/opam-repository && git pull && opam update -u
|
RUN cd /home/opam/opam-repository && git pull && opam update -u
|
||||||
|
|
||||||
# to be able to use cstruct.ppx + jbuilder
|
## pins for priv
|
||||||
RUN opam pin add cstruct 2.4.0 -n
|
|
||||||
# to bring eth0 up
|
# to bring eth0 up
|
||||||
RUN opam pin add tuntap 1.0.0 -n
|
RUN opam pin add tuntap 1.0.0 -n
|
||||||
RUN opam pin add mirage-net-fd --dev -n
|
RUN opam pin add mirage-net-fd 0.2.0 -n
|
||||||
|
|
||||||
RUN opam depext -iy mirage-net-unix logs-syslog irmin-unix cohttp decompress
|
## pins for calf
|
||||||
RUN opam depext -iy rawlink tuntap jbuilder irmin-watcher inotify rresult
|
|
||||||
|
|
||||||
# TMP: to compile the calf
|
RUN opam pin add charrua-client https://github.com/yomimono/charrua-client.git#state-halfway -n
|
||||||
RUN opam pin add -n charrua-client https://github.com/yomimono/charrua-client.git#state-halfway
|
|
||||||
RUN opam depext -uiy ocamlfind topkg-care ocamlbuild lwt mirage-types-lwt mirage
|
## depdendencies
|
||||||
RUN opam depext -uiy charrua-client cohttp conduit mirage-unix
|
|
||||||
RUN opam depext -uiy mirage-net-fd ptime mirage-logs
|
RUN opam depext -iy \
|
||||||
|
irmin-unix cohttp decompress rawlink tuntap jbuilder irmin-watcher inotify \
|
||||||
|
rresult lwt capnp charrua-client mirage-net-fd ptime bos \
|
||||||
|
mirage-flow-lwt mirage-channel-lwt mirage-types-lwt
|
||||||
|
|
||||||
RUN sudo mkdir -p /src
|
RUN sudo mkdir -p /src
|
||||||
COPY ./sdk /src/sdk
|
COPY ./sdk /src/sdk
|
||||||
@ -28,6 +30,7 @@ RUN opam list
|
|||||||
|
|
||||||
RUN opam config exec -- jbuilder build dhcp-client/main.exe
|
RUN opam config exec -- jbuilder build dhcp-client/main.exe
|
||||||
RUN sudo cp /src/_build/default/dhcp-client/main.exe /dhcp-client
|
RUN sudo cp /src/_build/default/dhcp-client/main.exe /dhcp-client
|
||||||
|
RUN apk add capnp
|
||||||
|
|
||||||
RUN opam config exec -- jbuilder build dhcp-client/calf/unikernel.exe
|
RUN opam config exec -- jbuilder build dhcp-client/calf/unikernel.exe
|
||||||
RUN sudo mkdir -p /calf
|
RUN sudo mkdir -p /calf
|
||||||
|
@ -1,20 +1,22 @@
|
|||||||
FROM ocaml/opam:alpine-3.5_ocaml-4.04.0
|
FROM ocaml/opam:alpine-3.5_ocaml-4.04.0
|
||||||
RUN cd /home/opam/opam-repository && git pull && opam update -u
|
RUN cd /home/opam/opam-repository && git pull && opam update -u
|
||||||
|
|
||||||
# to be able to use cstruct.ppx + jbuilder
|
## pins for priv
|
||||||
RUN opam pin add cstruct 2.4.0 -n
|
|
||||||
# to bring eth0 up
|
# to bring eth0 up
|
||||||
RUN opam pin add tuntap 1.0.0 -n
|
RUN opam pin add tuntap 1.0.0 -n
|
||||||
RUN opam pin add mirage-net-fd --dev -n
|
RUN opam pin add mirage-net-fd 0.2.0 -n
|
||||||
|
|
||||||
RUN opam depext -iy mirage-net-unix logs-syslog irmin-unix cohttp decompress
|
## pins for calf
|
||||||
RUN opam depext -iy rawlink tuntap jbuilder irmin-watcher inotify rresult
|
|
||||||
|
|
||||||
# TMP: to compile the calf
|
RUN opam pin add charrua-client https://github.com/yomimono/charrua-client.git#state-halfway -n
|
||||||
RUN opam pin add -n charrua-client https://github.com/yomimono/charrua-client.git#state-halfway
|
|
||||||
RUN opam depext -uiy ocamlfind topkg-care ocamlbuild lwt mirage-types-lwt mirage
|
## depdendencies
|
||||||
RUN opam depext -uiy charrua-client cohttp conduit mirage-unix
|
|
||||||
RUN opam depext -uiy mirage-net-fd ptime mirage-logs
|
RUN opam depext -iy \
|
||||||
|
irmin-unix cohttp decompress rawlink tuntap jbuilder irmin-watcher inotify \
|
||||||
|
rresult lwt capnp charrua-client mirage-net-fd ptime bos \
|
||||||
|
mirage-flow-lwt mirage-channel-lwt mirage-types-lwt
|
||||||
|
|
||||||
RUN sudo mkdir -p /src /bin
|
RUN sudo mkdir -p /src /bin
|
||||||
COPY . /src
|
COPY . /src
|
||||||
|
@ -15,7 +15,7 @@ default: push
|
|||||||
@
|
@
|
||||||
|
|
||||||
.build: Dockerfile.build $(FILES)
|
.build: Dockerfile.build $(FILES)
|
||||||
docker build -t $(IMAGE):build -f Dockerfile.build -q . > .build || \
|
docker build $(NO_CACHE) -t $(IMAGE):build -f Dockerfile.build -q . > .build || \
|
||||||
(rm -f $@ && exit 1)
|
(rm -f $@ && exit 1)
|
||||||
|
|
||||||
.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) obj/config.json
|
.pkg: Dockerfile.pkg $(OBJS) $(CALF_OBJS) obj/config.json
|
||||||
@ -23,7 +23,7 @@ default: push
|
|||||||
(rm -f $@ && exit 1)
|
(rm -f $@ && exit 1)
|
||||||
|
|
||||||
.dev: Dockerfile.dev init-dev.sh
|
.dev: Dockerfile.dev init-dev.sh
|
||||||
docker build -t $(IMAGE):dev -f Dockerfile.dev -q . > .dev || \
|
docker build $(NO_CACHE) -t $(IMAGE):dev -f Dockerfile.dev -q . > .dev || \
|
||||||
(rm -f $@ && exit 1)
|
(rm -f $@ && exit 1)
|
||||||
|
|
||||||
enter-pkg: .pkg
|
enter-pkg: .pkg
|
||||||
|
@ -175,18 +175,21 @@ let setup_log =
|
|||||||
|
|
||||||
module Dhcp_client = Dhcp_client_mirage.Make(Time)(Net)
|
module Dhcp_client = Dhcp_client_mirage.Make(Time)(Net)
|
||||||
|
|
||||||
|
let pp_path = Fmt.(list ~sep:(unit "/") string)
|
||||||
|
|
||||||
let set_ip ctl k ip =
|
let set_ip ctl k ip =
|
||||||
let str = Ipaddr.V4.to_string ip ^ "\n" in
|
let str = Ipaddr.V4.to_string ip ^ "\n" in
|
||||||
Sdk.Ctl.Client.write ctl k str >>= function
|
Sdk.Ctl.Client.write ctl k str >>= function
|
||||||
| Ok () -> Lwt.return_unit
|
| Ok () -> Lwt.return_unit
|
||||||
| Error e -> failf "error while writing %s: %a" k Sdk.Ctl.Client.pp_error e
|
| Error e ->
|
||||||
|
failf "error while writing %a: %a" pp_path k Sdk.Ctl.Client.pp_error e
|
||||||
|
|
||||||
let set_ip_opt ctl k = function
|
let set_ip_opt ctl k = function
|
||||||
| None -> Lwt.return_unit
|
| None -> Lwt.return_unit
|
||||||
| Some ip -> set_ip ctl k ip
|
| Some ip -> set_ip ctl k ip
|
||||||
|
|
||||||
let get_mac ctl =
|
let get_mac ctl =
|
||||||
Sdk.Ctl.Client.read ctl "/mac" >>= function
|
Sdk.Ctl.Client.read ctl ["mac"] >>= function
|
||||||
| Ok None -> Lwt.return None
|
| Ok None -> Lwt.return None
|
||||||
| Ok Some s -> Lwt.return @@ Macaddr.of_string (String.trim s)
|
| Ok Some s -> Lwt.return @@ Macaddr.of_string (String.trim s)
|
||||||
| Error e -> failf "get_mac: %a" Sdk.Ctl.Client.pp_error e
|
| Error e -> failf "get_mac: %a" Sdk.Ctl.Client.pp_error e
|
||||||
@ -208,8 +211,8 @@ let start () dhcp_codes net ctl =
|
|||||||
Lwt_stream.last_new stream >>= fun result ->
|
Lwt_stream.last_new stream >>= fun result ->
|
||||||
let result = of_ipv4_config result in
|
let result = of_ipv4_config result in
|
||||||
Log.info (fun l -> l "found lease: %a" pp result);
|
Log.info (fun l -> l "found lease: %a" pp result);
|
||||||
set_ip ctl "/ip" result.address >>= fun () ->
|
set_ip ctl ["ip"] result.address >>= fun () ->
|
||||||
set_ip_opt ctl "/gateway" result.gateway
|
set_ip_opt ctl ["gateway"] result.gateway
|
||||||
|
|
||||||
(* FIXME: Main end *)
|
(* FIXME: Main end *)
|
||||||
|
|
||||||
|
@ -84,9 +84,9 @@ let run () cmd ethif path =
|
|||||||
in
|
in
|
||||||
Lwt_main.run (
|
Lwt_main.run (
|
||||||
let routes = [
|
let routes = [
|
||||||
"/ip" , [`Write];
|
["ip"] , [`Write];
|
||||||
"/mac" , [`Read ];
|
["mac"] , [`Read ];
|
||||||
"/gateway", [`Write];
|
["gateway"], [`Write];
|
||||||
] in
|
] in
|
||||||
Ctl.v path >>= fun db ->
|
Ctl.v path >>= fun db ->
|
||||||
let ctl fd = Ctl.Server.listen ~routes db fd in
|
let ctl fd = Ctl.Server.listen ~routes db fd in
|
||||||
|
@ -1,11 +1,8 @@
|
|||||||
open Lwt.Infix
|
open Lwt.Infix
|
||||||
open Astring
|
|
||||||
|
|
||||||
let src = Logs.Src.create "init" ~doc:"Init steps"
|
let src = Logs.Src.create "init" ~doc:"Init steps"
|
||||||
module Log = (val Logs.src_log src : Logs.LOG)
|
module Log = (val Logs.src_log src : Logs.LOG)
|
||||||
|
|
||||||
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
|
|
||||||
|
|
||||||
(* FIXME: to avoid linking with gmp *)
|
(* FIXME: to avoid linking with gmp *)
|
||||||
module No_IO = struct
|
module No_IO = struct
|
||||||
type ic = unit
|
type ic = unit
|
||||||
@ -36,214 +33,199 @@ let () =
|
|||||||
|
|
||||||
module C = Mirage_channel_lwt.Make(IO)
|
module C = Mirage_channel_lwt.Make(IO)
|
||||||
|
|
||||||
module Query = struct
|
module P = Proto.Make(Capnp.BytesMessage)
|
||||||
|
|
||||||
(* FIXME: this should probably be replaced by protobuf *)
|
exception Undefined_field of int
|
||||||
|
|
||||||
[%%cenum
|
module Endpoint = struct
|
||||||
type operation =
|
|
||||||
| Write
|
let compression = `None
|
||||||
| Read
|
|
||||||
| Delete
|
|
||||||
[@@uint8_t]
|
|
||||||
]
|
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
version : int32;
|
output : IO.t;
|
||||||
id : int32;
|
input : C.t; (* reads are buffered *)
|
||||||
operation: operation;
|
decoder: Capnp.Codecs.FramedStream.t;
|
||||||
path : string;
|
|
||||||
payload : string;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[%%cstruct type msg = {
|
type error = [
|
||||||
version : uint32_t; (* protocol version *)
|
| `IO of IO.write_error
|
||||||
id : uint32_t; (* session identifier *)
|
| `Channel of C.error
|
||||||
operation : uint8_t; (* = type operation *)
|
| `Msg of string
|
||||||
path : uint16_t;
|
| `Undefined_field of int
|
||||||
payload : uint32_t;
|
|
||||||
} [@@little_endian]
|
|
||||||
]
|
]
|
||||||
|
|
||||||
type error = [ `Eof | `Msg of string ]
|
let pp_error ppf (e:error) = match e with
|
||||||
let pp_error ppf t = Fmt.string ppf (match t with `Eof -> "EOF" | `Msg s -> s)
|
| `IO e -> Fmt.pf ppf "IO: %a" IO.pp_write_error e
|
||||||
|
| `Channel e -> Fmt.pf ppf "channel: %a" C.pp_error e
|
||||||
|
| `Msg e -> Fmt.string ppf e
|
||||||
|
| `Undefined_field i -> Fmt.pf ppf "undefined field %d" i
|
||||||
|
|
||||||
(* to avoid warning 32 *)
|
let err_io e = Error (`IO e)
|
||||||
let _ = hexdump_msg
|
let err_channel e = Error (`Channel e)
|
||||||
let _ = string_to_operation
|
let err_msg fmt = Fmt.kstrf (fun s -> Error (`Msg s)) fmt
|
||||||
|
let err_frame = err_msg "Unsupported Cap'n'Proto frame received"
|
||||||
|
let err_undefined_field i = Error (`Undefined_field i)
|
||||||
|
|
||||||
let pp ppf t =
|
let v fd =
|
||||||
Fmt.pf ppf "%ld:%s:%S:%S"
|
let output = fd in
|
||||||
t.id (operation_to_string t.operation) t.path t.payload
|
let input = C.create fd in
|
||||||
|
let decoder = Capnp.Codecs.FramedStream.empty compression in
|
||||||
|
{ output; input; decoder }
|
||||||
|
|
||||||
(* FIXME: allocate less ... *)
|
let send t msg =
|
||||||
|
let buf = Capnp.Codecs.serialize ~compression msg in
|
||||||
|
(* FIXME: avoid copying *)
|
||||||
|
IO.write t.output (Cstruct.of_string buf) >|= function
|
||||||
|
| Error e -> err_io e
|
||||||
|
| Ok () -> Ok ()
|
||||||
|
|
||||||
let of_cstruct buf =
|
let rec recv t =
|
||||||
let open Rresult.R in
|
match Capnp.Codecs.FramedStream.get_next_frame t.decoder with
|
||||||
Log.debug (fun l -> l "Query.of_cstruct %S" @@ Cstruct.to_string buf);
|
| Ok msg -> Lwt.return (Ok (`Data msg))
|
||||||
let version = get_msg_version buf in
|
| Error Capnp.Codecs.FramingError.Unsupported -> Lwt.return err_frame
|
||||||
let id = get_msg_id buf in
|
| Error Capnp.Codecs.FramingError.Incomplete ->
|
||||||
(match int_to_operation (get_msg_operation buf) with
|
Log.info (fun f -> f "Endpoint.recv: incomplete; waiting for more data");
|
||||||
| None -> Error (`Msg "invalid operation")
|
C.read_some ~len:4096 t.input >>= function
|
||||||
| Some o -> Ok o)
|
| Ok `Eof -> Lwt.return (Ok `Eof)
|
||||||
>>= fun operation ->
|
| Error e -> Lwt.return (err_channel e)
|
||||||
let path_len = get_msg_path buf in
|
| Ok (`Data data) ->
|
||||||
let payload_len = get_msg_payload buf in
|
(* FIXME: avoid copying *)
|
||||||
let path =
|
let data = Cstruct.to_string data in
|
||||||
Cstruct.sub buf sizeof_msg path_len
|
Log.info (fun f -> f "Got %S" data);
|
||||||
|> Cstruct.to_string
|
Capnp.Codecs.FramedStream.add_fragment t.decoder data;
|
||||||
in
|
recv t
|
||||||
let payload =
|
|
||||||
Cstruct.sub buf (sizeof_msg + path_len) (Int32.to_int payload_len)
|
|
||||||
|> Cstruct.to_string
|
|
||||||
in
|
|
||||||
if String.Ascii.is_valid path then Ok { version; id; operation; path; payload }
|
|
||||||
else Error (`Msg "invalid path")
|
|
||||||
|
|
||||||
let to_cstruct msg =
|
|
||||||
Log.debug (fun l -> l "Query.to_cstruct %a" pp msg);
|
|
||||||
let operation = operation_to_int msg.operation in
|
|
||||||
let path = String.length msg.path in
|
|
||||||
let payload = String.length msg.payload in
|
|
||||||
let len = sizeof_msg + path + payload in
|
|
||||||
let buf = Cstruct.create len in
|
|
||||||
set_msg_version buf msg.version;
|
|
||||||
set_msg_id buf msg.id;
|
|
||||||
set_msg_operation buf operation;
|
|
||||||
set_msg_path buf path;
|
|
||||||
set_msg_payload buf (Int32.of_int payload);
|
|
||||||
Cstruct.blit_from_bytes msg.path 0 buf sizeof_msg path;
|
|
||||||
Cstruct.blit_from_bytes msg.payload 0 buf (sizeof_msg+path) payload;
|
|
||||||
buf
|
|
||||||
|
|
||||||
let err e = Lwt.return (Error (`Msg (Fmt.to_to_string C.pp_error e)))
|
|
||||||
let err_eof = Lwt.return (Error `Eof)
|
|
||||||
|
|
||||||
let read fd =
|
|
||||||
let fd = C.create fd in
|
|
||||||
C.read_exactly fd ~len:4 >>= function
|
|
||||||
| Ok `Eof -> err_eof
|
|
||||||
| Error e -> err e
|
|
||||||
| Ok (`Data buf) ->
|
|
||||||
let buf = Cstruct.concat buf in
|
|
||||||
Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf);
|
|
||||||
let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in
|
|
||||||
C.read_exactly fd ~len >>= function
|
|
||||||
| Ok `Eof -> err_eof
|
|
||||||
| Error e -> err e
|
|
||||||
| Ok (`Data buf) ->
|
|
||||||
let buf = Cstruct.concat buf in
|
|
||||||
Lwt.return (of_cstruct buf)
|
|
||||||
|
|
||||||
let write fd msg =
|
|
||||||
let buf = to_cstruct msg in
|
|
||||||
let len =
|
|
||||||
let len = Cstruct.create 4 in
|
|
||||||
Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf);
|
|
||||||
len
|
|
||||||
in
|
|
||||||
IO.write fd len >>= function
|
|
||||||
| Error e -> failf "Query.write(len) %a" IO.pp_write_error e
|
|
||||||
| Ok () -> IO.write fd buf >>= function
|
|
||||||
| Ok () -> Lwt.return_unit
|
|
||||||
| Error e -> failf "Query.write(buf) %a" IO.pp_write_error e
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
module Reply = struct
|
module Request = struct
|
||||||
|
|
||||||
(* FIXME: this should probably be replaced by protobuf *)
|
type action =
|
||||||
|
| Write of string
|
||||||
|
| Read
|
||||||
|
| Delete
|
||||||
|
|
||||||
[%%cenum
|
let pp_action ppf = function
|
||||||
type status =
|
| Write s -> Fmt.pf ppf "write[%S]" s
|
||||||
| Ok
|
| Read -> Fmt.pf ppf "read"
|
||||||
| Error
|
| Delete -> Fmt.pf ppf "delete"
|
||||||
[@@uint8_t]
|
|
||||||
]
|
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
id : int32;
|
id : int32 Lazy.t;
|
||||||
status : status;
|
path : string list Lazy.t;
|
||||||
payload: string;
|
action: action Lazy.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
[%%cstruct type msg = {
|
let id t = Lazy.force t.id
|
||||||
id : uint32_t; (* session identifier *)
|
let path t = Lazy.force t.path
|
||||||
status : uint8_t; (* = type operation *)
|
let action t = Lazy.force t.action
|
||||||
payload: uint32_t;
|
|
||||||
} [@@little_endian]
|
|
||||||
]
|
|
||||||
|
|
||||||
type error = [ `Eof | `Msg of string ]
|
let pp_path = Fmt.(list ~sep:(unit "/") string)
|
||||||
let pp_error ppf t = Fmt.string ppf (match t with `Eof -> "EOF" | `Msg s -> s)
|
|
||||||
|
|
||||||
(* to avoid warning 32 *)
|
|
||||||
let _ = hexdump_msg
|
|
||||||
let _ = string_to_status
|
|
||||||
|
|
||||||
let pp ppf t =
|
let pp ppf t =
|
||||||
Fmt.pf ppf "%ld:%s:%S" t.id (status_to_string t.status) t.payload
|
let id = id t and path = path t and action = action t in
|
||||||
|
match action with
|
||||||
|
| exception Undefined_field i -> Fmt.pf ppf "<undefined-field: %d>" i
|
||||||
|
| action -> Fmt.pf ppf "%ld:%a:%a" id pp_path path pp_action action
|
||||||
|
|
||||||
(* FIXME: allocate less ... *)
|
let equal x y =
|
||||||
|
id x = id y && path x = path y && match action x = action y with
|
||||||
|
| exception Undefined_field _ -> false
|
||||||
|
| b -> b
|
||||||
|
|
||||||
let of_cstruct buf =
|
let v ~id ~path action =
|
||||||
let open Rresult.R in
|
{ id = lazy id; action = lazy action; path = lazy path }
|
||||||
Log.debug (fun l -> l "Message.of_cstruct %S" @@ Cstruct.to_string buf);
|
|
||||||
let id = get_msg_id buf in
|
let read e: (t, Endpoint.error) result Lwt.t =
|
||||||
(match int_to_status (get_msg_status buf) with
|
Endpoint.recv e >|= function
|
||||||
| None -> Error (`Msg "invalid operation")
|
| Error e -> Error e
|
||||||
| Some o -> Ok o)
|
| Ok `Eof -> Error (`IO `Closed)
|
||||||
>>= fun status ->
|
| Ok (`Data x) ->
|
||||||
let payload_len = Int32.to_int (get_msg_payload buf) in
|
let open P.Reader in
|
||||||
let payload =
|
let msg = Request.of_message x in
|
||||||
Cstruct.sub buf sizeof_msg payload_len
|
let id = lazy (Request.id_get msg) in
|
||||||
|> Cstruct.to_string
|
let path = lazy (Request.path_get_list msg) in
|
||||||
|
let action = lazy (match Request.get msg with
|
||||||
|
| Request.Write x -> Write x
|
||||||
|
| Request.Read -> Read
|
||||||
|
| Request.Delete -> Delete
|
||||||
|
| Request.Undefined i -> raise (Undefined_field i)
|
||||||
|
) in
|
||||||
|
Ok { id; path; action }
|
||||||
|
|
||||||
|
let write e t =
|
||||||
|
let open P.Builder in
|
||||||
|
match action t with
|
||||||
|
| exception Undefined_field i -> Lwt.return (Endpoint.err_undefined_field i)
|
||||||
|
| action ->
|
||||||
|
let msg =
|
||||||
|
let b = Request.init_root () in
|
||||||
|
Request.id_set b (id t);
|
||||||
|
ignore (Request.path_set_list b (path t));
|
||||||
|
(match action with
|
||||||
|
| Write x -> Request.write_set b x
|
||||||
|
| Read -> Request.read_set b
|
||||||
|
| Delete -> Request.delete_set b);
|
||||||
|
b
|
||||||
in
|
in
|
||||||
Ok { id; status; payload }
|
Endpoint.send e (Request.to_message msg)
|
||||||
|
|
||||||
let to_cstruct msg =
|
end
|
||||||
Log.debug (fun l -> l "Message.to_cstruct %a" pp msg);
|
|
||||||
let status = status_to_int msg.status in
|
|
||||||
let payload = String.length msg.payload in
|
|
||||||
let len = sizeof_msg + payload in
|
|
||||||
let buf = Cstruct.create len in
|
|
||||||
set_msg_id buf msg.id;
|
|
||||||
set_msg_status buf status;
|
|
||||||
set_msg_payload buf (Int32.of_int payload);
|
|
||||||
Cstruct.blit_from_bytes msg.payload 0 buf sizeof_msg payload;
|
|
||||||
buf
|
|
||||||
|
|
||||||
let err e = Lwt.return (Result.Error (`Msg (Fmt.to_to_string C.pp_error e)))
|
module Response = struct
|
||||||
let err_eof = Lwt.return (Result.Error `Eof)
|
|
||||||
|
|
||||||
let read fd =
|
type status = (string, string) result
|
||||||
let fd = C.create fd in
|
|
||||||
C.read_exactly fd ~len:4 >>= function
|
|
||||||
| Ok `Eof -> err_eof
|
|
||||||
| Error e -> err e
|
|
||||||
| Ok (`Data buf) ->
|
|
||||||
let buf = Cstruct.concat buf in
|
|
||||||
Log.debug (fun l -> l "Message.read len=%a" Cstruct.hexdump_pp buf);
|
|
||||||
let len = Cstruct.LE.get_uint32 buf 0 |> Int32.to_int in
|
|
||||||
C.read_exactly fd ~len >>= function
|
|
||||||
| Ok `Eof -> err_eof
|
|
||||||
| Error e -> err e
|
|
||||||
| Ok (`Data buf) ->
|
|
||||||
let buf = Cstruct.concat buf in
|
|
||||||
Lwt.return (of_cstruct buf)
|
|
||||||
|
|
||||||
let write fd msg =
|
let pp_status ppf = function
|
||||||
let buf = to_cstruct msg in
|
| Ok ok -> Fmt.pf ppf "ok:%S" ok
|
||||||
let len =
|
| Error e -> Fmt.pf ppf "error:%S" e
|
||||||
let len = Cstruct.create 4 in
|
|
||||||
Cstruct.LE.set_uint32 len 0 (Int32.of_int @@ Cstruct.len buf);
|
type t = {
|
||||||
len
|
id : int32 Lazy.t;
|
||||||
|
status: status Lazy.t;
|
||||||
|
}
|
||||||
|
|
||||||
|
let v ~id status = { id = lazy id; status = lazy status }
|
||||||
|
let id t = Lazy.force t.id
|
||||||
|
let status t = Lazy.force t.status
|
||||||
|
|
||||||
|
let pp ppf t = match status t with
|
||||||
|
| exception Undefined_field i -> Fmt.pf ppf "<undefined-field %d>" i
|
||||||
|
| s -> Fmt.pf ppf "%ld:%a" (id t) pp_status s
|
||||||
|
|
||||||
|
let equal x y =
|
||||||
|
id x = id y && match status x = status y with
|
||||||
|
| exception Undefined_field _ -> false
|
||||||
|
| b -> b
|
||||||
|
|
||||||
|
let read e: (t, Endpoint.error) result Lwt.t =
|
||||||
|
Endpoint.recv e >|= function
|
||||||
|
| Error e -> Error e
|
||||||
|
| Ok `Eof -> Error (`IO `Closed)
|
||||||
|
| Ok (`Data x) ->
|
||||||
|
let open P.Reader in
|
||||||
|
let msg = Response.of_message x in
|
||||||
|
let id = lazy (Response.id_get msg) in
|
||||||
|
let status = lazy (match Response.get msg with
|
||||||
|
| Response.Ok x -> Ok x
|
||||||
|
| Response.Error x -> Error x
|
||||||
|
| Response.Undefined i -> raise (Undefined_field i)
|
||||||
|
) in
|
||||||
|
Ok { id; status }
|
||||||
|
|
||||||
|
let write e t =
|
||||||
|
let open P.Builder in
|
||||||
|
match status t with
|
||||||
|
| exception Undefined_field i -> Lwt.return (Endpoint.err_undefined_field i)
|
||||||
|
| s ->
|
||||||
|
let msg =
|
||||||
|
let b = Response.init_root () in
|
||||||
|
Response.id_set b (id t);
|
||||||
|
(match s with
|
||||||
|
| Error s -> Response.error_set b s
|
||||||
|
| Ok s -> Response.ok_set b s);
|
||||||
|
b
|
||||||
in
|
in
|
||||||
IO.write fd len >>= function
|
Endpoint.send e (Response.to_message msg)
|
||||||
| Error e -> failf "Reply.write(len) %a" IO.pp_write_error e
|
|
||||||
| Ok () -> IO.write fd buf >>= function
|
|
||||||
| Ok () -> Lwt.return_unit
|
|
||||||
| Error e -> failf "Reply.write(buf) %a" IO.pp_write_error e
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -255,8 +237,6 @@ module Client = struct
|
|||||||
let n = ref 0l in
|
let n = ref 0l in
|
||||||
fun () -> n := Int32.succ !n; !n
|
fun () -> n := Int32.succ !n; !n
|
||||||
|
|
||||||
let version = 0l
|
|
||||||
|
|
||||||
type error = [`Msg of string]
|
type error = [`Msg of string]
|
||||||
let pp_error ppf (`Msg s) = Fmt.string ppf s
|
let pp_error ppf (`Msg s) = Fmt.string ppf s
|
||||||
|
|
||||||
@ -268,60 +248,64 @@ module Client = struct
|
|||||||
module Cache = Hashtbl.Make(K)
|
module Cache = Hashtbl.Make(K)
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
fd : IO.t;
|
e : Endpoint.t;
|
||||||
replies: Reply.t Cache.t;
|
replies: Response.t Cache.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
let v fd = { fd; replies = Cache.create 12 }
|
let v fd = { e = Endpoint.v fd; replies = Cache.create 12 }
|
||||||
|
let err e = Fmt.kstrf (fun e -> Error (`Msg e)) "%a" Endpoint.pp_error e
|
||||||
|
|
||||||
let call t query =
|
let call t r =
|
||||||
let id = query.Query.id in
|
let id = Request.id r in
|
||||||
Query.write t.fd query >>= fun () ->
|
Request.write t.e r >>= function
|
||||||
|
| Error e -> Lwt.return (err e)
|
||||||
|
| Ok () ->
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
try
|
try
|
||||||
let r = Cache.find t.replies id in
|
let r = Cache.find t.replies id in
|
||||||
Cache.remove t.replies id;
|
Cache.remove t.replies id;
|
||||||
Lwt.return r
|
Lwt.return r
|
||||||
with Not_found ->
|
with Not_found ->
|
||||||
Reply.read t.fd >>= function
|
Response.read t.e >>= function
|
||||||
| Error e ->
|
| Error e ->
|
||||||
Log.err (fun l -> l "Got %a while waiting for a reply to %ld"
|
Log.err (fun l -> l "Got %a while waiting for a reply to %ld"
|
||||||
Query.pp_error e id);
|
Endpoint.pp_error e id);
|
||||||
loop ()
|
loop ()
|
||||||
| Ok r ->
|
| Ok r ->
|
||||||
if r.id = id then Lwt.return r
|
let rid = Response.id r in
|
||||||
|
if rid = id then Lwt.return r
|
||||||
else (
|
else (
|
||||||
(* FIXME: maybe we want to check if id is not already
|
(* FIXME: maybe we want to check if id is not already
|
||||||
allocated *)
|
allocated *)
|
||||||
Cache.add t.replies r.id r;
|
Cache.add t.replies rid r;
|
||||||
loop ()
|
loop ()
|
||||||
)
|
)
|
||||||
in
|
in
|
||||||
loop () >|= fun r ->
|
loop () >|= fun r ->
|
||||||
assert (r.Reply.id = id);
|
assert (Response.id r = id);
|
||||||
match r.Reply.status with
|
match Response.status r with
|
||||||
| Ok -> Ok r.Reply.payload
|
| Ok s -> Ok s
|
||||||
| Error -> Error (`Msg r.Reply.payload)
|
| Error s -> Error (`Msg s)
|
||||||
|
|
||||||
let query operation path payload =
|
let request path action =
|
||||||
let id = new_id () in
|
let id = new_id () in
|
||||||
{ Query.version; id; operation; path; payload }
|
Request.v ~id ~path action
|
||||||
|
|
||||||
let read t path =
|
let read t path =
|
||||||
call t (query Read path "") >|= function
|
call t (request path Read) >|= function
|
||||||
| Ok x -> Ok (Some x)
|
| Ok x -> Ok (Some x)
|
||||||
| Error (`Msg e) ->
|
| Error e ->
|
||||||
if e = err_not_found then Ok None
|
if e = `Msg err_not_found then Ok None
|
||||||
else Error (`Msg e)
|
else Error e
|
||||||
|
|
||||||
let write t path v =
|
let write t path v =
|
||||||
call t (query Write path v) >|= function
|
call t (request path @@ Write v) >|= function
|
||||||
| Ok "" -> Ok ()
|
| Ok "" -> Ok ()
|
||||||
| Ok _ -> Error (`Msg "invalid return")
|
| Ok _ -> Error (`Msg "invalid return")
|
||||||
| Error _ as e -> e
|
| Error _ as e -> e
|
||||||
|
|
||||||
let delete t path =
|
let delete t path =
|
||||||
call t (query Delete path "") >|= function
|
call t (request path Delete) >|= function
|
||||||
| Ok "" -> Ok ()
|
| Ok "" -> Ok ()
|
||||||
| Ok _ -> Error (`Msg "invalid return")
|
| Ok _ -> Error (`Msg "invalid return")
|
||||||
| Error _ as e -> e
|
| Error _ as e -> e
|
||||||
@ -332,17 +316,9 @@ module Server = struct
|
|||||||
|
|
||||||
type op = [ `Read | `Write | `Delete ]
|
type op = [ `Read | `Write | `Delete ]
|
||||||
|
|
||||||
let ok q payload =
|
let ok q s = Response.v ~id:(Request.id q) (Ok s)
|
||||||
{ Reply.id = q.Query.id; status = Reply.Ok; payload }
|
let error q s = Response.v ~id:(Request.id q) (Error s)
|
||||||
|
let with_key q f = f (Request.path q)
|
||||||
let error q payload =
|
|
||||||
{ Reply.id = q.Query.id; status = Reply.Error; payload }
|
|
||||||
|
|
||||||
let with_key q f =
|
|
||||||
match KV.Key.of_string q.Query.path with
|
|
||||||
| Ok x -> f x
|
|
||||||
| Error (`Msg e) ->
|
|
||||||
Fmt.kstrf (fun msg -> Lwt.return (error q msg)) "invalid key: %s" e
|
|
||||||
|
|
||||||
let infof fmt =
|
let infof fmt =
|
||||||
Fmt.kstrf (fun msg () ->
|
Fmt.kstrf (fun msg () ->
|
||||||
@ -351,18 +327,20 @@ module Server = struct
|
|||||||
) fmt
|
) fmt
|
||||||
|
|
||||||
let not_allowed q =
|
let not_allowed q =
|
||||||
let path = q.Query.path in
|
let path = Request.path q in
|
||||||
let err = Fmt.strf "%s is not an allowed path" path in
|
let err = Fmt.strf "%a is not an allowed path" Request.pp_path path in
|
||||||
Log.err (fun l -> l "%ld: %s" q.Query.id path);
|
Log.err (fun l -> l "%ld: %a" (Request.id q) Request.pp_path path);
|
||||||
error q err
|
error q err
|
||||||
|
|
||||||
let dispatch db op q =
|
let dispatch db op q =
|
||||||
with_key q (fun key ->
|
with_key q (fun key ->
|
||||||
let can x = List.mem x op in
|
let can x = List.mem x op in
|
||||||
match q.Query.operation with
|
match Request.action q with
|
||||||
| Write when can `Write ->
|
| exception Undefined_field i ->
|
||||||
|
Fmt.kstrf (fun e -> Lwt.return (error q e)) "undefined field %i" i
|
||||||
|
| Write s when can `Write ->
|
||||||
let info = infof "Updating %a" KV.Key.pp key in
|
let info = infof "Updating %a" KV.Key.pp key in
|
||||||
KV.set db ~info key q.payload >|= fun () ->
|
KV.set db ~info key s >|= fun () ->
|
||||||
ok q ""
|
ok q ""
|
||||||
| Delete when can `Delete ->
|
| Delete when can `Delete ->
|
||||||
let info = infof "Removing %a" KV.Key.pp key in
|
let info = infof "Removing %a" KV.Key.pp key in
|
||||||
@ -379,11 +357,14 @@ module Server = struct
|
|||||||
Log.debug (fun l -> l "Serving the control state over %a" IO.pp fd);
|
Log.debug (fun l -> l "Serving the control state over %a" IO.pp fd);
|
||||||
let queries = Queue.create () in
|
let queries = Queue.create () in
|
||||||
let cond = Lwt_condition.create () in
|
let cond = Lwt_condition.create () in
|
||||||
|
let e = Endpoint.v fd in
|
||||||
let rec listen () =
|
let rec listen () =
|
||||||
Query.read fd >>= function
|
Request.read e >>= function
|
||||||
| Error `Eof -> Lwt.return_unit
|
| Error (`Channel _ | `IO _ as e) ->
|
||||||
| Error (`Msg e) ->
|
Log.err (fun l -> l "fatal error: %a" Endpoint.pp_error e);
|
||||||
Log.err (fun l -> l "received invalid message: %s" e);
|
Lwt.return_unit
|
||||||
|
| Error (`Msg _ | `Undefined_field _ as e) ->
|
||||||
|
Log.err (fun l -> l "transient error: %a" Endpoint.pp_error e);
|
||||||
listen ()
|
listen ()
|
||||||
| Ok q ->
|
| Ok q ->
|
||||||
Queue.add q queries;
|
Queue.add q queries;
|
||||||
@ -393,14 +374,17 @@ module Server = struct
|
|||||||
let rec process () =
|
let rec process () =
|
||||||
Lwt_condition.wait cond >>= fun () ->
|
Lwt_condition.wait cond >>= fun () ->
|
||||||
let q = Queue.pop queries in
|
let q = Queue.pop queries in
|
||||||
let path = q.Query.path in
|
let path = Request.path q in
|
||||||
(if List.mem_assoc path routes then (
|
(if List.mem_assoc path routes then (
|
||||||
let op = List.assoc path routes in
|
let op = List.assoc path routes in
|
||||||
dispatch db op q >>= fun r ->
|
dispatch db op q >>= fun r ->
|
||||||
Reply.write fd r
|
Response.write e r
|
||||||
) else (
|
) else (
|
||||||
Reply.write fd (not_allowed q)
|
Response.write e (not_allowed q)
|
||||||
)) >>= fun () ->
|
)) >>= function
|
||||||
|
| Ok () -> process ()
|
||||||
|
| Error e ->
|
||||||
|
Log.err (fun l -> l "%a" Endpoint.pp_error e);
|
||||||
process ()
|
process ()
|
||||||
in
|
in
|
||||||
Lwt.pick [
|
Lwt.pick [
|
||||||
|
@ -1,81 +1,102 @@
|
|||||||
(** [Control] handle the server part of the control path, running in
|
(** [Control] handle the server part of the control path, running in
|
||||||
the privileged container. *)
|
the privileged container. *)
|
||||||
|
|
||||||
module Query: sig
|
|
||||||
|
|
||||||
(** The type for operations. *)
|
exception Undefined_field of int
|
||||||
type operation =
|
|
||||||
| Write
|
|
||||||
| Read
|
|
||||||
| Delete
|
|
||||||
|
|
||||||
(** The type for control plane queries. *)
|
module Endpoint: sig
|
||||||
type t = {
|
|
||||||
version : int32; (** Protocol version. *)
|
|
||||||
id : int32; (** Session identifier. *)
|
|
||||||
operation: operation;
|
|
||||||
path : string; (** Should be only valid ASCII. *)
|
|
||||||
payload : string; (** Arbitrary payload. *)
|
|
||||||
}
|
|
||||||
|
|
||||||
type error = [ `Eof | `Msg of string ]
|
type t
|
||||||
(** The type of errors. *)
|
(** The type for SDK endpoints. *)
|
||||||
|
|
||||||
val pp_error: error Fmt.t
|
val v: IO.t ->t
|
||||||
(** [pp_error] is the pretty-printer for query errors. *)
|
(** [v f] is a fresh endpoint state built on top of the flow [f]. *)
|
||||||
|
|
||||||
val pp: t Fmt.t
|
(** The type for endpoint errors. *)
|
||||||
(** [pp] is the pretty-printer for queries. *)
|
type error = private [>
|
||||||
|
| `IO of IO.write_error
|
||||||
val of_cstruct: Cstruct.t -> (t, [`Msg of string]) result
|
| `Msg of string
|
||||||
(** [of_cstruct buf] is the query [t] such that the serialization of
|
| `Undefined_field of int
|
||||||
[t] is [buf]. *)
|
]
|
||||||
|
|
||||||
val to_cstruct: t -> Cstruct.t
|
|
||||||
(** [to_cstruct t] is the serialization of [t]. *)
|
|
||||||
|
|
||||||
val write: IO.flow -> t -> unit Lwt.t
|
|
||||||
(** [write fd t] writes a query message. *)
|
|
||||||
|
|
||||||
val read: IO.flow -> (t, error) result Lwt.t
|
|
||||||
(** [read fd] reads a query message. *)
|
|
||||||
|
|
||||||
end
|
|
||||||
|
|
||||||
module Reply: sig
|
|
||||||
|
|
||||||
(** The type for status. *)
|
|
||||||
type status =
|
|
||||||
| Ok
|
|
||||||
| Error
|
|
||||||
|
|
||||||
(** The type for control plane replies. *)
|
|
||||||
type t = {
|
|
||||||
id : int32; (** Session identifier. *)
|
|
||||||
status : status; (** Status of the operation. *)
|
|
||||||
payload: string; (** Arbitrary payload. *)
|
|
||||||
}
|
|
||||||
|
|
||||||
val pp: t Fmt.t
|
|
||||||
(** [pp] is the pretty-printer for replies. *)
|
|
||||||
|
|
||||||
val of_cstruct: Cstruct.t -> (t, [`Msg of string]) result
|
|
||||||
(** [of_cstruct buf] is the reply [t] such that the serialization of
|
|
||||||
[t] is [buf]. *)
|
|
||||||
|
|
||||||
val to_cstruct: t -> Cstruct.t
|
|
||||||
(** [to_cstruct t] is the serialization of [t]. *)
|
|
||||||
|
|
||||||
type error = [`Eof | `Msg of string ]
|
|
||||||
(** The type for reply errors. *)
|
|
||||||
|
|
||||||
val pp_error: error Fmt.t
|
val pp_error: error Fmt.t
|
||||||
(** [pp_error] is the pretty-printer for errors. *)
|
(** [pp_error] is the pretty-printer for errors. *)
|
||||||
|
|
||||||
val write: IO.flow -> t -> unit Lwt.t
|
end
|
||||||
|
|
||||||
|
module Request: sig
|
||||||
|
|
||||||
|
type t
|
||||||
|
(** The type for SDK requests. *)
|
||||||
|
|
||||||
|
(** The type for request actions. *)
|
||||||
|
type action =
|
||||||
|
| Write of string
|
||||||
|
| Read
|
||||||
|
| Delete
|
||||||
|
|
||||||
|
val pp: t Fmt.t
|
||||||
|
(** [pp] is the pretty-printer for requests. *)
|
||||||
|
|
||||||
|
val equal: t -> t -> bool
|
||||||
|
(** [equal] is the equality function for requests. *)
|
||||||
|
|
||||||
|
val pp_action: action Fmt.t
|
||||||
|
(** [pp_action] is the pretty-printer for request actions. *)
|
||||||
|
|
||||||
|
val action: t -> action
|
||||||
|
(** [action t] is [t]'s requested operation. Can raise
|
||||||
|
[Endpoint.Undefined_field]. *)
|
||||||
|
|
||||||
|
val path: t -> string list
|
||||||
|
(** [path t] is the [t]'s request path. *)
|
||||||
|
|
||||||
|
val id: t -> int32
|
||||||
|
(** [id t] it [t]'s request id. *)
|
||||||
|
|
||||||
|
val v: id:int32 -> path:string list -> action -> t
|
||||||
|
(** [v ~id ~path action] is a new request. *)
|
||||||
|
|
||||||
|
val write: Endpoint.t -> t -> (unit, Endpoint.error) result Lwt.t
|
||||||
|
(** [write e t] writes a request message for the
|
||||||
|
action [action] and the path [path] using the unique ID [id]. *)
|
||||||
|
|
||||||
|
val read: Endpoint.t -> (t, Endpoint.error) result Lwt.t
|
||||||
|
(** [read e] reads a query message. *)
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
module Response: sig
|
||||||
|
|
||||||
|
type t
|
||||||
|
(** The type for responses. *)
|
||||||
|
|
||||||
|
(** The type for response status. *)
|
||||||
|
type status = (string, string) result
|
||||||
|
|
||||||
|
val pp: t Fmt.t
|
||||||
|
(** [pp] is the pretty-printer for responses. *)
|
||||||
|
|
||||||
|
val equal: t -> t -> bool
|
||||||
|
(** [equal] is the equality function for responses. *)
|
||||||
|
|
||||||
|
val pp_status: status Fmt.t
|
||||||
|
(** [pp_status] is the pretty-printer for response statuses. *)
|
||||||
|
|
||||||
|
val status: t -> status
|
||||||
|
(** [status t] is [t]'s response status. Can raise
|
||||||
|
[Endpoint.Undefined_field]. *)
|
||||||
|
|
||||||
|
val id: t -> int32
|
||||||
|
(** [id t] is [t]'s response ID. *)
|
||||||
|
|
||||||
|
val v: id:int32 -> status -> t
|
||||||
|
(** [v ~id status] is a new response. *)
|
||||||
|
|
||||||
|
val write: Endpoint.t -> t -> (unit, Endpoint.error) result Lwt.t
|
||||||
(** [write fd t] writes a reply message. *)
|
(** [write fd t] writes a reply message. *)
|
||||||
|
|
||||||
val read: IO.flow -> (t, error) result Lwt.t
|
val read: Endpoint.t -> (t, Endpoint.error) result Lwt.t
|
||||||
(** [read fd] reads a reply message. *)
|
(** [read fd] reads a reply message. *)
|
||||||
|
|
||||||
end
|
end
|
||||||
@ -103,16 +124,16 @@ module Client: sig
|
|||||||
server. A client state also stores some state for all the
|
server. A client state also stores some state for all the
|
||||||
incomplete client queries. *)
|
incomplete client queries. *)
|
||||||
|
|
||||||
val read: t -> string -> (string option, error) result Lwt.t
|
val read: t -> string list -> (string option, error) result Lwt.t
|
||||||
(** [read t k] is the value associated with the key [k] in the
|
(** [read t k] is the value associated with the key [k] in the
|
||||||
control plane state. Return [None] if no value is associated to
|
control plane state. Return [None] if no value is associated to
|
||||||
[k]. *)
|
[k]. *)
|
||||||
|
|
||||||
val write: t -> string -> string -> (unit, error) result Lwt.t
|
val write: t -> string list -> string -> (unit, error) result Lwt.t
|
||||||
(** [write t p v] associates [v] to the key [k] in the control plane
|
(** [write t p v] associates [v] to the key [k] in the control plane
|
||||||
state. *)
|
state. *)
|
||||||
|
|
||||||
val delete: t -> string -> (unit, error) result Lwt.t
|
val delete: t -> string list -> (unit, error) result Lwt.t
|
||||||
(** [delete t k] remove [k]'s binding in the control plane state. *)
|
(** [delete t k] remove [k]'s binding in the control plane state. *)
|
||||||
|
|
||||||
end
|
end
|
||||||
@ -129,7 +150,7 @@ module Server: sig
|
|||||||
type op = [ `Read | `Write | `Delete ]
|
type op = [ `Read | `Write | `Delete ]
|
||||||
(** The type for operations to perform on routes. *)
|
(** The type for operations to perform on routes. *)
|
||||||
|
|
||||||
val listen: routes:(string * op list) list -> KV.t -> IO.t -> unit Lwt.t
|
val listen: routes:(string list * op list) list -> KV.t -> IO.t -> unit Lwt.t
|
||||||
(** [listen ~routes kv fd] is the thread exposing the KV store [kv],
|
(** [listen ~routes kv fd] is the thread exposing the KV store [kv],
|
||||||
holding control plane state, running inside the privileged
|
holding control plane state, running inside the privileged
|
||||||
container. [routes] are the routes exposed by the server to the
|
container. [routes] are the routes exposed by the server to the
|
||||||
|
@ -5,6 +5,14 @@
|
|||||||
(libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty
|
(libraries (threads cstruct.lwt cmdliner fmt.cli logs.fmt logs.cli fmt.tty
|
||||||
decompress irmin irmin-git lwt.unix rawlink tuntap dispatch
|
decompress irmin irmin-git lwt.unix rawlink tuntap dispatch
|
||||||
irmin-watcher inotify astring rresult mirage-flow-lwt
|
irmin-watcher inotify astring rresult mirage-flow-lwt
|
||||||
mirage-channel-lwt io-page.unix ipaddr))
|
mirage-channel-lwt io-page.unix ipaddr capnp))
|
||||||
(preprocess (per_file ((pps (cstruct.ppx)) (ctl))))
|
|
||||||
))
|
))
|
||||||
|
|
||||||
|
;(rule
|
||||||
|
; ((targets (proto.ml proto.mli))
|
||||||
|
; (deps (proto.capnp))
|
||||||
|
; (action (progn
|
||||||
|
; (run capnp compile -o ocaml ${<})
|
||||||
|
; (system "mv proto.ml proto.ml.in")
|
||||||
|
; (system "echo '[@@@ocaml.warning \"-A\"]\n' > proto.ml")
|
||||||
|
; (system "cat proto.ml.in >> proto.ml")))))
|
||||||
|
19
projects/miragesdk/src/sdk/proto.capnp
Normal file
19
projects/miragesdk/src/sdk/proto.capnp
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
@0x9e83562906de8259;
|
||||||
|
|
||||||
|
struct Request {
|
||||||
|
id @0 :Int32;
|
||||||
|
path @1 :List(Text);
|
||||||
|
union {
|
||||||
|
write @2 :Data;
|
||||||
|
read @3 :Void;
|
||||||
|
delete @4 :Void;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Response {
|
||||||
|
id @0: Int32;
|
||||||
|
union {
|
||||||
|
ok @1 :Data;
|
||||||
|
error @2 :Data;
|
||||||
|
}
|
||||||
|
}
|
4368
projects/miragesdk/src/sdk/proto.ml
Normal file
4368
projects/miragesdk/src/sdk/proto.ml
Normal file
File diff suppressed because it is too large
Load Diff
109
projects/miragesdk/src/sdk/proto.mli
Normal file
109
projects/miragesdk/src/sdk/proto.mli
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
type ro = Capnp.Message.ro
|
||||||
|
type rw = Capnp.Message.rw
|
||||||
|
|
||||||
|
module type S = sig
|
||||||
|
type 'cap message_t
|
||||||
|
|
||||||
|
type reader_t_Request_14112192289179464829
|
||||||
|
type builder_t_Request_14112192289179464829
|
||||||
|
type reader_t_Response_16897334327181152309
|
||||||
|
type builder_t_Response_16897334327181152309
|
||||||
|
|
||||||
|
module Reader : sig
|
||||||
|
type array_t
|
||||||
|
type builder_array_t
|
||||||
|
type pointer_t
|
||||||
|
module Response : sig
|
||||||
|
type t = reader_t_Response_16897334327181152309
|
||||||
|
type builder_t = builder_t_Response_16897334327181152309
|
||||||
|
type unnamed_union_t =
|
||||||
|
| Ok of string
|
||||||
|
| Error of string
|
||||||
|
| Undefined of int
|
||||||
|
val get : t -> unnamed_union_t
|
||||||
|
val id_get : t -> int32
|
||||||
|
val id_get_int_exn : t -> int
|
||||||
|
val of_message : 'cap message_t -> t
|
||||||
|
val of_builder : builder_t -> t
|
||||||
|
end
|
||||||
|
module Request : sig
|
||||||
|
type t = reader_t_Request_14112192289179464829
|
||||||
|
type builder_t = builder_t_Request_14112192289179464829
|
||||||
|
type unnamed_union_t =
|
||||||
|
| Write of string
|
||||||
|
| Read
|
||||||
|
| Delete
|
||||||
|
| Undefined of int
|
||||||
|
val get : t -> unnamed_union_t
|
||||||
|
val id_get : t -> int32
|
||||||
|
val id_get_int_exn : t -> int
|
||||||
|
val has_path : t -> bool
|
||||||
|
val path_get : t -> (ro, string, array_t) Capnp.Array.t
|
||||||
|
val path_get_list : t -> string list
|
||||||
|
val path_get_array : t -> string array
|
||||||
|
val of_message : 'cap message_t -> t
|
||||||
|
val of_builder : builder_t -> t
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
module Builder : sig
|
||||||
|
type array_t = Reader.builder_array_t
|
||||||
|
type reader_array_t = Reader.array_t
|
||||||
|
type pointer_t
|
||||||
|
module Response : sig
|
||||||
|
type t = builder_t_Response_16897334327181152309
|
||||||
|
type reader_t = reader_t_Response_16897334327181152309
|
||||||
|
type unnamed_union_t =
|
||||||
|
| Ok of string
|
||||||
|
| Error of string
|
||||||
|
| Undefined of int
|
||||||
|
val get : t -> unnamed_union_t
|
||||||
|
val ok_set : t -> string -> unit
|
||||||
|
val error_set : t -> string -> unit
|
||||||
|
val id_get : t -> int32
|
||||||
|
val id_get_int_exn : t -> int
|
||||||
|
val id_set : t -> int32 -> unit
|
||||||
|
val id_set_int_exn : t -> int -> unit
|
||||||
|
val of_message : rw message_t -> t
|
||||||
|
val to_message : t -> rw message_t
|
||||||
|
val to_reader : t -> reader_t
|
||||||
|
val init_root : ?message_size:int -> unit -> t
|
||||||
|
end
|
||||||
|
module Request : sig
|
||||||
|
type t = builder_t_Request_14112192289179464829
|
||||||
|
type reader_t = reader_t_Request_14112192289179464829
|
||||||
|
type unnamed_union_t =
|
||||||
|
| Write of string
|
||||||
|
| Read
|
||||||
|
| Delete
|
||||||
|
| Undefined of int
|
||||||
|
val get : t -> unnamed_union_t
|
||||||
|
val write_set : t -> string -> unit
|
||||||
|
val read_set : t -> unit
|
||||||
|
val delete_set : t -> unit
|
||||||
|
val id_get : t -> int32
|
||||||
|
val id_get_int_exn : t -> int
|
||||||
|
val id_set : t -> int32 -> unit
|
||||||
|
val id_set_int_exn : t -> int -> unit
|
||||||
|
val has_path : t -> bool
|
||||||
|
val path_get : t -> (rw, string, array_t) Capnp.Array.t
|
||||||
|
val path_get_list : t -> string list
|
||||||
|
val path_get_array : t -> string array
|
||||||
|
val path_set : t -> (rw, string, array_t) Capnp.Array.t -> (rw, string, array_t) Capnp.Array.t
|
||||||
|
val path_set_list : t -> string list -> (rw, string, array_t) Capnp.Array.t
|
||||||
|
val path_set_array : t -> string array -> (rw, string, array_t) Capnp.Array.t
|
||||||
|
val path_init : t -> int -> (rw, string, array_t) Capnp.Array.t
|
||||||
|
val of_message : rw message_t -> t
|
||||||
|
val to_message : t -> rw message_t
|
||||||
|
val to_reader : t -> reader_t
|
||||||
|
val init_root : ?message_size:int -> unit -> t
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
module Make (MessageWrapper : Capnp.MessageSig.S) :
|
||||||
|
(S with type 'cap message_t = 'cap MessageWrapper.Message.t
|
||||||
|
and type Reader.pointer_t = ro MessageWrapper.Slice.t option
|
||||||
|
and type Builder.pointer_t = rw MessageWrapper.Slice.t
|
||||||
|
)
|
||||||
|
|
@ -93,119 +93,107 @@ let test_socketpair pipe () =
|
|||||||
|
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
let query = Alcotest.testable Ctl.Query.pp (=)
|
let request = Alcotest.testable Ctl.Request.pp Ctl.Request.equal
|
||||||
let reply = Alcotest.testable Ctl.Reply.pp (=)
|
let response = Alcotest.testable Ctl.Response.pp Ctl.Response.equal
|
||||||
|
|
||||||
let queries =
|
let queries =
|
||||||
let open Ctl.Query in
|
let open Ctl.Request in
|
||||||
[
|
[
|
||||||
{ version = 0l; id = 0l; operation = Read; path = "/foo/bar"; payload = "" };
|
v ~id:0l ~path:["foo";"bar"] Read;
|
||||||
{ version = Int32.max_int; id = Int32.max_int; operation = Write ; path = ""; payload = "foo" };
|
v ~id:Int32.max_int ~path:[] (Write "foo");
|
||||||
{ version = 1l;id = 0l; operation = Delete; path = ""; payload = "" };
|
v ~id:0l ~path:[] Delete;
|
||||||
{ version = -2l; id = -3l; operation = Delete; path = "foo"; payload = "foo" };
|
v ~id:(-3l) ~path:["foo"] Delete;
|
||||||
]
|
]
|
||||||
|
|
||||||
let replies =
|
let replies =
|
||||||
let open Ctl.Reply in
|
let open Ctl.Response in
|
||||||
[
|
[
|
||||||
{ id = 0l; status = Ok; payload = "" };
|
v ~id:0l (Ok "");
|
||||||
{ id = Int32.max_int; status = Ok; payload = "foo" };
|
v ~id:Int32.max_int (Ok "foo");
|
||||||
{ id = 0l; status = Error; payload = "" };
|
v ~id:0l (Error "");
|
||||||
{ id = -3l; status = Error; payload = "foo" };
|
v ~id:(-3l) (Error "foo");
|
||||||
]
|
]
|
||||||
|
|
||||||
let test_serialization to_cstruct of_cstruct message messages =
|
let failf fmt = Fmt.kstrf Alcotest.fail fmt
|
||||||
let test m =
|
|
||||||
let buf = to_cstruct m in
|
|
||||||
match of_cstruct buf with
|
|
||||||
| Ok m' -> Alcotest.(check message) "to_cstruct/of_cstruct" m m'
|
|
||||||
| Error (`Msg e) -> Alcotest.fail ("Message.of_cstruct: " ^ e)
|
|
||||||
in
|
|
||||||
List.iter test messages
|
|
||||||
|
|
||||||
let test_send t write read message pp_error messages =
|
let test_send t write read message messages =
|
||||||
let calf = calf Init.Pipe.(ctl t) in
|
let calf = Ctl.Endpoint.v @@ calf Init.Pipe.(ctl t) in
|
||||||
let priv = priv Init.Pipe.(ctl t) in
|
let priv = Ctl.Endpoint.v @@ priv Init.Pipe.(ctl t) in
|
||||||
let test m =
|
let test m =
|
||||||
write calf m >>= fun () ->
|
write calf m >>= function
|
||||||
|
| Error e -> failf "Message.write: %a" Ctl.Endpoint.pp_error e
|
||||||
|
| Ok () ->
|
||||||
read priv >|= function
|
read priv >|= function
|
||||||
| Ok m' -> Alcotest.(check message) "write/read" m m'
|
| Ok m' -> Alcotest.(check message) "write/read" m m'
|
||||||
| Error e -> Fmt.kstrf Alcotest.fail "Message.read: %a" pp_error e
|
| Error e -> failf "Message.read: %a" Ctl.Endpoint.pp_error e
|
||||||
in
|
in
|
||||||
Lwt_list.iter_s test messages
|
Lwt_list.iter_s test messages
|
||||||
|
|
||||||
let test_query_serialization () =
|
let test_request_send t () =
|
||||||
let open Ctl.Query in
|
let open Ctl.Request in
|
||||||
test_serialization to_cstruct of_cstruct query queries
|
test_send t write read request queries
|
||||||
|
|
||||||
let test_reply_serialization () =
|
let test_response_send t () =
|
||||||
let open Ctl.Reply in
|
let open Ctl.Response in
|
||||||
test_serialization to_cstruct of_cstruct reply replies
|
test_send t write read response replies
|
||||||
|
|
||||||
let test_query_send t () =
|
|
||||||
let open Ctl.Query in
|
|
||||||
test_send t write read query pp_error queries
|
|
||||||
|
|
||||||
let test_reply_send t () =
|
|
||||||
let open Ctl.Reply in
|
|
||||||
test_send t write read reply pp_error replies
|
|
||||||
|
|
||||||
let failf fmt = Fmt.kstrf Alcotest.fail fmt
|
let failf fmt = Fmt.kstrf Alcotest.fail fmt
|
||||||
|
|
||||||
(* read ops *)
|
(* read ops *)
|
||||||
|
|
||||||
let pp_error = Ctl.Client.pp_error
|
let pp_error = Ctl.Client.pp_error
|
||||||
|
let pp_path = Fmt.(Dump.list string)
|
||||||
|
|
||||||
let read_should_err t k =
|
let read_should_err t k =
|
||||||
Ctl.Client.read t k >|= function
|
Ctl.Client.read t k >|= function
|
||||||
| Error _ -> ()
|
| Error _ -> ()
|
||||||
| Ok None -> failf "read(%s) -> got: none, expected: err" k
|
| Ok None -> failf "read(%a) -> got: none, expected: err" pp_path k
|
||||||
| Ok Some v -> failf "read(%s) -> got: found:%S, expected: err" k v
|
| Ok Some v -> failf "read(%a) -> got: found:%S, expected: err" pp_path k v
|
||||||
|
|
||||||
let read_should_none t k =
|
let read_should_none t k =
|
||||||
Ctl.Client.read t k >|= function
|
Ctl.Client.read t k >|= function
|
||||||
| Error e -> failf "read(%s) -> got: error:%a, expected none" k pp_error e
|
| Error e -> failf "read(%a) -> got: error:%a, expected none" pp_path k pp_error e
|
||||||
| Ok None -> ()
|
| Ok None -> ()
|
||||||
| Ok Some v -> failf "read(%s) -> got: found:%S, expected none" k v
|
| Ok Some v -> failf "read(%a) -> got: found:%S, expected none" pp_path k v
|
||||||
|
|
||||||
let read_should_work t k v =
|
let read_should_work t k v =
|
||||||
Ctl.Client.read t k >|= function
|
Ctl.Client.read t k >|= function
|
||||||
| Error e -> failf "read(%s) -> got: error:%a, expected ok" k pp_error e
|
| Error e -> failf "read(%a) -> got: error:%a, expected ok" pp_path k pp_error e
|
||||||
| Ok None -> failf "read(%s) -> got: none, expected ok" k
|
| Ok None -> failf "read(%a) -> got: none, expected ok" pp_path k
|
||||||
| Ok Some v' ->
|
| Ok Some v' ->
|
||||||
if v <> v' then failf "read(%s) -> got: ok:%S, expected: ok:%S" k v' v
|
if v <> v' then failf "read(%a) -> got: ok:%S, expected: ok:%S" pp_path k v' v
|
||||||
|
|
||||||
(* write ops *)
|
(* write ops *)
|
||||||
|
|
||||||
let write_should_err t k v =
|
let write_should_err t k v =
|
||||||
Ctl.Client.write t k v >|= function
|
Ctl.Client.write t k v >|= function
|
||||||
| Ok () -> failf "write(%s) -> ok" k
|
| Ok () -> failf "write(%a) -> ok" pp_path k
|
||||||
| Error _ -> ()
|
| Error _ -> ()
|
||||||
|
|
||||||
let write_should_work t k v =
|
let write_should_work t k v =
|
||||||
Ctl.Client.write t k v >|= function
|
Ctl.Client.write t k v >|= function
|
||||||
| Ok () -> ()
|
| Ok () -> ()
|
||||||
| Error e -> failf "write(%s) -> error: %a" k pp_error e
|
| Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e
|
||||||
|
|
||||||
(* del ops *)
|
(* del ops *)
|
||||||
|
|
||||||
let delete_should_err t k =
|
let delete_should_err t k =
|
||||||
Ctl.Client.delete t k >|= function
|
Ctl.Client.delete t k >|= function
|
||||||
| Ok () -> failf "del(%s) -> ok" k
|
| Ok () -> failf "del(%a) -> ok" pp_path k
|
||||||
| Error _ -> ()
|
| Error _ -> ()
|
||||||
|
|
||||||
let delete_should_work t k =
|
let delete_should_work t k =
|
||||||
Ctl.Client.delete t k >|= function
|
Ctl.Client.delete t k >|= function
|
||||||
| Ok () -> ()
|
| Ok () -> ()
|
||||||
| Error e -> failf "write(%s) -> error: %a" k pp_error e
|
| Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e
|
||||||
|
|
||||||
let test_ctl t () =
|
let test_ctl t () =
|
||||||
let calf = calf Init.Pipe.(ctl t) in
|
let calf = calf Init.Pipe.(ctl t) in
|
||||||
let priv = priv Init.Pipe.(ctl t) in
|
let priv = priv Init.Pipe.(ctl t) in
|
||||||
let k1 = "/foo/bar" in
|
let k1 = ["foo"; "bar"] in
|
||||||
let k2 = "a" in
|
let k2 = ["a"] in
|
||||||
let k3 = "b/c" in
|
let k3 = ["b"; "c"] in
|
||||||
let k4 = "xxxxxx" in
|
let k4 = ["xxxxxx"] in
|
||||||
let all = [`Read; `Write; `Delete] in
|
let all = [`Read; `Write; `Delete] in
|
||||||
let routes = [k1,all; k2,all; k3,all ] in
|
let routes = [k1,all; k2,all; k3,all ] in
|
||||||
let git_root = "/tmp/sdk/ctl" in
|
let git_root = "/tmp/sdk/ctl" in
|
||||||
@ -219,8 +207,7 @@ let test_ctl t () =
|
|||||||
read_should_none t k >>= fun () ->
|
read_should_none t k >>= fun () ->
|
||||||
write_should_work t k v >>= fun () ->
|
write_should_work t k v >>= fun () ->
|
||||||
read_should_work t k v >>= fun () ->
|
read_should_work t k v >>= fun () ->
|
||||||
let path = String.cuts ~empty:false ~sep:"/" k in
|
Ctl.KV.get ctl k >|= fun v' ->
|
||||||
Ctl.KV.get ctl path >|= fun v' ->
|
|
||||||
Alcotest.(check string) "in the db" v v'
|
Alcotest.(check string) "in the db" v v'
|
||||||
in
|
in
|
||||||
let disallowed k v =
|
let disallowed k v =
|
||||||
@ -281,10 +268,8 @@ let test = [
|
|||||||
"stdout is a pipe" , `Quick, run (test_pipe Init.Pipe.(stderr t));
|
"stdout is a pipe" , `Quick, run (test_pipe Init.Pipe.(stderr t));
|
||||||
"net is a socket pair", `Quick, run (test_socketpair Init.Pipe.(net t));
|
"net is a socket pair", `Quick, run (test_socketpair Init.Pipe.(net t));
|
||||||
"ctl is a socket pair", `Quick, run (test_socketpair Init.Pipe.(ctl t));
|
"ctl is a socket pair", `Quick, run (test_socketpair Init.Pipe.(ctl t));
|
||||||
"seralize queries" , `Quick, test_query_serialization;
|
"send requests" , `Quick, run (test_request_send t);
|
||||||
"seralize replies" , `Quick, test_reply_serialization;
|
"send responses" , `Quick, run (test_response_send t);
|
||||||
"send queries" , `Quick, run (test_query_send t);
|
|
||||||
"send replies" , `Quick, run (test_reply_send t);
|
|
||||||
"ctl" , `Quick, run (test_ctl t);
|
"ctl" , `Quick, run (test_ctl t);
|
||||||
"exec" , `Quick, run test_exec;
|
"exec" , `Quick, run test_exec;
|
||||||
]
|
]
|
||||||
|
Loading…
Reference in New Issue
Block a user