I have modified the deepstream_test3_app to add sources at runtime. Specifically, I have created another thread which adds a source after some delay. So far, everything works fine if I add an mp4 file source. But when I add another RTSP source, the tiler does not display the newly added stream.
No errors or warnings are logged when a file source is added. But when an RTSP source is added, a warning about frames being dropped is logged.
Below is the code I am using to add sources at runtime. Any help will be highly appreciated.
/*
* Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
#include <pthread.h>
#include <chrono>
#include <gst/gst.h>
#include <glib.h>
#include <cstdio>
#include <cmath>
#include <cstring>
#include <ctime>
#include <thread>
#include <iostream>
#include "gstnvdsmeta.h"
#include "gst-nvmessage.h"
/* The muxer output resolution must be set if the input streams will be of
* different resolution. The muxer will scale all the input frames to this
* resolution. */
#define MUXER_OUTPUT_WIDTH 1280
#define MUXER_OUTPUT_HEIGHT 720
/* Muxer batch formation timeout, for e.g. 40 millisec. Should ideally be set
* based on the fastest source's framerate. */
#define MUXER_BATCH_TIMEOUT_USEC 3400000
#define TILED_OUTPUT_WIDTH_INFER 1280
#define TILED_OUTPUT_HEIGHT_INFER 720
#define TILED_OUTPUT_WIDTH_OF 640
#define TILED_OUTPUT_HEIGHT_OF 360
using namespace std;
/* Top-level pipeline. Created in main() and also used by AddSource(), which
* runs on a separate thread, to attach new source bins. */
GstElement *pipeline = NULL;
/* Stream muxer whose "sink_%u" request pads the source bins are linked to. */
GstElement *streammux = NULL;
/* One-shot guard: AddSourceThreadCallBack() only does work while this is true
* and clears it after adding its source. */
bool enableThread =true;
/* Source index shared between main()'s loops and AddSource(); AddSource()
* uses its current value for the new bin name and muxer pad name. */
guint i = 0;
/* NVIDIA Decoder source pad memory feature. This feature signifies that source
* pads having this capability will push GstBuffers containing NvBufSurface. */
#define GST_CAPS_FEATURES_NVMM "memory:NVMM"
/* Bus watch callback, invoked for every message posted on the pipeline bus.
 *
 * bus:  the bus the message came from (unused).
 * msg:  the message to handle.
 * data: the GMainLoop to quit on EOS or on an error.
 *
 * Always returns TRUE so that the watch stays installed.
 */
static gboolean
bus_call (GstBus * bus, GstMessage * msg, gpointer data)
{
  GMainLoop *loop = (GMainLoop *) data;
  printf("In main loop \n");
  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_EOS:
      g_print ("End of stream\n");
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_WARNING:
    {
      gchar *debug = NULL;
      GError *error = NULL;
      gst_message_parse_warning (msg, &error, &debug);
      g_printerr ("WARNING from element %s: %s\n",
          GST_OBJECT_NAME (msg->src), error ? error->message : "(unknown)");
      /* Print the debug string instead of repeating the message: it is the
       * part that tells which element dropped what, and why. */
      if (debug)
        g_printerr ("Warning details: %s\n", debug);
      g_free (debug);
      g_clear_error (&error);
      break;
    }
    case GST_MESSAGE_ERROR:
    {
      gchar *debug = NULL;
      GError *error = NULL;
      gst_message_parse_error (msg, &error, &debug);
      g_printerr ("ERROR from element %s: %s\n",
          GST_OBJECT_NAME (msg->src), error ? error->message : "(unknown)");
      if (debug)
        g_printerr ("Error details: %s\n", debug);
      g_free (debug);
      g_clear_error (&error);
      g_main_loop_quit (loop);
      break;
    }
    case GST_MESSAGE_ELEMENT:
    {
      /* Per-stream EOS notification; the pipeline itself keeps running. */
      if (gst_nvmessage_is_stream_eos (msg)) {
        guint stream_id;
        if (gst_nvmessage_parse_stream_eos (msg, &stream_id)) {
          g_print ("Got EOS from stream %d\n", stream_id);
        }
      }
      break;
    }
    case GST_MESSAGE_DEVICE_ADDED:
      printf("we added new source");
      /* Explicit break: the original silently fell through into the next
       * case label. */
      break;
    default:
      /* Every other message type is deliberately ignored. */
      break;
  }
  return TRUE;
}
/* "pad-added" handler for the uridecodebin inside a source bin.
 *
 * decodebin:       the element that exposed the pad (unused).
 * decoder_src_pad: the newly created source pad.
 * data:            the enclosing source bin, whose "src" ghost pad is
 *                  pointed at the new pad when it carries NVMM video.
 *
 * Non-video pads and video pads without the NVMM caps feature are left
 * unlinked.
 */
