Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple cloud/indexing fixes #3609

Merged
merged 9 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ def check_for_connector_deletion_task(
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
# collect cc_pair_ids
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
# get all cc pairs that need to be synced
cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
cc_pair_ids_to_sync: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pairs = get_all_auto_sync_cc_pairs(db_session)
Expand Down
62 changes: 53 additions & 9 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from onyx.redis.redis_connector_index import RedisConnectorIndexPayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import global_version
from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
Expand Down Expand Up @@ -204,6 +205,10 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[
def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
"""a lightweight task used to kick off indexing tasks.
Occcasionally does some validation of existing state to clear up error conditions"""
debug_tenants = {
"tenant_i-043470d740845ec56",
"tenant_82b497ce-88aa-4fbd-841a-92cae43529c8",
}
time_start = time.monotonic()

tasks_created = 0
Expand All @@ -219,11 +224,11 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
locked = True

# check for search settings swap
Expand All @@ -246,15 +251,25 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
)

# gather cc_pair_ids
lock_beat.reacquire()
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
lock_beat.reacquire()
cc_pairs = fetch_connector_credential_pairs(db_session)
for cc_pair_entry in cc_pairs:
cc_pair_ids.append(cc_pair_entry.id)

# kick off index attempts
for cc_pair_id in cc_pair_ids:
# debugging logic - remove after we're done
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing cc_pair lock: "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"ttl={ttl}"
)

lock_beat.reacquire()

redis_connector = RedisConnector(tenant_id, cc_pair_id)
Expand Down Expand Up @@ -331,14 +346,33 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
)
tasks_created += 1

# debugging logic - remove after we're done
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing unfenced lock: "
f"tenant={tenant_id} "
f"ttl={ttl}"
)

lock_beat.reacquire()

# Fail any index attempts in the DB that don't have fences
# This shouldn't ever happen!
with get_session_with_tenant(tenant_id) as db_session:
lock_beat.reacquire()
unfenced_attempt_ids = get_unfenced_index_attempt_ids(
db_session, redis_client
)
for attempt_id in unfenced_attempt_ids:
# debugging logic - remove after we're done
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing unfenced attempt id lock: "
f"tenant={tenant_id} "
f"ttl={ttl}"
)

lock_beat.reacquire()

attempt = get_index_attempt(db_session, attempt_id)
Expand All @@ -356,9 +390,18 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
attempt.id, db_session, failure_reason=failure_reason
)

# debugging logic - remove after we're done
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing validate fences lock: "
f"tenant={tenant_id} "
f"ttl={ttl}"
)

lock_beat.reacquire()
# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
lock_beat.reacquire()
# clear any indexing fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
Expand All @@ -370,7 +413,6 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
task_logger.exception("Exception while validating indexing fences")

redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)

except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
Expand Down Expand Up @@ -405,7 +447,9 @@ def validate_indexing_fences(
)

# validate all existing indexing jobs
for key_bytes in r.scan_iter(RedisConnectorIndex.FENCE_PREFIX + "*"):
for key_bytes in r.scan_iter(
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
):
lock_beat.reacquire()
with get_session_with_tenant(tenant_id) as db_session:
validate_indexing_fence(
Expand Down
8 changes: 4 additions & 4 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)

try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
return None

try:
cc_pair_ids: list[int] = []
with get_session_with_tenant(tenant_id) as db_session:
cc_pairs = get_connector_credential_pairs(db_session)
Expand Down
Loading
Loading