Agent Skill · NVIDIA NIM

dali-dynamic-mode

DALI imperative dynamic mode (`nvidia.dali.experimental.dynamic`, ndd): use when working on ndd code or migrating pipelines; skip pipeline-only tasks.

Provider: NVIDIA NIM Path in repo: plugins/nvidia-skills/skills/dali-dynamic-mode/SKILL.md

Skill body

DALI Dynamic Mode

Purpose

Guide AI agents in writing, reviewing, and migrating code that uses DALI’s imperative dynamic-mode API, nvidia.dali.experimental.dynamic (ndd).

Instructions

Prerequisites

Introduction

Dynamic mode is DALI’s imperative Python API. It lets code call DALI operators directly from normal Python control flow instead of building and running a pipeline graph.

Core Data Types

Tensor – single sample

t = ndd.tensor(data)           # copy
t = ndd.as_tensor(data)        # wrap, no copy if possible
t.cpu()                        # move to CPU
t.gpu()                        # move to GPU
t.torch(copy=False)            # conversion to PyTorch tensor with no copy (default)
t[1:3]                         # slicing supported
np.asarray(t)                  # NumPy via __array__ (CPU only)

Supports __dlpack__, __cuda_array_interface__, __array__, arithmetic operators.

Batch – collection of samples (variable shapes OK)

b = ndd.batch([arr1, arr2])    # copy
b = ndd.as_batch(data)         # wrap, no copy if possible

Batch has no __getitem__batch[i] raises TypeError because indexing is ambiguous (sample selection vs. per-sample slicing). Use the explicit APIs instead:

Intent Method Returns
Get sample i batch.tensors[i] Tensor
Get subset of samples batch.tensors[slice_or_list] Batch
Slice within each sample batch.slice[...] Batch (same batch_size)
Sample-wise slicing batch.slice[batch_of_indices] Batch (same batch_size)

.tensors[] picks which samples. .slice indexes inside each sample.

xy = ndd.random.uniform(batch_size=16, range=[0, 1], shape=2)
crop_x = xy.slice[0]       # Batch of 16 scalars, first element from each sample
crop_y = xy.slice[1]       # Batch of 16 scalars, second element from each sample
sample_0 = xy.tensors[0]   # Tensor, the entire first sample [x, y]

Advanced slicing

The .slice[] API accepts batches of indices, allowing the user to mix and match batches and scalar values, e.g.:

imgs = ndd.imread(filenames)  # a batch of images, if `filenames` is a list
sliced = imgs.slice[
    42 :  # the range start is broadcast to all samples
    ndd.batch(imgs.shape).slice[0] // 2  # per-sample range stop (half of each image)
]

PyTorch conversion:

Iteration: for sample in batch: yields Tensors.

Readers

Readers are stateful objects – create once, reuse across epochs. This matters because readers track internal state like shuffle order and shard position.

reader = ndd.readers.File(file_root=image_dir, random_shuffle=True)

for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        # jpegs, labels are Batch objects
        ...

Key points:

Sharded reading for distributed training:

reader = ndd.readers.File(
    file_root=image_dir,
    shard_id=rank, num_shards=world_size,
    stick_to_shard=True,
    pad_last_batch=True,
)

Device Handling

Execution Model

Default mode is eager – async execution in a background thread, returns immediately.

No .evaluate() needed in most cases. Any data consumption (.torch(), __dlpack__, __array__, .shape, property access, iteration) triggers evaluation automatically.

For debugging, switch to synchronous mode so errors surface at the exact call site rather than later in the async queue:

with ndd.EvalMode.sync_cpu:
    images = ndd.decoders.image(jpegs, device="gpu")
    images = ndd.resize(images, size=[224, 224])
    # Any error surfaces here, at the exact op that failed

Modes (increasing synchronicity): deferred < eager < sync_cpu < sync_full

Use EvalMode.sync_full for debugging instead of scattering .evaluate() calls – it’s cleaner and catches all issues at once. sync_cpu is often sufficient and lighter than sync_full.

Thread Configuration

ndd.set_num_threads(4)  # Call once at startup, only if necessary to override the defaults

Controls DALI’s internal worker threads for CPU operators. Defaults to CPU affinity count or DALI_NUM_THREADS env var. Unrelated to Python-level threading.

RNG

Two approaches (use one, not both):

# Approach 1: set the thread-local default seed (simple, good enough for most cases)
ndd.random.set_seed(42)
angles = ndd.random.uniform(batch_size=64, range=(-30, 30))

# Approach 2: explicit RNG object (finer control, pass rng= to each op)
rng = ndd.random.RNG(seed=42)
values = ndd.random.uniform(batch_size=64, range=[0, 1], shape=2, rng=rng)

When rng= is passed to a random op, the explicit RNG overrides the default seed. Thread-local: each thread has independent random state.

