miragesdk: first cut of a MirageSDK

Today the SDK only contains helper code to create secure Moby services based on
MirageOS. Today the SDK only defines the architecture and the communication
pipes between the privileged service and the calf; the proper communication
API will be specified after we have a few more use-cases.

Signed-off-by: Thomas Gazagnaire <thomas@gazagnaire.org>
This commit is contained in:
Thomas Gazagnaire 2017-03-29 11:59:24 +02:00
parent 13d110e2c7
commit d008e859c6
7 changed files with 860 additions and 0 deletions

View File

@ -0,0 +1,136 @@
open Lwt.Infix
let src = Logs.Src.create "init" ~doc:"Init steps"
module Log = (val Logs.src_log src : Logs.LOG)
(* FIXME: to avoid linking with gmp *)
module IO = struct
type ic = unit
type oc = unit
type ctx = unit
let with_connection ?ctx:_ _uri ?init:_ _f = Lwt.fail_with "not allowed"
let read_all _ic = Lwt.fail_with "not allowed"
let read_exactly _ic _n = Lwt.fail_with "not allowed"
let write _oc _buf = Lwt.fail_with "not allowed"
let flush _oc = Lwt.fail_with "not allowed"
let ctx () = Lwt.return_none
end
(* FIXME: we don't use Irmin_unix.Git.FS.KV to avoid linking with gmp *)
module Store = Irmin_git.FS.KV(IO)(Inflator)(Io_fs)
module KV = Store(Irmin.Contents.String)
let v path =
let config = Irmin_git.config path in
KV.Repo.v config >>= fun repo ->
KV.of_branch repo "calf"
let () =
Irmin.Private.Watch.set_listen_dir_hook Irmin_watcher.hook
module Dispatch = struct
module Wm = struct
module Rd = Webmachine.Rd
include Webmachine.Make(Cohttp_lwt_unix.Server.IO)
end
let with_key rd f =
match KV.Key.of_string rd.Wm.Rd.dispatch_path with
| Ok x -> f x
| Error _ -> Wm.respond 404 rd
let infof fmt =
Fmt.kstrf (fun msg () ->
let date = Int64.of_float (Unix.gettimeofday ()) in
Irmin.Info.v ~date ~author:"calf" msg
) fmt
let ok = "{\"status\": \"ok\"}"
class item db = object(self)
inherit [Cohttp_lwt_body.t] Wm.resource
method private of_string rd =
Cohttp_lwt_body.to_string rd.Wm.Rd.req_body >>= fun value ->
with_key rd (fun key ->
let info = infof "Updating %a" KV.Key.pp key in
KV.set db ~info key value >>= fun () ->
let resp_body = `String ok in
let rd = { rd with Wm.Rd.resp_body } in
Wm.continue true rd
)
method private to_string rd =
with_key rd (fun key ->
KV.find db key >>= function
| Some value -> Wm.continue (`String value) rd
| None -> assert false
)
method resource_exists rd =
with_key rd (fun key ->
KV.mem db key >>= fun mem ->
Wm.continue mem rd
)
method allowed_methods rd =
Wm.continue [`GET; `HEAD; `PUT; `DELETE] rd
method content_types_provided rd =
Wm.continue [
"plain", self#to_string
] rd
method content_types_accepted rd =
Wm.continue [
"plain", self#of_string
] rd
method delete_resource rd =
with_key rd (fun key ->
let info = infof "Deleting %a" KV.Key.pp key in
KV.remove db ~info key >>= fun () ->
let resp_body = `String ok in
Wm.continue true { rd with Wm.Rd.resp_body }
)
end
let v db routes =
let routes = List.map (fun r -> r, fun () -> new item db) routes in
let callback (_ch, _conn) request body =
let open Cohttp in
(Wm.dispatch' routes ~body ~request >|= function
| None -> (`Not_found, Header.init (), `String "Not found", [])
| Some result -> result)
>>= fun (status, headers, body, path) ->
Log.info (fun l ->
l "%d - %s %s"
(Code.code_of_status status)
(Code.string_of_method (Request.meth request))
(Uri.path (Request.uri request)));
Log.debug (fun l -> l "path=%a" Fmt.(Dump.list string) path);
(* Finally, send the response to the client *)
Cohttp_lwt_unix.Server.respond ~headers ~body ~status ()
in
(* create the server and handle requests with the function defined above *)
let conn_closed (_, conn) =
Log.info (fun l ->
l "connection %s closed\n%!" (Cohttp.Connection.to_string conn))
in
Cohttp_lwt_unix.Server.make ~callback ~conn_closed ()
end
let int_of_fd (t:Lwt_unix.file_descr) =
(Obj.magic (Lwt_unix.unix_file_descr t): int)
let serve ~routes db fd =
let http = Dispatch.v db routes in
let on_exn e = Log.err (fun l -> l "ERROR: %a" Fmt.exn e) in
Lwt_unix.blocking fd >>= fun blocking ->
Log.debug (fun l ->
l "Serving the control state over fd:%d (blocking=%b)"
(int_of_fd fd) blocking
);
Cohttp_lwt_unix.Server.create ~on_exn ~mode:(`Fd fd) http

