Real-time storage and count of ultra-multi-point high-frequency time series data
I. Problem posing
Time series data here refers mainly to data collected and generated by real-time monitoring, inspection and analysis equipment in industries such as electric power, chemicals, meteorology and geomatics. Such industrial data typically has three characteristics: i) high generation frequency (each monitoring point may generate multiple records within one second); ii) strong dependence on collection time (each record must correspond to a unique time); iii) a huge number of monitoring points and a large data volume (a conventional real-time monitoring system usually has many thousands of monitoring points, each generating multiple records every second, so dozens of GB or more are produced every day). This places strict requirements on the storage capacity and on the write and query performance of the database.
In this article, we take a detection system as an example to describe how to use the open-source esProc SPL to handle the real-time storage and count of such industrial data.
Requirement description: store the time series data arriving from up to 200,000 monitoring points every second and retain the data for 1 year. Based on this data, compute statistics for multiple monitoring points within any specified time period, including the maximum, minimum, average, variance, median, etc.
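To get a feel for the scale, the requirement can be turned into row counts. The Python sketch below is only an illustration: it assumes one row per monitoring point per second (the storage scripts later round timestamps to the second) and uses the 10-second and 10-minute file spans that appear in the storage section. No row size is assumed, because the requirement does not state one.

```python
POINTS = 200_000                 # monitoring points, from the requirement
SECONDS_PER_DAY = 24 * 60 * 60

# Assumption: exactly one row per point per second.
rows_per_10s_file = POINTS * 10
rows_per_10min_file = POINTS * 10 * 60
rows_per_day = POINTS * SECONDS_PER_DAY
rows_per_year = rows_per_day * 365

print(f"{rows_per_10s_file:,}")    # 2,000,000
print(f"{rows_per_10min_file:,}")  # 120,000,000
print(f"{rows_per_day:,}")         # 17,280,000,000
print(f"{rows_per_year:,}")        # 6,307,200,000,000
```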
We will perform this task step by step as follows:
1) Data collection
2) Data storage
3) Data count
II. Data collection
Data collection refers to the process of reading data from the detection device into the SPL server. There are usually two collection methods:
1. Direct calling of the API of detection device
SPL reads the data by calling the API of the detection device directly. Because no unified standard exists for the APIs of the various types of detection devices, a calling interface has to be custom-developed. The advantage of this method is lower data latency; the disadvantages are that it is less universal, requires complicated protocol conversion, and is difficult to develop.
2. Message queue
The data generated by the detection device is first sent to a message queue, and SPL then consumes the data from the queue. With this method, SPL's built-in message queue client functions can be used directly to connect to the queue and consume data. The advantage is that it is more universal; the disadvantage is higher latency than the first method when reading data.
In fact, whichever method is adopted, the data storage strategy can be considered the same as long as the data volume and the count requirements remain unchanged. In the following sections, we take consuming data from Kafka as an example to explain in detail how to implement data storage and count.
III. Data storage
The data structure is as follows:
Field name | Type | Remark |
---|---|---|
TagFullName | String | Monitoring point name |
Time | Long integer | Timestamp |
Type | Numeric | Data type |
Qualitie | Numeric | QID |
Value | Floating point | Value |
Typically, the monitoring point name is a string. Storing the data directly under the original names would make storage expensive and searching slow during counting, so the names need to be converted to numeric IDs (numberized). Two situations can arise: in the first, the monitoring point names are not known in advance and must be collected dynamically during data consumption and numberized into a dictionary table; in the second, the names are already known, a numberized dictionary table is built from them in advance, and it is updated only occasionally, at long intervals.
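The idea of a numberized dictionary table can be illustrated with a small Python sketch. It is not the SPL implementation: the function names `build_dictionary` and `lookup_id` are made up for this example, and IDs are simply assigned in first-seen order. What it shares with the scripts below is that the table is kept ordered by name so that a lookup can use binary search.

```python
from bisect import bisect_left

def build_dictionary(names):
    """Return (sorted_names, ids), where ids[i] is the ID of sorted_names[i].

    IDs are assigned 1..n in first-seen order (an assumption of this sketch),
    then both lists are ordered by name so lookups can use binary search.
    """
    unique = list(dict.fromkeys(names))            # de-duplicate, keep order
    pairs = sorted((name, i + 1) for i, name in enumerate(unique))
    return [p[0] for p in pairs], [p[1] for p in pairs]

def lookup_id(sorted_names, ids, name):
    """Binary-search the name; return its ID, or None if it is unknown."""
    pos = bisect_left(sorted_names, name)
    if pos < len(sorted_names) and sorted_names[pos] == name:
        return ids[pos]
    return None

names, ids = build_dictionary(["point2", "point10", "point1", "point2"])
print(names)                              # ['point1', 'point10', 'point2']
print(lookup_id(names, ids, "point10"))   # 2
print(lookup_id(names, ids, "pointX"))    # None
```

Storing a small integer instead of the full name in every row is what reduces both the file size and the cost of comparisons during counting.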
1. The situation where the monitoring point names need to be dynamically supplemented
A | B | C | D | |
---|---|---|---|---|
1 | =file("kafka_client.properties") | |||
2 | =kafka_open(A1;"data-transfer") | |||
3 | >env(tagsList,if(file("tagsList.btx").exists(),file("tagsList.btx").import@b().sort(name),create(name,id))) | |||
4 | =now() | |||
5 | for | =now() | ||
6 | =kafka_poll(A2).(json(value)).news(Datas;~.TagFullName,round(Time,-3):Time,~.Type,~.Qualitie,~.Value) | |||
7 | >B4|=B6 | |||
8 | if interval@ms(B5,now())>10000 | |||
9 | =B4.id(#1)\tagsList.(#1) | |||
10 | if C9!=null | >env(tagsList,(tagsList.(#1)|C9).new(~:name,#:id).sort(#1)) | ||
11 | =file("tagsList.btx").export@b(tagsList) | |||
12 | =file("url.txt").import@i() | |||
13 | =httpfile("http://"/D12(1)/":"/D12(2)/"/httpinit.splx").read() | |||
14 | >B4.run(#1=tagsList.select@b(name==~.TagFullName).id) | |||
15 | >B4=B4.sort(#1,#2) | |||
16 | =maxTime=B4.max(Time) | |||
17 | =file("tmp_"/maxTime/".btx").export@b(B4) | |||
18 | =movefile(file("tmp_"/maxTime/".btx"),(maxTime/".btx")) | |||
19 | >B5=now(),B4=null |
Lines 1 and 2: connect to the Kafka server using the configuration file kafka_client.properties and the topic data-transfer.
Line 3: the global variable tagsList is a numberized dictionary table, which includes the monitoring point name and number, and is sorted by the monitoring point name.
Line 6: the data structure in Kafka is as follows:
    {
      "Datas":[
        {
          "TagFullName":"point1",
          "Type":10,
          "Qualitie":0,
          "Value":62.29038255706044
        },
        …
        {
          "TagFullName":"point100",
          "Type":10,
          "Qualitie":0,
          "Value":-53.27840963536921
        }
      ],
      "Time":1673321221862
    }
In cell B6, SPL's kafka_poll function consumes one round of data from Kafka according to the configuration file, then parses the JSON and flattens it into rows.
Lines 9 to 13: for each batch of in-memory data (accumulated from Kafka for more than 10 seconds, as checked in B8), compute the difference between the de-duplicated, sorted values of the first column and the ordered sequence of monitoring point names in the known dictionary table. If the result is not empty, merge the existing names with the new ones to generate a new dictionary table, then update both the global variable and the dictionary table in external storage. In addition, since the http service for counting runs in another process, the global variable of the count service also needs to be updated. For details, see the "data count" section below.
Line 14: now that there is a dictionary table, the monitoring point names in the in-memory data of current batch can be numberized.
Line 15: sort the in-memory data by monitoring point and time.
Lines 16 to 18: export the sorted in-memory data to the bin file named after the maximum timestamp of this batch.
Line 19: reset the start time of the next batch of data consumption and clear the in-memory data.
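The walkthrough above can be condensed into a Python sketch of one batch: flatten a message, extend the dictionary, numberize and sort the rows, and publish the file under a temporary name before renaming it. This is an illustration, not a port of the SPL script. The function names are hypothetical, a JSON file stands in for the bin file, timestamps are rounded half up to the second as an assumed equivalent of round(Time,-3), and new IDs are appended after the current maximum on the assumption that IDs already written to files must keep their meaning.

```python
import json
import os
import tempfile

def flatten(message_text):
    """Turn one JSON message into rows (name, time_ms, type, quality, value)."""
    msg = json.loads(message_text)
    t = (msg["Time"] + 500) // 1000 * 1000     # round to the nearest second
    return [(d["TagFullName"], t, d["Type"], d["Qualitie"], d["Value"])
            for d in msg["Datas"]]

def extend_dictionary(name_to_id, rows):
    """Give unseen names IDs after the current maximum; return the new names."""
    new_names = sorted({r[0] for r in rows} - set(name_to_id))
    next_id = max(name_to_id.values(), default=0) + 1
    for offset, name in enumerate(new_names):
        name_to_id[name] = next_id + offset
    return new_names

def publish(rows, directory):
    """Write rows under a tmp_ name, then rename to <maxTime>.json."""
    max_time = max(r[1] for r in rows)
    tmp_path = os.path.join(directory, f"tmp_{max_time}.json")
    final_path = os.path.join(directory, f"{max_time}.json")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(rows, f)
    os.replace(tmp_path, final_path)   # readers never see a half-written file
    return final_path

message = json.dumps({
    "Datas": [
        {"TagFullName": "point2", "Type": 10, "Qualitie": 0, "Value": 1.5},
        {"TagFullName": "point1", "Type": 10, "Qualitie": 0, "Value": 2.5},
    ],
    "Time": 1673321221862,
})
name_to_id = {"point1": 1}             # dictionary known before this batch
batch = flatten(message)
added = extend_dictionary(name_to_id, batch)
# Replace names by IDs, then sort by (ID, time) as lines 14 and 15 do.
encoded = sorted((name_to_id[n], t, ty, q, v) for n, t, ty, q, v in batch)
with tempfile.TemporaryDirectory() as d:
    path = publish(encoded, d)
print(added)                           # ['point2']
print(os.path.basename(path))          # 1673321222000.json
```

Writing to a tmp_ name first matters because other scripts select finished files by their name pattern, so a file only becomes visible to them once it is complete.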
2. The situation where some of the monitoring point names are already known and need to be updated later
The following code consumes data from Kafka and then persists it:
A | B | C | |
---|---|---|---|
1 | =file("kafka_client.properties") | ||
2 | =kafka_open(A1;"data-transfer") | ||
3 | >env(tagsList,file("tagsList.btx").import@b().sort(name)) | ||
4 | =now() | ||
5 | for | =now() | |
6 | =kafka_poll(A2).(json(value)).news(Datas;tagsList.select@b(name==~.TagFullName).id:TagFullName,round(Time,-3):Time,~.Type,~.Qualitie,~.Value) | ||
7 | >B4=[B4,B6.sort(#1,#2)].merge(#1,#2) | ||
8 | if interval@ms(B5,now())>=10000 | ||
9 | =maxTime=B4.max(Time) | ||
10 | =file("tmp_"/maxTime/".btx").export@b(B4) | ||
11 | =movefile(file("tmp_"/maxTime/".btx"),maxTime/".btx") | ||
12 | >B5=now(),B4=null |
Most of this code is already provided earlier and will not be explained in detail.
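One step that does differ from the first script is line 7: instead of sorting the whole batch at the end, each polled round is sorted and merged into the data accumulated so far, which is already in order. A rough Python equivalent, assuming rows are tuples that start with (tag_id, time_ms), uses `heapq.merge`; the function name `add_batch` is invented for this sketch.

```python
from heapq import merge

def sort_key(row):
    """Order rows by monitoring point ID, then by time."""
    return (row[0], row[1])

def add_batch(accumulated, batch):
    """Sort the new batch and merge it into the already sorted accumulation."""
    return list(merge(accumulated, sorted(batch, key=sort_key), key=sort_key))

acc = []
acc = add_batch(acc, [(2, 1000, 0.5), (1, 1000, 0.1)])
acc = add_batch(acc, [(1, 2000, 0.2), (2, 2000, 0.6)])
print(acc)
# [(1, 1000, 0.1), (1, 2000, 0.2), (2, 1000, 0.5), (2, 2000, 0.6)]
```

Merging two ordered sequences is linear in their combined length, so the sorting cost is spread across the polling rounds rather than concentrated at the moment the file is written.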
New monitoring point names can be added by consuming them from Kafka:
A | B | C | D | |
---|---|---|---|---|
1 | =file("addpoint.properties") | |||
2 | =kafka_open(A1;"insert-point") | |||
3 | for | =now() | ||
4 | =kafka_poll(A2).conj(json(value)) | |||
5 | if B4.len()>0 | =B4.sort()\tagsList.(#1) | ||
6 | if C5!=null | =C5.len() | ||
7 | >env(tagsList,(tagsList.(#1)|C5).new(~:name,#:id).sort(#1)) | |||
8 | =file("tagsList.btx").export@b(tagsList) | |||
9 | =file("url.txt").import@i() | |||
10 | =httpfile("http://"/D9(1)/":"/D9(2)/"/httpinit.splx").read() | |||
Likewise, this part of code is already provided earlier and will not be explained in detail. Note that the two SPL scripts need to be executed in the same process, as both of them utilize the global variable tagsList.
3. Data completing
In an actual production environment, it may also happen that when a value does not change between two consecutive seconds, the data for the later second is not sent to Kafka. If the data is persisted with the code above, those seconds will be missing. To solve this problem, the data must be completed in memory before it is written to file.
To make the data complete, the above code should be modified as:
A | B | C | |
---|---|---|---|
1 | =file("kafka_client.properties") | ||
2 | =kafka_open(A1;"data-transfer") | ||
3 | >env(tagsList,file("tagsList.btx").import@b().sort(name)) | ||
4 | >env(valsList,tagsList.len().([,,])) | ||
5 | =now() | ||
6 | for | =now() | |
7 | =kafka_poll(A2).(json(value)).news(Datas;tagsList.select@b(name==~.TagFullName).id:TagFullName,round(Time,-3):Time,~.Type,~.Qualitie,~.Value) | ||
8 | >B5=[B5,B7.sort(#1,#2)].merge(#1,#2) | ||
9 | if interval@ms(B6,now())>=12000 | ||
10 | =valsList.len().run(cedianhao=~,~=10.new(cedianhao:TagFullName,minTime+(~-1)*1000:Time,Type,Qualitie,Value)).conj() | ||
11 | =B5.select(Time<(minTime+10000)) | ||
12 | =C11.([#1,#2]) | ||
13 | >C10.select(C12.contain@b([TagFullName,Time])).run(Type=C11(#).Type,Qualitie=C11(#).Qualitie,Value=C11(#).Value) | ||
14 | >C10.run(tfn=#1,if(Value==null,Type=valsList(tfn)(1),valsList(tfn)(1)=Type),if(Value==null,Qualitie=valsList(tfn)(2),valsList(tfn)(2)=Qualitie),if(Value==null,Value=valsList(tfn)(3),valsList(tfn)(3)=Value)) | ||
15 | =file("tmp_"/(minTime+9000)/".btx").export@b(C10) | ||
16 | =movefile(file("tmp_"/(minTime+9000)/".btx"),(minTime+9000)/".btx") | ||
17 | >B5=B5.select(Time>=(minTime+10000)),minTime+=10000 | ||
18 | >B6=datetime(long(B6)+10000) |
This code adds a global variable valsList that stores, for each monitoring point, the data of the previous second.
Line 9: the interval of each round of data consumption is set to 12 seconds. Allowing for network delay when data is sent, and assuming the delay does not exceed 2 seconds, the data generated in the first 10 seconds of each 12-second window is guaranteed to have arrived.
Line 10: since the monitoring point numbers and times are known in advance, pre-create a table sequence in which all other fields are empty.
Lines 11 to 14: select the data for the first 10 seconds of the 12-second window, fill it into the empty table sequence created in line 10, and then use valsList to complete the remaining gaps, producing complete in-memory data.
New monitoring point names are added in basically the same way as before. The only addition is >env(valsList,valsList|=C5.([,,])), which keeps the length of valsList consistent with the number of monitoring points.
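The completing step (lines 10 to 14 of the script above) amounts to a forward fill over a fixed grid of one row per monitoring point per second. The Python sketch below shows that logic under simplifying assumptions: rows are tuples (tag_id, time_ms, type, quality, value), timestamps are already rounded to whole seconds, and the dictionary `last_seen` plays the role that valsList plays in the script. The function name `complete` is hypothetical.

```python
def complete(rows, tag_ids, min_time, last_seen, seconds=10):
    """Return one row per tag per second, filling gaps with the last known values.

    rows      : received rows (tag_id, time_ms, type, quality, value)
    last_seen : dict tag_id -> (type, quality, value), updated in place
    """
    received = {(r[0], r[1]): tuple(r[2:]) for r in rows}
    out = []
    for tag in tag_ids:
        for k in range(seconds):
            t = min_time + k * 1000
            vals = received.get((tag, t))
            if vals is None:
                # Nothing arrived for this second: reuse the previous values
                # (all None if this tag has never reported).
                vals = last_seen.get(tag, (None, None, None))
            else:
                last_seen[tag] = vals
            out.append((tag, t, *vals))
    return out

last_seen = {}
rows = [(1, 0, 10, 0, 5.0), (1, 2000, 10, 0, 7.0)]
filled = complete(rows, tag_ids=[1], min_time=0, last_seen=last_seen, seconds=4)
print([r[4] for r in filled])   # [5.0, 5.0, 7.0, 7.0]
```

Because `last_seen` survives between calls, a value that stays unchanged across a file boundary is still carried into the next 10-second file.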
In all of the above situations, a bin file is generated every 10 seconds; the differences lie in how the monitoring point names and dictionary table are updated and whether the data needs to be completed. Storing data only at this level is not enough: a long counting period would involve many files and many searches, which hurts counting efficiency. To avoid this problem, the data needs to be layered and merged according to the actual data scale.
4. Data layering and merging
The main purpose of merging data is to improve counting efficiency, and two factors must be considered: the time span covered by a single data file, and the number of files. For the former, the smaller the span, the easier it is to select just the relevant files from the time period in the counting condition; for the latter, the more files there are, the more searches are needed. The two factors pull in opposite directions, so a compromise has to be found by testing with the actual data size.
Taking 10-minute data as an example, the layering code is as follows:
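A quick calculation makes the effect of the file span on the number of files concrete. The sketch below is illustrative only: the helper `files_touched` is made up, the 10-second and 10-minute spans come from this article, and the 1-hour span is a hypothetical extra layer added for comparison.

```python
import math

def files_touched(query_seconds, file_span_seconds):
    """Upper bound on the files a query must open when each file covers a fixed span.

    One extra file accounts for a query window that is not aligned
    to the file boundaries.
    """
    return math.ceil(query_seconds / file_span_seconds) + 1

for span in (10, 600, 3600):
    print(span, files_touched(3600, span), files_touched(86400, span))
# 10 361 8641
# 600 7 145
# 3600 2 25
```

The other side of the trade-off does not show up in these numbers: with a larger span, the first and last files of a query contain more rows outside the requested period that still have to be skipped.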
A | B | |
---|---|---|
1 | =t=0 | |
2 | for | =now() |
3 | =directory("?????????????.btx").conj().sort(file(~).date()).m(:-2) | |
4 | =B3.max(long(left(~,13))) | |
5 | =B3.(file(~).cursor@b()).mergex(#1,#2) | |
6 | =file("tmpl2_"/B4/".ctx").create@y(#TagFullName,#Time,Type,Qualitie,Value) | |
7 | >B6.append(B5) | |
8 | >B6.close() | |
9 | =movefile(file("tmpl2_"/B4/".ctx"),"l2_"/B4/".ctx") | |
10 | =B3.(movefile(~)) | |
11 | >t=interval@ms(B2,now()) | |
12 | =sleep(600000-t) |
Every 10 minutes, the script merges all but the newest of the current 10-second bin files (n-1 of n files, each sorted by monitoring point and time) into one composite table file. The bin files are merged into a composite table here because the data generated by 200,000 monitoring points in 10 minutes is already large; with less data, bin files would continue to be used until the data volume of some layer became very large. In practice, bin files are usually used to store small tables and composite tables to store large ones. Large tables have a significant impact on performance, and storing them as composite tables improves the overall performance of the system. Conversely, a composite table has an index block in its header, so at least one block is required even for a single row of data, and more blocks are required when columnar storage is adopted. This overhead is negligible for big data but makes composite tables unsuitable for small data.
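The layering step is essentially a k-way merge of files that are each already sorted by monitoring point and time. The Python sketch below mirrors that flow under stated assumptions: JSON files stand in for bin files and for the composite table, finished files are recognized by a 13-digit timestamp name and ordered by name rather than by file date, and everything is loaded into memory, whereas the SPL script streams through cursors. The function names are hypothetical.

```python
import json
import os
import tempfile
from heapq import merge

def finished_files(directory):
    """Names like <13-digit timestamp>.json, oldest first, without the newest."""
    names = [n for n in os.listdir(directory)
             if n.endswith(".json") and len(n) == 18 and n[:13].isdigit()]
    return sorted(names)[:-1]

def merge_layer(directory):
    """Merge the finished small files into one file named l2_<maxTime>.json."""
    names = finished_files(directory)
    if not names:
        return None
    batches = []
    for n in names:
        with open(os.path.join(directory, n), encoding="utf-8") as f:
            batches.append(json.load(f))       # each sorted by (tag, time)
    merged = list(merge(*batches, key=lambda r: (r[0], r[1])))
    max_time = max(int(n[:13]) for n in names)
    tmp_path = os.path.join(directory, f"tmpl2_{max_time}.json")
    final_path = os.path.join(directory, f"l2_{max_time}.json")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(merged, f)
    os.replace(tmp_path, final_path)           # publish only when complete
    for n in names:
        os.remove(os.path.join(directory, n))  # merged small files are dropped
    return final_path

with tempfile.TemporaryDirectory() as d:
    samples = [
        (1000000010000, [[1, 1000000010000, 1.0], [2, 1000000010000, 2.0]]),
        (1000000020000, [[1, 1000000020000, 1.5], [2, 1000000020000, 2.5]]),
        (1000000030000, [[1, 1000000030000, 9.9]]),
    ]
    for ts, rows in samples:
        with open(os.path.join(d, f"{ts}.json"), "w", encoding="utf-8") as f:
            json.dump(rows, f)
    out = merge_layer(d)
    with open(out, encoding="utf-8") as f:
        merged_rows = json.load(f)
    remaining = sorted(os.listdir(d))
print(remaining)   # ['1000000030000.json', 'l2_1000000020000.json']
```

The newest small file is left out of the merge on the assumption that it may still be in the process of being published, which is also why the merged file is written under a temporary name first.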
IV. Data count
The count service usually needs to be called by other systems, and SPL's built-in http service can be used for this. However, since the http service and the backend script that stores data do not run in the same process, the global variable of the data storage script cannot be shared. As a result, the dictionary table must be synchronized whenever the monitoring point table is updated.
1. Synchronization of monitoring point name and dictionary table
The code that synchronizes the dictionary table for the count service from the latest numberized monitoring point name file is very simple, a single statement:
>env(tagsList,file("tagsList.btx").import@b().sort(name))
Once the dictionary table is synchronized, efficient counting can be implemented on the appropriate layered files. However, because large tables and small tables are stored in different file types, each file type has to be calculated separately when counting, and the results then merged.
2. Mixed calculation of composite tables and bin files
Let’s take the calculation of the average as an example:
A | B | C | |
---|---|---|---|
1 | =tags | =st | =et |
2 | =directory("l*.ctx").sort(mid(~,4,13)) | ||
3 | =A2.pselect(long(mid(~,4,13))>=st) | ||
4 | =A2.pselect(long(mid(~,4,13))>=et) | ||
5 | if A3!=null || A4!=null | =ctxFiles=A2.m(ifn(A3,1):ifn(A4,-1)) | |
6 | >ctxResult=null | ||
7 | for A1 | >ctxResult|=ctxFiles.(file(~).open().cursor(TagFullName,Time,Value;TagFullName==B7).select(between@r(Time,st:et))).conjx().new(TagFullName,Value).fetch() | |
8 | else | >ctxResult=null | |
9 | =directory("?????????????.btx").sort(left(~,13)) | ||
10 | fork A9 | =file(B10).iselect@rb(A1,TagFullName).select(between@r(Time,st:et)).new(TagFullName,Value).fetch() | |
11 | =btxResult=B10.conj().sort(#1) | ||
12 | =[ctxResult,btxResult].conj().group(#1).conj(~.select((#-1)%sn==0).groups(#1;avg(#2))).run(tmp=#1,#1=tagsList.select(#2==tmp).#1) |
tags refers to the numberized monitoring point names, such as [51,2,39,…]; st is the start timestamp for counting, and et is the end timestamp.
Lines 2 to 8: find the composite table files that fall within the time interval, and search them in turn for the monitoring points and values that satisfy the counting condition.
Lines 9 to 11: search all bin files in parallel for the monitoring points and values that satisfy the counting condition.
Line 12: combine the two results, calculate the average, and convert the numberized monitoring point names back to strings.
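The two ideas in this script, selecting files by the timestamps in their names and then calculating over rows from both kinds of files, can be sketched in Python as follows. This is an illustration with several assumptions: each file name carries the maximum timestamp of its rows, the time filter is treated as st <= Time < et, the variance shown is the population variance (the requirement does not say which kind is needed), and the function names are invented for the example.

```python
from bisect import bisect_left
from statistics import mean, median, pvariance

def files_for_range(end_times, st, et):
    """end_times: sorted maximum timestamps of the files, taken from their names.

    Returns the end times of the files that can hold rows with st <= Time < et.
    """
    first = bisect_left(end_times, st)   # first file ending at or after st
    last = bisect_left(end_times, et)    # first file ending at or after et
    return end_times[first:last + 1]

def summarize(rows, tags, st, et):
    """rows: (tag_id, time_ms, value) from any mix of files; stats per tag."""
    by_tag = {t: [] for t in tags}
    for tag, time_ms, value in rows:
        if tag in by_tag and st <= time_ms < et:
            by_tag[tag].append(value)
    return {t: {"max": max(v), "min": min(v), "avg": mean(v),
                "variance": pvariance(v), "median": median(v)}
            for t, v in by_tag.items() if v}

print(files_for_range([600, 1200, 1800, 2400], st=700, et=1300))  # [1200, 1800]

large = [(1, 700, 2.0), (1, 800, 4.0), (2, 700, 1.0)]       # e.g. composite table rows
small = [(1, 1250, 6.0), (2, 1250, 3.0), (1, 1300, 100.0)]  # e.g. bin file rows
stats = summarize(large + small, tags=[1], st=700, et=1300)
print(stats[1]["avg"], stats[1]["median"])                  # 4.0 4.0
```

Statistics such as the median need all selected values of a monitoring point together, which is why the rows from both file types are combined before the calculation rather than aggregated per file.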