throttler
This page describes how you may use our quantpylib.throttler module and the functionalities exposed by our APIs. This module contains classes meant for synchronizing, throttling and handling tasks such as network requests efficiently.
quantpylib.throttler.aiosonic features an HTTP client written on top of aiosonic and orjson, acting as an asynchronous HTTP library optimizing the speed of HTTP requests and handling of data packets.
quantpylib.throttler.rate_semaphore features both synchronous and asynchronous support for rate-limiting access to function calls via a credit-based semaphore synchronization tool. A simple but common use would be to maximise throughput of API requests to an external server that places rate-limits on the number of API requests per minute/hour, with request credits charged against a resource pool. This is made available through the RateSemaphore and AsyncRateSemaphore classes, which can be seen as extensions of the synchronization primitives threading.Semaphore and asyncio.locks.Semaphore in the Python standard library, respectively.
The quantpylib.throttler.decorators module provides decorators for instance methods that wrap threads and coroutines to provide seamless integration with the synchronization features.
quantpylib.throttler.aiohttp features utility functions to batch asynchronous network requests using aiohttp client sessions and, optionally, semaphores.
A high-level walkthrough of the individual quant packages is presented on this page. Comprehensive documentation may be found in the respective pages. To follow along, make sure you have installed the necessary dependencies. Code example scripts are also provided in the repo.
Examples
Aiosonic Client
The module is an efficiency-optimized HTTP request library. The usage is extremely simple. Let's import some HTTP clients for comparison:
import asyncio, aiohttp, requests
from quantpylib.throttler.aiosonic import HTTPClient
from quantpylib.utilities.general import stopwatch
We will use the HTTPClient class. Let's set up some parameters for a request:
url = "https://api.hyperliquid.xyz"
http_client = HTTPClient(base_url=url)
request = {
    "endpoint":"/info",
    "method":"POST",
    "data":{"type":"meta"}
}
Serialization of the request data is handled by the orjson package, as is deserialization of the returned data packet. Of course, all methods such as GET, POST, PUT and DELETE are supported.
Let's compare how this fares against other HTTP clients, using the timer from quantpylib.utilities.general.
async def _aiohttp_request(session,url,payload):
    async with session.post(url, json=payload, headers={"Content-Type": "application/json"}) as response:
        return await response.json()

async def main():
    print(await http_client.request(**request))
    await timers()

async def timers():
    async with aiohttp.ClientSession() as session:
        for i in range(30):
            print('>>>>>>>>>')
            await stopwatch(units="ms")(http_client.request)(**request)
            stopwatch(units="ms")(requests.post)(
                url + request["endpoint"],
                json=request["data"],
                headers={"content-type": "application/json"}
            ).json()
            await stopwatch(units="ms")(_aiohttp_request)(
                session=session,
                url=url + request["endpoint"],
                payload=request["data"]
            )
            print('<<<<<<<<<')

if __name__ == "__main__":
    asyncio.run(main())
...
>>>>>>>>>
request::69ms
post::225ms
_aiohttp_request::69ms
<<<<<<<<<
>>>>>>>>>
request::75ms
post::221ms
_aiohttp_request::87ms
<<<<<<<<<
>>>>>>>>>
request::67ms
post::230ms
_aiohttp_request::82ms
<<<<<<<<<
>>>>>>>>>
request::77ms
post::226ms
_aiohttp_request::67ms
<<<<<<<<<
...
The requests.post method is the least performant. Requests made with the throttler client and the aiohttp client are comparable, since both cache and maintain the connection handshake, but the throttler library should edge out small performance gains due to efficient (de)serialization of messages. This matters when we are working with large packets. The network latency dominates in most cases, however. It is clear which library is easiest to use.
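The timing wrapper used in the comparison above is applied as `stopwatch(units="ms")(func)(...)` to both blocking functions and coroutines. A minimal sketch of how such a wrapper could be written with the standard library is given here. The name `sketch_stopwatch` and its internals are assumptions for illustration and are not quantpylib's implementation.

```python
import asyncio
import functools
import inspect
import time

def sketch_stopwatch(units="ms"):
    """Hypothetical timing decorator factory; not quantpylib's stopwatch."""
    scale = {"s": 1.0, "ms": 1e3}[units]

    def report(name, start):
        elapsed = (time.perf_counter() - start) * scale
        print(f"{name}::{elapsed:.0f}{units}")

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                start = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    report(func.__name__, start)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                report(func.__name__, start)
        return sync_wrapper

    return decorator

# Toy functions standing in for blocking and non-blocking requests.
def add(a, b):
    return a + b

async def async_add(a, b):
    await asyncio.sleep(0.01)
    return a + b

sync_result = sketch_stopwatch(units="ms")(add)(1, 2)
async_result = asyncio.run(sketch_stopwatch(units="ms")(async_add)(3, 4))
```

The wrapper returns the wrapped call's result unchanged, which is why the `.json()` call can be chained onto the timed `requests.post` in the comparison.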
Rate Semaphore
The module is flexible enough to handle both fixed-cost endpoints (specs such as '20 API requests/min') and variable-cost endpoints (specs such as '5 API credits/getTick', '15 API credits/getOHLCV' with '100 credits/min per client').
Suppose we have a financial API endpoint that has rate limits as such:
Period | Credits Awarded / app |
---|---|
10 seconds | 40 |
The API gives us 40 credits to use every 10 seconds (capped at 40 credits max). Different endpoints can have variable credit costs, depending on the server load.
Suppose we have the following endpoints and their respective costs:
Endpoint | Cost (credits / req) |
---|---|
getTick | 20 |
getOHLCV | 5..30 |
getPrice | 12.5 |
... | ... |
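To make the credit arithmetic concrete, the following sketch checks which of a batch of requests fit into a 40-credit pool at a single instant. The helper `admit_in_order` and the request labels are hypothetical, and the sketch ignores refunds entirely; it only illustrates the bookkeeping implied by the two tables above.

```python
def admit_in_order(capacity, requests):
    """Split requests into those the pool can cover right now and those that must wait."""
    remaining = capacity
    admitted, waiting = [], []
    for name, cost in requests:
        if cost <= remaining:
            remaining -= cost
            admitted.append(name)
        else:
            waiting.append(name)
    return admitted, waiting, remaining

# Costs taken from the table: getTick costs 20, getOHLCV costs between 5 and 30.
requests_batch = [
    ("getTick#1", 20),
    ("getOHLCV#2", 30),
    ("getOHLCV#3", 5),
    ("getTick#4", 20),
]
admitted, waiting, remaining = admit_in_order(40, requests_batch)
print(admitted, waiting, remaining)
# ['getTick#1', 'getOHLCV#3'] ['getOHLCV#2', 'getTick#4'] 15
```

The two waiting requests can only proceed once enough credits have been returned to the pool.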
Both synchronous and asynchronous semaphores expose their synchronization features through the semaphore.transact(...) method. The two methods have the same function signature, except that the first parameter to a synchronous transaction is a parameter-less function, and the first parameter to the asynchronous transaction is a coroutine. Detailed documentation is given in the respective pages.
For the demonstration, we require the following imports:
import time
import random
import asyncio
import threading
from datetime import datetime
from quantpylib.throttler.decorators import wrap_in_thread, aconsume_credits
from quantpylib.throttler.rate_semaphore import RateSemaphore,AsyncRateSemaphore
Synchronous Example
Let's begin with the example for the synchronous semaphore, given by the RateSemaphore class. Our objective is to maximise the throughput of a batch of blocking requests that consume credits on some external server. To simulate blocking requests, such as a requests.get method, we use the time.sleep method. Note that this generalizes to any blocking method.
def getTick(work, id):
    print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
    time.sleep(work)
    print(f"{datetime.now()}:: getTick processed {id}")
    return True

def getOHLCV(work, id):
    print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
    time.sleep(work)
    print(f"{datetime.now()}:: getOHLCV processed {id}")
    return True
Since we want to maximise throughput, and the function calls block the main thread, we want to create the requests in multiple threads. But since sending all of these requests simultaneously in different threads would overload the market data server (resulting in errors, downgraded service or blacklisting), we wrap them in rate-limited transactions and submit them to the semaphore.
Let's create a script to run some requests through the semaphore with the transact method:
def sync_example():
    print("-----------------------sync with thread-----------------------")
    sem = RateSemaphore(40)
    tick_req = lambda x: getTick(random.randint(1, 5), x)
    ohlcv_req = lambda x: getOHLCV(random.randint(1, 5), x)
    threads = [
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: tick_req(1),
            "credits":20,
            "refund_time":10,
            "transaction_id":1, #optional
            "verbose":True #optional
        }),
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: ohlcv_req(2),
            "credits":30,
            "refund_time":10,
            "transaction_id":2, #optional
            "verbose":True #optional
        }),
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: ohlcv_req(3),
            "credits":5,
            "refund_time":10,
            "transaction_id":3,
            "verbose":True
        }),
        threading.Thread(target=sem.transact,kwargs={
            "lambda_func":lambda: tick_req(4),
            "credits":20,
            "refund_time":10,
            "transaction_id":4,
            "verbose":True
        }),
    ]
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]

if __name__ == "__main__":
    sync_example()
-----------------------sync with thread-----------------------
2024-03-28 19:16:27.105953:: TXN 1 acquiring CreditSemaphore
2024-03-28 19:16:27.106051:: TXN 2 acquiring CreditSemaphore
2024-03-28 19:16:27.106083:: TXN 1 entered CreditSemaphore...
2024-03-28 19:16:27.106161:: TXN 3 acquiring CreditSemaphore
2024-03-28 19:16:27.106303:: TXN 4 acquiring CreditSemaphore
2024-03-28 19:16:27.106367:: TXN 3 entered CreditSemaphore...
2024-03-28 19:16:27.106426:: getOHLCV processing 3 takes 4 seconds
2024-03-28 19:16:27.106449:: getTick processing 1 takes 5 seconds
2024-03-28 19:16:31.106689:: getOHLCV processed 3
2024-03-28 19:16:31.106877:: TXN 3 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:32.107982:: getTick processed 1
2024-03-28 19:16:32.108390:: TXN 1 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:41.109162:: TXN 4 entered CreditSemaphore...
2024-03-28 19:16:41.109260:: getTick processing 4 takes 2 seconds
2024-03-28 19:16:43.110862:: getTick processed 4
2024-03-28 19:16:43.111243:: TXN 4 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:53.113011:: TXN 2 entered CreditSemaphore...
2024-03-28 19:16:53.113060:: getOHLCV processing 2 takes 3 seconds
2024-03-28 19:16:56.115643:: getOHLCV processed 2
2024-03-28 19:16:56.115815:: TXN 2 exits CreditSemaphore, schedule refund in 10...
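The log is consistent with a credit pool in which a transaction blocks until its cost can be covered, runs, and then has its credits returned on a timer after it exits. A minimal sketch of that mechanism, built on `threading.Condition` and `threading.Timer`, is given here. The class `SketchCreditSemaphore` is hypothetical and is not quantpylib's `RateSemaphore`; the refund time is shortened so the sketch finishes quickly.

```python
import threading

class SketchCreditSemaphore:
    """Hypothetical credit pool for illustration; not quantpylib's RateSemaphore."""

    def __init__(self, credits):
        self._credits = credits
        self._cond = threading.Condition()

    def transact(self, lambda_func, credits, refund_time):
        # Block until the pool can cover this transaction's cost, then charge it.
        with self._cond:
            self._cond.wait_for(lambda: self._credits >= credits)
            self._credits -= credits
        try:
            return lambda_func()
        finally:
            # Exit without waiting; the credits come back later on a timer thread.
            timer = threading.Timer(refund_time, self._refund, args=(credits,))
            timer.daemon = True
            timer.start()

    def _refund(self, credits):
        with self._cond:
            self._credits += credits
            self._cond.notify_all()  # let blocked transactions re-check the pool

sem = SketchCreditSemaphore(40)
completed = []

def worker(txn_id, cost):
    sem.transact(lambda: completed.append(txn_id), credits=cost, refund_time=0.2)

threads = [
    threading.Thread(target=worker, args=(txn_id, cost))
    for txn_id, cost in [(1, 20), (2, 30), (3, 5), (4, 20)]
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(sorted(completed))
```

As in the log, the order in which blocked threads proceed depends on the scheduler, so only the set of completed transactions is predictable here.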
Easy with Decorators
The example given is nice but somewhat unwieldy, since the method/function of interest must first be wrapped in a semaphore transaction and then in a threading.Thread object. In many software applications, we have a data-service layer with a poller object/SDK for API calls made to external servers. A simulated example looks something like this:
class _Throttler():
    def __init__(self):
        self.rate_semaphore=RateSemaphore(31)

    @wrap_in_thread(costs=20,refund_in=10)#,attrname="rate_semaphore",verbose=True (optional-defaults)
    def _getTick(self, work, id):
        print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
        time.sleep(work)
        print(f"{datetime.now()}:: getTick processed {id}")
        return True

    @wrap_in_thread(costs=10,refund_in=10,attrname="rate_semaphore",verbose=True)
    def _getOHLCV(self, work, id):
        print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
        time.sleep(work)
        print(f"{datetime.now()}:: getOHLCV processed {id}")
        return True
The class holds RateSemaphore objects, each corresponding to a unique credit/resource pool. For each method that makes a resource-consuming request, we can decorate the function using wrap_in_thread with the costs and refund timers as parameters.
The decorated function calls then directly return threading.Thread instances wrapping transactions that contain the function request, which can be activated using the threading.Thread.start method.
def sync_example():
    print("-----------------------sync with thread-----------------------")
    #... previous example
    print("-----------------------sync with decorator-----------------------")
    throttler = _Throttler()
    threads = [
        throttler._getTick(3, 1),
        throttler._getOHLCV(1, 2),
        throttler._getTick(3, 3),
        throttler._getOHLCV(1, 4),
    ]
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]
----------------------sync with decorator-----------------------
2024-03-28 19:16:56.116058:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116154:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116220:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:16:56.116302:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116555:: getTick processing 1 takes 3 seconds
2024-03-28 19:16:56.116366:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:16:56.116515:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:16:56.116691:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:16:57.120664:: getOHLCV processed 2
2024-03-28 19:16:57.120879:: TXN {'fn': '_getOHLCV', 'args': (1, 2), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:16:59.117150:: getTick processed 1
2024-03-28 19:16:59.117323:: TXN {'fn': '_getTick', 'args': (3, 1), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:07.123469:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:07.123532:: getOHLCV processing 4 takes 1 seconds
2024-03-28 19:17:08.125678:: getOHLCV processed 4
2024-03-28 19:17:08.126219:: TXN {'fn': '_getOHLCV', 'args': (1, 4), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:09.118384:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:09.118482:: getTick processing 3 takes 3 seconds
2024-03-28 19:17:12.119387:: getTick processed 3
2024-03-28 19:17:12.119741:: TXN {'fn': '_getTick', 'args': (3, 3), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
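The decorator pattern described in this section, where a method call returns an unstarted thread that runs the call inside a transaction on a semaphore stored on the instance, can be sketched as follows. The names `sketch_wrap_in_thread`, `PassThroughSemaphore` and `Poller` are hypothetical, and the stub semaphore performs no rate limiting; this is an illustration of the wiring, not quantpylib's `wrap_in_thread`.

```python
import functools
import threading

def sketch_wrap_in_thread(costs, refund_in, attrname="rate_semaphore"):
    """Hypothetical decorator: calling the method returns an unstarted Thread."""
    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            # Look up the semaphore on the instance by attribute name.
            semaphore = getattr(self, attrname)
            return threading.Thread(
                target=semaphore.transact,
                kwargs={
                    "lambda_func": lambda: method(self, *args, **kwargs),
                    "credits": costs,
                    "refund_time": refund_in,
                },
            )
        return wrapper
    return decorator

class PassThroughSemaphore:
    """Stub with a transact method; it records charges but never blocks."""
    def __init__(self):
        self.charged = []

    def transact(self, lambda_func, credits, refund_time):
        self.charged.append(credits)
        return lambda_func()

class Poller:
    def __init__(self):
        self.rate_semaphore = PassThroughSemaphore()
        self.results = []

    @sketch_wrap_in_thread(costs=20, refund_in=10)
    def get_tick(self, ticker):
        self.results.append(ticker)

poller = Poller()
thread = poller.get_tick("BTC")  # nothing runs until the thread is started
thread.start()
thread.join()
print(poller.results, poller.rate_semaphore.charged)
# ['BTC'] [20]
```

Because the thread target is the transaction rather than the method itself, the caller never touches the semaphore directly.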
Asynchronous Example
We now move on to the asynchronous semaphore, given by the AsyncRateSemaphore class. Our objective is to maximise the throughput of a batch of non-blocking requests that consume credits on some external server. To simulate non-blocking requests, such as an async with session.get method of an aiohttp.ClientSession object or asynchronous database requests, we use the asyncio.sleep method. Note that this generalizes to any non-blocking coroutine.
async def agetTick(work, id):
    print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
    await asyncio.sleep(work)
    print(f"{datetime.now()}:: getTick processed {id}")
    return True

async def agetOHLCV(work, id):
    print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
    await asyncio.sleep(work)
    print(f"{datetime.now()}:: getOHLCV processed {id}")
    return True
Since we want to maximise throughput, and the function calls are coroutines that do not block the main thread, we want to create the requests concurrently and place them on the event loop. But since sending all of these requests simultaneously in different coroutines would overload the market data server (resulting in errors, downgraded service or blacklisting), we wrap them in rate-limited transactions and submit them to the semaphore. Let's create a script to run some request transactions through the semaphore:
async def async_example():
    print("-----------------------async with transactions-----------------------")
    sem = AsyncRateSemaphore(40, greedy_entry=True, greedy_exit=True)
    tick_req = lambda x: agetTick(random.randint(1, 5), x)
    ohlcv_req = lambda x: agetOHLCV(random.randint(1, 5), x)
    transactions = [
        sem.transact(coroutine=tick_req(1), credits=20, refund_time=10, transaction_id=1, verbose=True),
        sem.transact(coroutine=ohlcv_req(2), credits=30, refund_time=10, transaction_id=2, verbose=True),
        sem.transact(coroutine=ohlcv_req(3), credits=5, refund_time=10, transaction_id=3, verbose=True),
        sem.transact(coroutine=tick_req(4), credits=20, refund_time=10, transaction_id=4, verbose=True),
    ]
    await asyncio.gather(*transactions)

if __name__ == "__main__":
    sync_example() #previous example
    asyncio.run(async_example())
-----------------------async with transactions-----------------------
2024-03-28 19:17:12.120864:: TXN 1 acquiring CreditSemaphore
2024-03-28 19:17:12.120915:: TXN 1 entered CreditSemaphore...
2024-03-28 19:17:12.120927:: getTick processing 1 takes 4 seconds
2024-03-28 19:17:12.120964:: TXN 2 acquiring CreditSemaphore
2024-03-28 19:17:12.120988:: TXN 3 acquiring CreditSemaphore
2024-03-28 19:17:12.121006:: TXN 3 entered CreditSemaphore...
2024-03-28 19:17:12.121016:: getOHLCV processing 3 takes 5 seconds
2024-03-28 19:17:12.121044:: TXN 4 acquiring CreditSemaphore
2024-03-28 19:17:16.121665:: getTick processed 1
2024-03-28 19:17:16.121740:: TXN 1 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:17.122353:: getOHLCV processed 3
2024-03-28 19:17:17.122462:: TXN 3 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:26.123119:: TXN 2 entered CreditSemaphore...
2024-03-28 19:17:26.123192:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:17:27.123519:: getOHLCV processed 2
2024-03-28 19:17:27.123633:: TXN 2 exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:37.124999:: TXN 4 entered CreditSemaphore...
2024-03-28 19:17:37.125077:: getTick processing 4 takes 3 seconds
2024-03-28 19:17:40.126377:: getTick processed 4
2024-03-28 19:17:40.126493:: TXN 4 exits CreditSemaphore, schedule refund in 10...
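On a single event loop the same credit bookkeeping needs no locks: waiting transactions can park on futures, and a refund scheduled with `loop.call_later` wakes them to re-check the pool. The sketch below illustrates this under stated assumptions. `SketchAsyncCreditSemaphore` is a hypothetical class that always behaves greedily; it is not quantpylib's `AsyncRateSemaphore` and does not model the `greedy_entry` or `greedy_exit` options.

```python
import asyncio

class SketchAsyncCreditSemaphore:
    """Hypothetical event-loop credit pool; not quantpylib's AsyncRateSemaphore."""

    def __init__(self, credits):
        self._credits = credits
        self._waiters = []  # futures of transactions waiting for a refund

    async def transact(self, coroutine, credits, refund_time):
        loop = asyncio.get_running_loop()
        # Park on a future until the pool can cover this transaction's cost.
        while self._credits < credits:
            waiter = loop.create_future()
            self._waiters.append(waiter)
            await waiter
        self._credits -= credits
        try:
            return await coroutine
        finally:
            # Exit immediately; the refund is scheduled on the event loop.
            loop.call_later(refund_time, self._refund, credits)

    def _refund(self, credits):
        self._credits += credits
        waiters, self._waiters = self._waiters, []
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(None)  # woken transactions re-check the pool

entered = []

async def fake_request(txn_id):
    entered.append(txn_id)  # records the order in which transactions start running
    await asyncio.sleep(0.01)
    return txn_id

async def main():
    sem = SketchAsyncCreditSemaphore(40)
    costs = [(1, 20), (2, 30), (3, 5), (4, 20)]
    return await asyncio.gather(*[
        sem.transact(fake_request(txn_id), credits=cost, refund_time=0.05)
        for txn_id, cost in costs
    ])

results = asyncio.run(main())
print(results, entered[:2])
```

Transactions 1 and 3 enter first because their combined cost of 25 fits the pool of 40, while transactions 2 and 4 must wait for refunds, matching the entry pattern in the log.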
Easy with Decorators
The example given is nice but somewhat unwieldy, since the method/function of interest must be wrapped in semaphore transactions. We would like to hide the throttling intricacies at the caller level, such that a user of said data-access layer or SDK does not need to be aware of the presence of a semaphore. For instance:
class _Throttler():
    def __init__(self):
        #... previous example
        self.arate_semaphore=AsyncRateSemaphore(31)

    #... previous example

    @aconsume_credits(costs=20,refund_in=10,attrname="arate_semaphore") #we want the asynchronous semaphore, and since the default name is not rate_semaphore, we pass in attrname
    async def _agetTick(self, work, id):
        print(f"{datetime.now()}:: getTick processing {id} takes {work} seconds")
        await asyncio.sleep(work)
        print(f"{datetime.now()}:: getTick processed {id}")
        return True

    @aconsume_credits(costs=10,refund_in=10,attrname="arate_semaphore",verbose=True)
    async def _agetOHLCV(self, work, id):
        print(f"{datetime.now()}:: getOHLCV processing {id} takes {work} seconds")
        await asyncio.sleep(work)
        print(f"{datetime.now()}:: getOHLCV processed {id}")
        return True
The class holds AsyncRateSemaphore objects, each corresponding to a unique credit/resource pool. For each method that makes a resource-consuming request, we can decorate the function using aconsume_credits with the costs and refund timers as parameters.
The decorated function calls are then submitted through the asynchronous semaphore instance stored on the object attribute.
async def async_example():
    print("-----------------------async with transactions-----------------------")
    #... previous example
    print("-----------------------async with decorator-----------------------")
    throttler = _Throttler()
    transactions = [
        throttler._agetTick(3, 1),
        throttler._agetOHLCV(1, 2),
        throttler._agetTick(3, 3),
        throttler._agetOHLCV(1, 4),
    ]
    await asyncio.gather(*transactions)
-----------------------async with decorator-----------------------
2024-03-28 19:17:40.126936:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127015:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:40.127044:: getTick processing 1 takes 3 seconds
2024-03-28 19:17:40.127110:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127149:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:40.127170:: getOHLCV processing 2 takes 1 seconds
2024-03-28 19:17:40.127217:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:40.127266:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} acquiring CreditSemaphore
2024-03-28 19:17:41.127912:: getOHLCV processed 2
2024-03-28 19:17:41.128021:: TXN {'fn': '_agetOHLCV', 'args': (1, 2), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:43.127485:: getTick processed 1
2024-03-28 19:17:43.127539:: TXN {'fn': '_agetTick', 'args': (3, 1), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:51.129270:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:51.129314:: getOHLCV processing 4 takes 1 seconds
2024-03-28 19:17:52.130482:: getOHLCV processed 4
2024-03-28 19:17:52.130534:: TXN {'fn': '_agetOHLCV', 'args': (1, 4), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
2024-03-28 19:17:53.128193:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} entered CreditSemaphore...
2024-03-28 19:17:53.128281:: getTick processing 3 takes 3 seconds
2024-03-28 19:17:56.129573:: getTick processed 3
2024-03-28 19:17:56.129683:: TXN {'fn': '_agetTick', 'args': (3, 3), 'kwargs': {}} exits CreditSemaphore, schedule refund in 10...
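The asynchronous decorator pattern, in which awaiting the decorated method routes the call through a semaphore stored on the instance, can be sketched as follows. The names `sketch_aconsume_credits`, `PassThroughAsyncSemaphore` and `AsyncPoller` are hypothetical, and the stub semaphore performs no rate limiting; this illustrates the wiring only and is not quantpylib's `aconsume_credits`.

```python
import asyncio
import functools

def sketch_aconsume_credits(costs, refund_in, attrname="rate_semaphore"):
    """Hypothetical decorator: awaiting the method runs it inside a transaction."""
    def decorator(method):
        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            # Look up the asynchronous semaphore on the instance by attribute name.
            semaphore = getattr(self, attrname)
            return await semaphore.transact(
                coroutine=method(self, *args, **kwargs),
                credits=costs,
                refund_time=refund_in,
            )
        return wrapper
    return decorator

class PassThroughAsyncSemaphore:
    """Stub with an async transact method; it records charges but never waits."""
    def __init__(self):
        self.charged = []

    async def transact(self, coroutine, credits, refund_time):
        self.charged.append(credits)
        return await coroutine

class AsyncPoller:
    def __init__(self):
        self.arate_semaphore = PassThroughAsyncSemaphore()

    @sketch_aconsume_credits(costs=10, refund_in=10, attrname="arate_semaphore")
    async def get_ohlcv(self, ticker):
        await asyncio.sleep(0.01)
        return f"{ticker}-ohlcv"

async def main():
    poller = AsyncPoller()
    data = await asyncio.gather(poller.get_ohlcv("BTC"), poller.get_ohlcv("ETH"))
    return data, poller.arate_semaphore.charged

data, charged = asyncio.run(main())
print(data, charged)
# ['BTC-ohlcv', 'ETH-ohlcv'] [10, 10]
```

The caller simply awaits the method; the charge against the pool happens inside the wrapper.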
Notes on Behavior
- Entry Behavior
    - AsyncRateSemaphore acquires the semaphore if there are enough resources, but if greedy_entry=False, then the submitted transaction will wait behind the earlier pending transactions regardless of the resource pool availability. Otherwise, any submitted transaction that can run immediately will run without consideration for existing waiters.
    - RateSemaphore acquires the semaphore immediately if there are enough resources, and otherwise waits. The order in which blocked threads are awakened should not be relied on and is OS-scheduler dependent.
- Exit Behavior
  Requests submitted to both semaphore types exit from their respective transact method without waiting for the credits to be refunded. The credits are scheduled to be refunded separately on a thread (for synchronous implementations) or the event loop (for asynchronous implementations). When the credits are refunded:
    - AsyncRateSemaphore wakes up pending transactions that are able to execute given the state of the resource pool. If greedy_exit=False, then pending transactions are woken up in FIFO order until the resource pool is insufficient for the earliest remaining transaction. Otherwise, suppose credits are refunded while at least two transactions are waiting, with arrival time txn A < txn B. If the semaphore does not have enough credits to execute txn A, it can first run txn B. This helps to maximise throughput.
    - RateSemaphore wakes up pending transactions that are able to execute given the state of the resource pool. The order in which blocked threads are awakened should not be relied on and is OS-scheduler dependent.
- Exception Behavior
  Failed transactions (raised Exceptions) consume and refund credits in the same way as successful transactions.
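The difference between FIFO and greedy wake-ups on refund can be illustrated with a small selection function. This is a simplified model of the behavior described in the notes above, using the hypothetical helper `select_wakeups`; it is not the library's scheduling code.

```python
def select_wakeups(pending_costs, credits, greedy_exit):
    """Return indices (in arrival order) of pending transactions that may run."""
    woken = []
    for index, cost in enumerate(pending_costs):
        if cost <= credits:
            credits -= cost
            woken.append(index)
        elif not greedy_exit:
            # FIFO: stop at the first transaction the pool cannot cover.
            break
    return woken

# Three waiters in arrival order with costs 30, 5 and 20; 25 credits available.
pending = [30, 5, 20]
print(select_wakeups(pending, 25, greedy_exit=False))  # []
print(select_wakeups(pending, 25, greedy_exit=True))   # [1, 2]
```

With FIFO ordering nothing runs until the earliest waiter can be covered, while the greedy policy lets the later, cheaper transactions use the available credits.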
Best Practices
- Wrap unstable networks and expensive requests in a timeout transaction. This is to prevent the coroutine from 'await'-ing forever and hogging the semaphore.
- Since the transactions wrap native Python functions and coroutines, the semaphore does not know when the code actually performs the credit-costing request. The transact functions should be as close as possible to the credit-consuming logic. The wrapped code should not perform heavy compute or multiple requests, so that the credits can be refunded as quickly as possible for other transactions.
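One way to bound a coroutine before handing it to a transaction is the standard library's `asyncio.wait_for`. The sketch below uses hypothetical helpers `slow_request` and `bounded`; returning `None` on timeout is an illustrative choice, and how the bounded coroutine is then submitted to a semaphore is left out.

```python
import asyncio

async def slow_request():
    # Stand-in for a request on an unstable network that may hang.
    await asyncio.sleep(10)
    return "data"

async def bounded(coroutine, timeout):
    """Await the coroutine, giving up after `timeout` seconds."""
    try:
        return await asyncio.wait_for(coroutine, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the inner coroutine before raising.
        return None

result = asyncio.run(bounded(slow_request(), timeout=0.05))
print(result)
# None
```

Bounding the wait this way means a stalled request holds its credits for at most the timeout plus the refund time.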