-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat : Airtable connector Support #635
Changes from 1 commit
197740b
826aa02
cd78117
1ec1dd5
2171296
a1f25b8
02007ae
3f8f55f
8c0e07b
f6955eb
b2fe3a7
fc5c326
88c419d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,7 @@ class AirtableConnector(BaseConnector): | |
|
||
_rows_count: int = None | ||
_columns_count: int = None | ||
instance = None | ||
_instance = None | ||
|
||
def __init__( | ||
self, | ||
|
@@ -147,60 +147,86 @@ def execute(self): | |
if cached: | ||
return pd.read_parquet(cached) | ||
|
||
if isinstance(self.instance, pd.DataFrame): | ||
return self.instance | ||
if isinstance(self._instance, pd.DataFrame): | ||
return self._instance | ||
else: | ||
self.instance = self.fetch_data() | ||
return self.instance | ||
self._instance = self._fetch_data() | ||
|
||
def build_formula(self): | ||
return self._instance | ||
|
||
def _build_formula(self): | ||
""" | ||
Build Airtable query formula for filtering. | ||
""" | ||
|
||
condition_strings = [] | ||
for i in self._config.where: | ||
filter_query = f"{i[0]}{i[1]}'{i[2]}'" | ||
condition_strings.append(filter_query) | ||
if self._config.where is not None: | ||
for i in self._config.where: | ||
filter_query = f"{i[0]}{i[1]}'{i[2]}'" | ||
condition_strings.append(filter_query) | ||
filter_formula = f'AND({",".join(condition_strings)})' | ||
return filter_formula | ||
|
||
def fetch_data(self): | ||
""" | ||
Feteches data from airtable server through | ||
API and converts it to DataFrame. | ||
""" | ||
def _request_api(self, params): | ||
url = f"{self._root_url}{self._config.base_id}/{self._config.table}" | ||
params = {} | ||
if self._config.where: | ||
params["filterByFormula"] = self.build_formula() | ||
response = requests.get( | ||
url=url, | ||
headers={"Authorization": f"Bearer {self._config.api_key}"}, | ||
params=params, | ||
) | ||
if response.status_code == 200: | ||
data = response.json() | ||
data = self.preprocess(data=data) | ||
self._save_cache(data) | ||
else: | ||
raise InvalidRequestError( | ||
f"""Failed to connect to Airtable. | ||
Status code: {response.status_code}, | ||
message: {response.text}""" | ||
) | ||
return response | ||
|
||
def _fetch_data(self): | ||
""" | ||
Feteches data from airtable server through | ||
API and converts it to DataFrame. | ||
""" | ||
|
||
params = {} | ||
if self._config.where is not None: | ||
params["filterByFormula"] = self._build_formula() | ||
|
||
params["pageSize"] = 100 | ||
params["offset"] = "0" | ||
|
||
data = [] | ||
while True: | ||
response = self._request_api(params=params) | ||
|
||
if response.status_code == 200: | ||
res = response.json() | ||
data.append(res) | ||
if len(res["records"]) < 100: | ||
break | ||
else: | ||
raise InvalidRequestError( | ||
f"""Failed to connect to Airtable. | ||
Status code: {response.status_code}, | ||
message: {response.text}""" | ||
) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's also add an example:
|
||
if "offset" in res: | ||
params["offset"] = res["offset"] | ||
|
||
data = self._preprocess(data=data) | ||
return data | ||
|
||
def preprocess(self, data): | ||
def _preprocess(self, data): | ||
""" | ||
Preprocesses Json response data | ||
To prepare dataframe correctly. | ||
""" | ||
records = [ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's add this logic to the `_fetch_data` function to get rid of the three nested loops. |
||
{"id": record["id"], **record["fields"]} for record in data["records"] | ||
] | ||
|
||
df = pd.DataFrame(records) | ||
columns = set() | ||
data_dict_list = [] | ||
for item in data: | ||
for entry in item["records"]: | ||
data_dict = {"id": entry["id"], "createdTime": entry["createdTime"]} | ||
for field_name, field_value in entry["fields"].items(): | ||
data_dict[field_name] = field_value | ||
columns.add(field_name) | ||
data_dict_list.append(data_dict) | ||
|
||
df = pd.DataFrame(data_dict_list) | ||
return df | ||
|
||
@cache | ||
|
@@ -213,12 +239,9 @@ def head(self): | |
DatFrameType: The head of the data source | ||
that the conector is connected to . | ||
""" | ||
# return self.fetch_data().head() | ||
if isinstance(self.instance, pd.DataFrame): | ||
return self.instance.head() | ||
else: | ||
self.instance = self.fetch_data() | ||
return self.instance.head() | ||
data = self._request_api(params={"maxRecords": 5}) | ||
data = self._preprocess([data.json()]) | ||
return data | ||
|
||
@cached_property | ||
def rows_count(self): | ||
|
@@ -248,7 +271,7 @@ def columns_count(self): | |
""" | ||
if self._columns_count is not None: | ||
return self._columns_count | ||
data = self.execute() | ||
data = self.head() | ||
self._columns_count = len(data.columns) | ||
return self._columns_count | ||
|
||
|
@@ -262,8 +285,8 @@ def column_hash(self): | |
int: The hash code that is unique to the columns of the data source | ||
that the connector is connected to. | ||
""" | ||
if not isinstance(self.instance, pd.DataFrame): | ||
self.instance = self.execute() | ||
columns_str = "|".join(self.instance.columns) | ||
columns_str += "WHERE" + self.build_formula() | ||
if not isinstance(self._instance, pd.DataFrame): | ||
self._instance = self.execute() | ||
columns_str = "|".join(self._instance.columns) | ||
columns_str += "WHERE" + self._build_formula() | ||
return hashlib.sha256(columns_str.encode("utf-8")).hexdigest() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `_fetch_data` method sends a `GET` request to the Airtable API to fetch data. If the status code is not 200, it raises an `InvalidRequestError`. Similar to the `_init_connection` method, the error message could be improved by including the URL that failed to fetch data.