Cylon Library for Fast, Scalable Data Engineering
Large-scale data engineering has gone through a remarkable transformation over the past decade. We have seen wide adoption of Big Data frameworks, from Apache Hadoop and Apache Spark to Apache Flink. Today, Artificial Intelligence (AI) and Machine Learning (ML) have further broadened the scope of data engineering, which demands faster and more integrable systems that can operate on both specialized and commodity hardware.
A data science workflow is a complex, interactive process. It starts with data in large data stores. We create structured data sets from this raw data using ETL (Extract, Transform, Load) tools such as Hadoop or Spark. The structured data are stored in databases or files.
Once we have the data in structured storage, we can start the iterative process of hypothesis creation, model training, and validation. The structured data are queried and transformed to fit the inputs of machine learning or deep learning algorithms. We examine and validate the outputs, and repeatedly change our hypotheses and model parameters until we get satisfactory models. This process involves different tools, languages, hardware platforms, data stores, and visualization tools. The actual machine learning / deep learning part may be very small compared to the overall data engineering environment surrounding it.
Two Ecosystems
The tools available to make this work come from two separate ecosystems. On one side are the enterprise data management and data processing tools built around the Java language. On the other side are the machine learning, deep learning, visualization, and statistics frameworks built around the Python language. We rarely see researchers use Flink, Spark, or Hadoop for serious work at the research application level. On the other hand, Python and deep learning frameworks are thriving in HPC clusters inside national labs and universities, because these Python frameworks use high-performance C++ code underneath their user-friendly APIs.
To become part of the machine learning / deep learning world, big data frameworks provide Python APIs. But Java and Python cannot run in the same process space, so these integrations have to jump through hoops such as spawning separate Python and Java processes and establishing communication between them. In contrast, Python and C++ integrate natively with minimal performance overhead, and C++ and Java also integrate natively through the JNI interface.
ML / DL has mostly moved away from Java frameworks and relies on high-performance C/C++ implementations with Python APIs. With the end of Moore's law in sight, custom solutions that are closer to the hardware will dominate these markets.
A Library
No single system will be able to meet all these demands, and pure system-level languages are not going to go over well with most users. We prefer business-friendly and user-friendly languages such as Java and Python. A better approach, then, is to implement the critical parts of these systems as highly optimized libraries written in system-level languages. Compact in-memory data storage and optimized distributed data operations are two key areas for accelerating data engineering.
Data processing tools rely on two widely used data structures. Tables store heterogeneous data, where each column may hold a different type. Numerical data are stored in tensors, matrices, and vectors, which are homogeneous structures compared to tables.
We can store both of these structures in column format or row format, which determines which values are laid out contiguously in memory. Whether we store the data in row or column format can have a significant effect on performance due to access patterns and cache use. Data processing APIs can be built on the table structure, while ML / DL engines are based on tensor structures, so we need to convert tables to tensors when we integrate with ML / DL frameworks.
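As a small illustration, the snippet below uses the Apache Arrow Python API (the same columnar format Cylon builds on) to create a columnar table and pull numeric columns out as a NumPy array, the kind of table-to-tensor hand-off an ML / DL integration needs. The data and column names are made up for the example.

```python
import numpy as np
import pyarrow as pa

# A heterogeneous table stored column by column: each column's values are contiguous.
table = pa.table({
    "id": [1, 2, 3, 4],
    "name": ["a", "b", "c", "d"],
    "score": [0.5, 0.9, 0.1, 0.7],
})

# Homogeneous numeric columns can be handed to ML / DL code as tensors.
scores = table.column("score").to_numpy()                             # 1-D float64 array
features = np.column_stack([table.column("id").to_numpy(), scores])   # 2-D feature "tensor"

print(table.schema)
print(features.shape)   # (4, 2)
```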
The natural operations on table structures are the relational algebra operations, which include joins, union, difference, select, and project. These operations can be implemented on top of a table partitioned across parallel processes, creating a distributed table, as the sketch below illustrates.
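The following pure-Python sketch shows the idea behind a partitioned join: rows are hash-partitioned by key so that equal keys land in the same partition, and each partition is then joined locally. In a real system the partitions live in separate processes and the shuffle happens over the network; here everything is simulated with plain lists.

```python
def partition_by_key(rows, num_partitions):
    """Hash-partition (key, value) rows so equal keys land in the same partition."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

def local_join(left_rows, right_rows):
    """Plain hash join on a single partition."""
    index = {}
    for key, value in left_rows:
        index.setdefault(key, []).append(value)
    return [(key, lv, rv) for key, rv in right_rows for lv in index.get(key, [])]

# Two small "tables" of (key, value) rows.
left = [(1, "a"), (2, "b"), (3, "c")]
right = [(2, "x"), (3, "y"), (4, "z")]

# Partition both tables the same way, then join partition by partition.
num_partitions = 2
left_parts = partition_by_key(left, num_partitions)
right_parts = partition_by_key(right, num_partitions)
joined = [row for lp, rp in zip(left_parts, right_parts) for row in local_join(lp, rp)]
print(joined)   # [(2, 'b', 'x'), (3, 'c', 'y')]
```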
Cylon
Cylon is a high-performance, MPI (Message Passing Interface)-based distributed memory data-parallel library for processing structured data. Cylon implements a set of relational operators to process data. While "Core Cylon" is implemented using system-level C/C++, it has Python and Java interfaces. This enables both data and AI/ML engineers to invoke data processing operators in a familiar programming language. Its flexible C++ core and language interfaces allow it to be imported as a library for applications or run as a standalone framework. It includes a table abstraction and a set of core relational algebraic operations that are widely used in data processing systems, allowing Cylon to couple seamlessly with existing AI/ML and data engineering infrastructures. Internally it uses the compact Apache Arrow columnar data format.
Cylon documentation — https://cylondata.org/
Cylon Github — https://github.com/cylondata/cylon
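Here is a minimal sketch of what using the Python interface (PyCylon) looks like for local, single-process data processing. The module paths, option builders, and join arguments follow the PyCylon documentation but may differ between Cylon releases, and the CSV file names are placeholders.

```python
# Illustrative sketch only: module paths and argument names are based on the
# PyCylon documentation and may differ between Cylon releases.
from pycylon import CylonContext
from pycylon.io import CSVReadOptions, read_csv

# A local (non-distributed) context; pass an MPI config instead for distributed runs.
ctx = CylonContext(config=None, distributed=False)

csv_opts = CSVReadOptions().use_threads(True).block_size(1 << 20)
left = read_csv(ctx, "left.csv", csv_opts)     # placeholder input files
right = read_csv(ctx, "right.csv", csv_opts)

# Relational operators are methods on the table abstraction.
joined = left.join(right, join_type="inner", algorithm="sort", on=[0])
print(joined)

ctx.finalize()
```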
Cylon Architecture
Cylon currently provides a set of distributed data-parallel operators to extract, transform, and load structured data in a tabular format. These operators are exposed as APIs in C++, Python, and Java. When an operator is invoked in any of these platforms, that invocation is delegated to the “Core Cylon” framework, which implements the actual logic to perform the operation in a distributed setting. A high-level overview of Cylon, along with its core framework, is depicted in Figure X.
The Cylon core has a table abstraction to represent structured data. When a table is created in a distributed context, each worker or process holds a partition of the data that logically belongs to the table. However, each process can work on its own portion of the table as if it were working on the entire dataset. Cylon "local operators" execute on the local data, and distributed operators build on these local operators after redistributing the data according to each operator's requirements. Distributed operators follow the Bulk Synchronous Parallel (BSP) approach, and the framework synchronizes local operations as needed. The core distributed data operations of Cylon, sketched in the example after this list, are:
- Select
- Project
- Join
- Union
- Difference
- Intersection
- GroupBy
- Aggregations (SUM, MIN, MAX, etc.)
- Sort
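To make the distinction between local and distributed operators concrete, below is a hedged sketch of a distributed join in PyCylon: each MPI worker holds only its own partition of the two tables, and the operator shuffles rows by key before joining locally in BSP fashion. The names used here (MPIConfig, Table.from_pydict, distributed_join) follow the PyCylon documentation but may vary across versions.

```python
# Run under MPI, e.g.: mpirun -np 2 python dist_join.py
from pycylon import CylonContext, Table
from pycylon.net import MPIConfig

ctx = CylonContext(config=MPIConfig(), distributed=True)
rank = ctx.get_rank()

# Each worker builds (or loads) only its own partition of the logical tables.
left = Table.from_pydict(ctx, {"key": [2 * rank, 2 * rank + 1], "value": [1.0, 2.0]})
right = Table.from_pydict(ctx, {"key": [2 * rank + 1], "payload": [10.0]})

# A distributed operator: rows are shuffled by key, then joined locally (BSP style).
joined = left.distributed_join(right, join_type="inner", algorithm="sort", on=["key"])
print(f"rank {rank} result:")
print(joined)

ctx.finalize()
```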
To take the complexity of distributed programming away from the user, Cylon performs the network-level operations internally and abstracts away the distributed nature of the operators. Communication can take place over TCP, InfiniBand, or any other protocol supported by the underlying communication layer of the framework.
Cylon Use
Cylon is mainly intended for querying large amounts of structured data for both pre-processing and post-processing purposes. There are three target use cases for Cylon:
- As a fast and scalable Python library that can be integrated with frameworks such as PyTorch or Dask, offering APIs similar to Pandas.
- As a standalone distributed framework for data processing; Cylon uses OpenMPI to bootstrap itself in a distributed environment (see the launch sketch after this list).
- As a library to accelerate Java data processing frameworks.
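For the standalone use case, a script is simply launched with mpirun (or a compatible MPI launcher), and every worker gets a distributed Cylon context. A minimal sketch, again with class names taken from the PyCylon documentation and therefore subject to change, looks like this:

```python
# dist_hello.py -- launched standalone with OpenMPI, e.g.:
#   mpirun -np 4 python dist_hello.py
from pycylon import CylonContext
from pycylon.net import MPIConfig

ctx = CylonContext(config=MPIConfig(), distributed=True)
print(f"worker {ctx.get_rank()} of {ctx.get_world_size()} started")
# ... create tables and call distributed operators here ...
ctx.finalize()
```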
Because Cylon uses the Apache Arrow data format underneath, it can integrate with existing systems that support the format without data conversion costs. At the moment it uses MPI to run in a cluster environment, which also means it can easily be used on an HPC cluster.
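As a sketch of that interoperability, the example below wraps an existing Arrow table as a Cylon table, runs a relational operator on it, and hands the result back as Arrow (and then Pandas). The Table.from_arrow / to_arrow calls follow the PyCylon documentation and should be treated as illustrative.

```python
# Illustrative sketch; from_arrow / to_arrow follow the PyCylon docs and
# may differ slightly between releases.
import pyarrow as pa
from pycylon import CylonContext, Table

ctx = CylonContext(config=None, distributed=False)

# An Arrow table produced by some other Arrow-based system (Parquet reader, Pandas, ...).
left_arrow = pa.table({"key": [1, 2, 3], "value": [0.1, 0.2, 0.3]})
right_arrow = pa.table({"key": [2, 3, 4], "payload": [10, 20, 30]})

# Wrap the Arrow data as Cylon tables, run a relational operator,
# and hand the result back as Arrow without format conversion.
left = Table.from_arrow(ctx, left_arrow)
right = Table.from_arrow(ctx, right_arrow)
result = left.join(right, join_type="inner", algorithm="sort", on=["key"])
print(result.to_arrow().to_pandas())

ctx.finalize()
```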
Performance
System integration, performance, and usable APIs are the three main areas we are focusing on in Cylon. The current codebase has already shown tremendous upside in data processing performance, with room for further improvement.
In this test, we loaded two tables, each with 40 x 384 million records (about 30 billion records in total). Each record has two attributes, both long integers. We joined these two tables with both Spark and Cylon to see whether Cylon performs reasonably well.
For Cylon, joining the two tables with a parallelism of 384 took about 45 seconds on the 100Gbps network and about 100 seconds on the 10Gbps network. For Spark, the join took about 1200 seconds on the 10Gbps network, and we could not get Spark to work over Omni-Path for this experiment. We believe the primary reason behind this performance gap is Cylon's ability to use memory efficiently. Spark cannot hold this many tuples in memory and has to use the disk to complete the join. Limitations of Java have forced Spark-like systems to keep data off the JVM heap, which effectively splits the available memory between heap and off-heap space. Furthermore, the JVM does not handle machines with large memory and high core counts well, due to JVM overhead and garbage collection.
The tests were carried out on a cluster of 8 nodes, each with dual Intel(R) Xeon(R) Platinum 8160 CPUs @ 2.10GHz (48 cores per node), 256GB of memory, and SSDs. Nodes are connected via 10Gbps Ethernet and 100Gbps Intel Omni-Path networks. We used OpenMPI 4.0.3 as the distributed runtime and Apache Spark 2.4.5, with executors distributed equally among the nodes. The Spark tests used the 10Gbps network, while Cylon ran on both the 10Gbps and 100Gbps networks.
This result illustrates the need for an architecture such as Cylon, which uses system-level code to handle large data operations efficiently. Obviously, if more nodes were available, Spark could handle this load much more efficiently.