How to deactivate randomizations in Isaac Sim Replicator

Hello,

I am currently implementing a synthetic data generation pipeline in Isaac Sim, using replicator.
I want to create mixed datasets with different setups, like objects flying, objects falling on the floor and objects falling into a bin.

Each setup has different randomizations that I want to apply. So transitioning from one setup to the other would require a deactivation of the old randomizations and an activation of the new ones.

Is it possible to deactivate randomization?

I haven’t found anything about that in the documentation or the code. I also tried the “max_execs” attribute when setting the trigger, but this does not seem to work.

Here is a simplified code snipped to demonstrate that, based on the following Documentation: 10.15. Useful Snippets — Omniverse IsaacSim latest documentation

from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp(launch_config={"headless": False})

import os

import omni.replicator.core as rep
import omni.usd


omni.usd.get_context().new_stage()

distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant")


large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0))
small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0))
large_cube_prim = large_cube.get_output_prims()["prims"][0]
small_cube_prim = small_cube.get_output_prims()["prims"][0]


cam = rep.create.camera(position=(0, 0, 5), look_at=(0, 0, 0))
rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512))
writer = rep.WriterRegistry.get("BasicWriter")

#####################################
# Change this directory
#####################################
out_dir = "/home/kiropro/Downloads/test_test" #os.getcwd() + "/_out_custom_event"

print(f"Writing data to {out_dir}")
writer.initialize(output_dir=out_dir, rgb=True)
writer.attach(rp)

def run_example():
    with rep.trigger.on_frame(interval=1, max_execs=8):
        with large_cube:
            rep.randomizer.rotation()
    rep.utils.send_og_event(event_name="randomize_small_cube")
    #rep.orchestrator.step(rt_subframes=8)
    rep.orchestrator.run_until_complete(num_frames=8)

    with rep.trigger.on_frame(interval=1, max_execs=8):
        with small_cube:
            rep.randomizer.rotation()

    #rep.orchestrator.step(rt_subframes=8)
    rep.orchestrator.run_until_complete(num_frames=8)

    with rep.trigger.on_frame(interval=1, max_execs=8):
        with large_cube:
            rep.randomizer.rotation()
    rep.utils.send_og_event(event_name="randomize_large_cube")
    #rep.orchestrator.step(rt_subframes=8)
    rep.orchestrator.run_until_complete(num_frames=8)

    # Wait until all the data is saved to disk
    rep.orchestrator.wait_until_complete()


run_example()


simulation_app.close()

Using the “max_execs” attribute, I would expect the trigger to stop triggering after max_execs are reached (and therefore alternating between big cube rotations and small cube rotations), but it does not stop triggering.

Kind regards

Axel

Hi there,

if you want to trigger specific randomizations you should use rep.trigger.on_custom_event(event_name="my_event") to register the randomization and rep.utils.send_og_event to trigger it.

In your snippet you are doing a mixture of the two, try getting rid of the on_frame triggers first (this will be triggered on every step() function call).

Also for quicker dev iterations I would suggest to use the script editor of the snippet (and after each run you can create a fresh new stage either from code or manually).

Let me know how it goes.

Best,
Andrei

1 Like

Hi Andrei,

Apologies for the delayed response, I didn’t have the chance to test it until now.

Thank you for the tip, everything now works as expected.

Below is the new code for reference:

from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp(launch_config={"headless": True})

import os

import omni.replicator.core as rep
import omni.usd


omni.usd.get_context().new_stage()

distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant")


large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0))
small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0))
large_cube_prim = large_cube.get_output_prims()["prims"][0]
small_cube_prim = small_cube.get_output_prims()["prims"][0]


cam = rep.create.camera(position=(0, 0, 5), look_at=(0, 0, 0))
rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512))
writer = rep.WriterRegistry.get("BasicWriter")

#####################################
# Change this directory
#####################################
out_dir = "/home/kiropro/Downloads/test_test" #os.getcwd() + "/_out_custom_event"

print(f"Writing data to {out_dir}")
writer.initialize(output_dir=out_dir, rgb=True)
writer.attach(rp)

def run_example():
    with rep.trigger.on_custom_event(event_name="randomize_large_cube"):
        with large_cube:
            rep.randomizer.rotation()
    
    with rep.trigger.on_custom_event(event_name="randomize_small_cube"):
        with small_cube:
            rep.randomizer.rotation()
    
    
    for i in range(8):
        rep.utils.send_og_event(event_name="randomize_large_cube")
        rep.orchestrator.step(rt_subframes=8)
    #rep.orchestrator.run_until_complete(num_frames=8)

    for i in range(8):
        rep.utils.send_og_event(event_name="randomize_small_cube")
        rep.orchestrator.step(rt_subframes=8)

    for i in range(8):
        rep.utils.send_og_event(event_name="randomize_large_cube")
        rep.orchestrator.step(rt_subframes=8)

    # Wait until all the data is saved to disk
    rep.orchestrator.wait_until_complete()


run_example()


simulation_app.close()

Best regards

Axel

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.