diff --git a/src/api/marketplace/orders/tasks.py b/src/api/marketplace/orders/tasks.py index 562aee59..dec33eb0 100644 --- a/src/api/marketplace/orders/tasks.py +++ b/src/api/marketplace/orders/tasks.py @@ -20,6 +20,8 @@ from pyxfluencer import validate_escrow_to_cancel, validate_escrow_to_delivered from pyxfluencer.utils import get_local_keypair_pubkey +import time + logger = logging.getLogger(__name__) """ @@ -287,7 +289,8 @@ def twitter_task(order_item_id): consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, access_token=ACCESS_TOKEN, - access_token_secret=ACCESS_SECRET + access_token_secret=ACCESS_SECRET, + wait_on_rate_limit=True ) try: service_type = order_item.service_master.twitter_service_type @@ -465,7 +468,8 @@ def is_post_published(order_item_id) -> bool: consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, access_token=ACCESS_TOKEN, - access_token_secret=ACCESS_SECRET + access_token_secret=ACCESS_SECRET, + wait_on_rate_limit=True ) res = client.get_tweet( id=order_item.published_tweet_id, user_auth=False) @@ -491,7 +495,8 @@ def is_post_liked(order_item_id) -> bool: consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, access_token=ACCESS_TOKEN, - access_token_secret=ACCESS_SECRET + access_token_secret=ACCESS_SECRET, + wait_on_rate_limit=True ) res = client.get_liking_users( id=order_item.published_tweet_id, user_auth=False) @@ -528,55 +533,116 @@ def validate_order_item(order_item_id): raise Exception(str(e)) -@celery_app.task() +@celery_app.task(base=QueueOnce, once={'graceful': True}) def store_order_item_metrics(): - # Get the current date now = timezone.now() - # Get all order items that are in published status and verified and service type is not retweet or like + # Get all order items that are in published status and service type is not retweet or like order_items = OrderItem.objects.filter( status='published', service_master__twitter_service_type__in=['tweet', 'reply_to_tweet', 'quote_tweet', 'poll', 'thread'] ) + logger.info(f'Found {len(order_items)} order items') + + # Group order items by influencer + influencers_order_items = {} for order_item in order_items: - # Calculate the number of days since the order item was published days_since_published = (now - order_item.publish_date).days - - # Check if the current day is the 1st, 2nd, 3rd, 7th, 14th, 21st, or 28th day since the order item was published if days_since_published in [1, 2, 3, 7, 14, 21, 28]: - # Get the twitter account of the influencer - twitter_account = TwitterAccount.objects.get( - id=order_item.package.influencer.twitter_account.id) - client = Client(bearer_token=twitter_account.access_token, - consumer_key=CONSUMER_KEY, - consumer_secret=CONSUMER_SECRET, - access_token=ACCESS_TOKEN, - access_token_secret=ACCESS_SECRET - ) + if order_item.package.influencer.twitter_account.id not in influencers_order_items: + influencers_order_items[order_item.package.influencer.twitter_account.id] = [ + ] + influencers_order_items[order_item.package.influencer.twitter_account.id].append( + order_item) + + logger.info( + f'Grouped order items by {len(influencers_order_items)} influencers') + + for influencer_id, influencer_order_items in influencers_order_items.items(): + logger.info(f'Processing influencer id {influencer_id}') + # Get the twitter account of the influencer + twitter_account = TwitterAccount.objects.get(id=influencer_id) + client = Client(bearer_token=twitter_account.access_token, + consumer_key=CONSUMER_KEY, + consumer_secret=CONSUMER_SECRET, + access_token=ACCESS_TOKEN, + access_token_secret=ACCESS_SECRET, + wait_on_rate_limit=True + ) + + # Collect all tweet ids for this influencer + tweet_ids = [ + order_item.published_tweet_id for order_item in influencer_order_items] + logger.info( + f'Collected {len(tweet_ids)} tweet ids for influencer id {influencer_id}') + + # Split tweet_ids into chunks of 100 + tweet_ids_chunks = [tweet_ids[i:i + 100] + for i in range(0, len(tweet_ids), 100)] + + for tweet_ids_chunk in tweet_ids_chunks: + logger.info( + f'Processing chunk of {len(tweet_ids_chunk)} tweet ids') try: - res = client.get_tweet( - id=order_item.published_tweet_id, user_auth=False, tweet_fields=TWEET_FIELDS) + # Use get_tweets() to get metrics of all tweets for this influencer + res = client.get_tweets( + ids=tweet_ids_chunk, user_auth=False, tweet_fields=TWEET_FIELDS) + + # Log the response object + logger.info(f'Got response object: {res.data}') + + logger.info( + f'Got metrics for {len(res.data)} tweets for influencer id {influencer_id}') + + # Check for errors in the response + if 'errors' in res: + for error in res['errors']: + logger.error('Error in getting tweet metrics for tweet id: %s : %s', + error['resource_id'], error['detail']) - # For all the res.data fields, create a OrderItemMetric object - public_metrics = res.data['public_metrics'] - organic_metrics = res.data['organic_metrics'] - non_public_metrics = res.data['non_public_metrics'] + for data in res.data: - order_item_metrics = [] + matching_items = [ + item for item in influencer_order_items if str(item.published_tweet_id) == str(data['id'])] + order_item = matching_items[0] if matching_items else None - for key, value in public_metrics.items(): - order_item_metrics.append( - OrderItemMetric(order_item=order_item, metric=key, value=value, type='public_metrics')) + logger.info(f'Processing tweet id {data["id"]}') - for key, value in organic_metrics.items(): - order_item_metrics.append( - OrderItemMetric(order_item=order_item, metric=key, value=value, type='organic_metrics')) + logger.info( + f'Found order item with order item id {order_item.id}') - for key, value in non_public_metrics.items(): - order_item_metrics.append( - OrderItemMetric(order_item=order_item, metric=key, value=value, type='non_public_metrics')) + if order_item is None: + logger.info( + f'No order item found for tweet id {data["id"]}') + continue - OrderItemMetric.objects.bulk_create(order_item_metrics) + # For all the data fields, create a OrderItemMetric object + public_metrics = data['public_metrics'] + organic_metrics = data['organic_metrics'] + non_public_metrics = data['non_public_metrics'] + + order_item_metrics = [] + + for key, value in public_metrics.items(): + order_item_metrics.append( + OrderItemMetric(order_item=order_item, metric=key, value=value, type='public_metrics')) + + for key, value in organic_metrics.items(): + order_item_metrics.append( + OrderItemMetric(order_item=order_item, metric=key, value=value, type='organic_metrics')) + + for key, value in non_public_metrics.items(): + order_item_metrics.append( + OrderItemMetric(order_item=order_item, metric=key, value=value, type='non_public_metrics')) + + OrderItemMetric.objects.bulk_create(order_item_metrics) + logger.info( + f'Created {len(order_item_metrics)} OrderItemMetric objects') except Exception as e: - logger.error('Error in getting tweet metrics: %s', str(e)) + logger.error( + 'Error in getting tweet metrics for influencer id: %s : %s', influencer_id, str(e)) continue + + # Sleep for 1 minute after each request + time.sleep(60) + logger.info('Sleeping for 1 minute')