mirror of
https://github.com/linuxkit/linuxkit.git
synced 2025-07-20 17:49:10 +00:00
miragesdk: re-org source code
Split the bits which can be re-used in other services (e.g. init dance and the server-side of the control path). `main.ml` now only contains what is specific to the DHCP logic (+ the /caf directory). Signed-off-by: Thomas Gazagnaire <thomas@gazagnaire.org>
This commit is contained in:
parent
7fa21377b5
commit
56085a3e6c
@ -1,5 +1,5 @@
|
||||
BASE=ocaml/opam:alpine-3.5_ocaml-4.04.0
|
||||
FILES=src/main.ml src/inflator.ml src/io_fs.ml src/bpf/dhcp.c \
|
||||
FILES=$(shell find src/ -regex '.*\.mli?') src/bpf/dhcp.c \
|
||||
src/jbuild src/bpf/jbuild
|
||||
IMAGE=dhcp-client
|
||||
OBJS=obj/dhcp-client
|
||||
|
@ -33,9 +33,9 @@ let net =
|
||||
let key = Key.(create "input" Arg.(opt int 3 doc)) in
|
||||
netif_of_fd key
|
||||
|
||||
let store =
|
||||
let ctl =
|
||||
let doc =
|
||||
Key.Arg.info ~docv:"FD" ~doc:"Store interface" ["store"]
|
||||
Key.Arg.info ~docv:"FD" ~doc:"Control interface" ["ctl"]
|
||||
in
|
||||
let key = Key.(create "output" Arg.(opt int 4 doc)) in
|
||||
netif_of_fd key
|
||||
@ -54,4 +54,4 @@ let main =
|
||||
foreign ~keys ~packages "Unikernel.Main"
|
||||
(time @-> network @-> network @-> job)
|
||||
|
||||
let () = register "dhcp-client" [main $ default_time $ net $ store]
|
||||
let () = register "dhcp-client" [main $ default_time $ net $ ctl]
|
||||
|
@ -199,13 +199,13 @@ end
|
||||
module Main
|
||||
(Time :Mirage_time_lwt.S)
|
||||
(Net : Mirage_net_lwt.S)
|
||||
(Store: Mirage_net_lwt.S) =
|
||||
(Ctl : Mirage_net_lwt.S) =
|
||||
struct
|
||||
|
||||
module API = API(Store)
|
||||
module API = API(Ctl)
|
||||
module Dhcp_client = Dhcp_client_mirage.Make(Time)(Net)
|
||||
|
||||
let start () net store =
|
||||
let start () net ctl =
|
||||
let requests = match Key_gen.codes () with
|
||||
| [] -> default_options
|
||||
| l ->
|
||||
@ -220,6 +220,6 @@ struct
|
||||
Lwt_stream.last_new stream >>= fun result ->
|
||||
let result = of_ipv4_config result in
|
||||
Log.info (fun l -> l "found lease: %a" pp result);
|
||||
API.set_ip store result.address
|
||||
API.set_ip ctl result.address
|
||||
|
||||
end
|
||||
|
128
projects/miragesdk/dhcp-client/src/ctl.ml
Normal file
128
projects/miragesdk/dhcp-client/src/ctl.ml
Normal file
@ -0,0 +1,128 @@
|
||||
open Lwt.Infix
|
||||
|
||||
let src = Logs.Src.create "init" ~doc:"Init steps"
|
||||
module Log = (val Logs.src_log src : Logs.LOG)
|
||||
|
||||
(* FIXME: to avoid linking with gmp *)
|
||||
module IO = struct
|
||||
type ic = unit
|
||||
type oc = unit
|
||||
type ctx = unit
|
||||
let with_connection ?ctx:_ _uri ?init:_ _f = Lwt.fail_with "not allowed"
|
||||
let read_all _ic = Lwt.fail_with "not allowed"
|
||||
let read_exactly _ic _n = Lwt.fail_with "not allowed"
|
||||
let write _oc _buf = Lwt.fail_with "not allowed"
|
||||
let flush _oc = Lwt.fail_with "not allowed"
|
||||
let ctx () = Lwt.return_none
|
||||
end
|
||||
|
||||
(* FIXME: we don't use Irmin_unix.Git.FS.KV to avoid linking with gmp *)
|
||||
module Store = Irmin_git.FS.KV(IO)(Inflator)(Io_fs)
|
||||
module KV = Store(Irmin.Contents.String)
|
||||
|
||||
let v path =
|
||||
let config = Irmin_git.config path in
|
||||
KV.Repo.v config >>= fun repo ->
|
||||
KV.of_branch repo "calf"
|
||||
|
||||
let set_listen_dir_hook () =
|
||||
Irmin.Private.Watch.set_listen_dir_hook Irmin_watcher.hook
|
||||
|
||||
module HTTP = struct
|
||||
|
||||
module Wm = struct
|
||||
module Rd = Webmachine.Rd
|
||||
include Webmachine.Make(Cohttp_lwt_unix.Server.IO)
|
||||
end
|
||||
|
||||
let with_key rd f =
|
||||
match KV.Key.of_string rd.Wm.Rd.dispatch_path with
|
||||
| Ok x -> f x
|
||||
| Error _ -> Wm.respond 404 rd
|
||||
|
||||
let infof fmt =
|
||||
Fmt.kstrf (fun msg () ->
|
||||
let date = Int64.of_float (Unix.gettimeofday ()) in
|
||||
Irmin.Info.v ~date ~author:"calf" msg
|
||||
) fmt
|
||||
|
||||
let ok = "{\"status\": \"ok\"}"
|
||||
|
||||
class item db = object(self)
|
||||
|
||||
inherit [Cohttp_lwt_body.t] Wm.resource
|
||||
|
||||
method private of_string rd =
|
||||
Cohttp_lwt_body.to_string rd.Wm.Rd.req_body >>= fun value ->
|
||||
with_key rd (fun key ->
|
||||
let info = infof "Updating %a" KV.Key.pp key in
|
||||
KV.set db ~info key value >>= fun () ->
|
||||
let resp_body = `String ok in
|
||||
let rd = { rd with Wm.Rd.resp_body } in
|
||||
Wm.continue true rd
|
||||
)
|
||||
|
||||
method private to_string rd =
|
||||
with_key rd (fun key ->
|
||||
KV.find db key >>= function
|
||||
| Some value -> Wm.continue (`String value) rd
|
||||
| None -> assert false
|
||||
)
|
||||
|
||||
method resource_exists rd =
|
||||
with_key rd (fun key ->
|
||||
KV.mem db key >>= fun mem ->
|
||||
Wm.continue mem rd
|
||||
)
|
||||
|
||||
method allowed_methods rd =
|
||||
Wm.continue [`GET; `HEAD; `PUT; `DELETE] rd
|
||||
|
||||
method content_types_provided rd =
|
||||
Wm.continue [
|
||||
"plain", self#to_string
|
||||
] rd
|
||||
|
||||
method content_types_accepted rd =
|
||||
Wm.continue [
|
||||
"plain", self#of_string
|
||||
] rd
|
||||
|
||||
method delete_resource rd =
|
||||
with_key rd (fun key ->
|
||||
let info = infof "Deleting %a" KV.Key.pp key in
|
||||
KV.remove db ~info key >>= fun () ->
|
||||
let resp_body = `String ok in
|
||||
Wm.continue true { rd with Wm.Rd.resp_body }
|
||||
)
|
||||
end
|
||||
|
||||
let v db routes =
|
||||
let routes = List.map (fun r -> r, fun () -> new item db) routes in
|
||||
let callback (_ch, _conn) request body =
|
||||
let open Cohttp in
|
||||
(Wm.dispatch' routes ~body ~request >|= function
|
||||
| None -> (`Not_found, Header.init (), `String "Not found", [])
|
||||
| Some result -> result)
|
||||
>>= fun (status, headers, body, path) ->
|
||||
Log.info (fun l ->
|
||||
l "%d - %s %s"
|
||||
(Code.code_of_status status)
|
||||
(Code.string_of_method (Request.meth request))
|
||||
(Uri.path (Request.uri request)));
|
||||
Log.debug (fun l -> l "path=%a" Fmt.(Dump.list string) path);
|
||||
(* Finally, send the response to the client *)
|
||||
Cohttp_lwt_unix.Server.respond ~flush:true ~headers ~body ~status ()
|
||||
in
|
||||
(* create the server and handle requests with the function defined above *)
|
||||
let conn_closed (_, conn) =
|
||||
Log.info (fun l ->
|
||||
l "connection %s closed\n%!" (Cohttp.Connection.to_string conn))
|
||||
in
|
||||
Cohttp_lwt_unix.Server.make ~callback ~conn_closed ()
|
||||
end
|
||||
|
||||
let serve ~routes db fd =
|
||||
let http = HTTP.v db routes in
|
||||
let on_exn e = Log.err (fun l -> l "ERROR: %a" Fmt.exn e) in
|
||||
Cohttp_lwt_unix.Server.create ~on_exn ~mode:(`Fd fd) http
|
16
projects/miragesdk/dhcp-client/src/ctl.mli
Normal file
16
projects/miragesdk/dhcp-client/src/ctl.mli
Normal file
@ -0,0 +1,16 @@
|
||||
(** [Control] handle the server part of the control path, running in
|
||||
the privileged container. *)
|
||||
|
||||
module KV: Irmin.KV with type contents = string
|
||||
|
||||
val v: string -> KV.t Lwt.t
|
||||
(** [v p] is the KV store storing the control state, located at path
|
||||
[p] in the filesystem of the privileged container. *)
|
||||
|
||||
val serve: routes:string list -> KV.t -> Lwt_unix.file_descr -> unit Lwt.t
|
||||
(** [serve ~routes kv fd] is the thread exposing the KV store [kv],
|
||||
holding control state, running inside the privileged container.
|
||||
[routes] are the routes exposed by the server (currently over a
|
||||
simple HTTP server -- but will change to something else later,
|
||||
probably protobuf) to the calf and [kv] is the control state
|
||||
handler. *)
|
216
projects/miragesdk/dhcp-client/src/init.ml
Normal file
216
projects/miragesdk/dhcp-client/src/init.ml
Normal file
@ -0,0 +1,216 @@
|
||||
open Lwt.Infix
|
||||
|
||||
let src = Logs.Src.create "init" ~doc:"Init steps"
|
||||
module Log = (val Logs.src_log src : Logs.LOG)
|
||||
|
||||
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
|
||||
|
||||
module Fd = struct
|
||||
|
||||
type t = {
|
||||
name: string;
|
||||
fd : Lwt_unix.file_descr;
|
||||
}
|
||||
|
||||
let fd t = t.fd
|
||||
let stdout = { name = "stdout"; fd = Lwt_unix.stdout }
|
||||
let stderr = { name = "stderr"; fd = Lwt_unix.stderr }
|
||||
let stdin = { name = "stdin" ; fd = Lwt_unix.stdin }
|
||||
|
||||
let to_int t =
|
||||
(Obj.magic (Lwt_unix.unix_file_descr t.fd): int)
|
||||
|
||||
let pp ppf fd = Fmt.pf ppf "%s:%d" fd.name (to_int fd)
|
||||
|
||||
let close fd =
|
||||
Log.debug (fun l -> l "close %a" pp fd);
|
||||
Lwt_unix.close fd.fd
|
||||
|
||||
let dev_null =
|
||||
Lwt_unix.of_unix_file_descr ~blocking:false
|
||||
(Unix.openfile "/dev/null" [Unix.O_RDWR] 0)
|
||||
|
||||
let redirect_to_dev_null fd =
|
||||
Log.debug (fun l -> l "redirect-stdin-to-dev-null");
|
||||
Lwt_unix.close fd.fd >>= fun () ->
|
||||
Lwt_unix.dup2 dev_null fd.fd;
|
||||
Lwt_unix.close dev_null
|
||||
|
||||
let dup2 ~src ~dst =
|
||||
Log.debug (fun l -> l "dup2 %a => %a" pp src pp dst);
|
||||
Lwt_unix.dup2 src.fd dst.fd;
|
||||
close src
|
||||
|
||||
let proxy_net ~net fd =
|
||||
Log.debug (fun l -> l "proxy-net eth0 <=> %a" pp fd);
|
||||
let rec listen_rawlink () =
|
||||
Lwt_rawlink.read_packet net >>= fun buf ->
|
||||
Log.debug (fun l -> l "PROXY-NET: => %a" Cstruct.hexdump_pp buf);
|
||||
Log.debug (fun l -> l "PROXY-NET: => %S" (Cstruct.to_string buf));
|
||||
let rec write buf =
|
||||
Lwt_cstruct.write fd.fd buf >>= function
|
||||
| 0 -> Lwt.return_unit
|
||||
| n -> write (Cstruct.shift buf n)
|
||||
in
|
||||
write buf >>= fun () ->
|
||||
listen_rawlink ()
|
||||
in
|
||||
let listen_socket () =
|
||||
let len = 16 * 1024 in
|
||||
let buf = Cstruct.create len in
|
||||
let rec loop () =
|
||||
Lwt_cstruct.read fd.fd buf >>= fun len ->
|
||||
let buf = Cstruct.sub buf 0 len in
|
||||
Log.debug (fun l -> l "PROXY-NET: <= %a" Cstruct.hexdump_pp buf);
|
||||
Lwt_rawlink.send_packet net buf >>= fun () ->
|
||||
loop ()
|
||||
in
|
||||
loop ()
|
||||
in
|
||||
Lwt.pick [
|
||||
listen_rawlink ();
|
||||
listen_socket ();
|
||||
]
|
||||
|
||||
let rec really_write dst buf off len =
|
||||
match len with
|
||||
| 0 -> Lwt.return_unit
|
||||
| len ->
|
||||
Lwt_unix.write dst.fd buf off len >>= fun n ->
|
||||
really_write dst buf (off+n) (len-n)
|
||||
|
||||
let forward ~src ~dst =
|
||||
Log.debug (fun l -> l "forward %a => %a" pp src pp dst);
|
||||
let len = 16 * 1024 in
|
||||
let buf = Bytes.create len in
|
||||
let rec loop () =
|
||||
Lwt_unix.read src.fd buf 0 len >>= fun len ->
|
||||
if len = 0 then
|
||||
(* FIXME: why this ever happen *)
|
||||
Fmt.kstrf Lwt.fail_with "FORWARD[%a => %a]: EOF" pp src pp dst
|
||||
else (
|
||||
Log.debug (fun l ->
|
||||
l "FORWARD[%a => %a]: %S (%d)"
|
||||
pp src pp dst (Bytes.sub buf 0 len) len);
|
||||
really_write dst buf 0 len >>= fun () ->
|
||||
loop ()
|
||||
)
|
||||
in
|
||||
loop ()
|
||||
|
||||
let proxy x y =
|
||||
Lwt.pick [
|
||||
forward ~src:x ~dst:y;
|
||||
forward ~src:y ~dst:x;
|
||||
]
|
||||
|
||||
end
|
||||
|
||||
module Pipe = struct
|
||||
|
||||
type t = Fd.t * Fd.t
|
||||
|
||||
let priv = fst
|
||||
let calf = snd
|
||||
|
||||
let socketpair name =
|
||||
let priv, calf = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in
|
||||
Lwt_unix.clear_close_on_exec priv;
|
||||
Lwt_unix.clear_close_on_exec calf;
|
||||
{ Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf }
|
||||
|
||||
let pipe name =
|
||||
let priv, calf = Lwt_unix.pipe () in
|
||||
Lwt_unix.clear_close_on_exec priv;
|
||||
Lwt_unix.clear_close_on_exec calf;
|
||||
{ Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf }
|
||||
|
||||
(* logs pipe *)
|
||||
let stdout = pipe "logs-out"
|
||||
let stderr = pipe "logs-err"
|
||||
|
||||
(* store pipe *)
|
||||
let ctl = socketpair "ctl"
|
||||
|
||||
(* network pipe *)
|
||||
let net = socketpair "net"
|
||||
|
||||
(* metrics pipe *)
|
||||
let metrics = pipe "metrics"
|
||||
|
||||
end
|
||||
|
||||
let exec_calf cmd =
|
||||
Fd.(redirect_to_dev_null stdin) >>= fun () ->
|
||||
|
||||
(* close parent fds *)
|
||||
Fd.close Pipe.(priv stdout) >>= fun () ->
|
||||
Fd.close Pipe.(priv stderr) >>= fun () ->
|
||||
Fd.close Pipe.(priv ctl) >>= fun () ->
|
||||
Fd.close Pipe.(priv net) >>= fun () ->
|
||||
Fd.close Pipe.(priv metrics) >>= fun () ->
|
||||
|
||||
let cmds = String.concat " " cmd in
|
||||
|
||||
let calf_net = Pipe.(calf net) in
|
||||
let calf_ctl = Pipe.(calf ctl) in
|
||||
let calf_stdout = Pipe.(calf stdout) in
|
||||
let calf_stderr = Pipe.(calf stderr) in
|
||||
|
||||
Log.info (fun l -> l "Executing %s" cmds);
|
||||
Log.debug (fun l -> l "net-fd=%a store-fd=%a" Fd.pp calf_net Fd.pp calf_ctl);
|
||||
|
||||
Fd.dup2 ~src:calf_stdout ~dst:Fd.stdout >>= fun () ->
|
||||
Fd.dup2 ~src:calf_stderr ~dst:Fd.stderr >>= fun () ->
|
||||
|
||||
(* exec the calf *)
|
||||
Unix.execve (List.hd cmd) (Array.of_list cmd) [||]
|
||||
|
||||
let rawlink ?filter ethif =
|
||||
Log.debug (fun l -> l "bringing up %s" ethif);
|
||||
(try Tuntap.set_up_and_running ethif
|
||||
with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e));
|
||||
Lwt_rawlink.open_link ?filter ethif
|
||||
|
||||
let check_exit_status cmd status =
|
||||
let cmds = String.concat " " cmd in
|
||||
match status with
|
||||
| Unix.WEXITED 0 -> Lwt.return_unit
|
||||
| Unix.WEXITED i -> failf "%s: exit %d" cmds i
|
||||
| Unix.WSIGNALED i -> failf "%s: signal %d" cmds i
|
||||
| Unix.WSTOPPED i -> failf "%s: stopped %d" cmds i
|
||||
|
||||
let exec_priv ~pid ~cmd ~net ~ctl ~handlers =
|
||||
|
||||
(* close child fds *)
|
||||
Fd.(redirect_to_dev_null stdin) >>= fun () ->
|
||||
Fd.close Pipe.(calf stdout) >>= fun () ->
|
||||
Fd.close Pipe.(calf stderr) >>= fun () ->
|
||||
Fd.close Pipe.(calf net) >>= fun () ->
|
||||
Fd.close Pipe.(calf ctl) >>= fun () ->
|
||||
Fd.close Pipe.(calf metrics) >>= fun () ->
|
||||
|
||||
let wait () =
|
||||
Lwt_unix.waitpid [] pid >>= fun (_pid, w) ->
|
||||
Lwt_io.flush_all () >>= fun () ->
|
||||
|
||||
check_exit_status cmd w
|
||||
in
|
||||
Lwt.pick ([
|
||||
wait ();
|
||||
(* data *)
|
||||
Fd.proxy_net ~net Pipe.(priv net);
|
||||
|
||||
(* redirect the calf stdout to the shim stdout *)
|
||||
Fd.forward ~src:Pipe.(priv stdout) ~dst:Fd.stdout;
|
||||
Fd.forward ~src:Pipe.(priv stderr) ~dst:Fd.stderr;
|
||||
(* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *)
|
||||
ctl ();
|
||||
handlers ();
|
||||
])
|
||||
|
||||
let run ~net ~ctl ~handlers cmd =
|
||||
Lwt_io.flush_all () >>= fun () ->
|
||||
match Lwt_unix.fork () with
|
||||
| 0 -> exec_calf cmd
|
||||
| pid -> exec_priv ~pid ~cmd ~net ~ctl ~handlers
|
109
projects/miragesdk/dhcp-client/src/init.mli
Normal file
109
projects/miragesdk/dhcp-client/src/init.mli
Normal file
@ -0,0 +1,109 @@
|
||||
(** Init functions.
|
||||
|
||||
[Init] contains funcitons to initialise the state of the
|
||||
privileged container.
|
||||
|
||||
{ul
|
||||
|
||||
{- fowrard and filter the network traffic using BPF (for instance
|
||||
to allow only DHCP traffic).}
|
||||
{- open pipes for forwarding the calf's stdout and stderr
|
||||
to the privileged container's ones.}
|
||||
{- open a pipe to forward the metrics.}
|
||||
{- open a socket pair with the calf to be able to transmit control
|
||||
data, e.g. the IP address once a DHCP lease is obtained.}
|
||||
}*)
|
||||
|
||||
|
||||
module Fd: sig
|
||||
|
||||
type t
|
||||
(** The type for file descriptors. *)
|
||||
|
||||
val pp: t Fmt.t
|
||||
(** [pp_fd] pretty prints a file descriptor. *)
|
||||
|
||||
val fd: t -> Lwt_unix.file_descr
|
||||
(** [fd t] is [t]'s underlying unix file descriptor. *)
|
||||
|
||||
val to_int: t -> int
|
||||
(** [to_int fd] is [fd]'s number. *)
|
||||
|
||||
val redirect_to_dev_null: t -> unit Lwt.t
|
||||
(** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *)
|
||||
|
||||
val close: t -> unit Lwt.t
|
||||
(** [close fd] closes [fd]. *)
|
||||
|
||||
val dup2: src:t -> dst:t -> unit Lwt.t
|
||||
(** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *)
|
||||
|
||||
val proxy_net: net:Lwt_rawlink.t -> t -> unit Lwt.t
|
||||
(** [proxy_net ~net fd] proxies the traffic between the raw net link
|
||||
[net] and [fd]. *)
|
||||
|
||||
val forward: src:t -> dst:t -> unit Lwt.t
|
||||
(** [forward ~src ~dst] forwards the flow from [src] to [dst]. *)
|
||||
|
||||
(** {1 Usefull File Descriptors} *)
|
||||
|
||||
val stdin: t
|
||||
(** [stdin] is the standart input. *)
|
||||
|
||||
val stdout: t
|
||||
(** [stdout] is the standard output. *)
|
||||
|
||||
val stderr: t
|
||||
(** [stderr] is the standard error. *)
|
||||
|
||||
end
|
||||
|
||||
module Pipe: sig
|
||||
|
||||
type t
|
||||
(** The type for pipes. Could be either uni-directional (normal
|
||||
pipes) or a bi-directional (socket pairs). *)
|
||||
|
||||
val priv: t -> Fd.t
|
||||
(** [priv p] is the private side of the pipe [p]. *)
|
||||
|
||||
val calf: t -> Fd.t
|
||||
(** [calf p] is the calf side of the pipe [p]. *)
|
||||
|
||||
(** {1 Useful Pipes} *)
|
||||
|
||||
val stdout: t
|
||||
(** [stdout] is the uni-directional pipe from the calf's stdout . *)
|
||||
|
||||
val stderr: t
|
||||
(** [stderr] is the uni-directional pipe from the calf's stderr. *)
|
||||
|
||||
val metrics: t
|
||||
(** [metrics] is the uni-directional pipe fomr the calf's metric
|
||||
endpoint. *)
|
||||
|
||||
val ctl: t
|
||||
(** [ctl] is the bi-directional pipe used to exchange control
|
||||
data between the calf and the priv containers. *)
|
||||
|
||||
val net: t
|
||||
(** [net] is the bi-directional pipe used to exchange network
|
||||
traffic between the calf and the priv containers. *)
|
||||
|
||||
end
|
||||
|
||||
val rawlink: ?filter:string -> string -> Lwt_rawlink.t
|
||||
(** [rawlink ?filter i] is the net raw link to the interface [i] using
|
||||
the (optional) BPF filter [filter]. *)
|
||||
|
||||
val run:
|
||||
net:Lwt_rawlink.t ->
|
||||
ctl:(unit -> unit Lwt.t) ->
|
||||
handlers:(unit -> unit Lwt.t) ->
|
||||
string list -> unit Lwt.t
|
||||
(** [run ~net ~ctl ~handlers cmd] runs [cmd] in a unprivileged calf
|
||||
process. [ctl] is the control thread connected to the {Pipe.ctl}
|
||||
pipe. [net] is the net raw link which will be connected to the
|
||||
calf via the {!Pipe.net} socket pair. [handlers] are the system
|
||||
handler thread which will react to control data to perform
|
||||
privileged system actions. *)
|
@ -5,288 +5,6 @@ module Log = (val Logs.src_log src : Logs.LOG)
|
||||
|
||||
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
|
||||
|
||||
type fd = {
|
||||
name: string;
|
||||
fd : Lwt_unix.file_descr;
|
||||
}
|
||||
|
||||
let stdout = { name = "stdout"; fd = Lwt_unix.stdout }
|
||||
let stderr = { name = "stderr"; fd = Lwt_unix.stderr }
|
||||
let stdin = { name = "stdin" ; fd = Lwt_unix.stdin }
|
||||
|
||||
let int_of_fd (fd:Lwt_unix.file_descr) =
|
||||
(Obj.magic (Lwt_unix.unix_file_descr fd): int)
|
||||
|
||||
let pp_fd ppf fd = Fmt.pf ppf "%s:%d" fd.name (int_of_fd fd.fd)
|
||||
|
||||
let close fd =
|
||||
Log.debug (fun l -> l "close %a" pp_fd fd);
|
||||
Lwt_unix.close fd.fd
|
||||
|
||||
let dev_null =
|
||||
Lwt_unix.of_unix_file_descr ~blocking:false
|
||||
(Unix.openfile "/dev/null" [Unix.O_RDWR] 0)
|
||||
|
||||
let close_and_dup fd =
|
||||
Log.debug (fun l -> l "close-and-dup %a" pp_fd fd);
|
||||
Lwt_unix.close fd.fd >>= fun () ->
|
||||
Lwt_unix.dup2 dev_null fd.fd;
|
||||
Lwt_unix.close dev_null
|
||||
|
||||
let dup2 ~src ~dst =
|
||||
Log.debug (fun l -> l "dup2 %a => %a" pp_fd src pp_fd dst);
|
||||
Lwt_unix.dup2 src.fd dst.fd;
|
||||
close src
|
||||
|
||||
let proxy_rawlink ~rawlink ~fd =
|
||||
Log.debug (fun l -> l "proxy-netif eth0 <=> %a" pp_fd fd);
|
||||
let rec listen_rawlink () =
|
||||
Lwt_rawlink.read_packet rawlink >>= fun buf ->
|
||||
Log.debug (fun l -> l "PROXY-NETIF: => %a" Cstruct.hexdump_pp buf);
|
||||
Log.debug (fun l -> l "PROXY-NETIF: => %S" (Cstruct.to_string buf));
|
||||
let rec write buf =
|
||||
Lwt_cstruct.write fd.fd buf >>= function
|
||||
| 0 -> Lwt.return_unit
|
||||
| n -> write (Cstruct.shift buf n)
|
||||
in
|
||||
write buf >>= fun () ->
|
||||
listen_rawlink ()
|
||||
in
|
||||
let listen_socket () =
|
||||
let len = 16 * 1024 in
|
||||
let buf = Cstruct.create len in
|
||||
let rec loop () =
|
||||
Lwt_cstruct.read fd.fd buf >>= fun len ->
|
||||
let buf = Cstruct.sub buf 0 len in
|
||||
Log.debug (fun l -> l "PROXY-NETIF: <= %a" Cstruct.hexdump_pp buf);
|
||||
Lwt_rawlink.send_packet rawlink buf >>= fun () ->
|
||||
loop ()
|
||||
in
|
||||
loop ()
|
||||
in
|
||||
Lwt.pick [
|
||||
listen_rawlink ();
|
||||
listen_socket ();
|
||||
]
|
||||
|
||||
let rec really_write dst buf off len =
|
||||
match len with
|
||||
| 0 -> Lwt.return_unit
|
||||
| len ->
|
||||
Lwt_unix.write dst.fd buf off len >>= fun n ->
|
||||
really_write dst buf (off+n) (len-n)
|
||||
|
||||
let forward ~src ~dst =
|
||||
Log.debug (fun l -> l "forward %a => %a" pp_fd src pp_fd dst);
|
||||
let len = 16 * 1024 in
|
||||
let buf = Bytes.create len in
|
||||
let rec loop () =
|
||||
Lwt_unix.read src.fd buf 0 len >>= fun len ->
|
||||
if len = 0 then
|
||||
(* FIXME: why this ever happen *)
|
||||
Fmt.kstrf Lwt.fail_with "FORWARD[%a => %a]: EOF" pp_fd src pp_fd dst
|
||||
else (
|
||||
Log.debug (fun l ->
|
||||
l "FORWARD[%a => %a]: %S (%d)"
|
||||
pp_fd src pp_fd dst (Bytes.sub buf 0 len) len);
|
||||
really_write dst buf 0 len >>= fun () ->
|
||||
loop ()
|
||||
)
|
||||
in
|
||||
loop ()
|
||||
|
||||
let proxy x y =
|
||||
Lwt.pick [
|
||||
forward ~src:x ~dst:y;
|
||||
forward ~src:y ~dst:x;
|
||||
]
|
||||
|
||||
(* Prepare the fd space before we fork to run the calf *)
|
||||
|
||||
let socketpair name =
|
||||
let priv, calf = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in
|
||||
Lwt_unix.clear_close_on_exec priv;
|
||||
Lwt_unix.clear_close_on_exec calf;
|
||||
{ name = name; fd = priv }, { name = name ^ "-calf"; fd = calf }
|
||||
|
||||
let pipe name =
|
||||
let priv, calf = Lwt_unix.pipe () in
|
||||
Lwt_unix.clear_close_on_exec priv;
|
||||
Lwt_unix.clear_close_on_exec calf;
|
||||
{ name = name; fd = priv }, { name = name ^ "-calf"; fd = calf }
|
||||
|
||||
(* logs pipe *)
|
||||
let logs_out = pipe "logs-out"
|
||||
let logs_err = pipe "logs-err"
|
||||
|
||||
(* store pipe *)
|
||||
let store = socketpair "store"
|
||||
|
||||
(* network pipe *)
|
||||
let net = socketpair "net"
|
||||
|
||||
(* metrics pipe *)
|
||||
(* let metrics = make "metrics" *)
|
||||
|
||||
let child cmd =
|
||||
close_and_dup stdin >>= fun () ->
|
||||
|
||||
(* close parent fds *)
|
||||
close (fst logs_out) >>= fun () ->
|
||||
close (fst logs_err) >>= fun () ->
|
||||
close (fst store) >>= fun () ->
|
||||
close (fst net) >>= fun () ->
|
||||
(*
|
||||
close (fst metrics) >>= fun () ->
|
||||
*)
|
||||
|
||||
let cmds = String.concat " " cmd in
|
||||
Log.info (fun l -> l "Executing %s" cmds);
|
||||
Log.debug (fun l ->
|
||||
l "net-fd=%a store-fd=%a" pp_fd (snd net) pp_fd (snd store));
|
||||
|
||||
dup2 ~src:(snd logs_out) ~dst:stdout >>= fun () ->
|
||||
dup2 ~src:(snd logs_err) ~dst:stderr >>= fun () ->
|
||||
|
||||
(* exec the calf *)
|
||||
Unix.execve (List.hd cmd) (Array.of_list cmd) [||]
|
||||
|
||||
module Store = struct
|
||||
|
||||
(* FIXME: to avoid linking with gmp *)
|
||||
module IO = struct
|
||||
type ic = unit
|
||||
type oc = unit
|
||||
type ctx = unit
|
||||
let with_connection ?ctx:_ _uri ?init:_ _f = Lwt.fail_with "not allowed"
|
||||
let read_all _ic = Lwt.fail_with "not allowed"
|
||||
let read_exactly _ic _n = Lwt.fail_with "not allowed"
|
||||
let write _oc _buf = Lwt.fail_with "not allowed"
|
||||
let flush _oc = Lwt.fail_with "not allowed"
|
||||
let ctx () = Lwt.return_none
|
||||
end
|
||||
|
||||
(* FIXME: we don't use Irmin_unix.Git.FS.KV to avoid linking with gmp *)
|
||||
module Store = Irmin_git.FS.KV(IO)(Inflator)(Io_fs)
|
||||
module KV = Store(Irmin.Contents.String)
|
||||
|
||||
let client () =
|
||||
let config = Irmin_git.config "/data" in
|
||||
KV.Repo.v config >>= fun repo ->
|
||||
KV.of_branch repo "calf"
|
||||
|
||||
let set_listen_dir_hook () =
|
||||
Irmin.Private.Watch.set_listen_dir_hook Irmin_watcher.hook
|
||||
|
||||
module HTTP = struct
|
||||
|
||||
module Wm = struct
|
||||
module Rd = Webmachine.Rd
|
||||
include Webmachine.Make(Cohttp_lwt_unix.Server.IO)
|
||||
end
|
||||
|
||||
let with_key rd f =
|
||||
match KV.Key.of_string rd.Wm.Rd.dispatch_path with
|
||||
| Ok x -> f x
|
||||
| Error _ -> Wm.respond 404 rd
|
||||
|
||||
let infof fmt =
|
||||
Fmt.kstrf (fun msg () ->
|
||||
let date = Int64.of_float (Unix.gettimeofday ()) in
|
||||
Irmin.Info.v ~date ~author:"calf" msg
|
||||
) fmt
|
||||
|
||||
let ok = "{\"status\": \"ok\"}"
|
||||
|
||||
class item db = object(self)
|
||||
|
||||
inherit [Cohttp_lwt_body.t] Wm.resource
|
||||
|
||||
method private of_string rd =
|
||||
Cohttp_lwt_body.to_string rd.Wm.Rd.req_body >>= fun value ->
|
||||
with_key rd (fun key ->
|
||||
let info = infof "Updating %a" KV.Key.pp key in
|
||||
KV.set db ~info key value >>= fun () ->
|
||||
let resp_body = `String ok in
|
||||
let rd = { rd with Wm.Rd.resp_body } in
|
||||
Wm.continue true rd
|
||||
)
|
||||
|
||||
method private to_string rd =
|
||||
with_key rd (fun key ->
|
||||
KV.find db key >>= function
|
||||
| Some value -> Wm.continue (`String value) rd
|
||||
| None -> assert false
|
||||
)
|
||||
|
||||
method resource_exists rd =
|
||||
with_key rd (fun key ->
|
||||
KV.mem db key >>= fun mem ->
|
||||
Wm.continue mem rd
|
||||
)
|
||||
|
||||
method allowed_methods rd =
|
||||
Wm.continue [`GET; `HEAD; `PUT; `DELETE] rd
|
||||
|
||||
method content_types_provided rd =
|
||||
Wm.continue [
|
||||
"plain", self#to_string
|
||||
] rd
|
||||
|
||||
method content_types_accepted rd =
|
||||
Wm.continue [
|
||||
"plain", self#of_string
|
||||
] rd
|
||||
|
||||
method delete_resource rd =
|
||||
with_key rd (fun key ->
|
||||
let info = infof "Deleting %a" KV.Key.pp key in
|
||||
KV.remove db ~info key >>= fun () ->
|
||||
let resp_body = `String ok in
|
||||
Wm.continue true { rd with Wm.Rd.resp_body }
|
||||
)
|
||||
end
|
||||
|
||||
let v db =
|
||||
let routes = [
|
||||
("/ip" , fun () -> new item db);
|
||||
("/domain" , fun () -> new item db);
|
||||
("/search" , fun () -> new item db);
|
||||
("/mtu" , fun () -> new item db);
|
||||
("/nameserver/*", fun () -> new item db);
|
||||
] in
|
||||
let callback (_ch, _conn) request body =
|
||||
let open Cohttp in
|
||||
(Wm.dispatch' routes ~body ~request >|= function
|
||||
| None -> (`Not_found, Header.init (), `String "Not found", [])
|
||||
| Some result -> result)
|
||||
>>= fun (status, headers, body, path) ->
|
||||
Log.info (fun l ->
|
||||
l "%d - %s %s"
|
||||
(Code.code_of_status status)
|
||||
(Code.string_of_method (Request.meth request))
|
||||
(Uri.path (Request.uri request)));
|
||||
Log.debug (fun l -> l "path=%a" Fmt.(Dump.list string) path);
|
||||
(* Finally, send the response to the client *)
|
||||
Cohttp_lwt_unix.Server.respond ~flush:true ~headers ~body ~status ()
|
||||
in
|
||||
(* create the server and handle requests with the function defined above *)
|
||||
let conn_closed (_, conn) =
|
||||
Log.info (fun l ->
|
||||
l "connection %s closed\n%!" (Cohttp.Connection.to_string conn))
|
||||
in
|
||||
Cohttp_lwt_unix.Server.make ~callback ~conn_closed ()
|
||||
end
|
||||
|
||||
let serve () =
|
||||
client () >>= fun db ->
|
||||
let http = HTTP.v db in
|
||||
let fd = fst store in
|
||||
let on_exn e = Log.err (fun l -> l "XXX %a" Fmt.exn e) in
|
||||
Log.info (fun l -> l "serving KV store on %a" pp_fd fd);
|
||||
Cohttp_lwt_unix.Server.create ~on_exn ~mode:(`Fd fd.fd) http
|
||||
|
||||
end
|
||||
|
||||
module Handlers = struct
|
||||
|
||||
@ -298,7 +16,7 @@ module Handlers = struct
|
||||
| _ -> None
|
||||
|
||||
let ip t =
|
||||
Store.KV.watch_key t ["ip"] (fun diff ->
|
||||
Ctl.KV.watch_key t ["ip"] (fun diff ->
|
||||
match contents_of_diff diff with
|
||||
| Some ip ->
|
||||
Log.info (fun l -> l "SET IP to %s" ip);
|
||||
@ -311,8 +29,8 @@ module Handlers = struct
|
||||
ip;
|
||||
]
|
||||
|
||||
let install () =
|
||||
Store.client () >>= fun db ->
|
||||
let watch path =
|
||||
Ctl.v path >>= fun db ->
|
||||
Lwt_list.map_p (fun f -> f db) handlers >>= fun _ ->
|
||||
let t, _ = Lwt.task () in
|
||||
t
|
||||
@ -321,59 +39,21 @@ end
|
||||
|
||||
external bpf_filter: unit -> string = "bpf_filter"
|
||||
|
||||
let rawlink ethif =
|
||||
Log.debug (fun l -> l "bringing up %s" ethif);
|
||||
(try Tuntap.set_up_and_running ethif
|
||||
with e -> Log.err (fun l -> l "rawling: %a" Fmt.exn e));
|
||||
Lwt_rawlink.open_link ~filter:(bpf_filter ()) ethif
|
||||
|
||||
let check_exit_status cmd status =
|
||||
let cmds = String.concat " " cmd in
|
||||
match status with
|
||||
| Unix.WEXITED 0 -> Lwt.return_unit
|
||||
| Unix.WEXITED i -> failf "%s: exit %d" cmds i
|
||||
| Unix.WSIGNALED i -> failf "%s: signal %d" cmds i
|
||||
| Unix.WSTOPPED i -> failf "%s: stopped %d" cmds i
|
||||
|
||||
let parent cmd pid ethif =
|
||||
(* network traffic *)
|
||||
let rawlink = rawlink ethif in
|
||||
|
||||
(* close child fds *)
|
||||
close_and_dup stdin >>= fun () ->
|
||||
close (snd logs_out) >>= fun () ->
|
||||
close (snd logs_err) >>= fun () ->
|
||||
close (snd net) >>= fun () ->
|
||||
close (snd store) >>= fun () ->
|
||||
(*
|
||||
close (snd metrics) >>= fun () ->
|
||||
*)
|
||||
let wait () =
|
||||
Lwt_unix.waitpid [] pid >>= fun (_pid, w) ->
|
||||
Lwt_io.flush_all () >>= fun () ->
|
||||
|
||||
check_exit_status cmd w
|
||||
in
|
||||
Lwt.pick [
|
||||
wait ();
|
||||
(* data *)
|
||||
proxy_rawlink ~rawlink ~fd:(fst net);
|
||||
|
||||
(* redirect the calf stdout to the shim stdout *)
|
||||
forward ~src:(fst logs_out) ~dst:stdout;
|
||||
forward ~src:(fst logs_err) ~dst:stderr;
|
||||
(* metrics: TODO *)
|
||||
|
||||
Store.serve ();
|
||||
Handlers.install ();
|
||||
]
|
||||
|
||||
let run () cmd ethif =
|
||||
let run () cmd ethif path =
|
||||
Lwt_main.run (
|
||||
Lwt_io.flush_all () >>= fun () ->
|
||||
match Lwt_unix.fork () with
|
||||
| 0 -> child cmd
|
||||
| pid -> parent cmd pid ethif
|
||||
let net = Init.rawlink ~filter:(bpf_filter ()) ethif in
|
||||
let routes = [
|
||||
"/ip";
|
||||
"/domain";
|
||||
"/search";
|
||||
"/mtu";
|
||||
"/nameservers/*"
|
||||
] in
|
||||
Ctl.v "/data" >>= fun ctl ->
|
||||
let fd = Init.(Fd.fd @@ Pipe.(priv ctl)) in
|
||||
let ctl () = Ctl.serve ~routes ctl fd in
|
||||
let handlers () = Handlers.watch path in
|
||||
Init.run ~net ~ctl ~handlers cmd
|
||||
)
|
||||
|
||||
(* CLI *)
|
||||
@ -392,6 +72,9 @@ let setup_log style_renderer level =
|
||||
let setup_log =
|
||||
Term.(const setup_log $ Fmt_cli.style_renderer () $ Logs_cli.level ())
|
||||
|
||||
let ctl = string_of_int Init.(Fd.to_int Pipe.(calf ctl))
|
||||
let net = string_of_int Init.(Fd.to_int Pipe.(calf net))
|
||||
|
||||
let cmd =
|
||||
(* FIXME: use runc isolation
|
||||
let default_cmd = [
|
||||
@ -401,7 +84,7 @@ let cmd =
|
||||
] in
|
||||
*)
|
||||
let default_cmd = [
|
||||
"/dhcp-client-calf"; "--store=10"; "--net=12"
|
||||
"/dhcp-client-calf"; "--ctl="^ctl; "--net="^net
|
||||
] in
|
||||
let doc =
|
||||
Arg.info ~docv:"CMD" ~doc:"Command to run the calf process." ["cmd"]
|
||||
@ -421,18 +104,3 @@ let run =
|
||||
let () = match Term.eval run with
|
||||
| `Error _ -> exit 1
|
||||
| _ -> exit 0
|
||||
|
||||
(*
|
||||
|
||||
let kv_store = Unix.pipe ()
|
||||
|
||||
let install_logger () =
|
||||
Logs_syslog_lwt.udp_reporter (Unix.inet_addr_of_string "127.0.0.1") ()
|
||||
>|= fun r ->
|
||||
Logs.set_reporter r
|
||||
|
||||
let () = Lwt_main.run (
|
||||
install_logger () >>= fun () ->
|
||||
fd_of_tap0 >>= fun fd ->
|
||||
)
|
||||
*)
|
||||
|
@ -19,7 +19,7 @@ system:
|
||||
command: [/usr/bin/binfmt, -dir, /etc/binfmt.d/, -mount, /binfmt_misc]
|
||||
- name: dhcp-client
|
||||
network_mode: host
|
||||
image: "mobylinux/dhcp-client:d910ad5712a86cf2848753eecd1b7d07d80ad095"
|
||||
image: "mobylinux/dhcp-client:f6ef2cc4c3bf7dcad643f22fbd3d355af1725105"
|
||||
capabilities:
|
||||
- CAP_NET_ADMIN # to bring eth0 up
|
||||
- CAP_NET_RAW # to read /dev/eth0
|
||||
|
Loading…
Reference in New Issue
Block a user