Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# syntax=docker/dockerfile:1
ARG ROAD_RUNNER_IMAGE=2024.2.1
ARG CENTRIFUGO_IMAGE=v4
ARG DOLT_IMAGE=1.42.8
Expand Down Expand Up @@ -29,7 +30,7 @@ RUN echo $CACHE_BUST
# Build RoadRunner with velox. The GitHub token is mounted as a secret
# and never stored in image layers.
RUN --mount=type=secret,id=gh_token \
RT_TOKEN=$(cat /run/secrets/gh_token) \
export GH_TOKEN="$(cat /run/secrets/gh_token 2>/dev/null | tr -d '\n' || true)" && \
vx build -c velox.toml -o /usr/bin/

# Build JS files
Expand Down
12 changes: 11 additions & 1 deletion .docker/velox.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,14 @@ repository = "tcp"
[github.plugins.smtp-server]
ref = "2.0.0"
owner = "buggregator"
repository = "smtp-server"
repository = "smtp-server"

[github.plugins.var-dumper-server]
ref = "1.0.1"
owner = "buggregator"
repository = "var-dumper-server"

[github.plugins.profiler-server]
ref = "1.0.0"
owner = "buggregator"
repository = "profiler-server"
28 changes: 24 additions & 4 deletions .rr-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ tcp:
# Address to listen.
addr: ${RR_TCP_MONOLOG_ADDR:-:9913}
delimiter: "\n"
var-dumper:
# Address to listen.
addr: ${RR_TCP_VAR_DUMPER_ADDR:-:9912}
delimiter: "\n"
# Chunks that RR uses to read the data. In bytes.
# If you expect big payloads on a TCP server, to reduce `read` syscalls,
# would be a good practice to use a fairly big enough buffer.
Expand All @@ -77,15 +73,32 @@ kv:
driver: memory
config: { }

var-dumper:
addr: ${RR_VAR_DUMPER_ADDR:-:9912}
max_message_size: 10485760
jobs:
pipeline: "var-dumper"
auto_ack: true

jobs:
consume:
- smtp
- var-dumper
- profiler
pipelines:
smtp:
driver: memory
config:
priority: 10
prefetch: 10
var-dumper:
driver: memory
config:
priority: 10
profiler:
driver: memory
config:
priority: 10
pool:
num_workers: ${RR_JOBS_NUM_WORKERS:-1}

Expand All @@ -95,6 +108,13 @@ smtp:
jobs:
pipeline: smtp

profiler:
addr: ${RR_PROFILER_ADDR:-:9914}
max_request_size: 52428800
jobs:
pipeline: "profiler"
auto_ack: true

service:
nginx:
service_name_in_log: true
Expand Down
31 changes: 25 additions & 6 deletions .rr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ tcp:
monolog:
addr: 127.0.0.1:9913
delimiter: "\n"
var-dumper:
addr: 127.0.0.1:9912
delimiter: "\n"
smtp:
addr: 127.0.0.1:1025
pool:
num_workers: 2

Expand All @@ -47,11 +42,35 @@ kv:
driver: memory
config: { }

var-dumper:
addr: 127.0.0.1:9912
jobs:
pipeline: "var-dumper"
auto_ack: true

jobs:
consume: [ ]
pipelines:
var-dumper:
driver: memory
config:
priority: 10
profiler:
driver: memory
config:
priority: 10
smtp:
driver: memory
config:
priority: 10
consume: ["var-dumper", "profiler", "smtp"]
pool:
num_workers: 1

profiler:
addr: 127.0.0.1:9914
jobs:
pipeline: "profiler"
auto_ack: true

smtp:
addr: ${RR_SMTP_ADDR:-:1025}
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pull-latest:
docker compose pull;

build-server:
docker compose build buggregator-server --no-cache;
DOCKER_BUILDKIT=1 docker compose build buggregator-server --no-cache;

# ====== Database ========

