Converting large CSVs to a nested data structure using Apache Spark
What is Apache Spark?
Apache Spark brings fast, in-memory data processing to Hadoop. Elegant and expressive development APIs in Scala, Java, and Python allow data workers to efficiently execute streaming, machine learning, or SQL workloads that require fast iterative access to datasets. Quick start guide
Problem Statement / Task
To read a lot of really big CSVs (~GBs each) from Hadoop HDFS, clean them, convert them to a nested data structure, and write the result to MongoDB using Apache Spark.
Recently I was assigned to create a Mongo collection with some select financial values by reading CSVs containing income statements, balance sheets … junk data.
Shown above is a sample CSV. I had to convert the data into the schema shown below and update MongoDB with it. Consider a scenario where each CSV is about ~1 GB and you have hundreds of them.
Data Cleaning - Read multiple types of CSVs and convert all of them into tuples of the structure (CompanyName, Map<Year, Map<TagName, Value>>).
Union all created RDDs - Join all the cleaned CSV RDDs into one.
Reduce - Reduce all tuples related to a company into a single tuple, using companyName as the key.
Update MongoDB - Update MongoDB with the reduced tuples.
The order of fields in the CSV dump differs according to the type of CSV, so I had to write a generic function in which we can specify the positions of the required fields. Let's call this function on both income-statement.csv and balance-sheet.csv to create two cleaned RDDs, balanceSheetRdd and incomeStatementRdd, and later join them into one masterRdd.
The spark-csv plugin can be used to read CSVs into a DataFrame; the plugin is recommended over map(line.split(",")) for its ability to handle quoted fields and malformed entries.
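To see why the naive split is fragile, here is a minimal illustration (with made-up data) of what happens when a quoted company name contains a comma:

```java
// Demonstrates why map(line.split(",")) breaks on quoted fields,
// which spark-csv handles correctly. The sample line is made up.
class NaiveSplitPitfall {
    static String[] naiveSplit(String line) {
        return line.split(",");
    }
    // naiveSplit("\"Acme, Inc.\",2014,Revenue,1000") yields 5 fields,
    // not 4, because the comma inside the quotes is treated as a
    // delimiter: ["\"Acme", " Inc.\"", "2014", "Revenue", "1000"]
}
```

With the plugin (Spark 1.x API), the read looks roughly like `sqlContext.read().format("com.databricks.spark.csv").option("header", "true").load(path)`, after which `df.javaRDD()` gives an RDD of parsed rows.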
Create two Java sets with the required tags that we plan to extract from the CSVs.
Filter out the unwanted tags using Spark's filter transformation.
From the filtered RDD, create a new PairRDD (tuple) of the form (CompanyName, Map<Year, Map<TagName, Value>>) using Spark's mapToPair transformation.
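A sketch of the two steps above, with the field positions passed in as parameters so the same code works for both CSV types. The positions, tag names, and row values below are illustrative assumptions, not the original code; the Spark wiring is shown in comments:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Filter predicate and tuple-building logic for the cleaning step.
// In the Spark job these would be called from the filter and mapToPair
// transformations, roughly:
//   rdd.filter(row -> CsvCleaner.isWantedTag(tags, row, tagPos))
//      .mapToPair(row -> new Tuple2<>(row[companyPos],
//          CsvCleaner.toYearMap(row, yearPos, tagPos, valuePos)));
class CsvCleaner {
    // Keep only rows whose tag is one of the required tags.
    static boolean isWantedTag(Set<String> wantedTags, String[] fields, int tagPos) {
        return fields.length > tagPos && wantedTags.contains(fields[tagPos]);
    }

    // Build the Map<Year, Map<TagName, Value>> half of the tuple from one row.
    static Map<String, Map<String, String>> toYearMap(
            String[] fields, int yearPos, int tagPos, int valuePos) {
        Map<String, String> tagMap = new HashMap<>();
        tagMap.put(fields[tagPos], fields[valuePos]);
        Map<String, Map<String, String>> yearMap = new HashMap<>();
        yearMap.put(fields[yearPos], tagMap);
        return yearMap;
    }
}
```

Keeping the positions as parameters is what makes the function generic across income statements and balance sheets.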
Now we have cleaned the entire CSV file contents into the desired format. Here I have arranged the filter and mapToPair steps into a data-cleaning class.
Assuming we have created the two RDDs balanceSheetRdd and incomeStatementRdd using the method above, make a master RDD using Spark's union transformation. From here on, masterRdd will be used instead of balanceSheetRdd and incomeStatementRdd.
Reduce the master RDD with companyName as the key. The idea is to aggregate all financial details related to a company, grouped year-wise. Calling reduceByKey() on masterRdd merges the values for each companyName pairwise, but we need to do more here: we have to aggregate them according to year. We can do this by writing a custom class implementing Function2.
The class reduceMaps takes two value maps belonging to the same companyName and merges them, correctly grouping the tags by year.
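The merge itself is plain Java. Here is a minimal sketch (my own illustration, not the original reduceMaps class) of merging two (Year → (Tag → Value)) maps, which is what the call() method of the Function2 would do after masterRdd = balanceSheetRdd.union(incomeStatementRdd):

```java
import java.util.HashMap;
import java.util.Map;

// Year-wise merge performed by reduceMaps. In the Spark job this logic
// lives in the call() method of a class implementing
// Function2<Map<...>, Map<...>, Map<...>> and is invoked via
// masterRdd.reduceByKey(new ReduceMaps()).
class ReduceMapsSketch {
    static Map<String, Map<String, String>> merge(
            Map<String, Map<String, String>> left,
            Map<String, Map<String, String>> right) {
        Map<String, Map<String, String>> result = new HashMap<>(left);
        for (Map.Entry<String, Map<String, String>> e : right.entrySet()) {
            // If the year already exists, merge its tag maps;
            // otherwise just carry the year over.
            result.merge(e.getKey(), e.getValue(), (a, b) -> {
                Map<String, String> merged = new HashMap<>(a);
                merged.putAll(b);
                return merged;
            });
        }
        return result;
    }
}
```

So merging {2014={Revenue=1000}} with {2014={NetIncome=100}, 2013={Revenue=900}} yields a single map with both 2014 tags grouped together and 2013 carried over.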
To update MongoDB from Spark, use the mongo-hadoop connector. Before saving, convert the RDD into a pair RDD of the type JavaPairRDD<Object, BSONObject>.
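A sketch of shaping each reduced tuple into a document body before the save. The document layout and the database/collection names are my assumptions; building the body is plain Java, while the BSON wrapping and the save call are shown in comments since they need the mongo-hadoop and bson jars:

```java
import java.util.HashMap;
import java.util.Map;

// Shape one reduced tuple (companyName, Map<Year, Map<Tag, Value>>) into a
// document body. In the Spark job this map would be wrapped in a
// BasicBSONObject and the pair RDD saved via the mongo-hadoop connector:
//
//   Configuration outputConfig = new Configuration();
//   outputConfig.set("mongo.output.uri",
//       "mongodb://localhost:27017/finance.companies");  // assumed URI
//   bsonPairRdd.saveAsNewAPIHadoopFile("file:///unused",
//       Object.class, BSONObject.class, MongoOutputFormat.class, outputConfig);
class MongoDocBuilder {
    static Map<String, Object> toDocument(
            String companyName, Map<String, Map<String, String>> financials) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("companyName", companyName);
        doc.put("financials", financials);
        return doc;
    }
}
```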
We actually did quite a lot of things here. This is how the DAG looks for this job.
Previously I had attempted to do these filtering and mapping jobs using DataFrames, but the solution was not great. I like this program because I'm not collecting anything from the RDDs into the driver anywhere, so every stage should run distributed. I ran and tested this application on a Spark Standalone cluster on an HDP stack with 4 nodes.