Skip to main content
The main client class for connecting to Odyssey’s audio-visual intelligence platform.

Constructor

Odyssey(api_key: str, **kwargs)
Creates a new Odyssey client instance with the provided API key.
ParameterTypeDescription
api_keystrAPI key for authentication (required)
**kwargsAdditional configuration options
from odyssey import Odyssey

client = Odyssey(api_key="ody_your_api_key_here")

Methods

connect()

Connect to a streaming session. The Odyssey API automatically assigns an available session.
async def connect(
    on_connected: Callable[[], None] | None = None,
    on_disconnected: Callable[[], None] | None = None,
    on_video_frame: Callable[[VideoFrame], None] | None = None,
    on_stream_started: Callable[[str], None] | None = None,
    on_stream_ended: Callable[[], None] | None = None,
    on_interact_acknowledged: Callable[[str], None] | None = None,
    on_stream_error: Callable[[str, str], None] | None = None,
    on_error: Callable[[Exception, bool], None] | None = None,
    on_status_change: Callable[[ConnectionStatus, str | None], None] | None = None,
    on_broadcast_ready: Callable[[BroadcastInfo], None] | None = None,
) -> None
ParameterTypeDescription
on_connectedCallable[[], None]Called when connection is established
on_disconnectedCallable[[], None]Called when connection is closed
on_video_frameCallable[[VideoFrame], None]Called for each video frame
on_stream_startedCallable[[str], None]Called when stream starts (receives stream_id)
on_stream_endedCallable[[], None]Called when stream ends
on_interact_acknowledgedCallable[[str], None]Called when interaction is acknowledged
on_stream_errorCallable[[str, str], None]Called on stream error (reason, message)
on_errorCallable[[Exception, bool], None]Called on error (error, fatal)
on_status_change`Callable[[ConnectionStatus, str | None], None]`Called on status change
on_broadcast_readyCallable[[BroadcastInfo], None]Called when broadcast playback details are available
Raises:
ExceptionDescription
OdysseyAuthErrorAuthentication failed (invalid API key)
OdysseyConnectionErrorConnection failed (no streamers, timeout, etc.)
try:
    await client.connect(
        on_video_frame=lambda frame: process_frame(frame),
        on_stream_error=lambda reason, msg: print(f"Stream error: {reason} - {msg}"),
        on_status_change=lambda status, msg: print(f"Status: {status.value}"),
    )
except OdysseyAuthError:
    print("Invalid API key")
except OdysseyConnectionError as e:
    print(f"Connection failed: {e}")
finally:
    await client.disconnect()

disconnect()

Disconnect from the session and clean up resources.
async def disconnect() -> None
await client.disconnect()

start_stream()

Start an interactive stream session. Supports Broadcast mode by passing broadcast=True. When Broadcast is enabled, the stream can be viewed by multiple spectators simultaneously. Playback details are delivered via the on_broadcast_ready callback.
async def start_stream(
    prompt: str = "",
    portrait: bool = True,
    image: str | bytes | Image.Image | np.ndarray | None = None,
    image_path: str | None = None, # deprecated
    bypass_prompt_expansion: bool | None = None,
    broadcast: bool = False
) -> str
ParameterTypeDefaultDescription
promptstr""Initial prompt to generate video content
portraitboolTrueTrue for portrait (704x1280), False for landscape (1280x704). Resolution may vary by model.
image`str | bytes | Image.Image | np.ndarray | None`NoneImage for image-to-video generation (see formats below)
image_path`str | None`NoneDeprecated. Use image instead.
bypass_prompt_expansion`bool | None`NoneSkip prompt expansion (safety-only mode). Requires the expansion bypass privilege.
broadcastboolFalseEnable broadcast mode, allowing multiple spectators to view the same running stream.
Supported image formats for the image parameter:
TypeDescription
strFile path to an image
bytesRaw image bytes
PIL.Image.ImagePIL Image object
np.ndarrayNumPy array (RGB uint8, shape HxWx3)
Returns: str - Stream ID when the stream is ready. Use this ID to retrieve recordings. Raises: OdysseyStreamError - If not connected or stream fails to start.
try:
    stream_id = await client.start_stream("A cat", portrait=True)
    print(f"Stream started: {stream_id}")
