# Monitoring query performance¶

When analyzing options for query tuning, the first step is to analyze the query plan and execution.

The query plan and execution details explains how SQream DB processes a query and where time is spent.

This document details how to analyze query performance with execution plans.

This guide focuses specifically on identifying bottlenecks and possible optimization techniques to improve query performance.

Performance tuning options for each query are different. You should adapt the recommendations and tips for your own workloads.

## Setting up the system for monitoring¶

By default, SQream DB logs execution details for every statement that runs for more than 60 seconds. If you want to see the execution details for a currently running statement, see The SHOW_NODE_INFO command below.

To adjust the frequency of logging for statements, you may want to reduce the interval from 60 seconds down to, say, 5 or 10 seconds. Modify the configuration files and set the nodeInfoLoggingSec parameter as you see fit:

{
"compileFlags":{
},
"runtimeFlags":{
},
"runtimeGlobalFlags":{
"nodeInfoLoggingSec" : 5,
},
"server":{
}
}


After restarting the SQream DB cluster, the execution plan details will be logged to the standard SQream DB logs directory, as a message of type 200.

You can see these messages with a text viewer or with queries on the log Foreign Tables.

### Reading execution plans with a foreign table¶

First, create a foreign table for the logs

CREATE FOREIGN TABLE logs
(
start_marker      VARCHAR(4),
row_id            BIGINT,
timestamp         DATETIME,
message_level     TEXT,
worker_hostname   TEXT,
worker_port       INT,
connection_id     INT,
database_name     TEXT,
user_name         TEXT,
statement_id      INT,
service_name      TEXT,
message_type_id   INT,
message           TEXT,
end_message       VARCHAR(5)
)
WRAPPER cdv_fdw
OPTIONS
(
LOCATION = '/home/rhendricks/sqream_storage/logs/**/sqream*.log',
DELIMITER = '|'
)
;


Once you’ve defined the foreign table, you can run queries to observe the previously logged execution plans. This is recommended over looking at the raw logs.

t=> SELECT message
.     FROM logs
.     WHERE message_type_id = 200
.     AND timestamp BETWEEN '2020-06-11' AND '2020-06-13';
message
---------------------------------------------------------------------------------------------------------------------------------
SELECT *,coalesce((depdelay > 15),false) AS isdepdelayed FROM ontime WHERE year IN (2005, 2006, 2007, 2008, 2009, 2010)
:
: 1,PushToNetworkQueue  ,10354468,10,1035446,2020-06-12 20:41:42,-1,,,,13.55
: 2,Rechunk             ,10354468,10,1035446,2020-06-12 20:41:42,1,,,,0.10
: 3,ReorderInput        ,10354468,10,1035446,2020-06-12 20:41:42,2,,,,0.00
: 4,DeferredGather      ,10354468,10,1035446,2020-06-12 20:41:42,3,,,,1.23
: 5,ReorderInput        ,10354468,10,1035446,2020-06-12 20:41:41,4,,,,0.01
: 6,GpuToCpu            ,10354468,10,1035446,2020-06-12 20:41:41,5,,,,0.07
: 7,GpuTransform        ,10354468,10,1035446,2020-06-12 20:41:41,6,,,,0.02
: 8,ReorderInput        ,10354468,10,1035446,2020-06-12 20:41:41,7,,,,0.00
: 9,Filter              ,10354468,10,1035446,2020-06-12 20:41:41,8,,,,0.07
: 10,GpuTransform        ,10485760,10,1048576,2020-06-12 20:41:41,9,,,,0.07
: 11,GpuDecompress       ,10485760,10,1048576,2020-06-12 20:41:41,10,,,,0.03
: 12,GpuTransform        ,10485760,10,1048576,2020-06-12 20:41:41,11,,,,0.22
: 13,CpuToGpu            ,10485760,10,1048576,2020-06-12 20:41:41,12,,,,0.76
: 14,ReorderInput        ,10485760,10,1048576,2020-06-12 20:41:40,13,,,,0.11
: 15,Rechunk             ,10485760,10,1048576,2020-06-12 20:41:40,14,,,,5.58
: 16,CpuDecompress       ,10485760,10,1048576,2020-06-12 20:41:34,15,,,,0.04


## The SHOW_NODE_INFO command¶

The SHOW_NODE_INFO command returns a snapshot of the current query plan, similar to EXPLAIN ANALYZE from other databases.

The SHOW_NODE_INFO result, just like the periodically-logged execution plans described above, are an at-the-moment view of the compiler’s execution plan and runtime statistics for the specified statement.

To inspect a currently running statement, execute the show_node_info utility function in a SQL client like

sqream sql, the SQream Studio Editor, or any other third party SQL terminal.

In this example, we inspect a statement with statement ID of 176. The command looks like this:

