अभिव्यक्ति

Building a simple Query Optimizer from scratch: Part 5

Going Deeper: Histograms, DP Join Ordering, and Cost Calibration

This is Part 5 in a 6-part series on building a query optimizer from scratch in Java. Part 4 built the cost model and cardinality estimator.

In the last post we built a cost model that uses operator-specific formulas and a cardinality estimator that uses NDV-based selectivity. It works, but it has three significant weaknesses. Range predicates get a flat 0.33 selectivity regardless of the actual data distribution. Join ordering uses a greedy heuristic that can miss the globally optimal plan. And the cost constants are arbitrary numbers unrelated to actual hardware performance.

This post tackles all three. We'll implement equi-depth histograms for accurate selectivity estimation, dynamic programming join ordering for provably optimal join plans (for small numbers of tables), and hardware microbenchmarks to calibrate cost constants against real measurements.

Equi-depth histograms

The 0.33 default for range selectivity is a blunt instrument. If a price column ranges from 10 to 1000 and you filter WHERE price > 900, the true selectivity might be 5% - but our estimator says 33%. That's an error of more than 6x, which compounds through the plan tree and can lead to terrible join ordering decisions.

Histograms fix this by capturing the actual data distribution. An equi-depth histogram divides the data into buckets where each bucket contains approximately the same number of rows. Within each bucket, we assume uniform distribution - a much better assumption within a narrow range than across the entire column.

The Bucket structure

public class Histogram<T extends Comparable<? super T>> {
    private final String columnName;
    private final DataType dataType;
    private final List<Bucket<T>> buckets;
    private final long totalRows;

    public record Bucket<T extends Comparable<? super T>>(
        T lowerBound,      // Inclusive lower bound
        T upperBound,      // Inclusive upper bound
        long rowCount,     // Rows in this bucket
        long distinctCount // Distinct values in this bucket
    ) {
        public boolean contains(T value) {
            return lowerBound.compareTo(value) <= 0
                && upperBound.compareTo(value) >= 0;
        }

        public double density() {
            return distinctCount > 0
                ? (double) rowCount / distinctCount
                : 0.0;
        }
    }
}

Each bucket knows its value range, the number of rows it contains, and how many distinct values those rows represent. The density() - rows per distinct value - is used for equality estimation within a bucket. A bucket with 100 rows and 20 distinct values has a density of 5.0, meaning each distinct value appears roughly 5 times.

Building histograms

The Histogram.build static method creates a histogram from raw column values:

public static <T extends Comparable<? super T>> Histogram<T> build(
        String columnName, DataType dataType,
        List<Object> values, int numBuckets) {

    // Filter nulls and sort
    List<T> sorted = new ArrayList<>();
    for (Object val : values) {
        if (val instanceof Comparable<?>)
            sorted.add((T) val);
    }
    Collections.sort(sorted);

    long totalRows = sorted.size();
    int targetBucketSize = (int) Math.ceil((double) totalRows / numBuckets);

    // Build buckets
    List<Bucket<T>> buckets = new ArrayList<>();
    int startIdx = 0;

    while (startIdx < sorted.size()) {
        int endIdx = getEndIdx(startIdx, targetBucketSize, sorted);

        T lowerBound = sorted.get(startIdx);
        T upperBound = sorted.get(endIdx - 1);

        Set<T> distinctValues = new HashSet<>(
            sorted.subList(startIdx, endIdx));

        buckets.add(new Bucket<>(
            lowerBound, upperBound,
            endIdx - startIdx, distinctValues.size()));

        startIdx = endIdx;
    }

    return new Histogram<>(columnName, dataType,
        buckets.size(), buckets, totalRows);
}

The algorithm is straightforward: sort all values, then sweep through in chunks of targetBucketSize. But there's a subtlety in getEndIdx:

private static <T extends Comparable<? super T>> int getEndIdx(
        int startIdx, int targetBucketSize, List<T> values) {
    int endIdx = Math.min(startIdx + targetBucketSize, values.size());

    // Don't split equal values across buckets
    if (endIdx < values.size()) {
        T endValue = values.get(endIdx - 1);
        while (endIdx < values.size()
                && values.get(endIdx).equals(endValue)) {
            endIdx++;
        }
    }
    return endIdx;
}

If the boundary falls in the middle of a run of identical values, we extend the bucket to include all of them. This prevents a value from appearing in two buckets, which would make equality estimation ambiguous. The trade-off is that buckets may be slightly unequal in size - but the alternative (splitting identical values) would violate the correctness of the estimation methods.

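For example, take the values [1, 2, 2, 3, 3, 3, 4, 5, 5, 6] with 3 requested buckets. The target size is ceil(10 / 3) = 4 rows, so the first bucket initially ends at the first 3. The boundary rule then pulls in the remaining two 3s, and the builder produces:

Bucket 1: [1, 3]  - 6 rows, 3 distinct
Bucket 2: [4, 6]  - 4 rows, 3 distinct

Bucket 1 absorbed all three copies of 3 rather than splitting them, which left only four rows for a second bucket. The builder therefore returns two buckets instead of the three requested: numBuckets is an upper bound, not a guarantee.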
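
To check a trace like this without the rest of the optimizer, here is a minimal sketch of the same sweep specialized to integers. EquiDepthSketch and IntBucket are hypothetical names used only for this illustration; the target size and the boundary rule follow Histogram.build and getEndIdx as shown above. It prints one line per bucket so the boundaries can be compared with a hand trace.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

// Hypothetical, simplified stand-in for Histogram.build, specialized to ints.
class EquiDepthSketch {
    record IntBucket(int lower, int upper, int rows, int distinct) {}

    static List<IntBucket> build(List<Integer> values, int numBuckets) {
        List<Integer> sorted = new ArrayList<>(values);
        sorted.sort(null); // null comparator = natural ordering
        int target = (int) Math.ceil((double) sorted.size() / numBuckets);

        List<IntBucket> buckets = new ArrayList<>();
        int start = 0;
        while (start < sorted.size()) {
            int end = Math.min(start + target, sorted.size());
            // Extend the bucket so a run of equal values is never split.
            while (end < sorted.size()
                    && sorted.get(end).equals(sorted.get(end - 1))) {
                end++;
            }
            int distinct = new HashSet<>(sorted.subList(start, end)).size();
            buckets.add(new IntBucket(
                sorted.get(start), sorted.get(end - 1), end - start, distinct));
            start = end;
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 2, 3, 3, 3, 4, 5, 5, 6);
        for (IntBucket b : build(values, 3)) {
            System.out.println(b);
        }
    }
}
```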

The HistogramBuilder

Histograms are built automatically during statistics collection for numeric columns:

public class HistogramBuilder {
    private static final int DEFAULT_NUM_BUCKETS = 10;

    public static <T extends Comparable<? super T>> void buildHistograms(
            TableMetadata table, int numBuckets) {
        Schema schema = table.getSchema();
        List<Map<Schema.Column, Object>> data = table.getData();

        for (int colIdx = 0; colIdx < schema.columnCount(); colIdx++) {
            Schema.Column column = schema.getColumn(colIdx);

            if (column.type() == DataType.INTEGER
                    || column.type() == DataType.FLOAT) {
                Histogram<T> histogram = buildHistogramForColumn(
                    column.name(), column.type(),
                    data, colIdx, numBuckets);
                table.addHistogram(histogram);
            }
        }
    }
}

VARCHAR columns are skipped - string histograms are possible (PostgreSQL, for example, collects histogram bounds for any column type that has an ordering, text included) but add complexity without much benefit for our optimizer.

Equality estimation with histograms

With histograms, equality estimation becomes much more precise than the global 1/NDV approach:

public double estimateEquality(Object value) {
    Bucket<T> bucket = findBucket((T) value);
    if (bucket == null) return 0.0;  // Value outside histogram range

    if (bucket.distinctCount == 0) return 0.0;

    // Uniform assumption *within the bucket*
    return bucket.density() / totalRows;
}

The formula is (rows_per_distinct_value_in_bucket) / total_rows. If a bucket has 100 rows and 20 distinct values, the estimated selectivity for any value in that bucket is (100/20) / total_rows = 5 / total_rows. This is much more accurate than global 1/NDV when the data is skewed - a bucket covering popular values will have higher density than a bucket covering rare values.
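
To make the difference concrete, the sketch below compares the two estimates on made-up numbers: a 1,000-row column with 100 distinct values overall, one "hot" bucket (100 rows, 2 distinct values) and one "cold" bucket (100 rows, 50 distinct values). The class and method names are hypothetical; only the two formulas come from the text.

```java
// Illustrative comparison of global 1/NDV vs per-bucket density estimates.
class EqualitySelectivitySketch {
    // Global estimate: every distinct value is assumed equally frequent.
    static double globalEstimate(long ndv) {
        return 1.0 / ndv;
    }

    // Histogram estimate: (rows per distinct value in the bucket) / total rows.
    static double bucketEstimate(long bucketRows, long bucketDistinct, long totalRows) {
        if (bucketDistinct == 0) return 0.0;
        double density = (double) bucketRows / bucketDistinct;
        return density / totalRows;
    }

    public static void main(String[] args) {
        long totalRows = 1_000;
        System.out.printf("global 1/NDV: %.4f%n", globalEstimate(100));
        System.out.printf("hot bucket:   %.4f%n", bucketEstimate(100, 2, totalRows));
        System.out.printf("cold bucket:  %.4f%n", bucketEstimate(100, 50, totalRows));
    }
}
```

With these assumed numbers the global estimate is 1%, while the histogram gives 5% for a value in the hot bucket and 0.2% for one in the cold bucket, a 25x spread that the global figure cannot express.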

Range estimation with histograms

Range estimation is where histograms really shine. For column > value, we iterate through the buckets:

public double estimateGreaterThan(Object val, boolean inclusive) {
    double selectivity = 0.0;
    // The threshold is the same for every bucket, so convert it once
    double v = ((Number) val).doubleValue();

    for (Bucket<T> bucket : buckets) {
        double upper = ((Number) bucket.upperBound()).doubleValue();
        double lower = ((Number) bucket.lowerBound()).doubleValue();

        if (v < upper || (inclusive && v == upper)) {
            if (v <= lower) {
                // Entire bucket qualifies
                selectivity += (double) bucket.rowCount / totalRows;
            } else {
                // Partial bucket - linear interpolation.
                // When v == upper (inclusive case) the fraction is 0, so rows
                // equal to the upper bound are not counted by this branch.
                double fraction = (upper - v) / (upper - lower);
                selectivity += fraction * bucket.rowCount / totalRows;
            }
        }
    }

    return Math.min(1.0, selectivity);
}

Buckets that are entirely above the threshold contribute their full row count. Buckets that are entirely below contribute nothing. A bucket that straddles the threshold uses linear interpolation to estimate the fraction above. This is the uniform-within-bucket assumption in action - within a bucket's range, we assume values are evenly spread.
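
As a worked example, the sketch below runs the same loop on a hypothetical skewed price column: 1,000 rows in four equi-depth buckets of 250 rows each, with most prices clustered at the low end. The bucket boundaries are invented for illustration, and RangeSelectivitySketch is not part of the series' code.

```java
import java.util.List;

// Same loop shape as estimateGreaterThan, specialized to double bounds.
class RangeSelectivitySketch {
    record Bucket(double lower, double upper, long rows) {}

    static double greaterThan(List<Bucket> buckets, long totalRows,
                              double v, boolean inclusive) {
        double selectivity = 0.0;
        for (Bucket b : buckets) {
            if (v < b.upper() || (inclusive && v == b.upper())) {
                if (v <= b.lower()) {
                    // Whole bucket lies above the threshold.
                    selectivity += (double) b.rows() / totalRows;
                } else {
                    // Threshold falls inside the bucket: interpolate linearly.
                    double fraction = (b.upper() - v) / (b.upper() - b.lower());
                    selectivity += fraction * b.rows() / totalRows;
                }
            }
        }
        return Math.min(1.0, selectivity);
    }

    public static void main(String[] args) {
        // Assumed distribution: three quarters of the rows are priced <= 400.
        List<Bucket> buckets = List.of(
            new Bucket(10, 50, 250), new Bucket(51, 120, 250),
            new Bucket(121, 400, 250), new Bucket(401, 1000, 250));
        System.out.printf("price > 900: %.4f%n", greaterThan(buckets, 1000, 900, false));
        System.out.printf("price > 100: %.4f%n", greaterThan(buckets, 1000, 100, false));
    }
}
```

For price > 900 only the last bucket contributes, and only (1000 - 900) / (1000 - 401) of it, which comes to roughly 4% of the table instead of the flat 33%.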

The estimateLessThan method works symmetrically, and estimateRange for BETWEEN combines the two:

public double estimateRange(T lowValue, T highValue) {
    double pLessHigh = estimateLessThan(highValue, true);
    double pLessLow = estimateLessThan(lowValue, false);
    return Math.max(0.0, pLessHigh - pLessLow);
}

Integration with the CardinalityEstimator

The cardinality estimator's estimateRangeSelectivity and estimateEqualitySelectivity methods (from Part 4) already check for histograms before falling back to the default heuristics:

private double estimateRangeSelectivity(...) {
    // ... extract column and value ...
    if (tableName != null) {
        TableMetadata table = catalog.getTableMetadata(tableName);
        Histogram<?> histogram = table.getHistogram(column.columnName());
        if (histogram != null) {
            return greaterThan
                ? histogram.estimateGreaterThan(value, inclusive)
                : histogram.estimateLessThan(value, inclusive);
        }
    }
    return 0.33; // Fallback when no histogram
}

This is a clean integration pattern - the existing code paths work with or without histograms, and the histogram provides a better estimate when available. No changes needed to any rule or the cost model itself.

Dynamic programming join ordering

The greedy join reorder rule from Part 3 considers local swaps - it can improve the join order, but it can't find the globally optimal arrangement. For three or more tables, the optimal join tree might require a non-obvious rearrangement that no sequence of local swaps would discover.

The System R algorithm solves this with dynamic programming: enumerate all subsets of tables, find the best plan for each subset, and build up to the full plan. It's guaranteed to find the cheapest order under the cost model (within the space of left-deep or bushy trees, depending on the implementation), so the result is only as good as the cost and cardinality estimates feeding it.

The algorithm

The DPJoinOrderer works in three phases:

public class DPJoinOrderer {
    private final CostModel costModel;
    private final Map<Set<String>, PlanInfo> memo;

    record PlanInfo(LogicalNode plan, double cost, Set<String> tables) {}

    public record JoinCondition(
        String leftTable, String rightTable, Expression condition
    ) {
        boolean connects(Set<String> left, Set<String> right) {
            return (left.contains(leftTable) && right.contains(rightTable))
                || (left.contains(rightTable) && right.contains(leftTable));
        }
    }

    public LogicalNode findBestJoinOrder(
            List<LogicalScan> scans,
            List<JoinCondition> conditions) {

        memo.clear();

        // Phase 1: Initialize single-table plans
        for (LogicalScan scan : scans) {
            Set<String> tableSet = Collections.singleton(scan.getTableName());
            double cost = costModel.estimate(scan);
            memo.put(tableSet, new PlanInfo(scan, cost, tableSet));
        }

        // Phase 2: Build up larger subsets
        for (int size = 2; size <= scans.size(); size++) {
            optimizeSubsetsOfSize(size, scans, conditions);
        }

        // Phase 3: Return best plan for all tables
        Set<String> allTables = new HashSet<>();
        for (LogicalScan scan : scans) {
            allTables.add(scan.getTableName());
        }
        return memo.get(allTables).plan();
    }
}

Phase 1 seeds the memoization table with the base case - each single table's scan plan and cost. Phase 2 iterates through subset sizes from 2 up to N, finding the cheapest plan for each subset by trying all ways to partition it into two previously-solved subsets. Phase 3 looks up the answer for the full set of tables.

Finding the best plan for a subset

This is the core of the DP algorithm. For each subset of tables, we try every way to split it into two non-empty parts, look up the best plans for both parts, and combine them with a join:

private PlanInfo findBestPlanForSubset(
        Set<String> tables,
        List<JoinCondition> conditions) {
    PlanInfo best = null;

    for (Set<String> left : properSubsets(tables)) {
        Set<String> right = new HashSet<>(tables);
        right.removeAll(left);

        if (right.isEmpty() || left.size() > right.size())
            continue;

        PlanInfo leftPlan = memo.get(left);
        PlanInfo rightPlan = memo.get(right);
        if (leftPlan == null || rightPlan == null)
            continue;

        // Find a join condition connecting left and right
        JoinCondition joinCond = findJoinCondition(left, right, conditions);
        if (joinCond != null) {
            LogicalNode join = new LogicalJoin(
                leftPlan.plan(), rightPlan.plan(),
                LogicalJoin.JoinType.INNER, joinCond.condition());

            double cost = costModel.estimate(join);
            if (best == null || cost < best.cost()) {
                best = new PlanInfo(join, cost, tables);
            }
        }
    }
    return best;
}

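The left.size() > right.size() check prunes mirror-image partitions. The loop over all proper subsets visits both ({A}, {B, C}) and ({B, C}, {A}), which describe the same join with left and right swapped, and the check keeps only the orientation with the smaller side on the left. Equal-sized splits, such as ({A, B}, {C, D}) and its mirror, still pass the check in both orientations, which is redundant but harmless. The pruning is only safe if costModel.estimate treats its two inputs symmetrically; with an asymmetric cost (for example, a hash join that distinguishes build and probe sides) the discarded orientation could be the cheaper one.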

The findJoinCondition check is crucial - we only create a join if there's an explicit join condition connecting the two subsets. Without this check, we'd produce Cartesian products, which are almost always catastrophically expensive. If no condition connects a particular partition, we skip it.
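
To see the whole recurrence run end to end, here is a compact toy version that uses int bitmasks instead of Set<String>. Everything in it is an assumption made for illustration: the cost of a plan is simply the sum of its intermediate result sizes, scans are free, and the selectivity matrix is invented. It is not the series' CostModel or DPJoinOrderer, only the same idea in miniature, including the rule that splits with no connecting condition are skipped.

```java
import java.util.Arrays;

// Toy DP join enumerator. Cost = sum of intermediate result sizes (an assumption).
class DpJoinSketch {
    static double[] card;   // estimated rows per subset (indexed by bitmask)
    static double[] cost;   // cheapest known cost per subset
    static int[] bestLeft;  // left half of the cheapest split

    // Product of selectivities of all edges crossing (left, right); -1 if none.
    static double crossingSelectivity(int left, int right, double[][] sel) {
        double product = 1.0;
        boolean connected = false;
        for (int i = 0; i < sel.length; i++) {
            if ((left & (1 << i)) == 0) continue;
            for (int j = 0; j < sel.length; j++) {
                if ((right & (1 << j)) == 0) continue;
                double s = Math.max(sel[i][j], sel[j][i]);
                if (s > 0) { product *= s; connected = true; }
            }
        }
        return connected ? product : -1.0;
    }

    static void solve(double[] tableRows, double[][] sel) {
        int n = tableRows.length;
        int full = (1 << n) - 1;
        card = new double[full + 1];
        cost = new double[full + 1];
        bestLeft = new int[full + 1];
        Arrays.fill(cost, Double.POSITIVE_INFINITY);
        for (int i = 0; i < n; i++) {
            card[1 << i] = tableRows[i];
            cost[1 << i] = 0.0; // base case: scans are free in this toy model
        }
        // Increasing numeric order solves every sub-mask before its supersets.
        for (int s = 1; s <= full; s++) {
            if (Integer.bitCount(s) < 2) continue;
            for (int left = (s - 1) & s; left > 0; left = (left - 1) & s) {
                int right = s & ~left;
                if (Double.isInfinite(cost[left]) || Double.isInfinite(cost[right])) continue;
                double joinSel = crossingSelectivity(left, right, sel);
                if (joinSel < 0) continue; // no join condition: skip the Cartesian product
                double rows = card[left] * card[right] * joinSel;
                double c = cost[left] + cost[right] + rows;
                if (c < cost[s]) { cost[s] = c; card[s] = rows; bestLeft[s] = left; }
            }
        }
    }

    // Only call this for subsets that have a plan (finite cost).
    static String describe(int s, String[] names) {
        if (Integer.bitCount(s) == 1) return names[Integer.numberOfTrailingZeros(s)];
        int left = bestLeft[s];
        return "(" + describe(left, names) + " JOIN " + describe(s & ~left, names) + ")";
    }

    public static void main(String[] args) {
        String[] names = {"dim3", "a10", "b10", "fact1000"};
        double[] rows = {3, 10, 10, 1000};
        double[][] sel = new double[4][4];
        sel[0][3] = 0.01; // dim3 - fact1000
        sel[1][3] = 0.1;  // a10  - fact1000
        sel[2][3] = 0.1;  // b10  - fact1000
        solve(rows, sel);
        int full = (1 << rows.length) - 1;
        System.out.println(describe(full, names) + " cost=" + cost[full]);
    }
}
```

With these assumed numbers the cheapest plan joins the fact table to the 3-row dimension first, cutting it to about 30 rows, and the two 10-row tables are joined afterwards, for a total cost of 90. Any plan that starts by joining the fact table to a 10-row table carries a 1,000-row intermediate and costs more than ten times as much.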

Subset enumeration with bit manipulation

The properSubsets method generates all non-empty proper subsets of a set using the classic bitmask technique:

private Iterable<? extends Set<String>> properSubsets(Set<String> set) {
    var list = new ArrayList<>(set);
    List<Set<String>> result = new ArrayList<>();

    int totalSubsets = (1 << list.size());
    for (int i = 1; i < totalSubsets - 1; i++) {
        Set<String> subset = new HashSet<>();
        for (int j = 0; j < list.size(); j++) {
            if ((i & (1 << j)) != 0) {
                subset.add(list.get(j));
            }
        }
        result.add(subset);
    }
    return result;
}

For a set of 4 tables, this generates 2^4 - 2 = 14 subsets (every subset except the empty set and the full set). Each integer from 1 to 14 is treated as a bitmask where bit j indicates whether table j is included. This is the standard way to enumerate subsets; because the mask is an int, it is limited to 30 tables as written, far beyond the point where the DP itself stays practical.
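
The mapping from masks to subsets is easiest to see on a three-element set. The sketch below is a standalone variant of the same loop (the class name and the use of LinkedHashSet to keep a stable print order are choices made for this example only):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Bitmask enumeration of every subset except the empty set and the full set.
class SubsetSketch {
    static List<Set<String>> nonEmptyProperSubsets(List<String> items) {
        List<Set<String>> result = new ArrayList<>();
        int total = 1 << items.size();
        // mask 0 is the empty set and mask total-1 is the full set; skip both.
        for (int mask = 1; mask < total - 1; mask++) {
            Set<String> subset = new LinkedHashSet<>();
            for (int j = 0; j < items.size(); j++) {
                if ((mask & (1 << j)) != 0) {
                    subset.add(items.get(j));
                }
            }
            result.add(subset);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Set<String>> subsets = nonEmptyProperSubsets(List.of("A", "B", "C"));
        for (int i = 0; i < subsets.size(); i++) {
            // i + 1 is the mask that produced the subset at index i.
            System.out.println(Integer.toBinaryString(i + 1) + " -> " + subsets.get(i));
        }
    }
}
```

Masks 1 through 6 yield {A}, {B}, {A, B}, {C}, {A, C}, {B, C}: bit 0 selects A, bit 1 selects B, and bit 2 selects C.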

Complexity and practical limits

The time complexity is O(3^n): there are 2^n subsets, but a subset of size k is split in 2^k - 2 ways, and summing C(n, k) × 2^k over all k gives 3^n. (A left-deep-only search, which peels off one table at a time, would need only O(n × 2^n).) The space complexity is O(2^n) for the memo table. This is fine for up to about 10 tables:

Tables   Subsets (2^n)   Splits (up to 3^n)   Time
2        4               9                    instant
3        8               27                   instant
4        16              81                   instant
5        32              243                  instant
10       1,024           59,049               milliseconds
15       32,768          14,348,907           seconds
20       1M+             ~3.5 billion         impractical
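
One way to sanity-check how much work the split enumeration does is to count the (left, right) pairs directly. The sketch below is a standalone counter, not part of DPJoinOrderer: it walks every subset and every non-empty proper sub-mask of it, before any mirror-image or connectivity pruning. The count works out to 3^n - 2^(n+1) + 1.

```java
// Counts the splits a subset-pair DP would examine for n tables.
class SplitCountSketch {
    static long countSplits(int n) {
        long splits = 0;
        for (int s = 1; s < (1 << n); s++) {
            // Standard sub-mask walk: visits each non-empty proper subset of s once.
            for (int left = (s - 1) & s; left > 0; left = (left - 1) & s) {
                splits++;
            }
        }
        return splits;
    }

    // Closed form for comparison: 3^n - 2^(n+1) + 1.
    static long closedForm(int n) {
        long pow3 = 1;
        for (int i = 0; i < n; i++) pow3 *= 3;
        return pow3 - (1L << (n + 1)) + 1;
    }

    public static void main(String[] args) {
        for (int n : new int[] {2, 3, 4, 5, 10, 15}) {
            System.out.println(n + " tables: " + countSplits(n)
                + " splits (closed form " + closedForm(n) + ")");
        }
    }
}
```

For 10 tables that is 57,002 splits against only 1,024 subsets, which is why the split count, not the subset count, determines where the approach stops being practical.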

For queries that exceed the limit, the optimizer falls back to a left-deep heuristic that sorts tables by cardinality (smallest first):

private LogicalNode buildLeftDeepTree(
        List<LogicalScan> scans,
        List<JoinCondition> conditions) {
    var sorted = new ArrayList<>(scans);
    sorted.sort((a, b) -> {
        long cardA = costModel.estimateCardinality(a);
        long cardB = costModel.estimateCardinality(b);
        return Long.compare(cardA, cardB);
    });

    LogicalNode result = sorted.getFirst();
    for (int i = 1; i < sorted.size(); i++) {
        var right = sorted.get(i);
        var leftTables = getTableNames(result);
        var rightTables = Collections.singleton(right.getTableName());
        JoinCondition condition = findJoinCondition(
            leftTables, rightTables, conditions);
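        // Note: a scan with no condition linking it to the tables joined so far
        // is skipped here and will be missing from the returned plan.
        if (condition != null) {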
            result = new LogicalJoin(result, right,
                LogicalJoin.JoinType.INNER, condition.condition());
        }
    }
    return result;
}

This fallback is essentially the greedy heuristic from Part 3, promoted to a standalone method. It won't find the optimal order in general, but it needs only an O(n log n) sort plus a single pass over the tables, and it usually produces a reasonable plan.

DP vs greedy: when it matters

For a three-table join with tables of sizes 8, 10, and 7 rows, the DP optimizer and the greedy heuristic often agree - the search space is small enough that the optimal plan is easy to find either way. The difference becomes dramatic with more tables or when cardinality estimates vary widely. Consider four tables: a 3-row dimension table, two 10-row tables, and one 1000-row fact table. The greedy heuristic might join the two 10-row tables first, producing a 100-row intermediate. DP would discover that joining the 3-row table with the 1000-row table first (producing ~30 rows via the join condition's selectivity) and then joining the result with both 10-row tables is much cheaper overall.

The memo table also provides useful diagnostics:

public void printMemoStatistics() {
    System.out.println("Memoized plans: " + memo.size());
    var sizeCount = new HashMap<Integer, Integer>();
    memo.keySet().forEach(tables ->
        sizeCount.put(tables.size(),
            sizeCount.getOrDefault(tables.size(), 0) + 1));
    sizeCount.forEach((key, value) ->
        System.out.println("  Size " + key + ": " + value + " subsets"));
}

This shows how many subsets the algorithm explored at each size, confirming the exponential growth.

Cost calibration

The final piece: making cost constants meaningful. In Part 4, we set PAGE_COST = 1.0 and TUPLE_COST = 0.01 as arbitrary defaults. These ratios work for relative plan comparison, but the resulting cost numbers are in "optimizer units" with no connection to real execution time.

The CostCalibrator runs microbenchmarks on the current hardware to derive cost constants measured in milliseconds:

public class CostCalibrator {
    private static final int WARMUP_ITERATIONS = 100;
    private static final int BENCHMARK_ITERATIONS = 1000;

    public CostModel.CostConfig calibrate() {
        warmup();

        double pageCost = calibratePageAccess();
        double tupleCost = calibrateTupleProcessing();
        double comparisonCost = calibrateComparison();
        double hashCost = calibrateHashing();

        CostModel.CostConfig config = new CostModel.CostConfig();
        config.PAGE_COST = pageCost;
        config.TUPLE_COST = tupleCost;
        config.COMPARISON_COST = comparisonCost;
        config.HASH_COST = hashCost;

        return config;
    }
}

JVM warmup

Java microbenchmarks are notoriously tricky because the JIT compiler optimizes hot code paths. The calibrator runs warmup iterations first to get the JVM into steady state:

private void warmup() {
    Random rand = new Random(42);
    long sum = 0;

    for (int i = 0; i < WARMUP_ITERATIONS; i++) {
        byte[] page = new byte[8192];
        for (int j = 0; j < page.length; j += 64) {
            page[j] = (byte) rand.nextInt();
        }
        sum += page[0];
    }

    Object[][] tuples = generateTestTuples(100);
    for (int i = 0; i < WARMUP_ITERATIONS; i++) {
        for (Object[] tuple : tuples) {
            sum += (Integer) tuple[0];
        }
    }

    // Prevent dead code elimination
    if (sum == Long.MAX_VALUE) System.out.println("Warmup complete");
}

The sum variable and the final check prevent the JIT from eliminating the warmup code entirely. Without this, the compiler could determine that the loop has no observable side effects and skip it.

Page access benchmark

This measures the cost of touching a page of data - simulating what a table scan does for each page:

private double calibratePageAccess() {
    final int PAGE_SIZE = 8192;
    final int NUM_PAGES = 100;
    // Local RNG for the page writes, seeded like the one in warmup()
    Random rand = new Random(42);

    List<byte[]> pages = new ArrayList<>();
    for (int i = 0; i < NUM_PAGES; i++)
        pages.add(new byte[PAGE_SIZE]);

    long totalTime = 0;
    long operations = 0;

    for (int iter = 0; iter < BENCHMARK_ITERATIONS; iter++) {
        long startTime = System.nanoTime();

        for (byte[] page : pages) {
            for (int j = 0; j < PAGE_SIZE; j += 64)
                page[j] = (byte) rand.nextInt();
        }

        long endTime = System.nanoTime();
        totalTime += (endTime - startTime);
        operations += NUM_PAGES;
    }

    return (totalTime / 1_000_000.0) / operations; // milliseconds per page
}

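The inner loop writes one byte in every 64-byte cache line of the page, which models the memory access pattern of scanning through row data. The result is averaged over 1000 iterations × 100 pages = 100,000 page accesses. Two caveats: the timed region also includes the rand.nextInt() calls, and the 100 pages total about 800 KB, which fits in the CPU cache on most modern machines, so the number reflects cached memory access rather than disk I/O.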

Tuple processing, comparison, and hash benchmarks

The other three benchmarks follow the same pattern. Tuple processing measures copying fields between arrays (modeling the work of passing tuples between operators). Comparison measures integer comparisons (modeling predicate evaluation). Hashing measures hashCode() calls (modeling hash join and hash aggregation).

private double calibrateComparison() {
    Integer[] values = new Integer[1000];
    // ... populate with random values ...

    long totalTime = 0;
    long operations = 0;
    int matches = 0;

    for (int iter = 0; iter < BENCHMARK_ITERATIONS; iter++) {
        long startTime = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            if (values[i] > 5000) matches++;
        }
        long endTime = System.nanoTime();
        totalTime += (endTime - startTime);
        operations += 1000;
    }

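    // Consume matches so the JIT cannot discard the comparison loop as dead code
    if (matches == Integer.MIN_VALUE) System.out.println("unreachable");

    return (totalTime / 1_000_000.0) / operations;
}

The matches counter serves double duty: it guards against dead code elimination and provides a sanity check that the benchmark is actually doing work. The guard only holds if the counter is read after the loop, which is what the final check is for; a value that is never consumed can be optimized away together with the loop that computes it.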
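
Since all four benchmarks share the same shape, the pattern can be factored into one helper. The sketch below is one possible way to do that, not the series' CostCalibrator: MicroBenchSketch, measure, and the volatile sink field are names invented here. The body returns an int that is accumulated and finally written to the sink, so the measured work always has an observable result.

```java
import java.util.Random;
import java.util.function.IntSupplier;

// Hypothetical timing helper: warm up, time, and consume the result.
class MicroBenchSketch {
    // Writing the accumulated result here keeps the JIT from discarding the work.
    static volatile int sink;

    // Returns average milliseconds per operation; body performs opsPerCall operations.
    static double measure(IntSupplier body, int opsPerCall, int warmup, int iterations) {
        int acc = 0;
        for (int i = 0; i < warmup; i++) {
            acc += body.getAsInt();
        }
        long totalNanos = 0;
        for (int i = 0; i < iterations; i++) {
            long start = System.nanoTime();
            acc += body.getAsInt();
            totalNanos += System.nanoTime() - start;
        }
        sink = acc;
        return (totalNanos / 1_000_000.0) / ((long) iterations * opsPerCall);
    }

    public static void main(String[] args) {
        int[] values = new Random(42).ints(1000, 0, 10_000).toArray();
        double msPerComparison = measure(() -> {
            int matches = 0;
            for (int v : values) {
                if (v > 5000) matches++;
            }
            return matches;
        }, values.length, 1000, 1000);
        System.out.printf("%.9f ms per comparison%n", msPerComparison);
    }
}
```

The numbers this prints vary from machine to machine and run to run, which is the point of calibrating; treat them as rough magnitudes rather than precise measurements.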

Saving and loading calibration

Calibration results can be persisted to a properties file so they don't need to be recomputed every time:

public void saveCalibration(CostModel.CostConfig config, String filename)
        throws IOException {
    Properties props = new Properties();
    props.setProperty("PAGE_COST", String.valueOf(config.PAGE_COST));
    props.setProperty("TUPLE_COST", String.valueOf(config.TUPLE_COST));
    props.setProperty("COMPARISON_COST", String.valueOf(config.COMPARISON_COST));
    props.setProperty("HASH_COST", String.valueOf(config.HASH_COST));

    try (FileOutputStream fos = new FileOutputStream(filename)) {
        props.store(fos, "Cost Model Calibration");
    }
}

public CostModel.CostConfig loadCalibration(String filename)
        throws IOException {
    Properties props = new Properties();
    try (FileInputStream fis = new FileInputStream(filename)) {
        props.load(fis);
    }
    // ... parse properties into CostConfig ...
}
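
For experimenting with the file format outside the optimizer, here is a self-contained round trip through java.util.Properties. It is a sketch under stated assumptions: Config is a hypothetical stand-in for CostModel.CostConfig, the text goes through a String instead of a file, and falling back to a supplied default when a key is missing is a design choice of this example, not necessarily what the series' loadCalibration does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

class CalibrationRoundTripSketch {
    // Hypothetical stand-in for CostModel.CostConfig.
    record Config(double pageCost, double tupleCost,
                  double comparisonCost, double hashCost) {}

    static String save(Config c) {
        Properties props = new Properties();
        props.setProperty("PAGE_COST", String.valueOf(c.pageCost()));
        props.setProperty("TUPLE_COST", String.valueOf(c.tupleCost()));
        props.setProperty("COMPARISON_COST", String.valueOf(c.comparisonCost()));
        props.setProperty("HASH_COST", String.valueOf(c.hashCost()));
        StringWriter out = new StringWriter();
        try {
            props.store(out, "Cost Model Calibration");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    static Config load(String text, Config defaults) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Config(
            parse(props, "PAGE_COST", defaults.pageCost()),
            parse(props, "TUPLE_COST", defaults.tupleCost()),
            parse(props, "COMPARISON_COST", defaults.comparisonCost()),
            parse(props, "HASH_COST", defaults.hashCost()));
    }

    // Missing keys fall back to the default (an assumption of this sketch).
    static double parse(Properties props, String key, double fallback) {
        String raw = props.getProperty(key);
        return raw == null ? fallback : Double.parseDouble(raw);
    }

    public static void main(String[] args) {
        // Last two defaults are placeholders, not values from the series.
        Config defaults = new Config(1.0, 0.01, 0.001, 0.002);
        Config calibrated = new Config(0.000850, 0.000003, 0.000001, 0.000002);
        String text = save(calibrated);
        System.out.println(text);
        System.out.println(load(text, defaults).equals(calibrated));
    }
}
```

String.valueOf(double) and Double.parseDouble round-trip exactly, so the reloaded constants are bit-for-bit the ones that were saved.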

The compareConfigs utility method lets you see how calibrated constants differ from defaults:

public static void compareConfigs(
        CostModel.CostConfig config1, String name1,
        CostModel.CostConfig config2, String name2) {
    System.out.printf("%-20s | %-15s | %-15s | Ratio\n",
        "Parameter", name1, name2);
    compareParam("PAGE_COST", config1.PAGE_COST, config2.PAGE_COST);
    compareParam("TUPLE_COST", config1.TUPLE_COST, config2.TUPLE_COST);
    // ...
}

On a typical modern machine, calibrated constants might look like:

PAGE_COST:       0.000850 ms
TUPLE_COST:      0.000003 ms
COMPARISON_COST: 0.000001 ms
HASH_COST:       0.000002 ms

The key observation: the ratio between PAGE_COST and COMPARISON_COST is about 850x - meaning that touching a full 8 KB page (128 cache-line writes in the benchmark) is nearly three orders of magnitude more expensive than a single comparison. This ratio directly affects optimizer decisions. With the default constants (ratio of 1000x), the optimizer behaves similarly to calibrated constants on modern hardware. But if you were running on a system with extremely fast memory (low page cost) or extremely expensive comparisons (complex predicate evaluation), calibration would capture that difference and the optimizer would adapt.
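
The ratios are easy to recompute from the sample constants. The short sketch below does the arithmetic, using the illustrative values listed above (not measurements from any particular machine) and the benchmark's 8192-byte page with one write per 64 bytes:

```java
// Recomputes the cost ratios from the sample calibrated constants.
class CostRatioSketch {
    static double ratio(double numerator, double denominator) {
        return numerator / denominator;
    }

    public static void main(String[] args) {
        double pageMs = 0.000850;
        double tupleMs = 0.000003;
        double comparisonMs = 0.000001;

        int writesPerPage = 8192 / 64; // 128 cache-line writes per page
        double nsPerWrite = pageMs * 1_000_000 / writesPerPage;

        System.out.printf("page / comparison: %.0fx%n", ratio(pageMs, comparisonMs));
        System.out.printf("page / tuple:      %.0fx%n", ratio(pageMs, tupleMs));
        System.out.printf("per cache-line write: %.2f ns%n", nsPerWrite);
    }
}
```

Per cache-line write the cost is only a few nanoseconds, so most of the 850x gap comes from a page being 128 writes rather than from any single memory access being slow.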

How the three features interact

These three improvements aren't independent - they reinforce each other:

Histograms improve cardinality estimates. Better estimates for range predicates mean the cost model produces more accurate costs for filter operators and the joins above them.

Better costs improve DP join ordering. The DP algorithm's quality depends entirely on the cost model. If cardinality estimates are off by 10x, the "optimal" join order might actually be worse than the greedy heuristic. Histograms make the cost comparisons between alternative join orders more trustworthy.

Calibrated constants make costs comparable across queries. With calibrated costs in milliseconds, you can meaningfully compare costs across different queries and even predict approximate execution time. With arbitrary constants, you can only compare plans for the same query.

Together, they transform the optimizer from a system that makes rough structural improvements into one that makes informed quantitative decisions grounded in actual data distributions and hardware characteristics.

What production systems do differently

Histogram types. PostgreSQL uses a mix of most-common-values (MCV) lists and equi-depth histograms. The MCV list captures the top N most frequent values exactly, and the histogram covers the rest. This handles skewed distributions (where a few values dominate) much better than equi-depth alone. SQL Server builds histograms with up to 200 steps, using a maximum-difference algorithm to choose the step boundaries.

Adaptive query execution. Even with histograms, estimates can be wrong. Systems like Spark SQL and Oracle use adaptive execution: they start executing the plan, observe actual cardinalities at runtime, and re-optimize the remaining plan if the actuals diverge significantly from the estimates. This is a fundamentally different approach from static optimization.

Join ordering beyond DP. For very large queries (10+ tables, common in data warehouses with star schemas), even DP is too expensive. Production systems use techniques like branch-and-bound pruning, randomized search (genetic algorithms in PostgreSQL's GEQO), or top-down memoization with cost-based pruning (Cascades framework).

Cost calibration in practice. Production databases don't typically ask users to run microbenchmarks. Some gather system statistics instead (Oracle, for example, can measure I/O and CPU speed through DBMS_STATS.GATHER_SYSTEM_STATS). Others expose "planner cost constants" that DBAs can tune (PostgreSQL's random_page_cost, cpu_tuple_cost, etc.), which is essentially manual calibration.

What's next

We now have a sophisticated optimization stack: histograms for accurate selectivity, DP for optimal join ordering, and calibrated costs for meaningful plan comparison. But all of this works on logical plans - abstract descriptions of what to compute. In the final post, we'll cross the logical-to-physical boundary: choosing algorithms (hash join vs. nested loop), implementing the Volcano iterator model, and executing plans to produce actual query results.

Next up: Part 6 - Physical Execution: From Plans to Results