- Set up GCP account through the CLI and made a new project in Google Cloud.
- Installed Docker and Docker Compose.
- Prepared a working environment for the course
- Successfully created a few trial containers.
- Made Dockerfile
- Runs Python 3.1
- Installs pandas
- ENTRYPOINT set to ['python', 'pipeline.py']
- Created pipeline (pipeline.py)
- imports sys and pandas
- Successfully built and tested a container that takes an argument and gives an expected result! (sketch below)
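- For reference, a minimal sketch of what that pipeline.py boils down to (the argument handling and output here are my reconstruction, not the exact course file):

    # pipeline.py - minimal sketch (reconstructed, not the exact course file)
    import sys

    import pandas as pd

    # The container is run with one argument, e.g. `docker run test:pandas 2024-01-15`
    day = sys.argv[1]

    # Stand-in for real work: build a tiny dataframe just to prove pandas is available
    df = pd.DataFrame({'day': [day], 'rows_processed': [0]})
    print(df)

    print(f'job finished successfully for day = {day}')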
- Basic workflow notes
- Pull docker image:
docker pull <image_name>
- Build image:
docker build -t <image_name> .
- Run container:
docker run -it <image_name>
- Worked on ingesting data into the Docker container
- Struggled with port configuration. Couldn't get my data to where it needed to be. The issue was that my local installation of pgAdmin was already running on the default port. I'll handle it tomorrow.
- Was able to successfully create postgres container.
- Explored container with pgcli
- Used sqlalchemy inside of a jupyter notebook to successfully ingest data into the postgres container. Verified results with pgcli. (Rough sketch below.)
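- A hedged sketch of that notebook ingestion step (connection string, file, and table names are assumptions):

    # Sketch of the notebook ingestion: pandas + sqlalchemy into the postgres container
    # (connection string, file name, and table name are assumptions)
    import pandas as pd
    from sqlalchemy import create_engine

    # Postgres running in the container, mapped to localhost:5432
    engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

    df = pd.read_parquet('yellow_tripdata_2021-01.parquet')

    # Create the table schema first, then append the rows in chunks
    df.head(0).to_sql('yellow_taxi_data', con=engine, if_exists='replace')

    chunk_size = 100_000
    for start in range(0, len(df), chunk_size):
        df.iloc[start:start + chunk_size].to_sql('yellow_taxi_data', con=engine, if_exists='append')
        print(f'inserted rows {start} to {min(start + chunk_size, len(df))}')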
- Created pgAdmin container and accessed it through localhost
- Attempted to set up a docker network for both pgadmin and postgres. Kept running into issues.
- Found that the source of the errors was whitespace in the commands being pasted into the terminal.
⚠️ CHECK YOUR BASH COMMANDS IN EDITOR FOR WHITESPACE BEFORE PASTING INTO TERMINAL
- Accidentally deleted the postgres container with the ingested data. Didn't take too long to recreate, but it would have sucked if that happened with a larger database. Not too much trouble to run the notebook again. Since I'm using parquet files, it makes me kind of nervous not to be able to see the progress from within the notebook... I'll probably need a better monitoring strategy in the future.
⚠️ MAKE SURE YOU SHUT YOUR CONTAINERS DOWN PROPERLY OR SUFFER THE CONSEQUENCES!!!
- I was able to get the containers networked and talking to each other. I'm so stoked I figured it out!
- Converted ipynb to a Python script using `jupyter nbconvert` to make a 'po-boy' ingestion script.
- Used the argparse library to parse arguments to containers.
- This is referred to as a top-level code environment and requires a main block, `if __name__ == '__main__':`. I don't exactly know the broader context of why it's needed here, but the instructor said it was needed for things we want to run as scripts. (Rough sketch below.)
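- A sketch of the pattern (flag names are placeholders, not my exact script):

    # Sketch of the argparse + main-block pattern for the ingestion script
    # (flag names are placeholders)
    import argparse


    def main(params):
        print(f'connecting to {params.host}:{params.port}, table {params.table_name}')
        # ... ingestion logic goes here ...


    if __name__ == '__main__':
        # Runs only when the file is executed as a script (python ingest_data.py ...),
        # not when it is imported as a module; that is why it is needed for scripts.
        parser = argparse.ArgumentParser(description='Ingest parquet data into Postgres')
        parser.add_argument('--host', required=True)
        parser.add_argument('--port', required=True)
        parser.add_argument('--table_name', required=True)
        main(parser.parse_args())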
- Dropped taxi data from container to test script
- I was running the script blind and it was erroring out. Went down a rabbit hole on error handling.
- OMG IT WORKED!!!!! I successfully ingested data to my containers with a python script! It ain't much, but it's mine!
- Fixed the Python script to download data to the intended local directory.
- Continued to run into confusion about local network vs. docker network. I got it ironed out.
- I was able to create a container that:
- connected to the docker network containing the postgres and pgadmin containers
- installed dependencies to run the Python ingestion script
- programmatically downloaded the parquet file and ingested the data into the networked postgres database
- confirmed success with the pgAdmin container
- Started working on a docker-compose.yml file to spin up the containers programmatically.
- YAY!!! docker compose works!
- Created jupyter notebook to inspect and ingest NY taxi zone information to postgres container
- Practiced some basic queries on database.
- Began setting up Terraform with GCP
- Created a service account and generated keys
- Authenticated application credentials with the SDK using OAuth
- IAM roles enabled:
- Viewer
- Storage Object Admin
- Storage Admin
- BigQuery Admin
- Enabled IAM Service Account Credentials API
- Installed Terraform
⚠️ BE VERY SURE OF WHAT YOU'RE DEPLOYING BEFORE YOU DEPLOY IT
- Created terraform directory and required files
- .terraform-version
- specifies what version of terraform to use
- main.tf
- defines the configuration of resources
- README.md
- variables.tf
- stores the variables that get used in main.tf
- these are passed at runtime
- Started to work with the terraform files.
- Edited main.tf and variables.tf to work in concert.
- Created and then destroyed bucket on gcp. Great success!
- terraform init
- terraform plan
- terraform apply
- terraform destroy
- Made sure to add appropriate entries for .gitignore
- Generated ssh keys and imported public key to GCP admin.
- SSH'd into the VM and ran htop to confirm the machine was working!
- Used vim for editing readme because I'm a glutton for punishment.
- Installed anaconda on vm
- Installed docker on vm
- created ssh config file to ssh into vm locally
- Installed SSH extension for vscode
- Added user to docker group on vm to run without root
- Downloaded docker-compose, made it executable, and added it to the PATH variable
chmod +x docker-compose
- generated ssh keys for github and cloned personal repo to vm
- installed devcontainer for class repo
- forwarded ports for jupyter and pgadmin
- installed terraform binary from website (wget, unzip, rm)
- transferred local gcp credentials to VM in .gcp directory using sftp and put
- set the GOOGLE_APPLICATION_CREDENTIALS environment variable
- Activated service account credentials
- Tested terraform (plan/apply) on vm.
- Set up global user name and email in gitconfig
- Pushed changes to github from vm.
- NOTE: There is still a little wonkiness with port forwarding. I'll need to iron that out later.
- Successfully modified all previously created code to run natively in my container.
- Finished and submitted my assessment for the first module.
- Mage and postgres will run in a docker image
- Architecture
- Extract
- Pull data from a source (API-NYC taxi data set)
- Transform
- Data cleaning, transformation, and partitioning.
- Load
- API to Mage, Mage to Postgres, GCS, BigQuery
- Orchestration is the process of dependency management facilitated through automation. The idea is to minimize manual work.
- The data orchestrator manages scheduling, triggering, monitoring, and resource allocation.
- Every workflow requires sequential steps. Poorly sequenced transformations mess up your data.
- Mage is an open-source ETL tool
- projects
- pipelines
- blocks (sensors, conditionals, dynamics, webhooks, etc)
- load
- transform
- export
- Engineering best practices
- In-line testing and debugging
- Fully-featured observability
- Transformations in one place
- DRY principles (Don't Repeat Yourself)
- Core components of Mage
- Projects
- Pipelines (called DAGs in other products)
- Blocks (bits of code that are run in your pipelines)
- Anatomy of a block
- Imports
- Decorator
- Function (must return a dataframe)
- Assertion
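- A hedged sketch of that anatomy for a data loader block (the import guard mirrors how Mage generates blocks; the URL is a placeholder):

    # Anatomy of a Mage block: imports, decorator, function, assertion
    import pandas as pd

    if 'data_loader' not in globals():
        from mage_ai.data_preparation.decorators import data_loader
    if 'test' not in globals():
        from mage_ai.data_preparation.decorators import test


    @data_loader
    def load_data(*args, **kwargs):
        # Placeholder URL; swap in the real source
        url = 'https://example.com/data.csv'
        return pd.read_csv(url)  # the function must return a dataframe


    @test
    def test_output(output, *args) -> None:
        # Assertion: fail the run if the loader returned nothing
        assert output is not None, 'The output is undefined'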
- Pulled repo
- Ran `cp dev.env .env`, which copies the dev environment file to .env to keep you from pushing secrets.
- Ran `docker-compose up` to pull and start the containers for the exercise.
- Stink! I ran out of space on my VM. Need to shut it down and give it more space.
- Okay, I'm back up. Gave myself 100 GB.
- Woohoo! Ran first pipeline in Mage! Now I need to make my own!
- Edited io_config.yaml to add a dev profile that injects local .env variables into the docker container. You can do this both in VS Code and in Mage. The injection uses Jinja templating:
# Development pipeline
dev:
POSTGRES_CONNECT_TIMEOUT: 10
POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}"
POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}"
POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}"
POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}"
- Added a data loader to the pipeline to test the postgres connection. Great success!
- Worked on a new pipeline that pulls data, performs a light transformation, and writes it to postgres.
- Extract - Data Loader block `load_api_data`
- Created a function to download the taxi data and change the dtypes of the columns.
- We need to map out the datatypes for pandas when loading the .csv.
- We return the data (a dataframe loaded from the csv) because that's how frames get passed between blocks in Mage.
- Transform - Data Transformation block `transform_taxi_data`
- There is some wonkiness in the data (such as trips with 0 passengers).
- We handle this with some pre-processing in a transformer.
- Added a `@test` decorator.
- Load - Data Exporter block (Python, postgres) `taxi_data_to_postgres`
- Takes the data from the last block, opens a connection to postgres, exports the data to the indicated schema, and drops it in.
- WOOHOO, My first "local" ETL Pipeline!
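- A hedged sketch of the dtype handling in that loader (column list abbreviated; exact dtypes and the URL are my assumptions):

    # Sketch of the dtype mapping used when loading the taxi CSV with pandas
    import pandas as pd

    taxi_dtypes = {
        'VendorID': pd.Int64Dtype(),
        'passenger_count': pd.Int64Dtype(),
        'trip_distance': float,
        'RatecodeID': pd.Int64Dtype(),
        'payment_type': pd.Int64Dtype(),
        'fare_amount': float,
        'total_amount': float,
    }

    # Datetime columns get parsed separately rather than mapped in the dtype dict
    parse_dates = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

    url = 'https://example.com/yellow_tripdata_2021-01.csv.gz'  # illustrative URL
    df = pd.read_csv(url, sep=',', compression='gzip', dtype=taxi_dtypes, parse_dates=parse_dates)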
- Watched videos
- SIDE QUEST: I made a bash script to
- start the VM
- pass the IP to my config file
- mount a remote directory on my local machine
- shut down the VM when I'm finished working
Starting VM [VM_NAME]...
Starting instance(s) [VM_NAME]...done.
Updated [https://compute.googleapis.com/compute/v1/projects/[PROJECT_ID]/zones/[ZONE]/instances/[VM_NAME]].
Instance internal IP is [INTERNAL_IP]
Instance external IP is [EXTERNAL_IP]
Fetching VM's IP address...
VM IP address is [EXTERNAL_IP]
Waiting for VM to start and SSH to become ready...
Waiting for VM SSH to become ready...
Waiting for VM SSH to become ready...
Waiting for VM SSH to become ready...
Waiting for VM SSH to become ready...
VM is ready
Updating existing SSH config for [ALIAS]...
Mounting remote directory...
Successfully mounted [REMOTE_DIR] to [LOCAL_DIR]
Do you want to shut down the VM? (y/n)
- Create GCP Bucket
- Go to Google Cloud Storage
- Create a bucket `claytor-mage`
- Make sure "Enforce public access prevention on this bucket" is enabled
- **Create Service Account for `claytor-mage`**
- I set access to Owner. It's very permissive; I would need to be more restrictive in the future.
- Download the key as JSON
- Copy it into the Mage project
- Create Service Volume
- created a volume `claytor-mage`
- Authentication
- Edit io_config.yaml to add GCP credentials
- Delete all but these lines under `google`:
GOOGLE_SERVICE_ACC_KEY_FILEPATH: "/path/to/your/service/account/key.json"
GOOGLE_LOCATION: US # Optional
- BigQuery
- Manually added `titanic_clean.csv` to my bucket.
- Test Pipeline `big_query_test`
- Create a pipeline in Mage with a SQL data loader pointed at BigQuery.
- Include a simple SQL query to test the connection.
- Test GCP
- Edit pipeline (`big_query_test`) to remove the test SQL data loader block
- Make a new Python data loader
- Change `bucket_name = 'claytor-mage'` and `object_key = 'titanic_clean.csv'`
- Created new pipeline `api_to_gcs`
- Dragged the `load_api_data` loader that was previously completed into `api_to_gcs`
- Dragged the `transform_taxi_data` transformer that was previously completed into `api_to_gcs`
- Connected the two in the tree view
- Created a Python data exporter `taxi_to_gcs_parquet`
- Updated `bucket_name` and `object_key = 'ny_taxi_data.parquet'`.
- Executed all upstream blocks
- IT WORKED!
- It's not best practice to write to a single parquet file, especially if dealing with large amounts of data, so we will add another block to partition by date.
- Sometimes dates represent a "best practice" scenario for partitioning.
- Added a generic Python data exporter called `taxi_to_gcs_partitioned_parquet`.
- Removed its connection to the previous block and associated it directly with `transform_taxi_data` so that the two exporters execute in parallel.
- In this case, we have to define credentials manually and use the pyarrow library to partition the data.
- At the beginning of the .py script add:

    import pyarrow as pa
    import pyarrow.parquet as pq
    import os

- We need to tell pyarrow where our credentials are. This goes under the first `if` statement. The credentials are already set in Mage's config file, but we have to add them manually in this case. Go to the terminal and type `ls -la` to find our credentials (`/home/src/claytor-mage.json`).

    if 'data_exporter' not in globals():
        from mage_ai.data_preparation.decorators import data_exporter

    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/home/src/claytor-mage.json"

    bucket_name = 'claytor-mage'
    project_id = 'claytor-mage'
    table_name = 'nyc_taxi_data'

    root_path = f'{bucket_name}/{table_name}'


    @data_exporter
    def export_data(data, *args, **kwargs):
        # Gives us the date as a string that pyarrow can use
        data['tpep_pickup_date'] = data['tpep_pickup_datetime'].dt.date

        # Reads the data into a pyarrow table with pandas
        table = pa.Table.from_pandas(data)

        # Find the Google Cloud Storage file system object in pyarrow.
        # Authorizes using the environment variable automatically
        gcs = pa.fs.GcsFileSystem()

        # Use parquet's write_to_dataset method. It takes the following arguments:
        pq.write_to_dataset(
            # First argument is "table", which is a pyarrow table
            table,
            # Second argument is "root_path"
            root_path=root_path,
            # Third argument is "partition_cols", a list of the columns to partition on
            partition_cols=['tpep_pickup_date'],
            # The last argument is the file system, which is the gcs file system
            filesystem=gcs
        )
        # This breaks our data up by date and writes it to different parquet files. See results in the bucket.
        # This is how larger datasets are managed. It doesn't make sense to write a 2 GB dataset into one file;
        # it would be really slow to read and write.
- The next part of the workflow is to move the data from cloud storage to BigQuery (an OLAP database). This mirrors a traditional workflow in DE where you take data from a source, stage it, then write it to a database.
- Create a new pipeline
gcs_to_bigquery
- Add a Python data loader that points to GCS, `load_taxi_gcs`
- I'm going to use the single parquet file instead of the partitioned data for speed
- This loads our data from the indicated bucket and object key
- Add a Python transformer to standardize column names
- The `@transformer` block handles the column names as strings
- `.str.replace(' ', '_')` replaces any spaces with underscores
- `.str.lower()` ensures all characters are lower-case
- We are skipping the assertion here for funsies.
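- What that transformer boils down to, as a hedged sketch (assuming a standard Mage transformer block):

    # Sketch of the column-standardizing transformer
    if 'transformer' not in globals():
        from mage_ai.data_preparation.decorators import transformer


    @transformer
    def transform(data, *args, **kwargs):
        # e.g. 'VendorID' becomes 'vendorid', 'store and fwd flag' becomes 'store_and_fwd_flag'
        data.columns = data.columns.str.replace(' ', '_').str.lower()
        return data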
- Add a SQL data exporter `write_taxi_to_bigquery`; it selects the transformed data and exports it to the `ny_taxi` schema:
- BigQuery and default connection
- `ny_taxi` schema
- `yellow_cab_data` table
Sometimes we need to run a pipeline with dependency on a certain parameter (like date). This is making execution of a pipeline dependent on a variable. It can be useful when calling data from an api and writing files based on a condition. In this exercise I built off of a previous pipeline and added a date parameter.
- NOTE!!! We will be cloning a pipeline for this exercise. However, you must make sure you edit the blocks in a safe way so that they do not get changed in other pipelines.
@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
# Gets key word argument "execution date"
now = kwargs.get('execution_date')
# Formats now to a string in the format year/month/day
now_fpath = now.strftime("%Y/%m/%d")
config_path = path.join(get_repo_path(), 'io_config.yaml')
config_profile = 'default'
bucket_name = 'claytor-mage'
# uses fstring to pass our now_fpath
object_key = f'{now_fpath}/daily_trips.parquet'
# Its always a good idea to comment out the last block and check what happens with print first.
# print(object_key)
GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).export(
df,
bucket_name,
object_key,
)
- Backfilling is crucial for filling in missing data or correcting erroneous data in historical pipeline runs.
- It is particularly relevant for parameterized pipelines, where executions depend on variables like dates.
- Creating Backfills in Mage
- Select the pipeline and specify a date and time window for backfilling.
- Set the interval for backfill runs.
- Mage automatically creates runs for each date within the specified range, assigning the execution date variable to each run.
Terraform will be used to deploy an app with Google Cloud Run, create a backend database, and set up persistent storage on Google Cloud.
- Prerequisites:
- Terraform
- gcloud CLI
- Configure Google Cloud Permissions:
- Go to IAM & Admin in Google Cloud Dashboard.
- Find the service account for Mage deployment.
- For broad permissions, set the service account role to Owner.
- For specific permissions, add:
- Artifact Registry Reader
- Artifact Registry Writer
- Cloud Run Developer
- Cloud SQL Admin
- Service Account Token Creator
- Download Terraform Mage Templates
git@github.com:mage-ai/mage-ai-terraform-templates.git
- Began working on the Data Warehouse section by learning about BigQuery and its relationship to GCS
- Created Mage pipeline to ingest all green taxi data for 2022
- Extracted 12 parquet files and combined them into one
- Transformed them by correcting the column names
- Loaded them into GCS as a single file `green_taxi_2022.parquet`
- Created an external table from green_taxi_2022.parquet in BigQuery
- Directly added the table in BigQuery
- Got really stuck with partitioning the external table by `lpep_pickup_datetime`
CREATE OR REPLACE TABLE zoomcamp-2024.ny_taxi.external_green_2022_partitoned
PARTITION BY
DATE(lpep_pickup_datetime) AS
SELECT * FROM zoomcamp-2024.ny_taxi.external_green_2022;
No matching signature for function DATE for argument types: INT64. Supported signatures: DATE(TIMESTAMP, [STRING]); DATE(DATETIME); DATE(INT64, INT64, INT64); DATE(DATE); DATE(STRING) at [4:3]
- It has an int64 dtype and the values are similar to 1640996061000000000. This is a unix timestamp in nanoseconds.
Note to self: RESPECT TIME WHEN INGESTING !!!!!!
- This was not expected. I need to figure out whether there is a problem with the way I ingested the data into BigQuery, to make sure its automatic schema detection works correctly.
- I tried my best to make sure that my pipeline was correctly addressing dtypes. I ran several versions of the code, but I simply could not force the parquet to handle the date_time columns in a way that BigQuery's auto schema detector would understand. Having found no solution after several hours, I decided to unplug and spend some time with my family. I know that I will need to partition and cluster the BigQuery table based on date, but the date_time columns just won't behave.
- Although I couldn't figure out a way to make the parquet behave in the external table, when I created the materialized table, I utilized this code in my select statement
TIMESTAMP_MICROS(DIV(lpep_pickup_datetime, 1000)) AS lpep_pickup_timestamp,
TIMESTAMP_MICROS(DIV(lpep_dropoff_datetime, 1000)) AS lpep_dropoff_timestamp,
- Admittedly, I used too much time trying to fix a perceived problem in my pipeline, when I should have turned to BigQuery to help me in the first place. I don't know if what I did was "best practice", but it was a workable solution.
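- A quick sanity check of that timestamp math (hypothetical snippet, just to convince myself the DIV-by-1000 plus TIMESTAMP_MICROS route is right):

    # The raw int64 values are nanoseconds since the Unix epoch, so dividing by 1_000
    # yields microseconds, which is what TIMESTAMP_MICROS expects.
    import pandas as pd

    raw = 1640996061000000000  # example value from the column

    print(pd.to_datetime(raw, unit='ns'))           # 2022-01-01 00:14:21 (nanoseconds)
    print(pd.to_datetime(raw // 1_000, unit='us'))  # same instant from microseconds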
- I finished my homework.
- Practiced with numeric generator functions in Python.
- Created a pipeline with DLT to load data from generator functions to DuckDB
- Practiced viewing, manipulating, and querying DuckDB
- Practiced appending and merging tables with DuckDB
- Made many duck-based jokes.
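- A hedged sketch of the generator-to-DuckDB pattern from the workshop (pipeline, dataset, and table names are placeholders):

    # Load a Python generator into DuckDB with dlt
    import dlt


    def people():
        # Toy generator standing in for the workshop's data source
        for i in range(1, 6):
            yield {'id': i, 'name': f'person_{i}', 'age': 20 + i}


    pipeline = dlt.pipeline(
        pipeline_name='workshop_pipeline',
        destination='duckdb',
        dataset_name='people_data',
    )

    # write_disposition='append' adds rows; the workshop also used 'merge' to upsert
    info = pipeline.run(people(), table_name='people', write_disposition='append')
    print(info)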
Module 4 Workshop: Analytics Engineering
- Created New