Pipelines API

API reference for job pipelines and task queue.

SDK Functions

submit_job

objdet.pipelines.sdk.submit_job(job_type, gpu_count=1, gpu_memory_gb=16, priority=0, tags=None, dependencies=None, **config)[source]

Submit a job to the pipeline queue.

Parameters:
  • job_type (str) – Type of job ("train", "export", "preprocess").

  • gpu_count (int) – Number of GPUs required.

  • gpu_memory_gb (float) – GPU memory required.

  • priority (int) – Job priority (higher = more important).

  • tags (list[str] | None) – Optional tags for filtering.

  • dependencies (list[str] | None) – Optional list of job IDs this depends on.

  • **config (Any) – Job-specific configuration.

Return type:

str

Returns:

Job ID for tracking.

Example

from objdet.pipelines import submit_job

job_id = submit_job(
    job_type="train",
    config_path="configs/coco_frcnn.yaml",
    output_dir="/outputs/exp_001",
    gpu_count=2,
    gpu_memory_gb=16,
    priority=5,
    tags=["experiment", "coco"],
    max_epochs=50,
)

get_job_status

objdet.pipelines.sdk.get_job_status(job_id)[source]

Get status of a submitted job.

Parameters:

job_id (str) – Job ID returned from submit_job.

Return type:

dict[str, Any]

Returns:

Dictionary with job status information.

from objdet.pipelines import get_job_status

status = get_job_status(job_id)
print(f"Job {job_id}: {status['status']}")

cancel_job

objdet.pipelines.sdk.cancel_job(job_id)[source]

Cancel a pending or running job.

Parameters:

job_id (str) – Job ID to cancel.

Return type:

bool

Returns:

True if cancelled successfully.

from objdet.pipelines import cancel_job

success = cancel_job(job_id)

list_jobs

objdet.pipelines.sdk.list_jobs(status=None, job_type=None, tags=None)[source]

List jobs with optional filters.

Parameters:
  • status (JobStatus | None) – Only return jobs with this status.

  • job_type (JobType | None) – Only return jobs of this type.

  • tags (list[str] | None) – Only return jobs with these tags.

Return type:

list[dict[str, Any]]

Returns:

List of job dictionaries.

from objdet.pipelines import list_jobs
from objdet.pipelines.job import JobStatus, JobType

# List all running training jobs
jobs = list_jobs(status=JobStatus.RUNNING, job_type=JobType.TRAIN)

Job Models

JobStatus

class objdet.pipelines.job.JobStatus(*values)[source]

Bases: StrEnum

Status of a pipeline job.

PENDING = 'pending'
QUEUED = 'queued'
RUNNING = 'running'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELLED = 'cancelled'
RETRYING = 'retrying'

Values:

  • PENDING - Job created but not yet queued

  • QUEUED - Job submitted to Celery queue

  • RUNNING - Job currently executing

  • COMPLETED - Job finished successfully

  • FAILED - Job failed with error

  • CANCELLED - Job was cancelled

  • RETRYING - Job is being retried after failure
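Because JobStatus subclasses StrEnum, members compare equal to their raw string values, which makes it easy to match statuses returned by get_job_status. A minimal self-contained sketch (using the equivalent str + Enum mixin so it also runs on Pythons before 3.11; values copied from the list above, the TERMINAL helper set is an illustration, not part of the library):

```python
from enum import Enum

# Minimal stand-in for objdet.pipelines.job.JobStatus. The real class
# subclasses StrEnum; mixing in str gives the same value-comparison
# behavior demonstrated here.
class JobStatus(str, Enum):
    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    RETRYING = "retrying"

# Statuses a job can never leave (illustrative helper, not in the API).
TERMINAL = {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED}

# Members compare equal to their string values, so raw status strings
# (e.g. from get_job_status()["status"]) match the enum directly.
assert JobStatus.RUNNING == "running"
assert JobStatus("failed") in TERMINAL
```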


JobType

class objdet.pipelines.job.JobType(*values)[source]

Bases: StrEnum

Type of pipeline job.

TRAIN = 'train'
EXPORT = 'export'
PREPROCESS = 'preprocess'
EVALUATE = 'evaluate'

Values:

  • TRAIN - Training job

  • EXPORT - Model export job

  • PREPROCESS - Data preprocessing job

  • EVALUATE - Model evaluation job


Job

class objdet.pipelines.job.Job(job_type, config, id=<factory>, status=JobStatus.PENDING, dependencies=<factory>, created_at=<factory>, started_at=None, completed_at=None, result=None, error=None, celery_task_id=None, priority=0, tags=<factory>)[source]

Bases: object

Represents a pipeline job.

id

Unique job identifier.

job_type

Type of job.

status

Current status.

config

Job configuration.

dependencies

IDs of jobs this depends on.

created_at

Creation timestamp.

started_at

Start timestamp.

completed_at

Completion timestamp.

result

Job result data.

error

Error message if failed.

celery_task_id

Celery task ID.

priority

Job priority (higher = more important).

tags

Tags for filtering.

job_type: JobType
config: dict[str, Any]
id: str
status: JobStatus = 'pending'
dependencies: list[str]
created_at: Instant
started_at: Instant | None = None
completed_at: Instant | None = None
result: dict[str, Any] | None = None
error: str | None = None
celery_task_id: str | None = None
priority: int = 0
tags: list[str]
to_dict()[source]

Convert job to dictionary.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Create job from dictionary.

Return type:

Job
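The to_dict / from_dict pair supports serializing jobs for queue storage. A minimal round-trip sketch with a cut-down stand-in class (field names follow the reference above; the real Job carries more fields plus timestamp handling, so treat this as an illustration only):

```python
from dataclasses import dataclass, field, asdict
import uuid

# Cut-down stand-in for objdet.pipelines.job.Job, just enough to show
# the to_dict / from_dict round trip.
@dataclass
class Job:
    job_type: str
    config: dict
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: str = "pending"
    dependencies: list = field(default_factory=list)

    def to_dict(self) -> dict:
        # Recursively convert the dataclass to a plain dict.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "Job":
        # Rebuild a Job from a dict produced by to_dict().
        return cls(**data)

job = Job(job_type="train", config={"config_path": "train.yaml"})
assert Job.from_dict(job.to_dict()) == job  # lossless round trip
```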

is_ready(completed_jobs)[source]

Check if all dependencies are satisfied.

Parameters:

completed_jobs (set[str]) – Set of completed job IDs.

Return type:

bool

Returns:

True if job can be executed.
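The dependency check behind is_ready amounts to verifying that every dependency ID appears in the completed set. A sketch of that logic (an assumption about the implementation):

```python
# A job is ready once every job ID it depends on is in the set of
# completed job IDs; a job with no dependencies is always ready.
def is_ready(dependencies: list[str], completed_jobs: set[str]) -> bool:
    return all(dep in completed_jobs for dep in dependencies)

assert is_ready(["job-a"], {"job-a", "job-b"})      # all deps done
assert not is_ready(["job-a", "job-c"], {"job-a"})  # job-c still pending
```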


JobDAG

Directed Acyclic Graph for managing job dependencies.

class objdet.pipelines.job.JobDAG(jobs=<factory>)[source]

Bases: object

Directed Acyclic Graph of jobs with dependencies.

Manages job execution order based on dependencies.

jobs: dict[str, Job]
add_job(job)[source]

Add a job to the DAG.

Return type:

None

get_ready_jobs()[source]

Get jobs that are ready to execute.

Return type:

list[Job]

Returns:

List of jobs with all dependencies satisfied.

get_execution_order()[source]

Get topological order for job execution.

Return type:

list[str]

Returns:

List of job IDs in execution order.

from objdet.pipelines.job import Job, JobDAG, JobType

dag = JobDAG()

# Create jobs with dependencies
preprocess_job = Job(job_type=JobType.PREPROCESS, config={"input_dir": "/data"})
train_job = Job(
    job_type=JobType.TRAIN,
    config={"config_path": "train.yaml"},
    dependencies=[preprocess_job.id],
)

dag.add_job(preprocess_job)
dag.add_job(train_job)

# Get execution order
order = dag.get_execution_order()
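get_execution_order presumably performs a standard topological sort over the dependency graph; a self-contained sketch using Kahn's algorithm (the real implementation may differ):

```python
from collections import deque

# Topological sort over a dependency mapping: job ID -> list of job IDs
# it depends on. Returns job IDs in a valid execution order.
def execution_order(jobs: dict[str, list[str]]) -> list[str]:
    indegree = {jid: len(deps) for jid, deps in jobs.items()}
    dependents: dict[str, list[str]] = {jid: [] for jid in jobs}
    for jid, deps in jobs.items():
        for dep in deps:
            dependents[dep].append(jid)

    # Start with jobs that have no unmet dependencies.
    ready = deque(jid for jid, d in indegree.items() if d == 0)
    order: list[str] = []
    while ready:
        jid = ready.popleft()
        order.append(jid)
        # Completing a job may unlock its dependents.
        for child in dependents[jid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(jobs):
        raise ValueError("dependency cycle detected")
    return order

print(execution_order({"preprocess": [], "train": ["preprocess"], "export": ["train"]}))
# ['preprocess', 'train', 'export']
```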

Celery Tasks

train_model

objdet.pipelines.tasks.train_model(config_path, output_dir, checkpoint=None, max_epochs=None, devices=1, accelerator='auto', extra_args=None)

Train a detection model.

This task runs a full training loop using Lightning Trainer.

Parameters:
  • self – Celery task instance (bound task).

  • config_path (str) – Path to LightningCLI config YAML.

  • output_dir (str) – Directory for outputs (checkpoints, logs).

  • checkpoint (str | None) – Optional path to resume from checkpoint.

  • max_epochs (int | None) – Override max epochs from config.

  • devices (int) – Number of devices to use.

  • accelerator (str) – Accelerator type.

  • extra_args (dict[str, Any] | None) – Additional CLI arguments.

Return type:

dict[str, Any]

Returns:

Dictionary with training results (best metrics, checkpoint path).


export_model

objdet.pipelines.tasks.export_model(checkpoint_path, output_path, export_format='onnx', input_shape=(1, 3, 640, 640))

Export a trained model to optimized format.

Parameters:
  • self – Celery task instance (bound task).

  • checkpoint_path (str) – Path to model checkpoint.

  • output_path (str) – Output path for exported model.

  • export_format (str) – Target format (onnx, tensorrt, safetensors).

  • input_shape (tuple[int, ...]) – Input tensor shape.

Return type:

dict[str, Any]

Returns:

Dictionary with export results.


preprocess_data

objdet.pipelines.tasks.preprocess_data(input_dir, output_dir, format_name, num_workers=4, class_names=None)

Preprocess dataset to LitData format.

Parameters:
  • self – Celery task instance (bound task).

  • input_dir (str) – Source dataset directory.

  • output_dir (str) – Output directory.

  • format_name (str) – Source format (coco, voc, yolo).

  • num_workers (int) – Number of workers.

  • class_names (list[str] | None) – Class names (required for YOLO).

Return type:

dict[str, Any]

Returns:

Dictionary with preprocessing results.