Merge pull request #2412 from talex5/https-unikernel

Update https-unikernel to released capnp-rpc 0.1 API
This commit is contained in:
Justin Cormack 2017-08-15 15:58:52 +01:00 committed by GitHub
commit 8bd6465e27
17 changed files with 290 additions and 265 deletions

View File

@ -1,15 +1,12 @@
FROM ocaml/opam@sha256:523988cb7ac4c51e1c6e2f00658686c320330000859f49e1ec8fe3d6df046a26
FROM ocaml/opam@sha256:e2e0dbbc859e078504d3a084feda27194406badf0d3d7e3d5321179c1c03190b
#FROM ocaml/opam:debian-9_ocaml-4.04.0
RUN opam pin add -yn capnp 'https://github.com/talex5/capnp-ocaml.git#demo2'
RUN opam pin add -yn capnp-rpc 'https://github.com/talex5/capnp-rpc.git#demo2'
RUN opam pin add -yn capnp-rpc-lwt 'https://github.com/talex5/capnp-rpc.git#demo2'
RUN opam depext -i -y jbuilder lwt cohttp astring tls capnp camlzip alcotest cohttp
RUN cd opam-repository && git fetch && git reset --hard df060ffa5c9d62ec63395fa80d0f5b50a5863c47 && opam update
RUN opam depext -i -y jbuilder lwt cohttp astring tls capnp camlzip alcotest cohttp capnp-rpc-unix
RUN sudo apt-get install -y screen python-pip python-setuptools python-dev --no-install-recommends
RUN pip install cython pycapnp
ADD opam /home/opam/src/opam
RUN opam pin add -ny mypkg /home/opam/src
RUN opam install --deps-only mypkg
ENV JBUILD_STATIC=true
WORKDIR /home/opam/src
ADD . /home/opam/src
RUN sudo chown -R opam .

View File

