Cannot dynamically re-add a source after deleting it

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) : GPU
• DeepStream Version : 6.4
• JetPack Version (valid for Jetson only)
• TensorRT Version : 8.6.1.6-1+cuda12.0
• NVIDIA GPU Driver Version (valid for GPU only) : Driver Version: 525.147.05
• Issue Type( questions, new requirements, bugs) : questions
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)

I followed https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/runtime_source_add_delete to add and delete sources dynamically.
In my implementation, I start the DeepStream pipeline with no source, then schedule a background job that adds a source to the pipeline → It works.
After a few seconds, another background job deletes the source → It works.
After a few more seconds, I try to re-add the source, but the pipeline hangs.

Overview of the pipeline: [image]

Pipeline implementation:

import logging
import sys
from pipeline.source_management import SourceMeta
sys.path.append("../")
from common.bus_call import bus_call
from common.is_aarch_64 import is_aarch64
from common.FPS import PERF_DATA
import pyds
from ctypes import *
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst, GstRtspServer, GLib
from typing import Dict
from pipeline.utils import *
from pipeline.metadata import *


def bus_call(bus, message, loop, pipeline):
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream but still keep running to wait for sources\n")
        # pipeline.set_state(Gst.State.NULL)
        
        # loop.quit()
    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))

    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True


class Pipeline():
    def __init__(self, config: Dict) -> None:
        self.config = config
        self._build()
    
    
    def cb_newpad(self, decodebin,pad,data):
        print("In cb_newpad\n")
        caps=pad.get_current_caps()
        gststruct=caps.get_structure(0)
        gstname=gststruct.get_name()

        # Need to check if the pad created by the decodebin is for video and not
        # audio.
        if(gstname.find("video")!=-1):
            source_id = data
            pad_name = "sink_%u" % int(source_id)
            print(pad_name)
            #Get a sink pad from the streammux, link to decodebin
            sinkpad = self.streammux.get_request_pad(pad_name)
            if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
                print("Decodebin linked to pipeline")
            else:
                sys.stderr.write("Failed to link decodebin to pipeline\n")
            
            # save the src padd to source meta
            self.source_meta[int(source_id)].streammux_sinkpad = sinkpad

    def decodebin_child_added(self, child_proxy, Object, name, user_data, gpu_id=0):
        if (name.find("decodebin") != -1):
            Object.connect("child-added", self.decodebin_child_added, user_data)

        if "source" in name:
            source_element = child_proxy.get_by_name("source")
            if source_element.find_property('drop-on-latency') != None:
                Object.set_property("drop-on-latency", False)

        if(name.find("nvv4l2decoder") != -1):
            if (is_aarch64()):
                Object.set_property("enable-max-performance", True)
                Object.set_property("drop-frame-interval", 0)
                Object.set_property("num-extra-surfaces", 0)
            else:
                Object.set_property("gpu_id", gpu_id)
            # Object.set_property("cudadec-memtype", 2) 


    def create_source_bin(self, index, uri):        
        bin_name = "source-bin-%02d" % int(index)
        if uri.find("https://") == 0:
            uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", bin_name)
        else:
            uri_decode_bin = Gst.ElementFactory.make("uridecodebin", bin_name)
        uri_decode_bin.set_property("uri", uri)
        uri_decode_bin.connect("pad-added", self.cb_newpad, index)
        uri_decode_bin.connect("child-added", self.decodebin_child_added, index)
        
        return uri_decode_bin
    
    def create_sink_bin(self, idx):
        sink_bin = Gst.Bin.new(f"sinkbin-{idx}")

        queue = make_elm("queue", f"queue-{idx}")
        sink_bin.add(queue)

        nvvideoconvert = make_elm("nvvideoconvert", f"nvvideoconvert-{idx}")
        sink_bin.add(nvvideoconvert)

        caps = make_elm("capsfilter", f"caps-{idx}")
        caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA"))
        sink_bin.add(caps)

        osd = make_elm("nvdsosd", f"osd-{idx}")
        sink_bin.add(osd)

        osd_sink_pad = osd.get_static_pad("sink")
        if not osd_sink_pad:
            raise RuntimeError("Unable to get osd sink pad")
        osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, self._osd_sink_pad_buffer_probe, 0)

        queuesinkpad = queue.get_static_pad("sink")
        if not queuesinkpad:
            raise RuntimeError("Unable to get queue sink pad")

        queue.link(nvvideoconvert)
        nvvideoconvert.link(caps)
        caps.link(osd)
        
        fakesink = Gst.ElementFactory.make("fakesink", "fakesink")
        fakesink.set_property('enable-last-sample', 0)
        fakesink.set_property('sync', 0)
        # fakesink.set_property("qos",0)
        sink_bin.add(fakesink)

        queue_fakesink = make_elm("queue", f"queue-fakesink-{idx}")
        sink_bin.add(queue_fakesink)
        
        osd.link(queue_fakesink)
        queue_fakesink.link(fakesink)

        sink_bin.add_pad(Gst.GhostPad.new(f"sink_{idx}", queuesinkpad))
        return sink_bin
    
    def build_downstream(self, idx):
        sink_bin = self.create_sink_bin(idx)
        # add to pipeline and add to source meta
        self._pipeline.add(sink_bin)
        self.source_meta[idx].sink_bin = sink_bin

        # link with demux
        sink_pad = sink_bin.get_static_pad(f"sink_{idx}")
        self.source_meta[idx].demux_srcpad.link(sink_pad)
        self.source_meta[idx].sink_pad = sink_pad

    def _build(self) -> None:
        # init global variables
        self.MAX_NUM_SOURCES = self.config["max_sources"]
        self.source_meta = [SourceMeta() for _ in range(self.MAX_NUM_SOURCES)]

        # self.sources = self.get_sources()
        # num_srcs = len(self.sources)
        global perf_data
        perf_data = PERF_DATA(self.MAX_NUM_SOURCES)

        # initialize GStreamer
        GLib.threads_init()
        Gst.init(None)

        # create pipeline
        logging.info("Creating Pipeline")
        self._pipeline = Gst.Pipeline()
        
        if not self._pipeline:
            raise RuntimeError("Unable to create Pipeline")
        
        logging.info("Creating streammux")
        streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        if not streammux:
            raise RuntimeError("Unable to create streammux")
        self.streammux = streammux
        self._pipeline.add(streammux)

        if not is_aarch64():
            mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
            streammux.set_property("nvbuf-memory-type", mem_type)
        streammux.set_property("width", self.config["width"])
        streammux.set_property("height", self.config["height"])
        streammux.set_property("batch-size", self.MAX_NUM_SOURCES)
        streammux.set_property("batched-push-timeout", self.config["batched-push-timeout"])
        streammux.set_property("enable-padding", 0)
        streammux.set_property("gpu-id", 0)
        streammux.set_property("live-source", 1)
        
        # create queue1
        logging.info("Creating queue1")
        queue1 = Gst.ElementFactory.make("queue", "queue1")
        if not queue1:
            raise RuntimeError("Unable to create queue1")
        self._pipeline.add(queue1)
        
        # create pgie
        logging.info("Creating pgie")
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
        if not pgie:
            raise RuntimeError("Unable to create pgie")
        pgie.set_property("config-file-path", self.config["pgie_config_path"])
        pgie_batch_size = pgie.get_property("batch-size")
        if pgie_batch_size != self.MAX_NUM_SOURCES:
            logging.warning(f"WARNING: Overriding infer-config batch-size {pgie_batch_size} with number of sources {self.MAX_NUM_SOURCES}")
            pgie.set_property("batch-size", self.MAX_NUM_SOURCES)
        self._pipeline.add(pgie)

        # create tracker
        logging.info("Creating tracker")
        tracker = Gst.ElementFactory.make("nvtracker", "tracker")
        if not tracker:
            raise RuntimeError("Unable to create tracker")
        tracker = initialize_tracker(tracker, self.config)
        self._pipeline.add(tracker)

        # create sgie
        logging.info("Creating sgie")
        sgie = Gst.ElementFactory.make("nvinfer", "secondary-inference")
        if not sgie:
            raise RuntimeError("Unable to create sgie")
        sgie.set_property("config-file-path", self.config["sgie_config_path"])
        self._pipeline.add(sgie)

        # create demuxer
        logging.info("Creating demuxer")
        self.demuxer = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
        if not self.demuxer:
            raise RuntimeError("Unable to create demuxer")
        self._pipeline.add(self.demuxer)

        # link elements
        streammux.link(queue1)
        queue1.link(pgie)
        pgie.link(tracker)
        tracker.link(sgie)
        
        sgie.link(self.demuxer)
        for idx in range(self.MAX_NUM_SOURCES):
            # create demux first
            demuxsrcpad = self.demuxer.get_request_pad(f"src_{idx}")
            if not demuxsrcpad:
                raise RuntimeError("Unable to get demuxer src pad")
            
            # save the demux
            self.source_meta[idx].demux_srcpad = demuxsrcpad

        # create an event loop and feed gstreamer bus mesages to it
        self._loop = GLib.MainLoop()
        bus = self._pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", bus_call, self._loop, self._pipeline)

        GLib.timeout_add(5000, perf_data.perf_print_callback)

        self._pipeline.set_state(Gst.State.PAUSED)

        # list the output sources
        if self.config["export_rtsp"]:
            logging.info("Now playing...")
            for idx in range(self.MAX_NUM_SOURCES):
                print(f"Source {idx}: {self.config['sources'][str(idx)]['uri']}")

                rtsp_port_num = self.config["rtsp_port"] + idx
                server = GstRtspServer.RTSPServer.new()
                server.props.service = f"{rtsp_port_num}"
                server.attach(None)

                factory = GstRtspServer.RTSPMediaFactory.new()
                factory.set_launch(
                f'( udpsrc name=pay0 port={self.config["udp_port"] + idx} buffer-size=524288 '
                f'caps="application/x-rtp, media=video, clock-rate=90000, '
                f'encoding-name=(string){self.config["format"]}, payload=96 " )')
                factory.set_shared(True)
                server.get_mount_points().add_factory(f"/stream{idx}", factory)
                rtsp_link = f"rtsp://localhost:{rtsp_port_num}/stream{idx}"
                logging.info(f"RTSP link: {rtsp_link}")

        self._pipeline.set_state(Gst.State.PLAYING)


        # add the source once, after 1 second
        GLib.timeout_add_seconds(1, self.add_source, 1)
        
        # remove the source after 20 seconds
        GLib.timeout_add_seconds(20, self.stop_release_source, 1)

        # re-add the source once, after 30 seconds
        GLib.timeout_add_seconds(30, self.add_source, 1)

    def stop_release_source(self, source_id):
        #Attempt to change status of source to be released 
        state_return = self.source_meta[source_id].source_bin.set_state(Gst.State.NULL)

        # self.demuxer.set_state(Gst.State.NULL)

        pad_name = "sink_%u" % source_id
        if state_return == Gst.StateChangeReturn.SUCCESS:
            print("STATE CHANGE SUCCESS\n")

            # remove sink bin
            sink_bin = self.source_meta[source_id].sink_bin
            sinkpad = sink_bin.get_static_pad(f"sink_{source_id}")
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self._pipeline.remove(sink_bin)
            sink_bin.set_state(Gst.State.NULL)
            import time
            while sink_bin.get_state(100 * Gst.MSECOND)[1] != Gst.State.NULL:  # timeout is in nanoseconds
                time.sleep(0.1)
            
            # unlink demux src pad
            demuxsrcpad = self.source_meta[source_id].demux_srcpad 
            ret = demuxsrcpad.unlink(self.source_meta[source_id].sink_pad)
            if not ret:
                print("Stop the pads are not linked")
            
            # release source bin
            sinkpad = self.streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self._pipeline.remove(self.source_meta[source_id].source_bin)

        elif state_return == Gst.StateChangeReturn.FAILURE:
            print("STATE CHANGE FAILURE\n")
        
        elif state_return == Gst.StateChangeReturn.ASYNC:
            print("STATE CHANGE ASYNC\n")
            state_return = self.source_meta[source_id].source_bin.get_state(Gst.CLOCK_TIME_NONE)
            # remove sink bin
            sink_bin = self.source_meta[source_id].sink_bin
            sinkpad = sink_bin.get_static_pad(f"sink_{source_id}")
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self._pipeline.remove(sink_bin)
            sink_bin.set_state(Gst.State.NULL)
            import time
            while sink_bin.get_state(100 * Gst.MSECOND)[1] != Gst.State.NULL:  # timeout is in nanoseconds
                time.sleep(0.1)
            
            # unlink demux src pad
            demuxsrcpad = self.source_meta[source_id].demux_srcpad 
            ret = demuxsrcpad.unlink(self.source_meta[source_id].sink_pad)
            if not ret:
                print("Stop the pads are not linked")
            
            # release source bin
            sinkpad = self.streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self._pipeline.remove(self.source_meta[source_id].source_bin)
        return False
    
    def add_source(self, idx):
        try:
            fake_new_src = {
                    "uri": "rtsp://fake-rtsp-stream:8554/stream0",
                    "car_confidence": 0.1,
                    "person_confidence": 0.1,
                    "restricted_zones": [
                        [[[200, 300], [1500, 300]], [[1500, 300], 
                        [1500, 900]], [[1500, 900], [200, 900]], 
                        [[200, 900], [200, 300]]]                 
                    ]
                }
            
            is_live = False
            # create and add sources
          
            logging.info(f"Onboarding new source")
            uri = fake_new_src["uri"]
                
            if "rtsp://" in uri or "https://" in uri:
                is_live = True

            source_bin = self.create_source_bin(idx, uri)
            if not source_bin:
                raise RuntimeError("Unable to create source bin")

            self._pipeline.add(source_bin)
            self.source_meta[idx].source_bin = source_bin
            
            self.build_downstream(idx)

            # set properties streammux
            if is_live:
                self.streammux.set_property("live-source", 1)

            #Set state of source bin to playing
            state_return = source_bin.set_state(Gst.State.PLAYING)
            
            if state_return == Gst.StateChangeReturn.SUCCESS:
                print("STATE CHANGE SUCCESS\n")

            elif state_return == Gst.StateChangeReturn.FAILURE:
                print("STATE CHANGE FAILURE\n")
            
            elif state_return == Gst.StateChangeReturn.ASYNC:
                print("STATE CHANGE ASYNC\n")
                state_return = source_bin.get_state(Gst.CLOCK_TIME_NONE)

            elif state_return == Gst.StateChangeReturn.NO_PREROLL:
                print("STATE CHANGE NO PREROLL\n")
            
        except Exception as e:
            print(f"Warning: {e}")
        # Return False so that the GLib timeout fires only once
        return False

    def _clean(self) -> None:
        logging.info("Cleaning Pipeline")
        pyds.unset_callback_funcs()
        self._pipeline.set_state(Gst.State.NULL)

    
    def run(self) -> None:
        logging.info("Running Pipeline")
        self._pipeline.set_state(Gst.State.PLAYING)
        try:
            self._loop.run()
        except:
            pass
        self._clean()


    def _osd_sink_pad_buffer_probe(self, pad, info, u_data):
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            logging.error("Unable to get GstBuffer")
            return Gst.PadProbeReturn.OK

        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            except StopIteration:
                break

            # update fps
            cam_id = frame_meta.pad_index
            perf_data.update_fps(f"stream{cam_id}")

            try:
                frame_meta.bInferDone = True
                l_frame = l_frame.next
            except StopIteration:
                break

        return Gst.PadProbeReturn.OK

Here, pipeline/source_management.py is:

class SourceMeta:
    def __init__(self) -> None:
        self.streammux_sinkpad = None
        self.demux_srcpad = None
        self.source_bin = None
        self.sink_bin = None
        self.sink_pad = None

I also followed this thread to add/stop a source, but it does not seem to solve the issue: How to add elemts after demux during runtime? - #4 by daredeviles888

I hope you can help me figure this out. Thanks.

Can you describe step by step how to run your demo? If I can reproduce that on my host, the analysis will be faster.

Thank you for the prompt response, @yuweiw.
The code pasted above cannot run on its own without other code from my project. However, could you please look at the main steps of building the pipeline in that code, especially the add_source and stop_release_source functions? Am I missing an important step for stopping or adding a source at runtime?

Surprisingly, we found that if at least one stream is still running, re-adding works as expected (for example, keeping stream index 0 running, then deleting and re-adding another stream at index 1 works well).

OK. Your problem is similar to 283113. After we upgraded GStreamer to 1.20, we could no longer add a source when the list has no stream. This could be a bug in GStreamer. You can try DeepStream 6.3 and check whether it works normally there.

I can now re-add streams without downgrading the DeepStream version by setting the pipeline to Gst.State.READY after deleting the last source. When re-adding, I just set the pipeline to Gst.State.PLAYING again. Thank you.
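
The workaround above could be sketched roughly as follows. This is a minimal, GStreamer-free sketch under stated assumptions: IdleAwareSourceTracker, on_idle, and on_active are hypothetical names, not DeepStream APIs. In the real application the two callbacks would presumably wrap pipeline.set_state(Gst.State.READY) and pipeline.set_state(Gst.State.PLAYING); where exactly those calls belong relative to adding the source bin is not stated in this thread.

```python
from typing import Callable, Set


class IdleAwareSourceTracker:
    """Tracks active source ids and fires callbacks on idle/active transitions.

    Hypothetical helper, not a DeepStream API. In a real app, on_idle would
    wrap pipeline.set_state(Gst.State.READY) and on_active would wrap
    pipeline.set_state(Gst.State.PLAYING).
    """

    def __init__(self, on_idle: Callable[[], None], on_active: Callable[[], None]) -> None:
        self._active: Set[int] = set()
        self._on_idle = on_idle
        self._on_active = on_active

    def source_added(self, source_id: int) -> None:
        was_empty = not self._active
        self._active.add(source_id)
        if was_empty:
            # First source after an idle period: resume the pipeline.
            self._on_active()

    def source_removed(self, source_id: int) -> None:
        if source_id not in self._active:
            return  # unknown id: nothing to do
        self._active.remove(source_id)
        if not self._active:
            # Last source is gone: park the pipeline until the next add.
            self._on_idle()


if __name__ == "__main__":
    events = []
    tracker = IdleAwareSourceTracker(
        on_idle=lambda: events.append("READY"),
        on_active=lambda: events.append("PLAYING"),
    )
    tracker.source_added(1)
    tracker.source_removed(1)
    tracker.source_added(1)
    print(events)
```

The callbacks fire only on transitions between "no sources" and "at least one source", so removing one of several running streams leaves the pipeline state untouched.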

It looks weird: sometimes re-adding works, and other times it does not (the pipeline hangs forever).

And I observed these warnings but the pipeline is still able to run. Is it safe to ignore these warnings?

(python3:33738): GStreamer-WARNING **: 09:43:04.561: ../gst/gstpad.c:5337:store_sticky_event:<queue-queuemsg_0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.561: ../gst/gstpad.c:5337:store_sticky_event:<tee-tee_0:src_1> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.561: ../gst/gstpad.c:5337:store_sticky_event:<queue-queuemsg_0:src> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.561: ../gst/gstpad.c:5337:store_sticky_event:<queue-queuerender_0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.562: ../gst/gstpad.c:5337:store_sticky_event:<tee-tee_0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.562: ../gst/gstpad.c:5337:store_sticky_event:<nvmsgconv-nvmsgconv_0:src> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.562: ../gst/gstpad.c:5337:store_sticky_event:<nvdsosd-osd-0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.562: ../gst/gstpad.c:5337:store_sticky_event:<capsfilter-caps-0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.562: ../gst/gstpad.c:5337:store_sticky_event:<nvmsgbroker-nvmsgbroker_0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.562: ../gst/gstpad.c:5337:store_sticky_event:<nvvideoconvert-nvvideoconvert-0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.562: ../gst/gstpad.c:5337:store_sticky_event:<nvmsgconv-nvmsgconv_0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.613: ../gst/gstpad.c:5337:store_sticky_event:<queue-queuerender_0:src> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.613: ../gst/gstpad.c:5337:store_sticky_event:<nvvideoconvert-nvvidconv_postosd-0:src> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.613: ../gst/gstpad.c:5337:store_sticky_event:<capsfilter-caps_postosd-0:src> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.613: ../gst/gstpad.c:5337:store_sticky_event:<nvv4l2h264enc-encoder-0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.613: ../gst/gstpad.c:5337:store_sticky_event:<capsfilter-caps_postosd-0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:04.613: ../gst/gstpad.c:5337:store_sticky_event:<nvvideoconvert-nvvidconv_postosd-0:sink> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:06.186: ../gst/gstpad.c:5337:store_sticky_event:<nvv4l2h264enc-encoder-0:src> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:06.186: ../gst/gstpad.c:5337:store_sticky_event:<h264parse-codeparser-0:src> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:06.186: ../gst/gstpad.c:5337:store_sticky_event:<mpegtsmux-rtppay-0:sink_65> Sticky event misordering, got 'caps' before 'stream-start'
 
(python3:33738): GStreamer-WARNING **: 09:43:06.186: ../gst/gstpad.c:5337:store_sticky_event:<h264parse-codeparser-0:sink> Sticky event misordering, got 'caps' before 'stream-start'

There may be a problem with the timing of some GStreamer events in your code. If none of these events need to be handled in the pipeline, there is no problem.

        # add the source once, after 1 second
        GLib.timeout_add_seconds(1, self.add_source, 1)
        
        # remove the source after 20 seconds
        GLib.timeout_add_seconds(20, self.stop_release_source, 1)

        # re-add the source once, after 30 seconds
        GLib.timeout_add_seconds(30, self.add_source, 1)

Does your server support multiple simultaneous requests for the same URL? If you add the same source over and over again, your server may be the limiting factor.

I tried that; even with different RTSP URLs, the issue remains.

Could you make sure that the add action and the delete action are not executed at the same time? Also, the delete action must not be executed when there are no sources.

Yes. Even after increasing the time gap between these actions so that they do not overlap, the pipeline still hangs.

There are three timers running, and you cannot guarantee that they do not overlap this way. Could you try it the way our demo deepstream_rt_src_add_del.py does? You can add sources up to the upper limit before deleting them.
Or you can describe your needs in detail.
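
One way to make sure add and delete never overlap is to push every request into a single queue and let one periodic callback process at most one request per tick. The following is a minimal sketch under stated assumptions: SourceActionQueue, add_fn, and remove_fn are hypothetical names. In the application above, process_one could be driven by a single GLib.timeout_add_seconds timer, and the two functions would wrap add_source and stop_release_source.

```python
from collections import deque
from typing import Callable, Deque, Set, Tuple


class SourceActionQueue:
    """Serializes add/remove requests so that only one runs at a time.

    Hypothetical helper: add_fn and remove_fn stand in for the application's
    own add_source / stop_release_source functions.
    """

    def __init__(self, add_fn: Callable[[int], None], remove_fn: Callable[[int], None]) -> None:
        self._pending: Deque[Tuple[str, int]] = deque()
        self._active: Set[int] = set()
        self._add_fn = add_fn
        self._remove_fn = remove_fn

    def request_add(self, source_id: int) -> None:
        self._pending.append(("add", source_id))

    def request_remove(self, source_id: int) -> None:
        self._pending.append(("remove", source_id))

    def process_one(self) -> bool:
        """Run at most one pending action; return True so a GLib timer keeps firing."""
        if not self._pending:
            return True
        action, source_id = self._pending.popleft()
        if action == "add" and source_id not in self._active:
            self._add_fn(source_id)
            self._active.add(source_id)
        elif action == "remove" and source_id in self._active:
            self._remove_fn(source_id)
            self._active.remove(source_id)
        # Requests that do not match the current state (for example, removing
        # a source that was never added) are dropped silently.
        return True
```

Because every action runs from the same callback, a remove request can never start while an add is still in progress, and a remove for a source that is not active is ignored.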

Our need is to keep the pipeline running without any video source and to be able to re-add sources after all sources have been deleted. In the example you provided above, the pipeline quits when there is no source running.

We have a simple way to meet your needs through the REST API; please check whether it works for you.
how-to-use-nvmultiurisrcbin-in-a-pipeline
1. Terminal 1

gst-launch-1.0 nvmultiurisrcbin \
port=9000 ip-address=localhost \
batched-push-timeout=33333 max-batch-size=10 \
drop-pipeline-eos=1 live-source=1 \
uri-list=rtsp://xxx width=1920 height=1080 \
! nvmultistreamtiler ! nveglglessink

2. Terminal 2:
Add:

curl -XPOST 'http://localhost:9000/api/v1/stream/add' -d '{
  "key": "sensor",
  "value": {
     "camera_id": "uniqueSensorID1",
     "camera_name": "front_door",
     "camera_url": "rtsp://xxx",
     "change": "camera_add",
     "metadata": {
         "resolution": "1920 x1080",
         "codec": "h264",
         "framerate": 30
     }
 },
 "headers": {
     "source": "vst",
     "created_at": "2021-06-01T14:34:13.417Z"
 }
}'

Remove:

curl -XPOST 'http://localhost:9000/api/v1/stream/remove' -d '{
    "key": "sensor",
    "value": {
        "camera_id": "uniqueSensorID1",
        "camera_name": "front_door",
        "camera_url": "rtsp://xxx",
        "change": "camera_remove",
        "metadata": {
            "resolution": "1920 x1080",
            "codec": "h264",
            "framerate": 30
        }
    },
    "headers": {
        "source": "vst",
        "created_at": "2021-06-01T14:34:13.417Z"
    }
}'
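
For a Python application, the same two requests could be built with the standard library. This is only a sketch: build_payload and build_request are hypothetical helper names, the field values are copied from the curl commands above, and the base URL assumes the service listens on localhost:9000 as configured by port=9000 ip-address=localhost in Terminal 1.

```python
import json
import urllib.request
from typing import Any, Dict

# Assumption: the REST service listens on localhost:9000, matching the
# port=9000 ip-address=localhost properties passed to nvmultiurisrcbin.
BASE_URL = "http://localhost:9000/api/v1/stream"


def build_payload(camera_id: str, camera_name: str, camera_url: str, change: str) -> Dict[str, Any]:
    """Build the JSON body used by the add/remove curl commands."""
    return {
        "key": "sensor",
        "value": {
            "camera_id": camera_id,
            "camera_name": camera_name,
            "camera_url": camera_url,
            "change": change,  # "camera_add" or "camera_remove"
            "metadata": {"resolution": "1920 x1080", "codec": "h264", "framerate": 30},
        },
        "headers": {"source": "vst", "created_at": "2021-06-01T14:34:13.417Z"},
    }


def build_request(action: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Create (but do not send) a POST request for action 'add' or 'remove'."""
    if action not in ("add", "remove"):
        raise ValueError(f"unsupported action: {action}")
    return urllib.request.Request(
        f"{BASE_URL}/{action}",
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
    )


if __name__ == "__main__":
    payload = build_payload("uniqueSensorID1", "front_door", "rtsp://xxx", "camera_add")
    request = build_request("add", payload)
    print(request.full_url)
    # To actually send it (requires the gst-launch-1.0 pipeline to be running):
    # with urllib.request.urlopen(request, timeout=5) as response:
    #     print(response.status)
```

Keeping request construction separate from sending makes it possible to inspect the URL and body without a running pipeline.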

I have the same issue as you. When I delete a video source, despite the pipeline still containing other video sources, the pipeline hangs when I re-add the deleted source. I am still searching for the root cause and the solution. I hope you will notify me when you find it.

nvmultiurisrcbin looks promising. However, when deleting and adding a source, we need a callback to be triggered so that we can do other tasks, for example creating/deleting a sink bin after the demux element. Is there a way to create and trigger such a callback with nvmultiurisrcbin?

No. This plugin is only used to handle the adding and removing controls on the source side.
So could you describe your complete requirements and we will consider whether we need to develop a similar demo? Thanks

Yes. I have a DeepStream app that is expected to run with 0 sources, 1 source, 2 sources, and up to B sources, where B is the batch size of the streammux. Sources can be added and removed at runtime. When no source is running, I want the app to keep the pipeline, release any memory it occupies, and just wait for sources to be added. So far I cannot achieve this, because I am struggling to make the pipeline run again after the last source is deleted (with our current implementation posted above). Also, each time a source is added or deleted, I want to call our own server (another service) to report that the add/remove succeeded; for this, it would be great to have a callback for nvmultiurisrcbin.
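
The bookkeeping described above (0 to B sources, plus a notification on every add or remove) could look roughly like this on the application side. It is a sketch only: SourceSlots and the notify callback are hypothetical names, not part of DeepStream or nvmultiurisrcbin, and the class does not touch any GStreamer elements; the notify callback is where a call to the external service would go.

```python
from typing import Callable, Dict, Optional


class SourceSlots:
    """Maps URIs onto a fixed number of streammux slots and reports changes.

    Hypothetical bookkeeping only: the caller is still responsible for
    creating and removing the GStreamer elements that belong to each slot.
    """

    def __init__(self, max_sources: int, notify: Callable[[str, int, str], None]) -> None:
        self._slots: Dict[int, str] = {}
        self._max_sources = max_sources
        self._notify = notify

    def add(self, uri: str) -> Optional[int]:
        """Reserve the lowest free slot for uri; return None if all slots are busy."""
        for slot in range(self._max_sources):
            if slot not in self._slots:
                self._slots[slot] = uri
                self._notify("added", slot, uri)
                return slot
        return None

    def remove(self, slot: int) -> bool:
        """Free a slot; return False if it was not in use."""
        uri = self._slots.pop(slot, None)
        if uri is None:
            return False
        self._notify("removed", slot, uri)
        return True

    def is_idle(self) -> bool:
        """True when no source is registered (the pipeline should keep waiting)."""
        return not self._slots
```

A freed slot is reused by the next add, which mirrors the "delete stream 1, then re-add at index 1" scenario discussed earlier in the thread.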

We don’t have a similar callback in the nvmultiurisrcbin plugin yet.

  • Sources can be dynamically added and deleted.
  • The pipeline should not stop when there is no source.
  • When a source is removed, remove the related srcpad of nvstreamdemux.

We will consider whether to implement a demo that meets the above requirements later.