static void
cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data)
{
  g_print ("In cb_newpad\n");
  GstElement *source_bin = (GstElement *) data;

  /* The pad may not have negotiated caps yet, in which case current caps are
   * NULL; fall back to the caps the pad can produce. */
  GstCaps *caps = gst_pad_get_current_caps (decoder_src_pad);
  if (!caps) {
    caps = gst_pad_query_caps (decoder_src_pad, NULL);
  }
  if (!caps || gst_caps_get_size (caps) == 0) {
    g_printerr ("Error: Could not determine caps of the new decoder pad.\n");
    if (caps) {
      gst_caps_unref (caps);
    }
    return;
  }

  /* Both pointers below are owned by caps and stay valid until it is
   * unreffed at the end of this function. */
  const GstStructure *str = gst_caps_get_structure (caps, 0);
  const gchar *name = gst_structure_get_name (str);
  GstCapsFeatures *features = gst_caps_get_features (caps, 0);

  /* Need to check if the pad created by the decodebin is for video and not
   * audio. */
  if (!strncmp (name, "video", 5)) {
    /* Link the decodebin pad only if decodebin has picked nvidia
     * decoder plugin nvv4l2decoder. We do this by checking if the pad caps
     * contain NVMM memory features. */
    if (gst_caps_features_contains (features, GST_CAPS_FEATURES_NVMM)) {
      /* Get the source bin ghost pad */
      GstPad *bin_ghost_pad = gst_element_get_static_pad (source_bin, "src");
      if (!gst_ghost_pad_set_target (GST_GHOST_PAD (bin_ghost_pad),
              decoder_src_pad)) {
        g_printerr ("Failed to link decoder src pad to source bin ghost pad\n");
      }
      gst_object_unref (bin_ghost_pad);
    } else {
      g_printerr ("Error: Decodebin did not pick nvidia decoder plugin.\n");
    }
  }

  /* Drop the reference obtained above; the original leaked it on every
   * new pad. */
  gst_caps_unref (caps);
}
/* "child-added" handler: logs every element added under the uridecodebin and
 * re-registers itself on nested decodebin children.
 *
 * child_proxy: the parent that received the child (unused).
 * object:      the child that was added.
 * name:        the child's name.
 * user_data:   forwarded unchanged to the nested handler registration.
 */
static void
decodebin_child_added (GstChildProxy * child_proxy, GObject * object,
    gchar * name, gpointer user_data)
{
  g_print ("Decodebin child added: %s\n", name);

  /* Only recurse when the last "decodebin" occurrence sits at the very start
   * of the name. */
  const gchar *last_match = g_strrstr (name, "decodebin");
  if (last_match != name) {
    return;
  }

  g_signal_connect (G_OBJECT (object), "child-added",
      G_CALLBACK (decodebin_child_added), user_data);
}
/* Builds a bin named "source-bin-NN" wrapping a uridecodebin for one input.
 *
 * index: source index, used only to build the bin name.
 * uri:   URI handed to the uridecodebin's "uri" property.
 *
 * Returns the new bin, exposing a target-less "src" ghost pad that
 * cb_newpad() later points at the decoder pad, or NULL on failure. On
 * failure nothing created here is left behind.
 */