@ -32,15 +32,15 @@ The easiest way to do this is by creating multiple windows in `screen` (which is
First, start the store:
https-unikernel-store /tmp/store.sock
https-unikernel-store unix:/tmp/store.sock
Then, create a new window (`Ctrl-A c`) and start the http service:
https-unikernel-http /tmp/http.sock --store=/tmp/store.sock
https-unikernel-http unix:/tmp/http.sock --store=unix:/tmp/store.sock
Finally, make another window and run the TLS terminator:
https-unikernel-tls --http /tmp/http.sock --port 8443
https-unikernel-tls --http unix:/tmp/http.sock --port 8443
### Testing with Python

View File

@ -1 +0,0 @@
include Proto.MakeRPC(Capnp.BytesMessage)(Capnp_rpc_lwt)

View File

@ -1,77 +0,0 @@
open Lwt.Infix
open Capnp_rpc_lwt
let connect ~switch path =
Logs.info (fun f -> f "Connecting to %S" path);
let socket = Unix.(socket PF_UNIX SOCK_STREAM 0) in
begin
try Unix.connect socket (Unix.ADDR_UNIX path)
with Unix.Unix_error(Unix.ECONNREFUSED, "connect", "") ->
Logs.err (fun f -> f "Failed to connect to %S" path);
exit 1
end;
let endpoint = Endpoint.of_socket ~switch socket in
let conn = CapTP.connect ~switch endpoint in
CapTP.bootstrap conn
let rm_socket path =
match Unix.lstat path with
| stat when stat.Unix.st_kind = Unix.S_SOCK -> Unix.unlink path
| _ -> failwith (Fmt.strf "%S exists and is not a socket" path)
| exception Unix.Unix_error(Unix.ENOENT, "lstat", _) -> ()
let listen ~switch ~offer path =
let socket = Unix.(socket PF_UNIX SOCK_STREAM 0) in
Lwt_switch.add_hook (Some switch) (fun () -> Unix.close socket; Lwt.return_unit);
rm_socket path;
Unix.bind socket (Unix.ADDR_UNIX path);
Unix.listen socket 5;
let socket = Lwt_unix.of_unix_file_descr socket in
Logs.info (fun f -> f "Waiting for connections on %S" path);
let rec loop () =
Lwt_unix.accept socket >>= fun (c, _) ->
Logs.info (fun f -> f "Got connection on %S" path);
Lwt_switch.with_switch @@ fun switch -> (* todo: with_child_switch *)
let endpoint = Endpoint.of_socket ~switch (Lwt_unix.unix_file_descr c) in
ignore (CapTP.connect ~switch ~offer endpoint);
loop () in
loop ()
module Actor = struct
type t = Fmt.style * string
let pp f (style, name) = Fmt.(styled style (const string name)) f ()
let tag = Logs.Tag.def "actor" pp
end
let pp_qid f = function
| None -> ()
| Some x ->
let s = Uint32.to_string x in
Fmt.(styled `Magenta (fun f x -> Fmt.pf f " (qid=%s)" x)) f s
let reporter =
let report src level ~over k msgf =
let src = Logs.Src.name src in
msgf @@ fun ?header ?(tags=Logs.Tag.empty) fmt ->
let actor =
match Logs.Tag.find Actor.tag tags with
| Some x -> x
| None -> `Black, "------"
in
let qid = Logs.Tag.find Capnp_rpc.Debug.qid_tag tags in
let print _ =
Fmt.(pf stderr) "%a@." pp_qid qid;
over ();
k ()
in
Fmt.kpf print Fmt.stderr ("%a %a %a: @[" ^^ fmt ^^ "@]")
Fmt.(styled `Magenta string) (Printf.sprintf "%11s" src)
Logs_fmt.pp_header (level, header)
Actor.pp actor
in
{ Logs.report = report }
let init_logging () =
Fmt_tty.setup_std_outputs ();
Logs.set_reporter reporter;
Logs.set_level (Some Logs.Info)

View File

@ -1 +0,0 @@
let endpoint_tag = Logs.Tag.def "endpoint" Fmt.string

View File

@ -1,22 +1,23 @@
let () = Common.init_logging ()
(** Run the HTTP service as a stand-alone Unix process. *)
let main store_socket http_socket =
let () = Logging.init ()
let main store_addr http_addr =
Lwt_main.run begin
Lwt_switch.with_switch @@ fun switch ->
let store = Common.connect ~switch store_socket in
let http = Http_server.service store in
Common.listen ~switch ~offer:http http_socket
let store = Capnp_rpc_unix.connect store_addr in
let http = Http_server.local store in
Capnp_rpc_unix.serve ~offer:http http_addr
end
open Cmdliner
let store =
let doc = "The database store to use" in
Arg.(required @@ opt (some string) None @@ info ~doc ~docv:"STORE" ["store"])
Arg.(required @@ opt (some Capnp_rpc_unix.Connect_address.conv) None @@ info ~doc ~docv:"STORE" ["store"])
let http =
let doc = "The http socket to provide" in
Arg.(required @@ pos 0 (some string) None @@ info ~doc ~docv:"HTTP" [])
Arg.(required @@ pos 0 (some Capnp_rpc_unix.Listen_address.conv) None @@ info ~doc ~docv:"HTTP" [])
let cmd =
Term.(const main $ store $ http), Term.info "http"

View File

@ -1,3 +1,5 @@
(** Accepts connections (over Cap'n Proto) and implements the HTTP protocol. *)
let src = Logs.Src.create "web.http" ~doc:"HTTP engine for web server"
module Log = (val Logs.src_log src: Logs.LOG)
@ -5,44 +7,6 @@ open Capnp_rpc_lwt
open Lwt.Infix
open Astring
module Remote_flow = struct
type buffer = Cstruct.t
type flow = Api.Reader.Flow.t Capability.t
type error = [`Capnp of Capnp_rpc.Error.t]
type write_error = [Mirage_flow.write_error | `Capnp of Capnp_rpc.Error.t]
type 'a io = 'a Lwt.t
let create x = x
let read t =
let module R = Api.Reader.Flow.Read_results in
let req = Capability.Request.create_no_args () in
Capability.call_for_value_exn t Api.Reader.Flow.read_method req >>= fun resp ->
let p = R.of_payload resp in
let data = R.data_get p in
Lwt.return (Ok (`Data (Cstruct.of_string data)))
let write t data =
let module P = Api.Builder.Flow.Write_params in
let req, p = Capability.Request.create P.init_pointer in
P.data_set p (Cstruct.to_string data);
Capability.call_for_value_exn t Api.Reader.Flow.write_method req >>= fun _ ->
Lwt.return (Ok ())
let writev t data =
write t (Cstruct.concat data)
let close _ = failwith "TODO: close"
let pp_error f = function
| `Capnp e -> Capnp_rpc.Error.pp f e
| `Closed -> Fmt.string f "Closed"
let pp_write_error f = function
| `Capnp e -> Capnp_rpc.Error.pp f e
| #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error f e
end
module IO = struct
type 'a t = 'a Lwt.t
let (>>=) = Lwt.bind
@ -50,7 +14,7 @@ module IO = struct
type ic = Lwt_io.input_channel
type oc = Lwt_io.output_channel
type conn = Remote_flow.flow
type conn = Rpc.Flow.flow
let read_line ic =
Lwt_io.read_line_opt ic
@ -68,22 +32,8 @@ end
module Server = Cohttp_lwt.Make_server(IO)
type t = Api.Reader.Store.t Capability.t
(* Make a Cap'n'Proto call to the store service *)
let get t path =
let module P = Api.Builder.Store.Get_params in
let req, p = Capability.Request.create P.init_pointer in
ignore (P.path_set_list p path);
Capability.call_for_value_exn t Api.Reader.Store.get_method req >>= fun resp ->
let open Api.Reader.Store in
match GetResults.get (GetResults.of_payload resp) with
| GetResults.NotFound -> Lwt.return None
| GetResults.Ok data -> Lwt.return (Some data)
| GetResults.Undefined _ -> failwith "Protocol error: bad msg type"
(* Handle HTTP requests *)
let callback t _conn req _body =
(* Handle one HTTP request *)
let handle_request store _conn req _body =
let open Cohttp in
let uri = Request.uri req in
Log.info (fun f -> f "HTTP request for %a" Uri.pp_hum uri);
@ -95,7 +45,7 @@ let callback t _conn req _body =
| [] -> ["index.html"]
| p -> p
in
begin get t path >>= function
begin Rpc.Store.get store path >>= function
| Some body -> Server.respond_string ~status:`OK ~body ()
| None -> Server.respond_not_found ~uri ()
end
@ -103,32 +53,13 @@ let callback t _conn req _body =
let body = Fmt.strf "Bad method %S" (Code.string_of_method m) in
Server.respond_error ~status:`Bad_request ~body ()
let callback t = Server.callback (Server.make ~callback:(callback t) ())
module Remote_flow_unix = Mirage_flow_unix.Make(Rpc.Flow)
module Remote_flow_unix = Mirage_flow_unix.Make(Remote_flow)
let handle_connection store c =
Log.info (fun f -> f "Handing new connection");
let flow = Remote_flow.create c in
callback store flow (Remote_flow_unix.ic flow) (Remote_flow_unix.oc flow) >>= fun () ->
Capability.dec_ref c;
Lwt.return_unit
let service store =
Api.Builder.HttpServer.local @@
object
inherit Api.Builder.HttpServer.service
method accept_impl req =
Log.info (fun f -> f "Handing new connection");
let module P = Api.Reader.HttpServer.Accept_params in
let p = P.of_payload req in
match P.connection_get p with
| None -> Service.fail "No connection!"
| Some i ->
let c = Payload.import req i in
Service.return_lwt (fun () ->
handle_connection store c >|= fun () ->
Ok (Service.Response.create_empty ())
)
end
let local store =
let handle_http_connection = Server.callback (Server.make ~callback:(handle_request store) ()) in
Rpc.Http.local (fun flow ->
Log.info (fun f -> f "Handing new connection");
handle_http_connection flow (Remote_flow_unix.ic flow) (Remote_flow_unix.oc flow) >>= fun () ->
Capability.dec_ref flow;
Lwt.return_unit
)

View File

@ -2,7 +2,7 @@
(executables (
(names (main store_main http_main tls_main))
(libraries (capnp-rpc-lwt cohttp.lwt irmin-unix cmdliner fmt.tty))
(libraries (capnp-rpc-lwt capnp-rpc-unix cohttp.lwt irmin-unix cmdliner fmt.tty))
(flags (:standard -w -53-55))
))
(rule

View File

@ -0,0 +1,38 @@
module Actor = struct
type t = Fmt.style * string
let pp f (style, name) = Fmt.(styled style (const string name)) f ()
let tag = Logs.Tag.def "actor" pp
end
let pp_qid f = function
| None -> ()
| Some x ->
let s = Uint32.to_string x in
Fmt.(styled `Magenta (fun f x -> Fmt.pf f " (qid=%s)" x)) f s
let reporter =
let report src level ~over k msgf =
let src = Logs.Src.name src in
msgf @@ fun ?header ?(tags=Logs.Tag.empty) fmt ->
let actor =
match Logs.Tag.find Actor.tag tags with
| Some x -> x
| None -> `Black, "------"
in
let qid = Logs.Tag.find Capnp_rpc.Debug.qid_tag tags in
let print _ =
Fmt.(pf stderr) "%a@." pp_qid qid;
over ();
k ()
in
Fmt.kpf print Fmt.stderr ("%a %a %a: @[" ^^ fmt ^^ "@]")
Fmt.(styled `Magenta string) (Printf.sprintf "%11s" src)
Logs_fmt.pp_header (level, header)
Actor.pp actor
in
{ Logs.report = report }
let init () =
Fmt_tty.setup_std_outputs ();
Logs.set_reporter reporter;
Logs.set_level (Some Logs.Info)

