Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion ext/duckdb/table_function.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ static VALUE rbduckdb_table_function_set_init(VALUE self);
static void table_function_init_callback(duckdb_init_info info);
static VALUE rbduckdb_table_function_set_execute(VALUE self);
static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output);
#ifdef HAVE_DUCKDB_H_GE_V1_5_0
/* Thread detection (declared in function_executor.c); used to skip the proxy on Ruby threads. */
extern int ruby_native_thread_p(void);
static void table_function_local_init_callback(duckdb_init_info info);
#endif

static const rb_data_type_t table_function_data_type = {
"DuckDB/TableFunction",
Expand Down Expand Up @@ -358,6 +363,10 @@ static VALUE rbduckdb_table_function_set_execute(VALUE self) {

ctx->execute_proc = rb_block_proc();
duckdb_table_function_set_function(ctx->table_function, table_function_execute_callback);
#ifdef HAVE_DUCKDB_H_GE_V1_5_0
/* Per-worker proxy threads for the execute path (DuckDB >= 1.5.0). */
duckdb_table_function_set_local_init(ctx->table_function, table_function_local_init_callback);
#endif

rbduckdb_function_executor_ensure_started();

Expand Down Expand Up @@ -405,6 +414,7 @@ static void execute_execute_callback_protected(void *user_data) {
static void table_function_execute_callback(duckdb_function_info info, duckdb_data_chunk output) {
rubyDuckDBTableFunction *ctx;
struct execute_dispatch_arg darg;
struct worker_proxy *proxy = NULL;

ctx = (rubyDuckDBTableFunction *)duckdb_function_get_extra_info(info);
if (!ctx || ctx->execute_proc == Qnil) return;
Expand All @@ -413,8 +423,64 @@ static void table_function_execute_callback(duckdb_function_info info, duckdb_da
darg.info = info;
darg.output = output;

rbduckdb_function_executor_dispatch(execute_execute_callback_protected, &darg);
#ifdef HAVE_DUCKDB_H_GE_V1_5_0
/* On DuckDB >= 1.5.0 each worker thread carries its own proxy (see local_init). */
proxy = (struct worker_proxy *)duckdb_function_get_local_init_data(info);
#endif
rbduckdb_function_executor_dispatch_via_proxy(execute_execute_callback_protected, &darg, proxy);
}

#ifdef HAVE_DUCKDB_H_GE_V1_5_0
/*
* Per-worker init for the execute path (DuckDB >= 1.5.0).
*
* DuckDB calls this once on each worker thread that will run the execute
* callback. We create a per-worker proxy (allocating its Ruby thread under the
* GVL via the global executor, since this runs on a non-Ruby thread) and store
* it as thread-local init data. The execute callback then dispatches through it
* instead of the shared global executor, so workers run callbacks concurrently.
* DuckDB invokes rbduckdb_worker_proxy_destroy when the local state is freed.
*/
struct create_proxy_callback_arg {
struct worker_proxy *proxy;
};

static VALUE create_proxy_callback(VALUE varg) {
struct create_proxy_callback_arg *arg = (struct create_proxy_callback_arg *)varg;
arg->proxy = rbduckdb_worker_proxy_create();
return Qnil;
}

/*
* rbduckdb_worker_proxy_create may raise (NoMemError, Thread.new failure),
* and the executor runs callbacks unprotected — a raise would longjmp past
* its done-signaling and block the waiting DuckDB worker forever. Swallow
* the exception instead: the proxy stays NULL, local_init sets no state, and
* the execute callback falls back to the global executor.
*/
static void create_proxy_callback_protected(void *user_data) {
int exception_state;

rb_protect(create_proxy_callback, (VALUE)user_data, &exception_state);
if (exception_state) {
rb_set_errinfo(Qnil);
}
}

static void table_function_local_init_callback(duckdb_init_info info) {
struct create_proxy_callback_arg arg;

/* A Ruby calling thread runs the callback inline (Case 1/2); no proxy needed. */
if (ruby_native_thread_p()) return;

arg.proxy = NULL;
rbduckdb_function_executor_dispatch(create_proxy_callback_protected, &arg);

if (arg.proxy != NULL) {
duckdb_init_set_init_data(info, arg.proxy, rbduckdb_worker_proxy_destroy);
}
}
#endif

rubyDuckDBTableFunction *get_struct_table_function(VALUE self) {
rubyDuckDBTableFunction *ctx;
Expand Down
88 changes: 76 additions & 12 deletions sample/issue1136.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

# GH-1136: per-worker proxy threads for scalar UDF callbacks (DuckDB >= 1.5.0).
# GH-1136: per-worker proxy threads for scalar and table UDF callbacks
# (DuckDB >= 1.5.0).
#
# Each DuckDB worker thread gets its own dedicated Ruby proxy thread, so UDF
# callbacks from different workers run concurrently instead of serializing on
Expand All @@ -11,10 +12,14 @@
#
# Note: pure-CPU Ruby callbacks stay effectively serialized by the GVL; the
# throughput win is specific to callbacks that release it (e.g. on I/O,
# simulated here with sleep). A large base table is used so the morsel-driven
# scan actually distributes work across workers.
# simulated here with sleep). The scalar section uses a large base table so
# the morsel-driven scan actually distributes work across workers; the table
# section sets both planner hints (set_cardinality + max_threads) for the
# same reason.
require 'duckdb'

# --- scalar UDF -------------------------------------------------------------

ROWS = 500_000
SLEEP_EVERY = 1_000 # simulate I/O on every Nth value
SLEEP_SEC = 0.002
Expand All @@ -32,29 +37,88 @@ def register_slow_triple(con, threads_seen)
con.register_scalar_function(sf)
end

def timed_sum(con)
def timed_sum(con, query)
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
sum = con.execute('SELECT SUM(slow_triple(value)) FROM t').first.first
sum = con.execute(query).first.first
[Process.clock_gettime(Process::CLOCK_MONOTONIC) - started, sum]
end

def measure(threads)
def measure_scalar(threads)
db = DuckDB::Database.open
con = db.connect
con.execute("SET threads=#{threads}")
con.execute("CREATE TABLE t AS SELECT range::INTEGER AS value FROM range(#{ROWS})")
threads_seen = {}
register_slow_triple(con, threads_seen)
elapsed, sum = timed_sum(con)
elapsed, sum = timed_sum(con, 'SELECT SUM(slow_triple(value)) FROM t')
con.close
db.close
[elapsed, threads_seen.size, sum]
end

# --- table UDF --------------------------------------------------------------

CHUNKS = 200
ROWS_PER_CHUNK = 50
CHUNK_SLEEP_SEC = 0.005 # simulate I/O per emitted chunk

def emit_chunk(output)
ROWS_PER_CHUNK.times { |i| output.set_value(0, i, 1) }
output.size = ROWS_PER_CHUNK
sleep(CHUNK_SLEEP_SEC)
end

def emitter_bind_block
proc do |bind_info|
bind_info.add_result_column('v', DuckDB::LogicalType::BIGINT)
# Tell the planner there is real work so it distributes across workers.
bind_info.set_cardinality(CHUNKS * ROWS_PER_CHUNK, false)
end
end

def emitter_execute_block(threads_seen)
remaining = CHUNKS
mutex = Mutex.new
proc do |_info, output|
threads_seen[Thread.current] = true
has_work = mutex.synchronize { (remaining -= 1) >= 0 }
has_work ? emit_chunk(output) : output.size = 0
end
end

def register_slow_emitter(con, threads_seen)
tf = DuckDB::TableFunction.new
tf.name = 'slow_emitter'
tf.bind(&emitter_bind_block)
# Without max_threads DuckDB assigns a single worker and the proxy never fires.
tf.init { |init_info| init_info.max_threads = 4 }
tf.execute(&emitter_execute_block(threads_seen))
con.register_table_function(tf)
end

def measure_table(threads)
db = DuckDB::Database.open
con = db.connect
con.execute("SET threads=#{threads}")
threads_seen = {}
register_slow_emitter(con, threads_seen)
elapsed, sum = timed_sum(con, 'SELECT SUM(v) FROM slow_emitter()')
con.close
db.close
[elapsed, threads_seen.size, sum]
end

expected = (ROWS - 1) * ROWS / 2 * 3
[1, 4].each do |threads|
elapsed, distinct, sum = measure(threads)
raise "wrong result: #{sum} (expected #{expected})" unless sum == expected
# --- run both ---------------------------------------------------------------

puts "SET threads=#{threads}: #{elapsed.round(3)}s, callbacks ran on #{distinct} distinct Ruby thread(s)"
def report(label, expected)
puts "#{label}:"
[1, 4].each do |threads|
elapsed, distinct, sum = yield(threads)
raise "wrong result: #{sum} (expected #{expected})" unless sum == expected

puts " SET threads=#{threads}: #{elapsed.round(3)}s, callbacks ran on #{distinct} distinct Ruby thread(s)"
end
end

report('scalar UDF (slow_triple)', (ROWS - 1) * ROWS / 2 * 3) { |threads| measure_scalar(threads) }
report('table UDF (slow_emitter)', CHUNKS * ROWS_PER_CHUNK) { |threads| measure_table(threads) }
65 changes: 65 additions & 0 deletions test/duckdb_test/table_function_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,71 @@ def test_symbol_columns
db.close
end

# Per-worker proxy: exercises the local_init -> proxy -> destroy lifecycle
# under real multi-threaded execution (SET threads=4) and asserts the
# proxy path actually fired: execute callbacks must run on more than two
# distinct Ruby threads. Without per-worker proxies that count can never
# exceed two (the calling thread plus the single global executor), so this
# fails on the old implementation. Simultaneity assertions (max
# concurrency) are avoided as scheduler-dependent; sample/issue1136.rb
# demonstrates the throughput win for the scalar twin of this mechanism.
# Requires DuckDB >= 1.5.0 (duckdb_table_function_set_local_init).
def test_execute_runs_on_per_worker_proxy_threads
if ::DuckDBTest.duckdb_library_version < Gem::Version.new('1.5.0')
skip 'per-worker proxy requires DuckDB >= 1.5.0'
end

chunks = 64
rows_per_chunk = 100
remaining = chunks
mutex = Mutex.new
threads_seen = {}

db = DuckDB::Database.open
conn = db.connect
conn.execute('SET threads=4')

tf = DuckDB::TableFunction.new
tf.name = 'parallel_emitter'
tf.bind do |bind_info|
bind_info.add_result_column('v', DuckDB::LogicalType::BIGINT)
# Tell the planner there is real work so it distributes across workers.
bind_info.set_cardinality(chunks * rows_per_chunk, false)
end
tf.init do |init_info|
# Without this DuckDB assigns a single worker and the proxy never fires.
init_info.max_threads = 4
end
tf.execute do |_info, output|
threads_seen[Thread.current] = true
has_work = mutex.synchronize do
next false if remaining.zero?

remaining -= 1
true
end

unless has_work
output.size = 0
next
end

rows_per_chunk.times { |i| output.set_value(0, i, 1) }
output.size = rows_per_chunk
sleep 0.001 # release the GVL so workers can overlap
end

conn.register_table_function(tf)
result = conn.query('SELECT COUNT(*), SUM(v) FROM parallel_emitter()').each.to_a

assert_equal [chunks * rows_per_chunk, chunks * rows_per_chunk], result.first
assert_operator threads_seen.size, :>, 2,
'expected callbacks on per-worker proxy threads, not just caller + global executor'

conn.disconnect
db.close
end

private

def setup_incomplete_function
Expand Down
Loading