-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat : Airtable connector Support #635
Changes from 9 commits
197740b
826aa02
cd78117
1ec1dd5
2171296
a1f25b8
02007ae
3f8f55f
8c0e07b
f6955eb
b2fe3a7
fc5c326
88c419d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
"""Example: query an Airtable table through pandasai's AirtableConnector."""

import os

from pandasai.connectors import AirtableConnector
from pandasai.llm import OpenAI
from pandasai import SmartDataframe

# Read credentials from the environment instead of hard-coding them in source.
airtable_connector = AirtableConnector(
    config={
        "api_key": os.environ["AIRTABLE_API_TOKEN"],
        "table": os.environ["AIRTABLE_TABLE_NAME"],
        "base_id": os.environ["AIRTABLE_BASE_ID"],
        "where": [
            # this is optional and filters the data to
            # reduce the size of the dataframe
            ["Status", "=", "In progress"]
        ],
    }
)

llm = OpenAI(os.environ["OPENAI_API_KEY"])

df = SmartDataframe(airtable_connector, config={"llm": llm})

response = df.chat("How many rows are there in data ?")

print(response)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,292 @@ | ||
""" | ||
Airtable connectors are used to connect airtable records. | ||
""" | ||
|
||
from .base import AirtableConnectorConfig, BaseConnector, BaseConnectorConfig | ||
from typing import Union, Optional | ||
import requests | ||
import pandas as pd | ||
import os | ||
from ..helpers.path import find_project_root | ||
import time | ||
import hashlib | ||
from ..exceptions import InvalidRequestError | ||
from functools import cache, cached_property | ||
|
||
|
||
class AirtableConnector(BaseConnector):
    """
    Airtable connector for retrieving record data from an Airtable table.
    """

    # Memoised row/column counts so the count properties only compute once.
    _rows_count: int = None
    _columns_count: int = None
    # Lazily fetched DataFrame of the table's records.
    _instance = None

    def __init__(
        self,
        config: Optional[Union[AirtableConnectorConfig, dict]] = None,
        cache_interval: int = 600,
    ):
        """
        Initialise the connector.

        Args:
            config: Either a ready-made ``AirtableConnectorConfig``, a dict
                containing the keys "api_key", "base_id" and "table", or
                ``None`` to read those values from the environment variables
                AIRTABLE_API_TOKEN, AIRTABLE_BASE_ID and AIRTABLE_TABLE_NAME.
            cache_interval: Seconds before the on-disk cache expires.

        Raises:
            KeyError: If a dict config is missing any required key.
        """
        if isinstance(config, dict):
            # Report exactly which required keys are absent rather than a
            # generic "specify everything" message.
            missing_keys = [
                key for key in ("api_key", "base_id", "table") if key not in config
            ]
            if missing_keys:
                raise KeyError(
                    f"Missing keys in config: {', '.join(missing_keys)}"
                )
            config = AirtableConnectorConfig(**config)
        elif config is None:
            # No config given: populate it from environment variables.
            airtable_env_vars = {
                "api_key": "AIRTABLE_API_TOKEN",
                "base_id": "AIRTABLE_BASE_ID",
                "table": "AIRTABLE_TABLE_NAME",
            }
            config = AirtableConnectorConfig(
                **self._populate_config_from_env(config, airtable_env_vars)
            )

        self._root_url: str = "https://api.airtable.com/v0/"
        self._cache_interval = cache_interval

        super().__init__(config)
|
||
def _init_connection(self, config: BaseConnectorConfig):
    """
    Verify that the configured Airtable base/table is reachable.

    Issues a HEAD request against the table URL and raises
    ``InvalidRequestError`` when Airtable does not answer with HTTP 200
    or the request fails at the network level.
    """
    config = config.dict()
    url = f"{self._root_url}{config['base_id']}/{config['table']}"
    try:
        response = requests.head(
            url=url,
            headers={"Authorization": f"Bearer {config['api_key']}"},
            timeout=30,  # fail fast instead of hanging on network problems
        )
    except requests.exceptions.RequestException as exc:
        raise InvalidRequestError(
            f"Failed to connect to Airtable at {url}: {exc}"
        ) from exc
    if response.status_code == 200:
        self.logger.log(
            """
                Connected to Airtable.
            """
        )
    else:
        raise InvalidRequestError(
            f"""Failed to connect to Airtable.
                Status code: {response.status_code},
                message: {response.text}"""
        )
