diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py
index 276afba0d13..7827fd054b3 100644
--- a/backend/onyx/background/celery/apps/app_base.py
+++ b/backend/onyx/background/celery/apps/app_base.py
@@ -3,7 +3,6 @@
 import time
 from typing import Any
 
-import requests
 import sentry_sdk
 from celery import Task
 from celery.app import trace
@@ -23,6 +22,7 @@
 from onyx.background.celery.celery_utils import celery_is_worker_primary
 from onyx.configs.constants import OnyxRedisLocks
 from onyx.db.engine import get_sqlalchemy_engine
+from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
 from onyx.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL
 from onyx.redis.redis_connector import RedisConnector
 from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
@@ -262,7 +262,8 @@ def wait_for_vespa(sender: Any, **kwargs: Any) -> None:
     logger.info("Vespa: Readiness probe starting.")
     while True:
         try:
-            response = requests.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
+            client = get_vespa_http_client()
+            response = client.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
             response.raise_for_status()
 
             response_dict = response.json()
diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py
index 6e372f75f97..8ebf6d90c0c 100644
--- a/backend/onyx/background/celery/apps/beat.py
+++ b/backend/onyx/background/celery/apps/beat.py
@@ -13,7 +13,6 @@
 from onyx.utils.logger import setup_logger
 from onyx.utils.variable_functionality import fetch_versioned_implementation
 from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
-from shared_configs.configs import MULTI_TENANT
 
 logger = setup_logger(__name__)
 
@@ -154,10 +153,6 @@ def on_beat_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
     SqlEngine.init_engine(pool_size=2, max_overflow=0)
 
-    # Startup checks are not needed in multi-tenant case
-    if MULTI_TENANT:
-        return
-
     app_base.wait_for_redis(sender, **kwargs)
 
 
diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py
index efbbf64fda5..f45e6df9aa4 100644
--- a/backend/onyx/background/celery/apps/heavy.py
+++ b/backend/onyx/background/celery/apps/heavy.py
@@ -61,13 +61,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
     SqlEngine.init_engine(pool_size=4, max_overflow=12)
 
-    # Startup checks are not needed in multi-tenant case
-    if MULTI_TENANT:
-        return
-
     app_base.wait_for_redis(sender, **kwargs)
     app_base.wait_for_db(sender, **kwargs)
     app_base.wait_for_vespa(sender, **kwargs)
+
+    # Fewer startup checks are needed in the multi-tenant case
+    if MULTI_TENANT:
+        return
+
     app_base.on_secondary_worker_init(sender, **kwargs)
 
 
diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py
index 466964c7a5a..d2cc42e18e2 100644
--- a/backend/onyx/background/celery/apps/indexing.py
+++ b/backend/onyx/background/celery/apps/indexing.py
@@ -62,13 +62,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME)
     SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=sender.concurrency)
 
-    # Startup checks are not needed in multi-tenant case
-    if MULTI_TENANT:
-        return
-
     app_base.wait_for_redis(sender, **kwargs)
     app_base.wait_for_db(sender, **kwargs)
     app_base.wait_for_vespa(sender, **kwargs)
+
+    # Fewer startup checks are needed in the multi-tenant case
+    if MULTI_TENANT:
+        return
+
     app_base.on_secondary_worker_init(sender, **kwargs)
 
 
diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py
index cf156f05f0b..e6567b14770 100644
--- a/backend/onyx/background/celery/apps/light.py
+++ b/backend/onyx/background/celery/apps/light.py
@@ -60,13 +60,15 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
     SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)
 
-    # Startup checks are not needed in multi-tenant case
-    if MULTI_TENANT:
-        return
 
     app_base.wait_for_redis(sender, **kwargs)
     app_base.wait_for_db(sender, **kwargs)
     app_base.wait_for_vespa(sender, **kwargs)
+
+    # Fewer startup checks are needed in the multi-tenant case
+    if MULTI_TENANT:
+        return
+
     app_base.on_secondary_worker_init(sender, **kwargs)
 
 
diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py
index 48d60b801b2..5a3b61552f9 100644
--- a/backend/onyx/background/celery/apps/primary.py
+++ b/backend/onyx/background/celery/apps/primary.py
@@ -84,14 +84,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
     SqlEngine.init_engine(pool_size=8, max_overflow=0)
 
-    # Startup checks are not needed in multi-tenant case
-    if MULTI_TENANT:
-        return
-
     app_base.wait_for_redis(sender, **kwargs)
     app_base.wait_for_db(sender, **kwargs)
     app_base.wait_for_vespa(sender, **kwargs)
 
+    # Fewer startup checks are needed in the multi-tenant case
+    if MULTI_TENANT:
+        return
+
     logger.info("Running as the primary celery worker.")
 
     # This is singleton work that should be done on startup exactly once
diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index 62be3e609ad..26ff7d21127 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -29,7 +29,6 @@
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
 from onyx.configs.constants import OnyxRedisLocks
