Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/danswer-ai/danswer into bug…
Browse files Browse the repository at this point in the history
…fix/oauth_fix
  • Loading branch information
Richard Kuo (Danswer) committed Dec 19, 2024
2 parents 2f6974e + f83e7bf commit 4f3beb6
Show file tree
Hide file tree
Showing 32 changed files with 219 additions and 94 deletions.
28 changes: 22 additions & 6 deletions backend/ee/onyx/utils/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

from posthog import Posthog

from ee.onyx.configs.app_configs import POSTHOG_API_KEY
Expand All @@ -6,13 +8,27 @@

logger = setup_logger()

posthog = Posthog(project_api_key=POSTHOG_API_KEY, host=POSTHOG_HOST)

def posthog_on_error(error: Any, items: Any) -> None:
"""Log any PostHog delivery errors."""
logger.error(f"PostHog error: {error}, items: {items}")


posthog = Posthog(
project_api_key=POSTHOG_API_KEY,
host=POSTHOG_HOST,
debug=True,
on_error=posthog_on_error,
)


def event_telemetry(
distinct_id: str,
event: str,
properties: dict | None = None,
distinct_id: str, event: str, properties: dict | None = None
) -> None:
logger.info(f"Capturing Posthog event: {distinct_id} {event} {properties}")
posthog.capture(distinct_id, event, properties)
"""Capture and send an event to PostHog, flushing immediately."""
logger.info(f"Capturing PostHog event: {distinct_id} {event} {properties}")
try:
posthog.capture(distinct_id, event, properties)
posthog.flush()
except Exception as e:
logger.error(f"Error capturing PostHog event: {e}")
6 changes: 6 additions & 0 deletions backend/onyx/auth/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from datetime import timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
Expand Down Expand Up @@ -228,6 +229,11 @@ async def create(
safe: bool = False,
request: Optional[Request] = None,
) -> User:
# We verify the password here to make sure it's valid before we proceed
await self.validate_password(
user_create.password, cast(schemas.UC, user_create)
)

user_count: int | None = None
referral_source = (
request.cookies.get("referral_source", None)
Expand Down
5 changes: 3 additions & 2 deletions backend/onyx/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import time
from typing import Any

import requests
import sentry_sdk
from celery import Task
from celery.app import trace
Expand All @@ -23,6 +22,7 @@
from onyx.background.celery.celery_utils import celery_is_worker_primary
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.engine import get_sqlalchemy_engine
from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client
from onyx.document_index.vespa_constants import VESPA_CONFIG_SERVER_URL
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
Expand Down Expand Up @@ -262,7 +262,8 @@ def wait_for_vespa(sender: Any, **kwargs: Any) -> None:
logger.info("Vespa: Readiness probe starting.")
while True:
try:
response = requests.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
client = get_vespa_http_client()
response = client.get(f"{VESPA_CONFIG_SERVER_URL}/state/v1/health")
response.raise_for_status()

response_dict = response.json()
Expand Down
5 changes: 0 additions & 5 deletions backend/onyx/background/celery/apps/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from onyx.utils.logger import setup_logger
from onyx.utils.variable_functionality import fetch_versioned_implementation
from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
from shared_configs.configs import MULTI_TENANT

logger = setup_logger(__name__)

Expand Down Expand Up @@ -154,10 +153,6 @@ def on_beat_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
SqlEngine.init_engine(pool_size=2, max_overflow=0)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)


Expand Down
9 changes: 5 additions & 4 deletions backend/onyx/background/celery/apps/heavy.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
SqlEngine.init_engine(pool_size=4, max_overflow=12)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Fewer startup checks in the multi-tenant case
if MULTI_TENANT:
return

app_base.on_secondary_worker_init(sender, **kwargs)


Expand Down
9 changes: 5 additions & 4 deletions backend/onyx/background/celery/apps/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=sender.concurrency)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Fewer startup checks in the multi-tenant case
if MULTI_TENANT:
return

app_base.on_secondary_worker_init(sender, **kwargs)


