Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cost tracking for otel_tracing #1569

Merged
merged 32 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6e400c8
cleaning up old PR
sfc-gh-pmardziel Oct 17, 2024
a09c3a2
adding cost tracking to dummy endpoint
sfc-gh-pmardziel Oct 17, 2024
00e1705
finishing dummy and starting bedrock
sfc-gh-pmardziel Oct 18, 2024
0add8e0
finish up
sfc-gh-pmardziel Oct 18, 2024
4b748b0
replacement for track_all_costs_tally
sfc-gh-pmardziel Oct 18, 2024
9180fd0
docs
sfc-gh-pmardziel Oct 18, 2024
9ef5106
Merge remote-tracking branch 'origin/main' into piotrm/otel-cost-trac…
sfc-gh-pmardziel Oct 21, 2024
b8627c2
fixing up dummy api
sfc-gh-pmardziel Oct 22, 2024
1abb615
added huggingface cost tracking for otel
sfc-gh-pmardziel Oct 22, 2024
69b0e2a
working on awaitables
sfc-gh-pmardziel Oct 23, 2024
54ddad3
docs
sfc-gh-pmardziel Oct 23, 2024
5a36084
debugging async issue
sfc-gh-pmardziel Oct 24, 2024
4f4f624
work
sfc-gh-pmardziel Oct 24, 2024
ca3b5eb
async parallel fixes
sfc-gh-pmardziel Oct 24, 2024
4e88f2c
nit
sfc-gh-pmardziel Oct 24, 2024
62f8556
Merge remote-tracking branch 'origin/main' into piotrm/otel-cost-trac…
sfc-gh-pmardziel Oct 24, 2024
5686e3d
fix optional imports handling with fromlist
sfc-gh-pmardziel Oct 25, 2024
e491efe
remove print
sfc-gh-pmardziel Oct 25, 2024
a340a67
remove another print
sfc-gh-pmardziel Oct 25, 2024
2835286
doc format
sfc-gh-pmardziel Oct 25, 2024
1d6c2e0
put snowflake env var check back in test_endpoints
sfc-gh-pmardziel Oct 25, 2024
a8ec165
remove context.run
sfc-gh-pmardziel Oct 25, 2024
66dd6b7
make feature private
sfc-gh-pmardziel Oct 25, 2024
67d9cb6
making more things private
sfc-gh-pmardziel Oct 25, 2024
e703b6c
Merge remote-tracking branch 'origin/main' into piotrm/otel-cost-trac…
sfc-gh-pmardziel Oct 25, 2024
5fcc28c
Merge remote-tracking branch 'origin/main' into piotrm/otel-cost-trac…
sfc-gh-pmardziel Oct 25, 2024
24f31e4
add type args
sfc-gh-pmardziel Oct 29, 2024
356bb2b
nits
sfc-gh-pmardziel Oct 29, 2024
0ac6f5a
Merge remote-tracking branch 'origin/main' into piotrm/otel-cost-trac…
sfc-gh-pmardziel Nov 4, 2024
5c73258
Merge remote-tracking branch 'origin/main' into piotrm/otel-cost-trac…
sfc-gh-pmardziel Nov 6, 2024
c9e2c13
addressing PR comments
sfc-gh-pmardziel Nov 6, 2024
c07ec92
typo
sfc-gh-pmardziel Nov 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 67 additions & 25 deletions examples/experimental/dummy_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
"source": [
"from trulens.core import TruSession\n",
"\n",
"TruSession().reset_database()"
"session = TruSession()\n",
"session.reset_database()\n",
"\n",
"session.experimental_enable_feature(\"otel_tracing\")"
]
},
{
Expand All @@ -75,7 +78,7 @@
" overloaded_prob=0.0,\n",
" rpm=10000,\n",
" alloc=0, # how much fake data to allocate during requests\n",
" delay=0.5,\n",
" delay=0.1,\n",
")\n",
"\n",
"provider_llm = DummyLLM(\n",
Expand All @@ -85,14 +88,14 @@
" overloaded_prob=0.0,\n",
" rpm=10000,\n",
" alloc=0, # how much fake data to allocate during requests\n",
" delay=0.5,\n",
" delay=0.1,\n",
")\n",
"\n",
"session = TruSession()\n",
"\n",
"session.reset_database()\n",
"\n",
"run_dashboard(session, force=True, _dev=base_dir)"
"run_dashboard(session, force=True, _dev=base_dir, port=8080)"
]
},
{
Expand All @@ -112,7 +115,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Simple Invocation"
"# Synchronous Invocation"
]
},
{
Expand All @@ -127,17 +130,46 @@
"# Create trulens wrapper:\n",
"ta = session.App(\n",
" ca,\n",
" app_name=\"DummyApp\",\n",
" app_version=\"simple invoke\",\n",
" app_name=\"synchronous invoke\",\n",
" feedbacks=[f_dummy1, f_dummy2, f_dummy3],\n",
")\n",
"\n",
"with ta as recorder:\n",
" res = ca.respond_to_query(\"hello there\")\n",
" print(res)\n",
"\n",
"for result in recorder.get().feedback_results_as_completed:\n",
" print(result)"
"print(recorder.get().cost)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Asynchronous Invocation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create custom app:\n",
"ca = DummyApp(delay=0.0, alloc=0)\n",
"\n",
"# Create trulens wrapper:\n",
"ta = session.App(\n",
" ca,\n",
" app_name=\"asynchronous invoke\",\n",
" feedbacks=[f_dummy1, f_dummy2, f_dummy3],\n",
")\n",
"\n",
"async with ta as recorder:\n",
" res = await ca.arespond_to_query(\"hello there\")\n",
"\n",
" print(res)\n",
"\n",
"print(recorder.get().cost)"
]
},
{
Expand All @@ -157,16 +189,23 @@
"\n",
"ta = session.App(\n",
" ca,\n",
" app_name=\"DummyApp\",\n",
" app_version=\"sequential invoke\",\n",
" app_name=\"sequential invoke\",\n",
" feedbacks=[f_dummy1, f_dummy2, f_dummy3],\n",
")\n",
"\n",
"for i in tqdm(range(10), desc=\"invoking app\"):\n",
" with ta as recorder:\n",
" ca.respond_to_query(f\"hello {i}\")\n",
"\n",
" rec = recorder.get()\n",
" assert rec is not None"
" assert rec is not None\n",
"\n",
" print(rec.record_id)\n",
"\n",
"for res in tqdm(\n",
" ta.wait_for_feedback_results(), desc=\"waiting for feedback results\"\n",
"):\n",
" print(res)"
]
},
{
Expand Down Expand Up @@ -236,13 +275,12 @@
"\n",
"ta = session.App(\n",
" ca,\n",
" app_name=\"DummyApp\",\n",
" app_version=\"sequential invoke with deferred feedback\",\n",
" feedbacks=[f_dummy1, f_dummy2],\n",
" app_name=\"sequential invoke with deferred feedback\",\n",
" feedbacks=[f_dummy1, f_dummy2, f_dummy3],\n",
" feedback_mode=\"deferred\",\n",
")\n",
"\n",
"for i in tqdm(range(100), desc=\"invoking app\"):\n",
"for i in tqdm(range(10), desc=\"invoking app\"):\n",
" with ta as recorder:\n",
" ca.respond_to_query(f\"hello {i}\")\n",
"\n",
Expand All @@ -266,14 +304,13 @@
"from threading import Thread # must be imported after trulens\n",
"\n",
"# Create custom app:\n",
"ca = DummyApp(delay=0.0, alloc=0, use_parallel=True)\n",
"ca = DummyApp(delay=0.1, alloc=0, use_parallel=True)\n",
"\n",
"# Create trulens wrapper:\n",
"ta = session.App(\n",
" ca,\n",
" app_name=\"DummyApp\",\n",
" app_version=\"threaded parallel invoke\",\n",
" feedbacks=[f_dummy1, f_dummy2],\n",
" app_name=\"threaded parallel invoke\",\n",
" feedbacks=[f_dummy1, f_dummy2, f_dummy3],\n",
")\n",
"\n",
"\n",
Expand Down Expand Up @@ -325,25 +362,30 @@
"source": [
"# Create custom app:\n",
"ca = DummyApp(\n",
" delay=1.0,\n",
" delay=0.1,\n",
" alloc=0,\n",
" use_parallel=True, # need to enable this for DummyApp to use tasks internally\n",
")\n",
"\n",
"# Create trulens wrapper:\n",
"ta = session.App(\n",
" ca,\n",
" app_name=\"customapp\",\n",
" app_version=\"async parallel invoke\",\n",
" feedbacks=[f_dummy1, f_dummy2],\n",
" # app_name=\"customapp\",\n",
" app_name=\"async parallel invoke\",\n",
" feedbacks=[f_dummy1, f_dummy2, f_dummy3],\n",
")\n",
"\n",
"\n",
"async def arun_query(q):\n",
" print(f\"starting {q}\")\n",
" async with ta as recorder:\n",
" print(\"awaiting respond\")\n",
" await ca.arespond_to_query(q)\n",
"\n",
" print(\"got respond\")\n",
"\n",
" print(\"out of context\")\n",
"\n",
" rec = recorder.get()\n",
" assert rec is not None\n",
"\n",
Expand All @@ -360,7 +402,7 @@
" t = loop.create_task(arun_query(f\"hello {i}\"))\n",
" tasks.append(t)\n",
"\n",
"for t in tqdm(\n",
"async for t in tqdm(\n",
" asyncio.as_completed(tasks), desc=\"awaiting task\", total=len(tasks)\n",
"): # have to use sync loop if python < 3.13\n",
" await t\n",
Expand Down
2 changes: 1 addition & 1 deletion examples/experimental/export_dummy_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"source": [
"# Tracing Custom Dummy App with OTEL Spans using _TruLens_\n",
"\n",
"This notebook demonstrates the \"otel-tracing\" experimental feature in _TruLens_.\n",
"This notebook demonstrates the \"otel_tracing\" experimental feature in _TruLens_.\n",
"This enables the collection of _OpenTelemetry_ spans during app evaluation. Data\n",
"that is collected by _TruLens_ is recorded as spans.\n",
"\n",
Expand Down
2 changes: 1 addition & 1 deletion examples/experimental/export_llamaindex_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"source": [
"# Tracing LlamaIndex with OTEL Spans using _TruLens_\n",
"\n",
"This notebook demonstrates the \"otel-tracing\" experimental feature in _TruLens_.\n",
"This notebook demonstrates the \"otel_tracing\" experimental feature in _TruLens_.\n",
"This enables the collection of _OpenTelemetry_ spans during app execution. Data\n",
"that is collected by _TruLens_ is recorded as spans. Spans created by other tools\n",
"can also be made available alongside those created by TruLens. Spans can be\n",
Expand Down
7 changes: 7 additions & 0 deletions src/core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ optional = true

[tool.poetry.group.tqdm.dependencies]
tqdm = { version = ">=4.2.0", optional = true }

[tool.poetry.group.otel]
optional = true

[tool.poetry.group.otel.dependencies]
opentelemetry-api = { version = ">=1", optional = true }
opentelemetry-sdk = { version = ">=1", optional = true }
10 changes: 5 additions & 5 deletions src/core/trulens/core/_utils/optional.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@
Not for use outside trulens namespace.
"""

from trulens.core.utils.imports import format_import_errors
from trulens.core.utils import imports as import_utils

# Optional app types:
REQUIREMENT_APPS_LLAMA = format_import_errors(
REQUIREMENT_APPS_LLAMA = import_utils.format_import_errors(
"trulens-apps-llamaindex", purpose="instrumenting LlamaIndex apps"
)
REQUIREMENT_APPS_LANGCHAIN = format_import_errors(
REQUIREMENT_APPS_LANGCHAIN = import_utils.format_import_errors(
"trulens-apps-langchain", purpose="instrumenting LangChain apps"
)
REQUIREMENT_APPS_NEMO = format_import_errors(
REQUIREMENT_APPS_NEMO = import_utils.format_import_errors(
"trulens-apps-nemo", purpose="instrumenting NeMo Guardrails apps"
)

REQUIREMENT_TQDM = format_import_errors(
REQUIREMENT_TQDM = import_utils.format_import_errors(
"tqdm", purpose="displaying progress bars"
)
29 changes: 11 additions & 18 deletions src/core/trulens/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def _manage_pending_feedback_results(

def wait_for_feedback_results(
self, feedback_timeout: Optional[float] = None
) -> List[record_schema.Record]:
) -> Iterable[record_schema.Record]:
"""Wait for all feedbacks functions to complete.

Args:
Expand All @@ -518,7 +518,7 @@ def wait_for_feedback_results(
total timeout for this entire blocking call.

Returns:
A list of records that have been waited on. Note a record will be
An iterable of records that have been waited on. Note a record will be
included even if a feedback computation for it failed or
timed out.

Expand All @@ -527,17 +527,13 @@ def wait_for_feedback_results(
this is running, it will include them.
"""

records = []

while (
record := self.records_with_pending_feedback_results.pop(
blocking=False
)
) is not None:
record.wait_for_feedback_results(feedback_timeout=feedback_timeout)
records.append(record)

return records
yield record
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I assume this is to avoid waiting on long blocking feedback computation. just curious what are some of the actual deadlock scenarios you've seen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have seen deadlocks deeper in wait_for_feedback_results which I couldn't fully explain. This change is to make things a bit more robust but wouldn't really fully address deadlocks if they are still there.


@classmethod
def select_context(cls, app: Optional[Any] = None) -> serial_utils.Lens:
Expand Down Expand Up @@ -919,10 +915,8 @@ def _on_new_root_span(
def on_method_instrumented(
self, obj: object, func: Callable, path: serial_utils.Lens
):
"""
Called by instrumentation system for every function requested to be
instrumented by this app.
"""
"""Called by instrumentation system for every function requested to be
instrumented by this app."""

if id(obj) in self.instrumented_methods:
funcs = self.instrumented_methods[id(obj)]
Expand Down Expand Up @@ -953,11 +947,11 @@ def on_method_instrumented(
def get_methods_for_func(
self, func: Callable
) -> Iterable[Tuple[int, Callable, serial_utils.Lens]]:
"""
Get the methods (rather the inner functions) matching the given `func`
and the path of each.
"""Get the methods (rather the inner functions) matching the given
`func` and the path of each.

See [WithInstrumentCallbacks.get_methods_for_func][trulens.core.instruments.WithInstrumentCallbacks.get_methods_for_func].
See
[WithInstrumentCallbacks.get_methods_for_func][trulens.core.instruments.WithInstrumentCallbacks.get_methods_for_func].
"""

for _id, funcs in self.instrumented_methods.items():
Expand All @@ -967,9 +961,8 @@ def get_methods_for_func(

# WithInstrumentCallbacks requirement
def get_method_path(self, obj: object, func: Callable) -> serial_utils.Lens:
"""
Get the path of the instrumented function `method` relative to this app.
"""
"""Get the path of the instrumented function `method` relative to this
app."""

# TODO: cleanup and/or figure out why references to objects change when executing langchain chains.

Expand Down
Loading