diff --git a/api/pyproject.toml b/api/pyproject.toml index f5597c729ce4c9..eddeeb0cddfca6 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -74,7 +74,6 @@ exclude = [ "controllers/**/*.py", "models/**/*.py", "migrations/**/*", - "tasks/**/*.py", ] [tool.pytest_env] diff --git a/api/tasks/add_document_to_index_task.py b/api/tasks/add_document_to_index_task.py index e0a1b219095b92..b50876cc794c55 100644 --- a/api/tasks/add_document_to_index_task.py +++ b/api/tasks/add_document_to_index_task.py @@ -14,7 +14,7 @@ from models.dataset import DocumentSegment -@shared_task(queue='dataset') +@shared_task(queue="dataset") def add_document_to_index_task(dataset_document_id: str): """ Async Add document to index @@ -22,24 +22,25 @@ def add_document_to_index_task(dataset_document_id: str): Usage: add_document_to_index.delay(document_id) """ - logging.info(click.style('Start add document to index: {}'.format(dataset_document_id), fg='green')) + logging.info(click.style("Start add document to index: {}".format(dataset_document_id), fg="green")) start_at = time.perf_counter() dataset_document = db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document_id).first() if not dataset_document: - raise NotFound('Document not found') + raise NotFound("Document not found") - if dataset_document.indexing_status != 'completed': + if dataset_document.indexing_status != "completed": return - indexing_cache_key = 'document_{}_indexing'.format(dataset_document.id) + indexing_cache_key = "document_{}_indexing".format(dataset_document.id) try: - segments = db.session.query(DocumentSegment).filter( - DocumentSegment.document_id == dataset_document.id, - DocumentSegment.enabled == True - ) \ - .order_by(DocumentSegment.position.asc()).all() + segments = ( + db.session.query(DocumentSegment) + .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True) + .order_by(DocumentSegment.position.asc()) + .all() + ) documents = [] for segment in segments: @@ -50,7 +51,7 @@ def add_document_to_index_task(dataset_document_id: str): "doc_hash": segment.index_node_hash, "document_id": segment.document_id, "dataset_id": segment.dataset_id, - } + }, ) documents.append(document) @@ -58,7 +59,7 @@ def add_document_to_index_task(dataset_document_id: str): dataset = dataset_document.dataset if not dataset: - raise Exception('Document has no dataset') + raise Exception("Document has no dataset") index_type = dataset.doc_form index_processor = IndexProcessorFactory(index_type).init_index_processor() @@ -66,12 +67,15 @@ def add_document_to_index_task(dataset_document_id: str): end_at = time.perf_counter() logging.info( - click.style('Document added to index: {} latency: {}'.format(dataset_document.id, end_at - start_at), fg='green')) + click.style( + "Document added to index: {} latency: {}".format(dataset_document.id, end_at - start_at), fg="green" + ) + ) except Exception as e: logging.exception("add document to index failed") dataset_document.enabled = False dataset_document.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - dataset_document.status = 'error' + dataset_document.status = "error" dataset_document.error = str(e) db.session.commit() finally: diff --git a/api/tasks/annotation/add_annotation_to_index_task.py b/api/tasks/annotation/add_annotation_to_index_task.py index b3aa8b596c8a3c..25c55bcfafe11c 100644 --- a/api/tasks/annotation/add_annotation_to_index_task.py +++ b/api/tasks/annotation/add_annotation_to_index_task.py @@ -10,9 +10,10 
@@ from services.dataset_service import DatasetCollectionBindingService -@shared_task(queue='dataset') -def add_annotation_to_index_task(annotation_id: str, question: str, tenant_id: str, app_id: str, - collection_binding_id: str): +@shared_task(queue="dataset") +def add_annotation_to_index_task( + annotation_id: str, question: str, tenant_id: str, app_id: str, collection_binding_id: str +): """ Add annotation to index. :param annotation_id: annotation id @@ -23,38 +24,34 @@ def add_annotation_to_index_task(annotation_id: str, question: str, tenant_id: s Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct) """ - logging.info(click.style('Start build index for annotation: {}'.format(annotation_id), fg='green')) + logging.info(click.style("Start build index for annotation: {}".format(annotation_id), fg="green")) start_at = time.perf_counter() try: dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( - collection_binding_id, - 'annotation' + collection_binding_id, "annotation" ) dataset = Dataset( id=app_id, tenant_id=tenant_id, - indexing_technique='high_quality', + indexing_technique="high_quality", embedding_model_provider=dataset_collection_binding.provider_name, embedding_model=dataset_collection_binding.model_name, - collection_binding_id=dataset_collection_binding.id + collection_binding_id=dataset_collection_binding.id, ) document = Document( - page_content=question, - metadata={ - "annotation_id": annotation_id, - "app_id": app_id, - "doc_id": annotation_id - } + page_content=question, metadata={"annotation_id": annotation_id, "app_id": app_id, "doc_id": annotation_id} ) - vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) + vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) vector.create([document], duplicate_check=True) end_at = time.perf_counter() logging.info( click.style( - 'Build index successful for annotation: {} latency: {}'.format(annotation_id, end_at - start_at), - fg='green')) + "Build index successful for annotation: {} latency: {}".format(annotation_id, end_at - start_at), + fg="green", + ) + ) except Exception: logging.exception("Build index for annotation failed") diff --git a/api/tasks/annotation/batch_import_annotations_task.py b/api/tasks/annotation/batch_import_annotations_task.py index 6e6b16045dad9b..fa7e5ac9190f3c 100644 --- a/api/tasks/annotation/batch_import_annotations_task.py +++ b/api/tasks/annotation/batch_import_annotations_task.py @@ -14,9 +14,8 @@ from services.dataset_service import DatasetCollectionBindingService -@shared_task(queue='dataset') -def batch_import_annotations_task(job_id: str, content_list: list[dict], app_id: str, tenant_id: str, - user_id: str): +@shared_task(queue="dataset") +def batch_import_annotations_task(job_id: str, content_list: list[dict], app_id: str, tenant_id: str, user_id: str): """ Add annotation to index. 
:param job_id: job_id @@ -26,72 +25,66 @@ def batch_import_annotations_task(job_id: str, content_list: list[dict], app_id: :param user_id: user_id """ - logging.info(click.style('Start batch import annotation: {}'.format(job_id), fg='green')) + logging.info(click.style("Start batch import annotation: {}".format(job_id), fg="green")) start_at = time.perf_counter() - indexing_cache_key = 'app_annotation_batch_import_{}'.format(str(job_id)) + indexing_cache_key = "app_annotation_batch_import_{}".format(str(job_id)) # get app info - app = db.session.query(App).filter( - App.id == app_id, - App.tenant_id == tenant_id, - App.status == 'normal' - ).first() + app = db.session.query(App).filter(App.id == app_id, App.tenant_id == tenant_id, App.status == "normal").first() if app: try: documents = [] for content in content_list: annotation = MessageAnnotation( - app_id=app.id, - content=content['answer'], - question=content['question'], - account_id=user_id + app_id=app.id, content=content["answer"], question=content["question"], account_id=user_id ) db.session.add(annotation) db.session.flush() document = Document( - page_content=content['question'], - metadata={ - "annotation_id": annotation.id, - "app_id": app_id, - "doc_id": annotation.id - } + page_content=content["question"], + metadata={"annotation_id": annotation.id, "app_id": app_id, "doc_id": annotation.id}, ) documents.append(document) # if annotation reply is enabled , batch add annotations' index - app_annotation_setting = db.session.query(AppAnnotationSetting).filter( - AppAnnotationSetting.app_id == app_id - ).first() + app_annotation_setting = ( + db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).first() + ) if app_annotation_setting: - dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( - app_annotation_setting.collection_binding_id, - 'annotation' + dataset_collection_binding = ( + DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( + app_annotation_setting.collection_binding_id, "annotation" + ) ) if not dataset_collection_binding: raise NotFound("App annotation setting not found") dataset = Dataset( id=app_id, tenant_id=tenant_id, - indexing_technique='high_quality', + indexing_technique="high_quality", embedding_model_provider=dataset_collection_binding.provider_name, embedding_model=dataset_collection_binding.model_name, - collection_binding_id=dataset_collection_binding.id + collection_binding_id=dataset_collection_binding.id, ) - vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) + vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) vector.create(documents, duplicate_check=True) db.session.commit() - redis_client.setex(indexing_cache_key, 600, 'completed') + redis_client.setex(indexing_cache_key, 600, "completed") end_at = time.perf_counter() logging.info( click.style( - 'Build index successful for batch import annotation: {} latency: {}'.format(job_id, end_at - start_at), - fg='green')) + "Build index successful for batch import annotation: {} latency: {}".format( + job_id, end_at - start_at + ), + fg="green", + ) + ) except Exception as e: db.session.rollback() - redis_client.setex(indexing_cache_key, 600, 'error') - indexing_error_msg_key = 'app_annotation_batch_import_error_msg_{}'.format(str(job_id)) + redis_client.setex(indexing_cache_key, 600, "error") + indexing_error_msg_key = "app_annotation_batch_import_error_msg_{}".format(str(job_id)) 
redis_client.setex(indexing_error_msg_key, 600, str(e)) logging.exception("Build index for batch import annotations failed") diff --git a/api/tasks/annotation/delete_annotation_index_task.py b/api/tasks/annotation/delete_annotation_index_task.py index 81155a35e42b0e..5758db53de820b 100644 --- a/api/tasks/annotation/delete_annotation_index_task.py +++ b/api/tasks/annotation/delete_annotation_index_task.py @@ -9,36 +9,33 @@ from services.dataset_service import DatasetCollectionBindingService -@shared_task(queue='dataset') -def delete_annotation_index_task(annotation_id: str, app_id: str, tenant_id: str, - collection_binding_id: str): +@shared_task(queue="dataset") +def delete_annotation_index_task(annotation_id: str, app_id: str, tenant_id: str, collection_binding_id: str): """ Async delete annotation index task """ - logging.info(click.style('Start delete app annotation index: {}'.format(app_id), fg='green')) + logging.info(click.style("Start delete app annotation index: {}".format(app_id), fg="green")) start_at = time.perf_counter() try: dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( - collection_binding_id, - 'annotation' + collection_binding_id, "annotation" ) dataset = Dataset( id=app_id, tenant_id=tenant_id, - indexing_technique='high_quality', - collection_binding_id=dataset_collection_binding.id + indexing_technique="high_quality", + collection_binding_id=dataset_collection_binding.id, ) try: - vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) - vector.delete_by_metadata_field('annotation_id', annotation_id) + vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) + vector.delete_by_metadata_field("annotation_id", annotation_id) except Exception: logging.exception("Delete annotation index failed when annotation deleted.") end_at = time.perf_counter() logging.info( - click.style('App annotations index deleted : {} latency: {}'.format(app_id, end_at - start_at), - fg='green')) + click.style("App annotations index deleted : {} latency: {}".format(app_id, end_at - start_at), fg="green") + ) except Exception as e: logging.exception("Annotation deleted index failed:{}".format(str(e))) - diff --git a/api/tasks/annotation/disable_annotation_reply_task.py b/api/tasks/annotation/disable_annotation_reply_task.py index e5e03c9b51758b..0f83dfdbd4a72f 100644 --- a/api/tasks/annotation/disable_annotation_reply_task.py +++ b/api/tasks/annotation/disable_annotation_reply_task.py @@ -12,49 +12,44 @@ from models.model import App, AppAnnotationSetting, MessageAnnotation -@shared_task(queue='dataset') +@shared_task(queue="dataset") def disable_annotation_reply_task(job_id: str, app_id: str, tenant_id: str): """ Async enable annotation reply task """ - logging.info(click.style('Start delete app annotations index: {}'.format(app_id), fg='green')) + logging.info(click.style("Start delete app annotations index: {}".format(app_id), fg="green")) start_at = time.perf_counter() # get app info - app = db.session.query(App).filter( - App.id == app_id, - App.tenant_id == tenant_id, - App.status == 'normal' - ).first() + app = db.session.query(App).filter(App.id == app_id, App.tenant_id == tenant_id, App.status == "normal").first() annotations_count = db.session.query(MessageAnnotation).filter(MessageAnnotation.app_id == app_id).count() if not app: raise NotFound("App not found") - app_annotation_setting = db.session.query(AppAnnotationSetting).filter( - AppAnnotationSetting.app_id == app_id - ).first() + 
app_annotation_setting = ( + db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).first() + ) if not app_annotation_setting: raise NotFound("App annotation setting not found") - disable_app_annotation_key = 'disable_app_annotation_{}'.format(str(app_id)) - disable_app_annotation_job_key = 'disable_app_annotation_job_{}'.format(str(job_id)) + disable_app_annotation_key = "disable_app_annotation_{}".format(str(app_id)) + disable_app_annotation_job_key = "disable_app_annotation_job_{}".format(str(job_id)) try: - dataset = Dataset( id=app_id, tenant_id=tenant_id, - indexing_technique='high_quality', - collection_binding_id=app_annotation_setting.collection_binding_id + indexing_technique="high_quality", + collection_binding_id=app_annotation_setting.collection_binding_id, ) try: if annotations_count > 0: - vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) - vector.delete_by_metadata_field('app_id', app_id) + vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) + vector.delete_by_metadata_field("app_id", app_id) except Exception: logging.exception("Delete annotation index failed when annotation deleted.") - redis_client.setex(disable_app_annotation_job_key, 600, 'completed') + redis_client.setex(disable_app_annotation_job_key, 600, "completed") # delete annotation setting db.session.delete(app_annotation_setting) @@ -62,12 +57,12 @@ def disable_annotation_reply_task(job_id: str, app_id: str, tenant_id: str): end_at = time.perf_counter() logging.info( - click.style('App annotations index deleted : {} latency: {}'.format(app_id, end_at - start_at), - fg='green')) + click.style("App annotations index deleted : {} latency: {}".format(app_id, end_at - start_at), fg="green") + ) except Exception as e: logging.exception("Annotation batch deleted index failed:{}".format(str(e))) - redis_client.setex(disable_app_annotation_job_key, 600, 'error') - disable_app_annotation_error_key = 'disable_app_annotation_error_{}'.format(str(job_id)) + redis_client.setex(disable_app_annotation_job_key, 600, "error") + disable_app_annotation_error_key = "disable_app_annotation_error_{}".format(str(job_id)) redis_client.setex(disable_app_annotation_error_key, 600, str(e)) finally: redis_client.delete(disable_app_annotation_key) diff --git a/api/tasks/annotation/enable_annotation_reply_task.py b/api/tasks/annotation/enable_annotation_reply_task.py index fda8b7a250f128..82b70f6b71eddf 100644 --- a/api/tasks/annotation/enable_annotation_reply_task.py +++ b/api/tasks/annotation/enable_annotation_reply_task.py @@ -15,37 +15,39 @@ from services.dataset_service import DatasetCollectionBindingService -@shared_task(queue='dataset') -def enable_annotation_reply_task(job_id: str, app_id: str, user_id: str, tenant_id: str, score_threshold: float, - embedding_provider_name: str, embedding_model_name: str): +@shared_task(queue="dataset") +def enable_annotation_reply_task( + job_id: str, + app_id: str, + user_id: str, + tenant_id: str, + score_threshold: float, + embedding_provider_name: str, + embedding_model_name: str, +): """ Async enable annotation reply task """ - logging.info(click.style('Start add app annotation to index: {}'.format(app_id), fg='green')) + logging.info(click.style("Start add app annotation to index: {}".format(app_id), fg="green")) start_at = time.perf_counter() # get app info - app = db.session.query(App).filter( - App.id == app_id, - App.tenant_id == tenant_id, - App.status == 'normal' - ).first() + app = 
db.session.query(App).filter(App.id == app_id, App.tenant_id == tenant_id, App.status == "normal").first() if not app: raise NotFound("App not found") annotations = db.session.query(MessageAnnotation).filter(MessageAnnotation.app_id == app_id).all() - enable_app_annotation_key = 'enable_app_annotation_{}'.format(str(app_id)) - enable_app_annotation_job_key = 'enable_app_annotation_job_{}'.format(str(job_id)) + enable_app_annotation_key = "enable_app_annotation_{}".format(str(app_id)) + enable_app_annotation_job_key = "enable_app_annotation_job_{}".format(str(job_id)) try: documents = [] dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding( - embedding_provider_name, - embedding_model_name, - 'annotation' + embedding_provider_name, embedding_model_name, "annotation" + ) + annotation_setting = ( + db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app_id).first() ) - annotation_setting = db.session.query(AppAnnotationSetting).filter( - AppAnnotationSetting.app_id == app_id).first() if annotation_setting: annotation_setting.score_threshold = score_threshold annotation_setting.collection_binding_id = dataset_collection_binding.id @@ -58,48 +60,42 @@ def enable_annotation_reply_task(job_id: str, app_id: str, user_id: str, tenant_ score_threshold=score_threshold, collection_binding_id=dataset_collection_binding.id, created_user_id=user_id, - updated_user_id=user_id + updated_user_id=user_id, ) db.session.add(new_app_annotation_setting) dataset = Dataset( id=app_id, tenant_id=tenant_id, - indexing_technique='high_quality', + indexing_technique="high_quality", embedding_model_provider=embedding_provider_name, embedding_model=embedding_model_name, - collection_binding_id=dataset_collection_binding.id + collection_binding_id=dataset_collection_binding.id, ) if annotations: for annotation in annotations: document = Document( page_content=annotation.question, - metadata={ - "annotation_id": annotation.id, - "app_id": app_id, - "doc_id": annotation.id - } + metadata={"annotation_id": annotation.id, "app_id": app_id, "doc_id": annotation.id}, ) documents.append(document) - vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) + vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) try: - vector.delete_by_metadata_field('app_id', app_id) + vector.delete_by_metadata_field("app_id", app_id) except Exception as e: - logging.info( - click.style('Delete annotation index error: {}'.format(str(e)), - fg='red')) + logging.info(click.style("Delete annotation index error: {}".format(str(e)), fg="red")) vector.create(documents) db.session.commit() - redis_client.setex(enable_app_annotation_job_key, 600, 'completed') + redis_client.setex(enable_app_annotation_job_key, 600, "completed") end_at = time.perf_counter() logging.info( - click.style('App annotations added to index: {} latency: {}'.format(app_id, end_at - start_at), - fg='green')) + click.style("App annotations added to index: {} latency: {}".format(app_id, end_at - start_at), fg="green") + ) except Exception as e: logging.exception("Annotation batch created index failed:{}".format(str(e))) - redis_client.setex(enable_app_annotation_job_key, 600, 'error') - enable_app_annotation_error_key = 'enable_app_annotation_error_{}'.format(str(job_id)) + redis_client.setex(enable_app_annotation_job_key, 600, "error") + enable_app_annotation_error_key = "enable_app_annotation_error_{}".format(str(job_id)) redis_client.setex(enable_app_annotation_error_key, 600, 
str(e)) db.session.rollback() finally: diff --git a/api/tasks/annotation/update_annotation_to_index_task.py b/api/tasks/annotation/update_annotation_to_index_task.py index 7219abd3cdf6fc..b685d84d07ad28 100644 --- a/api/tasks/annotation/update_annotation_to_index_task.py +++ b/api/tasks/annotation/update_annotation_to_index_task.py @@ -10,9 +10,10 @@ from services.dataset_service import DatasetCollectionBindingService -@shared_task(queue='dataset') -def update_annotation_to_index_task(annotation_id: str, question: str, tenant_id: str, app_id: str, - collection_binding_id: str): +@shared_task(queue="dataset") +def update_annotation_to_index_task( + annotation_id: str, question: str, tenant_id: str, app_id: str, collection_binding_id: str +): """ Update annotation to index. :param annotation_id: annotation id @@ -23,39 +24,35 @@ def update_annotation_to_index_task(annotation_id: str, question: str, tenant_id Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct) """ - logging.info(click.style('Start update index for annotation: {}'.format(annotation_id), fg='green')) + logging.info(click.style("Start update index for annotation: {}".format(annotation_id), fg="green")) start_at = time.perf_counter() try: dataset_collection_binding = DatasetCollectionBindingService.get_dataset_collection_binding_by_id_and_type( - collection_binding_id, - 'annotation' + collection_binding_id, "annotation" ) dataset = Dataset( id=app_id, tenant_id=tenant_id, - indexing_technique='high_quality', + indexing_technique="high_quality", embedding_model_provider=dataset_collection_binding.provider_name, embedding_model=dataset_collection_binding.model_name, - collection_binding_id=dataset_collection_binding.id + collection_binding_id=dataset_collection_binding.id, ) document = Document( - page_content=question, - metadata={ - "annotation_id": annotation_id, - "app_id": app_id, - "doc_id": annotation_id - } + page_content=question, metadata={"annotation_id": annotation_id, "app_id": app_id, "doc_id": annotation_id} ) - vector = Vector(dataset, attributes=['doc_id', 'annotation_id', 'app_id']) - vector.delete_by_metadata_field('annotation_id', annotation_id) + vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"]) + vector.delete_by_metadata_field("annotation_id", annotation_id) vector.add_texts([document]) end_at = time.perf_counter() logging.info( click.style( - 'Build index successful for annotation: {} latency: {}'.format(annotation_id, end_at - start_at), - fg='green')) + "Build index successful for annotation: {} latency: {}".format(annotation_id, end_at - start_at), + fg="green", + ) + ) except Exception: logging.exception("Build index for annotation failed") diff --git a/api/tasks/batch_create_segment_to_index_task.py b/api/tasks/batch_create_segment_to_index_task.py index 67cc03bdebee54..de7f0ddec1f3b6 100644 --- a/api/tasks/batch_create_segment_to_index_task.py +++ b/api/tasks/batch_create_segment_to_index_task.py @@ -16,9 +16,10 @@ from models.dataset import Dataset, Document, DocumentSegment -@shared_task(queue='dataset') -def batch_create_segment_to_index_task(job_id: str, content: list, dataset_id: str, document_id: str, - tenant_id: str, user_id: str): +@shared_task(queue="dataset") +def batch_create_segment_to_index_task( + job_id: str, content: list, dataset_id: str, document_id: str, tenant_id: str, user_id: str +): """ Async batch create segment to index :param job_id: @@ -30,44 +31,44 @@ def batch_create_segment_to_index_task(job_id: str, content: 
list, dataset_id: s Usage: batch_create_segment_to_index_task.delay(segment_id) """ - logging.info(click.style('Start batch create segment jobId: {}'.format(job_id), fg='green')) + logging.info(click.style("Start batch create segment jobId: {}".format(job_id), fg="green")) start_at = time.perf_counter() - indexing_cache_key = 'segment_batch_import_{}'.format(job_id) + indexing_cache_key = "segment_batch_import_{}".format(job_id) try: dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: - raise ValueError('Dataset not exist.') + raise ValueError("Dataset not exist.") dataset_document = db.session.query(Document).filter(Document.id == document_id).first() if not dataset_document: - raise ValueError('Document not exist.') + raise ValueError("Document not exist.") - if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': - raise ValueError('Document is not available.') + if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": + raise ValueError("Document is not available.") document_segments = [] embedding_model = None - if dataset.indexing_technique == 'high_quality': + if dataset.indexing_technique == "high_quality": model_manager = ModelManager() embedding_model = model_manager.get_model_instance( tenant_id=dataset.tenant_id, provider=dataset.embedding_model_provider, model_type=ModelType.TEXT_EMBEDDING, - model=dataset.embedding_model + model=dataset.embedding_model, ) for segment in content: - content = segment['content'] + content = segment["content"] doc_id = str(uuid.uuid4()) segment_hash = helper.generate_text_hash(content) # calc embedding use tokens - tokens = embedding_model.get_text_embedding_num_tokens( - texts=[content] - ) if embedding_model else 0 - max_position = db.session.query(func.max(DocumentSegment.position)).filter( - DocumentSegment.document_id == dataset_document.id - ).scalar() + tokens = embedding_model.get_text_embedding_num_tokens(texts=[content]) if embedding_model else 0 + max_position = ( + db.session.query(func.max(DocumentSegment.position)) + .filter(DocumentSegment.document_id == dataset_document.id) + .scalar() + ) segment_document = DocumentSegment( tenant_id=tenant_id, dataset_id=dataset_id, @@ -80,20 +81,22 @@ def batch_create_segment_to_index_task(job_id: str, content: list, dataset_id: s tokens=tokens, created_by=user_id, indexing_at=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), - status='completed', - completed_at=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + status="completed", + completed_at=datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), ) - if dataset_document.doc_form == 'qa_model': - segment_document.answer = segment['answer'] + if dataset_document.doc_form == "qa_model": + segment_document.answer = segment["answer"] db.session.add(segment_document) document_segments.append(segment_document) # add index to db indexing_runner = IndexingRunner() indexing_runner.batch_add_segments(document_segments, dataset) db.session.commit() - redis_client.setex(indexing_cache_key, 600, 'completed') + redis_client.setex(indexing_cache_key, 600, "completed") end_at = time.perf_counter() - logging.info(click.style('Segment batch created job: {} latency: {}'.format(job_id, end_at - start_at), fg='green')) + logging.info( + click.style("Segment batch created job: {} latency: {}".format(job_id, end_at - start_at), fg="green") + ) except Exception as e: 
logging.exception("Segments batch created index failed:{}".format(str(e))) - redis_client.setex(indexing_cache_key, 600, 'error') + redis_client.setex(indexing_cache_key, 600, "error") diff --git a/api/tasks/clean_dataset_task.py b/api/tasks/clean_dataset_task.py index 1f26c966c46af5..36249038011747 100644 --- a/api/tasks/clean_dataset_task.py +++ b/api/tasks/clean_dataset_task.py @@ -19,9 +19,15 @@ # Add import statement for ValueError -@shared_task(queue='dataset') -def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str, - index_struct: str, collection_binding_id: str, doc_form: str): +@shared_task(queue="dataset") +def clean_dataset_task( + dataset_id: str, + tenant_id: str, + indexing_technique: str, + index_struct: str, + collection_binding_id: str, + doc_form: str, +): """ Clean dataset when dataset deleted. :param dataset_id: dataset id @@ -33,7 +39,7 @@ def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str, Usage: clean_dataset_task.delay(dataset_id, tenant_id, indexing_technique, index_struct) """ - logging.info(click.style('Start clean dataset when dataset deleted: {}'.format(dataset_id), fg='green')) + logging.info(click.style("Start clean dataset when dataset deleted: {}".format(dataset_id), fg="green")) start_at = time.perf_counter() try: @@ -48,9 +54,9 @@ def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str, segments = db.session.query(DocumentSegment).filter(DocumentSegment.dataset_id == dataset_id).all() if documents is None or len(documents) == 0: - logging.info(click.style('No documents found for dataset: {}'.format(dataset_id), fg='green')) + logging.info(click.style("No documents found for dataset: {}".format(dataset_id), fg="green")) else: - logging.info(click.style('Cleaning documents for dataset: {}'.format(dataset_id), fg='green')) + logging.info(click.style("Cleaning documents for dataset: {}".format(dataset_id), fg="green")) # Specify the index type before initializing the index processor if doc_form is None: raise ValueError("Index type must be specified.") @@ -71,15 +77,16 @@ def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str, if documents: for document in documents: try: - if document.data_source_type == 'upload_file': + if document.data_source_type == "upload_file": if document.data_source_info: data_source_info = document.data_source_info_dict - if data_source_info and 'upload_file_id' in data_source_info: - file_id = data_source_info['upload_file_id'] - file = db.session.query(UploadFile).filter( - UploadFile.tenant_id == document.tenant_id, - UploadFile.id == file_id - ).first() + if data_source_info and "upload_file_id" in data_source_info: + file_id = data_source_info["upload_file_id"] + file = ( + db.session.query(UploadFile) + .filter(UploadFile.tenant_id == document.tenant_id, UploadFile.id == file_id) + .first() + ) if not file: continue storage.delete(file.key) @@ -90,6 +97,9 @@ def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str, db.session.commit() end_at = time.perf_counter() logging.info( - click.style('Cleaned dataset when dataset deleted: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) + click.style( + "Cleaned dataset when dataset deleted: {} latency: {}".format(dataset_id, end_at - start_at), fg="green" + ) + ) except Exception: logging.exception("Cleaned dataset when dataset deleted failed") diff --git a/api/tasks/clean_document_task.py b/api/tasks/clean_document_task.py index 
0fd05615b65430..ae2855aa2ebc4d 100644 --- a/api/tasks/clean_document_task.py +++ b/api/tasks/clean_document_task.py @@ -12,7 +12,7 @@ from models.model import UploadFile -@shared_task(queue='dataset') +@shared_task(queue="dataset") def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_id: Optional[str]): """ Clean document when document deleted. @@ -23,14 +23,14 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_i Usage: clean_document_task.delay(document_id, dataset_id) """ - logging.info(click.style('Start clean document when document deleted: {}'.format(document_id), fg='green')) + logging.info(click.style("Start clean document when document deleted: {}".format(document_id), fg="green")) start_at = time.perf_counter() try: dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: - raise Exception('Document has no dataset') + raise Exception("Document has no dataset") segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all() # check segment is exist @@ -44,9 +44,7 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_i db.session.commit() if file_id: - file = db.session.query(UploadFile).filter( - UploadFile.id == file_id - ).first() + file = db.session.query(UploadFile).filter(UploadFile.id == file_id).first() if file: try: storage.delete(file.key) @@ -57,6 +55,10 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_i end_at = time.perf_counter() logging.info( - click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) + click.style( + "Cleaned document when document deleted: {} latency: {}".format(document_id, end_at - start_at), + fg="green", + ) + ) except Exception: logging.exception("Cleaned document when document deleted failed") diff --git a/api/tasks/clean_notion_document_task.py b/api/tasks/clean_notion_document_task.py index 9b697b63511a12..75d9e031306381 100644 --- a/api/tasks/clean_notion_document_task.py +++ b/api/tasks/clean_notion_document_task.py @@ -9,7 +9,7 @@ from models.dataset import Dataset, Document, DocumentSegment -@shared_task(queue='dataset') +@shared_task(queue="dataset") def clean_notion_document_task(document_ids: list[str], dataset_id: str): """ Clean document when document deleted. 
@@ -18,20 +18,20 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str): Usage: clean_notion_document_task.delay(document_ids, dataset_id) """ - logging.info(click.style('Start clean document when import form notion document deleted: {}'.format(dataset_id), fg='green')) + logging.info( + click.style("Start clean document when import form notion document deleted: {}".format(dataset_id), fg="green") + ) start_at = time.perf_counter() try: dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: - raise Exception('Document has no dataset') + raise Exception("Document has no dataset") index_type = dataset.doc_form index_processor = IndexProcessorFactory(index_type).init_index_processor() for document_id in document_ids: - document = db.session.query(Document).filter( - Document.id == document_id - ).first() + document = db.session.query(Document).filter(Document.id == document_id).first() db.session.delete(document) segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all() @@ -44,8 +44,12 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str): db.session.commit() end_at = time.perf_counter() logging.info( - click.style('Clean document when import form notion document deleted end :: {} latency: {}'.format( - dataset_id, end_at - start_at), - fg='green')) + click.style( + "Clean document when import form notion document deleted end :: {} latency: {}".format( + dataset_id, end_at - start_at + ), + fg="green", + ) + ) except Exception: logging.exception("Cleaned document when import form notion document deleted failed") diff --git a/api/tasks/create_segment_to_index_task.py b/api/tasks/create_segment_to_index_task.py index d31286e4cc4c42..26375743b68e25 100644 --- a/api/tasks/create_segment_to_index_task.py +++ b/api/tasks/create_segment_to_index_task.py @@ -14,7 +14,7 @@ from models.dataset import DocumentSegment -@shared_task(queue='dataset') +@shared_task(queue="dataset") def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] = None): """ Async create segment to index @@ -22,23 +22,23 @@ def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] :param keywords: Usage: create_segment_to_index_task.delay(segment_id) """ - logging.info(click.style('Start create segment to index: {}'.format(segment_id), fg='green')) + logging.info(click.style("Start create segment to index: {}".format(segment_id), fg="green")) start_at = time.perf_counter() segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first() if not segment: - raise NotFound('Segment not found') + raise NotFound("Segment not found") - if segment.status != 'waiting': + if segment.status != "waiting": return - indexing_cache_key = 'segment_{}_indexing'.format(segment.id) + indexing_cache_key = "segment_{}_indexing".format(segment.id) try: # update segment status to indexing update_params = { DocumentSegment.status: "indexing", - DocumentSegment.indexing_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + DocumentSegment.indexing_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), } DocumentSegment.query.filter_by(id=segment.id).update(update_params) db.session.commit() @@ -49,23 +49,23 @@ def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] "doc_hash": segment.index_node_hash, "document_id": segment.document_id, "dataset_id": segment.dataset_id, - } + }, ) dataset = 
segment.dataset if not dataset: - logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan')) + logging.info(click.style("Segment {} has no dataset, pass.".format(segment.id), fg="cyan")) return dataset_document = segment.document if not dataset_document: - logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan')) + logging.info(click.style("Segment {} has no document, pass.".format(segment.id), fg="cyan")) return - if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': - logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan')) + if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": + logging.info(click.style("Segment {} document status is invalid, pass.".format(segment.id), fg="cyan")) return index_type = dataset.doc_form @@ -75,18 +75,20 @@ def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] # update segment to completed update_params = { DocumentSegment.status: "completed", - DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), } DocumentSegment.query.filter_by(id=segment.id).update(update_params) db.session.commit() end_at = time.perf_counter() - logging.info(click.style('Segment created to index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green')) + logging.info( + click.style("Segment created to index: {} latency: {}".format(segment.id, end_at - start_at), fg="green") + ) except Exception as e: logging.exception("create segment to index failed") segment.enabled = False segment.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - segment.status = 'error' + segment.status = "error" segment.error = str(e) db.session.commit() finally: diff --git a/api/tasks/deal_dataset_vector_index_task.py b/api/tasks/deal_dataset_vector_index_task.py index ce93e111e54aa4..cfc54920e23caa 100644 --- a/api/tasks/deal_dataset_vector_index_task.py +++ b/api/tasks/deal_dataset_vector_index_task.py @@ -11,7 +11,7 @@ from models.dataset import Document as DatasetDocument -@shared_task(queue='dataset') +@shared_task(queue="dataset") def deal_dataset_vector_index_task(dataset_id: str, action: str): """ Async deal dataset from index @@ -19,41 +19,46 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str): :param action: action Usage: deal_dataset_vector_index_task.delay(dataset_id, action) """ - logging.info(click.style('Start deal dataset vector index: {}'.format(dataset_id), fg='green')) + logging.info(click.style("Start deal dataset vector index: {}".format(dataset_id), fg="green")) start_at = time.perf_counter() try: - dataset = Dataset.query.filter_by( - id=dataset_id - ).first() + dataset = Dataset.query.filter_by(id=dataset_id).first() if not dataset: - raise Exception('Dataset not found') + raise Exception("Dataset not found") index_type = dataset.doc_form index_processor = IndexProcessorFactory(index_type).init_index_processor() if action == "remove": index_processor.clean(dataset, None, with_keywords=False) elif action == "add": - dataset_documents = db.session.query(DatasetDocument).filter( - DatasetDocument.dataset_id == dataset_id, - DatasetDocument.indexing_status == 'completed', - DatasetDocument.enabled == True, - DatasetDocument.archived == False, 
- ).all() + dataset_documents = ( + db.session.query(DatasetDocument) + .filter( + DatasetDocument.dataset_id == dataset_id, + DatasetDocument.indexing_status == "completed", + DatasetDocument.enabled == True, + DatasetDocument.archived == False, + ) + .all() + ) if dataset_documents: dataset_documents_ids = [doc.id for doc in dataset_documents] - db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \ - .update({"indexing_status": "indexing"}, synchronize_session=False) + db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)).update( + {"indexing_status": "indexing"}, synchronize_session=False + ) db.session.commit() for dataset_document in dataset_documents: try: # add from vector index - segments = db.session.query(DocumentSegment).filter( - DocumentSegment.document_id == dataset_document.id, - DocumentSegment.enabled == True - ) .order_by(DocumentSegment.position.asc()).all() + segments = ( + db.session.query(DocumentSegment) + .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True) + .order_by(DocumentSegment.position.asc()) + .all() + ) if segments: documents = [] for segment in segments: @@ -64,32 +69,39 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str): "doc_hash": segment.index_node_hash, "document_id": segment.document_id, "dataset_id": segment.dataset_id, - } + }, ) documents.append(document) # save vector index index_processor.load(dataset, documents, with_keywords=False) - db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \ - .update({"indexing_status": "completed"}, synchronize_session=False) + db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update( + {"indexing_status": "completed"}, synchronize_session=False + ) db.session.commit() except Exception as e: - db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \ - .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False) + db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update( + {"indexing_status": "error", "error": str(e)}, synchronize_session=False + ) db.session.commit() - elif action == 'update': - dataset_documents = db.session.query(DatasetDocument).filter( - DatasetDocument.dataset_id == dataset_id, - DatasetDocument.indexing_status == 'completed', - DatasetDocument.enabled == True, - DatasetDocument.archived == False, - ).all() + elif action == "update": + dataset_documents = ( + db.session.query(DatasetDocument) + .filter( + DatasetDocument.dataset_id == dataset_id, + DatasetDocument.indexing_status == "completed", + DatasetDocument.enabled == True, + DatasetDocument.archived == False, + ) + .all() + ) # add new index if dataset_documents: # update document status dataset_documents_ids = [doc.id for doc in dataset_documents] - db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)) \ - .update({"indexing_status": "indexing"}, synchronize_session=False) + db.session.query(DatasetDocument).filter(DatasetDocument.id.in_(dataset_documents_ids)).update( + {"indexing_status": "indexing"}, synchronize_session=False + ) db.session.commit() # clean index @@ -98,10 +110,12 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str): for dataset_document in dataset_documents: # update from vector index try: - segments = db.session.query(DocumentSegment).filter( - DocumentSegment.document_id == 
dataset_document.id, - DocumentSegment.enabled == True - ).order_by(DocumentSegment.position.asc()).all() + segments = ( + db.session.query(DocumentSegment) + .filter(DocumentSegment.document_id == dataset_document.id, DocumentSegment.enabled == True) + .order_by(DocumentSegment.position.asc()) + .all() + ) if segments: documents = [] for segment in segments: @@ -112,23 +126,25 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str): "doc_hash": segment.index_node_hash, "document_id": segment.document_id, "dataset_id": segment.dataset_id, - } + }, ) documents.append(document) # save vector index index_processor.load(dataset, documents, with_keywords=False) - db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \ - .update({"indexing_status": "completed"}, synchronize_session=False) + db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update( + {"indexing_status": "completed"}, synchronize_session=False + ) db.session.commit() except Exception as e: - db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id) \ - .update({"indexing_status": "error", "error": str(e)}, synchronize_session=False) + db.session.query(DatasetDocument).filter(DatasetDocument.id == dataset_document.id).update( + {"indexing_status": "error", "error": str(e)}, synchronize_session=False + ) db.session.commit() - end_at = time.perf_counter() logging.info( - click.style('Deal dataset vector index: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) + click.style("Deal dataset vector index: {} latency: {}".format(dataset_id, end_at - start_at), fg="green") + ) except Exception: logging.exception("Deal dataset vector index failed") diff --git a/api/tasks/delete_segment_from_index_task.py b/api/tasks/delete_segment_from_index_task.py index d79286cf3d8a3c..c3e0ea5d9fbb77 100644 --- a/api/tasks/delete_segment_from_index_task.py +++ b/api/tasks/delete_segment_from_index_task.py @@ -10,7 +10,7 @@ from models.dataset import Dataset, Document -@shared_task(queue='dataset') +@shared_task(queue="dataset") def delete_segment_from_index_task(segment_id: str, index_node_id: str, dataset_id: str, document_id: str): """ Async Remove segment from index @@ -21,22 +21,22 @@ def delete_segment_from_index_task(segment_id: str, index_node_id: str, dataset_ Usage: delete_segment_from_index_task.delay(segment_id) """ - logging.info(click.style('Start delete segment from index: {}'.format(segment_id), fg='green')) + logging.info(click.style("Start delete segment from index: {}".format(segment_id), fg="green")) start_at = time.perf_counter() - indexing_cache_key = 'segment_{}_delete_indexing'.format(segment_id) + indexing_cache_key = "segment_{}_delete_indexing".format(segment_id) try: dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: - logging.info(click.style('Segment {} has no dataset, pass.'.format(segment_id), fg='cyan')) + logging.info(click.style("Segment {} has no dataset, pass.".format(segment_id), fg="cyan")) return dataset_document = db.session.query(Document).filter(Document.id == document_id).first() if not dataset_document: - logging.info(click.style('Segment {} has no document, pass.'.format(segment_id), fg='cyan')) + logging.info(click.style("Segment {} has no document, pass.".format(segment_id), fg="cyan")) return - if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': - logging.info(click.style('Segment {} 
document status is invalid, pass.'.format(segment_id), fg='cyan')) + if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": + logging.info(click.style("Segment {} document status is invalid, pass.".format(segment_id), fg="cyan")) return index_type = dataset_document.doc_form @@ -44,7 +44,9 @@ def delete_segment_from_index_task(segment_id: str, index_node_id: str, dataset_ index_processor.clean(dataset, [index_node_id]) end_at = time.perf_counter() - logging.info(click.style('Segment deleted from index: {} latency: {}'.format(segment_id, end_at - start_at), fg='green')) + logging.info( + click.style("Segment deleted from index: {} latency: {}".format(segment_id, end_at - start_at), fg="green") + ) except Exception: logging.exception("delete segment from index failed") finally: diff --git a/api/tasks/disable_segment_from_index_task.py b/api/tasks/disable_segment_from_index_task.py index 4788bf4e4b5960..15e1e50076e8c9 100644 --- a/api/tasks/disable_segment_from_index_task.py +++ b/api/tasks/disable_segment_from_index_task.py @@ -11,7 +11,7 @@ from models.dataset import DocumentSegment -@shared_task(queue='dataset') +@shared_task(queue="dataset") def disable_segment_from_index_task(segment_id: str): """ Async disable segment from index @@ -19,33 +19,33 @@ def disable_segment_from_index_task(segment_id: str): Usage: disable_segment_from_index_task.delay(segment_id) """ - logging.info(click.style('Start disable segment from index: {}'.format(segment_id), fg='green')) + logging.info(click.style("Start disable segment from index: {}".format(segment_id), fg="green")) start_at = time.perf_counter() segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first() if not segment: - raise NotFound('Segment not found') + raise NotFound("Segment not found") - if segment.status != 'completed': - raise NotFound('Segment is not completed , disable action is not allowed.') + if segment.status != "completed": + raise NotFound("Segment is not completed , disable action is not allowed.") - indexing_cache_key = 'segment_{}_indexing'.format(segment.id) + indexing_cache_key = "segment_{}_indexing".format(segment.id) try: dataset = segment.dataset if not dataset: - logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan')) + logging.info(click.style("Segment {} has no dataset, pass.".format(segment.id), fg="cyan")) return dataset_document = segment.document if not dataset_document: - logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan')) + logging.info(click.style("Segment {} has no document, pass.".format(segment.id), fg="cyan")) return - if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': - logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan')) + if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": + logging.info(click.style("Segment {} document status is invalid, pass.".format(segment.id), fg="cyan")) return index_type = dataset_document.doc_form @@ -53,7 +53,9 @@ def disable_segment_from_index_task(segment_id: str): index_processor.clean(dataset, [segment.index_node_id]) end_at = time.perf_counter() - logging.info(click.style('Segment removed from index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green')) + logging.info( + click.style("Segment removed from index: {} latency: 
{}".format(segment.id, end_at - start_at), fg="green") + ) except Exception: logging.exception("remove segment from index failed") segment.enabled = True diff --git a/api/tasks/document_indexing_sync_task.py b/api/tasks/document_indexing_sync_task.py index 4cced36ecdd856..9ea4c99649cfe3 100644 --- a/api/tasks/document_indexing_sync_task.py +++ b/api/tasks/document_indexing_sync_task.py @@ -14,7 +14,7 @@ from models.source import DataSourceOauthBinding -@shared_task(queue='dataset') +@shared_task(queue="dataset") def document_indexing_sync_task(dataset_id: str, document_id: str): """ Async update document @@ -23,50 +23,50 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): Usage: document_indexing_sync_task.delay(dataset_id, document_id) """ - logging.info(click.style('Start sync document: {}'.format(document_id), fg='green')) + logging.info(click.style("Start sync document: {}".format(document_id), fg="green")) start_at = time.perf_counter() - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() if not document: - raise NotFound('Document not found') + raise NotFound("Document not found") data_source_info = document.data_source_info_dict - if document.data_source_type == 'notion_import': - if not data_source_info or 'notion_page_id' not in data_source_info \ - or 'notion_workspace_id' not in data_source_info: + if document.data_source_type == "notion_import": + if ( + not data_source_info + or "notion_page_id" not in data_source_info + or "notion_workspace_id" not in data_source_info + ): raise ValueError("no notion page found") - workspace_id = data_source_info['notion_workspace_id'] - page_id = data_source_info['notion_page_id'] - page_type = data_source_info['type'] - page_edited_time = data_source_info['last_edited_time'] + workspace_id = data_source_info["notion_workspace_id"] + page_id = data_source_info["notion_page_id"] + page_type = data_source_info["type"] + page_edited_time = data_source_info["last_edited_time"] data_source_binding = DataSourceOauthBinding.query.filter( db.and_( DataSourceOauthBinding.tenant_id == document.tenant_id, - DataSourceOauthBinding.provider == 'notion', + DataSourceOauthBinding.provider == "notion", DataSourceOauthBinding.disabled == False, - DataSourceOauthBinding.source_info['workspace_id'] == f'"{workspace_id}"' + DataSourceOauthBinding.source_info["workspace_id"] == f'"{workspace_id}"', ) ).first() if not data_source_binding: - raise ValueError('Data source binding not found.') + raise ValueError("Data source binding not found.") loader = NotionExtractor( notion_workspace_id=workspace_id, notion_obj_id=page_id, notion_page_type=page_type, notion_access_token=data_source_binding.access_token, - tenant_id=document.tenant_id + tenant_id=document.tenant_id, ) last_edited_time = loader.get_notion_last_edited_time() # check the page is updated if last_edited_time != page_edited_time: - document.indexing_status = 'parsing' + document.indexing_status = "parsing" document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) db.session.commit() @@ -74,7 +74,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): try: dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: - raise Exception('Dataset not found') + raise Exception("Dataset not found") index_type = 
document.doc_form index_processor = IndexProcessorFactory(index_type).init_index_processor() @@ -89,7 +89,13 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): end_at = time.perf_counter() logging.info( - click.style('Cleaned document when document update data source or process rule: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) + click.style( + "Cleaned document when document update data source or process rule: {} latency: {}".format( + document_id, end_at - start_at + ), + fg="green", + ) + ) except Exception: logging.exception("Cleaned document when document update data source or process rule failed") @@ -97,8 +103,10 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): indexing_runner = IndexingRunner() indexing_runner.run([document]) end_at = time.perf_counter() - logging.info(click.style('update document: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) + logging.info( + click.style("update document: {} latency: {}".format(document.id, end_at - start_at), fg="green") + ) except DocumentIsPausedException as ex: - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) except Exception: pass diff --git a/api/tasks/document_indexing_task.py b/api/tasks/document_indexing_task.py index cc93a1341e180d..e0da5f9ed032c7 100644 --- a/api/tasks/document_indexing_task.py +++ b/api/tasks/document_indexing_task.py @@ -12,7 +12,7 @@ from services.feature_service import FeatureService -@shared_task(queue='dataset') +@shared_task(queue="dataset") def document_indexing_task(dataset_id: str, document_ids: list): """ Async process document @@ -36,16 +36,17 @@ def document_indexing_task(dataset_id: str, document_ids: list): if count > batch_upload_limit: raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.") if 0 < vector_space.limit <= vector_space.size: - raise ValueError("Your total number of documents plus the number of uploads have over the limit of " - "your subscription.") + raise ValueError( + "Your total number of documents plus the number of uploads have over the limit of " + "your subscription." 
+ ) except Exception as e: for document_id in document_ids: - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = ( + db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() + ) if document: - document.indexing_status = 'error' + document.indexing_status = "error" document.error = str(e) document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) db.session.add(document) @@ -53,15 +54,14 @@ def document_indexing_task(dataset_id: str, document_ids: list): return for document_id in document_ids: - logging.info(click.style('Start process document: {}'.format(document_id), fg='green')) + logging.info(click.style("Start process document: {}".format(document_id), fg="green")) - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = ( + db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() + ) if document: - document.indexing_status = 'parsing' + document.indexing_status = "parsing" document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) documents.append(document) db.session.add(document) @@ -71,8 +71,8 @@ def document_indexing_task(dataset_id: str, document_ids: list): indexing_runner = IndexingRunner() indexing_runner.run(documents) end_at = time.perf_counter() - logging.info(click.style('Processed dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) + logging.info(click.style("Processed dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")) except DocumentIsPausedException as ex: - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) except Exception: pass diff --git a/api/tasks/document_indexing_update_task.py b/api/tasks/document_indexing_update_task.py index f129d93de8da5a..6e681bcf4f614a 100644 --- a/api/tasks/document_indexing_update_task.py +++ b/api/tasks/document_indexing_update_task.py @@ -12,7 +12,7 @@ from models.dataset import Dataset, Document, DocumentSegment -@shared_task(queue='dataset') +@shared_task(queue="dataset") def document_indexing_update_task(dataset_id: str, document_id: str): """ Async update document @@ -21,18 +21,15 @@ def document_indexing_update_task(dataset_id: str, document_id: str): Usage: document_indexing_update_task.delay(dataset_id, document_id) """ - logging.info(click.style('Start update document: {}'.format(document_id), fg='green')) + logging.info(click.style("Start update document: {}".format(document_id), fg="green")) start_at = time.perf_counter() - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() if not document: - raise NotFound('Document not found') + raise NotFound("Document not found") - document.indexing_status = 'parsing' + document.indexing_status = "parsing" document.processing_started_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) db.session.commit() @@ -40,7 +37,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str): try: dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: - raise Exception('Dataset not found') + raise Exception("Dataset not 
found") index_type = document.doc_form index_processor = IndexProcessorFactory(index_type).init_index_processor() @@ -57,7 +54,13 @@ def document_indexing_update_task(dataset_id: str, document_id: str): db.session.commit() end_at = time.perf_counter() logging.info( - click.style('Cleaned document when document update data source or process rule: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) + click.style( + "Cleaned document when document update data source or process rule: {} latency: {}".format( + document_id, end_at - start_at + ), + fg="green", + ) + ) except Exception: logging.exception("Cleaned document when document update data source or process rule failed") @@ -65,8 +68,8 @@ def document_indexing_update_task(dataset_id: str, document_id: str): indexing_runner = IndexingRunner() indexing_runner.run([document]) end_at = time.perf_counter() - logging.info(click.style('update document: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) + logging.info(click.style("update document: {} latency: {}".format(document.id, end_at - start_at), fg="green")) except DocumentIsPausedException as ex: - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) except Exception: pass diff --git a/api/tasks/duplicate_document_indexing_task.py b/api/tasks/duplicate_document_indexing_task.py index 884e222d1b3eef..0a7568c385ba98 100644 --- a/api/tasks/duplicate_document_indexing_task.py +++ b/api/tasks/duplicate_document_indexing_task.py @@ -13,7 +13,7 @@ from services.feature_service import FeatureService -@shared_task(queue='dataset') +@shared_task(queue="dataset") def duplicate_document_indexing_task(dataset_id: str, document_ids: list): """ Async process document @@ -37,16 +37,17 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): if count > batch_upload_limit: raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.") if 0 < vector_space.limit <= vector_space.size: - raise ValueError("Your total number of documents plus the number of uploads have over the limit of " - "your subscription.") + raise ValueError( + "Your total number of documents plus the number of uploads have over the limit of " + "your subscription." 
+ ) except Exception as e: for document_id in document_ids: - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = ( + db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() + ) if document: - document.indexing_status = 'error' + document.indexing_status = "error" document.error = str(e) document.stopped_at = datetime.datetime.utcnow() db.session.add(document) @@ -54,12 +55,11 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): return for document_id in document_ids: - logging.info(click.style('Start process document: {}'.format(document_id), fg='green')) + logging.info(click.style("Start process document: {}".format(document_id), fg="green")) - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = ( + db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() + ) if document: # clean old data @@ -77,7 +77,7 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): db.session.delete(segment) db.session.commit() - document.indexing_status = 'parsing' + document.indexing_status = "parsing" document.processing_started_at = datetime.datetime.utcnow() documents.append(document) db.session.add(document) @@ -87,8 +87,8 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): indexing_runner = IndexingRunner() indexing_runner.run(documents) end_at = time.perf_counter() - logging.info(click.style('Processed dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) + logging.info(click.style("Processed dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")) except DocumentIsPausedException as ex: - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) except Exception: pass diff --git a/api/tasks/enable_segment_to_index_task.py b/api/tasks/enable_segment_to_index_task.py index e37c06855d47de..1412ad9ec74c06 100644 --- a/api/tasks/enable_segment_to_index_task.py +++ b/api/tasks/enable_segment_to_index_task.py @@ -13,7 +13,7 @@ from models.dataset import DocumentSegment -@shared_task(queue='dataset') +@shared_task(queue="dataset") def enable_segment_to_index_task(segment_id: str): """ Async enable segment to index @@ -21,17 +21,17 @@ def enable_segment_to_index_task(segment_id: str): Usage: enable_segment_to_index_task.delay(segment_id) """ - logging.info(click.style('Start enable segment to index: {}'.format(segment_id), fg='green')) + logging.info(click.style("Start enable segment to index: {}".format(segment_id), fg="green")) start_at = time.perf_counter() segment = db.session.query(DocumentSegment).filter(DocumentSegment.id == segment_id).first() if not segment: - raise NotFound('Segment not found') + raise NotFound("Segment not found") - if segment.status != 'completed': - raise NotFound('Segment is not completed, enable action is not allowed.') + if segment.status != "completed": + raise NotFound("Segment is not completed, enable action is not allowed.") - indexing_cache_key = 'segment_{}_indexing'.format(segment.id) + indexing_cache_key = "segment_{}_indexing".format(segment.id) try: document = Document( @@ -41,23 +41,23 @@ def enable_segment_to_index_task(segment_id: str): "doc_hash": segment.index_node_hash, "document_id": segment.document_id, "dataset_id": segment.dataset_id, - } + }, 
) dataset = segment.dataset if not dataset: - logging.info(click.style('Segment {} has no dataset, pass.'.format(segment.id), fg='cyan')) + logging.info(click.style("Segment {} has no dataset, pass.".format(segment.id), fg="cyan")) return dataset_document = segment.document if not dataset_document: - logging.info(click.style('Segment {} has no document, pass.'.format(segment.id), fg='cyan')) + logging.info(click.style("Segment {} has no document, pass.".format(segment.id), fg="cyan")) return - if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != 'completed': - logging.info(click.style('Segment {} document status is invalid, pass.'.format(segment.id), fg='cyan')) + if not dataset_document.enabled or dataset_document.archived or dataset_document.indexing_status != "completed": + logging.info(click.style("Segment {} document status is invalid, pass.".format(segment.id), fg="cyan")) return index_processor = IndexProcessorFactory(dataset_document.doc_form).init_index_processor() @@ -65,12 +65,14 @@ def enable_segment_to_index_task(segment_id: str): index_processor.load(dataset, [document]) end_at = time.perf_counter() - logging.info(click.style('Segment enabled to index: {} latency: {}'.format(segment.id, end_at - start_at), fg='green')) + logging.info( + click.style("Segment enabled to index: {} latency: {}".format(segment.id, end_at - start_at), fg="green") + ) except Exception as e: logging.exception("enable segment to index failed") segment.enabled = False segment.disabled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - segment.status = 'error' + segment.status = "error" segment.error = str(e) db.session.commit() finally: diff --git a/api/tasks/mail_invite_member_task.py b/api/tasks/mail_invite_member_task.py index a46eafa797907c..4ef6e29994a3d0 100644 --- a/api/tasks/mail_invite_member_task.py +++ b/api/tasks/mail_invite_member_task.py @@ -9,7 +9,7 @@ from extensions.ext_mail import mail -@shared_task(queue='mail') +@shared_task(queue="mail") def send_invite_member_mail_task(language: str, to: str, token: str, inviter_name: str, workspace_name: str): """ Async Send invite member mail @@ -24,31 +24,38 @@ def send_invite_member_mail_task(language: str, to: str, token: str, inviter_nam if not mail.is_inited(): return - logging.info(click.style('Start send invite member mail to {} in workspace {}'.format(to, workspace_name), - fg='green')) + logging.info( + click.style("Start send invite member mail to {} in workspace {}".format(to, workspace_name), fg="green") + ) start_at = time.perf_counter() # send invite member mail using different languages try: - url = f'{dify_config.CONSOLE_WEB_URL}/activate?token={token}' - if language == 'zh-Hans': - html_content = render_template('invite_member_mail_template_zh-CN.html', - to=to, - inviter_name=inviter_name, - workspace_name=workspace_name, - url=url) + url = f"{dify_config.CONSOLE_WEB_URL}/activate?token={token}" + if language == "zh-Hans": + html_content = render_template( + "invite_member_mail_template_zh-CN.html", + to=to, + inviter_name=inviter_name, + workspace_name=workspace_name, + url=url, + ) mail.send(to=to, subject="立即加入 Dify 工作空间", html=html_content) else: - html_content = render_template('invite_member_mail_template_en-US.html', - to=to, - inviter_name=inviter_name, - workspace_name=workspace_name, - url=url) + html_content = render_template( + "invite_member_mail_template_en-US.html", + to=to, + inviter_name=inviter_name, + workspace_name=workspace_name, + 
url=url, + ) mail.send(to=to, subject="Join Dify Workspace Now", html=html_content) end_at = time.perf_counter() logging.info( - click.style('Send invite member mail to {} succeeded: latency: {}'.format(to, end_at - start_at), - fg='green')) + click.style( + "Send invite member mail to {} succeeded: latency: {}".format(to, end_at - start_at), fg="green" + ) + ) except Exception: - logging.exception("Send invite member mail to {} failed".format(to)) \ No newline at end of file + logging.exception("Send invite member mail to {} failed".format(to)) diff --git a/api/tasks/mail_reset_password_task.py b/api/tasks/mail_reset_password_task.py index 4e1b8a89135e7f..cbb78976cace4e 100644 --- a/api/tasks/mail_reset_password_task.py +++ b/api/tasks/mail_reset_password_task.py @@ -9,7 +9,7 @@ from extensions.ext_mail import mail -@shared_task(queue='mail') +@shared_task(queue="mail") def send_reset_password_mail_task(language: str, to: str, token: str): """ Async Send reset password mail @@ -20,26 +20,24 @@ def send_reset_password_mail_task(language: str, to: str, token: str): if not mail.is_inited(): return - logging.info(click.style('Start password reset mail to {}'.format(to), fg='green')) + logging.info(click.style("Start password reset mail to {}".format(to), fg="green")) start_at = time.perf_counter() # send reset password mail using different languages try: - url = f'{dify_config.CONSOLE_WEB_URL}/forgot-password?token={token}' - if language == 'zh-Hans': - html_content = render_template('reset_password_mail_template_zh-CN.html', - to=to, - url=url) + url = f"{dify_config.CONSOLE_WEB_URL}/forgot-password?token={token}" + if language == "zh-Hans": + html_content = render_template("reset_password_mail_template_zh-CN.html", to=to, url=url) mail.send(to=to, subject="重置您的 Dify 密码", html=html_content) else: - html_content = render_template('reset_password_mail_template_en-US.html', - to=to, - url=url) + html_content = render_template("reset_password_mail_template_en-US.html", to=to, url=url) mail.send(to=to, subject="Reset Your Dify Password", html=html_content) end_at = time.perf_counter() logging.info( - click.style('Send password reset mail to {} succeeded: latency: {}'.format(to, end_at - start_at), - fg='green')) + click.style( + "Send password reset mail to {} succeeded: latency: {}".format(to, end_at - start_at), fg="green" + ) + ) except Exception: logging.exception("Send password reset mail to {} failed".format(to)) diff --git a/api/tasks/ops_trace_task.py b/api/tasks/ops_trace_task.py index 6b4cab55b399ae..260069c6e2cd4e 100644 --- a/api/tasks/ops_trace_task.py +++ b/api/tasks/ops_trace_task.py @@ -10,7 +10,7 @@ from models.workflow import WorkflowRun -@shared_task(queue='ops_trace') +@shared_task(queue="ops_trace") def process_trace_tasks(tasks_data): """ Async process trace tasks @@ -20,17 +20,17 @@ def process_trace_tasks(tasks_data): """ from core.ops.ops_trace_manager import OpsTraceManager - trace_info = tasks_data.get('trace_info') - app_id = tasks_data.get('app_id') - trace_info_type = tasks_data.get('trace_info_type') + trace_info = tasks_data.get("trace_info") + app_id = tasks_data.get("app_id") + trace_info_type = tasks_data.get("trace_info_type") trace_instance = OpsTraceManager.get_ops_trace_instance(app_id) - if trace_info.get('message_data'): - trace_info['message_data'] = Message.from_dict(data=trace_info['message_data']) - if trace_info.get('workflow_data'): - trace_info['workflow_data'] = WorkflowRun.from_dict(data=trace_info['workflow_data']) - if 
trace_info.get('documents'): - trace_info['documents'] = [Document(**doc) for doc in trace_info['documents']] + if trace_info.get("message_data"): + trace_info["message_data"] = Message.from_dict(data=trace_info["message_data"]) + if trace_info.get("workflow_data"): + trace_info["workflow_data"] = WorkflowRun.from_dict(data=trace_info["workflow_data"]) + if trace_info.get("documents"): + trace_info["documents"] = [Document(**doc) for doc in trace_info["documents"]] try: if trace_instance: diff --git a/api/tasks/recover_document_indexing_task.py b/api/tasks/recover_document_indexing_task.py index 02278f512b74b6..18bae14ffac549 100644 --- a/api/tasks/recover_document_indexing_task.py +++ b/api/tasks/recover_document_indexing_task.py @@ -10,7 +10,7 @@ from models.dataset import Document -@shared_task(queue='dataset') +@shared_task(queue="dataset") def recover_document_indexing_task(dataset_id: str, document_id: str): """ Async recover document @@ -19,16 +19,13 @@ def recover_document_indexing_task(dataset_id: str, document_id: str): Usage: recover_document_indexing_task.delay(dataset_id, document_id) """ - logging.info(click.style('Recover document: {}'.format(document_id), fg='green')) + logging.info(click.style("Recover document: {}".format(document_id), fg="green")) start_at = time.perf_counter() - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() if not document: - raise NotFound('Document not found') + raise NotFound("Document not found") try: indexing_runner = IndexingRunner() @@ -39,8 +36,10 @@ def recover_document_indexing_task(dataset_id: str, document_id: str): elif document.indexing_status == "indexing": indexing_runner.run_in_indexing_status(document) end_at = time.perf_counter() - logging.info(click.style('Processed document: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) + logging.info( + click.style("Processed document: {} latency: {}".format(document.id, end_at - start_at), fg="green") + ) except DocumentIsPausedException as ex: - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) except Exception: pass diff --git a/api/tasks/remove_app_and_related_data_task.py b/api/tasks/remove_app_and_related_data_task.py index 4efe7ee38c0b32..66f78636ecca60 100644 --- a/api/tasks/remove_app_and_related_data_task.py +++ b/api/tasks/remove_app_and_related_data_task.py @@ -33,9 +33,9 @@ from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun -@shared_task(queue='app_deletion', bind=True, max_retries=3) +@shared_task(queue="app_deletion", bind=True, max_retries=3) def remove_app_and_related_data_task(self, tenant_id: str, app_id: str): - logging.info(click.style(f'Start deleting app and related data: {tenant_id}:{app_id}', fg='green')) + logging.info(click.style(f"Start deleting app and related data: {tenant_id}:{app_id}", fg="green")) start_at = time.perf_counter() try: # Delete related data @@ -59,13 +59,14 @@ def remove_app_and_related_data_task(self, tenant_id: str, app_id: str): _delete_conversation_variables(app_id=app_id) end_at = time.perf_counter() - logging.info(click.style(f'App and related data deleted: {app_id} latency: {end_at - start_at}', fg='green')) + logging.info(click.style(f"App and related data deleted: {app_id} latency: {end_at - start_at}", fg="green")) except 
SQLAlchemyError as e: logging.exception( - click.style(f"Database error occurred while deleting app {app_id} and related data", fg='red')) + click.style(f"Database error occurred while deleting app {app_id} and related data", fg="red") + ) raise self.retry(exc=e, countdown=60) # Retry after 60 seconds except Exception as e: - logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg='red')) + logging.exception(click.style(f"Error occurred while deleting app {app_id} and related data", fg="red")) raise self.retry(exc=e, countdown=60) # Retry after 60 seconds @@ -77,7 +78,7 @@ def del_model_config(model_config_id: str): """select id from app_model_configs where app_id=:app_id limit 1000""", {"app_id": app_id}, del_model_config, - "app model config" + "app model config", ) @@ -85,12 +86,7 @@ def _delete_app_site(tenant_id: str, app_id: str): def del_site(site_id: str): db.session.query(Site).filter(Site.id == site_id).delete(synchronize_session=False) - _delete_records( - """select id from sites where app_id=:app_id limit 1000""", - {"app_id": app_id}, - del_site, - "site" - ) + _delete_records("""select id from sites where app_id=:app_id limit 1000""", {"app_id": app_id}, del_site, "site") def _delete_app_api_tokens(tenant_id: str, app_id: str): @@ -98,10 +94,7 @@ def del_api_token(api_token_id: str): db.session.query(ApiToken).filter(ApiToken.id == api_token_id).delete(synchronize_session=False) _delete_records( - """select id from api_tokens where app_id=:app_id limit 1000""", - {"app_id": app_id}, - del_api_token, - "api token" + """select id from api_tokens where app_id=:app_id limit 1000""", {"app_id": app_id}, del_api_token, "api token" ) @@ -113,44 +106,47 @@ def del_installed_app(installed_app_id: str): """select id from installed_apps where tenant_id=:tenant_id and app_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_installed_app, - "installed app" + "installed app", ) def _delete_recommended_apps(tenant_id: str, app_id: str): def del_recommended_app(recommended_app_id: str): db.session.query(RecommendedApp).filter(RecommendedApp.id == recommended_app_id).delete( - synchronize_session=False) + synchronize_session=False + ) _delete_records( """select id from recommended_apps where app_id=:app_id limit 1000""", {"app_id": app_id}, del_recommended_app, - "recommended app" + "recommended app", ) def _delete_app_annotation_data(tenant_id: str, app_id: str): def del_annotation_hit_history(annotation_hit_history_id: str): db.session.query(AppAnnotationHitHistory).filter( - AppAnnotationHitHistory.id == annotation_hit_history_id).delete(synchronize_session=False) + AppAnnotationHitHistory.id == annotation_hit_history_id + ).delete(synchronize_session=False) _delete_records( """select id from app_annotation_hit_histories where app_id=:app_id limit 1000""", {"app_id": app_id}, del_annotation_hit_history, - "annotation hit history" + "annotation hit history", ) def del_annotation_setting(annotation_setting_id: str): db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.id == annotation_setting_id).delete( - synchronize_session=False) + synchronize_session=False + ) _delete_records( """select id from app_annotation_settings where app_id=:app_id limit 1000""", {"app_id": app_id}, del_annotation_setting, - "annotation setting" + "annotation setting", ) @@ -162,7 +158,7 @@ def del_dataset_join(dataset_join_id: str): """select id from app_dataset_joins where app_id=:app_id limit 1000""", {"app_id": app_id}, 
del_dataset_join, - "dataset join" + "dataset join", ) @@ -174,7 +170,7 @@ def del_workflow(workflow_id: str): """select id from workflows where tenant_id=:tenant_id and app_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_workflow, - "workflow" + "workflow", ) @@ -186,89 +182,93 @@ def del_workflow_run(workflow_run_id: str): """select id from workflow_runs where tenant_id=:tenant_id and app_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_workflow_run, - "workflow run" + "workflow run", ) def _delete_app_workflow_node_executions(tenant_id: str, app_id: str): def del_workflow_node_execution(workflow_node_execution_id: str): - db.session.query(WorkflowNodeExecution).filter( - WorkflowNodeExecution.id == workflow_node_execution_id).delete(synchronize_session=False) + db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution_id).delete( + synchronize_session=False + ) _delete_records( """select id from workflow_node_executions where tenant_id=:tenant_id and app_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_workflow_node_execution, - "workflow node execution" + "workflow node execution", ) def _delete_app_workflow_app_logs(tenant_id: str, app_id: str): def del_workflow_app_log(workflow_app_log_id: str): - db.session.query(WorkflowAppLog).filter(WorkflowAppLog.id == workflow_app_log_id).delete(synchronize_session=False) + db.session.query(WorkflowAppLog).filter(WorkflowAppLog.id == workflow_app_log_id).delete( + synchronize_session=False + ) _delete_records( """select id from workflow_app_logs where tenant_id=:tenant_id and app_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_workflow_app_log, - "workflow app log" + "workflow app log", ) def _delete_app_conversations(tenant_id: str, app_id: str): def del_conversation(conversation_id: str): db.session.query(PinnedConversation).filter(PinnedConversation.conversation_id == conversation_id).delete( - synchronize_session=False) + synchronize_session=False + ) db.session.query(Conversation).filter(Conversation.id == conversation_id).delete(synchronize_session=False) _delete_records( """select id from conversations where app_id=:app_id limit 1000""", {"app_id": app_id}, del_conversation, - "conversation" + "conversation", ) + def _delete_conversation_variables(*, app_id: str): stmt = delete(ConversationVariable).where(ConversationVariable.app_id == app_id) with db.engine.connect() as conn: conn.execute(stmt) conn.commit() - logging.info(click.style(f"Deleted conversation variables for app {app_id}", fg='green')) + logging.info(click.style(f"Deleted conversation variables for app {app_id}", fg="green")) def _delete_app_messages(tenant_id: str, app_id: str): def del_message(message_id: str): db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message_id).delete( - synchronize_session=False) + synchronize_session=False + ) db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message_id).delete( - synchronize_session=False) - db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete( - synchronize_session=False) + synchronize_session=False + ) + db.session.query(MessageChain).filter(MessageChain.message_id == message_id).delete(synchronize_session=False) db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message_id).delete( - synchronize_session=False) + synchronize_session=False + ) 
db.session.query(MessageFile).filter(MessageFile.message_id == message_id).delete(synchronize_session=False) - db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete( - synchronize_session=False) + db.session.query(SavedMessage).filter(SavedMessage.message_id == message_id).delete(synchronize_session=False) db.session.query(Message).filter(Message.id == message_id).delete() _delete_records( - """select id from messages where app_id=:app_id limit 1000""", - {"app_id": app_id}, - del_message, - "message" + """select id from messages where app_id=:app_id limit 1000""", {"app_id": app_id}, del_message, "message" ) def _delete_workflow_tool_providers(tenant_id: str, app_id: str): def del_tool_provider(tool_provider_id: str): db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.id == tool_provider_id).delete( - synchronize_session=False) + synchronize_session=False + ) _delete_records( """select id from tool_workflow_providers where tenant_id=:tenant_id and app_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_tool_provider, - "tool workflow provider" + "tool workflow provider", ) @@ -280,7 +280,7 @@ def del_tag_binding(tag_binding_id: str): """select id from tag_bindings where tenant_id=:tenant_id and target_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_tag_binding, - "tag binding" + "tag binding", ) @@ -292,20 +292,21 @@ def del_end_user(end_user_id: str): """select id from end_users where tenant_id=:tenant_id and app_id=:app_id limit 1000""", {"tenant_id": tenant_id, "app_id": app_id}, del_end_user, - "end user" + "end user", ) def _delete_trace_app_configs(tenant_id: str, app_id: str): def del_trace_app_config(trace_app_config_id: str): db.session.query(TraceAppConfig).filter(TraceAppConfig.id == trace_app_config_id).delete( - synchronize_session=False) + synchronize_session=False + ) _delete_records( """select id from trace_app_config where app_id=:app_id limit 1000""", {"app_id": app_id}, del_trace_app_config, - "trace app config" + "trace app config", ) @@ -321,7 +322,7 @@ def _delete_records(query_sql: str, params: dict, delete_func: Callable, name: s try: delete_func(record_id) db.session.commit() - logging.info(click.style(f"Deleted {name} {record_id}", fg='green')) + logging.info(click.style(f"Deleted {name} {record_id}", fg="green")) except Exception: logging.exception(f"Error occurred while deleting {name} {record_id}") continue diff --git a/api/tasks/remove_document_from_index_task.py b/api/tasks/remove_document_from_index_task.py index cff8dddc53c630..1909eaf3418517 100644 --- a/api/tasks/remove_document_from_index_task.py +++ b/api/tasks/remove_document_from_index_task.py @@ -11,7 +11,7 @@ from models.dataset import Document, DocumentSegment -@shared_task(queue='dataset') +@shared_task(queue="dataset") def remove_document_from_index_task(document_id: str): """ Async Remove document from index @@ -19,23 +19,23 @@ def remove_document_from_index_task(document_id: str): Usage: remove_document_from_index.delay(document_id) """ - logging.info(click.style('Start remove document segments from index: {}'.format(document_id), fg='green')) + logging.info(click.style("Start remove document segments from index: {}".format(document_id), fg="green")) start_at = time.perf_counter() document = db.session.query(Document).filter(Document.id == document_id).first() if not document: - raise NotFound('Document not found') + raise NotFound("Document not found") - if document.indexing_status != 'completed': + 
if document.indexing_status != "completed": return - indexing_cache_key = 'document_{}_indexing'.format(document.id) + indexing_cache_key = "document_{}_indexing".format(document.id) try: dataset = document.dataset if not dataset: - raise Exception('Document has no dataset') + raise Exception("Document has no dataset") index_processor = IndexProcessorFactory(document.doc_form).init_index_processor() @@ -49,7 +49,10 @@ def remove_document_from_index_task(document_id: str): end_at = time.perf_counter() logging.info( - click.style('Document removed from index: {} latency: {}'.format(document.id, end_at - start_at), fg='green')) + click.style( + "Document removed from index: {} latency: {}".format(document.id, end_at - start_at), fg="green" + ) + ) except Exception: logging.exception("remove document from index failed") if not document.archived: diff --git a/api/tasks/retry_document_indexing_task.py b/api/tasks/retry_document_indexing_task.py index 1114809b3019a1..73471fd6e77c9b 100644 --- a/api/tasks/retry_document_indexing_task.py +++ b/api/tasks/retry_document_indexing_task.py @@ -13,7 +13,7 @@ from services.feature_service import FeatureService -@shared_task(queue='dataset') +@shared_task(queue="dataset") def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): """ Async process document @@ -27,22 +27,23 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() for document_id in document_ids: - retry_indexing_cache_key = 'document_{}_is_retried'.format(document_id) + retry_indexing_cache_key = "document_{}_is_retried".format(document_id) # check document limit features = FeatureService.get_features(dataset.tenant_id) try: if features.billing.enabled: vector_space = features.vector_space if 0 < vector_space.limit <= vector_space.size: - raise ValueError("Your total number of documents plus the number of uploads have over the limit of " - "your subscription.") + raise ValueError( + "Your total number of documents plus the number of uploads have over the limit of " + "your subscription." 
+ ) except Exception as e: - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = ( + db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() + ) if document: - document.indexing_status = 'error' + document.indexing_status = "error" document.error = str(e) document.stopped_at = datetime.datetime.utcnow() db.session.add(document) @@ -50,11 +51,10 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): redis_client.delete(retry_indexing_cache_key) return - logging.info(click.style('Start retry document: {}'.format(document_id), fg='green')) - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + logging.info(click.style("Start retry document: {}".format(document_id), fg="green")) + document = ( + db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() + ) try: if document: # clean old data @@ -70,7 +70,7 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): db.session.delete(segment) db.session.commit() - document.indexing_status = 'parsing' + document.indexing_status = "parsing" document.processing_started_at = datetime.datetime.utcnow() db.session.add(document) db.session.commit() @@ -79,13 +79,13 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): indexing_runner.run([document]) redis_client.delete(retry_indexing_cache_key) except Exception as ex: - document.indexing_status = 'error' + document.indexing_status = "error" document.error = str(ex) document.stopped_at = datetime.datetime.utcnow() db.session.add(document) db.session.commit() - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) redis_client.delete(retry_indexing_cache_key) pass end_at = time.perf_counter() - logging.info(click.style('Retry dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green')) + logging.info(click.style("Retry dataset: {} latency: {}".format(dataset_id, end_at - start_at), fg="green")) diff --git a/api/tasks/sync_website_document_indexing_task.py b/api/tasks/sync_website_document_indexing_task.py index 320da8718a12cb..99fb66e1f3d1cd 100644 --- a/api/tasks/sync_website_document_indexing_task.py +++ b/api/tasks/sync_website_document_indexing_task.py @@ -13,7 +13,7 @@ from services.feature_service import FeatureService -@shared_task(queue='dataset') +@shared_task(queue="dataset") def sync_website_document_indexing_task(dataset_id: str, document_id: str): """ Async process document @@ -26,22 +26,23 @@ def sync_website_document_indexing_task(dataset_id: str, document_id: str): dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() - sync_indexing_cache_key = 'document_{}_is_sync'.format(document_id) + sync_indexing_cache_key = "document_{}_is_sync".format(document_id) # check document limit features = FeatureService.get_features(dataset.tenant_id) try: if features.billing.enabled: vector_space = features.vector_space if 0 < vector_space.limit <= vector_space.size: - raise ValueError("Your total number of documents plus the number of uploads have over the limit of " - "your subscription.") + raise ValueError( + "Your total number of documents plus the number of uploads have over the limit of " + "your subscription." 
+ ) except Exception as e: - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + document = ( + db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() + ) if document: - document.indexing_status = 'error' + document.indexing_status = "error" document.error = str(e) document.stopped_at = datetime.datetime.utcnow() db.session.add(document) @@ -49,11 +50,8 @@ def sync_website_document_indexing_task(dataset_id: str, document_id: str): redis_client.delete(sync_indexing_cache_key) return - logging.info(click.style('Start sync website document: {}'.format(document_id), fg='green')) - document = db.session.query(Document).filter( - Document.id == document_id, - Document.dataset_id == dataset_id - ).first() + logging.info(click.style("Start sync website document: {}".format(document_id), fg="green")) + document = db.session.query(Document).filter(Document.id == document_id, Document.dataset_id == dataset_id).first() try: if document: # clean old data @@ -69,7 +67,7 @@ def sync_website_document_indexing_task(dataset_id: str, document_id: str): db.session.delete(segment) db.session.commit() - document.indexing_status = 'parsing' + document.indexing_status = "parsing" document.processing_started_at = datetime.datetime.utcnow() db.session.add(document) db.session.commit() @@ -78,13 +76,13 @@ def sync_website_document_indexing_task(dataset_id: str, document_id: str): indexing_runner.run([document]) redis_client.delete(sync_indexing_cache_key) except Exception as ex: - document.indexing_status = 'error' + document.indexing_status = "error" document.error = str(ex) document.stopped_at = datetime.datetime.utcnow() db.session.add(document) db.session.commit() - logging.info(click.style(str(ex), fg='yellow')) + logging.info(click.style(str(ex), fg="yellow")) redis_client.delete(sync_indexing_cache_key) pass end_at = time.perf_counter() - logging.info(click.style('Sync document: {} latency: {}'.format(document_id, end_at - start_at), fg='green')) + logging.info(click.style("Sync document: {} latency: {}".format(document_id, end_at - start_at), fg="green"))
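
The hunks above rewrite formatting only: single-quoted strings become double-quoted, chained db.session.query(...) calls are wrapped in parentheses with one call per line, long logging.info(click.style(...)) calls and function signatures are reflowed, and trailing commas are added to multi-line argument lists. As a compact reference, here is a minimal standalone sketch of that target style. The names (log_latency, pick_enabled, demo_task) are hypothetical and not taken from the diff; the only assumption is that click is installed, as it already is for the tasks above.

import logging
import time

import click  # third-party, already used by the tasks above


def log_latency(task_name: str, start_at: float) -> None:
    """Log a latency message in the wrapped, double-quoted style used in the diff."""
    end_at = time.perf_counter()
    logging.info(
        click.style(
            "{} finished: latency: {}".format(task_name, end_at - start_at),
            fg="green",
        )
    )


def pick_enabled(records: list[dict], document_id: str) -> list[dict]:
    """Filter-and-sort helper written as a parenthesized fluent chain,
    analogous to the reflowed db.session.query(...) expressions above."""
    return sorted(
        (r for r in records if r.get("document_id") == document_id and r.get("enabled")),
        key=lambda r: r.get("position", 0),
    )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    started = time.perf_counter()
    print(pick_enabled([{"document_id": "d1", "enabled": True, "position": 2}], "d1"))
    log_latency("demo_task", started)

This layout is consistent with the output of an automated formatter (for example Black or the Ruff formatter), though the diff itself does not name the tool, so treat that attribution as an assumption.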