t=> SELECT SHOW_NODE_INFO(176);
stmt_id | node_id | node_type          | rows | chunks | avg_rows_in_chunk | time                | parent_node_id | read | write | comment    | timeSum
--------+---------+--------------------+------+--------+-------------------+---------------------+----------------+------+-------+------------+--------
176 |       1 | PushToNetworkQueue |    1 |      1 |                 1 | 2019-12-25 23:53:13 |             -1 |      |       |            |  0.0025
176 |       2 | Rechunk            |    1 |      1 |                 1 | 2019-12-25 23:53:13 |              1 |      |       |            |       0
176 |       3 | GpuToCpu           |    1 |      1 |                 1 | 2019-12-25 23:53:13 |              2 |      |       |            |       0
176 |       4 | ReorderInput       |    1 |      1 |                 1 | 2019-12-25 23:53:13 |              3 |      |       |            |       0
176 |       5 | Filter             |    1 |      1 |                 1 | 2019-12-25 23:53:13 |              4 |      |       |            |  0.0002
176 |       6 | GpuTransform       |  457 |      1 |               457 | 2019-12-25 23:53:13 |              5 |      |       |            |  0.0002
176 |       7 | GpuDecompress      |  457 |      1 |               457 | 2019-12-25 23:53:13 |              6 |      |       |            |       0
176 |       8 | CpuToGpu           |  457 |      1 |               457 | 2019-12-25 23:53:13 |              7 |      |       |            |  0.0003
176 |       9 | Rechunk            |  457 |      1 |               457 | 2019-12-25 23:53:13 |              8 |      |       |            |       0
176 |      10 | CpuDecompress      |  457 |      1 |               457 | 2019-12-25 23:53:13 |              9 |      |       |            |       0
176 |      11 | ReadTable          |  457 |      1 |               457 | 2019-12-25 23:53:13 |             10 | 4MB  |       | public.nba |  0.0004


## Understanding the query execution plan output¶

Both SHOW_NODE_INFO and the logged execution plans represents the query plan as a graph hierarchy, with data separated into different columns.

Each row represents a single logical database operation, which is also called a node or chunk producer. A node reports several metrics during query execution, such as how much data it has read and written, how many chunks and rows, and how much time has elapsed.

Consider the example show_node_info presented above. The source node with ID #11 (ReadTable), has a parent node ID #10 (CpuDecompress). If we were to draw this out in a graph, it’d look like this:

This graph explains how the query execution details are arranged in a logical order, from the bottom up.

The last node, also called the sink, has a parent node ID of -1, meaning it has no parent. This is typically a node that sends data over the network or into a table.

When using SHOW_NODE_INFO, a tabular representation of the currently running statement execution is presented.

See the examples below to understand how the query execution plan is instrumental in identifying bottlenecks and optimizing long-running statements.

### Information presented in the execution plan¶

Result columns

Column name

Description

stmt_id

The ID for the statement

node_id

This node’s ID in this execution plan

node_type

The node type

rows

Total number of rows this node has processed

chunks

Number of chunks this node has processed

avg_rows_in_chunk

Average amount of rows that this node processes in each chunk (rows / chunks)

time

Timestamp for this node’s creation

parent_node_id

The node_id of this node’s parent

read

write

Total data written to disk

comment

Additional information (e.g. table name for ReadTable)

timesum

Total elapsed time for this execution node’s processing

### Commonly seen nodes¶

Node types

Column name

Execution location

Description

CpuDecompress

CPU

Decompression operation, common for longer VARCHAR types

CpuLoopJoin

CPU

A non-indexed nested loop join, performed on the CPU

CpuReduce

CPU

A reduce process performed on the CPU, primarily with DISTINCT aggregates (e.g. COUNT(DISTINCT ...))

CpuToGpu, GpuToCpu

An operation that moves data to or from the GPU for processing

CpuTransform

CPU

A transform operation performed on the CPU, usually a scalar function

DeferredGather

CPU

Merges the results of GPU operations with a result set

Distinct

GPU

Removes duplicate rows (usually as part of the DISTINCT operation)

Distinct_Merge

CPU

The merge operation of the Distinct operation

Filter

GPU

A filtering operation, such as a WHERE or JOIN clause

GpuDecompress

GPU

Decompression operation

GpuReduceMerge

GPU

An operation to optimize part of the merger phases in the GPU

GpuTransform

GPU

A transformation operation such as a type cast or scalar function

LocateFiles

CPU

Validates external file paths for foreign data wrappers, expanding directories and GLOB patterns

LoopJoin

GPU

A non-indexed nested loop join, performed on the GPU

ParseCsv

CPU

A CSV parser, used after ReadFiles to convert the CSV into columnar data

PushToNetworkQueue

CPU

Sends result sets to a client connected over the network

ReadFiles

CPU

ReadTable

CPU

Reads data from a standard table stored on disk

Rechunk

Reorganize multiple small chunks into a full chunk. Commonly found after joins and when HIGH_SELECTIVITY is used

Reduce

GPU

A reduction operation, such as a GROUP BY

ReduceMerge

GPU

A merge operation of a reduction operation, helps operate on larger-than-RAM data

ReorderInput

Change the order of arguments in preparation for the next operation

SeparatedGather

GPU

Gathers additional columns for the result

Sort

GPU

Sort operation

TakeRowsFromChunk

Take the first N rows from each chunk, to optimize LIMIT when used alongside ORDER BY

Top

Limits the input size, when used with LIMIT (or its alias TOP)

UdfTransform

CPU

Executes a user defined function

UnionAll

Combines two sources of data when UNION ALL is used

Window

GPU

Executes a non-ranking window function

WindowRanking

GPU

Executes a ranking window function

WriteTable

CPU

Writes the result set to a standard table stored on disk

Tip

The full list of nodes appears in the Node types table, as part of the SHOW_NODE_INFO reference.

## Examples¶

In general, looking at the top three longest running nodes (as is detailed in the timeSum column) can indicate the biggest bottlenecks.

In the following examples you will learn how to identify and solve some common issues.

### 1. Spooling to disk¶

