Cylon DataFrame for Distributed Data Processing

Data engineering is becoming an increasingly important part of scientific discovery with the adoption of deep learning and machine learning. It deals with a variety of data formats, storage systems, data extraction, transformation, and data movement. One goal of data engineering in HPC is to transform raw data into the vector/matrix/tensor formats accepted by scientific and deep learning applications.

There are many data structures, such as tables, graphs, and trees, for representing data. Among them, tables are a versatile and commonly used format for loading and processing data. Cylon provides a distributed-memory DataFrame API in Python for processing data in a tabular format. Unlike existing state-of-the-art data engineering tools written purely in Python, Cylon adopts high-performance compute kernels written in C++, with an in-memory table representation. At its core, Cylon uses MPI for distributed-memory computation, taking a data-parallel approach to processing large datasets on HPC clusters.

Cylon has now reached its 0.4.0 release. This release has many exciting new features, including the DataFrame API.

First Example

Before diving into the details of Cylon, let's look at a simple example that joins two datasets. Cylon supports Conda-based installation, so let's install it in a Conda environment.

conda create -n cylon-0.4.0 -c cylondata pycylon python=3.7
conda activate cylon-0.4.0

Now let's try to run our first example which is a serial code.

from pycylon import DataFrame

df1 = DataFrame([[1, 2, 3], [2, 3, 4]])
df2 = DataFrame([[1, 1, 1], [2, 3, 4]])

df3 = df1.merge(right=df2, on=[0])
print(df3)

Here we create two DataFrame objects and join them on column 0. Inside the Conda environment, let's run this example.

python example.py
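For comparison, the same join can be written in plain pandas. This is a sketch for illustration only; it assumes that Cylon's DataFrame constructor treats each inner list as a column (named 0 and 1), which is how the example above behaves.

```python
import pandas as pd

# Equivalent join in pandas, assuming each inner list above is a
# column named 0 and 1 (illustrative assumption, not Cylon's API)
pd1 = pd.DataFrame({0: [1, 2, 3], 1: [2, 3, 4]})
pd2 = pd.DataFrame({0: [1, 1, 1], 1: [2, 3, 4]})

# join on column 0: only key 1 appears in both tables, and it
# appears three times in pd2, so the result has three rows
pd3 = pd1.merge(right=pd2, on=[0])
print(pd3)
```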

Distributed Execution

Now for the fun part: running this example on multiple processes. Instead of creating data in memory, let's read data from a public dataset.

from pycylon import DataFrame, CylonEnv
from pycylon.net import MPIConfig
import pandas as pd
env = CylonEnv(config=MPIConfig())

# read using pandas, we have a file for each process
pd1 = pd.read_csv('https://raw.githubusercontent.com/cylondata/cylon/main/data/input/csv1_' + str(env.rank) + '.csv')
df1 = DataFrame(pd1)
# read using pandas, we have a file for each process
pd2 = pd.read_csv('https://raw.githubusercontent.com/cylondata/cylon/main/data/input/csv2_' + str(env.rank) + '.csv')
df2 = DataFrame(pd2)
df3 = df1.merge(right=df2, on=[0], algorithm="hash")
print(df3)

Now let's run this program with 2-way parallelism. Cylon uses MPI to spawn processes and execute in parallel. The following command will spawn 2 parallel processes and execute the Python script.

mpirun -np 2 python example.py

After running it, we get the following output from the two processes. In this case, each process loads its two tables independently and performs the join locally.

   _x0    _x1  _y0    _y1
0   37  0.834   37  0.587
1   37  0.491   37  0.587
2   19  0.422   19  0.157
3   21  0.853   21  0.562
4   76  0.927   76  0.642
5   59  0.837   59  0.239
6   13  0.201   13  0.377
7   13  0.085   13  0.377

   _x0    _x1  _y0    _y1
0    3  0.025    3  0.025
1   26  0.676   26  0.394
2   26  0.394   26  0.394
3   29  0.291   29  0.979
4   29  0.291   29  0.979
5   29  0.291   29  0.979
6   29  0.291   29  0.979
7   61  0.685   61  0.685
8   57  0.314   57  0.314
9   57  0.153   57  0.314

Now, if we want to do the join across the two processes, we only need a small change to the merge call: passing env as an argument tells it to run the join across the 2 processes.

from pycylon import DataFrame, CylonEnv
from pycylon.net import MPIConfig
import pandas as pd
env = CylonEnv(config=MPIConfig())
# read using pandas, we have a file for each process
pd1 = pd.read_csv('https://raw.githubusercontent.com/cylondata/cylon/main/data/input/csv1_' + str(env.rank) + '.csv')
df1 = DataFrame(pd1)
# read using pandas, we have a file for each process
pd2 = pd.read_csv('https://raw.githubusercontent.com/cylondata/cylon/main/data/input/csv2_' + str(env.rank) + '.csv')
df2 = DataFrame(pd2)
df3 = df1.merge(right=df2, on=[0], algorithm="hash", env=env)
print(env.rank, df3)

Now, if we run the program as before with 2-way parallelism, it gives the following output.

mpirun -np 2 python example.py

You can see that the output is now different because we are running the join across 2 processes.

   _x0    _x1  _y0    _y1
0    3  0.025    3  0.993
1   43  0.419   43  0.610
2   37  0.834   37  0.587
3   37  0.491   37  0.587
...
19  29  0.291   29  0.979
20  31  0.122   31  0.862
21  61  0.685   61  0.685
22  57  0.314   57  0.314
23  57  0.153   57  0.314

   _x0    _x1  _y0    _y1
0   26  0.394   26  0.394
1   26  0.676   26  0.394
2   14  0.193   14  0.250
3   76  0.927   76  0.642
4    4  0.529    4  0.594
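Conceptually, a distributed hash join works by shuffling rows so that rows with equal keys land on the same process, after which each process can join its partitions locally. The following is a simplified, pure-Python sketch of that idea, not Cylon's actual implementation (which performs the shuffle with MPI communication); all names and data here are illustrative.

```python
# Toy sketch of a hash-partitioned distributed join (illustrative only).
def partition_by_key(rows, num_procs):
    """Send each (key, value) row to the process chosen by hashing its key."""
    parts = [[] for _ in range(num_procs)]
    for key, value in rows:
        parts[hash(key) % num_procs].append((key, value))
    return parts

# Each "process" initially holds an arbitrary slice of each table.
left = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
right = [(2, 'x'), (3, 'y'), (5, 'z')]

left_parts = partition_by_key(left, 2)
right_parts = partition_by_key(right, 2)

# After the shuffle, every process joins its own partitions locally,
# because matching keys are guaranteed to be co-located.
joined = []
for p in range(2):
    lookup = dict(right_parts[p])
    joined += [(k, v, lookup[k]) for k, v in left_parts[p] if k in lookup]

print(sorted(joined))  # [(2, 'b', 'x'), (3, 'c', 'y')]
```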

How Cylon Programs Work

Unlike other data frameworks that take a global view of the data, Cylon takes a local view of the data in parallel processes. When running in parallel, it assumes there are N parallel processes, all executing the same Python script. At runtime, these processes distinguish themselves using a unique process ID (rank).
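This SPMD (single program, multiple data) style can be illustrated without MPI at all. The sketch below simulates two ranks running the same function, each seeing only its own partition; the function name and the data are purely illustrative, not part of Cylon's API.

```python
def program(rank, partitions):
    # Every rank runs the same code, but only sees its own partition.
    local = partitions[rank]
    local_sum = sum(local)   # a purely local operator
    return rank, local_sum

world_size = 2
partitions = [[1, 2, 3], [4, 5, 6]]   # one partition per rank

# In a real run, mpirun would start world_size copies of this script;
# here we just call the same function once per simulated rank.
results = [program(r, partitions) for r in range(world_size)]
print(results)  # [(0, 6), (1, 15)]
```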

Each process can apply local operators on DataFrames or processes can collectively apply distributed operators as shown in Figure 1.

Figure 1. Cylon Local and Distributed Operators on DataFrames

It is a powerful model, but it requires users to program their data processing applications carefully. At the moment, Cylon implements more than 40 DataFrame operators similar to those in Pandas.

Cylon Architecture

Cylon implements a set of relational algebra operators in C++ and exposes them through Python and Java APIs. Underneath, it uses the Apache Arrow format to represent data as in-memory tables. The Python API is provided by wrapping the C++ API using Cython. Because the data is in Arrow format, users can use the PyArrow API as well.

Figure 2. Cylon Architecture

The distributed operators are implemented using MPI-based communication operators. There is an ongoing effort to implement the communication operators on top of the UCX library.

What is Next?

One goal of Cylon is to give users the ability to easily integrate data engineering into their data science workflows. We are exploring integrating Cylon with workflow engines like Ray, Dask, and Parsl. There is also an effort to implement the TPCx-BB benchmark suite with Cylon. Once this is complete, it will be interesting to compare the performance of Cylon with other popular tools.
