CrateDB Blog | Development, integrations, IoT, & more

Distributed query execution in CrateDB: What you need to know

Written by Marija Selakovic | 2022-07-20

This blog post has been inspired and based on the content published by Mathias Fußenegger in a series of articles on CrateDB internals. For more examples of how CrateDB executes queries please check the original posts.

The query language in CrateDB

CrateDB relies on SQL for data querying and manipulation. Using SQL as a query language reduces the learning curve, eases porting, and allows users to focus on query logic rather than dealing with low-level details of a distributed system. Furthermore, CrateDB also enables users to write UDFs (e.g., user-defined functions) to manipulate data where needed.

SQL statements are internally transformed into a series of operators that have to be applied in order, and every operator can only process its input data. For instance, WHERE is one of the innermost operators, and SELECT is applied almost at the end. The order of operators to be executed depends on the optimizer. As in other database systems, the optimizer in CrateDB applies optimization rules to the tree of operators or to a subset of the tree to re-write the tree into an equivalent variant that is cheaper to execute.

A better understanding of the execution engine in CrateDB can help you design queries that fully benefit from the scalability and performance properties of CrateDB.

Learn how CrateDB generates execution plans, and the optimizations influence the order of operators. 

Query compilation and execution

Logical plan

Generally speaking, a logical plan is the abstraction of all transformation steps needed to execute a query. To understand the logical plan in CrateDB execute the SQL query with the EXPLAIN statement. For instance, consider the simple SELECT statement:

Explain SELECT name FROM users;
-> Collect[doc.users | [name] | true]

In this case, the logical plan consists of a single Collect operator. The parameters include doc.users - the name of the table, name - the attribute being collected, and true - a query expression. Adding a filter in the WHERE clause will only change the query expression as follows:

EXPLAIN SELECT name FROM users WHERE name = 'name';
-> Collect[doc.users | [name] | (name = 'name')]

This behavior is due to the implementation of the Collect operator being quite robust in CrateDB, and there is currently no distinction between table scans or index lookup at the logical planner level.

With more complex queries, more complex logical plans get generated. For instance, consider subquery with SELECT statement:

EXPLAIN SELECT name FROM (SELECT name FROM users WHERE name = 'name') AS n
WHERE n.name = 'foo';

And the corresponding logical plan:

