diff --git a/src/runtime/encore/encore.c b/src/runtime/encore/encore.c index 533f1e1fe..1f012fa6e 100644 --- a/src/runtime/encore/encore.c +++ b/src/runtime/encore/encore.c @@ -32,7 +32,7 @@ static void actor_resume_context(encore_actor_t *actor, ucontext_t *ctx); static void actor_resume_context(encore_actor_t *actor, ucontext_t *ctx); #endif -extern void public_run(pony_actor_t *actor); +extern void public_run(pony_actor_t *actor, void * info_node); extern bool pony_system_actor(pony_actor_t *actor); static void pony_sendargs(pony_ctx_t *ctx, pony_actor_t* to, uint32_t id, @@ -53,17 +53,6 @@ static __pony_thread_local unsigned int available_context = 0; __pony_thread_local context *root_context; __pony_thread_local context *this_context; -void actor_unlock(encore_actor_t *actor) -{ - if (!pony_system_actor((pony_actor_t*) actor)) { - if (actor->lock) { - pthread_mutex_t *lock = actor->lock; - actor->lock = NULL; - pthread_mutex_unlock(lock); - } - } -} - #ifndef LAZY_IMPL static __pony_thread_local stack_page *stack_pool = NULL; @@ -121,7 +110,7 @@ bool actor_run_to_completion(encore_actor_t *actor) #ifdef LAZY_IMPL -static context *pop_context(encore_actor_t *actor) +static context *pop_context(encore_actor_t *actor, void * info_node) { context *c; if (available_context == 0) { @@ -142,7 +131,7 @@ static context *pop_context(encore_actor_t *actor) context_pool->uctx.uc_stack.ss_flags = 0; #endif } - makecontext(&context_pool->uctx, (void(*)(void))public_run, 1, actor); + makecontext(&context_pool->uctx, (void(*)(void))public_run, 2, actor, info_node); c = context_pool; context_pool = c->next; return c; @@ -198,7 +187,7 @@ static void force_thread_local_variable_access(context *old_this_context, #endif void actor_save_context(pony_ctx_t **ctx, encore_actor_t *actor, - ucontext_t *uctx) + ucontext_t *uctx, void * info_node) { #ifndef LAZY_IMPL @@ -221,7 +210,7 @@ void actor_save_context(pony_ctx_t **ctx, encore_actor_t *actor, context *old_this_context = 
this_context; context *old_root_context = root_context; encore_actor_t *old_actor = actor; - this_context = pop_context(actor); + this_context = pop_context(actor, info_node); assert_swap(uctx, &this_context->uctx); #if defined(PLATFORM_IS_MACOSX) force_thread_local_variable_access(old_this_context, old_root_context); @@ -235,14 +224,14 @@ void actor_save_context(pony_ctx_t **ctx, encore_actor_t *actor, *ctx = pony_ctx(); // Context might have gone stale, update it } -void actor_block(pony_ctx_t **ctx, encore_actor_t *actor) +void actor_block(pony_ctx_t **ctx, encore_actor_t *actor, void * info_node) { #ifndef LAZY_IMPL - actor_save_context(ctx, actor, &actor->uctx); + actor_save_context(ctx, actor, &actor->uctx, info_node); #else actor->saved = &this_context->uctx; - actor_save_context(ctx, actor, actor->saved); + actor_save_context(ctx, actor, actor->saved, info_node); #endif } @@ -255,18 +244,18 @@ void actor_suspend(pony_ctx_t **ctx) ucontext_t uctx; pony_sendp(*ctx, (pony_actor_t*) actor, _ENC__MSG_RESUME_SUSPEND, &uctx); - actor_save_context(ctx, actor, &uctx); + actor_save_context(ctx, actor, &uctx, NULL); actor->suspend_counter--; assert(actor->suspend_counter >= 0); } -void actor_await(pony_ctx_t **ctx, ucontext_t *uctx) +void actor_await(pony_ctx_t **ctx, ucontext_t *uctx, void * info_node) { encore_actor_t *actor = (encore_actor_t*)(*ctx)->current; actor->await_counter++; - actor_save_context(ctx, actor, uctx); + actor_save_context(ctx, actor, uctx, info_node); actor->await_counter--; diff --git a/src/runtime/encore/encore.h b/src/runtime/encore/encore.h index 1ffcde01d..35cffa3e5 100644 --- a/src/runtime/encore/encore.h +++ b/src/runtime/encore/encore.h @@ -40,6 +40,8 @@ static pony_type_t *ENCORE_PRIMITIVE = (pony_type_t *)1; typedef struct encore_actor_t encore_actor_t; typedef struct encore_oneway_msg encore_oneway_msg_t; typedef struct encore_fut_msg encore_fut_msg_t; +typedef struct encore_vanilla_fut_msg encore_vanilla_fut_msg_t; +typedef struct 
encore_poly_vanilla_fut_msg encore_poly_vanilla_fut_msg_t; typedef struct pony_main_msg_t { @@ -87,6 +89,18 @@ struct encore_fut_msg future_t *_fut; }; +struct encore_vanilla_fut_msg +{ + encore_oneway_msg_t pad; + vanilla_future_t *_fut; +}; + +struct encore_poly_vanilla_fut_msg +{ + encore_oneway_msg_t pad; + poly_vanilla_future_t *_fut; +}; + typedef struct stack_page { void *stack; struct stack_page *next; @@ -139,10 +153,9 @@ void *encore_realloc(pony_ctx_t *ctx, void *p, size_t s); /// The starting point of all Encore programs int encore_start(int argc, char** argv, pony_type_t *type); -void actor_unlock(encore_actor_t *actor); bool encore_actor_run_hook(encore_actor_t *actor); bool encore_actor_handle_message_hook(encore_actor_t *actor, pony_msg_t* msg); -void actor_block(pony_ctx_t **ctx, encore_actor_t *actor); +void actor_block(pony_ctx_t **ctx, encore_actor_t *actor, void * info_node); void actor_set_resume(encore_actor_t *actor); #ifndef LAZY_IMPL @@ -150,7 +163,7 @@ void actor_set_run_to_completion(encore_actor_t *actor); bool actor_run_to_completion(encore_actor_t *actor); #endif void actor_suspend(); -void actor_await(pony_ctx_t **ctx, ucontext_t *uctx); +void actor_await(pony_ctx_t **ctx, ucontext_t *uctx, void * info_node); /// calls the pony's respond with the current object's scheduler void call_respond_with_current_scheduler(); diff --git a/src/runtime/future/future.c b/src/runtime/future/future.c index 0d0b38791..9664b9977 100644 --- a/src/runtime/future/future.c +++ b/src/runtime/future/future.c @@ -15,11 +15,14 @@ #include "../libponyrt/actor/messageq.h" #include "../libponyrt/sched/scheduler.h" -pthread_mutexattr_t attr; -#define BLOCK pthread_mutex_lock(&fut->lock); -#define UNBLOCK pthread_mutex_unlock(&fut->lock); +#define GET 0 +#define AWAIT 1 #define perr(m) // fprintf(stderr, "%s\n", m); +static void future_finalizer(future_t *fut); +static inline void future_gc_send_value(pony_ctx_t *ctx, future_t *fut); +static inline void 
future_gc_recv_value(pony_ctx_t *ctx, future_t *fut); + extern void encore_future_gc_acquireactor(pony_ctx_t* ctx, pony_actor_t* actor); extern void encore_future_gc_acquireobject(pony_ctx_t* ctx, void* p, pony_type_t *t, int mutability); @@ -31,49 +34,18 @@ static void encore_gc_acquire(pony_ctx_t* ctx) } typedef struct actor_entry actor_entry_t; -typedef struct closure_entry closure_entry_t; -typedef struct message_entry message_entry_t; // Terminology: // Producer -- the actor responsible for fulfilling a future // Consumer -- an non-producer actor using a future -typedef enum responsibility_t -{ - // A closure that should be run by the producer - DETACHED_CLOSURE, - // A message blocked on this future - BLOCKED_MESSAGE -} responsibility_t; struct closure_entry { - // The consumer that created closure pony_actor_t *actor; - // The future where the result of the closure should be stored future_t *future; - // The closure to be run on fulfilment of the future closure_t *closure; - closure_entry_t *next; - -}; - -struct message_entry -{ - // The consumer that created closure - pony_actor_t *actor; - // FIXME: add context -}; - -struct actor_entry -{ - responsibility_t type; - union - { - closure_entry_t closure; - message_entry_t message; - }; }; typedef struct actor_list { @@ -82,28 +54,15 @@ typedef struct actor_list { struct actor_list *next; } actor_list; -struct future -{ - pony_type_t *future_type; - encore_arg_t value; - pony_type_t *type; - bool fulfilled; - // Stupid limitation for now - actor_entry_t responsibilities[16]; - int no_responsibilities; - // Lock-based for now - pthread_mutex_t lock; - future_t *parent; - closure_entry_t *children; - actor_list *awaited_actors; -}; -static void future_block_actor(pony_ctx_t **ctx, future_t *fut); +static inline void future_block_actor(pony_ctx_t **ctx, future_t *fut); static void future_finalizer(future_t *fut); +static inline void future_chain(pony_ctx_t **ctx, future_t *fut, +closure_t *c, future_t *r); 
static inline void future_gc_send_value(pony_ctx_t *ctx, future_t *fut); static inline void future_gc_recv_value(pony_ctx_t *ctx, future_t *fut); -static void future_chain(pony_ctx_t **ctx, future_t *fut, pony_type_t *type, - closure_t *c, future_t *r); +static inline void chain_treiber_stack_push(closure_entry_t **head, closure_entry_t *newhead); +static inline closure_entry_t * chain_treiber_stack_pop(closure_entry_t **head); pony_type_t future_type = { .id = ID_FUTURE, @@ -116,16 +75,6 @@ pony_type_t *future_get_type(future_t *fut){ return fut->type; } -static void trace_closure_entry(pony_ctx_t *ctx, void *p) -{ - assert(p); - pony_trace(ctx, p); - closure_entry_t *c = (closure_entry_t*)p; - encore_trace_actor(ctx, c->actor); - encore_trace_object(ctx, c->future, &future_trace); - encore_trace_object(ctx, c->closure, &closure_trace); -} - void future_trace(pony_ctx_t *ctx, void* p) { (void) ctx; @@ -140,57 +89,38 @@ void future_trace(pony_ctx_t *ctx, void* p) // if (fut->parent) encore_trace_object(fut->parent, future_trace); } -static inline void future_gc_trace_value(pony_ctx_t *ctx, future_t *fut) -{ - assert(fut); - if (fut->type == ENCORE_ACTIVE) { - encore_trace_actor(ctx, fut->value.p); - } else if (fut->type != ENCORE_PRIMITIVE) { - encore_trace_object(ctx, fut->value.p, fut->type->trace); - } -} // =============================================================== // Create, inspect and fulfil // =============================================================== -// -// a future has a lock attached to it. actors may acquire the same lock -// multiple times (re-entrant locks). this does not happen when we work with plain Encore -// however, re-entrant locks are necessary for the ParT runtime library. -// -// Use case: -// By allowing re-entrant locks, one can create a promise that is fulfilled only -// when all futures in a ParT (function `party_promise_await_on_futures`) are -// fulfilled. 
all futures in a ParT get chained a closure that contains the promise -// to fulfil and the original ParT. this promise gets called only when all -// futures in the ParT have been fulfilled. when that happens, the closure -// fulfils the promise and runs some function on the original ParT. -// in pseudo-code: -// -// val par = liftf(Fut t) :: Par t -// var promise = new Promise -// val clos = \(..., env = [par, promise]) -> do something -// (forall f in (getFuts par), f ~~> clos) -// await(promise) -// -// this same code can be found in `party_promise_await_on_futures`. -// -future_t *future_mk(pony_ctx_t **ctx, pony_type_t *type) -{ - pony_ctx_t *cctx = *ctx; - assert(cctx->current); +static inline closure_entry_t * chain_treiber_stack_pop(closure_entry_t **head) { + closure_entry_t * newhead; + closure_entry_t * oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); + if (oldhead == NULL) { + return NULL; + } + newhead = oldhead->next; + while (!__sync_bool_compare_and_swap(head, oldhead, newhead)) { + oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); + if (oldhead == NULL) { + return NULL; + } + newhead = oldhead->next; + } + return oldhead; +} - future_t *fut = pony_alloc_final(cctx, sizeof(future_t)); - *fut = (future_t) { .future_type = &future_type, .type = type }; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&fut->lock, &attr); +static inline void chain_treiber_stack_push(closure_entry_t **head, closure_entry_t *newhead) { + closure_entry_t * oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); - ENC_DTRACE3(FUTURE_CREATE, (uintptr_t) ctx, (uintptr_t) fut, (uintptr_t) type); + newhead->next = oldhead; - return fut; + while (!__sync_bool_compare_and_swap(head, oldhead, newhead) ) { + oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); + newhead->next = oldhead; + } } static inline encore_arg_t run_closure(pony_ctx_t **ctx, closure_t *c, encore_arg_t value) @@ -202,81 +132,43 @@ bool 
future_fulfilled(future_t *fut) { perr("future_fulfilled"); bool r; - BLOCK; - r = fut->fulfilled; - UNBLOCK; + r = __atomic_load_n(&fut->fulfilled, __ATOMIC_SEQ_CST); return r; } -void future_fulfil(pony_ctx_t **ctx, future_t *fut, encore_arg_t value) +static inline void future_gc_trace_value(pony_ctx_t *ctx, future_t *fut) { - assert(fut->fulfilled == false); - ENC_DTRACE2(FUTURE_FULFIL_START, (uintptr_t) *ctx, (uintptr_t) fut); - - BLOCK; - // Update the modifiable context - fut->value = value; - fut->fulfilled = true; - - // Create pointer to a `pony_ctx_t * const` (in practice, PonyRT omits the `const`) - pony_ctx_t *cctx = *ctx; - future_gc_send_value(cctx, fut); - - for (int i = 0; i < fut->no_responsibilities; ++i) { - actor_entry_t e = fut->responsibilities[i]; - switch (e.type) { - case BLOCKED_MESSAGE: - perr("Unblocking"); - actor_set_resume((encore_actor_t*)e.message.actor); - pony_schedule(cctx, e.message.actor); - break; - case DETACHED_CLOSURE: - assert(0); - exit(-1); - } + assert(fut); + if (fut->type == ENCORE_ACTIVE) { + encore_trace_actor(ctx, fut->value.p); + } else if (fut->type != ENCORE_PRIMITIVE) { + encore_trace_object(ctx, fut->value.p, fut->type->trace); } +} - { - closure_entry_t *current = fut->children; - while(current) { - encore_arg_t result = run_closure(ctx, current->closure, value); - if (current->future) { - // This case happens when futures can be chained on. - // As an optimisation to the ParT library, we do know - // that certain functions in the ParT do not need to fulfil - // a future, e.g. 
the ParT optimised version is called - // `future_register_callback` and sets `current->future = NULL` - future_fulfil(ctx, current->future, result); - } - - cctx = *ctx; // ctx might have been changed - pony_gc_recv(cctx); - trace_closure_entry(cctx, current); - pony_recv_done(cctx); - - current = current->next; - } - } - { - actor_list *current = fut->awaited_actors; - while(current) { - pony_sendp(cctx, (pony_actor_t *)current->actor, _ENC__MSG_RESUME_AWAIT, - current->uctx); - - pony_gc_recv(cctx); - pony_trace(cctx, current); - encore_trace_actor(cctx, (pony_actor_t *)current->actor); - pony_recv_done(cctx); - - current = current->next; - } - } +static inline void future_finalizer(future_t *fut) +{ + pony_ctx_t* cctx = pony_ctx(); + future_gc_recv_value(cctx, fut); + ENC_DTRACE2(FUTURE_DESTROY, (uintptr_t) cctx, (uintptr_t) fut); +} - UNBLOCK; - ENC_DTRACE2(FUTURE_FULFIL_END, (uintptr_t) cctx, (uintptr_t) fut); +static inline void future_gc_send_value(pony_ctx_t *ctx, future_t *fut) +{ + pony_gc_send(ctx); + future_gc_trace_value(ctx, fut); + pony_send_done(ctx); } -static void acquire_future_value(pony_ctx_t **ctx, future_t *fut) +static inline void future_gc_recv_value(pony_ctx_t *ctx, future_t *fut) +{ + pony_gc_recv(ctx); + future_gc_trace_value(ctx, fut); + // note the asymmetry with send + ponyint_gc_handlestack(ctx); +} + +static inline void acquire_future_value(pony_ctx_t **ctx, future_t *fut) { pony_ctx_t *cctx = *ctx; encore_gc_acquire(cctx); @@ -284,73 +176,61 @@ static void acquire_future_value(pony_ctx_t **ctx, future_t *fut) pony_acquire_done(cctx); } -// =============================================================== -// Means for actors to get, block and chain -// =============================================================== -encore_arg_t future_get_actor(pony_ctx_t **ctx, future_t *fut) -{ - if (!fut->fulfilled) { - ENC_DTRACE2(FUTURE_BLOCK, (uintptr_t) *ctx, (uintptr_t) fut); - future_block_actor(ctx, fut); - ENC_DTRACE2(FUTURE_UNBLOCK, 
(uintptr_t) *ctx, (uintptr_t) fut); - } - - acquire_future_value(ctx, fut); - ENC_DTRACE2(FUTURE_GET, (uintptr_t) *ctx, (uintptr_t) fut); - - return fut->value; -} - -future_t *future_chain_actor(pony_ctx_t **ctx, future_t *fut, pony_type_t *type, - closure_t *c) +future_t *future_mk(pony_ctx_t **ctx, pony_type_t *type) { - ENC_DTRACE3(FUTURE_CHAINING, (uintptr_t) *ctx, (uintptr_t) fut, (uintptr_t) type); - future_t *r = future_mk(ctx, type); - future_chain(ctx, fut, type, c, r); - return r; -} + pony_ctx_t *cctx = *ctx; + assert(cctx->current); + future_t *fut = pony_alloc_final(cctx, sizeof(future_t), + (void *)&future_finalizer); + *fut = (future_t) { .type = type, .blocking_stack = NULL, .chain_stack = NULL }; + ENC_DTRACE3(FUTURE_CREATE, (uintptr_t) ctx, (uintptr_t) fut, (uintptr_t) type); + return fut; +} void future_chain_forward(pony_ctx_t **ctx, future_t *fut, pony_type_t *type, closure_t *c, future_t *r) { ENC_DTRACE3(FUTURE_CHAINING, (uintptr_t) *ctx, (uintptr_t) fut, (uintptr_t) type); (void)type; - future_chain(ctx, fut, type, c, r); + perr("future_chain_actor"); + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + acquire_future_value(ctx, fut); + value_t result = run_closure(ctx, c, fut->value); + future_fulfil(ctx, r, result); /* fulfil the forwarded result future r; fut is already fulfilled on this path */ + return; + } + future_chain(ctx, fut, c, r); return; } -static void future_chain(pony_ctx_t **ctx, future_t *fut, pony_type_t *type, - closure_t *c, future_t *r) +future_t* future_chain_actor(pony_ctx_t **ctx, future_t *fut, pony_type_t *type, + closure_t *c) { - (void)type; + ENC_DTRACE3(FUTURE_CHAINING, (uintptr_t) *ctx, (uintptr_t) fut, (uintptr_t) type); + future_t *r = future_mk(ctx, type); perr("future_chain_actor"); - BLOCK; - - if (fut->fulfilled) { + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { acquire_future_value(ctx, fut); value_t result = run_closure(ctx, c, fut->value); future_fulfil(ctx, r, result); - UNBLOCK; - return; + return r; } + future_chain(ctx, fut, c, r); + return r; +} 
+static inline void future_chain(pony_ctx_t **ctx, future_t *fut, closure_t *c, future_t *r) +{ pony_ctx_t* cctx = *ctx; - closure_entry_t *entry = encore_alloc(cctx, sizeof *entry); - entry->actor = (cctx)->current; - entry->future = r; - entry->closure = c; - entry->next = fut->children; - fut->children = entry; - - pony_gc_send(cctx); - trace_closure_entry(cctx, entry); - pony_send_done(cctx); - - UNBLOCK; - - r->parent = fut; + closure_entry_t *entry = POOL_ALLOC(closure_entry_t); + *entry = (closure_entry_t) { .actor = (cctx)->current, .future = r, .closure = c }; + chain_treiber_stack_push(&fut->chain_stack, entry); + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + future_discharge(ctx, fut); + } } + // Similar to `future_chain_actor` except that it returns void, avoiding the // creation of a new future. This is used in the ParTs library and is an // optimisation over the `future_chain_actor`. @@ -360,107 +240,140 @@ void future_register_callback(pony_ctx_t **ctx, { ENC_DTRACE2(FUTURE_REGISTER_CALLBACK, (uintptr_t) *ctx, (uintptr_t) fut); perr("future_chain_actor"); - BLOCK; - - if (fut->fulfilled) { + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { acquire_future_value(ctx, fut); - // the closure is in charge of fulfilling the promise that it contains. // if this is not the case, a deadlock situation may happen. 
run_closure(ctx, c, fut->value); - UNBLOCK; return ; } - pony_ctx_t* cctx = *ctx; - closure_entry_t *entry = encore_alloc(cctx, sizeof *entry); - entry->actor = (cctx)->current; - entry->future = NULL; - entry->closure = c; - entry->next = fut->children; - fut->children = entry; - - pony_gc_send(cctx); - trace_closure_entry(cctx, entry); - pony_send_done(cctx); - - UNBLOCK; + closure_entry_t *entry = POOL_ALLOC(closure_entry_t); + *entry = (closure_entry_t){ .actor = (cctx)->current, .future = NULL, .closure = c }; + chain_treiber_stack_push(&fut->chain_stack, entry); + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + future_discharge(ctx, fut); + } } +void future_await(pony_ctx_t **ctx, future_t *fut) +{ + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + return; + } + ucontext_t uctx; + future_tnode_info_t * pony_node = POOL_ALLOC(future_tnode_info_t); + *pony_node = (future_tnode_info_t) { .fut = (void*)fut, .awaited_uctx = &uctx, .isget = false, .fclass = FUTURE }; + actor_await(ctx, &uctx, (void *)pony_node); +} -static void future_block_actor(pony_ctx_t **ctx, future_t *fut) +encore_arg_t future_get_actor(pony_ctx_t **ctx, future_t *fut) { - perr("future_block_actor"); - pony_ctx_t* cctx = *ctx; - pony_actor_t *a = (cctx)->current; - BLOCK; + if (!__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + ENC_DTRACE2(FUTURE_BLOCK, (uintptr_t) *ctx, (uintptr_t) fut); + future_block_actor(ctx, fut); + ENC_DTRACE2(FUTURE_UNBLOCK, (uintptr_t) *ctx, (uintptr_t) fut); + } + ENC_DTRACE2(FUTURE_GET, (uintptr_t) *ctx, (uintptr_t) fut); + return fut->value; +} - if (fut->fulfilled) { - UNBLOCK; +static inline void future_block_actor(pony_ctx_t **ctx, future_t *fut) +{ + perr("future_block_actor"); + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { return; } - + pony_ctx_t* cctx = *ctx; + pony_actor_t *a = cctx->current; pony_unschedule(cctx, a); - assert(fut->no_responsibilities < 16); - 
fut->responsibilities[fut->no_responsibilities++] = (actor_entry_t) { .type = BLOCKED_MESSAGE, .message = (message_entry_t) { .actor = a } }; - encore_actor_t *actor = (encore_actor_t*) a; - - assert(actor->lock == NULL); - actor->lock = &fut->lock; - actor_block(ctx, actor); + future_tnode_info_t * pony_node = POOL_ALLOC(future_tnode_info_t); + *pony_node = (future_tnode_info_t) { .fut = (void*)fut, .awaited_uctx = NULL, .isget = true, .fclass = FUTURE }; + actor_block(ctx, actor, (void *)pony_node); } -// =============================================================== -// Possibly these functions do not belong in the future library -// =============================================================== +void future_fulfil(pony_ctx_t **ctx, future_t *fut, encore_arg_t value) +{ + assert(fut->fulfilled == false); + ENC_DTRACE2(FUTURE_FULFIL_START, (uintptr_t) *ctx, (uintptr_t) fut); + fut->value = value; + __atomic_store_n(&(fut->fulfilled), true, __ATOMIC_SEQ_CST); + future_gc_send_value(*ctx, fut); + future_discharge(ctx, fut); + ENC_DTRACE2(FUTURE_FULFIL_END, (uintptr_t) *ctx, (uintptr_t) fut); +} -void future_await(pony_ctx_t **ctx, future_t *fut) +void future_discharge(pony_ctx_t **ctx, future_t *fut) { - pony_ctx_t* cctx = *ctx; - encore_actor_t *actor = (encore_actor_t *)cctx->current; - BLOCK; - if (fut->fulfilled) { - UNBLOCK; - return; + future_tnode_t * a = NULL; + pony_ctx_t * cctx = * ctx; + encore_actor_t * ea; + + a = treiber_stack_pop(&fut->blocking_stack); + while (a!=NULL) { + if (a->isget) { + ea = (encore_actor_t *)a->actor; + perr("Unblocking"); + actor_set_resume(ea); + pony_schedule(cctx, a->actor); + } else { + pony_sendp(cctx, a->actor, _ENC__MSG_RESUME_AWAIT, a->awaited_uctx); + } + + POOL_FREE(future_tnode_t, a); + a = treiber_stack_pop(&fut->blocking_stack); } - ucontext_t uctx; - - actor_list *entry = encore_alloc(cctx, sizeof *entry); - entry->actor = actor; - entry->uctx = &uctx; - entry->next = fut->awaited_actors; - fut->awaited_actors 
= entry; + closure_entry_t *current = chain_treiber_stack_pop(&fut->chain_stack); + while(current) { + encore_arg_t result = run_closure(ctx, current->closure, fut->value); + if (current->future) { + future_fulfil(ctx, current->future, result); + } - pony_gc_send(cctx); - pony_trace(cctx, entry); - encore_trace_actor(cctx, (pony_actor_t *)entry->actor); - pony_send_done(cctx); + POOL_FREE(closure_entry_t, current); + current = chain_treiber_stack_pop(&fut->chain_stack); + } - assert(actor->lock == NULL); - actor->lock = &fut->lock; - actor_await(ctx, &uctx); } -static void future_finalizer(future_t *fut) -{ - pony_ctx_t* cctx = pony_ctx(); - future_gc_recv_value(cctx, fut); - ENC_DTRACE2(FUTURE_DESTROY, (uintptr_t) cctx, (uintptr_t) fut); -} +/** -static inline void future_gc_send_value(pony_ctx_t *ctx, future_t *fut) +Treiber stack implementation + +**/ + +future_tnode_t * treiber_stack_pop(future_tnode_t **head) { - pony_gc_send(ctx); - future_gc_trace_value(ctx, fut); - pony_send_done(ctx); + future_tnode_t * newhead; + future_tnode_t * oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); + if (oldhead == NULL) { + return NULL; + } + newhead = oldhead->next; + while (!__sync_bool_compare_and_swap(head, oldhead, newhead)) { + oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); + if (oldhead == NULL) { + return NULL; + } + newhead = oldhead->next; + } + + return oldhead; } -static inline void future_gc_recv_value(pony_ctx_t *ctx, future_t *fut) +void treiber_stack_push(future_tnode_t **head, pony_actor_t* a, ucontext_t *awaited_uctx, int isget) { - pony_gc_recv(ctx); - future_gc_trace_value(ctx, fut); - // note the asymmetry with send - ponyint_gc_handlestack(ctx); + future_tnode_t * newhead = POOL_ALLOC(future_tnode_t); //encore_alloc(cctx, sizeof(future_tnode_t)); + future_tnode_t * oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); + + *newhead = (future_tnode_t) { .actor = a, .awaited_uctx = awaited_uctx, .isget = isget, .next = oldhead }; + + while 
(!__sync_bool_compare_and_swap(head, oldhead, newhead) ) { + oldhead = __atomic_load_n(head, __ATOMIC_SEQ_CST); + newhead->next = oldhead; + } } + + diff --git a/src/runtime/future/future.h b/src/runtime/future/future.h index 505c0c360..9b41bd88b 100644 --- a/src/runtime/future/future.h +++ b/src/runtime/future/future.h @@ -2,7 +2,48 @@ #define __future_using_actors_h #include -#include "closure.h" +#include + +typedef enum future_class { + FUTURE, + VANILLA_FUTURE, + POLY_VANILLA_FUTURE, +} future_class_t ; + +typedef struct future_tnode { + pony_actor_t * actor; + ucontext_t *awaited_uctx; + struct future_tnode * next; + bool isget; +} future_tnode_t; + +typedef struct future_tnode_info { + void * fut; + ucontext_t * awaited_uctx; + future_class_t fclass; + bool isget; +} future_tnode_info_t; + +typedef struct closure_entry closure_entry_t; + +/* TREIBER STACK API */ + +void treiber_stack_push(future_tnode_t **head, pony_actor_t* a, ucontext_t *awaited_uctx, int futop); +future_tnode_t * treiber_stack_pop(future_tnode_t **head); + +/* STANDARD FUTURE API */ + +typedef struct future { + future_tnode_t *blocking_stack; + closure_entry_t *chain_stack; + const pony_type_t *type; + encore_arg_t value; + bool fulfilled; +} future_t; + +void handle_future(encore_actor_t *actor, pony_ctx_t * futctx, void * pony_node); + +void future_trace(pony_ctx_t *ctx, void* p); typedef struct future future_t; @@ -55,7 +96,8 @@ future_t *future_chain_actor(pony_ctx_t **ctx, future_t *fut, pony_type_t *type, closure_t *c); void future_chain_forward(pony_ctx_t **ctx, future_t *fut, pony_type_t *type, - closure_t *c, future_t *r); +closure_t *c, future_t *r); + /** Registers a callback and returns void * @@ -79,4 +121,37 @@ void future_register_callback(pony_ctx_t **ctx, * puts on hold the processing of this message. 
*/ void future_await(pony_ctx_t **ctx, future_t *fut); + +extern pony_type_t future_type; + +void future_discharge(pony_ctx_t **ctx, future_t *fut); + +/* VANILLA FUTURE API */ + +typedef struct vanilla_future { + const pony_type_t *type; + pony_actor_t *actor; + encore_arg_t value; + bool fulfilled; + bool blocking; +} vanilla_future_t; + +encore_arg_t vanilla_future_get_actor(pony_ctx_t **ctx, vanilla_future_t *fut); +void vanilla_future_fulfil(pony_ctx_t **ctx, vanilla_future_t *fut, encore_arg_t value); +vanilla_future_t *vanilla_future_mk(pony_ctx_t **ctx, pony_type_t *type); + +/* POLY VANILLA FUTURE API */ + +typedef struct poly_vanilla_future { + future_tnode_t *blocking_stack; + const pony_type_t *type; + encore_arg_t value; + bool fulfilled; +} poly_vanilla_future_t; + +void poly_vanilla_future_discharge(pony_ctx_t **ctx, poly_vanilla_future_t *fut); +encore_arg_t poly_vanilla_future_get_actor(pony_ctx_t **ctx, poly_vanilla_future_t *fut); +void poly_vanilla_future_fulfil(pony_ctx_t **ctx, poly_vanilla_future_t *fut, encore_arg_t value); +poly_vanilla_future_t *poly_vanilla_future_mk(pony_ctx_t **ctx, pony_type_t *type); + #endif diff --git a/src/runtime/future/poly_vanilla_future.c b/src/runtime/future/poly_vanilla_future.c new file mode 100644 index 000000000..09e7f63a5 --- /dev/null +++ b/src/runtime/future/poly_vanilla_future.c @@ -0,0 +1,141 @@ +#define _XOPEN_SOURCE 800 +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "encore.h" +#include "future.h" +#include "../libponyrt/actor/messageq.h" +#include "../libponyrt/sched/scheduler.h" + +#define perr(m) // fprintf(stderr, "%s\n", m); + +extern void encore_future_gc_acquireactor(pony_ctx_t* ctx, pony_actor_t* actor); +extern void encore_future_gc_acquireobject(pony_ctx_t* ctx, void* p, + pony_type_t *t, int mutability); + +typedef struct actor_entry actor_entry_t; + +// Terminology: +// Producer -- the actor responsible for fulfilling a future +// Consumer 
-- an non-producer actor using a future + +static inline void poly_vanilla_future_block_actor(pony_ctx_t **ctx, poly_vanilla_future_t *fut); +static inline void poly_vanilla_future_finalizer(poly_vanilla_future_t *fut); +static inline void poly_vanilla_future_gc_send_value(pony_ctx_t *ctx, poly_vanilla_future_t *fut); +static inline void poly_vanilla_future_gc_recv_value(pony_ctx_t *ctx, poly_vanilla_future_t *fut); + + +// =============================================================== +// Create, inspect and fulfil +// =============================================================== + + +static inline void poly_vanilla_future_gc_trace_value(pony_ctx_t *ctx, poly_vanilla_future_t *fut) +{ + assert(fut); + if (fut->type == ENCORE_ACTIVE) { + encore_trace_actor(ctx, fut->value.p); + } else if (fut->type != ENCORE_PRIMITIVE) { + encore_trace_object(ctx, fut->value.p, fut->type->trace); + } +} + +static inline void poly_vanilla_future_finalizer(poly_vanilla_future_t *fut) +{ + pony_ctx_t* cctx = pony_ctx(); + poly_vanilla_future_gc_recv_value(cctx, fut); + ENC_DTRACE2(FUTURE_DESTROY, (uintptr_t) cctx, (uintptr_t) fut); +} + +static inline void poly_vanilla_future_gc_send_value(pony_ctx_t *ctx, poly_vanilla_future_t *fut) +{ + pony_gc_send(ctx); + poly_vanilla_future_gc_trace_value(ctx, fut); + pony_send_done(ctx); +} + +static inline void poly_vanilla_future_gc_recv_value(pony_ctx_t *ctx, poly_vanilla_future_t *fut) +{ + pony_gc_recv(ctx); + poly_vanilla_future_gc_trace_value(ctx, fut); + // note the asymmetry with send + ponyint_gc_handlestack(ctx); +} + +/** + +poly future methods + +**/ + +poly_vanilla_future_t *poly_vanilla_future_mk(pony_ctx_t **ctx, pony_type_t *type) +{ + pony_ctx_t *cctx = *ctx; + assert(cctx->current); + poly_vanilla_future_t *fut = pony_alloc_final(cctx, sizeof(poly_vanilla_future_t), + (void *)&poly_vanilla_future_finalizer); + *fut = (poly_vanilla_future_t) { .type = type, .blocking_stack = NULL }; + ENC_DTRACE3(FUTURE_CREATE, (uintptr_t) 
ctx, (uintptr_t) fut, (uintptr_t) type); + return fut; +} + +encore_arg_t poly_vanilla_future_get_actor(pony_ctx_t **ctx, poly_vanilla_future_t *fut) +{ + if (!__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + ENC_DTRACE2(FUTURE_BLOCK, (uintptr_t) *ctx, (uintptr_t) fut); + poly_vanilla_future_block_actor(ctx, fut); + ENC_DTRACE2(FUTURE_UNBLOCK, (uintptr_t) *ctx, (uintptr_t) fut); + } + ENC_DTRACE2(FUTURE_GET, (uintptr_t) *ctx, (uintptr_t) fut); + return fut->value; +} + +static inline void poly_vanilla_future_block_actor(pony_ctx_t **ctx, poly_vanilla_future_t *fut) +{ + perr("future_block_actor"); + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + return; + } + pony_ctx_t* cctx = *ctx; + pony_actor_t *a = cctx->current; + pony_unschedule(cctx, a); + encore_actor_t *actor = (encore_actor_t*) a; + future_tnode_info_t * pony_node = POOL_ALLOC(future_tnode_info_t); + *pony_node = (future_tnode_info_t) { .fut = (void*)fut, .fclass = POLY_VANILLA_FUTURE }; + actor_block(ctx, actor, (void *)pony_node); +} + +void poly_vanilla_future_fulfil(pony_ctx_t **ctx, poly_vanilla_future_t *fut, encore_arg_t value) +{ + assert(fut->fulfilled == false); + ENC_DTRACE2(FUTURE_FULFIL_START, (uintptr_t) *ctx, (uintptr_t) fut); + fut->value = value; + __atomic_store_n(&(fut->fulfilled), true, __ATOMIC_SEQ_CST); + poly_vanilla_future_gc_send_value(*ctx, fut); + poly_vanilla_future_discharge(ctx, fut); + ENC_DTRACE2(FUTURE_FULFIL_END, (uintptr_t) *ctx, (uintptr_t) fut); +} + +void poly_vanilla_future_discharge(pony_ctx_t **ctx, poly_vanilla_future_t *fut) { + future_tnode_t * a = NULL; + pony_ctx_t * cctx = * ctx; + encore_actor_t * ea; + a = treiber_stack_pop(&fut->blocking_stack); + while (a!=NULL) { + ea = (encore_actor_t *)a->actor; + perr("Unblocking"); + actor_set_resume(ea); + pony_schedule(cctx, a->actor); + POOL_FREE(future_tnode_t, a); + a = treiber_stack_pop(&fut->blocking_stack); + } +} + diff --git a/src/runtime/future/vanilla_future.c 
b/src/runtime/future/vanilla_future.c new file mode 100644 index 000000000..b6936820d --- /dev/null +++ b/src/runtime/future/vanilla_future.c @@ -0,0 +1,168 @@ +#define _XOPEN_SOURCE 800 +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "encore.h" +#include "future.h" +#include "../libponyrt/actor/messageq.h" +#include "../libponyrt/sched/scheduler.h" + +#define perr(m) // fprintf(stderr, "%s\n", m); + +/* + The most minified future API possible in encore. + +*/ + +extern bool pony_system_actor(pony_actor_t *actor); +static inline void vanilla_future_discharge(pony_ctx_t **ctx, vanilla_future_t * fut); +static void vanilla_future_finalizer(vanilla_future_t *fut); +static inline void vanilla_future_gc_send_value(pony_ctx_t *ctx, vanilla_future_t *fut); +static inline void vanilla_future_gc_recv_value(pony_ctx_t *ctx, vanilla_future_t *fut); +static inline void vanilla_future_block_actor(pony_ctx_t **ctx, vanilla_future_t *fut); +extern void encore_future_gc_acquireactor(pony_ctx_t* ctx, pony_actor_t* actor); +extern void encore_future_gc_acquireobject(pony_ctx_t* ctx, void* p, + pony_type_t *t, int mutability); + +static void vanilla_future_finalizer(vanilla_future_t *fut) +{ + pony_ctx_t* cctx = pony_ctx(); + vanilla_future_gc_recv_value(cctx, fut); + ENC_DTRACE2(FUTURE_DESTROY, (uintptr_t) cctx, (uintptr_t) fut); +} + +pony_type_t *vanilla_future_get_type(vanilla_future_t *fut){ + return fut->type; +} + +bool vanilla_future_fulfilled(vanilla_future_t *fut) +{ + perr("future_fulfilled"); + bool r; + r = __atomic_load_n(&fut->fulfilled, __ATOMIC_SEQ_CST); + return r; +} + +static inline void vanilla_future_gc_trace_value(pony_ctx_t *ctx, vanilla_future_t *fut) +{ + assert(fut); + if (fut->type == ENCORE_ACTIVE) { + encore_trace_actor(ctx, fut->value.p); + } else if (fut->type != ENCORE_PRIMITIVE) { + encore_trace_object(ctx, fut->value.p, fut->type->trace); + } +} + +static inline void 
vanilla_future_gc_send_value(pony_ctx_t *ctx, vanilla_future_t *fut) +{ + pony_gc_send(ctx); + vanilla_future_gc_trace_value(ctx, fut); + pony_send_done(ctx); +} + +static inline void vanilla_future_gc_recv_value(pony_ctx_t *ctx, vanilla_future_t *fut) +{ + pony_gc_recv(ctx); + vanilla_future_gc_trace_value(ctx, fut); + ponyint_gc_handlestack(ctx); +} + +vanilla_future_t *vanilla_future_mk(pony_ctx_t **ctx, pony_type_t *type) +{ + pony_ctx_t *cctx = *ctx; + assert(cctx->current); + vanilla_future_t *fut = pony_alloc_final(cctx, sizeof(future_t), + (void *)&vanilla_future_finalizer); + *fut = (vanilla_future_t) { .type = type, .blocking = false }; + ENC_DTRACE3(FUTURE_CREATE, (uintptr_t) ctx, (uintptr_t) fut, (uintptr_t) type); + return fut; +} + +encore_arg_t vanilla_future_get_actor(pony_ctx_t **ctx, vanilla_future_t *fut) +{ + if (!__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + ENC_DTRACE2(FUTURE_BLOCK, (uintptr_t) *ctx, (uintptr_t) fut); + vanilla_future_block_actor(ctx, fut); + ENC_DTRACE2(FUTURE_UNBLOCK, (uintptr_t) *ctx, (uintptr_t) fut); + } + ENC_DTRACE2(FUTURE_GET, (uintptr_t) *ctx, (uintptr_t) fut); + return fut->value; +} + +static inline void vanilla_future_block_actor(pony_ctx_t **ctx, vanilla_future_t *fut) +{ + perr("future_block_actor"); + if (__atomic_load_n(&(fut->fulfilled), __ATOMIC_SEQ_CST)) { + return; + } + pony_ctx_t* cctx = *ctx; + pony_actor_t *a = cctx->current; + pony_unschedule(cctx, a); + encore_actor_t *actor = (encore_actor_t*) a; + future_tnode_info_t * pony_node = POOL_ALLOC(future_tnode_info_t); + *pony_node = (future_tnode_info_t) { .fclass = VANILLA_FUTURE, .fut = (void*)fut }; + actor_block(ctx, actor, (void *)pony_node); +} + +void vanilla_future_fulfil(pony_ctx_t **ctx, vanilla_future_t *fut, encore_arg_t value) +{ + assert(fut->fulfilled == false); + ENC_DTRACE2(FUTURE_FULFIL_START, (uintptr_t) *ctx, (uintptr_t) fut); + fut->value = value; + __atomic_store_n(&(fut->fulfilled), true, __ATOMIC_SEQ_CST); + 
vanilla_future_gc_send_value(*ctx, fut); + vanilla_future_discharge(ctx, fut); + ENC_DTRACE2(FUTURE_FULFIL_END, (uintptr_t) *ctx, (uintptr_t) fut); +} + +static inline void vanilla_future_discharge(pony_ctx_t **ctx, vanilla_future_t *fut) { + if (__sync_bool_compare_and_swap(&(fut->blocking), true, false)) { + perr("Unblocking"); + actor_set_resume((encore_actor_t *)fut->actor); + pony_schedule(*ctx, fut->actor); + } +} + +void handle_future(encore_actor_t *actor, pony_ctx_t *futctx, void * pony_node) +{ + if (!pony_system_actor((pony_actor_t*) actor)) { + if ( pony_node != NULL ) { + future_tnode_info_t * pinfo = (future_tnode_info_t *)pony_node; + if (pinfo->fclass == VANILLA_FUTURE ) { + vanilla_future_t * fut = (vanilla_future_t *)pinfo->fut; + fut->actor = (pony_actor_t *)actor; + __atomic_store_n(&fut->blocking, true, __ATOMIC_SEQ_CST); + if (__atomic_load_n(&fut->fulfilled, __ATOMIC_SEQ_CST)) { + vanilla_future_discharge(&futctx, fut); + } + POOL_FREE(future_tnode_info_t, pinfo); + } else if ( pinfo->fclass == POLY_VANILLA_FUTURE ) { + poly_vanilla_future_t * fut = (poly_vanilla_future_t *)pinfo->fut; + treiber_stack_push(&fut->blocking_stack, (pony_actor_t*)actor, NULL, 0); + POOL_FREE(future_tnode_info_t, (future_tnode_info_t *)pony_node); + if (__atomic_load_n(&fut->fulfilled, __ATOMIC_SEQ_CST)) { + poly_vanilla_future_discharge(&futctx, fut); + } + } else if ( pinfo->fclass == FUTURE ) { + future_t * fut = (future_t *)pinfo->fut; + treiber_stack_push(&fut->blocking_stack, (pony_actor_t*)actor, pinfo->awaited_uctx, pinfo->isget); + POOL_FREE(future_tnode_info_t, (future_tnode_info_t *)pony_node); + if (__atomic_load_n(&fut->fulfilled, __ATOMIC_SEQ_CST)) { + future_discharge(&futctx, fut); + } + } + } + } +} + + + + diff --git a/src/runtime/pony/libponyrt/sched/scheduler.c b/src/runtime/pony/libponyrt/sched/scheduler.c index b6c52a17b..9f198b610 100644 --- a/src/runtime/pony/libponyrt/sched/scheduler.c +++ b/src/runtime/pony/libponyrt/sched/scheduler.c @@ 
-377,13 +377,13 @@ static void __attribute__ ((noreturn)) jump_origin() } __attribute__ ((noreturn)) -void public_run(pony_actor_t *actor) +void public_run(pony_actor_t *actor, void * info_node) { assert(this_scheduler); if (pony_reschedule(actor)) { push(this_scheduler, actor); } - actor_unlock((encore_actor_t *)actor); + handle_future((encore_actor_t *)actor, &(this_scheduler->ctx), info_node); run(this_scheduler); __atomic_fetch_add(&context_waiting, 1, __ATOMIC_RELAXED); diff --git a/src/runtime/pony/premake4.lua b/src/runtime/pony/premake4.lua index 3ac7a0d9e..e580532a8 100644 --- a/src/runtime/pony/premake4.lua +++ b/src/runtime/pony/premake4.lua @@ -231,7 +231,7 @@ project "future" c_lib() links { "closure", "array" } files { - "../future/future.c", + "../future/**.c", } project "stream"