Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove or protect all use of weak hash. #3814

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/core/builtins/builtins_ffmpeg_encoder.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ module InternalScaler = Swscale.Make (Swscale.BigArray) (Swscale.Frame)

type source_idx = { source : Source.source; idx : int64 }

module SourceIdx = Weak.Make (struct
type t = source_idx
module SourceIdx = struct
include Weak.Make (struct
type t = source_idx

let equal x y = x.source == y.source
let hash x = Obj.magic x.source
end)
let equal x y = x.source == y.source
let hash x = Oo.id x.source
end)

let create n = (create n, Mutex.create ())
let merge (c, m) v = Mutex.mutexify m (fun () -> merge c v) ()
end

let source_idx_map = SourceIdx.create 0

Expand Down
25 changes: 15 additions & 10 deletions src/core/converters/video/ffmpeg_video_converter.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,21 @@ module WH = struct
(* Number of converters to always keep in memory. *)
let n = 2
let keep = Array.make n None

let add h fmt conv =
let conv = (fmt, Some conv) in
for i = 1 to n - 1 do
keep.(i - 1) <- keep.(i)
done;
keep.(n - 1) <- Some conv;
add h conv

let assoc h fmt = Option.get (snd (find h (fmt, None)))
let create n = (create n, Mutex.create ())

let add (h, m) fmt conv =
Mutex.mutexify m
(fun () ->
let conv = (fmt, Some conv) in
for i = 1 to n - 1 do
keep.(i - 1) <- keep.(i)
done;
keep.(n - 1) <- Some conv;
add h conv)
()

let assoc (h, m) fmt =
Mutex.mutexify m (fun () -> Option.get (snd (find h (fmt, None)))) ()
end

(* Weak hashtable containing converters already created. *)
Expand Down
16 changes: 11 additions & 5 deletions src/core/encoder/encoders/ffmpeg_encoder_common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ type stream_data = {
mutable ready : bool;
}

module Stream = Weak.Make (struct
type t = stream_data
module Stream = struct
include Weak.Make (struct
type t = stream_data

let equal x y = x.idx = y.idx
let hash x = Int64.to_int x.idx
end)
let equal x y = x.idx = y.idx
let hash x = Int64.to_int x.idx
end)

let create n = (create n, Mutex.create ())
let merge (c, m) v = Mutex.mutexify m (fun () -> merge c v) ()
let remove (c, m) v = Mutex.mutexify m (fun () -> remove c v) ()
end

