Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ KafkaFdwGetConnection(KafkaOptions *k_options,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
elog(ERROR, "%s\n", errstr);

/*
* Emit an explicit RD_KAFKA_RESP_ERR__PARTITION_EOF marker when a
* partition is exhausted. This defaults to false in librdkafka, in which
* case the only end-of-partition signal would be an empty batch within the
* poll timeout - which is unreliable because a slow/in-flight fetch also
* yields an empty batch and would make us skip unread messages.
*/
if (rd_kafka_conf_set(conf, "enable.partition.eof", "true",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
elog(ERROR, "%s\n", errstr);

*kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, KAFKA_MAX_ERR_MSG);

if (*kafka_handle != NULL)
Expand All @@ -29,9 +40,12 @@ KafkaFdwGetConnection(KafkaOptions *k_options,

if (rd_kafka_topic_conf_set(topic_conf, "auto.commit.enable", "false", errstr, KAFKA_MAX_ERR_MSG) !=
RD_KAFKA_CONF_OK)
{
rd_kafka_topic_conf_destroy(topic_conf);
ereport(
ERROR,
(errcode(ERRCODE_FDW_ERROR), errmsg_internal("kafka_fdw: Unable to create topic %s", k_options->topic)));
}

*kafka_topic_handle = rd_kafka_topic_new(*kafka_handle, k_options->topic, topic_conf);
if (!*kafka_topic_handle)
Expand All @@ -43,6 +57,11 @@ KafkaFdwGetConnection(KafkaOptions *k_options,
}
else
{
/*
* On failure rd_kafka_new() does NOT take ownership of conf, so we
* must free it ourselves to avoid leaking it.
*/
rd_kafka_conf_destroy(conf);
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION),
errmsg_internal("kafka_fdw: Unable to connect to %s", k_options->brokers),
Expand Down
2 changes: 1 addition & 1 deletion src/kafka_expr.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ partion_member(KafkaPartitionList *partition_list, int32 search_partition)
{
int32 first, last, middle;
first = 0;
last = partition_list->partition_cnt;
last = partition_list->partition_cnt - 1;
middle = (first + last) / 2;

while (first <= last)
Expand Down
70 changes: 59 additions & 11 deletions src/kafka_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ PG_MODULE_MAGIC;
#define MAX(_a, _b) ((_a > _b) ? _a : _b)
#define STEP_FACTOR 20

/*
* Number of consecutive empty consume batches (no data within buffer_delay)
* that we tolerate before giving up on a partition. The authoritative
* end-of-partition signal is RD_KAFKA_RESP_ERR__PARTITION_EOF; this is only a
* safety bound to avoid looping forever if a broker becomes unresponsive.
*/
#define KAFKA_MAX_EMPTY_POLLS 3

#if PG_VERSION_NUM >= 160000
#define PG_TRY_ARGS(...) PG_TRY(__VA_ARGS__)
#define PG_CATCH_ARGS(...) PG_CATCH(__VA_ARGS__)
Expand Down Expand Up @@ -681,6 +689,7 @@ kafkaIterateForeignScan(ForeignScanState *node)
MemoryContext ccxt = CurrentMemoryContext;
KafkaScanDataDesc * scand = festate->scan_data_desc;
int param_num = 0;
int empty_polls = 0;
KafkaScanP * scan_p;

/* first run eval expressions and setup working list */
Expand Down Expand Up @@ -811,14 +820,28 @@ kafkaIterateForeignScan(ForeignScanState *node)
(errcode(ERRCODE_FDW_ERROR),
errmsg_internal("kafka_fdw got an error fetching data %s", rd_kafka_err2str(rd_kafka_last_error()))));

if (festate->buffer_count <= 0) /* no more messages within timeout*/
if (festate->buffer_count <= 0) /* no data within the poll timeout */
{
/*
* An empty batch does NOT mean the partition is exhausted - that
* is signalled by an explicit RD_KAFKA_RESP_ERR__PARTITION_EOF
* message (enable.partition.eof is turned on in connection.c).
* An empty batch only means no message arrived within
* buffer_delay (e.g. a slow or still in-flight fetch), so retry
* the same partition a few times before giving up to avoid
* silently skipping unread messages.
*/
if (++empty_polls < KAFKA_MAX_EMPTY_POLLS)
continue; /* retry the same partition */
Comment on lines +834 to +835

@mrimbault mrimbault Jun 16, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could also emit a NOTICE message here if we ever reach KAFKA_MAX_EMPTY_POLLS, so the SQL client gets a hint that we gave up retrying and can distinguish this from a normal EOF, which could otherwise be tricky to debug?


empty_polls = 0;
if (!kafkaNext(festate))
return slot;
}
else
{
message = festate->buffer[festate->buffer_cursor];
empty_polls = 0; /* got data, reset the empty-poll counter */
message = festate->buffer[festate->buffer_cursor];
if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
{
DEBUGLOG("kafka_fdw has reached the end of the queue 2");
Expand Down Expand Up @@ -1375,8 +1398,11 @@ kafkaBeginForeignModify(ModifyTableState *mtstate,
rkt = rd_kafka_topic_new(rk, kafka_options.topic, NULL);
if (!rkt)
{
elog(ERROR, "%% Failed to create topic object: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_resp_err_t last_error = rd_kafka_last_error();

/* destroy the producer before erroring out, elog(ERROR) does not return */
rd_kafka_destroy(rk);
elog(ERROR, "%% Failed to create topic object: %s\n", rd_kafka_err2str(last_error));
}

festate->kafka_topic_handle = rkt;
Expand Down Expand Up @@ -1428,10 +1454,13 @@ kafkaExecForeignInsert(EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slo
KafkaWriteAttributes(festate, slot, festate->parse_options.format);
}

/* fetch partition if given */
value = slot_getattr(slot, festate->kafka_options.partition_attnum, &isnull);
if (!isnull)
partition = DatumGetInt32(value);
/* fetch partition if a partition column is configured */
if (festate->kafka_options.partition_attnum != -1)
{
value = slot_getattr(slot, festate->kafka_options.partition_attnum, &isnull);
if (!isnull)
partition = DatumGetInt32(value);
}

DEBUGLOG("Message: %s", festate->attribute_buf.data);

Expand Down Expand Up @@ -1563,6 +1592,7 @@ kafkaAcquireSampleRowsFunc(Relation relation,
volatile bool catched_error = false;
volatile int cnt = 0;
volatile int64 total = 0;
volatile bool enough = false;

/* Initialize execution state */
kafkaGetOptions(RelationGetRelid(relation), &kafka_options, &parse_options);
Expand Down Expand Up @@ -1622,6 +1652,9 @@ kafkaAcquireSampleRowsFunc(Relation relation,
volatile int m;
volatile bool done = false;

if (enough) /* collected enough sample rows already */
break;

/*
* Ideally we need to peek individual messages from the partition evenly for
* statistics to be more accurate. Unfortunately it leads to a very slow
Expand Down Expand Up @@ -1668,10 +1701,25 @@ kafkaAcquireSampleRowsFunc(Relation relation,

if (err == RD_KAFKA_RESP_ERR_NO_ERROR)
{
ReadKafkaMessage(relation, festate, messages[m], CurrentMemoryContext, &values, &nulls);

Assert(cnt <= targrows);
rows[cnt++] = heap_form_tuple(RelationGetDescr(relation), values, nulls);
/*
* Never write past the caller supplied rows
* array (it holds at most targrows entries).
* Once it is full we keep draining/destroying
* the already fetched messages but stop
* collecting and signal the outer loops to
* finish.
*/
if (cnt < targrows)
{
ReadKafkaMessage(
relation, festate, messages[m], CurrentMemoryContext, &values, &nulls);
rows[cnt++] = heap_form_tuple(RelationGetDescr(relation), values, nulls);
}
else
{
enough = true;
done = true;
}
}
else if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
{
Expand Down
5 changes: 3 additions & 2 deletions src/option.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,13 @@ get_kafka_fdw_attribute_options(Oid relid, KafkaOptions *kafka_options)
Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
List * options;
ListCell * lc;
kafka_options->num_parse_col++;

/* Skip dropped attributes. */
/* Skip dropped attributes - they are not part of the parsed data. */
if (attr->attisdropped)
continue;

kafka_options->num_parse_col++;

options = GetForeignColumnOptions(relid, attnum);
foreach (lc, options)
{
Expand Down
6 changes: 3 additions & 3 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ kafka_get_watermarks(PG_FUNCTION_ARGS)
rd_kafka_err2str(err))));
}

values[0] = p;
values[1] = low;
values[2] = high;
values[0] = Int32GetDatum(p);
values[1] = Int64GetDatum(low);
values[2] = Int64GetDatum(high);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
}
Expand Down
Loading