March 23, 2023
Apache Kylin distributed database
Analysts often have to group and merge data. In relational databases these operations are resource-intensive. Online analytical processing (OLAP) is a technology that organises large commercial databases and supports complex analysis. It can be used to run complex analytical queries without adversely affecting transactional systems. OLAP data is pre-calculated and aggregated, which speeds up analysis.
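The effect of pre-calculation can be illustrated with a minimal Python sketch. The sample rows and the names `SALES`, `pre_aggregate` and `total_sales` are hypothetical and serve only to show the idea: the grouping is done once in advance, so a later query becomes a lookup rather than a scan of the raw rows.

```python
from collections import defaultdict

# Hypothetical fact rows: (region, product, sales amount).
SALES = [
    ("EU", "laptop", 1200.0),
    ("EU", "phone", 800.0),
    ("US", "laptop", 1500.0),
    ("US", "laptop", 1000.0),
]


def pre_aggregate(rows):
    """Sum the sales amount once for every (region, product) combination."""
    totals = defaultdict(float)
    for region, product, amount in rows:
        totals[(region, product)] += amount
    return dict(totals)


# Built once, ahead of query time.
CUBE = pre_aggregate(SALES)


def total_sales(region, product):
    """Answer a query with a dictionary lookup instead of rescanning SALES."""
    return CUBE.get((region, product), 0.0)
```

Calling `total_sales("US", "laptop")` reads the stored sum directly; the raw rows are not touched at query time.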
Apache Kylin™ is an open-source distributed database designed to enable OLAP (online analytical processing) in the era of Big Data. Distributed computing and data storage offer a number of benefits, such as scalability, fault tolerance and load balancing.
The fundamental structure of the Kylin OLAP engine includes a metadata engine, a query engine, a job engine and a storage engine to run the entire stack, plus a REST server to serve client requests. As an analytical data store, Kylin offers ANSI SQL on Hadoop/Spark and supports most ANSI SQL query functions. Figure 1 shows an interaction diagram of the Apache Kylin subsystems. The latest stable version is Kylin 4, whose main components are:
Spark + Parquet
Figure 1 — Apache Kylin subsystem interaction diagram
Hadoop is a freely distributed set of utilities, libraries and frameworks for developing and executing distributed programs running on clusters of many nodes.
The project was originally developed in Java within the MapReduce computing paradigm, where an application is divided into a large number of identical elementary tasks that are executed on distributed computers (nodes) in a cluster and summarised into a single result.
The project includes modules such as:
- Hadoop Common is the set of common utilities that support the other Hadoop modules.
- Hadoop Distributed File System (HDFS™) is a distributed file system providing high-throughput access to application data.
- Hadoop YARN is a platform for job scheduling and cluster resource management.
- Hadoop MapReduce is a YARN-based system for parallel processing of large data sets.
The HDFS cluster includes the following components:
- The management node, also called the name node or name server (NameNode), is a single dedicated server in the cluster whose software manages the file system namespace and stores the file tree together with file and directory metadata. The NameNode is a mandatory component of the HDFS cluster. It is responsible for opening and closing files, creating and deleting directories, managing access by external clients, and mapping files to the blocks replicated on the data nodes. The NameNode tells any client that asks where on the cluster machines the data blocks are located.
- The Secondary NameNode is a separate server, the only one of its kind in the cluster. It copies the HDFS image and the transaction log of file block operations to a temporary folder, applies the changes accumulated in the transaction log to the HDFS image, writes the updated image back to the NameNode, and clears the transaction log. The Secondary NameNode makes quick manual recovery of the NameNode possible after a failure.
- A data node or data server (DataNode) is one of the many cluster servers whose software performs file and block operations. The DataNode is an essential component of the HDFS cluster. It writes and reads data, executes NameNode commands to create, delete and replicate blocks, periodically sends status messages (heartbeats), and serves read and write requests from HDFS clients. Note that data flows from the data nodes to the client directly, bypassing the NameNode.
- A client is a user or application that interacts with the distributed file system through a dedicated API (Application Programming Interface). A client with sufficient rights can create, delete, read, write, rename and move files and directories. When creating a file, the client can explicitly specify the block size (64 MB by default in Hadoop 1, 128 MB in Hadoop 2 and later) and the number of replicas (3 by default).
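The relationship between file size, block size and replica count can be made concrete with a small arithmetic sketch. The function name `block_layout` and its defaults (a 128 MB block size and 3 replicas) are assumptions chosen for illustration and are not part of any HDFS API.

```python
MB = 1024 * 1024


def block_layout(file_size_bytes, block_size_bytes=128 * MB, replication=3):
    """Estimate how one file is split into blocks and replicated.

    The defaults are assumptions for this sketch; real clusters may be
    configured with a different block size or replication factor.
    """
    if file_size_bytes < 0 or block_size_bytes <= 0 or replication <= 0:
        raise ValueError("sizes must be non-negative and factors positive")
    # Integer ceiling division: the last block may be only partly filled.
    blocks = (file_size_bytes + block_size_bytes - 1) // block_size_bytes
    return {
        "blocks": blocks,
        "block_replicas": blocks * replication,
        # A partly filled block occupies only its actual size on disk.
        "raw_bytes": file_size_bytes * replication,
    }
```

For example, under these assumptions a 300 MB file occupies three blocks, and with three replicas the cluster stores nine block copies.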
The fundamental idea behind YARN is to split resource management and job scheduling/monitoring into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs.
The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that allocates resources among all applications in the system. The NodeManager is the per-machine agent responsible for containers: it monitors their resource usage (CPU, memory, disk, network) and reports it to the ResourceManager/Scheduler.
The per-application ApplicationMaster is essentially a framework-specific library tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor tasks.
Figure 2 — Hadoop process interaction diagram
The ResourceManager consists of two main components: the Scheduler and the ApplicationsManager.
The Scheduler allocates resources to the running applications, subject to known constraints such as capacities and queues. It does not monitor or track application status, and it offers no guarantees about restarting tasks that fail because of application or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications, using the abstract notion of a resource container, which incorporates elements such as memory, CPU, disk and network.
The ApplicationsManager accepts job submissions, negotiates the first container for executing the application-specific ApplicationMaster, and provides the service for restarting the ApplicationMaster container on failure. The per-application ApplicationMaster is responsible for negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring their progress.
Hadoop MapReduce is a software framework for easily writing applications that process huge amounts of data (multi-terabyte datasets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input dataset into independent chunks, which are processed by map tasks completely in parallel. The framework sorts the map output, which then becomes the input to the reduce tasks. Normally both the input and the output of the job are stored in a file system. YARN takes care of task scheduling, task monitoring and re-execution of failed tasks.
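The map, sort and reduce stages described above can be imitated in plain Python. This is a single-process sketch of the paradigm, not the Hadoop API: the word-count task and the names `map_phase`, `shuffle_and_sort`, `reduce_phase` and `run_job` are illustrative assumptions.

```python
from collections import defaultdict


def map_phase(chunk):
    """Map task: turn one input chunk into (key, value) pairs."""
    return [(word.lower(), 1) for word in chunk.split()]


def shuffle_and_sort(pairs):
    """Group values by key and sort by key, as the framework does between stages."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def reduce_phase(key, values):
    """Reduce task: collapse all values of one key into a single result."""
    return key, sum(values)


def run_job(chunks):
    """Run map tasks over independent chunks, then reduce the sorted output."""
    mapped = []
    for chunk in chunks:  # in a real cluster these run in parallel on many nodes
        mapped.extend(map_phase(chunk))
    return dict(reduce_phase(key, values) for key, values in shuffle_and_sort(mapped))
```

Calling `run_job(["big data big", "data cube"])` counts each word across both chunks. In Hadoop the chunks would be file splits and the tasks would run in containers allocated by YARN.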
Apache Hive™ is data warehouse software that facilitates the reading, writing and management of large datasets residing in HDFS using the SQL-like query language HiveQL.
Hive was created to enable non-programmers familiar with SQL to work with petabytes of data using HiveQL. Traditional relational databases are designed for interactive queries against small and medium datasets and do not handle huge datasets well. Hive instead uses batch processing to work through a very large distributed dataset quickly. Hive converts HiveQL queries into MapReduce jobs that run under Yet Another Resource Negotiator (YARN), the distributed job scheduling environment of Apache Hadoop. It queries data stored in the Hadoop Distributed File System (HDFS).
HiveQL allows you to create tables and explicitly specify the HDFS storage directory, the file type and the field delimiter. An example script is shown in Figure 3.
Figure 3 — Hive table creation script
A distinctive feature of this type of table is that data is added, deleted and modified by working with files in HDFS. That is, to load data into the table created by the script shown in Figure 3, it is necessary and sufficient to place a text file (e.g. .csv) that matches the table structure (without column headers) in the directory hdfs:/tmp/sqldata/t_dim_items.
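As a rough illustration of this loading approach, the Python sketch below produces a headerless, comma-delimited file body for such a table. The DDL string and the columns `item_id` and `item_name` are hypothetical, since the original script is not reproduced in the text; only the table name and directory come from the article.

```python
import csv
import io

# Hypothetical table definition, kept here only for reference. The column
# names and the EXTERNAL/TEXTFILE choices are assumptions, not the
# article's actual script.
HIVE_DDL = """
CREATE EXTERNAL TABLE t_dim_items (
    item_id   INT,
    item_name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/tmp/sqldata/t_dim_items'
"""


def rows_to_headerless_csv(rows):
    """Serialise rows as comma-delimited text without a header line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


# Sample rows matching the assumed two-column structure.
ITEMS_CSV = rows_to_headerless_csv([(1, "laptop"), (2, "phone")])
```

Saving `ITEMS_CSV` to a local file and copying it into the table directory, for instance with `hdfs dfs -put`, would make the rows visible to queries on the table. The sample values deliberately contain no commas, because a plain delimited text table does not interpret CSV quoting.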
Spark is a fast and versatile engine for large-scale data processing. The key to its speed is that Spark processes data in memory (RAM), which is much faster than repeatedly reading from and writing to hard disks.
RDD (Resilient Distributed Dataset) is a basic Spark concept. The cuboids of an N-dimension cube map naturally onto RDDs: an N-dimension cube has N + 1 RDDs, one per level of cuboids. These RDDs have a parent/child relationship, because a parent can be used to create its children. With the parent RDD cached in memory, generating a child RDD can be much more efficient than reading from disk. Figure 4 describes this process.
Figure 4 — Diagram of the RDDs construction process
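The N + 1 levels can be enumerated with a short Python sketch. The function name `cuboids_by_level` and the dimension names in the usage note are illustrative assumptions; the sketch only counts dimension combinations and does not involve Spark.

```python
from itertools import combinations


def cuboids_by_level(dimensions):
    """List the cuboids of a cube level by level.

    Level 0 holds the base cuboid with all N dimensions; level k holds
    every cuboid obtained by dropping k dimensions, down to the empty
    cuboid (the grand total) at level N.
    """
    n = len(dimensions)
    return [list(combinations(dimensions, n - k)) for k in range(n + 1)]
```

For three dimensions such as `("region", "product", "year")` this yields four levels containing 1, 3, 3 and 1 cuboids, i.e. 2^3 = 8 cuboids in total. This exponential growth is why limiting unnecessary combinations matters for build time.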
Figure 5 illustrates the cuboid calculation in detail. In "step 5", Kylin uses a HiveContext to read the intermediate Hive table and then performs a "map" operation, a one-to-one mapping that encodes the original values into K-V bytes. When this completes, Kylin has an RDD with the intermediate encoding. In "step 6", the intermediate RDD is aggregated with the "reduceByKey" operation to produce RDD-1, which is the base cuboid. Next, a "flatMap" (one-to-many map) is performed on RDD-1, because the base cuboid has N child cuboids. The RDDs of all levels are calculated in the same way. Each RDD is persisted to the distributed file system when complete and is also cached in memory for the calculation of the next level. Once all of its children have been generated, it is removed from the cache.
Figure 5 — DAG Cubing in Spark
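The map, reduceByKey and flatMap sequence can be imitated without Spark. The sketch below is a plain-Python simulation under stated assumptions, not Kylin's implementation: keys are tuples rather than encoded bytes, the only measure is a sum, and the rule that a dimension is dropped only if its index exceeds every index already dropped is one simple way to derive each child cuboid from exactly one parent. All names are hypothetical.

```python
from collections import defaultdict


def reduce_by_key(pairs):
    """Plain-Python stand-in for reduceByKey: sum the values of equal keys."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


def child_keys(key, n_dims):
    """One-to-many step: yield the child-cuboid keys for one parent row.

    A key is (kept dimension indices, their values). A dimension is dropped
    only if its index is higher than every index already dropped, so each
    child cuboid is produced from exactly one parent and nothing is counted twice.
    """
    kept, values = key
    dropped = set(range(n_dims)) - set(kept)
    start = max(dropped) + 1 if dropped else 0
    for position, dim in enumerate(kept):
        if dim >= start:
            yield (kept[:position] + kept[position + 1:],
                   values[:position] + values[position + 1:])


def build_cube(rows, n_dims):
    """Build all N + 1 levels from rows of n_dims dimension values plus one measure."""
    all_dims = tuple(range(n_dims))
    # "map": one-to-one encoding of each source row into a (key, value) pair.
    encoded = [((all_dims, tuple(row[:n_dims])), float(row[n_dims])) for row in rows]
    # "reduceByKey": aggregate into the base cuboid.
    level = reduce_by_key(encoded)
    levels = [level]
    for _ in range(n_dims):
        # "flatMap" then "reduceByKey": derive the next level from the cached parent.
        expanded = [(child, value)
                    for key, value in level.items()
                    for child in child_keys(key, n_dims)]
        level = reduce_by_key(expanded)
        levels.append(level)
    return levels
```

With two dimensions, `build_cube` returns three levels, and the last one holds a single entry with the grand total. In Spark each level would be an RDD that is cached until the next level has been derived from it.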
Spark writes the aggregated data as files in Parquet format. Apache Parquet is an open-source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance for handling large volumes of complex data.
Spark operates in distributed computing mode to prevent performance bottlenecks. The computational power of the system can be increased by horizontal scaling. Various resource schedulers, such as YARN, Kubernetes (K8s) or Mesos, can be used to meet resource isolation needs (Apache Kylin 4 uses YARN).
Having reviewed the Apache Kylin subsystems, we can now describe the workflow for this tool, which consists of the following basic steps:
1. Load the data into HDFS.
2. Build the data store in Hive (create tables backed by the sources in HDFS).
3. Develop the data model and the cube in Kylin.
4. Build the cube:
a. YARN plans and allocates resources.
b. Spark generates a cuboid construction plan.
c. Spark builds the cuboids and writes the data to Parquet files.
5. Work with the analytical cube (write SQL queries).
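The last step can be sketched as an HTTP request to the Kylin REST server. The sketch assumes the query endpoint `POST /kylin/api/query` with a JSON body containing `sql` and `project`, HTTP basic authentication, and a server at `http://localhost:7070` with the `ADMIN`/`KYLIN` account; none of these details come from this article, so check them against your installation. The function name `build_query_request` is hypothetical, and the request is only constructed here, not sent.

```python
import base64
import json
import urllib.request


def build_query_request(sql, project,
                        base_url="http://localhost:7070",  # assumed host and port
                        user="ADMIN", password="KYLIN"):   # assumed credentials
    """Prepare (but do not send) a SQL query request for a Kylin REST server."""
    payload = json.dumps({"sql": sql, "project": project}).encode("utf-8")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url}/kylin/api/query",  # assumed endpoint path
        data=payload,
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would submit the query to a running server. Building the request separately keeps the sketch usable without a cluster.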
This article has discussed the Apache Kylin architecture. Understanding it helps considerably in developing analytical solutions and makes it possible to avoid building unnecessary cuboid combinations when the Spark jobs run, which significantly reduces cube build time and query latency.