Notes taken by Horeb S

Links

🔗 Link to the video

🔗 Alexey’s notes

Table of contents

Create a Python script

Previously, we created a data ingestion process in a notebook: upload_data.ipynb. Now, we'll convert that notebook into a Python script. To do so, we can use Jupyter's built-in command-line tool: jupyter nbconvert --to=script upload_data.ipynb. This creates a new file called upload_data.py containing all of the Python code from the notebook, which still needs a bit of cleaning.

After cleaning it up and renaming upload_data.py to ingest_data.py, we end up with something like this:

#!/usr/bin/env python
# coding: utf-8

# Libraries used 
import os
from dotenv import load_dotenv
import pandas as pd
from sqlalchemy import create_engine
from time import time
import argparse

def main(params):
    # Import environment variables 
    load_dotenv()
    password = os.getenv("POSTGRES_PASSWORD")

    # Read the command-line arguments
    user = params.user
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    url = params.url

    csv_file = "output.csv"
    parquet_file = "output.parquet"

    # Get the parquet file
    os.system(f"wget {url} -O {parquet_file}")

    # Load the file and get the csv 
    df = pd.read_parquet(parquet_file, engine="pyarrow")

    # Get the CSV 
    df.to_csv(csv_file, index=False)

    # Ingestion into the database
    # Create engine to connect to PostgreSQL
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
   
    # Create the iterative df
    df_iter = pd.read_csv(csv_file, iterator=True, chunksize=100000)

    # Get the current df and make formatting
    df = next(df_iter)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    # Insert in database
    df.head(n=0).to_sql(name=table_name, con=engine, if_exists="replace")  # Create the table with only the column names
    df.to_sql(name=table_name, con=engine, if_exists="append")  # Insert the first chunk

    # Ingest the remaining chunks
    while True:
        try:
            # Start the timer 
            t_start = time()

            # Read the next chunk
            df = next(df_iter)

            # Format the columns 
            df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
            df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

            # Add data to the existing table
            df.to_sql(name=table_name, con=engine, if_exists="append")

            # End the timer
            t_end = time()

            print('Inserted another chunk... took %.3f second(s)' % (t_end - t_start))

        except StopIteration:
            print("End of data importation.")
            break

if __name__ == "__main__":
    # Define the command-line arguments
    parser = argparse.ArgumentParser(description="Ingest CSV data to Postgres")

    parser.add_argument("--user", help="user name for postgres")
    parser.add_argument("--host", help="host for postgres")
    parser.add_argument("--port", help="port for postgres")
    parser.add_argument("--db", help="database name for postgres")
    parser.add_argument('--table_name', help="name of the table where we will write the results to")
    parser.add_argument('--url', help="url of the parquet file")

    args = parser.parse_args()
    main(args)

To manage the arguments in the script, we use argparse. This Python module helps us handle command-line arguments in a clean and organized way. We use it to pass parameters like the user, host, port, database name, table name and file URL when running the script. Note that the password itself is read from the environment (see the aside below) rather than passed on the command line.
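As a possible refinement (not part of the original script), argparse also lets us attach types and default values to the flags, so fewer of them have to be typed on every run. Here is a minimal sketch, keeping the same flag names and assuming the local setup used in this lesson as defaults:

import argparse

parser = argparse.ArgumentParser(description="Ingest CSV data to Postgres")
parser.add_argument("--user", default="root", help="user name for postgres")        # assumed default
parser.add_argument("--host", default="localhost", help="host for postgres")        # assumed default
parser.add_argument("--port", type=int, default=5432, help="port for postgres")     # parsed as an integer
parser.add_argument("--db", default="ny_taxi", help="database name for postgres")   # assumed default
parser.add_argument("--table_name", required=True, help="name of the target table")
parser.add_argument("--url", required=True, help="url of the parquet file")

args = parser.parse_args()
print(args.port, type(args.port))  # e.g. 5432 <class 'int'>

With such defaults, only --table_name and --url would need to be passed for a local run.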

Once the script is created, we can drop the table previously created in our database (for example with a DROP TABLE statement) so it doesn't conflict with the new ingestion. Then we can execute the script with a command like:

URL="https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"

python ingest_data.py \
  --user=root \
  --host=localhost \
  --port=5432 \
  --db=ny_taxi \
  --table_name=yellow_taxi_trips \
  --url="${URL}"
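To quickly check that the ingestion worked, a small sketch like the one below can be run afterwards. It is not part of the course script; it simply reuses the same connection parameters and the python-dotenv setup from the script above, and counts the rows written to the yellow_taxi_trips table (names taken from the command above):

import os

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

# Load the password from the .env file
load_dotenv()
password = os.getenv("POSTGRES_PASSWORD")

# Same connection parameters as in the command above
engine = create_engine(f"postgresql://root:{password}@localhost:5432/ny_taxi")

# Count the rows that were inserted
query = "SELECT COUNT(*) AS row_count FROM yellow_taxi_trips;"
print(pd.read_sql(query, con=engine))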

<aside> 💡

Passing the password directly in the command line is not secure. A better approach would be to use environment variables or a configuration file to handle sensitive information like passwords. This helps prevent accidental exposure of credentials in command history or log files.

So, we created a file .env and used the python-dotenv library to load environment variables from this file. This allows us to securely store sensitive information like database credentials without exposing them in our code or command line arguments.

Here is my .env file:

POSTGRES_PASSWORD=root

</aside>
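As a quick standalone check that the variable is actually picked up (assuming the .env file sits in the directory we run the script from), something like this can be used:

import os
from dotenv import load_dotenv

load_dotenv()  # reads the .env file in the current working directory
print("Password loaded:", os.getenv("POSTGRES_PASSWORD") is not None)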

<aside> 💡

*echo $? displays the exit status of the last executed command. A value of 0 typically indicates success, while any non-zero value indicates an error occurred during execution.*

</aside>

Docker Compose

Let's come back to our Dockerfile. Apart from pandas, we'll need to install sqlalchemy, psycopg2 and wget (in case we need to download something from a URL inside the container).