Parallel computation of external data by esProc
Parallel computing of TXT files
SPL can roughly divide a text file into n segments by byte volume and read only one segment. For example, cardinfo.txt stores 10 million records of population information, which can be divided into ten segments. To take the second segment, the code can be written as follows:
| | A | B |
|---|---|---|
| 1 | =file("d:\\temp\\cardInfo.txt") | |
| 2 | =A1.import@t(;2:10) | /Read directly into memory |
| 3 | =A1.cursor@t(;2:10).fetch@x() | /Read in by a cursor |
SPL segments roughly by byte volume rather than precisely by row count, which improves segmentation performance. For example, if you view the first few rows of A2 or A3 in the IDE, you can see that the number of rows in a segment is not exactly 1 million (the exact count depends on the specific data):
| index | cardNo | name | gender | province | mobile |
|---|---|---|---|---|---|
| 1 | 308200310180525 | Alison Clinton | female | Idaho | 1024627490 |
| 2 | 709198311300191 | Abby Wood | female | Kansas | 19668466 |
| 3 | 1005199807060610 | George Bush | male | California | 1019879226 |
| … | … | … | … | … | … |
| 1000005 | 405199907050256 | Mark Rowswell | male | Idaho | 1168620176 |
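The rough byte-volume segmentation described above can be sketched in Python. This is an illustrative analogue, not SPL's actual implementation, and the function names are made up: split the file at byte offsets and advance each segment start to the next newline, so no row is ever cut in half, at the cost of segments holding only approximately equal row counts.

```python
import os

def segment_bounds(path, n):
    """Split a text file into n rough segments by byte volume.

    Each segment start (except the first) is advanced to the next
    newline so that no line is split; row counts per segment are
    therefore only approximately equal."""
    size = os.path.getsize(path)
    bounds = []
    with open(path, "rb") as f:
        for i in range(n):
            start = size * i // n
            if start > 0:
                f.seek(start)
                f.readline()          # skip the partial line
                start = f.tell()
            bounds.append(start)
    bounds.append(size)
    return [(bounds[i], bounds[i + 1]) for i in range(n)]

def read_segment(path, bound):
    """Read one segment and return its lines."""
    start, end = bound
    with open(path, "rb") as f:
        f.seek(start)
        return f.read(end - start).decode().splitlines()
```

Reading all segments back in order reproduces the file exactly; reading only one bound gives the single-segment access shown in the table above.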
Segmented reading can be combined with multithreaded computing to improve reading performance. For example, to count the lines of cardinfo.txt with two threads, each thread counts the rows in its own segment, and the per-thread counts are finally merged into the total. The code is as follows:
| | A | B | |
|---|---|---|---|
| 5 | fork to(2) | =A1.cursor@t(;A5:2).total(count(1)) | /2 threads |
| 6 | =A5.sum() | | /Merge results |
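The fork pattern above, where each thread counts its own segment and the counts are then summed, can be sketched in Python. This is illustrative only and assumes every line, including the last, ends with a newline:

```python
import os
from concurrent.futures import ThreadPoolExecutor

def count_lines_parallel(path, n=2):
    """Count the lines of a text file with n threads: each thread
    scans one byte segment (segment starts aligned to the next
    newline), and the per-segment counts are summed at the end.
    Assumes every line ends with a newline character."""
    size = os.path.getsize(path)

    def seg_start(i):
        if i == 0:
            return 0
        with open(path, "rb") as f:
            f.seek(size * i // n)
            f.readline()              # move to the next line boundary
            return f.tell()

    starts = [seg_start(i) for i in range(n)] + [size]

    def count(i):
        with open(path, "rb") as f:
            f.seek(starts[i])
            return f.read(starts[i + 1] - starts[i]).count(b"\n")

    with ThreadPoolExecutor(max_workers=n) as pool:
        return sum(pool.map(count, range(n)))   # merge, like A5.sum()
```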
The fork statement is suitable for complex algorithms. When the algorithm is simple, cursor@m can be used to read the segments directly. For example, the previous code can be rewritten as follows:
| | A | B |
|---|---|---|
| 7 | =A1.cursor@tm(;2).total(count(1)) | /2 threads |
The above code specifies the number of threads explicitly. If it is omitted, the "parallel limit" option in the configuration file is used as the default. Assuming parallel limit = 2, the code can be rewritten as:
| | A | B |
|---|---|---|
| 8 | =A1.cursor@tm().total(count(1)) | /Default number of threads |
To verify the performance difference, the code below counts the total rows of cardinfo.txt with a single thread and with two threads respectively; segmented reading shows a significant improvement:
| | A | B |
|---|---|---|
| 11 | =now() | |
| 12 | =A1.cursor@t().total(count(1)) | |
| 13 | =interval@ms(A11,now()) | /Without segmentation, 20882ms |
| 14 | | |
| 15 | =now() | |
| 16 | =A1.cursor@tm(;2).total(count(1)) | |
| 17 | =interval@ms(A15,now()) | /Segmented, 2 threads, 12217ms |
JDBC parallel
When accessing a database through JDBC, we sometimes find that the database load is not heavy, yet access performance is still poor. In this case, retrieving data in parallel can improve performance.
For example, an Oracle database has a call record table with 1 million records. The indexed field is callTime, and the data is roughly evenly distributed over this field. With non-parallel access, the performance is not ideal. The code is as follows:
| | A | B |
|---|---|---|
| 1 | =now() | /Record time for performance testing |
| 2 | =connect("orcl") | |
| 3 | =A2.query@x("select * from callrecord") | |
| 4 | =interval@ms(A1,now()) | /Non-parallel data fetching, 17654ms |
After changing to 2-thread parallel access, the performance improves significantly. The code is as follows:
| | A | B |
|---|---|---|
| 6 | =now() | |
| 7 | =connect("orcl").query@x("select min(callTime),max(callTime) from callrecordA") | |
| 8 | =2.(range(A7.#1,elapse@s(A7.#2,1),~:2)) | /Time interval parameter list |
| 9 | fork A8 | =connect("orcl") |
| 10 | | =B9.query@x("select * from callrecordA where callTime>=? and callTime<?",A9(1),A9(2)) |
| 11 | =A9.conj() | |
| 12 | =interval@ms(A6,now()) | /Parallel data fetching, 10045ms |
Since we want to fetch data in parallel, we need to divide the source data into multiple intervals so that the amount of data in each interval is approximately equal. In this example, the indexed field is the datetime field callTime, so A7 finds the value range of callTime, and A8 divides that range evenly into two time intervals. In the fork block starting at A9, each thread executes the SQL with its own time interval as parameters, so the fetched result sets are approximately equal in size. Finally, the results of the threads are concatenated into the final result.
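The interval-splitting-and-parallel-fetch pattern of A7 through A11 can be sketched in Python, with sqlite3 standing in for Oracle and an integer callTime column standing in for the datetime field (all names here are hypothetical, chosen to mirror the example):

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor

def fetch_parallel(db_path, lo, hi, n=2):
    """Fetch callrecord rows with n threads: split [lo, hi) into n
    roughly equal intervals, let each thread query one interval over
    its own connection, then concatenate the partial results."""
    bounds = [lo + (hi - lo) * i // n for i in range(n + 1)]

    def fetch(i):
        con = sqlite3.connect(db_path)     # one connection per thread
        try:
            cur = con.execute(
                "select * from callrecord where callTime >= ? and callTime < ?",
                (bounds[i], bounds[i + 1]))
            return cur.fetchall()
        finally:
            con.close()

    with ThreadPoolExecutor(max_workers=n) as pool:
        parts = pool.map(fetch, range(n))
    return [row for part in parts for row in part]   # like conj()
```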
The range function is well suited to data segmentation: it divides a range into n intervals and returns the i-th one, and the interval type adjusts automatically to the type of the range. In this example the range is of type datetime, so range divides it by seconds and returns datetime values. If the range is of type date, range divides evenly by day; if it is an integer range, it divides evenly by integer.
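The datetime case of this by-second division can be sketched in Python (a hypothetical helper mimicking the behavior described above, not SPL's internal code):

```python
from datetime import datetime, timedelta

def dt_range(lo, hi, i, n):
    """Divide the datetime range [lo, hi) into n intervals of whole
    seconds and return the i-th interval (1-based), mimicking the
    by-second behavior of range() on datetime values."""
    total = int((hi - lo).total_seconds())
    start = lo + timedelta(seconds=total * (i - 1) // n)
    end = lo + timedelta(seconds=total * i // n)
    return start, end
```

For a one-day range split in two, the first interval ends and the second begins exactly at noon, and the intervals tile the range with no gap or overlap.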
In the above example, the segmenting field is indexed. Without the index, query performance will decline, but parallel fetching can still bring a significant improvement, so the same method applies.
In the above example, the source data is roughly evenly distributed by callTime, so it is easy to make the data amount of each interval approximately equal. If the source data is very unevenly distributed, consider segmenting by row number instead. Each database has a way to generate row numbers; for example, Oracle can use rownum.
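Segmenting by row number can be sketched as SQL text generation in Python. The nested-query form below is one common Oracle rownum pagination idiom, shown only as an illustration; the exact idiom to use depends on the Oracle version, and the table name callrecordA is taken from the example above:

```python
def rownum_segments(total_rows, n):
    """Build Oracle-style rownum predicates that divide total_rows
    rows into n roughly equal segments (SQL text only; nothing is
    executed here)."""
    bounds = [total_rows * i // n for i in range(n + 1)]
    return [
        ("select * from ("
         "select t.*, rownum rn from callrecordA t where rownum <= {hi}"
         ") where rn > {lo}").format(lo=bounds[i], hi=bounds[i + 1])
        for i in range(n)
    ]
```

Each generated statement can then be handed to one thread, exactly as the time-interval parameters were in the fork example above.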
In addition to parallel fetching with a single SQL statement over a single table, SPL also supports parallel fetching over multiple tables or multiple SQL statements. For example, a report with a complex format may require SPL to execute multiple SQL statements and splice the result sets into a certain layout. With non-parallel fetching, the performance is not ideal. The code is as follows:
| | A | B |
|---|---|---|
| 1 | =now() | =connect("orcl") |
| 2 | select count(1) from callrecordA where to_char(calltime,'yyyy')='2015' | =B1.query(A2) |
| 3 | select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201501' | =B1.query(A3) |
| 4 | select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201502' | =B1.query(A4) |
| 5 | select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201503' | =B1.query(A5) |
| 6 | select count(1) from callrecordA where to_char(calltime,'yyyy')='2016' | =B1.query(A6) |
| 7 | select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201601' | =B1.query(A7) |
| 8 | select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201602' | =B1.query(A8) |
| 9 | select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201603' | =B1.query(A9) |
| 10 | =B1.close() | |
| 11 | =[B2:B9].new(~.#1:data) | |
| 12 | =interval@ms(A1,now()) | /Non-parallel data fetching, 2195ms |
After changing to 4-thread parallel fetching, the performance improves significantly. The code is as follows:
| | A | B |
|---|---|---|
| 14 | =now() | |
| 15 | fork [A2:A9] | =connect("orcl") |
| 16 | | =B15.query@x(A15) |
| 17 | =A15.new(~.#1:data) | |
| 18 | =interval@ms(A14,now()) | /4-thread parallel data fetching, 1320ms |
Note that the number of tasks can be greater than the degree of parallelism. In the above code there are 8 tasks, but only 4 execute at the same time; the remaining tasks wait in a queue. When a short task finishes, SPL takes the next task from the queue and executes it. Thus, when there are many tasks, the hardware can be fully utilized even if the tasks' loads vary greatly.
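This queuing behavior can be illustrated with a Python thread pool (illustrative only, not SPL's scheduler): with 8 tasks and 4 workers, at most 4 tasks are in flight at once, and each worker picks up the next queued task as soon as it becomes free.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def run_tasks(n_tasks, n_workers):
    """Run n_tasks on a pool of n_workers threads. Surplus tasks wait
    in the pool's queue, so the peak number of concurrently running
    tasks never exceeds n_workers."""
    active = set()
    peak = 0
    lock = threading.Lock()

    def task(i):
        nonlocal peak
        with lock:
            active.add(i)
            peak = max(peak, len(active))
        time.sleep(0.01)              # stand-in for the real work
        with lock:
            active.discard(i)
        return i

    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        results = list(pool.map(task, range(n_tasks)))
    return results, peak
```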
Parallel computing in hybrid data sources
When the amount of data is too large, in addition to computing over split databases, parallel computing over hybrid data sources can be carried out, and the latter has higher performance. The method is to divide the data into two (or more) parts: one part, usually the current real-time data, is stored in the database; the other, usually the historical data, is stored in a composite table file. The two data sources are then computed in parallel to obtain higher performance.
For example, historical orders are stored in orders.ctx and current orders in the Oracle database orcl. The task is to group orders by year and month and sum the amount field of each group. The SPL code is as follows:
| | A | B |
|---|---|---|
| 1 | fork | select extract(year from orderTime) y, extract(month from orderTime) m, sum(amount) amount from orders group by extract(year from orderTime), extract(month from orderTime) |
| 2 | | =connect("orcl") |
| 3 | | =B2.query@x(B1) |
| 4 | fork | =file("orders.ctx").create() |
| 5 | | =B4.groups(year(ORDERTIME):Y,month(ORDERTIME):M;sum(AMOUNT):AMOUNT) |
| 6 | =[A1,A4].conj() | |
| 7 | =A6.groups(Y,M;sum(AMOUNT):AMOUNT) | |
Pay attention to the fork…fork… usage: if a fork block is followed by a non-fork block, the two execute in sequence; if a fork block is immediately followed by another fork block, the two execute in parallel.
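The hybrid-source pattern can be sketched in Python, with sqlite3 and a CSV file standing in for Oracle and the composite table. The schema is simplified (y and m columns are stored pre-split instead of being extracted from an order time), and all names are hypothetical: each source is aggregated in its own thread, the partial results are concatenated, and a second grouping merges them.

```python
import csv
import sqlite3
from concurrent.futures import ThreadPoolExecutor

def hybrid_group_sum(db_path, csv_path):
    """Aggregate amount by (year, month) from two sources in parallel:
    a database (current orders) and a CSV file (historical orders),
    then merge the partial results with a second grouping."""
    def from_db():
        con = sqlite3.connect(db_path)
        try:
            return con.execute(
                "select y, m, sum(amount) from orders group by y, m"
            ).fetchall()
        finally:
            con.close()

    def from_file():
        sums = {}
        for row in csv.DictReader(open(csv_path, newline="")):
            key = (int(row["y"]), int(row["m"]))
            sums[key] = sums.get(key, 0) + float(row["amount"])
        return [(y, m, a) for (y, m), a in sums.items()]

    with ThreadPoolExecutor(max_workers=2) as pool:
        db_part = pool.submit(from_db)       # like the first fork block
        file_part = pool.submit(from_file)   # like the second fork block
        rows = db_part.result() + file_part.result()   # conj()

    merged = {}
    for y, m, a in rows:                     # second groups() over the union
        merged[(y, m)] = merged.get((y, m), 0) + a
    return merged
```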
SPL Official Website 👉 https://www.scudata.com
SPL Feedback and Help 👉 https://www.reddit.com/r/esProcSPL
SPL Learning Material 👉 https://c.scudata.com
SPL Source Code and Package 👉 https://github.com/SPLWare/esProc
Discord 👉 https://discord.gg/cFTcUNs7
Youtube 👉 https://www.youtube.com/@esProc_SPL