From ee230b6df8f874d47f764084e5569fe752a61345 Mon Sep 17 00:00:00 2001 From: kafka_fdw maintainer Date: Thu, 11 Jun 2026 14:59:54 +0000 Subject: [PATCH 1/2] Fix memory corruption, crash and resource-leak bugs partion_member(): fix off-by-one in binary search (last was partition_cnt instead of partition_cnt - 1) causing a heap over-read past the partitions array. kafkaAcquireSampleRowsFunc(): bound sample collection by targrows. The loop wrote rows[cnt++] without a bound check, overflowing the caller-provided array on large topics during ANALYZE. The previous Assert(cnt <= targrows) was off-by-one and a no-op in release builds. kafkaExecForeignInsert(): guard slot_getattr() with partition_attnum != -1; inserting into a table without a partition column passed attnum -1 (a system attribute of a virtual slot) and crashed. KafkaFdwGetConnection(): destroy conf on rd_kafka_new() failure and topic_conf on topic_conf_set() failure to avoid leaks on error paths. kafkaBeginForeignModify(): destroy the producer before elog(ERROR); the previous rd_kafka_destroy(rk) after elog was unreachable and leaked the producer when topic creation failed. get_kafka_fdw_attribute_options(): count num_parse_col only for non-dropped attributes; dropped columns inflated the field count and broke strict-mode parsing after DROP COLUMN. kafka_get_watermarks(): wrap result values in Int32GetDatum/ Int64GetDatum for correctness on 32-bit / !FLOAT8PASSBYVAL builds. --- src/connection.c | 8 ++++++++ src/kafka_expr.c | 2 +- src/kafka_fdw.c | 43 ++++++++++++++++++++++++++++++++++--------- src/option.c | 5 +++-- src/utils.c | 6 +++--- 5 files changed, 49 insertions(+), 15 deletions(-) diff --git a/src/connection.c b/src/connection.c index 67d93a6..8ddaa67 100644 --- a/src/connection.c +++ b/src/connection.c @@ -29,9 +29,12 @@ KafkaFdwGetConnection(KafkaOptions *k_options, if (rd_kafka_topic_conf_set(topic_conf, "auto.commit.enable", "false", errstr, KAFKA_MAX_ERR_MSG) != RD_KAFKA_CONF_OK) + { + rd_kafka_topic_conf_destroy(topic_conf); ereport( ERROR, (errcode(ERRCODE_FDW_ERROR), errmsg_internal("kafka_fdw: Unable to create topic %s", k_options->topic))); + } *kafka_topic_handle = rd_kafka_topic_new(*kafka_handle, k_options->topic, topic_conf); if (!*kafka_topic_handle) @@ -43,6 +46,11 @@ KafkaFdwGetConnection(KafkaOptions *k_options, } else { + /* + * On failure rd_kafka_new() does NOT take ownership of conf, so we + * must free it ourselves to avoid leaking it. + */ + rd_kafka_conf_destroy(conf); ereport(ERROR, (errcode(ERRCODE_FDW_UNABLE_TO_ESTABLISH_CONNECTION), errmsg_internal("kafka_fdw: Unable to connect to %s", k_options->brokers), diff --git a/src/kafka_expr.c b/src/kafka_expr.c index 6bf433c..f4463f2 100644 --- a/src/kafka_expr.c +++ b/src/kafka_expr.c @@ -225,7 +225,7 @@ partion_member(KafkaPartitionList *partition_list, int32 search_partition) { int32 first, last, middle; first = 0; - last = partition_list->partition_cnt; + last = partition_list->partition_cnt - 1; middle = (first + last) / 2; while (first <= last) diff --git a/src/kafka_fdw.c b/src/kafka_fdw.c index af64cc5..5bae6d2 100644 --- a/src/kafka_fdw.c +++ b/src/kafka_fdw.c @@ -1375,8 +1375,11 @@ kafkaBeginForeignModify(ModifyTableState *mtstate, rkt = rd_kafka_topic_new(rk, kafka_options.topic, NULL); if (!rkt) { - elog(ERROR, "%% Failed to create topic object: %s\n", rd_kafka_err2str(rd_kafka_last_error())); + rd_kafka_resp_err_t last_error = rd_kafka_last_error(); + + /* destroy the producer before erroring out, elog(ERROR) does not return */ rd_kafka_destroy(rk); + elog(ERROR, "%% Failed to create topic object: %s\n", rd_kafka_err2str(last_error)); } festate->kafka_topic_handle = rkt; @@ -1428,10 +1431,13 @@ kafkaExecForeignInsert(EState *estate, ResultRelInfo *rinfo, TupleTableSlot *slo KafkaWriteAttributes(festate, slot, festate->parse_options.format); } - /* fetch partition if given */ - value = slot_getattr(slot, festate->kafka_options.partition_attnum, &isnull); - if (!isnull) - partition = DatumGetInt32(value); + /* fetch partition if a partition column is configured */ + if (festate->kafka_options.partition_attnum != -1) + { + value = slot_getattr(slot, festate->kafka_options.partition_attnum, &isnull); + if (!isnull) + partition = DatumGetInt32(value); + } DEBUGLOG("Message: %s", festate->attribute_buf.data); @@ -1563,6 +1569,7 @@ kafkaAcquireSampleRowsFunc(Relation relation, volatile bool catched_error = false; volatile int cnt = 0; volatile int64 total = 0; + volatile bool enough = false; /* Initialize execution state */ kafkaGetOptions(RelationGetRelid(relation), &kafka_options, &parse_options); @@ -1622,6 +1629,9 @@ kafkaAcquireSampleRowsFunc(Relation relation, volatile int m; volatile bool done = false; + if (enough) /* collected enough sample rows already */ + break; + /* * Ideally we need to peak individual messages from the partition evenly for * statistics to be more accurate. Unfortunatelly it leads to a very slow @@ -1668,10 +1678,25 @@ kafkaAcquireSampleRowsFunc(Relation relation, if (err == RD_KAFKA_RESP_ERR_NO_ERROR) { - ReadKafkaMessage(relation, festate, messages[m], CurrentMemoryContext, &values, &nulls); - - Assert(cnt <= targrows); - rows[cnt++] = heap_form_tuple(RelationGetDescr(relation), values, nulls); + /* + * Never write past the caller supplied rows + * array (it holds at most targrows entries). + * Once it is full we keep draining/destroying + * the already fetched messages but stop + * collecting and signal the outer loops to + * finish. + */ + if (cnt < targrows) + { + ReadKafkaMessage( + relation, festate, messages[m], CurrentMemoryContext, &values, &nulls); + rows[cnt++] = heap_form_tuple(RelationGetDescr(relation), values, nulls); + } + else + { + enough = true; + done = true; + } } else if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { diff --git a/src/option.c b/src/option.c index 84a7acf..ef8468e 100644 --- a/src/option.c +++ b/src/option.c @@ -442,12 +442,13 @@ get_kafka_fdw_attribute_options(Oid relid, KafkaOptions *kafka_options) Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1); List * options; ListCell * lc; - kafka_options->num_parse_col++; - /* Skip dropped attributes. */ + /* Skip dropped attributes - they are not part of the parsed data. */ if (attr->attisdropped) continue; + kafka_options->num_parse_col++; + options = GetForeignColumnOptions(relid, attnum); foreach (lc, options) { diff --git a/src/utils.c b/src/utils.c index 41312a8..c2958fe 100644 --- a/src/utils.c +++ b/src/utils.c @@ -84,9 +84,9 @@ kafka_get_watermarks(PG_FUNCTION_ARGS) rd_kafka_err2str(err)))); } - values[0] = p; - values[1] = low; - values[2] = high; + values[0] = Int32GetDatum(p); + values[1] = Int64GetDatum(low); + values[2] = Int64GetDatum(high); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } } From 235e82bb5f71407b35c6c6947ba7ac4f21531d30 Mon Sep 17 00:00:00 2001 From: kafka_fdw maintainer Date: Thu, 11 Jun 2026 15:12:40 +0000 Subject: [PATCH 2/2] Fix unreliable end-of-partition detection librdkafka's enable.partition.eof defaults to false, so the consumer never emitted RD_KAFKA_RESP_ERR__PARTITION_EOF and the existing EOF handling in kafkaIterateForeignScan was effectively dead code. The scan end then relied solely on rd_kafka_consume_batch() returning an empty batch within buffer_delay (100ms default). A slow or still in-flight fetch also yields an empty batch, so the scan could declare a partition finished prematurely and silently skip unread messages, producing incomplete, timing-dependent query results. connection.c: enable enable.partition.eof on the consumer so the explicit PARTITION_EOF marker is delivered. kafka_fdw.c: treat an empty batch as "no data yet", not EOF. Retry the same partition up to KAFKA_MAX_EMPTY_POLLS (3) consecutive empty polls before giving up; rely on the explicit PARTITION_EOF message for the authoritative end of a partition. The retry cap bounds the loop in case a broker becomes unresponsive. --- src/connection.c | 11 +++++++++++ src/kafka_fdw.c | 27 +++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/connection.c b/src/connection.c index 8ddaa67..763e292 100644 --- a/src/connection.c +++ b/src/connection.c @@ -20,6 +20,17 @@ KafkaFdwGetConnection(KafkaOptions *k_options, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) elog(ERROR, "%s\n", errstr); + /* + * Emit an explicit RD_KAFKA_RESP_ERR__PARTITION_EOF marker when a + * partition is exhausted. This defaults to false in librdkafka, in which + * case the only end-of-partition signal would be an empty batch within the + * poll timeout - which is unreliable because a slow/in-flight fetch also + * yields an empty batch and would make us skip unread messages. + */ + if (rd_kafka_conf_set(conf, "enable.partition.eof", "true", + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + elog(ERROR, "%s\n", errstr); + *kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, KAFKA_MAX_ERR_MSG); if (*kafka_handle != NULL) diff --git a/src/kafka_fdw.c b/src/kafka_fdw.c index 5bae6d2..8fa4c0b 100644 --- a/src/kafka_fdw.c +++ b/src/kafka_fdw.c @@ -11,6 +11,14 @@ PG_MODULE_MAGIC; #define MAX(_a, _b) ((_a > _b) ? _a : _b) #define STEP_FACTOR 20 +/* + * Number of consecutive empty consume batches (no data within buffer_delay) + * that we tolerate before giving up on a partition. The authoritative + * end-of-partition signal is RD_KAFKA_RESP_ERR__PARTITION_EOF; this is only a + * safety bound to avoid looping forever if a broker becomes unresponsive. + */ +#define KAFKA_MAX_EMPTY_POLLS 3 + #if PG_VERSION_NUM >= 160000 #define PG_TRY_ARGS(...) PG_TRY(__VA_ARGS__) #define PG_CATCH_ARGS(...) PG_CATCH(__VA_ARGS__) @@ -681,6 +689,7 @@ kafkaIterateForeignScan(ForeignScanState *node) MemoryContext ccxt = CurrentMemoryContext; KafkaScanDataDesc * scand = festate->scan_data_desc; int param_num = 0; + int empty_polls = 0; KafkaScanP * scan_p; /* first run eval expressions and setup working list */ @@ -811,14 +820,28 @@ kafkaIterateForeignScan(ForeignScanState *node) (errcode(ERRCODE_FDW_ERROR), errmsg_internal("kafka_fdw got an error fetching data %s", rd_kafka_err2str(rd_kafka_last_error())))); - if (festate->buffer_count <= 0) /* no more messages within timeout*/ + if (festate->buffer_count <= 0) /* no data within the poll timeout */ { + /* + * An empty batch does NOT mean the partition is exhausted - that + * is signalled by an explicit RD_KAFKA_RESP_ERR__PARTITION_EOF + * message (enable.partition.eof is turned on in connection.c). + * An empty batch only means no message arrived within + * buffer_delay (e.g. a slow or still in-flight fetch), so retry + * the same partition a few times before giving up to avoid + * silently skipping unread messages. + */ + if (++empty_polls < KAFKA_MAX_EMPTY_POLLS) + continue; /* retry the same partition */ + + empty_polls = 0; if (!kafkaNext(festate)) return slot; } else { - message = festate->buffer[festate->buffer_cursor]; + empty_polls = 0; /* got data, reset the empty-poll counter */ + message = festate->buffer[festate->buffer_cursor]; if (message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { DEBUGLOG("kafka_fdw has reached the end of the queue 2");