Stable · v2.4.0

Python SDK

skyaiapp · Python 3.9+ · native async + sync wrapper

Production-grade Python SDK: native async on httpx, Pydantic v2 models, runs in notebooks, FastAPI, Celery, and Lambda.

Installation

# pip
pip install skyaiapp

# poetry
poetry add skyaiapp

# uv (recommended for speed)
uv add skyaiapp

# Optional extras
pip install "skyaiapp[opentelemetry]"  # auto-export traces to OTLP
pip install "skyaiapp[langchain]"      # optional langchain adapter

Async + sync

Async is the default. The sync wrapper is friendly for notebooks and scripts.

# Async — preferred for any service
import os
import asyncio
from skyaiapp import SkyAI, RouterError, RouterTimeoutError, RateLimitError
from skyaiapp.types import RouteRequest, RouteResponse

# Singleton — reuse across requests; httpx connection pool lives here.
sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])

async def summarize(doc: str) -> RouteResponse | None:
    """Summarize *doc* through the cost-optimized route.

    Returns:
        The RouteResponse, or None if the router timed out.

    Raises:
        RateLimitError: re-raised once the bounded retry budget is spent.
        RouterError: re-raised (after logging) for any other router failure.
    """
    # Bounded retry loop instead of unbounded recursion: the original
    # `return await summarize(doc)` retried forever under sustained rate
    # limiting and stacked a new coroutine frame per attempt.
    max_attempts = 5
    for attempt in range(max_attempts):
        try:
            return await sky.route(
                goal="cost",                    # "cost" | "quality" | "stability"
                strategy="balanced",
                messages=[
                    {"role": "system", "content": "You are a precise summarizer."},
                    {"role": "user",   "content": doc},
                ],
                budget={"max_cost_usd": 0.01},
                cache={"enabled": True, "similarity": 0.92, "ttl_seconds": 86400},
                metadata={"tenant": "acme-corp", "workflow": "summary"},
                timeout_ms=15_000,
            )
        except RateLimitError as e:
            if attempt == max_attempts - 1:
                raise  # retry budget exhausted — surface the rate limit
            # Honor the server's suggested back-off (milliseconds -> seconds).
            await asyncio.sleep((e.retry_after_ms or 1000) / 1000)
        except RouterTimeoutError:
            return None
        except RouterError as e:
            log.error("router_error", code=e.code, trace_id=e.trace_id)
            raise
    return None  # unreachable; keeps type-checkers satisfied

# Sync wrapper — for notebooks / one-off scripts.
from skyaiapp import SkyAISync

# Blocking client — do NOT call this from inside a running event loop.
sky_sync = SkyAISync(api_key=os.environ["SKYAIAPP_API_KEY"])
res = sky_sync.route(
    goal="quality",
    messages=[{"role": "user", "content": "..."}],
)
# The response carries routing metadata alongside the model output.
print(res.routing.selected_model, res.output)

Streaming

from skyaiapp import SkyAI

sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])

async def stream_to_user(prompt: str):
    """Stream a quality-first completion for *prompt* to stdout.

    Prints the selected model up front, then tokens as they arrive,
    tool-call events, and the final cost when the stream completes.
    """
    stream = await sky.stream(
        goal="quality",
        strategy="quality-first",
        messages=[{"role": "user", "content": prompt}],
    )

    print("Routing to:", stream.routing.selected_model)

    async for chunk in stream:
        kind = chunk.type
        if kind == "done":
            print(f"\n$ {chunk.routing.cost_usd:.6f}")
        elif kind == "tool_call":
            print(f"\n[tool] {chunk.tool_name} {chunk.arguments}")
        elif kind == "token":
            print(chunk.delta, end="", flush=True)

# FastAPI — pipe SSE straight to the client
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat")
async def chat(prompt: str):
    """Proxy the router's SSE stream straight through to the HTTP client."""
    upstream = await sky.stream(messages=[{"role": "user", "content": prompt}])
    return StreamingResponse(upstream.iter_sse(), media_type="text/event-stream")

Agent runtime

from skyaiapp import SkyAI, define_tool
from pydantic import BaseModel, Field

sky = SkyAI(api_key=os.environ["SKYAIAPP_API_KEY"])

# Pydantic-typed tool — validation runs both ways automatically.
class InvoiceArgs(BaseModel):
    """Validated input for the lookup_invoice tool."""

    # IDs must be "inv_" followed by lowercase alphanumerics.
    id: str = Field(pattern=r"^inv_[a-z0-9]+$")

class InvoiceResult(BaseModel):
    """Validated output of the lookup_invoice tool."""

    # Invoice status (free-form string here; narrow to a Literal if the
    # billing system has a closed set of states).
    status: str
    # Amount in US dollars.
    amount_usd: float

@define_tool(
    name="lookup_invoice",
    description="Fetch invoice by ID from internal billing system.",
    args_model=InvoiceArgs,
    return_model=InvoiceResult,
)
async def lookup_invoice(args: InvoiceArgs) -> InvoiceResult:
    # `billing` is the application's own async client (not part of the SDK);
    # the decorator validates args on the way in and the result on the way out.
    return await billing.find(args.id)

agent = sky.create_agent(
    # Built-in tools by name, custom tools by function reference.
    tools=["web_search", "calculator", "code_exec", lookup_invoice],
    max_steps=10,                    # presumably a hard cap on agent iterations — confirm in API docs
    per_step_timeout_ms=30_000,
    total_budget_usd=0.50,           # presumably a cumulative spend ceiling — confirm in API docs
    model_strategy={"goal": "quality", "strategy": "balanced"},
)

# NOTE: top-level `await` only works in notebooks / async REPLs; wrap this
# in an async function for scripts and services.
result = await agent.run(
    task="Find this month overdue invoices and email a polite reminder to each.",
    on_step=lambda s: log.info("agent.step", num=s.number, action=s.action),
)
print(result.output, result.trace_id)

Mocking in pytest

# tests/test_summarize.py
import pytest
from skyaiapp import SkyAI
from skyaiapp.testing import mock_route

@pytest.fixture
def sky():
    """Client constructed with mock=True so tests can stub routes in-process."""
    return SkyAI(api_key="sk_test_x", mock=True)

@pytest.mark.asyncio
async def test_routes_to_cheap_model_on_cost_goal(sky):
    # Register the canned response BEFORE calling route(); the mock matches
    # on goal="cost" and returns the payload verbatim.
    sky.__mock__.add(mock_route(
        goal="cost",
        response={
            "output": "Mock summary.",
            "routing": {"selected_model": "claude-haiku-4.5", "cost_usd": 0.0001, "latency_ms": 200, "cache_hit": False},
            "trace_id": "tr_test_001",
        },
    ))

    res = await sky.route(goal="cost", messages=[{"role": "user", "content": "..."}])
    # Assertions exercise the parsed response model, not raw JSON.
    assert res.routing.selected_model == "claude-haiku-4.5"
    assert res.routing.cost_usd < 0.001

Common anti-patterns

  • Calling sync SkyAISync from async paths

    Why it's bad: Blocks the event loop. In async apps, always await sky.route().

  • Instantiating SkyAI per request

    Why it's bad: Loses the httpx connection pool; full TLS handshake every time. Singleton it.

  • Using bare try/except Exception

    Why it's bad: RouterError subclasses carry retry semantics, trace_id, detail. Branch on subclass.

  • Looped sleep retries without reading retry_after_ms

    Why it's bad: RateLimitError.retry_after_ms is the server's optimal back-off.

See also

Was this page helpful?

Let us know how we can improve

Python SDK | SkyAIApp Docs — SkyAIApp