View File

@ -0,0 +1,16 @@
(** [Control] handle the server part of the control path, running in
the privileged container. *)
module KV: Irmin.KV with type contents = string
val v: string -> KV.t Lwt.t
(** [v p] is the KV store storing the control state, located at path
[p] in the filesystem of the privileged container. *)
val serve: routes:string list -> KV.t -> Lwt_unix.file_descr -> unit Lwt.t
(** [serve ~routes kv fd] is the thread exposing the KV store [kv],
holding control state, running inside the privileged container.
[routes] are the routes exposed by the server (currently over a
simple HTTP server -- but will change to something else later,
probably protobuf) to the calf and [kv] is the control state
handler. *)

View File

@ -0,0 +1,47 @@
(* https://github.com/Engil/Canopy/blob/3b5573ad0be0fa729b6d4e1ca8b9bb348e164960/inflator.ml *)
let input_buffer = Bytes.create 0xFFFF
let output_buffer = Bytes.create 0xFFFF
let window = Decompress.Window.create ~proof:Decompress.B.proof_bytes
let deflate ?(level = 4) buff =
let pos = ref 0 in
let res = Buffer.create (Cstruct.len buff) in
Decompress.Deflate.bytes input_buffer output_buffer
(fun input_buffer -> function
| Some _ ->
let n = min 0xFFFF (Cstruct.len buff - !pos) in
Cstruct.blit_to_bytes buff !pos input_buffer 0 n;
pos := !pos + n;
n
| None ->
let n = min 0xFFFF (Cstruct.len buff - !pos) in
Cstruct.blit_to_bytes buff !pos input_buffer 0 n;
pos := !pos + n;
n)
(fun output_buffer len ->
Buffer.add_subbytes res output_buffer 0 len;
0xFFFF)
(Decompress.Deflate.default ~proof:Decompress.B.proof_bytes level)
|> function
| Ok _ -> Cstruct.of_string (Buffer.contents res)
| Error _ -> failwith "Deflate.deflate"
let inflate ?output_size orig =
let res = Buffer.create (match output_size with
| Some len -> len
| None -> Mstruct.length orig)
in
Decompress.Inflate.bytes input_buffer output_buffer
(fun input_buffer ->
let n = min 0xFFFF (Mstruct.length orig) in
let s = Mstruct.get_string orig n in
Bytes.blit_string s 0 input_buffer 0 n;
n)
(fun output_buffer len ->
Buffer.add_subbytes res output_buffer 0 len;
0xFFFF)
(Decompress.Inflate.default (Decompress.Window.reset window))
|> function
| Ok _ -> Some (Mstruct.of_string (Buffer.contents res))
| Error _ -> None

View File

@ -0,0 +1,217 @@
open Lwt.Infix
let src = Logs.Src.create "init" ~doc:"Init steps"
module Log = (val Logs.src_log src : Logs.LOG)
let failf fmt = Fmt.kstrf Lwt.fail_with fmt
module Fd = struct
type t = {
name: string;
fd : Lwt_unix.file_descr;
}
let fd t = t.fd
let stdout = { name = "stdout"; fd = Lwt_unix.stdout }
let stderr = { name = "stderr"; fd = Lwt_unix.stderr }
let stdin = { name = "stdin" ; fd = Lwt_unix.stdin }
let to_int t =
(Obj.magic (Lwt_unix.unix_file_descr t.fd): int)
let pp ppf fd = Fmt.pf ppf "%s:%d" fd.name (to_int fd)
let close fd =
Log.debug (fun l -> l "close %a" pp fd);
Lwt_unix.close fd.fd
let dev_null =
Lwt_unix.of_unix_file_descr ~blocking:false
(Unix.openfile "/dev/null" [Unix.O_RDWR] 0)
let redirect_to_dev_null fd =
Log.debug (fun l -> l "redirect-stdin-to-dev-null");
Lwt_unix.close fd.fd >>= fun () ->
Lwt_unix.dup2 dev_null fd.fd;
Lwt_unix.close dev_null
let dup2 ~src ~dst =
Log.debug (fun l -> l "dup2 %a => %a" pp src pp dst);
Lwt_unix.dup2 src.fd dst.fd;
close src
let proxy_net ~net fd =
Log.debug (fun l -> l "proxy-net eth0 <=> %a" pp fd);
let rec listen_rawlink () =
Lwt_rawlink.read_packet net >>= fun buf ->
Log.debug (fun l -> l "PROXY-NET: => %a" Cstruct.hexdump_pp buf);
Log.debug (fun l -> l "PROXY-NET: => %S" (Cstruct.to_string buf));
let rec write buf =
Lwt_cstruct.write fd.fd buf >>= function
| 0 -> Lwt.return_unit
| n -> write (Cstruct.shift buf n)
in
write buf >>= fun () ->
listen_rawlink ()
in
let listen_socket () =
let len = 16 * 1024 in
let buf = Cstruct.create len in
let rec loop () =
Lwt_cstruct.read fd.fd buf >>= fun len ->
let buf = Cstruct.sub buf 0 len in
Log.debug (fun l -> l "PROXY-NET: <= %a" Cstruct.hexdump_pp buf);
Lwt_rawlink.send_packet net buf >>= fun () ->
loop ()
in
loop ()
in
Lwt.pick [
listen_rawlink ();
listen_socket ();
]
let rec really_write dst buf off len =
match len with
| 0 -> Lwt.return_unit
| len ->
Lwt_unix.write dst.fd buf off len >>= fun n ->
really_write dst buf (off+n) (len-n)
let forward ~src ~dst =
Log.debug (fun l -> l "forward %a => %a" pp src pp dst);
let len = 16 * 1024 in
let buf = Bytes.create len in
let rec loop () =
Lwt_unix.read src.fd buf 0 len >>= fun len ->
if len = 0 then
(* FIXME: why this ever happen *)
Fmt.kstrf Lwt.fail_with "FORWARD[%a => %a]: EOF" pp src pp dst
else (
Log.debug (fun l ->
l "FORWARD[%a => %a]: %S (%d)"
pp src pp dst (Bytes.sub buf 0 len) len);
really_write dst buf 0 len >>= fun () ->
loop ()
)
in
loop ()
let proxy x y =
Lwt.pick [
forward ~src:x ~dst:y;
forward ~src:y ~dst:x;
]
end
module Pipe = struct
type t = Fd.t * Fd.t
let priv = fst
let calf = snd
let socketpair name =
let priv, calf = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in
Lwt_unix.clear_close_on_exec priv;
Lwt_unix.clear_close_on_exec calf;
{ Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf }
let pipe name =
let priv, calf = Lwt_unix.pipe () in
Lwt_unix.clear_close_on_exec priv;
Lwt_unix.clear_close_on_exec calf;
{ Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf }
(* logs pipe *)
let stdout = pipe "stdout"
let stderr = pipe "stderr"
(* store pipe *)
let ctl = socketpair "ctl"
(* network pipe *)
let net = socketpair "net"
(* metrics pipe *)
let metrics = pipe "metrics"
end
let exec_calf cmd =
Fd.(redirect_to_dev_null stdin) >>= fun () ->
(* close parent fds *)
Fd.close Pipe.(priv stdout) >>= fun () ->
Fd.close Pipe.(priv stderr) >>= fun () ->
Fd.close Pipe.(priv ctl) >>= fun () ->
Fd.close Pipe.(priv net) >>= fun () ->
Fd.close Pipe.(priv metrics) >>= fun () ->
let cmds = String.concat " " cmd in
let calf_net = Pipe.(calf net) in
let calf_ctl = Pipe.(calf ctl) in
let calf_stdout = Pipe.(calf stdout) in
let calf_stderr = Pipe.(calf stderr) in
Log.info (fun l -> l "Executing %s" cmds);
Log.debug (fun l -> l "net-fd=%a store-fd=%a" Fd.pp calf_net Fd.pp calf_ctl);
Fd.dup2 ~src:calf_stdout ~dst:Fd.stdout >>= fun () ->
Fd.dup2 ~src:calf_stderr ~dst:Fd.stderr >>= fun () ->
(* exec the calf *)
Unix.execve (List.hd cmd) (Array.of_list cmd) [||]
let rawlink ?filter ethif =
Log.debug (fun l -> l "bringing up %s" ethif);
(try Tuntap.set_up_and_running ethif
with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e));
Lwt_rawlink.open_link ?filter ethif
let check_exit_status cmd status =
let cmds = String.concat " " cmd in
match status with
| Unix.WEXITED 0 -> Lwt.return_unit
| Unix.WEXITED i -> failf "%s: exit %d" cmds i
| Unix.WSIGNALED i -> failf "%s: signal %d" cmds i
| Unix.WSTOPPED i -> failf "%s: stopped %d" cmds i
let exec_priv ~pid ~cmd ~net ~ctl ~handlers =
Fd.(redirect_to_dev_null stdin) >>= fun () ->
(* close child fds *)
Fd.close Pipe.(calf stdout) >>= fun () ->
Fd.close Pipe.(calf stderr) >>= fun () ->
Fd.close Pipe.(calf net) >>= fun () ->
Fd.close Pipe.(calf ctl) >>= fun () ->
Fd.close Pipe.(calf metrics) >>= fun () ->
let wait () =
Lwt_unix.waitpid [] pid >>= fun (_pid, w) ->
Lwt_io.flush_all () >>= fun () ->
check_exit_status cmd w
in
Lwt.pick ([
wait ();
(* data *)
Fd.proxy_net ~net Pipe.(priv net);
(* redirect the calf stdout to the shim stdout *)
Fd.forward ~src:Pipe.(priv stdout) ~dst:Fd.stdout;
Fd.forward ~src:Pipe.(priv stderr) ~dst:Fd.stderr;
(* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *)
ctl ();
(* handlers (); *)
])
let run ~net ~ctl ~handlers cmd =
Lwt_io.flush_all () >>= fun () ->
match Lwt_unix.fork () with
| 0 -> exec_calf cmd
| pid -> exec_priv ~pid ~cmd ~net ~ctl ~handlers

