Skip to content

Commit

Permalink
[LITE-30645] - updated task for sync ff requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Sainomori committed Aug 20, 2024
1 parent c5332c1 commit e628d72
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
46 changes: 28 additions & 18 deletions connect_ext_datalake/services/events/fulfillment.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from connect.client import ClientError, R
from connect.client import ClientError, ConnectClient, R
from connect.eaas.core.decorators import event, schedulable, variables
from connect.eaas.core.extension import EventsApplicationBase
from connect.eaas.core.inject.synchronous import get_installation, get_installation_client
from connect.eaas.core.responses import BackgroundResponse, ScheduledExecutionResponse
from fastapi import Depends

from connect_ext_datalake.services.client import GooglePubsubClient
from connect_ext_datalake.services.publish import publish_ff_request
Expand Down Expand Up @@ -65,6 +67,7 @@ def handle_asset_suspend_request_processing(self, ff_request):
'scheduled',
'revoking',
'revoked',
'tiers_setup',
],
)
def handle_asset_adjustment_request_processing(self, ff_request):
Expand Down Expand Up @@ -105,6 +108,7 @@ def handle_tier_account_update_request_processing(self, ff_request):
'scheduled',
'revoking',
'revoked',
'tiers_setup',
],
)
def handle_asset_purchase_request_processing(self, ff_request):
Expand All @@ -120,6 +124,7 @@ def handle_asset_purchase_request_processing(self, ff_request):
'scheduled',
'revoking',
'revoked',
'tiers_setup',
],
)
def handle_asset_change_request_processing(self, ff_request):
Expand Down Expand Up @@ -154,28 +159,37 @@ class FulfillmentTasksMixin:
'Publish FF Requests to Xvantage Goggle PubSub Topic. "asset_ids" and "installation_id" '
'should be specified',
)
def publish_ff_requests(self, schedule):
def publish_ff_requests(
self,
schedule,
):
self.logger.info(
f"Start of execution for schedule {schedule['id']} " f'for publishing all ff requests'
f"Start of execution for schedule {schedule['id']} " f'for publishing ff requests'
)
installation_id = schedule['parameter'].get('installation_id')
asset_ids = schedule['parameter'].get('asset_ids')
if installation_id is None or asset_ids is None:
statuses = schedule['parameter'].get('statuses')
if installation_id is None:
return ScheduledExecutionResponse.fail(
'Installation_id and array of asset ids should be specified'
'Installation_id should be specified'
)

try:
installation_client = self.get_installation_admin_client(installation_id)
installation_client.resourceset_append = False
installation = installation_client('devops').installations[installation_id].get()
client = self.get_installation_admin_client(installation_id)
client.resourceset_append = False
installation = client('devops').installations[installation_id].get()

try:
ff_request_qs = (
installation_client.requests.all()
client.requests.all()
.order_by('created')
.limit(int(self.config['FF_REQUEST_PAGE_SIZE']))
.filter(R().asset__id.in_(asset_ids))
.filter(
)
if asset_ids:
ff_request_qs = ff_request_qs.filter(R().asset__id.in_(asset_ids))
if statuses:
ff_request_qs = ff_request_qs.filter(R().status.in_(statuses))
else:
ff_request_qs = ff_request_qs.filter(
R().status.in_(
[
'tiers_setup',
Expand All @@ -190,7 +204,6 @@ def publish_ff_requests(self, schedule):
]
)
)
)

self.logger.info(f'Total number of FF requests is: {ff_request_qs.count()}')
counter = 1
Expand All @@ -199,13 +212,10 @@ def publish_ff_requests(self, schedule):
hub_id = ff_request['asset']['connection']['hub']['id']
setting = get_settings(installation, hub_id)
if setting:
validate_hub_cd(setting.hub.hub_cd, hub_id)
pubsub_client = GooglePubsubClient(setting)
self.logger.info(f'Processing FF request in {counter} position')
publish_ff_request(
pubsub_client,
ff_request,
self.logger,
)
publish_ff_request(pubsub_client, ff_request, self.logger, setting.hub.hub_cd)
else:
self.logger.info(
f"Publish of FF request {ff_request['id']} is not processed"
Expand Down
23 changes: 12 additions & 11 deletions connect_ext_datalake/services/events/tier_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,9 @@ def publish_tier_config_requests(self, schedule):

installation_id = schedule['parameter'].get('installation_id')
asset_ids = schedule['parameter'].get('asset_ids')
if installation_id is None or asset_ids is None:
return ScheduledExecutionResponse.fail(
'Installation_id and array of asset ids should be specified'
)
asset_statuses = schedule['parameter'].get('asset_statuses')
if installation_id is None:
return ScheduledExecutionResponse.fail('Installation_id should be specified')
installation_client = self.get_installation_admin_client(installation_id)
installation_client.resourceset_append = False
installation = installation_client('devops').installations[installation_id].get()
Expand All @@ -164,7 +163,6 @@ def publish_tier_config_requests(self, schedule):
installation_client('subscriptions')
.assets.all()
.order_by('events.created.at')
.filter(R().id.in_(asset_ids))
.select(
'-items',
'-params',
Expand All @@ -174,6 +172,12 @@ def publish_tier_config_requests(self, schedule):
'-contract',
)
)
if asset_ids:
assets = assets.filter(R().id.in_(asset_ids))
if asset_statuses:
assets = assets.filter(R().status.in_(asset_statuses))

self.logger.info(f'Total number of assets is: {assets.count()}')
unique_product_tier_accounts = set()
for request in assets:
tier1 = request['tiers'].get('tier1')
Expand All @@ -186,7 +190,7 @@ def publish_tier_config_requests(self, schedule):
f'Number of unique pairs of TA-ProductID: {len(unique_product_tier_accounts)}'
)

counter = 0
counter = 1
for product_id, tier_account_id in unique_product_tier_accounts:
tier_config_requests = (
installation_client('tier')
Expand All @@ -203,12 +207,9 @@ def publish_tier_config_requests(self, schedule):
hub_id = tcr['configuration']['connection']['hub']['id']
setting = get_settings(installation, hub_id)
if setting:
validate_hub_cd(setting.hub.hub_cd, hub_id)
pubsub_client = GooglePubsubClient(setting)
publish_tcr(
pubsub_client,
tcr,
self.logger,
)
publish_tcr(pubsub_client, tcr, self.logger, setting.hub.hub_cd)
else:
self.logger.info(
f"Publish of TCR {tcr['id']} is not processed"
Expand Down

0 comments on commit e628d72

Please sign in to comment.