Notes taken by Horeb S
Links
🔗 Link to the video
🔗 Alexey’s notes
Previously, we created a data ingestion process in a notebook: upload_data.ipynb. Now, we'll convert that notebook into a Python script. To do so, we can use Jupyter's built-in command-line tool: jupyter nbconvert --to=script upload_data.ipynb. This creates a new file called upload_data.py containing all of the Python code from the notebook, which must be cleaned up a little.
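For example, the conversion and the subsequent rename might look like this (the mv command is our own addition; the notes only state that the file is renamed):

jupyter nbconvert --to=script upload_data.ipynb   # writes upload_data.py next to the notebook
mv upload_data.py ingest_data.py                  # rename it to the final script name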
After renaming upload_data.py to ingest_data.py and cleaning it up, we get something like this:
#!/usr/bin/env python
# coding: utf-8

# Libraries used
import os
from dotenv import load_dotenv
import pandas as pd
from sqlalchemy import create_engine
from time import time
import argparse


def main(params):
    # Import environment variables
    load_dotenv()
    password = os.getenv("POSTGRES_PASSWORD")

    # Read the command-line arguments
    user = params.user
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    url = params.url

    csv_file = "output.csv"
    parquet_file = "output.parquet"

    # Download the parquet file
    os.system(f"wget {url} -O {parquet_file}")

    # Load the parquet file and convert it to CSV
    df = pd.read_parquet(parquet_file, engine="pyarrow")
    df.to_csv(csv_file, index=False)

    # Ingestion into the database
    # Create the engine to connect to PostgreSQL
    engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')

    # Read the CSV in chunks
    df_iter = pd.read_csv(csv_file, iterator=True, chunksize=100000)

    # Get the first chunk and format the datetime columns
    df = next(df_iter)
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

    # Insert into the database
    df.head(n=0).to_sql(name=table_name, con=engine, if_exists="replace")  # Create the table with the column schema only
    df.to_sql(name=table_name, con=engine, if_exists="append")

    # Ingest the remaining chunks
    while True:
        try:
            # Start the timer
            t_start = time()

            # Read the next chunk
            df = next(df_iter)

            # Format the datetime columns
            df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
            df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

            # Append the chunk to the existing table
            df.to_sql(name=table_name, con=engine, if_exists="append")

            # End the timer
            t_end = time()
            print('Inserted another chunk... took %.3f second(s)' % (t_end - t_start))
        except StopIteration:
            print("End of data importation.")
            break


if __name__ == "__main__":
    # Define the command-line arguments
    parser = argparse.ArgumentParser(description="Ingest CSV data to Postgres")
    parser.add_argument("--user", help="user name for postgres")
    parser.add_argument("--host", help="host for postgres")
    parser.add_argument("--port", help="port for postgres")
    parser.add_argument("--db", help="database name for postgres")
    parser.add_argument("--table_name", help="name of the table where we will write the results to")
    parser.add_argument("--url", help="url of the parquet file")

    args = parser.parse_args()
    main(args)
Now, to manage the arguments in the script, we use argparse. This Python module helps us handle command-line arguments in a clean and organized way. We use it to specify parameters like the user, host, port, database name, table name and file URL when running the script (the password is read from the environment instead, as explained below).
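As a small illustration that goes slightly beyond the script above (the type=int, default and required options are our own additions), argparse can also validate arguments and supply defaults:

import argparse

parser = argparse.ArgumentParser(description="Ingest CSV data to Postgres")
parser.add_argument("--port", type=int, default=5432, help="port for postgres")   # parsed as an integer
parser.add_argument("--table_name", required=True, help="destination table")      # must be supplied

args = parser.parse_args(["--table_name", "yellow_taxi_trips"])  # example invocation
print(args.port, args.table_name)                                # -> 5432 yellow_taxi_trips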
Once the script is created, we can delete the existing table in our container to avoid redundancy (a short sketch of dropping it follows the command below). Then we can execute the script with a command like:
URL="<https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet>"
python ingest_data.py \\
--user=root \\
--host=localhost \\
--port=5432 \\
--db=ny_taxi \\
--table_name=yellow_taxi_trips \\
--url="${URL}"
<aside> 💡
Passing the password directly in the command line is not secure. A better approach would be to use environment variables or a configuration file to handle sensitive information like passwords. This helps prevent accidental exposure of credentials in command history or log files.
So, we created a .env file and used the python-dotenv library to load environment variables from it. This allows us to securely store sensitive information like database credentials without exposing them in our code or command-line arguments. Here is the .env file:
POSTGRES_PASSWORD=root
</aside>
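Taken in isolation, the loading pattern used in ingest_data.py looks like this (a minimal sketch, assuming the .env file sits in the directory the script is run from):

import os
from dotenv import load_dotenv

load_dotenv()                               # reads key=value pairs from ./.env into the environment
password = os.getenv("POSTGRES_PASSWORD")   # "root" with the .env shown above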
<aside> 💡
*echo $? displays the exit status of the last executed command. A value of 0 typically indicates success, while any non-zero value indicates that an error occurred during execution.*
</aside>
Let's come back to our Dockerfile. Apart from pandas, we'll need to install sqlalchemy, psycopg2 and wget (since the script downloads the parquet file from a URL).
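A minimal sketch of such a Dockerfile, not a verified build: the base image is an assumption, and pyarrow and python-dotenv are added because ingest_data.py imports them, even though the notes only name pandas, sqlalchemy, psycopg2 and wget:

FROM python:3.9

# wget is used by the script to download the parquet file
RUN apt-get update && apt-get install -y wget

# psycopg2-binary avoids compiling the Postgres driver from source
RUN pip install pandas sqlalchemy psycopg2-binary pyarrow python-dotenv

WORKDIR /app
COPY ingest_data.py ingest_data.py

ENTRYPOINT ["python", "ingest_data.py"]

The image can then be built with docker build and run with docker run, passing the same --user, --host, and other arguments after the image name.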