Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/guides/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,12 @@ def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str):
# pre_launch_commands=["nvidia-smi"],
# Optional: Specify image pull secrets for authenticating with container registries
# image_pull_secrets=["my-image-pull-secret"],
# packager=run.GitArchivePackager() # Choose appropriate packager
# Optional: Enable preemption scheduling
# can_be_preempted=True, # job yields nodes to higher-priority jobs
# can_preempt=True, # job can evict lower-priority jobs
# queue_priority="mid-4000", # required when either preemption flag is set
# Choose appropriate packager
# packager=run.GitArchivePackager()
)
return executor

Expand Down
12 changes: 11 additions & 1 deletion nemo_run/core/execution/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
EnvValue,
LeptonContainer,
Mount,
QueueConfig,
)
from leptonai.api.v1.types.job import (
LeptonJob,
Expand Down Expand Up @@ -83,6 +84,9 @@ class LeptonExecutor(Executor):
pre_launch_commands: list[str] = field(default_factory=list) # Custom commands before launch
head_resource_shape: Optional[str] = "" # Only used for LeptonRayCluster
ray_version: Optional[str] = None # Only used for LeptonRayCluster
can_be_preempted: bool = False # job yields nodes to higher-priority jobs
can_preempt: bool = False # job can evict lower-priority jobs
queue_priority: Optional[str] = None # e.g. "mid-4000"; required when either flag is set

def stop_job(self, job_id: str):
"""
Expand Down Expand Up @@ -285,7 +289,13 @@ def create_lepton_job(self, name: str):
privileged=False,
metrics=None,
log=None,
queue_config=None,
queue_config=QueueConfig(
priority_class=self.queue_priority or "mid-4000",
can_be_preempted=self.can_be_preempted if self.can_be_preempted else None,
can_preempt=self.can_preempt if self.can_preempt else None,
)
if (self.can_be_preempted or self.can_preempt)
else None,
stopped=None,
)

Expand Down
159 changes: 154 additions & 5 deletions test/core/execution/test_lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import pytest
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
from leptonai.api.v1.types.deployment import (
EnvValue,
EnvVar,
LeptonContainer,
LeptonResourceAffinity,
Mount,
EnvVar,
EnvValue,
QueueConfig,
)
from leptonai.api.v1.types.job import LeptonJob, LeptonJobUserSpec

Expand Down Expand Up @@ -486,6 +487,145 @@ def test_create_lepton_job_with_empty_reservation_config(self, mock_APIClient_cl
created_job = mock_client.job.create.call_args[0][0]
assert created_job.spec.reservation_config is None

def test_init_preemption_defaults(self):
"""Test that preemption fields default to off."""
executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
mounts=[{"path": "/test", "mount_path": "/test"}],
)

assert executor.can_be_preempted is False
assert executor.can_preempt is False
assert executor.queue_priority is None

def test_init_with_can_be_preempted(self):
executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
mounts=[{"path": "/test", "mount_path": "/test"}],
can_be_preempted=True,
)

assert executor.can_be_preempted is True

def test_init_with_can_preempt(self):
executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
mounts=[{"path": "/test", "mount_path": "/test"}],
can_preempt=True,
)

assert executor.can_preempt is True

def test_init_with_queue_priority(self):
executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
mounts=[{"path": "/test", "mount_path": "/test"}],
can_be_preempted=True,
queue_priority="high-8000",
)

assert executor.queue_priority == "high-8000"

@patch("nemo_run.core.execution.lepton.APIClient")
def test_create_lepton_job_without_preemption(self, mock_APIClient_class):
"""Test queue_config is None when neither preemption flag is set."""
mock_client = mock_APIClient_class.return_value
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))

executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
node_group="123456",
mounts=[{"path": "/test", "mount_path": "/test"}],
)
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
executor._node_group_id = MagicMock(return_value=node_group)

executor.create_lepton_job("my-lepton-job")

created_job = mock_client.job.create.call_args[0][0]
assert created_job.spec.queue_config is None

@patch("nemo_run.core.execution.lepton.APIClient")
def test_create_lepton_job_with_can_be_preempted(self, mock_APIClient_class):
"""Test queue_config is set correctly when can_be_preempted=True."""
mock_client = mock_APIClient_class.return_value
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))

executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
node_group="123456",
mounts=[{"path": "/test", "mount_path": "/test"}],
can_be_preempted=True,
)
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
executor._node_group_id = MagicMock(return_value=node_group)

executor.create_lepton_job("my-lepton-job")

created_job = mock_client.job.create.call_args[0][0]
assert created_job.spec.queue_config == QueueConfig(
priority_class="mid-4000",
can_be_preempted=True,
can_preempt=None,
)

@patch("nemo_run.core.execution.lepton.APIClient")
def test_create_lepton_job_with_can_preempt(self, mock_APIClient_class):
"""Test queue_config is set correctly when can_preempt=True."""
mock_client = mock_APIClient_class.return_value
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))

executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
node_group="123456",
mounts=[{"path": "/test", "mount_path": "/test"}],
can_preempt=True,
)
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
executor._node_group_id = MagicMock(return_value=node_group)

executor.create_lepton_job("my-lepton-job")

created_job = mock_client.job.create.call_args[0][0]
assert created_job.spec.queue_config == QueueConfig(
priority_class="mid-4000",
can_be_preempted=None,
can_preempt=True,
)

@patch("nemo_run.core.execution.lepton.APIClient")
def test_create_lepton_job_with_custom_queue_priority(self, mock_APIClient_class):
"""Test that a custom queue_priority overrides the 'mid-4000' default."""
mock_client = mock_APIClient_class.return_value
mock_client.job.create.return_value = LeptonJob(metadata=Metadata(id="my-lepton-job"))
node_group = SimpleNamespace(metadata=SimpleNamespace(id_="123456"))

executor = LeptonExecutor(
container_image="test-image",
nemo_run_dir="/test/path",
node_group="123456",
mounts=[{"path": "/test", "mount_path": "/test"}],
can_be_preempted=True,
queue_priority="high-8000",
)
executor._valid_node_ids = MagicMock(return_value=["node-id-1"])
executor._node_group_id = MagicMock(return_value=node_group)

executor.create_lepton_job("my-lepton-job")

created_job = mock_client.job.create.call_args[0][0]
assert created_job.spec.queue_config.priority_class == "high-8000"

def test_nnodes(self):
executor = LeptonExecutor(
container_image="nvcr.io/nvidia/test:latest",
Expand Down Expand Up @@ -549,7 +689,11 @@ def test_valid_storage_mounts_with_mount_from(self):
container_image="nvcr.io/nvidia/test:latest",
nemo_run_dir="/workspace/nemo_run",
mounts=[
{"path": "/workspace", "mount_path": "/workspace", "from": "local-storage:nfs"}
{
"path": "/workspace",
"mount_path": "/workspace",
"from": "local-storage:nfs",
}
],
)

Expand Down Expand Up @@ -730,7 +874,10 @@ def test_package_configs(self, mock_file, mock_makedirs):
mounts=[{"path": "/test", "mount_path": "/test"}],
)

configs = [("config1.yaml", "key: value"), ("subdir/config2.yaml", "another: config")]
configs = [
("config1.yaml", "key: value"),
("subdir/config2.yaml", "another: config"),
]

filenames = executor.package_configs(*configs)

Expand Down Expand Up @@ -855,7 +1002,9 @@ def test_launch_method_comprehensive(
"""Test launch method name validation, pre_launch_commands, and script generation."""
# Setup
executor = LeptonExecutor(
container_image="test-image", nemo_run_dir="/test", pre_launch_commands=["echo setup"]
container_image="test-image",
nemo_run_dir="/test",
pre_launch_commands=["echo setup"],
)
executor.job_dir = executor.lepton_job_dir = "/fake"
mock_join.return_value = "/fake/script.sh"
Expand Down
Loading