-
Notifications
You must be signed in to change notification settings - Fork 17
/
pravega-rtsp-server.rs
274 lines (233 loc) · 10.5 KB
/
pravega-rtsp-server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Based on gstreamer-rs/examples/src/bin/rtsp-server.rs.
// This example demonstrates how to set up a rtsp server using GStreamer.
// For this, the example parses an arbitrary pipeline in launch syntax
// from the cli and provides this pipeline's output as stream, served
// using GStreamers rtsp server.
use anyhow::Error;
use clap::Clap;
use derive_more::{Display, Error};
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst_rtsp_server::prelude::*;
use gst_rtsp_server::subclass::prelude::*;
use log::info;
use std::collections::HashMap;
use url::Url;
#[derive(Debug, Display, Error)]
#[display(fmt = "Could not get mount points")]
struct NoMountPoints;
/// Pravega RTSP server
#[derive(Clap)]
struct Opts {
/// Pravega controller in format "127.0.0.1:9090"
#[clap(short, long, default_value = "127.0.0.1:9090")]
controller: String,
/// Pravega scope
#[clap(short, long)]
scope: String,
}
fn main() {
match run() {
Ok(r) => r,
Err(e) => eprintln!("Error! {}", e),
}
}
fn run() -> Result<(), Error> {
env_logger::init();
let _opts: Opts = Opts::parse();
// Initialize GStreamer
gst::init()?;
let main_loop = glib::MainLoop::new(None, false);
let server = gst_rtsp_server::RTSPServer::new();
// Much like HTTP servers, RTSP servers have multiple endpoints that
// provide different streams. Here, we ask our server to give
// us a reference to his list of endpoints, so we can add our
// test endpoint, providing the pipeline from the cli.
let mounts = server.mount_points().ok_or(NoMountPoints)?;
// // Next, we create a factory for the endpoint we want to create.
// // The job of the factory is to create a new pipeline for each client that
// // connects, or (if configured to do so) to reuse an existing pipeline.
// let factory = gst_rtsp_server::RTSPMediaFactory::new();
// Next, we create our custom factory for the endpoint we want to create.
// The job of the factory is to create a new pipeline for each client that
// connects, or (if configured to do so) to reuse an existing pipeline.
let factory = media_factory::Factory::default();
// Here we tell the media factory the media we want to serve.
// This is done in the launch syntax. When the first client connects,
// the factory will use this syntax to create a new pipeline instance.
// factory.set_launch(args[1].as_str());
// This setting specifies whether each connecting client gets the output
// of a new instance of the pipeline, or whether all connected clients share
// the output of the same pipeline.
// If you want to stream a fixed video you have stored on the server to any
// client, you would not set this to shared here (since every client wants
// to start at the beginning of the video). But if you want to distribute
// a live source, you will probably want to set this to shared, to save
// computing and memory capacity on the server.
// factory.set_shared(true);
// Now we add a new mount-point and tell the RTSP server to serve the content
// provided by the factory we configured above, when a client connects to
// this specific path.
mounts.add_factory("/test", &factory);
// Attach the server to our main context.
// A main context is the thing where other stuff is registering itself for its
// events (e.g. sockets, GStreamer bus, ...) and the main loop is something that
// polls the main context for its events and dispatches them to whoever is
// interested in them. In this example, we only do have one, so we can
// leave the context parameter empty, it will automatically select
// the default one.
let id = server.attach(None)?;
println!(
"Stream ready at rtsp://127.0.0.1:{}/test",
server.bound_port()
);
// Start the mainloop. From this point on, the server will start to serve
// our quality content to connecting clients.
main_loop.run();
glib::source_remove(id);
Ok(())
}
// Our custom media factory that creates a media input manually
mod media_factory {
use super::*;
// In the imp submodule we include the actual implementation
mod imp {
use super::*;
// This is the private data of our factory
pub struct Factory {}
// This trait registers our type with the GObject object system and
// provides the entry points for creating a new instance and setting
// up the class data
#[glib::object_subclass]
impl ObjectSubclass for Factory {
const NAME: &'static str = "RsRTSPMediaFactory";
type Type = super::Factory;
type ParentType = gst_rtsp_server::RTSPMediaFactory;
// Called when a new instance is to be created. We need to return an instance
// of our struct here.
fn new() -> Self {
Self {}
}
}
// Implementation of glib::Object virtual methods
impl ObjectImpl for Factory {
fn constructed(&self, factory: &Self::Type) {
self.parent_constructed(factory);
// All media created by this factory are our custom media type. This would
// not require a media factory subclass and can also be called on the normal
// RTSPMediaFactory.
factory.set_media_gtype(super::media::Media::static_type());
}
}
// Implementation of gst_rtsp_server::RTSPMediaFactory virtual methods
impl RTSPMediaFactoryImpl for Factory {
fn create_element(
&self,
_factory: &Self::Type,
url: &gst_rtsp::RTSPUrl,
) -> Option<gst::Element> {
let url = url.request_uri().unwrap().to_string();
let url = Url::parse(&url[..]).unwrap();
info!("url={:?}", url);
let query_map: HashMap<_, _> = url.query_pairs().into_owned().collect();
info!("query_map={:?}", query_map);
let stream = query_map.get("stream").unwrap().clone();
info!("stream={:?}", stream);
let opts: Opts = Opts::parse();
let pipeline_description = format!(
"pravegasrc stream={}/{} controller={} ! tsdemux ! h264parse ! rtph264pay name=pay0 pt=96",
opts.scope, stream, opts.controller);
info!("Launch Pipeline: {}", pipeline_description);
let bin = gst::parse_launch(&pipeline_description.to_owned()).unwrap();
Some(bin.upcast())
// let bin = gst::Bin::new(None);
// let pravegasrc = gst::ElementFactory::make("pravegasrc", None).unwrap();
// pravegasrc.set_property("controller", &"192.168.1.123:9090".to_value()).unwrap();
// pravegasrc.set_property("stream", &"examples/demo18".to_value()).unwrap();
// let demux = gst::ElementFactory::make("tsdemux", None).unwrap();
// let pay = gst::ElementFactory::make("rtph264pay", Some("pay0")).unwrap();
// pay.set_property("pt", &96u32.to_value()).unwrap();
// bin.add_many(&[&pravegasrc, &demux, &pay]).unwrap();
// gst::Element::link_many(&[&pravegasrc, &demux, &pay]).unwrap();
// Some(bin.upcast())
}
}
}
// This here defines the public interface of our factory and implements
// the corresponding traits so that it behaves like any other RTSPMediaFactory
glib::wrapper! {
pub struct Factory(ObjectSubclass<imp::Factory>) @extends gst_rtsp_server::RTSPMediaFactory;
}
// Factories must be Send+Sync, and ours is
unsafe impl Send for Factory {}
unsafe impl Sync for Factory {}
impl Default for Factory {
// Creates a new instance of our factory
fn default() -> Factory {
glib::Object::new(&[]).expect("Failed to create factory")
}
}
}
// Our custom media subclass that adds a custom attribute to the SDP returned by DESCRIBE
mod media {
use super::*;
// In the imp submodule we include the actual implementation
mod imp {
use super::*;
// This is the private data of our media
pub struct Media {}
// This trait registers our type with the GObject object system and
// provides the entry points for creating a new instance and setting
// up the class data
#[glib::object_subclass]
impl ObjectSubclass for Media {
const NAME: &'static str = "RsRTSPMedia";
type Type = super::Media;
type ParentType = gst_rtsp_server::RTSPMedia;
// Called when a new instance is to be created. We need to return an instance
// of our struct here.
fn new() -> Self {
info!("Created custom media");
Self {}
}
}
// Implementation of glib::Object virtual methods
impl ObjectImpl for Media {}
// Implementation of gst_rtsp_server::RTSPMedia virtual methods
impl RTSPMediaImpl for Media {
fn setup_sdp(
&self,
media: &Self::Type,
sdp: &mut gst_sdp::SDPMessageRef,
info: &gst_rtsp_server::subclass::SDPInfo,
) -> Result<(), gst::LoggableError> {
self.parent_setup_sdp(media, sdp, info)?;
sdp.add_attribute("my-custom-attribute", Some("has-a-value"));
Ok(())
}
fn query_stop(&self, media: &Self::Type) -> Option<gst::ClockTime> {
info!("query_stop: BEGIN");
let result = self.parent_query_stop(media);
info!("query_stop: END; result={:?}", result);
result
}
}
}
// This here defines the public interface of our factory and implements
// the corresponding traits so that it behaves like any other RTSPMedia
glib::wrapper! {
pub struct Media(ObjectSubclass<imp::Media>) @extends gst_rtsp_server::RTSPMedia;
}
// Medias must be Send+Sync, and ours is
unsafe impl Send for Media {}
unsafe impl Sync for Media {}
}