sdk: replace custom transport protocol by Capnproto

Initial patch by @talex5

Signed-off-by: Thomas Gazagnaire <thomas@gazagnaire.org>
This commit is contained in:
Thomas Gazagnaire 2017-06-14 11:10:04 +01:00
parent a914c0cd2b
commit 03cd4d6fd3
13 changed files with 190 additions and 578 deletions

View File

@ -5,7 +5,7 @@ FROM alpine:3.5 as capnp
RUN mkdir -p /src RUN mkdir -p /src
RUN apk update && apk add autoconf automake libtool linux-headers git g++ make RUN apk update && apk add autoconf automake libtool linux-headers git g++ make
RUN cd /src && git clone -b v0.6.0 https://github.com/sandstorm-io/capnproto.git RUN cd /src && git clone https://github.com/sandstorm-io/capnproto.git
WORKDIR /src/capnproto/c++ WORKDIR /src/capnproto/c++
RUN ./setup-autotools.sh RUN ./setup-autotools.sh
RUN autoreconf -i RUN autoreconf -i
@ -17,31 +17,40 @@ RUN which capnp
### SDK ### SDK
FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 as sdk FROM ocaml/opam@sha256:1e1d7fafbfd461bf684b5e11213c85a71fec93577455285e5d82d902ffad91d2 as sdk
#FROM ocaml/opam:alpine-3.5_ocaml-4.04.0 as sdk
COPY --from=capnp /usr/local/bin/capnp /usr/local/bin/ COPY --from=capnp /usr/local/bin/capnp /usr/local/bin/
COPY --from=capnp /usr/local/lib/libcapnpc-0.6.0.so /usr/local/lib/ COPY --from=capnp /usr/local/bin/capnpc /usr/local/bin/
COPY --from=capnp /usr/local/lib/libcapnp-0.6.0.so /usr/local/lib/ COPY --from=capnp /usr/local/lib/libcapnpc-0.7-dev.so /usr/local/lib/
COPY --from=capnp /usr/local/lib/libkj-0.6.0.so /usr/local/lib/ COPY --from=capnp /usr/local/lib/libcapnp-0.7-dev.so /usr/local/lib/
COPY --from=capnp /usr/local/lib/libkj-0.7-dev.so /usr/local/lib/
RUN sudo mkdir -p /src RUN sudo mkdir -p /src
USER opam USER opam
WORKDIR /src WORKDIR /src
RUN git -C /home/opam/opam-repository pull && opam update -u RUN git -C /home/opam/opam-repository fetch origin && \
RUN opam pin add jbuilder 1.0+beta10 -n git -C /home/opam/opam-repository reset ad921dfa87c2e201ef54806d0367aaacce75bc62 --hard && \
opam update -u
RUN opam depext -uiy cstruct cstruct-lwt lwt lwt logs irmin-git rawlink tuntap astring rresult \ RUN opam pin add -n capnp.dev 'https://github.com/talex5/capnp-ocaml.git#interfaces'
mirage-flow-lwt mirage-channel-lwt io-page decompress capnp RUN opam pin add -n capnp-rpc.dev 'https://github.com/mirage/capnp-rpc.git'
RUN opam pin add -n capnp-rpc-lwt.dev 'https://github.com/mirage/capnp-rpc.git'
COPY sdk.opam /src
RUN sudo chown opam -R /src
RUN opam pin add sdk.local /src -n
RUN opam depext -y alcotest sdk
RUN opam install alcotest && opam install --deps sdk
RUN opam list RUN opam list
COPY ./sdk /src/ COPY ./sdk /src/
COPY ./sdk.opam /src/
RUN sudo chown opam -R /src RUN sudo chown opam -R /src
RUN opam config exec -- jbuilder build -p sdk @install RUN opam update sdk && opam install sdk -t
RUN opam config exec -- jbuilder install sdk
### Privileged Container ### Privileged Container
@ -64,8 +73,9 @@ RUN sudo cp /src/_build/default/dhcp-client/main.exe /bin/dhcp-client
FROM sdk as calf FROM sdk as calf
RUN opam pin add charrua-client https://github.com/yomimono/charrua-client.git#with-cdhcpc -n RUN opam pin add charrua-client.dev https://github.com/samoht/charrua-client.git#with-cdhcpc -n
RUN opam pin add mirage-net-fd 0.2.0 -n RUN opam pin add mirage-net-fd 0.2.0 -n
RUN opam list
RUN opam depext -iy mirage-net-fd charrua-client lwt mirage-types-lwt cmdliner RUN opam depext -iy mirage-net-fd charrua-client lwt mirage-types-lwt cmdliner
RUN opam list RUN opam list

View File

