New York Taxi Data
This dataset can be obtained in two ways:
- import from raw data
- download of prepared partitions
See https://github.com/toddwschneider/nyc-taxi-data for the description of the dataset and instructions for downloading.
Downloading will result in about 227 GB of uncompressed data in CSV files. The download takes about an hour over a 1 Gbit connection (parallel downloading from s3.amazonaws.com uses at least half of a 1 Gbit channel).
Some of the files might not download fully. Check the file sizes and re-download any that seem doubtful.
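For example, you can sort the downloaded CSV files by size and eyeball the smallest ones; an unusually small file for a given month is likely truncated (the data/ directory name follows the layout of the nyc-taxi-data scripts and is an assumption here):
$ # List CSV files from smallest to largest; re-download any month that looks too small.
$ ls -lhSr data/*.csv | head -n 20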
Some of the files might contain invalid rows. You can fix them as follows:
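The exact commands are not included above; as a hedged example (the month in the filename is a placeholder), rows with a wrong number of fields can be dropped with sed:
$ # Delete rows containing 18 or more commas (i.e. extra fields), then replace the original file.
$ sed -E '/(.*,){18,}/d' data/yellow_tripdata_2010-03.csv > data/yellow_tripdata_2010-03.csv_
$ mv data/yellow_tripdata_2010-03.csv_ data/yellow_tripdata_2010-03.csv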
Then the data must be pre-processed in PostgreSQL. This performs point-in-polygon selections (to match pickup and dropoff points with the boroughs of New York City) and combines all the data into a single denormalized flat table using a JOIN. To do this, you will need to install PostgreSQL with PostGIS support.
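As a quick sanity check (not part of the original instructions), you can confirm PostGIS is available in the target database before running the import:
-- Run inside the nyc-taxi-data database.
CREATE EXTENSION IF NOT EXISTS postgis;
SELECT PostGIS_Version();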
Be careful when running the import scripts, and manually re-check that all the tables were created correctly.
It takes about 20-30 minutes to process each month’s worth of data in PostgreSQL, for a total of about 48 hours.
You can check the number of downloaded rows as follows:
$ time psql nyc-taxi-data -c "SELECT count(*) FROM trips;"
   count
------------
 1298979494
(1 row)

real    7m9.164s
(This is slightly more than the 1.1 billion rows reported by Mark Litwintschik in a series of blog posts.)
The data in PostgreSQL uses 370 GB of space.
Exporting the data from PostgreSQL:
COPY
(
SELECT trips.id,
trips.vendor_id,
trips.pickup_datetime,
trips.dropoff_datetime,
trips.store_and_fwd_flag,
trips.rate_code_id,
trips.pickup_longitude,
trips.pickup_latitude,
trips.dropoff_longitude,
trips.dropoff_latitude,
trips.passenger_count,
trips.trip_distance,
trips.fare_amount,
trips.extra,
trips.mta_tax,
trips.tip_amount,
trips.tolls_amount,
trips.ehail_fee,
trips.improvement_surcharge,
trips.total_amount,
trips.payment_type,
trips.trip_type,
trips.pickup,
trips.dropoff,
cab_types.type cab_type,
weather.precipitation_tenths_of_mm rain,
weather.snow_depth_mm,
weather.snowfall_mm,
weather.max_temperature_tenths_degrees_celsius max_temp,
weather.min_temperature_tenths_degrees_celsius min_temp,
weather.average_wind_speed_tenths_of_meters_per_second wind,
pick_up.gid pickup_nyct2010_gid,
pick_up.ctlabel pickup_ctlabel,
pick_up.borocode pickup_borocode,
pick_up.boroname pickup_boroname,
pick_up.ct2010 pickup_ct2010,
pick_up.boroct2010 pickup_boroct2010,
pick_up.cdeligibil pickup_cdeligibil,
pick_up.ntacode pickup_ntacode,
pick_up.ntaname pickup_ntaname,
pick_up.puma pickup_puma,
drop_off.gid dropoff_nyct2010_gid,
drop_off.ctlabel dropoff_ctlabel,
drop_off.borocode dropoff_borocode,
drop_off.boroname dropoff_boroname,
drop_off.ct2010 dropoff_ct2010,
drop_off.boroct2010 dropoff_boroct2010,
drop_off.cdeligibil dropoff_cdeligibil,
drop_off.ntacode dropoff_ntacode,
drop_off.ntaname dropoff_ntaname,
drop_off.puma dropoff_puma
FROM trips
LEFT JOIN cab_types
ON trips.cab_type_id = cab_types.id
LEFT JOIN central_park_weather_observations_raw weather
ON weather.date = trips.pickup_datetime::date
LEFT JOIN nyct2010 pick_up
ON pick_up.gid = trips.pickup_nyct2010_gid
LEFT JOIN nyct2010 drop_off
ON drop_off.gid = trips.dropoff_nyct2010_gid
) TO '/opt/milovidov/nyc-taxi-data/trips.tsv';
The data snapshot is created at a speed of about 50 MB per second. While creating the snapshot, PostgreSQL reads from the disk at a speed of about 28 MB per second.
This takes about 5 hours. The resulting TSV file is 590612904969 bytes.
Create a temporary table in ClickHouse:
CREATE TABLE trips
(
trip_id UInt32,
vendor_id String,
pickup_datetime DateTime,
dropoff_datetime Nullable(DateTime),
store_and_fwd_flag Nullable(FixedString(1)),
rate_code_id Nullable(UInt8),
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count Nullable(UInt8),
trip_distance Nullable(Float64),
fare_amount Nullable(Float32),
extra Nullable(Float32),
mta_tax Nullable(Float32),
tip_amount Nullable(Float32),
tolls_amount Nullable(Float32),
ehail_fee Nullable(Float32),
improvement_surcharge Nullable(Float32),
total_amount Nullable(Float32),
payment_type Nullable(String),
trip_type Nullable(UInt8),
pickup Nullable(String),
dropoff Nullable(String),
cab_type Nullable(String),
precipitation Nullable(UInt8),
snow_depth Nullable(UInt8),
snowfall Nullable(UInt8),
max_temperature Nullable(UInt8),
min_temperature Nullable(UInt8),
average_wind_speed Nullable(UInt8),
pickup_nyct2010_gid Nullable(UInt8),
pickup_ctlabel Nullable(String),
pickup_borocode Nullable(UInt8),
pickup_boroname Nullable(String),
pickup_ct2010 Nullable(String),
pickup_boroct2010 Nullable(String),
pickup_cdeligibil Nullable(FixedString(1)),
pickup_ntacode Nullable(String),
pickup_ntaname Nullable(String),
pickup_puma Nullable(String),
dropoff_nyct2010_gid Nullable(UInt8),
dropoff_ctlabel Nullable(String),
dropoff_borocode Nullable(UInt8),
dropoff_boroname Nullable(String),
dropoff_ct2010 Nullable(String),
dropoff_boroct2010 Nullable(String),
dropoff_cdeligibil Nullable(String),
dropoff_ntacode Nullable(String),
dropoff_ntaname Nullable(String),
dropoff_puma Nullable(String)
) ENGINE = Log;
This temporary table is needed for converting the fields to more appropriate data types and, where possible, eliminating NULLs.
$ time clickhouse-client --query="INSERT INTO trips FORMAT TabSeparated" < trips.tsv
real 75m56.214s
Data is read at a speed of 112-140 MB/second.
Loading data into a Log type table in one stream took 76 minutes.
The data in this table uses 142 GB.
Unfortunately, all the fields associated with the weather (precipitation…average_wind_speed) were filled with NULL. Because of this, we will remove them from the final data set.
To start, we’ll create a table on a single server. Later we will make the table distributed.
Create and populate a summary table:
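The full statement converts every column of the trips table; the condensed sketch below shows only the pattern and the columns used by the benchmark queries further down (the old-style MergeTree(pickup_date, pickup_datetime, 8192) parameters are an assumption):
CREATE TABLE trips_mergetree
ENGINE = MergeTree(pickup_date, pickup_datetime, 8192)
AS SELECT
    trip_id,
    vendor_id,
    toDate(pickup_datetime) AS pickup_date,
    pickup_datetime,
    ifNull(dropoff_datetime, toDateTime(0)) AS dropoff_datetime,
    assumeNotNull(passenger_count) AS passenger_count,
    assumeNotNull(trip_distance) AS trip_distance,
    assumeNotNull(total_amount) AS total_amount,
    assumeNotNull(payment_type) AS payment_type,
    assumeNotNull(cab_type) AS cab_type
    -- ...the remaining columns are converted the same way; the all-NULL weather columns are dropped
FROM trips
On newer ClickHouse versions the equivalent engine clause would be MergeTree PARTITION BY toYYYYMM(pickup_date) ORDER BY pickup_datetime.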
This takes 3030 seconds at a speed of about 428,000 rows per second. To load the data faster, you can create the table with the Log engine instead of MergeTree. In this case, the load completes in less than 200 seconds.
The table uses 126 GB of disk space.
SELECT formatReadableSize(sum(bytes)) FROM system.parts WHERE table = 'trips_mergetree' AND active
┌─formatReadableSize(sum(bytes))─┐
│ 126.18 GiB │
└────────────────────────────────┘
Among other things, you can run the OPTIMIZE query on MergeTree. But it’s not required since everything will be fine without it.
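For reference, the optional merge can be triggered like this (FINAL merges each partition down to a minimal number of parts):
OPTIMIZE TABLE trips_mergetree FINAL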
Alternatively, download the prepared partitions:
$ curl -O https://datasets.clickhouse.tech/trips_mergetree/partitions/trips_mergetree.tar
$ tar xvf trips_mergetree.tar -C /var/lib/clickhouse # path to ClickHouse data directory
$ # check permissions of unpacked data, fix if required
$ sudo service clickhouse-server restart
$ clickhouse-client --query "select count(*) from datasets.trips_mergetree"
Info: if you run the queries described below, you have to use the full table name, datasets.trips_mergetree.
Q1:
SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type
0.490 seconds.
Q2:
SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count
1.224 seconds.
Q3:
SELECT passenger_count, toYear(pickup_date) AS year, count(*) FROM trips_mergetree GROUP BY passenger_count, year
2.104 seconds.
Q4:
SELECT passenger_count, toYear(pickup_date) AS year, round(trip_distance) AS distance, count(*)
FROM trips_mergetree
GROUP BY passenger_count, year, distance
ORDER BY year, count(*) DESC
3.593 seconds.
The following server was used:
Two Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz, 16 physical cores in total, 128 GiB RAM, 8x6 TB HDD in hardware RAID-5
Execution time is the best of three runs. Starting from the second run, queries read data from the file system cache. No further caching occurs: the data is read and processed on each run.
Creating a table on three servers:
On each server:
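The DDL is not reproduced here. As a hedged sketch, assuming the local table is named trips_mergetree_third and repeats the full column list of trips_mergetree (take it from SHOW CREATE TABLE trips_mergetree on the source server):
CREATE TABLE default.trips_mergetree_third
(
    trip_id UInt32,
    vendor_id String,
    pickup_date Date,
    pickup_datetime DateTime
    -- ...paste the remaining column definitions of trips_mergetree here
) ENGINE = MergeTree(pickup_date, pickup_datetime, 8192)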
On the source server:
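A sketch of the Distributed table; the cluster name perftest is a placeholder for whatever cluster is configured in remote_servers on this server:
-- rand() is the sharding key, so rows are spread evenly across the three shards.
CREATE TABLE trips_mergetree_x3 AS trips_mergetree_third
ENGINE = Distributed(perftest, default, trips_mergetree_third, rand())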
The following query redistributes data:
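Assuming the table names from the sketches above:
-- Writes through the Distributed table are sharded across all three servers.
INSERT INTO trips_mergetree_x3 SELECT * FROM trips_mergetree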
This takes 2454 seconds.
On three servers:
Q1: 0.212 seconds.
Q2: 0.438 seconds.
Q3: 0.733 seconds.
Q4: 1.241 seconds.
No surprises here, since the queries scale linearly.
We also have the results from a cluster of 140 servers:
Q1: 0.028 sec.
Q2: 0.043 sec.
Q3: 0.051 sec.
Q4: 0.072 sec.