Join performance to the rescue

Introduction

CrateDB is a distributed database compatible with standard SQL. Every query executed on CrateDB goes through a query optimizer, which tries to find the best possible logical plan for it. In some situations, however, the optimizer cannot produce the optimal execution plan. When the performance of a query suffers from an inefficient plan, it helps to have more control over query execution. In this post, we show how the query optimizer in CrateDB can be fine-tuned for specific queries.

Problem description

Let's explore this together with a real-world example. Imagine the following tables:

SELECT * FROM customers ORDER BY id LIMIT 5;
+----+------------------------------+---------------------------------+-----------------------------------------------+-------------+
| id | name                         | email                           | address                                       | country     | 
+----+------------------------------+---------------------------------+-----------------------------------------------+-------------+
|  1 | Cornelius Kugler             | c.kugler@schober.com            | Selma-Huber-Ring 8 7636 Lienz                 | Austria     |
|  2 | Lionel Seiwald               | seiwald@hammerer-mandl.de       | Ingeborg-Margraf-Straße 79 96464 Kronach      | Germany     |
|  3 | Ela Bader                    | bader@stocher.com               | Grabnergasse 3 6750 Oberwart                  | Austria     |  
|  4 | Emilija Leithner             | emilija@siegl.com               | 14 Tanner Parkways Vogthaven, Nj 77764        | US          |
|  5 | Sebastian Sattler            | s.sattler@weiss.net             | Winklerallee 4 02786 Augsburg                 | Germany     |
+----+------------------------------+---------------------------------+-----------------------------------------------+-------------+

SELECT * FROM clusters ORDER BY id LIMIT 5;
+----+-------------+-----------------+--------------+
| id | customer_id | number_of_nodes | name         |
+----+-------------+-----------------+--------------+
|  1 |           1 |              10 | Hahnenköpfle |
|  2 |           2 |              35 | Diedamskopf  |
|  3 |           2 |               8 | Mondspitze   |
|  4 |           3 |              12 | Bocksberg    |
|  5 |           3 |              23 | Valluga      |
+----+-------------+-----------------+--------------+

Now assume the following use case: we would like to send a special offer to our cloud customers, especially those running big clusters. We need the total number of nodes each customer runs across all of their clusters, along with the customer details required to contact them. Therefore, we join customers with clusters and sum number_of_nodes per customer:

SELECT 
  customers.id, 
  customers.name, 
  customers.email, 
  customers.address, 
  customers.country, 
  Sum(clusters.number_of_nodes) AS number_of_nodes 
FROM 
  customers 
  INNER JOIN clusters ON clusters.customer_id = customers.id 
GROUP BY 
  customers.id, 
  customers.name, 
  customers.address, 
  customers.email, 
  customers.address, 
  customers.country 
ORDER BY 
  number_of_nodes DESC;   
+-----+------------------------------+---------------------------------+-----------------------------------------------+---------+-----------------+
|  id | name                         | email                           | address                                       | country | number_of_nodes |
+-----+------------------------------+---------------------------------+-----------------------------------------------+---------+-----------------+
| 810 | Gino Neuschäfer              | gino@haenel.com                 | Valentino-Aigner-Gasse 702 4427 Eggenburg     | Austria |            219  |
| 177 | Heinz Siegrist               | heinz@schoch.com                | Huhnallee 7/0 29172 Fulda                     | Germany |            210  |
| 201 | Diogo Stauffer               | stauffer@mader.com              | Grabnergasse 3 6750 Oberwart                  | Austria |            201  | 
| 127 | James Rosario                | j.rosario@smith.net             | 6313 Amanda Brook Apt. 229 Lake Ryanfur       | US      |            190  |
| 384 | Felipa Guerrero Benítez      | elipa@alegre-oller.net          | Poseo De Miguel Sierra 67 Toledo, 55721       | Spain   |            188  |
|  12 | Damian König                 | damian@simic.net                | Klausring 5/2 7161 Rust                       | Austria |            177  |
...	

This query takes over 300 ms on my M1 MacBook with a single-node setup, with 100k rows in customers and 150k rows in clusters.

Investigation

Let's investigate and see if we can improve the performance. First, we will have a look into the execution plan using the EXPLAIN command:

EXPLAIN 
SELECT 
  customers.id, 
  customers.name, 
  customers.email, 
  customers.address, 
  customers.country, 
  Sum(clusters.number_of_nodes) AS number_of_nodes 
FROM 
  customers 
  INNER JOIN clusters ON clusters.customer_id = customers.id 
GROUP BY 
  customers.id, 
  customers.name, 
  customers.address, 
  customers.email, 
  customers.address, 
  customers.country 
ORDER BY 
  number_of_nodes DESC;
+------------------------------------------------------------------------------------------------------+
| QUERY PLAN                                                                                           |
+------------------------------------------------------------------------------------------------------+
| Eval[id, name, email, address, country, sum(number_of_nodes) AS number_of_nodes]                     |
|   └ OrderBy[sum(number_of_nodes) DESC]                                                               |
|     └ GroupHashAggregate[id, name, address, email, address, country | sum(number_of_nodes)]          |
|       └ Eval[id, name, address, email, country, number_of_nodes, customer_id]                        |
|         └ HashJoin[(customer_id = id)]                                                               |
|           ├ Collect[doc.clusters | [number_of_nodes, customer_id] | true]                            |
|           └ Collect[doc.customers | [id, name, address, email, country] | true]                      |
+------------------------------------------------------------------------------------------------------+               

Let’s dive into the details of the query plan to understand what is happening here. The query plan is a tree of operators that shows how the query is handled internally. Execution proceeds from the bottom of the tree to the top, so we will walk through the operators in that order:

  • The operators Collect[doc.customers] and Collect[doc.clusters] collect data from the tables customers and clusters.
  • The operator HashJoin[(customer_id = id)] performs a hash-join on the rows from the underlying collect operators with the join condition customer_id = id.
  • The Eval operator will pick the values [id, name, address, email, country, number_of_nodes, customer_id] from the underlying source and return them in the described order.
  • The operator GroupHashAggregate[id, name, address, email, address, country | sum(number_of_nodes)] groups the joined rows by the customer columns and sums number_of_nodes for each group.
  • The operator OrderBy[sum(number_of_nodes) DESC] sorts the groups in descending order of the aggregated value.
  • The final Eval operator will rearrange the order and naming of the result tuple.
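
The plan shows which operators run, but not how long they take. As a sketch, assuming a CrateDB version that supports EXPLAIN ANALYZE, the same statement can be run with timing collection. Unlike plain EXPLAIN, this executes the query:

```sql
-- EXPLAIN ANALYZE executes the statement and reports measured timings,
-- so it takes at least as long as the query itself.
EXPLAIN ANALYZE
SELECT
  customers.id,
  customers.name,
  customers.email,
  customers.address,
  customers.country,
  sum(clusters.number_of_nodes) AS number_of_nodes
FROM
  customers
  INNER JOIN clusters ON clusters.customer_id = customers.id
GROUP BY
  customers.id,
  customers.name,
  customers.email,
  customers.address,
  customers.country
ORDER BY
  number_of_nodes DESC;
```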

So how can this query plan be improved? A common case where the optimizer gets it wrong is the order in which tables are joined. The optimizer usually places the table with fewer rows on the right side of a hash join because, in most cases, this improves performance, and that is exactly what happened here. However, one exception is when the table with fewer rows holds more data than the table with more rows. Let's investigate this further. We can gather more information about the two tables customers and clusters from sys.shards:

SELECT 
  schema_name, 
  table_name, 
  sum(num_docs) AS docs, 
  sum(size) AS "size in bytes" 
FROM 
  sys.shards 
WHERE 
  table_name IN ('customers', 'clusters') 
GROUP BY 
  schema_name, 
  table_name;
+-------------+------------+--------+---------------+
| schema_name | table_name |   docs | size in bytes |
+-------------+------------+--------+---------------+
| doc         | customers  | 100000 |      21867499 |
| doc         | clusters   | 150000 |       6731594 |
+-------------+------------+--------+---------------+
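
Row counts alone can be misleading, so it can help to relate the size to the number of rows. A minimal sketch using the same sys.shards columns (the column alias is only illustrative, and size is the on-disk shard size, so treat the result as a rough proxy for row width):

```sql
-- Average stored bytes per row for each table.
-- The cast avoids integer division on the two bigint sums.
SELECT
  schema_name,
  table_name,
  sum(size)::DOUBLE PRECISION / sum(num_docs) AS "avg bytes per row"
FROM
  sys.shards
WHERE
  table_name IN ('customers', 'clusters')
GROUP BY
  schema_name,
  table_name;
```

With the figures above, this works out to roughly 219 bytes per row for customers and 45 bytes per row for clusters.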

The sys.shards figures show that customers, despite having fewer rows, has a much larger data volume than clusters (about 21.9 MB versus 6.7 MB). This means the join order chosen by the optimizer is not ideal for a hash join. More detailed statistics are available in pg_catalog.pg_stats:

SELECT 
  schemaname, 
  tablename, 
  attname, 
  avg_width * n_distinct AS "total size in bytes" 
FROM 
  pg_catalog.pg_stats 
WHERE 
  tablename IN ('customers', 'clusters');
+------------+-----------+-----------------+---------------------+
| schemaname | tablename | attname         | total size in bytes |
+------------+-----------+-----------------+---------------------+
| doc        | customers | country         |                56.0 |
| doc        | customers | email           |           8051904.0 |
| doc        | customers | name            |           6964800.0 |
| doc        | customers | address         |          11300000.0 |
| doc        | customers | id              |           1600000.0 |
| doc        | clusters  | number_of_nodes |              1600.0 |
| doc        | clusters  | customer_id     |             16000.0 |
| doc        | clusters  | id              |           2400000.0 |
+------------+-----------+-----------------+---------------------+
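
The same statistics can also be summed per table to estimate how wide a row is. A sketch, assuming the statistics are up to date (ANALYZE refreshes them) and that the sum of avg_width over all columns is an acceptable approximation of the row width:

```sql
-- Refresh table statistics so pg_stats reflects the current data.
ANALYZE;

-- Estimated bytes per row: sum of the average width of every column.
SELECT
  schemaname,
  tablename,
  sum(avg_width) AS "estimated row width in bytes"
FROM
  pg_catalog.pg_stats
WHERE
  tablename IN ('customers', 'clusters')
GROUP BY
  schemaname,
  tablename;
```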

The per-column statistics show how the data volume is distributed over the columns of each table: address, email, and name account for most of the volume in customers. For problematic cases like this one, there is the option to disable join reordering and rely on the order of the tables exactly as written in the query. Join reordering can be disabled for the different join operators CrateDB supports using the following session settings:

SET optimizer_reorder_hash_join = false;
SET optimizer_reorder_nested_loop_join = false;

Please be aware that these settings are experimental and may change in the future. Our query contains only a hash-join operator, so it is sufficient to disable join reordering for hash joins only. Let’s give this a try and check the new execution plan:

+-------------------------------------------------------------------------------------------------------+
| QUERY PLAN                                                                                            |
+-------------------------------------------------------------------------------------------------------+
| Eval[id, name, email, address, sum(number_of_nodes) AS number_of_nodes]                               |
|   └ OrderBy[sum(number_of_nodes) DESC]                                                                |
|     └ GroupHashAggregate[id, name, email, address | sum(number_of_nodes)]                             |
|       └ HashJoin[(customer_id = id)]                                                                  |
|         ├ Collect[doc.customers | [id, name, email, address] | true]                                  |
|         └ Collect[doc.clusters | [number_of_nodes, customer_id] | true]                               |
+-------------------------------------------------------------------------------------------------------+
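
A plan like this one can be reproduced with a session along the following lines. This is a sketch that reuses the query from the previous section, so the column lists in the resulting plan follow the columns selected here. Restoring the setting to true at the end assumes that true is the default:

```sql
-- Keep the join order exactly as written in the FROM clause
-- (session-scoped and experimental).
SET optimizer_reorder_hash_join = false;

EXPLAIN
SELECT
  customers.id,
  customers.name,
  customers.email,
  customers.address,
  customers.country,
  sum(clusters.number_of_nodes) AS number_of_nodes
FROM
  customers
  INNER JOIN clusters ON clusters.customer_id = customers.id
GROUP BY
  customers.id,
  customers.name,
  customers.email,
  customers.address,
  customers.country
ORDER BY
  number_of_nodes DESC;

-- Assumption: true is the default value; re-enable reordering
-- for the remaining statements in this session.
SET optimizer_reorder_hash_join = true;
```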

As expected, the Collect operators now appear exactly in the order written in the original query. To evaluate the performance of the two variants properly, we should run a benchmark. We can use the cr8 CrateDB command line tools for that purpose. The run-spec functionality of cr8 runs crate-benchmarks and helps to evaluate the results:

$ cr8 run-spec specs/top-cloud-customers.toml localhost:4200
Running query 
...
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1000/1000 [05:18<00:00,  3.11 requests/s]
Runtime (in ms):
    mean:    324.602 ± 3.054
    min/max: 232.521 → 817.396
...

$ cr8 run-spec specs/top-cloud-customers-join-reordering-disabled.toml localhost:4200
Running query 
...
100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1000/1000 [03:12<00:00,  5.59 requests/s]
Runtime (in ms):
    mean:    180.223 ± 1.028
    min/max: 163.816 → 676.503
...

This looks good! The mean execution time for this query dropped from about 324 ms to about 180 ms. So, preventing the optimizer from rearranging the order of the joined tables made the query about 1.8 times faster, which is roughly 80% more throughput or about 44% less time per query.
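
The relative improvement can be checked directly from the two mean runtimes reported by cr8. A quick calculation that should run in any CrateDB session (the column aliases are only illustrative):

```sql
-- Mean runtimes in ms, taken from the two cr8 runs above.
SELECT
  324.602 / 180.223 AS speedup_factor,
  (1 - 180.223 / 324.602) * 100 AS runtime_reduction_percent;
```

The first value is the speedup factor (about 1.8), the second is the reduction in mean runtime (about 44%).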

Summary

In this post, we covered how the performance of a query containing a join and an aggregation can be improved by fixing the join order. We usually recommend relying on the optimizer, but in certain cases, it’s worth investigating further. We are also currently improving our query optimizer, moving to a cost-based approach to make query optimization better.

Are you interested in CrateDB? Have a look at the Documentation. If you have questions, check out our CrateDB Community. We're happy to help!