Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
],
"require": {
"php": ">=8.1",
"cycle/database": "^2.8.1",
"cycle/database": "^2.17.0",
"doctrine/instantiator": "^1.3.1 || ^2.0",
"spiral/core": "^2.8 || ^3.0"
},
Expand All @@ -52,6 +52,7 @@
"mockery/mockery": "^1.1",
"phpunit/phpunit": "^9.5",
"ramsey/uuid": "^4.0",
"roxblnfk/unpoly": "^1.8",
"spiral/code-style": "~2.2.0",
"spiral/tokenizer": "^2.8 || ^3.0",
"vimeo/psalm": "^6.0"
Expand Down
151 changes: 151 additions & 0 deletions src/Select.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

namespace Cycle\ORM;

use Cycle\Database\Driver\CursorOptions;
use Cycle\Database\Injection\FragmentInterface;
use Cycle\Database\Injection\Parameter;
use Cycle\Database\Query\SelectQuery;
use Cycle\Database\StatementInterface;
use Cycle\ORM\Heap\Node;
use Cycle\ORM\Select\Options\LoadOptions;
use Cycle\ORM\Service\EntityFactoryInterface;
Expand Down Expand Up @@ -867,6 +869,116 @@ public function getIterator(bool $findInHeap = false): Iterator
);
}

/**
* Stream entities from the database using a server-side cursor.
*
* The returned generator pulls rows lazily, hydrates them into entities, and
* yields one entity at a time. Memory usage is bound by `$chunkSize` parents
* plus the heap (which the caller is responsible for clearing between batches
* via `$orm->getHeap()->clean()` if needed).
*
* Chunking happens at **parent boundaries**, not strict row counts. When a
* relation is loaded inline and multiplies rows (HAS_MANY / MANY_TO_MANY), one
* parent corresponds to several rows. A chunk is closed only when a new parent
* PK appears in the stream, so a parent's full set of joined rows is never
* split. A chunk may therefore contain more than `$chunkSize` rows when
* children are present, but never more than `$chunkSize` distinct parents.
*
* For correctness with inline HAS_MANY / MANY_TO_MANY, rows of the same parent
* must arrive contiguously. The cursor appends the root primary key to the
* query's ORDER BY clause to guarantee this in the common case (no ORDER BY,
* or ORDER BY on parent columns). If you order by a joined child column, the
* scatter is not auto-fixed β€” order by parent columns instead.
*
* Requirements:
* - The underlying driver must implement {@see \Cycle\Database\Driver\CursorInterface}
* (Postgres, SQLite, SQL Server). Other drivers throw a {@see \Cycle\Database\Exception\DriverException}.
* - An active transaction is required on the underlying database before
* iteration starts.
*
* @param int<1, max> $chunkSize Maximum number of distinct parent entities
* per chunk. Controls when the ORM flushes the parser node and yields
* a batch of hydrated entities. This is **independent** of any
* DBAL-level chunk knob (e.g. Postgres `FETCH FORWARD N`), which is
* configured via `$options`.
* @param CursorOptions|null $options Driver-specific cursor configuration forwarded
* to {@see \Cycle\Database\Database::cursor()}. Use driver-specific subclasses
* ({@see \Cycle\Database\Driver\Postgres\PostgresCursorOptions},
* {@see \Cycle\Database\Driver\SQLServer\SQLServerCursorOptions}) to tune
* Postgres FETCH FORWARD size, WITH HOLD, SQL Server cursor type, etc.
* Row fetch mode is forced to `FETCH_NUM` internally β€” the parser expects
* positional rows.
*
* Entity identity: if a yielded row corresponds to a PK already attached to the
* heap, the same instance is returned. Fresh data from the current row is merged
* into previously-unresolved relations ({@see \Cycle\ORM\Reference\ReferenceInterface})
* via {@see EntityFactoryInterface::make()}.
*
* @return \Generator<int, TEntity>
*/
public function cursor(int $chunkSize = 1000, CursorOptions $options = new CursorOptions()): \Generator
{
$query = $this->buildQuery();

// Append root PK to ORDER BY so rows for the same parent stay contiguous
// in the stream. PK is unique, so appending it never alters an existing
// user-defined order β€” it only resolves ties.
$pk = $this->loader->getPK();
foreach ((array) $pk as $column) {
$query->orderBy($column);
}

$database = $this->loader->getSource()->getDatabase();
$role = $this->loader->getTarget();
$loader = $this->loader;
$extractPk = $this->buildPkExtractor();

$rows = static function () use ($database, $query, $chunkSize, $loader, $extractPk, $options): \Generator {
$node = $loader->createNode();
$lastPk = null;
$hasLast = false;
$parentCount = 0;

foreach ($database->cursor($query, $options, StatementInterface::FETCH_NUM) as $row) {
$rowPk = $extractPk($row);

if (!$hasLast || $rowPk !== $lastPk) {
if ($parentCount >= $chunkSize) {
// Current row starts a new parent; previous chunk is fully complete.
$loader->loadChildren($node, true);
yield from $node->getResult();
$node = $loader->createNode();
$parentCount = 0;
}
$parentCount++;
$lastPk = $rowPk;
$hasLast = true;
}

$node->parseRow(0, $row);
}

if ($parentCount > 0) {
$loader->loadChildren($node, true);
yield from $node->getResult();
}
};

yield from Iterator::createWithServices(
$this->heap,
$this->schema,
$this->entityFactory,
$role,
$rows(),
// Hardcoded false β€” do NOT expose as a parameter. Iterator's findInHeap=true
// takes a fast path that returns heap-attached entities untouched, skipping
// the merge of fresh row data into previously-unresolved Reference relations.
// For cursor streaming we always want the latest row to be merged in.
findInHeap: false,
typecast: true,
);
}

