The main client class for connecting to Odyssey’s audio-visual intelligence platform.
Constructor
Odyssey(api_key: str, **kwargs)
Creates a new Odyssey client instance with the provided API key.
| Parameter | Type | Description |
|---|
api_key | str | API key for authentication (required) |
**kwargs | | Additional configuration options |
from odyssey import Odyssey
client = Odyssey(api_key="ody_your_api_key_here")
Methods
connect()
Connect to a streaming session. The Odyssey API automatically assigns an available session.
async def connect(
on_connected: Callable[[], None] | None = None,
on_disconnected: Callable[[], None] | None = None,
on_video_frame: Callable[[VideoFrame], None] | None = None,
on_stream_started: Callable[[str], None] | None = None,
on_stream_ended: Callable[[], None] | None = None,
on_interact_acknowledged: Callable[[str], None] | None = None,
on_stream_error: Callable[[str, str], None] | None = None,
on_error: Callable[[Exception, bool], None] | None = None,
on_status_change: Callable[[ConnectionStatus, str | None], None] | None = None,
on_broadcast_ready: Callable[[BroadcastInfo], None] | None = None,
) -> None
| Parameter | Type | Description | |
|---|
on_connected | Callable[[], None] | Called when connection is established | |
on_disconnected | Callable[[], None] | Called when connection is closed | |
on_video_frame | Callable[[VideoFrame], None] | Called for each video frame | |
on_stream_started | Callable[[str], None] | Called when stream starts (receives stream_id) | |
on_stream_ended | Callable[[], None] | Called when stream ends | |
on_interact_acknowledged | Callable[[str], None] | Called when interaction is acknowledged | |
on_stream_error | Callable[[str, str], None] | Called on stream error (reason, message) | |
on_error | Callable[[Exception, bool], None] | Called on error (error, fatal) | |
on_status_change | `Callable[[ConnectionStatus, str | None], None]` | Called on status change |
on_broadcast_ready | Callable[[BroadcastInfo], None] | Called when broadcast playback details are available | |
Raises:
| Exception | Description |
|---|
OdysseyAuthError | Authentication failed (invalid API key) |
OdysseyConnectionError | Connection failed (no streamers, timeout, etc.) |
try:
await client.connect(
on_video_frame=lambda frame: process_frame(frame),
on_stream_error=lambda reason, msg: print(f"Stream error: {reason} - {msg}"),
on_status_change=lambda status, msg: print(f"Status: {status.value}"),
)
except OdysseyAuthError:
print("Invalid API key")
except OdysseyConnectionError as e:
print(f"Connection failed: {e}")
finally:
await self.client.disconnect()
disconnect()
Disconnect from the session and clean up resources.
async def disconnect() -> None
await client.disconnect()
start_stream()
Start an interactive stream session.
Supports Broadcast mode by passing broadcast=True.
When Broadcast is enabled, the stream can be viewed by multiple spectators simultaneously. Playback details are delivered via the on_broadcast_ready callback.
async def start_stream(
prompt: str = "",
portrait: bool = True,
image: str | bytes | Image.Image | np.ndarray | None = None,
image_path: str | None = None, # deprecated
bypass_prompt_expansion: bool | None = None,
broadcast: bool = False
) -> str
| Parameter | Type | Default | Description | | | |
|---|
prompt | str | "" | Initial prompt to generate video content | | | |
portrait | bool | True | True for portrait (704x1280), False for landscape (1280x704). Resolution may vary by model. | | | |
image | `str | bytes | Image.Image | np.ndarray | None` | None | Image for image-to-video generation (see formats below) |
image_path | `str | None` | None | Deprecated. Use image instead. | | |
bypass_prompt_expansion | `bool | None` | None | Skip prompt expansion (safety-only mode). Requires the expansion bypass privilege. | | |
broadcast | bool | False | Enable broadcast mode, allowing multiple spectators to view the same running stream. | | | |
Supported image formats for the image parameter:
| Type | Description |
|---|
str | File path to an image |
bytes | Raw image bytes |
PIL.Image.Image | PIL Image object |
np.ndarray | NumPy array (RGB uint8, shape HxWx3) |
Returns: str - Stream ID when the stream is ready. Use this ID to retrieve recordings.
Raises: OdysseyStreamError - If not connected or stream fails to start.
try:
stream_id = await client.start_stream("A cat", portrait=True)
print(f"Stream started: {stream_id}")
except OdysseyStreamError as e:
print(f"Failed to start stream: {e}")
Image-to-video requirements:
- SDK version 1.0.0+
- Max size: 25MB
- Supported formats: JPEG, PNG, WebP, GIF, BMP, HEIC, HEIF, AVIF
- Images are resized to 1280x704 (landscape) or 704x1280 (portrait)
# Image-to-video examples
await client.connect(on_video_frame=process_frame)
# Using a file path
stream_id = await client.start_stream(
prompt="A cat",
portrait=False,
image="/path/to/image.jpg"
)
# Using PIL Image
from PIL import Image
pil_image = Image.open("/path/to/image.jpg")
stream_id = await client.start_stream(prompt="A cat", image=pil_image)
# Using bytes
with open("/path/to/image.jpg", "rb") as f:
image_bytes = f.read()
stream_id = await client.start_stream(prompt="A cat", image=image_bytes)
connect_to_stream()
Connect to an existing broadcast stream as a spectator.
This method is used when joining a broadcast created with start_stream(broadcast=True). The webrtc_url and spectator_token are provided via the on_broadcast_ready callback.
connect_to_stream(
webrtc_url: str,
spectator_token: str,
on_video_frame: VideoFrameCallback | None = None,
on_disconnected: DisconnectedCallback | None = None,
debug: bool = False
) -> SpectatorConnection
| Parameter | Type | Description | |
|---|
webrtc_url | str | WebRTC (WHEP) endpoint for the broadcast stream | |
spectator_token | str | Authentication token required to join the broadcast | |
on_video_frame | `Callable[[VideoFrame], None] | None` | Optional callback receiving video frames |
on_disconnected | `Callable[[], None] | None` | Callback triggered when the broadcast ends |
debug | bool | Enable debugging logs | |
Returns: SpectatorConnection - Handle for managing the playback session.
Raises:
| Exception | Description |
|---|
ValueError | If spectator token is invalid (401) |
ConnectionError | If stream not found (404) or connection fails |
Example:
from odyssey import connect_to_stream
spectator = connect_to_stream(
webrtc_url,
spectator_token
)
print("Connected to broadcast")
interact()
Send an interaction prompt to update the video content.
async def interact(prompt: str) -> str
| Parameter | Type | Description |
|---|
prompt | str | The interaction prompt |
Returns: str - The acknowledged prompt when processed.
Raises: OdysseyStreamError - If not connected or no active stream.
try:
ack_prompt = await client.interact("Pet the cat")
print(f"Interaction acknowledged: {ack_prompt}")
except OdysseyStreamError as e:
print(f"Failed to interact: {e}")
end_stream()
End the current interactive stream session.
async def end_stream() -> None
Raises: OdysseyStreamError - If not connected.
await client.end_stream()
get_recording()
Get recording data for a stream with presigned URLs.
async def get_recording(stream_id: str) -> Recording
| Parameter | Type | Description |
|---|
stream_id | str | The stream ID to get recording for (from start_stream) |
Returns: Recording - Recording data with presigned URLs valid for ~1 hour.
This method can be called without an active connection. It only requires a valid API key.
recording = await client.get_recording("stream-123")
if recording.video_url:
print(f"Video: {recording.video_url}")
print(f"Duration: {recording.duration_seconds}s")
list_stream_recordings()
List stream recordings for the authenticated user.
async def list_stream_recordings(
limit: int | None = None,
offset: int | None = None
) -> StreamRecordingsList
| Parameter | Type | Default | Description | |
|---|
limit | `int | None` | None | Maximum number of recordings to return |
offset | `int | None` | None | Number of recordings to skip for pagination |
Returns: StreamRecordingsList - Paginated list of stream recordings.
This method can be called without an active connection. It only requires a valid API key.
result = await client.list_stream_recordings(limit=10)
for rec in result.recordings:
print(f"{rec.stream_id}: {rec.duration_seconds}s ({rec.width}x{rec.height})")
print(f"Total: {result.total}")
Simulate API Methods
Simulate API methods were added in v1.0.0
The Simulate API allows you to run scripted interactions asynchronously. Unlike the Interactive API, simulations execute in the background and produce recordings you can retrieve when complete.
simulate()
Create a new simulation job.
async def simulate(
*,
script: list[dict] | None = None,
scripts: list[list[dict]] | None = None,
script_url: str | None = None,
portrait: bool = True,
bypass_prompt_expansion: bool | None = None,
) -> SimulationJobDetail
| Parameter | Type | Default | Description | |
|---|
script | `list[dict] | None` | None | Single script to run (provide one of script, scripts, or script_url) |
scripts | `list[list[dict]] | None` | None | Batch mode: multiple scripts to run in a single job |
script_url | `str | None` | None | URL to a JSON file containing the script |
portrait | bool | True | Portrait mode | |
bypass_prompt_expansion | `bool | None` | None | Skip prompt expansion (safety-only mode). Requires the expansion bypass privilege. |
Script entry format:
| Key | Type | Description |
|---|
timestamp_ms | int | When this action occurs (milliseconds from start) |
start | { "prompt": str, "image": str | bytes } | Begin a new stream |
interact | { "prompt": str } | Send an interaction prompt |
end | {} | End the current stream (empty dict) |
Returns: SimulationJobDetail - The created simulation job with ID, status, and streams.
job = await client.simulate(
script=[
{"timestamp_ms": 0, "start": {"prompt": "A cat sitting on a windowsill"}},
{"timestamp_ms": 3000, "interact": {"prompt": "The cat stretches"}},
{"timestamp_ms": 6000, "interact": {"prompt": "The cat yawns"}},
{"timestamp_ms": 9000, "end": {}}
],
portrait=True
)
print(f"Simulation started: {job.job_id}")
get_simulate_status()
Get the current status of a simulation job.
async def get_simulate_status(job_id: str) -> SimulationJobDetail
| Parameter | Type | Description |
|---|
job_id | str | The job ID to check |
Returns: SimulationJobDetail - Detailed status including streams created.
status = await client.get_simulate_status(job.job_id)
print(f"Status: {status.status}")
if status.status == "completed":
for stream in status.streams:
print(f"Stream: {stream.stream_id}")
list_simulations()
List simulation jobs for the authenticated user.
async def list_simulations(
*,
status: SimulationJobStatus | None = None,
active: bool | None = None,
limit: int | None = None,
offset: int | None = None,
) -> SimulationJobsList
| Parameter | Type | Default | Description | |
|---|
status | `SimulationJobStatus | None` | None | Filter by job status |
active | `bool | None` | None | Only show active jobs (pending/dispatched/processing) |
limit | `int | None` | None | Maximum jobs to return |
offset | `int | None` | None | Number of jobs to skip for pagination |
Returns: SimulationJobsList - Paginated list of simulation jobs.
result = await client.list_simulations(limit=10)
for sim in result.jobs:
print(f"{sim.job_id}: {sim.status}")
print(f"Total: {result.total}")
cancel_simulation()
Cancel a pending or running simulation job.
async def cancel_simulation(job_id: str) -> SimulationJobInfo
| Parameter | Type | Description |
|---|
job_id | str | The job ID to cancel |
Returns: SimulationJobInfo - The cancelled job’s summary.
cancelled = await client.cancel_simulation(job.job_id)
print(f"Simulation {cancelled.job_id} cancelled (status: {cancelled.status})")
Simulation methods can be called without an active connection. They only require a valid API key.
Properties
is_connected
@property
def is_connected(self) -> bool
Whether the client is currently connected and ready.
current_status
@property
def current_status(self) -> ConnectionStatus
Current connection status.
Possible values: AUTHENTICATING, CONNECTING, RECONNECTING, CONNECTED, DISCONNECTED, FAILED
current_session_id
@property
def current_session_id(self) -> str | None
Current session ID, or None if not connected.