Merge & Group & Structuralize Files
【Question】
Here's the problem:
In a folder in HDFS, there're a number of CSV files with each row being a record with the schema (ID, attribute1, attribute2, attribute3).
Some of the columns (except ID) could be null or empty strings, and no 2 records with the same ID can have the same non-empty value.
We'd like to merge all records with the same ID, and write all merged records also in HDFS. For example:
Record R1: ID =1, attribute1 ="hello", attribute2 =null, attribute3 ="";
Record R2: ID =1, attribute1 =null, attribute2 =null, attribute3 ="testa";
Record R3: ID =1 attribute1 =null, attribute2 ="okk", attribute3 ="testa";
Merged record should be: ID =1, attribute1 ="hello", attribute2 ="okk", attribute3 ="testa"
I'm just starting to learn Spark. Could anybody share some thoughts on how to write this in Java with Spark? Thanks!
Here're some sample CSV files:
file1.csv:
ID,str1,str2,str3
1,hello,,
file2.csv:
ID,str1,str2,str3
1,,,testa
file3.csv:
ID,str1,str2,str3
1,,okk,testa
The merged file should be:
ID,str1,str2,str3
1,hello,okk,testa
It's known beforehand that there won't be any conflicts on any fields.
Thanks!
【Answer】
There are a number of (N) CSV files and each has only one record, so there are altogether N records. You need to divide all records into multiple (M) groups by ID field and merge each group into a single record. For the would-be merged record, each of the field values from the second to the last will be the field’s first non-null value in the current group; if the field contains only null values, then the field value in the merged record will be null, too.
You can accomplish it in Java or with Spark packages. But the code is difficult to write and read. An alternative is Structured Process Language (SPL). It produces simple and easy to understand code and can access HDFS directly. Here’s the SPL script:
A |
|
1 |
=["file1.csv","file2.csv","file3.csv"].("hdfs://192.168.1.210:9000/user/hfiles/"+~) |
2 |
=hdfs_client(;"hdfs:// 192.168.1.210:9000") |
3 |
=A1.conj(hdfs_file(A2,~).import@ct()) |
4 |
=A3.group(#1) |
5 |
=A4.new(#1,~.(#2).select@1(~),~.(#3).select@1(~),~.(#4).select@1(~)) |
6 |
=hdfs_file(A2,"/user/hfiles/result.csv").export@tc(A5) |
A1: Join up the CSV files into a sequence of strings.
A2: Connect to the HDFS system.
A3: Read in content of each file and concatenate them together into a table sequence.
A4: Group the table sequence by the first column (index).
A5: Rearrange each group into a record. For the would-be merged record, each of the field values from the second to the last will be the field’s first non-null value in the current group. The SPL statement can be simply written as : =A4.new(#1,${to(2,4).("~.(#"/~/").select@1(~)").concat@c()})
A6: Write A5’s table sequence to the target CSV file HDFS.
An SPL script is integration-friendly with a Java program (Read How to Call an SPL Script in Java to learn more about the integration).
SPL Official Website 👉 https://www.scudata.com
SPL Feedback and Help 👉 https://www.reddit.com/r/esProcSPL
SPL Learning Material 👉 https://c.scudata.com
SPL Source Code and Package 👉 https://github.com/SPLWare/esProc
Discord 👉 https://discord.gg/cFTcUNs7
Youtube 👉 https://www.youtube.com/@esProc_SPL