Back to Blog
Python & Code

Async Python for Real-Time Market Data

QFQuantForge Team·April 3, 2026·9 min read

Our platform runs 45 trading bots, a FastAPI server, a WebSocket data stream, a Telegram notification service, an equity snapshot scheduler, and a risk event persister, all in a single Python process on one machine. The glue that makes this possible is asyncio, Python's built-in asynchronous I/O framework. Without it, we would need multiprocessing, inter-process communication, and distributed state management for what is fundamentally a single-machine workload.

Here is how we use async Python in practice, what went wrong along the way, and why the event bus design is the most important architectural decision we made.

The asyncio Event Loop

Every async operation in our process shares one event loop. When a bot tick fetches candles from Binance, it does not block the thread. It registers a callback with the event loop and yields control. While that bot waits for the HTTP response, other bots can run their ticks, the API server can handle dashboard requests, and the WebSocket stream can process incoming price updates.

The FastAPI server creates the event loop on startup. Our lifespan handler initializes all services within that loop: the DataFetcher (ccxt exchange connection), BotManager, SchedulerService, and the event bus. Everything is coroutine-based from the ground up.

async def lifespan(app: FastAPI):
    exchange = ccxt.pro.binance(config)
    data_fetcher = DataFetcher(exchange)
    bot_manager = BotManager(data_fetcher, executor)
    scheduler = SchedulerService()
    yield  # app runs
    await scheduler.shutdown()

The yield in the lifespan is where the application runs. All 45 bots tick, all API requests are served, all events are published and consumed. When the server shuts down, execution resumes after yield, and we gracefully stop the scheduler.

ccxt.pro WebSocket Streaming

For real-time price data, we use ccxt.pro, the async WebSocket extension of ccxt. The standard ccxt library uses REST (HTTP request/response). ccxt.pro opens persistent WebSocket connections that push data as it arrives.

async def stream_prices(exchange, symbols):
    while True:
        for symbol in symbols:
            ticker = await exchange.watch_ticker(symbol)
            await event_bus.publish(Event(
                type=EventType.PRICE_UPDATE,
                data={"symbol": symbol, "price": ticker["last"]}
            ))

Each watch_ticker call returns the latest price the instant Binance broadcasts it. No polling interval. No wasted API weight. The WebSocket connection costs zero rate limit budget compared to REST calls.

In practice, we maintain WebSocket connections for all 25 symbols we track. The data streaming module (data/stream.py) manages these connections, handles reconnection on disconnect, and publishes price events to the internal event bus.

The critical detail is that await exchange.watch_ticker(symbol) is non-blocking within the event loop. While one symbol's WebSocket is waiting for data, the event loop serves other coroutines. All 25 WebSocket streams multiplex over the same event loop without threading.

The Event Bus: asyncio.Queue Pub/Sub

The event bus is the central nervous system of the platform. It is a publish/subscribe system built entirely on asyncio.Queue. Each event type (SIGNAL_GENERATED, ORDER_FILLED, CIRCUIT_BREAKER_FIRED, and 11 others) has a list of subscriber queues. Publishing an event puts it on all subscriber queues. Subscribing returns a queue that receives all events of that type.

class EventBus:
    async def publish(self, event: Event):
        for queue in self._subscribers[event.type]:
            try:
                queue.put_nowait(event)
            except asyncio.QueueFull:
                pass  # backpressure: drop if subscriber is behind

The put_nowait call is the key design choice. It never blocks the publisher. If a subscriber's queue is full (meaning the subscriber is processing events slower than they arrive), the event is dropped silently rather than stalling the publisher.

This is intentional. The Telegram notification service is a subscriber. If the Telegram API is slow (network latency, rate limiting), its queue fills up. Without non-blocking publish, a slow Telegram response would delay bot tick execution, which would delay trade signals, which could mean missed entries. The bot's trading logic should never wait on a notification service.

Backpressure: Why 1,000 Messages

Each subscriber queue has a maximum size of 1,000 messages. We arrived at this number through production observation.

During high-volatility periods, all 45 bots might generate signals simultaneously. Each signal publishes a SIGNAL_GENERATED event. If the signal passes risk checks, ORDER_PLACED follows. Then ORDER_FILLED, POSITION_OPENED, and potentially CIRCUIT_BREAKER_FIRED events cascade. A single market spike can produce 200+ events in a few seconds.

At 1,000 messages, the queue holds about 10 seconds of burst traffic at peak event rate. This gives slow subscribers enough buffer to process events without dropping during normal bursts, while preventing unbounded memory growth if a subscriber truly stalls.

