Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Dec 25, 2024
1 parent bb35818 commit 61f47c1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 38 deletions.
5 changes: 5 additions & 0 deletions api/configs/feature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,11 @@ class DataSetConfig(BaseSettings):
default=30,
)

# Upper bound on how many Message rows the scheduled message cleanup loads per
# query; api/schedule/clean_messages.py passes this value to `.limit(...)`.
# Must be a positive integer (PositiveInt); defaults to 200.
PLAN_SANDBOX_CLEAN_MESSAGE_BATCH_SIZE: PositiveInt = Field(
description="Batch size for message cleanup operations",
default=200,
)


class WorkspaceConfig(BaseSettings):
"""
Expand Down
90 changes: 52 additions & 38 deletions api/schedule/clean_messages.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import logging
import time

import click
Expand All @@ -25,58 +26,71 @@
def clean_messages():
click.echo(click.style("Start clean messages.", fg="green"))
start_at = time.perf_counter()
plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
upper_message_time = datetime.datetime.now() - datetime.timedelta(
days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
)
page = 1
deleted_messages_count = 0
while True:
try:
# Main query with join and filter
# FIXME:for mypy no paginate method error
messages = (
db.session.query(Message) # type: ignore
.filter(Message.created_at < plan_sandbox_clean_message_day)
.order_by(Message.created_at.desc())
.limit(100)
.filter(Message.created_at < upper_message_time)
.order_by(Message.created_at.asc())
.limit(dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_BATCH_SIZE)
.all()
)

except NotFound:
break
if not messages:
break

for message in messages:
plan_sandbox_clean_message_day = message.created_at
app = App.query.filter_by(id=message.app_id).first()
features_cache_key = f"features:{app.tenant_id}"
plan_cache = redis_client.get(features_cache_key)
if plan_cache is None:
features = FeatureService.get_features(app.tenant_id)
redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
plan = features.billing.subscription.plan
else:
plan = plan_cache.decode()
plan = determine_plan(message)
if plan == "sandbox":
# clean related message
db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(Message).filter(Message.id == message.id).delete()
db.session.commit()
is_delete_success = delete_single_message(message.id)
if is_delete_success:
deleted_messages_count = deleted_messages_count + 1
end_at = time.perf_counter()
click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))
click.echo(
click.style(
f"Cleaned outdated messages from db success latency: {end_at - start_at},"
f" deleted messages count: {deleted_messages_count}",
fg="green",
)
)


def determine_plan(message) -> str:
    """Return the billing subscription plan of the tenant that owns *message*.

    The owning app is looked up by ``message.app_id`` and its ``tenant_id`` is
    used to build the Redis key ``features:<tenant_id>``. A cached value is
    decoded and returned as-is; on a cache miss the plan is read from
    ``FeatureService.get_features`` and stored in Redis with a 600 second
    expiry before being returned.

    NOTE(review): ``.first()`` yields ``None`` when no app row matches, in
    which case the ``tenant_id`` access below raises ``AttributeError`` --
    confirm whether messages can outlive their app.
    """
    owning_app = App.query.filter_by(id=message.app_id).first()
    tenant_id = owning_app.tenant_id
    cache_key = f"features:{tenant_id}"

    cached_plan = redis_client.get(cache_key)
    if cached_plan is not None:
        # Redis hands back bytes; callers compare against plain strings.
        return cached_plan.decode()

    plan = FeatureService.get_features(tenant_id).billing.subscription.plan
    redis_client.setex(cache_key, 600, plan)
    return plan


def delete_single_message(message_id: str) -> bool:
    """Delete one message together with every row that references it.

    Rows in the dependent tables (feedback, annotation, chain, agent thought,
    file, saved message) are removed first, then the message itself, and the
    whole set is committed as a single transaction.

    Args:
        message_id: Primary key of the ``Message`` row to remove.

    Returns:
        ``True`` when the deletion was committed, ``False`` when any step
        failed. Failures are logged with a traceback and never propagate.
    """
    # Tables holding a ``message_id`` reference; cleared before the message row.
    related_models = (
        MessageFeedback,
        MessageAnnotation,
        MessageChain,
        MessageAgentThought,
        MessageFile,
        SavedMessage,
    )
    try:
        for model in related_models:
            db.session.query(model).filter(model.message_id == message_id).delete(synchronize_session=False)
        db.session.query(Message).filter(Message.id == message_id).delete()
        db.session.commit()
        return True
    except Exception:
        logging.exception("Failed to delete message %s", message_id)
        # Discard the partially applied deletes. Without this the shared
        # session keeps them pending, so the next successful commit (for a
        # different message) would persist a half-deleted message, or the
        # session stays stuck in a failed transaction.
        db.session.rollback()
        return False

0 comments on commit 61f47c1

Please sign in to comment.