Expand Down
2 changes: 2 additions & 0 deletions app/config/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
'registry' => [
'handlers' => [
'smtp.email' => EmailHandler::class,
'vardumper.dump' => \Modules\VarDumper\Interfaces\Jobs\DumpHandler::class,
'profiler.profile' => \Modules\Profiler\Interfaces\Jobs\ProfileHandler::class,
],
'serializers' => [
WebhookHandler::class => 'symfony-json',
Expand Down
5 changes: 0 additions & 5 deletions app/config/tcp.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@
declare(strict_types=1);

use App\Application\TCP\ExceptionHandlerInterceptor;
use Modules\VarDumper\Interfaces\TCP\Service as VarDumperService;
use Modules\Monolog\Interfaces\TCP\Service as MonologService;

return [
'services' => [
'var-dumper' => VarDumperService::class,
'monolog' => MonologService::class,
],

'interceptors' => [
'var-dumper' => [
ExceptionHandlerInterceptor::class,
],
'monolog' => [
ExceptionHandlerInterceptor::class,
],
Expand Down
128 changes: 128 additions & 0 deletions app/modules/Profiler/Interfaces/Jobs/ProfileHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php

declare(strict_types=1);

namespace Modules\Profiler\Interfaces\Jobs;

use App\Application\Commands\HandleReceivedEvent;
use App\Application\Domain\ValueObjects\Uuid;
use Cycle\ORM\EntityManagerInterface;
use Modules\Profiler\Domain\Edge\Cost;
use Modules\Profiler\Domain\Edge\Diff;
use Modules\Profiler\Domain\Edge\Percents;
use Modules\Profiler\Domain\EdgeFactoryInterface;
use Modules\Profiler\Domain\Profile\Peaks;
use Modules\Profiler\Domain\ProfileFactoryInterface;
use Spiral\Core\InvokerInterface;
use Spiral\Cqrs\CommandBusInterface;
use Spiral\Queue\JobHandler;

/**
* Handles pre-processed profile events from the RoadRunner Profiler plugin.
* The Go plugin already computed peaks, diffs, edges tree, and percentages —
* this handler only persists them to the database and dispatches the event.
*/
final class ProfileHandler extends JobHandler
{
private const BATCH_SIZE = 100;

public function __construct(
private readonly ProfileFactoryInterface $profileFactory,
private readonly EdgeFactoryInterface $edgeFactory,
private readonly EntityManagerInterface $em,
private readonly CommandBusInterface $bus,
InvokerInterface $invoker,
) {
parent::__construct($invoker);
}

public function invoke(array $payload): void
{
$profileUuid = Uuid::generate();

// Create Profile entity with peaks from Go plugin
$peaks = $payload['peaks'] ?? [];
$profile = $this->profileFactory->create(
name: $payload['app_name'] ?? 'unknown',
tags: $payload['tags'] ?? [],
peaks: new Peaks(
cpu: $peaks['cpu'] ?? 0,
wt: $peaks['wt'] ?? 0,
ct: $peaks['ct'] ?? 0,
mu: $peaks['mu'] ?? 0,
pmu: $peaks['pmu'] ?? 0,
),
);

$this->em->persist($profile)->run();

// Store pre-processed edges
$edges = $payload['edges'] ?? [];
$parents = [];
$batchSize = 0;
$order = 0;

foreach ($edges as $id => $edge) {
$cost = $edge['cost'] ?? [];
$diff = $edge['diff'] ?? [];
$pcts = $edge['percents'] ?? [];

$edgeEntity = $this->edgeFactory->create(
profileUuid: $profile->getUuid(),
order: $order++,
cost: new Cost(
cpu: $cost['cpu'] ?? 0,
wt: $cost['wt'] ?? 0,
ct: $cost['ct'] ?? 0,
mu: $cost['mu'] ?? 0,
pmu: $cost['pmu'] ?? 0,
),
diff: new Diff(
cpu: $diff['d_cpu'] ?? 0,
wt: $diff['d_wt'] ?? 0,
ct: $diff['d_ct'] ?? 0,
mu: $diff['d_mu'] ?? 0,
pmu: $diff['d_pmu'] ?? 0,
),
percents: new Percents(
cpu: $pcts['p_cpu'] ?? 0.0,
wt: $pcts['p_wt'] ?? 0.0,
ct: $pcts['p_ct'] ?? 0.0,
mu: $pcts['p_mu'] ?? 0.0,
pmu: $pcts['p_pmu'] ?? 0.0,
),
callee: $edge['callee'],
caller: $edge['caller'] ?? null,
parentUuid: isset($edge['parent']) ? ($parents[$edge['parent']] ?? null) : null,
);

$this->em->persist($edgeEntity);
$parents[$id] = $edgeEntity->getUuid();

$batchSize++;
if ($batchSize >= self::BATCH_SIZE) {
$this->em->run();
$batchSize = 0;
}
}

$this->em->run();

// Dispatch event for broadcasting and storage in events table
$this->bus->dispatch(
new HandleReceivedEvent(
type: 'profiler',
payload: [
'profile_uuid' => (string) $profile->getUuid(),
'peaks' => $peaks,
'tags' => $payload['tags'] ?? [],
'app_name' => $payload['app_name'] ?? 'unknown',
'hostname' => $payload['hostname'] ?? 'unknown',
'date' => $payload['date'] ?? 0,
'total_edges' => \count($edges),
],
uuid: $profileUuid,
),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace Modules\VarDumper\Interfaces\TCP;
namespace Modules\VarDumper\Interfaces\Jobs;

use App\Application\Commands\HandleReceivedEvent;
use Modules\VarDumper\Application\Dump\BodyInterface;
Expand All @@ -12,36 +12,33 @@
use Modules\VarDumper\Application\Dump\MessageParser;
use Modules\VarDumper\Application\Dump\ParsedPayload;
use Modules\VarDumper\Application\Dump\PrimitiveBody;
use Spiral\Core\InvokerInterface;
use Spiral\Cqrs\CommandBusInterface;
use Spiral\RoadRunner\Tcp\Request;
use Spiral\RoadRunner\Tcp\TcpEvent;
use Spiral\RoadRunnerBridge\Tcp\Response\ContinueRead;
use Spiral\RoadRunnerBridge\Tcp\Response\ResponseInterface;
use Spiral\RoadRunnerBridge\Tcp\Service\ServiceInterface;
use Spiral\Queue\JobHandler;
use Symfony\Component\VarDumper\Cloner\Data;

final readonly class Service implements ServiceInterface
final class DumpHandler extends JobHandler
{
public function __construct(
private CommandBusInterface $commandBus,
private DumpIdGeneratorInterface $dumpId,
) {}
private readonly CommandBusInterface $commandBus,
private readonly DumpIdGeneratorInterface $dumpId,
InvokerInterface $invoker,
) {
parent::__construct($invoker);
}

public function handle(Request $request): ResponseInterface
public function invoke(array $payload): void
{
if ($request->event === TcpEvent::Connected) {
return new ContinueRead();
}

$messages = \array_filter(\explode("\n", $request->body));
// RR VarDumper plugin sends: { event, uuid, payload (base64), context, ... }
$message = $payload['payload'] ?? '';

foreach ($messages as $message) {
$payload = (new MessageParser())->parse($message);

$this->fireEvent($payload);
if ($message === '') {
return;
}

return new ContinueRead();
$parsed = (new MessageParser())->parse($message);

$this->fireEvent($parsed);
}

private function fireEvent(ParsedPayload $payload): void
Expand All @@ -58,7 +55,7 @@ private function fireEvent(ParsedPayload $payload): void
);
}

private function convertToPrimitive(Data $data): BodyInterface|null
private function convertToPrimitive(Data $data): BodyInterface
{
if (\in_array($data->getType(), ['string', 'boolean', 'integer', 'double'])) {
return new PrimitiveBody(
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.1'

services:
buggregator-reverse-proxy:
image: traefik:v2.9
image: traefik:v3.6
command:
- "--accesslog"
- "--api.insecure=true"
Expand Down
Loading
Loading