enrich_many and resolve_many work when the whole list is already in memory. When records arrive from a stream (a CSV cursor, an SQS poller, a Kafka consumer, a generator), you don't want to materialize everything first.

The Batcher and AsyncBatcher helpers let you add() items one at a time. They buffer up to size items, fire the batch, and drain whatever is left on context exit.

Both are generic over the callable. The SDK's own mc.api.enrich and mc.api.resolve fit because they take list[Record], but anything that matches Callable[[list[T]], R] plugs in: a DB upsert, a webhook fan-out, a file writer.
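
To make the buffer, fire, and drain cycle concrete, here is a minimal sketch of the idea. MiniBatcher is a hypothetical stand-in written for illustration, not the SDK's Batcher; in particular, draining only on a clean exit is an assumption of this sketch.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class MiniBatcher(Generic[T, R]):
    """Hypothetical stand-in: buffer items, fire at `size`, drain on exit."""

    def __init__(self, fn: Callable[[list[T]], R], size: int) -> None:
        self.fn = fn
        self.size = size
        self.results: list[R] = []
        self._buffer: list[T] = []

    def add(self, item: T) -> None:
        self._buffer.append(item)
        # Fire as soon as the buffer reaches the configured size.
        if len(self._buffer) >= self.size:
            self.flush()

    def flush(self) -> None:
        if not self._buffer:
            return
        # Swap the buffer out before calling fn so a new batch can start.
        batch, self._buffer = self._buffer, []
        self.results.append(self.fn(batch))

    def __enter__(self) -> "MiniBatcher[T, R]":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Assumption of this sketch: drain the partial batch on a clean exit only.
        if exc_type is None:
            self.flush()


# `len` satisfies Callable[[list[T]], R], so it works as a toy batch callable.
with MiniBatcher(len, size=3) as b:
    for n in range(7):
        b.add(n)

batch_sizes = b.results  # two full batches plus the drained remainder
```

Seven items with size=3 produce two full batches and one partial batch that is only sent because the context manager drains it on exit.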

Sync

from minerva import Minerva, Batcher

mc = Minerva()

with Batcher(mc.api.enrich, size=100) as b:
    for row in iter_csv("leads.csv"):
        b.add(row)

for resp in b.results:
    print(resp.to_df())

Async

from minerva import AsyncMinerva, AsyncBatcher

async with AsyncMinerva() as mc:
    async with AsyncBatcher(mc.api.enrich, size=100) as b:
        async for row in aiter_kafka_topic("leads"):
            await b.add(row)
    for resp in b.results:
        print(resp.to_df())
By default each filled batch fires as a background task: add() returns immediately and the next batch starts buffering while the previous one is in flight. The context manager's exit awaits all outstanding tasks. Set block=True to serialize instead, or max_concurrency=N to cap how many run in parallel.
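
The background-task behavior with a concurrency cap can be sketched with plain asyncio. This is an illustration of the pattern, not the SDK's implementation: run_batches and fake_enrich are hypothetical names, and the semaphore is one common way to get the effect that max_concurrency describes.

```python
import asyncio


async def run_batches(batches, fn, max_concurrency):
    """Fire each batch as a background task, capped by a semaphore."""
    sem = asyncio.Semaphore(max_concurrency)
    tasks = []

    async def guarded(batch):
        # At most `max_concurrency` calls hold the semaphore at once.
        async with sem:
            return await fn(batch)

    for batch in batches:
        # create_task schedules the call and returns immediately.
        tasks.append(asyncio.create_task(guarded(batch)))

    # Analogous to the context manager's exit: wait for everything in flight.
    return await asyncio.gather(*tasks)


running = 0
peak = 0


async def fake_enrich(batch):
    """Hypothetical batch callable that records how many calls overlap."""
    global running, peak
    running += 1
    peak = max(peak, running)
    await asyncio.sleep(0.01)
    running -= 1
    return len(batch)


results = asyncio.run(
    run_batches([[1, 2], [3, 4], [5]], fake_enrich, max_concurrency=2)
)
```

With three batches and a cap of two, the third call waits until one of the first two finishes, so peak never exceeds the cap.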

Custom callables

The batcher is not enrich-specific. Drop in any function that takes a list[T]:
def upsert_to_warehouse(rows: list[dict]) -> int:
    return db.bulk_insert("leads", rows)

with Batcher(
    upsert_to_warehouse,
    size=500,
    on_result=lambda n: log.info(f"wrote {n} rows"),
) as b:
    for row in some_stream():
        b.add(row)
on_result runs after every batch. When set, results are not accumulated on b.results — the callback is the sink.
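
The "callback is the sink" rule amounts to a single branch per batch result. The sketch below shows that routing under stated assumptions; deliver is a hypothetical helper, not an SDK function.

```python
from typing import Any, Callable, Optional


def deliver(
    result: Any,
    results: list,
    on_result: Optional[Callable[[Any], None]] = None,
) -> None:
    """Route one batch result to the callback if set, else accumulate it."""
    if on_result is not None:
        on_result(result)
    else:
        results.append(result)


accumulated: list = []
seen_by_callback: list = []

deliver(500, accumulated)                                     # no callback: kept
deliver(120, accumulated, on_result=seen_by_callback.append)  # callback: not kept
```

This is why a long-running stream with on_result set does not grow an ever-larger results list in memory.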

Custom Minerva endpoints via mc.api.call

If you’re hitting a preview / client-specific endpoint that doesn’t have a typed wrapper yet, wrap mc.api.call in a thin function so it matches the list[T] -> R shape. You get the same authentication, rate limiting, and MinervaTransientError retry as the typed methods:
def post_to_special(records: list[dict]) -> dict:
    return mc.api.call("POST", "/v2/your-special-endpoint", json={"records": records})

with Batcher(post_to_special, size=100) as b:
    for row in iter_stream():
        b.add(row)
Async equivalent:
async with AsyncMinerva() as mc:
    async def post_special(rows: list[dict]) -> dict:
        return await mc.api.call("POST", "/v2/your-special-endpoint", json={"records": rows})

    async with AsyncBatcher(post_special, size=100) as b:
        async for row in aiter_stream():
            await b.add(row)
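
If several such endpoints need wrapping, the thin function can be produced by a small factory. This is a sketch only: as_batch_callable and fake_call are hypothetical names, and fake_call merely mimics the call(method, path, json=...) shape used above so the example runs without a client.

```python
from typing import Any, Callable


def as_batch_callable(call: Callable[..., Any], method: str, path: str):
    """Adapt a generic call(method, path, json=...) to the list[T] -> R shape."""

    def post(records: list[dict]) -> Any:
        return call(method, path, json={"records": records})

    return post


sent = []


def fake_call(method: str, path: str, json: dict) -> dict:
    """Stand-in for mc.api.call; records the request instead of sending it."""
    sent.append((method, path, json))
    return {"accepted": len(json["records"])}


post_to_special = as_batch_callable(fake_call, "POST", "/v2/your-special-endpoint")
reply = post_to_special([{"id": 1}, {"id": 2}])
```

The returned function takes a single list argument, so it can be handed to a batcher in the same way as the hand-written wrapper.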

Pydantic validation at add()

Pass schema= to validate every item against a pydantic model before it’s queued. The original input is still what gets buffered — pydantic is a shape gate, not a transformer.
from minerva import Batcher, EnrichRecord

with Batcher(mc.api.enrich, size=100, schema=EnrichRecord) as b:
    for row in iter_csv("leads.csv"):
        b.add(row)

print(b.invalid_count)   # how many rows failed the schema
print(b.invalid)         # [(item, summary), ...]
Three policies for the failure path:
| on_invalid | Behavior on a row that fails schema.model_validate(item) |
| --- | --- |
| "warn" (default) | Emits MinervaValidationWarning. The row is still added; the API is the final arbiter. |
| "skip" | Emits the warning, drops the row, bumps b.invalid_count. |
| "raise" | Raises MinervaValidationError from add() immediately. |
"warn" is the default because the SDK shouldn’t second-guess the API. If pydantic is stricter than the server, dropping the row hides records the server would have accepted. Flip to "skip" when you’d rather keep the batch clean than ship something that might 4xx, or "raise" when the pipeline should fail fast on bad input. To silence the warning entirely:
import warnings
from minerva import MinervaValidationWarning
warnings.filterwarnings("ignore", category=MinervaValidationWarning)
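
The three policies can be read as one decision made per item. The sketch below models that decision with the standard library only: gate, ValidationWarning, and ValidationError are hypothetical names rather than the SDK's classes, and a plain validate callable stands in for the pydantic model.

```python
import warnings


class ValidationWarning(UserWarning):
    """Hypothetical stand-in for the SDK's validation warning."""


class ValidationError(ValueError):
    """Hypothetical stand-in for the SDK's validation error."""


def gate(item, validate, on_invalid="warn"):
    """Return True if `item` should be buffered under the given policy."""
    try:
        validate(item)
        return True
    except ValueError as exc:
        if on_invalid == "raise":
            raise ValidationError(str(exc)) from exc
        # Both "warn" and "skip" emit the warning; only "warn" keeps the row.
        warnings.warn(f"invalid item: {exc}", ValidationWarning)
        return on_invalid == "warn"


def require_email(item):
    """Toy validator standing in for schema.model_validate."""
    if "email" not in item:
        raise ValueError("missing email")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    kept_warn = gate({}, require_email, "warn")
    kept_skip = gate({}, require_email, "skip")
    kept_ok = gate({"email": "a@example.com"}, require_email, "skip")

warning_count = len(caught)
```

Note that the item itself is passed through unchanged in every branch, which matches the "shape gate, not a transformer" description.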

Error handling

By default, if the batch call raises, the error surfaces from add() or flush() and aborts the loop. For pipelines that should keep going past a failure, pass swallow_errors=True:
with Batcher(mc.api.enrich, size=100, swallow_errors=True) as b:
    for row in iter_csv("leads.csv"):
        b.add(row)

print(f"sent {len(b.results)} batches, {len(b.errors)} failed")
for e in b.errors:
    log.warning("batch failed: %s", e)
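
The difference between the default and swallow_errors=True comes down to what happens in the except branch around each batch call. A minimal sketch, assuming a hypothetical run_batch helper and a toy callable that fails on one batch:

```python
def run_batch(fn, batch, results, errors, swallow_errors=False):
    """Call fn on one batch; collect the exception instead of raising if asked."""
    try:
        results.append(fn(batch))
    except Exception as exc:
        if not swallow_errors:
            raise  # default: the error surfaces to the caller and stops the loop
        errors.append(exc)


def flaky(batch):
    """Toy batch callable that rejects any batch containing a negative number."""
    if any(n < 0 for n in batch):
        raise RuntimeError("bad batch")
    return sum(batch)


results, errors = [], []
for batch in ([1, 2], [-1, 3], [4, 5]):
    run_batch(flaky, batch, results, errors, swallow_errors=True)
```

The loop reaches the third batch even though the second one failed, and the failure is still available for logging afterwards.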

What lives on the batcher

| Attribute | Meaning |
| --- | --- |
| b.results | One entry per successful batch: the callable's return value (unless on_result is set). |
| b.errors | One entry per batch that raised (only populated with swallow_errors=True). |
| b.invalid | (item, summary) tuples for every row that failed schema validation. |
| b.invalid_count | Count of b.invalid. |
| b.buffered | How many items are currently in the buffer. |
| b.in_flight | (AsyncBatcher only) Number of background tasks not yet completed. |

When to reach for enrich_many instead

| You have… | Use |
| --- | --- |
| A finite list in memory | mc.api.enrich_many(records): simpler, fans out concurrently, returns one merged list. |
| A stream / cursor / generator | Batcher / AsyncBatcher. |
| A list, but want callback-per-batch | Batcher with on_result=…. |

Both options go through the same _request_data path, so rate limiting and MinervaTransientError retry apply with either choice.