Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ let package = Package(
.product(name: "ContainerizationOS", package: "containerization"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
"ContainerAPIClient",
"ContainerNetworkServiceClient",
"ContainerOS",
"ContainerPersistence",
"ContainerResource",
Expand Down
32 changes: 0 additions & 32 deletions Sources/ContainerResource/Network/AllocatedAttachment.swift

This file was deleted.

11 changes: 11 additions & 0 deletions Sources/ContainerXPC/XPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ extension XPCClient {
xpc_connection_cancel(connection)
}

/// Set a handler that is called when the remote end of the connection closes or crashes.
/// This replaces the no-op event handler installed at init time. Call before any ``send(_:)``
/// to avoid a narrow race between activation and handler registration.
public func setDisconnectHandler(_ handler: @Sendable @escaping () -> Void) {
xpc_connection_set_event_handler(connection) { event in
if xpc_get_type(event) == XPC_TYPE_ERROR {
handler()
}
}
}

/// Returns the pid of process to which we have a connection.
/// Note: `xpc_connection_get_pid` returns 0 if no activity
/// has taken place on the connection prior to it being called.
Expand Down
63 changes: 58 additions & 5 deletions Sources/ContainerXPC/XPCServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,46 @@ import Synchronization

public struct XPCServer: Sendable {
public typealias RouteHandler = @Sendable (XPCMessage) async throws -> XPCMessage
/// A persistent route handler returns a reply and an optional cleanup closure that is
/// invoked when the peer connection closes (normally or due to peer exit/crash).
/// The connection is kept alive until the peer disconnects, making connection lifetime
/// equivalent to the resource lifetime of whatever was allocated in the handler.
public typealias PersistentRouteHandler = @Sendable (XPCMessage) async throws -> (XPCMessage, (@Sendable () async -> Void)?)

private let routes: [String: RouteHandler]
private let persistentRoutes: [String: PersistentRouteHandler]
// Access to `connection` is protected by a lock.
private nonisolated(unsafe) let connection: xpc_connection_t
private let lock = NSLock()

let log: Logging.Logger

public init(identifier: String, routes: [String: RouteHandler], log: Logging.Logger) {
public init(
identifier: String,
routes: [String: RouteHandler],
persistentRoutes: [String: PersistentRouteHandler] = [:],
log: Logging.Logger
) {
let connection = xpc_connection_create_mach_service(
identifier,
nil,
UInt64(XPC_CONNECTION_MACH_SERVICE_LISTENER)
)

self.routes = routes
self.persistentRoutes = persistentRoutes
self.connection = connection
self.log = log
}

public init(connection: xpc_connection_t, routes: [String: RouteHandler], log: Logging.Logger) {
public init(
connection: xpc_connection_t,
routes: [String: RouteHandler],
persistentRoutes: [String: PersistentRouteHandler] = [:],
log: Logging.Logger
) {
self.routes = routes
self.persistentRoutes = persistentRoutes
self.connection = connection
self.log = log
}
Expand Down Expand Up @@ -100,6 +118,7 @@ public struct XPCServer: Sendable {

func handleClientConnection(connection: xpc_connection_t) async throws {
let replySent = Mutex(false)
let onClose = Mutex<(@Sendable () async -> Void)?>(nil)

let objects = AsyncStream<xpc_object_t> { cont in
xpc_connection_set_event_handler(connection) { object in
Expand Down Expand Up @@ -140,7 +159,7 @@ public struct XPCServer: Sendable {
// `object` isn't used concurrently.
nonisolated(unsafe) let object = object
let added = group.addTaskUnlessCancelled { @Sendable in
try await self.handleMessage(connection: connection, object: object)
try await self.handleMessage(connection: connection, object: object, onClose: onClose)
replySent.withLock { $0 = true }
}
if !added {
Expand All @@ -149,9 +168,14 @@ public struct XPCServer: Sendable {
}
group.cancelAll()
}

// Connection has closed — run any cleanup registered by a persistent route handler.
if let cleanup = onClose.withLock({ $0 }) {
await cleanup()
}
}

func handleMessage(connection: xpc_connection_t, object: xpc_object_t) async throws {
func handleMessage(connection: xpc_connection_t, object: xpc_object_t, onClose: borrowing Mutex<(@Sendable () async -> Void)?>) async throws {
// All requests are dictionary-valued.
guard xpc_get_type(object) == XPC_TYPE_DICTIONARY else {
log.error("invalid request - not a dictionary")
Expand Down Expand Up @@ -193,7 +217,36 @@ public struct XPCServer: Sendable {
return
}

if let handler = routes[route] {
if let handler = persistentRoutes[route] {
do {
let message = XPCMessage(object: object)
let (response, cleanup) = try await handler(message)
xpc_connection_send_message(connection, response.underlying)
if let cleanup {
onClose.withLock { $0 = cleanup }
}
} catch let error as ContainerizationError {
log.error(
"persistent route handler threw an error",
metadata: [
"route": "\(route)",
"error": "\(error)",
])
Self.replyWithError(connection: connection, object: object, err: error)
} catch {
log.error(
"persistent route handler threw an error",
metadata: [
"route": "\(route)",
"error": "\(error)",
])
let message = XPCMessage(object: object)
let reply = message.reply()
let err = ContainerizationError(.unknown, message: String(describing: error))
reply.set(error: err)
xpc_connection_send_message(connection, reply.underlying)
}
} else if let handler = routes[route] {
do {
let message = XPCMessage(object: object)
let response = try await handler(message)
Expand Down
5 changes: 3 additions & 2 deletions Sources/Plugins/NetworkVmnet/NetworkVmnetHelper+Start.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ extension NetworkVmnetHelper {
identifier: serviceIdentifier,
routes: [
NetworkRoutes.state.rawValue: server.state,
NetworkRoutes.allocate.rawValue: server.allocate,
NetworkRoutes.deallocate.rawValue: server.deallocate,
NetworkRoutes.lookup.rawValue: server.lookup,
NetworkRoutes.disableAllocator.rawValue: server.disableAllocator,
],
persistentRoutes: [
NetworkRoutes.allocate.rawValue: server.allocate
],
log: log
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public actor ContainersService {
struct ContainerState {
var snapshot: ContainerSnapshot
var client: SandboxClient?
var allocatedAttachments: [AllocatedAttachment]

func getClient() throws -> SandboxClient {
guard let client else {
Expand Down Expand Up @@ -96,7 +95,7 @@ public actor ContainersService {
}

let runtimePlugins = loader.findPlugins().filter { $0.hasType(.runtime) }
var results = [String: ContainerState]()
let results = [String: ContainerState]()
for dir in directories {
do {
let (config, options) = try Self.getContainerConfiguration(at: dir)
Expand All @@ -121,16 +120,6 @@ public actor ContainersService {
}
}

let state = ContainerState(
snapshot: .init(
configuration: config,
status: .stopped,
networks: [],
startedDate: nil
),
allocatedAttachments: []
)
results[config.id] = state
guard runtimePlugins.first(where: { $0.name == config.runtimeHandler }) != nil else {
throw ContainerizationError(
.internalError,
Expand Down Expand Up @@ -390,7 +379,7 @@ public actor ContainersService {
networks: [],
startedDate: nil
)
await self.setContainerState(configuration.id, ContainerState(snapshot: snapshot, allocatedAttachments: []), context: context)
await self.setContainerState(configuration.id, ContainerState(snapshot: snapshot), context: context)
} catch {
throw error
}
Expand Down Expand Up @@ -429,35 +418,14 @@ public actor ContainersService {
let path = self.containerRoot.appendingPathComponent(id)
let (config, _) = try Self.getContainerConfiguration(at: path)

var allocatedAttachments = [AllocatedAttachment]()
var networkPluginInfos = [NetworkPluginInfo]()
do {
for n in config.networks {
let allocatedAttach = try await self.networksService?.allocate(
id: n.network,
hostname: n.options.hostname,
macAddress: n.options.macAddress
)
guard var allocatedAttach = allocatedAttach else {
throw ContainerizationError(.internalError, message: "failed to allocate a network")
}

if let mtu = n.options.mtu {
let a = allocatedAttach.attachment
allocatedAttach = AllocatedAttachment(
attachment: Attachment(
network: a.network,
hostname: a.hostname,
ipv4Address: a.ipv4Address,
ipv4Gateway: a.ipv4Gateway,
ipv6Address: a.ipv6Address,
macAddress: a.macAddress,
mtu: mtu
),
additionalData: allocatedAttach.additionalData,
pluginInfo: allocatedAttach.pluginInfo
)
let pluginInfo = try await self.networksService?.pluginInfo(for: n.network)
guard let pluginInfo else {
throw ContainerizationError(.internalError, message: "failed to get plugin info for network \(n.network)")
}
allocatedAttachments.append(allocatedAttach)
networkPluginInfos.append(pluginInfo)
}

try Self.registerService(
Expand All @@ -473,31 +441,16 @@ public actor ContainersService {
id: id,
runtime: runtime
)
try await sandboxClient.bootstrap(stdio: stdio, allocatedAttachments: allocatedAttachments)
try await sandboxClient.bootstrap(stdio: stdio, networkPluginInfos: networkPluginInfos)

try await self.exitMonitor.registerProcess(
id: id,
onExit: self.handleContainerExit
)

state.client = sandboxClient
state.allocatedAttachments = allocatedAttachments
await self.setContainerState(id, state, context: context)
} catch {
for allocatedAttach in allocatedAttachments {
do {
try await self.networksService?.deallocate(attachment: allocatedAttach.attachment)
} catch {
self.log.error(
"failed to deallocate network attachment",
metadata: [
"id": "\(id)",
"network": "\(allocatedAttach.attachment.network)",
"error": "\(error)",
])
}
}

let label = Self.fullLaunchdServiceLabel(
runtimeName: config.runtimeHandler,
instanceId: id
Expand Down Expand Up @@ -992,27 +945,9 @@ public actor ContainersService {
])
}

// Best effort deallocate network attachments for the container. Don't throw on
// failure so we can continue with state cleanup.
self.log.info("deallocating network attachments", metadata: ["id": "\(id)"])
for allocatedAttach in state.allocatedAttachments {
do {
try await self.networksService?.deallocate(attachment: allocatedAttach.attachment)
} catch {
self.log.error(
"failed to deallocate network attachment",
metadata: [
"id": "\(id)",
"network": "\(allocatedAttach.attachment.network)",
"error": "\(error)",
])
}
}

state.snapshot.status = .stopped
state.snapshot.networks = []
state.client = nil
state.allocatedAttachments = []
await self.setContainerState(id, state, context: context)

let options = try getContainerCreationOptions(id: id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,26 +347,14 @@ public actor NetworksService {
}
}

public func allocate(id: String, hostname: String, macAddress: MACAddress?) async throws -> AllocatedAttachment {
public func pluginInfo(for id: String) async throws -> NetworkPluginInfo {
guard let serviceState = serviceStates[id] else {
throw ContainerizationError(.notFound, message: "no network for id \(id)")
}
guard let pluginInfo = serviceState.networkState.pluginInfo else {
throw ContainerizationError(.internalError, message: "network \(id) missing plugin information")
}
let (attach, additionalData) = try await serviceState.client.allocate(hostname: hostname, macAddress: macAddress)
return AllocatedAttachment(
attachment: attach,
additionalData: additionalData,
pluginInfo: pluginInfo
)
}

public func deallocate(attachment: Attachment) async throws {
guard let serviceState = serviceStates[attachment.network] else {
throw ContainerizationError(.notFound, message: "no network for id \(attachment.network)")
}
return try await serviceState.client.deallocate(hostname: attachment.hostname)
return pluginInfo
}

private static func getClient(configuration: NetworkConfiguration) throws -> NetworkClient {
Expand Down
Loading
Loading