Living With Ass Pain: A ClickHouse Story

ClickHouse offers a friendly, unassuming first impression. With its compatibility with MySQL’s wire protocol, many may think it should be easy to integrate with an existing system. With its promises of speed, it may seem like a compelling option for your database application.

However, doing even the simplest thing in ClickHouse becomes a mess incredibly quickly, and the speed advantages often come at the cost of data accuracy. Programmers are often left out in the cold, designing or redesigning systems specifically to play nice with ClickHouse, which operates on a paradigm that relatively few people are familiar with.

ClickHouse is what’s known as a “column-based database”. You may scratch your head and think to yourself… “Well... my database has columns! What on earth do you mean?!”

Well, your typical MySQL database is a “row-based database”. It makes adding and deleting rows easy, but adding a column to a table can mean a very expensive restructuring operation that touches the entire table.

Now imagine… with a “column-based database”, you’re essentially taking that table and rotating it 90 degrees on disk. Adding and deleting a column becomes a piece of cake, but altering rows is the super-expensive part.

WTF, right!?

Why would I want to do that? Well… as long as you’re spelunking through historical, archival data that never or rarely changes, there are some pretty serious performance advantages to organizing your data in a column-based database. The most potent advantage is that when you’re searching for a record, all the values of the column you’re searching through are mashed tightly together on disk, making searches lightning fast. In a traditional row-based database, there might be hundreds or even thousands of bytes between consecutive values of the same column on disk.
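To make the “rotate it 90 degrees” idea concrete, here is a toy Python sketch (an illustration only, not ClickHouse’s real file format; the helper names are made up). It packs the same four records both ways using three fixed-width columns borrowed from the shipment table later in this post. Reading one column from the row layout means hopping over every record’s other fields, while in the column layout that column’s values sit in one contiguous run.

```python
import struct

# Toy model, not ClickHouse code; all names here are hypothetical.
# Three fixed-width columns per record, loosely modeled on
# CO1_ID (UInt32), VD1_ID (UInt32), SH1_INTERNAL_ID (UInt64).
RECORD_FMT = "<IIQ"                        # little-endian, no padding
RECORD_SIZE = struct.calcsize(RECORD_FMT)  # 16 bytes per record

records = [(1, 10, 1000), (1, 11, 1001), (2, 12, 1002), (3, 13, 1003)]


def row_layout(rows):
    """Row-oriented: each record's fields are stored side by side."""
    return b"".join(struct.pack(RECORD_FMT, *r) for r in rows)


def column_layout(rows):
    """Column-oriented: all values of one column are stored side by side."""
    co1, vd1, internal = zip(*rows)
    n = len(rows)
    return {
        "CO1_ID": struct.pack(f"<{n}I", *co1),
        "VD1_ID": struct.pack(f"<{n}I", *vd1),
        "SH1_INTERNAL_ID": struct.pack(f"<{n}Q", *internal),
    }


def read_co1_from_rows(buf):
    """Each CO1_ID sits RECORD_SIZE bytes after the previous one."""
    return [struct.unpack_from("<I", buf, off)[0]
            for off in range(0, len(buf), RECORD_SIZE)]


def read_co1_from_columns(cols):
    """All CO1_ID values form one contiguous block; no other column is touched."""
    buf = cols["CO1_ID"]
    return list(struct.unpack(f"<{len(buf) // 4}I", buf))


rows_buf = row_layout(records)
cols = column_layout(records)
print(read_co1_from_rows(rows_buf), len(rows_buf))       # -> [1, 1, 2, 3] 64
print(read_co1_from_columns(cols), len(cols["CO1_ID"]))  # -> [1, 1, 2, 3] 16
```

Same answer either way, but the column layout only has to read 16 bytes instead of wading through all 64.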

Since it would be completely impractical to restructure the entire table every time you add a record, ClickHouse is very heavy on “partitioning”… an internal organizational concept where what you think is one table is actually a whole bunch of smaller tables stitched together. (Strictly speaking, ClickHouse calls each of these chunks a “data part”, and groups parts into partitions.) New data gets dumped onto disk as a fresh set of files on every insert… and when you query that data, those parts aren’t really aware of each other. ClickHouse prefers speed to accuracy, so unless you specifically do things to prevent it, you might get duplicate records coming from multiple parts. If you don’t carefully plan your attack, you might have… actually, you’ll probably have… a lot of trouble with duplicates. Getting rid of them can be seriously difficult, annoying, and potentially slow, depending on your application.
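Here is a toy Python model of that behavior (a simplification for illustration, not ClickHouse’s actual code; the class and method names are made up). Each insert becomes a new immutable part, a plain query just reads all the parts back to back, and only a merge collapses rows that share a key, here keeping the last one seen, the way ReplacingMergeTree does when no version column is set.

```python
from dataclasses import dataclass, field


# Toy model, not ClickHouse code; ToyTable and its methods are hypothetical.
@dataclass
class ToyTable:
    """MergeTree-style table: every INSERT becomes a new, immutable part."""
    parts: list = field(default_factory=list)

    def insert(self, rows):
        # Nothing already on disk is rewritten; the batch becomes its own part.
        self.parts.append(list(rows))

    def select(self):
        # A plain query reads every part; parts know nothing about each other.
        return [row for part in self.parts for row in part]

    def merge(self, key):
        # A merge combines all parts into one, keeping the last row per key.
        latest = {}
        for row in self.select():
            latest[key(row)] = row
        self.parts = [list(latest.values())]


t = ToyTable()
t.insert([("PO-1", "shipped")])
t.insert([("PO-1", "shipped")])   # the same record arrives again
print(len(t.parts), t.select())   # 2 parts, duplicate visible
t.merge(key=lambda row: row[0])
print(len(t.parts), t.select())   # 1 part, duplicate gone
```

The catch is that in real ClickHouse the merge step runs in the background on its own schedule, so until it happens, queries see both copies.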

I was originally pulled into ClickHouse to rescue a system that was plagued with duplicate records. The coder in charge of this database had put in his notice and was soon leaving the company, so it was on me to take it on. Coming into it with zero prior knowledge of his system or of ClickHouse, I found that removing the duplicate records required a lot of research, data restructuring, reimporting, experimentation, and failure.

So I took some notes on things that caught me off guard, and this blog is intended mostly as tips to myself and my colleagues (because nobody else really reads it).

ClickHouse Tips

I’m not going to offer any cluster setup tips, mainly because I’ve forgotten just about all of it at this point. But I will say that getting the required “ZooKeeper” server configured properly was a huge pain in the butt… I eventually got it working, and somewhere around here I have some automated scripts I built to take care of the difficult parts. The internet is full of blogs about the setup stages… try one of those.

Creating Distributed Tables

When you create a “distributed” table, it needs “node” tables to reference. A distributed table doesn’t store any data itself (its engine is Distributed), so you have to create the “node” tables, with a real storage engine, on each node (which can be automated with the ON CLUSTER clause).

Each is created independently, so you’ll need two CREATE TABLE ... ON CLUSTER calls to create one distributed table. We typically give distributed tables a “dist_” prefix, so you could create a “users” table on each node and then a “dist_users” table that ties the nodes together.

The two tables have different ENGINE clauses. Here’s an example of the “node” table. The ON CLUSTER clause broadcasts this query to every node, so you don’t have to run it on each node yourself, but effectively it is the same as running it on each node without the ON CLUSTER clause.

CREATE TABLE db1.shipment_header ON CLUSTER fuck (
    DATE Date,
    PO1_ID UInt32,
    PO1_NUMBER FixedString(15),
    CO1_ID UInt32,
    VD1_ID UInt32,
    SH1_TRACKING_NUMBER FixedString(40),
    SH1_CREATED_ON DateTime,
    SH1_CREATED_BY UInt32,
    SH1_UPLOAD_TIMESTAMP UInt64,
    SH1_INTERNAL_ID UInt64,
    SH1_DELETE_FLAG UInt8
)
-- Legacy (now deprecated) MergeTree syntax: date column used for monthly
-- partitions, sorting/primary key, index granularity.
ENGINE = ReplacingMergeTree(DATE, (PO1_ID, SH1_INTERNAL_ID), 8192);

So now… here is the distributed table’s CREATE statement, which defines the sharding scheme in its ENGINE clause. The engine simply refers to the node tables; it does not directly store any data on disk.

CREATE TABLE db1.dist_shipment_header ON CLUSTER fuck (
    DATE Date,
    PO1_ID UInt32,
    PO1_NUMBER FixedString(15),
    CO1_ID UInt32,
    VD1_ID UInt32,
    SH1_TRACKING_NUMBER FixedString(40),
    SH1_CREATED_ON DateTime,
    SH1_CREATED_BY UInt32,
    SH1_UPLOAD_TIMESTAMP UInt64,
    SH1_INTERNAL_ID UInt64,
    SH1_DELETE_FLAG UInt8
) ENGINE = Distributed(
    -- cluster, database, node (local) table, sharding key
    fuck, db1, shipment_header, CO1_ID
);

It might throw you off a bit to know that you can create the distributed table definition regardless of whether the node tables exist. You don’t get any error feedback until you try to run a query against the distributed table.

Also, super important: when distributing data across nodes, two copies of what should be the same record (same unique key) must never end up on different nodes, because ClickHouse only deduplicates rows that live in the same node table. So sharding by rand() is a big no-no. To keep it simple, I’d honestly shard by some big ID, like CompanyID or AccountID, if possible. At this point you’ve already got two layers of CREATE statements that have to be absolutely perfect before you start inserting data… if you end up using views, you’ll have three layers that have to be perfect before you insert ANY data. If you screw up, you have to reimport your entire data set. If you are really desperate, you can try adding FINAL to your SELECT queries… but code changes of this nature might not always be possible, particularly if they are widespread. Also, if you have to run FINAL on every query, ClickHouse may end up no faster than MySQL.
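A toy Python sketch of why the sharding key matters. The Distributed engine picks a shard by taking the remainder of the sharding expression divided by the total shard weight; the sketch assumes a hypothetical two-shard cluster with equal weights, and every helper name is invented. It also assumes each PO/shipment belongs to exactly one company, which is what makes CO1_ID a safe sharding key for rows keyed on (PO1_ID, SH1_INTERNAL_ID).

```python
import random

# Toy model, not ClickHouse code; helper names are hypothetical.
NUM_SHARDS = 2  # assumed cluster: two shards, weight 1 each


def shard_by_key(row):
    # Like Distributed(..., CO1_ID): remainder of the sharding expression
    # divided by the total shard weight.
    return row["CO1_ID"] % NUM_SHARDS


def make_shard_by_rand(seed):
    rng = random.Random(seed)

    def shard_by_rand(row):
        # Like Distributed(..., rand()): the row's contents are ignored.
        return rng.randrange(NUM_SHARDS)

    return shard_by_rand


def distribute(rows, pick_shard):
    shards = {i: [] for i in range(NUM_SHARDS)}
    for row in rows:
        shards[pick_shard(row)].append(row)
    return shards


def split_keys(shards, key):
    """Keys whose copies landed on more than one shard; no single node can dedupe them."""
    homes = {}
    for shard_id, rows in shards.items():
        for row in rows:
            homes.setdefault(key(row), set()).add(shard_id)
    return {k for k, ids in homes.items() if len(ids) > 1}


def unique_key(r):
    # Plays the role of ORDER BY (PO1_ID, SH1_INTERNAL_ID).
    return (r["PO1_ID"], r["SH1_INTERNAL_ID"])


# The same shipment imported three times (e.g. a retried import job).
copies = [{"CO1_ID": 7, "PO1_ID": 42, "SH1_INTERNAL_ID": 900} for _ in range(3)]

print(split_keys(distribute(copies, shard_by_key), unique_key))  # set(): all on one shard
```

Swap in `make_shard_by_rand(seed)` and, for most seeds, the three copies get split across both shards, where no amount of per-node deduplication will ever find them.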

VIEWS

VIEWS are a whole other can of worms. We used views, but they were kinda just copies of the tables… not sure they were required, but we didn’t want to remove them because it would break code. One reason we might have used views is that you can’t call OPTIMIZE TABLE on a distributed table; it has to be run against each node’s table individually.

The most important thing to note is that ClickHouse’s materialized views (the kind we used) work very differently from views in traditional databases. They are not windows into existing data; instead, they are populated by INSERT events on their source tables. And when I say “INSERT events”… I mean JUST INSERT EVENTS. Delete and update events are not propagated. Also, if you need to recreate or change the structure of a view, you can’t safely repopulate it on a live system without completely reimporting the data into the source tables. (CREATE MATERIALIZED VIEW does have a POPULATE option that back-fills from existing source data, but the ClickHouse docs warn that rows inserted while it runs are missed.) That’s a big pain in the ass… because maybe you have multiple views into that data! Maybe you’re working with a live production emergency that only affects one view and you don’t want to have to wipe all the views. Unfortunately… you have to wipe all the views and start the data import into the source tables over completely.
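A toy Python model of that insert-trigger behavior (illustrative only; ToySource and its methods are made up and simplify a lot). A view created after data already exists starts out empty (ignoring the POPULATE option), it only ever sees the block being inserted, and a delete on the source never reaches it.

```python
# Toy model, not ClickHouse code; ToySource and its methods are hypothetical.
class ToySource:
    """Source table whose materialized views behave like INSERT triggers."""

    def __init__(self):
        self.rows = []
        self.views = []

    def attach_view(self, transform):
        # A new view starts empty: rows already in the source are not copied in.
        view = {"transform": transform, "rows": []}
        self.views.append(view)
        return view

    def insert(self, block):
        self.rows.extend(block)
        for view in self.views:
            # Each view only ever sees the block being inserted right now.
            view["rows"].extend(view["transform"](block))

    def delete_where(self, predicate):
        # Deletes change the source only; no view is notified.
        self.rows = [r for r in self.rows if not predicate(r)]


src = ToySource()
src.insert([{"PO1_ID": 1}])  # arrives before the view exists
view = src.attach_view(lambda block: [dict(r) for r in block])
src.insert([{"PO1_ID": 2}])
src.delete_where(lambda r: r["PO1_ID"] == 2)
print(src.rows)      # [{'PO1_ID': 1}]
print(view["rows"])  # [{'PO1_ID': 2}]
```

Source and view now disagree in both directions, which is exactly the mess you end up reimporting to fix.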

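CREATE MATERIALIZED VIEW db1.v_dist_shipment_header
(
    DATE Date,
    PO1_ID UInt32,
    PO1_NUMBER FixedString(15),
    CO1_ID UInt32,
    VD1_ID UInt32,
    SH1_TRACKING_NUMBER FixedString(40),
    SH1_CREATED_ON DateTime,
    SH1_CREATED_BY UInt32,
    SH1_UPLOAD_TIMESTAMP UInt64,
    SH1_INTERNAL_ID UInt64,
    SH1_DELETE_FLAG UInt8
)
-- Legacy syntax: ZooKeeper path, replica name, date column (monthly partitions),
-- sorting key, index granularity, version column.
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/db1/tables/v_dist_shipment_header', '{replica}', DATE, (PO1_ID, SH1_INTERNAL_ID), 8192, SH1_UPLOAD_TIMESTAMP
)
-- A materialized view requires an AS SELECT clause; the source table here is an assumption.
AS SELECT * FROM db1.dist_shipment_header;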

Removing Duplicate Records, if it is Even Possible
Unless you want 50 copies of the same record, set up your tables as ReplacingMergeTree. This gives you a chance in hell of not seeing duplicates, but in the world of ClickHouse, nothing is guaranteed.

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = ReplacingMergeTree([ver])
[PARTITION BY expr]
[ORDER BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]

In the syntax above, the optional [ver] parameter names a column you can use to indicate the record version (a timestamp, for example)… when several rows share the same key, the one with the highest version wins.

Don’t assume this is where the unique key is specified, because, unintuitively, that part goes in the ORDER BY clause (*facepalm*).
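A toy Python version of those two rules (a simplification; the function names are hypothetical). The ORDER BY key decides which rows count as the same record, and the optional version column decides which copy survives. Without a version, the last row in the selection wins, which is how a stale re-import can clobber a newer row; with one, the highest version wins, and ties go to the later row.

```python
# Toy model, not ClickHouse code; function names are hypothetical.
def replacing_merge(rows, order_by, ver=None):
    """Keep one row per ORDER BY key.

    Without ver, the last row in the selection wins. With ver, the row with the
    highest version wins, and ties go to the later row.
    """
    survivors = {}
    for row in rows:
        k = order_by(row)
        if k not in survivors or ver is None or ver(row) >= ver(survivors[k]):
            survivors[k] = row
    return list(survivors.values())


def sort_key(r):
    # Plays the role of ORDER BY (PO1_ID, SH1_INTERNAL_ID).
    return (r["PO1_ID"], r["SH1_INTERNAL_ID"])


def upload_version(r):
    # Plays the role of the SH1_UPLOAD_TIMESTAMP version column.
    return r["SH1_UPLOAD_TIMESTAMP"]


rows = [
    {"PO1_ID": 1, "SH1_INTERNAL_ID": 10, "SH1_UPLOAD_TIMESTAMP": 200},  # newer upload
    {"PO1_ID": 1, "SH1_INTERNAL_ID": 10, "SH1_UPLOAD_TIMESTAMP": 100},  # stale copy, read last
]
print(replacing_merge(rows, sort_key)[0]["SH1_UPLOAD_TIMESTAMP"])                      # 100
print(replacing_merge(rows, sort_key, ver=upload_version)[0]["SH1_UPLOAD_TIMESTAMP"])  # 200
```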

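Also, duplicates aren’t removed on any schedule you can count on. ReplacingMergeTree only deduplicates while merging data in the background, whenever it gets around to it, so to force the issue you have to call OPTIMIZE TABLE (also read the next paragraph). Just to make things extra inconvenient, you can only do this on the individual node tables, not the distributed table… so if you want your system to do this automatically, you’ll have to hit every node in your cluster (depending on your version, OPTIMIZE TABLE also accepts an ON CLUSTER clause on the node tables, which can save you from connecting to each node yourself).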

Also, don’t assume that OPTIMIZE TABLE will actually optimize the table… it only merges when it finds multiple data parts (the chunks of files each insert creates) worth merging, and if it decides not to, it doesn’t tell you (as if you even know the state of your parts). So call OPTIMIZE TABLE … FINAL instead, which forces the merge even when all the data is already in a single part.
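One more catch, sketched as a toy Python model (illustrative only; the function and data are hypothetical). OPTIMIZE ... FINAL merges parts only within the same partition, and rows in different partitions are never compared. With the legacy engine syntax used in the examples above, ClickHouse partitions by the month of the DATE column, so if the same record is ever imported with a DATE in a different month, both copies survive the optimize.

```python
from collections import defaultdict


# Toy model, not ClickHouse code; names and data are hypothetical.
def optimize_final(parts, order_by):
    """Merge every part of a partition into one part.

    Each part is {"partition": ..., "rows": [...]}. Duplicates are removed only
    within a partition (last row per key wins, as with no version column);
    rows in different partitions are never compared.
    """
    by_partition = defaultdict(list)
    for part in parts:
        by_partition[part["partition"]].extend(part["rows"])
    merged = []
    for partition, rows in by_partition.items():
        latest = {}
        for row in rows:
            latest[order_by(row)] = row
        merged.append({"partition": partition, "rows": list(latest.values())})
    return merged


def sort_key(r):
    # Plays the role of ORDER BY (PO1_ID, SH1_INTERNAL_ID).
    return (r["PO1_ID"], r["SH1_INTERNAL_ID"])


record = {"PO1_ID": 1, "SH1_INTERNAL_ID": 10}
parts = [
    {"partition": "201901", "rows": [dict(record, DATE="2019-01-31")]},
    {"partition": "201901", "rows": [dict(record, DATE="2019-01-31")]},  # exact re-import
    {"partition": "201902", "rows": [dict(record, DATE="2019-02-01")]},  # re-import, new DATE
]
merged = optimize_final(parts, sort_key)
print(sum(len(p["rows"]) for p in merged))  # 2: the copy in 201902 survives
```

In other words, keep anything that feeds the partition expression stable across re-imports, or FINAL won’t save you either.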

In our configuration, we used distributed tables, then built a VIEW on top of those tables. Removing duplicate records was… well… hard, especially given the limitations of the legacy code built on the Yii framework. Lots of trial and error. We ended up having the VIEWs live on the individual nodes… since the views are fed by insert triggers (and replicated), each node has a mirror of the others, so which node we connect to doesn’t matter. You get slightly better read scaling there, but really we did it because having the view on a single node allowed us to run OPTIMIZE TABLE on just one node. Our ORM (Yii) made it difficult for us to add the FINAL modifier to all the ClickHouse queries; otherwise we might have opted for that route.

Anyway… I’m just a beginner in this messy ClickHouse world and no guru just yet. After seeing how messy this is, I’m not sure I even want to be a ClickHouse guru… but hopefully, by documenting my struggles with ClickHouse, I can prevent hair loss in the next person who tries to take this on.
