Run Inference

To run inference with Datature Vi, call your loaded ViModel instance with a source file (image or video), URL, data URI, or folder. The SDK returns a (result, error) tuple by default, or a streaming iterator when you set stream=True.

Before You Start
  • A loaded model from Datature Vi or HuggingFace
  • Images in a supported format: .jpg, .jpeg, .png, .bmp, .gif, .tiff, .tif, .webp
  • For video, a file or URL whose path ends in a supported extension (for example .mp4, .mov, .webm, .mkv, .avi), or a data:video/... data URI
  • Familiarity with streaming vs non-streaming modes

Learn how to load models →

Streaming vs non-streaming

Non-streaming (default)

Calling model(...) without stream=True returns a (result, error) tuple once generation completes. This is the right mode for batch processing, automated pipelines, and any situation where you need explicit error handling.

result, error = model(
    source="image.jpg",
    user_prompt="Describe this image"
    # stream=False is the default
)

if error is None:
    print(result.result)
else:
    print(f"Error: {error}")

Streaming

Pass stream=True to receive tokens as they are generated. Iterate over the returned stream object, then call .get_final_completion() for the full structured result.

stream = model(
    source="image.jpg",
    user_prompt="Describe this image",
    stream=True
)

for token in stream:
    print(token, end="", flush=True)

result = stream.get_final_completion()
print(f"\n\n{result.result}")

Use streaming when building interactive applications that benefit from displaying partial output as it arrives.

Chain-of-thought (CoT) inference

Pass cot=True on model(...) to turn on chain-of-thought (CoT) mode at inference time. The SDK appends the CoT system prompt suffix and uses structured decoding so the model emits a <think> block followed by an <answer> block. Parsed fields such as thinking and the task-specific answer are populated the same way as for a model trained on CoT-style data.

If you pass generation_config as a dict and omit max_new_tokens, the SDK raises the generation cap to 4096 tokens while cot=True so the reasoning steps fit. If you set your own cap, that value is kept (see the second snippet below).

result, error = model(
    source="image.jpg",
    user_prompt="How many vehicles are visible? Show your reasoning.",
    cot=True,
    stream=False,
)

if error is None:
    if result.thinking:
        print("Reasoning:", result.thinking)
    # VQA: use result.result.answer; other task types differ (see prediction schemas)
    print("Answer:", result.result.answer)

Concepts: chain-of-thought reasoning · Prediction schemas: thinking and raw output

Single image inference

Basic usage

result, error = model(
    source="/path/to/image.jpg",
    user_prompt="What objects are in this image?"
)

if error is None:
    print(f"Result: {result.result}")
else:
    print(f"Error: {error}")

With generation config

Control output length, randomness, and sampling strategy:

result, error = model(
    source="image.jpg",
    user_prompt="Describe this image in detail",
    generation_config={
        "max_new_tokens": 256,
        "temperature": 0.7,
        "top_p": 0.9
    }
)

Learn about generation parameters →

Supported image sources

# Relative path
result, error = model(source="./images/photo.jpg")

# Absolute path
result, error = model(source="/home/user/images/photo.jpg")

# Home directory shorthand
result, error = model(source="~/Pictures/photo.png")

Supported formats: .jpg, .jpeg, .png, .bmp, .gif, .tiff, .tif, .webp
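
As noted in the introduction, source also accepts URLs and data URIs. A hedged sketch, assuming HTTP(S) image URLs and data:image/... URIs are handled the same way as local paths (the URL and base64 payload below are placeholders):

# Remote image over HTTP(S) (placeholder URL)
result, error = model(source="https://example.com/photo.jpg")

# Inline image as a data URI (truncated placeholder payload)
result, error = model(source="data:image/png;base64,iVBORw0KGgo...")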

Video inference

For models that use the Qwen-VL-style predictor pipeline (Qwen2.5-VL, Qwen3-VL, Qwen3.5, Cosmos Reason1, Cosmos Reason2, InternVL 3.5, LLaVA-NeXT), video inference works the same way as image inference: pass a video path, a video URL, or a base64 data:video/... URI as source. Streaming, generation_config, prompts, and error handling all work the same way.

The SDK detects video from the file extension (for URLs, from the path before any query string). NVILA and DeepSeek OCR predictors accept images only; passing video to them raises an error that names the supported model families.
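
A basic video call looks the same as an image call; only the source changes. For instance, with a local clip or a remote URL (the URL below is a placeholder):

# Local video file
result, error = model(
    source="clip.mp4",
    user_prompt="Summarize what happens in this video.",
)

# Remote video over HTTP(S) (placeholder URL)
result, error = model(
    source="https://example.com/clip.mp4",
    user_prompt="Summarize what happens in this video.",
)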

Frame sampling (fps)

Pass fps as a keyword argument on model(...), not inside generation_config. It sets how many frames per second of wall-clock video are sampled for the processor. It applies only when the input is detected as video. The default is 4.0.

result, error = model(
    source="clip.mp4",
    user_prompt="Describe what happens in this clip.",
    fps=2.0,
    stream=False,
)

Task type for video

If the model was trained for freeform or generic tasks, the SDK switches to video-freeform when the source is video. You can still pass task_type="video-freeform" to force that mode when needed (see the ViModel docstring in the SDK for overrides).
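
For example, to force the video task type explicitly (a minimal sketch; the automatic switch described above normally makes this unnecessary):

result, error = model(
    source="clip.mp4",
    user_prompt="Describe the main activity in this clip.",
    task_type="video-freeform",  # explicit override; selected automatically for video sources otherwise
)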

Batch folders and videos

When source is a directory, the SDK expands it to image files only. To batch several videos, pass a list of video paths; each item is then processed as an individual inference, as in the sketch below.
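
A hedged sketch of batching videos via an explicit list (file names are placeholders):

video_paths = ["clip1.mp4", "clip2.mov", "clip3.webm"]

results = model(
    source=video_paths,
    user_prompt="Describe what happens in this clip.",
    show_progress=True,
)

for path, (result, error) in zip(video_paths, results):
    if error is None:
        print(f"{path}: {result.result}")
    else:
        print(f"{path} failed: {error}")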

Batch inference

Pass a list of paths or a folder path. For folders, the SDK collects image files and returns one (result, error) tuple per image. Progress display is on by default.

Process a list of files

image_paths = ["image1.jpg", "image2.jpg", "image3.jpg"]

results = model(
    source=image_paths,
    user_prompt="Describe this image",
    show_progress=True
)

for i, (result, error) in enumerate(results):
    if error is None:
        print(f"Image {i+1}: {result.result}")
    else:
        print(f"Image {i+1} failed: {error}")

Process an entire folder

results = model(
    source="./my_images/",
    user_prompt="Describe this image",
    show_progress=True
)

successful = sum(1 for _, e in results if e is None)
print(f"Processed {successful}/{len(results)} images successfully")

Recursive directory search

results = model(
    source="./dataset/",
    user_prompt="What's in this image?",
    recursive=True,
    show_progress=True
)

print(f"Processed {len(results)} images across all subdirectories")

Mix files and folders

results = model(
    source=[
        "./image1.jpg",
        "./folder1/",
        "~/Pictures/photo.png",
        "./dataset/"
    ],
    user_prompt="Analyze this image",
    recursive=False,
    show_progress=True
)

Different prompts per image

images = ["car.jpg", "person.jpg", "building.jpg"]
prompts = [
    "What color is the car?",
    "How many people are visible?",
    "What type of building is this?"
]

results = model(
    source=images,
    user_prompt=prompts,
    show_progress=True
)

for image, prompt, (result, error) in zip(images, prompts, results):
    if error is None:
        print(f"{image}: {result.result}")

When providing a list of prompts, its length must match the number of images. A mismatch raises a ValueError.
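
If you build the prompt list programmatically, a quick pre-flight check surfaces the mismatch before the batch call (a trivial sketch reusing the images and prompts lists above):

if len(prompts) != len(images):
    raise ValueError(f"Expected {len(images)} prompts, got {len(prompts)}")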

Progress tracking

The progress bar shows current count, speed, estimated time remaining, and success/failure counts. Toggle it with show_progress:

# With progress bar (default)
results = model(source=image_list, user_prompt="Describe this", show_progress=True)

# Without progress bar (useful in automated scripts)
results = model(source=image_list, user_prompt="Describe this", show_progress=False)

Example output:

Running batch inference (45 / 100 images)... ━━━━━━╸━━━━━━━━ 45% 0:02:15

Error handling

Consistent error pattern

Non-streaming inference always returns (result, error). Check error is None before accessing result fields.

result, error = model(source="image.jpg")

if error is None:
    print(f"Success: {result.result}")
else:
    print(f"Failed: {error}")

    if isinstance(error, FileNotFoundError):
        print("Image file not found")
    elif "out of memory" in str(error).lower():
        print("GPU out of memory, try quantization")

Batch error handling

Each image in a batch has its own error status. A failure on one image does not stop the rest.

images = ["img1.jpg", "missing.jpg", "img3.jpg"]

results = model(
    source=images,
    user_prompt="Describe this",
    show_progress=True
)

successful = []
failed = []

for img, (result, error) in zip(images, results):
    if error is None:
        successful.append((img, result))
    else:
        failed.append((img, error))

print(f"Successful: {len(successful)}, Failed: {len(failed)}")

Common workflows

Save results to JSON

import json
from vi.inference.task_types.phrase_grounding import PhraseGroundingResponse

results = model(source="./images/", user_prompt="Describe this image")

output_data = []
for result, error in results:
    output_data.append({
        "result": str(result.result) if error is None else None,
        "error": str(error) if error else None,
        "has_grounding": isinstance(result, PhraseGroundingResponse) if error is None else False
    })

with open("results.json", "w") as f:
    json.dump(output_data, f, indent=2)

Process with metadata

from pathlib import Path
from datetime import datetime
from vi.inference.task_types.vqa import VQAResponse
from vi.inference.task_types.phrase_grounding import PhraseGroundingResponse

def get_text(result):
    if isinstance(result, VQAResponse):
        return result.result.answer
    elif isinstance(result, PhraseGroundingResponse):
        return result.result.sentence
    return result.result

image_files = list(Path("./test_images").glob("*.jpg"))
results = model(source=image_files, user_prompt="Describe this image")

output = []
for img_path, (result, error) in zip(image_files, results):
    output.append({
        "filename": img_path.name,
        "timestamp": datetime.now().isoformat(),
        "text": get_text(result) if error is None else None,
        "success": error is None
    })

success_rate = sum(1 for item in output if item["success"]) / len(output)
print(f"Success rate: {success_rate:.1%}")

Retry failed images

def process_with_retry(model, images, max_retries=3):
    results = {}
    remaining = list(images)

    for attempt in range(max_retries):
        if not remaining:
            break

        print(f"Attempt {attempt + 1}/{max_retries}, {len(remaining)} images remaining")

        batch_results = model(
            source=remaining,
            user_prompt="Describe this image",
            show_progress=True
        )

        new_remaining = []
        for img, (result, error) in zip(remaining, batch_results):
            if error is None:
                results[img] = result
            else:
                new_remaining.append(img)

        remaining = new_remaining

    return results, remaining

successful, failed = process_with_retry(model, image_list)
print(f"Successful: {len(successful)}, Failed: {len(failed)}")

Chunked processing for large datasets

import torch
from pathlib import Path

def process_in_chunks(model, image_dir, chunk_size=100):
    all_images = list(Path(image_dir).glob("*.jpg"))
    print(f"Processing {len(all_images)} images in chunks of {chunk_size}")

    all_results = []
    for i in range(0, len(all_images), chunk_size):
        chunk = all_images[i:i + chunk_size]
        results = model(source=chunk, user_prompt="Describe this image", show_progress=True)
        all_results.extend(results)

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return all_results

results = process_in_chunks(model, "./large_dataset", chunk_size=100)

Related resources

Task Types

VQA and phrase grounding: when to use each, prompt guidelines, and response structures.

Handle Results

Access captions and bounding boxes, convert coordinates, visualize predictions.

Improve Performance

Memory management, GPU utilization, and recommended batch sizes.