Before settling on 1,000, we tested 100 (too aggressive, dropped Telegram notifications during normal operation), 10,000 (consumed too much memory under sustained load), and unlimited (memory leak risk if a subscriber deadlocked). The 1,000-message buffer is the Goldilocks zone for our workload.

How 45 Bots Share One Event Loop

The scheduler creates one APScheduler job per bot. Each job calls the bot's tick() method as an async coroutine. The scheduler uses the AsyncIOScheduler backend, which dispatches jobs into the existing event loop rather than creating threads.

When a 15-minute bot ticks, its tick() coroutine runs through this sequence: fetch candles (async HTTP via ccxt, yields while waiting for response), compute indicators (CPU-bound pandas operations, does not yield), run strategy analysis (CPU-bound, does not yield), publish signal event (non-blocking put_nowait), check risk limits (CPU-bound), execute order if approved (async, yields during simulated or real order placement).

The CPU-bound portions (indicator computation, strategy analysis) do not yield to the event loop. While one bot is computing its RSI and Bollinger Bands, no other coroutine runs. But these computations take 5-50 milliseconds depending on the strategy complexity and candle count. With 45 bots and staggered schedules, the total CPU time per tick cycle is well under 1 second, leaving plenty of event loop capacity for I/O operations.

The staggering is important. We start bots 2 seconds apart during initialization. This means 15-minute bots do not all tick at exactly the same second. Spread across a 90-second window (45 bots times 2 seconds), the load is smoothed enough that no single tick cycle monopolizes the event loop.

Error Isolation

A critical property of the async architecture is that errors in one bot do not affect others. Each bot's tick() is wrapped in a try/except that catches any exception, publishes a BOT_ERROR event, and returns. The scheduler continues calling other bots' ticks on schedule.

async def tick(self):
    try:
        candles = await self.data_adapter.fetch_candles(
            self.symbol, self.strategy.required_timeframes
        )
        # ... strategy analysis, risk check, execution
    except Exception as e:
        await self.event_bus.publish(Event(
            type=EventType.BOT_ERROR,
            data={"bot_id": self.id, "error": str(e)}
        ))

This isolation has saved us repeatedly. A malformed candle response from Binance for one symbol (it happens during exchange maintenance) stops that one bot's tick. The other 44 continue operating. A bug in a strategy's indicator calculation (we had one where a division by zero occurred when ATR was zero during a period of no price movement) crashes that bot's tick, not the process.

In a threaded or multiprocessing architecture, achieving this isolation requires careful exception handling in thread workers, process supervisors, and restart logic. In async, the isolation is natural: each coroutine is independent, and an unhandled exception in one propagates only to its caller.

The SSE Subscriber

The React dashboard receives real-time updates via Server-Sent Events (SSE). An SSE endpoint subscribes to all event types on the event bus and streams them to connected browser clients.

When a bot generates a signal, the event flows: bot tick publishes SIGNAL_GENERATED to the event bus, the SSE subscriber picks it up from its queue, serializes it to JSON, and pushes it to the SSE stream. The dashboard receives it and updates the UI. Total latency from signal generation to dashboard display is under 100 milliseconds.

The SSE subscriber is the one subscriber that gets all event types. Other subscribers are selective: the Telegram notifier only subscribes to ORDER_FILLED and CIRCUIT_BREAKER_FIRED, the risk event persister only subscribes to CIRCUIT_BREAKER_FIRED and RISK_DECISION_REJECTED.

Lessons Learned

Three lessons from running async Python in production for two years.

First, CPU-bound work is the enemy. Async does not help with computation, only with I/O waiting. If a strategy's indicator calculation takes 500 milliseconds, the event loop is blocked for 500 milliseconds. No other bot ticks, no API requests are served, no WebSocket data is processed. We keep strategy computation lean for this reason. Heavy computation (backtesting, ML training) runs in ProcessPoolExecutor, outside the event loop.

Second, debug async carefully. When a coroutine hangs (waiting for a response that never comes), it silently blocks nothing except itself. But if that coroutine holds a database write lock (via the busy_timeout mechanism), other writers queue behind it. We added timeout wrappers around all exchange API calls: if ccxt does not return within 30 seconds, the coroutine raises a timeout error and the bot tick fails cleanly.

Third, the event bus should be fire-and-forget. We initially tried an event bus with guaranteed delivery (blocking publish, retry on full queue). It was a disaster. One slow subscriber (Telegram during a rate limit) backed up the entire system. Switching to non-blocking publish with bounded queues solved every problem we had with system-wide stalls.

The async architecture is not the simplest approach. A synchronous system with one bot per thread would be easier to reason about. But it would not scale to 45 bots on a single machine without significant thread management overhead, and the event bus would need thread-safe queues instead of asyncio's lightweight implementation. For our single-process, single-machine architecture, asyncio is the right foundation.