diff --git a/projects/miragesdk/src/sdk/ctl.ml b/projects/miragesdk/src/sdk/ctl.ml new file mode 100644 index 000000000..feadedf35 --- /dev/null +++ b/projects/miragesdk/src/sdk/ctl.ml @@ -0,0 +1,136 @@ +open Lwt.Infix + +let src = Logs.Src.create "init" ~doc:"Init steps" +module Log = (val Logs.src_log src : Logs.LOG) + +(* FIXME: to avoid linking with gmp *) +module IO = struct + type ic = unit + type oc = unit + type ctx = unit + let with_connection ?ctx:_ _uri ?init:_ _f = Lwt.fail_with "not allowed" + let read_all _ic = Lwt.fail_with "not allowed" + let read_exactly _ic _n = Lwt.fail_with "not allowed" + let write _oc _buf = Lwt.fail_with "not allowed" + let flush _oc = Lwt.fail_with "not allowed" + let ctx () = Lwt.return_none +end + +(* FIXME: we don't use Irmin_unix.Git.FS.KV to avoid linking with gmp *) +module Store = Irmin_git.FS.KV(IO)(Inflator)(Io_fs) +module KV = Store(Irmin.Contents.String) + +let v path = + let config = Irmin_git.config path in + KV.Repo.v config >>= fun repo -> + KV.of_branch repo "calf" + +let () = + Irmin.Private.Watch.set_listen_dir_hook Irmin_watcher.hook + +module Dispatch = struct + + module Wm = struct + module Rd = Webmachine.Rd + include Webmachine.Make(Cohttp_lwt_unix.Server.IO) + end + + let with_key rd f = + match KV.Key.of_string rd.Wm.Rd.dispatch_path with + | Ok x -> f x + | Error _ -> Wm.respond 404 rd + + let infof fmt = + Fmt.kstrf (fun msg () -> + let date = Int64.of_float (Unix.gettimeofday ()) in + Irmin.Info.v ~date ~author:"calf" msg + ) fmt + + let ok = "{\"status\": \"ok\"}" + + class item db = object(self) + + inherit [Cohttp_lwt_body.t] Wm.resource + + method private of_string rd = + Cohttp_lwt_body.to_string rd.Wm.Rd.req_body >>= fun value -> + with_key rd (fun key -> + let info = infof "Updating %a" KV.Key.pp key in + KV.set db ~info key value >>= fun () -> + let resp_body = `String ok in + let rd = { rd with Wm.Rd.resp_body } in + Wm.continue true rd + ) + + method private to_string rd = + with_key rd (fun key -> + KV.find db key >>= function + | Some value -> Wm.continue (`String value) rd + | None -> assert false + ) + + method resource_exists rd = + with_key rd (fun key -> + KV.mem db key >>= fun mem -> + Wm.continue mem rd + ) + + method allowed_methods rd = + Wm.continue [`GET; `HEAD; `PUT; `DELETE] rd + + method content_types_provided rd = + Wm.continue [ + "plain", self#to_string + ] rd + + method content_types_accepted rd = + Wm.continue [ + "plain", self#of_string + ] rd + + method delete_resource rd = + with_key rd (fun key -> + let info = infof "Deleting %a" KV.Key.pp key in + KV.remove db ~info key >>= fun () -> + let resp_body = `String ok in + Wm.continue true { rd with Wm.Rd.resp_body } + ) + end + + let v db routes = + let routes = List.map (fun r -> r, fun () -> new item db) routes in + let callback (_ch, _conn) request body = + let open Cohttp in + (Wm.dispatch' routes ~body ~request >|= function + | None -> (`Not_found, Header.init (), `String "Not found", []) + | Some result -> result) + >>= fun (status, headers, body, path) -> + Log.info (fun l -> + l "%d - %s %s" + (Code.code_of_status status) + (Code.string_of_method (Request.meth request)) + (Uri.path (Request.uri request))); + Log.debug (fun l -> l "path=%a" Fmt.(Dump.list string) path); + (* Finally, send the response to the client *) + Cohttp_lwt_unix.Server.respond ~headers ~body ~status () + in + (* create the server and handle requests with the function defined above *) + let conn_closed (_, conn) = + Log.info (fun l -> + l "connection %s closed\n%!" (Cohttp.Connection.to_string conn)) + in + Cohttp_lwt_unix.Server.make ~callback ~conn_closed () +end + +let int_of_fd (t:Lwt_unix.file_descr) = + (Obj.magic (Lwt_unix.unix_file_descr t): int) + +let serve ~routes db fd = + let http = Dispatch.v db routes in + let on_exn e = Log.err (fun l -> l "ERROR: %a" Fmt.exn e) in + Lwt_unix.blocking fd >>= fun blocking -> + Log.debug (fun l -> + l "Serving the control state over fd:%d (blocking=%b)" + (int_of_fd fd) blocking + ); + Cohttp_lwt_unix.Server.create ~on_exn ~mode:(`Fd fd) http diff --git a/projects/miragesdk/src/sdk/ctl.mli b/projects/miragesdk/src/sdk/ctl.mli new file mode 100644 index 000000000..b88e4a7cf --- /dev/null +++ b/projects/miragesdk/src/sdk/ctl.mli @@ -0,0 +1,16 @@ +(** [Control] handle the server part of the control path, running in + the privileged container. *) + +module KV: Irmin.KV with type contents = string + +val v: string -> KV.t Lwt.t +(** [v p] is the KV store storing the control state, located at path + [p] in the filesystem of the privileged container. *) + +val serve: routes:string list -> KV.t -> Lwt_unix.file_descr -> unit Lwt.t +(** [serve ~routes kv fd] is the thread exposing the KV store [kv], + holding control state, running inside the privileged container. + [routes] are the routes exposed by the server (currently over a + simple HTTP server -- but will change to something else later, + probably protobuf) to the calf and [kv] is the control state + handler. *) diff --git a/projects/miragesdk/src/sdk/inflator.ml b/projects/miragesdk/src/sdk/inflator.ml new file mode 100644 index 000000000..4554f32a4 --- /dev/null +++ b/projects/miragesdk/src/sdk/inflator.ml @@ -0,0 +1,47 @@ +(* https://github.com/Engil/Canopy/blob/3b5573ad0be0fa729b6d4e1ca8b9bb348e164960/inflator.ml *) + +let input_buffer = Bytes.create 0xFFFF +let output_buffer = Bytes.create 0xFFFF +let window = Decompress.Window.create ~proof:Decompress.B.proof_bytes + +let deflate ?(level = 4) buff = + let pos = ref 0 in + let res = Buffer.create (Cstruct.len buff) in + Decompress.Deflate.bytes input_buffer output_buffer + (fun input_buffer -> function + | Some _ -> + let n = min 0xFFFF (Cstruct.len buff - !pos) in + Cstruct.blit_to_bytes buff !pos input_buffer 0 n; + pos := !pos + n; + n + | None -> + let n = min 0xFFFF (Cstruct.len buff - !pos) in + Cstruct.blit_to_bytes buff !pos input_buffer 0 n; + pos := !pos + n; + n) + (fun output_buffer len -> + Buffer.add_subbytes res output_buffer 0 len; + 0xFFFF) + (Decompress.Deflate.default ~proof:Decompress.B.proof_bytes level) + |> function + | Ok _ -> Cstruct.of_string (Buffer.contents res) + | Error _ -> failwith "Deflate.deflate" + +let inflate ?output_size orig = + let res = Buffer.create (match output_size with + | Some len -> len + | None -> Mstruct.length orig) + in + Decompress.Inflate.bytes input_buffer output_buffer + (fun input_buffer -> + let n = min 0xFFFF (Mstruct.length orig) in + let s = Mstruct.get_string orig n in + Bytes.blit_string s 0 input_buffer 0 n; + n) + (fun output_buffer len -> + Buffer.add_subbytes res output_buffer 0 len; + 0xFFFF) + (Decompress.Inflate.default (Decompress.Window.reset window)) + |> function + | Ok _ -> Some (Mstruct.of_string (Buffer.contents res)) + | Error _ -> None diff --git a/projects/miragesdk/src/sdk/init.ml b/projects/miragesdk/src/sdk/init.ml new file mode 100644 index 000000000..301f19804 --- /dev/null +++ b/projects/miragesdk/src/sdk/init.ml @@ -0,0 +1,217 @@ +open Lwt.Infix + +let src = Logs.Src.create "init" ~doc:"Init steps" +module Log = (val Logs.src_log src : Logs.LOG) + +let failf fmt = Fmt.kstrf Lwt.fail_with fmt + +module Fd = struct + + type t = { + name: string; + fd : Lwt_unix.file_descr; + } + + let fd t = t.fd + let stdout = { name = "stdout"; fd = Lwt_unix.stdout } + let stderr = { name = "stderr"; fd = Lwt_unix.stderr } + let stdin = { name = "stdin" ; fd = Lwt_unix.stdin } + + let to_int t = + (Obj.magic (Lwt_unix.unix_file_descr t.fd): int) + + let pp ppf fd = Fmt.pf ppf "%s:%d" fd.name (to_int fd) + + let close fd = + Log.debug (fun l -> l "close %a" pp fd); + Lwt_unix.close fd.fd + + let dev_null = + Lwt_unix.of_unix_file_descr ~blocking:false + (Unix.openfile "/dev/null" [Unix.O_RDWR] 0) + + let redirect_to_dev_null fd = + Log.debug (fun l -> l "redirect-stdin-to-dev-null"); + Lwt_unix.close fd.fd >>= fun () -> + Lwt_unix.dup2 dev_null fd.fd; + Lwt_unix.close dev_null + + let dup2 ~src ~dst = + Log.debug (fun l -> l "dup2 %a => %a" pp src pp dst); + Lwt_unix.dup2 src.fd dst.fd; + close src + + let proxy_net ~net fd = + Log.debug (fun l -> l "proxy-net eth0 <=> %a" pp fd); + let rec listen_rawlink () = + Lwt_rawlink.read_packet net >>= fun buf -> + Log.debug (fun l -> l "PROXY-NET: => %a" Cstruct.hexdump_pp buf); + Log.debug (fun l -> l "PROXY-NET: => %S" (Cstruct.to_string buf)); + let rec write buf = + Lwt_cstruct.write fd.fd buf >>= function + | 0 -> Lwt.return_unit + | n -> write (Cstruct.shift buf n) + in + write buf >>= fun () -> + listen_rawlink () + in + let listen_socket () = + let len = 16 * 1024 in + let buf = Cstruct.create len in + let rec loop () = + Lwt_cstruct.read fd.fd buf >>= fun len -> + let buf = Cstruct.sub buf 0 len in + Log.debug (fun l -> l "PROXY-NET: <= %a" Cstruct.hexdump_pp buf); + Lwt_rawlink.send_packet net buf >>= fun () -> + loop () + in + loop () + in + Lwt.pick [ + listen_rawlink (); + listen_socket (); + ] + + let rec really_write dst buf off len = + match len with + | 0 -> Lwt.return_unit + | len -> + Lwt_unix.write dst.fd buf off len >>= fun n -> + really_write dst buf (off+n) (len-n) + + let forward ~src ~dst = + Log.debug (fun l -> l "forward %a => %a" pp src pp dst); + let len = 16 * 1024 in + let buf = Bytes.create len in + let rec loop () = + Lwt_unix.read src.fd buf 0 len >>= fun len -> + if len = 0 then + (* FIXME: why this ever happen *) + Fmt.kstrf Lwt.fail_with "FORWARD[%a => %a]: EOF" pp src pp dst + else ( + Log.debug (fun l -> + l "FORWARD[%a => %a]: %S (%d)" + pp src pp dst (Bytes.sub buf 0 len) len); + really_write dst buf 0 len >>= fun () -> + loop () + ) + in + loop () + + let proxy x y = + Lwt.pick [ + forward ~src:x ~dst:y; + forward ~src:y ~dst:x; + ] + +end + +module Pipe = struct + + type t = Fd.t * Fd.t + + let priv = fst + let calf = snd + + let socketpair name = + let priv, calf = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in + Lwt_unix.clear_close_on_exec priv; + Lwt_unix.clear_close_on_exec calf; + { Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf } + + let pipe name = + let priv, calf = Lwt_unix.pipe () in + Lwt_unix.clear_close_on_exec priv; + Lwt_unix.clear_close_on_exec calf; + { Fd.name = name; fd = priv }, { Fd.name = name ^ "-calf"; fd = calf } + + (* logs pipe *) + let stdout = pipe "stdout" + let stderr = pipe "stderr" + + (* store pipe *) + let ctl = socketpair "ctl" + + (* network pipe *) + let net = socketpair "net" + + (* metrics pipe *) + let metrics = pipe "metrics" + +end + +let exec_calf cmd = + Fd.(redirect_to_dev_null stdin) >>= fun () -> + + (* close parent fds *) + Fd.close Pipe.(priv stdout) >>= fun () -> + Fd.close Pipe.(priv stderr) >>= fun () -> + Fd.close Pipe.(priv ctl) >>= fun () -> + Fd.close Pipe.(priv net) >>= fun () -> + Fd.close Pipe.(priv metrics) >>= fun () -> + + let cmds = String.concat " " cmd in + + let calf_net = Pipe.(calf net) in + let calf_ctl = Pipe.(calf ctl) in + let calf_stdout = Pipe.(calf stdout) in + let calf_stderr = Pipe.(calf stderr) in + + Log.info (fun l -> l "Executing %s" cmds); + Log.debug (fun l -> l "net-fd=%a store-fd=%a" Fd.pp calf_net Fd.pp calf_ctl); + + Fd.dup2 ~src:calf_stdout ~dst:Fd.stdout >>= fun () -> + Fd.dup2 ~src:calf_stderr ~dst:Fd.stderr >>= fun () -> + + (* exec the calf *) + Unix.execve (List.hd cmd) (Array.of_list cmd) [||] + +let rawlink ?filter ethif = + Log.debug (fun l -> l "bringing up %s" ethif); + (try Tuntap.set_up_and_running ethif + with e -> Log.err (fun l -> l "rawlink: %a" Fmt.exn e)); + Lwt_rawlink.open_link ?filter ethif + +let check_exit_status cmd status = + let cmds = String.concat " " cmd in + match status with + | Unix.WEXITED 0 -> Lwt.return_unit + | Unix.WEXITED i -> failf "%s: exit %d" cmds i + | Unix.WSIGNALED i -> failf "%s: signal %d" cmds i + | Unix.WSTOPPED i -> failf "%s: stopped %d" cmds i + +let exec_priv ~pid ~cmd ~net ~ctl ~handlers = + + Fd.(redirect_to_dev_null stdin) >>= fun () -> + + (* close child fds *) + Fd.close Pipe.(calf stdout) >>= fun () -> + Fd.close Pipe.(calf stderr) >>= fun () -> + Fd.close Pipe.(calf net) >>= fun () -> + Fd.close Pipe.(calf ctl) >>= fun () -> + Fd.close Pipe.(calf metrics) >>= fun () -> + + let wait () = + Lwt_unix.waitpid [] pid >>= fun (_pid, w) -> + Lwt_io.flush_all () >>= fun () -> + + check_exit_status cmd w + in + Lwt.pick ([ + wait (); + (* data *) + Fd.proxy_net ~net Pipe.(priv net); + + (* redirect the calf stdout to the shim stdout *) + Fd.forward ~src:Pipe.(priv stdout) ~dst:Fd.stdout; + Fd.forward ~src:Pipe.(priv stderr) ~dst:Fd.stderr; + (* TODO: Init.Fd.forward ~src:Init.Pipe.(priv metrics) ~dst:Init.Fd.metric; *) + ctl (); +(* handlers (); *) + ]) + +let run ~net ~ctl ~handlers cmd = + Lwt_io.flush_all () >>= fun () -> + match Lwt_unix.fork () with + | 0 -> exec_calf cmd + | pid -> exec_priv ~pid ~cmd ~net ~ctl ~handlers diff --git a/projects/miragesdk/src/sdk/init.mli b/projects/miragesdk/src/sdk/init.mli new file mode 100644 index 000000000..3fb6abb68 --- /dev/null +++ b/projects/miragesdk/src/sdk/init.mli @@ -0,0 +1,109 @@ +(** Init functions. + + [Init] contains funcitons to initialise the state of the + privileged container. + + {ul + + {- fowrard and filter the network traffic using BPF (for instance + to allow only DHCP traffic).} + {- open pipes for forwarding the calf's stdout and stderr + to the privileged container's ones.} + {- open a pipe to forward the metrics.} + {- open a socket pair with the calf to be able to transmit control + data, e.g. the IP address once a DHCP lease is obtained.} + }*) + + +module Fd: sig + + type t + (** The type for file descriptors. *) + + val pp: t Fmt.t + (** [pp_fd] pretty prints a file descriptor. *) + + val fd: t -> Lwt_unix.file_descr + (** [fd t] is [t]'s underlying unix file descriptor. *) + + val to_int: t -> int +(** [to_int fd] is [fd]'s number. *) + + val redirect_to_dev_null: t -> unit Lwt.t + (** [redirect_to_dev_null fd] redirects [fd] [/dev/null]. *) + + val close: t -> unit Lwt.t + (** [close fd] closes [fd]. *) + + val dup2: src:t -> dst:t -> unit Lwt.t + (** [dup2 ~src ~dst] calls [Unix.dup2] on [src] and [dst]. *) + + val proxy_net: net:Lwt_rawlink.t -> t -> unit Lwt.t + (** [proxy_net ~net fd] proxies the traffic between the raw net link + [net] and [fd]. *) + + val forward: src:t -> dst:t -> unit Lwt.t + (** [forward ~src ~dst] forwards the flow from [src] to [dst]. *) + + (** {1 Usefull File Descriptors} *) + + val stdin: t + (** [stdin] is the standart input. *) + + val stdout: t + (** [stdout] is the standard output. *) + + val stderr: t + (** [stderr] is the standard error. *) + +end + +module Pipe: sig + + type t + (** The type for pipes. Could be either uni-directional (normal + pipes) or a bi-directional (socket pairs). *) + + val priv: t -> Fd.t + (** [priv p] is the private side of the pipe [p]. *) + + val calf: t -> Fd.t + (** [calf p] is the calf side of the pipe [p]. *) + + (** {1 Useful Pipes} *) + + val stdout: t + (** [stdout] is the uni-directional pipe from the calf's stdout . *) + + val stderr: t + (** [stderr] is the uni-directional pipe from the calf's stderr. *) + + val metrics: t + (** [metrics] is the uni-directional pipe fomr the calf's metric + endpoint. *) + + val ctl: t + (** [ctl] is the bi-directional pipe used to exchange control + data between the calf and the priv containers. *) + + val net: t + (** [net] is the bi-directional pipe used to exchange network + traffic between the calf and the priv containers. *) + +end + +val rawlink: ?filter:string -> string -> Lwt_rawlink.t +(** [rawlink ?filter i] is the net raw link to the interface [i] using + the (optional) BPF filter [filter]. *) + +val run: + net:Lwt_rawlink.t -> + ctl:(unit -> unit Lwt.t) -> + handlers:(unit -> unit Lwt.t) -> + string list -> unit Lwt.t +(** [run ~net ~ctl ~handlers cmd] runs [cmd] in a unprivileged calf + process. [ctl] is the control thread connected to the {Pipe.ctl} + pipe. [net] is the net raw link which will be connected to the + calf via the {!Pipe.net} socket pair. [handlers] are the system + handler thread which will react to control data to perform + privileged system actions. *) diff --git a/projects/miragesdk/src/sdk/io_fs.ml b/projects/miragesdk/src/sdk/io_fs.ml new file mode 100644 index 000000000..6e2ac43c0 --- /dev/null +++ b/projects/miragesdk/src/sdk/io_fs.ml @@ -0,0 +1,325 @@ +(* from irmin-unix, to avoid linking with gmp ... *) + +module Log = struct + let src = Logs.Src.create "git.unix" ~doc:"logs git's unix events" + include (val Logs.src_log src : Logs.LOG) +end + +open Lwt.Infix + +let mkdir_pool = Lwt_pool.create 1 (fun () -> Lwt.return_unit) + +let protect_unix_exn = function + | Unix.Unix_error _ as e -> Lwt.fail (Failure (Printexc.to_string e)) + | e -> Lwt.fail e + +let ignore_enoent = function + | Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return_unit + | e -> Lwt.fail e + +let protect f x = Lwt.catch (fun () -> f x) protect_unix_exn +let safe f x = Lwt.catch (fun () -> f x) ignore_enoent + +let mkdir dirname = + let rec aux dir = + if Sys.file_exists dir && Sys.is_directory dir then Lwt.return_unit + else ( + let clear = + if Sys.file_exists dir then ( + Log.debug (fun l -> + l "%s already exists but is a file, removing." dir); + safe Lwt_unix.unlink dir + ) else + Lwt.return_unit + in + clear >>= fun () -> + aux (Filename.dirname dir) >>= fun () -> + Log.debug (fun l -> l "mkdir %s" dir); + protect (Lwt_unix.mkdir dir) 0o755; + ) in + Lwt_pool.use mkdir_pool (fun () -> aux dirname) + +let file_exists f = + Lwt.catch (fun () -> Lwt_unix.file_exists f) (function + (* See https://github.com/ocsigen/lwt/issues/316 *) + | Unix.Unix_error (Unix.ENOTDIR, _, _) -> Lwt.return_false + | e -> Lwt.fail e) + +module Lock = struct + + let is_stale max_age file = + file_exists file >>= fun exists -> + if exists then ( + Lwt.catch (fun () -> + Lwt_unix.stat file >>= fun s -> + let stale = Unix.gettimeofday () -. s.Unix.st_mtime > max_age in + Lwt.return stale) + (function + | Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return false + | e -> Lwt.fail e) + ) else + Lwt.return false + + let unlock file = + Lwt_unix.unlink file + + let lock ?(max_age = 10. *. 60. (* 10 minutes *)) ?(sleep = 0.001) file = + let rec aux i = + Log.debug (fun f -> f "lock %s %d" file i); + is_stale max_age file >>= fun is_stale -> + if is_stale then ( + Log.err (fun f -> f "%s is stale, removing it." file); + unlock file >>= fun () -> + aux 1 + ) else + let create () = + let pid = Unix.getpid () in + mkdir (Filename.dirname file) >>= fun () -> + Lwt_unix.openfile file [Unix.O_CREAT; Unix.O_RDWR; Unix.O_EXCL] 0o600 + >>= fun fd -> + let oc = Lwt_io.of_fd ~mode:Lwt_io.Output fd in + Lwt_io.write_int oc pid >>= fun () -> + Lwt_unix.close fd + in + Lwt.catch create (function + | Unix.Unix_error(Unix.EEXIST, _, _) -> + let backoff = 1. +. Random.float (let i = float i in i *. i) in + Lwt_unix.sleep (sleep *. backoff) >>= fun () -> + aux (i+1) + | e -> Lwt.fail e) + in + aux 1 + + let with_lock file fn = + match file with + | None -> fn () + | Some f -> lock f >>= fun () -> Lwt.finalize fn (fun () -> unlock f) + +end + +let mmap_threshold = 4096 +let openfile_pool = Lwt_pool.create 200 (fun () -> Lwt.return_unit) + +let mkdir = mkdir + +type path = string + +(* we use file locking *) +type lock = path +let lock_file x = x + +let file_exists = file_exists + +let list_files kind dir = + if Sys.file_exists dir && Sys.is_directory dir then + let d = Sys.readdir dir in + let d = Array.to_list d in + let d = List.map (Filename.concat dir) d in + let d = List.filter kind d in + let d = List.sort String.compare d in + Lwt.return d + else + Lwt.return_nil + +let directories dir = + list_files (fun f -> + try Sys.is_directory f with Sys_error _ -> false + ) dir + +let files dir = + list_files (fun f -> + try not (Sys.is_directory f) with Sys_error _ -> false + ) dir + +let write_cstruct fd b = + let rec rwrite fd buf ofs len = + Lwt_bytes.write fd buf ofs len >>= fun n -> + if len = 0 then Lwt.fail End_of_file + else if n < len then rwrite fd buf (ofs + n) (len - n) + else Lwt.return_unit in + match Cstruct.len b with + | 0 -> Lwt.return_unit + | len -> rwrite fd (Cstruct.to_bigarray b) 0 len + +let delays = Array.init 20 (fun i -> 0.1 *. (float i) ** 2.) + +let command fmt = + Printf.ksprintf (fun str -> + Log.debug (fun l -> l "[exec] %s" str); + let i = Sys.command str in + if i <> 0 then Log.debug (fun l -> l "[exec] error %d" i); + Lwt.return_unit + ) fmt + +let remove_dir dir = + if Sys.os_type = "Win32" then + command "cmd /d /v:off /c rd /s /q %S" dir + else + command "rm -rf %S" dir + +let remove_file ?lock file = + Lock.with_lock lock (fun () -> + Lwt.catch + (fun () -> Lwt_unix.unlink file) + (function + (* On Windows, [EACCES] can also occur in an attempt to + rename a file or directory or to remove an existing + directory. *) + | Unix.Unix_error (Unix.EACCES, _, _) + | Unix.Unix_error (Unix.EISDIR, _, _) -> remove_dir file + | Unix.Unix_error (Unix.ENOENT, _, _) -> Lwt.return_unit + | e -> Lwt.fail e) + ) + +let rename = + if Sys.os_type <> "Win32" then Lwt_unix.rename + else + fun tmp file -> + let rec aux i = + Lwt.catch + (fun () -> Lwt_unix.rename tmp file) + (function + (* On Windows, [EACCES] can also occur in an attempt to + rename a file or directory or to remove an existing + directory. *) + | Unix.Unix_error (Unix.EACCES, _, _) as e -> + if i >= Array.length delays then Lwt.fail e + else ( + file_exists file >>= fun exists -> + if exists && Sys.is_directory file then ( + remove_dir file >>= fun () -> aux (i+1) + ) else ( + Log.debug (fun l -> + l "Got EACCES, retrying in %.1fs" delays.(i)); + Lwt_unix.sleep delays.(i) >>= fun () -> aux (i+1) + )) + | e -> Lwt.fail e) + in + aux 0 + +let with_write_file ?temp_dir file fn = + begin match temp_dir with + | None -> Lwt.return_unit + | Some d -> mkdir d + end >>= fun () -> + let dir = Filename.dirname file in + mkdir dir >>= fun () -> + let tmp = Filename.temp_file ?temp_dir (Filename.basename file) "write" in + Lwt_pool.use openfile_pool (fun () -> + Log.debug (fun l -> l "Writing %s (%s)" file tmp); + Lwt_unix.(openfile tmp [O_WRONLY; O_NONBLOCK; O_CREAT; O_TRUNC] 0o644) + >>= fun fd -> + Lwt.finalize (fun () -> protect fn fd) (fun () -> Lwt_unix.close fd) + >>= fun () -> + rename tmp file + ) + +let read_file_with_read file size = + let chunk_size = max 4096 (min size 0x100000) in + let buf = Cstruct.create size in + let flags = [Unix.O_RDONLY] in + let perm = 0o0 in + Lwt_unix.openfile file flags perm >>= fun fd -> + let rec aux off = + let read_size = min chunk_size (size - off) in + Lwt_bytes.read fd buf.Cstruct.buffer off read_size >>= fun read -> + (* It should test for read = 0 in case size is larger than the + real size of the file. This may happen for instance if the + file was truncated while reading. *) + let off = off + read in + if off >= size then + Lwt.return buf + else + aux off + in + Lwt.finalize (fun () -> aux 0) + (fun () -> Lwt_unix.close fd) + +let read_file_with_mmap file = + let fd = Unix.(openfile file [O_RDONLY; O_NONBLOCK] 0o644) in + let ba = Lwt_bytes.map_file ~fd ~shared:false () in + Unix.close fd; + Lwt.return (Cstruct.of_bigarray ba) + +let read_file file = + Lwt.catch (fun () -> + Lwt_pool.use openfile_pool (fun () -> + Log.debug (fun l -> l "Reading %s" file); + Lwt_unix.stat file >>= fun stats -> + let size = stats.Lwt_unix.st_size in + (if size >= mmap_threshold then read_file_with_mmap file + else read_file_with_read file size + ) >|= fun buf -> + Some buf + ) + ) (function + | Unix.Unix_error _ | Sys_error _ -> Lwt.return_none + | e -> Lwt.fail e) + +let stat_info_unsafe path = + let open Git.Index in + let stats = Unix.stat path in + let ctime = { lsb32 = Int32.of_float stats.Unix.st_ctime; nsec = 0l } in + let mtime = { lsb32 = Int32.of_float stats.Unix.st_mtime; nsec = 0l } in + let dev = Int32.of_int stats.Unix.st_dev in + let inode = Int32.of_int stats.Unix.st_ino in + let mode = match stats.Unix.st_kind, stats.Unix.st_perm with + | Unix.S_REG, p -> if p land 0o100 = 0o100 then `Exec else `Normal + | Unix.S_LNK, _ -> `Link + | k, p -> + let kind = match k with + | Unix.S_REG -> "REG" + | Unix.S_DIR -> "DIR" + | Unix.S_CHR -> "CHR" + | Unix.S_BLK -> "BLK" + | Unix.S_LNK -> "LNK" + | Unix.S_FIFO -> "FIFO" + | Unix.S_SOCK -> "SOCK" + in + let perm = Printf.sprintf "%o" p in + let error = + Printf.sprintf "%s: not supported kind of file [%s, %s]." + path kind perm + in + failwith error + in + let uid = Int32.of_int stats.Unix.st_uid in + let gid = Int32.of_int stats.Unix.st_gid in + let size = Int32.of_int stats.Unix.st_size in + { ctime; mtime; dev; inode; uid; gid; mode; size } + +let stat_info path = + Lwt.catch (fun () -> Lwt.return (Some (stat_info_unsafe path))) (function + | Sys_error _ | Unix.Unix_error _ -> Lwt.return_none + | e -> Lwt.fail e) + +let chmod ?lock f `Exec = + Lock.with_lock lock (fun () -> Lwt_unix.chmod f 0o755) + +let write_file ?temp_dir ?lock file b = + let write () = + with_write_file file ?temp_dir (fun fd -> write_cstruct fd b) + in + Lock.with_lock lock (fun () -> + Lwt.catch write (function + | Unix.Unix_error (Unix.EISDIR, _, _) -> remove_dir file >>= write + | e -> Lwt.fail e + ) + ) + +let test_and_set_file ?temp_dir ~lock file ~test ~set = + Lock.with_lock (Some lock) (fun () -> + read_file file >>= fun v -> + let equal = match test, v with + | None , None -> true + | Some x, Some y -> Cstruct.equal x y + | _ -> false + in + if not equal then Lwt.return false + else + (match set with + | None -> remove_file file + | Some v -> write_file ?temp_dir file v) + >|= fun () -> + true + ) diff --git a/projects/miragesdk/src/sdk/jbuild b/projects/miragesdk/src/sdk/jbuild new file mode 100644 index 000000000..403040e55 --- /dev/null +++ b/projects/miragesdk/src/sdk/jbuild @@ -0,0 +1,10 @@ +(jbuild_version 1) + +(library + ((name sdk) + (libraries (logs-syslog.lwt threads cohttp.lwt cstruct.lwt + cmdliner fmt.cli logs.fmt logs.cli fmt.tty decompress + irmin irmin-git irmin-http lwt.unix rawlink tuntap + irmin-watcher inotify)) + (preprocess (per_file ((pps (cstruct.ppx)) (ctl)))) + ))