@ -1,39 +1,25 @@
BASE=ocaml/opam:alpine-3.5_ocaml-4.04.0 .PHONY: tag push
FILES=$(shell find . -name jbuild) \
$(shell find sdk/ -name '*.ml') \
$(shell find sdk/ -name '*.mli') \
dhcp-client/bpf/dhcp.c dhcp-client/main.ml \
dhcp-client-calf/unikernel.ml dhcp-client-calf/config.json
IMAGE=dhcp-client
default: push default: push
@
.build: Dockerfile $(FILES) ORG?=linuxkitprojects
docker build $(NO_CACHE) -t $(IMAGE) -f Dockerfile . IMAGE=dhcp-client
docker build $(NO_CACHE) -t $(IMAGE) -f Dockerfile -q . > .build || \ NOCACHE?=
(rm -f $@ && exit 1)
hash: Makefile Dockerfile $(FILES) .build DEPS=Dockerfile \
{ cat $^; \ $(shell find . -name jbuild) \
docker run --rm --entrypoint sh $(IMAGE) -c 'cat /lib/apk/db/installed'; \ $(shell find . -name '*.ml') \
docker run --rm --entrypoint sh $(IMAGE) -c 'opam list'; } \ $(shell find . -name '*.mli') \
| sha1sum | sed 's/ .*//' > $@ $(shell find . -name '*.c') \
$(shell find . -name '*.json')
tag: .build HASH?=$(shell git ls-tree HEAD -- ../$(notdir $(CURDIR)) | awk '{print $$3}')
docker tag $(IMAGE) linuxkitprojects/$(IMAGE):$(shell cat hash)
push: hash .build tag: $(DEPS)
docker pull $(BASE) docker build --squash $(NOCACHE) -t $(ORG)/$(IMAGE):$(HASH) .
docker pull linuxkitprojects/$(IMAGE):$(shell cat hash) || \
(docker tag $(IMAGE) linuxkitprojects/$(IMAGE):$(shell cat hash) && \
docker push linuxkitprojects/$(IMAGE):$(shell cat hash))
clean:: push: tag
rm -rf hash .build DOCKER_CONTENT_TRUST=1 docker pull $(ORG)/$(IMAGE):$(HASH) || \
(docker rmi -f $(IMAGE) || echo ok) DOCKER_CONTENT_TRUST=1 docker push $(ORG)/$(IMAGE):$(HASH)
#### DEV #### DEV
@ -42,11 +28,9 @@ clean::
test: test:
jbuilder runtest --dev jbuilder runtest --dev
dev-clean: clean:
rm -rf _build jbuilder clean
dev: dev:
jbuilder build dhcp-client/main.exe --dev jbuilder build dhcp-client/main.exe --dev
jbuilder build dhcp-client-calf/unikernel.exe --dev jbuilder build dhcp-client-calf/unikernel.exe --dev
.DELETE_ON_ERROR:

View File

@ -208,8 +208,10 @@ let flow (x: int) = Sdk.Init.file_descr (Lwt_unix.of_unix_file_descr @@ fd x)
let start () dhcp_codes net ctl = let start () dhcp_codes net ctl =
Lwt_main.run ( Lwt_main.run (
Lwt_switch.with_switch @@ fun switch ->
let net = fd net in let net = fd net in
let ctl = Sdk.Ctl.Client.v (flow ctl) in let client = Capnp_rpc_lwt.CapTP.of_endpoint ~switch (Capnp_rpc_lwt.Endpoint.of_flow ~switch (module Sdk.IO) (flow ctl)) in
let ctl = Capnp_rpc_lwt.CapTP.bootstrap client in
start () dhcp_codes net ctl start () dhcp_codes net ctl
) )

View File

@ -28,6 +28,8 @@
#include <net/ethernet.h> #include <net/ethernet.h>
#ifdef __linux__
#include <linux/if_packet.h> #include <linux/if_packet.h>
#include <linux/filter.h> #include <linux/filter.h>
@ -87,3 +89,5 @@ CAMLprim value bpf_filter(value vunit)
memcpy(String_val(vfilter), bootp_bpf_filter, sizeof(bootp_bpf_filter)); memcpy(String_val(vfilter), bootp_bpf_filter, sizeof(bootp_bpf_filter));
CAMLreturn (vfilter); CAMLreturn (vfilter);
} }
#endif

View File

