
High level consumer #54

Open: rapimo wants to merge 3 commits into bug_fix from high_level_consumer

@rapimo rapimo commented Jun 11, 2026

Member

Move the foreign-scan read path off the deprecated simple/legacy consumer onto the high-level consumer.

kafka_fdw maintainer added 3 commits June 11, 2026 22:18
Move the foreign-scan read path off the deprecated simple/legacy consumer
(rd_kafka_consume_start/_batch/_stop) onto the high-level consumer driven
purely by rd_kafka_assign() + rd_kafka_consumer_poll(). No consumer group
is joined: group.id is left unset and enable.auto.commit is forced to
false, which librdkafka permits per KIP-289. This keeps the existing
explicit per-partition/offset scan semantics while using the maintained
API and pairs naturally with enable.partition.eof.

connection.c: new KafkaFdwGetConsumer() that configures bootstrap.servers,
enable.auto.commit=false, enable.partition.eof=true (no group.id), creates
an RD_KAFKA_CONSUMER, calls rd_kafka_poll_set_consumer(), and keeps a topic
handle only for metadata lookups.

kafka_fdw.c:
  - kafkaBeginForeignScan uses KafkaFdwGetConsumer.
  - kafkaStart assigns a single partition with an explicit start offset via
    rd_kafka_assign() instead of rd_kafka_consume_start().
  - kafkaStop clears the assignment via rd_kafka_assign(rk, NULL) instead of
    rd_kafka_consume_stop().
  - kafkaIterateForeignScan fetches one message via rd_kafka_consumer_poll()
    instead of rd_kafka_consume_batch(), reusing the existing buffer and the
    empty-poll retry / PARTITION_EOF logic.

The ANALYZE sampling path (kafkaAcquireSampleRowsFunc) and kafka_get_watermarks
deliberately keep using the legacy connection/simple consumer to limit the
blast radius of this change; they can be migrated separately.

NOTE: untested - this rewrites the consumer core and must be built and tested
against a real broker before merging.

The high-level consumer's rd_kafka_assign() requires group.id to be
configured; without it librdkafka fails the assign with "Local: Unknown
group" (RD_KAFKA_RESP_ERR__UNKNOWN_GROUP). The KIP-289 relaxation that
allows omitting group.id when enable.auto.commit=false is not honored by
all librdkafka versions for the assign() path.

Configure a fixed group.id ("kafka_fdw") together with
enable.auto.commit=false. Since the FDW only uses rd_kafka_assign()
(never subscribe()) and never commits offsets, no consumer group is
actually joined: no rebalancing and no coordinator traffic occur.

The migration to rd_kafka_consumer_poll() (one message per call) broke
enforcement of the requested upper offset bound (offset_lim). That check
lived in the top-of-function branch that only runs for messages still
buffered from a previous rd_kafka_consume_batch(). With one-message-at-a-time
polling the buffer is always drained, so the check never fired and the scan
read the entire partition up to PARTITION_EOF. For example, a query with the
predicate OFFSET <= 1 scanned the whole partition and relied on the executor
filter, visible as a large "Rows Removed by Filter" in EXPLAIN ANALYZE.

Apply the same offset_lim test to the freshly polled message in the poll
loop, mirroring the existing PARTITION_EOF handling: when the message is
past offset_lim, advance to the next partition via kafkaNext() (which
drains and destroys the over-limit message) instead of returning it.
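The control flow of that fix can be illustrated without a broker. The sketch below is a simplified model, not the FDW code: `SketchMsg`, `SketchPartition`, and `sketch_scan()` are hypothetical stand-ins, each array element plays the role of one rd_kafka_consumer_poll() result, and offset_lim is assumed to be an inclusive upper bound.

```c
#include <stddef.h>
#include <stdint.h>

/* Stand-in for one polled message: a data message or a PARTITION_EOF event. */
typedef struct {
    int64_t offset;
    int     is_eof;
} SketchMsg;

/* Stand-in for one scanned partition and its requested upper bound. */
typedef struct {
    const SketchMsg *msgs;       /* what successive polls would return */
    size_t           count;
    size_t           pos;        /* number of polls consumed so far */
    int64_t          offset_lim; /* assumed inclusive upper offset bound */
} SketchPartition;

/* Emit offsets partition by partition. A freshly polled message that is
 * either PARTITION_EOF or past offset_lim ends the current partition. */
size_t
sketch_scan(SketchPartition *parts, size_t nparts, int64_t *out, size_t out_cap)
{
    size_t emitted = 0;
    size_t p = 0;

    while (p < nparts && emitted < out_cap) {
        SketchPartition *cur = &parts[p];
        const SketchMsg *m;

        if (cur->pos >= cur->count) {   /* nothing left to poll */
            p++;
            continue;
        }

        m = &cur->msgs[cur->pos++];     /* "poll" exactly one message */

        if (m->is_eof || m->offset > cur->offset_lim) {
            p++;                        /* same exit path for both cases */
            continue;
        }
        out[emitted++] = m->offset;
    }
    return emitted;
}
```

With a partition holding offsets 0 to 3 and offset_lim set to 1, the model emits offsets 0 and 1, polls the single over-limit message at offset 2, and moves on without touching offset 3.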
@rapimo rapimo requested a review from mrimbault June 11, 2026 20:38