When there is not enough RAM to process a statement, SQream DB will spill over data to the temp folder in the storage disk. While this ensures that a statement can always finish processing, it can slow down the processing significantly.

It’s worth identifying these statements, to figure out if the cluster is configured correctly, as well as potentially reduce the statement size.

You can identify a statement that spools to disk by looking at the write column in the execution details. A node that spools will have a value, shown in megabytes in the write column.

Common nodes that write spools include Join or LoopJoin.

#### Identifying the offending nodes¶

1. Run a query.

For example, a query from the TPC-H benchmark:

SELECT o_year,
SUM(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END) / SUM(volume) AS mkt_share
FROM (SELECT datepart(YEAR,o_orderdate) AS o_year,
l_extendedprice*(1 - l_discount / 100.0) AS volume,
n2.n_name AS nation
FROM lineitem
JOIN part ON p_partkey = CAST (l_partkey AS INT)
JOIN orders ON l_orderkey = o_orderkey
JOIN customer ON o_custkey = c_custkey
JOIN nation n1 ON c_nationkey = n1.n_nationkey
JOIN region ON n1.n_regionkey = r_regionkey
JOIN supplier ON s_suppkey = l_suppkey
JOIN nation n2 ON s_nationkey = n2.n_nationkey
WHERE o_orderdate BETWEEN '1995-01-01' AND '1996-12-31') AS all_nations
GROUP BY o_year
ORDER BY o_year;

2. Observe the execution information by using the foreign table, or use show_node_info

This statement is made up of 199 nodes, starting from a ReadTable, and finishes by returning only 2 results to the client.

The execution below has been shortened, but note the highlighted rows for LoopJoin:

t=> SELECT message FROM logs WHERE message_type_id = 200 LIMIT 1;
message
-----------------------------------------------------------------------------------------
SELECT o_year,
SUM(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END) / SUM(volume) AS mkt_share
: FROM (SELECT datepart(YEAR,o_orderdate) AS o_year,
:              l_extendedprice*(1 - l_discount / 100.0) AS volume,
:              n2.n_name AS nation
:       FROM lineitem
:         JOIN part ON p_partkey = CAST (l_partkey AS INT)
:         JOIN orders ON l_orderkey = o_orderkey
:         JOIN customer ON o_custkey = c_custkey
:         JOIN nation n1 ON c_nationkey = n1.n_nationkey
:         JOIN region ON n1.n_regionkey = r_regionkey
:         JOIN supplier ON s_suppkey = l_suppkey
:         JOIN nation n2 ON s_nationkey = n2.n_nationkey
:       WHERE o_orderdate BETWEEN '1995-01-01' AND '1996-12-31') AS all_nations
: GROUP BY o_year
: ORDER BY o_year
: 1,PushToNetworkQueue  ,2,1,2,2020-09-04 18:32:50,-1,,,,0.27
: 2,Rechunk             ,2,1,2,2020-09-04 18:32:50,1,,,,0.00
: 3,SortMerge           ,2,1,2,2020-09-04 18:32:49,2,,,,0.00
: 4,GpuToCpu            ,2,1,2,2020-09-04 18:32:49,3,,,,0.00
: 5,Sort                ,2,1,2,2020-09-04 18:32:49,4,,,,0.00
: 6,ReorderInput        ,2,1,2,2020-09-04 18:32:49,5,,,,0.00
: 7,GpuTransform        ,2,1,2,2020-09-04 18:32:49,6,,,,0.00
: 8,CpuToGpu            ,2,1,2,2020-09-04 18:32:49,7,,,,0.00
: 9,Rechunk             ,2,1,2,2020-09-04 18:32:49,8,,,,0.00
: 10,ReduceMerge         ,2,1,2,2020-09-04 18:32:49,9,,,,0.03
: 11,GpuToCpu            ,6,3,2,2020-09-04 18:32:49,10,,,,0.00
: 12,Reduce              ,6,3,2,2020-09-04 18:32:49,11,,,,0.64
[...]
: 49,LoopJoin            ,182369485,7,26052783,2020-09-04 18:32:36,48,1915MB,1915MB,inner,4.94
[...]
: 98,LoopJoin            ,182369485,12,15197457,2020-09-04 18:32:16,97,2191MB,2191MB,inner,5.01
[...]
: 124,LoopJoin            ,182369485,8,22796185,2020-09-04 18:32:03,123,3064MB,3064MB,inner,6.73
[...]
: 150,LoopJoin            ,182369485,10,18236948,2020-09-04 18:31:47,149,12860MB,12860MB,inner,23.62
[...]


Because of the relatively low amount of RAM in the machine and because the data set is rather large at around 10TB, SQream DB needs to spool.

The total spool used by this query is around 20GB (1915MB + 2191MB + 3064MB + 12860MB).

#### Common solutions for reducing spool¶

• Increase the amount of spool memory available for the workers, as a proportion of the maximum statement memory. When the amount of spool memory is increased, SQream DB may not need to write to disk.

This setting is called spoolMemoryGB. Refer to the Configuration guide.

• Reduce the amount of workers per host, and increase the amount of spool available to the (now reduced amount of) active workers. This may reduce the amount of concurrent statements, but will improve performance for heavy statements.

### 2. Queries with large result sets¶

When queries have large result sets, you may see a node called DeferredGather.

This gathering occurs when the result set is assembled, in preparation for sending it to the client.

#### Identifying the offending nodes¶

1. Run a query.

