Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/core/libraries/kernel/sync/semaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ class Semaphore {
#endif
}

bool try_acquire_non_alertable() {
#ifdef _WIN64
return WaitForSingleObjectEx(sem, 0, false) == WAIT_OBJECT_0;
#elif defined(__APPLE__)
return dispatch_semaphore_wait(sem, DISPATCH_TIME_NOW) == 0;
#else
return sem.try_acquire();
#endif
}

template <class Rep, class Period>
bool try_acquire_for(const std::chrono::duration<Rep, Period>& rel_time) {
#ifdef _WIN64
Expand Down Expand Up @@ -134,4 +144,4 @@ class Semaphore {
using BinarySemaphore = Semaphore<1>;
using CountingSemaphore = Semaphore<0x7FFFFFFF /*ORBIS_KERNEL_SEM_VALUE_MAX*/>;

} // namespace Libraries::Kernel
} // namespace Libraries::Kernel
65 changes: 62 additions & 3 deletions src/core/libraries/kernel/threads/condvar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ static constexpr PthreadCondAttr PthreadCondattrDefault = {
.c_clockid = ClockId::Realtime,
};

#ifdef _WIN64
static bool TryImmediateRelease(Pthread* target_thread, const u64 target_wait_generation,
BinarySemaphore* wake_sema) {
if (target_thread == nullptr || wake_sema == nullptr) {
return false;
}

if (target_thread->GetCondWaitGeneration() != target_wait_generation ||
target_thread->GetCondWaitArmedGeneration() != target_wait_generation) {
return false;
}

target_thread->SetLastCondWakeGeneration(target_wait_generation);
wake_sema->release();
return true;
}
#endif

static int CondInit(PthreadCondT* cond, const PthreadCondAttrT* cond_attr, const char* name) {
auto* cvp = new (std::nothrow) PthreadCond{};
if (cvp == nullptr) {
Expand Down Expand Up @@ -114,6 +132,8 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
mp->CvUnlock(&recurse);

curthread->mutex_obj = mp;
const u64 wait_generation = curthread->BeginCondWaitGeneration();
curthread->ArmCondWaitGeneration(wait_generation);
SleepqAdd(this, curthread);

int error = 0;
Expand All @@ -124,8 +144,19 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
//_thr_cancel_enter2(curthread, 0);
error = curthread->Sleep(abstime, usec) ? 0 : POSIX_ETIMEDOUT;
//_thr_cancel_leave(curthread, 0);
curthread->ClearCondWaitGenerationArm();

SleepqLock(this);
#ifdef _WIN64
const u64 last_wake_generation = curthread->GetLastCondWakeGeneration();
if (error == 0 && curthread->wchan != nullptr && last_wake_generation != 0 &&
last_wake_generation != wait_generation) {
curthread->ClearWake();
curthread->ArmCondWaitGeneration(wait_generation);
SleepqUnlock(this);
continue;
}
#endif
if (curthread->wchan == nullptr) {
error = 0;
break;
Expand All @@ -134,7 +165,9 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
has_user_waiters = SleepqRemove(sq, curthread);
SleepqUnlock(this);
curthread->mutex_obj = nullptr;
curthread->ClearCondWaitGenerationArm();
mp->CvLock(recurse);
curthread->ClearWake();
return 0;
} else if (error == POSIX_ETIMEDOUT) {
SleepQueue* sq = SleepqLookup(this);
Expand All @@ -145,7 +178,9 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
}
SleepqUnlock(this);
curthread->mutex_obj = nullptr;
curthread->ClearCondWaitGenerationArm();
const int error2 = mp->CvLock(recurse);
curthread->ClearWake();
if (error == 0) {
error = error2;
}
Expand Down Expand Up @@ -188,6 +223,7 @@ int PthreadCond::Signal(Pthread* thread) {
}

Pthread* td = thread ? thread : sq->sq_blocked.front();
const u64 td_wait_generation = td->GetCondWaitGeneration();

PthreadMutex* mp = td->mutex_obj;
has_user_waiters = SleepqRemove(sq, td);
Expand All @@ -197,21 +233,29 @@ int PthreadCond::Signal(Pthread* thread) {
if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) {
curthread->WakeAll();
}
curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema;
auto& deferred_entry = curthread->defer_waiters[curthread->nwaiter_defer++];
deferred_entry.thread = td;
deferred_entry.wait_generation = td_wait_generation;
mp->m_flags |= PthreadMutexFlags::Deferred;
} else {
waddr = &td->wake_sema;
}

SleepqUnlock(this);
if (waddr != nullptr) {
#ifdef _WIN64
TryImmediateRelease(td, td_wait_generation, waddr);
#else
waddr->release();
#endif
}
return 0;
}

