Building a simple Query Optimizer from scratch: Part 6
Physical Execution: From Plans to Results
This series
- Part 1: Designing a Simple Query Optimizer: Architecture & Foundation
- Part 2: From SQL to Canonical Logical Plans
- Part 3: A Composable Rule Engine and Logical Rewrites
- Part 4: Cost Modeling and Cardinality Estimation
- Part 5: Going Deeper: Histograms, DP Join Ordering, and Cost Calibration
- Part 6: Physical Execution: From Plans to Results (this post)
This is Part 6 - the final post - in a 6-part series on building a query optimizer from scratch in Java. Part 5 covered histograms, DP join ordering, and cost calibration.
For five posts, we've been building query plans - parsing SQL, constructing logical operators, rearranging them with rules, estimating their costs. But nothing has actually executed. No rows have been read, no predicates evaluated, no join matches found. In this final post, we cross the logical-to-physical boundary and build the machinery that turns plans into results.
The transition has three parts. The PhysicalPlanBuilder converts an optimized logical plan into a physical plan by choosing concrete algorithms. Each physical operator implements the Iterator interface, producing tuples one at a time in the Volcano model. And the Executor drives the whole thing, pulling tuples from the root and collecting results.
The logical-to-physical boundary
A logical plan says what to compute. LogicalJoin says "join these two inputs on this condition" without specifying how. A physical plan says how. PhysicalHashJoin says "build a hash table on the right side, probe with the left" - that's a specific algorithm with specific performance characteristics.
The PhysicalPlanBuilder walks the optimized logical plan and makes these algorithm choices:
public class PhysicalPlanBuilder {
    private final Catalog catalog;
    private final boolean preferHashJoin;

    public PhysicalPlanBuilder(Catalog catalog, boolean preferHashJoin) {
        this.catalog = catalog;
        this.preferHashJoin = preferHashJoin;
    }

    public PhysicalNode build(LogicalNode logicalPlan) {
        return convertNode(logicalPlan);
    }

    private PhysicalNode convertNode(LogicalNode node) {
        return switch (node) {
            case LogicalScan scan -> convertScan(scan);
            case LogicalFilter filter -> convertFilter(filter);
            case LogicalProject project -> convertProject(project);
            case LogicalJoin join -> convertJoin(join);
            default -> throw new IllegalArgumentException(
                "Unknown node: " + node.getClass().getSimpleName());
        };
    }
}
Most conversions are one-to-one: LogicalScan becomes PhysicalScan, LogicalFilter becomes PhysicalFilter. The interesting case is joins.
Choosing a join algorithm
The join conversion is where the "physical" in physical plan earns its name:
private PhysicalNode convertJoin(LogicalJoin join) {
    PhysicalNode left = convertNode(join.getLeft());
    PhysicalNode right = convertNode(join.getRight());
    Schema leftSchema = getOutputSchema(join.getLeft());
    Schema rightSchema = getOutputSchema(join.getRight());

    if (preferHashJoin && isEquiJoin(join.getCondition())) {
        return new PhysicalHashJoin(
            left, right, join.getCondition(),
            leftSchema, rightSchema);
    } else {
        return new PhysicalNestedLoopJoin(
            left, right, join.getCondition(),
            leftSchema, rightSchema);
    }
}
The decision rule is simple: use a hash join for equi-joins (conditions of the form column = column), and fall back to nested loop for everything else. Hash join requires equality because it relies on hashing both sides of the condition to the same bucket - you can't hash > or < comparisons.
private boolean isEquiJoin(Expression condition) {
    if (condition instanceof Expression.BinaryOp(
            Expression.BinaryOp.Operator operator,
            Expression left, Expression right)) {
        return operator == Expression.BinaryOp.Operator.EQ
            && left instanceof Expression.ColumnRef
            && right instanceof Expression.ColumnRef;
    }
    return false;
}
This uses Java's record pattern matching to destructure the BinaryOp and check all three conditions in one expression: the operator is EQ, the left side is a column reference, and the right side is a column reference.
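To see the pattern in isolation, here's a self-contained sketch with a toy expression hierarchy. The names ColumnRef, Literal, and BinaryOp below are illustrative stand-ins, not the project's actual classes, but the destructure-and-test shape is the same:

```java
// Toy expression hierarchy standing in for the post's Expression types.
sealed interface Expr permits ColumnRef, Literal, BinaryOp {}
record ColumnRef(String name) implements Expr {}
record Literal(Object value) implements Expr {}
record BinaryOp(String op, Expr left, Expr right) implements Expr {}

class EquiJoinCheck {
    // Same shape as isEquiJoin: destructure the BinaryOp with a record
    // pattern, then test the operator and both operand kinds.
    static boolean isEquiJoin(Expr condition) {
        if (condition instanceof BinaryOp(String op, Expr left, Expr right)) {
            return op.equals("=")
                && left instanceof ColumnRef
                && right instanceof ColumnRef;
        }
        return false;
    }

    public static void main(String[] args) {
        Expr eq = new BinaryOp("=",
            new ColumnRef("c.id"), new ColumnRef("o.customer_id"));
        Expr range = new BinaryOp(">",
            new ColumnRef("o.amount"), new Literal(100));
        System.out.println(isEquiJoin(eq));    // true
        System.out.println(isEquiJoin(range)); // false
    }
}
```

Record patterns require Java 21, which the project already relies on for its switch-based node conversion.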
Schema propagation
Physical operators need schema information to evaluate expressions. A filter needs to know the column layout so it can resolve ColumnRef("city") to the right position in a tuple. The getOutputSchema method computes the output schema for each logical node:
private Schema getOutputSchema(LogicalNode node) {
    return switch (node) {
        case LogicalScan scan -> {
            TableMetadata table = catalog.getTableMetadata(scan.getTableName());
            yield table.getSchema();
        }
        case LogicalFilter filter ->
            getOutputSchema(filter.getChild()); // Passthrough
        case LogicalJoin join -> {
            Schema left = getOutputSchema(join.getLeft());
            Schema right = getOutputSchema(join.getRight());
            List<Schema.Column> columns = new ArrayList<>();
            columns.addAll(left.getColumns());
            columns.addAll(right.getColumns());
            yield new Schema(columns);
        }
        case LogicalProject project -> {
            List<Schema.Column> columns = new ArrayList<>();
            for (String name : project.getColumnNames()) {
                // Simplification: projected output columns default to VARCHAR
                // rather than propagating the input column's type
                columns.add(new Schema.Column(name, DataType.VARCHAR));
            }
            yield new Schema(columns);
        }
        default -> throw new IllegalArgumentException("...");
    };
}
Scan returns the table's schema directly. Filter passes through its child's schema (filtering doesn't change column layout). Join concatenates the schemas of both children - the combined tuple has all columns from the left followed by all columns from the right. This concatenation is what lets join condition evaluation work: when the hash join combines a left tuple and a right tuple, the resulting combined tuple has columns from both sides, matching the combined schema.
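A toy sketch of this concatenation, modeling a schema as just an ordered list of column names (the real Schema also carries types; the names below are illustrative), shows how right-side columns are offset by the left schema's width:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the post's Schema: an ordered list of column names.
class SchemaDemo {
    static List<String> concat(List<String> left, List<String> right) {
        List<String> combined = new ArrayList<>(left);
        combined.addAll(right);
        return combined;
    }

    public static void main(String[] args) {
        List<String> customers = List.of("id", "name", "age");
        List<String> orders = List.of("order_id", "customer_id", "amount");
        List<String> joined = concat(customers, orders);
        // Left columns keep their positions; right columns shift by the
        // left schema's width, so "customer_id" resolves to index 3 + 1 = 4.
        System.out.println(joined.indexOf("customer_id")); // 4
    }
}
```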
The Iterator model
Every physical operator implements the Iterator interface:
public interface Iterator {
    void open();       // Initialize, allocate resources
    Tuple next();      // Get next tuple, or null if done
    void close();      // Clean up resources
    String describe(); // For debugging
}
This is the Volcano model, named after the Volcano query processing system from the early 1990s. The model has three properties that make it elegant for a teaching implementation:
Pull-based. The consumer (parent) calls next() on the producer (child). Data flows upward through the tree, driven by demand. The executor calls next() on the root, which calls next() on its child, and so on down to the leaves. This means operators that produce fewer rows (like filters with selective predicates) naturally avoid doing unnecessary work.
One tuple at a time. Each next() call returns exactly one tuple (or null for end of stream). This keeps memory usage constant - you never materialize an entire intermediate result. The exception is hash join, which materializes one side to build the hash table, but that's inherent to the algorithm.
Composable. Any operator can sit above any other operator, as long as they agree on the tuple format. The filter doesn't know if its child is a scan, a join, or another filter - it just calls next() and evaluates the predicate. This composability is what lets us build arbitrary plan trees.
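These three properties can be demonstrated in miniature with a self-contained sketch: a scan over an int array with a filter stacked on top. TinyIterator, ArrayScan, and FilterOp are illustrative names, not the project's classes, but the pull-based, one-value-at-a-time, composable shape is the same:

```java
import java.util.function.IntPredicate;

// Miniature Volcano model over ints; null signals end of stream.
interface TinyIterator {
    void open();
    Integer next();
    void close();
}

class ArrayScan implements TinyIterator {
    private final int[] data;
    private int pos;
    ArrayScan(int[] data) { this.data = data; }
    public void open() { pos = 0; }
    public Integer next() { return pos < data.length ? data[pos++] : null; }
    public void close() {}
}

class FilterOp implements TinyIterator {
    private final TinyIterator child;
    private final IntPredicate predicate;
    FilterOp(TinyIterator child, IntPredicate predicate) {
        this.child = child;
        this.predicate = predicate;
    }
    public void open() { child.open(); }
    // Pull from the child until a value passes or the child is exhausted.
    public Integer next() {
        Integer v;
        while ((v = child.next()) != null) {
            if (predicate.test(v)) return v;
        }
        return null;
    }
    public void close() { child.close(); }
}

class VolcanoDemo {
    static int countMatches(int[] data, IntPredicate p) {
        TinyIterator root = new FilterOp(new ArrayScan(data), p);
        root.open();
        int count = 0;
        while (root.next() != null) count++;
        root.close();
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countMatches(new int[]{10, 35, 7, 42}, v -> v > 30)); // 2
    }
}
```

The filter never sees the array; it only sees a TinyIterator. Stacking a second FilterOp on top, or swapping the scan for a join, requires no changes to either side.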
PhysicalScan: where data enters
The scan operator is the leaf of every physical plan. It reads tuples from a table one at a time:
public class PhysicalScan extends PhysicalNode implements Iterator {
    private final String tableName;
    private final Catalog catalog;
    private TableMetadata table;
    private java.util.Iterator<Tuple> dataIterator;
    private boolean isOpen = false;

    @Override
    public void open() {
        table = catalog.getTableMetadata(tableName);
        dataIterator = Tuple.convert(table.getData().iterator());
        isOpen = true;
    }

    @Override
    public Tuple next() {
        if (dataIterator.hasNext()) {
            return dataIterator.next();
        }
        return null;
    }

    @Override
    public void close() {
        dataIterator = null;
        table = null;
        isOpen = false;
    }
}
open() looks up the table in the catalog and creates a Java iterator over its rows. next() returns the next row, or null when exhausted. close() releases the references. The Tuple.convert call adapts the internal row representation (Map<Schema.Column, Object>) into Tuple objects that the rest of the pipeline expects.
In a real database, open() would set up a buffer pool cursor, next() would fetch pages from disk or the buffer cache, and close() would unpin buffers. The API is the same - the abstraction holds.
PhysicalFilter: evaluating predicates
The filter operator wraps its child and passes through only tuples that satisfy the predicate:
public class PhysicalFilter extends PhysicalNode implements Iterator {
    private final Expression predicate;
    private final PhysicalNode child;
    private final Schema schema;
    private Iterator childIterator;
    private boolean isOpen = false;

    @Override
    public void open() {
        childIterator = (Iterator) child;
        childIterator.open();
        isOpen = true;
    }

    @Override
    public Tuple next() {
        while (true) {
            Tuple tuple = childIterator.next();
            if (tuple == null) return null;
            Object result = predicate.evaluate(tuple, schema);
            if (result instanceof Boolean && (Boolean) result) {
                return tuple;
            }
        }
    }

    @Override
    public void close() {
        childIterator.close();
        childIterator = null;
        isOpen = false;
    }
}
The next() method is a loop: it keeps pulling tuples from the child until one passes the predicate or the child is exhausted. This is the pull-based model in action - the filter doesn't eagerly evaluate all tuples. It evaluates just enough to produce one output tuple per next() call.
The predicate.evaluate(tuple, schema) call is where the Expression system from Part 1 meets actual data. The schema tells the expression evaluator how to map column names to positions in the tuple.
PhysicalProject: selecting columns
Projection evaluates expressions to produce output tuples with only the selected columns:
public class PhysicalProject extends PhysicalNode implements Iterator {
    private final List<Expression> projections;
    private final List<String> columnNames;
    private final PhysicalNode child;
    private final Schema inputSchema;
    private Iterator childIterator;

    @Override
    public Tuple next() {
        Tuple inputTuple = childIterator.next();
        if (inputTuple == null) return null;
        Tuple outputTuple = new Tuple();
        for (Expression projection : projections) {
            // Simplification: assumes every projection is a plain column reference
            Attribute attr = new Attribute(
                inputSchema.getColumn(
                    ((Expression.ColumnRef) projection).columnName()),
                projection.evaluate(inputTuple, inputSchema));
            outputTuple.add(attr);
        }
        return outputTuple;
    }
}
For each input tuple, the projection evaluates each expression in the select list and builds a new, narrower tuple with just the selected columns. This is a one-to-one mapping: one input tuple produces exactly one output tuple.
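A minimal stand-in sketch, representing rows as maps rather than Tuple objects (ProjectDemo is an illustrative name), shows the one-to-one shape:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Projection as a one-to-one mapping: each input row yields exactly one
// narrower output row containing only the selected columns.
class ProjectDemo {
    static Map<String, Object> project(Map<String, Object> row, List<String> cols) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String col : cols) {
            // The "expression" here is just a column lookup.
            out.put(col, row.get(col));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 7, "name", "Ada", "age", 36);
        System.out.println(project(row, List.of("name", "age"))); // {name=Ada, age=36}
    }
}
```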
PhysicalNestedLoopJoin: the simple join
Nested loop join is the brute-force algorithm: for each left tuple, scan the entire right side looking for matches.
public class PhysicalNestedLoopJoin extends PhysicalNode implements Iterator {
    private final PhysicalNode left;
    private final PhysicalNode right;
    private final Expression condition;
    private final Schema combinedSchema;
    private Iterator leftIterator;
    private Iterator rightIterator;
    private Tuple currentLeftTuple;
    private boolean isOpen = false;

    @Override
    public void open() {
        leftIterator = (Iterator) left;
        rightIterator = (Iterator) right;
        leftIterator.open();
        rightIterator.open();
        currentLeftTuple = leftIterator.next();
        isOpen = true;
    }

    @Override
    public Tuple next() {
        while (currentLeftTuple != null) {
            Tuple rightTuple = rightIterator.next();
            if (rightTuple != null) {
                Tuple combined = combineTuples(currentLeftTuple, rightTuple);
                Object result = condition.evaluate(combined, combinedSchema);
                if (result instanceof Boolean && (Boolean) result) {
                    return combined;
                }
            } else {
                // Exhausted right side - advance left, rewind right
                currentLeftTuple = leftIterator.next();
                if (currentLeftTuple != null) {
                    rightIterator.close();
                    rightIterator.open();
                }
            }
        }
        return null;
    }
}
The state machine is straightforward. We maintain a currentLeftTuple and iterate through the right side. When the right side is exhausted, we advance to the next left tuple and rewind the right side by closing and reopening it. This is the key cost of nested loop join: the right side is scanned once for every left tuple, giving O(left × right) comparisons.
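A toy version over two int arrays makes the cost concrete: the condition is evaluated left × right times regardless of how many matches exist. The names below are illustrative, not the project's classes:

```java
import java.util.ArrayList;
import java.util.List;

// Nested loop join over int arrays, counting condition evaluations
// to make the O(left x right) cost visible.
class NestedLoopDemo {
    static int comparisons = 0;

    static List<int[]> join(int[] left, int[] right) {
        List<int[]> out = new ArrayList<>();
        comparisons = 0;
        for (int l : left) {          // outer loop: one pass over left
            for (int r : right) {     // inner loop: full rescan of right
                comparisons++;
                if (l == r) out.add(new int[]{l, r});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> matches = join(new int[]{1, 2, 3}, new int[]{2, 3, 3, 4});
        System.out.println(matches.size()); // 3 matches
        System.out.println(comparisons);    // 3 * 4 = 12 comparisons
    }
}
```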
The combineTuples method creates a merged tuple by concatenating the attributes from both sides:
private Tuple combineTuples(Tuple left, Tuple right) {
    Tuple combined = new Tuple();
    combined.addAll(left);
    combined.addAll(right);
    return combined;
}
This combined tuple is evaluated against the join condition using the combined schema - the same schema that PhysicalPlanBuilder.getOutputSchema produced by concatenating the left and right schemas.
PhysicalHashJoin: the efficient equi-join
Hash join exploits equality conditions to avoid the quadratic comparison pattern. It works in two phases: build and probe.
public class PhysicalHashJoin extends PhysicalNode implements Iterator {
    private final PhysicalNode left;  // Probe side
    private final PhysicalNode right; // Build side
    private Iterator leftIterator;
    private Iterator rightIterator;
    private Map<Object, List<Tuple>> hashTable;
    private Tuple currentLeftTuple;
    private java.util.Iterator<Tuple> currentMatches;
    private Schema.Column buildColumn;
    private Schema.Column probeColumn;
    private boolean isOpen = false;

    @Override
    public void open() {
        leftIterator = (Iterator) left;
        rightIterator = (Iterator) right;
        // Build phase: hash all right-side tuples
        buildHashTable();
        // Probe phase: open left side
        leftIterator.open();
        currentLeftTuple = null;
        currentMatches = null;
        isOpen = true;
    }
}
Build phase
The build phase reads the entire right side and inserts each tuple into a hash table, keyed on the join column:
private void buildHashTable() {
    hashTable = new HashMap<>();
    rightIterator.open();
    Tuple tuple;
    while ((tuple = rightIterator.next()) != null) {
        Object key = tuple.find(buildColumn);
        hashTable.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
    }
    rightIterator.close();
}
After this phase, the hash table maps each distinct join key value to the list of right-side tuples with that value. If the orders table has customer_id values [1, 1, 2, 3, 3, 5], the hash table will have entries like {1: [order1, order2], 2: [order3], 3: [order4, order5], 5: [order6]}.
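Here's a self-contained sketch of that grouping over the same example key sequence, with row labels standing in for right-side tuples (BuildPhaseDemo is an illustrative name):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Build phase in miniature: group "tuples" (here, row labels) by join key
// using computeIfAbsent, exactly as buildHashTable does.
class BuildPhaseDemo {
    static Map<Integer, List<String>> build(int[] keys) {
        Map<Integer, List<String>> table = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            String row = "order" + (i + 1); // stand-in for a right-side tuple
            table.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(row);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> table = build(new int[]{1, 1, 2, 3, 3, 5});
        System.out.println(table.get(1)); // [order1, order2]
        System.out.println(table.get(3)); // [order4, order5]
    }
}
```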
Probe phase
The probe phase reads left-side tuples one at a time and looks up their join key in the hash table:
@Override
public Tuple next() {
    while (true) {
        // Return pending matches from current left tuple
        if (currentMatches != null && currentMatches.hasNext()) {
            Tuple rightTuple = currentMatches.next();
            return combineTuples(currentLeftTuple, rightTuple);
        }
        // Get next left tuple
        currentLeftTuple = leftIterator.next();
        if (currentLeftTuple == null) return null;
        // Probe hash table
        Object probeKey = currentLeftTuple.find(probeColumn);
        List<Tuple> matches = hashTable.get(probeKey);
        if (matches != null && !matches.isEmpty()) {
            currentMatches = matches.iterator();
        }
    }
}
For each left tuple, we extract the probe key, look it up in the hash table in O(1), and iterate through the matching right tuples. A left tuple with no matches (no entry in the hash table) is skipped immediately - no scanning of the right side at all. This is why hash join is O(left + right) instead of O(left × right).
The state between next() calls is managed with currentLeftTuple and currentMatches. A single left tuple might match multiple right tuples (one-to-many join), so we iterate through currentMatches across multiple next() calls before advancing to the next left tuple.
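A toy probe over a prebuilt table shows both behaviors: the immediate skip for unmatched keys and the fan-out for one-to-many keys (illustrative names, not the project's classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Probe phase in miniature: each left key is a single lookup, never a
// rescan of the right side.
class ProbeDemo {
    static List<String> probe(int[] leftKeys, Map<Integer, List<String>> table) {
        List<String> out = new ArrayList<>();
        for (int key : leftKeys) {
            List<String> matches = table.get(key); // expected O(1) lookup
            if (matches == null) continue;         // unmatched key: skipped immediately
            for (String m : matches) out.add(key + ":" + m);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> table = new HashMap<>();
        table.put(1, List.of("order1", "order2"));
        table.put(3, List.of("order4", "order5"));
        // Key 4 has no entry; keys 1 and 3 each fan out to two matches.
        System.out.println(probe(new int[]{1, 4, 3}, table));
        // [1:order1, 1:order2, 3:order4, 3:order5]
    }
}
```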
Join column extraction
The constructor extracts which columns to hash on from the join condition:
private void extractJoinColumns() {
    if (condition instanceof Expression.BinaryOp binOp) {
        if (binOp.left() instanceof Expression.ColumnRef leftCol &&
            binOp.right() instanceof Expression.ColumnRef rightCol) {
            try {
                probeColumn = leftSchema.getColumn(leftCol.columnName());
                buildColumn = rightSchema.getColumn(rightCol.columnName());
            } catch (Exception e) {
                // Try reversed
                probeColumn = leftSchema.getColumn(rightCol.columnName());
                buildColumn = rightSchema.getColumn(leftCol.columnName());
            }
        }
    }
}
This resolves customers.id = orders.customer_id into: probe on the id column from the left schema, build on the customer_id column from the right schema. The try/catch handles the case where the condition references columns in the opposite order from what we expect - orders.customer_id = customers.id should work the same way.
The Executor
The executor is the driver that pulls tuples out of the physical plan:
public class Executor {
    public record ExecutionResult(
        List<Tuple> tuples,
        long executionTimeMs,
        long tuplesProcessed
    ) {}

    public ExecutionResult execute(PhysicalNode plan) {
        if (!(plan instanceof Iterator iterator)) {
            throw new IllegalArgumentException(
                "Physical plan must implement Iterator");
        }
        List<Tuple> results = new ArrayList<>();
        long tuplesProcessed = 0;
        long startTime = System.currentTimeMillis();
        try {
            iterator.open();
            Tuple tuple;
            while ((tuple = iterator.next()) != null) {
                results.add(tuple);
                tuplesProcessed++;
            }
        } finally {
            iterator.close();
        }
        long executionTimeMs = System.currentTimeMillis() - startTime;
        return new ExecutionResult(results, executionTimeMs, tuplesProcessed);
    }
}
The pattern is simple: open, pull until null, close. The finally block ensures cleanup even if an operator throws. The ExecutionResult record bundles the result tuples with timing and statistics - useful for the demo's performance comparisons.
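The cleanup guarantee can be demonstrated with a toy source that fails mid-stream. FailingSource and ExecutorDemo are illustrative names; unlike the real executor, the demo swallows the exception so it can report the outcome:

```java
// A source that throws partway through iteration, to show that the
// executor's try/finally discipline still closes it.
class FailingSource {
    boolean closed = false;
    int emitted = 0;

    void open() {}
    Integer next() {
        if (emitted == 2) throw new RuntimeException("disk error");
        return emitted++;
    }
    void close() { closed = true; }
}

class ExecutorDemo {
    static boolean runAndCheckClosed(FailingSource source) {
        try {
            source.open();
            while (source.next() != null) { /* collect */ }
        } catch (RuntimeException e) {
            // Swallowed for the demo; a real executor would rethrow.
        } finally {
            source.close(); // cleanup happens regardless of the failure
        }
        return source.closed;
    }

    public static void main(String[] args) {
        System.out.println(runAndCheckClosed(new FailingSource())); // true
    }
}
```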
There's also an executeWithLimit variant that stops after collecting a fixed number of rows:
public ExecutionResult executeWithLimit(PhysicalNode plan, int limit) {
    // ... same setup ...
    Tuple tuple;
    // Check the limit before pulling, so no extra tuple is fetched and discarded
    while (results.size() < limit && (tuple = iterator.next()) != null) {
        results.add(tuple);
        tuplesProcessed++;
    }
    // ... same cleanup ...
}
This is the pull-based model's greatest strength: stopping early is trivial. The executor just stops calling next(). No operator needs to know about the limit - they simply never get asked for more tuples, and close() cleans them up.
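A small sketch with a scan that counts its next() calls makes the early stop visible. It assumes the limit is checked before each pull; CountingScan and LimitDemo are illustrative names:

```java
// A scan that records how many times it was asked for a row, to show
// that a limit of 2 issues exactly two next() calls against a 5-row source.
class CountingScan {
    private final int rows;
    private int pos = 0;
    int nextCalls = 0;

    CountingScan(int rows) { this.rows = rows; }

    Integer next() {
        nextCalls++;
        return pos < rows ? pos++ : null;
    }
}

class LimitDemo {
    static int pullWithLimit(CountingScan scan, int limit) {
        int collected = 0;
        // Limit is checked before pulling, so the scan never overdraws.
        while (collected < limit && scan.next() != null) {
            collected++;
        }
        return collected;
    }

    public static void main(String[] args) {
        CountingScan scan = new CountingScan(5);
        int got = pullWithLimit(scan, 2);
        System.out.println(got);            // 2 rows collected
        System.out.println(scan.nextCalls); // only 2 next() calls issued
    }
}
```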
End-to-end: SQL to results
Let's trace a complete query through the entire pipeline:
String sql = "SELECT c.name, o.amount FROM customers c " +
    "INNER JOIN orders o ON c.id = o.customer_id " +
    "WHERE c.age > 30 AND o.amount > 100";
Step 1: Parse -> AST with two select items, a join, and two AND'd predicates.
Step 2: Logical plan -> Canonical form with split filters:
Project[name, amount]
|-- Filter[(o.amount > 100)]
    |-- Filter[(c.age > 30)]
        |-- Join[INNER, (c.id = o.customer_id)]
            |-- Scan[customers]
            |-- Scan[orders]
Step 3: Optimize -> Predicate pushdown moves filters below the join:
Project[name, amount]
|-- Join[INNER, (c.id = o.customer_id)]
    |-- Filter[(c.age > 30)]
    |   |-- Scan[customers]
    |-- Filter[(o.amount > 100)]
        |-- Scan[orders]
Step 4: Physical plan -> Hash join chosen for the equi-join condition:
PhysicalProject[name, amount]
|-- PhysicalHashJoin[(c.id = o.customer_id)]
    |-- PhysicalFilter[(c.age > 30)]
    |   |-- PhysicalScan[customers]
    |-- PhysicalFilter[(o.amount > 100)]
        |-- PhysicalScan[orders]
Step 5: Execute -> The executor calls open() on the root, which cascades down:
- PhysicalProject.open() calls PhysicalHashJoin.open().
- Hash join's build phase: opens PhysicalFilter.open() -> PhysicalScan[orders].open(), pulls all order tuples, filters to amount > 100, and hashes the surviving tuples by customer_id.
- Hash join's probe phase: opens PhysicalFilter.open() -> PhysicalScan[customers].open().
Then the executor calls next() on the root repeatedly:
- PhysicalProject.next() calls PhysicalHashJoin.next().
- Hash join calls PhysicalFilter[age > 30].next(), which pulls from PhysicalScan[customers] until it finds a customer with age > 30.
- Hash join probes the hash table with the customer's id. If there are matching orders (amount > 100), it produces combined tuples.
- PhysicalProject extracts just name and amount from the combined tuple.
- The executor collects the result.
This cascading pull continues until the customer scan is exhausted, at which point null propagates up through the tree and the executor stops.
What we built, and what we didn't
Over six posts, we built a query optimizer that handles a real (if narrow) slice of SQL. Let's take stock of what works and what doesn't.
What works. Parsing a meaningful SQL subset. Building canonical logical plans. Four optimization rules (predicate pushdown, projection pushdown, filter merge, greedy join reorder). A configurable cost model with operator-specific formulas. Cardinality estimation with NDV-based selectivity and histograms. Dynamic programming join ordering. Hardware cost calibration. Two join algorithms (hash join and nested loop). Full iterator-model execution with actual query results.
What such a simplified implementation trades away. There are substantial gaps compared to a production system. No index support (index scans, index-only scans, index joins). No sort operator (needed for merge join, ORDER BY, and efficient GROUP BY). No memory management or spill-to-disk. No outer joins, subqueries, or CTEs. No parallelism. No transaction isolation. No buffer pool or I/O scheduling. No adaptive execution. No plan caching. Type checking is minimal, and error handling is optimistic.
Where to go from here. The architecture supports extension in several directions. Adding a PhysicalSort operator and PhysicalMergeJoin would demonstrate sort-based processing. Adding index structures (B-tree or hash index) and an IndexScan operator would show how the optimizer chooses between access paths. Implementing the Cascades framework instead of the rule engine would demonstrate exploration-based optimization. Adding EXPLAIN output (instead of pretty-printed plan trees) would make the system feel more like a real database.
The most important thing isn't any single feature - it's the architecture. The layered pipeline (catalog -> parser -> logical -> optimizer -> physical -> executor), the canonical form that makes rules simple, the annotation system that carries metadata through the tree, the iterator model that makes execution composable - these are the same ideas that power PostgreSQL, SQL Server, CockroachDB, and every other relational database. The sophistication grows within these same abstractions.
This concludes the series. The full source code is at github.com/tripab/query-optimizer.