Python SDK
skyaiapp · Python 3.9+ · native async + sync wrapper
Production-grade Python SDK: native async on httpx, Pydantic v2 models, runs in notebooks, FastAPI, Celery, and Lambda.
Installation
# pip
pip install skyaiapp
# poetry
poetry add skyaiapp
# uv (recommended for speed)
uv add skyaiapp
# Optional extras
pip install "skyaiapp[opentelemetry]" # auto-export traces to OTLP
pip install "skyaiapp[langchain]" # optional langchain adapter
Async + sync
Async is the default. The sync wrapper is friendly for notebooks and scripts.
# Async — preferred for any service
import os
import asyncio
from skyaiapp import SkyAI, RouterError, RouterTimeoutError, RateLimitError
from skyaiapp.types import RouteRequest, RouteResponse
# Singleton — reuse across requests; httpx connection pool lives here.
sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])
async def summarize(doc: str, max_retries: int = 3) -> "RouteResponse | None":
    """Summarize *doc* through the router, optimizing for cost.

    Args:
        doc: Text to summarize; sent as the user message.
        max_retries: How many times to retry after a ``RateLimitError``
            before giving up and re-raising it.

    Returns:
        The ``sky.route`` response, or ``None`` when the call raised
        ``RouterTimeoutError``.

    Raises:
        RateLimitError: Still rate limited after ``max_retries`` retries.
        RouterError: Any other router failure, logged and re-raised.
    """
    # The return annotation is quoted so it is not evaluated at definition
    # time: ``X | None`` on classes only works at runtime from Python 3.10.
    retries_used = 0
    # Loop instead of recursing so a persistently rate-limited call cannot
    # grow the call stack without bound.
    while True:
        try:
            return await sky.route(
                goal="cost",  # "cost" | "quality" | "stability"
                strategy="balanced",
                messages=[
                    {"role": "system", "content": "You are a precise summarizer."},
                    {"role": "user", "content": doc},
                ],
                budget={"max_cost_usd": 0.01},
                cache={"enabled": True, "similarity": 0.92, "ttl_seconds": 86400},
                metadata={"tenant": "acme-corp", "workflow": "summary"},
                timeout_ms=15_000,
            )
        except RateLimitError as e:
            if retries_used >= max_retries:
                raise
            retries_used += 1
            # Use the back-off from the error; fall back to 1 s when absent.
            await asyncio.sleep((e.retry_after_ms or 1000) / 1000)
        except RouterTimeoutError:
            return None
        except RouterError as e:
            # NOTE(review): `log` is not defined in this snippet; the keyword
            # arguments suggest a structured logger — confirm.
            log.error("router_error", code=e.code, trace_id=e.trace_id)
            raise
# Sync wrapper — for notebooks / one-off scripts.
from skyaiapp import SkyAISync
sky_sync = SkyAISync(api_key=os.environ["SKYAIAPP_API_KEY"])
res = sky_sync.route(
goal="quality",
messages=[{"role": "user", "content": "..."}],
)
print(res.routing.selected_model, res.output)
Streaming
from skyaiapp import SkyAI
sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])
async def stream_to_user(prompt: str):
    """Stream a quality-first completion for *prompt* and print it as it arrives.

    Prints the selected model first, then each token delta, any tool calls,
    and the cost reported on the final ``done`` chunk.
    """
    conversation = [{"role": "user", "content": prompt}]
    stream = await sky.stream(
        goal="quality",
        strategy="quality-first",
        messages=conversation,
    )
    print("Routing to:", stream.routing.selected_model)

    async for chunk in stream:
        if chunk.type == "token":
            # Flush per token so the text appears incrementally.
            print(chunk.delta, end="", flush=True)
            continue
        if chunk.type == "tool_call":
            print(f"\n[tool] {chunk.tool_name} {chunk.arguments}")
            continue
        if chunk.type == "done":
            print(f"\n$ {chunk.routing.cost_usd:.6f}")
# FastAPI — pipe SSE straight to the client
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat")
async def chat(prompt: str):
stream = await sky.stream(messages=[{"role": "user", "content": prompt}])
return StreamingResponse(stream.iter_sse(), media_type="text/event-stream")
Agent runtime
from skyaiapp import SkyAI, define_tool
from pydantic import BaseModel, Field
sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])
# Pydantic-typed tool — validation runs both ways automatically.
class InvoiceArgs(BaseModel):
    """Arguments model for the ``lookup_invoice`` tool."""

    # Invoice identifier: "inv_" followed by one or more lowercase
    # letters or digits.
    id: str = Field(pattern=r"^inv_[a-z0-9]+$")
class InvoiceResult(BaseModel):
    """Return model for the ``lookup_invoice`` tool."""

    # Invoice status as a free-form string.
    status: str
    # Invoice amount; the field name indicates US dollars.
    amount_usd: float
@define_tool(
    name="lookup_invoice",
    description="Fetch invoice by ID from internal billing system.",
    args_model=InvoiceArgs,
    return_model=InvoiceResult,
)
async def lookup_invoice(args: InvoiceArgs) -> InvoiceResult:
    """Look up the invoice named by ``args.id`` and return the result.

    NOTE(review): `billing` is not defined in this snippet; presumably an
    application-provided async client — confirm.
    """
    invoice_id = args.id
    return await billing.find(invoice_id)
agent = sky.create_agent(
tools=["web_search", "calculator", "code_exec", lookup_invoice],
max_steps=10,
per_step_timeout_ms=30_000,
total_budget_usd=0.50,
model_strategy={"goal": "quality", "strategy": "balanced"},
)
result = await agent.run(
task="Find this month overdue invoices and email a polite reminder to each.",
on_step=lambda s: log.info("agent.step", num=s.number, action=s.action),
)
print(result.output, result.trace_id)
Mocking in pytest
# tests/test_summarize.py
import pytest
from skyaiapp import SkyAI
from skyaiapp.testing import mock_route
@pytest.fixture
def sky():
    """Provide a ``SkyAI`` client built with a test key and ``mock=True``."""
    client = SkyAI(api_key="sk_test_x", mock=True)
    return client
@pytest.mark.asyncio
async def test_routes_to_cheap_model_on_cost_goal(sky):
sky.__mock__.add(mock_route(
goal="cost",
response={
"output": "Mock summary.",
"routing": {"selected_model": "claude-haiku-4.5", "cost_usd": 0.0001, "latency_ms": 200, "cache_hit": False},
"trace_id": "tr_test_001",
},
))
res = await sky.route(goal="cost", messages=[{"role": "user", "content": "..."}])
assert res.routing.selected_model == "claude-haiku-4.5"
assert res.routing.cost_usd < 0.001
Common anti-patterns
Calling sync SkyAISync from async paths
Why it's bad: Blocks the event loop. In async apps, always await sky.route().
Instantiating SkyAI per request
Why it's bad: Loses the httpx connection pool; full TLS handshake every time. Create one client and reuse it.
Using bare try/except Exception
Why it's bad: RouterError subclasses carry retry semantics, trace_id, detail. Branch on subclass.
Looped sleep retries without reading retry_after_ms
Why it's bad: RateLimitError.retry_after_ms is the server's optimal back-off; a fixed sleep that ignores it either retries too early or waits longer than necessary.
See also
Was this page helpful?
Let us know how we can improve