Please provide complete information as applicable to your setup.
• Hardware Platform (Jetson / GPU) : GPU
• DeepStream Version : 6.4
• JetPack Version (valid for Jetson only)
• TensorRT Version : 8.6.1.6-1+cuda12.0
• NVIDIA GPU Driver Version (valid for GPU only) : Driver Version: 525.147.05
• Issue Type( questions, new requirements, bugs) : questions
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)
I followed https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/runtime_source_add_delete to dynamically add and delete sources.
In my implementation, I start the DeepStream pipeline with no sources, then schedule a background job to add a source to the pipeline → it works.
After a few seconds, another background job is invoked to delete that source → it works.
After a few more seconds, I try to re-add the same source, but the pipeline hangs.
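In short, the schedule that triggers the hang boils down to three GLib timeouts (simplified sketch only; `p` stands for the Pipeline instance whose full implementation is below, and the actual calls are set up at the end of _build()):

from gi.repository import GLib

# sketch of the schedule that reproduces the issue (`p` = Pipeline instance)
GLib.timeout_add_seconds(1, p.add_source, 1)             # add source 1    -> works
GLib.timeout_add_seconds(20, p.stop_release_source, 1)   # remove source 1 -> works
GLib.timeout_add_seconds(30, p.add_source, 1)            # re-add source 1 -> pipeline hangs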
Overall, the pipeline is: source bin(s) → nvstreammux → queue → pgie (nvinfer) → nvtracker → sgie (nvinfer) → nvstreamdemux → per-source sink bin (queue → nvvideoconvert → capsfilter → nvdsosd → queue → fakesink).
Pipeline implementation:
import sys
from pipeline.source_management import SourceMeta
sys.path.append("../")
from common.bus_call import bus_call
from common.is_aarch_64 import is_aarch64
from common.FPS import PERF_DATA
import pyds
from ctypes import *
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst, GstRtspServer, GLib
from typing import Dict
from pipeline.utils import *
from pipeline.metadata import *
import logging

def bus_call(bus, message, loop, pipeline):
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream but still keep running to wait for sources\n")
        # pipeline.set_state(Gst.State.NULL)
        # loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True

class Pipeline():
    def __init__(self, config: Dict) -> None:
        self.config = config
        self._build()

    def cb_newpad(self, decodebin, pad, data):
        print("In cb_newpad\n")
        caps = pad.get_current_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        # Need to check if the pad created by the decodebin is for video and not audio.
        if gstname.find("video") != -1:
            source_id = data
            pad_name = "sink_%u" % int(source_id)
            print(pad_name)
            # Get a request sink pad from the streammux and link the decodebin to it
            sinkpad = self.streammux.get_request_pad(pad_name)
            if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
                print("Decodebin linked to pipeline")
            else:
                sys.stderr.write("Failed to link decodebin to pipeline\n")
            # save the streammux sink pad to the source meta
            self.source_meta[int(source_id)].streammux_sinkpad = sinkpad

    def decodebin_child_added(self, child_proxy, Object, name, user_data, gpu_id=0):
        if name.find("decodebin") != -1:
            Object.connect("child-added", self.decodebin_child_added, user_data)
        if "source" in name:
            source_element = child_proxy.get_by_name("source")
            if source_element.find_property('drop-on-latency') != None:
                Object.set_property("drop-on-latency", False)
        if name.find("nvv4l2decoder") != -1:
            if is_aarch64():
                Object.set_property("enable-max-performance", True)
                Object.set_property("drop-frame-interval", 0)
                Object.set_property("num-extra-surfaces", 0)
            else:
                Object.set_property("gpu_id", gpu_id)
                # Object.set_property("cudadec-memtype", 2)

    def create_source_bin(self, index, uri):
        bin_name = "source-bin-%02d" % int(index)
        if uri.find("https://") == 0:
            uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", bin_name)
        else:
            uri_decode_bin = Gst.ElementFactory.make("uridecodebin", bin_name)
        uri_decode_bin.set_property("uri", uri)
        uri_decode_bin.connect("pad-added", self.cb_newpad, index)
        uri_decode_bin.connect("child-added", self.decodebin_child_added, index)
        return uri_decode_bin

    def create_sink_bin(self, idx):
        sink_bin = Gst.Bin.new(f"sinkbin-{idx}")
        queue = make_elm("queue", f"queue-{idx}")
        sink_bin.add(queue)
        nvvideoconvert = make_elm("nvvideoconvert", f"nvvideoconvert-{idx}")
        sink_bin.add(nvvideoconvert)
        caps = make_elm("capsfilter", f"caps-{idx}")
        caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA"))
        sink_bin.add(caps)
        osd = make_elm("nvdsosd", f"osd-{idx}")
        sink_bin.add(osd)
        osd_sink_pad = osd.get_static_pad("sink")
        if not osd_sink_pad:
            raise RuntimeError("Unable to get osd sink pad")
        osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, self._osd_sink_pad_buffer_probe, 0)
        queuesinkpad = queue.get_static_pad("sink")
        if not queuesinkpad:
            raise RuntimeError("Unable to get queue sink pad")
        queue.link(nvvideoconvert)
        nvvideoconvert.link(caps)
        caps.link(osd)
        fakesink = Gst.ElementFactory.make("fakesink", "fakesink")
        fakesink.set_property('enable-last-sample', 0)
        fakesink.set_property('sync', 0)
        # fakesink.set_property("qos", 0)
        sink_bin.add(fakesink)
        queue_fakesink = make_elm("queue", f"queue-fakesink-{idx}")
        sink_bin.add(queue_fakesink)
        osd.link(queue_fakesink)
        queue_fakesink.link(fakesink)
        sink_bin.add_pad(Gst.GhostPad.new(f"sink_{idx}", queuesinkpad))
        return sink_bin

    def build_downstream(self, idx):
        sink_bin = self.create_sink_bin(idx)
        # add to pipeline and add to source meta
        self._pipeline.add(sink_bin)
        self.source_meta[idx].sink_bin = sink_bin
        # link with demux
        sink_pad = sink_bin.get_static_pad(f"sink_{idx}")
        self.source_meta[idx].demux_srcpad.link(sink_pad)
        self.source_meta[idx].sink_pad = sink_pad

    def _build(self) -> None:
        # init global variables
        self.MAX_NUM_SOURCES = self.config["max_sources"]
        self.source_meta = [SourceMeta() for _ in range(self.MAX_NUM_SOURCES)]
        # self.sources = self.get_sources()
        # num_srcs = len(self.sources)
        global perf_data
        perf_data = PERF_DATA(self.MAX_NUM_SOURCES)
        # initialize GStreamer
        GLib.threads_init()
        Gst.init(None)
        # create pipeline
        logging.info("Creating Pipeline")
        self._pipeline = Gst.Pipeline()
        if not self._pipeline:
            raise RuntimeError("Unable to create Pipeline")
        logging.info("Creating streammux")
        streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        if not streammux:
            raise RuntimeError("Unable to create streammux")
        self.streammux = streammux
        self._pipeline.add(streammux)
        if not is_aarch64():
            mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
            streammux.set_property("nvbuf-memory-type", mem_type)
        streammux.set_property("width", self.config["width"])
        streammux.set_property("height", self.config["height"])
        streammux.set_property("batch-size", self.MAX_NUM_SOURCES)
        streammux.set_property("batched-push-timeout", self.config["batched-push-timeout"])
        streammux.set_property("enable-padding", 0)
        streammux.set_property("gpu-id", 0)
        streammux.set_property("live-source", 1)
        # create queue1
        logging.info("Creating queue1")
        queue1 = Gst.ElementFactory.make("queue", "queue1")
        if not queue1:
            raise RuntimeError("Unable to create queue1")
        self._pipeline.add(queue1)
        # create pgie
        logging.info("Creating pgie")
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
        if not pgie:
            raise RuntimeError("Unable to create pgie")
        pgie.set_property("config-file-path", self.config["pgie_config_path"])
        pgie_batch_size = pgie.get_property("batch-size")
        if pgie_batch_size != self.MAX_NUM_SOURCES:
            logging.warning(f"WARNING: Overriding infer-config batch-size {pgie_batch_size} with number of sources {self.MAX_NUM_SOURCES}")
            pgie.set_property("batch-size", self.MAX_NUM_SOURCES)
        self._pipeline.add(pgie)
        # create tracker
        logging.info("Creating tracker")
        tracker = Gst.ElementFactory.make("nvtracker", "tracker")
        if not tracker:
            raise RuntimeError("Unable to create tracker")
        tracker = initialize_tracker(tracker, self.config)
        self._pipeline.add(tracker)
        # create sgie
        logging.info("Creating sgie")
        sgie = Gst.ElementFactory.make("nvinfer", "secondary-inference")
        if not sgie:
            raise RuntimeError("Unable to create sgie")
        sgie.set_property("config-file-path", self.config["sgie_config_path"])
        self._pipeline.add(sgie)
        # create demuxer
        logging.info("Creating demuxer")
        self.demuxer = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
        if not self.demuxer:
            raise RuntimeError("Unable to create demuxer")
        self._pipeline.add(self.demuxer)
        # link elements
        streammux.link(queue1)
        queue1.link(pgie)
        pgie.link(tracker)
        tracker.link(sgie)
        sgie.link(self.demuxer)
        for idx in range(self.MAX_NUM_SOURCES):
            # request the demux src pads up front
            demuxsrcpad = self.demuxer.get_request_pad(f"src_{idx}")
            if not demuxsrcpad:
                raise RuntimeError("Unable to get demuxer src pad")
            # save the demux src pad
            self.source_meta[idx].demux_srcpad = demuxsrcpad
        # create an event loop and feed gstreamer bus messages to it
        self._loop = GLib.MainLoop()
        bus = self._pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", bus_call, self._loop, self._pipeline)
        GLib.timeout_add(5000, perf_data.perf_print_callback)
        self._pipeline.set_state(Gst.State.PAUSED)
        # list the output sources
        if self.config["export_rtsp"]:
            logging.info("Now playing...")
            for idx in range(self.MAX_NUM_SOURCES):
                print(f"Source {idx}: {self.config['sources'][str(idx)]['uri']}")
                rtsp_port_num = self.config["rtsp_port"] + idx
                server = GstRtspServer.RTSPServer.new()
                server.props.service = f"{rtsp_port_num}"
                server.attach(None)
                factory = GstRtspServer.RTSPMediaFactory.new()
                factory.set_launch(
                    f'( udpsrc name=pay0 port={self.config["udp_port"] + idx} buffer-size=524288 '
                    f'caps="application/x-rtp, media=video, clock-rate=90000, '
                    f'encoding-name=(string){self.config["format"]}, payload=96 " )')
                factory.set_shared(True)
                server.get_mount_points().add_factory(f"/stream{idx}", factory)
                rtsp_link = f"rtsp://localhost:{rtsp_port_num}/stream{idx}"
                logging.info(f"RTSP link: {rtsp_link}")
        self._pipeline.set_state(Gst.State.PLAYING)
        # add a source after 1 second
        GLib.timeout_add_seconds(1, self.add_source, 1)
        # remove the source after 20 seconds
        GLib.timeout_add_seconds(20, self.stop_release_source, 1)
        # re-add the source after 30 seconds
        GLib.timeout_add_seconds(30, self.add_source, 1)

    def stop_release_source(self, source_id):
        # Attempt to change status of source to be released
        state_return = self.source_meta[source_id].source_bin.set_state(Gst.State.NULL)
        # self.demuxer.set_state(Gst.State.NULL)
        pad_name = "sink_%u" % source_id
        if state_return == Gst.StateChangeReturn.SUCCESS:
            print("STATE CHANGE SUCCESS\n")
            # remove sink bin
            sink_bin = self.source_meta[source_id].sink_bin
            sinkpad = sink_bin.get_static_pad(f"sink_{source_id}")
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self._pipeline.remove(sink_bin)
            sink_bin.set_state(Gst.State.NULL)
            import time
            while sink_bin.get_state(0.1)[1] != Gst.State.NULL:
                time.sleep(0.1)
            # unlink demux src pad
            demuxsrcpad = self.source_meta[source_id].demux_srcpad
            ret = demuxsrcpad.unlink(self.source_meta[source_id].sink_pad)
            if not ret:
                print("Stop the pads are not linked")
            # release source bin
            sinkpad = self.streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self._pipeline.remove(self.source_meta[source_id].source_bin)
        elif state_return == Gst.StateChangeReturn.FAILURE:
            print("STATE CHANGE FAILURE\n")
        elif state_return == Gst.StateChangeReturn.ASYNC:
            print("STATE CHANGE ASYNC\n")
            state_return = self.source_meta[source_id].source_bin.get_state(Gst.CLOCK_TIME_NONE)
            # remove sink bin
            sink_bin = self.source_meta[source_id].sink_bin
            sinkpad = sink_bin.get_static_pad(f"sink_{source_id}")
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self._pipeline.remove(sink_bin)
            sink_bin.set_state(Gst.State.NULL)
            import time
            while sink_bin.get_state(0.1)[1] != Gst.State.NULL:
                time.sleep(0.1)
            # unlink demux src pad
            demuxsrcpad = self.source_meta[source_id].demux_srcpad
            ret = demuxsrcpad.unlink(self.source_meta[source_id].sink_pad)
            if not ret:
                print("Stop the pads are not linked")
            # release source bin
            sinkpad = self.streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self._pipeline.remove(self.source_meta[source_id].source_bin)
        return False

    def add_source(self, idx):
        try:
            fake_new_src = {
                "uri": "rtsp://fake-rtsp-stream:8554/stream0",
                "car_confidence": 0.1,
                "person_confidence": 0.1,
                "restricted_zones": [
                    [[[200, 300], [1500, 300]], [[1500, 300], [1500, 900]],
                     [[1500, 900], [200, 900]], [[200, 900], [200, 300]]]
                ]
            }
            is_live = False
            # create and add sources
            logging.info("Onboarding new source")
            uri = fake_new_src["uri"]
            if "rtsp://" in uri or "https://" in uri:
                is_live = True
            source_bin = self.create_source_bin(idx, uri)
            if not source_bin:
                raise RuntimeError("Unable to create source bin")
            self._pipeline.add(source_bin)
            self.source_meta[idx].source_bin = source_bin
            self.build_downstream(idx)
            # set streammux properties
            if is_live:
                self.streammux.set_property("live-source", 1)
            # Set state of source bin to playing
            state_return = source_bin.set_state(Gst.State.PLAYING)
            if state_return == Gst.StateChangeReturn.SUCCESS:
                print("STATE CHANGE SUCCESS\n")
            elif state_return == Gst.StateChangeReturn.FAILURE:
                print("STATE CHANGE FAILURE\n")
            elif state_return == Gst.StateChangeReturn.ASYNC:
                print("STATE CHANGE ASYNC\n")
                state_return = source_bin.get_state(Gst.CLOCK_TIME_NONE)
            elif state_return == Gst.StateChangeReturn.NO_PREROLL:
                print("STATE CHANGE NO PREROLL\n")
            return False  # return False so the timeout runs only once
        except Exception as e:
            print(f"Warning: {e}")
        finally:
            return False  # return False so the timeout runs only once

    def _clean(self) -> None:
        logging.info("Cleaning Pipeline")
        pyds.unset_callback_funcs()
        self._pipeline.set_state(Gst.State.NULL)

    def run(self) -> None:
        logging.info("Running Pipeline")
        self._pipeline.set_state(Gst.State.PLAYING)
        try:
            self._loop.run()
        except:
            pass
        self._clean()

    def _osd_sink_pad_buffer_probe(self, pad, info, u_data):
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            logging.error("Unable to get GstBuffer")
            return
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            except StopIteration:
                break
            # update fps
            cam_id = frame_meta.pad_index
            perf_data.update_fps(f"stream{cam_id}")
            try:
                frame_meta.bInferDone = True
                l_frame = l_frame.next
            except StopIteration:
                break
        return Gst.PadProbeReturn.OK
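For completeness, this is roughly how the pipeline is created and run (a minimal sketch; the config values below are placeholders that only match the keys _build() reads, and the tracker-related keys consumed by initialize_tracker are omitted):

# Minimal usage sketch (placeholder values, not my real config)
config = {
    "max_sources": 4,
    "width": 1920,
    "height": 1080,
    "batched-push-timeout": 40000,
    "pgie_config_path": "configs/pgie_config.txt",
    "sgie_config_path": "configs/sgie_config.txt",
    "export_rtsp": False,
    "rtsp_port": 8554,
    "udp_port": 5400,
    "format": "H264",
    "sources": {},
}
pipeline = Pipeline(config)
pipeline.run()  # blocks in the GLib main loop until an error or interrupt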
The pipeline/source_management.py module referenced above is:
class SourceMeta:
    def __init__(self) -> None:
        self.streammux_sinkpad = None
        self.demux_srcpad = None
        self.source_bin = None
        self.sink_bin = None
        self.sink_pad = None
I also followed this question on adding/stopping a source at runtime, but it does not seem to solve the issue: How to add elemts after demux during runtime? - #4 by daredeviles888
Hope you can help me figure this out. Thanks.