diff --git a/src/core/io/ffmpeg_io.ml b/src/core/io/ffmpeg_io.ml index 06a1590a64..21c463d21d 100644 --- a/src/core/io/ffmpeg_io.ml +++ b/src/core/io/ffmpeg_io.ml @@ -60,6 +60,13 @@ let () = Lifecycle.before_core_shutdown ~name:"input.ffmpeg shutdown" (fun () -> Atomic.set shutdown true) +let string_of_source_status = function + | `Stopped -> "stopped" + | `Starting -> "starting" + | `Polling -> "polling" + | `Connected _ -> "connected" + | `Stopping -> "stopping" + class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug ~max_buffer ~on_error ~on_stop ~on_start ~on_connect ~metadata_filter ~on_disconnect ~new_track_on_metadata ?format ~opts ~trim_url url = @@ -70,6 +77,14 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug ~name ~fallible:true ~on_start ~on_stop ~autostart () as super val connect_task = Atomic.make None + + initializer + let t = + Duppy.Async.add ~priority:`Blocking Tutils.scheduler self#do_connect + in + Atomic.set connect_task (Some t) + + method connect_task = Option.get (Atomic.get connect_task) method seek_source = (self :> Source.source) method remaining = -1 method abort_track = Generator.add_track_mark self#buffer @@ -104,77 +119,102 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug let u = url () in if trim_url then String.trim u else u - method set_url u = url <- u + method set_url u = + let old_url = url in + url <- u; + if u () <> old_url () then self#disconnect + method buffer_length = Frame.seconds_of_audio (Generator.length self#buffer) - method private connect_task () = + method private do_connect () = Generator.set_max_length self#buffer max_length; try - if self#source_status = `Stopping then raise Stopped; - assert (self#source_status = `Starting); - Atomic.set source_status `Polling; - let opts = Hashtbl.copy opts in - let url = self#url in - let closed = Atomic.make false in - let input = - Av.open_input - ~interrupt:(fun () -> Atomic.get shutdown || Atomic.get closed) - ?format ~opts url - in - if Hashtbl.length opts > 0 then - failwith - (Printf.sprintf "Unrecognized options: %s" - (Ffmpeg_format.string_of_options opts)); - let content_type = - Ffmpeg_decoder.get_type ~format ~ctype:self#content_type ~url input - in - if not (Decoder.can_decode_type content_type self#content_type) then - failwith - (Printf.sprintf "url %S cannot produce content of type %s" url - (Frame.string_of_content_type self#content_type)); - let streams = - Ffmpeg_decoder.mk_streams ~ctype:self#content_type - ~decode_first_metadata:true input - in - let decoder = - Ffmpeg_decoder.mk_decoder ~streams ~target_position:(ref None) input - in - let buffer = Decoder.mk_buffer ~ctype:self#content_type self#buffer in - (* FFmpeg has memory leaks with chained ogg stream so we manually - reset the metadata after fetching it. *) - let get_metadata stream = - let m = Av.get_metadata stream in - Av.set_metadata stream []; - m - in - let get_metadata () = - normalize_metadata - (Ffmpeg_decoder.Streams.fold - (fun _ stream m -> - m - @ - match stream with - | `Audio_frame (stream, _) -> get_metadata stream - | `Audio_packet (stream, _) -> get_metadata stream - | `Video_frame (stream, _) -> get_metadata stream - | `Video_packet (stream, _) -> get_metadata stream - | `Data_packet _ -> []) - streams - (Av.get_input_metadata input)) - in - let last_meta = ref [] in - let get_metadata () = - let m = get_metadata () in - if m <> !last_meta then ( - last_meta := m; - m) - else [] - in - on_connect input; - Generator.add_track_mark self#buffer; - let container = { input; decoder; buffer; get_metadata; closed } in - Atomic.set source_status (`Connected (url, container)); - -1. + match self#source_status with + | `Stopping -> raise Stopped + | `Stopped | `Polling | `Connected _ -> -1. + | `Starting -> + Atomic.set source_status `Polling; + let opts = Hashtbl.copy opts in + let url = self#url in + let closed = Atomic.make false in + let input = + Av.open_input + ~interrupt:(fun () -> + Atomic.get shutdown || Atomic.get closed) + ?format ~opts url + in + if Hashtbl.length opts > 0 then + failwith + (Printf.sprintf "Unrecognized options: %s" + (Ffmpeg_format.string_of_options opts)); + let content_type = + Ffmpeg_decoder.get_type ~format ~ctype:self#content_type ~url + input + in + if not (Decoder.can_decode_type content_type self#content_type) + then + failwith + (Printf.sprintf "url %S cannot produce content of type %s" url + (Frame.string_of_content_type self#content_type)); + let streams = + Ffmpeg_decoder.mk_streams ~ctype:self#content_type + ~decode_first_metadata:true input + in + let decoder = + Ffmpeg_decoder.mk_decoder ~streams ~target_position:(ref None) + input + in + let buffer = + Decoder.mk_buffer ~ctype:self#content_type self#buffer + in + (* FFmpeg has memory leaks with chained ogg stream so we manually + reset the metadata after fetching it. *) + let get_metadata stream = + let m = Av.get_metadata stream in + Av.set_metadata stream []; + m + in + let get_metadata () = + normalize_metadata + (Ffmpeg_decoder.Streams.fold + (fun _ stream m -> + m + @ + match stream with + | `Audio_frame (stream, _) -> get_metadata stream + | `Audio_packet (stream, _) -> get_metadata stream + | `Video_frame (stream, _) -> get_metadata stream + | `Video_packet (stream, _) -> get_metadata stream + | `Data_packet _ -> []) + streams + (Av.get_input_metadata input)) + in + let last_meta = ref [] in + let get_metadata () = + let m = get_metadata () in + if m <> !last_meta then ( + last_meta := m; + m) + else [] + in + let container = + { input; decoder; buffer; get_metadata; closed } + in + if + Atomic.compare_and_set source_status `Polling + (`Connected (url, container)) + then ( + on_connect input; + Generator.add_track_mark self#buffer) + else ( + Atomic.set closed true; + Av.close input; + match self#source_status with + | `Stopping -> raise Stopped + | v -> + self#log#important "Inconsistent source status: %s" + (string_of_source_status v)); + -1. with | Stopped -> Atomic.set source_status `Stopped; @@ -192,25 +232,14 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug method private connect = match self#source_status with | `Starting | `Polling | `Connected _ -> () - | `Stopping | `Stopped -> ( + | `Stopping | `Stopped -> Atomic.set source_status `Starting; - match Atomic.get connect_task with - | Some t -> Duppy.Async.wake_up t - | None -> - let t = - Duppy.Async.add ~priority:`Blocking Tutils.scheduler - self#connect_task - in - Atomic.set connect_task (Some t); - Duppy.Async.wake_up t) + Duppy.Async.wake_up self#connect_task method private disconnect = let stop_task () = - match Atomic.get connect_task with - | None -> () - | Some t -> - Atomic.set source_status `Stopping; - Duppy.Async.wake_up t + Atomic.set source_status `Stopping; + Duppy.Async.wake_up self#connect_task in match self#source_status with | `Stopping | `Stopped -> ()