For example, a modified query from the TPC-H benchmark:

SELECT s.*,
l.*,
r.*,
n1.*,
n2.*,
p.*,
o.*,
c.*
FROM lineitem l
JOIN part p ON p_partkey = CAST (l_partkey AS INT)
JOIN orders o ON l_orderkey = o_orderkey
JOIN customer c ON o_custkey = c_custkey
JOIN nation n1 ON c_nationkey = n1.n_nationkey
JOIN region r ON n1.n_regionkey = r_regionkey
JOIN supplier s ON s_suppkey = l_suppkey
JOIN nation n2 ON s_nationkey = n2.n_nationkey
WHERE r_name = 'AMERICA'
AND   o_orderdate BETWEEN '1995-01-01' AND '1996-12-31'
AND   high_selectivity(p_type = 'ECONOMY BURNISHED NICKEL');

2. Observe the execution information by using the foreign table, or use show_node_info

This statement is made up of 221 nodes, containing 8 ReadTable nodes, and finishes by returning billions of results to the client.

The execution below has been shortened, but note the highlighted rows for DeferredGather:

t=> SELECT show_node_info(494);
stmt_id | node_id | node_type            | rows      | chunks | avg_rows_in_chunk | time                | parent_node_id | read    | write | comment         | timeSum
--------+---------+----------------------+-----------+--------+-------------------+---------------------+----------------+---------+-------+-----------------+--------
494 |       1 | PushToNetworkQueue   |    242615 |      1 |            242615 | 2020-09-04 19:07:55 |             -1 |         |       |                 |    0.36
494 |       2 | Rechunk              |    242615 |      1 |            242615 | 2020-09-04 19:07:55 |              1 |         |       |                 |       0
494 |       3 | ReorderInput         |    242615 |      1 |            242615 | 2020-09-04 19:07:55 |              2 |         |       |                 |       0
494 |       4 | DeferredGather       |    242615 |      1 |            242615 | 2020-09-04 19:07:55 |              3 |         |       |                 |    0.16
[...]
494 |     166 | DeferredGather       |   3998730 |     39 |            102531 | 2020-09-04 19:07:47 |            165 |         |       |                 |   21.75
[...]
494 |     194 | DeferredGather       |    133241 |     20 |              6662 | 2020-09-04 19:07:03 |            193 |         |       |                 |    0.41
[...]
494 |     221 | ReadTable            |  20000000 |     20 |           1000000 | 2020-09-04 19:07:01 |            220 | 20MB    |       | public.part     |     0.1


When you see DeferredGather operations taking more than a few seconds, that’s a sign that you’re selecting too much data. In this case, the DeferredGather with node ID 166 took over 21 seconds.

3. Modify the statement to see the difference

Altering the select clause to be more restrictive will reduce the deferred gather time back to a few milliseconds.

SELECT DATEPART(year, o_orderdate) AS o_year,
l_extendedprice * (1 - l_discount / 100.0) as volume,
n2.n_name as nation
FROM ...


#### Common solutions for reducing gather time¶

• Reduce the effect of the preparation time. Avoid selecting unnecessary columns (SELECT * FROM...), or reduce the result set size by using more filters.

### 3. Inefficient filtering¶

When running statements, SQream DB tries to avoid reading data that is not needed for the statement by skipping chunks.

If statements do not include efficient filtering, SQream DB will read a lot of data off disk. In some cases, you need the data and there’s nothing to do about it. However, if most of it gets pruned further down the line, it may be efficient to skip reading the data altogether by using the metadata.

#### Identifying the situation¶

We consider the filtering to be inefficient when the Filter node shows that the number of rows processed is less than a third of the rows passed into it by the ReadTable node.

For example:

1. Run a query.

In this example, we execute a modified query from the TPC-H benchmark. Our lineitem table contains 600,037,902 rows.

SELECT o_year,
SUM(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END) / SUM(volume) AS mkt_share
FROM (SELECT datepart(YEAR,o_orderdate) AS o_year,
l_extendedprice*(1 - l_discount / 100.0) AS volume,
n2.n_name AS nation
FROM lineitem
JOIN part ON p_partkey = CAST (l_partkey AS INT)
JOIN orders ON l_orderkey = o_orderkey
JOIN customer ON o_custkey = c_custkey
JOIN nation n1 ON c_nationkey = n1.n_nationkey
JOIN region ON n1.n_regionkey = r_regionkey
JOIN supplier ON s_suppkey = l_suppkey
JOIN nation n2 ON s_nationkey = n2.n_nationkey
WHERE r_name = 'AMERICA'
AND   lineitem.l_quantity = 3
AND   o_orderdate BETWEEN '1995-01-01' AND '1996-12-31'
AND   high_selectivity(p_type = 'ECONOMY BURNISHED NICKEL')) AS all_nations
GROUP BY o_year
ORDER BY o_year;

2. Observe the execution information by using the foreign table, or use show_node_info

