-
Notifications
You must be signed in to change notification settings - Fork 0
/
spotify_daily_job.py
44 lines (38 loc) · 1.41 KB
/
spotify_daily_job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from os import environ
from io import BytesIO
from SpotifyAPI import recently_played_df
from base64 import b64decode
import datetime
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
from json import loads
# The service-account key arrives as base64-encoded JSON in the
# SPOTIFY_RECENTLY_PLAYED environment variable. Index ``environ`` directly
# (rather than ``.get``) so that a missing variable fails immediately with a
# KeyError naming it, instead of an opaque TypeError from ``b64decode(None)``.
gcp_cred_file = environ["SPOTIFY_RECENTLY_PLAYED"]
gcp_cred_file = b64decode(gcp_cred_file)
gcp_cred_file = loads(gcp_cred_file.decode())

# GCS configuration: build credentials from the decoded service-account info
# and create a client for the Cloud Storage JSON API (v1).
creds = service_account.Credentials.from_service_account_info(
    gcp_cred_file
)
# The bucket name is required too; fail here rather than at upload time,
# after the listening history has already been fetched.
bucket_name = environ["BUCKET_NAME"]
service = build("storage", "v1", credentials=creds)
# --- Keep only the tracks played today ---
# The date is rendered as YYYY-MM-DD; the same string later names the upload.
# NOTE(review): ``today()`` is naive local time, while the timezone of the
# "played_at" column is not visible from this script -- confirm they agree.
today_date = f"{datetime.datetime.today():%Y-%m-%d}"
recently_played = recently_played_df()
print(f"\n [!] Filtering out Today's Data {today_date}.")

# Compare each row's play date, formatted the same way, to today's date.
played_on = recently_played["played_at"].dt.strftime("%Y-%m-%d")
is_today = played_on == today_date
recently_played = recently_played[is_today]
# --- End of filtering ---
# Object name inside the bucket: recently_played/<YYYY-MM-DD>.parquet
blob_name = f"{today_date}.parquet"
destination_blob_name = f"recently_played/{blob_name}"

# Serialise the filtered frame to parquet in memory (no temporary file).
stream = BytesIO()
recently_played.to_parquet(stream, index=False)
# Rewind to the start so the upload reads the bytes that were just written.
stream.seek(0)

# Wrap the buffer as an upload body and insert it as an object in the bucket.
media = MediaIoBaseUpload(stream, mimetype="application/octet-stream")
req = service.objects().insert(
    bucket=bucket_name,
    name=destination_blob_name,
    media_body=media,
)
resp = req.execute()