View File

@ -1,34 +1,32 @@
(** Run all the services together in a single process, communicating over Unix-domain sockets. *)
open Lwt.Infix
open Capnp_rpc_lwt
let switch = Lwt_switch.create ()
let socket_pair ~switch =
let client, server = Unix.(socketpair PF_UNIX SOCK_STREAM 0) in
Lwt_switch.add_hook (Some switch) (fun () ->
Unix.close client;
Unix.close server;
Lwt.return ()
);
(Endpoint.of_socket ~switch client, Endpoint.of_socket ~switch server)
let client, server = Lwt_unix.(socketpair PF_UNIX SOCK_STREAM 0) in
(Capnp_rpc_unix.endpoint_of_socket ~switch client,
Capnp_rpc_unix.endpoint_of_socket ~switch server)
let store_to_http, http_to_store = socket_pair ~switch
let http_to_tls, tls_to_http = socket_pair ~switch
let () =
Common.init_logging ();
Logging.init ();
Lwt_main.run begin
begin
Store.service () >>= fun service ->
let tags = Logs.Tag.add Common.Actor.tag (`Green, "Store ") Logs.Tag.empty in
Store.local () >>= fun service ->
let tags = Logs.Tag.add Logging.Actor.tag (`Green, "Store ") Logs.Tag.empty in
let _ : CapTP.t = CapTP.connect ~offer:service ~tags ~switch store_to_http in
Lwt.return ()
end
>>= fun () ->
begin
let tags = Logs.Tag.add Common.Actor.tag (`Red, "HTTP ") Logs.Tag.empty in
let tags = Logs.Tag.add Logging.Actor.tag (`Red, "HTTP ") Logs.Tag.empty in
let store = CapTP.bootstrap (CapTP.connect ~tags ~switch http_to_store) in
let service = Http_server.service store in
let service = Http_server.local store in
let _ : CapTP.t = CapTP.connect ~offer:service ~tags ~switch http_to_tls in
Lwt.return ()
end

