Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ let package = Package(
.product(name: "ContainerizationOS", package: "containerization"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
"ContainerAPIClient",
"ContainerNetworkServiceClient",
"ContainerOS",
"ContainerPersistence",
"ContainerResource",
Expand Down
32 changes: 0 additions & 32 deletions Sources/ContainerResource/Network/AllocatedAttachment.swift

This file was deleted.

11 changes: 11 additions & 0 deletions Sources/ContainerXPC/XPCClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ extension XPCClient {
xpc_connection_cancel(connection)
}

/// Set a handler that is called when the remote end of the connection closes or crashes.
/// This replaces the no-op event handler installed at init time. Call before any ``send(_:)``
/// to avoid a narrow race between activation and handler registration.
public func setDisconnectHandler(_ handler: @Sendable @escaping () -> Void) {
xpc_connection_set_event_handler(connection) { event in
if xpc_get_type(event) == XPC_TYPE_ERROR {
handler()
}
}
}

/// Returns the pid of process to which we have a connection.
/// Note: `xpc_connection_get_pid` returns 0 if no activity
/// has taken place on the connection prior to it being called.
Expand Down
63 changes: 58 additions & 5 deletions Sources/ContainerXPC/XPCServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,46 @@ import Synchronization

public struct XPCServer: Sendable {
public typealias RouteHandler = @Sendable (XPCMessage) async throws -> XPCMessage
/// A persistent route handler returns a reply and an optional cleanup closure that is
/// invoked when the peer connection closes (normally or due to peer exit/crash).
/// The connection is kept alive until the peer disconnects, making connection lifetime
/// equivalent to the resource lifetime of whatever was allocated in the handler.
public typealias PersistentRouteHandler = @Sendable (XPCMessage) async throws -> (XPCMessage, (@Sendable () async -> Void)?)

private let routes: [String: RouteHandler]
private let persistentRoutes: [String: PersistentRouteHandler]
// Access to `connection` is protected by a lock.
private nonisolated(unsafe) let connection: xpc_connection_t
private let lock = NSLock()

let log: Logging.Logger

public init(identifier: String, routes: [String: RouteHandler], log: Logging.Logger) {
public init(
identifier: String,
routes: [String: RouteHandler],
persistentRoutes: [String: PersistentRouteHandler] = [:],
log: Logging.Logger
) {
let connection = xpc_connection_create_mach_service(
identifier,
nil,
UInt64(XPC_CONNECTION_MACH_SERVICE_LISTENER)
)

self.routes = routes
self.persistentRoutes = persistentRoutes
self.connection = connection
self.log = log
}

public init(connection: xpc_connection_t, routes: [String: RouteHandler], log: Logging.Logger) {
public init(
connection: xpc_connection_t,
routes: [String: RouteHandler],
persistentRoutes: [String: PersistentRouteHandler] = [:],
log: Logging.Logger
) {
self.routes = routes
self.persistentRoutes = persistentRoutes
self.connection = connection
self.log = log
}
Expand Down Expand Up @@ -100,6 +118,7 @@ public struct XPCServer: Sendable {

func handleClientConnection(connection: xpc_connection_t) async throws {
let replySent = Mutex(false)
let onClose = Mutex<(@Sendable () async -> Void)?>(nil)

let objects = AsyncStream<xpc_object_t> { cont in
xpc_connection_set_event_handler(connection) { object in
Expand Down Expand Up @@ -140,7 +159,7 @@ public struct XPCServer: Sendable {
// `object` isn't used concurrently.
nonisolated(unsafe) let object = object
let added = group.addTaskUnlessCancelled { @Sendable in
try await self.handleMessage(connection: connection, object: object)
try await self.handleMessage(connection: connection, object: object, onClose: onClose)
replySent.withLock { $0 = true }
}
if !added {
Expand All @@ -149,9 +168,14 @@ public struct XPCServer: Sendable {
}
group.cancelAll()
}

// Connection has closed — run any cleanup registered by a persistent route handler.
if let cleanup = onClose.withLock({ $0 }) {
await cleanup()
}
}

func handleMessage(connection: xpc_connection_t, object: xpc_object_t) async throws {
func handleMessage(connection: xpc_connection_t, object: xpc_object_t, onClose: borrowing Mutex<(@Sendable () async -> Void)?>) async throws {
// All requests are dictionary-valued.
guard xpc_get_type(object) == XPC_TYPE_DICTIONARY else {
log.error("invalid request - not a dictionary")
Expand Down Expand Up @@ -193,7 +217,36 @@ public struct XPCServer: Sendable {
return
}

if let handler = routes[route] {
if let handler = persistentRoutes[route] {
do {
let message = XPCMessage(object: object)
let (response, cleanup) = try await handler(message)
xpc_connection_send_message(connection, response.underlying)
if let cleanup {
onClose.withLock { $0 = cleanup }
}
} catch let error as ContainerizationError {
log.error(
"persistent route handler threw an error",
metadata: [
"route": "\(route)",
"error": "\(error)",
])
Self.replyWithError(connection: connection, object: object, err: error)
} catch {
log.error(
"persistent route handler threw an error",
metadata: [
"route": "\(route)",
"error": "\(error)",
])
let message = XPCMessage(object: object)
let reply = message.reply()
let err = ContainerizationError(.unknown, message: String(describing: error))
reply.set(error: err)
xpc_connection_send_message(connection, reply.underlying)
}
} else if let handler = routes[route] {
do {
let message = XPCMessage(object: object)
let response = try await handler(message)
Expand Down
5 changes: 3 additions & 2 deletions Sources/Plugins/NetworkVmnet/NetworkVmnetHelper+Start.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ extension NetworkVmnetHelper {
identifier: serviceIdentifier,
routes: [
NetworkRoutes.state.rawValue: server.state,
NetworkRoutes.allocate.rawValue: server.allocate,
NetworkRoutes.deallocate.rawValue: server.deallocate,
NetworkRoutes.lookup.rawValue: server.lookup,
NetworkRoutes.disableAllocator.rawValue: server.disableAllocator,
],
persistentRoutes: [
NetworkRoutes.allocate.rawValue: server.allocate
],
log: log
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public actor ContainersService {
struct ContainerState {
var snapshot: ContainerSnapshot
var client: SandboxClient?
var allocatedAttachments: [AllocatedAttachment]

func getClient() throws -> SandboxClient {
guard let client else {
Expand Down Expand Up @@ -96,7 +95,7 @@ public actor ContainersService {
}

let runtimePlugins = loader.findPlugins().filter { $0.hasType(.runtime) }
var results = [String: ContainerState]()
let results = [String: ContainerState]()
for dir in directories {
do {
let (config, options) = try Self.getContainerConfiguration(at: dir)
Expand All @@ -121,16 +120,6 @@ public actor ContainersService {
}
}

let state = ContainerState(
snapshot: .init(
configuration: config,
status: .stopped,
networks: [],
startedDate: nil
),
allocatedAttachments: []
)
results[config.id] = state
guard runtimePlugins.first(where: { $0.name == config.runtimeHandler }) != nil else {
throw ContainerizationError(
.internalError,
Expand Down Expand Up @@ -390,7 +379,7 @@ public actor ContainersService {
networks: [],
startedDate: nil
)
await self.setContainerState(configuration.id, ContainerState(snapshot: snapshot, allocatedAttachments: []), context: context)
await self.setContainerState(configuration.id, ContainerState(snapshot: snapshot), context: context)
} catch {
throw error
}
Expand Down Expand Up @@ -429,35 +418,14 @@ public actor ContainersService {
let path = self.containerRoot.appendingPathComponent(id)
let (config, _) = try Self.getContainerConfiguration(at: path)

var allocatedAttachments = [AllocatedAttachment]()
var networkPluginInfos = [NetworkPluginInfo]()
do {
for n in config.networks {
let allocatedAttach = try await self.networksService?.allocate(
id: n.network,
hostname: n.options.hostname,
macAddress: n.options.macAddress
)
guard var allocatedAttach = allocatedAttach else {
throw ContainerizationError(.internalError, message: "failed to allocate a network")
}

if let mtu = n.options.mtu {
let a = allocatedAttach.attachment
allocatedAttach = AllocatedAttachment(
attachment: Attachment(
network: a.network,
hostname: a.hostname,
ipv4Address: a.ipv4Address,
ipv4Gateway: a.ipv4Gateway,
ipv6Address: a.ipv6Address,
macAddress: a.macAddress,
mtu: mtu
),
additionalData: allocatedAttach.additionalData,
pluginInfo: allocatedAttach.pluginInfo
)
let pluginInfo = try await self.networksService?.pluginInfo(for: n.network)
guard let pluginInfo else {
throw ContainerizationError(.internalError, message: "failed to get plugin info for network \(n.network)")
}
allocatedAttachments.append(allocatedAttach)
networkPluginInfos.append(pluginInfo)
}

try Self.registerService(
Expand All @@ -473,31 +441,16 @@ public actor ContainersService {
id: id,
runtime: runtime
)
try await sandboxClient.bootstrap(stdio: stdio, allocatedAttachments: allocatedAttachments)
try await sandboxClient.bootstrap(stdio: stdio, networkPluginInfos: networkPluginInfos)

try await self.exitMonitor.registerProcess(
id: id,
onExit: self.handleContainerExit
)

state.client = sandboxClient
state.allocatedAttachments = allocatedAttachments
await self.setContainerState(id, state, context: context)
} catch {
for allocatedAttach in allocatedAttachments {
do {
try await self.networksService?.deallocate(attachment: allocatedAttach.attachment)
} catch {
self.log.error(
"failed to deallocate network attachment",
metadata: [
"id": "\(id)",
"network": "\(allocatedAttach.attachment.network)",
"error": "\(error)",
])
}
}

let label = Self.fullLaunchdServiceLabel(
runtimeName: config.runtimeHandler,
instanceId: id
Expand Down Expand Up @@ -992,27 +945,9 @@ public actor ContainersService {
])
}

// Best effort deallocate network attachments for the container. Don't throw on
// failure so we can continue with state cleanup.
self.log.info("deallocating network attachments", metadata: ["id": "\(id)"])
for allocatedAttach in state.allocatedAttachments {
do {
try await self.networksService?.deallocate(attachment: allocatedAttach.attachment)
} catch {
self.log.error(
"failed to deallocate network attachment",
metadata: [
"id": "\(id)",
"network": "\(allocatedAttach.attachment.network)",
"error": "\(error)",
])
}
}

state.snapshot.status = .stopped
state.snapshot.networks = []
state.client = nil
state.allocatedAttachments = []
await self.setContainerState(id, state, context: context)

let options = try getContainerCreationOptions(id: id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,26 +347,14 @@ public actor NetworksService {
}
}

public func allocate(id: String, hostname: String, macAddress: MACAddress?) async throws -> AllocatedAttachment {
public func pluginInfo(for id: String) async throws -> NetworkPluginInfo {
guard let serviceState = serviceStates[id] else {
throw ContainerizationError(.notFound, message: "no network for id \(id)")
}
guard let pluginInfo = serviceState.networkState.pluginInfo else {
throw ContainerizationError(.internalError, message: "network \(id) missing plugin information")
}
let (attach, additionalData) = try await serviceState.client.allocate(hostname: hostname, macAddress: macAddress)
return AllocatedAttachment(
attachment: attach,
additionalData: additionalData,
pluginInfo: pluginInfo
)
}

public func deallocate(attachment: Attachment) async throws {
guard let serviceState = serviceStates[attachment.network] else {
throw ContainerizationError(.notFound, message: "no network for id \(attachment.network)")
}
return try await serviceState.client.deallocate(hostname: attachment.hostname)
return pluginInfo
}

private static func getClient(configuration: NetworkConfiguration) throws -> NetworkClient {
Expand Down
Loading
Loading