Query Processing

    Citus’s query processing pipeline involves two components:

    • Distributed Query Planner and Executor

    • PostgreSQL Planner and Executor

    We discuss them in greater detail in the subsequent sections.

    Citus’s distributed query planner takes in a SQL query and plans it for distributed execution.

    The planner breaks the query into two parts: the coordinator query, which runs on the coordinator, and the worker query fragments, which run on individual shards on the workers. The planner then assigns these query fragments to the workers so that all their resources are used efficiently. After this step, the distributed query plan is passed on to the distributed executor for execution.
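    The split between worker query fragments and the coordinator query can be illustrated with an average, which cannot be computed by simply averaging per-shard averages. The following is a minimal sketch and not Citus code: the function names and the in-memory "shards" are assumptions made for illustration. Each fragment returns a partial sum and count, and the coordinator step combines them.

```python
def worker_fragment(shard_values):
    """Hypothetical worker fragment: a partial aggregate over one shard,
    comparable to SELECT sum(x), count(x) on the shard table."""
    return sum(shard_values), len(shard_values)


def coordinator_query(partials):
    """Hypothetical coordinator query: combine the per-shard partials
    into the final average."""
    total = sum(partial_sum for partial_sum, _ in partials)
    count = sum(partial_count for _, partial_count in partials)
    return total / count if count else None


# Three in-memory lists stand in for three shards of one distributed table.
shards = [[10, 20], [30], [40, 50, 60]]
partials = [worker_fragment(values) for values in shards]
average = coordinator_query(partials)
```

    Only the small partial results travel back to the coordinator, never the rows themselves.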

    The planning process for key-value lookups on the distribution column or modification queries is slightly different as they hit exactly one shard. Once the planner receives an incoming query, it needs to decide the correct shard to which the query should be routed. To do this, it extracts the distribution column in the incoming row and looks up the metadata to determine the right shard for the query. Then, the planner rewrites the SQL of that command to reference the shard table instead of the original table. This re-written plan is then passed to the distributed executor.
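    The routing steps above (extract the distribution column value, look up the shard in the metadata, rewrite the table name) can be sketched as follows. This is an illustration under stated assumptions, not Citus's implementation: `zlib.crc32` stands in for the real hash function, the metadata is a plain list of hash ranges, the shard IDs are made up, and all function names are hypothetical.

```python
import re
import zlib

HASH_SPACE = 2**32  # crc32 yields an unsigned 32-bit value (stand-in hash)


def build_shard_ranges(table, shard_count, first_shard_id):
    """Hypothetical metadata: split the hash space into equal ranges,
    each mapped to a shard table named <table>_<shard id>."""
    span = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        low = i * span
        high = HASH_SPACE - 1 if i == shard_count - 1 else (i + 1) * span - 1
        ranges.append((low, high, f"{table}_{first_shard_id + i}"))
    return ranges


def find_shard(value, ranges):
    """Hash the distribution column value and look up the matching range."""
    hashed = zlib.crc32(str(value).encode("utf-8"))
    for low, high, shard_table in ranges:
        if low <= hashed <= high:
            return shard_table
    raise LookupError(f"no shard covers hash {hashed}")


def route_query(sql, table, value, ranges):
    """Rewrite the SQL so it references the shard table, not the original."""
    shard_table = find_shard(value, ranges)
    return re.sub(rf"\b{re.escape(table)}\b", shard_table, sql)


ranges = build_shard_ranges("page_views", 4, 102008)
routed = route_query(
    "SELECT * FROM page_views WHERE page_id = 42", "page_views", 42, ranges
)
```

    Because the same value always hashes to the same range, every lookup or modification for a given key is routed to the same shard.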

    Citus’s distributed executor runs distributed query plans and handles failures. The executor is well suited for getting fast responses to queries involving filters, aggregations and co-located joins, as well as running single-tenant queries with full SQL coverage. It opens one connection per shard to the workers as needed and sends all fragment queries to them. It then fetches the results from each fragment query, merges them, and gives the final results back to the user.
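    The fan-out and merge behavior described above can be approximated with a thread pool. This is a rough sketch only: the threads stand in for worker connections, the filter stands in for a fragment query, and every name here is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def run_fragment(shard):
    """Hypothetical fragment query: filter one shard's rows,
    comparable to SELECT value FROM <shard table> WHERE value > 10."""
    _shard_name, rows = shard
    return [value for value in rows if value > 10]


def execute(shards, max_connections=4):
    """Send one fragment per shard concurrently, then merge the results."""
    with ThreadPoolExecutor(max_workers=max_connections) as pool:
        # pool.map returns results in the order of the input shards.
        per_shard = list(pool.map(run_fragment, shards))
    return [row for rows in per_shard for row in rows]


shards = [("t_1", [5, 12]), ("t_2", [30]), ("t_3", [1, 2])]
merged = execute(shards)
```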

    If necessary, Citus can gather results from subqueries and CTEs into the coordinator node and then push them back across workers for use by an outer query. This allows Citus to support a greater variety of SQL constructs.

    For example, a subquery in a WHERE clause sometimes cannot be executed inline with the main query and must run separately. Suppose a web analytics application maintains a table partitioned by page_id. To query the number of visitor hosts on the top twenty most visited pages, we can use a subquery to find the list of pages, then an outer query to count the hosts.

    The executor would like to run a fragment of this query against each shard by page_id, counting distinct host_ips, and combine the results on the coordinator. However, the LIMIT in the subquery means the subquery cannot be executed as part of the fragment. By recursively planning the query, Citus can run the subquery separately, push the results to all workers, run the main fragment query, and pull the results back to the coordinator. This “push-pull” design supports subqueries like the one above.
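    The push-pull sequence can be simulated in a few lines. The sketch below is an assumption-laden illustration, not Citus internals: shards are in-memory lists of (page_id, host_ip) pairs, the function names are hypothetical, and the query shape in the comment is reconstructed from the description above.

```python
from collections import Counter

# Assumed query shape: count distinct host_ip per page_id, restricted to
# page_id IN (top pages by view count, LIMIT n).


def subquery_fragment(shard_rows):
    """Per-shard part of the subquery: view counts per page_id."""
    return Counter(page_id for page_id, _host_ip in shard_rows)


def plan_subquery(shards, limit):
    """Coordinator merges per-shard counts, sorts descending, applies LIMIT."""
    counts = Counter()
    for rows in shards:
        counts.update(subquery_fragment(rows))
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return {page_id for page_id, _count in ranked[:limit]}


def main_fragment(shard_rows, intermediate_result):
    """Per-shard main query: distinct hosts for pages in the pushed result."""
    hosts = {}
    for page_id, host_ip in shard_rows:
        if page_id in intermediate_result:
            hosts.setdefault(page_id, set()).add(host_ip)
    return {page_id: len(ips) for page_id, ips in hosts.items()}


def push_pull(shards, limit):
    intermediate = plan_subquery(shards, limit)  # run the subquery separately
    result = {}
    for rows in shards:  # "push" the intermediate result to every shard
        result.update(main_fragment(rows, intermediate))  # "pull" results back
    return result


shards = [
    [(1, "a"), (1, "b"), (1, "a")],
    [(2, "a")],
    [(3, "a"), (3, "c")],
]
hosts_per_page = push_pull(shards, limit=2)
```

    Merging with `update` is safe here only because the table is partitioned by page_id, so each page lives on exactly one shard.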

    The EXPLAIN output for this query shows the push-pull execution at work:

    GroupAggregate (cost=0.00..0.00 rows=0 width=0)
      Group Key: remote_scan.page_id
      -> Sort (cost=0.00..0.00 rows=0 width=0)
        Sort Key: remote_scan.page_id
        -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
          -> Distributed Subplan 6_1
            -> Limit (cost=0.00..0.00 rows=0 width=0)
              -> Sort (cost=0.00..0.00 rows=0 width=0)
                Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
                -> HashAggregate (cost=0.00..0.00 rows=0 width=0)
                  -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
                    Task Count: 32
                    Tasks Shown: One of 32
                    Node: host=localhost port=9701 dbname=postgres
                    -> HashAggregate (cost=54.70..56.70 rows=200 width=12)
                      Group Key: page_id
                      -> Seq Scan on page_views_102008 page_views (cost=0.00..43.47 rows=2247 width=4)
          Task Count: 32
          Tasks Shown: One of 32
          -> Task
            Node: host=localhost port=9701 dbname=postgres
            -> HashAggregate (cost=84.50..86.75 rows=225 width=36)
              Group Key: page_views.page_id, page_views.host_ip
              -> Hash Join (cost=17.00..78.88 rows=1124 width=36)
                Hash Cond: (page_views.page_id = intermediate_result.page_id)
                -> Hash (cost=14.50..14.50 rows=200 width=4)
                  -> HashAggregate (cost=12.50..14.50 rows=200 width=4)
                    -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..10.00 rows=1000 width=4)

    Let’s break it apart and examine each piece.

    The root of the tree is what the coordinator node does with the results from the workers. In this case it is grouping them, and GroupAggregate requires they be sorted first.

    -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
      -> Distributed Subplan 6_1
    ...

    The custom scan has two large sub-trees, starting with a “distributed subplan.”

    Worker nodes run the subplan's task for each of the thirty-two shards (Citus shows one representative task). We can recognize all the pieces of the IN (…) subquery: the sorting, grouping, and limiting. When all workers have completed this query, they send their output back to the coordinator, which puts it together as “intermediate results.”

    Task Count: 32
    Tasks Shown: One of 32
    -> Task
      Node: host=localhost port=9701 dbname=postgres
      -> HashAggregate (cost=84.50..86.75 rows=225 width=36)
        Group Key: page_views.page_id, page_views.host_ip
        -> Hash Join (cost=17.00..78.88 rows=1124 width=36)
          Hash Cond: (page_views.page_id = intermediate_result.page_id)
    ...

    Citus starts another executor job for this second subtree. It counts distinct hosts in page_views, using a JOIN against the intermediate results to restrict the count to the top twenty pages.

    The worker internally retrieves intermediate results using a read_intermediate_result function which loads data from a file that was copied in from the coordinator node.

    Once the distributed executor sends the query fragments to the workers, they are processed like regular PostgreSQL queries. The PostgreSQL planner on each worker chooses the best plan it can find for executing the fragment locally on the corresponding shard table. The PostgreSQL executor then runs that query and returns the results to the distributed executor. You can learn more about the PostgreSQL planner and executor from the PostgreSQL manual. Finally, the distributed executor passes the results to the coordinator for final aggregation.