static GstElement *
create_source_bin (guint index, gchar * uri)
{
  GstElement *bin = NULL, *uri_decode_bin = NULL;
  gchar bin_name[16] = { };
  /* index is unsigned, so format it with %u; pass the real buffer size so
   * the bound always matches the array. */
  g_snprintf (bin_name, sizeof bin_name, "source-bin-%02u", index);
  /* Create a source GstBin to abstract this bin's content from the rest of the
   * pipeline */
  bin = gst_bin_new (bin_name);
  /* Source element for reading from the uri.
   * We will use decodebin and let it figure out the container format of the
   * stream and the codec and plug the appropriate demux and decode plugins. */
  uri_decode_bin = gst_element_factory_make ("uridecodebin", "uri-decode-bin");
  if (!bin || !uri_decode_bin) {
    g_printerr ("One element in source bin could not be created.\n");
    /* Release whichever of the two was created. */
    if (bin) {
      gst_object_unref (bin);
    }
    if (uri_decode_bin) {
      gst_object_unref (uri_decode_bin);
    }
    return NULL;
  }
  /* We set the input uri to the source element */
  g_object_set (G_OBJECT (uri_decode_bin), "uri", uri, NULL);
  /* Connect to the "pad-added" signal of the decodebin which generates a
   * callback once a new pad for raw data has been created by the decodebin */
  g_signal_connect (G_OBJECT (uri_decode_bin), "pad-added",
      G_CALLBACK (cb_newpad), bin);
  g_signal_connect (G_OBJECT (uri_decode_bin), "child-added",
      G_CALLBACK (decodebin_child_added), bin);
  gst_bin_add (GST_BIN (bin), uri_decode_bin);
  /* We need to create a ghost pad for the source bin which will act as a proxy
   * for the video decoder src pad. The ghost pad will not have a target right
   * now. Once the decode bin creates the video decoder and generates the
   * cb_newpad callback, we will set the ghost pad target to the video decoder
   * src pad. */
  if (!gst_element_add_pad (bin, gst_ghost_pad_new_no_target ("src",
              GST_PAD_SRC))) {
    g_printerr ("Failed to add ghost pad in source bin\n");
    /* The bin owns uri_decode_bin by now, so dropping the bin frees both. */
    gst_object_unref (bin);
    return NULL;
  }
  printf("Dynamic DSO");
  return bin;
}
//############################ Add Source #######################################
/////////////////////////////////////////////////////////////////////////////////
/* Attaches one more source to the already-built pipeline.
 *
 * Creates a source bin for uri, adds it to the global pipeline, links it to
 * the muxer request pad "sink_<i>" (i being the global source index) and
 * brings the new bin to the pipeline's state.
 *
 * uri: URI of the stream to add.
 *
 * Returns 0 on success and -1 on failure. On success the global index i is
 * advanced so that the next call uses a fresh bin and pad name; on failure
 * the partially attached bin is detached again.
 */
int AddSource(gchar * uri)
{
  GstPad *sinkpad = NULL;
  GstPad *srcpad = NULL;
  gchar pad_name[16] = { };

  GstElement *source_bin = create_source_bin (i, uri);
  if (!source_bin) {
    g_printerr ("Failed to create source bin. Exiting.\n");
    return -1;
  }
  if (!gst_bin_add (GST_BIN (pipeline), source_bin)) {
    g_printerr ("Failed to add source bin to pipeline. Exiting.\n");
    return -1;
  }

  g_snprintf (pad_name, sizeof pad_name, "sink_%u", i);
  sinkpad = gst_element_get_request_pad (streammux, pad_name);
  if (!sinkpad) {
    g_printerr ("Streammux request sink pad failed. Exiting.\n");
    goto fail;
  }
  srcpad = gst_element_get_static_pad (source_bin, "src");
  if (!srcpad) {
    g_printerr ("Failed to get src pad of source bin. Exiting.\n");
    goto fail;
  }
  if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
    g_printerr ("Failed to link source bin to stream muxer. Exiting.\n");
    goto fail;
  }

  /* Only the new bin needs a state change: bring it to whatever state the
   * running pipeline is in (or heading to) rather than re-setting the whole
   * pipeline. */
  if (!gst_element_sync_state_with_parent (source_bin)) {
    g_printerr ("Failed to sync source bin state with pipeline. Exiting.\n");
    goto fail;
  }

  gst_object_unref (srcpad);
  gst_object_unref (sinkpad);
  /* Reserve this index so a later call does not reuse the same names. */
  i++;
  /* The original flowed off the end of this non-void function here. */
  return 0;