/**
* Load data tree from database and linked loaders in a form of array.
*
Expand Down Expand Up @@ -941,6 +1053,45 @@ protected function loadData(bool $addRole = true): array
return $node->getResult();
}

/**
* Build a closure that pulls the root primary key value out of a FETCH_NUM row
* produced by the cursor query. For composite PKs, the values are joined with a
* NUL separator into a single comparable string.
*
* @return \Closure(array): (int|string|float|null)
*/
private function buildPkExtractor(): \Closure
{
$columnNames = $this->loader->getColumnNames();
$pkFields = $this->loader->getPrimaryFields();

$positions = [];
foreach ($pkFields as $field) {
$idx = \array_search($field, $columnNames, true);
if ($idx === false) {
throw new \LogicException(\sprintf(
'Cursor cannot locate primary key column `%s` among root loader columns [%s].',
$field,
\implode(', ', $columnNames),
));
}
$positions[] = $idx;
}

if (\count($positions) === 1) {
$p = $positions[0];
return static fn(array $row): int|string|float|null => $row[$p];
}

return static function (array $row) use ($positions): string {
$parts = [];
foreach ($positions as $i) {
$parts[] = (string) $row[$i];
}
return \implode("\0", $parts);
};
}

/**
* @param list<non-empty-string> $pk
* @param list<array|int|object|string> $args
Expand Down
47 changes: 43 additions & 4 deletions src/Select/RootLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ public function getQuery(): SelectQuery
return $this->query;
}

/**
* Return the ordered list of column names produced by the root loader's
* SELECT clause. Positions in this list correspond to positions in a
* FETCH_NUM row from {@see buildQuery()} β€” useful for callers that want
* to inspect raw rows before they reach the parser (e.g. cursor-based
* streaming with parent-boundary chunking).
*
* @return non-empty-string[]
*/
public function getColumnNames(): array
{
return $this->columnNames();
}

/**
* Compile query with all needed conditions, columns and etc.
*/
Expand All @@ -114,13 +128,38 @@ public function loadData(AbstractNode $node, bool $includeRole = false): void
{
$statement = $this->buildQuery()->run();

foreach ($statement->fetchAll(StatementInterface::FETCH_NUM) as $row) {
$node->parseRow(0, $row);
}
$this->parseRows($node, $statement->fetchAll(StatementInterface::FETCH_NUM));

$statement->close();

// loading child datasets
$this->loadChildren($node, $includeRole);
}

/**
* Push a batch of raw rows through the parser into the given node.
*
* Separated from {@see loadData()} so callers that produce their own row stream
* (e.g. cursor-based streaming) can reuse the parsing step independently of
* query execution and child-loader orchestration.
*
* @param iterable<array<int, mixed>> $rows Rows in FETCH_NUM (positional) shape.
*/
public function parseRows(AbstractNode $node, iterable $rows): void
{
foreach ($rows as $row) {
$node->parseRow(0, $row);
}
}

/**
* Run all child loaders (POSTLOAD relations, inheritance) against the given node.
*
* Designed to be called after {@see parseRows()} on a node that already holds
* the parent rows of a chunk. Child loaders aggregate parent keys from the node's
* index and issue their own queries.
*/
public function loadChildren(AbstractNode $node, bool $includeRole = false): void
{
foreach ($this->load as $relation => $loader) {
$loader->loadData($node->getNode($relation), $includeRole);
}
Expand Down
Loading
Loading