1.1 billion taxi rides: 108-core ClickHouse cluster

A translation of the article was prepared specifically for students of the Data Engineer course.

ClickHouse is an open source columnar database. It is a great environment where hundreds of analysts can quickly query detailed data, even when tens of billions of new records arrive per day. The infrastructure costs to support such a system can reach $100,000 per year, and potentially half that depending on usage. At one point, the ClickHouse installation at Yandex.Metrica contained 10 trillion records. Beyond Yandex, ClickHouse has also found success at Bloomberg and Cloudflare.



Two years ago I ran a comparative benchmark of databases on a single machine, and ClickHouse was the fastest free database software I had ever seen. Since then, the developers have not stopped adding features, including support for Kafka, HDFS and ZStandard compression. Last year they added support for cascading compression methods, which made delta-of-delta encoding possible. When compressing time-series data, gauge values compress well with delta encoding, while counters do better with delta-of-delta encoding. Good compression has become a key ingredient of ClickHouse's performance.
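As a minimal, hypothetical sketch (the table and column names are illustrative, and column-level codecs assume a reasonably recent ClickHouse release), cascading codecs are declared per column like this:

CREATE TABLE sensor_readings (
    ts      DateTime,
    gauge   Float64 CODEC(Delta, ZSTD),      -- gauge values: delta encoding, then ZSTD
    counter UInt64  CODEC(DoubleDelta, ZSTD) -- counters: delta-of-delta encoding, then ZSTD
) ENGINE = MergeTree()
ORDER BY ts;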



ClickHouse consists of 170 thousand lines of C++ code, excluding third-party libraries, and is one of the smaller code bases among distributed databases. For comparison, SQLite does not support distribution and consists of 235 thousand lines of C code. As of this writing, 207 engineers have contributed to ClickHouse, and the rate of commits has been accelerating lately.



In March 2017, ClickHouse started keeping a changelog as an easy way to track development. They have also broken the monolithic documentation file up into a hierarchy of Markdown-based files. Issues and features are tracked via GitHub, and overall the software has become much more approachable over the last few years.



In this article, I am going to look at the performance of a ClickHouse cluster on AWS EC2 built from 36-core instances with NVMe storage.

UPDATE: A week after the initial publication of this post, I re-ran the benchmark with an improved configuration and achieved much better results. This post has been updated to reflect those changes.

Starting an AWS EC2 Cluster



I will use three c5d.9xlarge EC2 instances for this post. Each of them has 36 virtual CPUs, 72 GB of RAM, 900 GB of NVMe SSD storage and supports 10-gigabit networking. They cost $1.962 / hour each in the eu-west-1 region when launched on demand. I will use Ubuntu Server 16.04 LTS as the operating system.



The firewall is configured so that the three machines can communicate with each other without restriction, and only my IPv4 address is whitelisted for SSH access to the cluster.
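As a rough illustration only, the corresponding AWS CLI calls might look something like the following; the security group ID and IP address are placeholders, and only the ClickHouse port is shown for the intra-cluster rule (ZooKeeper's ports would need similar rules).

# Whitelist my own IPv4 address for SSH (hypothetical group ID and address).
$ aws ec2 authorize-security-group-ingress \
    --group-id sg-0123456789abcdef0 \
    --protocol tcp \
    --port 22 \
    --cidr 203.0.113.10/32

# Allow the instances to reach each other's ClickHouse port within the same group.
$ aws ec2 authorize-security-group-ingress \
    --group-id sg-0123456789abcdef0 \
    --protocol tcp \
    --port 9000 \
    --source-group sg-0123456789abcdef0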



Preparing the NVMe Storage



For ClickHouse to work, I will create an EXT4 file system on the NVMe drive of each server.



$ sudo mkfs -t ext4 /dev/nvme1n1
$ sudo mkdir /ch
$ sudo mount /dev/nvme1n1 /ch
      
      





After everything is configured, you can see the mount point and the 783 GB of space available on each of the systems.



 $ lsblk
      
      





NAME        MAJ:MIN RM   SIZE RO TYPE MOUNTPOINT
loop0         7:0    0  87.9M  1 loop /snap/core/5742
loop1         7:1    0  16.5M  1 loop /snap/amazon-ssm-agent/784
nvme0n1     259:1    0     8G  0 disk
└─nvme0n1p1 259:2    0     8G  0 part /
nvme1n1     259:0    0 838.2G  0 disk /ch
      
      





 $ df -h
      
      





Filesystem      Size  Used Avail Use% Mounted on
udev             35G     0   35G   0% /dev
tmpfs           6.9G  8.8M  6.9G   1% /run
/dev/nvme0n1p1  7.7G  967M  6.8G  13% /
tmpfs            35G     0   35G   0% /dev/shm
tmpfs           5.0M     0  5.0M   0% /run/lock
tmpfs            35G     0   35G   0% /sys/fs/cgroup
/dev/loop0       88M   88M     0 100% /snap/core/5742
/dev/loop1       17M   17M     0 100% /snap/amazon-ssm-agent/784
tmpfs           6.9G     0  6.9G   0% /run/user/1000
/dev/nvme1n1    825G   73M  783G   1% /ch
      
      





The dataset I will use in this benchmark is a data dump I generated from the 1.1 billion taxi rides taken in New York City over six years. The Billion Taxi Rides in Redshift blog post details how I put this dataset together. The data is stored on AWS S3, so I will configure the AWS command line interface with my access and secret keys.



$ sudo apt update
$ sudo apt install awscli
$ aws configure
      
      





I will raise the client's concurrent request limit to 100 so that the files download faster than they would with the default settings.



$ aws configure set \
    default.s3.max_concurrent_requests \
    100
      
      





I will download the taxi rides dataset from AWS S3 and store it on the NVMe drive of the first server. The dataset is ~104 GB of GZIP-compressed CSV.



$ sudo mkdir -p /ch/csv
$ sudo chown -R ubuntu /ch/csv
$ aws s3 sync s3://<bucket>/csv /ch/csv
      
      





Installing ClickHouse



I will install an OpenJDK distribution of Java 8, as it is required to run Apache ZooKeeper, which is needed for the distributed ClickHouse installation across all three machines.



$ sudo apt update
$ sudo apt install \
    openjdk-8-jre \
    openjdk-8-jdk-headless
      
      





Then I will set the JAVA_HOME environment variable.



$ sudo vi /etc/profile

export JAVA_HOME=/usr

$ source /etc/profile
      
      





Then I will use the package management system in Ubuntu to install ClickHouse 18.16.1, glances and ZooKeeper on all three machines.



$ sudo apt-key adv \
    --keyserver hkp://keyserver.ubuntu.com:80 \
    --recv E0C56BD4
$ echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | \
    sudo tee /etc/apt/sources.list.d/clickhouse.list
$ sudo apt-get update
      
      





$ sudo apt install \
    clickhouse-client \
    clickhouse-server \
    glances \
    zookeeperd
      
      





I will create a data directory for ClickHouse and apply some configuration overrides on all three servers.



$ sudo mkdir /ch/clickhouse
$ sudo chown -R clickhouse /ch/clickhouse
$ sudo mkdir -p /etc/clickhouse-server/conf.d
$ sudo vi /etc/clickhouse-server/conf.d/taxis.conf
      
      





These are the configuration overrides that I will use.



<?xml version="1.0"?>
<yandex>
    <listen_host>0.0.0.0</listen_host>
    <path>/ch/clickhouse/</path>
      
      





    <remote_servers>
        <perftest_3shards>
            <shard>
                <replica>
                    <host>172.30.2.192</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>172.30.2.162</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>172.30.2.36</host>
                    <port>9000</port>
                </replica>
            </shard>
        </perftest_3shards>
    </remote_servers>
      
      





    <zookeeper-servers>
        <node>
            <host>172.30.2.192</host>
            <port>2181</port>
        </node>
        <node>
            <host>172.30.2.162</host>
            <port>2181</port>
        </node>
        <node>
            <host>172.30.2.36</host>
            <port>2181</port>
        </node>
    </zookeeper-servers>
      
      





    <macros>
        <shard>03</shard>
        <replica>01</replica>
    </macros>
</yandex>
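The shard and replica macros above are not exercised anywhere else in this post, but if replicated tables were set up, their values would normally be substituted into the ZooKeeper path and replica name of a ReplicatedMergeTree table. A hypothetical sketch, using an illustrative table name:

CREATE TABLE trips_replicated (
    trip_id         UInt32,
    pickup_date     Date,
    pickup_datetime DateTime
) ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/trips_replicated', -- {shard} expands to 03 on this host
    '{replica}',                                   -- {replica} expands to 01 on this host
    pickup_date,
    pickup_datetime,
    8192);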
      
      





Then I will launch ZooKeeper and the ClickHouse server on all three machines.



$ sudo /etc/init.d/zookeeper start
$ sudo service clickhouse-server start
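Before loading any data, it may be worth checking on each machine that both services respond. A quick, informal sanity check along these lines (ZooKeeper answers imok to the ruok command):

$ echo ruok | nc localhost 2181
$ clickhouse-client --host=0.0.0.0 --query="SELECT version()"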
      
      





Loading data into ClickHouse



On the first server I will create a trips table, which will hold the taxi rides dataset, using the Log engine.



$ clickhouse-client --host=0.0.0.0

CREATE TABLE trips (
    trip_id               UInt32,
    vendor_id             String,
    pickup_datetime       DateTime,
    dropoff_datetime      Nullable(DateTime),
    store_and_fwd_flag    Nullable(FixedString(1)),
    rate_code_id          Nullable(UInt8),
    pickup_longitude      Nullable(Float64),
    pickup_latitude       Nullable(Float64),
    dropoff_longitude     Nullable(Float64),
    dropoff_latitude      Nullable(Float64),
    passenger_count       Nullable(UInt8),
    trip_distance         Nullable(Float64),
    fare_amount           Nullable(Float32),
    extra                 Nullable(Float32),
    mta_tax               Nullable(Float32),
    tip_amount            Nullable(Float32),
    tolls_amount          Nullable(Float32),
    ehail_fee             Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount          Nullable(Float32),
    payment_type          Nullable(String),
    trip_type             Nullable(UInt8),
    pickup                Nullable(String),
    dropoff               Nullable(String),
    cab_type              Nullable(String),
    precipitation         Nullable(Int8),
    snow_depth            Nullable(Int8),
    snowfall              Nullable(Int8),
    max_temperature       Nullable(Int8),
    min_temperature       Nullable(Int8),
    average_wind_speed    Nullable(Int8),
    pickup_nyct2010_gid   Nullable(Int8),
    pickup_ctlabel        Nullable(String),
    pickup_borocode       Nullable(Int8),
    pickup_boroname       Nullable(String),
    pickup_ct2010         Nullable(String),
    pickup_boroct2010     Nullable(String),
    pickup_cdeligibil     Nullable(FixedString(1)),
    pickup_ntacode        Nullable(String),
    pickup_ntaname        Nullable(String),
    pickup_puma           Nullable(String),
    dropoff_nyct2010_gid  Nullable(UInt8),
    dropoff_ctlabel       Nullable(String),
    dropoff_borocode      Nullable(UInt8),
    dropoff_boroname      Nullable(String),
    dropoff_ct2010        Nullable(String),
    dropoff_boroct2010    Nullable(String),
    dropoff_cdeligibil    Nullable(String),
    dropoff_ntacode       Nullable(String),
    dropoff_ntaname       Nullable(String),
    dropoff_puma          Nullable(String)
) ENGINE = Log;
      
      





Then I will decompress and load each of the CSV files into the trips table. The following completed in 55 minutes and 10 seconds. After this operation the data directory was 134 GB in size.



$ time (for FILENAME in /ch/csv/trips_x*.csv.gz; do
            echo $FILENAME
            gunzip -c $FILENAME | \
                clickhouse-client \
                    --host=0.0.0.0 \
                    --query="INSERT INTO trips FORMAT CSV"
        done)
      
      





The import ran at 155 MB of uncompressed CSV content per second. I suspect this was due to a bottleneck in GZIP decompression. It might have been faster to decompress all of the gzip files in parallel using xargs and then load the uncompressed data; a rough sketch of that approach follows the glances output below. Here is what glances reported during the CSV import.



 $ sudo glances
      
      





 ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 0:11:42 CPU 8.2% nice: 0.0% LOAD 36-core MEM 9.8% active: 5.20G SWAP 0.0% user: 6.0% irq: 0.0% 1 min: 2.24 total: 68.7G inactive: 61.0G total: 0 system: 0.9% iowait: 1.3% 5 min: 1.83 used: 6.71G buffers: 66.4M used: 0 idle: 91.8% steal: 0.0% 15 min: 1.01 free: 62.0G cached: 61.6G free: 0 NETWORK Rx/s Tx/s TASKS 370 (507 thr), 2 run, 368 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 136b 2Kb lo 343Mb 343Mb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 100.4 1.5 1.65G 1.06G 9909 ubuntu 0 S 1:01.33 0 0 clickhouse-client --host=0.0.0.0 --query=INSERT INTO trips FORMAT CSV DISK I/OR/s W/s 85.1 0.0 4.65M 708K 9908 ubuntu 0 R 0:50.60 32M 0 gzip -d -c /ch/csv/trips_xac.csv.gz loop0 0 0 54.9 5.1 8.14G 3.49G 8091 clickhous 0 S 1:44.23 0 45M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml loop1 0 0 4.5 0.0 0 0 319 root 0 S 0:07.50 1K 0 kworker/u72:2 nvme0n1 0 3K 2.3 0.0 91.1M 28.9M 9912 root 0 R 0:01.56 0 0 /usr/bin/python3 /usr/bin/glances nvme0n1p1 0 3K 0.3 0.0 0 0 960 root -20 S 0:00.10 0 0 kworker/28:1H nvme1n1 32.1M 495M 0.3 0.0 0 0 1058 root -20 S 0:00.90 0 0 kworker/23:1H
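For reference, the parallel decompression idea mentioned above might look roughly like this. This is a sketch I did not benchmark, and it needs enough free space on the drive to hold the uncompressed CSVs.

# Decompress up to 16 files at a time, then load the resulting CSVs one by one.
$ ls /ch/csv/trips_x*.csv.gz | \
    xargs -n 1 -P 16 gunzip

$ time (for FILENAME in /ch/csv/trips_x*.csv; do
            clickhouse-client \
                --host=0.0.0.0 \
                --query="INSERT INTO trips FORMAT CSV" < $FILENAME
        done)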
      
      





I will free up space on the NVMe drive by deleting the source CSV files before continuing.



 $ sudo rm -fr /ch/csv
      
      





Converting to a Columnar Format



The Log engine in ClickHouse stores data in a row-oriented format. To query the data faster, I will convert it to a columnar format using the MergeTree engine.



 $ clickhouse-client --host=0.0.0.0
      
      





The following completed in 34 minutes and 50 seconds. After this operation the data directory was 237 GB in size.



CREATE TABLE trips_mergetree
    ENGINE = MergeTree(pickup_date, pickup_datetime, 8192)
    AS SELECT
        trip_id,
        CAST(vendor_id AS Enum8('1' = 1, '2' = 2, 'CMT' = 3, 'VTS' = 4, 'DDS' = 5, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14)) AS vendor_id,
        toDate(pickup_datetime)                 AS pickup_date,
        ifNull(pickup_datetime, toDateTime(0))  AS pickup_datetime,
        toDate(dropoff_datetime)                AS dropoff_date,
        ifNull(dropoff_datetime, toDateTime(0)) AS dropoff_datetime,
        assumeNotNull(store_and_fwd_flag)       AS store_and_fwd_flag,
        assumeNotNull(rate_code_id)             AS rate_code_id,
        assumeNotNull(pickup_longitude)         AS pickup_longitude,
        assumeNotNull(pickup_latitude)          AS pickup_latitude,
        assumeNotNull(dropoff_longitude)        AS dropoff_longitude,
        assumeNotNull(dropoff_latitude)         AS dropoff_latitude,
        assumeNotNull(passenger_count)          AS passenger_count,
        assumeNotNull(trip_distance)            AS trip_distance,
        assumeNotNull(fare_amount)              AS fare_amount,
        assumeNotNull(extra)                    AS extra,
        assumeNotNull(mta_tax)                  AS mta_tax,
        assumeNotNull(tip_amount)               AS tip_amount,
        assumeNotNull(tolls_amount)             AS tolls_amount,
        assumeNotNull(ehail_fee)                AS ehail_fee,
        assumeNotNull(improvement_surcharge)    AS improvement_surcharge,
        assumeNotNull(total_amount)             AS total_amount,
        assumeNotNull(payment_type)             AS payment_type_,
        assumeNotNull(trip_type)                AS trip_type,
        pickup                                  AS pickup,
        dropoff                                 AS dropoff,
        CAST(assumeNotNull(cab_type) AS Enum8('yellow' = 1, 'green' = 2)) AS cab_type,
        precipitation                           AS precipitation,
        snow_depth                              AS snow_depth,
        snowfall                                AS snowfall,
        max_temperature                         AS max_temperature,
        min_temperature                         AS min_temperature,
        average_wind_speed                      AS average_wind_speed,
        pickup_nyct2010_gid                     AS pickup_nyct2010_gid,
        pickup_ctlabel                          AS pickup_ctlabel,
        pickup_borocode                         AS pickup_borocode,
        pickup_boroname                         AS pickup_boroname,
        pickup_ct2010                           AS pickup_ct2010,
        pickup_boroct2010                       AS pickup_boroct2010,
        pickup_cdeligibil                       AS pickup_cdeligibil,
        pickup_ntacode                          AS pickup_ntacode,
        pickup_ntaname                          AS pickup_ntaname,
        pickup_puma                             AS pickup_puma,
        dropoff_nyct2010_gid                    AS dropoff_nyct2010_gid,
        dropoff_ctlabel                         AS dropoff_ctlabel,
        dropoff_borocode                        AS dropoff_borocode,
        dropoff_boroname                        AS dropoff_boroname,
        dropoff_ct2010                          AS dropoff_ct2010,
        dropoff_boroct2010                      AS dropoff_boroct2010,
        dropoff_cdeligibil                      AS dropoff_cdeligibil,
        dropoff_ntacode                         AS dropoff_ntacode,
        dropoff_ntaname                         AS dropoff_ntaname,
        dropoff_puma                            AS dropoff_puma
    FROM trips;
      
      





Here is what the glances output looked like during the operation:



 ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 1:06:09 CPU 10.3% nice: 0.0% LOAD 36-core MEM 16.1% active: 13.3G SWAP 0.0% user: 7.9% irq: 0.0% 1 min: 1.87 total: 68.7G inactive: 52.8G total: 0 system: 1.6% iowait: 0.8% 5 min: 1.76 used: 11.1G buffers: 71.8M used: 0 idle: 89.7% steal: 0.0% 15 min: 1.95 free: 57.6G cached: 57.2G free: 0 NETWORK Rx/s Tx/s TASKS 367 (523 thr), 1 run, 366 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 1Kb 8Kb lo 2Kb 2Kb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 241.9 12.8 20.7G 8.78G 8091 clickhous 0 S 30:36.73 34M 125M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml DISK I/OR/s W/s 2.6 0.0 90.4M 28.3M 9948 root 0 R 1:18.53 0 0 /usr/bin/python3 /usr/bin/glances loop0 0 0 1.3 0.0 0 0 203 root 0 S 0:09.82 0 0 kswapd0 loop1 0 0 0.3 0.1 315M 61.3M 15701 ubuntu 0 S 0:00.40 0 0 clickhouse-client --host=0.0.0.0 nvme0n1 0 3K 0.3 0.0 0 0 7 root 0 S 0:00.83 0 0 rcu_sched nvme0n1p1 0 3K 0.0 0.0 0 0 142 root 0 S 0:00.22 0 0 migration/27 nvme1n1 25.8M 330M 0.0 0.0 59.7M 1.79M 2764 ubuntu 0 S 0:00.00 0 0 (sd-pam)
      
      





In the last benchmark, several columns were cast and re-computed. I found that some of those functions no longer behaved as expected on this dataset, so to work around that I removed the offending functions and loaded the data without casting it to more fine-grained types.



Cluster data distribution



I will distribute the data across all three nodes of the cluster. To begin, I will create the table below on all three machines.



 $ clickhouse-client --host=0.0.0.0
      
      





CREATE TABLE trips_mergetree_third (
    trip_id               UInt32,
    vendor_id             String,
    pickup_date           Date,
    pickup_datetime       DateTime,
    dropoff_date          Date,
    dropoff_datetime      Nullable(DateTime),
    store_and_fwd_flag    Nullable(FixedString(1)),
    rate_code_id          Nullable(UInt8),
    pickup_longitude      Nullable(Float64),
    pickup_latitude       Nullable(Float64),
    dropoff_longitude     Nullable(Float64),
    dropoff_latitude      Nullable(Float64),
    passenger_count       Nullable(UInt8),
    trip_distance         Nullable(Float64),
    fare_amount           Nullable(Float32),
    extra                 Nullable(Float32),
    mta_tax               Nullable(Float32),
    tip_amount            Nullable(Float32),
    tolls_amount          Nullable(Float32),
    ehail_fee             Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount          Nullable(Float32),
    payment_type          Nullable(String),
    trip_type             Nullable(UInt8),
    pickup                Nullable(String),
    dropoff               Nullable(String),
    cab_type              Nullable(String),
    precipitation         Nullable(Int8),
    snow_depth            Nullable(Int8),
    snowfall              Nullable(Int8),
    max_temperature       Nullable(Int8),
    min_temperature       Nullable(Int8),
    average_wind_speed    Nullable(Int8),
    pickup_nyct2010_gid   Nullable(Int8),
    pickup_ctlabel        Nullable(String),
    pickup_borocode       Nullable(Int8),
    pickup_boroname       Nullable(String),
    pickup_ct2010         Nullable(String),
    pickup_boroct2010     Nullable(String),
    pickup_cdeligibil     Nullable(FixedString(1)),
    pickup_ntacode        Nullable(String),
    pickup_ntaname        Nullable(String),
    pickup_puma           Nullable(String),
    dropoff_nyct2010_gid  Nullable(UInt8),
    dropoff_ctlabel       Nullable(String),
    dropoff_borocode      Nullable(UInt8),
    dropoff_boroname      Nullable(String),
    dropoff_ct2010        Nullable(String),
    dropoff_boroct2010    Nullable(String),
    dropoff_cdeligibil    Nullable(String),
    dropoff_ntacode       Nullable(String),
    dropoff_ntaname       Nullable(String),
    dropoff_puma          Nullable(String)
) ENGINE = MergeTree(pickup_date, pickup_datetime, 8192);
      
      





Then I will make sure that the first server can see all three nodes in the cluster.



 SELECT * FROM system.clusters WHERE cluster = 'perftest_3shards' FORMAT Vertical;
      
      







Row 1:
──────
cluster:          perftest_3shards
shard_num:        1
shard_weight:     1
replica_num:      1
host_name:        172.30.2.192
host_address:     172.30.2.192
port:             9000
is_local:         1
user:             default
default_database:
      
      







Row 2:
──────
cluster:          perftest_3shards
shard_num:        2
shard_weight:     1
replica_num:      1
host_name:        172.30.2.162
host_address:     172.30.2.162
port:             9000
is_local:         0
user:             default
default_database:
      
      





Row 3:
──────
cluster:          perftest_3shards
shard_num:        3
shard_weight:     1
replica_num:      1
host_name:        172.30.2.36
host_address:     172.30.2.36
port:             9000
is_local:         0
user:             default
default_database:
      
      





Then I will define a new table on the first server that is based on the trips_mergetree_third table and uses the Distributed engine.



CREATE TABLE trips_mergetree_x3
    AS trips_mergetree_third
    ENGINE = Distributed(perftest_3shards,
                         default,
                         trips_mergetree_third,
                         rand());
      
      





Then I will copy the data from the MergeTree-based table out to all three servers. The following completed in 34 minutes and 44 seconds.



 INSERT INTO trips_mergetree_x3 SELECT * FROM trips_mergetree;
      
      





After the above operation, I gave ClickHouse 15 minutes to come back down off its storage high-water mark. The data directories ended up at 264 GB, 34 GB and 33 GB respectively across the three servers.
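The on-disk footprint of the MergeTree-based tables can also be checked from inside ClickHouse on each node. A rough sketch of such a query against the system.parts table (exact column names can vary between versions):

SELECT table,
       formatReadableSize(sum(bytes)) AS size_on_disk
FROM system.parts
WHERE active
GROUP BY table
ORDER BY sum(bytes) DESC;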



ClickHouse Cluster Performance Assessment



The times below are the fastest I saw when running each query multiple times against the trips_mergetree_x3 table.



 $ clickhouse-client --host=0.0.0.0
      
      





The following completed in 2.449 seconds.



 SELECT cab_type, count(*) FROM trips_mergetree_x3 GROUP BY cab_type;
      
      





The following completed in 0.691 seconds.



 SELECT passenger_count, avg(total_amount) FROM trips_mergetree_x3 GROUP BY passenger_count;
      
      





The following completed in 0.582 seconds.



 SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree_x3 GROUP BY passenger_count, year;
      
      





The following completed in 0.983 seconds.



 SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree_x3 GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC;
      
      





For comparison, I ran the same queries against the MergeTree-based table that resides solely on the first server.



Performance assessment of one ClickHouse node



The times below are the fastest I saw when running each query multiple times against the trips_mergetree table.



The following completed in 0.241 seconds.



 SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type;
      
      





The following completed in 0.826 seconds.



 SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count;
      
      





The following completed in 1.209 seconds.



 SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year;
      
      





The following completed in 1.781 seconds.



 SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*) FROM trips_mergetree GROUP BY passenger_count, year, distance ORDER BY year, count(*) DESC;
      
      





Reflections on the results



This is the first time a free, CPU-based database has managed to outperform a GPU-based database in my benchmarks. That GPU-based database has gone through two revisions since then, but the performance ClickHouse delivered on a single node is nonetheless very impressive.



At the same time, Query 1 incurs an order of magnitude more overhead when run on the Distributed engine. I had hoped I missed something in my research for this post, because it would be nice to see query times drop as more nodes are added to the cluster. Still, it is great that the other queries saw roughly a 2x performance boost.



It would be nice to see ClickHouse evolve towards separating storage and compute so that they can scale independently. The HDFS support added last year could be a step in that direction. As for compute, if a single query can be sped up by adding more nodes to the cluster, the future of this software looks very bright.
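For what it's worth, the existing HDFS support surfaces as a table engine. A rough, hypothetical sketch of reading CSV files kept on HDFS (the namenode host, path and trimmed-down schema are illustrative):

CREATE TABLE trips_on_hdfs (
    trip_id         UInt32,
    pickup_datetime DateTime,
    total_amount    Float32
) ENGINE = HDFS('hdfs://namenode:8020/taxi/trips_*.csv', 'CSV');

SELECT count(*) FROM trips_on_hdfs;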



Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America and Europe. If you would like to discuss how my offerings can help your business, please contact me via LinkedIn.


