quantpylib.wrappers.databento
We demonstrate a simple example to stream and register handler for AAPL MBP-1 (L1) data stream from Databento's EQUS.MINI dataset.
We print out the millisecond timestamp, the distance between exchange event and local timestamp, mid price as well as the packet capture latency in microseconds from Databento's servers.
import os
import time
import asyncio
import logging
from dotenv import load_dotenv
load_dotenv()
from quantpylib.wrappers.databento import AsyncDBNManager
DBN_KEY = os.getenv("DBN_KEY")
async def print_handler(record):
ts_event = record.ts_event // 1000
ts_recv = record.ts_recv // 1000
now_ms = int(time.time() * 1000)
lvl = record.levels[0]
bid = lvl.bid_px / 1e9
ask = lvl.ask_px / 1e9
mid = (bid + ask) / 2
print(record)
print(f"t={now_ms} lag={now_ms - (ts_event//1000):>5}ms mid={mid:.4f} gateway_delta={ts_recv - ts_event:>8}us")
'''
Mbp1Msg(ts_recv=1776431986532826825, pretty_ts_recv='2026-04-17T13:19:46.532826825Z', rtype=<RType.MBP_1: 1>, publisher_id=EQUS.MINI.EQUS (95), instrument_id=38, ts_event=1776431986532684125, pretty_ts_event='2026-04-17T13:19:46.532684125Z', action='T', side='N', depth=0, price=268100000000, pretty_price=268.1, size=91, flags=LAST (128), ts_in_delta=0, sequence=0, bid_px_00=267000000000, pretty_bid_px_00=267, ask_px_00=268550000000, pretty_ask_px_00=268.55, bid_sz_00=20, ask_sz_00=5, bid_ct_00=0, ask_ct_00=0)
t=1776431987781 lag= 1249ms mid=267.7750 gateway_delta= 142us
'''
async def main():
'''using the SDK'''
dbn = AsyncDBNManager(key=DBN_KEY, dataset="EQUS.MINI")
await dbn.init_client()
await dbn.subscribe(schema="mbp-1",symbols=["AAPL"],handler=print_handler)
await asyncio.sleep(1e9)
if __name__ == "__main__":
asyncio.run(main())
AsyncDBNManager
__init__(key, dataset, ts_out=False, heartbeat_interval_s=10, compression=Compression.NONE)
Initializes the AsyncDBNManager instance.
Creates a manager for Databento's live streaming protocol over raw TCP connection using Databento Binary Encoding (DBN) format for data transmission.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Databento API key for authentication. |
required |
dataset
|
str
|
Dataset identifier (e.g., 'EQUS.MINI', 'GLBX.MDP3'). |
required |
ts_out
|
bool
|
If set, DBN records will be timestamped when they are sent by the gateway. |
False
|
heartbeat_interval_s
|
int
|
Heartbeat interval in seconds. Defaults to 10. |
10
|
compression
|
Compression
|
Compression mode for data transmission. Defaults to Compression.NONE. |
NONE
|
close(reconnecting=False)
async
Close connection and clean up resources.
connect()
async
Establish TCP connection to Databento gateway and authenticate.
init_client()
async
Establish connection and prepare session by calling connect and start.
Called after object creation. Connects to gateway, authenticates, and prepares the session
ready to accept subscribe requests.
start()
async
Start the protocol and begin receiving metadata/data from gateway.
subscribe(schema, symbols, handler, stype_in='raw_symbol', start=None, snapshot=False)
async
Subscribe to market data for specified symbols and schema.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
schema
|
str
|
Data schema ('mbp-1', 'mbp-10', 'trades', 'ohlcv', etc.). |
required |
symbols
|
list[str]
|
List of symbols to subscribe to. |
required |
handler
|
coroutine
|
Async callback function to handle incoming records. |
required |
stype_in
|
str
|
Symbol type format. Defaults to 'raw_symbol'. |
'raw_symbol'
|
start
|
str
|
Start time for historical replay. Defaults to None. |
None
|
snapshot
|
bool
|
Request snapshot before streaming. Defaults to False. |
False
|