Random ops need an explicit batch_size when working with batches – there is no pipeline-level batch size to inherit.

Checkpointing

Dynamic mode has no pipeline-level checkpoint. Checkpoints aggregate the state of individual stateful objects: readers and RNG instances. Stateless ops (decoders, resize, rotate, normalize, …) are not part of a checkpoint.

ckpt = ndd.checkpoint.Checkpoint()
ckpt.register(reader, "my_reader")
ckpt.register(rng, "rng")

# ... iterate for a while ...

ckpt.collect()                       # snapshot the registered objects
ckpt.save("ckpt_{seq:04d}.json")     # writes ckpt_0000.json, ckpt_0001.json, ...

Restoring is the symmetric operation – build a fresh reader and RNG, then load + register. The loaded state is applied to each object at register time:

reader = ndd.readers.File(file_root=..., enable_checkpointing=True, name="my_reader")
rng = ndd.random.RNG()

ckpt = ndd.checkpoint.Checkpoint()
ckpt.load("ckpt_{seq:04d}.json")     # picks the highest sequence number
ckpt.register(reader, "my_reader")   # state applied here
ckpt.register(rng, "rng")            # ditto

for batch in reader.next_epoch(batch_size=N):
    ...  # produces the next batch after the checkpointed iteration

Key rules:

Manual get_state / set_state is also available directly on each Reader and RNG – the Checkpoint aggregator is built on top of it. Use the manual API only when integrating with an external checkpoint system.

Examples

Image Classification Pipeline

import nvidia.dali.experimental.dynamic as ndd

reader = ndd.readers.File(file_root="/data/imagenet/train", random_shuffle=True)

for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        images = ndd.decoders.image(jpegs, device="gpu")
        images = ndd.resize(images, size=[224, 224])
        images = ndd.crop_mirror_normalize(
            images,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        )
        train_step(images.torch(), labels.torch())

Common Mistakes

Wrong Right Why
device="mixed" device="gpu" "mixed" is pipeline mode only
batch[i] batch.tensors[i] Batch has no __getitem__
batch.tensors[0] for per-sample slicing batch.slice[0] .tensors pick samples; .slice slices within each sample
.evaluate() after every op Let consumption trigger eval .torch(), .shape, etc. trigger it automatically
.cpu() before GPU model .torch() directly Avoids wasteful D2H + H2D round-trip
Recreate reader each epoch reader.next_epoch() Readers are stateful – create once, reuse
ndd.readers.file(...) ndd.readers.File(...) Reader classes are PascalCase
break from next_epoch() loop Exhaust iterator or create new reader Iterator must be fully consumed before next next_epoch()
No batch_size to random ops ndd.random.uniform(batch_size=N, ...) No pipeline-level batch size to inherit
register(reader) after first next_epoch to restore Register the freshly built reader before the first iteration Reader state can only be applied before the prefetch thread starts
Restoring into a reader built without enable_checkpointing=True after iteration Pass enable_checkpointing=True at construction (or register before first iteration) Backend doesn’t keep snapshots otherwise
Spelling out default argument values Skip default argument values Very high Python-side overhead, especially when the argument accepts Tensors/Batches. Skipping arguments uses a fast path, actually passing a sentinel value.

Pipeline Mode Migration

Pipeline Mode Dynamic Mode
@pipeline_def / pipe.build() / pipe.run() Direct function calls in a loop
fn.readers.file(...) ndd.readers.File(...) (PascalCase, stateful)
fn.decoders.image(jpegs, device="mixed") ndd.decoders.image(jpegs, device="gpu")
fn.op_name(...) ndd.op_name(...)
Pipeline-level batch_size=64 reader.next_epoch(batch_size=64) + random ops batch_size=64
Pipeline-level seed=42 ndd.random.set_seed(42) or ndd.random.RNG(seed=42)
Pipeline-level num_threads=4 ndd.set_num_threads(4) at startup
output.at(i) batch.tensors[i]
output.as_cpu() batch.cpu()
pipe.run() returns tuple of TensorList reader.next_epoch(batch_size=N) yields tuples of Batch
Pipeline(..., enable_checkpointing=True) + pipe.checkpoint() / pipeline(checkpoint=...) ndd.checkpoint.Checkpoint + per-object register / collect / save / load; readers opt in with enable_checkpointing=True

Limitations

Dynamic mode is more flexible than pipeline mode, but can have slightly worse performance. For maximum throughput, prefer pipeline mode.

Troubleshooting

Skill frontmatter

license: Apache-2.0 metadata: {"author" => "DALI Team ", "tags" => ["dali", "dynamic-mode", "ndd", "data-loading", "data-processing", "gpu-processing"], "languages" => ["python"], "team" => "dali", "domain" => "deep-learning"}