Setting Up a Data Lake for Geospatial Analysis

Building a data lake on your laptop is a great way to get hands-on experience with big data technologies. With PySpark and Delta Lake, you can create a scalable, reliable data storage solution right on your local machine. This guide walks you through the process step by step: setting up a virtual environment, installing the necessary tools, and loading, transforming, and querying data.

Step 1: Setting Up Your Python Virtual Environment

Before diving into PySpark and Delta Lake, it’s a good practice to create a Python virtual environment. This isolates your project dependencies, ensuring they don’t interfere with other projects.

First, navigate to your project directory and create a virtual environment:

python -m venv datalake_env

Then activate the virtual environment:

  • On macOS/Linux:
source datalake_env/bin/activate
  • On Windows:
datalake_env\Scripts\activate

Step 2: Install Required Packages

With your virtual environment activated, install PySpark, Delta Lake, and the other packages used in this guide:

pip install "pyspark>=3.5,<3.6"   # PySpark 3.5.x, the line compatible with Delta Lake 3.2
pip install delta-spark==3.2      # Delta Lake Python package
pip install jupyter               # notebook environment to run the code
pip install geopandas             # geospatial analysis in Step 7

# start jupyter notebook
jupyter notebook

Step 3: Set Up PySpark with Delta Lake

Now that you’ve set up your environment, you’ll need to configure Spark to work with Delta Lake.

In your Python script or Jupyter notebook, build the Spark session with the Delta extensions:

import pyspark
from delta import configure_spark_with_delta_pip

# register the Delta SQL extension and the Delta catalog on the session builder
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# configure_spark_with_delta_pip attaches the matching Delta jars to the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()
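
Before moving on, a quick sanity check that the session picked up the Delta extension (this just reads back the configuration set above):

# the session should report the Delta SQL extension configured above
print(spark.version)
print(spark.conf.get("spark.sql.extensions"))  # io.delta.sql.DeltaSparkSessionExtension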

Step 4: Download and Prepare New York City Taxi Data

  1. Download Data: Obtain the New York City taxi trip data (a perfect dataset for data lake use cases) from the NYC Taxi and Limousine Commission. For this guide, you might choose a sample file like yellow_tripdata_2024-01.parquet.
  2. Load Data into PySpark: In the same notebook, download the file and read it into a Spark DataFrame:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
df = spark.read.parquet("yellow_tripdata_2024-01.parquet")
df.show(5)
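
Before transforming anything, it helps to inspect the schema; the columns used later in this guide (tpep_pickup_datetime, tpep_dropoff_datetime, trip_distance, passenger_count, PULocationID) should all appear:

# inspect column names and types before writing any transformations
df.printSchema()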

Step 5: Create and Manage Delta Tables in the Data Lake

  1. Write and Read Data from a Delta Table: Save the DataFrame as a Delta table for efficient querying and updates:
# save as a Delta table (the /content path suits Colab; any local directory works)
df.write.format("delta").mode("overwrite").save("/content/nytaxi_delta_table")

# read it back from the lake
delta_df = spark.read.format("delta").load("/content/nytaxi_delta_table")
delta_df.show(5)
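
What makes this more than a folder of Parquet files is that Delta versions every write, which enables time travel. A minimal illustration, assuming only the single write above (so version 0 is the latest):

from delta.tables import DeltaTable

# show the table's commit history (one row per write so far)
DeltaTable.forPath(spark, "/content/nytaxi_delta_table").history().show()

# time travel: read the table as of a specific version
v0_df = spark.read.format("delta").option("versionAsOf", 0).load("/content/nytaxi_delta_table")
v0_df.count()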

Step 6: Transforming Data Using PySpark

Now let’s perform some data transformations to clean and prepare the data for analysis.


  1. Filter trips with a distance greater than 1 mile and a passenger count less than 5.
  2. Add a new column calculating the trip duration in minutes:

# keep trips longer than 1 mile with fewer than 5 passengers
filtered_df = delta_df.filter((delta_df.trip_distance > 1) & (delta_df.passenger_count < 5))

from pyspark.sql.functions import col, unix_timestamp, round

# trip duration in minutes: dropoff minus pickup, converted from seconds
transformed_df = filtered_df.withColumn(
    "trip_duration",
    round((unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60, 2)
)
transformed_df.show(5)

3. Group by VendorID and calculate the average trip duration:

avg_duration_df = transformed_df.groupBy("VendorID").avg("trip_duration")
avg_duration_df.show()

+--------+------------------+
|VendorID|avg(trip_duration)|
+--------+------------------+
|       1|19.204002108542397|
|       2|18.543873848867833|
+--------+------------------+
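
The default column name avg(trip_duration) is awkward to reference downstream; a variant using agg with an alias (and rounding) yields a cleaner schema:

from pyspark.sql.functions import avg, round

# same aggregation, but with a readable column name and rounded values
avg_duration_df = transformed_df.groupBy("VendorID").agg(
    round(avg("trip_duration"), 2).alias("avg_trip_duration")
)
avg_duration_df.show()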

4. Identify peak times by hour:


from pyspark.sql.functions import hour

peak_times_df = transformed_df.groupBy(hour("tpep_pickup_datetime").alias("hour")).count().orderBy("hour")
peak_times_df.show()

+----+------+
|hour| count|
+----+------+
|   0| 56024|
|   1| 35794|
|   2| 25240|
|   3| 16282|
|   4| 10612|
|   5| 12931|
|   6| 28417|
|   7| 57112|
|   8| 77286|
|   9| 86733|
|  10| 94516|
|  11|100894|
|  12|109917|
|  13|114464|
|  14|124946|
|  15|129590|
|  16|131931|
|  17|140441|
|  18|143232|
|  19|127176|
+----+------+
only showing top 20 rows
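
These hourly counts also make pickup hour a natural partition key if you want to persist the cleaned data back to the lake. A sketch (the output path is illustrative; any local directory works):

from pyspark.sql.functions import hour

# write the transformed trips back to the lake, partitioned by pickup hour
(transformed_df
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
    .write.format("delta")
    .mode("overwrite")
    .partitionBy("pickup_hour")
    .save("/content/nytaxi_delta_transformed"))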

Step 7: Geospatial Analysis

Download the taxi zone shapefile from the TLC Trip Record Data website and join it with the trip DataFrame. Detailed steps are explained on the Databricks website.

import geopandas as gpd
file_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zones.zip"
gdf = gpd.read_file(file_url)
gdf.head()
import pandas as pd

# this function returns a Spark DataFrame with lon/lat centroids from the provided GeoDataFrame;
# the taxi zone shapefile ships in a projected CRS (EPSG:2263), so the centroids are computed
# there and then converted to longitude/latitude (EPSG:4326)
def get_location_coords(gdf):
  centroids = gdf.geometry.centroid.to_crs(epsg=4326)
  gdf['lon'] = centroids.x
  gdf['lat'] = centroids.y
  sdf = spark.createDataFrame(pd.DataFrame(gdf).drop(['geometry'], axis=1))
  return sdf

sdf = get_location_coords(gdf)
sdf.show()

Next, join the two DataFrames to create a new DataFrame with the pickup location ID and its coordinates:

from pyspark.sql.functions import monotonically_increasing_id

geo_df = (transformed_df
    .join(sdf, transformed_df.PULocationID == sdf.LocationID, "left")
    .select(transformed_df["PULocationID"], sdf["lon"], sdf["lat"])
    .withColumn("index_column", monotonically_increasing_id()))
geo_df.show(5)

Next, convert the coordinates back into geometry points using the function below:

from shapely.geometry import Point

def spark_df_to_geopandas_df_for_points(sdf):
  df = sdf.toPandas()
  gdf = gpd.GeoDataFrame(
    df.drop(['lon', 'lat'], axis=1),
    crs='EPSG:4326',
    geometry=[Point(xy) for xy in zip(df.lon, df.lat)])
  return gdf

Finally, aggregate the trip count per pickup location and plot the data. No surprise: the brightest point is JFK Airport, which has the highest number of trips:

geo_count_df = geo_df.groupBy('PULocationID', 'lat', 'lon').count()
result_gdf = spark_df_to_geopandas_df_for_points(geo_count_df)

# plot the zone polygons (reprojected to EPSG:4326 to match the points) and overlay trip counts
ax = gdf.to_crs(epsg=4326).plot(column='LocationID', cmap='Greys', figsize=(9, 5))
result_gdf.plot(ax=ax, column='count', cmap='OrRd', legend=True, aspect=1,
                legend_kwds={"label": "NYC Taxi Number of Trips", "orientation": "horizontal"})
Figure: NYC taxi trip counts by pickup zone.
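
To keep the plot as an artifact alongside the data, you can save it through matplotlib, which GeoPandas plotting is built on (the filename is illustrative):

# ax is the Axes returned by the plotting calls above
ax.get_figure().savefig("nyc_taxi_pickup_density.png", dpi=150, bbox_inches="tight")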

This article provides a practical, step-by-step approach to building and managing a data lake using PySpark and Delta Lake with real-world New York City taxi data. It covers setting up the environment, installing the tools, transforming the dataset, and performing queries and geospatial analysis with a plot.
