Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
pg: [16, 15, 14, 13, 12, 11, 10]
pg: [18, 17, 16, 15, 14, 13, 12, 11, 10]
name: PostgreSQL ${{ matrix.pg }}
runs-on: ubuntu-latest
container: pgxn/pgxn-tools
Expand Down
36 changes: 0 additions & 36 deletions .github/workflows/ci_dockerfile.yml

This file was deleted.

22 changes: 10 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ OBJS = $(patsubst %.c,%.o,$(wildcard src/*.c))
PG_CONFIG ?= pg_config
PG_CPPFLAGS = -std=c99 -Wall -Wextra -Wno-unused-parameter


PLATFORM = $(shell uname -s)

ifndef NOINIT
REGRESS_PREP = prep_kafka
endif
Expand All @@ -21,19 +24,12 @@ ifdef DEBUG
PG_CPPFLAGS+= -DDO_DEBUG
endif

PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

ifeq ($(shell test $(VERSION_NUM) -lt 100000; echo $$?),0)
REGRESS := $(filter-out parallel, $(REGRESS))
endif


PLATFORM = $(shell uname -s)

ifeq ($(PLATFORM),Darwin)
SHLIB_LINK += -lrdkafka -lz -lpthread
PG_LIBS += -lrdkafka -lz -lpthread
PG_CPPFLAGS += -I/opt/homebrew/include
SHLIB_LINK += -lrdkafka -lz -lpthread -L/opt/homebrew/opt/librdkafka/lib -lrdkafka
PG_LIBS += -lrdkafka -lz -lpthread -L/opt/homebrew/opt/librdkafka/lib -lrdkafka
export KAFKA_PRODUCER = kafka-console-producer
export KAFKA_TOPICS = kafka-topics
else
SHLIB_LINK += -lrdkafka -lz -lpthread -lrt
PG_LIBS += -lrdkafka -lz -lpthread -lrt
Expand All @@ -43,6 +39,8 @@ ifdef TEST
REGRESS = $(TEST)
endif

PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

all: $(EXTENSION)--$(EXTVERSION).sql

Expand Down
50 changes: 49 additions & 1 deletion src/compatibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,55 @@ ExecInitExprList(List *nodes, PlanState *parent)
create_foreignscan_path signature has changed accros different pg versions
kafka_create_foreignscan_path harmonizes it
*/
#if PG_VERSION_NUM >= 90600

#if PG_VERSION_NUM >= 180000
/* PG18+: added disabled_nodes parameter and fdw_restrictinfo */
#define kafka_create_foreignscan_path(planner_info, \
reloptinfo, \
pathtarget, \
rows, \
startup_cost, \
total_cost, \
pathkeys, \
req_outer, \
fdw_outerpath, \
fdw_private) \
(create_foreignscan_path(planner_info, \
reloptinfo, \
pathtarget, \
rows, \
0, /* disabled_nodes */ \
startup_cost, \
total_cost, \
pathkeys, \
req_outer, \
fdw_outerpath, \
NIL, /* fdw_restrictinfo */ \
fdw_private))
#elif PG_VERSION_NUM >= 170000
/* PG17: added fdw_restrictinfo parameter */
#define kafka_create_foreignscan_path(planner_info, \
reloptinfo, \
pathtarget, \
rows, \
startup_cost, \
total_cost, \
pathkeys, \
req_outer, \
fdw_outerpath, \
fdw_private) \
(create_foreignscan_path(planner_info, \
reloptinfo, \
pathtarget, \
rows, \
startup_cost, \
total_cost, \
pathkeys, \
req_outer, \
fdw_outerpath, \
NIL, /* fdw_restrictinfo */ \
fdw_private))
#elif PG_VERSION_NUM >= 90600
#define kafka_create_foreignscan_path(planner_info, \
reloptinfo, \
pathtarget, \
Expand Down
30 changes: 18 additions & 12 deletions src/kafka_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@
#include "utils/lsyscache.h"
#include "utils/rel.h"

#if PG_VERSION_NUM >= 130000
#include "access/relation.h"
#endif

PG_MODULE_MAGIC;

#define MAX(_a, _b) ((_a > _b) ? _a : _b)
#define STEP_FACTOR 20