Rename[name] AS n
    └ Collect[doc.users | [name] | ((name = 'foo') AND (name = 'name'))

 

 

 

 

 

 

 

The logical plan results in a Rename operator. What’s interesting is that the inner WHERE clause and the outer WHERE clause got merged into (name = 'foo') AND (name = 'name') and this expression is part of the Collect operator. This happened due to filter pushdown optimization that tries to move predicates as far down the tree as possible to reduce the number of rows that need to be examined.

If the Collect operator contains an expression, CrateDB will use the Lucene index if feasible. In the previous case, CrateDB will look for the terms foo and name in an inverted index. The inverted index maps the terms to a set of document IDs as illustrated in our previous article on Indexing and Storage in CrateDB. Using the Lucene index is significantly cheaper than loading rows and computing name = 'foo' with the concrete name value for each row.

As illustrated, EXPLAIN prints a list of logical operators. Before executing the query, CrateDB transforms the logical plan into a physical execution plan.

Physical execution plan

A logical execution plan does not take into account the information about data distribution. CrateDB is a distributed database and data is sharded: a table can be split into many parts - so-called shards. Shards can be independently replicated and moved from one node to another. The number of shards a table can have is specified at the time the table is created.

The physical execution plan reasons where the data is located and how it is supposed to arrive at the node with which the client communicates. We refer to this node as the handler node because it handles the communication with the client and because it initiates the execution of the query. The nodes from which data is gathered are called collect nodes, and sometimes there are also merge nodes involved, which merge the results from collect nodes.

Most of the physical execution plans share the same building blocks, but how it is going to look depends on the exact query. In general, an execution plan consists of one or more execution phases. Each logical operator contains logic to either create an execution phase or to add additional transformations to an execution phase created by a child operator.

Let’s consider the physical execution plan for the simple SELECT name FROM users;  statement:

 

 

RoutedCollectPhase:
  toCollect: [Ref{name}]
  routing:
    node-0:
      users: [0, 1]

    node-1:
      users: [2, 3]

MergePhase:
  executionNodes: [node-0]
  numUpstreams: 2

The plan consists of CollectPhase and a MergePhase. In this example, the Collect operator creates a CollectPhase. CollectPhase is a source, which means that it is supposed to output rows by reading them from disk or from memory. The Collect operator creates this CollectPhase by combining the information it holds itself with information from the cluster state. The cluster state is a snapshot that represents the current state of the cluster. This includes information like which tables exist, what columns they have, and on which node the different table shards are located.

Furthermore, The CollectPhase can also contain a list of columns or expressions to retrieve, an optional filter expression, and an optional ordering expression.

The toCollect property tells us which attributes should be collected and the routing property tells us from where. The routing includes the shard IDs of the tables or partitions which should be queried and on which nodes they reside. In this case, the executor has to retrieve the data from node-0 and node-1 as both these nodes contain two shard copies of the table.

The MergePhase is used to indicate that a node must merge data from all nodes involved in the CollectPhase. Usually, this merge phase is assigned to the handler node. In this case, it is node-0 and it is expecting results from two other nodes. In this scenario, node-0 is both the handler node and a collect node. It is expecting results from itself as well as from one other node.

Once the planner/optimizer finishes creating the physical execution plan, it executes it.

The execution

The execution layer looks at the routing information of the execution plan to figure out which nodes are involved in the query, and then sends each node the phases of the execution plan that they have to execute as part of a JobRequest. Each node contains a handler that is responsible for accepting and processing these JobRequests. To process them, they look at the phases of the plan and initiate their corresponding operations.

In the case of a CollectPhase, this includes creating a Lucene query out of the filter expression (e.g., name = 'name'), acquiring a Lucene reader for each shard, and iterating over the matching documents while applying all the transformations that are part of the phase before sending the result back to the handler node.

Finally, the results from execution phases might be pushed to merge nodes (e.g., node-0 in the previous example) before the final result is sent to the handler node and then back to the client.

Example: Query then fetch execution

Let’s extend the simple SELECT query with a LIMIT, and to make the example more realistic we also add a WHERE clause and select an additional column. The query now looks like the following:

SELECT name, age FROM users WHERE name LIKE 'A%' LIMIT 10`

Let’s explore the logical plan with the EXPLAIN statement:

Fetch[name, age]
  └ Limit[50::bigint;0]
     └ Collect[doc.users | [_fetchid] | (name LIKE 'A%')]

Starting from top to bottom, the plan contains the following operators:

  • The Fetch operator: Fetch takes a relation as input and expects it to contain one or more _fetchid columns. It uses these _fetchid values to retrieve the value of other attributes, in this example, it retrieves name and age.
  • The Limit operator. Limit takes relation and limits it to at most 50 records.
  • The Collect operator. The Collect operator in this example indicates that an attribute called _fetchid should be retrieved from the doc.users table. The operator also includes the name LIKE 'A%' query expression, indicating that only records matching this expression should be included. _fetchid is a system column that can be used by the Fetch operator.

As the data in CrateDB is distributed across several shards and nodes, CrateDB cannot accurately know up-front how many records each shard holds and how many of these records will match the name LIKE 'A%' expression. Because of that, in this example CrateDB has to fetch at most 50 records from each node and then merge these together, stopping once the limit is hit.

The Fetch operation uses the readerId that is encoded into the _fetchid to figure out which node it has to contact to get the actual value. The whole Fetch operation works in batches, so multiple records are fetched at a time. It works a bit like an asynchronous flat map. The major advantage of this approach is that now each node only has to load the name and age values that are required. For instance, if data is stored on three nodes, the first node might load 16 records, the second node another 17, and the third node the remaining 17, making up the total 50.

What can we conclude?

This article gives the first introduction to how CrateDB executes distributed queries. We started with the overview of the logical plan, physical execution plan, and execution layer with a simple SELECT statement. Then, we illustrate some of the optimizations and execution strategies with a more complex example involving the LIMIT operator.

Know more about CrateDB here and check our official documentation. Get insights from our community.

To get more insights on how CrateDB executes other types of queries we refer to the original articles by Mathias Fußenegger.