Bug fixing and optimizations #53

Open
rapimo wants to merge 2 commits into master from bug_fix

@rapimo rapimo commented Jun 11, 2026


@mrimbault found that librdkafka's enable.partition.eof has defaulted to false since librdkafka 0.11, so the consumer
never emitted RD_KAFKA_RESP_ERR__PARTITION_EOF.
This PR fixes that, plus some minor bugs.

kafka_fdw maintainer added 2 commits June 11, 2026 17:01
partion_member(): fix off-by-one in binary search (last was
partition_cnt instead of partition_cnt - 1) causing a heap
over-read past the partitions array.

kafkaAcquireSampleRowsFunc(): bound sample collection by targrows.
The loop wrote rows[cnt++] without a bound check, overflowing the
caller-provided array on large topics during ANALYZE. The previous
Assert(cnt <= targrows) was off-by-one and a no-op in release builds.
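
The bound check can be sketched as follows. This is an illustration under assumptions, not the patched function: `collect_sample_rows`, `next_row_fn` and `countdown_source` are hypothetical names, rows are plain ints, and the sketch simply stops once the buffer is full, which may differ from how the real fix picks its sample.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical row source: returns 1 and stores a row while rows remain. */
typedef int (*next_row_fn)(void *state, int *row_out);

/*
 * Fill `rows` with at most `targrows` entries. The bound is part of the
 * loop condition, so it holds in release builds too, unlike an Assert.
 */
int
collect_sample_rows(next_row_fn next_row, void *state, int *rows, int targrows)
{
    int cnt = 0;
    int row;

    assert(rows != NULL && targrows >= 0);

    while (cnt < targrows && next_row(state, &row))
        rows[cnt++] = row;

    return cnt;
}

/* Toy source for illustration: yields one row per call until *state hits 0. */
int
countdown_source(void *state, int *row_out)
{
    int *remaining = state;

    if (*remaining <= 0)
        return 0;
    *row_out = --(*remaining);
    return 1;
}
```

Checking `cnt < targrows` before fetching also means no row is consumed from the source once the buffer is full.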

kafkaExecForeignInsert(): guard slot_getattr() with
partition_attnum != -1; inserting into a table without a partition
column passed attnum -1 (a system attribute of a virtual slot) and
crashed.

KafkaFdwGetConnection(): destroy conf on rd_kafka_new() failure and
topic_conf on topic_conf_set() failure to avoid leaks on error paths.

kafkaBeginForeignModify(): destroy the producer before elog(ERROR);
the previous rd_kafka_destroy(rk) after elog was unreachable and
leaked the producer when topic creation failed.
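
Why the ordering matters can be shown with a small stand-alone sketch. In PostgreSQL, elog(ERROR) does not return to its caller, so any cleanup written after it never runs. The sketch below imitates that with setjmp/longjmp; `raise_error`, `Producer`, `producer_new`, `producer_destroy`, `begin_modify` and `run_begin_modify` are all hypothetical names, not kafka_fdw or librdkafka functions.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdlib.h>

static jmp_buf error_context;   /* where raise_error() jumps back to */
int live_producers = 0;         /* counts handles not yet destroyed */

typedef struct Producer { int unused; } Producer;

/* Imitates elog(ERROR): control never returns to the caller. */
void
raise_error(void)
{
    longjmp(error_context, 1);
}

Producer *
producer_new(void)
{
    Producer *p = malloc(sizeof *p);

    if (p != NULL)
        live_producers++;
    return p;
}

void
producer_destroy(Producer *p)
{
    assert(live_producers > 0);
    free(p);
    live_producers--;
}

void
begin_modify(int topic_ok)
{
    Producer *p = producer_new();

    if (p == NULL)
        raise_error();
    if (!topic_ok)
    {
        producer_destroy(p);    /* must happen before raising the error */
        raise_error();          /* nothing after this line is reached */
    }
    /* A real caller would keep the handle; the sketch releases it here. */
    producer_destroy(p);
}

/* Returns 0 on success, -1 if begin_modify() raised an error. */
int
run_begin_modify(int topic_ok)
{
    if (setjmp(error_context) != 0)
        return -1;
    begin_modify(topic_ok);
    return 0;
}
```

Swapping the two lines in the failure branch would leave `live_producers` at 1 after a failed call, which is the leak the commit message describes.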

get_kafka_fdw_attribute_options(): count num_parse_col only for
non-dropped attributes; dropped columns inflated the field count and
broke strict-mode parsing after DROP COLUMN.

kafka_get_watermarks(): wrap result values in Int32GetDatum/
Int64GetDatum for correctness on 32-bit / !FLOAT8PASSBYVAL builds.

librdkafka's enable.partition.eof defaults to false, so the consumer
never emitted RD_KAFKA_RESP_ERR__PARTITION_EOF and the existing EOF
handling in kafkaIterateForeignScan was effectively dead code. The scan
end then relied solely on rd_kafka_consume_batch() returning an empty
batch within buffer_delay (100ms default). A slow or still in-flight
fetch also yields an empty batch, so the scan could declare a partition
finished prematurely and silently skip unread messages, producing
incomplete, timing-dependent query results.

connection.c: enable enable.partition.eof on the consumer so the
explicit PARTITION_EOF marker is delivered.

kafka_fdw.c: treat an empty batch as "no data yet", not EOF. Retry the
same partition up to KAFKA_MAX_EMPTY_POLLS (3) consecutive empty polls
before giving up; rely on the explicit PARTITION_EOF message for the
authoritative end of a partition. The retry cap bounds the loop in case
a broker becomes unresponsive.
@rapimo rapimo requested a review from mrimbault June 11, 2026 17:38

@mrimbault mrimbault left a comment

Thank you for the PR @rapimo.
Everything looks correct to me, and the commit messages and comments are nice.

I briefly compared the new behavior against master using this ad-hoc script:

CREATE FOREIGN TABLE eof_test (
    part int    OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int  int,
    some_text text,
    some_date date,
    some_time timestamp
)
SERVER kafka_server OPTIONS (format 'csv', topic 'contrib_regress', buffer_delay '100');

-- get initial number of messages
SELECT sum(offset_high - offset_low)::bigint AS broker_message_count
  FROM kafka_get_watermarks('eof_test');

-- baseline: default 100ms timeout
SELECT count(*) AS scanned_delay_100 FROM eof_test;

-- very low timeout
ALTER FOREIGN TABLE eof_test OPTIONS (SET buffer_delay '2');
SELECT count(*) AS scanned_delay_2 FROM eof_test;

-- excessively low timeout
ALTER FOREIGN TABLE eof_test OPTIONS (SET buffer_delay '1');
SELECT count(*) AS scanned_delay_1 FROM eof_test;

DROP FOREIGN TABLE eof_test;

On master, both the 2ms and the 1ms runs return 0 rows.
On bug_fix, the 2ms run returns all rows \o/
The 1ms run still returns 0 rows, which is not really surprising (that is also why I suggest below that we emit a message when we hit that case).

Bottom line: as far as I can tell, this change already covers the bug well, and more.

Comment thread src/kafka_fdw.c
Comment on lines +834 to +835
if (++empty_polls < KAFKA_MAX_EMPTY_POLLS)
    continue; /* retry the same partition */

@mrimbault mrimbault Jun 16, 2026

Maybe we could also emit a NOTICE here if we ever reach KAFKA_MAX_EMPTY_POLLS, so the SQL client gets a hint that we gave up retrying and can distinguish that from a normal EOF, which could otherwise be tricky to debug?