-from onyx.configs.constants import OnyxRedisSignals
 from onyx.db.connector import mark_ccpair_with_indexing_trigger
 from onyx.db.connector_credential_pair import fetch_connector_credential_pairs
 from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
@@ -176,7 +175,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
 
     # we need to use celery's redis client to access its redis data
     # (which lives on a different db number)
-    redis_client_celery: Redis = self.app.broker_connection().channel().client  # type: ignore
+    # redis_client_celery: Redis = self.app.broker_connection().channel().client  # type: ignore
 
     lock_beat: RedisLock = redis_client.lock(
         OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK,
@@ -319,20 +318,23 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                         attempt.id, db_session, failure_reason=failure_reason
                     )
 
-        # we want to run this less frequently than the overall task
-        if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
-            # clear any indexing fences that don't have associated celery tasks in progress
-            # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
-            # or be currently executing
-            try:
-                task_logger.info("Validating indexing fences...")
-                validate_indexing_fences(
-                    tenant_id, self.app, redis_client, redis_client_celery, lock_beat
-                )
-            except Exception:
-                task_logger.exception("Exception while validating indexing fences")
+        # rkuo: The following code logically appears to work, but the celery inspect code may be unstable.
+        # Turning it off for the moment to see if it helps cloud stability.
 
-            redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)
+        # we want to run this less frequently than the overall task
+        # if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
+        #     # clear any indexing fences that don't have associated celery tasks in progress
+        #     # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
+        #     # or be currently executing
+        #     try:
+        #         task_logger.info("Validating indexing fences...")
+        #         validate_indexing_fences(
+        #             tenant_id, self.app, redis_client, redis_client_celery, lock_beat
+        #         )
+        #     except Exception:
+        #         task_logger.exception("Exception while validating indexing fences")
+
+        #     redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)
     except SoftTimeLimitExceeded:
         task_logger.info(
             "Soft time limit exceeded, task is being terminated gracefully."
diff --git a/backend/onyx/document_index/vespa/index.py b/backend/onyx/document_index/vespa/index.py
index 09455cff24a..1b7478f8cd3 100644
--- a/backend/onyx/document_index/vespa/index.py
+++ b/backend/onyx/document_index/vespa/index.py
@@ -535,7 +535,7 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
         if self.secondary_index_name:
             index_names.append(self.secondary_index_name)
 
-        with get_vespa_http_client() as http_client:
+        with get_vespa_http_client(http2=False) as http_client:
             for index_name in index_names:
                 params = httpx.QueryParams(
                     {
@@ -546,8 +546,12 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
 
                 while True:
                     try:
+                        vespa_url = (
+                            f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}"
+                        )
+                        logger.debug(f'update_single PUT on URL "{vespa_url}"')
                         resp = http_client.put(
-                            f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}",
+                            vespa_url,
                             params=params,
                             headers={"Content-Type": "application/json"},
                             json=update_dict,
@@ -619,7 +623,7 @@ def delete_single(self, doc_id: str) -> int:
         if self.secondary_index_name:
             index_names.append(self.secondary_index_name)
 
-        with get_vespa_http_client() as http_client:
+        with get_vespa_http_client(http2=False) as http_client:
             for index_name in index_names:
                 params = httpx.QueryParams(
                     {
@@ -630,8 +634,12 @@ def delete_single(self, doc_id: str) -> int:
 
                 while True:
                     try:
+                        vespa_url = (
+                            f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}"
+                        )
+                        logger.debug(f'delete_single DELETE on URL "{vespa_url}"')
                         resp = http_client.delete(
-                            f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}",
+                            vespa_url,
                             params=params,
                         )
                         resp.raise_for_status()
diff --git a/backend/onyx/document_index/vespa/shared_utils/utils.py b/backend/onyx/document_index/vespa/shared_utils/utils.py
index 175b2afa9cc..da0f5c29819 100644
--- a/backend/onyx/document_index/vespa/shared_utils/utils.py
+++ b/backend/onyx/document_index/vespa/shared_utils/utils.py
@@ -55,7 +55,7 @@ def remove_invalid_unicode_chars(text: str) -> str:
     return _illegal_xml_chars_RE.sub("", text)
 
 
-def get_vespa_http_client(no_timeout: bool = False) -> httpx.Client:
+def get_vespa_http_client(no_timeout: bool = False, http2: bool = True) -> httpx.Client:
    """
    Configure and return an HTTP client for communicating with Vespa,
    including authentication if needed.
@@ -67,5 +67,5 @@ def get_vespa_http_client(no_timeout: bool = False, http2: bool = True) -> httpx.Client:
         else None,
         verify=False if not MANAGED_VESPA else True,
         timeout=None if no_timeout else VESPA_REQUEST_TIMEOUT,
-        http2=True,
+        http2=http2,
     )