Introduction
CrateDB is a distributed database compatible with the SQL standard. Every query executed on CrateDB goes through a query optimizer, which tries to find the best possible logical plan for that query. In some situations, however, the optimizer cannot generate the optimal execution plan, and query performance suffers as a result. In such cases it is helpful to have more control over query execution. In this post, we will show how the query optimizer in CrateDB can be fine-tuned for specific queries.
Problem description
Let's explore this together with a real-world example. Imagine the following tables:
SELECT * FROM customers ORDER BY id LIMIT 5;

+----+-------------------+---------------------------+------------------------------------------+---------+
| id | name              | email                     | address                                  | country |
+----+-------------------+---------------------------+------------------------------------------+---------+
|  1 | Cornelius Kugler  | c.kugler@schober.com      | Selma-Huber-Ring 8 7636 Lienz            | Austria |
|  2 | Lionel Seiwald    | seiwald@hammerer-mandl.de | Ingeborg-Margraf-Straße 79 96464 Kronach | Germany |
|  3 | Ela Bader         | bader@stocher.com         | Grabnergasse 3 6750 Oberwart             | Austria |
|  4 | Emilija Leithner  | emilija@siegl.com         | 14 Tanner Parkways Vogthaven, Nj 77764   | US      |
|  5 | Sebastian Sattler | s.sattler@weiss.net       | Winklerallee 4 02786 Augsburg            | Germany |
+----+-------------------+---------------------------+------------------------------------------+---------+

SELECT * FROM clusters ORDER BY id LIMIT 5;

+----+-------------+-----------------+--------------+
| id | customer_id | number_of_nodes | name         |
+----+-------------+-----------------+--------------+
|  1 |           1 |              10 | Hahnenköpfle |
|  2 |           2 |              35 | Diedamskopf  |
|  3 |           2 |               8 | Mondspitze   |
|  4 |           3 |              12 | Bocksberg    |
|  5 |           3 |              23 | Valluga      |
+----+-------------+-----------------+--------------+
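The original DDL is not part of this post, so if you want to follow along, the following CREATE TABLE statements are a plausible sketch reconstructed from the sample data above; treat the column types as assumptions:

-- Hypothetical schema, inferred from the sample rows above
CREATE TABLE customers (
    id      INTEGER PRIMARY KEY,
    name    TEXT,
    email   TEXT,
    address TEXT,
    country TEXT
);

CREATE TABLE clusters (
    id              INTEGER PRIMARY KEY,
    customer_id     INTEGER,
    number_of_nodes INTEGER,
    name            TEXT
);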
Now assume the following use case: we would like to send a special offer to our cloud customers, especially those running big clusters. We need the total number of nodes each customer runs across all of their clusters, plus all the customer details necessary to contact them. So we need the full customer details from the customers table and a per-customer summary of the nodes they own. Therefore we join customers with clusters and aggregate number_of_nodes:
SELECT customers.id,
       customers.name,
       customers.email,
       customers.address,
       customers.country,
       sum(clusters.number_of_nodes) AS number_of_nodes
FROM customers
INNER JOIN clusters ON clusters.customer_id = customers.id
GROUP BY customers.id,
         customers.name,
         customers.address,
         customers.email,
         customers.country
ORDER BY number_of_nodes DESC;

+-----+-------------------------+------------------------+-------------------------------------------+---------+-----------------+
| id  | name                    | email                  | address                                   | country | number_of_nodes |
+-----+-------------------------+------------------------+-------------------------------------------+---------+-----------------+
| 810 | Gino Neuschäfer         | gino@haenel.com        | Valentino-Aigner-Gasse 702 4427 Eggenburg | Austria |             219 |
| 177 | Heinz Siegrist          | heinz@schoch.com       | Huhnallee 7/0 29172 Fulda                 | Germany |             210 |
| 201 | Diogo Stauffer          | stauffer@mader.com     | Grabnergasse 3 6750 Oberwart              | Austria |             201 |
| 127 | James Rosario           | j.rosario@smith.net    | 6313 Amanda Brook Apt. 229 Lake Ryanfur   | US      |             190 |
| 384 | Felipa Guerrero Benítez | elipa@alegre-oller.net | Poseo De Miguel Sierra 67 Toledo, 55721   | Spain   |             188 |
|  12 | Damian König            | damian@simic.net       | Klausring 5/2 7161 Rust                   | Austria |             177 |
...
This query takes over 300 ms on my M1 MacBook with a single-node setup, with 100k rows in customers and 150k rows in clusters.
Investigation
Let's investigate and see if we can improve the performance. First, we will have a look at the execution plan using the EXPLAIN command:
EXPLAIN
SELECT customers.id,
       customers.name,
       customers.email,
       customers.address,
       customers.country,
       sum(clusters.number_of_nodes) AS number_of_nodes
FROM customers
INNER JOIN clusters ON clusters.customer_id = customers.id
GROUP BY customers.id,
         customers.name,
         customers.address,
         customers.email,
         customers.country
ORDER BY number_of_nodes DESC;

+-----------------------------------------------------------------------------------+
| QUERY PLAN                                                                        |
+-----------------------------------------------------------------------------------+
| Eval[id, name, email, address, country, sum(number_of_nodes) AS number_of_nodes]  |
| └ OrderBy[sum(number_of_nodes) DESC]                                              |
|   └ GroupHashAggregate[id, name, address, email, country | sum(number_of_nodes)]  |
|     └ Eval[id, name, address, email, country, number_of_nodes, customer_id]       |
|       └ HashJoin[(customer_id = id)]                                              |
|         ├ Collect[doc.clusters | [number_of_nodes, customer_id] | true]           |
|         └ Collect[doc.customers | [id, name, address, email, country] | true]     |
+-----------------------------------------------------------------------------------+
- The operators Collect[doc.clusters] and Collect[doc.customers] collect the data from the tables clusters and customers.
- The operator HashJoin[(customer_id = id)] performs a hash-join on the rows from the underlying collect operators with the join condition customer_id = id.
- The Eval operator picks the values [id, name, address, email, country, number_of_nodes, customer_id] from the underlying source and returns them in the described order.
- The operator GroupHashAggregate[id, name, address, email, country | sum(number_of_nodes)] groups the rows by the listed columns and sums up number_of_nodes for each group.
- The operator OrderBy[sum(number_of_nodes) DESC] applies descending ordering on the aggregated value.
- The final Eval operator rearranges the order and naming of the result tuple.
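The logical plan tells us what the optimizer decided, but not where the time is spent. On recent CrateDB versions, EXPLAIN ANALYZE can be used to execute a statement and report execution timings. The sketch below uses a shortened variant of our query to keep it readable; treat the exact output shape as version-dependent:

-- Executes the statement and returns execution timings instead of only the plan
EXPLAIN ANALYZE
SELECT customers.id,
       customers.name,
       sum(clusters.number_of_nodes) AS number_of_nodes
FROM customers
INNER JOIN clusters ON clusters.customer_id = customers.id
GROUP BY customers.id, customers.name
ORDER BY number_of_nodes DESC;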
So how can this query plan be improved? A common case where the optimizer can be wrong is the order in which tables are joined. The optimizer usually places the smaller table (by row count) on the right side of a hash-join because, in most cases, this improves performance, and that is exactly what happened here. However, one exception is when the data volume of the smaller table exceeds that of the larger one. Let's investigate this further. We can gather more information about the two tables customers and clusters from sys.shards:
SELECT schema_name,
       table_name,
       sum(num_docs) AS docs,
       sum(size) AS "size in bytes"
FROM sys.shards
WHERE table_name IN ('customers', 'clusters')
GROUP BY schema_name, table_name;

+-------------+------------+--------+---------------+
| schema_name | table_name |   docs | size in bytes |
+-------------+------------+--------+---------------+
| doc         | customers  | 100000 |      21867499 |
| doc         | clusters   | 150000 |       6731594 |
+-------------+------------+--------+---------------+
This indicates that the table customers, despite having fewer rows, has a larger data volume than clusters, which means the join order chosen by the optimizer is not ideal for a hash-join. For more details, statistics are available in pg_catalog.pg_stats:
SELECT schemaname,
       tablename,
       attname,
       avg_width * n_distinct AS "total size in bytes"
FROM pg_catalog.pg_stats
WHERE tablename IN ('customers', 'clusters');

+------------+-----------+-----------------+---------------------+
| schemaname | tablename | attname         | total size in bytes |
+------------+-----------+-----------------+---------------------+
| doc        | customers | country         |                56.0 |
| doc        | customers | email           |           8051904.0 |
| doc        | customers | name            |           6964800.0 |
| doc        | customers | address         |          11300000.0 |
| doc        | customers | id              |           1600000.0 |
| doc        | clusters  | number_of_nodes |              1600.0 |
| doc        | clusters  | customer_id     |             16000.0 |
| doc        | clusters  | id              |           2400000.0 |
+------------+-----------+-----------------+---------------------+
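Side note: pg_stats is populated from table statistics that CrateDB collects periodically in the background. If the tables were only just created or bulk-loaded, these numbers may be stale or missing; they can be refreshed on demand:

-- Recompute the table statistics that back pg_stats (and the optimizer)
ANALYZE;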
Now we can see exactly how the data volume is distributed over the columns of the two tables. For problematic cases like this, there is the option to disable join-reordering and rely on the exact order of the tables as written in the query. Join-reordering can be disabled for the different join operators CrateDB supports using the following session settings:
SET optimizer_reorder_hash_join = false;
SET optimizer_reorder_nested_loop_join = false;
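Since these are per-session settings, it is worth double-checking that they are active in the session that runs the query. Assuming they can be read back like other session settings, something along these lines should work:

-- Read back the current value of the session setting
SHOW optimizer_reorder_hash_join;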
Please be aware that these settings are experimental and may change in the future. Our query contains only a hash-join operator, so it is sufficient to disable join-reordering for hash-joins. Let's give this a try and check the new execution plan:
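For reference, the plan below results from re-running the exact same EXPLAIN statement as above, this time in the session where the setting was applied:

EXPLAIN
SELECT customers.id,
       customers.name,
       customers.email,
       customers.address,
       customers.country,
       sum(clusters.number_of_nodes) AS number_of_nodes
FROM customers
INNER JOIN clusters ON clusters.customer_id = customers.id
GROUP BY customers.id,
         customers.name,
         customers.address,
         customers.email,
         customers.country
ORDER BY number_of_nodes DESC;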
+--------------------------------------------------------------------------+
| QUERY PLAN                                                               |
+--------------------------------------------------------------------------+
| Eval[id, name, email, address, sum(number_of_nodes) AS number_of_nodes]  |
| └ OrderBy[sum(number_of_nodes) DESC]                                     |
|   └ GroupHashAggregate[id, name, email, address | sum(number_of_nodes)]  |
|     └ HashJoin[(customer_id = id)]                                       |
|       ├ Collect[doc.customers | [id, name, email, address] | true]       |
|       └ Collect[doc.clusters | [number_of_nodes, customer_id] | true]    |
+--------------------------------------------------------------------------+
As expected, the order of the Collect operators now remains exactly as written in the original query. To evaluate the performance of the two variants properly, we should run a benchmark. We can use cr8, a collection of command-line tools for CrateDB, for that purpose. Its run-spec command runs a benchmark spec, as used in the crate-benchmarks repository, and helps to evaluate the results:
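The spec files themselves are not shown in this post. For illustration, a minimal spec could look roughly like the sketch below; the file contents and the exact keys are assumptions based on the cr8/crate-benchmarks spec format, not the spec actually used here:

# specs/top-cloud-customers.toml (hypothetical sketch)
[[queries]]
statement = """
SELECT customers.id, customers.name, customers.email,
       customers.address, customers.country,
       sum(clusters.number_of_nodes) AS number_of_nodes
FROM customers
INNER JOIN clusters ON clusters.customer_id = customers.id
GROUP BY customers.id, customers.name, customers.address,
         customers.email, customers.country
ORDER BY number_of_nodes DESC
"""
iterations = 1000
concurrency = 1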
$ cr8 run-spec specs/top-cloud-customers.toml localhost:4200
Running query ...
100%|██████████████████████████████| 1000/1000 [05:18<00:00, 3.11 requests/s]
Runtime (in ms):
    mean: 324.602 ± 3.054
    min/max: 232.521 → 817.396
...
$ cr8 run-spec specs/top-cloud-customers-join-reordering-disabled.toml localhost:4200
Running query ...
100%|██████████████████████████████| 1000/1000 [03:12<00:00, 5.59 requests/s]
Runtime (in ms):
    mean: 180.223 ± 1.028
    min/max: 163.816 → 676.503
...
This looks good! The mean execution time for this query improved from about 325 ms to 180 ms. Preventing the optimizer from rearranging the order of the joined tables thus made the query roughly 1.8 times faster, an improvement of about 80%.
Summary
In this post, we covered how the performance of a query containing a join and an aggregation can be improved by pinning the join order. We usually recommend relying on the optimizer, but in certain cases it's worth investigating further. We are also currently improving our query optimizer by moving to a cost-based approach.
Are you interested in CrateDB? Have a look at the Documentation. If you have questions, check out our CrateDB Community. We're happy to help!