The execution below has been shortened, but note the highlighted rows for ReadTable and Filter:

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 t=> SELECT show_node_info(559); stmt_id | node_id | node_type | rows | chunks | avg_rows_in_chunk | time | parent_node_id | read | write | comment | timeSum --------+---------+----------------------+-----------+--------+-------------------+---------------------+----------------+--------+-------+-----------------+-------- 559 | 1 | PushToNetworkQueue | 2 | 1 | 2 | 2020-09-07 11:12:01 | -1 | | | | 0.28 559 | 2 | Rechunk | 2 | 1 | 2 | 2020-09-07 11:12:01 | 1 | | | | 0 559 | 3 | SortMerge | 2 | 1 | 2 | 2020-09-07 11:12:01 | 2 | | | | 0 559 | 4 | GpuToCpu | 2 | 1 | 2 | 2020-09-07 11:12:01 | 3 | | | | 0 [...] 559 | 189 | Filter | 12007447 | 12 | 1000620 | 2020-09-07 11:12:00 | 188 | | | | 0.3 559 | 190 | GpuTransform | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 189 | | | | 0.02 559 | 191 | GpuDecompress | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 190 | | | | 0.16 559 | 192 | GpuTransform | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 191 | | | | 0.02 559 | 193 | CpuToGpu | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 192 | | | | 1.47 559 | 194 | ReorderInput | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 193 | | | | 0 559 | 195 | Rechunk | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 194 | | | | 0 559 | 196 | CpuDecompress | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 195 | | | | 0 559 | 197 | ReadTable | 600037902 | 12 | 50003158 | 2020-09-07 11:12:00 | 196 | 7587MB | | public.lineitem | 0.1 [...] 559 | 208 | Filter | 133241 | 20 | 6662 | 2020-09-07 11:11:57 | 207 | | | | 0.01 559 | 209 | GpuTransform | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 208 | | | | 0.02 559 | 210 | GpuDecompress | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 209 | | | | 0.03 559 | 211 | GpuTransform | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 210 | | | | 0 559 | 212 | CpuToGpu | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 211 | | | | 0.01 559 | 213 | ReorderInput | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 212 | | | | 0 559 | 214 | Rechunk | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 213 | | | | 0 559 | 215 | CpuDecompress | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 214 | | | | 0 559 | 216 | ReadTable | 20000000 | 20 | 1000000 | 2020-09-07 11:11:57 | 215 | 20MB | | public.part | 0 
• The Filter on line 9 has processed 12,007,447 rows, but the output of ReadTable on public.lineitem on line 17 was 600,037,902 rows. This means that it has filtered out 98% ($$1 - \dfrac{600037902}{12007447} = 98\%$$) of the data, but the entire table was read.

• The Filter on line 19 has processed 133,000 rows, but the output of ReadTable on public.part on line 27 was 20,000,000 rows. This means that it has filtered out >99% ($$1 - \dfrac{133241}{20000000} = 99.4\%$$) of the data, but the entire table was read. However, this table is small enough that we can ignore it.

3. Modify the statement to see the difference

Altering the statement to have a WHERE condition on the clustered l_orderkey column of the lineitem table will help SQream DB skip reading the data.

SELECT o_year,
SUM(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END) / SUM(volume) AS mkt_share
FROM (SELECT datepart(YEAR,o_orderdate) AS o_year,
l_extendedprice*(1 - l_discount / 100.0) AS volume,
n2.n_name AS nation
FROM lineitem
JOIN part ON p_partkey = CAST (l_partkey AS INT)
JOIN orders ON l_orderkey = o_orderkey
JOIN customer ON o_custkey = c_custkey
JOIN nation n1 ON c_nationkey = n1.n_nationkey
JOIN region ON n1.n_regionkey = r_regionkey
JOIN supplier ON s_suppkey = l_suppkey
JOIN nation n2 ON s_nationkey = n2.n_nationkey
WHERE r_name = 'AMERICA'
AND   lineitem.l_orderkey > 4500000
AND   o_orderdate BETWEEN '1995-01-01' AND '1996-12-31'
AND   high_selectivity(p_type = 'ECONOMY BURNISHED NICKEL')) AS all_nations
GROUP BY o_year
ORDER BY o_year;

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 t=> SELECT show_node_info(586); stmt_id | node_id | node_type | rows | chunks | avg_rows_in_chunk | time | parent_node_id | read | write | comment | timeSum --------+---------+----------------------+-----------+--------+-------------------+---------------------+----------------+--------+-------+-----------------+-------- [...] 586 | 190 | Filter | 494621593 | 8 | 61827699 | 2020-09-07 13:20:45 | 189 | | | | 0.39 586 | 191 | GpuTransform | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 190 | | | | 0.03 586 | 192 | GpuDecompress | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 191 | | | | 0.26 586 | 193 | GpuTransform | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 192 | | | | 0.01 586 | 194 | CpuToGpu | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 193 | | | | 1.86 586 | 195 | ReorderInput | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 194 | | | | 0 586 | 196 | Rechunk | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 195 | | | | 0 586 | 197 | CpuDecompress | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 196 | | | | 0 586 | 198 | ReadTable | 494927872 | 8 | 61865984 | 2020-09-07 13:20:44 | 197 | 6595MB | | public.lineitem | 0.09 [...] 

In this example, the filter processed 494,621,593 rows, while the output of ReadTable on public.lineitem was 494,927,872 rows. This means that it has filtered out all but 0.01% ($$1 - \dfrac{494621593}{494927872} = 0.01\%$$) of the data that was read.

The metadata skipping has performed very well, and has pre-filtered the data for us by pruning unnecessary chunks.

### 4. Joins with varchar keys¶

Joins on long text keys, such as varchar(100) do not perform as well as numeric data types or very short text keys.

#### Identifying the situation¶

When a join is inefficient, you may note that a query spends a lot of time on the Join node.

For example, consider these two table structures:

