nbatch

Flexible batch processing utilities

  • Tim Monko


Lightweight batch processing utilities for the ndev-kit ecosystem.

nbatch provides a foundation for batch processing operations. It's designed to work seamlessly with napari plugins but has no napari or Qt dependencies.

Features

  • @batch decorator - Transform single-item functions into batch-capable functions
  • BatchContext - Track progress through batch operations
  • BatchRunner - Orchestrate batch operations with threading, progress callbacks, and cancellation
  • discover_files() - Flexible file discovery with natural sorting (like file explorers)
  • batch_logger - Scoped logging for batch operations with headers/footers
  • Minimal dependencies - Only requires natsort for natural file ordering
  • Optional napari integration - Uses napari's threading when available, falls back to standard threads

Installation

pip install nbatch

For development:

pip install -e . --group dev

Quick Start

Basic Batch Processing

The @batch decorator transforms a function that processes a single item into one that handles both single items and batches:

from pathlib import Path
from nbatch import batch

@batch
def process_image(path: Path) -> str:
    # Your processing logic here
    return path.stem.upper()

# Single item - returns result directly
result = process_image(Path("image.tif"))
# Returns: "IMAGE"

# List of items - returns generator
results = process_image([Path("a.tif"), Path("b.tif")])
list(results)
# Returns: ["A", "B"]

# Directory - discovers files and returns generator
results = process_image(Path("/data/images"))
# Processes all files in directory

Progress Tracking

Use with_context=True to get progress information:

@batch(with_context=True)
def process_image(path: Path) -> str:
    return path.stem

for result, ctx in process_image(files):
    print(f"{ctx.progress:.0%} complete: {result}")
    # 10% complete: image1
    # 20% complete: image2
    # ...

The BatchContext provides:

  • ctx.index - Zero-based index of current item
  • ctx.total - Total number of items
  • ctx.item - The current item being processed
  • ctx.progress - Progress as fraction (0.0 to 1.0)
  • ctx.is_first / ctx.is_last - Boolean flags
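The context fields follow straightforward arithmetic. As a minimal stdlib sketch of the same semantics (an illustration only, not nbatch's actual implementation):

```python
from dataclasses import dataclass
from typing import Any

# Illustrative stand-in mirroring the documented BatchContext semantics.
@dataclass(frozen=True)
class Ctx:
    index: int   # zero-based position of the current item
    total: int   # total number of items in the batch
    item: Any    # the item currently being processed

    @property
    def progress(self) -> float:
        # Fraction complete after this item finishes: (index + 1) / total
        return (self.index + 1) / self.total

    @property
    def is_first(self) -> bool:
        return self.index == 0

    @property
    def is_last(self) -> bool:
        return self.index == self.total - 1

items = ["a.tif", "b.tif", "c.tif", "d.tif"]
contexts = [Ctx(i, len(items), it) for i, it in enumerate(items)]
print([f"{c.progress:.0%}" for c in contexts])  # ['25%', '50%', '75%', '100%']
print(contexts[0].is_first, contexts[-1].is_last)  # True True
```

The `is_first` / `is_last` flags are handy for one-time setup or teardown inside a loop, such as writing a header row before the first result.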

Error Handling

Control how errors are handled with on_error:

# 'raise' (default) - Re-raise exceptions immediately
@batch(on_error='raise')
def strict_process(path): ...

# 'continue' - Log error and yield None for failed items
@batch(on_error='continue')
def lenient_process(path): ...
# Results: ["good", None, "ok"]

# 'skip' - Log error and skip failed items entirely
@batch(on_error='skip')
def skip_errors(path): ...
# Results: ["good", "ok"]
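The difference between the two lenient modes is whether a failed item leaves a placeholder in the results. A stdlib sketch of the semantics (a hypothetical helper for illustration, not nbatch's internals):

```python
# Hypothetical illustration of on_error='continue' vs on_error='skip'.
def run_batch(func, items, on_error="raise"):
    for item in items:
        try:
            yield func(item)
        except Exception:
            if on_error == "raise":
                raise
            if on_error == "continue":
                yield None  # placeholder keeps results aligned with inputs
            # 'skip': yield nothing for the failed item

def shout(name):
    if name == "bad":
        raise ValueError(name)
    return name.upper()

items = ["good", "bad", "ok"]
print(list(run_batch(shout, items, on_error="continue")))  # ['GOOD', None, 'OK']
print(list(run_batch(shout, items, on_error="skip")))      # ['GOOD', 'OK']
```

In short: use `'continue'` when you need results to stay index-aligned with inputs, and `'skip'` when you only care about the successes.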

File Discovery

Control which files are processed:

# Custom glob patterns
@batch(patterns='*.tif')
def process_tiffs(path): ...

# Multiple patterns
@batch(patterns=['*.tif', '*.tiff', '*.png'])
def process_images(path): ...

# Non-recursive (top-level only)
@batch(recursive=False)
def process_top_level(path): ...

Or use discover_files() directly:

from nbatch import discover_files

# From directory with patterns
files = discover_files("/data/images", patterns=["*.tif", "*.png"])

# From explicit list
files = discover_files([path1, path2, path3])
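Natural sorting is what makes `img10.tif` come after `img2.tif` rather than before it, matching how file explorers order names. nbatch delegates this to natsort; the effect can be sketched with a stdlib key function:

```python
import re

def natural_key(name: str):
    # Split into digit and non-digit runs so numbers compare numerically.
    return [int(part) if part.isdigit() else part.lower()
            for part in re.split(r"(\d+)", name)]

names = ["img10.tif", "img2.tif", "img1.tif"]
print(sorted(names))                   # lexicographic: ['img1.tif', 'img10.tif', 'img2.tif']
print(sorted(names, key=natural_key))  # natural: ['img1.tif', 'img2.tif', 'img10.tif']
```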

Logging

Use batch_logger for structured logging. By default, it outputs to the console (stderr). Optionally log to a file:

from nbatch import batch, batch_logger

@batch(with_context=True)
def process(path):
    return path.stem

# Console only (default)
with batch_logger() as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

# With file logging (appends by default)
with batch_logger(log_file="output/process.log", header={"Files": 100}) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")
        # Or use log.info(), log.warning(), log.error()

# File only (no console output)
with batch_logger(log_file="output/quiet.log", console=False) as log:
    for result, ctx in process(files):
        log(ctx, f"Processed: {result}")

Log file output:

============================================================
Batch processing started at 2025-01-29 10:30:00
------------------------------------------------------------
Files: 100
============================================================
2025-01-29 10:30:01 - INFO - [1/100] image1.tif - Processed: image1
2025-01-29 10:30:02 - INFO - [2/100] image2.tif - Processed: image2
...
============================================================
Batch processing completed at 2025-01-29 10:35:00
============================================================

Integration with napari

Using BatchRunner (Recommended)

BatchRunner provides clean orchestration for widgets with threading, progress callbacks, and cancellation:

from nbatch import batch, BatchRunner

# Define your processing function (pure, testable)
@batch(on_error='continue')
def process_image(path, model, output_dir):
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget class
class MyWidget:
    def __init__(self, viewer):
        self._viewer = viewer
        
        # Create runner once - reusable for all batches
        self.runner = BatchRunner(
            on_start=self._on_batch_start,
            on_item_complete=self._on_item_complete,
            on_complete=self._on_batch_complete,
            on_error=self._on_item_error,
            on_cancel=self._on_cancelled,
        )
        
        self._run_button.clicked.connect(self.run_batch)
        self._cancel_button.clicked.connect(self.runner.cancel)
    
    def _on_batch_start(self, total):
        """Called when batch starts with total item count."""
        self._progress_bar.setValue(0)
        self._progress_bar.setMaximum(total)
    
    def _on_item_complete(self, result, ctx):
        """Called after each item completes."""
        self._progress_bar.setValue(ctx.index + 1)
        # Optionally add result to viewer
        if result is not None:
            self._viewer.add_image(result, name=f"Result {ctx.index}")
    
    def _on_batch_complete(self):
        errors = self.runner.error_count
        if errors > 0:
            self._progress_bar.label = f"Done with {errors} errors"
        else:
            self._progress_bar.label = "Complete!"
    
    def _on_item_error(self, ctx, exception):
        self._progress_bar.label = f"Error on {ctx.item.name}"
    
    def _on_cancelled(self):
        self._progress_bar.label = "Cancelled"
    
    def run_batch(self):
        """Triggered by 'Run' button - just one line!"""
        self.runner.run(
            process_image,
            self.files,
            model=self.model,
            output_dir=self.output_dir,
            log_file=self.output_dir / "batch.log",
        )

Using @thread_worker directly

For more control, use napari's @thread_worker with the @batch decorator:

from napari.qt.threading import thread_worker
from nbatch import batch, batch_logger

@batch(with_context=True, on_error='continue')
def process_image(path, model, output_dir):
    # Your processing logic
    result = model.predict(load_image(path))
    save_result(result, output_dir / path.name)
    return result

# In your widget
def run_batch(self):
    @thread_worker
    def _run():
        with batch_logger(log_file=self.output_dir / 'log.txt') as log:
            for result, ctx in process_image(
                self.input_dir,
                model=self.model,
                output_dir=self.output_dir,
            ):
                log(ctx, f"Processed: {ctx.item.name}")
                yield ctx  # Enables progress updates
    
    worker = _run()
    worker.yielded.connect(
        lambda ctx: self.progress_bar.setValue(int(ctx.progress * 100))
    )
    worker.start()

API Reference

@batch Decorator

@batch(
    on_error: Literal['raise', 'continue', 'skip'] = 'raise',
    with_context: bool = False,
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
)

BatchContext

@dataclass(frozen=True)
class BatchContext:
    index: int      # Zero-based index
    total: int      # Total items
    item: Any       # Current item
    
    @property
    def progress(self) -> float: ...  # (index + 1) / total
    @property
    def is_first(self) -> bool: ...   # index == 0
    @property
    def is_last(self) -> bool: ...    # index == total - 1

discover_files()

def discover_files(
    source: str | Path | Iterable[str | Path],
    patterns: str | Sequence[str] = '*',
    recursive: bool = False,
) -> list[Path]: ...

batch_logger

@contextmanager
def batch_logger(
    log_file: str | Path | None = None,  # Optional file path
    header: Mapping[str, object] | None = None,  # Metadata to write at start
    level: int = logging.INFO,
    console: bool = True,  # Output to stderr
    file_mode: Literal['w', 'a'] = 'a',  # Append by default
) -> Generator[BatchLogger, None, None]: ...

BatchRunner

class BatchRunner:
    def __init__(
        self,
        on_start: Callable[[int], None] | None = None,
        on_item_complete: Callable[[Any, BatchContext], None] | None = None,
        on_complete: Callable[[], None] | None = None,
        on_error: Callable[[BatchContext, Exception], None] | None = None,
        on_cancel: Callable[[], None] | None = None,
    ): ...
    
    def run(
        self,
        func: Callable,
        items: Any,
        *args,
        threaded: bool = True,
        log_file: str | Path | None = None,
        log_header: Mapping[str, object] | None = None,
        patterns: str | Sequence[str] = '*',
        recursive: bool = False,
        **kwargs,  # Passed to func!
    ) -> None: ...
    
    def cancel(self) -> None: ...
    
    @property
    def is_running(self) -> bool: ...
    
    @property
    def was_cancelled(self) -> bool: ...
    
    @property
    def error_count(self) -> int: ...  # Errors in current/last batch

Contributing

Contributions are welcome! Please ensure tests pass before submitting a pull request:

pytest --cov=src/nbatch

License

Distributed under the terms of the BSD-3 license.

Part of ndev-kit

nbatch is part of the ndev-kit ecosystem for no-code bioimage analysis in napari.

Version:

  • 0.0.3

Last updated:

  • 2025-12-01

First released:

  • 2025-12-01

Requirements:

  • natsort
  • napari; extra == "napari"
  • pyqt6; extra == "qtpy-backend"
  • nbatch[napari]; extra == "all"
  • nbatch[qtpy-backend]; extra == "all"