@ -83,13 +83,18 @@ let run () cmd ethif path =
| Some f -> read_cmd f | Some f -> read_cmd f
in in
Lwt_main.run ( Lwt_main.run (
Lwt_switch.with_switch @@ fun switch ->
let routes = [ let routes = [
["ip"] , [`Write]; ["ip"] , [`Write];
["mac"] , [`Read ]; ["mac"] , [`Read ];
["gateway"], [`Write]; ["gateway"], [`Write];
] in ] in
Ctl.v path >>= fun db -> Ctl.v path >>= fun db ->
let ctl fd = Ctl.Server.listen ~routes db fd in let ctl fd =
let service = Ctl.Server.service ~routes db in
let endpoint = Capnp_rpc_lwt.Endpoint.of_flow ~switch (module Sdk.IO) fd in
ignore (Capnp_rpc_lwt.CapTP.of_endpoint ~switch ~offer:service endpoint)
in
let handlers () = Handlers.watch ~ethif db in let handlers () = Handlers.watch ~ethif db in
let net = Init.rawlink ~filter:(dhcp_filter ()) ethif in let net = Init.rawlink ~filter:(dhcp_filter ()) ethif in
Net.mac ethif >>= fun mac -> Net.mac ethif >>= fun mac ->

View File

@ -13,6 +13,7 @@ depends: [
"jbuilder" {build & >= "1.0+beta7"} "jbuilder" {build & >= "1.0+beta7"}
"ocamlfind" {build} "ocamlfind" {build}
"cstruct" "cstruct"
"cstruct-lwt"
"lwt" "lwt"
"logs" "astring" "rresult" "logs" "astring" "rresult"
"mirage-flow-lwt" "mirage-flow-lwt"
@ -20,5 +21,9 @@ depends: [
"io-page" "io-page"
"irmin-git" "irmin-git"
"decompress" "decompress"
"capnp" "capnp-rpc-lwt"
"rawlink"
"tuntap"
"ipaddr"
"alcotest" {test}
] ]

View File

@ -1,4 +1,5 @@
open Lwt.Infix open Lwt.Infix
open Capnp_rpc_lwt
let src = Logs.Src.create "init" ~doc:"Init steps" let src = Logs.Src.create "init" ~doc:"Init steps"
module Log = (val Logs.src_log src : Logs.LOG) module Log = (val Logs.src_log src : Logs.LOG)
@ -20,6 +21,8 @@ end
module Store = Irmin_git.FS.KV(No_IO)(Inflator)(Io_fs) module Store = Irmin_git.FS.KV(No_IO)(Inflator)(Io_fs)
module KV = Store(Irmin.Contents.String) module KV = Store(Irmin.Contents.String)
let pp_path = Fmt.(brackets (list ~sep:(const string "/") string))
let v path = let v path =
let config = Irmin_git.config path in let config = Irmin_git.config path in
KV.Repo.v config >>= fun repo -> KV.Repo.v config >>= fun repo ->
@ -33,282 +36,50 @@ let () =
module C = Mirage_channel_lwt.Make(IO) module C = Mirage_channel_lwt.Make(IO)
module P = Proto.Make(Capnp.BytesMessage)
exception Undefined_field of int exception Undefined_field of int
module Endpoint = struct let errorf fmt =
Fmt.kstrf (fun x -> Error (`Msg x)) fmt
let compression = `None
type t = {
output : IO.t;
input : C.t; (* reads are buffered *)
decoder: Capnp.Codecs.FramedStream.t;
}
type error = [
| `IO of IO.write_error
| `Channel of C.error
| `Msg of string
| `Undefined_field of int
]
let pp_error ppf (e:error) = match e with
| `IO e -> Fmt.pf ppf "IO: %a" IO.pp_write_error e
| `Channel e -> Fmt.pf ppf "channel: %a" C.pp_error e
| `Msg e -> Fmt.string ppf e
| `Undefined_field i -> Fmt.pf ppf "undefined field %d" i
let err_io e = Error (`IO e)
let err_channel e = Error (`Channel e)
let err_msg fmt = Fmt.kstrf (fun s -> Error (`Msg s)) fmt
let err_frame = err_msg "Unsupported Cap'n'Proto frame received"
let err_undefined_field i = Error (`Undefined_field i)
let v fd =
let output = fd in
let input = C.create fd in
let decoder = Capnp.Codecs.FramedStream.empty compression in
{ output; input; decoder }
let send t msg =
let buf = Capnp.Codecs.serialize ~compression msg in
(* FIXME: avoid copying *)
IO.write t.output (Cstruct.of_string buf) >|= function
| Error e -> err_io e
| Ok () -> Ok ()
let rec recv t =
match Capnp.Codecs.FramedStream.get_next_frame t.decoder with
| Ok msg -> Lwt.return (Ok (`Data msg))
| Error Capnp.Codecs.FramingError.Unsupported -> Lwt.return err_frame
| Error Capnp.Codecs.FramingError.Incomplete ->
Log.info (fun f -> f "Endpoint.recv: incomplete; waiting for more data");
C.read_some ~len:4096 t.input >>= function
| Ok `Eof -> Lwt.return (Ok `Eof)
| Error e -> Lwt.return (err_channel e)
| Ok (`Data data) ->
(* FIXME: avoid copying *)
let data = Cstruct.to_string data in
Log.info (fun f -> f "Got %S" data);
Capnp.Codecs.FramedStream.add_fragment t.decoder data;
recv t
end
module Request = struct
type action =
| Write of string
| Read
| Delete
let pp_action ppf = function
| Write s -> Fmt.pf ppf "write[%S]" s
| Read -> Fmt.pf ppf "read"
| Delete -> Fmt.pf ppf "delete"
type t = {
id : int32 Lazy.t;
path : string list Lazy.t;
action: action Lazy.t;
}
let id t = Lazy.force t.id
let path t = Lazy.force t.path
let action t = Lazy.force t.action
let pp_path = Fmt.(list ~sep:(unit "/") string)
let pp ppf t =
let id = id t and path = path t and action = action t in
match action with
| exception Undefined_field i -> Fmt.pf ppf "<undefined-field: %d>" i
| action -> Fmt.pf ppf "%ld:%a:%a" id pp_path path pp_action action
let equal x y =
id x = id y && path x = path y && match action x = action y with
| exception Undefined_field _ -> false
| b -> b
let v ~id ~path action =
{ id = lazy id; action = lazy action; path = lazy path }
let read e: (t, Endpoint.error) result Lwt.t =
Endpoint.recv e >|= function
| Error e -> Error e
| Ok `Eof -> Error (`IO `Closed)
| Ok (`Data x) ->
let open P.Reader in
let msg = Request.of_message x in
let id = lazy (Request.id_get msg) in
let path = lazy (Request.path_get_list msg) in
let action = lazy (match Request.get msg with
| Request.Write x -> Write x
| Request.Read -> Read
| Request.Delete -> Delete
| Request.Undefined i -> raise (Undefined_field i)
) in
Ok { id; path; action }
let write e t =
let open P.Builder in
match action t with
| exception Undefined_field i -> Lwt.return (Endpoint.err_undefined_field i)
| action ->
let msg =
let b = Request.init_root () in
Request.id_set b (id t);
ignore (Request.path_set_list b (path t));
(match action with
| Write x -> Request.write_set b x
| Read -> Request.read_set b
| Delete -> Request.delete_set b);
b
in
Endpoint.send e (Request.to_message msg)
end
module Response = struct
type status = (string, string) result
let pp_status ppf = function
| Ok ok -> Fmt.pf ppf "ok:%S" ok
| Error e -> Fmt.pf ppf "error:%S" e
type t = {
id : int32 Lazy.t;
status: status Lazy.t;
}
let v ~id status = { id = lazy id; status = lazy status }
let id t = Lazy.force t.id
let status t = Lazy.force t.status
let pp ppf t = match status t with
| exception Undefined_field i -> Fmt.pf ppf "<undefined-field %d>" i
| s -> Fmt.pf ppf "%ld:%a" (id t) pp_status s
let equal x y =
id x = id y && match status x = status y with
| exception Undefined_field _ -> false
| b -> b
let read e: (t, Endpoint.error) result Lwt.t =
Endpoint.recv e >|= function
| Error e -> Error e
| Ok `Eof -> Error (`IO `Closed)
| Ok (`Data x) ->
let open P.Reader in
let msg = Response.of_message x in
let id = lazy (Response.id_get msg) in
let status = lazy (match Response.get msg with
| Response.Ok x -> Ok x
| Response.Error x -> Error x
| Response.Undefined i -> raise (Undefined_field i)
) in
Ok { id; status }
let write e t =
let open P.Builder in
match status t with
| exception Undefined_field i -> Lwt.return (Endpoint.err_undefined_field i)
| s ->
let msg =
let b = Response.init_root () in
Response.id_set b (id t);
(match s with
| Error s -> Response.error_set b s
| Ok s -> Response.ok_set b s);
b
in
Endpoint.send e (Response.to_message msg)
end
let err_not_found = "err-not-found"
module Client = struct module Client = struct
let new_id = module C = Ctl_api.Reader.Ctl
let n = ref 0l in
fun () -> n := Int32.succ !n; !n
type error = [`Msg of string] type error = [`Msg of string]
let pp_error ppf (`Msg s) = Fmt.string ppf s let pp_error ppf (`Msg s) = Fmt.string ppf s
module K = struct type t = C.t Capability.t
type t = int32
let equal = Int32.equal
let hash = Hashtbl.hash
end
module Cache = Hashtbl.Make(K)
type t = {
e : Endpoint.t;
replies: Response.t Cache.t;
}
let v fd = { e = Endpoint.v fd; replies = Cache.create 12 }
let err e = Fmt.kstrf (fun e -> Error (`Msg e)) "%a" Endpoint.pp_error e
let call t r =
let id = Request.id r in
Request.write t.e r >>= function
| Error e -> Lwt.return (err e)
| Ok () ->
let rec loop () =
try
let r = Cache.find t.replies id in
Cache.remove t.replies id;
Lwt.return r
with Not_found ->
Response.read t.e >>= function
| Error e ->
Log.err (fun l -> l "Got %a while waiting for a reply to %ld"
Endpoint.pp_error e id);
loop ()
| Ok r ->
let rid = Response.id r in
if rid = id then Lwt.return r
else (
(* FIXME: maybe we want to check if id is not already
allocated *)
Cache.add t.replies rid r;
loop ()
)
in
loop () >|= fun r ->
assert (Response.id r = id);
match Response.status r with
| Ok s -> Ok s
| Error s -> Error (`Msg s)
let request path action =
let id = new_id () in
Request.v ~id ~path action
let read t path = let read t path =
call t (request path Read) >|= function let module P = Ctl_api.Builder.Ctl.Read_params in
| Ok x -> Ok (Some x) let module R = Ctl_api.Reader.Response in
| Error e -> let req, p = Capability.Request.create P.init_pointer in
if e = `Msg err_not_found then Ok None P.path_set_list p path |> ignore;
else Error e Capability.call_for_value t C.read_method req >|= function
| Error e -> errorf "error: read(%a) -> %a" pp_path path Capnp_rpc.Error.pp e
| Ok r ->
let r = R.of_payload r in
match R.get r with
| R.Ok data -> Ok (Some data)
| R.NotFound -> Ok None
| R.Undefined _ -> Error (`Msg "invalid return")
let write t path v = let write t path data =
call t (request path @@ Write v) >|= function let module P = Ctl_api.Builder.Ctl.Write_params in
| Ok "" -> Ok () let req, p = Capability.Request.create P.init_pointer in
| Ok _ -> Error (`Msg "invalid return") P.path_set_list p path |> ignore;
| Error _ as e -> e P.data_set p data;
Capability.call_for_value t C.write_method req >|= function
| Ok _ -> Ok ()
| Error e -> errorf "error: write(%a) -> %a" pp_path path Capnp_rpc.Error.pp e
let delete t path = let delete t path =
call t (request path Delete) >|= function let module P = Ctl_api.Builder.Ctl.Delete_params in
| Ok "" -> Ok () let req, p = Capability.Request.create P.init_pointer in
| Ok _ -> Error (`Msg "invalid return") P.path_set_list p path |> ignore;
| Error _ as e -> e Capability.call_for_value t C.delete_method req >|= function
| Ok _ -> Ok ()
| Error e -> errorf "error: delete(%a) -> %a" pp_path path Capnp_rpc.Error.pp e
end end
@ -316,80 +87,68 @@ module Server = struct
type op = [ `Read | `Write | `Delete ] type op = [ `Read | `Write | `Delete ]
let ok q s = Response.v ~id:(Request.id q) (Ok s)
let error q s = Response.v ~id:(Request.id q) (Error s)
let with_key q f = f (Request.path q)
let infof fmt = let infof fmt =
Fmt.kstrf (fun msg () -> Fmt.kstrf (fun msg () ->
let date = Int64.of_float (Unix.gettimeofday ()) in let date = Int64.of_float (Unix.gettimeofday ()) in
Irmin.Info.v ~date ~author:"calf" msg Irmin.Info.v ~date ~author:"calf" msg
) fmt ) fmt
let not_allowed q = let not_allowed path =
let path = Request.path q in let err = Fmt.strf "%a is not an allowed path" pp_path path in
let err = Fmt.strf "%a is not an allowed path" Request.pp_path path in Log.err (fun l -> l "%s" err);
Log.err (fun l -> l "%ld: %a" (Request.id q) Request.pp_path path); err
error q err
let dispatch db op q = let write db key value =
with_key q (fun key -> let info = infof "Updating %a" KV.Key.pp key in
let can x = List.mem x op in KV.set db ~info key value
match Request.action q with
| exception Undefined_field i ->
Fmt.kstrf (fun e -> Lwt.return (error q e)) "undefined field %i" i
| Write s when can `Write ->
let info = infof "Updating %a" KV.Key.pp key in
KV.set db ~info key s >|= fun () ->
ok q ""
| Delete when can `Delete ->
let info = infof "Removing %a" KV.Key.pp key in
KV.remove db ~info key >|= fun () ->
ok q ""
| Read when can `Read ->
(KV.find db key >|= function
| None -> error q err_not_found
| Some v -> ok q v)
| _ -> Lwt.return (not_allowed q)
)
let listen ~routes db fd = let delete db key =
Log.debug (fun l -> l "Serving the control state over %a" IO.pp fd); let info = infof "Removing %a" KV.Key.pp key in
let queries = Queue.create () in KV.remove db ~info key
let cond = Lwt_condition.create () in
let e = Endpoint.v fd in
let rec listen () =
Request.read e >>= function
| Error (`Channel _ | `IO _ as e) ->
Log.err (fun l -> l "fatal error: %a" Endpoint.pp_error e);
Lwt.return_unit
| Error (`Msg _ | `Undefined_field _ as e) ->
Log.err (fun l -> l "transient error: %a" Endpoint.pp_error e);
listen ()
| Ok q ->
Queue.add q queries;
Lwt_condition.signal cond ();
listen ()
in
let rec process () =
Lwt_condition.wait cond >>= fun () ->
let q = Queue.pop queries in
let path = Request.path q in
(if List.mem_assoc path routes then (
let op = List.assoc path routes in
dispatch db op q >>= fun r ->
Response.write e r
) else (
Response.write e (not_allowed q)
)) >>= function
| Ok () -> process ()
| Error e ->
Log.err (fun l -> l "%a" Endpoint.pp_error e);
process ()
in
Lwt.pick [
listen ();
process ();
]
let with_permission_check ~routes op key fn =
match List.assoc key routes with
| perms when List.mem op perms -> fn ()
| _ -> Service.fail "%s" (not_allowed key)
| exception Not_found -> Service.fail "%s" (not_allowed key)
let service ~routes db =
Ctl_api.Builder.Ctl.local @@
object (_ : Ctl_api.Builder.Ctl.service)
method read req =
let module P = Ctl_api.Reader.Ctl.Read_params in
let module R = Ctl_api.Builder.Response in
let params = P.of_payload req in
let key = P.path_get_list params in
with_permission_check ~routes `Read key @@ fun () ->
Service.return_lwt (fun () ->
let resp, r = Service.Response.create R.init_pointer in
(KV.find db key >|= function
| None -> R.not_found_set r
| Some x -> R.ok_set r x
) >|= fun () ->
Ok resp
)
method write req =
let module P = Ctl_api.Reader.Ctl.Write_params in
let params = P.of_payload req in
let key = P.path_get_list params in
let value = P.data_get params in
with_permission_check ~routes `Write key @@ fun () ->
Service.return_lwt (fun () ->
write db key value >|= fun () ->
Ok (Service.Response.create_empty ())
)
method delete req =
let module P = Ctl_api.Reader.Ctl.Delete_params in
let params = P.of_payload req in
let key = P.path_get_list params in
with_permission_check ~routes `Delete key @@ fun () ->
Service.return_lwt (fun () ->
delete db key >|= fun () ->
Ok (Service.Response.create_empty ())
)
end
end end

View File

@ -4,103 +4,6 @@
exception Undefined_field of int exception Undefined_field of int
module Endpoint: sig
type t
(** The type for SDK endpoints. *)
val v: IO.t ->t
(** [v f] is a fresh endpoint state built on top of the flow [f]. *)
(** The type for endpoint errors. *)
type error = private [>
| `IO of IO.write_error
| `Msg of string
| `Undefined_field of int
]
val pp_error: error Fmt.t
(** [pp_error] is the pretty-printer for errors. *)
end
module Request: sig
type t
(** The type for SDK requests. *)
(** The type for request actions. *)
type action =
| Write of string
| Read
| Delete
val pp: t Fmt.t
(** [pp] is the pretty-printer for requests. *)
val equal: t -> t -> bool
(** [equal] is the equality function for requests. *)
val pp_action: action Fmt.t
(** [pp_action] is the pretty-printer for request actions. *)
val action: t -> action
(** [action t] is [t]'s requested operation. Can raise
[Endpoint.Undefined_field]. *)
val path: t -> string list
(** [path t] is the [t]'s request path. *)
val id: t -> int32
(** [id t] it [t]'s request id. *)
val v: id:int32 -> path:string list -> action -> t
(** [v ~id ~path action] is a new request. *)
val write: Endpoint.t -> t -> (unit, Endpoint.error) result Lwt.t
(** [write e t] writes a request message for the
action [action] and the path [path] using the unique ID [id]. *)
val read: Endpoint.t -> (t, Endpoint.error) result Lwt.t
(** [read e] reads a query message. *)
end
module Response: sig
type t
(** The type for responses. *)
(** The type for response status. *)
type status = (string, string) result
val pp: t Fmt.t
(** [pp] is the pretty-printer for responses. *)
val equal: t -> t -> bool
(** [equal] is the equality function for responses. *)
val pp_status: status Fmt.t
(** [pp_status] is the pretty-printer for response statuses. *)
val status: t -> status
(** [status t] is [t]'s response status. Can raise
[Endpoint.Undefined_field]. *)
val id: t -> int32
(** [id t] is [t]'s response ID. *)
val v: id:int32 -> status -> t
(** [v ~id status] is a new response. *)
val write: Endpoint.t -> t -> (unit, Endpoint.error) result Lwt.t
(** [write fd t] writes a reply message. *)
val read: Endpoint.t -> (t, Endpoint.error) result Lwt.t
(** [read fd] reads a reply message. *)
end
module Client: sig module Client: sig
(** Client-side of the control plane. The control plane state is a (** Client-side of the control plane. The control plane state is a
@ -110,7 +13,7 @@ module Client: sig
TODO: decide if we want to support test_and_set (instead of TODO: decide if we want to support test_and_set (instead of
write) and some kind of watches. *) write) and some kind of watches. *)
type t type t = Ctl_api.Reader.Ctl.t Capnp_rpc_lwt.Capability.t
(** The type for client state. *) (** The type for client state. *)
type error type error
@ -119,11 +22,6 @@ module Client: sig
val pp_error: error Fmt.t val pp_error: error Fmt.t
(** [pp_error] is the pretty-printer for client errors. *) (** [pp_error] is the pretty-printer for client errors. *)
val v: IO.t -> t
(** [v fd] is the client state using [fd] to send requests to the
server. A client state also stores some state for all the
incomplete client queries. *)
val read: t -> string list -> (string option, error) result Lwt.t val read: t -> string list -> (string option, error) result Lwt.t
(** [read t k] is the value associated with the key [k] in the (** [read t k] is the value associated with the key [k] in the
control plane state. Return [None] if no value is associated to control plane state. Return [None] if no value is associated to
@ -150,8 +48,8 @@ module Server: sig
type op = [ `Read | `Write | `Delete ] type op = [ `Read | `Write | `Delete ]
(** The type for operations to perform on routes. *) (** The type for operations to perform on routes. *)
val listen: routes:(string list * op list) list -> KV.t -> IO.t -> unit Lwt.t val service: routes:(string list * op list) list -> KV.t -> Ctl_api.Reader.Ctl.t Capnp_rpc_lwt.Capability.t
(** [listen ~routes kv fd] is the thread exposing the KV store [kv], (** [service ~routes kv] is the thread exposing the KV store [kv],
holding control plane state, running inside the privileged holding control plane state, running inside the privileged
container. [routes] are the routes exposed by the server to the container. [routes] are the routes exposed by the server to the
calf and [kv] is the control plane state. *) calf and [kv] is the control plane state. *)

View File

@ -288,6 +288,8 @@ let exec_and_forward ?(handlers=block_for_ever) ~pid ~cmd ~net ~ctl t =
let priv_stdout = Fd.flow Pipe.(priv t.stdout) in let priv_stdout = Fd.flow Pipe.(priv t.stdout) in
let priv_stderr = Fd.flow Pipe.(priv t.stderr) in let priv_stderr = Fd.flow Pipe.(priv t.stderr) in
ctl priv_ctl;
Lwt.pick ([ Lwt.pick ([
wait (); wait ();
(* data *) (* data *)
@ -298,7 +300,6 @@ let exec_and_forward ?(handlers=block_for_ever) ~pid ~cmd ~net ~ctl t =
IO.forward ~verbose:false ~src:priv_stderr ~dst:Fd.(flow stderr); IO.forward ~verbose:false ~src:priv_stderr ~dst:Fd.(flow stderr);
(* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) (* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics)
~dst:Init.Fd.metric; *) ~dst:Init.Fd.metric; *)
ctl priv_ctl;
handlers (); handlers ();
]) ])

View File

@ -110,11 +110,11 @@ val exec: Pipe.monitor -> string list -> (int -> unit Lwt.t) -> unit Lwt.t
(* FIXME(samoht): not very happy with that signatue *) (* FIXME(samoht): not very happy with that signatue *)
val run: Pipe.monitor -> val run: Pipe.monitor ->
net:IO.t -> ctl:(IO.t -> unit Lwt.t) -> net:IO.t -> ctl:(IO.t -> unit) ->
?handlers:(unit -> unit Lwt.t) -> ?handlers:(unit -> unit Lwt.t) ->
string list -> unit Lwt.t string list -> unit Lwt.t
(** [run m ~net ~ctl ?handlers cmd] runs [cmd] in a unprivileged calf (** [run m ~net ~ctl ?handlers cmd] runs [cmd] in a unprivileged calf
process. [net] is the network interface flow. [ctl] is the control process. [net] is the network interface flow. [ctl] runs the control
thread connected to the {Pipe.ctl} pipe. [handlers] are the system thread connected to the {Pipe.ctl} pipe. [handlers] are the system
handler thread which will react to control data to perform handler thread which will react to control data to perform
privileged system actions. *) privileged system actions. *)

View File

@ -3,16 +3,12 @@
(library (library
((name sdk) ((name sdk)
(public_name sdk) (public_name sdk)
(flags (:standard -w -53-55))
(libraries (cstruct.lwt decompress irmin irmin-git lwt.unix rawlink (libraries (cstruct.lwt decompress irmin irmin-git lwt.unix rawlink
tuntap astring rresult mirage-flow-lwt capnp tuntap astring rresult mirage-flow-lwt capnp capnp-rpc-lwt
mirage-channel-lwt io-page.unix ipaddr)) mirage-channel-lwt io-page.unix ipaddr))))
))
(rule (rule
((targets (proto.ml proto.mli)) ((targets (proto.ml proto.mli))
(deps (proto.capnp)) (deps (proto.capnp))
(action (progn (action (run capnp compile -o ocaml ${<}))))
(run capnp compile -o ocaml ${<})
(system "mv proto.ml proto.ml.in")
(system "echo '[@@@ocaml.warning \"-A\"]\n' > proto.ml")
(system "cat proto.ml.in >> proto.ml")))))

View File

@ -1,19 +1,14 @@
@0x9e83562906de8259; @0x9e83562906de8259;
struct Request { struct Response {
id @0 :Int32;
path @1 :List(Text);
union { union {
write @2 :Data; ok @0 :Data;
read @3 :Void; notFound @1 :Void;
delete @4 :Void;
} }
} }
struct Response { interface Ctl {
id @0: Int32; write @0 (path :List(Text), data: Data) -> ();
union { read @1 (path :List(Text)) -> Response;
ok @1 :Data; delete @2 (path :List(Text)) -> ();
error @2 :Data;
}
} }

View File

@ -93,50 +93,6 @@ let test_socketpair pipe () =
Lwt.return_unit Lwt.return_unit
let request = Alcotest.testable Ctl.Request.pp Ctl.Request.equal
let response = Alcotest.testable Ctl.Response.pp Ctl.Response.equal
let queries =
let open Ctl.Request in
[
v ~id:0l ~path:["foo";"bar"] Read;
v ~id:Int32.max_int ~path:[] (Write "foo");
v ~id:0l ~path:[] Delete;
v ~id:(-3l) ~path:["foo"] Delete;
]
let replies =
let open Ctl.Response in
[
v ~id:0l (Ok "");
v ~id:Int32.max_int (Ok "foo");
v ~id:0l (Error "");
v ~id:(-3l) (Error "foo");
]
let failf fmt = Fmt.kstrf Alcotest.fail fmt
let test_send t write read message messages =
let calf = Ctl.Endpoint.v @@ calf Init.Pipe.(ctl t) in
let priv = Ctl.Endpoint.v @@ priv Init.Pipe.(ctl t) in
let test m =
write calf m >>= function
| Error e -> failf "Message.write: %a" Ctl.Endpoint.pp_error e
| Ok () ->
read priv >|= function
| Ok m' -> Alcotest.(check message) "write/read" m m'
| Error e -> failf "Message.read: %a" Ctl.Endpoint.pp_error e
in
Lwt_list.iter_s test messages
let test_request_send t () =
let open Ctl.Request in
test_send t write read request queries
let test_response_send t () =
let open Ctl.Response in
test_send t write read response replies
let failf fmt = Fmt.kstrf Alcotest.fail fmt let failf fmt = Fmt.kstrf Alcotest.fail fmt
(* read ops *) (* read ops *)
@ -188,6 +144,7 @@ let delete_should_work t k =
| Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e | Error e -> failf "write(%a) -> error: %a" pp_path k pp_error e
let test_ctl t () = let test_ctl t () =
Lwt_switch.with_switch @@ fun switch ->
let calf = calf Init.Pipe.(ctl t) in let calf = calf Init.Pipe.(ctl t) in
let priv = priv Init.Pipe.(ctl t) in let priv = priv Init.Pipe.(ctl t) in
let k1 = ["foo"; "bar"] in let k1 = ["foo"; "bar"] in
@ -199,32 +156,30 @@ let test_ctl t () =
let git_root = "/tmp/sdk/ctl" in let git_root = "/tmp/sdk/ctl" in
let _ = Sys.command (Fmt.strf "rm -rf %s" git_root) in let _ = Sys.command (Fmt.strf "rm -rf %s" git_root) in
Ctl.v git_root >>= fun ctl -> Ctl.v git_root >>= fun ctl ->
let server () = Ctl.Server.listen ~routes ctl priv in let _server =
let client () = let service = Ctl.Server.service ~routes ctl in
let t = Ctl.Client.v calf in Capnp_rpc_lwt.CapTP.of_endpoint ~switch ~offer:service (Capnp_rpc_lwt.Endpoint.of_flow ~switch (module IO) priv)
let allowed k v =
delete_should_work t k >>= fun () ->
read_should_none t k >>= fun () ->
write_should_work t k v >>= fun () ->
read_should_work t k v >>= fun () ->
Ctl.KV.get ctl k >|= fun v' ->
Alcotest.(check string) "in the db" v v'
in
let disallowed k v =
read_should_err t k >>= fun () ->
write_should_err t k v >>= fun () ->
delete_should_err t k
in
allowed k1 "" >>= fun () ->
allowed k2 "xxx" >>= fun () ->
allowed k3 (random_string (255 * 1024)) >>= fun () ->
disallowed k4 "" >>= fun () ->
Lwt.return_unit
in in
Lwt.pick [ let client = Capnp_rpc_lwt.CapTP.of_endpoint ~switch (Capnp_rpc_lwt.Endpoint.of_flow ~switch (module IO) calf) in
client (); let t = Capnp_rpc_lwt.CapTP.bootstrap client in
server (); let allowed k v =
] delete_should_work t k >>= fun () ->
read_should_none t k >>= fun () ->
write_should_work t k v >>= fun () ->
read_should_work t k v >>= fun () ->
Ctl.KV.get ctl k >|= fun v' ->
Alcotest.(check string) "in the db" v v'
in
let disallowed k v =
read_should_err t k >>= fun () ->
write_should_err t k v >>= fun () ->
delete_should_err t k
in
allowed k1 "" >>= fun () ->
allowed k2 "xxx" >>= fun () ->
allowed k3 (random_string (255 * 1024)) >>= fun () ->
disallowed k4 "" >>= fun () ->
Lwt.return_unit
let in_memory_flow () = let in_memory_flow () =
let flow = Mirage_flow_lwt.F.string () in let flow = Mirage_flow_lwt.F.string () in
@ -268,8 +223,6 @@ let test = [
"stdout is a pipe" , `Quick, run (test_pipe Init.Pipe.(stderr t)); "stdout is a pipe" , `Quick, run (test_pipe Init.Pipe.(stderr t));
"net is a socket pair", `Quick, run (test_socketpair Init.Pipe.(net t)); "net is a socket pair", `Quick, run (test_socketpair Init.Pipe.(net t));
"ctl is a socket pair", `Quick, run (test_socketpair Init.Pipe.(ctl t)); "ctl is a socket pair", `Quick, run (test_socketpair Init.Pipe.(ctl t));
"send requests" , `Quick, run (test_request_send t);
"send responses" , `Quick, run (test_response_send t);
"ctl" , `Quick, run (test_ctl t); "ctl" , `Quick, run (test_ctl t);
"exec" , `Quick, run test_exec; "exec" , `Quick, run test_exec;
] ]