View File

@ -12,7 +12,7 @@ interface Store {
}
interface Flow {
read @0 () -> (data :Data);
read @0 () -> (data :Data); # "" means end-of-file
write @1 (data :Data) -> ();
}

View File

@ -0,0 +1,140 @@
open Lwt.Infix
open Capnp_rpc_lwt
module Api = Proto.MakeRPC(Capnp_rpc_lwt)
module Flow = struct
let local ic oc =
let module F = Api.Service.Flow in
F.local @@ object
inherit F.service
method read_impl _ release_param_caps =
release_param_caps ();
Service.return_lwt (fun () ->
Lwt_io.read ~count:4096 ic >|= fun data ->
let open F.Read in
let resp, results = Service.Response.create Results.init_pointer in
Results.data_set results data;
Ok resp
)
method write_impl req release_param_caps =
release_param_caps ();
let open F.Write in
let data = Params.data_get req in
Service.return_lwt (fun () ->
Lwt_io.write oc data >>= fun () ->
Lwt.return (Ok (Service.Response.create_empty ()))
)
end
module Flow = Api.Client.Flow
type buffer = Cstruct.t
type flow = Flow.t Capability.t
type error = [`Capnp of Capnp_rpc.Error.t]
type write_error = [Mirage_flow.write_error | `Capnp of Capnp_rpc.Error.t]
type 'a io = 'a Lwt.t
let read t =
let open Flow.Read in
let req = Capability.Request.create_no_args () in
Capability.call_for_value_exn t method_id req >>= fun resp ->
match Results.data_get resp with
| "" -> Lwt.return (Ok `Eof)
| data -> Lwt.return (Ok (`Data (Cstruct.of_string data)))
let write t data =
let open Flow.Write in
let req, p = Capability.Request.create Params.init_pointer in
Params.data_set p (Cstruct.to_string data);
Capability.call_for_unit_exn t method_id req >>= fun () ->
Lwt.return (Ok ())
let writev t data =
write t (Cstruct.concat data)
let close _ = failwith "TODO: close"
let pp_error f = function
| `Capnp e -> Capnp_rpc.Error.pp f e
| `Closed -> Fmt.string f "Closed"
let pp_write_error f = function
| `Capnp e -> Capnp_rpc.Error.pp f e
| #Mirage_flow.write_error as e -> Mirage_flow.pp_write_error f e
end
module Store = struct
(* The Cap'n'Proto service interface we expose. *)
let local lookup =
let module Store = Api.Service.Store in
Store.local @@ object
inherit Store.service
method get_impl req release_param_caps =
let open Store.Get in
let path = Params.path_get_list req in
release_param_caps ();
Service.return_lwt (fun () ->
let resp, results = Service.Response.create Results.init_pointer in
begin
lookup path >|= function
| Some data -> Results.ok_set results data
| None -> Results.not_found_set results
end
>>= fun () ->
Lwt.return (Ok resp)
)
end
module Store = Api.Client.Store
type t = Store.t Capability.t
(* Make a Cap'n'Proto call to the store service *)
let get t path =
let open Store.Get in
let req, p = Capability.Request.create Params.init_pointer in
ignore (Params.path_set_list p path);
Capability.call_for_value_exn t method_id req >>= fun resp ->
let open Api.Reader.Store in
match GetResults.get resp with
| GetResults.NotFound -> Lwt.return None
| GetResults.Ok data -> Lwt.return (Some data)
| GetResults.Undefined _ -> failwith "Protocol error: bad msg type"
end
module Http = struct
let local handle_connection =
let module HttpServer = Api.Service.HttpServer in
HttpServer.local @@ object
inherit HttpServer.service
method accept_impl req release_param_caps =
let open HttpServer.Accept in
let flow = Params.connection_get req in
release_param_caps ();
match flow with
| None -> Service.fail "No connection!"
| Some c ->
Service.return_lwt (fun () ->
handle_connection c >|= fun () ->
Ok (Service.Response.create_empty ())
)
end
module HttpServer = Api.Client.HttpServer
type t = HttpServer.t Capability.t
let accept t flow =
let open HttpServer.Accept in
let req, p = Capability.Request.create Params.init_pointer in
Params.connection_set p (Some flow);
Capability.call_for_unit t method_id req >|= function
| Ok () -> ()
| Error e -> Logs.warn (fun f -> f "Error from HTTP server: %a" Capnp_rpc.Error.pp e)
end

View File

@ -0,0 +1,42 @@
(** Cap'n Proto RPC adaptors.
This module deals with sending and receiving RPC messages.
It provides a more idiomatic OCaml API than the generated stubs.
There is one sub-module for each RPC interface.
Each sub-module provides client-side methods to invoke the
service, and a [local] function to implement a service. *)
open Capnp_rpc_lwt
[@@@ocaml.warning "-34"]
(* See: https://caml.inria.fr/mantis/print_bug_page.php?bug_id=7438 *)
module Flow : sig
include Mirage_flow_lwt.S with
type flow = [`Flow_e102f5fcaceb1e06] Capability.t and
type error = [`Capnp of Capnp_rpc.Error.t] and
type write_error = [`Closed | `Capnp of Capnp_rpc.Error.t]
val local : Lwt_io.input Lwt_io.channel -> Lwt_io.output Lwt_io.channel -> flow
(** [local ic oc] is a capability to a local flow implemented by [ic] and [oc]. *)
end
module Store : sig
type t = [`Store_96a6b45508236c12] Capability.t
val get : t -> string list -> string option Lwt.t
(** [get t path] looks up [path] in store [t]. *)
val local : (string list -> string option Lwt.t) -> t
(** [local lookup] is a local store that responds to requests with [lookup key]. *)
end
module Http : sig
type t = [`HttpServer_9ecd1f7bbfef9f1e] Capability.t
val accept : t -> Flow.flow -> unit Lwt.t
(** [accept t flow] asks [t] to handle new connection [flow]. *)
val local : (Flow.flow -> unit Lwt.t) -> t
(** [local handle_connection] is a capability to a local HTTP service that
uses [handle_connection flow] to handle each connection. *)
end

View File

@ -1,35 +1,19 @@
(** The implementation of the store.
This just looks up the requested page in an Irmin database. *)
let src = Logs.Src.create "web.store" ~doc:"Datastore for web server"
module Log = (val Logs.src_log src: Logs.LOG)
module Irmin_store = Irmin_unix.Git.FS.KV(Irmin.Contents.String)
open Lwt.Infix
open Capnp_rpc_lwt
open Astring
(* The Cap'n'Proto service interface we expose. *)
let service () =
let local () =
let config = Irmin_fs.config "www-data" in
Irmin_store.Repo.v config >>= fun repo ->
Irmin_store.of_branch repo Irmin_store.Branch.master >|= fun db ->
Api.Builder.Store.local @@
object
inherit Api.Builder.Store.service
method get_impl req =
let module P = Api.Reader.Store.Get_params in
let module R = Api.Builder.Store.GetResults in
let params = P.of_payload req in
let path = P.path_get_list params in
Log.info (fun f -> f "Handing request for %a" (Fmt.Dump.list String.dump) path);
Service.return_lwt (fun () ->
let resp, results = Service.Response.create R.init_pointer in
begin
Irmin_store.find db path >|= function
| Some data -> R.ok_set results data
| None -> R.not_found_set results
end
>>= fun () ->
Lwt.return (Ok resp)
)
end
Rpc.Store.local (fun path ->
Log.info (fun f -> f "Handing request for %a" (Fmt.Dump.list String.dump) path);
Irmin_store.find db path
)

View File

@ -1,19 +1,20 @@
(** Run the Store service as a stand-alone Unix process. *)
open Lwt.Infix
let () = Common.init_logging ()
let () = Logging.init ()
let main store_socket =
Lwt_main.run begin
Lwt_switch.with_switch @@ fun switch ->
Store.service () >>= fun store ->
Common.listen ~switch ~offer:store store_socket
Store.local () >>= fun store ->
Capnp_rpc_unix.serve ~offer:store store_socket
end
open Cmdliner
let store =
let doc = "The database store to serve" in
Arg.(required @@ pos 0 (some string) None @@ info ~doc ~docv:"STORE" [])
Arg.(required @@ pos 0 (some Capnp_rpc_unix.Listen_address.conv) None @@ info ~doc ~docv:"STORE" [])
let cmd =
Term.(const main $ store), Term.info "store"

View File

@ -1,9 +1,10 @@
let () = Common.init_logging ()
(** Run the TLS terminator as a stand-alone Unix process. *)
let main http_socket port =
let () = Logging.init ()
let main http_addr port =
Lwt_main.run begin
Lwt_switch.with_switch @@ fun switch ->
let http_service = Common.connect ~switch http_socket in
let http_service = Capnp_rpc_unix.connect http_addr in
Tls_terminator.run ~http_service ~port
end
@ -11,7 +12,7 @@ open Cmdliner
let http =
let doc = "The HTTP service to use" in
Arg.(required @@ opt (some string) None @@ info ~doc ~docv:"HTTP" ["http"])
Arg.(required @@ opt (some Capnp_rpc_unix.Connect_address.conv) None @@ info ~doc ~docv:"HTTP" ["http"])
let tls =
let doc = "The TLS port on which to listen for incoming connections" in

View File

@ -1,39 +1,9 @@
(** The TLS terminator implementation.
Listens for TLS connections on a port and forwards the plaintext flow to the HTTP service. *)
open Lwt.Infix
open Capnp_rpc_lwt
let make_flow _flow ic oc =
Api.Builder.Flow.local @@
object
inherit Api.Builder.Flow.service
method read_impl _ =
Service.return_lwt (fun () ->
Lwt_io.read ~count:4096 ic >|= fun data ->
let module R = Api.Builder.Flow.Read_results in
let resp, results = Service.Response.create R.init_pointer in
R.data_set results data;
Ok resp
)
method write_impl req =
let module R = Api.Reader.Flow.Write_params in
let p = R.of_payload req in
let data = R.data_get p in
Service.return_lwt (fun () ->
Lwt_io.write oc data >>= fun () ->
Lwt.return (Ok (Service.Response.create_empty ()))
)
end
let handle ~http_service flow =
let module P = Api.Builder.HttpServer.Accept_params in
let req, p = Capability.Request.create P.init_pointer in
P.connection_set p (Some (Capability.Request.export req flow));
Capability.dec_ref flow;
Capability.call_for_value http_service Api.Reader.HttpServer.accept_method req >|= function
| Ok _ -> ()
| Error e -> Logs.warn (fun f -> f "Error from HTTP server: %a" Capnp_rpc.Error.pp e)
let run ~port ~http_service =
let tls_config : Conduit_lwt_unix.server_tls_config =
`Crt_file_path "tls-secrets/server.crt",
@ -43,13 +13,14 @@ let run ~port ~http_service =
in
let mode = `TLS tls_config in
Logs.info (fun f -> f "Listening on https port %d" port);
Conduit_lwt_unix.(serve ~ctx:default_ctx) ~mode (fun flow ic oc ->
Conduit_lwt_unix.(serve ~ctx:default_ctx) ~mode (fun _flow ic oc ->
Logs.info (fun f -> f "Got new TLS connection");
let flow_obj = make_flow flow ic oc in
handle ~http_service flow_obj
let flow_obj = Rpc.Flow.local ic oc in
Rpc.Http.accept http_service flow_obj >|= fun () ->
Capability.dec_ref flow_obj
)
let init ~switch ~to_http =
let tags = Logs.Tag.add Common.Actor.tag (`Blue, "TLS ") Logs.Tag.empty in
let tags = Logs.Tag.add Logging.Actor.tag (`Blue, "TLS ") Logs.Tag.empty in
let http_service = CapTP.bootstrap (CapTP.connect ~tags ~switch to_http) in
run ~http_service ~port:8443