CREATE TABLE t_a
(
amt            FLOAT NOT NULL,
i              INT NOT NULL,
ts             DATETIME NOT NULL,
country_code   VARCHAR(3) NOT NULL,
flag           VARCHAR(10) NOT NULL,
fk             VARCHAR(50) NOT NULL
);

CREATE TABLE t_b
(
id          VARCHAR(50) NOT NULL
prob        FLOAT NOT NULL,
j           INT NOT NULL,
);

1. Run a query.

In this example, we will join t_a.fk with t_b.id, both of which are VARCHAR(50).

SELECT AVG(t_b.j :: BIGINT),
t_a.country_code
FROM t_a
JOIN t_b ON (t_a.fk = t_b.id)
GROUP BY t_a.country_code

2. Observe the execution information by using the foreign table, or use show_node_info

The execution below has been shortened, but note the highlighted rows for Join. The Join node is by far the most time-consuming part of this statement - clocking in at 69.7 seconds joining 1.5 billion records.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 t=> SELECT show_node_info(5); stmt_id | node_id | node_type | rows | chunks | avg_rows_in_chunk | time | parent_node_id | read | write | comment | timeSum --------+---------+----------------------+------------+--------+-------------------+---------------------+----------------+-------+-------+------------+-------- [...] 5 | 19 | GpuTransform | 1497366528 | 204 | 7340032 | 2020-09-08 18:29:03 | 18 | | | | 1.46 5 | 20 | ReorderInput | 1497366528 | 204 | 7340032 | 2020-09-08 18:29:03 | 19 | | | | 0 5 | 21 | ReorderInput | 1497366528 | 204 | 7340032 | 2020-09-08 18:29:03 | 20 | | | | 0 5 | 22 | Join | 1497366528 | 204 | 7340032 | 2020-09-08 18:29:03 | 21 | | | inner | 69.7 5 | 24 | AddSortedMinMaxMet.. | 6291456 | 1 | 6291456 | 2020-09-08 18:26:05 | 22 | | | | 0 5 | 25 | Sort | 6291456 | 1 | 6291456 | 2020-09-08 18:26:05 | 24 | | | | 2.06 [...] 5 | 31 | ReadTable | 6291456 | 1 | 6291456 | 2020-09-08 18:26:03 | 30 | 235MB | | public.t_b | 0.02 [...] 5 | 41 | CpuDecompress | 10000000 | 2 | 5000000 | 2020-09-08 18:26:09 | 40 | | | | 0 5 | 42 | ReadTable | 10000000 | 2 | 5000000 | 2020-09-08 18:26:09 | 41 | 14MB | | public.t_a | 0 

#### Improving query performance¶

• In general, try to avoid VARCHAR as a join key. As a rule of thumb, BIGINT works best as a join key.

• Convert text values on-the-fly before running the query. For example, the CRC64 function takes a text input and returns a BIGINT hash.

For example:

SELECT AVG(t_b.j :: BIGINT),
t_a.country_code
FROM t_a
JOIN t_b ON (crc64_join(t_a.fk) = crc64_join(t_b.id))
GROUP BY t_a.country_code


The execution below has been shortened, but note the highlighted rows for Join. The Join node went from taking nearly 70 seconds, to just 6.67 seconds for joining 1.5 billion records.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 t=> SELECT show_node_info(6); stmt_id | node_id | node_type | rows | chunks | avg_rows_in_chunk | time | parent_node_id | read | write | comment | timeSum --------+---------+----------------------+------------+--------+-------------------+---------------------+----------------+-------+-------+------------+-------- [...] 6 | 19 | GpuTransform | 1497366528 | 85 | 17825792 | 2020-09-08 18:57:04 | 18 | | | | 1.48 6 | 20 | ReorderInput | 1497366528 | 85 | 17825792 | 2020-09-08 18:57:04 | 19 | | | | 0 6 | 21 | ReorderInput | 1497366528 | 85 | 17825792 | 2020-09-08 18:57:04 | 20 | | | | 0 6 | 22 | Join | 1497366528 | 85 | 17825792 | 2020-09-08 18:57:04 | 21 | | | inner | 6.67 6 | 24 | AddSortedMinMaxMet.. | 6291456 | 1 | 6291456 | 2020-09-08 18:55:12 | 22 | | | | 0 [...] 6 | 32 | ReadTable | 6291456 | 1 | 6291456 | 2020-09-08 18:55:12 | 31 | 235MB | | public.t_b | 0.02 [...] 6 | 43 | CpuDecompress | 10000000 | 2 | 5000000 | 2020-09-08 18:55:13 | 42 | | | | 0 6 | 44 | ReadTable | 10000000 | 2 | 5000000 | 2020-09-08 18:55:13 | 43 | 14MB | | public.t_a | 0 
• You can map some text values to numeric types by using a dimension table. Then, reconcile the values when you need them by joining the dimension table.

### 5. Sorting on big VARCHAR fields¶

In general, SQream DB automatically inserts a Sort node which arranges the data prior to reductions and aggregations.

When running a GROUP BY on large VARCHAR fields, you may see nodes for Sort and Reduce taking a long time.

#### Identifying the situation¶

When running a statement, inspect it with SHOW_NODE_INFO. If you see Sort and Reduce among your top five longest running nodes, there is a potential issue.

For example:

1. Run a query to test it out.

Our t_inefficient table contains 60,000,000 rows, and the structure is simple, but with an oversized country_code column:

