July 09, 2023
Apache Kylin distributed analytics data store
Analysts frequently have to group and merge data, and in relational databases these operations are resource-intensive. Online analytical processing (OLAP) is a technology that organizes large business databases and supports complex analysis. It makes it possible to run complex analytical queries without negatively affecting transactional systems. OLAP data is pre-calculated and pre-aggregated, which speeds up analysis.
Apache Kylin™ is an open source distributed analytics data store designed to enable OLAP (online analytical processing) in the era of Big Data. Distributed computing and data storage provide a number of benefits such as scalability, fault tolerance and load balancing.
The Kylin OLAP engine consists of a metadata engine, a query engine, a job engine and a storage engine that together run the entire stack, plus a REST server that serves client requests. As an analytical data store, Kylin offers ANSI SQL on Hadoop/Spark and supports most ANSI SQL query functions. Figure 1 shows how the Apache Kylin subsystems interact. At the time of writing, the latest stable version is Kylin 4, whose main components are:
- Hadoop
- Hive
- Spark + Parquet
Figure 1 — Apache Kylin subsystems interaction diagram
Hadoop
Hadoop is an open-source set of utilities, libraries and a framework for developing and running distributed programs on clusters of many nodes.
The project is written in Java and was originally built around the MapReduce computing paradigm, in which an application is divided into a large number of identical elementary tasks that are executed on the nodes of the cluster and whose outputs are combined into a single result.
The project includes such modules as:
- Hadoop Common — Common utilities that support other Hadoop modules.
- Hadoop Distributed File System (HDFS™) — A distributed file system that provides high-throughput access to application data.
- Hadoop YARN — A platform for job scheduling and cluster resource management.
- Hadoop MapReduce — A YARN-based system for parallel processing of large datasets.
The HDFS cluster includes the following components:
- NameNode (also called the management node or name server) — a separate, single server in the cluster that runs the code for managing the file system namespace and stores the file tree together with file and directory metadata. The NameNode is a mandatory component of an HDFS cluster. It is responsible for opening and closing files, creating and deleting directories, controlling access by external clients, and mapping files to the blocks replicated on the DataNodes. The NameNode tells any client that asks where the data blocks are located on the cluster machines.
- Secondary NameNode — a separate server, the only one of its kind in the cluster, that copies the HDFS image and the transaction log of file block operations to a temporary folder, applies the changes accumulated in the transaction log to the HDFS image, writes the updated image back to the NameNode, and clears the transaction log. The Secondary NameNode is needed for quick manual recovery of the NameNode if it fails.
- DataNode — one of the many servers in a cluster that run the code responsible for file and block operations. The DataNode is a mandatory component of an HDFS cluster. It writes and reads data, executes NameNode commands to create, delete and replicate blocks, periodically sends status messages (heartbeats), and processes read and write requests from HDFS clients. Note that data flows directly between the DataNodes and the client, bypassing the NameNode.
- Client — a user or application that interacts with the distributed file system through a dedicated application programming interface (API). If the client has sufficient rights, it may perform the following operations on files and directories: create, delete, read, write, rename and move. When creating a file, the client can explicitly specify the block size (128 MB by default in Hadoop 2 and later, 64 MB in Hadoop 1) and the number of replicas to create (3 by default).
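The effect of the block size and replication factor on storage can be estimated with a few lines of arithmetic. The following is a minimal sketch, not part of HDFS itself: the function name `hdfs_footprint` is hypothetical, and the block size is passed explicitly because its default depends on the Hadoop version.

```python
MB = 1024 * 1024


def hdfs_footprint(file_size_bytes, block_size_bytes, replication=3):
    """Return (block_count, replica_count, raw_bytes) for a single file.

    The last block only occupies as much disk space as the data it holds,
    so raw storage is file size times replication, not blocks times block size.
    """
    if file_size_bytes < 0 or block_size_bytes <= 0 or replication <= 0:
        raise ValueError("file size must be >= 0; block size and replication must be > 0")
    # Integer ceiling division: number of blocks needed to hold the file.
    block_count = -(-file_size_bytes // block_size_bytes)
    replica_count = block_count * replication
    raw_bytes = file_size_bytes * replication
    return block_count, replica_count, raw_bytes


if __name__ == "__main__":
    for block_mb in (64, 128):
        print(block_mb, hdfs_footprint(1000 * MB, block_mb * MB))
```

A smaller block size means more blocks for the same file, and therefore more metadata for the NameNode to track, while the raw disk usage stays the same.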
The fundamental idea behind YARN is to split resource management and job scheduling/monitoring into separate daemons: a global ResourceManager (RM) and a per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs.
The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that allocates resources among all applications in the system. The NodeManager is the per-machine agent that is responsible for containers: it monitors their resource usage (CPU, memory, disk, network) and reports it to the ResourceManager/Scheduler.
The per-application ApplicationMaster is essentially a framework-specific library. It negotiates resources with the ResourceManager and works with the NodeManager(s) to execute and monitor tasks.
Figure 2 — process interaction diagram in Hadoop
ResourceManager consists of two main components: Scheduler and ApplicationsManager.
The Scheduler allocates resources to the running applications, subject to known constraints such as capacities and queues. It does not monitor or track application status, and it offers no guarantee that tasks which fail because of application or hardware failures will be restarted. The Scheduler works from the resource requirements of the applications, expressed through the abstract notion of a resource container, which covers elements such as memory, CPU, disk and network.
The ApplicationsManager accepts job submissions, negotiates the first container for running the application-specific ApplicationMaster, and provides the service that restarts the ApplicationMaster container on failure. The per-application ApplicationMaster negotiates the appropriate resource containers from the Scheduler, tracks their status, and monitors progress.
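To make the notion of container allocation more concrete, here is a deliberately simplified toy model. It is not how YARN's real schedulers work (they also take queues, capacities and fairness into account). It only illustrates requests for memory and CPU being placed on nodes with enough free resources. All class and function names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Node:
    """Free resources reported by one (hypothetical) NodeManager."""
    name: str
    free_mem_mb: int
    free_vcores: int


@dataclass(frozen=True)
class ContainerRequest:
    """Resources an application asks for: memory and virtual cores."""
    app_id: str
    mem_mb: int
    vcores: int


def allocate(nodes, requests):
    """First-fit placement: return a list of (request, node name or None)."""
    placements = []
    for req in requests:
        chosen = None
        for node in nodes:
            if node.free_mem_mb >= req.mem_mb and node.free_vcores >= req.vcores:
                # Reserve the resources on the first node that can host the container.
                node.free_mem_mb -= req.mem_mb
                node.free_vcores -= req.vcores
                chosen = node.name
                break
        placements.append((req, chosen))
    return placements
```

A request that fits on no node is returned with `None`; in a real cluster it would stay pending until resources are released.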
Hadoop MapReduce is a software framework for easily writing applications that process huge amounts of data (multi-terabyte datasets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job typically splits the input dataset into independent chunks that are processed by map tasks completely in parallel. The framework sorts the map outputs, which then become the input to the reduce tasks. Typically, both the input and the output of the job are stored in the file system. The framework, running on YARN, takes care of scheduling tasks, monitoring them, and re-executing failed tasks.
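The map, sort and reduce flow described above can be imitated on a single machine. The sketch below is plain Python and does not use the Hadoop API. It counts words in a few input chunks, and the function names are chosen only for illustration.

```python
from collections import defaultdict
from itertools import chain


def map_phase(chunk):
    """Map task: emit a (word, 1) pair for every word in one input chunk."""
    return [(word.lower(), 1) for word in chunk.split()]


def shuffle_and_sort(mapped_pairs):
    """Group values by key and sort the keys, as happens between map and reduce."""
    groups = defaultdict(list)
    for key, value in mapped_pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    """Reduce task: combine all values that share a key."""
    return key, sum(values)


def run_job(chunks):
    """Run map on every chunk independently, then shuffle, sort and reduce."""
    mapped = chain.from_iterable(map_phase(chunk) for chunk in chunks)
    return dict(reduce_phase(key, values) for key, values in shuffle_and_sort(mapped))
```

Because each chunk is mapped independently of the others, the map calls could run on different nodes. Only the grouping step needs to bring equal keys together.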
Hive
Apache Hive™ is data warehouse software that makes it easy to read, write, and manage large datasets residing in HDFS using the SQL-like query language HiveQL.
Hive was created so that non-programmers familiar with SQL can work with petabytes of data using HiveQL. Traditional relational databases are designed for interactive queries on small to medium-sized datasets and do not handle huge datasets well. Hive instead uses batch processing to work through very large distributed datasets. It converts HiveQL queries into MapReduce jobs that run under Yet Another Resource Negotiator (YARN), Apache Hadoop's distributed job scheduling environment, and it queries data stored in the Hadoop Distributed File System (HDFS).
HiveQL lets you create data warehouse tables and explicitly specify, for each table, the HDFS directory where its files are stored, the file type, and the field delimiter. An example script is shown in Figure 3.
Figure 3 — script for creating a table in Hive
The distinctive feature of tables of this type is that data is added, deleted and changed by working with files in HDFS. That is, to load data into the table created by the script shown in Figure 3, it is necessary and sufficient to place a text file (e.g. .csv) that matches the table structure (without column headers) in the directory hdfs:/tmp/sqldata/t_dim_items.
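Since the original script is not reproduced here, the following Python sketch shows what such a table definition and a matching data file could look like. It is an assumption-laden illustration rather than the author's script: the column names and types are hypothetical, and the table is assumed to be an external, comma-delimited text table. Only the table name and directory come from the text above.

```python
def build_create_table(table, columns, location, delimiter=","):
    """Return a HiveQL statement for an external, delimited text table."""
    cols = ",\n  ".join(f"{name} {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        f"ROW FORMAT DELIMITED FIELDS TERMINATED BY '{delimiter}'\n"
        "STORED AS TEXTFILE\n"
        f"LOCATION '{location}'"
    )


def rows_to_text(rows, delimiter=","):
    """Serialize rows in column order, one line per row, without a header line."""
    lines = []
    for row in rows:
        fields = [str(value) for value in row]
        # Plain delimited text has no quoting, so reject values that would break a row.
        if any(delimiter in f or "\n" in f for f in fields):
            raise ValueError("field contains the delimiter or a newline")
        lines.append(delimiter.join(fields))
    return "".join(line + "\n" for line in lines)


if __name__ == "__main__":
    # Hypothetical columns for the t_dim_items table mentioned in the text.
    columns = [("item_id", "INT"), ("item_name", "STRING"), ("category", "STRING")]
    print(build_create_table("t_dim_items", columns, "/tmp/sqldata/t_dim_items"))
    print(rows_to_text([(1, "Laptop", "Electronics"), (2, "Desk", "Furniture")]))
```

The generated text would then be saved as a file and copied into the table directory. Hive reads whatever files it finds there the next time the table is queried.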
Spark
Spark is a fast, general-purpose engine for large-scale data processing. Much of its speed comes from keeping working data in memory (RAM), which is much faster than repeatedly reading from and writing to disk.
RDD (Resilient Distributed Dataset) is the basic concept behind Spark. The cuboids of each dimension level can be described as one RDD, so an N-dimensional cube has N + 1 RDDs. These RDDs have a parent/child relationship, because a parent can be used to generate its children. With the parent RDD cached in memory, generating a child RDD can be much more efficient than reading the parent from disk. Figure 4 describes this process.
Figure 4 — Schematic of the process of building RDDs
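The relationship between dimensions, cuboids and levels can be checked by enumeration. The sketch below assumes a full cube in which every combination of dimensions is materialized. The dimension names are hypothetical.

```python
from itertools import combinations
from math import comb


def cuboids_by_level(dimensions):
    """Return one list of cuboids per level, from the base cuboid down to the apex.

    Level 0 holds the single cuboid with all N dimensions, the last level holds
    the cuboid with no dimensions, so there are N + 1 levels in total.
    """
    n = len(dimensions)
    return [list(combinations(dimensions, k)) for k in range(n, -1, -1)]


if __name__ == "__main__":
    dims = ("year", "region", "product")
    for level, cuboids in enumerate(cuboids_by_level(dims)):
        # A level that keeps k dimensions contains C(N, k) cuboids.
        assert len(cuboids) == comb(len(dims), len(dims) - level)
        print(level, cuboids)
```

For N dimensions the levels together contain 2^N cuboids, which is why the number of dimensions has such a strong effect on build time.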
Figure 5 illustrates the cuboid computation in detail. In "step 5", Kylin uses a HiveContext to read the intermediate Hive table and then performs a "map" operation, a one-to-one mapping that encodes the original values into key-value (K-V) bytes. The result is an intermediate encoded RDD. In "step 6", the intermediate RDD is aggregated with a "reduceByKey" operation to produce RDD-1, the base cuboid. Next, a "flatMap" (one-to-many map) is applied to RDD-1, because the base cuboid has N child cuboids. The RDDs of all remaining levels are computed in the same way. Each RDD is persisted to the distributed file system when it is finished and is also cached in memory for computing the next level. Once its children have been generated, it is removed from the cache.
Figure 5 — DAG Cubing in Spark
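The level-by-level build can be imitated without Spark. The sketch below uses plain Python dictionaries in place of RDDs and assumes a single SUM measure. It follows the same sequence of a one-to-one encoding, an aggregation by key, and a one-to-many expansion into child cuboids. It is not Kylin's implementation, and all names are illustrative.

```python
from collections import defaultdict


def reduce_by_key(pairs):
    """Sum the measure for each key (stand-in for Spark's reduceByKey)."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


def build_cube(rows, n_dims):
    """Build every cuboid level by level.

    rows: tuples of n_dims dimension values followed by one numeric measure.
    Returns {kept dimension indexes: {dimension values: aggregated measure}}.
    """
    # One-to-one encoding of each row into a (key, measure) pair.
    encoded = [(tuple(row[:n_dims]), row[n_dims]) for row in rows]
    base_id = tuple(range(n_dims))
    level = {base_id: reduce_by_key(encoded)}  # the base cuboid
    cube = dict(level)
    while level:
        next_level = {}
        for kept, parent in level.items():
            for drop in kept:
                child_id = tuple(i for i in kept if i != drop)
                if child_id in next_level:
                    continue  # already derived from another parent
                # One-to-many step: project each parent key onto the child's dimensions.
                pairs = (
                    (tuple(v for i, v in zip(kept, key) if i != drop), value)
                    for key, value in parent.items()
                )
                next_level[child_id] = reduce_by_key(pairs)
        cube.update(next_level)
        level = next_level  # parents are no longer needed once children exist
    return cube
```

Each child is aggregated from an already aggregated parent rather than from the raw rows, which is the reason for keeping the parent in memory until its children are built.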
Spark writes the aggregated data to files in Parquet format. Apache Parquet is an open-source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient compression and high-performance encoding schemes for handling large amounts of complex data.
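Why a column-oriented layout compresses well can be shown with a toy example. The sketch below does not use the Parquet library or its file format. It only pivots rows into columns and applies a simple run-length encoding, a much simplified relative of the encodings Parquet applies. The column names are hypothetical.

```python
from itertools import groupby


def to_columns(rows, column_names):
    """Pivot row tuples into one list of values per column."""
    return {name: [row[i] for row in rows] for i, name in enumerate(column_names)}


def run_length_encode(values):
    """Collapse consecutive repeated values into (value, count) pairs."""
    return [(value, sum(1 for _ in group)) for value, group in groupby(values)]


if __name__ == "__main__":
    rows = [("EU", "A", 10), ("EU", "A", 5), ("EU", "B", 7), ("US", "B", 3)]
    columns = to_columns(rows, ("region", "product", "amount"))
    for name, values in columns.items():
        print(name, run_length_encode(values))
```

Values of the same column sit next to each other, so repeated dimension values form long runs. A query that needs only one column can also skip the others entirely.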
Spark runs in distributed mode to avoid performance bottlenecks, and the computing power of the system can be increased by scaling horizontally. Several resource schedulers, such as YARN, Kubernetes or Mesos, are available to meet resource isolation needs (Apache Kylin 4 uses YARN).
Conclusion
Having reviewed the subsystems of Apache Kylin, we can now outline the workflow for this tool, which consists of the following main steps:
- Loading data into HDFS
- Building a data warehouse in Hive (creating source tables over the files in HDFS)
- Developing the data model and the cube in Kylin
- Building the cube:
  - YARN schedules the job and allocates resources
  - Spark generates a plan for building the cuboids
  - Spark builds the cuboids and writes the data to Parquet files
- Working with the analytical cube (writing SQL queries)
This article has reviewed the Apache Kylin architecture. Understanding it helps significantly in developing analytical solutions and makes it possible to avoid building unnecessary cuboid combinations when the Spark jobs run. This can significantly reduce cube build time and query latency.
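How much can be saved by restricting the cuboid combinations is easy to estimate. The sketch below is a hypothetical illustration and not Kylin's planner. It mimics a rule of the kind Kylin's aggregation-group settings offer, under which a dimension marked as mandatory must appear in every cuboid that gets built. The dimension names are assumptions.

```python
from itertools import combinations


def all_cuboids(dimensions):
    """Enumerate every combination of dimensions, including the empty one."""
    return [
        combo
        for k in range(len(dimensions) + 1)
        for combo in combinations(dimensions, k)
    ]


def prune(cuboids, mandatory=()):
    """Keep only the cuboids that contain every mandatory dimension."""
    required = set(mandatory)
    return [cuboid for cuboid in cuboids if required.issubset(cuboid)]


if __name__ == "__main__":
    dims = ("year", "region", "product", "channel")
    full = all_cuboids(dims)
    kept = prune(full, mandatory=("year",))
    print(len(full), len(kept))
```

Each mandatory dimension halves the number of cuboids to build, at the cost that queries which do not use that dimension have to be answered from a larger cuboid.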
List of sources:
https://medium.com/@cesaradvincula/next-generation-olap-analytics-apache-kylin-7a1c12a0c082