Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test for ungraceful stop of camera recorder pipeline #150

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions gst-plugin-pravega/src/pravegasink/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const PROPERTY_NAME_RETENTION_TYPE: &str = "retention-type";
const PROPERTY_NAME_RETENTION_DAYS: &str = "retention-days";
const PROPERTY_NAME_RETENTION_BYTES: &str = "retention-bytes";
const PROPERTY_NAME_RETENTION_MAINTENANCE_INTERVAL_SECONDS: &str = "retention-maintenance-interval-seconds";
const PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC: &str = "simulate-failure-after-sec";

#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::GEnum)]
#[repr(u32)]
Expand Down Expand Up @@ -244,6 +245,7 @@ struct Settings {
retention_days: Option<f64>,
retention_bytes: Option<u64>,
retention_maintenance_interval_seconds: u64,
simulate_failure_after_sec: Option<u64>,
}

impl Default for Settings {
Expand All @@ -263,6 +265,7 @@ impl Default for Settings {
retention_days: None,
retention_bytes: None,
retention_maintenance_interval_seconds: DEFAULT_RETENTION_MAINTENANCE_INTERVAL_SECONDS,
simulate_failure_after_sec: None,
}
}
}
Expand All @@ -284,6 +287,7 @@ enum State {
buffers_written: u64,
retention_thread_stop_tx: Sender<()>,
retention_thread_handle: Option<JoinHandle<()>>,
simulate_failure_after_sec: Option<u64>,
},
}

