How to merge large tables in ClickHouse using join

One case that needs attention in ClickHouse is merging data from different tables horizontally using a join on a certain key column. Suppose we have two tables - events and errors. Both of them have an event_id key, and we would like to join them into a resulting table containing all columns from both source tables:

Merging 2 tables in ClickHouse
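
For illustration, here is a minimal sketch of what the two source tables might look like. The exact column set (besides event_id) and the table engine are assumptions based on the queries used later in this article:

CREATE TABLE events
(
    event_id UInt64,
    date     Date,
    label    String
)
ENGINE = MergeTree ORDER BY event_id;

CREATE TABLE errors
(
    event_id UInt64,
    error    String
)
ENGINE = MergeTree ORDER BY event_id;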

OvercommitTracker problem

Wait, why can’t we just join those on the fly?

SELECT count(*)
FROM events e JOIN errors r
ON (r.event_id = e.event_id)
WHERE label = 'payment' and error = 'out-of-money'

Well, yes, but joining large tables can consume a lot of memory, and the following error might occur:

Code: 241. DB::Exception: Received from localhost:9000.
DB::Exception: Memory limit (total) exceeded: 
would use 8.39 GiB (attempt to allocate chunk of 2147614720 bytes),
maximum: 6.99 GiB. OvercommitTracker decision:
Query was selected to stop by OvercommitTracker.: While executing JoiningTransform. (MEMORY_LIMIT_EXCEEDED) (query: ...)

This happens because ClickHouse has an OvercommitTracker that stops further execution of a query trying to use more memory than is available (this behavior can also be configured).
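
As a sketch, the memory limits can be tuned per session; the values below are placeholders, and the overcommit behavior itself is governed by the memory_overcommit_* settings (check the docs for your ClickHouse version):

-- Raise the hard per-query memory limit for this session (value in bytes)
SET max_memory_usage = 12000000000;
-- How long a query selected by OvercommitTracker waits for memory to be freed
SET memory_usage_overcommit_max_wait_microseconds = 5000000;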

Joining large tables by parts

Since we usually deal with large tables in ClickHouse, we might run into that error once in a while.

A practical approach to this problem is to do several smaller joins instead of the single big one:

  1. Define a total range (min and max values) of the key column (the one we join on, event_id in our case).
  2. Split that range into multiple parts, so we can join separate parts instead of the entire tables. Pick the number of parts so that there's enough RAM to execute the join for each individual part (this might vary from tens to hundreds or even thousands of parts).
  3. Iterate through all parts to build the resulting table (a single-part query is sketched below).
Merging large tables by parts in ClickHouse
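
To make this concrete, here is roughly what a single part looks like as a standalone query. The destination table is called data, as in the PHP script below; its definition here is an assumption that simply mirrors the selected columns:

CREATE TABLE data
(
    event_id UInt64,
    date     Date,
    label    String,
    error    String
)
ENGINE = MergeTree ORDER BY event_id;

-- Join only the rows of both tables with event_id in [0, 1000000)
INSERT INTO data
SELECT event_id, date, label, error
FROM (SELECT * FROM events WHERE event_id >= 0 AND event_id < 1000000) AS t1
LEFT JOIN (SELECT * FROM errors WHERE event_id >= 0 AND event_id < 1000000) AS t2
ON (t1.event_id = t2.event_id);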

Example PHP code to join our tables by parts:

$table_left = 'events';
$table_right = 'errors';
$bulk = 1000000;

// Get the full key range to iterate over (clickhousy::row() returns a single result row)
$min_max = clickhousy::row('SELECT min(event_id) AS min, max(event_id) AS max FROM ' . $table_left);
$offset = $min_max['min'];
$max = $min_max['max'];

do {
  $limit = $offset + $bulk;   // upper bound (exclusive) of the current part

  // Join only the rows of both tables that fall into the current key range
  $sql =  "INSERT INTO data " .
          "SELECT event_id, date, label, error FROM " .
          "(SELECT * FROM {$table_left} WHERE event_id >= {$offset} AND event_id < {$limit}) AS t1 " .
          "LEFT JOIN (SELECT * FROM {$table_right} WHERE event_id >= {$offset} AND event_id < {$limit}) AS t2 " .
          "ON (t1.event_id = t2.event_id)";

  echo "{$offset}\n";
  passthru('clickhouse-client --progress -q ' . escapeshellarg($sql));

  $offset = $limit;
} while ( $offset <= $max );  // <= so the last part still covers event_id = $max when it falls on a bucket boundary
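
Note that the heavy lifting still happens inside ClickHouse: each INSERT ... SELECT is executed entirely on the server, so no data flows through the PHP script itself; it only generates the key ranges and invokes clickhouse-client once per part.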

Things to note

First of all, mind the efficiency of the filtering queries. It's good if event_id can be used as an index; if it can't, filtering each part might take a lot of time. Consider using columns that can leverage the table's sorting key.
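
A quick way to check this (a sketch, assuming a reasonably recent ClickHouse version) is to ask ClickHouse how the range filter uses the primary key:

EXPLAIN indexes = 1
SELECT count() FROM events WHERE event_id >= 0 AND event_id < 1000000

If the sorting key is used, only a small subset of parts and granules should be listed for each range.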

We assumed our event_id column is numeric and its values are distributed more or less sequentially. But in practice, we might have a UUID, other types, or even multiple columns to join on. In this case, consider using a preliminary query with ORDER BY and LIMIT ... OFFSET to get the filtering range boundaries:

SELECT col1 FROM source_table ORDER BY col1 LIMIT 1000000, 1
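
This returns the value sitting one million rows into the sorted key, which becomes the next range boundary. Instead of repeating that query with growing offsets, all boundaries can also be collected in a single pass; a sketch, assuming window functions are available in your ClickHouse version:

-- Collect every 1,000,000th value of col1 as a range boundary
SELECT groupArray(col1) AS boundaries
FROM
(
    SELECT col1, row_number() OVER (ORDER BY col1) AS rn
    FROM source_table
)
WHERE rn % 1000000 = 1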
