API reference¶
Every public symbol the SDK exports. Importable from augur_sdk.
DebugSession¶
The capture controller. Use as a context manager.
class DebugSession:
def __init__(
self,
*,
run_id: str,
client_name: str,
out_dir: str | Path,
client_version: str | None = None,
client_git_sha: str | None = None,
capture_mode: CaptureMode | str | None = None,
debug_session_id: str | None = None,
redaction_policy: RedactionPolicy | None = None,
store: Store | None = None,
tags: dict[str, str] | None = None,
started_at: str | None = None,
dsn: str | None = None,
): ...
def record_step(self, step: StepTrace) -> None: ...
def record_event(self, event: DecisionEvent) -> None: ...
def attach_observation(
self, *, step_index: int, kind: Literal["pre", "post"], png_bytes: bytes,
) -> str: ...
def set_status(self, status: str) -> None: ...
def add_tag(self, key: str, value: str) -> None: ...
def set_live_endpoints(
self, *, status_url: str | None = None,
video_url: str | None = None, reasoning_url: str | None = None,
) -> None: ...
def close(self, status: str | None = None) -> BundleManifest: ...
Keyword arguments¶
| Argument | Default | Purpose |
|---|---|---|
| run_id | (required) | Adapter-supplied run id; appears in every record |
| client_name | (required) | Your adapter / runtime name (myagent, mantis, …) |
| out_dir | (required) | Where the bundle lands on disk |
| client_version | None | Free-form; surfaces in the viewer's run header |
| client_git_sha | None | Used by diagnostics to attribute regressions |
| capture_mode | AUGUR_CAPTURE_MODE env var, else off | See concepts/capture-modes.md |
| debug_session_id | auto-generated dbg_<uuid12> | Override only if you need a stable id (e.g. resume) |
| redaction_policy | DefaultRedactionPolicy() | See concepts/redaction.md |
| store | LocalFSStore(out_dir) | Swap for S3Store(...) (stub today) or your own |
| tags | {} | Free-form key/value, surfaces in viewer filters |
| started_at | now (UTC ISO-8601) | Override when bridging a post-hoc adapter |
| dsn | AUGUR_DSN env var | When set, the SDK streams + heartbeats; bundle is still written locally |
Lifecycle¶
- with DebugSession(...) as s: opens the session.
- On __exit__ (or explicit s.close()), the bundle is written atomically.
- If an exception bubbles up through the with block, the run is marked halted automatically.
Methods¶
def attach_observation(*, step_index: int, kind: str, png_bytes: bytes) -> str: ...
def record_step(step: StepTrace) -> None: ...
def record_event(event: DecisionEvent) -> None: ...
def set_status(status: str) -> None: ...
def add_tag(key: str, value: str) -> None: ...
# Mid-run capture-mode override (since 0.1.3)
def set_capture_mode(mode: str | CaptureMode) -> None: ...
# Stream a runner-log chunk to the server (since 0.1.3)
def append_log(text: str, *, step_index: int | None = None, name: str = "run") -> None: ...
# Add a post-hoc verdict to a previously-recorded step (since 0.1.5)
def attach_verifier(
step_index: int,
*,
status: str,
reason: str | None = None,
check: str | None = None,
expected: Any = None,
actual: Any = None,
evidence_refs: list[str] | None = None,
) -> None: ...
# Producer-side training-data helpers (since 0.1.8)
def set_costs(
*,
total_usd: float | None = None,
model_usd: float | None = None,
gpu_usd: float | None = None,
proxy_usd: float | None = None,
tokens_in: int | None = None,
tokens_out: int | None = None,
cache_hit_tokens: int | None = None,
) -> None: ...
def set_step_costs(
step_index: int,
*,
total_usd: float | None = None,
model_usd: float | None = None,
tokens_in: int | None = None,
tokens_out: int | None = None,
cache_hit_tokens: int | None = None,
) -> None: ...
def set_score(
step_index: int,
score: float,
*,
comparator: str | None = None, # verifier|model-judge|exact-match|human
components: dict[str, float] | None = None,
) -> None: ...
def record_modelio(
record: dict,
*,
step_index: int | None = None,
layer: str | None = None,
validate: bool = True,
) -> str: # returns bundle-relative path
...
# Sentry-for-CUA primitives (since 0.1.13)
def set_step_versions(
step_index: int,
*,
model: str | None = None,
prompt: str | None = None,
prompt_hash: str | None = None,
tool_descriptions_hash: str | None = None,
code_git_sha: str | None = None,
grounder: str | None = None,
env_fingerprint_ref: str | None = None,
) -> None: ...
def attach_env_fingerprint(
step_index: int,
*,
url_host: str | None = None,
url_path_template: str | None = None,
viewport_hash: str | None = None,
dom_hash: str | None = None,
api_shapes: dict[str, str] | None = None,
extensions: list[str] | None = None,
) -> None: ...
def record_judge_decision(
step_index: int,
*,
judge_id: str,
judge_type: str, # rule|model|human|hybrid
verdict: dict, # {"status": "passed"|…, ...}
confidence: float | None = None,
evidence_refs: list[str] | None = None,
judged_at: str | None = None,
promote: bool = True,
) -> None: ...
# Sentry-for-CUA primitives — round two (since 0.1.14)
def mark_for_eval(
step_index: int,
reason: str,
*,
candidate_cluster_id: str | None = None,
) -> None: ...
def finalize_outcome(
*,
scope: str = "session", # "step" | "session"
step_index: int | None = None,
verdict: dict | None = None,
task_class: str | None = None,
cost_summary: dict[str, int | float] | None = None,
) -> dict: ...
def successful_task_cost_summary() -> dict | None: ...
def record_reasoning(
step_index: int | None,
text: str,
*,
tokens: int | None = None,
format: str = "adapter_inferred", # adapter_inferred|claude_extended_thinking|openai_reasoning_summary
model: str | None = None,
ts: str | None = None,
) -> dict: ...
def declare_side_effect(
step_index: int,
resource: str,
action: str,
*,
idempotency_key: str | None = None,
reversibility: str = "irreversible", # irreversible|reversible|compensated
compensation_handle: str | None = None,
provenance: str = "sdk_declared", # sdk_declared|adapter_inferred|human_declared
side_effect_id: str | None = None,
) -> str: ... # returns side_effect_id
def commit_side_effect(
side_effect_id: str,
observed_result: Any = None,
) -> None: ...
def mark_side_effect_aborted(
side_effect_id: str,
reason: str,
) -> None: ...
def abort_pending_side_effects(reason: str) -> list[str]: ...
def bind_intervention(
adapter: Adapter,
*,
poll_timeout: float = 25.0,
) -> InterventionChannel | None: ...
# DebugSession constructor kwarg
# branch_context: dict | None = None
# When set, labels the run as a replay branch; lands on the session
# record and propagates a lightweight slice (no mutation payload) to
# every step. See BranchContext model.
# Branching-replay constructor (since 0.2.1)
@classmethod
def branch_from(
cls,
*,
parent_run_id: str,
branch_point_step_index: int,
mutated_axis: str, # model|prompt|action|grounder|tool_description
mutation: dict,
client_name: str,
out_dir: str | Path,
mode: str = "auto", # replay|sandbox|auto
parent_bundle: str | Path | None = None,
branch_id: str | None = None,
run_id: str | None = None,
**session_kwargs,
) -> DebugSession: ...
@property
def branch_mode(self) -> str | None: ...
set_capture_mode(mode) stamps capture_mode on every subsequent
record_step until cleared. The manifest's capture_mode remains the
default; an explicit step["capture_mode"] set by the caller always
wins over the override. Use it to upgrade from metadata to
screenshots after a failed verifier without restarting the session.
append_log(text, ...) POSTs to /api/v1/runs/<run_id>/logs. With
step_index set, the server routes the chunk to logs/step-<idx>.log;
without it, to logs/<name>.log. No-op when streaming is disabled —
local logs belong in the bundle's logs/ directory written directly
via the configured Store.
Training-data helpers (since 0.1.8)¶
set_costs(...) stamps a run-level cost rollup on the session. The
values appear on both the session record (trace.json) and the
manifest (manifest.json#/costs) so cost-aware consumers can read
either. Repeat calls are merge-on-call — unset dimensions are
preserved across calls.
set_step_costs(step_index, ...) patches the costs object on an
already-recorded step. Merge semantics; raises ValueError if no
step exists at step_index.
set_score(step_index, score, ...) adds a continuous reward signal
(0..1) to a recorded step's verdict. Merges into the existing
verdict — the categorical status is preserved. Score is clamped
to [0.0, 1.0]. comparator, when set, must be one of
verifier | model-judge | exact-match | human.
record_modelio(record, *, step_index, layer, validate=True) is the
producer-side helper for one model call's full input + output.
Validates the payload against modelio.schema.json (unless
validate=False), applies the session's redaction policy
(stamping redaction_applied: true), and writes to
modelio/<step_index:04d>-<layer>-<seq>.json (or
modelio/run-<layer>-<seq>.json for step_index=None). Idempotent
on prompt_hash — repeat calls with the same hash return the
existing path without staging a duplicate. Returns the bundle-relative
path. Step → modelio linkage is path-based; consumers find a step's
model calls by globbing modelio/<step:04d>*.json.
When streaming is enabled (DSN configured), the same call also
fires the redacted record live to
POST /api/v1/runs/<run_id>/modelio/<relpath> on a background
thread — producers don't need to wire anything extra; one
record_modelio(...) call covers both the bundle write and the
live ingest. If the server returns 403 (the tenant hasn't
enabled modelio capture), the sink latches off for the rest of
the session and subsequent records skip the network entirely;
they still land in the bundle on close(). All other errors are
logged at DEBUG and do not disable streaming.
Sentry-for-CUA primitives (since 0.1.13)¶
set_step_versions(step_index, ...) stamps version axes onto
step.captured_versions. Used by the platform's causal-attribution
engine to disentangle which input changed when an outcome moves.
Partial updates merge; unset arguments are preserved. When
record_modelio(step_index=...) is called, the SDK auto-stamps
model (from request.model) and prompt_hash onto the same
block so adapters that already use record_modelio get the
linkage for free.
attach_env_fingerprint(step_index, ...) attaches a structural
environment fingerprint to a step: url_host, url_path_template,
viewport_hash, dom_hash, api_shapes, and extensions. Stored
side-by-side with the visual fingerprint
(observation.hashes.phash_64), not merged, so the platform's
determinism checker can attribute drift to agent / model / env
independently. The SDK never derives dom_hash itself — only
adapters that already probe DOM for diagnostics should populate it,
which preserves the screenshot-grounded core invariant.
record_judge_decision(step_index, judge_id, judge_type, verdict, ...)
makes rule, model, human, and hybrid judges first-class.
Decisions accumulate on step.judge_decisions (an ordered list)
and, by default, the supplied verdict is also promoted to the
operative step.verdict with step.verdict_source set to
"<judge_type>:<judge_id>" for provenance. Pass promote=False
to record the decision without changing the operative verdict.
attach_verifier() now also emits an implicit
judge_type="rule" decision alongside its verdict patch, so the
legacy entry point preserves provenance too. When streaming is
enabled, the decision is POSTed live to
POST /api/v1/runs/<run_id>/steps/<step_index>/judge-decisions
on a background thread.
manifest.trajectory_fingerprint is populated automatically at
session close — a deterministic digest over the
(action.type, failure_class|verdict.status, normalized_target_label)
sequence. Same shape → same fingerprint; one-step swap → small
Hamming distance on the bigram half. Algorithm is pluggable via
the augur_sdk.fingerprints entry point group (default = cua_v1).
See concepts/trajectory-fingerprint.md.
Eval, outcome, reasoning, branching, side-effects, intervention (since 0.1.14)¶
mark_for_eval(step_index, reason, candidate_cluster_id=None) tags a
step as a regression-fixture candidate. Idempotent on step_index —
last-write-wins. Tags land in eval_candidates.json at the bundle
root and on the live stream. The tag has value even at rest in the
bundle; CLI export can read it.
finalize_outcome(scope=..., step_index=None, verdict=None, task_class=None, cost_summary=None)
emits a single coupled record so the cost-per-outcome rollup
doesn't have to JOIN three independent fields. scope="step" records
one outcome per step; scope="session" rolls up per-step step.costs
across the run (session-level set_costs() wins ties) and emits a
session-level outcome. Records land in outcomes.json and on the
live stream. successful_task_cost_summary() returns the same
rolled-up summary in-process for budget asserts in tests.
record_reasoning(step_index, text, tokens=None, format=..., model=None)
captures explicit reasoning for models that emit it (Claude extended
thinking, OpenAI reasoning summaries). Records land in
events/reasoning.jsonl (one record per line) and on the live stream.
Reasoning text has its own redaction hook on RedactionPolicy:
add_reasoning_redactor(fn) registers a str -> str callable that
runs only over the text field of a reasoning record, in addition
to the regular redaction pipeline. ModelApiAdapterBase provides an
extract_reasoning_from_response() static method that pulls
reasoning blocks out of a Claude or OpenAI response — adapters can
forward the result straight to record_reasoning().
declare_side_effect(step_index, resource, action, ...) declares an
irreversible action before dispatch so the ledger records the
agent's intent even if the run is killed mid-step. Returns a
side_effect_id. commit_side_effect(side_effect_id, observed_result)
lands the dispatcher's return payload (redacted) under
side_effects/<step:04d>-<id>.json. mark_side_effect_aborted(sid,
reason) is used by the intervention channel when a kill arrives
between declare and commit. abort_pending_side_effects(reason)
mass-aborts every declared-but-not-committed effect — used by the
kill handler. All four post live via StreamingSink.post_side_effect().
The branch_context= constructor kwarg labels the session as a
replay-branch trajectory. Required fields:
DebugSession(
run_id="run_b",
client_name="...",
out_dir="...",
branch_context={
"parent_run_id": "run_a",
"branch_point_step_index": 3,
"mutated_axis": "model", # model|prompt|action|grounder|tool_description
"mutation": {"model": "claude-opus-4-7"},
"branch_id": "branch_xyz", # optional
},
)
The full block lands on the session record (with mutation payload).
A lightweight slice (without mutation) propagates to every step so
server-side cohort filters can exclude branches by default without
joining back to the session. Production runs omit the field — no
regression.
DebugSession.branch_from(...) (since 0.2.1)¶
The higher-level constructor for replay branches. Takes the parent linkage explicitly, picks the right mode automatically, and optionally loads the deterministic prefix from a parent bundle so the branch's bundle includes every step from index 0 without re-execution.
# Mutate the model on a parent run; replay everything up to step 3,
# then run fresh against a sandbox.
with DebugSession.branch_from(
parent_run_id="run_a",
branch_point_step_index=3,
mutated_axis="model", # model|prompt|action|grounder|tool_description
mutation={"model": "claude-opus-4-7"},
client_name="myadapter",
out_dir="branch-bundle/",
mode="auto", # replay|sandbox|auto
parent_bundle="parent-bundle/", # required when mode resolves to replay
) as branch:
assert branch.branch_mode == "replay"
# Steps 0–2 are already in `branch`'s recorder from the parent.
# Producer continues from step 3 with the mutated model:
branch.record_step({...step 3 with new behaviour...})
Modes:
- replay — load steps [0, branch_point_step_index) from parent_bundle's trace.json into the new session, copying pre/post screenshot bytes verbatim. The producer continues fresh from branch_point_step_index. Refuses when mutated_axis="action" because the parent's downstream observations no longer reflect what the new agent will see (SPEC §10).
- sandbox — stamp branch_context only; the producer executes from step 0 against a live target. No prefix loading. The right mode when mutated_axis="action" or when the parent bundle isn't available.
- auto (default) — picks sandbox for mutated_axis="action", replay for the four upstream axes.
branch_id and run_id default to
f"{parent_run_id}:branch:<short-uuid>" (per
branch_context.schema.json's convention); pass either explicitly to
override. Any other DebugSession kwarg (tags, client_version,
capture_mode, redaction_policy, …) can be passed through.
The resolved mode is exposed on session.branch_mode for callers and
tests that want to introspect after construction.
bind_intervention(adapter) wires up a long-poll on
GET <DSN-base>/runs/<run_id>/commands and dispatches received
commands to the adapter:
| Command | Adapter hook |
|---|---|
| pause | on_pause() |
| resume | on_resume() |
| kill | on_kill() (after abort_pending_side_effects) |
| inject_hint | on_inject_hint(text) |
| override_action | on_action_override(coords, provenance="human_override") |
Returns None when streaming is disabled (no DSN). The SDK guarantees
provenance="human_override" on operator coordinates so the
trajectory preserves the SPEC §4 invariant that runtime action
selection is screenshot-grounded — never silently mixed with grounder
output. Adapters that don't implement a particular hook see the
channel degrade to a no-op for that command type. At-least-once
delivery; SDK dedupes on command_id. Every received command is
logged to the session's decision-event stream with the operator id.
attach_verifier(step_index, status=..., ...) lets an external harness
add a post-hoc verdict to a step the producer left as unknown (or
mis-classified). Useful for trace formats with no native verifier
signal (OpenAI / Anthropic Computer-Use, raw OSWorld): run an external
check against the step's post-state, then attach the result. Replaces
the step's verdict field in-place. When check / expected /
actual are given without an explicit reason, the SDK composes one:
"<check>: expected=<expected> actual=<actual>". With streaming
enabled, the patched step is re-emitted to the live sink so viewers
see the update without waiting for close(). Raises ValueError
when no step exists at step_index. Precedence:
native verdict > attach_verifier > inferred default.
CaptureMode¶
class CaptureMode(StrEnum):
OFF = "off"
METADATA = "metadata"
TRACE = "trace"
SCREENSHOTS = "screenshots"
VIDEO = "video"
MODEL_IO = "model_io"
DISPATCH = "dispatch"
REPLAY = "replay"
FULL = "full"
Modes are ordered: CaptureMode.SCREENSHOTS >= CaptureMode.TRACE etc.
See concepts/capture-modes.md.
def resolve_capture_mode(
explicit: CaptureMode | str | None = None,
*,
env: Mapping[str, str] | None = None,
) -> CaptureMode: ...
Models (augur_sdk.models)¶
All TypedDicts. Pass dicts directly to the SDK — no construction step.
- DebugSession — top-level session record
- StepTrace — one step
- Observation — screenshot metadata
- DecisionEvent — planner/grounding/verifier/recovery event
- ReplayFixture — replay seed
- DiagnosticFinding — output of the rules engine
- BundleManifest — envelope returned by session.close()
- BundleTrace — { session, steps[] }
Enum types:
- CoordinateSpace = Literal["viewport_css_px", "device_px", "screenshot_px", "dom_client_rect"]
- Provenance = Literal["screenshot", "dom", "human_override", "replay_candidate", "diagnostic"]
- StepStatus, VerdictStatus, RunStatus, RecoveryType, DecisionLayer, DecisionKind
Redaction (augur_sdk.redaction)¶
DEFAULT_POLICY_ID = "default-pii-v1"
class RedactionPolicy:
id: str
drop_keys: frozenset[str] # keys whose values are removed entirely
mask_keys: frozenset[str] # keys whose values are replaced with ***REDACTED:mask***
redactors: list[Callable[[str], str]]
droppers: list[Callable[[str, Any], bool]]
def add_redactor(self, fn: Callable[[str], str]) -> None: ...
def add_dropper(self, fn: Callable[[str, Any], bool]) -> None: ...
def apply(self, value: Any, *, key: str | None = None) -> Any: ...
class DefaultRedactionPolicy(RedactionPolicy): ...
Storage (augur_sdk.storage)¶
class Store(Protocol):
@property
def root_uri(self) -> str: ...
def signed_url(self, relpath: str, *, ttl_seconds: int = 3600) -> str: ...
@contextmanager
def open_write_binary(self, relpath: str) -> Iterator[IO[bytes]]: ...
@contextmanager
def open_write_text(self, relpath: str) -> Iterator[IO[str]]: ...
def exists(self, relpath: str) -> bool: ...
def read_text(self, relpath: str) -> str: ...
def read_bytes(self, relpath: str) -> bytes: ...
class LocalFSStore: ... # default; atomic write-and-rename
class S3Store: ... # stub; lands later
Validation (augur_sdk.validation)¶
@dataclass
class ValidationIssue:
where: str
message: str
path: str
def validate_bundle(bundle_dir: str | Path) -> list[ValidationIssue]: ...
Diagnostics (augur_sdk.diagnostics)¶
class BundleContext:
bundle_dir: Path
@property
def manifest(self) -> dict: ...
@property
def trace(self) -> dict: ...
@property
def session(self) -> dict: ...
@property
def steps(self) -> list[dict]: ...
def events_for_step(self, step_index: int) -> list[dict]: ...
def all_events(self) -> Iterator[dict]: ...
def log_text(self, name: str = "runner.log") -> str | None: ...
def log_paths(self) -> list[str]: ...
class RuleResult:
findings: list[DiagnosticFinding]
def emit(
self, *,
rule_id: str, severity: str, summary: str,
evidence: Iterable[DiagnosticEvidence],
step_index: int | None = None,
recommendation: str | None = None,
) -> None: ...
class Rule(Protocol):
rule_id: str
severity: str
def __call__(self, ctx: BundleContext, result: RuleResult) -> None: ...
def rule(rule_id: str, *, severity: str = "medium") -> Callable[[Callable], Rule]: ...
class RulesEngine:
def __init__(self, rules: Iterable[Rule]) -> None: ...
def evaluate(self, bundle_dir: Path | str) -> list[DiagnosticFinding]: ...
def load_pack(name: str) -> list[Rule]: ...
Adapter base (augur_sdk.adapter)¶
class Adapter(Protocol):
name: ClassVar[str]
SUPPORTED_SCHEMA_RANGE: ClassVar[tuple[str, str]]
def on_run_start(self, *, run_id: str, client_version: str) -> dict[str, str] | None: ...
def on_step(self, raw_step: Any) -> StepTrace | None: ...
def on_action(self, raw_action: Any, step_index: int) -> dict[str, Any]: ...
def on_decision(self, raw_event: Any, step_index: int | None) -> DecisionEvent: ...
def on_observation(self, raw_obs: Any, step_index: int) -> Observation: ...
def on_run_end(self, *, status: str) -> None: ...
See reference/adapter-authoring.md.
Streaming (augur_sdk.streaming)¶
Internal — usually you set AUGUR_DSN and forget about it. These classes
are exposed in case you want to drive the sink yourself:
@dataclass
class DSN:
base_url: str # e.g. https://augur.example/api/v1
token: str # secret api_key
tenant: str # informational
@classmethod
def parse(cls, raw: str) -> "DSN": ...
@classmethod
def from_env(cls, explicit: str | None = None) -> "DSN | None": ...
class StreamingSink:
def __init__(self, dsn: DSN, *, client_name: str, client_version: str | None) -> None: ...
def begin(self, *, run_id: str, capture_mode: str) -> None: ...
def end(self) -> None: ...
def post_manifest(self, manifest: dict[str, Any]) -> None: ...
def put_trace(self, trace: dict[str, Any]) -> None: ...
def put_step(self, step: StepTrace) -> None: ...
def post_events(self, events: list[DecisionEvent], *, step_index: int | None) -> None: ...
def post_screenshot(self, step_index: int, kind: str, png_bytes: bytes) -> None: ...
def post_modelio(self, relpath: str, record: dict[str, Any]) -> None: ...
def post_logs(self, *, text: str, name: str = "run", step_index: int | None = None) -> None: ...
post_modelio(relpath, record) is driven by DebugSession.record_modelio()
— relpath is the bundle-relative path returned by the recorder
(e.g. modelio/0003-planner-0.json), so the live URL is
/api/v1/runs/<run_id>/<relpath>. Fire-and-forget on a background
thread; a 403 from the server latches the sink off for the
session (the bundle still owns the record).
post_logs(text, ...) is driven by DebugSession.append_log() and POSTs
the chunk to /api/v1/runs/<run_id>/logs; the server routes it to
logs/<name>.log or logs/step-<idx>.log.