Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 69 additions & 4 deletions utilities/stress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,18 @@ def stress_test(mxid: str = ""):
parser = argparse.ArgumentParser()
parser.add_argument("-ne", "--n-edge-detectors", default=0, type=int, help="Number of edge detectors to create.")
parser.add_argument("--no-nnet", action="store_true", default=False, help="Don't create a neural network.")
parser.add_argument(
"--slow-rampup",
action="store_true",
default=False,
help="Ramp IR dot/flood brightness after the pipeline starts (helps avoid device crashes).",
)
parser.add_argument(
"--rampup-seconds",
type=float,
default=5.0,
help="Duration (seconds) for --slow-rampup. Recommended: 5.",
)

# May have some unknown args
args, _ = parser.parse_known_args()
Expand All @@ -159,24 +171,76 @@ def stress_test(mxid: str = ""):
exp_time = 20000

import time

success, device_info = dai.Device.getDeviceByMxId(mxid)
cam_args = [] # Device info or no args at all
if success:
cam_args.append(device_info)
with dai.Device(*cam_args) as device:
print("Setting default dot intensity to", dot_intensity)
device.setIrLaserDotProjectorIntensity(dot_intensity)
print("Setting default flood intensity to", flood_intensity)
device.setIrFloodLightIntensity(flood_intensity)
if args.slow_rampup:
# Put IR to a known safe baseline before starting the pipeline.
# The actual configured targets are applied after the pipeline is running.
try:
device.setIrLaserDotProjectorIntensity(0.0)
device.setIrFloodLightIntensity(0.0)
except Exception as e:
print("Warning: Failed to set IR intensity baseline:", repr(e))
else:
print("Setting default dot intensity to", dot_intensity)
device.setIrLaserDotProjectorIntensity(dot_intensity)
print("Setting default flood intensity to", flood_intensity)
device.setIrFloodLightIntensity(flood_intensity)

pipeline, outputs, pipeline_context = build_pipeline(device, args)
device.startPipeline(pipeline)

ramp_start_t: Optional[float] = None
ramp_last_set_t: float = 0.0
ramp_last_dot: Optional[float] = None
ramp_last_flood: Optional[float] = None

if args.slow_rampup:
print(
f"Slow rampup enabled: ramping dot={dot_intensity:.2f}, flood={flood_intensity:.2f} over {args.rampup_seconds:.1f}s (non-blocking)"
)
ramp_start_t = time.time()

start_time = time.time()
queues = [device.getOutputQueue(name, size, False)
for name, size in outputs if name != "sys_log"]
camera_control_q = device.getInputQueue("cam_control")
sys_info_q = device.getOutputQueue("sys_log", 1, False)
usb_speed = device.getUsbSpeed()
while True:
# Ramp IR on-the-fly while we keep draining queues.
if ramp_start_t is not None:
now = time.time()
dur = float(args.rampup_seconds)
# Rate-limit device setter calls; these are RPCs and can be chatty.
if dur <= 0:
frac = 1.0
else:
frac = clamp((now - ramp_start_t) / dur, 0.0, 1.0)

# Update at most ~20Hz, or on final completion.
if (now - ramp_last_set_t) >= 0.05 or frac >= 1.0:
dot = dot_intensity * frac
flood = flood_intensity * frac
try:
if ramp_last_dot is None or abs(dot - ramp_last_dot) >= 1e-3:
device.setIrLaserDotProjectorIntensity(dot)
ramp_last_dot = dot
if ramp_last_flood is None or abs(flood - ramp_last_flood) >= 1e-3:
device.setIrFloodLightIntensity(flood)
ramp_last_flood = flood
except Exception as e:
print("Warning: IR rampup failed:", repr(e))
ramp_start_t = None
ramp_last_set_t = now

if frac >= 1.0:
ramp_start_t = None

for queue in queues:
packet = queue.tryGet()
# print("QUEUE", queue.getName(), "PACKET", packet)
Expand Down Expand Up @@ -296,6 +360,7 @@ def build_pipeline(device: dai.Device, args) -> Tuple[dai.Pipeline, List[Tuple[s
eeprom = calib.getEepromData()
left_socket = eeprom.stereoRectificationData.leftCameraSocket
right_socket = eeprom.stereoRectificationData.rightCameraSocket
print(f"L: {left_socket} | R: {right_socket}")
Comment thread
zrezke marked this conversation as resolved.
Outdated
align_socket = [
cam.socket
for cam in camera_features
Expand Down
Loading