Backtesting
Feed replayed market data through the same event loop you use live.
Replay
Backtesting guide
Feed synchronized historical market data through the same event loop you use to handle live data.
Python
import asyncio, json, websockets
async def backtest_strategy():
    """Replay historical BTC order book and trade data and paper-trade it.

    Connects to the WebSocket endpoint, sends a single ``replay`` request
    for the ``orderbook`` and ``trades`` channels, then consumes messages
    until a ``replay_completed`` message arrives or the connection's message
    stream ends.

    Each ``historical_data`` order book update is handed to ``my_strategy``
    together with every trade collected so far; a truthy result is passed to
    ``execute_paper_trade`` along with the message's ``timestamp``.
    """
    uri = "wss://api.0xarchive.io/ws?apiKey=0xa_your_api_key"

    # Replay orderbook + trades together at 50x speed
    replay_request = {
        "op": "replay",
        "channels": ["orderbook", "trades"],
        "symbol": "BTC",
        "start": 1704067200000,  # Jan 1 2024
        "end": 1704153600000,  # Jan 2 2024
        "speed": 50,
    }

    async with websockets.connect(uri) as ws:
        await ws.send(json.dumps(replay_request))

        book = None
        seen_trades = []

        async for raw in ws:
            msg = json.loads(raw)
            kind = msg["type"]

            if kind == "replay_completed":
                print(f"Backtest done. {msg['snapshots_sent']} data points processed.")
                break

            if kind == "gap_detected":
                print(f"Data gap: {msg['duration_minutes']}min at {msg['gap_start']}")
                continue

            # Any other message type carries no data this loop acts on.
            if kind not in ("replay_snapshot", "historical_data"):
                continue

            is_update = kind == "historical_data"
            channel = msg["channel"]

            if channel == "orderbook":
                # A replay_snapshot is the initial state before the timeline
                # starts; it seeds the book without triggering the strategy.
                book = msg["data"]
                if is_update:
                    # Run strategy on each orderbook update
                    signal = my_strategy(book, seen_trades)
                    if signal:
                        execute_paper_trade(signal, msg["timestamp"])
            elif channel == "trades" and is_update:
                seen_trades.append(msg["data"])
# Script entry point: run the backtest coroutine to completion.
asyncio.run(backtest_strategy())