Expand Down
8 changes: 5 additions & 3 deletions backend/onyx/background/celery/apps/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)
# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Fewer startup checks in the multi-tenant case
if MULTI_TENANT:
return

app_base.on_secondary_worker_init(sender, **kwargs)


Expand Down
8 changes: 4 additions & 4 deletions backend/onyx/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
SqlEngine.init_engine(pool_size=8, max_overflow=0)

# Startup checks are not needed in multi-tenant case
if MULTI_TENANT:
return

app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)
app_base.wait_for_vespa(sender, **kwargs)

# Fewer startup checks in the multi-tenant case
if MULTI_TENANT:
return

logger.info("Running as the primary celery worker.")

# This is singleton work that should be done on startup exactly once
Expand Down
32 changes: 17 additions & 15 deletions backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from onyx.configs.constants import OnyxCeleryQueues
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.db.connector import mark_ccpair_with_indexing_trigger
from onyx.db.connector_credential_pair import fetch_connector_credential_pairs
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
Expand Down Expand Up @@ -176,7 +175,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:

# we need to use celery's redis client to access its redis data
# (which lives on a different db number)
redis_client_celery: Redis = self.app.broker_connection().channel().client # type: ignore
# redis_client_celery: Redis = self.app.broker_connection().channel().client # type: ignore

lock_beat: RedisLock = redis_client.lock(
OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK,
Expand Down Expand Up @@ -319,20 +318,23 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
attempt.id, db_session, failure_reason=failure_reason
)

# we want to run this less frequently than the overall task
if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
# clear any indexing fences that don't have associated celery tasks in progress
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# or be currently executing
try:
task_logger.info("Validating indexing fences...")
validate_indexing_fences(
tenant_id, self.app, redis_client, redis_client_celery, lock_beat
)
except Exception:
task_logger.exception("Exception while validating indexing fences")
# rkuo: The following code logically appears to work, but the celery inspect code may be unstable
# turning off for the moment to see if it helps cloud stability

redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)
# we want to run this less frequently than the overall task
# if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
# # clear any indexing fences that don't have associated celery tasks in progress
# # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
# # or be currently executing
# try:
# task_logger.info("Validating indexing fences...")
# validate_indexing_fences(
# tenant_id, self.app, redis_client, redis_client_celery, lock_beat
# )
# except Exception:
# task_logger.exception("Exception while validating indexing fences")

# redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)

