Skip to content

Commit

Permalink
[Integration][Gitlab] - Introduce Pagination and Run Code in Async (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
PeyGis authored Sep 25, 2024
1 parent a49d0b2 commit c3e2f02
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 190 deletions.
8 changes: 8 additions & 0 deletions integrations/gitlab/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

0.1.124 (2024-09-24)
====================

### Improvements

- Added more logs and implemented the webhook creation in async (0.1.124)


0.1.123 (2024-09-22)
====================

Expand Down
13 changes: 5 additions & 8 deletions integrations/gitlab/gitlab_integration/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,14 @@ def on(self, events: list[str], observer: Observer) -> None:
self._observers[event].append(observer)

async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
observers = asyncio.gather(
*(
observer(event_id, body)
for observer in self._observers.get(event_id, [])
)
)

if not observers:
observers_list = self._observers.get(event_id, [])
if not observers_list:
logger.info(
f"event: {event_id} has no matching handler. the handlers available are for events: {self._observers.keys()}"
)
return

await asyncio.gather(*(observer(event_id, body) for observer in observers_list))


class SystemEventHandler(BaseEventHandler):
Expand Down
74 changes: 41 additions & 33 deletions integrations/gitlab/gitlab_integration/events/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def validate_hooks_override_config(
validate_groups_hooks_events(groups_paths)


def setup_listeners(gitlab_service: GitlabService, webhook_id: str) -> None:
def setup_listeners(gitlab_service: GitlabService, group_id: str) -> None:
handlers = [
PushHook(gitlab_service),
MergeRequest(gitlab_service),
Expand All @@ -127,9 +127,9 @@ def setup_listeners(gitlab_service: GitlabService, webhook_id: str) -> None:
]
for handler in handlers:
logger.info(
f"Setting up listeners for webhook {webhook_id} for group mapping {gitlab_service.group_mapping}"
f"Setting up listeners {handler.events} for group {group_id} for group mapping {gitlab_service.group_mapping}"
)
event_ids = [f"{event_name}:{webhook_id}" for event_name in handler.events]
event_ids = [f"{event_name}:{group_id}" for event_name in handler.events]
event_handler.on(event_ids, handler.on_hook)


Expand All @@ -144,13 +144,14 @@ def setup_system_listeners(gitlab_clients: list[GitlabService]) -> None:
ProjectFiles,
]
for handler in handlers:
logger.info(f"Setting up system listeners {handler.system_events}")
system_event_handler.on(handler)

for gitlab_service in gitlab_clients:
system_event_handler.add_client(gitlab_service)


def create_webhooks_by_client(
async def create_webhooks_by_client(
gitlab_host: str,
app_host: str,
token: str,
Expand All @@ -160,16 +161,16 @@ def create_webhooks_by_client(
gitlab_client = Gitlab(gitlab_host, token)
gitlab_service = GitlabService(gitlab_client, app_host, group_mapping)

groups_for_webhooks = gitlab_service.get_filtered_groups_for_webhooks(
groups_for_webhooks = await gitlab_service.get_filtered_groups_for_webhooks(
list(groups_hooks_events_override.keys())
if groups_hooks_events_override
else None
)

webhooks_ids: list[str] = []
groups_ids_with_webhooks: list[str] = []

for group in groups_for_webhooks:
webhook_id = gitlab_service.create_webhook(
group_id = await gitlab_service.create_webhook(
group,
(
groups_hooks_events_override.get(
Expand All @@ -180,13 +181,13 @@ def create_webhooks_by_client(
),
)

if webhook_id:
webhooks_ids.append(webhook_id)
if group_id:
groups_ids_with_webhooks.append(group_id)

return gitlab_service, webhooks_ids
return gitlab_service, groups_ids_with_webhooks


def setup_application(
async def setup_application(
token_mapping: dict[str, list[str]],
gitlab_host: str,
app_host: str,
Expand All @@ -196,45 +197,52 @@ def setup_application(
validate_token_mapping(token_mapping)

if use_system_hook:
logger.info("Using system hook")
validate_use_system_hook(token_mapping)
token, group_mapping = list(token_mapping.items())[0]
gitlab_client = Gitlab(gitlab_host, token)
gitlab_service = GitlabService(gitlab_client, app_host, group_mapping)
setup_system_listeners([gitlab_service])

else:
logger.info("Using group hooks")
validate_hooks_override_config(
token_mapping, token_group_override_hooks_mapping
)

client_to_webhooks: list[tuple[GitlabService, list[str]]] = []
client_to_group_ids_with_webhooks: list[tuple[GitlabService, list[str]]] = []

for token, group_mapping in token_mapping.items():
if not token_group_override_hooks_mapping:
client_to_webhooks.append(
create_webhooks_by_client(
gitlab_host,
app_host,
token,
None,
group_mapping,
)
)
else:
groups = token_group_override_hooks_mapping.tokens.get(
token, WebhookTokenConfig(groups=[])
).groups
if groups:
client_to_webhooks.append(
create_webhooks_by_client(
try:
if not token_group_override_hooks_mapping:
client_to_group_ids_with_webhooks.append(
await create_webhooks_by_client(
gitlab_host,
app_host,
token,
groups,
None,
group_mapping,
)
)
else:
groups = token_group_override_hooks_mapping.tokens.get(
token, WebhookTokenConfig(groups=[])
).groups
if groups:
client_to_group_ids_with_webhooks.append(
await create_webhooks_by_client(
gitlab_host,
app_host,
token,
groups,
group_mapping,
)
)
except Exception as e:
logger.exception(
f"Failed to create webhooks for group mapping {group_mapping}, error: {e}"
)

for client, webhook_ids in client_to_webhooks:
for webhook_id in webhook_ids:
setup_listeners(client, webhook_id)
for client, group_ids in client_to_group_ids_with_webhooks:
for group_id in group_ids:
setup_listeners(client, group_id)
86 changes: 58 additions & 28 deletions integrations/gitlab/gitlab_integration/gitlab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ProjectFile,
ProjectPipeline,
ProjectPipelineJob,
Hook,
)
from gitlab_integration.core.async_fetcher import AsyncFetcher
from gitlab_integration.core.entities import generate_entity_from_port_yaml
Expand Down Expand Up @@ -63,22 +64,34 @@ def __init__(
GITLAB_SEARCH_RATE_LIMIT * 0.95, 60
)

def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None:
async def get_group_hooks(self, group: RESTObject) -> AsyncIterator[List[Hook]]:
async for hooks_batch in AsyncFetcher.fetch_batch(group.hooks.list):
hooks = typing.cast(List[Hook], hooks_batch)
yield hooks

async def _get_webhook_for_group(self, group: RESTObject) -> RESTObject | None:
webhook_url = f"{self.app_host}/integration/hook/{group.get_id()}"
for hook in group.hooks.list(iterator=True):
if hook.url == webhook_url:
return hook
logger.info(
f"Getting webhook for group {group.get_id()} with url {webhook_url}"
)
async for hook_batch in self.get_group_hooks(group):
for hook in hook_batch:
if hook.url == webhook_url:
logger.info(
f"Found webhook for group {group.get_id()} with id {hook.id} and url {hook.url}"
)
return hook
return None

def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None:
async def _delete_group_webhook(self, group: RESTObject, hook_id: int) -> None:
logger.info(f"Deleting webhook with id {hook_id} in group {group.get_id()}")
try:
group.hooks.delete(hook_id)
await AsyncFetcher.fetch_single(group.hooks.delete, hook_id)
logger.info(f"Deleted webhook for {group.get_id()}")
except Exception as e:
logger.error(f"Failed to delete webhook for {group.get_id()} error={e}")

def _create_group_webhook(
async def _create_group_webhook(
self, group: RESTObject, events: list[str] | None
) -> None:
webhook_events = {
Expand All @@ -87,20 +100,23 @@ def _create_group_webhook(
}

logger.info(
f"Creating webhook for {group.get_id()} with events: {[event for event in webhook_events if webhook_events[event]]}"
f"Creating webhook for group {group.get_id()} with events: {[event for event in webhook_events if webhook_events[event]]}"
)
try:
resp = group.hooks.create(
resp = await AsyncFetcher.fetch_single(
group.hooks.create,
{
"url": f"{self.app_host}/integration/hook/{group.get_id()}",
**webhook_events,
}
},
)
logger.info(
f"Created webhook for {group.get_id()}, id={resp.id}, url={resp.url}"
f"Created webhook for group {group.get_id()}, webhook id={resp.id}, url={resp.url}"
)
except Exception as e:
logger.error(f"Failed to create webhook for {group.get_id()} error={e}")
logger.exception(
f"Failed to create webhook for group {group.get_id()} error={e}"
)

def _get_changed_files_between_commits(
self, project_id: int, head: str
Expand Down Expand Up @@ -253,14 +269,27 @@ def should_process_project(
return True
return project.name in repos

def get_root_groups(self) -> List[Group]:
groups = self.gitlab_client.groups.list(iterator=True)
async def get_root_groups(self) -> List[Group]:
groups: list[RESTObject] = []
async for groups_batch in AsyncFetcher.fetch_batch(
self.gitlab_client.groups.list, retry_transient_errors=True
):
groups_batch = typing.cast(List[RESTObject], groups_batch)
groups.extend(groups_batch)

return typing.cast(
List[Group], [group for group in groups if group.parent_id is None]
)

def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]:
groups = self.gitlab_client.groups.list(get_all=True)
async def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]:
groups: list[RESTObject] = []

async for groups_batch in AsyncFetcher.fetch_batch(
self.gitlab_client.groups.list, retry_transient_errors=True
):
groups_batch = typing.cast(List[RESTObject], groups_batch)
groups.extend(groups_batch)

return typing.cast(
List[Group],
[
Expand All @@ -270,17 +299,17 @@ def filter_groups_by_paths(self, groups_full_paths: list[str]) -> List[Group]:
],
)

def get_filtered_groups_for_webhooks(
async def get_filtered_groups_for_webhooks(
self,
groups_hooks_override_list: list[str] | None,
) -> List[Group]:
groups_for_webhooks = []
if groups_hooks_override_list is not None:
if groups_hooks_override_list:
logger.info(
"Getting all the specified groups in the mapping for a token to create their webhooks"
f"Getting all the specified groups in the mapping for a token to create their webhooks for: {groups_hooks_override_list}"
)
groups_for_webhooks = self.filter_groups_by_paths(
groups_for_webhooks = await self.filter_groups_by_paths(
groups_hooks_override_list
)

Expand All @@ -302,7 +331,7 @@ def get_filtered_groups_for_webhooks(
)
else:
logger.info("Getting all the root groups to create their webhooks")
root_groups = self.get_root_groups()
root_groups = await self.get_root_groups()
groups_for_webhooks = [
group
for group in root_groups
Expand All @@ -316,31 +345,32 @@ def get_filtered_groups_for_webhooks(

return groups_for_webhooks

def create_webhook(self, group: Group, events: list[str] | None) -> str | None:
async def create_webhook(
self, group: Group, events: list[str] | None
) -> str | None:
logger.info(f"Creating webhook for the group: {group.attributes['full_path']}")

webhook_id = None
group_id = group.get_id()

if group_id is None:
logger.info(f"Group {group.attributes['full_path']} has no id. skipping...")
return None
else:
hook = self._get_webhook_for_group(group)
hook = await self._get_webhook_for_group(group)
if hook:
logger.info(f"Webhook already exists for group {group.get_id()}")

if hook.alert_status == "disabled":
logger.info(
f"Webhook exists for group {group.get_id()} but is disabled, deleting and re-creating..."
)
self._delete_group_webhook(group, hook.id)
self._create_group_webhook(group, events)
await self._delete_group_webhook(group, hook.id)
await self._create_group_webhook(group, events)
logger.info(f"Webhook re-created for group {group.get_id()}")
else:
self._create_group_webhook(group, events)
webhook_id = str(group_id)
await self._create_group_webhook(group, events)

return webhook_id
return str(group_id)

def create_system_hook(self) -> None:
logger.info("Checking if system hook already exists")
Expand Down
Loading

0 comments on commit c3e2f02

Please sign in to comment.