diff --git a/connect_ext_datalake/services/events/fulfillment.py b/connect_ext_datalake/services/events/fulfillment.py index 2d4e504..5ff1f89 100644 --- a/connect_ext_datalake/services/events/fulfillment.py +++ b/connect_ext_datalake/services/events/fulfillment.py @@ -1,6 +1,5 @@ from connect.client import ClientError, R from connect.eaas.core.decorators import event, schedulable, variables -from connect.eaas.core.extension import EventsApplicationBase from connect.eaas.core.responses import BackgroundResponse, ScheduledExecutionResponse from connect_ext_datalake.services.client import GooglePubsubClient @@ -65,6 +64,7 @@ def handle_asset_suspend_request_processing(self, ff_request): 'scheduled', 'revoking', 'revoked', + 'tiers_setup', ], ) def handle_asset_adjustment_request_processing(self, ff_request): @@ -105,6 +105,7 @@ def handle_tier_account_update_request_processing(self, ff_request): 'scheduled', 'revoking', 'revoked', + 'tiers_setup', ], ) def handle_asset_purchase_request_processing(self, ff_request): @@ -120,6 +121,7 @@ def handle_asset_purchase_request_processing(self, ff_request): 'scheduled', 'revoking', 'revoked', + 'tiers_setup', ], ) def handle_asset_change_request_processing(self, ff_request): @@ -154,28 +156,35 @@ class FulfillmentTasksMixin: 'Publish FF Requests to Xvantage Goggle PubSub Topic. "asset_ids" and "installation_id" ' 'should be specified', ) - def publish_ff_requests(self, schedule): + def publish_ff_requests( + self, + schedule, + ): self.logger.info( - f"Start of execution for schedule {schedule['id']} " f'for publishing all ff requests' + f"Start of execution for schedule {schedule['id']} " f'for publishing ff requests' ) installation_id = schedule['parameter'].get('installation_id') asset_ids = schedule['parameter'].get('asset_ids') - if installation_id is None or asset_ids is None: - return ScheduledExecutionResponse.fail( - 'Installation_id and array of asset ids should be specified' - ) + statuses = schedule['parameter'].get('statuses') + if installation_id is None: + return ScheduledExecutionResponse.fail('Installation_id should be specified') - try: - installation_client = self.get_installation_admin_client(installation_id) - installation_client.resourceset_append = False - installation = installation_client('devops').installations[installation_id].get() + client = self.get_installation_admin_client(installation_id) + client.resourceset_append = False + installation = client('devops').installations[installation_id].get() + try: ff_request_qs = ( - installation_client.requests.all() + client.requests.all() .order_by('created') .limit(int(self.config['FF_REQUEST_PAGE_SIZE'])) - .filter(R().asset__id.in_(asset_ids)) - .filter( + ) + if asset_ids: + ff_request_qs = ff_request_qs.filter(R().asset__id.in_(asset_ids)) + if statuses: + ff_request_qs = ff_request_qs.filter(R().status.in_(statuses)) + else: + ff_request_qs = ff_request_qs.filter( R().status.in_( [ 'tiers_setup', @@ -190,7 +199,6 @@ def publish_ff_requests(self, schedule): ] ) ) - ) self.logger.info(f'Total number of FF requests is: {ff_request_qs.count()}') counter = 1 @@ -199,13 +207,10 @@ def publish_ff_requests(self, schedule): hub_id = ff_request['asset']['connection']['hub']['id'] setting = get_settings(installation, hub_id) if setting: + validate_hub_cd(setting.hub.hub_cd, hub_id) pubsub_client = GooglePubsubClient(setting) self.logger.info(f'Processing FF request in {counter} position') - publish_ff_request( - pubsub_client, - ff_request, - self.logger, - ) + publish_ff_request(pubsub_client, ff_request, self.logger, setting.hub.hub_cd) else: self.logger.info( f"Publish of FF request {ff_request['id']} is not processed" diff --git a/connect_ext_datalake/services/events/tier_config.py b/connect_ext_datalake/services/events/tier_config.py index ee0e440..4280cd4 100644 --- a/connect_ext_datalake/services/events/tier_config.py +++ b/connect_ext_datalake/services/events/tier_config.py @@ -152,10 +152,9 @@ def publish_tier_config_requests(self, schedule): installation_id = schedule['parameter'].get('installation_id') asset_ids = schedule['parameter'].get('asset_ids') - if installation_id is None or asset_ids is None: - return ScheduledExecutionResponse.fail( - 'Installation_id and array of asset ids should be specified' - ) + asset_statuses = schedule['parameter'].get('asset_statuses') + if installation_id is None: + return ScheduledExecutionResponse.fail('Installation_id should be specified') installation_client = self.get_installation_admin_client(installation_id) installation_client.resourceset_append = False installation = installation_client('devops').installations[installation_id].get() @@ -164,7 +163,6 @@ def publish_tier_config_requests(self, schedule): installation_client('subscriptions') .assets.all() .order_by('events.created.at') - .filter(R().id.in_(asset_ids)) .select( '-items', '-params', @@ -174,6 +172,12 @@ def publish_tier_config_requests(self, schedule): '-contract', ) ) + if asset_ids: + assets = assets.filter(R().id.in_(asset_ids)) + if asset_statuses: + assets = assets.filter(R().status.in_(asset_statuses)) + + self.logger.info(f'Total number of assets is: {assets.count()}') unique_product_tier_accounts = set() for request in assets: tier1 = request['tiers'].get('tier1') @@ -186,7 +190,7 @@ def publish_tier_config_requests(self, schedule): f'Number of unique pairs of TA-ProductID: {len(unique_product_tier_accounts)}' ) - counter = 0 + counter = 1 for product_id, tier_account_id in unique_product_tier_accounts: tier_config_requests = ( installation_client('tier') @@ -203,12 +207,9 @@ def publish_tier_config_requests(self, schedule): hub_id = tcr['configuration']['connection']['hub']['id'] setting = get_settings(installation, hub_id) if setting: + validate_hub_cd(setting.hub.hub_cd, hub_id) pubsub_client = GooglePubsubClient(setting) - publish_tcr( - pubsub_client, - tcr, - self.logger, - ) + publish_tcr(pubsub_client, tcr, self.logger, setting.hub.hub_cd) else: self.logger.info( f"Publish of TCR {tcr['id']} is not processed"