Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Nov 19, 2024
1 parent 4e74360 commit 2896698
Showing 1 changed file with 59 additions and 56 deletions.
115 changes: 59 additions & 56 deletions src/core/builtins/builtins_request.ml
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,9 @@ let _ =
in
Lang.string s)

exception Invalid
exception Process_failed

class flush ~name r =
class process ~name r =
object (self)
inherit
Request_dynamic.dynamic
Expand All @@ -317,66 +317,69 @@ class flush ~name r =
self#on_wake_up (fun () ->
match Request.get_decoder ~ctype:self#content_type r with
| Some _ -> self#set_queue [r]
| None | (exception _) -> raise Invalid)
| None | (exception _) -> raise Process_failed)
end

let flush_request ~log ~name ~ratio ~timeout ~sleep_latency ~process r =
let process_request ~log ~name ~ratio ~timeout ~sleep_latency ~process r =
let module Time = (val Clock.time_implementation () : Liq_time.T) in
let open Time in
let start_time = Time.time () in
match Request.resolve ~timeout r with
| `Failed | `Timeout -> ()
| `Resolved ->
| `Resolved -> (
let timeout = Time.of_float timeout in
let timeout_time = Time.(start_time |+| timeout) in
let started = ref false in
let stopped = ref false in
let s = new flush ~name r in
let s = (process (s :> Source.source) :> Source.source) in
let clock =
Clock.create ~id:name ~sync:`Passive
~on_error:(fun exn bt ->
stopped := true;
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while processing source: %s"
(Printexc.to_string exn)))
()
in
let _ =
new Output.dummy
~clock ~infallible:false
~on_start:(fun () -> started := true)
~on_stop:(fun () -> stopped := true)
~register_telnet:false ~autostart:true (Lang.source s)
in
(try
Clock.start ~force:true clock;
log#info "Start streaming loop (ratio: %.02fx)" ratio;
let sleep_latency = Time.of_float sleep_latency in
let target_time () =
Time.(
start_time |+| sleep_latency
|+| of_float (Clock.time clock /. ratio))
in
while (not (Atomic.get should_stop)) && not !stopped do
if (not !started) && Time.(timeout_time |<=| start_time) then (
log#important "Timeout while waiting for the source to start!";
stopped := true)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done
with Invalid | Clock.Has_stopped -> ());
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Request processed. Total processing time: %.02fs, effective ratio: \
%.02fx"
processing_time effective_ratio;
Clock.stop clock
try
let s = new process ~name r in
let s = (process (s :> Source.source) :> Source.source) in
let clock =
Clock.create ~id:name ~sync:`Passive
~on_error:(fun exn bt ->
Utils.log_exception ~log
~bt:(Printexc.raw_backtrace_to_string bt)
(Printf.sprintf "Error while processing source: %s"
(Printexc.to_string exn));
raise Process_failed)
()
in
Fun.protect
~finally:(fun () -> try Clock.stop clock with _ -> ())
(fun () ->
let started = ref false in
let stopped = ref false in
let _ =
new Output.dummy
~clock ~infallible:false ~register_telnet:false
~on_start:(fun () -> started := true)
~on_stop:(fun () -> stopped := true)
~autostart:true (Lang.source s)
in
Clock.start ~force:true clock;
log#info "Start streaming loop (ratio: %.02fx)" ratio;
let sleep_latency = Time.of_float sleep_latency in
let target_time () =
Time.(
start_time |+| sleep_latency
|+| of_float (Clock.time clock /. ratio))
in
while (not (Atomic.get should_stop)) && not !stopped do
if (not !started) && Time.(timeout_time |<=| Time.time ()) then (
log#important
"Timeout while waiting for the source to be ready!";
raise Process_failed)
else (
Clock.tick clock;
let target_time = target_time () in
if Time.(time () |<| (target_time |+| sleep_latency)) then
sleep_until target_time)
done;
let processing_time = Time.(to_float (time () |-| start_time)) in
let effective_ratio = Clock.time clock /. processing_time in
log#info
"Request processed. Total processing time: %.02fs, effective \
ratio: %.02fx"
processing_time effective_ratio)
with Process_failed | Clock.Has_stopped -> ())

let _ =
let log = Log.make ["request"; "dump"] in
Expand Down Expand Up @@ -429,7 +432,7 @@ let _ =
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_request ~log ~name:"request.dump" ~ratio ~timeout ~sleep_latency
process_request ~log ~name:"request.dump" ~ratio ~timeout ~sleep_latency
~process r;
log#info "Request dumped.";
Lang.unit)
Expand Down Expand Up @@ -478,6 +481,6 @@ let _ =
let ratio = Lang.to_float (List.assoc "ratio" p) in
let timeout = Lang.to_float (List.assoc "timeout" p) in
let sleep_latency = Lang.to_float (List.assoc "sleep_latency" p) in
flush_request ~log ~name:"request.process" ~ratio ~timeout ~sleep_latency
~process r;
process_request ~log ~name:"request.process" ~ratio ~timeout
~sleep_latency ~process r;
Lang.unit)

0 comments on commit 2896698

Please sign in to comment.