temporarily disabling validate indexing fences (#3502)
* temporarily disabling validate indexing fences

* add back a few startup checks in the cloud

* use common vespa client to perform health check

* log vespa url and try using http1 on light worker index methods

---------

Co-authored-by: Richard Kuo <[email protected]>
Co-authored-by: Richard Kuo (Danswer) <[email protected]>
3 people authored Dec 19, 2024
1 parent a0fa4ad commit e9b10e8
Showing 9 changed files with 53 additions and 43 deletions.
5 changes: 3 additions & 2 deletions backend/onyx/background/celery/apps/app_base.py
@@ -3,7 +3,6 @@
import time
from typing import Any

import requests
import sentry_sdk
from celery import Task
from celery.app import trace
@@ -23,6 +22,7 @@
from onyx.background.celery.celery_utils import celery_is_worker_primary
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_sqlalchemy_engine
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
@@ -262,7 +262,8 @@ def wait_for_vespa(sender: Any, **kwargs: Any) -> None:
logger.info("Vespa: Readiness probe starting.")
while True:
try:
response = requests.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
client = get_vespa_http_client()
response = client.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
response.raise_for_status()

response_dict = response.json()
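For reference, a minimal sketch of the readiness probe after this change, assuming get_vespa_http_client() and VESPA_CONFIG_SERVER_URL behave as shown in the diff above; the helper name probe_vespa_once is hypothetical and not part of the commit:

    from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
    from onyx.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL

    def probe_vespa_once() -> dict:
        # Hypothetical helper; the commit does this inline inside wait_for_vespa.
        # The raw requests.get call is replaced by the shared httpx client.
        client = get_vespa_http_client()
        response = client.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
        response.raise_for_status()
        return response.json()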
5 changes: 0 additions & 5 deletions backend/onyx/background/celery/apps/beat.py
@@ -13,7 +13,6 @@
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_versioned_implementation
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
from shared_configs.configs import MULTI_TENANT

logger = setup_logger(__name__)

@@ -154,10 +153,6 @@ def on_beat_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
SqlEngine.init_engine(pool_size=2, max_overflow=0)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)


9 changes: 5 additions & 4 deletions backend/onyx/background/celery/apps/heavy.py
@@ -61,13 +61,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
SqlEngine.init_engine(pool_size=4, max_overflow=12)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Less startup checks in multi-tenant case
if MULTI_TENANT:
return

app_base.on_secondary_worker_init(sender, **kwargs)


9 changes: 5 additions & 4 deletions backend/onyx/background/celery/apps/indexing.py
@@ -62,13 +62,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=sender.concurrency)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Less startup checks in multi-tenant case
if MULTI_TENANT:
return

app_base.on_secondary_worker_init(sender, **kwargs)


8 changes: 5 additions & 3 deletions backend/onyx/background/celery/apps/light.py
@@ -60,13 +60,15 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)
# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Less startup checks in multi-tenant case
if MULTI_TENANT:
return

app_base.on_secondary_worker_init(sender, **kwargs)


8 changes: 4 additions & 4 deletions backend/onyx/background/celery/apps/primary.py
@@ -84,14 +84,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
SqlEngine.init_engine(pool_size=8, max_overflow=0)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Less startup checks in multi-tenant case
if MULTI_TENANT:
return

logger.info("Running as the primary celery worker.")

# This is singleton work that should be done on startup exactly once
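The heavy, indexing, light, and primary workers all reorder their startup the same way: the redis/db/vespa waits now always run, and only the remaining per-worker setup is skipped for multi-tenant deployments. A minimal sketch of that ordering for a secondary worker (the primary worker continues with its own singleton setup instead of on_secondary_worker_init); the import paths are assumptions based on the names used in these modules, and each worker also sets its own SqlEngine app name and pool sizes, omitted here:

    from typing import Any

    from onyx.background.celery.apps import app_base
    from shared_configs.configs import MULTI_TENANT

    def on_worker_init(sender: Any, **kwargs: Any) -> None:
        # Connectivity checks run in every deployment mode.
        app_base.wait_for_redis(sender, **kwargs)
        app_base.wait_for_db(sender, **kwargs)
        app_base.wait_for_vespa(sender, **kwargs)

        # Less startup checks in multi-tenant case: skip the per-worker setup.
        if MULTI_TENANT:
            return

        app_base.on_secondary_worker_init(sender, **kwargs)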
32 changes: 17 additions & 15 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -29,7 +29,6 @@
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.db.connector import mark_ccpair_with_indexing_trigger
from onyx.db.connector_credential_pair import fetch_connector_credential_pairs
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
@@ -176,7 +175,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:

# we need to use celery's redis client to access its redis data
# (which lives on a different db number)
redis_client_celery: Redis = self.app.broker_connection().channel().client # type: ignore
# redis_client_celery: Redis = self.app.broker_connection().channel().client # type: ignore

lock_beat: RedisLock = redis_client.lock(
OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK,
@@ -319,20 +318,23 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
attempt.id, db_session, failure_reason=failure_reason
)

# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
# clear any indexing fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
task_logger.info("Validating indexing fences...")
validate_indexing_fences(
tenant_id, self.app, redis_client, redis_client_celery, lock_beat
)
except Exception:
task_logger.exception("Exception while validating indexing fences")
# rkuo: The following code logically appears to work, but the celery inspect code may be unstable
# turning off for the moment to see if it helps cloud stability

redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)
# we want to run this less frequently than the overall task
# if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
# # clear any indexing fences that don't have associated celery tasks in progress
# # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# # or be currently executing
# try:
# task_logger.info("Validating indexing fences...")
# validate_indexing_fences(
# tenant_id, self.app, redis_client, redis_client_celery, lock_beat
# )
# except Exception:
# task_logger.exception("Exception while validating indexing fences")

# redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)

except SoftTimeLimitExceeded:
task_logger.info(
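The disabled block gated itself with a short-lived redis signal so that fence validation ran far less often than check_for_indexing itself. A sketch of that gating pattern, using only the names from the commented-out code above and assuming the surrounding task provides them; the 60-second TTL means validation runs at most roughly once per minute:

    # Run the expensive validation only while the signal key is absent.
    if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
        try:
            task_logger.info("Validating indexing fences...")
            validate_indexing_fences(
                tenant_id, self.app, redis_client, redis_client_celery, lock_beat
            )
        except Exception:
            task_logger.exception("Exception while validating indexing fences")

        # Re-arm the gate; the 60s expiry spaces out subsequent validations.
        redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)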
16 changes: 12 additions & 4 deletions backend/onyx/document_index/vespa/index.py
@@ -535,7 +535,7 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

with get_vespa_http_client() as http_client:
with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
params = httpx.QueryParams(
{
@@ -546,8 +546,12 @@

while True:
try:
vespa_url = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}"
)
logger.debug(f'update_single PUT on URL "{vespa_url}"')
resp = http_client.put(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}",
vespa_url,
params=params,
headers={"Content-Type": "application/json"},
json=update_dict,
@@ -619,7 +623,7 @@ def delete_single(self, doc_id: str) -> int:
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

with get_vespa_http_client() as http_client:
with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
params = httpx.QueryParams(
{
@@ -630,8 +634,12 @@

while True:
try:
vespa_url = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}"
)
logger.debug(f'delete_single DELETE on URL "{vespa_url}"')
resp = http_client.delete(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}",
vespa_url,
params=params,
)
resp.raise_for_status()
4 changes: 2 additions & 2 deletions backend/onyx/document_index/vespa/shared_utils/utils.py
@@ -55,7 +55,7 @@ def remove_invalid_unicode_chars(text: str) -> str:
return _illegal_xml_chars_RE.sub("", text)


def get_vespa_http_client(no_timeout: bool = False) -> httpx.Client:
def get_vespa_http_client(no_timeout: bool = False, http2: bool = True) -> httpx.Client:
"""
Configure and return an HTTP client for communicating with Vespa,
including authentication if needed.
@@ -67,5 +67,5 @@ def get_vespa_http_client(no_timeout: bool = False) -> httpx.Client:
else None,
verify=False if not MANAGED_VESPA else True,
timeout=None if no_timeout else VESPA_REQUEST_TIMEOUT,
http2=True,
http2=http2,
)
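With the new flag, callers can opt out of HTTP/2 per call site. A brief usage sketch matching how the index methods and the readiness probe use the client after this commit:

    # update_single / delete_single force HTTP/1.1 while investigating stability.
    with get_vespa_http_client(http2=False) as http_client:
        ...

    # Other callers keep the HTTP/2 default.
    client = get_vespa_http_client()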
