Hive-style partition done wrong
Wrong Hive-style partition
More than once I have seen a nicely done partition structure ruined by having different file types in the same place.
```
s3://bucket/
  year=2023/
  year=2024/
    ..
    month=07/
    month=08/
      raw.csv
      processed.csv
```

Spark/Glue or Presto/Trino/Athena would have worked directly on top of this structure if each partition held a single file type, either raw or processed.
The other file types belong in separate buckets with a similar structure.
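A minimal sketch of that split, assuming object keys shaped like the tree above. The helper names and the bucket names in BUCKET_FOR_TYPE are hypothetical placeholders. Each file type is routed to its own bucket while the year=/month= partition path stays identical, so every bucket can be read as one table.

```python
from pathlib import PurePosixPath

# Hypothetical mapping from file type to its own bucket; names are placeholders.
BUCKET_FOR_TYPE = {"raw": "my-raw-bucket", "processed": "my-processed-bucket"}


def partition_values(key):
    """Parse Hive-style key=value directories (e.g. year=2024/month=07) into a dict."""
    parent_parts = PurePosixPath(key).parent.parts
    return dict(part.split("=", 1) for part in parent_parts if "=" in part)


def destination_uri(key, bucket_for_type=BUCKET_FOR_TYPE):
    """Route a key to a per-type bucket, keeping the same partition path."""
    path = PurePosixPath(key)
    file_type = path.stem  # 'raw' for raw.csv, 'processed' for processed.csv
    return f"s3://{bucket_for_type[file_type]}/{path}"


if __name__ == "__main__":
    for key in ["year=2024/month=07/raw.csv", "year=2024/month=07/processed.csv"]:
        print(partition_values(key), "->", destination_uri(key))
```

Both files keep year=2024/month=07 as their partition path; only the bucket differs, so each bucket holds a single file type.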
AWS S3 API continuation token
AWS S3 ListObjects 1000 per request
There is a common misunderstanding that AWS S3 ListObjects returns only 1000 results.
```python
import boto3

s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket='gdc-mmrf-commpass-phs000748-2-open')
assert len(resp['Contents']) == 1000
```

The API and its documentation do not emphasize enough that the 1000-result limit applies per page (request).
We are expected to call the API repeatedly, checking each response for NextContinuationToken and passing it to the next call.
```python
continuation_token = None
while True:
    api_kwargs = (
        {'ContinuationToken': continuation_token} if continuation_token else {}
    )
    resp = s3.
```
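A complete version of such a loop, as a sketch rather than the post's own code. It assumes s3 is a boto3 S3 client (or any object with the same list_objects_v2 method); the function name list_all_keys is hypothetical. It stops when IsTruncated is false and otherwise forwards NextContinuationToken to the next call.

```python
def list_all_keys(s3, bucket, prefix=""):
    """Collect every object key under `prefix`, following continuation tokens.

    `s3` is assumed to be a boto3 S3 client; each list_objects_v2 call
    returns at most 1000 keys (one page).
    """
    keys = []
    continuation_token = None
    while True:
        api_kwargs = {"Bucket": bucket, "Prefix": prefix}
        if continuation_token:
            api_kwargs["ContinuationToken"] = continuation_token
        resp = s3.list_objects_v2(**api_kwargs)
        # 'Contents' is absent when a page (or the whole prefix) is empty
        keys.extend(obj["Key"] for obj in resp.get("Contents", []))
        if not resp.get("IsTruncated"):
            break  # last page reached
        continuation_token = resp["NextContinuationToken"]
    return keys
```

With a real client this would be called as list_all_keys(boto3.client("s3"), "gdc-mmrf-commpass-phs000748-2-open"). boto3 also provides s3.get_paginator("list_objects_v2"), whose paginate(Bucket=...) follows the continuation token internally.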
MongoDB upsert new fields
operationType update
When the MongoDB Spark Connector upserts data, fields that are not in the data frame are removed by default.
When we only want to upsert new fields while keeping the existing ones, remember to set operationType to update in the write options.
This is much more efficient than:
1. read all documents into memory
2. declare or infer the schema
3. add the new fields to the data frame
4. write the data frame back into MongoDB

Sample Code
```python
from pyspark.
```
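A minimal sketch of such a write, assuming MongoDB Spark Connector 10.x (format "mongodb") and a data frame that carries _id plus only the new fields. The function name, URI, database and collection are hypothetical placeholders. Append mode is used because overwrite mode drops the target collection first; idFieldList names the field(s) used to match existing documents.

```python
def upsert_new_fields(df, uri, database, collection, id_fields="_id"):
    """Write `df` so matching documents are updated instead of replaced.

    Assumes MongoDB Spark Connector 10.x. With operationType=update, fields
    missing from `df` are left untouched on the matched documents.
    """
    (
        df.write.format("mongodb")
        .mode("append")  # "overwrite" would drop the collection first
        .option("connection.uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("operationType", "update")  # default is "replace"
        .option("idFieldList", id_fields)  # field(s) used to match documents
        .save()
    )
```

For example, upsert_new_fields(new_fields_df, "mongodb://localhost:27017", "mydb", "mycoll"), where new_fields_df is a hypothetical data frame with _id and the new columns.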
Spark Join Clean Code
TL;DR
If the inner-join columns have the same name, use the on parameter.
If the join columns have different names, drop the redundant column after the join.

Schema of a Spark join
Joining two Spark data frames combines the columns from both, including the join columns from each side. This requires an additional cleanup step after the join.
```python
df_a = spark.createDataFrame([(1, 'A')], 'id:int,a_name:string')
df_b = spark.createDataFrame([(1, 'B')], 'id:int,b_name:string')
df_a.join(df_b, df_a.id == df_b.id).printSchema()
"""root
 |-- id: integer (nullable = true)
 |-- a_name: string (nullable = true)
 |-- id: integer (nullable = true)   <----- TODO clean up
 |-- b_name: string (nullable = true)"""
df_a = spark.
```
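A minimal sketch of the two TL;DR rules, assuming PySpark 3.x data frames such as df_a and df_b above; both function names are hypothetical. Passing a column name (or list of names) to on keeps a single copy of each key. For differently named keys, the join uses an explicit condition and then drops the right-hand key through its Column reference, which avoids any ambiguity about which column is removed.

```python
def clean_join_same_name(df_a, df_b, keys):
    """Inner join on column(s) that share a name.

    Passing names to `on` makes Spark keep one copy of each key column,
    so no cleanup step is needed afterwards.
    """
    return df_a.join(df_b, on=keys, how="inner")


def clean_join_different_names(df_left, df_right, left_key, right_key):
    """Inner join on differently named columns, then drop the redundant key."""
    condition = df_left[left_key] == df_right[right_key]
    # Drop via the right data frame's Column object, not just a name string
    return df_left.join(df_right, condition, "inner").drop(df_right[right_key])
```

For example, clean_join_same_name(df_a, df_b, "id").printSchema() should list id once, followed by a_name and b_name.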
Python Memory & Garbage Collection
Out-of-memory issue
Working with big data frames sometimes hangs the system with an out-of-memory error. With Spark, this can still happen when you collect data back to the driver (master) node.
The out-of-memory issue can be prevented in the following two ways:
1: Signal the garbage collector ASAP

```
(venv) ➜  ~ python -m memory_profiler t.py
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    35  150.836 MiB  150.836 MiB           1   @profile
    36                                         def func():
    37  192.
```
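A minimal sketch of one common reading of this advice, not necessarily the post's own t.py: release the last reference with del as soon as a big object is no longer needed, and call gc.collect() when reference cycles may keep it alive. In CPython, del on the last reference frees an acyclic object immediately; gc.collect() matters for cycles. BigFrame and summarize_then_release are hypothetical, and the weakref only lets us observe when the object is actually freed.

```python
import gc
import weakref


class BigFrame:
    """Hypothetical stand-in for a large in-memory data frame."""

    def __init__(self, n_rows):
        self.rows = [list(range(10)) for _ in range(n_rows)]
        self.self_ref = self  # a reference cycle, so refcounting alone cannot free it


def summarize_then_release(n_rows=100_000):
    frame = BigFrame(n_rows)
    probe = weakref.ref(frame)  # observes the object without keeping it alive
    row_count = len(frame.rows)
    del frame     # drop our strong reference as soon as we are done with it
    gc.collect()  # collect the cycle now instead of waiting for the next automatic run
    return row_count, probe() is None  # True means the object was freed
```

Decorating such a function with @profile and running python -m memory_profiler, as above, is one way to check the effect on memory usage line by line.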