except OdysseyStreamError as e:
    print(f"Failed to start stream: {e}")
Image-to-video requirements:
  • SDK version 1.0.0+
  • Max size: 25MB
  • Supported formats: JPEG, PNG, WebP, GIF, BMP, HEIC, HEIF, AVIF
  • Images are resized to 1280x704 (landscape) or 704x1280 (portrait)
# Image-to-video examples
await client.connect(on_video_frame=process_frame)

# Using a file path
stream_id = await client.start_stream(
    prompt="A cat",
    portrait=False,
    image="/path/to/image.jpg"
)

# Using PIL Image
from PIL import Image
pil_image = Image.open("/path/to/image.jpg")
stream_id = await client.start_stream(prompt="A cat", image=pil_image)

# Using bytes
with open("/path/to/image.jpg", "rb") as f:
    image_bytes = f.read()
stream_id = await client.start_stream(prompt="A cat", image=image_bytes)

connect_to_stream()

Connect to an existing broadcast stream as a spectator. This method is used when joining a broadcast created with start_stream(broadcast=True). The webrtc_url and spectator_token are provided via the on_broadcast_ready callback.
connect_to_stream(
   webrtc_url: str,
   spectator_token: str,
   on_video_frame: VideoFrameCallback | None = None,
   on_disconnected: DisconnectedCallback | None = None,
   debug: bool = False
) -> SpectatorConnection
ParameterTypeDescription
webrtc_urlstrWebRTC (WHEP) endpoint for the broadcast stream
spectator_tokenstrAuthentication token required to join the broadcast
on_video_frame`Callable[[VideoFrame], None] | None`Optional callback receiving video frames
on_disconnected`Callable[[], None] | None`Callback triggered when the broadcast ends
debugboolEnable debugging logs
Returns: SpectatorConnection - Handle for managing the playback session. Raises:
ExceptionDescription
ValueErrorIf spectator token is invalid (401)
ConnectionErrorIf stream not found (404) or connection fails
Example:
from odyssey import connect_to_stream

spectator = connect_to_stream(
   webrtc_url,
   spectator_token
)

print("Connected to broadcast")

interact()

Send an interaction prompt to update the video content.
async def interact(prompt: str) -> str
ParameterTypeDescription
promptstrThe interaction prompt
Returns: str - The acknowledged prompt when processed. Raises: OdysseyStreamError - If not connected or no active stream.
try:
    ack_prompt = await client.interact("Pet the cat")
    print(f"Interaction acknowledged: {ack_prompt}")
except OdysseyStreamError as e:
    print(f"Failed to interact: {e}")

end_stream()

End the current interactive stream session.
async def end_stream() -> None
Raises: OdysseyStreamError - If not connected.
await client.end_stream()

get_recording()

Get recording data for a stream with presigned URLs.
async def get_recording(stream_id: str) -> Recording
ParameterTypeDescription
stream_idstrThe stream ID to get recording for (from start_stream)
Returns: Recording - Recording data with presigned URLs valid for ~1 hour.
This method can be called without an active connection. It only requires a valid API key.
recording = await client.get_recording("stream-123")
if recording.video_url:
    print(f"Video: {recording.video_url}")
    print(f"Duration: {recording.duration_seconds}s")

list_stream_recordings()

List stream recordings for the authenticated user.
async def list_stream_recordings(
    limit: int | None = None,
    offset: int | None = None
) -> StreamRecordingsList
ParameterTypeDefaultDescription
limit`int | None`NoneMaximum number of recordings to return
offset`int | None`NoneNumber of recordings to skip for pagination
Returns: StreamRecordingsList - Paginated list of stream recordings.
This method can be called without an active connection. It only requires a valid API key.
result = await client.list_stream_recordings(limit=10)
for rec in result.recordings:
    print(f"{rec.stream_id}: {rec.duration_seconds}s ({rec.width}x{rec.height})")
