Preface

The MergeTree table engine is the most commonly used table engine in ClickHouse. A query must first read data from the table, and multi-threaded parallel reading, especially parallel reading from multiple disks, is without doubt the most efficient way to do so. This article introduces and analyzes the code that implements parallel reading from the MergeTree table engine.

Processor class for reading MergeTree tables

The entry point for reading MergeTree tables is MergeTreeDataSelectExecutor; every SELECT statement on a MergeTree table is executed through it. This executor generates the query plan step ReadFromMergeTree. Query plan steps are a higher-level abstraction of the query pipeline, from which the concrete query pipeline is generated.

The query step ReadFromMergeTree decides, based on the nature of the query and on whether it is expected to produce multiple data streams or a single one, whether to build a multi-stream or a single-stream query pipeline. A multi-stream pipeline is built by calling readFromPool(), while a single-stream pipeline is built by calling readInOrder().

Here are two examples of execution pipelines:

Multi-stream pipeline

DESKTOP-UBNBIFE.localdomain :) explain pipeline select * from events;

┌─explain───────────────────────┐
│ (Expression)                  │
│ ExpressionTransform × 4       │
│   (SettingQuotaAndLimits)     │
│     (ReadFromMergeTree)       │
│     MergeTreeThread × 4 0 → 1 │
└───────────────────────────────┘

MergeTreeThread is implemented by class MergeTreeThreadSelectProcessor.

Single data stream

DESKTOP-UBNBIFE.localdomain :) explain pipeline select * from events settings max_threads=1;

┌─explain────────────────────┐
│ (Expression)               │
│ ExpressionTransform        │
│   (SettingQuotaAndLimits)  │
│     (ReadFromMergeTree)    │
│     MergeTreeInOrder 0 → 1 │
└────────────────────────────┘

MergeTreeInOrder is implemented by the class MergeTreeInOrderSelectProcessor.

The explain results show that MergeTreeThread runs in four threads at the same time, while MergeTreeInOrder runs in a single thread. Since MergeTreeThread runs in multiple threads, it requires a task allocation mechanism, which is implemented by MergeTreeReadPool.

MergeTreeReadPool

MergeTreeReadPool takes the full set of read tasks at construction time and organizes and distributes them so that the work is spread evenly across reader threads. It is shared among multiple threads as a task pool.

/**   Provides read tasks for MergeTreeThreadSelectBlockInputStream`s in fine-grained batches, allowing for more
 *    uniform distribution of work amongst multiple threads. All parts and their ranges are divided into `threads`
 *    workloads with at most `sum_marks / threads` marks. Then, threads are performing reads from these workloads
 *    in "sequential" manner, requesting work in small batches. As soon as some thread has exhausted
 *    its workload, it either is signaled that no more work is available (`do_not_steal_tasks == true`) or
 *    continues taking small batches from other threads' workloads (`do_not_steal_tasks == false`).
 */
class MergeTreeReadPool : private boost::noncopyable
{
public:
    /** Pull could dynamically lower (backoff) number of threads, if read operation are too slow.
      * Settings for that backoff.
      */
    struct BackoffSettings
    {
        // ...
    };

    BackoffSettings backoff_settings;

private:
    /** State to track numbers of slow reads.
      */
    struct BackoffState
    {
        // ...
    };

    BackoffState backoff_state;

public:
    MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names);

    /** Each worker could call this method and pass information about read performance.
      * If read performance is too low, pool could decide to lower number of threads: do not assign more tasks to several threads.
      * This allows to overcome excessive load to disk subsystem, when reads are not from page cache.
      */
    void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info);

    Block getHeader() const;

private:
    std::vector<size_t> fillPerPartInfo(const RangesInDataParts & parts);

    void fillPerThreadInfo(
        const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
        const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);

    const MergeTreeData & data;
    StorageMetadataPtr metadata_snapshot;
    const Names column_names;
    bool do_not_steal_tasks;
    bool predict_block_size_bytes;
    std::vector<NameSet> per_part_column_name_set;
    std::vector<NamesAndTypesList> per_part_columns;
    std::vector<NamesAndTypesList> per_part_pre_columns;
    std::vector<char> per_part_should_reorder;
    std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
    PrewhereInfoPtr prewhere_info;

    struct Part
    {
        MergeTreeData::DataPartPtr data_part;
        size_t part_index_in_query;
    };

    std::vector<Part> parts_with_idx;

    struct ThreadTask
    {
        struct PartIndexAndRange
        {
            size_t part_idx;
            MarkRanges ranges;
        };

        std::vector<PartIndexAndRange> parts_and_ranges;
        std::vector<size_t> sum_marks_in_parts;
    };

    std::vector<ThreadTask> threads_tasks;

    std::set<size_t> remaining_thread_tasks;

    RangesInDataParts parts_ranges;

    mutable std::mutex mutex;

    Poco::Logger * log = &Poco::Logger::get("MergeTreeReadPool");

    std::vector<bool> is_part_on_remote_disk;

    bool shuffled_in_partition;
};
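
The BackoffSettings, BackoffState and profileFeedback() members above form a feedback loop: each worker reports how long its last read took and how many bytes it delivered, and if reads stay slow the pool stops assigning tasks to some of the threads, so an overloaded disk is not hit with even more concurrent reads. The following is a minimal standalone sketch of that idea; the names (SimpleBackoffPool, BackoffSettingsSketch) and the thresholds are illustrative, not the actual ClickHouse implementation.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>

// Standalone sketch of the backoff idea behind MergeTreeReadPool::profileFeedback().
// All names and thresholds here are illustrative, not the real ClickHouse settings.
struct BackoffSettingsSketch
{
    double min_read_latency_ms = 1000.0;          // reads faster than this never trigger backoff
    double max_throughput = 100.0 * 1024 * 1024;  // bytes/s considered "fast enough"
    size_t min_concurrency = 1;                   // never drop below this many active threads
    size_t min_events = 2;                        // consecutive slow reads needed to react
};

class SimpleBackoffPool
{
public:
    SimpleBackoffPool(size_t threads_, BackoffSettingsSketch settings_)
        : settings(settings_), active_threads(threads_) {}

    // Called by a worker after each read with (nanoseconds spent, bytes read).
    void profileFeedback(uint64_t read_ns, uint64_t read_bytes)
    {
        std::lock_guard<std::mutex> lock(mutex);

        if (active_threads <= settings.min_concurrency)
            return;

        double ms = read_ns / 1e6;
        double throughput = read_bytes / (read_ns / 1e9);

        // A read counts as "slow" only if it took long and delivered few bytes per second.
        if (ms < settings.min_read_latency_ms || throughput >= settings.max_throughput)
        {
            num_slow_events = 0;  // a fast read resets the counter
            return;
        }

        if (++num_slow_events >= settings.min_events)
        {
            --active_threads;     // stop assigning tasks to one more thread
            num_slow_events = 0;
            std::cout << "Backoff: lowering active threads to " << active_threads << '\n';
        }
    }

    size_t activeThreads() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return active_threads;
    }

private:
    const BackoffSettingsSketch settings;
    mutable std::mutex mutex;
    size_t active_threads;
    size_t num_slow_events = 0;
};

int main()
{
    SimpleBackoffPool pool(4, {});
    // Two consecutive slow reads (2 s each, ~1 MiB/s) trigger one backoff step.
    pool.profileFeedback(2'000'000'000ULL, 2 * 1024 * 1024);
    pool.profileFeedback(2'000'000'000ULL, 2 * 1024 * 1024);
    std::cout << "Active threads now: " << pool.activeThreads() << '\n';
}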

The important member variables and member functions are as follows:

Member variable parts_ranges: all the parts involved in the read operation and the range of marks within each part (the MergeTree table engine uses marks to index its column data files).
Member variable threads_tasks: the read tasks assigned to each thread.
Member variable data: a reference to the MergeTree storage (an IStorage object).
Member function fillPerPartInfo(): collects and organizes per-part information, such as the total number of marks, whether a part resides on a remote disk, and the number of marks in each part.
Member function fillPerThreadInfo(): distributes the read tasks according to the information collected by fillPerPartInfo(), keeping the workload of each thread as even as possible. It also tries to let a single thread read an entire part and to let different threads access different disks at the same time, so that reads proceed in parallel as much as possible and every thread is fully utilized.
Member function getTask(): each MergeTreeThread instance that reads data obtains its next task from the pool through this function.
Dependency order: fillPerPartInfo() collects and saves part information, fillPerThreadInfo() allocates reading tasks based on this information, and MergeTreeThread obtains tasks and executes them via the getTask() function.
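
To make the division and stealing logic concrete, here is a small self-contained model of what fillPerThreadInfo() and getTask() do: the total number of marks is split into roughly equal per-thread workloads, each getTask() call hands out at most min_marks_to_read marks from the calling thread's own workload, and a thread whose workload is exhausted steals a batch from another thread unless do_not_steal_tasks is set. The classes and names below (ReadPoolSketch, TaskSketch) are an illustration under those assumptions, not the ClickHouse code itself.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Standalone model of MergeTreeReadPool's task distribution (illustrative names only):
// marks are split into per-thread workloads of roughly sum_marks / threads marks, and
// getTask() hands them out in small batches, stealing from other threads when allowed.
struct TaskSketch { std::string part_name; size_t marks = 0; };

class ReadPoolSketch
{
public:
    // parts: (part name, number of marks in that part)
    ReadPoolSketch(size_t threads, std::vector<std::pair<std::string, size_t>> parts,
                   bool do_not_steal_tasks_ = false)
        : do_not_steal_tasks(do_not_steal_tasks_), per_thread(threads)
    {
        size_t sum_marks = 0;
        for (const auto & p : parts)
            sum_marks += p.second;

        // Fill thread workloads part by part, so one thread tends to get contiguous
        // pieces of the same part.
        const size_t marks_per_thread = (sum_marks + threads - 1) / threads;
        size_t thread = 0;
        for (auto & [name, marks_left] : parts)
        {
            while (marks_left > 0)
            {
                size_t & budget = per_thread[thread].budget;
                if (budget == 0)
                    budget = marks_per_thread;
                const size_t take = std::min(marks_left, budget);
                per_thread[thread].chunks.push_back({name, take});
                marks_left -= take;
                budget -= take;
                if (budget == 0 && thread + 1 < threads)
                    ++thread;
            }
        }
    }

    // Hand out at most min_marks_to_read marks, preferring the caller's own workload.
    std::optional<TaskSketch> getTask(size_t min_marks_to_read, size_t thread)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (auto task = takeFrom(thread, min_marks_to_read))
            return task;
        if (do_not_steal_tasks)
            return std::nullopt;
        // Own workload exhausted: steal a batch from any thread that still has work.
        for (size_t other = 0; other < per_thread.size(); ++other)
            if (auto task = takeFrom(other, min_marks_to_read))
                return task;
        return std::nullopt;
    }

private:
    struct ThreadWorkload { std::vector<TaskSketch> chunks; size_t budget = 0; };

    std::optional<TaskSketch> takeFrom(size_t thread, size_t min_marks_to_read)
    {
        auto & chunks = per_thread[thread].chunks;
        if (chunks.empty())
            return std::nullopt;
        TaskSketch & front = chunks.front();
        const size_t take = std::min(front.marks, min_marks_to_read);
        TaskSketch task{front.part_name, take};
        front.marks -= take;
        if (front.marks == 0)
            chunks.erase(chunks.begin());
        return task;
    }

    const bool do_not_steal_tasks;
    std::mutex mutex;
    std::vector<ThreadWorkload> per_thread;
};

int main()
{
    ReadPoolSketch pool(2, {{"part_1", 10}, {"part_2", 6}});
    // Thread 0 drains its own workload in batches of 4 marks, then steals thread 1's work.
    while (auto task = pool.getTask(/*min_marks_to_read=*/4, /*thread=*/0))
        std::cout << "thread 0 reads " << task->marks << " marks from " << task->part_name << '\n';
}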

Other processors that read the MergeTree

The class inheritance structure of processors that read the MergeTree is as follows:

MergeTreeBaseSelectProcessor
    |- (Processor: MergeTreeThread) MergeTreeThreadSelectProcessor
    |- (abstract class) MergeTreeSelectProcessor
        |- (Processor: MergeTreeReverse) MergeTreeReverseSelectProcessor
        |- (Processor: MergeTreeInOrder) MergeTreeInOrderSelectProcessor

MergeTreeBaseSelectProcessor is the base class of all these processors. MergeTreeReverseSelectProcessor is similar to MergeTreeInOrderSelectProcessor, except that it reads in reverse order.
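
The difference between the in-order and reverse processors is essentially the direction in which the mark ranges of a part are consumed. A minimal sketch of that difference, using a made-up MarkRangeSketch type rather than the real MarkRange class:

#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical stand-in for a [begin, end) range of marks within one part.
struct MarkRangeSketch { size_t begin = 0; size_t end = 0; };

// MergeTreeInOrderSelectProcessor walks the ranges of a part front to back ...
void readInOrder(const std::vector<MarkRangeSketch> & ranges)
{
    for (const auto & r : ranges)
        std::cout << "read marks [" << r.begin << ", " << r.end << ")\n";
}

// ... while MergeTreeReverseSelectProcessor walks them back to front (and also
// reverses the rows inside each range before emitting them).
void readReverse(const std::vector<MarkRangeSketch> & ranges)
{
    for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
        std::cout << "read marks [" << it->begin << ", " << it->end << ") reversed\n";
}

int main()
{
    std::vector<MarkRangeSketch> ranges{{0, 4}, {8, 12}};
    readInOrder(ranges);
    readReverse(ranges);
}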

Optimization options that affect the execution pipeline

optimize_read_in_order - when the query's ORDER BY matches the sorting key specified when the MergeTree table was created, read with MergeTreeInOrder (or MergeTreeReverse) instead of the multi-threaded MergeTreeThread path.

optimize_aggregation_in_order - when the query's GROUP BY matches the sorting key specified when the MergeTree table was created, also read with MergeTreeInOrder so that aggregation can be performed in sorted order.

Other processors that read MergeTree

MergeTreeSequentialSource is a lightweight class for reading a single part. It is currently used only by the MergeTask class, which performs the background merge operation.

/// Lightweight (in terms of logic) stream for reading single part from MergeTree
class MergeTreeSequentialSource : public SourceWithProgress
{
public:
    MergeTreeSequentialSource(
        const MergeTreeData & storage_,
        const StorageMetadataPtr & metadata_snapshot_,
        MergeTreeData::DataPartPtr data_part_,
        Names columns_to_read_,
        bool read_with_direct_io_,
        bool take_column_types_from_storage,
        bool quiet = false);

    ~MergeTreeSequentialSource() override;

    String getName() const override { return "MergeTreeSequentialSource"; }

    size_t getCurrentMark() const { return current_mark; }

    size_t getCurrentRow() const { return current_row; }

protected:
    Chunk generate() override;

private:

    const MergeTreeData & storage;
    StorageMetadataPtr metadata_snapshot;

    /// Data part will not be removed if the pointer owns it
    MergeTreeData::DataPartPtr data_part;

    /// Columns we have to read (each Block from read will contain them)
    Names columns_to_read;

    /// Should read using direct IO
    bool read_with_direct_io;

    Poco::Logger * log = &Poco::Logger::get("MergeTreeSequentialSource");

    std::shared_ptr<MarkCache> mark_cache;
    using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
    MergeTreeReaderPtr reader;

    /// current mark at which we stop reading
    size_t current_mark = 0;

    /// current row at which we stop reading
    size_t current_row = 0;

private:
    /// Closes readers and unlock part locks
    void finish();
};