Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cmd/server_foreground.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,37 @@ func startRuntimeBroker(ctx context.Context, cmd *cobra.Command, cfg *config.Glo
log.Printf("Runtime broker using runtime: %s", rt.Name())

mgr := agent.NewManager(rt)

// Initialize port pool from broker settings
if versionedSettings, _, err := config.LoadEffectiveSettings(""); err == nil &&
versionedSettings != nil && versionedSettings.Server != nil &&
versionedSettings.Server.Broker != nil && versionedSettings.Server.Broker.PortPool != nil {
pp := versionedSettings.Server.Broker.PortPool
if pp.Enabled == nil || *pp.Enabled {
portRange := pp.Range
if portRange == "" {
portRange = "8000-9000"
}
perAgent := pp.PortsPerAgent
if perAgent <= 0 {
perAgent = 2
}
pMin, pMax, err := api.ParsePortRange(portRange)
if err != nil {
log.Printf("Warning: invalid port_pool range %q: %v", portRange, err)
} else {
pool, poolErr := runtime.NewPortPool(pMin, pMax, perAgent, strings.TrimRight(pp.HostURL, "/"))
if poolErr != nil {
log.Printf("Warning: failed to initialize port pool: %v", poolErr)
} else {
mgr.PortPool = pool
log.Printf("Port pool initialized: range %d-%d (%d ports, %d agents max), %d per agent, host_url=%q",
pMin, pMax, pool.Total(), pool.Total()/perAgent, perAgent, pp.HostURL)
}
}
Comment on lines +1308 to +1317

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If pp.HostURL is configured with a trailing slash (e.g., http://35.232.118.211/), the constructed agent port URLs will contain an invalid double-slash/colon sequence (e.g., http://35.232.118.211/:8000).

Trimming any trailing slashes from the host URL during initialization ensures that the constructed URLs are always valid.

			} else {
				hostURL := strings.TrimRight(pp.HostURL, "/")
				mgr.PortPool = runtime.NewPortPool(pMin, pMax, perAgent, hostURL)
				log.Printf("Port pool initialized: range %d-%d, %d ports per agent, host_url=%q", pMin, pMax, perAgent, hostURL)
			}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already addressed — strings.TrimRight(pp.HostURL, "/") is applied when creating the PortPool in server_foreground.go (see commit 0da4a06).

}
}

settings := brokerSettings

// Try loading versioned settings to get broker identity from server.broker
Expand Down
26 changes: 23 additions & 3 deletions pkg/agent/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ type Manager interface {

type AgentManager struct {
Runtime runtime.Runtime
PortPool *runtime.PortPool
msgBuffer *MessageBuffer
}

// defaultBufferDelay is the debounce window for message delivery.
// Messages arriving within this window are coalesced into a single delivery.
const defaultBufferDelay = 2 * time.Second

func NewManager(rt runtime.Runtime) Manager {
func NewManager(rt runtime.Runtime) *AgentManager {
mgr := &AgentManager{
Runtime: rt,
}
Expand Down Expand Up @@ -113,13 +114,29 @@ func (m *AgentManager) Stop(ctx context.Context, agentID string, projectPath str
if stopProjectName != "" && !matchAgentProject(a, stopProjectName, "") {
continue
}
return m.Runtime.Stop(ctx, a.ContainerID)
if stopErr := m.Runtime.Stop(ctx, a.ContainerID); stopErr != nil {
return stopErr
}
if m.PortPool != nil {
// Release using the user-facing agent name (agentID),
// which matches the key used in Start()'s Allocate call.
// a.Name is the container name (e.g. "project--agent")
// and would never match the port pool key.
m.PortPool.Release(agentID)
}
return nil
}
}
}
// Fallback: agentID may already be a container ID, or the list
// failed — pass it through directly.
return m.Runtime.Stop(ctx, agentID)
if stopErr := m.Runtime.Stop(ctx, agentID); stopErr != nil {
return stopErr
}
if m.PortPool != nil {
m.PortPool.Release(agentID)
}
return nil
}

func (m *AgentManager) Delete(ctx context.Context, agentID string, deleteFiles bool, projectPath string, removeBranch bool) (bool, error) {
Expand Down Expand Up @@ -171,6 +188,9 @@ func (m *AgentManager) Delete(ctx context.Context, agentID string, deleteFiles b
return false, fmt.Errorf("failed to delete container: %w", err)
}
util.Debugf("delete: runtime delete completed for container %s", targetID)
if m.PortPool != nil {
m.PortPool.Release(agentID)
}
}

if deleteFiles {
Expand Down
29 changes: 29 additions & 0 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,22 @@ func (m *AgentManager) Start(ctx context.Context, opts api.StartOptions) (*api.A
}
}

// Allocate ports from the pool if available
var allocatedPorts []int
if m.PortPool != nil {
// Release any previously allocated ports for this agent name.
// This handles the restart case (stop + start) where the old
// ports were never freed because the previous run didn't go
// through Delete. Release is idempotent — a no-op if the name
// has no allocation.
m.PortPool.Release(opts.Name)
ports, err := m.PortPool.Allocate(opts.Name, m.PortPool.PerAgent())
if err != nil {
return nil, fmt.Errorf("port allocation failed: %w", err)
}
allocatedPorts = ports
}
Comment on lines +821 to +833

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When an agent container is recreated or restarted, the existing container is deleted via m.Runtime.Delete directly, which bypasses the manager's Delete method and does not release the allocated ports back to the pool. Consequently, calling Allocate again for the same agent name will leak the previously allocated ports, eventually leading to port pool exhaustion.

To prevent this resource leak, release any existing ports for the agent name from the pool before allocating new ones.

Suggested change
if m.PortPool != nil {
ports, err := m.PortPool.Allocate(opts.Name, m.PortPool.PerAgent())
if err != nil {
return nil, fmt.Errorf("port allocation failed: %w", err)
}
allocatedPorts = ports
}
if m.PortPool != nil {
m.PortPool.Release(opts.Name)
ports, err := m.PortPool.Allocate(opts.Name, m.PortPool.PerAgent())
if err != nil {
return nil, fmt.Errorf("port allocation failed: %w", err)
}
allocatedPorts = ports
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already addressed — Release is called before Allocate in the Start path to handle the restart case where old ports were not freed.


runCfg := runtime.RunConfig{
Name: containerName(projectName, opts.Name),
Template: template,
Expand Down Expand Up @@ -890,6 +906,13 @@ func (m *AgentManager) Start(ctx context.Context, opts api.StartOptions) (*api.A
MetadataInterception: hasMetadataInterception(agentEnv),
ExtraHosts: mergeExtraHosts(opts.ExtraHosts, runtime.BridgeExtraHosts(m.Runtime.Name(), agentEnv)),
NetworkMode: dockerNetworkMode,
AllocatedPorts: allocatedPorts,
PortHostURL: func() string {
if m.PortPool != nil {
return m.PortPool.HostURL()
}
return ""
}(),
Labels: func() map[string]string {
l := map[string]string{
"scion.agent": "true",
Expand Down Expand Up @@ -919,6 +942,9 @@ func (m *AgentManager) Start(ctx context.Context, opts api.StartOptions) (*api.A
}
id, err := m.Runtime.Run(ctx, runCfg)
if err != nil {
if m.PortPool != nil {
m.PortPool.Release(opts.Name)
}
// Provisioning writes agent-info.json in "created" state before the
// runtime launch. If the launch itself fails, keep the provisioned
// workspace but flip the local state to "error" so list/status do not
Expand Down Expand Up @@ -948,6 +974,9 @@ func (m *AgentManager) Start(ctx context.Context, opts api.StartOptions) (*api.A
// Try to get logs for diagnosis
logs, _ := m.Runtime.GetLogs(ctx, id)
_ = m.Runtime.Delete(ctx, id)
if m.PortPool != nil {
m.PortPool.Release(opts.Name)
}
return nil, fmt.Errorf("container started but exited immediately (status: %s). Container logs:\n%s", a.ContainerStatus, logs)
}
a.Detached = detached
Expand Down
35 changes: 34 additions & 1 deletion pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -352,6 +353,37 @@ type ResourceList struct {
Memory string `json:"memory,omitempty" yaml:"memory,omitempty"`
}

// PortPoolConfig controls the hub-managed port pool that allocates unique host
// ports for agent containers.
type PortPoolConfig struct {
Enabled *bool `json:"enabled,omitempty" yaml:"enabled,omitempty"`
Range string `json:"range,omitempty" yaml:"range,omitempty"` // e.g. "8000-9000"
PortsPerAgent int `json:"ports_per_agent,omitempty" yaml:"ports_per_agent,omitempty"`
}

// ParsePortRange parses a "min-max" range string into its integer bounds.
func ParsePortRange(s string) (min, max int, err error) {
parts := strings.SplitN(s, "-", 2)
if len(parts) != 2 {
return 0, 0, fmt.Errorf("invalid port range %q: expected format \"min-max\"", s)
}
min, err = strconv.Atoi(strings.TrimSpace(parts[0]))
if err != nil {
return 0, 0, fmt.Errorf("invalid port range %q: bad min: %w", s, err)
}
max, err = strconv.Atoi(strings.TrimSpace(parts[1]))
if err != nil {
return 0, 0, fmt.Errorf("invalid port range %q: bad max: %w", s, err)
}
if min < 1 || max > 65535 {
return 0, 0, fmt.Errorf("invalid port range %q: ports must be between 1 and 65535", s)
}
if min > max {
return 0, 0, fmt.Errorf("invalid port range %q: min (%d) > max (%d)", s, min, max)
}
return min, max, nil
}

// AgentHubConfig holds hub connection settings that can be specified per-agent
// or per-template in scion-agent.yaml. When set, these take highest priority
// for the agent's hub endpoint, overriding project settings and server config.
Expand Down Expand Up @@ -459,7 +491,8 @@ type ScionConfig struct {
Hub *AgentHubConfig `json:"hub,omitempty" yaml:"hub,omitempty"`
Telemetry *TelemetryConfig `json:"telemetry,omitempty" yaml:"telemetry,omitempty"`

Secrets []RequiredSecret `json:"secrets,omitempty" yaml:"secrets,omitempty"`
PortPool *PortPoolConfig `json:"port_pool,omitempty" yaml:"port_pool,omitempty"`
Secrets []RequiredSecret `json:"secrets,omitempty" yaml:"secrets,omitempty"`

// Agnostic template fields
AgentInstructions string `json:"agent_instructions,omitempty" yaml:"agent_instructions,omitempty"`
Expand Down
45 changes: 45 additions & 0 deletions pkg/api/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,51 @@ func TestValidateServices(t *testing.T) {
}
}

func TestParsePortRange(t *testing.T) {
tests := []struct {
name string
input string
wantMin int
wantMax int
wantErr bool
wantErrContain string
}{
{"valid range", "8000-9000", 8000, 9000, false, ""},
{"single port", "8080-8080", 8080, 8080, false, ""},
{"edge low", "1-100", 1, 100, false, ""},
{"edge high", "65000-65535", 65000, 65535, false, ""},
{"with spaces", " 8000 - 9000 ", 8000, 9000, false, ""},
{"missing dash", "8000", 0, 0, true, "expected format"},
{"empty string", "", 0, 0, true, "expected format"},
{"non-numeric min", "abc-9000", 0, 0, true, "bad min"},
{"non-numeric max", "8000-xyz", 0, 0, true, "bad max"},
{"min zero", "0-9000", 0, 0, true, "must be between 1 and 65535"},
{"max exceeds 65535", "8000-70000", 0, 0, true, "must be between 1 and 65535"},
{"min > max", "9000-8000", 0, 0, true, "min (9000) > max (8000)"},
{"negative min", "-1-9000", 0, 0, true, "bad min"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
min, max, err := ParsePortRange(tt.input)
if tt.wantErr {
if err == nil {
t.Fatalf("ParsePortRange(%q) = (%d, %d, nil), want error", tt.input, min, max)
}
if tt.wantErrContain != "" && !strings.Contains(err.Error(), tt.wantErrContain) {
t.Errorf("error %q does not contain %q", err, tt.wantErrContain)
}
return
}
if err != nil {
t.Fatalf("ParsePortRange(%q) unexpected error: %v", tt.input, err)
}
if min != tt.wantMin || max != tt.wantMax {
t.Errorf("ParsePortRange(%q) = (%d, %d), want (%d, %d)", tt.input, min, max, tt.wantMin, tt.wantMax)
}
})
}
}

func TestParseDuration(t *testing.T) {
tests := []struct {
input string
Expand Down
9 changes: 9 additions & 0 deletions pkg/config/settings_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,15 @@ type V1BrokerConfig struct {
// dispatch agents whose harness-config declares container-script
// provisioning. Defaults to true; set false to block container-script dispatches.
AllowContainerScriptHarnesses *bool `json:"allow_container_script_harnesses,omitempty" yaml:"allow_container_script_harnesses,omitempty" koanf:"allow_container_script_harnesses"`

PortPool *V1PortPoolConfig `json:"port_pool,omitempty" yaml:"port_pool,omitempty" koanf:"port_pool"`
}

type V1PortPoolConfig struct {
Enabled *bool `json:"enabled,omitempty" yaml:"enabled,omitempty" koanf:"enabled"`
Range string `json:"range,omitempty" yaml:"range,omitempty" koanf:"range"`
PortsPerAgent int `json:"ports_per_agent,omitempty" yaml:"ports_per_agent,omitempty" koanf:"ports_per_agent"`
HostURL string `json:"host_url,omitempty" yaml:"host_url,omitempty" koanf:"host_url"`
}

// V1DatabaseConfig holds database settings.
Expand Down
9 changes: 9 additions & 0 deletions pkg/runtime/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,15 @@ func buildCommonRunArgs(config RunConfig) ([]string, error) {
}
}

for i, port := range config.AllocatedPorts {
suffix := string(rune('A' + i))
addEnv(fmt.Sprintf("AVAILABLE_LOCALHOST_PORT_%s", suffix), fmt.Sprintf("%d", port))
addArg("-p", fmt.Sprintf("%d:%d", port, port))
if config.PortHostURL != "" {
addEnv(fmt.Sprintf("AVAILABLE_LOCALHOST_URL_%s", suffix), fmt.Sprintf("%s:%d", config.PortHostURL, port))
}
}

// Inject environment-type resolved secrets
for _, s := range config.ResolvedSecrets {
if s.Type == "environment" || s.Type == "" {
Expand Down
3 changes: 3 additions & 0 deletions pkg/runtime/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type RunConfig struct {
// wait-for-sentinel init container instead of the cloning one.
// Callers should not set this field.
nfsProvisionLockLost bool

AllocatedPorts []int // Host ports allocated from the port pool, published as -p and injected as env vars
PortHostURL string // Base URL for constructing agent port URLs (e.g. "http://35.232.118.211")
}

type Runtime interface {
Expand Down
14 changes: 14 additions & 0 deletions pkg/runtime/k8s_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,20 @@ func (r *KubernetesRuntime) buildPod(namespace string, config RunConfig) (*corev
}
}

for i, port := range config.AllocatedPorts {
suffix := string(rune('A' + i))
envVars = append(envVars, corev1.EnvVar{
Name: fmt.Sprintf("AVAILABLE_LOCALHOST_PORT_%s", suffix),
Value: fmt.Sprintf("%d", port),
})
if config.PortHostURL != "" {
envVars = append(envVars, corev1.EnvVar{
Name: fmt.Sprintf("AVAILABLE_LOCALHOST_URL_%s", suffix),
Value: fmt.Sprintf("%s:%d", config.PortHostURL, port),
})
}
}

// Secret mounting: determine strategy and inject secrets
var extraVolumes []corev1.Volume
var extraVolumeMounts []corev1.VolumeMount
Expand Down
Loading
Loading