Data lake in S3 from MongoDB

Using Python to upload MongoDB data to AWS S3 to build a data lake

Juan Gesino
Towards Data Science

I was extremely lucky to be able to write my bachelor’s thesis combining the topics I am most passionate about: economics (in the form of auction theory), software development, and data analysis. Even though my thesis was an empirical study of auctions and bidding strategies, it involved gathering data using web crawling. However, that’s a whole other story for another post. The point is that a year after my thesis, I am still paying MongoDB Atlas a monthly fee to store the data I collected, even though I am not using it. It’s time to archive the data.

In this post, I will not only show you how I moved the data from MongoDB to AWS S3 for cold storage archiving but also how you could use the same principle for a data pipeline to build a data lake. Depending on the type of Modern Data Stack that you are building, you might need to create a data lake. A common way to do this is by leveraging the cheap storage of AWS S3 and a common use-case is to stage data before ingesting it into a data warehouse for processing and transformation. For example, you might have a data warehouse running in Snowflake or Redshift and these warehouses have trivial ways of reading or copying data from AWS S3.

Planning

As always, I like over-engineering things, building abstractions and reusable code. Instead of writing a script that reads all records from MongoDB, creates a JSON file and uploads it to S3, I had the following premises in mind:

  • Use YAML configuration files to determine extraction
  • Iterate through a timestamp field such as createdAt
  • Break down extraction by the time of day to avoid over-consuming RAM
  • Read from MongoDB using a cursor with pagination to avoid overhead on the server
  • Each collection should have its own config file to be able to customise it

Configuration Files

I love configuration files in YAML. They are the ultimate abstraction. You can have an extremely complicated process running off a YAML file that anyone can modify to their liking without having to understand the logic behind it. They are reasonably user-friendly and easy to work with. The idea for using them here is that they can be used to extract different MongoDB collections in different ways. For example, the auctions collection in my MongoDB instance is not that big; it has around 16 thousand records. However, the bids collection contains more than 3.6 billion records. I can configure those differently to optimise the extraction process and avoid consuming too much RAM. This also saves me from hardcoding variables in the script and changing them every time I want to extract a different collection, which makes the code more reusable.

The end result should be a script (main.py) that takes a command-line argument for the name of the config file, for example main.py auctions for running the auctions.yaml configuration file. This file will determine the name of the MongoDB collection, the start date, the end date and how we want to break up the extraction by hours of the day. Here's an example of a configuration file:

This will extract all the data in the collection catawiki_auctions with a createdAt field from 2020-03-01 to 2020-08-10 and will use 24-hour intervals for each day between those two dates.
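As a rough sketch, the parsed configuration could be modelled like this. The key names (`collection`, `date_field`, `start_date`, `end_date`, `hours_interval`) are assumptions made for illustration and may differ from the ones in the actual script; `batch_size` and its default of 5000 are described later in this post. The dictionary stands in for whatever a YAML loader would return for the `auctions.yaml` file.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class ExtractionConfig:
    """Hypothetical in-memory form of one collection's config file."""

    collection: str
    start_date: date
    end_date: date
    date_field: str = "createdAt"
    hours_interval: int = 24
    batch_size: int = 5000

    @classmethod
    def from_dict(cls, raw: dict) -> "ExtractionConfig":
        config = cls(
            collection=raw["collection"],
            # str() also covers loaders that already parse dates into date objects
            start_date=date.fromisoformat(str(raw["start_date"])),
            end_date=date.fromisoformat(str(raw["end_date"])),
            date_field=raw.get("date_field", "createdAt"),
            hours_interval=int(raw.get("hours_interval", 24)),
            batch_size=int(raw.get("batch_size", 5000)),
        )
        if config.start_date >= config.end_date:
            raise ValueError("start_date must be before end_date")
        if config.hours_interval <= 0 or 24 % config.hours_interval != 0:
            raise ValueError("hours_interval must be a positive divisor of 24")
        return config


# Values taken from the example described above; key names are assumed.
raw_config = {
    "collection": "catawiki_auctions",
    "start_date": "2020-03-01",
    "end_date": "2020-08-10",
    "hours_interval": 24,
}
config = ExtractionConfig.from_dict(raw_config)
```

Validating the file once, up front, means a typo in a date or interval fails before any data is read from MongoDB.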

Here’s an overview of the execution of the script and how it breaks up uploading the data:

Script overview for loading data from MongoDB into AWS S3 data lake (Image by Author)

Implementation

AWS S3 bucket

The first step was to create an AWS S3 bucket to store the data and an IAM user to connect to AWS using Boto3. You can check this guide on creating an S3 bucket, but it should be fairly straightforward.

Create an AWS S3 bucket to use as a data lake (Image by Author)

Now we’ll need some credentials to be able to connect to AWS S3 and upload our files. This is managed using AWS IAM. There, you can first create a policy that grants access to the bucket we just created, so that it can be attached to a user in the next step:
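A minimal sketch of such a policy document, built in Python so the bucket name can be substituted. The bucket name `my-data-lake-bucket` is a placeholder, and limiting the policy to `s3:PutObject` is an assumption (it is enough for uploading objects); the policy used for the original project may grant more.

```python
import json


def build_upload_policy(bucket_name: str) -> dict:
    """Return an IAM policy document allowing uploads into one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Assumed minimal permission: write objects only.
                "Action": ["s3:PutObject"],
                # Object-level actions apply to the keys inside the bucket.
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            }
        ],
    }


# JSON text that can be pasted into the IAM policy editor.
policy_json = json.dumps(build_upload_policy("my-data-lake-bucket"), indent=2)
```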

Then you can create a user and when selecting permissions use the “Attach existing policies directly” option. This will allow you to attach the policy we created to the user. After that, you should be able to download the Access Key ID and the Secret Access Key. These are the credentials you need to call the AWS APIs using Boto3 to upload our files.

Looping to optimize RAM

Now it’s time to get to the code. I uploaded the final script to GitHub and you can access it here. For didactic purposes, I left the script as a single monolithic file, but otherwise I would break it up into several utility modules that can be imported back into the main file. In any case, you can take a look at it and it should be pretty easy to follow, but I’ll go through the most important parts in this post while also explaining more about some definitions.

The most important aspect of this script is its use of loops and generators. The goal is to optimise RAM usage: when running processes like this against MongoDB collections with many documents, you may run into memory problems. This is especially true when running the script on a small server in production. For that reason, we go through 3 loops that allow us to compartmentalize the data we get from MongoDB and avoid loading all of it in memory.

Loops to optimize RAM while loading data to the data lake (Image by Author)

Another extremely important part of these loops is to be able to recover from errors or connection problems. If there is a network error or a bug in our code, we can restart the process from where it left off, instead of having to purge and start all over again.

Iterate from start_date to end_date

We first iterate from the start_date to the end_date parameters defined in the config file. This iteration is done one day at a time and, as usual in Python, the endpoint is not part of the generated sequence. To implement this, the script defines a generator called date_range. The idea is that you can pass a start_date, an end_date and an increment in number of days. It works the same way as the built-in range.

If this sounds interesting, you should definitely check out Python generators, they are quite handy.
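A generator along these lines might look as follows. The name `date_range` comes from the script, but the exact signature and the default increment of one day are assumptions.

```python
from datetime import date, timedelta
from typing import Iterator


def date_range(start_date: date, end_date: date, increment: int = 1) -> Iterator[date]:
    """Yield dates from start_date up to, but not including, end_date."""
    if increment <= 0:
        raise ValueError("increment must be a positive number of days")
    current = start_date
    while current < end_date:
        yield current
        current += timedelta(days=increment)


# Like range(), the end date itself is excluded.
days = list(date_range(date(2020, 3, 1), date(2020, 3, 4)))
```

Because the dates are produced lazily, iterating over several months never materialises the full list unless you ask for it, as `list()` does here.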

Iterate from 00:00 to 23:59

Now that we are iterating over each day, we can iterate over different hours of that day. We can do this, for example, in batches of 6 hours (00:00 to 06:00, 06:00 to 12:00, 12:00 to 18:00, 18:00 to 00:00), batches of 12 hours (00:00 to 12:00, 12:00 to 00:00) or do the entire day at once using 24-hour intervals. For small collections splitting the day doesn’t make sense; we can just load the entire day at once using 24-hour intervals. However, if we are loading massive amounts of data, splitting lets us commit each extraction as we go and recover from possible errors. It also means we can parallelise our tasks. For example, we could use this in Airflow to generate a DAG like this one:

Example Airflow DAG to upload data to an S3 data lake (Image by Author)
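The hour-of-day batching described above could be sketched as follows. The helper names `hour_windows` and `window_filter`, and the choice of half-open intervals, are assumptions for illustration; the filter uses MongoDB’s `$gte` and `$lt` query operators so that a document falling exactly on a boundary belongs to one window only.

```python
from datetime import date, datetime, time, timedelta
from typing import Iterator, Tuple


def hour_windows(day: date, hours_interval: int = 24) -> Iterator[Tuple[datetime, datetime]]:
    """Split one day into consecutive (start, end) windows of equal length."""
    if hours_interval <= 0 or 24 % hours_interval != 0:
        raise ValueError("hours_interval must be a positive divisor of 24")
    window_start = datetime.combine(day, time.min)  # midnight of that day
    step = timedelta(hours=hours_interval)
    for _ in range(24 // hours_interval):
        yield window_start, window_start + step
        window_start += step


def window_filter(field: str, start: datetime, end: datetime) -> dict:
    """Build a MongoDB query filter for start <= field < end."""
    return {field: {"$gte": start, "$lt": end}}


windows = list(hour_windows(date(2020, 3, 1), hours_interval=6))
first_filter = window_filter("createdAt", *windows[0])
```

Each window is independent of the others, which is what makes it possible to hand them out to separate tasks.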

Receive MongoDB data in batches of 5K documents

Finally, we don’t want to be asking the MongoDB server for millions of documents all at once. This is especially true in production; otherwise, your engineering team might give you a call to see what you’re doing. At the same time, and as already mentioned, we can also optimise RAM usage by requesting fewer documents and paginating over the results. Lastly, if you are planning to later ingest these files into a warehouse such as Snowflake, it is actually useful to break down records into smaller files. This allows you to parallelise ingestion by reading those files in parallel. For more information about file size and Snowflake ingestion, check the General File Sizing Recommendations in their docs.

To accomplish this, we again use Python generators and iterate over the MongoDB cursor. This allows us to get records 5K at a time without having to hold the full result set in memory or request all of it from the server at once. The 5K is a completely arbitrary number that I chose for this case, and you can definitely increase it. In fact, the YAML files allow for a batch_size property that configures exactly this (the default is 5000).
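One way to sketch this batching is a small generator that works on any iterable, so a PyMongo cursor could be passed in directly. The function name `batched` is hypothetical, and `range(12)` stands in for a cursor here so the example runs without a database.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(documents: Iterable[T], batch_size: int = 5000) -> Iterator[List[T]]:
    """Yield lists of at most batch_size items, consuming the input lazily."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(documents)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in for a cursor: 12 "documents" split into batches of 5.
batches = list(batched(range(12), batch_size=5))
```

Only one batch is held in memory at a time when the generator is consumed in a loop, and each batch maps naturally to one output file.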

File uploading to S3

After that, the process is trivial: we just get our records from MongoDB and store them in a JSON file. It’s important here to pay attention to all the loops we are executing. Essentially, we want every loop to be represented in one way or another in the file name. If we fail to do so, we might end up overwriting the same file over and over again. Furthermore, I include a timestamp for the date the data was extracted, which helps with reprocessing in case of errors and in case of overlaps during parallel/distributed runs.
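To make the naming idea concrete, here is a sketch of a key builder in which every loop (day, hour window, batch) plus the extraction timestamp appears in the name. The layout and the function name `build_object_key` are assumptions, not the naming scheme of the original script.

```python
from datetime import datetime, timezone
from typing import Optional


def build_object_key(
    collection: str,
    window_start: datetime,
    window_end: datetime,
    batch_number: int,
    extracted_at: Optional[datetime] = None,
) -> str:
    """Build a file name that is unique per day, hour window and batch."""
    extracted_at = extracted_at or datetime.now(timezone.utc)
    return (
        f"{collection}/"
        f"{window_start:%Y-%m-%d}/"                 # day loop
        f"{window_start:%H%M}-{window_end:%H%M}_"   # hour-window loop
        f"batch-{batch_number:05d}_"                # cursor batch loop
        f"extracted-{extracted_at:%Y%m%dT%H%M%SZ}.json.gz"
    )


key = build_object_key(
    "catawiki_auctions",
    datetime(2020, 3, 1, 0, 0),
    datetime(2020, 3, 1, 6, 0),
    batch_number=3,
    extracted_at=datetime(2021, 5, 1, 12, 30, 0, tzinfo=timezone.utc),
)
```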

One thing to notice is that MongoDB has some types that are a bit particular and not defined in the JSON standard, so if you were to try to write a JSON string with them, you would get a serialisation error (“TypeError: Object of type ObjectId is not JSON serializable”).

You can solve this by using the PyMongo dumper:

Notice also that I used the .gz file extension; this is because the JSON data is stored compressed:
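A minimal sketch of writing one batch as gzip-compressed JSON using only the standard library. The function name `write_gzipped_json` and the `dumps` parameter are hypothetical. With real MongoDB documents you would pass a serialiser that understands BSON types, such as `bson.json_util.dumps` from PyMongo, instead of the default `json.dumps`.

```python
import gzip
import json
import os
import tempfile
from typing import Any, Callable, Iterable


def write_gzipped_json(
    path: str,
    documents: Iterable[Any],
    dumps: Callable[[Any], str] = json.dumps,
) -> None:
    """Serialise documents to a JSON array and write it gzip-compressed."""
    # "wt" opens the gzip stream in text mode so we can write a str directly.
    with gzip.open(path, "wt", encoding="utf-8") as handle:
        handle.write(dumps(list(documents)))


# Round trip with plain dictionaries standing in for MongoDB documents.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "batch.json.gz")
    write_gzipped_json(path, [{"lot": 1}, {"lot": 2}])
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        round_trip = json.load(handle)
```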

Finally, to upload the generated file, we simply initialise an S3 client using Boto3 and call the upload_file method:
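The upload step could look roughly like this. The wrapper `upload_to_s3`, the bucket name and the stub client are assumptions for illustration; `boto3.client("s3")` and its `upload_file(Filename, Bucket, Key)` method are the real Boto3 calls. Boto3 is imported lazily so the example can be exercised with a stub and without AWS credentials.

```python
from typing import Any, List, Optional, Tuple


def upload_to_s3(local_path: str, bucket: str, key: str, s3_client: Optional[Any] = None) -> None:
    """Upload a local file to s3://bucket/key."""
    if s3_client is None:
        # Credentials are resolved by Boto3, for example from the
        # AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.
        import boto3

        s3_client = boto3.client("s3")
    s3_client.upload_file(local_path, bucket, key)


class RecordingClient:
    """Stub with the same upload_file signature, used for a dry run."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, str]] = []

    def upload_file(self, Filename: str, Bucket: str, Key: str) -> None:
        self.calls.append((Filename, Bucket, Key))


stub = RecordingClient()
upload_to_s3(
    "batch.json.gz",
    "my-data-lake-bucket",  # placeholder bucket name
    "catawiki_auctions/batch.json.gz",
    s3_client=stub,
)
```

Passing the client in also makes it easy to reuse one client across all batches instead of creating a new one per file.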

Conclusion

Although the main objective for me was to archive data I had in MongoDB to AWS S3, I hope I have shown how this same script can be used to ingest data to build a data lake. This type of script can be easily extended to run on Airflow, Dagster or Airbyte as part of your data pipelines to extract data from production systems and load it into your data lake or data warehouse. In fact, this script is extremely similar to the Airflow DAGs we use at Sirena to extract and load data into Snowflake. You can even use this script for one-time reprocessing of data or package everything as a CLI, dockerize it and run it in Kubernetes. The script is so simple that you can even run it using Celery in a distributed way to parallelise the extractions. The possibilities are endless.

Economist • Insights Manager @ Belvo • Ex-Data Analytics Manager @ Zenvia • Ex-Head of Data @ Sirena