Backtesting

Feed replayed market data through the same event loop you use live.

Source
WebSocket replay
For
Backtests and dry runs

Replay

Backtesting guide

Feed synchronized historical market data through the same event loop you use for live handling.

Python
import asyncio, json, websockets
async def backtest_strategy():
    """Replay one day of BTC order book and trade data and paper-trade it.

    Opens a WebSocket connection, requests a combined ``orderbook`` +
    ``trades`` replay at 50x speed, and consumes messages until the server
    sends ``replay_completed``. The strategy is evaluated on every
    historical order book update, using the most recent book and all trades
    received so far.

    ``my_strategy(orderbook, trades)`` and
    ``execute_paper_trade(signal, timestamp)`` are not defined in this
    snippet; they must be provided by the surrounding module.
    """
    uri = "wss://api.0xarchive.io/ws?apiKey=0xa_your_api_key"
    async with websockets.connect(uri) as ws:
        # Replay orderbook + trades together at 50x speed
        await ws.send(json.dumps({
            "op": "replay",
            "channels": ["orderbook", "trades"],
            "symbol": "BTC",
            "start": 1704067200000,  # Jan 1 2024 (epoch milliseconds)
            "end": 1704153600000,  # Jan 2 2024 (epoch milliseconds)
            "speed": 50,
        }))

        orderbook = None
        # NOTE: grows for the whole replay window; trim it inside the
        # strategy if only recent trades are needed.
        trades = []

        async for message in ws:
            msg = json.loads(message)
            # Use .get() so a frame lacking "type" or "channel" is skipped
            # instead of aborting the whole backtest with a KeyError.
            msg_type = msg.get("type")
            channel = msg.get("channel")

            if msg_type == "replay_snapshot":
                # Initial state before timeline starts
                if channel == "orderbook":
                    orderbook = msg["data"]
            elif msg_type == "historical_data":
                if channel == "orderbook":
                    orderbook = msg["data"]
                    # Run strategy on each orderbook update
                    signal = my_strategy(orderbook, trades)
                    if signal:
                        execute_paper_trade(signal, msg["timestamp"])
                elif channel == "trades":
                    trades.append(msg["data"])
            elif msg_type == "gap_detected":
                print(f"Data gap: {msg['duration_minutes']}min at {msg['gap_start']}")
            elif msg_type == "replay_completed":
                print(f"Backtest done. {msg['snapshots_sent']} data points processed.")
                break
# Only start the replay when executed as a script, so importing this module
# does not open a network connection as a side effect.
if __name__ == "__main__":
    asyncio.run(backtest_strategy())

Keep exploring WebSocket