Comparison of esProc composite table, ORC, Parquet

 

1.     Preface

Big data technology has given rise to a number of columnar storage formats, and a suitable storage format is the foundation of high-performance computing. From an application perspective, this article compares three open-source columnar file formats, esProc composite table, ORC, and Parquet, in terms of data compression and read performance.

2.     Prepare data

Data structure of the inbound table:

No  Field name  Field type
1   pid         long
2   store       string
3   product     int
4   indate      date
5   num         int

 
Generate 200 million rows of text data for the inbound table, store them locally as an esProc composite table, an ORC file, and a Parquet file, then compare the file sizes and test the read performance of each. Following the way Hive tables store dates, the 'indate' field is converted to a numeric value for storage.
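As a rough illustration of what storing a date as a number can look like, the Java programs later in this article write indate as a count of days since 1970-01-01. The sketch below uses java.time rather than the article's own dateToDays helper; the class and method names are made up for the example.

```java
import java.time.LocalDate;

// Hypothetical helper for illustration; not part of the article's programs.
class EpochDayDemo {
    // Number of days between 1970-01-01 and the given calendar date.
    static int toEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // Inverse conversion: day count back to a calendar date.
    static LocalDate fromEpochDays(int days) {
        return LocalDate.ofEpochDay(days);
    }

    public static void main(String[] args) {
        // 2015-10-01 is the lower bound of indate in the data generator.
        LocalDate first = LocalDate.of(2015, 10, 1);
        int days = toEpochDays(first);
        System.out.println(first + " -> " + days + " -> " + fromEpochDays(days));
    }
}
```

Because the value is a plain integer, a columnar format can store and compress it like any other int column.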

The text data is generated with an esProc SPL script and then converted into each of the other file formats for testing.

3.     Generate data

Generate unordered data from the inbound table and save it as esProc composite table, ORC, and Parquet files.

3.1 TXT data

The SPL script for generating TXT data is as follows:

A B C D
1 /uuid(Segmented data) /Segment number /Segment length /Sequence number in the segment
2 [] 1 0 0
3 [[300000,400000],[50000,100000],[600000,700000],[100000,200000],[200000,300000],[1,50000],[400000,500000],[900000,1000000],[500000,600000],[800000,900000],[700000,800000]] 0 /B3: Starting position of segment
4 for 220 for A3 =[[B4(1)+(A4-1)*1000000, B4(2)+(A4-1)*1000000]]
5 =A2=A2.insert(0, C4)
6 1000 100 1500 10000
7 200000000 20 60 10
8 0 =date(2015,10,1) =date(2016,9,30) 100
9 =create(pid, store,product,indate,num)
10 =file("H:/tmp/inbound.txt")
11 func genOne (store,product) ="store_id_"+mid(string(store+1000000),2)
12 =B7+rand(C7-B7) =to(1000).(rand(interval(B8,C8))).id@u().m(to(B12)).sort()
13 for B12 =elapse(B8,C12(B13)) =D7+rand(D8-D7)
14 >A8 = A8+1
15 if D2>=C2 >B3=A2(B2)(1)
16 >C2 = A2(B2)(2)-B3
17 >B2=B2+1
18 =D2=0
19 >A9.insert(0,(D2+B3), B11,product,C13,D13)
20 >D2 = D2+1
21 for D6 for A6 =func(genOne,A21,B21)
22 if (A8 >= A7) >A10.export@a(A9.cursor())
23 break A21
24 >A10.export@at(A9.cursor()) >A9.reset()

 
The script generates 200 million rows and stores them in a text file of 8448 MB. The sequence in A2 helps produce unordered, non-repeating numbers that serve as key values.
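The segment idea behind A2 can be sketched in Java as follows. This is an illustrative reading of the SPL script, not a translation of it: each block holds a list of [start, end) ranges listed out of order, and keys are handed out range by range, so the result is unique but not globally sorted. The class name, method name, and the small ranges are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of segment-based key generation; names are illustrative.
class SegmentedKeys {
    // Emits every key of every segment, block by block.
    // Each segment is a half-open range {start, end}; blocks are shifted by blockSize.
    static List<Long> generate(long[][] segments, int blocks, long blockSize) {
        List<Long> keys = new ArrayList<>();
        for (int b = 0; b < blocks; b++) {
            long offset = b * blockSize;
            for (long[] seg : segments) {
                for (long k = seg[0]; k < seg[1]; k++) {
                    keys.add(offset + k);
                }
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Ranges deliberately listed out of order, as in cell A3 of the script.
        long[][] segments = {{30, 40}, {5, 10}, {1, 5}, {10, 30}};
        List<Long> keys = generate(segments, 2, 100);
        System.out.println(keys.size() + " keys, first few: " + keys.subList(0, 5));
    }
}
```

Since the ranges never overlap, uniqueness comes for free and no lookup structure is needed while generating.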

3.2 esProc composite table

The script for generating esProc composite table data is as follows:

A B C
1 =now() 0
2 =file("H:/tmp/inbound.txt").cursor@t()
3 =file("H:/tmp/data3/inbound.ctx").create@y(#pid, store, product, indate,num)
4 for A2, 50000 >A3.append(A4)
5 =B1=B1+A4.len()
6 if B1%5000000==0 >output(B1)
7 >A3.close()
8 >output("sum =" / B1)
9 =interval@ms(A1,now())


Converting the text data produces a composite table (ctx) file of 608 MB.

3.3 ORC

Use a Java program to convert the text data into an ORC file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.CompressionKind;
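// Note: IParam, ParamParser, CreateCursor and Context, used below, also need import lines that this listing omits.
import com.scudata.common.Logger;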
import com.scudata.dm.BaseRecord;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;

public class CreateOrcFile {
    private TypeDescription m_schema;
    Writer m_writer;

    public CreateOrcFile (String destFile){
        try {
            /* Define table structure*/
            String struct = "struct<pid:int,store:string,product:int,indate:date,num:int>";
            m_schema = TypeDescription.fromString(struct);

            File f = new File(destFile);
            if (f.exists()) {
                f.delete();
            }
            m_writer = getWriter(destFile);
        } catch (IOException e) {
            Logger.error("CreateOrcFile:: "+e);
        }
    }
 
    public static void main(String[] args) throws IOException {
        String srcFile = "H:/tmp/data3/inbound.txt";
        String destFile = "h:/tmp/data3/out_inbound.orc";
        CreateOrcFile cls = new CreateOrcFile(destFile);
        cls.writeToOrcFile(srcFile);
    }
 
    private void writeToOrcFile(String srcFile)
    {
      try {
        Sequence tab = null;
        FileObject oRet = new FileObject(srcFile);    	
        String exp = String.format(";,\",\"");
        IParam param = ParamParser.parse(exp, null, null);
        ICursor c = CreateCursor.createCursor("", oRet, null,param,"", new Context());
 
        long total = 0;
        int unitSize = 10000;
        VectorizedRowBatch batch = m_schema.createRowBatch();
        LongColumnVector pid = (LongColumnVector) batch.cols[0];
        BytesColumnVector store = (BytesColumnVector) batch.cols[1];
        LongColumnVector product = (LongColumnVector) batch.cols[2];
        DateColumnVector indate = (DateColumnVector) batch.cols[3];
        LongColumnVector num = (LongColumnVector) batch.cols[4];
  	
        while(null!=(tab = (Sequence) c.fetch(unitSize)) ) {
          total += tab.length();
          for(int i=0; i<tab.length(); i++) {
            BaseRecord sq = (BaseRecord)tab.get(i+1);
               
            int row = batch.size++;
            pid.vector[row] = Long.parseLong(sq.getFieldValue(0).toString());
            byte[] buffer = ((String)sq.getFieldValue(1)).getBytes(StandardCharsets.UTF_8);
            store.setRef(row, buffer, 0, buffer.length);
            product.vector[row] = (Integer)sq.getFieldValue(2);
            Date d = (Date)sq.getFieldValue(3);
            indate.vector[row] = dateToDays(d);
            num.vector[row] = Long.parseLong(sq.getFieldValue(4).toString());
 
            if (batch.size == batch.getMaxSize()) {
              m_writer.addRowBatch(batch);
              batch.reset();
            }        	
          }
      
          if (total %500000==0) {
            System.out.println("idx = "+total);
          }
        
          if (tab.length()<unitSize) {
            break;
          }
        }
        if (batch.size != 0) {
          m_writer.addRowBatch(batch);
        }
  	
        m_writer.close();
      }catch(Exception e) {
        System.out.println("writeToOrcFile:: "+e);
      }
   }
  
   /* Convert date to days*/
   public static int dateToDays(Date date) { 
      int days = 0;
      ZoneId zoneId = ZoneId.systemDefault();
      int offsetSeconds = zoneId.getRules().getOffset(Instant.now()).getTotalSeconds();
      long zonems = offsetSeconds * 1000;
      long milliseconds = date.getTime()+zonems;
      days = (int)(milliseconds / (1000 * 60 * 60 * 24));
 
      return days;
   }
 
   /* ORC write operation object */
   private Writer getWriter(String filePath) throws IOException {    	
      return OrcFile.createWriter(new Path(filePath), 
        OrcFile.writerOptions(new Configuration()).
        setSchema(m_schema).
        compress(CompressionKind.SNAPPY)); 
    }
  }

Converting the text data produces an ORC file of 513.9 MB.

3.4 Parquet

As above, a Java program generates the data:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import com.scudata.dm.BaseRecord;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;

public class CreateParquetFile {
    private static final MessageType FILE_SCHEMA;
    private static final String TABLE_NAME = "inbound_parquet";
    ParquetWriter m_writer;

    static {
        // Define table structure
        Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
        messageTypeBuilder
            .optional(PrimitiveType.PrimitiveTypeName.INT64)
            .named("pid")
            .optional(PrimitiveType.PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .named("store")
            .optional(PrimitiveType.PrimitiveTypeName.INT32)
            .named("product")
            .optional(PrimitiveType.PrimitiveTypeName.INT32)
            .named("indate")
            .optional(PrimitiveType.PrimitiveTypeName.INT64)
            .named("num");
        FILE_SCHEMA = messageTypeBuilder.named(TABLE_NAME);
    }

    public CreateParquetFile (String outFilePath){
        try {
            File f = new File(outFilePath);
            if (f.exists()) {
                f.delete();
            }
            m_writer = getWriter(outFilePath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /* Get write operation object */
    private static ParquetWriter getWriter(String filePath) throws IOException {
        Path path = new Path(filePath);
        return ExampleParquetWriter.builder(path)
            .withWriteMode(ParquetFileWriter.Mode.CREATE)
            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
            .withCompressionCodec(CompressionCodecName.SNAPPY)
            .withConf(new Configuration())
            .withType(FILE_SCHEMA).build();
    }

    /* Generate ParquetFile*/
    private void writeToParquetFile(String sfile) {
        try {
            FileObject oRet = new FileObject(sfile);
            String exp = String.format(";,\",\"");
            IParam param = ParamParser.parse(exp, null, null);
            ICursor c = CreateCursor.createCursor("", oRet, null,param,"", new Context());
            Sequence tab = null;
            long total = 0;
            while(null!=(tab = (Sequence) c.fetch(10000)) ) {
                total += tab.length();
                writeToFile(tab);
                if (total %500000==0) {
                    System.out.println("idx ="+total);
                }
                if (tab.length()<10000) {
                    break;
                }
            }
            m_writer.close();
        }catch(Exception e) {
            System.out.println("aaa:"+e);
        }
    }

    public static int dateToInt(Date date) {
        long milliseconds = date.getTime();
        return (int) (milliseconds / 1000);
    }

    /* Data writing */
    private void writeToFile(Sequence tab) throws IOException {
        SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(FILE_SCHEMA);
        Group group = null;
        for(int i=0;i<tab.length(); i++) {
            BaseRecord sq = (BaseRecord)tab.get(i+1);
            try {
                group = simpleGroupFactory.newGroup();
                group.append("pid", Long.parseLong(sq.getFieldValue(0).toString()) );
                group.append("store", (String)sq.getFieldValue(1));
                group.append("product", (Integer)sq.getFieldValue(2));
                Date d = (Date)sq.getFieldValue(3);
                group.append("indate", dateToDays(d));
                group.append("num", Long.parseLong(sq.getFieldValue(4).toString()) );
                m_writer.write(group);
            }catch(Exception e) {
                System.out.println("writeToFile:"+e);
            }
        }
    }

    public static int dateToDays(Date date) {
        int days = 0;
        ZoneId zoneId = ZoneId.systemDefault();
        int offsetSeconds = zoneId.getRules().getOffset(Instant.now()).getTotalSeconds();
        long zonems = offsetSeconds * 1000;
        long milliseconds = date.getTime()+zonems;
        days = (int)(milliseconds / (1000 * 60 * 60 * 24));
        return days;
    }

    public static void main(String[] args) throws IOException {
        String srcFile = "H:/tmp/data3/inbound.txt";
        /* Retrieve the directory where the file is written */
        String destFile= "h:/tmp/data3/out_inbound.parquet";
        CreateParquetFile cls = new CreateParquetFile(destFile);
        cls.writeToParquetFile(srcFile);
        System.out.println("OK.");
    }
}

Converting the text data produces a Parquet file of 1157 MB.

 

4.     Read data

Each test reads every record from the file, converts the fields according to their data types, and stores them in an Object[] array. The same traversal is used to measure the read time of the esProc composite table, the ORC file, and the Parquet file.
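The measurement pattern shared by the three readers can be sketched roughly as below: walk all records, materialize each one as an Object[], count rows, and report the elapsed time. The ReadTimer class and the sample rows are hypothetical; the real tests that follow read from the ctx, ORC, and Parquet files instead of an in-memory list.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical timing harness; the article's readers inline this logic.
class ReadTimer {
    // Walks every record once and returns the number of rows seen.
    static long traverse(Iterator<Object[]> rows) {
        long count = 0;
        while (rows.hasNext()) {
            Object[] items = rows.next(); // one record, already converted by field type
            if (items != null) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Made-up sample rows in the order pid, store, product, indate, num.
        List<Object[]> sample = new ArrayList<>();
        sample.add(new Object[]{1L, "store_id_000001", 1, 16709, 10});
        sample.add(new Object[]{2L, "store_id_000002", 2, 16710, 20});

        long start = System.nanoTime();
        long n = traverse(sample.iterator());
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("rows = " + n + ", time = " + elapsedMs + " ms");
    }
}
```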

4.1 esProc composite table

SPL script to read data:

A B
1 =now()
2 =file("H:/tmp/data3/inbound.ctx").open()
3 =A2.cursor()
4 for A3, 10000 >B1=B1+A4.len()
5 =A2.close()
6 >output("total=" / B1)
7 =interval@ms(A1,now())


Reading the composite table by cursor traversal takes 11.053 seconds.

4.2 ORC

Read the file with a Java program:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.time.LocalDate;

public class InboundReader {
    public static void main(String[] args) throws Exception {
      try {
      long start = System.currentTimeMillis();
      String sfile = "H:/tmp/data3/out_inbound.orc";
      Configuration conf = new Configuration();
      Reader reader = OrcFile.createReader(new Path(sfile), OrcFile.readerOptions(conf));
      TypeDescription readSchema = reader.getSchema();
      System.out.println("Row count: " + reader.getNumberOfRows());
      VectorizedRowBatch batch = readSchema.createRowBatch(50000);
      RecordReader rowIterator = reader.rows(reader.options().schema(readSchema));
      LongColumnVector pid = (LongColumnVector) batch.cols[0];
      BytesColumnVector store = (BytesColumnVector) batch.cols[1];
      LongColumnVector product = (LongColumnVector) batch.cols[2];
      DateColumnVector indate = (DateColumnVector) batch.cols[3];
      LongColumnVector total = (LongColumnVector) batch.cols[4];
      long sum = 0;

      String[] cols = new String[]{"pid","store", "product","indate","num"};
      Object[] items = new Object[cols.length];

      while (rowIterator.nextBatch(batch)) {				
        for (int row = 0; row < batch.size; ++row) {
           int productRow = product.isRepeating ? 0 : row;
           int indateRow = indate.isRepeating ? 0 : row;
           int totalRow = total.isRepeating ? 0 : row;
           items[0] = pid.vector[row];
           items[1] = store.toString(row);
           items[2] = (product.noNulls || !product.isNull[productRow] ? product.vector[productRow] : 0);
           items[3] = (indate.noNulls || !indate.isNull[indateRow] ? LocalDate.ofEpochDay(indate.vector[indateRow]).toString() : null);
           items[4] = (total.noNulls || !total.isNull[totalRow] ? total.vector[totalRow] : 0);
         }
        sum+=batch.size;
      }

       rowIterator.close();
       reader.close();
       System.out.println("sum = "+ sum+"; Time = "+(System.currentTimeMillis() - start));
      }catch(Exception e) {
       System.out.println(e);
      }	
   }
}

Traversing the ORC file takes 19.595 seconds.
 

4.3   Parquet

import java.time.LocalDate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupValueSource;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.GroupType;
import com.scudata.common.Logger;

public class ParquetReaderTest {
  public static void main(String[] args) throws Exception {
    try {
      long nStart = System.currentTimeMillis();
      ParquetReaderTest cls = new ParquetReaderTest();
      String srcFile = "h:/tmp/data3/out_inbound.parquet";
      cls.parquetRead(srcFile);
      System.out.println("time = " + (System.currentTimeMillis() - nStart));
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

    private void parquetRead(String srcFile) throws Exception {
      long sum = 0;
      Map<String, String> map = new LinkedHashMap<>();
      map.put("pid", "long");
      map.put("store", "string");
      map.put("product", "int");
      map.put("indate", "date");
      map.put("num", "long");

      GroupReadSupport readSupport = new GroupReadSupport();
      ParquetReader<Group> reader = new ParquetReader<>(new Path(srcFile), readSupport);
      Group line = null;
      Object[] items = null;
      while ((line = reader.read()) != null) {
        items = readGroup(line, map.values());
        sum++;
        if (sum % 5000000 == 0) {
          System.out.println("idx =" + sum);
        }
      }
      reader.close();
      System.out.println("sum=" + sum);
  }

    private Object[] readGroup(Group g, Collection<String> colsType) throws Exception {
      Object[] items = new Object[colsType.size()];
      try {
        String sType = null;
        Iterator<String> iter = colsType.iterator();
        int n = 0;
        while (iter.hasNext()) {
          sType = iter.next();
          Type colType = g.getType().getFields().get(n);
          if (colType.isPrimitive()) {
            // Convert the value according to the declared field type
            if (sType.equals("string")) {
              items[n] = g.getString(n, 0);
            } else if (sType.equals("int")) {
              items[n] = g.getInteger(n, 0);
            } else if (sType.equals("long")) {
              items[n] = g.getLong(n, 0);
            } else if (sType.equals("double")) {
              items[n] = g.getDouble(n, 0);
            } else if (sType.equals("float")) {
              items[n] = g.getFloat(n, 0);
            } else if (sType.equals("boolean")) {
              items[n] = g.getBoolean(n, 0);
            } else if (sType.equals("int96")) {
              items[n] = g.getInt96(n, 0);
            } else if (sType.equals("date")) {
              // indate is stored as a day count, so rebuild the date from epoch days
              items[n] = LocalDate.ofEpochDay(g.getInteger(n, 0)).toString();
            }
            n++;
          } else {
            GroupValueSource subG = g.getGroup(n, 0);
            GroupType pt = subG.getType();
            int colSize = pt.getFieldCount();
            System.out.println("subColSize =" + colSize);
          }
        }
    } catch (Exception e) {
         Logger.error("readGroup::" + e);
    }
    return items;
  }
}

Traversing the Parquet file takes 85.370 seconds. Testing also showed that when dates are stored in the int96 binary format, converting them back to dates is slower.
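The article does not show the int96 variant of the test, so the following is only a sketch of one plausible reason for the extra cost. It assumes the common Parquet INT96 timestamp layout (8 bytes of nanoseconds within the day followed by a 4-byte Julian day number, both little-endian); the class and method names are hypothetical. Decoding such a value means unpacking 12 bytes per row, whereas an INT32 day count converts directly.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDate;

// Hypothetical comparison of the two date encodings; not the article's test code.
class Int96DateDemo {
    // Julian day number of 1970-01-01, used to shift Julian days to epoch days.
    static final int JULIAN_DAY_OF_UNIX_EPOCH = 2440588;

    // INT32 day count: one direct conversion.
    static LocalDate fromInt32(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    // Assumed INT96 layout: 8 bytes nanos-of-day, then 4 bytes Julian day, little-endian.
    static LocalDate fromInt96(byte[] raw) {
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        buf.getLong();                // nanoseconds within the day, ignored for a pure date
        int julianDay = buf.getInt();
        return LocalDate.ofEpochDay((long) julianDay - JULIAN_DAY_OF_UNIX_EPOCH);
    }

    // Builds an INT96-style value at midnight, only so the sketch can round-trip.
    static byte[] toInt96(LocalDate date) {
        int julianDay = Math.toIntExact(date.toEpochDay() + JULIAN_DAY_OF_UNIX_EPOCH);
        return ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
                .putLong(0L).putInt(julianDay).array();
    }

    public static void main(String[] args) {
        LocalDate d = LocalDate.of(2016, 9, 30);
        byte[] raw = toInt96(d);
        System.out.println(fromInt96(raw) + " / " + fromInt32((int) d.toEpochDay()));
    }
}
```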

5.     Summary

Item                 Composite table  ORC     Parquet  TXT
Compression format   lz4              snappy  snappy   none
File size (MB)       608              513.9   1157     8448
1st read time (s)    15.519           28.517  93.972
2nd read time (s)    10.841           18.034  76.114
3rd read time (s)    11.053           19.595  85.370


The table shows that, compared with the uncompressed text data, all three columnar formats compress the data well and save a great deal of storage space. ORC produces the smallest file; the esProc composite table is about 18% larger, which is not a significant difference. Parquet compresses noticeably worse: its file is more than twice the size of the ORC file.
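The size comparison can be checked with a few divisions over the figures in the table. The sketch below is only that arithmetic; the class and method names are made up for the example.

```java
// Hypothetical helper that recomputes the size ratios from the summary table.
class CompressionRatios {
    // How many times larger the first size is than the second.
    static double ratio(double largerMb, double smallerMb) {
        return largerMb / smallerMb;
    }

    public static void main(String[] args) {
        // File sizes in MB, taken from the summary table.
        double txt = 8448, ctx = 608, orc = 513.9, parquet = 1157;
        System.out.printf("TXT size / format size: composite %.1f, ORC %.1f, Parquet %.1f%n",
                ratio(txt, ctx), ratio(txt, orc), ratio(txt, parquet));
        System.out.printf("format size / ORC size: composite %.2f, Parquet %.2f%n",
                ratio(ctx, orc), ratio(parquet, orc));
    }
}
```

With these inputs the text file is roughly 13.9, 16.4, and 7.3 times the size of the composite table, ORC, and Parquet files respectively.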

In terms of read speed, the esProc composite table is the fastest: across the three runs it is roughly 1.7 to 1.8 times as fast as ORC and six to eight times as fast as Parquet. Seen from this angle, Parquet may have become an outdated file format that is being replaced by ORC.
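The timing comparison can be reproduced from the three runs in the table. The sketch below averages each format's runs and derives rows per second for the 200 million rows; the ReadSpeed class is hypothetical and adds no measurements of its own.

```java
// Hypothetical helper that summarises the read timings from the summary table.
class ReadSpeed {
    // Arithmetic mean of the run times, in seconds.
    static double mean(double... seconds) {
        double sum = 0;
        for (double s : seconds) {
            sum += s;
        }
        return sum / seconds.length;
    }

    // Throughput in rows per second.
    static double rowsPerSecond(long rows, double seconds) {
        return rows / seconds;
    }

    public static void main(String[] args) {
        long rows = 200_000_000L; // row count of the test data
        double ctx = mean(15.519, 10.841, 11.053);
        double orc = mean(28.517, 18.034, 19.595);
        double parquet = mean(93.972, 76.114, 85.370);
        System.out.printf("mean seconds: composite %.3f, ORC %.3f, Parquet %.3f%n", ctx, orc, parquet);
        System.out.printf("slowdown vs composite: ORC %.2f, Parquet %.2f%n", orc / ctx, parquet / ctx);
        System.out.printf("composite throughput: %.0f rows/s%n", rowsPerSecond(rows, ctx));
    }
}
```

The first run of each format is noticeably slower than the later ones, which is one reason to look at every run rather than a single figure.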

Considering both compression and read performance, the esProc composite table has the greater overall advantage. If its flexible segmentation and ordered positioning functions are also taken into account, its performance advantage for big data computing becomes even more apparent.