This is part two of a three-part miniseries that looks at how we improved join performance in the CrateDB 3.0 release.
For this batch of work, we were specifically only trying to improve equi-join performance. An equi-join is an inner join where the join condition has a single equality operator (=) or multiple equality operators chained together with AND operators.
In part one, Marios covered the following:
- Why we chose to focus on equi-joins
- Why we chose to implement the hash join algorithm for this work
- Step-by-step implementation details for the hash join algorithm
- Benchmarks showing performance improvements as substantial as 11,131x
11,131x is impressive, but it's not the ~23,000x we advertise in the title. And this is where parts two and three of this miniseries come in.
One of the limitations of the hash join algorithm is that one of the tables in the join has to fit entirely in memory on the node that is doing the join.
This wasn't an acceptable limitation for us, because we have specifically built CrateDB to ingest and process billions of records per day, with tables that would easily take up terabytes of RAM.
Terabytes of RAM is not beyond the realm of possibility for a single node. But our goal is that you can run a high-performance CrateDB cluster using commodity hardware. And commodity hardware does not often come with terabytes of RAM.
Our solution? To implement the block hash join algorithm.
In this post, I'll talk you through what that means and how we achieved it.
Please note: join algorithms and their performance characteristics are common knowledge for RDBMS developers. This miniseries is written for people who want a behind-the-scenes look as we implement these algorithms and adapt them for CrateDB and our distributed SQL query execution engine.
Hash Joins Recap
Let's recap how the hash join algorithm works.
If you have read part one, or you have a good working knowledge of the hash join algorithm, you can skip to the next section.
The hash join algorithm has two phases:
- Build phase
- Read all the rows from the left (outer) table one by one. For each row, calculate the hash and insert the row into the hash map.
- Probe phase
- Read all the rows from the right (inner) table one by one. For each row, calculate the hash and look it up in the hash map.
- If no entry is found, then skip that right table row because it does not meet the join condition.
- If an entry is found, then for each row of the left table in the list of the hash map entry, try to validate the join condition. If the join condition is true, emit the row. If not, skip that row and continue with the next row in the list.
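To make the two phases concrete, here's a minimal sketch in Java. This is not CrateDB's actual implementation: the SimpleHashJoin class, the Object[] row representation, and the build, probe, and combine helpers are simplified stand-ins for illustration only.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

class SimpleHashJoin {

    // Build phase: hash every left row on its join key and store it in the map.
    static Map<Object, List<Object[]>> build(Iterable<Object[]> leftRows, int joinKeyIndex) {
        Map<Object, List<Object[]>> hashMap = new HashMap<>();
        for (Object[] row : leftRows) {
            hashMap.computeIfAbsent(row[joinKeyIndex], k -> new ArrayList<>()).add(row);
        }
        return hashMap;
    }

    // Probe phase: hash every right row, look it up in the map, and for every
    // candidate left row that also passes the join condition, emit a combined row.
    static List<Object[]> probe(Map<Object, List<Object[]>> hashMap,
                                Iterable<Object[]> rightRows,
                                int joinKeyIndex,
                                BiPredicate<Object[], Object[]> joinCondition) {
        List<Object[]> result = new ArrayList<>();
        for (Object[] right : rightRows) {
            List<Object[]> candidates = hashMap.get(right[joinKeyIndex]);
            if (candidates == null) {
                continue; // no hash match: this right row cannot satisfy the join condition
            }
            for (Object[] left : candidates) {
                if (joinCondition.test(left, right)) {
                    result.add(combine(left, right));
                }
            }
        }
        return result;
    }

    private static Object[] combine(Object[] left, Object[] right) {
        Object[] combined = new Object[left.length + right.length];
        System.arraycopy(left, 0, combined, 0, left.length);
        System.arraycopy(right, 0, combined, left.length, right.length);
        return combined;
    }
}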
Block Hash Joins: Explained
The hash join algorithm (explained above) has one important constraint: the hash map has to be held in memory. And as the hash map contains all the rows from the left table, the contents of that table must fit in memory.
As explained in the introduction, this is not an acceptable limitation for CrateDB, which is why we turned to the block hash join algorithm.
In a general sense, a block-type algorithm is an algorithm that divides a large dataset into smaller chunks (or blocks) and then sequentially works on those blocks in isolation.
So what does a block hash join algorithm look like? Well, let's take the outline provided above and modify it a little:
- Build phase
- Read rows from the left table one by one. For each row, calculate the hash and insert the row into the hash map. This time, however, stop filling the hash map when it reaches a preconfigured size (the block size).
- Probe phase
- Perform the same steps as with the regular hash join algorithm, except this time, we are only operating on the subset of rows from the left table that was read into the hash table (i.e., the current block).
- Loop
- If there are any remaining rows in the left table, go back to the build phase and pick up where you left off.
So essentially what we have here is:
- An algorithm that runs multiple iterations of the hash join algorithm
- Each iteration operates on a portion of the left table
- The process ends when the left table has been fully processed
The end result is that as long as the block size fits in memory, we can run the hash join algorithm on tables of any size. Huzzah!
Now that we have a high-level understanding of what needs to be done, let's dive into the implementation details.
Implementation
In part one, we introduced the HashInnerJoinBatchIterator, which is the Java class that is responsible for building the hash map during the build phase.
To implement the block hash join algorithm, we needed to modify this class so that it can iterate over the left table in blocks.
The first step was to modify the HashInnerJoinBatchIterator class so that it can take a block size as an argument. When a block size is specified, the hash map is built until the left table has no more rows to iterate over or the maximum hash map size is reached.
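As a rough illustration (a simplified stand-in, not the real HashInnerJoinBatchIterator code), the modified build step amounts to adding one extra stop condition to the loop that fills the hash map. This snippet reuses the types from the earlier sketch, plus java.util.Iterator:
// Hypothetical build step for one block: fill the hash map until either the
// left source runs out of rows or the configured block size is reached.
static boolean buildBlock(Iterator<Object[]> leftRows,
                          Map<Object, List<Object[]>> hashMap,
                          int joinKeyIndex,
                          int blockSize) {
    int rowsInBlock = 0;
    while (leftRows.hasNext() && rowsInBlock < blockSize) {
        Object[] row = leftRows.next();
        hashMap.computeIfAbsent(row[joinKeyIndex], k -> new ArrayList<>()).add(row);
        rowsInBlock++;
    }
    // true means there are still unread left rows, so another block will follow
    return leftRows.hasNext();
}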
But now we're faced with a question: how big should the blocks be?
Determining a Suitable Block Size
There are multiple steps needed to determine a suitable block size. In this subsection, I'll walk you through the process.
Circuit Breakers
CrateDB has what are known as circuit breakers.
In a nutshell, if a node exhausts its memory, it will crash. Circuit breakers monitor a number of CrateDB subprocesses and will terminate them if they get too close to exhausting memory.
One of the circuit breakers monitors query execution and will terminate the query if it uses too much memory.
What's interesting from our point of view is that circuit breakers use configured limits. These limits are either set by hand or set automatically by CrateDB. And a circuit breaker is able to report both its configured limit and its current memory usage.
This information will be useful for our calculations, but we will come back to this.
Statistics Collection
CrateDB has another feature that enables the collection of various statistical information about the cluster.
When this feature is enabled, we can use the data in sys.shards to get information about shard sizes and the number of documents in each shard.
This is more useful information.
Estimating the Size of a Row
Here's an example query:
SELECT *
FROM t1
JOIN t2
ON t1.a = t2.b
JOIN t3
ON t2.b = t3.c
The rows that this query produces will be a composite of all columns from t1, all columns from t2, and all columns from t3.
CrateDB has a LogicalPlan class, which plans how to produce the composite result rows for a query like this using a tree of nested operations.
Here, the logical plan would look like this:
HashJoin (t2.b = t3.c)
├── HashJoin (t1.a = t2.b)
│   ├── Collect t1
│   └── Collect t2
└── Collect t3
What this means is:
- Collect rows from t1 and t2
- Perform a hash join on those two rowsets and produce a rowset
- Collect rows from t3
- Perform a hash join on the remaining rowsets and produce a final rowset
To make a pessimistic estimate of the size of the composite rows, we can modify the LogicalPlan interface so that every operation in the tree has a method that can make a pessimistic estimate of the size of the rowset it produces.
These estimations are only possible because we have access to the statistical information in the sys.shards table. Specifically, row size, which we need for the estimate, can be calculated for a shard by dividing the size of the shard by the number of rows that shard contains.
This produces a pessimistic estimate because:
- The amount of space a shard takes up on disk is more than the constituent rows take up in memory
- Because CrateDB uses a query-then-fetch model of query execution, the only columns we need to keep track of are the ones necessary for executing the join, meaning that rows in the hash map are typically smaller than the original rows.
These estimates "bubble up" the tree until you reach the topmost operator. At that point, you have a pessimistic estimate of the overall composite result row size.
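As an illustration only (the method names and shape here are invented for this post, not CrateDB's actual LogicalPlan API), the per-operator estimates might look something like this:
// Hypothetical per-operator estimates that "bubble up" the plan tree.

// Leaf (collect) operator: estimate one table's row size from sys.shards
// statistics by dividing the shard size on disk by the number of documents.
static long estimateCollectRowSize(long shardSizeInBytes, long numDocsInShard) {
    if (numDocsInShard == 0) {
        return 0;
    }
    return shardSizeInBytes / numDocsInShard; // pessimistic: on-disk size per row
}

// Join operator: a composite result row is at most as large as the two input
// rows combined, so add the estimates of both child operators.
static long estimateJoinRowSize(long leftRowSizeEstimate, long rightRowSizeEstimate) {
    return leftRowSizeEstimate + rightRowSizeEstimate;
}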
Calculating Block Size
Now that we have a size estimate for the composite result rows, we can combine that with information from the query circuit breaker to calculate a suitable block size.
To calculate a suitable block size, we do this:
// memory the query circuit breaker will still allow this query to use
availableMemory = circuitBreaker.limit - circuitBreaker.used;
// how many rows fit into that memory, using the pessimistic row size estimate
maxRowCount = availableMemory / estimatedRowSize;
blockSize = Math.min(tableRowCount, maxRowCount);
We take the minimum of the two here because there's no sense in creating blocks that are larger than the table itself.
If statistics are not available, we use the default block size, which is calculated to fit into a single data fetch.
Calculating Block Size Again
Database traffic can fluctuate a lot, which means that the available memory can fluctuate a lot too. To accommodate for this, we don't just calculate the block size once.
A suitable block size is calculated at the start of every iteration. That is, the block hash join algorithm is adaptive, and every new block that is created is suitable for the memory conditions at the time.
Recap
Okay, now that we've been over the details, let's put it all together.
Our final block hash join algorithm looks like this:
- Calculate a suitable block size given current memory conditions.
- Iterate over rows in the left table, inserting into the hash map until the block size is reached or we run out of rows.
- Iterate over all rows in the right table; for each row, check if it matches a row in the hash map, and if so, emit both rows.
- Clear the buffer and go to step one.
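To tie the recap together in code, here's a hypothetical driver loop built on the buildBlock and probe sketches from earlier (it assumes it lives alongside those helpers). The blockSizeCalculator supplier stands in for the circuit-breaker and statistics based calculation described above; none of this is CrateDB's actual implementation.
import java.util.function.IntSupplier;

// Hypothetical block hash join driver, reusing buildBlock() and probe() from
// the earlier sketches.
static List<Object[]> blockHashJoin(Iterator<Object[]> leftRows,
                                    Iterable<Object[]> rightRows,
                                    int leftKeyIndex,
                                    int rightKeyIndex,
                                    BiPredicate<Object[], Object[]> joinCondition,
                                    IntSupplier blockSizeCalculator) {
    List<Object[]> result = new ArrayList<>();
    while (leftRows.hasNext()) {
        // 1. Recalculate the block size for the current memory conditions.
        int blockSize = blockSizeCalculator.getAsInt();

        // 2. Build phase: fill the hash map with the next block of left rows.
        Map<Object, List<Object[]>> hashMap = new HashMap<>();
        buildBlock(leftRows, hashMap, leftKeyIndex, blockSize);

        // 3. Probe phase: read the whole right table against this block.
        result.addAll(probe(hashMap, rightRows, rightKeyIndex, joinCondition));

        // 4. The hash map is discarded and the next iteration starts a fresh block.
    }
    return result;
}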
Optimizing The Algorithm
At this point, we started to think about the fact that we read the left table rows once, but we read the right table rows once for every block we create.
Let's consider two scenarios:
Left Table Rows | Right Table Rows | Blocks | Total Right Table Row Reads (Right Table Rows x Blocks) | Total Row Reads (Left Table Rows + Right Table Row Reads) |
---|---|---|---|---|
100 | 200 | 10 | 2,000 | 2,100 |
200 | 100 | 10 | 1,000 | 1,200 |
The algorithm does more work for the same total row count when the right table is larger.
All we're doing with a hash join is comparing two lists to find matching elements. So it doesn't matter which list is on the left or the right.
So, given that this algorithm has more work to do when the right-hand list is bigger, we figured that a potential optimization at this point was to do a size check and swap them if necessary.
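A minimal sketch of that check, assuming we have row count estimates for both sides (the method name here is ours, not CrateDB's):
// Hypothetical check: the right side is re-read once per block, so the smaller
// relation should end up on the right, i.e. swap whenever the planned left
// side is the smaller one.
static boolean shouldSwapSides(long estimatedLeftRowCount, long estimatedRightRowCount) {
    return estimatedLeftRowCount < estimatedRightRowCount;
}
When this returns true, the two relations are simply exchanged before the join runs. For an inner equi-join the matches are the same either way; the output columns just need to be emitted in the original order.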
Benchmarking
We implemented this optimization and then ran a few benchmarks to validate that it was working as expected.
Scenario One
We tested the performance impact of switching the smaller table to the right when we only use one block. This is what happens when the entire left table fits in memory.
Left Table Rows | Right Table Rows | Time (milliseconds) |
---|---|---|
Test #1 | ||
1,000 | 1,000,000 | 2,180 |
1,000,000 | 1,000 | 2,230 |
Test #2 | ||
100 | 10,000 | 270 |
10,000 | 100 | 260 |
The performance of both variants is very similar. This was expected, because the algorithm does the same number of row reads in both instances.
The behavior of the hash map in this scenario is predominantly responsible for performance differences. (It's out-of-scope for this blog post, but in summary, the algorithm will run faster when the table that supplies the hash map contains more matching rows than the table that is used to look up rows in the hash map.)
Scenario Two
In our second scenario, we wanted to test the switch when the left table doesn't fit into memory. That is, when we end up using multiple blocks.
This is what the optimization was built for, so this is where we were hoping to see performance improvements.
We ran several tests, each one using 100 blocks.
Left Table Rows | Right Table Rows | Time (milliseconds) |
---|---|---|
Test #1 | ||
1,000 | 10,000 | 1,200 |
10,000 | 1,000 | 700 |
Test #2 | ||
1,000 | 100,000 | 1,300 |
100,000 | 1,000 | 900 |
Test #3 | ||
1,000 | 1,000,000 | 4,200 |
1,000,000 | 1,000 | 2,300 |
Test #4 | ||
10,000 | 100,000 | 6,500 |
100,000 | 10,000 | 3,100 |
Test #5 | ||
10,000 | 1,000,000 | 27,000 |
1,000,000 | 10,000 | 4,000 |
These results demonstrate a performance improvement of approximately 50% in most cases (rising to roughly 85% in the last test), which we considered a success.
More Optimizations
During our testing, we noticed something unusual.
When the tables being joined were large, and there was no ORDER BY clause, and the LIMIT was small, we saw query times that were orders of magnitude slower than what we expected.
Here's a sample query:
SELECT *
FROM t1
JOIN t2
ON t1.a = t2.b
LIMIT 100
Now, consider the case where:
- Both tables have 10 million rows
- The CrateDB node has enough memory to fit one million rows
What ends up happening here is that we maximize the block size to fit in memory, even though we only need to return 100 rows. So, let's say the 100 matching rows are all found using just the first 1,000 rows we put into the hash map. That means we loaded an additional 999,000 rows into memory for no reason.
To address this issue, we had to alter the block size calculation to take the presence of a LIMIT clause into account.
If an ORDER BY clause is present, we have to process all matches before we can order them, so these queries are unaffected.
However, if there is no ORDER BY clause, and there is a LIMIT clause that is smaller than the default block size, we use the smaller of the following two values as the block size:
- The standard calculated block size (which maximizes memory usage)
- The default block size (which, as mentioned before, is calculated to fit into a single data fetch)
This ensures that the algorithm does not needlessly consume memory.
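Put as a sketch (with made-up names; the real calculation lives in CrateDB's planner and looks different), the limit-aware version of the earlier calculation is roughly:
// Hypothetical limit-aware block size calculation.
static long calculateBlockSize(long tableRowCount,
                               long estimatedRowSize,
                               long circuitBreakerLimit,
                               long circuitBreakerUsed,
                               Long limit,          // null when the query has no LIMIT
                               boolean hasOrderBy,
                               long defaultBlockSize) {
    if (estimatedRowSize <= 0) {
        // No statistics available: fall back to the default block size,
        // which is sized to fit into a single data fetch.
        return defaultBlockSize;
    }
    long availableMemory = circuitBreakerLimit - circuitBreakerUsed;
    long maxRowCount = availableMemory / estimatedRowSize;
    long blockSize = Math.min(tableRowCount, maxRowCount);

    // With no ORDER BY and a LIMIT smaller than the default block size, the
    // query may stop early, so don't build a block bigger than the default.
    if (!hasOrderBy && limit != null && limit < defaultBlockSize) {
        blockSize = Math.min(blockSize, defaultBlockSize);
    }
    return blockSize;
}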
Wrap Up
In part one of this three-part miniseries, Marios introduced you to the hash join algorithm and showed you how we implemented it for CrateDB.
One of the significant limitations of the hash join algorithm is that it requires you to hold the full contents of a single table in memory. Because we have built CrateDB to process massive amounts of data, this was an unacceptable limitation.
In this post, I showed you how we removed this memory limitation with the block hash join algorithm.
But why stop there?
So far, all we have considered is a single node executing the join algorithm. However, CrateDB works best when it can distribute query execution across the whole cluster.
So, in the final part of this miniseries, Marios will show you how we took the block hash join algorithm and modified it to run in parallel on multiple nodes.