CREATE TABLE t_inefficient (
i INT NOT NULL,
amt DOUBLE NOT NULL,
ts DATETIME NOT NULL,
country_code VARCHAR(100) NOT NULL,
flag VARCHAR(10) NOT NULL,
string_fk VARCHAR(50) NOT NULL
);


We will run a query, and inspect it’s execution details:

t=> SELECT country_code,
.          SUM(amt)
.   FROM t_inefficient
.   GROUP BY country_code;
executed
time: 47.55s

country_code | sum
-------------+-----------
VUT          | 1195416012
GIB          | 1195710372
TUR          | 1195946178
[...]

t=> select show_node_info(30);
stmt_id | node_id | node_type          | rows     | chunks | avg_rows_in_chunk | time                | parent_node_id | read  | write | comment              | timeSum
--------+---------+--------------------+----------+--------+-------------------+---------------------+----------------+-------+-------+----------------------+--------
30 |       1 | PushToNetworkQueue |      249 |      1 |               249 | 2020-09-10 16:17:10 |             -1 |       |       |                      |    0.25
30 |       2 | Rechunk            |      249 |      1 |               249 | 2020-09-10 16:17:10 |              1 |       |       |                      |       0
30 |       3 | ReduceMerge        |      249 |      1 |               249 | 2020-09-10 16:17:10 |              2 |       |       |                      |    0.01
30 |       4 | GpuToCpu           |     1508 |     15 |               100 | 2020-09-10 16:17:10 |              3 |       |       |                      |       0
30 |       5 | Reduce             |     1508 |     15 |               100 | 2020-09-10 16:17:10 |              4 |       |       |                      |    7.23
30 |       6 | Sort               | 60000000 |     15 |           4000000 | 2020-09-10 16:17:10 |              5 |       |       |                      |    36.8
30 |       7 | GpuTransform       | 60000000 |     15 |           4000000 | 2020-09-10 16:17:10 |              6 |       |       |                      |    0.08
30 |       8 | GpuDecompress      | 60000000 |     15 |           4000000 | 2020-09-10 16:17:10 |              7 |       |       |                      |    2.01
30 |       9 | CpuToGpu           | 60000000 |     15 |           4000000 | 2020-09-10 16:17:10 |              8 |       |       |                      |    0.16
30 |      10 | Rechunk            | 60000000 |     15 |           4000000 | 2020-09-10 16:17:10 |              9 |       |       |                      |       0
30 |      11 | CpuDecompress      | 60000000 |     15 |           4000000 | 2020-09-10 16:17:10 |             10 |       |       |                      |       0
30 |      12 | ReadTable          | 60000000 |     15 |           4000000 | 2020-09-10 16:17:10 |             11 | 520MB |       | public.t_inefficient |    0.05

2. We can look to see if there’s any shrinking we can do on the GROUP BY key

t=> SELECT MAX(LEN(country_code)) FROM t_inefficient;
max
---
3


With a maximum string length of just 3 characters, our VARCHAR(100) is way oversized.

3. We can recreate the table with a more restrictive VARCHAR(3), and can examine the difference in performance:

t=> CREATE TABLE t_efficient
.     AS SELECT i,
.              amt,
.              ts,
.              country_code::VARCHAR(3) AS country_code,
.              flag
.         FROM t_inefficient;
executed
time: 16.03s

t=> SELECT country_code,
.      SUM(amt::bigint)
.   FROM t_efficient
.   GROUP BY country_code;
executed
time: 4.75s
country_code | sum
-------------+-----------
VUT          | 1195416012
GIB          | 1195710372
TUR          | 1195946178
[...]


This time, the entire query took just 4.75 seconds, or just about 91% faster.

#### Improving sort performance on text keys¶

When using VARCHAR, ensure that the maximum length defined in the table structure is as small as necessary. For example, if you’re storing phone numbers, don’t define the field as VARCHAR(255), as that affects sort performance.

You can run a query to get the maximum column length (e.g. MAX(LEN(a_column))), and potentially modify the table structure.

### 6. High selectivity data¶

Selectivity is the ratio of cardinality to the number of records of a chunk. We define selectivity as $$\frac{\text{Distinct values}}{\text{Total number of records in a chunk}}$$

SQream DB has a hint called HIGH_SELECTIVITY, which is a function you can wrap a condition in.

The hint signals to SQream DB that the result of the condition will be very sparse, and that it should attempt to rechunk the results into fewer, fuller chunks.

Note

SQream DB doesn’t do this automatically because it adds a significant overhead on naturally ordered and well-clustered data, which is the more common scenario.

#### Identifying the situation¶

This is easily identifiable - when the amount of average of rows in a chunk is small, following a Filter operation.

Consider this execution plan:

t=> select show_node_info(30);
stmt_id | node_id | node_type         | rows      | chunks | avg_rows_in_chunk | time                | parent_node_id | read  | write | comment    | timeSum
--------+---------+-------------------+-----------+--------+-------------------+---------------------+----------------+-------+-------+------------+--------
[...]
30 |      38 | Filter            |     18160 |     74 |               245 | 2020-09-10 12:17:09 |             37 |       |       |            |   0.012
[...]
30 |      44 | ReadTable         |  77000000 |     74 |           1040540 | 2020-09-10 12:17:09 |             43 | 277MB |       | public.dim |   0.058


The table was read entirely - 77 million rows into 74 chunks.

The filter node reduced the output to just 18,160 relevant rows, but they’re distributed across the original 74 chunks. All of these rows could fit in one single chunk, instead of spanning 74 rather sparse chunks.