except SoftTimeLimitExceeded:
task_logger.info(
Expand Down
2 changes: 2 additions & 0 deletions backend/onyx/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

DEFAULT_PERSONA_ID = 0

DEFAULT_CC_PAIR_ID = 1

# Postgres connection constants for application_name
POSTGRES_WEB_APP_NAME = "web"
POSTGRES_INDEXER_APP_NAME = "indexer"
Expand Down
3 changes: 3 additions & 0 deletions backend/onyx/db/connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ def associate_default_cc_pair(db_session: Session) -> None:
if existing_association is not None:
return

# DefaultCCPair has id 1 since it is the first CC pair created.
# Its id equals DEFAULT_CC_PAIR_ID, but we can't set it explicitly because doing
# so interferes with the auto-incrementing id sequence.
association = ConnectorCredentialPair(
connector_id=0,
credential_id=0,
Expand Down
16 changes: 12 additions & 4 deletions backend/onyx/document_index/vespa/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

with get_vespa_http_client() as http_client:
with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
params = httpx.QueryParams(
{
Expand All @@ -546,8 +546,12 @@ def update_single(self, doc_id: str, fields: VespaDocumentFields) -> int:

while True:
try:
vespa_url = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}"
)
logger.debug(f'update_single PUT on URL "{vespa_url}"')
resp = http_client.put(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}",
vespa_url,
params=params,
headers={"Content-Type": "application/json"},
json=update_dict,
Expand Down Expand Up @@ -619,7 +623,7 @@ def delete_single(self, doc_id: str) -> int:
if self.secondary_index_name:
index_names.append(self.secondary_index_name)

with get_vespa_http_client() as http_client:
with get_vespa_http_client(http2=False) as http_client:
for index_name in index_names:
params = httpx.QueryParams(
{
Expand All @@ -630,8 +634,12 @@ def delete_single(self, doc_id: str) -> int:

while True:
try:
vespa_url = (
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}"
)
logger.debug(f'delete_single DELETE on URL "{vespa_url}"')
resp = http_client.delete(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}",
vespa_url,
params=params,
)
resp.raise_for_status()
Expand Down
4 changes: 2 additions & 2 deletions backend/onyx/document_index/vespa/shared_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def remove_invalid_unicode_chars(text: str) -> str:
return _illegal_xml_chars_RE.sub("", text)


def get_vespa_http_client(no_timeout: bool = False) -> httpx.Client:
def get_vespa_http_client(no_timeout: bool = False, http2: bool = True) -> httpx.Client:
"""
Configure and return an HTTP client for communicating with Vespa,
including authentication if needed.
Expand All @@ -67,5 +67,5 @@ def get_vespa_http_client(no_timeout: bool = False) -> httpx.Client:
else None,
verify=False if not MANAGED_VESPA else True,
timeout=None if no_timeout else VESPA_REQUEST_TIMEOUT,
http2=True,
http2=http2,
)
4 changes: 3 additions & 1 deletion backend/onyx/llm/chat_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,9 @@ def _stream_implementation(
if LOG_DANSWER_MODEL_INTERACTIONS:
self.log_model_configs()

if DISABLE_LITELLM_STREAMING:
if (
DISABLE_LITELLM_STREAMING or self.config.model_name == "o1-2024-12-17"
): # TODO: remove once litellm supports streaming
yield self.invoke(prompt, tools, tool_choice, structured_response_format)
return

Expand Down
1 change: 1 addition & 0 deletions backend/onyx/llm/llm_provider_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class WellKnownLLMProviderDescriptor(BaseModel):
OPEN_AI_MODEL_NAMES = [
"o1-mini",
"o1-preview",
"o1-2024-12-17",
"gpt-4",
"gpt-4o",
"gpt-4o-mini",
Expand Down
3 changes: 2 additions & 1 deletion backend/onyx/server/documents/cc_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,8 @@ def associate_credential_to_connector(
)

return response
except IntegrityError:
except IntegrityError as e:
logger.error(f"IntegrityError: {e}")
raise HTTPException(status_code=400, detail="Name must be unique")


Expand Down
3 changes: 2 additions & 1 deletion backend/onyx/server/onyx_api/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from sqlalchemy.orm import Session

from onyx.auth.users import api_key_dep
from onyx.configs.constants import DEFAULT_CC_PAIR_ID
from onyx.configs.constants import DocumentSource
from onyx.connectors.models import Document
from onyx.connectors.models import IndexAttemptMetadata
Expand Down Expand Up @@ -79,7 +80,7 @@ def upsert_ingestion_doc(
document.source = DocumentSource.FILE

cc_pair = get_connector_credential_pair_from_id(
cc_pair_id=doc_info.cc_pair_id or 0, db_session=db_session
cc_pair_id=doc_info.cc_pair_id or DEFAULT_CC_PAIR_ID, db_session=db_session
)
if cc_pair is None:
raise HTTPException(
Expand Down
2 changes: 1 addition & 1 deletion backend/requirements/default.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trafilatura==1.12.2
langchain==0.1.17
langchain-core==0.1.50
langchain-text-splitters==0.0.1
litellm==1.54.1
litellm==1.55.4
lxml==5.3.0
lxml_html_clean==0.2.2
llama-index==0.9.45
Expand Down
2 changes: 1 addition & 1 deletion backend/requirements/model_server.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ torch==2.2.0
transformers==4.39.2
uvicorn==0.21.1
voyageai==0.2.3
litellm==1.54.1
litellm==1.55.4
sentry-sdk[fastapi,celery,starlette]==2.14.0
Loading

0 comments on commit 4f3beb6

Please sign in to comment.