Skip to content

Commit

Permalink
Only log size of deferred job and Support timestamp in get_deferred_j…
Browse files Browse the repository at this point in the history
…obs (#36)

* Only log size of deferred job

* Support timestamp in get_deferred_jobs
  • Loading branch information
Wh1isper authored Sep 4, 2024
1 parent 7c78351 commit 966c85f
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions brq/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ async def enque_deferred_job(self, function_name: str, maxlen: int = 1000):
redis.call('ZREM', zset_key, element)
redis.call('XADD', stream_key, 'MAXLEN', maxlen, '*', 'payload', element)
end
return elements
return #elements
"""
defer_key = self.get_deferred_key(function_name)
stream_name = self.get_stream_name(function_name)
Expand All @@ -49,7 +48,7 @@ async def enque_deferred_job(self, function_name: str, maxlen: int = 1000):
maxlen,
)
if elements:
logger.debug(f"Enqueued deferred jobs: {elements}")
logger.debug(f"Enqueued {elements} deferred jobs")

async def _remove_deferred_job(
self,
Expand Down Expand Up @@ -98,12 +97,19 @@ async def get_defer_timestamp_ms(

return defer_until * 1000

async def get_deferred_jobs(self, function_name: str) -> dict[datetime:Job]:
async def get_deferred_jobs(
self,
function_name: str,
start_timestamp: str | int = "-inf",
end_timestamp: str | int = "+inf",
) -> dict[datetime:Job]:
"""
Get all deferred jobs
Args:
function_name (str): function name
start_timestamp (str | int, optional): start timestamp in millisecond. Defaults to "-inf".
end_timestamp (str | int, optional): end timestamp in millisecond. Defaults to "+inf".
Returns:
dict[datetime:Job]: deferred jobs,
Expand All @@ -114,7 +120,7 @@ async def get_deferred_jobs(self, function_name: str) -> dict[datetime:Job]:
return {
datetime.fromtimestamp(float(element[1]) / 1000): Job.from_redis(element[0])
for element in await self.redis.zrangebyscore(
defer_key, "-inf", "+inf", withscores=True
defer_key, start_timestamp, end_timestamp, withscores=True
)
}

Expand Down

0 comments on commit 966c85f

Please sign in to comment.