Pipelines API¶
API reference for job pipelines and task queue.
SDK Functions¶
submit_job¶
- objdet.pipelines.sdk.submit_job(job_type, gpu_count=1, gpu_memory_gb=16, priority=0, tags=None, dependencies=None, **config)[source]¶
Submit a job to the pipeline queue.
- Parameters:
  - job_type (str) – Type of job (“train”, “export”, “preprocess”).
  - gpu_count (int) – Number of GPUs required.
  - gpu_memory_gb (float) – GPU memory required, in GB.
  - priority (int) – Job priority (higher = more important).
  - tags (list[str] | None) – Optional tags for labelling and filtering jobs.
  - dependencies (list[str] | None) – Optional list of job IDs this depends on.
  - **config (Any) – Job-specific configuration.
- Return type:
str
- Returns:
Job ID for tracking.
Example

>>> job_id = submit_job(
...     job_type="train",
...     config_path="configs/coco.yaml",
...     output_dir="/outputs/exp_001",
...     max_epochs=50,
... )
from objdet.pipelines import submit_job
job_id = submit_job(
job_type="train",
config_path="configs/coco_frcnn.yaml",
output_dir="/outputs/exp_001",
gpu_count=2,
gpu_memory_gb=16,
priority=5,
tags=["experiment", "coco"],
)
get_job_status¶
Get the current status of a submitted job.
from objdet.pipelines import get_job_status
status = get_job_status(job_id)
print(f"Job {job_id}: {status['status']}")
cancel_job¶
Cancel a submitted job; returns whether cancellation succeeded.
from objdet.pipelines import cancel_job
success = cancel_job(job_id)
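A common pattern on top of get_job_status is to poll until a job reaches a terminal state. The sketch below is illustrative, not part of the objdet API: wait_for_job and its terminal-state set are assumptions, and the only thing taken from the docs is that the status dict exposes a "status" key (as in the example above).

```python
import time

# Terminal states, mirroring the JobStatus values documented below.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def wait_for_job(job_id, fetch_status, poll_interval=5.0):
    """Poll fetch_status(job_id) until the job reaches a terminal state.

    fetch_status is any callable returning a dict with a "status" key
    (e.g. objdet.pipelines.get_job_status). Returns the final status string.
    """
    while True:
        status = fetch_status(job_id)["status"]
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_interval)
```

In production you would likely add a timeout and exponential backoff; the loop above only shows the core idea.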
list_jobs¶
- objdet.pipelines.sdk.list_jobs(status=None, job_type=None, tags=None)[source]¶
List jobs with optional filters.
from objdet.pipelines import list_jobs
from objdet.pipelines.job import JobStatus, JobType
# List all running training jobs
jobs = list_jobs(status=JobStatus.RUNNING, job_type=JobType.TRAIN)
Job Models¶
JobStatus¶
- class objdet.pipelines.job.JobStatus(*values)[source]¶
Bases: StrEnum

Status of a pipeline job.
- PENDING = 'pending'¶
- QUEUED = 'queued'¶
- RUNNING = 'running'¶
- COMPLETED = 'completed'¶
- FAILED = 'failed'¶
- CANCELLED = 'cancelled'¶
- RETRYING = 'retrying'¶
Values:
- PENDING – Job created but not yet queued
- QUEUED – Job submitted to Celery queue
- RUNNING – Job currently executing
- COMPLETED – Job finished successfully
- FAILED – Job failed with error
- CANCELLED – Job was cancelled
- RETRYING – Job is being retried after failure
JobType¶
- class objdet.pipelines.job.JobType(*values)[source]¶
Bases: StrEnum

Type of pipeline job.
- TRAIN = 'train'¶
- EXPORT = 'export'¶
- PREPROCESS = 'preprocess'¶
- EVALUATE = 'evaluate'¶
Values:
- TRAIN – Training job
- EXPORT – Model export job
- PREPROCESS – Data preprocessing job
- EVALUATE – Model evaluation job
Job¶
- class objdet.pipelines.job.Job(job_type, config, id=<factory>, status=JobStatus.PENDING, dependencies=<factory>, created_at=<factory>, started_at=None, completed_at=None, result=None, error=None, celery_task_id=None, priority=0, tags=<factory>)[source]
Bases: object

Represents a pipeline job.
- id
Unique job identifier.
- job_type
Type of job.
- status
Current status.
- config
Job configuration.
- dependencies
IDs of jobs this depends on.
- created_at
Creation timestamp.
- started_at
Start timestamp.
- completed_at
Completion timestamp.
- result
Job result data.
- error
Error message if failed.
- celery_task_id
Celery task ID.
- job_type: JobType
- id: str
- status: JobStatus = 'pending'
- created_at: Instant
- started_at: Instant | None = None
- completed_at: Instant | None = None
- priority: int = 0
- classmethod from_dict(data)[source]
Create job from dictionary.
- Return type:
Job
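from_dict plausibly maps a plain dictionary (e.g. deserialized JSON) onto the dataclass fields. The simplified stand-in below is not the actual implementation; it only illustrates the round-trip, keeping just the job_type, config, id, and status fields from the field list above:

```python
import uuid
from dataclasses import dataclass, field, fields


@dataclass
class Job:
    """Simplified stand-in for objdet.pipelines.job.Job."""

    job_type: str
    config: dict
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: str = "pending"

    @classmethod
    def from_dict(cls, data):
        # Keep only keys that correspond to declared dataclass fields,
        # so stale or extra keys in stored job records are ignored.
        names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in names})


job = Job.from_dict({"job_type": "train", "config": {}, "status": "queued"})
```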
JobDAG¶
Directed Acyclic Graph for managing job dependencies.
- class objdet.pipelines.job.JobDAG(jobs=<factory>)[source]¶
Bases: object

Directed Acyclic Graph of jobs with dependencies.
Manages job execution order based on dependencies.
from objdet.pipelines.job import Job, JobDAG, JobType
dag = JobDAG()
# Create jobs with dependencies
preprocess_job = Job(job_type=JobType.PREPROCESS, config={"input_dir": "/data"})
train_job = Job(
job_type=JobType.TRAIN,
config={"config_path": "train.yaml"},
dependencies=[preprocess_job.id],
)
dag.add_job(preprocess_job)
dag.add_job(train_job)
# Get execution order
order = dag.get_execution_order()
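get_execution_order must return jobs in dependency order, i.e. a topological sort of the DAG. The self-contained sketch below shows the idea with Kahn's algorithm over plain job-ID mappings; execution_order is illustrative and not part of the objdet API, which operates on Job objects:

```python
from collections import deque


def execution_order(deps):
    """Topologically sort jobs given deps: job_id -> list of prerequisite IDs."""
    indegree = {job: len(d) for job, d in deps.items()}
    dependents = {job: [] for job in deps}
    for job, d in deps.items():
        for dep in d:
            dependents[dep].append(job)

    # Start from jobs with no unmet dependencies.
    ready = deque(job for job, n in indegree.items() if n == 0)
    order = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for nxt in dependents[job]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(deps):
        raise ValueError("dependency cycle detected")
    return order
```

The standard library's graphlib.TopologicalSorter provides the same functionality if you need it outside the pipeline.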
Celery Tasks¶
train_model¶
- objdet.pipelines.tasks.train_model(config_path, output_dir, checkpoint=None, max_epochs=None, devices=1, accelerator='auto', extra_args=None)¶
Train a detection model.
This task runs a full training loop using Lightning Trainer.
- Parameters:
  - self – Celery task instance (bound task).
  - config_path (str) – Path to LightningCLI config YAML.
  - output_dir (str) – Directory for outputs (checkpoints, logs).
  - checkpoint (str | None) – Optional path to resume from checkpoint.
  - devices (int) – Number of devices to use.
  - accelerator (str) – Accelerator type.
  - extra_args (dict[str, Any] | None) – Additional CLI arguments.
- Return type:
dict[str, Any]
- Returns:
Dictionary with training results (best metrics, checkpoint path).
export_model¶
- objdet.pipelines.tasks.export_model(checkpoint_path, output_path, export_format='onnx', input_shape=(1, 3, 640, 640))¶
Export a trained model to optimized format.
- Parameters:
  - checkpoint_path (str) – Path to the trained model checkpoint.
  - output_path (str) – Destination path for the exported model.
  - export_format (str) – Export format (default “onnx”).
  - input_shape (tuple[int, int, int, int]) – Example input shape (default (1, 3, 640, 640)).
- Return type:
dict[str, Any]
- Returns:
Dictionary with export results.
preprocess_data¶
- objdet.pipelines.tasks.preprocess_data(input_dir, output_dir, format_name, num_workers=4, class_names=None)¶
Preprocess dataset to LitData format.
- Parameters:
  - input_dir (str) – Directory containing the raw dataset.
  - output_dir (str) – Directory for the preprocessed LitData output.
  - format_name (str) – Name of the input dataset format.
  - num_workers (int) – Number of worker processes (default 4).
  - class_names (list[str] | None) – Optional list of class names.
- Return type:
dict[str, Any]
- Returns:
Dictionary with preprocessing results.