fail:
  if (srcpad) {
    gst_object_unref (srcpad);
  }
  if (sinkpad) {
    /* Give the request pad back to the muxer before dropping our ref. */
    gst_element_release_request_pad (streammux, sinkpad);
    gst_object_unref (sinkpad);
  }
  /* Shut the half-attached bin down and take it out of the pipeline. */
  gst_element_set_state (source_bin, GST_STATE_NULL);
  gst_bin_remove (GST_BIN (pipeline), source_bin);
  return -1;
}
void AddSourceThreadCallBack()
{
if (enableThread)
{
//char *rtsp-src= "rtsp://admin:[email protected]/mpeg4";
//char* file-src = "file:///home/User/Desktop/deepstream-4.0/samples/streams/sample_720p.mp4";
gchar *file = "rtsp://admin:[email protected]/mpeg4";
this_thread::__sleep_for(chrono::seconds(2),chrono::nanoseconds(1));
AddSource(file);
enableThread = false;
}
}
////////////////////////////////////////////////////////////////////////////////////
//##################################################################################
//############################ Remove Source #######################################
////////////////////////////////////////////////////////////////////////////////////
//int RemoveSource()
//{
// GList *sinkPadsList = streammux->sinkpads;
// GList *sourcePadList = streammux->srcpads;
// auto x = streammux->numsrcpads;
// auto y = streammux->numsinkpads;
//
// return 1;
//}
//
//
//void RemoveSourceThreadCallBack()
//{
// while(enableThread)
// {
// this_thread::__sleep_for(chrono::seconds(3),chrono::nanoseconds(1));
// RemoveSource();
// }
//}
////////////////////////////////////////////////////////////////////////////////////
//##################################################################################
int
main (int argc, char *argv[])
{
GMainLoop *loop = NULL;
GstElement *sink_infer = NULL,
*tiler_infer = NULL, *pgie = NULL, *nvvidconv = NULL,
*nvosd = NULL;
GstBus *bus = NULL;
guint bus_watch_id;
guint num_sources;
guint tiler_rows, tiler_columns;
guint pgie_batch_size;
/* Check input arguments */
if (argc < 2) {
g_printerr ("Usage: %s <uri1> [uri2] ... [uriN] \n", argv[0]);
return -1;
}
num_sources = argc - 1;
/* Standard GStreamer initialization */
gst_init (&argc, &argv);
loop = g_main_loop_new (NULL, FALSE);
/* Create gstreamer elements */
/* Create Pipeline element that will form a connection of other elements */
pipeline = gst_pipeline_new ("anomaly-detection-pipeline");
/* Create nvstreammux instance to form batches from one or more sources. */
streammux = gst_element_factory_make ("nvstreammux", "stream-muxer");
if (!pipeline || !streammux) {
g_printerr ("(Line=%d) One element could not be created. Exiting.\n",
__LINE__);
return -1;
}
gst_bin_add (GST_BIN (pipeline), streammux);
for (i = 0; i < num_sources; i++)
{
GstPad *sinkpad, *srcpad;
gchar pad_name[16] = { };
GstElement *source_bin = create_source_bin (i, argv[i + 1]);
if (!source_bin) {
g_printerr ("Failed to create source bin. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), source_bin);
g_snprintf (pad_name, 15, "sink_%u", i);
cout<<pad_name;
sinkpad = gst_element_get_request_pad (streammux, pad_name);
if (!sinkpad) {
g_printerr ("Streammux request sink pad failed. Exiting.\n");
return -1;
}
srcpad = gst_element_get_static_pad (source_bin, "src");
if (!srcpad) {
g_printerr ("Failed to get src pad of source bin. Exiting.\n");
return -1;
}
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
g_printerr ("Failed to link source bin to stream muxer. Exiting.\n");
return -1;
}
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
}
//##########################################################################################
auto alpha = i;
/* Use nvinfer to infer on batched frame. */
pgie = gst_element_factory_make ("nvinfer", "primary-nvinference-engine");
/* Use nvtiler to composite the batched frames into a 2D tiled array based
* on the source of the frames. */
tiler_infer =
gst_element_factory_make ("nvmultistreamtiler", "nvtiler-infer");
/* Use convertor to convert from NV12 to RGBA as required by nvosd */
nvvidconv = gst_element_factory_make ("nvvideoconvert", "nvvideo-converter");
/* Create OSD to draw on the converted RGBA buffer */
nvosd = gst_element_factory_make ("nvdsosd", "nv-onscreendisplay");
sink_infer =
gst_element_factory_make ("nveglglessink", "nvelgglessink-infer");
if (!pgie || !tiler_infer || !nvvidconv || !nvosd || !sink_infer) {
g_printerr ("One Infer element could not be created. Exiting.\n");
return -1;
}
g_object_set (G_OBJECT (streammux), "width", MUXER_OUTPUT_WIDTH, "height",
MUXER_OUTPUT_HEIGHT, "batch-size", num_sources,
"batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC, NULL);
/* Configure the nvinfer element using the nvinfer config file. */
g_object_set (G_OBJECT (pgie),
"config-file-path", "/home/sabbih_shah/Desktop/Temp_WorkSpace/DeepStream_Dynamic_Source/src/dsanomaly_pgie_config.txt", NULL);
/* Override the batch-size set in the config file with the number of sources. */
g_object_get (G_OBJECT (pgie), "batch-size", &pgie_batch_size, NULL);
if (pgie_batch_size != num_sources) {
g_printerr
("WARNING: Overriding infer-config batch-size (%d) with number of sources (%d)\n",
pgie_batch_size, num_sources);
g_object_set (G_OBJECT (pgie), "batch-size", num_sources, NULL);
}
tiler_rows = (guint) sqrt (num_sources);
tiler_columns = (guint) ceil (1.0 * num_sources / tiler_rows);
/* we set the tiler properties here */
g_object_set (G_OBJECT (tiler_infer), "rows", tiler_rows, "columns",
tiler_columns, "width", TILED_OUTPUT_WIDTH_INFER, "height",
TILED_OUTPUT_HEIGHT_INFER, NULL);
/* We set the sink properties here */
// g_object_set (G_OBJECT (sink_of), "window-x", 0, "window-y", 0, NULL);
g_object_set (G_OBJECT (sink_infer), "window-x", TILED_OUTPUT_WIDTH_OF,
"window-y", TILED_OUTPUT_HEIGHT_OF, NULL);
/* we add a message handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
/* Set up the pipeline */
/* we add all elements into the pipeline */
gst_bin_add_many (GST_BIN (pipeline), pgie, tiler_infer, nvvidconv, nvosd, sink_infer, NULL);
if (!gst_element_link_many (streammux, pgie, tiler_infer, nvvidconv, nvosd, sink_infer,
NULL)) {
g_printerr ("Elements could not be linked. Exiting.\n");
return -1;
}
/* Set the pipeline to "playing" state */
g_print ("Now playing:");
for (i = 0; i < num_sources; i++) {
g_print (" %s,", argv[i + 1]);
}
g_print ("\n");
gst_object_unref (bus);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* Wait till pipeline encounters an error or EOS */
g_print ("Running...\n");
//########################### Add Source Thread ############################
////////////////////////////////////////////////////////////////////////////
thread AddThread(AddSourceThreadCallBack);
AddThread.detach();
////////////////////////////////////////////////////////////////////////////
//##########################################################################
//########################### Remove Source Thread #########################
////////////////////////////////////////////////////////////////////////////
// thread RemoveThread(RemoveSourceThreadCallBack);
// RemoveThread.detach();
////////////////////////////////////////////////////////////////////////////
//##########################################################################
g_main_loop_run (loop);
/* Out of the main loop, clean up nicely */
g_print ("Returned, stopping playback\n");
gst_element_set_state (pipeline, GST_STATE_NULL);
g_print ("Deleting pipeline\n");
gst_object_unref (GST_OBJECT (pipeline));
g_source_remove (bus_watch_id);
g_main_loop_unref (loop);
gst_deinit ();
return 0;
}