Skip to content

Commit

Permalink
Chore/allow limiting of pagination (#48)
Browse files Browse the repository at this point in the history
* Allow user to limit pagination to help reduce resource utilization

* Remove redundant item

* Update metrics/api_metric.py

Co-authored-by: Edward Park <[email protected]>

* Update naming and default to pagination being enabled

* Flip flop

* Default true

---------

Co-authored-by: Edward Park <[email protected]>
  • Loading branch information
jimid27 and parkedwards authored Oct 24, 2024
1 parent 2ca2eb1 commit 0c57b85
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 22 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ You can modify environment variables to change the behavior of the exporter.
| `OFFSET_MINUTES` | Number of minutes to offset the start time when fetching metrics from Prefect API | `5` |
| `PREFECT_API_URL` | Prefect API URL | `https://localhost:4200/api` |
| `PREFECT_API_KEY` | Prefect API KEY (Optional) | `""` |
| `PREFECT_CSRF_ENABLED` | Enable compatibilty with Prefect Servers using CSRF protection | `False` |
| `PAGINATION_ENABLED` | Enable pagination usage. (Uses more resources) | `True` |
| `PAGINATION_LIMIT` | Pagination limit | `200` |
## Contributing
Expand Down
4 changes: 3 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def metrics():
url = str(os.getenv("PREFECT_API_URL", "http://localhost:4200/api"))
api_key = str(os.getenv("PREFECT_API_KEY", ""))
csrf_client_id = str(uuid.uuid4())

# Configure logging
logging.basicConfig(
level=loglevel, format="%(asctime)s - %(name)s - [%(levelname)s] %(message)s"
Expand All @@ -48,6 +47,9 @@ def metrics():
client_id=csrf_client_id,
csrf_enabled=str(os.getenv("PREFECT_CSRF_ENABLED", "False")) == "True",
logger=logger,
# Enable pagination if not specified to avoid breaking existing deployments
enable_pagination=str(os.getenv("PAGINATION_ENABLED", "True")) == "True",
pagination_limit=int(os.getenv("PAGINATION_LIMIT", 200)),
)

# Register the metrics with Prometheus
Expand Down
23 changes: 20 additions & 3 deletions metrics/api_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@ class PrefectApiMetric:
PrefectDeployments class for interacting with Prefect's endpoints
"""

def __init__(self, url, headers, max_retries, logger, uri) -> None:
def __init__(
self,
url,
headers,
max_retries,
logger,
enable_pagination,
pagination_limit,
uri,
) -> None:
"""
Initialize the PrefectDeployments instance.
Expand All @@ -19,13 +28,16 @@ def __init__(self, url, headers, max_retries, logger, uri) -> None:
max_retries (int): The maximum number of retries for HTTP requests.
logger (obj): The logger object.
uri (str, optional): The URI path for the intended endpoint.
enable_pagination (bool): Whether to use pagination or not.
pagination_limit (int): The limit for pagination.
"""
self.headers = headers
self.uri = uri
self.url = url
self.max_retries = max_retries
self.logger = logger
self.enable_pagination = enable_pagination
self.pagination_limit = pagination_limit

def _get_with_pagination(self, base_data: Optional[dict] = None) -> list:
"""
Expand All @@ -35,7 +47,8 @@ def _get_with_pagination(self, base_data: Optional[dict] = None) -> list:
dict: JSON response containing all items from the endpoint.
"""
endpoint = f"{self.url}/{self.uri}/filter"
limit = 200
enable_pagination = self.enable_pagination
limit = self.pagination_limit
offset = 0
all_items = []

Expand All @@ -61,6 +74,10 @@ def _get_with_pagination(self, base_data: Optional[dict] = None) -> list:

curr_page_items = resp.json()

# If pagination is not used, break the loop
if not enable_pagination:
break

# If the current page is empty, break the loop
if not curr_page_items:
break
Expand Down
21 changes: 18 additions & 3 deletions metrics/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ class PrefectDeployments(PrefectApiMetric):
PrefectDeployments class for interacting with Prefect's deployments endpoints.
"""

def __init__(self, url, headers, max_retries, logger, uri="deployments") -> None:
def __init__(
self,
url,
headers,
max_retries,
logger,
enable_pagination,
pagination_limit,
uri="deployments",
) -> None:
"""
Initialize the PrefectDeployments instance.
Expand All @@ -16,10 +25,16 @@ def __init__(self, url, headers, max_retries, logger, uri="deployments") -> None
max_retries (int): The maximum number of retries for HTTP requests.
logger (obj): The logger object.
uri (str, optional): The URI path for deployments endpoints. Default is "deployments".
pagination_limit (int): The maximum number of pages to fetch.
"""
super().__init__(
url=url, headers=headers, max_retries=max_retries, logger=logger, uri=uri
url=url,
headers=headers,
max_retries=max_retries,
logger=logger,
enable_pagination=enable_pagination,
pagination_limit=pagination_limit,
uri=uri,
)

def get_deployments_info(self) -> list:
Expand Down
18 changes: 16 additions & 2 deletions metrics/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@ class PrefectFlowRuns(PrefectApiMetric):
"""

def __init__(
self, url, headers, max_retries, offset_minutes, logger, uri="flow_runs"
self,
url,
headers,
max_retries,
offset_minutes,
logger,
enable_pagination,
pagination_limit,
uri="flow_runs",
) -> None:
"""
Initialize the PrefectFlowRuns instance.
Expand All @@ -24,7 +32,13 @@ def __init__(
"""
super().__init__(
url=url, headers=headers, max_retries=max_retries, logger=logger, uri=uri
url=url,
headers=headers,
max_retries=max_retries,
logger=logger,
enable_pagination=enable_pagination,
pagination_limit=pagination_limit,
uri=uri,
)

# Calculate timestamps for before and after data
Expand Down
19 changes: 17 additions & 2 deletions metrics/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ class PrefectFlows(PrefectApiMetric):
PrefectFlows class for interacting with Prefect's flows endpoints.
"""

def __init__(self, url, headers, max_retries, logger, uri="flows") -> None:
def __init__(
self,
url,
headers,
max_retries,
logger,
enable_pagination,
pagination_limit,
uri="flows",
) -> None:
"""
Initialize the PrefectFlows instance.
Expand All @@ -19,7 +28,13 @@ def __init__(self, url, headers, max_retries, logger, uri="flows") -> None:
"""
super().__init__(
url=url, headers=headers, max_retries=max_retries, logger=logger, uri=uri
url=url,
headers=headers,
max_retries=max_retries,
logger=logger,
enable_pagination=enable_pagination,
pagination_limit=pagination_limit,
uri=uri,
)

def get_flows_info(self) -> list:
Expand Down
65 changes: 58 additions & 7 deletions metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ class PrefectMetrics(object):
"""

def __init__(
self, url, headers, offset_minutes, max_retries, csrf_enabled, client_id, logger
self,
url,
headers,
offset_minutes,
max_retries,
csrf_enabled,
client_id,
logger,
enable_pagination,
pagination_limit,
) -> None:
"""
Initialize the PrefectMetrics instance.
Expand All @@ -36,6 +45,8 @@ def __init__(
self.logger = logger
self.client_id = client_id
self.csrf_enabled = csrf_enabled
self.enable_pagination = enable_pagination
self.pagination_limit = pagination_limit
self.csrf_token = None
self.csrf_token_expiration = None

Expand All @@ -58,25 +69,65 @@ def collect(self):
self.headers["Prefect-Csrf-Token"] = self.csrf_token
self.headers["Prefect-Csrf-Client"] = self.client_id
##
# NOTIFY IF PAGINATION IS ENABLED
#
if self.enable_pagination:
self.logger.info("Pagination is enabled")
self.logger.info(f"Pagination limit is {self.pagination_limit}")
else:
self.logger.info("Pagination is disabled")
##
# PREFECT GET RESOURCES
#
deployments = PrefectDeployments(
self.url, self.headers, self.max_retries, self.logger
self.url,
self.headers,
self.max_retries,
self.logger,
self.enable_pagination,
self.pagination_limit,
).get_deployments_info()
flows = PrefectFlows(
self.url, self.headers, self.max_retries, self.logger
self.url,
self.headers,
self.max_retries,
self.logger,
self.enable_pagination,
self.pagination_limit,
).get_flows_info()
flow_runs = PrefectFlowRuns(
self.url, self.headers, self.max_retries, self.offset_minutes, self.logger
self.url,
self.headers,
self.max_retries,
self.offset_minutes,
self.logger,
self.enable_pagination,
self.pagination_limit,
).get_flow_runs_info()
all_flow_runs = PrefectFlowRuns(
self.url, self.headers, self.max_retries, self.offset_minutes, self.logger
self.url,
self.headers,
self.max_retries,
self.offset_minutes,
self.logger,
self.enable_pagination,
self.pagination_limit,
).get_all_flow_runs_info()
work_pools = PrefectWorkPools(
self.url, self.headers, self.max_retries, self.logger
self.url,
self.headers,
self.max_retries,
self.logger,
self.enable_pagination,
self.pagination_limit,
).get_work_pools_info()
work_queues = PrefectWorkQueues(
self.url, self.headers, self.max_retries, self.logger
self.url,
self.headers,
self.max_retries,
self.logger,
self.enable_pagination,
self.pagination_limit,
).get_work_queues_info()

##
Expand Down
19 changes: 17 additions & 2 deletions metrics/work_pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ class PrefectWorkPools(PrefectApiMetric):
PrefectWorkPools class for interacting with Prefect's work pools endpoints.
"""

def __init__(self, url, headers, max_retries, logger, uri="work_pools") -> None:
def __init__(
self,
url,
headers,
max_retries,
logger,
enable_pagination,
pagination_limit,
uri="work_pools",
) -> None:
"""
Initialize the PrefectWorkPools instance.
Expand All @@ -19,7 +28,13 @@ def __init__(self, url, headers, max_retries, logger, uri="work_pools") -> None:
"""
super().__init__(
url=url, headers=headers, max_retries=max_retries, logger=logger, uri=uri
url=url,
headers=headers,
max_retries=max_retries,
logger=logger,
enable_pagination=enable_pagination,
pagination_limit=pagination_limit,
uri=uri,
)

def get_work_pools_info(self) -> list:
Expand Down
19 changes: 17 additions & 2 deletions metrics/work_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@ class PrefectWorkQueues(PrefectApiMetric):
PrefectWorkQueues class for interacting with Prefect's work queues endpoints.
"""

def __init__(self, url, headers, max_retries, logger, uri="work_queues") -> None:
def __init__(
self,
url,
headers,
max_retries,
logger,
enable_pagination,
pagination_limit,
uri="work_queues",
) -> None:
"""
Initialize the PrefectWorkQueues instance.
Expand All @@ -24,7 +33,13 @@ def __init__(self, url, headers, max_retries, logger, uri="work_queues") -> None
"""
super().__init__(
url=url, headers=headers, max_retries=max_retries, logger=logger, uri=uri
url=url,
headers=headers,
max_retries=max_retries,
logger=logger,
enable_pagination=enable_pagination,
pagination_limit=pagination_limit,
uri=uri,
)

def get_work_queues_info(self) -> list:
Expand Down

0 comments on commit 0c57b85

Please sign in to comment.