Python vs. SPL 12 -- Big Data Processing

 

In data analysis, we often encounter data that is too big to fit in memory and has to be processed from the hard disk. In this article, we compare the computing abilities of Python and SPL on data of this order of magnitude. Far larger data, on the order of PBs, requires a distributed system to analyze and is beyond the scope of this article.

 

Aggregation

A simple aggregation only needs to traverse the data once and compute the aggregation column according to the aggregation target. For example: sum accumulates the values as the data is traversed; count records the number of rows traversed; mean records both the accumulated total and the row count, and divides the two at the end. Here we illustrate the sum calculation.
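To make the single-traversal idea concrete, here is a minimal sketch that keeps a running sum and a running count in one pass and derives the mean at the end (the file name and the position of the numeric column are assumptions for illustration):

# Sketch of single-pass aggregation: one traversal feeds sum, count and mean.
# "data.txt" and the tab-separated numeric 5th column are assumed for illustration.
total, cnt = 0.0, 0
with open("data.txt", 'r') as f:
    f.readline()                            # skip the title line
    for line in f:
        total += float(line.split("\t")[4]) # accumulate for sum
        cnt += 1                            # count the traversed rows
print(total, cnt, total / cnt)              # sum, count, mean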

 

Based on the following file, calculate the total amount of orders.

 

Some of the data are:

orderkey  orderdate   state          quantity  amount
1         2008-01-01  Wyoming        10        282.7
2         2008-01-01  Indiana        3         84.81
3         2008-01-01  Nebraska       82        2318.14
4         2008-01-01  West Virginia  9         254.43

 

Python

order_file = r"D:\data\orders.txt"
total = 0
with open(order_file, 'r') as f:            # open the file
    line = f.readline()                     # skip the title line
    while True:
        line = f.readline()                 # read line by line
        if not line:                        # end the operation when no content can be read
            break
        total += float(line.split("\t")[4]) # accumulate the amount field
print(total)

Python can read the file line by line, pick out the order amount field of each line, and keep accumulating it to get the sum. This needs no special skill, just hard coding. Python can also use Pandas to read the file in blocks to perform the sum, and the code is as follows:

 

import pandas as pd

order_file = r"D:\data\orders.txt"
# read the file in blocks of 100,000 lines each
chunk_data = pd.read_csv(order_file, sep="\t", chunksize=100000)
total = 0
for chunk in chunk_data:
    # accumulate the sale amount of each block
    total += chunk['amount'].sum()
print(total)

Pandas allows data to be read in blocks, and it is easy to aggregate each block, loop through all blocks, and combine the block results into the final sum. This method is slightly simpler to code and more efficient to compute; however, it is still hard-coded and troublesome to write.
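The same chunked pattern extends to aggregates that cannot simply be summed per block. For the mean, for instance, each block has to contribute both a sum and a count; a sketch reusing the file from the example:

import pandas as pd

# Sketch: the mean over chunks needs a running sum and a running count,
# because averaging per-chunk means directly would weight the chunks wrongly.
order_file = r"D:\data\orders.txt"
total, cnt = 0.0, 0
for chunk in pd.read_csv(order_file, sep="\t", chunksize=100000):
    total += chunk['amount'].sum()
    cnt += len(chunk)
print(total / cnt)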

 

SPL


     A                                      B
1    D:\data\orders.txt
2    =file(A1).cursor@t()                   /create a cursor
3    =A2.total(sum(amount))


 

SPL uses the cursor to compute over big data and provides abundant cursor functions. The total function is one of them; it performs multiple aggregations simultaneously, so the cursor does not have to be traversed more than once. For example, to get the sum and the maximum value at the same time, the code can be written as:

 

A2.total(sum(amount),max(quantity))

 

Such a concise piece of code aggregates the total order amount and finds the maximum order quantity at the same time, which is much simpler and more efficient than the Python version.
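For comparison, getting the same two aggregates in Python still requires an explicit chunked loop with two separately maintained accumulators; a sketch along the lines of the Pandas code above:

import pandas as pd

# Sketch: sum of amount and maximum quantity in a single chunked pass.
order_file = r"D:\data\orders.txt"
total, max_qty = 0.0, float('-inf')
for chunk in pd.read_csv(order_file, sep="\t", chunksize=100000):
    total += chunk['amount'].sum()
    max_qty = max(max_qty, chunk['quantity'].max())
print(total, max_qty)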

 

Filtering

The filtering operation is similar to aggregation: divide a big file into N segments, filter each segment separately, and finally union the results of all segments to get the target result.

 

Based on the data of the previous example, select the sales records of New York State.

 

Small result set

Python

import pandas as pd

order_file = r"D:\data\orders.txt"
# read the file in blocks
chunk_data = pd.read_csv(order_file, sep="\t", chunksize=100000)
# define an empty list to store the result of each block
chunk_list = []
for chunk in chunk_data:
    # segment and filter
    chunk_list.append(chunk[chunk.state == "New York"])
# union the results of all blocks
res = pd.concat(chunk_list)
print(res)

Python uses Pandas to segment and filter the data and thus gets the target result, but the code is still tedious to write. The filtering can also be done by reading the data line by line; that approach needs no special skill and follows directly from the logic, so it is omitted here.

 

Big result set

A big result set is one that does not fit in memory even after filtering.

import pandas as pd

infile = r"D:\data\orders.txt"
ofile = r"D:\data\orders_filter.txt"
# read the file in blocks
chunk_data = pd.read_csv(infile, sep="\t", chunksize=100000)
n = 0
for chunk in chunk_data:
    # filter each block
    need_data = chunk[chunk.state == 'New York']
    if n == 0:
        # export the first block to the file with the header
        need_data.to_csv(ofile, index=None)
        n += 1
    else:
        # append the following blocks to the file without the header
        need_data.to_csv(ofile, index=None, mode='a', header=None)

The big result set cannot be held in memory, so we have to store the filtered result on the hard disk for later calculations. Pandas reads the file in blocks, filters each block, and appends the result to a file on the hard disk.

 

SPL


     A                                      B
1    D:\data\orders.txt
2    =file(A1).cursor@t()
3    =A2.select(state=="New York")
4    =A3.fetch()                            /fetch the small result set
5    =file(out_file).export@tc(A3)          /export the big result set to hard disk

 

SPL uses the select function to filter the cursor. It is used differently from its in-memory counterpart: instead of executing the selection immediately, it only records the pending select action and actually executes it when the data is fetched or exported.

 

Please note that A4 fetches the small result set into memory, while A5 exports the big result set to the hard disk; the two cannot both be executed on the same cursor.
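The deferred behaviour of the SPL cursor is comparable to a lazy generator pipeline in Python: nothing is read or filtered until the result is actually consumed. A rough sketch of the analogy, assuming the same file layout as above:

import itertools

# Sketch: a generator pipeline only records the work; it executes when consumed,
# similar to SPL's deferred select on a cursor.
def order_cursor(path):
    with open(path, 'r') as f:
        header = f.readline().rstrip("\n").split("\t")
        for line in f:
            yield dict(zip(header, line.rstrip("\n").split("\t")))

rows = order_cursor(r"D:\data\orders.txt")            # nothing has been read yet
ny = (r for r in rows if r['state'] == "New York")    # the filter is only recorded
first_ten = list(itertools.islice(ny, 10))            # executed only when fetched
print(first_ten)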

 

Sorting

Sorting big data is also a very common operation, which may consume a lot of resources. For example:

 

Sort the orders according to the order amount.

 

Python

import pandas as pd
import os
import time
import shutil
import uuid
import traceback

# parse the data type of a string (int, float, or keep it as a string)
def parse_type(s):
    if s.isdigit():
        return int(s)
    try:
        return float(s)
    except ValueError:
        return s

# calculate the position of the to-be-sorted column name in the table header
def pos_by(by, head, sep):
    by_num = 0
    for col in head.split(sep):
        if col.strip() == by:
            break
        else:
            by_num += 1
    return by_num

# merge and sort on external storage
def merge_sort(directory, ofile, by, ascending=True, sep=","):
    with open(ofile, 'w') as outfile:
        # list and open the temporary files
        file_list = os.listdir(directory)
        file_chunk = [open(directory + "/" + file, 'r') for file in file_list]
        # read the table header of each file
        k_row = [file_chunk[i].readline() for i in range(len(file_chunk))]
        # position of the to-be-sorted column in the table header
        by = pos_by(by, k_row[0], sep)
        # write the table header
        outfile.write(k_row[0])
    # read the first data line of each file
    k_row = [file_chunk[i].readline() for i in range(len(file_chunk))]
    # maintain a list of k elements holding the current sort-key value of each file
    k_by = [parse_type(k_row[i].split(sep)[by].strip()) for i in range(len(file_chunk))]
    with open(ofile, 'a') as outfile:
        while True:
            # read and process the files one by one
            for i in range(len(k_by)):
                if i >= len(k_by):
                    break
                # sort the current key values
                sorted_k_by = sorted(k_by) if ascending else sorted(k_by, reverse=True)
                if k_by[i] == sorted_k_by[0]:
                    outfile.write(k_row[i])
                    k_row[i] = file_chunk[i].readline()
                    if not k_row[i]:
                        # the file is exhausted: close and drop it
                        file_chunk[i].close()
                        del file_chunk[i]
                        del k_row[i]
                        del k_by[i]
                    else:
                        k_by[i] = parse_type(k_row[i].split(sep)[by].strip())
            # end the operation after all files are read
            if len(k_by) == 0:
                break

# sort on external storage
def external_sort(file_path, by, ofile, tmp_dir, ascending=True, chunksize=50000, sep=',', usecols=None, index_col=None):
    os.makedirs(tmp_dir, exist_ok=True)
    try:
        # read the file in blocks
        data_chunk = pd.read_csv(file_path, sep=sep, usecols=usecols, index_col=index_col, chunksize=chunksize)
        for chunk in data_chunk:
            # sort each block in memory
            chunk = chunk.sort_values(by, ascending=ascending)
            # export the sorted block to the hard disk as a temporary file
            chunk.to_csv(tmp_dir + "/" + "chunk" + str(int(time.time()*10**7)) + str(uuid.uuid4()) + ".csv", index=None, sep=sep)
        # merge and sort the temporary files
        merge_sort(tmp_dir, ofile=ofile, by=by, ascending=ascending, sep=sep)
    except Exception:
        print(traceback.format_exc())
    finally:
        # delete the temporary files
        shutil.rmtree(tmp_dir, ignore_errors=True)

# the main program: sort on external storage
if __name__ == "__main__":
    infile = r"D:\data\orders.txt"
    ofile = r"D:\data\orders_sort.txt"
    tmp = "D:/data/tmp"
    external_sort(infile, 'amount', ofile, tmp, ascending=True, chunksize=1000000, sep='\t')

 

Python provides no ready-made function for sorting on external storage, so we have to write it ourselves. The code for an external merge sort is much more complicated than that for aggregation or filtering; it is close to an impossible task for many non-professional programmers, and its computational efficiency is not outstanding either.
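If one is willing to lean on the standard library, the merge phase can at least be delegated to heapq.merge, which lazily merges already-sorted iterables. The sketch below is an assumption-laden shortcut, not the article's method: it assumes each sorted chunk was written to its own tab-separated file named as in the code above, with the numeric amount column in the 5th field.

import heapq
import glob

# Sketch: merge pre-sorted chunk files with heapq.merge instead of a
# hand-written k-way merge. Assumes the chunk files share one header line
# and were each sorted by the numeric 'amount' column (5th tab-separated field).
def amount_key(line):
    return float(line.rstrip("\n").split("\t")[4])

def merge_chunks(chunk_dir, ofile):
    files = [open(p, 'r') for p in glob.glob(chunk_dir + "/chunk*.csv")]
    try:
        headers = [f.readline() for f in files]      # skip the per-file headers
        with open(ofile, 'w') as out:
            out.write(headers[0])                    # write the header once
            for line in heapq.merge(*files, key=amount_key):
                out.write(line)
    finally:
        for f in files:
            f.close()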

 

SPL


     A                                      B
1    D:\data\orders.txt
2    D:\data\orders_sort.txt
3    =file(A1).cursor@t()                   /create a cursor
4    =A3.sortx(amount)                      /sort
5    =file(A2).export@t(A4)                 /export

 

SPL provides the sortx() function for sorting on a cursor. It is written in the same way as the in-memory sort() function, except that it returns a cursor whose data is then fetched or exported to the hard disk. This is an enormous improvement over the hard-coded Python version, both in coding and in computing.

 

Grouping and association

Grouping and association on big data are far too difficult in Python. Such calculations involve hash algorithms and are practically impossible for the average programmer to implement, so we do not show the Python code here.

 

Let’s dive into the grouping and association operations in SPL instead.

 

Grouping

The calculation task is to aggregate the sale amount of orders in each state.

 

SPL


     A                                      B
1    D:\data\orders.txt
2    =file(A1).cursor@t()
3    =A2.groups(state;sum(amount):amount)


 

The groups function in SPL supports grouping and aggregation on a cursor, and it is used in the same way as its in-memory counterpart. Since it implements efficient hash grouping internally, it is both easy to write and efficient to compute.
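For this particular task the grouping key (state) has only a handful of distinct values, so a chunked work-around is feasible in Pandas: group each block, then re-aggregate the partial results. This sketch only works while the partial results fit in memory; the general case, where the groups themselves do not fit, is what requires the hash-based techniques mentioned above:

import pandas as pd

# Sketch: two-level grouping over chunks. Each block is grouped in memory,
# and the per-block partial sums are then grouped again to merge them.
order_file = r"D:\data\orders.txt"
parts = []
for chunk in pd.read_csv(order_file, sep="\t", chunksize=100000):
    parts.append(chunk.groupby('state')['amount'].sum())
res = pd.concat(parts).groupby(level=0).sum()
print(res)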

 

Association

Suppose the order table contains a customer ID field, and the customer information needs to be associated during the calculation.

 

SPL


     A                                      B
1    D:\data\orders.txt
2    D:\data\client.txt
3    =file(A1).cursor@t()
4    =file(A2).import@t()
5    =A3.switch(client,A4:clientid)         /associate
6    =A5.groups(...)                        /group and aggregate

 

SPL provides plenty of cursor functions. Among them, the switch function is used in a similar way to its in-memory counterpart, but it returns a deferred cursor and only actually executes the switch action when the subsequent code runs. The whole routine resembles the in-memory version, so it is easy to write and understand, and it computes fast.
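As a rough Python counterpart: when the customer table is small enough to sit in memory, each order chunk can be merged with it and the partial group results combined afterwards. The column names client and clientid follow the SPL code above; since the SPL groups(...) step is elided, the grouping column here is purely illustrative.

import pandas as pd

# Sketch: chunked join against a small in-memory dimension table, followed
# by the same two-level group-and-aggregate as in the grouping example.
orders_file = r"D:\data\orders.txt"
client_file = r"D:\data\client.txt"
clients = pd.read_csv(client_file, sep="\t")          # assumed to fit in memory
parts = []
for chunk in pd.read_csv(orders_file, sep="\t", chunksize=100000):
    joined = chunk.merge(clients, left_on='client', right_on='clientid')
    parts.append(joined.groupby('state')['amount'].sum())
res = pd.concat(parts).groupby(level=0).sum()
print(res)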

 

Summary

It takes Python a lot of effort to process big data, mainly because it provides no cursor or related operations for big data. We have to write the code ourselves, which is not only tedious but also inefficient.

 

SPL, by contrast, possesses a well-developed cursor system, and most cursor functions are used in much the same way as their in-memory counterparts. They are therefore very programmer-friendly and, thanks to effective internal algorithms, execute very efficiently.