Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ This change log lists the most relevant changes for past releases in reverse chr
* Bugs fixed:
** Properly check a new plant model for duplicate element names before accepting it.
** Don't allow transport orders to be created with a peripheral reservation token set to the empty string.
** Record allocation futures for retried tasks in `PendingAllocationManager` to ensure proper tracking and cancellation.
* Changes affecting developers:
** Mark layout coordinates of points and locations for removal with the next major version as they are apparently not really used in practice.
The model coordinates of points and locations should be used instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected void configure() {

private void configureSchedulerDependencies() {
bind(ReservationPool.class).in(Singleton.class);
bind(PendingAllocationManager.class).in(Singleton.class);

Multibinder<Scheduler.Module> moduleBinder = schedulerModuleBinder();
moduleBinder.addBinding().to(SingleVehicleBlockModule.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import static java.util.Objects.requireNonNull;

import jakarta.annotation.Nonnull;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import org.opentcs.components.kernel.Scheduler;
import org.opentcs.components.kernel.Scheduler.Client;
Expand Down Expand Up @@ -35,9 +35,9 @@ class AllocatorTask
*/
private final Scheduler.Module allocationAdvisor;
/**
* Allocations deferred because they couldn't be granted, yet.
* The pending allocation manager.
*/
private final Queue<AllocatorCommand.Allocate> deferredAllocations;
private final PendingAllocationManager pendingAllocationManager;
/**
* Executes tasks.
*/
Expand All @@ -58,7 +58,7 @@ class AllocatorTask
@Nonnull
ReservationPool reservationPool,
@Nonnull
Queue<AllocatorCommand.Allocate> deferredAllocations,
PendingAllocationManager allocationTracker,
@Nonnull
Scheduler.Module allocationAdvisor,
@Nonnull
Expand All @@ -70,7 +70,8 @@ class AllocatorTask
AllocatorCommand command
) {
this.reservationPool = requireNonNull(reservationPool, "reservationPool");
this.deferredAllocations = requireNonNull(deferredAllocations, "deferredAllocations");
this.pendingAllocationManager
= requireNonNull(allocationTracker, "pendingAllocationManager");
this.allocationAdvisor = requireNonNull(allocationAdvisor, "allocationAdvisor");
this.kernelExecutor = requireNonNull(kernelExecutor, "kernelExecutor");
this.globalSyncObject = requireNonNull(globalSyncObject, "globalSyncObject");
Expand Down Expand Up @@ -101,7 +102,7 @@ else if (command instanceof AllocatorCommand.AllocationsReleased) {
private void processAllocate(AllocatorCommand.Allocate command) {
if (!tryAllocate(command)) {
LOG.debug("{}: Resources unavailable, deferring allocation...", command.getClient().getId());
deferredAllocations.add(command);
pendingAllocationManager.addDeferredAllocation(command);
return;
}

Expand Down Expand Up @@ -215,18 +216,18 @@ private void undoAllocate(Client client, Set<TCSResource<?>> resources) {
* Moves all waiting allocations back into the incoming queue so they can be rechecked.
*/
private void scheduleRetryWaitingAllocations() {
for (AllocatorCommand.Allocate allocate : deferredAllocations) {
kernelExecutor.submit(
for (AllocatorCommand.Allocate allocate : pendingAllocationManager.drainDeferredAllocations()) {
Future<?> future = kernelExecutor.submit(
new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
allocate
)
);
pendingAllocationManager.addAllocationFuture(allocate.getClient(), future);
}
deferredAllocations.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@

import jakarta.annotation.Nonnull;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.opentcs.components.kernel.ResourceAllocationException;
Expand Down Expand Up @@ -58,9 +54,9 @@ public class DefaultScheduler
*/
private final ReservationPool reservationPool;
/**
* Allocations deferred because they couldn't be granted, yet.
* The pending allocation manager.
*/
private final Queue<AllocatorCommand.Allocate> deferredAllocations = new LinkedBlockingQueue<>();
private final PendingAllocationManager pendingAllocationManager;
/**
* Executes scheduling tasks.
*/
Expand All @@ -73,10 +69,6 @@ public class DefaultScheduler
* A global object to be used for synchronization within the kernel.
*/
private final Object globalSyncObject;
/**
* Allocations that are scheduled for execution on the kernel executor.
*/
private final Map<Client, List<Future<?>>> allocateFutures = new HashMap<>();
/**
* Indicates whether this component is enabled.
*/
Expand All @@ -95,6 +87,7 @@ public class DefaultScheduler
public DefaultScheduler(
AllocationAdvisor allocationAdvisor,
ReservationPool reservationPool,
PendingAllocationManager allocationTracker,
@KernelExecutor
ScheduledExecutorService kernelExecutor,
@ApplicationEventBus
Expand All @@ -104,6 +97,8 @@ public DefaultScheduler(
) {
this.allocationAdvisor = requireNonNull(allocationAdvisor, "allocationAdvisor");
this.reservationPool = requireNonNull(reservationPool, "reservationPool");
this.pendingAllocationManager
= requireNonNull(allocationTracker, "pendingAllocationManager");
this.kernelExecutor = requireNonNull(kernelExecutor, "kernelExecutor");
this.eventBus = requireNonNull(eventBus, "eventBus");
this.globalSyncObject = requireNonNull(globalSyncObject, "globalSyncObject");
Expand All @@ -117,6 +112,7 @@ public void initialize() {

reservationPool.clear();
allocationAdvisor.initialize();
pendingAllocationManager.initialize();

eventBus.subscribe(this);

Expand All @@ -137,6 +133,7 @@ public void terminate() {
eventBus.unsubscribe(this);

allocationAdvisor.terminate();
pendingAllocationManager.terminate();

initialized = false;
}
Expand Down Expand Up @@ -172,7 +169,7 @@ public void allocate(Client client, Set<TCSResource<?>> resources) {
Future<?> allocateFuture = kernelExecutor.submit(
new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
Expand All @@ -181,12 +178,7 @@ public void allocate(Client client, Set<TCSResource<?>> resources) {
);

// Remember the allocate future in case we need to cancel it.
addAllocateFuture(client, allocateFuture);

// Clean up the collection of allocate futures and remove futures that have already been
// completed. This could also be done in other places, but doing it for every new allocation
// should be sufficient.
removeCompletedAllocateFutures(client);
pendingAllocationManager.addAllocationFuture(client, allocateFuture);
}
}

Expand Down Expand Up @@ -240,7 +232,7 @@ public void free(Client client, Set<TCSResource<?>> resources) {
.collect(Collectors.toCollection(HashSet::new));
new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
Expand All @@ -250,7 +242,7 @@ public void free(Client client, Set<TCSResource<?>> resources) {
kernelExecutor.submit(
new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
Expand All @@ -272,7 +264,7 @@ public void freeAll(Client client) {

new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
Expand All @@ -282,7 +274,7 @@ public void freeAll(Client client) {
kernelExecutor.submit(
new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
Expand All @@ -296,16 +288,15 @@ public void clearPendingAllocations(Client client) {
requireNonNull(client, "client");
synchronized (globalSyncObject) {
LOG.debug("{}: Clearing pending allocation requests...", client.getId());
deferredAllocations.removeIf(allocate -> client.equals(allocate.getClient()));
cancelPendingAllocateFutures(client);
pendingAllocationManager.clearPendingAllocations(client);
}
}

@Override
public void reschedule() {
new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
Expand Down Expand Up @@ -335,7 +326,7 @@ public void preparationSuccessful(

new AllocatorTask(
reservationPool,
deferredAllocations,
pendingAllocationManager,
allocationAdvisor,
kernelExecutor,
globalSyncObject,
Expand Down Expand Up @@ -363,36 +354,6 @@ public void onEvent(Object event) {
}
}

private void addAllocateFuture(Client client, Future<?> allocateFuture) {
if (!allocateFutures.containsKey(client)) {
allocateFutures.put(client, new ArrayList<>());
}

allocateFutures.get(client).add(allocateFuture);
}

private void removeCompletedAllocateFutures(Client client) {
if (!allocateFutures.containsKey(client)) {
return;
}

allocateFutures.get(client).removeAll(
allocateFutures.get(client).stream()
.filter(future -> future.isDone())
.collect(Collectors.toList())
);
}

private void cancelPendingAllocateFutures(Client client) {
if (!allocateFutures.containsKey(client)) {
return;
}

allocateFutures.get(client).stream()
.filter(future -> !future.isDone())
.forEach(future -> future.cancel(false));
}

/**
* A dummy client for cases in which we need to provide a client but do not have a real one.
*/
Expand Down
Loading