enrich_many and resolve_many work when you have the whole list in
memory. When records arrive from a stream (a CSV cursor, an SQS poller, a
Kafka consumer, a generator), you don’t want to materialize everything
first. The Batcher and AsyncBatcher helpers let you add() items one
at a time: they buffer up to size items, fire the batch, and drain
whatever’s left when the context manager exits.
They’re generic over the callable. The SDK’s own mc.api.enrich and
mc.api.resolve fit because they take list[Record], but anything that
matches Callable[[list[T]], R] plugs in: a DB upsert, a webhook
fan-out, a file writer.
Sync
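The buffer, fire, drain cycle described above can be sketched in plain Python. SketchBatcher is a hypothetical stand-in, not the SDK’s Batcher: it assumes only the documented behavior (buffer up to size, call the function with the batch, drain the tail on context exit). The file-writer callable and the choice to drain only on a clean exit are illustrative assumptions.

```python
from __future__ import annotations

import io
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class SketchBatcher(Generic[T, R]):
    """Hypothetical stand-in for Batcher: buffer items, call fn(batch) at `size`."""

    def __init__(self, fn: Callable[[list[T]], R], size: int) -> None:
        if size < 1:
            raise ValueError("size must be >= 1")
        self.fn = fn
        self.size = size
        self.results: list[R] = []  # one entry per fired batch
        self._buffer: list[T] = []

    @property
    def buffered(self) -> int:
        return len(self._buffer)

    def add(self, item: T) -> None:
        self._buffer.append(item)
        if len(self._buffer) >= self.size:
            self.flush()  # buffer is full: fire it

    def flush(self) -> None:
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []  # swap before calling fn
        self.results.append(self.fn(batch))

    def __enter__(self) -> SketchBatcher[T, R]:
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        if exc_type is None:
            self.flush()  # drain the partial tail batch


# A file-writer batch callable: one write() call per batch.
out = io.StringIO()


def write_lines(batch: list[str]) -> int:
    return out.write("".join(f"{line}\n" for line in batch))


rows = (f"row-{i}" for i in range(5))  # a generator: never materialized as a list
with SketchBatcher(write_lines, size=2) as b:
    for row in rows:
        b.add(row)

print(b.results)  # [12, 12, 6]
```

Five rows with size=2 produce two full batches plus a one-row tail that only fires on context exit.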
Async
With AsyncBatcher, add() returns immediately and the next batch starts
buffering while the previous one is in flight. Exiting the async context
manager awaits all outstanding tasks. Set block=True to serialize batches
instead, or max_concurrency=N to cap how many run in parallel.
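A minimal asyncio sketch of that behavior, assuming add() is awaited (the SDK’s AsyncBatcher may expose it differently). SketchAsyncBatcher is hypothetical: each full batch becomes a task, max_concurrency is enforced with an asyncio.Semaphore, block=True awaits the batch inline, and exiting the context gathers everything still in flight.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Generic, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class SketchAsyncBatcher(Generic[T, R]):
    """Hypothetical stand-in for AsyncBatcher; batches run as background tasks."""

    def __init__(
        self,
        fn: Callable[[list[T]], Awaitable[R]],
        size: int,
        *,
        block: bool = False,
        max_concurrency: Optional[int] = None,
    ) -> None:
        self.fn = fn
        self.size = size
        self.block = block
        # Caps how many batch calls run at once; None means unbounded.
        self._sem = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
        self._buffer: list[T] = []
        self._tasks: list[asyncio.Task[None]] = []
        self.results: list[R] = []  # appended in completion order

    @property
    def in_flight(self) -> int:
        return sum(1 for t in self._tasks if not t.done())

    async def _run(self, batch: list[T]) -> None:
        if self._sem is None:
            result = await self.fn(batch)
        else:
            async with self._sem:
                result = await self.fn(batch)
        self.results.append(result)

    async def add(self, item: T) -> None:
        self._buffer.append(item)
        if len(self._buffer) >= self.size:
            await self.flush()

    async def flush(self) -> None:
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        if self.block:
            await self._run(batch)  # serialize: wait for this batch to finish
        else:
            # Schedule and return at once; the next batch keeps buffering.
            self._tasks.append(asyncio.create_task(self._run(batch)))

    async def __aenter__(self) -> SketchAsyncBatcher[T, R]:
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        if exc_type is None:
            await self.flush()  # drain the tail batch
        await asyncio.gather(*self._tasks)  # await everything still in flight


async def main() -> tuple[list[int], int]:
    active = peak = 0

    async def slow_batch(batch: list[int]) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for a network round trip
        active -= 1
        return len(batch)

    async with SketchAsyncBatcher(slow_batch, size=2, max_concurrency=2) as b:
        for i in range(7):
            await b.add(i)
    return sorted(b.results), peak


results, peak = asyncio.run(main())
print(results, peak)  # [1, 2, 2, 2] 2
```

Four batches are scheduled, but the semaphore never lets more than two run at once. Because results are appended as batches finish, their order can differ from submission order in this sketch.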
Custom callables
The batcher is not enrich-specific. Drop in any function that takes a list[T].
If you set on_result, it runs after every batch and results are no longer
accumulated on b.results; the callback is the sink.
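As an illustration of a non-enrich callable, the sketch below uses an in-memory SQLite upsert as the batch function. drive() is a hypothetical helper (not an SDK name) that applies the on_result rule stated above: with a callback, each return value goes to the callback; without one, it is collected. The people table is likewise made up for the example.

```python
from __future__ import annotations

import sqlite3
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def drive(
    items: Iterable[T],
    fn: Callable[[list[T]], R],
    size: int,
    on_result: Optional[Callable[[R], None]] = None,
) -> list[R]:
    """Hypothetical helper: batch `items`; route each result to on_result OR a list."""
    results: list[R] = []
    batch: list[T] = []

    def fire(chunk: list[T]) -> None:
        out = fn(chunk)
        if on_result is not None:
            on_result(out)  # the callback is the sink; nothing accumulates
        else:
            results.append(out)  # no callback: keep it, like b.results

    for item in items:
        batch.append(item)
        if len(batch) == size:
            fire(batch)
            batch = []
    if batch:
        fire(batch)  # drain the partial tail
    return results


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, name TEXT)")


def upsert_people(rows: list[tuple[int, str]]) -> int:
    """A DB-upsert batch callable: one executemany and one commit per batch."""
    with conn:  # commits on success, rolls back on error
        conn.executemany("INSERT OR REPLACE INTO people (id, name) VALUES (?, ?)", rows)
    return len(rows)


seen: list[int] = []
rows = [(1, "ada"), (2, "bob"), (1, "ada l."), (3, "cy")]
kept = drive(rows, upsert_people, size=2, on_result=seen.append)
print(seen, kept)  # [2, 2] []
print(conn.execute("SELECT name FROM people WHERE id = 1").fetchone())  # ('ada l.',)
```

Because on_result is set, every batch’s return value lands in seen and the returned list stays empty.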
Custom Minerva endpoints via mc.api.call
If you’re hitting a preview or client-specific endpoint that doesn’t have
a typed wrapper yet, wrap mc.api.call in a thin
function so it matches the list[T] -> R shape. You get the same
authentication, rate limiting, and MinervaTransientError retries as the
typed methods.
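A sketch of such an adapter. This section does not document the signature of mc.api.call, so the stub below (call(method, path, json=...)) and the /v1/preview/score path are hypothetical placeholders. The only point is the shape: one list in, one result out.

```python
from __future__ import annotations

from typing import Any


# Hypothetical stand-in for mc.api.call so the sketch runs offline.
# The real method's parameters are not documented here and may differ.
def call(method: str, path: str, json: dict[str, Any]) -> dict[str, Any]:
    return {"method": method, "path": path, "count": len(json["records"])}


def score_preview(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Thin adapter with the list[T] -> R shape: one batch in, one response out."""
    # "/v1/preview/score" is a placeholder for an endpoint without a typed wrapper.
    return call("POST", "/v1/preview/score", json={"records": records})


print(score_preview([{"id": 1}, {"id": 2}]))
# {'method': 'POST', 'path': '/v1/preview/score', 'count': 2}
```

Because score_preview has the list[T] -> R shape, it can go wherever mc.api.enrich would.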
Pydantic validation at add()
Pass schema= to validate every item against a pydantic model before
it’s queued. The original input is still what gets buffered — pydantic
is a shape gate, not a transformer.
| on_invalid | Behavior on a row that fails schema.model_validate(item) |
|---|---|
| "warn" (default) | Emits MinervaValidationWarning. The row is still added; the API is the final arbiter. |
| "skip" | Emits the warning, drops the row, and bumps b.invalid_count. |
| "raise" | Raises MinervaValidationError from add() immediately. |
"warn" is the default because the SDK shouldn’t second-guess the API.
If pydantic is stricter than the server, dropping the row hides records
the server would have accepted. Flip to "skip" when you’d rather keep
the batch clean than ship something that might 4xx, or "raise" when
the pipeline should fail fast on bad input.
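The three policies can be modeled in a few lines. ValidatingBuffer, PersonSchema, and the two exception classes are hypothetical stand-ins. PersonSchema mimics only pydantic’s model_validate, so the example runs without pydantic installed. The sketch shows the decision logic in the table, not the SDK’s implementation.

```python
from __future__ import annotations

import warnings
from typing import Any


class ValidationWarningSketch(UserWarning):
    """Hypothetical stand-in for MinervaValidationWarning."""


class ValidationErrorSketch(ValueError):
    """Hypothetical stand-in for MinervaValidationError."""


class PersonSchema:
    """Stand-in for a pydantic model: only model_validate() is mimicked."""

    @classmethod
    def model_validate(cls, obj: Any) -> PersonSchema:
        if not isinstance(obj, dict) or not isinstance(obj.get("email"), str):
            raise ValueError("email: expected a string")
        return cls()


class ValidatingBuffer:
    """Hypothetical gate mirroring the on_invalid table (warn / skip / raise)."""

    def __init__(self, schema: Any, on_invalid: str = "warn") -> None:
        if on_invalid not in ("warn", "skip", "raise"):
            raise ValueError(f"unknown on_invalid: {on_invalid!r}")
        self.schema = schema
        self.on_invalid = on_invalid
        self.buffer: list[Any] = []
        self.invalid: list[tuple[Any, str]] = []

    @property
    def invalid_count(self) -> int:
        return len(self.invalid)

    def add(self, item: Any) -> None:
        try:
            self.schema.model_validate(item)  # shape gate; the model is discarded
        except ValueError as exc:
            if self.on_invalid == "raise":
                raise ValidationErrorSketch(str(exc)) from exc
            self.invalid.append((item, str(exc)))
            warnings.warn(str(exc), ValidationWarningSketch, stacklevel=2)
            if self.on_invalid == "skip":
                return  # drop the row
        self.buffer.append(item)  # buffer the original input, not the model


rows = [{"email": "a@example.com"}, {"email": None}]
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warn_buf = ValidatingBuffer(PersonSchema, "warn")
    skip_buf = ValidatingBuffer(PersonSchema, "skip")
    for row in rows:
        warn_buf.add(row)
        skip_buf.add(row)

print(len(warn_buf.buffer), len(skip_buf.buffer), skip_buf.invalid_count, len(caught))
# 2 1 1 2
```

Both "warn" and "skip" emit one warning for the bad row; only "skip" keeps it out of the buffer.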
To silence the warning entirely, filter MinervaValidationWarning with Python’s standard warnings machinery.
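A sketch using only the standard library. The local MinervaValidationWarning class is a stand-in so the example runs on its own. In real code you would import the SDK’s class, whose import path isn’t shown here. warnings.filterwarnings matches by category, so unrelated warnings still surface.

```python
import warnings


class MinervaValidationWarning(UserWarning):
    """Local stand-in; in real code, import the SDK's class instead."""


def noisy() -> None:
    warnings.warn("row failed validation", MinervaValidationWarning)
    warnings.warn("something unrelated", DeprecationWarning)


# Process-wide, you would call this once at startup:
#   warnings.filterwarnings("ignore", category=MinervaValidationWarning)
# Scoped version: the filter is undone when the with-block exits.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # record everything by default...
    warnings.filterwarnings("ignore", category=MinervaValidationWarning)  # ...except this
    noisy()

print([w.category.__name__ for w in caught])  # ['DeprecationWarning']
```

filterwarnings inserts its rule at the front of the filter list, so it takes precedence over the broader "always" rule added just before it.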
Error handling
By default, if the batch call raises, the error surfaces from add() or
flush() and aborts the loop. For pipelines that should keep going past
a failure, pass swallow_errors=True; each failed batch is then recorded on b.errors instead.
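The difference between the two modes fits in one loop. run_batches() is a hypothetical helper. Storing (batch, exception) pairs in errors is this sketch’s choice, since the shape of a b.errors entry isn’t specified here.

```python
from __future__ import annotations

from typing import Any, Callable, Iterable


def run_batches(
    batches: Iterable[list[Any]],
    fn: Callable[[list[Any]], Any],
    *,
    swallow_errors: bool = False,
) -> tuple[list[Any], list[tuple[list[Any], Exception]]]:
    """Hypothetical loop contrasting the two error modes."""
    results: list[Any] = []
    errors: list[tuple[list[Any], Exception]] = []
    for batch in batches:
        try:
            results.append(fn(batch))
        except Exception as exc:
            if not swallow_errors:
                raise  # default: surface the error and abort the loop
            errors.append((batch, exc))  # record it and move on to the next batch
    return results, errors


def flaky(batch: list[int]) -> int:
    if 13 in batch:
        raise RuntimeError("upstream returned 500")
    return sum(batch)


results, errors = run_batches([[1, 2], [13, 14], [5]], flaky, swallow_errors=True)
print(results, [b for b, _ in errors])  # [3, 5] [[13, 14]]
```

Without swallow_errors, the same input would stop at the second batch and the third would never run.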
What lives on the batcher
| Attribute | Meaning |
|---|---|
| b.results | The callable’s return value, one entry per successful batch (not populated when on_result is set). |
| b.errors | One entry per batch that raised (populated only with swallow_errors=True). |
| b.invalid | (item, summary) tuples for every row that failed schema validation. |
| b.invalid_count | Number of entries in b.invalid. |
| b.buffered | Number of items currently in the buffer. |
| b.in_flight | AsyncBatcher only: number of background tasks not yet completed. |
When to reach for enrich_many instead
| You have… | Use |
|---|---|
| A finite list in memory | mc.api.enrich_many(records) — simpler, fans out concurrently, returns one merged list. |
| A stream / cursor / generator | Batcher / AsyncBatcher. |
| A list, but want callback-per-batch | Batcher with on_result=…. |
Both options go through the same _request_data path, so rate limiting and
MinervaTransientError retries compose with either choice.
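For contrast, "fans out concurrently, returns one merged list" can be pictured as: chunk the list, run the chunks concurrently, then flatten in order. fan_out() and fake_enrich() are hypothetical. This section doesn’t document enrich_many’s real chunk size, concurrency limit, or error handling, and the sketch assumes each chunk call returns a list.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def fan_out(
    records: list[T],
    fn: Callable[[list[T]], Awaitable[list[U]]],
    size: int,
    max_concurrency: int = 4,
) -> list[U]:
    """Hypothetical enrich_many-style helper: chunk, run concurrently, merge."""
    chunks = [records[i : i + size] for i in range(0, len(records), size)]
    sem = asyncio.Semaphore(max_concurrency)

    async def run(chunk: list[T]) -> list[U]:
        async with sem:
            return await fn(chunk)

    # gather() returns results in argument order, so the merge keeps input order.
    per_chunk = await asyncio.gather(*(run(c) for c in chunks))
    return [row for part in per_chunk for row in part]


async def fake_enrich(chunk: list[str]) -> list[str]:
    await asyncio.sleep(0)  # stand-in for the API call
    return [name.upper() for name in chunk]


merged = asyncio.run(fan_out(["ada", "bob", "cy"], fake_enrich, size=2))
print(merged)  # ['ADA', 'BOB', 'CY']
```

The trade-off from the table shows up directly: fan_out needs the whole list before it can chunk it, whereas a batcher accepts items as they arrive.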