#### Improving performance with high selectivity hints¶

• Use when there’s a WHERE condition on an unclustered column, and when you expect the filter to cut out more than 60% of the result set.

• Use when the data is uniformly distributed or random

### 7. Performance of unsorted data in joins¶

When data is not well-clustered or naturally ordered, a join operation can take a long time.

#### Identifying the situation¶

When running a statement, inspect it with SHOW_NODE_INFO. If you see Join and DeferredGather among your top five longest running nodes, there is a potential issue.

In this case, we’re also interested in the number of chunks produced by these nodes.

Consider this execution plan:

t=> select show_node_info(30);
stmt_id | node_id | node_type         | rows      | chunks | avg_rows_in_chunk | time                | parent_node_id | read  | write | comment    | timeSum
--------+---------+-------------------+-----------+--------+-------------------+---------------------+----------------+-------+-------+------------+--------
[...]
30 |      13 | ReorderInput      | 181582598 |  70596 |              2572 | 2020-09-10 12:17:10 |             12 |       |       |            |   4.681
30 |      14 | DeferredGather    | 181582598 |  70596 |              2572 | 2020-09-10 12:17:10 |             13 |       |       |            |  29.901
30 |      15 | ReorderInput      | 181582598 |  70596 |              2572 | 2020-09-10 12:17:10 |             14 |       |       |            |   3.053
30 |      16 | GpuToCpu          | 181582598 |  70596 |              2572 | 2020-09-10 12:17:10 |             15 |       |       |            |   5.798
30 |      17 | ReorderInput      | 181582598 |  70596 |              2572 | 2020-09-10 12:17:10 |             16 |       |       |            |   2.899
30 |      18 | ReorderInput      | 181582598 |  70596 |              2572 | 2020-09-10 12:17:10 |             17 |       |       |            |   3.695
30 |      19 | Join              | 181582598 |  70596 |              2572 | 2020-09-10 12:17:10 |             18 |       |       | inner      |  22.745
[...]
30 |      38 | Filter            |     18160 |     74 |               245 | 2020-09-10 12:17:09 |             37 |       |       |            |   0.012
[...]
30 |      44 | ReadTable         |  77000000 |     74 |           1040540 | 2020-09-10 12:17:09 |             43 | 277MB |       | public.dim |   0.058

• Join is the node that matches rows from both table relations.

• DeferredGather gathers the required column chunks to decompress

Pay special attention to the volume of data removed by the Filter node. The table was read entirely - 77 million rows into 74 chunks.

The filter node reduced the output to just 18,160 relevant rows, but they’re distributed across the original 74 chunks. All of these rows could fit in one single chunk, instead of spanning 74 rather sparse chunks.

#### Improving join performance when data is sparse¶

You can tell SQream DB to reduce the amount of chunks involved, if you know that the filter is going to be quite agressive by using the HIGH_SELECTIVITY hint described above. This forces the compiler to rechunk the data into fewer chunks.

To tell SQream DB to rechunk the data, wrap a condition (or several) in the HIGH_SELECTIVITY hint:

-- Without the hint
SELECT *
FROM cdrs
WHERE
RequestReceiveTime BETWEEN '2018-01-01 00:00:00.000' AND '2018-08-31 23:59:59.999'
AND EnterpriseID=1150
AND MSISDN='9724871140341';

-- With the hint
SELECT *
FROM cdrs
WHERE
HIGH_SELECTIVITY(RequestReceiveTime BETWEEN '2018-01-01 00:00:00.000' AND '2018-08-31 23:59:59.999')
AND EnterpriseID=1150
AND MSISDN='9724871140341';


### 8. Manual join reordering¶

When joining multiple tables, you may wish to change the join order to join the smallest tables first.

#### Identifying the situation¶

When joining more than two tables, the Join nodes will be the most time-consuming nodes.

#### Changing the join order¶

Always prefer to join the smallest tables first.

Note

We consider small tables to be tables that only retain a small amount of rows after conditions are applied. This bears no direct relation to the amount of total rows in the table.

Changing the join order can reduce the query runtime significantly. In the examples below, we reduce the time from 27.3 seconds to just 6.4 seconds.

Original query
-- This variant runs in 27.3 seconds
SELECT SUM(l_extendedprice / 100.0*(1 - l_discount / 100.0)) AS revenue,
c_nationkey
FROM lineitem --6B Rows, ~183GB

JOIN orders --1.5B Rows, ~55GB
ON   l_orderkey = o_orderkey
JOIN customer --150M Rows, ~12GB
ON   c_custkey = o_custkey

WHERE c_nationkey = 1
AND   o_orderdate >= DATE '1993-01-01'
AND   o_orderdate < '1994-01-01'
AND   l_shipdate >= '1993-01-01'
GROUP BY c_nationkey

Modified query with improved join order
-- This variant runs in 6.4 seconds
SELECT SUM(l_extendedprice / 100.0*(1 - l_discount / 100.0)) AS revenue,
c_nationkey
FROM orders --1.5B Rows, ~55GB

JOIN customer --150M Rows, ~12GB
ON   c_custkey = o_custkey
JOIN lineitem --6B Rows, ~183GB
ON   l_orderkey = o_orderkey

WHERE c_nationkey = 1
AND   o_orderdate >= DATE '1993-01-01'
AND   o_orderdate < '1994-01-01'
AND   l_shipdate >= '1993-01-01'