Expand Down Expand Up @@ -481,6 +485,15 @@ impl ObjectImpl for PravegaSink {
DEFAULT_RETENTION_MAINTENANCE_INTERVAL_SECONDS,
glib::ParamFlags::WRITABLE,
),
glib::ParamSpec::new_uint64(
PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC,
"Simulate failure after seconds",
"Simulate raw data writting failure after successful specfied seconds of index wrtting",
0,
std::u64::MAX,
0,
glib::ParamFlags::WRITABLE,
),
]});
PROPERTIES.as_ref()
}
Expand Down Expand Up @@ -664,7 +677,20 @@ impl ObjectImpl for PravegaSink {
if let Err(err) = res {
gst_error!(CAT, obj: obj, "Failed to set property `{}`: {}", PROPERTY_NAME_RETENTION_MAINTENANCE_INTERVAL_SECONDS, err);
}
},
},
PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC => {
let res: Result<(), glib::Error> = match value.get::<u64>() {
Ok(seconds) => {
let mut settings = self.settings.lock().unwrap();
settings.simulate_failure_after_sec = Some(seconds);
Ok(())
},
Err(_) => unreachable!("type checked upstream"),
};
if let Err(err) = res {
gst_error!(CAT, obj: obj, "Failed to set property `{}`: {}", PROPERTY_NAME_SIMULATE_FAILURE_AFTER_SEC, err);
}
},
_ => unimplemented!(),
};
}
Expand Down Expand Up @@ -744,6 +770,7 @@ impl BaseSinkImpl for PravegaSink {
gst_info!(CAT, obj: element, "start: controller={}", controller);
let keycloak_file = settings.keycloak_file.clone();
gst_info!(CAT, obj: element, "start: keycloak_file={:?}", keycloak_file);
gst_info!(CAT, obj: element, "start: simulate_failure_after_sec={:?}", settings.simulate_failure_after_sec);
let config = utils::create_client_config(controller, keycloak_file).map_err(|error| {
gst::error_msg!(gst::ResourceError::Settings, ["Failed to create Pravega client config: {}", error])
})?;
Expand Down Expand Up @@ -845,6 +872,7 @@ impl BaseSinkImpl for PravegaSink {
buffers_written: 0,
retention_thread_stop_tx,
retention_thread_handle,
simulate_failure_after_sec: settings.simulate_failure_after_sec,
};
gst_info!(CAT, obj: element, "start: Started");
Ok(())
Expand All @@ -867,7 +895,8 @@ impl BaseSinkImpl for PravegaSink {
last_index_time,
final_timestamp,
final_offset,
buffers_written) = match *state {
buffers_written,
simulate_failure_after_sec) = match *state {
State::Started {
ref mut writer,
ref mut index_writer,
Expand All @@ -876,14 +905,16 @@ impl BaseSinkImpl for PravegaSink {
ref mut final_timestamp,
ref mut final_offset,
ref mut buffers_written,
simulate_failure_after_sec,
..
} => (writer,
index_writer,
first_valid_time,
last_index_time,
final_timestamp,
final_offset,
buffers_written),
buffers_written,
simulate_failure_after_sec),
State::Stopped => {
gst::element_error!(element, gst::CoreError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
Expand Down Expand Up @@ -1114,6 +1145,15 @@ impl BaseSinkImpl for PravegaSink {
}
*final_offset = Some(writer_offset_end);

if let Some(seconds) = simulate_failure_after_sec {
if !first_valid_time.is_none() {
if (timestamp - first_valid_time.to_owned()).nanoseconds().unwrap() > (seconds as i32 * SECOND).nanoseconds().unwrap() {
gst::element_error!(element, gst::CoreError::Failed, ["Simulate pravegasink failure"]);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see "Simulate pravegasink failure" in the test log. How can I confirm that this is actually simulating a failure?
Also, by not closing the writer, this is effectively performing a graceful stop as you can tell from these log messages:

0:00:35.579572114 50606 0x558c7e3ab860 INFO             pravegasink gst-plugin-pravega/src/pravegasink/imp.rs:1167:gstpravega::pravegasink::imp:<pravegasink0> stop: BEGIN
0:00:35.581769200 50606 0x558c7e3ab860 INFO             pravegasink gst-plugin-pravega/src/pravegasink/imp.rs:1221:gstpravega::pravegasink::imp:<pravegasink0> stop: Wrote final index record IndexRecord { timestamp: 2022-01-04T18:12:48.404852888Z (1641320005404852888 ns, 455922:13:25.404852888), offset: 163880, random_access: false, discontinuity: false }
0:00:35.583745207 50606 0x558c7e3ab860 INFO             pravegasink gst-plugin-pravega/src/pravegasink/imp.rs:1250:gstpravega::pravegasink::imp:<pravegasink0> stop: END: result=Ok(())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log "Simulate pravegasink failure" in info.

return Err(gst::FlowError::Error);
}
}
}

Ok(gst::FlowSuccess::Ok)
})();
gst_trace!(CAT, obj: element, "render: END: result={:?}", result);
Expand All @@ -1135,7 +1175,8 @@ impl BaseSinkImpl for PravegaSink {
final_timestamp,
final_offset,
retention_thread_stop_tx,
retention_thread_handle) = match *state {
retention_thread_handle,
simulate_failure_after_sec) = match *state {
State::Started {
ref runtime,
ref mut writer,
Expand All @@ -1144,14 +1185,16 @@ impl BaseSinkImpl for PravegaSink {
ref mut final_offset,
ref mut retention_thread_stop_tx,
ref mut retention_thread_handle,
simulate_failure_after_sec,
..
} => (runtime,
writer,
index_writer,
final_timestamp,
final_offset,
retention_thread_stop_tx,
retention_thread_handle),
retention_thread_handle,
simulate_failure_after_sec),
State::Stopped => {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
Expand All @@ -1164,6 +1207,12 @@ impl BaseSinkImpl for PravegaSink {
gst::error_msg!(gst::ResourceError::Write, ["Failed to flush Pravega data stream: {}", error])
})?;

if let Some(_) = simulate_failure_after_sec {
gst_info!(CAT, obj: element, "Simulate pravegasink failure");
*state = State::Stopped;
return Ok(());
}

// Write final index record.
// The timestamp will be the the buffer timestamp + duration of the final buffer.
// The offset will be current write position.
Expand Down
121 changes: 121 additions & 0 deletions integration-test/src/camera_recorder_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//

#[cfg(test)]
mod test {
use gst::prelude::*;
use gst::ClockType::Realtime;
use pravega_video::timestamp::{PravegaTimestamp, TimeDelta, SECOND};
use rstest::rstest;
#[allow(unused_imports)]
use tracing::{error, info, debug};
use uuid::Uuid;
use crate::*;
use crate::rtsp_camera_simulator::{start_or_get_rtsp_test_source, RTSPCameraSimulatorConfigBuilder};
use crate::utils::*;

/// Test ungraceful stop of camera recorder pipeline by simulate a pravegasink failure.
/// The pipeline can continue to record the video stream after restart and the video stream can be decoded and playback.
#[rstest]
#[case(Realtime, 30, 15)]
fn test_ungraceful_stop(#[case] clock_type: gst::ClockType, #[case] num_sec_to_record: u64, #[case] num_sec_to_failure: u64) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment describing what this test is supposed to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

gst_init();
let clock = gst::SystemClock::obtain();
clock.set_property("clock-type", &clock_type).unwrap();
gst::SystemClock::set_default(Some(&clock));
let rtsp_server_config = RTSPCameraSimulatorConfigBuilder::default().fps(20).build().unwrap();
let fps = rtsp_server_config.fps;
let num_frames = num_sec_to_record * fps;
let (rtsp_url, _rtsp_server) = start_or_get_rtsp_test_source(rtsp_server_config);
let test_config = &get_test_config();
info!("### BEGIN:test_config={:?}", test_config);
let stream_name = &format!("test-pravegasrc-{}-{}", test_config.test_id, Uuid::new_v4())[..];
let pipeline_description = format!("\
rtspsrc \
buffer-mode=none \
drop-on-latency=true \
latency=2000 \
location={rtsp_url} \
ntp-sync=true \
ntp-time-source=running-time \
! rtph264depay \
! h264parse \
! video/x-h264,alignment=au \
! identity name=h264par silent=false eos-after={num_frames} \
! timestampcvt \
! identity name=tscvt__ silent=false \
! mp4mux streamable=true fragment-duration=1 ! fragmp4pay \
! pravegasink {pravega_plugin_properties} \
timestamp-mode=tai sync=false \
",
rtsp_url = rtsp_url,
num_frames = num_frames,
pravega_plugin_properties = test_config.pravega_plugin_properties(stream_name),
);
let expected_timestamp = PravegaTimestamp::now();
let _ = launch_pipeline_and_get_summary(format!("{} simulate-failure-after-sec={}", pipeline_description, num_sec_to_failure).as_ref());

// restart the pipeline
let _ = launch_pipeline_and_get_summary(&pipeline_description).unwrap();

info!("#### Read recorded stream from Pravega, no demux, no decoding, part 1");
let pipeline_description_read = format!(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should test that the video is also decodable. Use the code from https://github.com/pravega/gstreamer-pravega/blob/master/integration-test/src/rtsp_tests.rs#L200-L228. Some frames around the ungraceful stop may be corrupted but others before and after should be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

"pravegasrc {pravega_plugin_properties} \
start-mode=earliest \
end-mode=latest \
! appsink name=sink sync=false",
pravega_plugin_properties = test_config.pravega_plugin_properties(stream_name),
);
let summary_read = launch_pipeline_and_get_summary(&pipeline_description_read).unwrap();
debug!("summary_read={}", summary_read);
let first_pts_read = summary_read.first_valid_pts();
let last_pts_read = summary_read.last_valid_pts();
assert!(first_pts_read.is_some(), "Pipeline is not recording timestamps");
assert_between_u64("decreasing_pts_count", summary_read.decreasing_pts_count(), 0, 0);
assert_between_u64("decreasing_dts_count", summary_read.decreasing_dts_count(), 0, 0);
// the gap between fisrt_pts_read and expected_timestamp is caused by the duration of creation of scope & stream and first 5s invalid PTS when RTSP connection initialized
let gap = 60;
assert_timestamp_approx_eq("first_pts_read", first_pts_read, expected_timestamp, 0 * SECOND, gap * SECOND);
assert_timestamp_approx_eq("last_pts_read", last_pts_read, expected_timestamp + (num_sec_to_record + num_sec_to_failure) * SECOND, 0 * SECOND, gap * 2 * SECOND);
assert!(summary_read.pts_range() >= (num_sec_to_record + num_sec_to_failure - 10 * 2) * SECOND);
assert!(summary_read.pts_range() <= (num_sec_to_record + num_sec_to_failure + gap) * SECOND);

info!("#### Read recorded stream from Pravega, complete decoding, part 2");
let pipeline_description_decode = format!(
"pravegasrc {pravega_plugin_properties} \
start-mode=earliest \
end-mode=latest \
! identity name=fromsource silent=false \
! decodebin \
! identity name=decoded silent=false \
! appsink name=sink sync=false",
pravega_plugin_properties = test_config.pravega_plugin_properties(stream_name),
);
let summary_decoded = launch_pipeline_and_get_summary(&pipeline_description_decode).unwrap();
summary_decoded.dump("summary_decoded: ");
debug!("summary_decoded={}", summary_decoded);
let first_pts_decoded = summary_decoded.first_valid_pts();
let last_pts_decoded = summary_decoded.last_valid_pts();
assert!(first_pts_decoded.is_some(), "Pipeline is not recording timestamps");
assert_between_u64("decreasing_pts_count", summary_decoded.decreasing_pts_count(), 0, 0);
let decode_margin = 10 * SECOND;
assert_timestamp_approx_eq("first_pts_decoded", first_pts_decoded, first_pts_read, decode_margin, decode_margin);
assert_timestamp_approx_eq("last_pts_decoded", last_pts_decoded, last_pts_read, decode_margin, decode_margin);
assert!(summary_decoded.pts_range() >= (num_sec_to_record + num_sec_to_failure - 10 * 2) * SECOND);
assert!(summary_decoded.pts_range() <= (num_sec_to_record + num_sec_to_failure + gap) * SECOND);
let num_frames_expected_min = (num_sec_to_record + num_sec_to_failure - 10 * 2) * fps;
let num_frames_expected_max = (num_sec_to_record + num_sec_to_failure) * fps;
assert_between_u64("num_buffers", summary_decoded.num_buffers(), num_frames_expected_min, num_frames_expected_max);
assert_between_u64("num_buffers_with_valid_pts", summary_decoded.num_buffers_with_valid_pts(), num_frames_expected_min, num_frames_expected_max);
// Last 2 buffers are usually corrupted. These can be ignored.
assert_between_u64("corrupted_buffer_count", summary_decoded.corrupted_buffer_count(), 0, 2);
assert_between_u64("imperfect_timestamp_count", summary_decoded.imperfect_pts_count(TimeDelta::none()), 0, 0);
}
}
1 change: 1 addition & 0 deletions integration-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#![allow(dead_code)]

mod camera_recorder_tests;
mod extreme_tests;
mod failure_recovery_tests;
mod file_import_tests;
Expand Down