Commit

Monday extractor fixes / improvements (#806)

* Increase item retrieval timeout to 30s to account for large responses

* Add RequestException handling; retrieve boardName and inject it into item metadata

* Bump timeouts

* Test fixes and updates, testing board name insertion

* Adjust JSON parsing for cursor usage

* Run `black` formatting

* Bump version and lockfile

* Add test for cursor handling, test file changes

* Update embedding model version

* Add test for adding an embedded Monday doc into a Document object

* Bump version
rishimo authored Mar 27, 2024
1 parent c3c4faf commit 692e1fe
Showing 13 changed files with 435 additions and 155 deletions.
6 changes: 3 additions & 3 deletions metaphor/monday/README.md
@@ -32,9 +32,9 @@ These defaults are provided; you don't have to manually configure them.
 `include_text` refers to whether you'd like to include the original document text alongside the embedded content.
 
 ```yaml
-azure_openAI_version: <azure_openAI_version> # "2023-12-01-preview"
-azure_openAI_model_name: <azure_openAI_model_name> # "Embedding_ada002"
-azure_openAI_model: <azure_openAI_model> # "text-embedding-ada-002"
+azure_openAI_version: <azure_openAI_version> # "2024-03-01-preview"
+azure_openAI_model_name: <azure_openAI_model_name> # "Embedding_3_small"
+azure_openAI_model: <azure_openAI_model> # "text-embedding-3-small"
 include_text: <include_text> # False
 ```
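For context, these new defaults map onto an Azure OpenAI embeddings call roughly as follows — a minimal sketch assuming the `openai` v1 Python SDK, with placeholder endpoint, key, and input text; note that on Azure the `model` argument is the deployment name (`Embedding_3_small` above), not the upstream model id:

```python
from openai import AzureOpenAI  # assumes openai>=1.x

# Placeholder credentials; real values come from the config above
client = AzureOpenAI(
    api_version="2024-03-01-preview",
    azure_endpoint="https://<your-resource>.openai.azure.com",
    api_key="<azure_openAI_key>",
)

resp = client.embeddings.create(
    model="Embedding_3_small",  # Azure deployment of text-embedding-3-small
    input="Example Monday.com item text",
)
vector = resp.data[0].embedding  # 1536 dimensions by default
```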
9 changes: 3 additions & 6 deletions metaphor/monday/config.py
@@ -20,12 +20,9 @@ class MondayRunConfig(BaseConfig):
     azure_openAI_endpoint: str
 
     # Default Azure OpenAI services configs
-    azure_openAI_version: str = "2023-12-01-preview"
-    azure_openAI_model: str = "text-embedding-ada-002"
-    azure_openAI_model_name: str = "Embedding_ada002"
+    azure_openAI_version: str = "2024-03-01-preview"
+    azure_openAI_model: str = "text-embedding-3-small"
+    azure_openAI_model_name: str = "Embedding_3_small"
 
     # Store the document's content alongside embeddings
     include_text: bool = False
-
-    # Notion API version
-    notion_api_version: str = "2022-06-28"
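The dropped `notion_api_version` field looks like a leftover from the Notion connector and had no effect in the Monday extractor. For illustration, a sketch of constructing the config so the new defaults apply — the `monday_api_key` and `boards` field names here are hypothetical, since only `azure_openAI_endpoint` and the defaults are visible in this diff:

```python
from metaphor.monday.config import MondayRunConfig

# monday_api_key and boards are hypothetical field names for illustration;
# only azure_openAI_endpoint and the embedding defaults appear in this diff.
config = MondayRunConfig(
    monday_api_key="<token>",
    boards=[1234567890],
    azure_openAI_endpoint="https://<resource>.openai.azure.com",
)
# Unspecified fields pick up the new defaults automatically
assert config.azure_openAI_model == "text-embedding-3-small"
```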
80 changes: 67 additions & 13 deletions metaphor/monday/extractor.py
@@ -4,7 +4,7 @@
 
 import requests
 from llama_index.core import Document
-from requests.exceptions import HTTPError
+from requests.exceptions import HTTPError, RequestException
 
 from metaphor.common.base_extractor import BaseExtractor
 from metaphor.common.embeddings import embed_documents, map_metadata, sanitize_text
@@ -19,6 +19,7 @@
 
 embedding_chunk_size = 512
 embedding_overlap_size = 50
+max_items_query = 500
 
 
 class MondayExtractor(BaseExtractor):
@@ -67,7 +68,7 @@ async def extract(self) -> Collection[ENTITY_TYPES]:
             board_items = self._get_board_items(board, board_columns)
             board_docs = self._construct_items_documents(board_items, board_columns)
 
-            self.documents.extend(board_docs)
+            self.documents.extend(board_docs)  # type: ignore[call-arg]
 
         logger.info("Starting embedding process")
 
@@ -100,6 +101,7 @@ def _get_board_columns(
         query = f"""
         query{{
             boards(ids: [{board}]) {{
+                name
                 columns {{
                     id
                     title
@@ -112,10 +114,10 @@
 
         try:
             logger.info(f"Getting columns for board {board}")
-            r = requests.post(url=baseURL, json=data, headers=self.headers, timeout=5)
+            r = requests.post(url=baseURL, json=data, headers=self.headers, timeout=15)
             r.raise_for_status()
 
-        except HTTPError as error:
+        except (HTTPError, RequestException) as error:
             logger.warning(f"Failed to get columns for board {board}, err: {error}")
 
         content = r.json()
@@ -125,28 +127,30 @@
             if col["type"] in set(valid_types):
                 columns[col["id"]] = col["title"]
 
+        # Extract board name from response
+        self.current_board_name = content["data"]["boards"][0]["name"]
+
         return columns
 
     def _get_board_items(
         self,
         board: int,
         columns: dict,
         params: str = "{}",
-        max_items: int = 500,
         consume: bool = True,
     ) -> Collection[dict]:
         """
         Retrieves max_items items from specified board.
         If consume == True and a cursor is present,
-        uses consume_items_page() to get all remaining items available to the cursor. (TODO)
+        uses consume_items_page() to get all remaining items available to the cursor.
         """
 
         column_ids = list(columns.keys())
 
         query = f"""
         {{
             boards(ids: [{board}]) {{
-                items_page(query_params:{params}, limit:{max_items}) {{
+                items_page(query_params:{params}, limit:{max_items_query}) {{
                     cursor
                     items {{
                         id
@@ -169,10 +173,10 @@ def _get_board_items(
 
         try:
             logger.info(f"Getting items for board {board}")
-            r = requests.post(url=baseURL, json=data, headers=self.headers, timeout=5)
+            r = requests.post(url=baseURL, json=data, headers=self.headers, timeout=30)
             r.raise_for_status()
 
-        except HTTPError as error:
+        except (HTTPError, RequestException) as error:
             logger.warning(f"Failed to get items for board {board}, err: {error}")
 
         content = r.json()
@@ -181,11 +185,58 @@
         items = content["data"]["boards"][0]["items_page"]["items"]
 
         if consume and cursor:
-            pass
-            # items = consume_items_page(cursor, items)
+            items = self._consume_items_cursor(cursor, items, column_ids)
 
         return items
 
+    def _consume_items_cursor(
+        self, cursor: str, items: Collection[dict], column_ids: list
+    ) -> Collection[dict]:
+        query = f"""
+        {{
+            next_items_page (limit: {max_items_query}, cursor: "{cursor}") {{
+                cursor
+                items {{
+                    id
+                    name
+                    updates {{
+                        text_body
+                    }}
+                    column_values(ids: {json.dumps(column_ids)}) {{
+                        id
+                        text
+                        value
+                    }}
+                    url
+                }}
+            }}
+        }}
+        """
+
+        data = {"query": query}
+
+        try:
+            logger.info(f"Consuming cursor {cursor} for board {self.current_board}")
+            r = requests.post(url=baseURL, json=data, headers=self.headers, timeout=30)
+            r.raise_for_status()
+
+        except (HTTPError, RequestException) as error:
+            logger.warning(
+                f"Failed to get items for board {self.current_board} with cursor {cursor}, err: {error}"
+            )
+
+        content = r.json()
+
+        cursor = content["data"]["next_items_page"]["cursor"]
+        new_items = content["data"]["next_items_page"]["items"]
+
+        items.extend(new_items)  # type: ignore[attr-defined]
+
+        if cursor:
+            return self._consume_items_cursor(cursor, items, column_ids)
+        else:
+            return items
+
     def _get_monday_doc(self, object_id: int) -> str:
         query = f"""
         {{
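One design note on the new `_consume_items_cursor` above: it recurses once per page of `max_items_query` items, so a very large board could in principle approach Python's default recursion limit (1000 frames, roughly 500k items at 500 per page). A loop-based sketch of the same cursor drain, shown here as a standalone hypothetical helper rather than the method the commit actually adds:

```python
import json
import requests

baseURL = "https://api.monday.com/v2"  # Monday.com GraphQL endpoint
max_items_query = 500

def consume_items_iteratively(cursor, items, column_ids, headers):
    """Hypothetical loop-based equivalent of _consume_items_cursor."""
    while cursor:
        query = f"""
        {{
            next_items_page (limit: {max_items_query}, cursor: "{cursor}") {{
                cursor
                items {{
                    id
                    name
                    updates {{ text_body }}
                    column_values(ids: {json.dumps(column_ids)}) {{ id text value }}
                    url
                }}
            }}
        }}
        """
        r = requests.post(url=baseURL, json={"query": query}, headers=headers, timeout=30)
        r.raise_for_status()
        page = r.json()["data"]["next_items_page"]
        items.extend(page["items"])
        cursor = page["cursor"]  # null/None once the cursor is exhausted
    return items
```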
@@ -200,10 +251,10 @@
 
         try:
             logger.info(f"Retrieving Monday doc {object_id}")
-            r = requests.post(url=baseURL, json=data, headers=self.headers, timeout=5)
+            r = requests.post(url=baseURL, json=data, headers=self.headers, timeout=15)
             r.raise_for_status()
 
-        except HTTPError as error:
+        except (HTTPError, RequestException) as error:
             logger.warning(f"Failed to get Monday doc {object_id}, err: {error}")
 
         content = r.json()
@@ -239,6 +290,8 @@ def _construct_items_documents(self, items: Collection[dict], columns: dict):
             updates = item["updates"]
             updates_text = [u["text_body"] for u in updates]
 
+            item_text_string += f"Board Name: {self.current_board_name}\n"
+
             if updates_text:
                 for update in updates_text:
                     item_text_string += f"Update: {sanitize_text(update)}\n"
@@ -262,6 +315,7 @@ def _construct_items_documents(self, items: Collection[dict], columns: dict):
             metadata = {
                 "title": item_name,
                 "board": self.current_board,
+                "boardName": self.current_board_name,
                 "link": item_url,
                 "pageId": item_id,
                 "platform": "monday",
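For reference, a sketch of the document shape each Monday item produces after this change, assuming llama_index's `Document`; the concrete values are illustrative, not taken from the commit:

```python
from llama_index.core import Document

# Illustrative values; the extractor fills these from the Monday API
doc = Document(
    text="Board Name: Roadmap\nUpdate: Shipped v1\n",
    metadata={
        "title": "Launch checklist",
        "board": 1234567890,
        "boardName": "Roadmap",  # newly injected in this commit
        "link": "https://example.monday.com/boards/1234567890/pulses/987",
        "pageId": "987",
        "platform": "monday",
    },
)
```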