#if PG_VERSION_NUM >= 160000
#define PG_TRY_ARGS(...) PG_TRY(__VA_ARGS__)
#define PG_CATCH_ARGS(...) PG_CATCH(__VA_ARGS__)
#define PG_END_TRY_ARGS(...) PG_END_TRY(__VA_ARGS__)
#else
#define PG_TRY_ARGS(...) PG_TRY()
#define PG_CATCH_ARGS(...) PG_CATCH()
#define PG_END_TRY_ARGS(...) PG_END_TRY()
#endif

double kafka_tuple_cost = 0.2f;

/*
Expand Down Expand Up @@ -356,12 +362,12 @@ KafkaScanOpListToString(List *scanop)
}

if (ScanopListGetOh(scanop) == ScanopListGetOl(scanop))
appendStringInfo(&buf, " AND OFFSET = %ld", ScanopListGetOl(scanop));
appendStringInfo(&buf, " AND OFFSET = " INT64_FORMAT, ScanopListGetOl(scanop));
else
{
appendStringInfo(&buf, " AND OFFSET >= %ld", ScanopListGetOl(scanop));
appendStringInfo(&buf, " AND OFFSET >= " INT64_FORMAT, ScanopListGetOl(scanop));
if (!ScanopListGetOhInvinite(scanop))
appendStringInfo(&buf, " AND OFFSET <= %ld", ScanopListGetOh(scanop));
appendStringInfo(&buf, " AND OFFSET <= " INT64_FORMAT, ScanopListGetOh(scanop));
}

return buf.data;
Expand Down Expand Up @@ -402,10 +408,10 @@ kafkaExplainForeignScan(ForeignScanState *node, ExplainState *es)
initStringInfo(&buf);
p = &festate->scan_data->data[i];
DEBUGLOG("p %d, of %ld, ofl %ld", p->partition, p->offset, p->offset_lim);
appendStringInfo(&buf, "PARTITION %d AND OFFSET >= %ld", p->partition, p->offset);
appendStringInfo(&buf, "PARTITION %d AND OFFSET >= " INT64_FORMAT, p->partition, p->offset);

if (p->offset_lim != -1)
appendStringInfo(&buf, " AND OFFSET <= %ld", p->offset_lim);
appendStringInfo(&buf, " AND OFFSET <= " INT64_FORMAT, p->offset_lim);

ExplainPropertyText("scanning", buf.data, es);
}
Expand Down Expand Up @@ -1654,7 +1660,7 @@ kafkaAcquireSampleRowsFunc(Relation relation,
/* Not empty dataset obtained */
if (rows_fetched > 0)
{
PG_TRY();
PG_TRY_ARGS(spl);
{
for (m = 0; m < rows_fetched; m++)
{
Expand Down Expand Up @@ -1684,7 +1690,7 @@ kafkaAcquireSampleRowsFunc(Relation relation,
rd_kafka_message_destroy(messages[m]);
}
}
PG_CATCH();
PG_CATCH_ARGS(spl);
{
/*
* If any error occurs during parsing messages we should
Expand All @@ -1698,7 +1704,7 @@ kafkaAcquireSampleRowsFunc(Relation relation,

PG_RE_THROW();
}
PG_END_TRY();
PG_END_TRY_ARGS(spl);
}
/* Error */
else if (rows_fetched < 0)
Expand Down
9 changes: 9 additions & 0 deletions src/kafka_fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#if PG_VERSION_NUM >= 130000
#include "access/relation.h"
#endif
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_foreign_table.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#if PG_VERSION_NUM >= 180000
#include "commands/explain_state.h"
#include "commands/explain_format.h"
#endif
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
Expand Down Expand Up @@ -69,6 +76,8 @@
#define ScanopListGetPhInvinite(l) (DatumGetInt32(((Const *) list_nth(l, PartitionHigh))->constisnull))
#define ScanopListGetOhInvinite(l) (DatumGetInt32(((Const *) list_nth(l, OffsetHigh))->constisnull))

extern double kafka_tuple_cost;

enum kafka_msg_format
{
INVALID = -1,
Expand Down
Loading