One aync lock call or multiple async lock calls, what is better and safer in Python?

4 days ago 14
ARTICLE AD BOX

I trying to make ratelimiter for users in aiogram bot with middlware, now I created class for requests and class for ratelimiter, where reuqest are. My problem is what is better for performance and safety, use multiple async lock calls for each operation or make these operations under one async lock.

my rate_limiter.py:

import asyncio import time from collections import defaultdict from dataclasses import dataclass class RequestsData: def __init__( self, seconds_interval: int = 3, max_requests_per_day: int = 5, ): self.seconds_interval: int = seconds_interval self.max_requests_per_day: int = max_requests_per_day self.day_len_seconds: int = 60 * 60 * 24 self.last_time: float | None = None self.just_limit_exited: bool = True self.total_requests: int = 0 self.total_requests_start_time: float | None = None self._async_lock = asyncio.Lock() @classmethod def get_now_seconds(self) -> float: return time.time() async def reset_day_limit(self) -> bool: async with self._async_lock: # no requests start time if self.total_requests_start_time is None: return True current_time_seconds = self.get_now_seconds() # check and reset total requests start time as current time seconds if current_time_seconds - self.total_requests_start_time > self.day_len_seconds: self.total_requests_start_time = current_time_seconds self.total_requests = 0 return True return False async def exited_day_limit(self) -> tuple[bool, bool]: async with self._async_lock: if self.total_requests >= self.max_requests_per_day-1: # set and reuturn limit exited if self.just_limit_exited: self.just_limit_exited = False # limit exited, not blocked return (True, False) return (False, False) # return no exited, can make requests return (False, True) async def rate_limited(self) -> tuple[bool, bool]: async with self._async_lock: current_time_seconds = self.get_now_seconds() # check times diff less than interval if self.last_time is not None and current_time_seconds - self.last_time < self.seconds_interval: # set and return limit exited if self.just_limit_exited: self.just_limit_exited = False # limit exited and blocked return (True, False) # limit not exited and blocked return (False, False) # limit not exited and not blocked return (False, True) async def add_reuqest(self): async with self._async_lock: current_time_seconds = self.get_now_seconds() self.last_time = current_time_seconds self.just_limit_exited = True self.total_requests += 1 class RateLimiter: def __init__(self, seconds_interval: int=1, max_requests_per_day: int=5, ): self.requsts = defaultdict[int, RequestsData]( lambda: RequestsData(seconds_interval, max_requests_per_day) ) async def on_request(self, uid: int) -> tuple[bool, bool]: # get requests data by uid uid_requests: RequestsData = self.requsts[uid] # check and reset day limit start await uid_requests.reset_day_limit() # check exited day limit just_exited, can_make_requests = await uid_requests.exited_day_limit() if not can_make_requests: return (just_exited, can_make_requests) # check rate limited by last time just_exited, can_make_requests = await uid_requests.rate_limited() if not can_make_requests: return (just_exited, can_make_requests) # add reuqest with last time as now await uid_requests.add_reuqest() # can make requests, not blocked return (False, True)

I use this rate limiter in middleware:

class MainMiddleware(BaseMiddleware): def __init__(self): super().__init__() # set rate limiter self.rate_limiter = RateLimiter() async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: Message | CallbackQuery, data: Dict[str, Any] ): # check chat type is private if isinstance(event, Message): chat_type = event.chat.type else: chat_type = event.message.chat.type if chat_type != ChatType.PRIVATE: logger.info("chat is not private %s", event) return user_id = event.from_user.id lang = event.from_user.language_code # check with rete limiter just_limit_exited, can_make_request = await self.rate_limiter.on_request( user_id ) # notify can't make requests if just_limit_exited: text = get_text(texts.rate_limit_exited_text, lang) await event.answer(text) # cant make requests if not can_make_request: return None
Read Entire Article