View File

@ -0,0 +1,109 @@
(** Init functions.
[Init] contains funcitons to initialise the state of the
privileged container.
{ul
{- fowrard and filter the network traffic using BPF (for instance
to allow only DHCP traffic).}
{- open pipes for forwarding the calf's stdout and stderr
to the privileged container's ones.}
{- open a pipe to forward the metrics.}
{- open a socket pair with the calf to be able to transmit control
data, e.g. the IP address once a DHCP lease is obtained.}
}*)
module Fd: sig
type t
(** The type for file descriptors. *)
val pp: t Fmt.t
(** [pp_fd] pretty prints a file descriptor. *)
val fd: t -> Lwt_unix.file_descr
(** [fd t] is [t]'s underlying unix file descriptor. *)
val to_int: t -> int
(** [to_int fd] is [fd]'s number. *)
val redirect_to_dev_null: t -> unit Lwt.t
(** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *)
val close: t -> unit Lwt.t
(** [close fd] closes [fd]. *)
val dup2: src:t -> dst:t -> unit Lwt.t
(** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *)
val proxy_net: net:Lwt_rawlink.t -> t -> unit Lwt.t
(** [proxy_net ~net fd] proxies the traffic between the raw net link
[net] and [fd]. *)
val forward: src:t -> dst:t -> unit Lwt.t
(** [forward ~src ~dst] forwards the flow from [src] to [dst]. *)
(** {1 Usefull File Descriptors} *)
val stdin: t
(** [stdin] is the standart input. *)
val stdout: t
(** [stdout] is the standard output. *)
val stderr: t
(** [stderr] is the standard error. *)
end
module Pipe: sig
type t
(** The type for pipes. Could be either uni-directional (normal
pipes) or a bi-directional (socket pairs). *)
val priv: t -> Fd.t
(** [priv p] is the private side of the pipe [p]. *)
val calf: t -> Fd.t
(** [calf p] is the calf side of the pipe [p]. *)
(** {1 Useful Pipes} *)
val stdout: t
(** [stdout] is the uni-directional pipe from the calf's stdout . *)
val stderr: t
(** [stderr] is the uni-directional pipe from the calf's stderr. *)
val metrics: t
(** [metrics] is the uni-directional pipe fomr the calf's metric
endpoint. *)
val ctl: t
(** [ctl] is the bi-directional pipe used to exchange control
data between the calf and the priv containers. *)
val net: t
(** [net] is the bi-directional pipe used to exchange network
traffic between the calf and the priv containers. *)
end
val rawlink: ?filter:string -> string -> Lwt_rawlink.t
(** [rawlink ?filter i] is the net raw link to the interface [i] using
the (optional) BPF filter [filter]. *)
val run:
net:Lwt_rawlink.t ->
ctl:(unit -> unit Lwt.t) ->
handlers:(unit -> unit Lwt.t) ->
string list -> unit Lwt.t
(** [run ~net ~ctl ~handlers cmd] runs [cmd] in a unprivileged calf
process. [ctl] is the control thread connected to the {Pipe.ctl}
pipe. [net] is the net raw link which will be connected to the
calf via the {!Pipe.net} socket pair. [handlers] are the system
handler thread which will react to control data to perform
privileged system actions. *)

View File

@ -0,0 +1,325 @@
(* from irmin-unix, to avoid linking with gmp ... *)
module Log = struct
let src = Logs.Src.create "git.unix" ~doc:"logs git's unix events"
include (val Logs.src_log src : Logs.LOG)
end
open Lwt.Infix
let mkdir_pool = Lwt_pool.create 1 (fun () -> Lwt.return_unit)
let protect_unix_exn = function
| Unix.Unix_error _ as e -> Lwt.fail (Failure (Printexc.to_string e))
| e -> Lwt.fail e
let ignore_enoent = function
| Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return_unit
| e -> Lwt.fail e
let protect f x = Lwt.catch (fun () -> f x) protect_unix_exn
let safe f x = Lwt.catch (fun () -> f x) ignore_enoent
let mkdir dirname =
let rec aux dir =
if Sys.file_exists dir && Sys.is_directory dir then Lwt.return_unit
else (
let clear =
if Sys.file_exists dir then (
Log.debug (fun l ->
l "%s already exists but is a file, removing." dir);
safe Lwt_unix.unlink dir
) else
Lwt.return_unit
in
clear >>= fun () ->
aux (Filename.dirname dir) >>= fun () ->
Log.debug (fun l -> l "mkdir %s" dir);
protect (Lwt_unix.mkdir dir) 0o755;
) in
Lwt_pool.use mkdir_pool (fun () -> aux dirname)
let file_exists f =
Lwt.catch (fun () -> Lwt_unix.file_exists f) (function
(* See https://github.com/ocsigen/lwt/issues/316 *)
| Unix.Unix_error (Unix.ENOTDIR, _, _) -> Lwt.return_false
| e -> Lwt.fail e)
module Lock = struct
let is_stale max_age file =
file_exists file >>= fun exists ->
if exists then (
Lwt.catch (fun () ->
Lwt_unix.stat file >>= fun s ->
let stale = Unix.gettimeofday () -. s.Unix.st_mtime > max_age in
Lwt.return stale)
(function
| Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return false
| e -> Lwt.fail e)
) else
Lwt.return false
let unlock file =
Lwt_unix.unlink file
let lock ?(max_age = 10. *. 60. (* 10 minutes *)) ?(sleep = 0.001) file =
let rec aux i =
Log.debug (fun f -> f "lock %s %d" file i);
is_stale max_age file >>= fun is_stale ->
if is_stale then (
Log.err (fun f -> f "%s is stale, removing it." file);
unlock file >>= fun () ->
aux 1
) else
let create () =
let pid = Unix.getpid () in
mkdir (Filename.dirname file) >>= fun () ->
Lwt_unix.openfile file [Unix.O_CREAT; Unix.O_RDWR; Unix.O_EXCL] 0o600
>>= fun fd ->
let oc = Lwt_io.of_fd ~mode:Lwt_io.Output fd in
Lwt_io.write_int oc pid >>= fun () ->
Lwt_unix.close fd
in
Lwt.catch create (function
| Unix.Unix_error(Unix.EEXIST, _, _) ->
let backoff = 1. +. Random.float (let i = float i in i *. i) in
Lwt_unix.sleep (sleep *. backoff) >>= fun () ->
aux (i+1)
| e -> Lwt.fail e)
in
aux 1
let with_lock file fn =
match file with
| None -> fn ()
| Some f -> lock f >>= fun () -> Lwt.finalize fn (fun () -> unlock f)
end
let mmap_threshold = 4096
let openfile_pool = Lwt_pool.create 200 (fun () -> Lwt.return_unit)
let mkdir = mkdir
type path = string
(* we use file locking *)
type lock = path
let lock_file x = x
let file_exists = file_exists
let list_files kind dir =
if Sys.file_exists dir && Sys.is_directory dir then
let d = Sys.readdir dir in
let d = Array.to_list d in
let d = List.map (Filename.concat dir) d in
let d = List.filter kind d in
let d = List.sort String.compare d in
Lwt.return d
else
Lwt.return_nil
let directories dir =
list_files (fun f ->
try Sys.is_directory f with Sys_error _ -> false
) dir
let files dir =
list_files (fun f ->
try not (Sys.is_directory f) with Sys_error _ -> false
) dir
let write_cstruct fd b =
let rec rwrite fd buf ofs len =
Lwt_bytes.write fd buf ofs len >>= fun n ->
if len = 0 then Lwt.fail End_of_file
else if n < len then rwrite fd buf (ofs + n) (len - n)
else Lwt.return_unit in
match Cstruct.len b with
| 0 -> Lwt.return_unit
| len -> rwrite fd (Cstruct.to_bigarray b) 0 len
let delays = Array.init 20 (fun i -> 0.1 *. (float i) ** 2.)
let command fmt =
Printf.ksprintf (fun str ->
Log.debug (fun l -> l "[exec] %s" str);
let i = Sys.command str in
if i <> 0 then Log.debug (fun l -> l "[exec] error %d" i);
Lwt.return_unit
) fmt
let remove_dir dir =
if Sys.os_type = "Win32" then
command "cmd /d /v:off /c rd /s /q %S" dir
else
command "rm -rf %S" dir
let remove_file ?lock file =
Lock.with_lock lock (fun () ->
Lwt.catch
(fun () -> Lwt_unix.unlink file)
(function
(* On Windows, [EACCES] can also occur in an attempt to
rename a file or directory or to remove an existing
directory. *)
| Unix.Unix_error (Unix.EACCES, _, _)
| Unix.Unix_error (Unix.EISDIR, _, _) -> remove_dir file
| Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return_unit
| e -> Lwt.fail e)
)
let rename =
if Sys.os_type <> "Win32" then Lwt_unix.rename
else
fun tmp file ->
let rec aux i =
Lwt.catch
(fun () -> Lwt_unix.rename tmp file)
(function
(* On Windows, [EACCES] can also occur in an attempt to
rename a file or directory or to remove an existing
directory. *)
| Unix.Unix_error (Unix.EACCES, _, _) as e ->
if i >= Array.length delays then Lwt.fail e
else (
file_exists file >>= fun exists ->
if exists && Sys.is_directory file then (
remove_dir file >>= fun () -> aux (i+1)
) else (
Log.debug (fun l ->
l "Got EACCES, retrying in %.1fs" delays.(i));
Lwt_unix.sleep delays.(i) >>= fun () -> aux (i+1)
))
| e -> Lwt.fail e)
in
aux 0
let with_write_file ?temp_dir file fn =
begin match temp_dir with
| None -> Lwt.return_unit
| Some d -> mkdir d
end >>= fun () ->
let dir = Filename.dirname file in
mkdir dir >>= fun () ->
let tmp = Filename.temp_file ?temp_dir (Filename.basename file) "write" in
Lwt_pool.use openfile_pool (fun () ->
Log.debug (fun l -> l "Writing %s (%s)" file tmp);
Lwt_unix.(openfile tmp [O_WRONLY; O_NONBLOCK; O_CREAT; O_TRUNC] 0o644)
>>= fun fd ->
Lwt.finalize (fun () -> protect fn fd) (fun () -> Lwt_unix.close fd)
>>= fun () ->
rename tmp file
)
let read_file_with_read file size =
let chunk_size = max 4096 (min size 0x100000) in
let buf = Cstruct.create size in
let flags = [Unix.O_RDONLY] in
let perm = 0o0 in
Lwt_unix.openfile file flags perm >>= fun fd ->
let rec aux off =
let read_size = min chunk_size (size - off) in
Lwt_bytes.read fd buf.Cstruct.buffer off read_size >>= fun read ->
(* It should test for read = 0 in case size is larger than the
real size of the file. This may happen for instance if the
file was truncated while reading. *)
let off = off + read in
if off >= size then
Lwt.return buf
else
aux off
in
Lwt.finalize (fun () -> aux 0)
(fun () -> Lwt_unix.close fd)
let read_file_with_mmap file =
let fd = Unix.(openfile file [O_RDONLY; O_NONBLOCK] 0o644) in
let ba = Lwt_bytes.map_file ~fd ~shared:false () in
Unix.close fd;
Lwt.return (Cstruct.of_bigarray ba)
let read_file file =
Lwt.catch (fun () ->
Lwt_pool.use openfile_pool (fun () ->
Log.debug (fun l -> l "Reading %s" file);
Lwt_unix.stat file >>= fun stats ->
let size = stats.Lwt_unix.st_size in
(if size >= mmap_threshold then read_file_with_mmap file
else read_file_with_read file size
) >|= fun buf ->
Some buf
)
) (function
| Unix.Unix_error _ | Sys_error _ -> Lwt.return_none
| e -> Lwt.fail e)
let stat_info_unsafe path =
let open Git.Index in
let stats = Unix.stat path in
let ctime = { lsb32 = Int32.of_float stats.Unix.st_ctime; nsec = 0l } in
let mtime = { lsb32 = Int32.of_float stats.Unix.st_mtime; nsec = 0l } in
let dev = Int32.of_int stats.Unix.st_dev in
let inode = Int32.of_int stats.Unix.st_ino in
let mode = match stats.Unix.st_kind, stats.Unix.st_perm with
| Unix.S_REG, p -> if p land 0o100 = 0o100 then `Exec else `Normal
| Unix.S_LNK, _ -> `Link
| k, p ->
let kind = match k with
| Unix.S_REG -> "REG"
| Unix.S_DIR -> "DIR"
| Unix.S_CHR -> "CHR"
| Unix.S_BLK -> "BLK"
| Unix.S_LNK -> "LNK"
| Unix.S_FIFO -> "FIFO"
| Unix.S_SOCK -> "SOCK"
in
let perm = Printf.sprintf "%o" p in
let error =
Printf.sprintf "%s: not supported kind of file [%s, %s]."
path kind perm
in
failwith error
in
let uid = Int32.of_int stats.Unix.st_uid in
let gid = Int32.of_int stats.Unix.st_gid in
let size = Int32.of_int stats.Unix.st_size in
{ ctime; mtime; dev; inode; uid; gid; mode; size }
let stat_info path =
Lwt.catch (fun () -> Lwt.return (Some (stat_info_unsafe path))) (function
| Sys_error _ | Unix.Unix_error _ -> Lwt.return_none
| e -> Lwt.fail e)
let chmod ?lock f `Exec =
Lock.with_lock lock (fun () -> Lwt_unix.chmod f 0o755)
let write_file ?temp_dir ?lock file b =
let write () =
with_write_file file ?temp_dir (fun fd -> write_cstruct fd b)
in
Lock.with_lock lock (fun () ->
Lwt.catch write (function
| Unix.Unix_error (Unix.EISDIR, _, _) -> remove_dir file >>= write
| e -> Lwt.fail e
)
)
let test_and_set_file ?temp_dir ~lock file ~test ~set =
Lock.with_lock (Some lock) (fun () ->
read_file file >>= fun v ->
let equal = match test, v with
| None , None -> true
| Some x, Some y -> Cstruct.equal x y
| _ -> false
in
if not equal then Lwt.return false
else
(match set with
| None -> remove_file file
| Some v -> write_file ?temp_dir file v)
>|= fun () ->
true
)

View File

@ -0,0 +1,10 @@
(jbuild_version 1)
(library
((name sdk)
(libraries (logs-syslog.lwt threads cohttp.lwt cstruct.lwt
cmdliner fmt.cli logs.fmt logs.cli fmt.tty decompress
irmin irmin-git irmin-http lwt.unix rawlink tuntap
irmin-watcher inotify))
(preprocess (per_file ((pps (cstruct.ppx)) (ctl))))
))