||
|
||
def _get_cache_path(self, include_additional_filters: bool = False):
    """
    Return the path of the parquet cache file for this table.

    Returns:
        str: The path of the cache file.
    """
    # NOTE(review): include_additional_filters is currently unused here,
    # although callers pass it — confirm whether filtered data should get
    # its own cache file.
    try:
        cache_dir = os.path.join(find_project_root(), "cache")
    except ValueError:
        # No recognisable project root; fall back to the current directory.
        cache_dir = os.path.join(os.getcwd(), "cache")

    return os.path.join(cache_dir, f"{self._config.table}_data.parquet")
||
|
||
def _cached(self, include_additional_filters: bool = False): | ||
""" | ||
Returns the cached Airtable data if it exists and | ||
is not older than the cache interval. | ||
|
||
Returns : | ||
DataFrame | None : The cached data if | ||
it exists and is not older than the cache | ||
interval, None otherwise. | ||
""" | ||
cache_path = self._get_cache_path(include_additional_filters) | ||
if not os.path.exists(cache_path): | ||
return None | ||
|
||
# If the file is older than 1 day , delete it. | ||
if os.path.getmtime(cache_path) < time.time() - self._cache_interval: | ||
if self.logger: | ||
self.logger.log(f"Deleting expired cached data from {cache_path}") | ||
os.remove(cache_path) | ||
return None | ||
Comment on lines
+104
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cache invalidation logic is based on the modification time of the cache file. This could lead to unexpected behavior if the cache file is manually modified. Consider using a separate metadata file or a database to store the cache creation time.
Comment on lines
+90
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The - os.remove(cache_path)
+ try:
+ os.remove(cache_path)
+ except OSError as e:
+ if self.logger:
+ self.logger.log(f"Failed to delete expired cache file {cache_path}: {str(e)}")
Comment on lines
+105
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cache invalidation strategy is based on the modification time of the cache file. This could lead to issues if the system clock is changed or if the file is manually modified. Consider using a more robust cache invalidation strategy, like storing the cache creation time in the file itself or in a separate metadata file. |
||
|
||
if self.logger: | ||
self.logger.log(f"Loading cached data from {cache_path}") | ||
|
||
return cache_path | ||
|
||
def _save_cache(self, df): | ||
""" | ||
Save the given DataFrame to the cache. | ||
|
||
Args: | ||
df (DataFrame): The DataFrame to save to the cache. | ||
""" | ||
filename = self._get_cache_path( | ||
include_additional_filters=self._additional_filters is not None | ||
and len(self._additional_filters) > 0 | ||
) | ||
df.to_parquet(filename) | ||
Comment on lines
+123
to
+127
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The cache file name does not take into account the |
||
|
||
@property
def fallback_name(self):
    """
    Fallback table name of the connector.

    Returns:
        str: The configured Airtable table name.
    """
    table_name = self._config.table
    return table_name
|
||
def execute(self):
    """
    Execute the connector and return the table's records.

    Serves from the parquet cache when a fresh copy exists; otherwise
    fetches from the Airtable API, memoises the result on the instance
    and writes it to the cache for subsequent calls.

    Returns:
        DataFrameType: The result of the connector.
    """
    cached = self._cached() or self._cached(include_additional_filters=True)
    if cached:
        return pd.read_parquet(cached)

    if not isinstance(self._instance, pd.DataFrame):
        self._instance = self._fetch_data()
        # Bug fix: previously the freshly fetched data was never written to
        # the cache, so _save_cache/_cached never cooperated.
        self._save_cache(self._instance)

    return self._instance
|
||
def _build_formula(self): | ||
""" | ||
Build Airtable query formula for filtering. | ||
""" | ||
|
||
condition_strings = [] | ||
if self._config.where is not None: | ||
for i in self._config.where: | ||
filter_query = f"{i[0]}{i[1]}'{i[2]}'" | ||
condition_strings.append(filter_query) | ||
filter_formula = f'AND({",".join(condition_strings)})' | ||
return filter_formula | ||
|
||
def _request_api(self, params):
    """
    Issue a GET request against the configured Airtable table endpoint.

    Args:
        params (dict): Query parameters (pageSize, offset,
            filterByFormula, maxRecords, ...).

    Returns:
        requests.Response: The raw HTTP response.
    """
    url = f"{self._root_url}{self._config.base_id}/{self._config.table}"
    return requests.get(
        url=url,
        headers={"Authorization": f"Bearer {self._config.api_key}"},
        params=params,
        timeout=30,  # fail fast instead of hanging on network problems
    )