struct BroadcastArg {
Pthread* curthread;
Pthread* targets[Pthread::MaxDeferWaiters];
u64 target_wait_generations[Pthread::MaxDeferWaiters];
BinarySemaphore* waddrs[Pthread::MaxDeferWaiters];
int count;
};
Expand All @@ -225,21 +269,32 @@ int PthreadCond::Broadcast() {
auto* ba2 = static_cast<BroadcastArg*>(arg);
Pthread* curthread = ba2->curthread;
PthreadMutex* mp = td->mutex_obj;
const u64 td_wait_generation = td->GetCondWaitGeneration();

if (mp->m_owner == curthread) {
if (curthread->nwaiter_defer >= Pthread::MaxDeferWaiters) {
curthread->WakeAll();
}
curthread->defer_waiters[curthread->nwaiter_defer++] = &td->wake_sema;
auto& deferred_entry = curthread->defer_waiters[curthread->nwaiter_defer++];
deferred_entry.thread = td;
deferred_entry.wait_generation = td_wait_generation;
mp->m_flags |= PthreadMutexFlags::Deferred;
} else {
if (ba2->count >= Pthread::MaxDeferWaiters) {
for (int i = 0; i < ba2->count; i++) {
#ifdef _WIN64
TryImmediateRelease(ba2->targets[i], ba2->target_wait_generations[i],
ba2->waddrs[i]);
#else
ba2->waddrs[i]->release();
#endif
}
ba2->count = 0;
}
ba2->waddrs[ba2->count++] = &td->wake_sema;
ba2->targets[ba2->count] = td;
ba2->target_wait_generations[ba2->count] = td_wait_generation;
ba2->waddrs[ba2->count] = &td->wake_sema;
ba2->count++;
}
};

Expand All @@ -255,7 +310,11 @@ int PthreadCond::Broadcast() {
SleepqUnlock(this);

for (int i = 0; i < ba.count; i++) {
#ifdef _WIN64
TryImmediateRelease(ba.targets[i], ba.target_wait_generations[i], ba.waddrs[i]);
#else
ba.waddrs[i]->release();
#endif
}
return 0;
}
Expand Down
54 changes: 50 additions & 4 deletions src/core/libraries/kernel/threads/pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ struct Pthread {
static constexpr u32 ThrMagic = 0xd09ba115U;
static constexpr u32 MaxDeferWaiters = 50;

struct DeferredWakeEntry {
Pthread* thread;
u64 wait_generation;
};

std::atomic<s32> tid;
std::mutex lock;
u32 cycle;
Expand Down Expand Up @@ -305,7 +310,10 @@ struct Pthread {
bool will_sleep;
bool has_user_waiters;
int nwaiter_defer;
BinarySemaphore* defer_waiters[MaxDeferWaiters];
DeferredWakeEntry defer_waiters[MaxDeferWaiters]{};
std::atomic<u64> cond_wait_generation{0};
std::atomic<u64> cond_wait_armed_generation{0};
std::atomic<u64> last_cond_wake_generation{0};

bool InCritical() const noexcept {
return locklevel > 0 || critical_count > 0;
Expand All @@ -319,16 +327,54 @@ struct Pthread {
return cancel_pending && cancel_enable && no_cancel == 0;
}

u64 BeginCondWaitGeneration() noexcept {
return cond_wait_generation.fetch_add(1, std::memory_order_relaxed) + 1;
}

u64 GetCondWaitGeneration() const noexcept {
return cond_wait_generation.load(std::memory_order_relaxed);
}

void ArmCondWaitGeneration(const u64 generation) noexcept {
cond_wait_armed_generation.store(generation, std::memory_order_relaxed);
}

u64 GetCondWaitArmedGeneration() const noexcept {
return cond_wait_armed_generation.load(std::memory_order_relaxed);
}

void ClearCondWaitGenerationArm() noexcept {
cond_wait_armed_generation.store(0, std::memory_order_relaxed);
}

void SetLastCondWakeGeneration(const u64 generation) noexcept {
last_cond_wake_generation.store(generation, std::memory_order_relaxed);
}

u64 GetLastCondWakeGeneration() const noexcept {
return last_cond_wake_generation.load(std::memory_order_relaxed);
}

void WakeAll() {
for (int i = 0; i < nwaiter_defer; i++) {
defer_waiters[i]->release();
const DeferredWakeEntry entry = defer_waiters[i];
defer_waiters[i] = {};
if (entry.thread == nullptr) {
continue;
}
if (entry.thread->GetCondWaitGeneration() != entry.wait_generation ||
entry.thread->GetCondWaitArmedGeneration() != entry.wait_generation) {
continue;
}
entry.thread->SetLastCondWakeGeneration(entry.wait_generation);
entry.thread->wake_sema.release();
}
nwaiter_defer = 0;
}

void ClearWake() {
// Try to acquire wake semaphore to reset it.
void(wake_sema.try_acquire());
while (wake_sema.try_acquire_non_alertable()) {
}
}

bool Sleep(const OrbisKernelTimespec* abstime, u64 usec) {
Expand Down