Skip to content

Commit

Permalink
Merge pull request #287 from Build-Squad/mudit/eng-230-handle-rate-li…
Browse files Browse the repository at this point in the history
…mits-for-order-item-metrics-api

Update Twitter tasks to handle rate limit
  • Loading branch information
varsha1305nav authored Apr 1, 2024
2 parents 9155661 + e41b9cd commit d1c11af
Showing 1 changed file with 102 additions and 36 deletions.
138 changes: 102 additions & 36 deletions src/api/marketplace/orders/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from pyxfluencer import validate_escrow_to_cancel, validate_escrow_to_delivered
from pyxfluencer.utils import get_local_keypair_pubkey

import time

logger = logging.getLogger(__name__)

"""
Expand Down Expand Up @@ -287,7 +289,8 @@ def twitter_task(order_item_id):
consumer_key=CONSUMER_KEY,
consumer_secret=CONSUMER_SECRET,
access_token=ACCESS_TOKEN,
access_token_secret=ACCESS_SECRET
access_token_secret=ACCESS_SECRET,
wait_on_rate_limit=True
)
try:
service_type = order_item.service_master.twitter_service_type
Expand Down Expand Up @@ -465,7 +468,8 @@ def is_post_published(order_item_id) -> bool:
consumer_key=CONSUMER_KEY,
consumer_secret=CONSUMER_SECRET,
access_token=ACCESS_TOKEN,
access_token_secret=ACCESS_SECRET
access_token_secret=ACCESS_SECRET,
wait_on_rate_limit=True
)
res = client.get_tweet(
id=order_item.published_tweet_id, user_auth=False)
Expand All @@ -491,7 +495,8 @@ def is_post_liked(order_item_id) -> bool:
consumer_key=CONSUMER_KEY,
consumer_secret=CONSUMER_SECRET,
access_token=ACCESS_TOKEN,
access_token_secret=ACCESS_SECRET
access_token_secret=ACCESS_SECRET,
wait_on_rate_limit=True
)
res = client.get_liking_users(
id=order_item.published_tweet_id, user_auth=False)
Expand Down Expand Up @@ -528,55 +533,116 @@ def validate_order_item(order_item_id):
raise Exception(str(e))


@celery_app.task(base=QueueOnce, once={'graceful': True})
def store_order_item_metrics():
    """Fetch tweet metrics for published order items and store them.

    Published order items whose service type is tweet, reply_to_tweet,
    quote_tweet, poll or thread are considered. Only items published exactly
    1, 2, 3, 7, 14, 21 or 28 days ago are processed. The selected items are
    grouped by the influencer's Twitter account so that each account's tweets
    are looked up with that account's access token, in batches of at most 100
    ids per ``get_tweets`` request. Every metric returned for a tweet is saved
    as one ``OrderItemMetric`` row tagged with the metric group it came from.

    A failure while handling one batch is logged and the task moves on to the
    next batch. The task returns nothing.

    NOTE(review): the task is registered with ``base=QueueOnce`` and
    ``once={'graceful': True}``; presumably this prevents overlapping runs
    without raising -- confirm against the celery-once configuration.
    """
    metric_types = ('public_metrics', 'organic_metrics', 'non_public_metrics')

    # Get the current date
    now = timezone.now()
    # Get all order items that are in published status and service type is not retweet or like
    order_items = OrderItem.objects.filter(
        status='published',
        service_master__twitter_service_type__in=['tweet', 'reply_to_tweet', 'quote_tweet', 'poll', 'thread']
    )
    logger.info(f'Found {len(order_items)} order items')

    # Group order items by influencer
    influencers_order_items = {}
    for order_item in order_items:
        # Calculate the number of days since the order item was published
        days_since_published = (now - order_item.publish_date).days

        # Check if the current day is the 1st, 2nd, 3rd, 7th, 14th, 21st, or 28th day since the order item was published
        if days_since_published in [1, 2, 3, 7, 14, 21, 28]:
            twitter_account_id = order_item.package.influencer.twitter_account.id
            influencers_order_items.setdefault(
                twitter_account_id, []).append(order_item)

    logger.info(
        f'Grouped order items by {len(influencers_order_items)} influencers')

    for influencer_id, influencer_order_items in influencers_order_items.items():
        logger.info(f'Processing influencer id {influencer_id}')
        # Get the twitter account of the influencer
        twitter_account = TwitterAccount.objects.get(id=influencer_id)
        client = Client(bearer_token=twitter_account.access_token,
                        consumer_key=CONSUMER_KEY,
                        consumer_secret=CONSUMER_SECRET,
                        access_token=ACCESS_TOKEN,
                        access_token_secret=ACCESS_SECRET,
                        wait_on_rate_limit=True
                        )

        # Index the order items by tweet id once instead of scanning the list
        # for every tweet. setdefault keeps the first item for a given tweet id.
        items_by_tweet_id = {}
        for influencer_order_item in influencer_order_items:
            items_by_tweet_id.setdefault(
                str(influencer_order_item.published_tweet_id), influencer_order_item)

        # Collect all tweet ids for this influencer
        tweet_ids = [
            order_item.published_tweet_id for order_item in influencer_order_items]
        logger.info(
            f'Collected {len(tweet_ids)} tweet ids for influencer id {influencer_id}')

        # Split tweet_ids into chunks of 100
        tweet_ids_chunks = [tweet_ids[i:i + 100]
                            for i in range(0, len(tweet_ids), 100)]

        for tweet_ids_chunk in tweet_ids_chunks:
            logger.info(
                f'Processing chunk of {len(tweet_ids_chunk)} tweet ids')
            try:
                # Use get_tweets() to get metrics of all tweets for this influencer
                res = client.get_tweets(
                    ids=tweet_ids_chunk, user_auth=False, tweet_fields=TWEET_FIELDS)

                # Log the response object
                logger.info(f'Got response object: {res.data}')

                # data is None when none of the requested tweets could be
                # returned, so fall back to an empty list before len()/iteration.
                tweets = res.data or []
                logger.info(
                    f'Got metrics for {len(tweets)} tweets for influencer id {influencer_id}')

                # Per-tweet failures are reported on the response's `errors`
                # attribute, not under an 'errors' key.
                for error in res.errors or []:
                    logger.error('Error in getting tweet metrics for tweet id: %s : %s',
                                 error.get('resource_id'), error.get('detail'))

                for data in tweets:
                    logger.info(f'Processing tweet id {data["id"]}')

                    order_item = items_by_tweet_id.get(str(data['id']))
                    # Check for a missing match before touching order_item.id.
                    if order_item is None:
                        logger.info(
                            f'No order item found for tweet id {data["id"]}')
                        continue

                    logger.info(
                        f'Found order item with order item id {order_item.id}')

                    # For all the data fields, create an OrderItemMetric object
                    order_item_metrics = []
                    for metric_type in metric_types:
                        # A metric group the API did not return comes back as
                        # None; skip it instead of failing the whole chunk.
                        metrics = data[metric_type] or {}
                        for key, value in metrics.items():
                            order_item_metrics.append(
                                OrderItemMetric(order_item=order_item, metric=key, value=value, type=metric_type))

                    OrderItemMetric.objects.bulk_create(order_item_metrics)
                    logger.info(
                        f'Created {len(order_item_metrics)} OrderItemMetric objects')

            except Exception as e:
                # Best-effort: log with traceback and move on to the next chunk.
                logger.exception(
                    'Error in getting tweet metrics for influencer id: %s : %s', influencer_id, str(e))
                continue

            # Sleep for 1 minute after each request (announce it before blocking)
            logger.info('Sleeping for 1 minute')
            time.sleep(60)

0 comments on commit d1c11af

Please sign in to comment.