MongoDB upsert new fields
operationType update
When the MongoDB Spark Connector upserts data, fields that are not in the data frame are removed from existing documents by default, because the default write operationType is replace, which overwrites each matched document wholesale.
To upsert only the new fields while keeping the existing ones, set operationType to update in the write options.
This is much more efficient than the read-modify-write alternative (sketched just below):
- read all documents into memory
- declare or infer the schema
- add the new fields to the data frame
- write the data frame back into MongoDB
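For contrast, here is a rough sketch of that read-modify-write approach. It reuses the spark session and options defined in the sample code below; new_field and its value are illustrative placeholders.

from pyspark.sql import functions as F

# Full collection scan: every document is pulled through Spark
df_all = spark.read.format("mongodb").options(**options).load()

# Add the new field to every row in memory
df_all = df_all.withColumn("new_field", F.lit("some_default"))

# Write everything back, replacing each matched document wholesale
(
    df_all.write.format("mongodb").mode("append")
    .options(**options)
    .save()
)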
Sample Code
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(
    "spark.jars.packages",
    "org.mongodb.spark:mongo-spark-connector_2.12:10.2.2",
).getOrCreate()

URI = "mongodb+srv://<user>:<password>@<url>/<database>"
options = {
    "collection": "test_upsert_partial",
    "connection.uri": URI,
}
# Set up test data
df_existing = spark.createDataFrame(
    [
        ("A", "A1", "A2"),
        ("B", "B1", "B2"),
    ],
    schema="_id:string,field_1:string,field_2:string",
)
(
    df_existing.write.format("mongodb").mode("append")
    .options(**options)
    .save()
)
spark.read.format("mongodb").options(**options).load().show()
"""
+---+-------+-------+
|_id|field_1|field_2|
+---+-------+-------+
|  A|     A1|     A2|
|  B|     B1|     B2|
+---+-------+-------+
"""
df_new = spark.createDataFrame(
    [
        ("B", "B1_NEW"),
        ("C", "C1"),
    ],
    schema="_id:string,field_1:string",
)
df_new.show()
"""
+---+-------+
|_id|field_1|
+---+-------+
|  B| B1_NEW|
|  C|     C1|
+---+-------+
"""
# Test writing new fields WITHOUT "update" as "operationType"
options = {
    "collection": "test_upsert_partial",
    "connection.uri": URI,
    # Any operationType other than "update" (the default is "replace")
    # will REMOVE existing fields
    # "operationType": "update",
}
(
    df_new.write.format("mongodb").mode("append")
    .options(**options)
    .save()
)
spark.read.format("mongodb").options(**options).load().show()
"""
+---+-------+-------+
|_id|field_1|field_2|
+---+-------+-------+
|  A|     A1|     A2|
|  B| B1_NEW|   null| <-- field_2 of _id B was removed
|  C|     C1|   null|
+---+-------+-------+
"""
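For completeness, here is a sketch of the same write done WITH "update" as "operationType". Since field_2 of B was already removed by the run above, this assumes the collection has first been re-seeded with df_existing; the output shown is the expected result per the connector's documented update behavior, not a captured run.

# Re-seed the collection with df_existing (as above), then write WITH "update"
options = {
    "collection": "test_upsert_partial",
    "connection.uri": URI,
    # operationType "update" only sets the fields present in the data frame
    "operationType": "update",
}
(
    df_new.write.format("mongodb").mode("append")
    .options(**options)
    .save()
)
spark.read.format("mongodb").options(**options).load().show()
"""
+---+-------+-------+
|_id|field_1|field_2|
+---+-------+-------+
|  A|     A1|     A2|
|  B| B1_NEW|     B2| <-- field_2 of _id B is kept
|  C|     C1|   null|
+---+-------+-------+
"""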
Last modified on 2024-07-31