Skip to content

Commit

Permalink
chore(models-http-api): rate limit with a maximum of 60 retries (#3499)
Browse files Browse the repository at this point in the history
* chore(models-http-api): rate limit with a maximum of 60 retries

Signed-off-by: Wei Zhang <[email protected]>

* chore: update rate limit exceeded log

Signed-off-by: Wei Zhang <[email protected]>

---------

Signed-off-by: Wei Zhang <[email protected]>
  • Loading branch information
zwpaper authored Dec 2, 2024
1 parent 08e6262 commit f6e11ed
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions crates/http-api-bindings/src/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use async_trait::async_trait;
use futures::stream::BoxStream;
use ratelimit::Ratelimiter;
use tabby_inference::{ChatCompletionStream, CompletionOptions, CompletionStream, Embedding};
use tracing::warn;

fn new_rate_limiter(rpm: u64) -> Ratelimiter {
Ratelimiter::builder(rpm/60, Duration::from_secs(1))
Expand All @@ -34,7 +35,7 @@ pub fn new_embedding(embedding: Box<dyn Embedding>, request_per_minute: u64) ->
#[async_trait]
impl Embedding for RateLimitedEmbedding {
async fn embed(&self, prompt: &str) -> anyhow::Result<Vec<f32>> {
for _ in 0..5 {
for _ in 0..60 {
if let Err(sleep) = self.rate_limiter.try_wait() {
tokio::time::sleep(sleep).await;
continue;
Expand All @@ -43,7 +44,7 @@ impl Embedding for RateLimitedEmbedding {
return self.embedding.embed(prompt).await;
}

anyhow::bail!("Rate limit exceeded for embedding computation");
anyhow::bail!("Failed to acquire request quota for embedding");
}
}

Expand All @@ -65,7 +66,7 @@ pub fn new_completion(
#[async_trait]
impl CompletionStream for RateLimitedCompletion {
async fn generate(&self, prompt: &str, options: CompletionOptions) -> BoxStream<String> {
for _ in 0..5 {
for _ in 0..60 {
if let Err(sleep) = self.rate_limiter.try_wait() {
tokio::time::sleep(sleep).await;
continue;
Expand All @@ -74,7 +75,7 @@ impl CompletionStream for RateLimitedCompletion {
return self.completion.generate(prompt, options).await;
}

// Return an empty stream if the rate limit is exceeded
warn!("Failed to acquire request quota for completion");
Box::pin(futures::stream::empty())
}
}
Expand All @@ -100,7 +101,7 @@ impl ChatCompletionStream for RateLimitedChatStream {
&self,
request: CreateChatCompletionRequest,
) -> Result<CreateChatCompletionResponse, OpenAIError> {
for _ in 0..5 {
for _ in 0..60 {
if let Err(sleep) = self.rate_limiter.try_wait() {
tokio::time::sleep(sleep).await;
continue;
Expand All @@ -110,7 +111,7 @@ impl ChatCompletionStream for RateLimitedChatStream {
}

Err(OpenAIError::ApiError(ApiError {
message: "Rate limit exceeded for chat completion".to_owned(),
message: "Failed to acquire request quota for chat".to_owned(),
r#type: None,
param: None,
code: None,
Expand All @@ -121,7 +122,7 @@ impl ChatCompletionStream for RateLimitedChatStream {
&self,
request: CreateChatCompletionRequest,
) -> Result<ChatCompletionResponseStream, OpenAIError> {
for _ in 0..5 {
for _ in 0..60 {
if let Err(sleep) = self.rate_limiter.try_wait() {
tokio::time::sleep(sleep).await;
continue;
Expand All @@ -131,7 +132,7 @@ impl ChatCompletionStream for RateLimitedChatStream {
}

Err(OpenAIError::ApiError(ApiError {
message: "Rate limit exceeded for chat completion".to_owned(),
message: "Failed to acquire request quota for chat stream".to_owned(),
r#type: None,
param: None,
code: None,
Expand Down

0 comments on commit f6e11ed

Please sign in to comment.