Timeseries Data

    To deal with a time-series workload, a single-node PostgreSQL database would typically use table partitioning to break a big table of time-ordered data into multiple inherited tables, each containing a different time range.

    Storing data in multiple physical tables speeds up data expiration. In a single big table, deleting rows incurs the cost of scanning the table to find the rows to delete, and then vacuuming the emptied space. Dropping a partition, on the other hand, is a fast operation whose cost is independent of data size: it is the equivalent of simply removing the files on disk that contain the data.
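
    As a sketch of the difference, assume a hypothetical events table that is range-partitioned by month on created_at (the table and partition names here are illustrative, not from the example later in this section):

    -- expiring a month of data from one big, unpartitioned table:
    -- every matching row must be found, deleted, and later vacuumed
    DELETE FROM events WHERE created_at < '2024-01-01';
    VACUUM events;

    -- expiring the same month when it lives in its own partition:
    -- a quick metadata operation that removes the partition's files on disk
    DROP TABLE events_p2023_12;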

    Partitioning a table also makes indices smaller and faster within each date range. Queries operating on recent data are likely to operate on “hot” indices that fit in memory. This speeds up reads.

    (Figure: selecting from a big table vs. selecting from a smaller partition)

    Inserts also have smaller indices to update, so they run faster too.
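
    For instance, an index declared on the partitioned parent is created separately on every partition, so each physical index covers only one time range (again using the hypothetical events table from the sketch above):

    -- Postgres creates a matching index on every partition; the index on the
    -- newest partition stays small and is likely to remain in memory
    CREATE INDEX events_created_at_idx ON events (created_at);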

    Time-based partitioning makes most sense when:

    1. Most queries access a very small subset of the most recent data

    2. Older data is periodically expired (deleted/dropped)

    Keep in mind that, in the wrong situation, reading all of these partitions adds more overhead than it saves. In the right situation, however, it is quite helpful: for example, when keeping a year of time series data and regularly querying only the most recent week.
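
    As a sketch of that situation, a query restricted to one recent week only touches the partitions that overlap that week; the planner prunes the rest (hypothetical events table again):

    -- only the partitions covering the requested week are scanned;
    -- all older partitions are pruned from the plan
    EXPLAIN (COSTS OFF)
    SELECT count(*)
    FROM events
    WHERE created_at >= '2024-06-01' AND created_at < '2024-06-08';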

    (Figure: shards of partitions)

    For example, let’s distribute and partition a table holding historical GitHub events data.

    Each record in this GitHub data set represents an event created in GitHub, along with key information regarding the event such as event type, creation date, and the user who created the event.

    The first step is to create and partition the table by time as we would in a single-node PostgreSQL database:
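
    A sketch of a suitable table definition is below; the column list beyond repo_id and created_at is illustrative and may differ from the actual dataset, but the PARTITION BY clause must be part of the CREATE TABLE statement itself:

    CREATE TABLE github_events (
        event_id bigint,
        event_type text,
        event_public boolean,
        repo_id bigint,
        payload jsonb,
        actor jsonb,
        org jsonb,
        created_at timestamp
    ) PARTITION BY RANGE (created_at);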

    Notice the PARTITION BY RANGE (created_at) clause in the CREATE TABLE statement. This tells Postgres that the table will be partitioned by the created_at column in ordered ranges. We have not yet created any partitions for specific ranges, though.

    Before creating specific partitions, let’s distribute the table in Citus. We’ll shard by repo_id, meaning the events will be clustered into shards per repository.

    SELECT create_distributed_table('github_events', 'repo_id');

    At this point Citus has created shards for this table across the worker nodes. Internally each shard is a table named github_events_N, where N is the shard identifier. Citus also propagated the partitioning information, so each of these shards has Partition key: RANGE (created_at) declared.
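
    One way to see the placements (assuming Citus 10 or later, where the citus_shards view is available) is to query the coordinator:

    -- each row is one shard of github_events and the worker node that holds it
    SELECT shard_name, nodename, nodeport
    FROM citus_shards
    WHERE table_name = 'github_events'::regclass;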

    A partitioned table cannot directly contain data; it is more like a view across its partitions. Thus the shards are not yet ready to hold data. We need to create partitions and specify their time ranges, after which we can insert data that matches those ranges.
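
    These partitions are ordinary Postgres partitions, so a single month could be added by hand with plain DDL, for example (the helper function described next automates statements like this):

    -- one monthly partition created manually
    CREATE TABLE github_events_p2021_10
        PARTITION OF github_events
        FOR VALUES FROM ('2021-10-01') TO ('2021-11-01');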

    Citus provides helper functions for partition management. We can create a batch of monthly partitions using create_time_partitions():

    SELECT create_time_partitions(
        table_name         := 'github_events',
        partition_interval := '1 month',
        end_at             := now() + '12 months'
    );

    Citus also includes a view, time_partitions, for an easy way to investigate the partitions it has created.

    SELECT partition
    FROM time_partitions
    WHERE parent_table = 'github_events'::regclass;

    ┌────────────────────────┐
    │       partition        │
    ├────────────────────────┤
    │ github_events_p2021_10 │
    │ github_events_p2021_11 │
    │ github_events_p2021_12 │
    │ github_events_p2022_01 │
    │ github_events_p2022_02 │
    │ github_events_p2022_03 │
    │ github_events_p2022_04 │
    │ github_events_p2022_05 │
    │ github_events_p2022_06 │
    │ github_events_p2022_07 │
    │ github_events_p2022_08 │
    │ github_events_p2022_09 │
    │ github_events_p2022_10 │
    └────────────────────────┘

    As time progresses, you will need to do some maintenance to create new partitions and drop old ones. It’s best to set up a periodic job to run the maintenance functions with an extension like pg_cron:

    -- set two monthly cron jobs:

    -- 1. ensure we have partitions for the next 12 months
    SELECT cron.schedule('create-partitions', '0 0 1 * *', $$
      SELECT create_time_partitions(
          table_name         := 'github_events',
          partition_interval := '1 month',
          end_at             := now() + '12 months'
      )
    $$);

    -- 2. (optional) ensure we never have more than one year of data
    SELECT cron.schedule('drop-partitions', '0 0 1 * *', $$
      CALL drop_old_time_partitions(
          'github_events',
          now() - interval '12 months' /* older_than */
      );
    $$);

    Note

    Be aware that native partitioning in Postgres is still quite new and has a few quirks. Maintenance operations on partitioned tables acquire aggressive locks that can briefly stall queries. There is currently a lot of work going on in the Postgres community to resolve these issues, so expect time partitioning in Postgres to only get better.

    Some applications have data that logically divides into a small updatable part and a larger part that’s “frozen.” Examples include logs, clickstreams, or sales records. In this case we can combine partitioning with columnar storage (introduced in Citus 10) to compress historical partitions on disk. Citus columnar tables are currently append-only, meaning they do not support updates or deletes, but we can use them for the immutable historical partitions.

    A partitioned table may be made up of any combination of row and columnar partitions. When using range partitioning on a timestamp key, we can make the newest partition a row table, and periodically roll it over into a historical columnar partition once its range has filled.

    Let’s see an example, using GitHub events again. We’ll create a new table called github_columnar_events for disambiguation from the earlier example. To focus entirely on the columnar storage aspect, we won’t distribute this table.

    Next, download the sample data (github_events.csv), then create the table, create its partitions, and load the data:

    -- our new table, same structure as the example in
    -- the previous section
    CREATE TABLE github_columnar_events ( LIKE github_events )
    PARTITION BY RANGE (created_at);

    -- create partitions to hold two hours of data each
    SELECT create_time_partitions(
        table_name         := 'github_columnar_events',
        partition_interval := '2 hours',
        start_from         := '2015-01-01 00:00:00',
        end_at             := '2015-01-01 08:00:00'
    );

    -- fill with sample data
    -- (note that this data requires the database to have UTF8 encoding)
    \COPY github_columnar_events FROM 'github_events.csv' WITH (format CSV)

    -- list the partitions, and confirm they're
    -- using row-based storage (heap access method)
    SELECT partition, access_method
    FROM time_partitions
    WHERE parent_table = 'github_columnar_events'::regclass;
    ┌──────────────────────────────────────────┬───────────────┐
    │                partition                 │ access_method │
    ├──────────────────────────────────────────┼───────────────┤
    │ github_columnar_events_p2015_01_01_0000 │ heap          │
    │ github_columnar_events_p2015_01_01_0200 │ heap          │
    │ github_columnar_events_p2015_01_01_0400 │ heap          │
    │ github_columnar_events_p2015_01_01_0600 │ heap          │
    └──────────────────────────────────────────┴───────────────┘
    -- convert older partitions to use columnar storage
    CALL alter_old_partitions_set_access_method(
        'github_columnar_events',
        '2015-01-01 06:00:00' /* older_than */,
        'columnar'
    );

    -- latest uses row storage and can be updated
    SELECT partition, access_method
    FROM time_partitions
    WHERE parent_table = 'github_columnar_events'::regclass;
    ┌──────────────────────────────────────────┬───────────────┐
    │                partition                 │ access_method │
    ├──────────────────────────────────────────┼───────────────┤
    │ github_columnar_events_p2015_01_01_0000 │ columnar      │
    │ github_columnar_events_p2015_01_01_0200 │ columnar      │
    │ github_columnar_events_p2015_01_01_0400 │ columnar      │
    │ github_columnar_events_p2015_01_01_0600 │ heap          │
    └──────────────────────────────────────────┴───────────────┘

    To see the compression ratio for a columnar table, use VACUUM VERBOSE. The compression ratio for our three columnar partitions is pretty good:
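
    For example, running it against the partitioned parent reports statistics for each columnar partition:

    VACUUM VERBOSE github_columnar_events;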

    INFO: statistics for "github_columnar_events_p2015_01_01_0000":
    storage id: 10000000003
    total file size: 4481024, total data size: 4444425
    compression rate: 8.31x
    total row count: 15129, stripe count: 1, average rows per stripe: 15129
    chunk count: 18, containing data for dropped columns: 0, zstd compressed: 18

    INFO: statistics for "github_columnar_events_p2015_01_01_0200":
    storage id: 10000000004
    total file size: 3579904, total data size: 3548221
    compression rate: 8.26x
    total row count: 12714, stripe count: 1, average rows per stripe: 12714
    chunk count: 18, containing data for dropped columns: 0, zstd compressed: 18

    INFO: statistics for "github_columnar_events_p2015_01_01_0400":
    storage id: 10000000005
    total file size: 2949120, total data size: 2917407
    compression rate: 8.51x
    total row count: 11756, stripe count: 1, average rows per stripe: 11756
    chunk count: 18, containing data for dropped columns: 0, zstd compressed: 18

    One benefit of the partitioned table github_columnar_events is that it can be queried in its entirety like a normal table.

    SELECT COUNT(DISTINCT repo_id)
    FROM github_columnar_events;

    ┌───────┐
    │ count │
    ├───────┤
    │ 16001 │
    └───────┘

    Entries can be updated or deleted, as long as there’s a WHERE clause on the partition key which filters entirely into row table partitions.
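
    For instance, a delete restricted to the newest (heap) partition succeeds, because partition pruning keeps the statement away from the columnar partitions (a sketch against the table above):

    -- touches only the row-based partition covering 06:00-08:00;
    -- a broader WHERE clause reaching a columnar partition would error out
    DELETE FROM github_columnar_events
    WHERE created_at >= '2015-01-01 06:00:00';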

    When a row partition has filled its range, you can archive it to compressed columnar storage. We can automate this with pg_cron like so:

    -- a monthly cron job
    SELECT cron.schedule('compress-partitions', '0 0 1 * *', $$
      CALL alter_old_partitions_set_access_method(
          'github_columnar_events',
          now() - interval '6 months' /* older_than */,
          'columnar'
      );
    $$);

    For more information, see Columnar Storage.