print(f"Total: {result.total}")

Simulate API Methods

Simulate API methods were added in v1.0.0
The Simulate API allows you to run scripted interactions asynchronously. Unlike the Interactive API, simulations execute in the background and produce recordings you can retrieve when complete.

simulate()

Create a new simulation job.
async def simulate(
    *,
    script: list[dict] | None = None,
    scripts: list[list[dict]] | None = None,
    script_url: str | None = None,
    portrait: bool = True,
    bypass_prompt_expansion: bool | None = None,
) -> SimulationJobDetail
ParameterTypeDefaultDescription
script`list[dict] | None`NoneSingle script to run (provide one of script, scripts, or script_url)
scripts`list[list[dict]] | None`NoneBatch mode: multiple scripts to run in a single job
script_url`str | None`NoneURL to a JSON file containing the script
portraitboolTruePortrait mode
bypass_prompt_expansion`bool | None`NoneSkip prompt expansion (safety-only mode). Requires the expansion bypass privilege.
Script entry format:
KeyTypeDescription
timestamp_msintWhen this action occurs (milliseconds from start)
start{ "prompt": str, "image": str | bytes }Begin a new stream
interact{ "prompt": str }Send an interaction prompt
end{}End the current stream (empty dict)
Returns: SimulationJobDetail - The created simulation job with ID, status, and streams.
job = await client.simulate(
    script=[
        {"timestamp_ms": 0, "start": {"prompt": "A cat sitting on a windowsill"}},
        {"timestamp_ms": 3000, "interact": {"prompt": "The cat stretches"}},
        {"timestamp_ms": 6000, "interact": {"prompt": "The cat yawns"}},
        {"timestamp_ms": 9000, "end": {}}
    ],
    portrait=True
)
print(f"Simulation started: {job.job_id}")

get_simulate_status()

Get the current status of a simulation job.
async def get_simulate_status(job_id: str) -> SimulationJobDetail
ParameterTypeDescription
job_idstrThe job ID to check
Returns: SimulationJobDetail - Detailed status including streams created.
status = await client.get_simulate_status(job.job_id)
print(f"Status: {status.status}")
if status.status == "completed":
    for stream in status.streams:
        print(f"Stream: {stream.stream_id}")

list_simulations()

List simulation jobs for the authenticated user.
async def list_simulations(
    *,
    status: SimulationJobStatus | None = None,
    active: bool | None = None,
    limit: int | None = None,
    offset: int | None = None,
) -> SimulationJobsList
ParameterTypeDefaultDescription
status`SimulationJobStatus | None`NoneFilter by job status
active`bool | None`NoneOnly show active jobs (pending/dispatched/processing)
limit`int | None`NoneMaximum jobs to return
offset`int | None`NoneNumber of jobs to skip for pagination
Returns: SimulationJobsList - Paginated list of simulation jobs.
result = await client.list_simulations(limit=10)
for sim in result.jobs:
    print(f"{sim.job_id}: {sim.status}")
print(f"Total: {result.total}")

cancel_simulation()

Cancel a pending or running simulation job.
async def cancel_simulation(job_id: str) -> SimulationJobInfo
ParameterTypeDescription
job_idstrThe job ID to cancel
Returns: SimulationJobInfo - The cancelled job’s summary.
cancelled = await client.cancel_simulation(job.job_id)
print(f"Simulation {cancelled.job_id} cancelled (status: {cancelled.status})")
Simulation methods can be called without an active connection. They only require a valid API key.

Properties

is_connected

@property
def is_connected(self) -> bool
Whether the client is currently connected and ready.

current_status

@property
def current_status(self) -> ConnectionStatus
Current connection status. Possible values: AUTHENTICATING, CONNECTING, RECONNECTING, CONNECTED, DISCONNECTED, FAILED

current_session_id

@property
def current_session_id(self) -> str | None
Current session ID, or None if not connected.