Skip to content

Commit

Permalink
[LITE-30645] - TierConfig requests and Fulfillment requests are added…
Browse files Browse the repository at this point in the history
… to the sync
  • Loading branch information
Sainomori committed Jul 22, 2024
1 parent 29f49ce commit d679f3d
Show file tree
Hide file tree
Showing 17 changed files with 916 additions and 398 deletions.
10 changes: 5 additions & 5 deletions connect_ext_datalake/apis/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ def publish_product_info(
summary='Publish All Products Info',
)
def publish_all_product_info(
self,
context: Context = Depends(get_call_context),
client: ConnectClient = Depends(get_extension_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
self,
context: Context = Depends(get_call_context),
client: ConnectClient = Depends(get_extension_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
):
try:
create_task_publish_product(
Expand Down
20 changes: 10 additions & 10 deletions connect_ext_datalake/apis/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def retrieve_settings(
response_model=Setting,
)
def validate_settings(
self,
hub_id: str,
installation: dict = Depends(get_installation),
self,
hub_id: str,
installation: dict = Depends(get_installation),
):
try:
settings = get_settings(installation, hub_id)
Expand Down Expand Up @@ -94,11 +94,11 @@ def save_settings(
summary='Clear Datalake Pubsub Settings',
)
def remove_settings(
self,
hub_id: str,
installation: dict = Depends(get_installation),
client: ConnectClient = Depends(get_installation_client),
logger: LoggerAdapter = Depends(get_logger),
self,
hub_id: str,
installation: dict = Depends(get_installation),
client: ConnectClient = Depends(get_installation_client),
logger: LoggerAdapter = Depends(get_logger),
):
try:
delete_settings(
Expand All @@ -116,7 +116,7 @@ def remove_settings(
response_model=list[Hub],
)
def list_hubs(
self,
client: ConnectClient = Depends(get_installation_client),
self,
client: ConnectClient = Depends(get_installation_client),
):
return list_hubs(client)
26 changes: 14 additions & 12 deletions connect_ext_datalake/apis/tier_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ class TierConfigsWebAppMixin:
summary='Publish All Tier Configs Info',
)
def publish_all_tc_info(
self,
context: Context = Depends(get_call_context),
client: ConnectClient = Depends(get_extension_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
self,
context: Context = Depends(get_call_context),
client: ConnectClient = Depends(get_extension_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
):
try:
create_task_publish_tc(
Expand All @@ -53,11 +53,11 @@ def publish_all_tc_info(
summary='Publish All Tier Configs Info',
)
def publish_tc_info(
self,
tc_id: str,
client: ConnectClient = Depends(get_installation_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
self,
tc_id: str,
client: ConnectClient = Depends(get_installation_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
):
try:
tc = client('tier').configs[tc_id].get()
Expand All @@ -72,8 +72,10 @@ def publish_tc_info(
logger,
)
else:
logger.info(f"Publish of TC {tc['id']} is not processed"
f' as settings not available for respective hub.')
logger.info(
f"Publish of TC {tc['id']} is not processed"
f' as settings not available for respective hub.'
)
return HTMLResponse(status_code=200)
except ClientError as e:
return self.get_error_response(e)
10 changes: 5 additions & 5 deletions connect_ext_datalake/apis/translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ class TranslationWebAppMixin:
summary='Publish All Product Translations Info',
)
def publish_all_product_translations(
self,
context: Context = Depends(get_call_context),
client: ConnectClient = Depends(get_extension_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
self,
context: Context = Depends(get_call_context),
client: ConnectClient = Depends(get_extension_client),
installation: dict = Depends(get_installation),
logger: LoggerAdapter = Depends(get_logger),
):
try:
create_task_publish_translation(
Expand Down
4 changes: 4 additions & 0 deletions connect_ext_datalake/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
TierConfigTasksMixin,
TranslationEventsMixin,
TranslationTaskMixin,
FulfillmentEventsMixin,
FulfillmentTasksMixin,
)


Expand All @@ -25,5 +27,7 @@ class DatalakeExtensionEventsApplication(
TierConfigTasksMixin,
TranslationEventsMixin,
TranslationTaskMixin,
FulfillmentEventsMixin,
FulfillmentTasksMixin,
):
pass
4 changes: 4 additions & 0 deletions connect_ext_datalake/services/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@
TranslationEventsMixin,
TranslationTaskMixin,
)
from connect_ext_datalake.services.events.fulfillment import (
FulfillmentEventsMixin,
FulfillmentTasksMixin,
)
205 changes: 205 additions & 0 deletions connect_ext_datalake/services/events/fulfillment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
from connect.eaas.core.decorators import event, schedulable
from connect.eaas.core.extension import EventsApplicationBase
from connect.eaas.core.responses import (
BackgroundResponse,
ScheduledExecutionResponse,
)
from connect.client import ClientError
from connect_ext_datalake.services.settings import get_settings
from connect_ext_datalake.services.publish import (
publish_ff_request,
)
from connect_ext_datalake.services.client import GooglePubsubClient


class FulfillmentEventsMixin:
def __process_ff_request_event(self, ff_request):
self.logger.info(f"Obtained fulfillment request with id {ff_request['id']}")
hub_id = ff_request.get('asset', {}).get('connection', {}).get('hub', {}).get('id', None)
if hub_id:
setting = get_settings(self.installation, hub_id)
if setting:
try:
client = GooglePubsubClient(setting)
publish_ff_request(
client,
ff_request,
self.logger,
)
except Exception as e:
self.logger.exception(
f"Publish of fulfillment request {ff_request['id']} is failed."
)
raise e
else:
self.logger.info(
f"Publish of fulfillment request {ff_request['id']} is not processed"
f" as settings not available for respective hub {hub_id}."
)
else:
self.logger.info(
f"Publish of fulfillment request {ff_request['id']} is not processed"
f" as hub id is not found in request."
)
return BackgroundResponse.done()

@event(
'asset_suspend_request_processing',
statuses=[
'pending',
'approved',
'failed',
'scheduled',
'revoking',
'revoked',
],
)
def handle_asset_suspend_request_processing(self, ff_request):
return self.__process_ff_request_event(ff_request)

@event(
'asset_adjustment_request_processing',
statuses=[
'pending',
'approved',
'failed',
'inquiring',
'scheduled',
'revoking',
'revoked',
],
)
def handle_asset_adjustment_request_processing(self, ff_request):
return self.__process_ff_request_event(ff_request)

@event(
'asset_cancel_request_processing',
statuses=[
'pending',
'approved',
'failed',
'scheduled',
'revoking',
'revoked',
],
)
def handle_asset_cancel_request_processing(self, ff_request):
return self.__process_ff_request_event(ff_request)

@event(
'tier_account_update_request_processing',
statuses=[
'pending',
'accepted',
'ignored',
],
)
def handle_tier_account_update_request_processing(self, ff_request):
return self.__process_ff_request_event(ff_request)

@event(
'asset_purchase_request_processing',
statuses=[
'pending',
'approved',
'failed',
'inquiring',
'scheduled',
'revoking',
'revoked',
],
)
def handle_asset_purchase_request_processing(self, ff_request):
return self.__process_ff_request_event(ff_request)

@event(
'asset_change_request_processing',
statuses=[
'pending',
'approved',
'failed',
'inquiring',
'scheduled',
'revoking',
'revoked',
],
)
def handle_asset_change_request_processing(self, ff_request):
return self.__process_ff_request_event(ff_request)

@event(
'asset_resume_request_processing',
statuses=[
'pending',
'approved',
'failed',
'scheduled',
'revoking',
'revoked',
],
)
def handle_asset_resume_request_processing(self, ff_request):
return self.__process_ff_request_event(ff_request)


class FulfillmentTasksMixin:
@schedulable(
'Publish FF Requests',
'Publish FF Requests to Xvantage Goggle PubSub Topic.',
)
def publish_ff_requests(self, schedule):
self.logger.info(
f"Start of execution for schedule {schedule['id']} " f'for publishing all ff requests'
)
try:
installation_id = schedule['parameter']['installation_id']
installation_client = self.get_installation_admin_client(installation_id)
installation_client.resourceset_append = False

installation = installation_client('devops').installations[installation_id].get()
count = installation_client('requests').all().count()
self.logger.info(f'Total number of ff requests: {count}')
prs = (
installation_client('requests')
.all()
.select(
'-asset',
'-template',
'-connection',
'-params',
'-contract',
'-marketplace',
'-previous_approved_request',
)
)
counter = 1

for pr in prs:
try:
pr_full = installation_client('requests').get(pr['id'])
hub_id = pr_full['asset']['connection']['hub']['id']

setting = get_settings(installation, hub_id)
if setting:
pubsub_client = GooglePubsubClient(setting)
self.logger.info(f'Processing PR in {counter} position')
publish_ff_request(
pubsub_client,
pr_full,
self.logger,
)
else:
self.logger.info(
f"Publish of PR {pr['id']} is not processed"
f" as settings not available for respective hub."
)
counter += 1
except (ClientError, GoogleAPIError):
self.logger.exception(
f"Problem in while publishing Fulfillment request {pr['id']}."
)
except ClientError as e:
self.logger.exception('Problem in calling Connect or Google APIs.')
raise e

return ScheduledExecutionResponse.done()
16 changes: 9 additions & 7 deletions connect_ext_datalake/services/events/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def handle_listing_request_processing(self, request):
publish_payload('Product', product_id, payload, settings, self.logger)
except Exception as e:
self.logger.exception(
f"Publish of product {request['product']['id']} "
f'is failed. Payload: {payload}',
f"Publish of product {request['product']['id']} " f'is failed. Payload: {payload}',
)
raise e

Expand Down Expand Up @@ -81,8 +80,9 @@ class ProductTasksMixin:
'Publish products to Xvantage Goggle PubSub Topic.',
)
def publish_products(self, schedule):
self.logger.info(f"Start of execution for schedule {schedule['id']} "
f'for publishing all products')
self.logger.info(
f"Start of execution for schedule {schedule['id']} " f'for publishing all products'
)
try:
installation_id = schedule['parameter']['installation_id']
products = schedule['parameter']['products']
Expand All @@ -93,9 +93,11 @@ def publish_products(self, schedule):
product_ids = [product['id'] for product in products]
products = list(installation_client.products.filter(id__in=product_ids))
else:
products = list(installation_client.products.filter(
R().visibility.listing.eq(True) or R().visibility.syndication.eq(True),
).all())
products = list(
installation_client.products.filter(
R().visibility.listing.eq(True) or R().visibility.syndication.eq(True),
).all()
)

installation = installation_client('devops').installations[installation_id].get()
product_settings_map = creating_settings_map_from_product(
Expand Down
Loading

0 comments on commit d679f3d

Please sign in to comment.