Relational Database Update in Batches
Heavy Update Transactions

UPDATE inventory
SET quantity = <select from another table>

That query has been running for more than 5 minutes, and customers are complaining that orders cannot be completed.
The only fix is to cancel the update and then wait hours for the database to complete the rollback.
The above is a very common scenario with relational databases, and it can be avoided by updating in batches.
Relational Database Lock with Heavy Update Transactions

The reason for the issue is that the database holds locks for the entire duration of the single large transaction.
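A minimal sketch of the batched approach, in Python with sqlite3 purely for illustration (the shop.db file and the inventory and stock_counts tables are hypothetical): each batch updates a limited key range inside its own short transaction, so locks are released at every commit instead of being held for hours.

import sqlite3

conn = sqlite3.connect("shop.db")   # hypothetical database
BATCH = 1000
last_id = 0

while True:
    # page through the table by primary key
    ids = [row[0] for row in conn.execute(
        "SELECT id FROM inventory WHERE id > ? ORDER BY id LIMIT ?",
        (last_id, BATCH),
    )]
    if not ids:
        break
    placeholders = ",".join("?" * len(ids))
    with conn:  # one short transaction per batch; locks are released at each commit
        conn.execute(
            f"""
            UPDATE inventory
            SET quantity = (
                SELECT s.quantity
                FROM stock_counts AS s
                WHERE s.product_id = inventory.product_id
            )
            WHERE id IN ({placeholders})
            """,
            ids,
        )
    last_id = ids[-1]

The same pattern applies to PostgreSQL or MySQL, where it keeps row locks short-lived and lets concurrent order transactions proceed between batches.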
The Zen of Python, for SQL
Naming convention

“Beautiful is better than ugly.”
“Readability counts.”
-- generic names
with cte (product_id, price) as (
    select 'p1', 5
    union all
    select 'p2', 10
)
select count(*) from cte

-- descriptive names
with products (product_id, price) as (
    select 'p1', 5
    union all
    select 'p2', 10
)
select count(*) as product_count from products

Explicit SELECT

“Explicit is better than implicit.”
-- implicit column order
with product_1 (product_id, price) as (
    select 'p1', 5
)
, product_2 (price, product_id) as (
    select 10, 'p2'
)
select * from product_1
union all
select * from product_2
-- ERROR: UNION types text and integer
-- cannot be matched

-- explicit column list
with product_1 (product_id, price) as (
    select 'p1', 5
)
, product_2 (price, product_id) as (
    select 10, 'p2'
)
select product_id, price from product_1
union all
select product_id, price from product_2

Simplicity

“Simple is better than complex.”
Python List & Set syntax recommendation
List vs Set commonly-used syntax inconsistencies

List:
>>> ls = [1, 2]
>>> ls.append(3)
>>> ls
[1, 2, 3]

Set:
>>> s = {1, 2}
>>> s.union({3})
{1, 2, 3}
>>> s
{1, 2}

List:
>>> ls = [1, 2]
>>> ls.extend([3, 4])
>>> ls
[1, 2, 3, 4]

Set:
>>> s = {1, 2}
>>> s.union({3, 4})
{1, 2, 3, 4}
>>> s
{1, 2}

List:
>>> ls = [1, 2]
>>> [x for x in ls if x in [2, 3]]
[2]
>>> ls
[1, 2]

Set:
>>> s = {1, 2}
>>> s.intersection({2, 3})
{2}
>>> s
{1, 2}

Note the inconsistency: the list methods mutate the list in place, while the set operations shown return a new set and leave the original unchanged.
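For completeness, a small sketch (not from the original post) of the in-place set methods that mirror the mutating list calls above:

>>> s = {1, 2}
>>> s.add(3)                       # in place, like list.append
>>> s
{1, 2, 3}
>>> s.update({4, 5})               # in place, like list.extend
>>> s
{1, 2, 3, 4, 5}
>>> s.intersection_update({2, 3})  # in-place filtering
>>> s
{2, 3}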
Hive-style partition done wrong
Wrong Hive-style partition

More than once I have seen a nicely done partition structure like the one below, only to be ruined by different file types sitting in the same place.
s3://bucket/
    year=2023/
    year=2024/
        ..
        month=07/
        month=08/
            raw.csv
            processed.csv

Spark/Glue or Presto/Trino/Athena would have worked directly on top of this structure if there were a single file type, either raw or processed.
The other file types should be put in separate buckets with similar structures.
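A minimal sketch of how the cleaned-up layout is consumed, assuming PySpark and hypothetical bucket names (bucket-raw holding only raw.csv files, bucket-processed holding only processed.csv files); with one file type per prefix, Hive-style partition discovery works as intended:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# e.g. s3://bucket-raw/year=2024/month=08/raw.csv (use s3a:// outside EMR/Glue)
df = spark.read.csv("s3://bucket-raw/", header=True)
df.printSchema()                    # year and month show up as partition columns
df.filter("year = 2024").count()    # partition pruning: other years are not read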
AWS S3 API continuation token
AWS S3 ListObjects 1000 per request

There is a common misunderstanding that AWS S3 ListObjects returns only 1000 results.
import boto3

s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket='gdc-mmrf-commpass-phs000748-2-open')
assert len(resp['Contents']) == 1000

Neither the API nor its documentation emphasizes enough that the 1000 results are per page/request.
We are expected to call the API multiple times, check each response for NextContinuationToken, and pass it to the next call.
continuation_token = None
while True:
    api_kwargs = (
        {'ContinuationToken': continuation_token}
        if continuation_token
        else {}
    )
    resp = s3.list_objects_v2(
        Bucket='gdc-mmrf-commpass-phs000748-2-open', **api_kwargs
    )
    # process resp['Contents'] here
    if not resp.get('IsTruncated'):
        break
    continuation_token = resp['NextContinuationToken']
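Alternatively, boto3 ships a built-in paginator that handles the continuation token for you; a short sketch:

import boto3

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
total = 0
for page in paginator.paginate(Bucket='gdc-mmrf-commpass-phs000748-2-open'):
    total += len(page.get('Contents', []))
print(total)  # counts every object, not just the first 1000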
MongoDB upsert new fields
operationType update

When the MongoDB Spark Connector upserts data, fields that are not in the data frame are removed by default.
If we only want to upsert new fields while keeping the existing ones, remember to set operationType to update in the write options.
This is much more efficient than:
- read all documents into memory
- declare or infer the schema
- add the new fields to the data frame
- write the data frame back into MongoDB

Sample Code

from pyspark.sql import SparkSession
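# The original sample is truncated above; what follows is a sketch only, assuming
# MongoDB Spark Connector v10.x (format "mongodb") and hypothetical connection,
# database, and collection names.
spark = SparkSession.builder.getOrCreate()

# data frame holding just the key and the new fields to add
df = spark.createDataFrame([(1, 'new value')], 'id:int,new_field:string')

(
    df.write.format('mongodb')
    .mode('append')
    .option('connection.uri', 'mongodb://localhost:27017')
    .option('database', 'mydb')
    .option('collection', 'mycoll')
    .option('idFieldList', 'id')         # match existing documents on this field
    .option('operationType', 'update')   # $set the new fields, keep existing ones
    .save()
)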
Spark Join Clean Code
TL;DR

- If the inner join columns have the same name, use the on parameter.
- If the join columns are different, drop the redundant column after the join.

Schema of a Spark join

Joining 2 Spark data frames combines the columns from both, including the join columns. This requires an additional cleanup step after the join.
df_a = spark.createDataFrame([(1, 'A')], 'id:int,a_name:string')
df_b = spark.createDataFrame([(1, 'B')], 'id:int,b_name:string')
df_a.join(df_b, df_a.id == df_b.id).printSchema()
"""root
 |-- id: integer (nullable = true)
 |-- a_name: string (nullable = true)
 |-- id: integer (nullable = true)    <----- TODO clean up
 |-- b_name: string (nullable = true)"""

df_a = spark.createDataFrame([(1, 'A')], 'id:int,a_name:string')
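# The original sample is truncated above; the rest is a sketch of the TL;DR fix,
# not the post's exact code.
df_b = spark.createDataFrame([(1, 'B')], 'id:int,b_name:string')

# same column name: pass the name to `on`, so the join key appears only once
df_a.join(df_b, on='id').printSchema()
"""root
 |-- id: integer (nullable = true)
 |-- a_name: string (nullable = true)
 |-- b_name: string (nullable = true)"""

# different column names: drop the redundant column right after the join
df_a.join(df_b, df_a.id == df_b.id).drop(df_b.id).printSchema()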
Python Memory & Garbage Collection
Out-of-memory issue

Working with big data frames sometimes hangs the system with an out-of-memory error. Even with Spark, this can still happen when you collect data back to the driver node.
The out-of-memory issue can be prevented in the following two ways:
1: signal Garbage Collector asap

(venv) ➜ ~ python -m memory_profiler t.py
Line #    Mem usage    Increment  Occurrences   Line Contents
======    =========    =========  ===========   =============
    35  150.836 MiB  150.836 MiB            1   @profile
    36                                          def func():
    37  192.
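The profiler output above is truncated, but the idea it illustrates can be sketched as follows (func and its contents are hypothetical; the point is to drop the reference and call gc.collect() as soon as the large object is no longer needed):

import gc

from memory_profiler import profile


@profile
def func():
    big = [0] * (10 ** 7)   # allocate a large object (tens of MiB)
    total = sum(big)
    del big                 # drop the reference as soon as it is no longer needed
    gc.collect()            # signal the garbage collector right away
    return total


if __name__ == '__main__':
    func()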