(* We lazily store last_start when concatenating
streams. The idea is to always have the greatest
Expand Down
15 changes: 5 additions & 10 deletions src/core/io/srt_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
(** SRT input *)

module Pcre = Re.Pcre
module WeakQueue = Liquidsoap_lang.Queues.WeakQueue

exception Done
exception Not_connected
Expand Down Expand Up @@ -449,17 +450,11 @@ class virtual output_networking_agent =
: Srt.socket -> exn -> Printexc.raw_backtrace -> unit
end

module ToDisconnect = Liquidsoap_lang.Active_value.Make (struct
type t = < disconnect : unit ; srt_id : int >

let id t = t#srt_id
end)

let to_disconnect = ToDisconnect.create 10
let to_disconnect = WeakQueue.create ()

let () =
Lifecycle.on_core_shutdown ~name:"Srt disconnect" (fun () ->
ToDisconnect.iter (fun s -> s#disconnect) to_disconnect)
WeakQueue.iter to_disconnect (fun s -> s#disconnect))

class virtual caller ~enforced_encryption ~pbkeylen ~passphrase ~streamid
~polling_delay ~payload_size ~messageapi ~hostname ~port ~connection_timeout
Expand All @@ -470,7 +465,7 @@ class virtual caller ~enforced_encryption ~pbkeylen ~passphrase ~streamid
val mutable connect_task = None
val task_should_stop = Atomic.make false
val socket = Atomic.make None
initializer ToDisconnect.add to_disconnect (self :> ToDisconnect.data)
initializer WeakQueue.push to_disconnect (self :> < disconnect : unit >)

method private get_socket =
match Atomic.get socket with Some s -> s | None -> raise Not_connected
Expand Down Expand Up @@ -557,7 +552,7 @@ class virtual listener ~enforced_encryption ~pbkeylen ~passphrase ~max_clients
method virtual should_stop : bool
method virtual mutexify : 'a 'b. ('a -> 'b) -> 'a -> 'b
val listening_socket = Atomic.make None
initializer ToDisconnect.add to_disconnect (self :> ToDisconnect.data)
initializer WeakQueue.push to_disconnect (self :> < disconnect : unit >)

method private is_connected =
self#mutexify (fun () -> client_sockets <> []) ()
Expand Down
19 changes: 7 additions & 12 deletions src/core/lang_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,9 @@
*****************************************************************************)

module Lang = Liquidsoap_lang.Lang
module WeakQueue = Liquidsoap_lang.Queues.WeakQueue
open Lang

module Alive_values_map = Liquidsoap_lang.Active_value.Make (struct
type t = Value.t

let id v = v.Value.id
end)

module ClockValue = struct
include Value.MkAbstract (struct
type content = Clock.t
Expand Down Expand Up @@ -389,16 +384,16 @@ let to_track = Track.of_value
the currently defined source as argument). *)
type 'a operator_method = string * scheme * string * ('a -> value)

let checked_values = Alive_values_map.create 10
let checked_values = WeakQueue.create ()

(** Ensure that the frame contents of all the sources occurring in the value agree with [t]. *)
let check_content v t =
let check t t' = Typing.(t <: t') in
let rec check_value v t =
if not (Alive_values_map.mem checked_values v) then (
if not (WeakQueue.exists checked_values (fun v' -> v' == v)) then (
(* We need to avoid checking the same value multiple times, otherwise we
get an exponential blowup, see #1247. *)
Alive_values_map.add checked_values v;
WeakQueue.push checked_values v;
match (v.Value.value, (Type.deref t).Type.descr) with
| _, Type.Var _ -> ()
| _ when Source_val.is_value v ->
Expand Down Expand Up @@ -656,7 +651,7 @@ let add_track_operator ~(category : Doc.Value.source) ~descr ?(flags = [])
let category = `Track category in
add_builtin ~category ~descr ~flags ?base name arguments return_t f

let itered_values = Alive_values_map.create 10
let itered_values = WeakQueue.create ()

let iter_sources ?(on_imprecise = fun () -> ()) f v =
let rec iter_term env v =
Expand Down Expand Up @@ -700,10 +695,10 @@ let iter_sources ?(on_imprecise = fun () -> ()) f v =
v.Term.methods;
iter_base_term env v
and iter_value v =
if not (Alive_values_map.mem itered_values v) then (
if not (WeakQueue.exists itered_values (fun v' -> v == v')) then (
(* We need to avoid checking the same value multiple times, otherwise we
get an exponential blowup, see #1247. *)
Alive_values_map.add itered_values v;
WeakQueue.push itered_values v;
Value.Methods.iter (fun _ v -> iter_value v) v.Value.methods;
match v.value with
| _ when Source_val.is_value v -> f (Source_val.of_value v)
Expand Down
46 changes: 20 additions & 26 deletions src/core/tools/pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -44,44 +44,38 @@ module type S = sig
val clear : unit -> unit
end

module WeakQueue = Liquidsoap_lang.Queues.WeakQueue

module Make (P : T) : S with type t = P.t = struct
type t = P.t

module WeakHash = Weak.Make (struct
type t = P.t

let equal t t' = P.id t = P.id t'
let hash = P.id
end)
let q = WeakQueue.create ()

let h = WeakHash.create 10
exception Found of t

let find id =
match WeakHash.find_opt h (P.destroyed id) with
| Some v when not (P.is_destroyed v) -> Some v
| _ -> None
try
WeakQueue.iter q (fun r ->
if P.id r = id && not (P.is_destroyed r) then raise (Found r));
None
with Found r -> Some r

let fold f =
WeakHash.fold
(fun v cur -> if P.is_destroyed v then cur else f (P.id v) v cur)
h
WeakQueue.fold q (fun v cur ->
if P.is_destroyed v then cur else f (P.id v) v cur)

let iter f =
WeakHash.iter
(fun entry -> if not (P.is_destroyed entry) then f (P.id entry) entry)
h
WeakQueue.iter q (fun entry ->
if not (P.is_destroyed entry) then f (P.id entry) entry)

let remove id = WeakHash.remove h (P.destroyed id)
let remove id = WeakQueue.filter q (fun r -> P.id r = id)
let current_id = Atomic.make 0

let add fn =
let rec f () =
let id = Atomic.fetch_and_add current_id 1 in
let v = fn id in
match WeakHash.merge h v with v' when v == v' -> v | _ -> f ()
in
f ()

let size () = WeakHash.count h
let clear () = WeakHash.clear h
let v = fn (Atomic.fetch_and_add current_id 1) in
WeakQueue.push q v;
v

let size () = WeakQueue.length q
let clear () = WeakQueue.flush q (fun _ -> ())
end
38 changes: 0 additions & 38 deletions src/lang/active_value.ml

This file was deleted.

33 changes: 0 additions & 33 deletions src/lang/active_value.mli

This file was deleted.

1 change: 0 additions & 1 deletion src/lang/dune
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
(pps sedlex.ppx ppx_string))
(libraries liquidsoap-lang.console dune-site re str unix menhirLib)
(modules
active_value
backoff
build_config
builtins_bool
Expand Down
7 changes: 1 addition & 6 deletions src/lang/term/runtime_term.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ type pattern =

and meth_term_default = [ `Nullable | `Pattern of pattern | `None ]

type 'a term = {
mutable t : Type.t;
term : 'a;
methods : 'a term Methods.t;
id : int;
}
type 'a term = { mutable t : Type.t; term : 'a; methods : 'a term Methods.t }

(* ~l1:x1 .. ?li:(xi=defi) .. *)
type ('a, 'b) func_argument = {
Expand Down
Loading
Loading