|
||
def _fetch_data(self): | ||
""" | ||
Feteches data from airtable server through | ||
API and converts it to DataFrame. | ||
""" | ||
|
||
params = {} | ||
if self._config.where is not None: | ||
params["filterByFormula"] = self._build_formula() | ||
|
||
params["pageSize"] = 100 | ||
params["offset"] = "0" | ||
|
||
data = [] | ||
while True: | ||
response = self._request_api(params=params) | ||
|
||
if response.status_code == 200: | ||
res = response.json() | ||
data.append(res) | ||
if len(res["records"]) < 100: | ||
break | ||
else: | ||
raise InvalidRequestError( | ||
f"""Failed to connect to Airtable. | ||
Status code: {response.status_code}, | ||
message: {response.text}""" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The - f"""Failed to connect to Airtable.
- Status code: {response.status_code},
- message: {response.text}"""
+ f"""Failed to fetch data from Airtable at {url}.
+ Status code: {response.status_code},
+ message: {response.text}""" |
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's also add a Example:
|
||
if "offset" in res: | ||
params["offset"] = res["offset"] | ||
|
||
data = self._preprocess(data=data) | ||
return data | ||
|
||
def _preprocess(self, data): | ||
""" | ||
Preprocesses Json response data | ||
To prepare dataframe correctly. | ||
""" | ||
columns = set() | ||
data_dict_list = [] | ||
for item in data: | ||
for entry in item["records"]: | ||
data_dict = {"id": entry["id"], "createdTime": entry["createdTime"]} | ||
for field_name, field_value in entry["fields"].items(): | ||
data_dict[field_name] = field_value | ||
columns.add(field_name) | ||
data_dict_list.append(data_dict) | ||
|
||
df = pd.DataFrame(data_dict_list) | ||
return df | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The - for field_name, field_value in entry["fields"].items():
+ for field_name, field_value in entry.get("fields", {}).items(): |
||
|
||
def head(self):
    """
    Return the head (up to 5 records) of the connected table.

    The result is memoised on the instance instead of via ``@cache``:
    ``functools.cache`` on an instance method keys the cache on ``self``
    and keeps every instance alive for the process lifetime (ruff B019).

    Returns:
        DataFrameType: The head of the data source the connector is
        connected to.
    """
    cached_head = getattr(self, "_head_df", None)
    if cached_head is not None:
        return cached_head
    response = self._request_api(params={"maxRecords": 5})
    self._head_df = self._preprocess([response.json()])
    return self._head_df
|
||
@property
def rows_count(self):
    """
    Number of rows in the data source the connector is connected to.

    Computed once via ``execute()`` and memoised in ``self._rows_count``
    (the previous ``@cached_property`` plus a manual memo was redundant —
    one caching layer suffices).

    Returns:
        int: The number of rows.
    """
    if self._rows_count is None:
        self._rows_count = len(self.execute())
    return self._rows_count
|
||
@property
def columns_count(self):
    """
    Number of columns in the data source the connector is connected to.

    Computed once from ``head()`` and memoised in ``self._columns_count``
    (the previous ``@cached_property`` plus a manual memo was redundant —
    one caching layer suffices).

    Returns:
        int: The number of columns.
    """
    if self._columns_count is None:
        self._columns_count = len(self.head().columns)
    return self._columns_count
||
|
||
@property
def column_hash(self):
    """
    Hash code that is unique to the columns (and filter) of the data
    source the connector is connected to.

    Returns:
        str: SHA-256 hex digest over the column names, plus the filter
        formula when a ``where`` clause is configured.
    """
    if not isinstance(self._instance, pd.DataFrame):
        self._instance = self.execute()
    columns_str = "|".join(self._instance.columns)
    # Only fold the filter into the hash when one is configured; calling
    # _build_formula() with no where clause produced a bogus suffix.
    if self._config.where is not None:
        columns_str += "WHERE" + self._build_formula()
    return hashlib.sha256(columns_str.encode("utf-8")).hexdigest()
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,16 @@ class BaseConnectorConfig(BaseModel): | |
where: list[list[str]] = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
||
|
||
class AirtableConnectorConfig(BaseConnectorConfig):
    """
    Connector configuration for Airtable data.

    Fields:
        api_key: Airtable API key, sent as a Bearer token.
        base_id: Identifier of the Airtable base to read from.
        database: Logical database name (defaults to "airtable_data").
    """

    api_key: str
    base_id: str
    database: str = "airtable_data"
|
||
|
||
class SQLBaseConnectorConfig(BaseConnectorConfig): | ||
""" | ||
Base Connector configuration. | ||
Comment on lines
33
to
35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code snippet demonstrates the usage of the Airtable connector. It's clear and easy to understand. However, the
api_key
,table
, andbase_id
are hardcoded. In a real-world scenario, these should be stored securely and not exposed in the code.This change uses the
os.getenv
function to retrieve the values from environment variables, which is a common practice for handling sensitive data like API keys.