Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 162 additions & 23 deletions src/bin/pgcopydb/schema.c
Original file line number Diff line number Diff line change
Expand Up @@ -3200,6 +3200,135 @@ typedef struct SourceFKConstraintContext
static void getFKConstraintArray(void *ctx, PGresult *result);


/*
* Base SELECT columns and FROM/WHERE clauses shared across all FK constraint
* filter variants. The filter-specific JOIN clauses are inserted between the
* FROM block and the WHERE block via the array below.
*/
#define FK_SELECT_COLS \
"SELECT c.oid, " \
" format('%I', c.conname), " \
" n.nspname, " \
" r.relname, " \
" format('%I.%I', n.nspname, r.relname), " \
" pg_get_constraintdef(c.oid), " \
" c.condeferrable, " \
" c.condeferred, " \
" c.convalidated "

#define FK_FROM_BLOCK \
" FROM pg_constraint c " \
" JOIN pg_class r ON c.conrelid = r.oid " \
" JOIN pg_namespace n ON r.relnamespace = n.oid " \
" JOIN pg_class ref ON c.confrelid = ref.oid " \
" JOIN pg_namespace rn ON ref.relnamespace = rn.oid "

#define FK_BASE_WHERE \
" WHERE c.contype = 'f' " \
" AND n.nspname !~ '^pg_' " \
" AND n.nspname <> 'information_schema' " \
" AND rn.nspname !~ '^pg_' " \
" AND rn.nspname <> 'information_schema' "

#define FK_ORDER_BY \
" ORDER BY n.nspname, r.relname, c.conname"

struct FilteringQueries listSourceFKConstraintsSQL[] = {
{
SOURCE_FILTER_TYPE_NONE,
FK_SELECT_COLS
FK_FROM_BLOCK
FK_BASE_WHERE
FK_ORDER_BY
},

{
SOURCE_FILTER_TYPE_INCL,
FK_SELECT_COLS
FK_FROM_BLOCK

/* include-only-table: referencing table must be included */
" JOIN filter_include_only_table inc "
" ON n.nspname = inc.nspname "
" AND r.relname = inc.relname "

/* include-only-table: referenced table must also be included */
" JOIN filter_include_only_table refinc "
" ON rn.nspname = refinc.nspname "
" AND ref.relname = refinc.relname "

FK_BASE_WHERE
FK_ORDER_BY
},

{
SOURCE_FILTER_TYPE_EXCL,
FK_SELECT_COLS
FK_FROM_BLOCK

/* exclude referencing table by schema */
" LEFT JOIN filter_exclude_schema fn ON n.nspname = fn.nspname "

/* exclude referencing table by table name */
" LEFT JOIN filter_exclude_table ft "
" ON n.nspname = ft.nspname AND r.relname = ft.relname "

/* exclude referenced table by schema */
" LEFT JOIN filter_exclude_schema rfn ON rn.nspname = rfn.nspname "

/* exclude referenced table by table name */
" LEFT JOIN filter_exclude_table rft "
" ON rn.nspname = rft.nspname AND ref.relname = rft.relname "

FK_BASE_WHERE

/* WHERE clause for exclusion filters */
" AND fn.nspname IS NULL "
" AND ft.relname IS NULL "
" AND rfn.nspname IS NULL "
" AND rft.relname IS NULL "

FK_ORDER_BY
},

{
/* list FK constraints whose referencing table is NOT in include list */
SOURCE_FILTER_TYPE_LIST_NOT_INCL,
FK_SELECT_COLS
FK_FROM_BLOCK

" LEFT JOIN filter_include_only_table inc "
" ON n.nspname = inc.nspname AND r.relname = inc.relname "

FK_BASE_WHERE
" AND inc.relname IS NULL "
FK_ORDER_BY
},

{
/* list FK constraints whose referencing table IS excluded */
SOURCE_FILTER_TYPE_LIST_EXCL,
FK_SELECT_COLS
FK_FROM_BLOCK

" LEFT JOIN filter_exclude_schema fn ON n.nspname = fn.nspname "
" LEFT JOIN filter_exclude_table ft "
" ON n.nspname = ft.nspname AND r.relname = ft.relname "

FK_BASE_WHERE
" AND (fn.nspname IS NOT NULL OR ft.relname IS NOT NULL) "
FK_ORDER_BY
},

/* Index-only and other specialized filter types don't affect FK constraints */
{ SOURCE_FILTER_TYPE_EXCL_INDEX,
FK_SELECT_COLS FK_FROM_BLOCK FK_BASE_WHERE FK_ORDER_BY },

{ SOURCE_FILTER_TYPE_LIST_EXCL_INDEX,
FK_SELECT_COLS FK_FROM_BLOCK FK_BASE_WHERE FK_ORDER_BY },
};


/*
* schema_list_fk_constraints grabs the list of foreign key constraints from
* the given source Postgres instance and stores them in the catalog.
Expand All @@ -3208,6 +3337,10 @@ static void getFKConstraintArray(void *ctx, PGresult *result);
* they don't have an associated index in pg_depend. pgcopydb creates them
* directly (rather than delegating to pg_restore) to support automatic
* NOT VALID retry when pre-existing data violations are found.
*
* Filters are applied to exclude FK constraints where either the referencing
* table or the referenced table has been excluded. A FK constraint cannot be
* created on the target if either table is absent.
*/
bool
schema_list_fk_constraints(PGSQL *pgsql,
Expand All @@ -3218,30 +3351,36 @@ schema_list_fk_constraints(PGSQL *pgsql,

log_trace("schema_list_fk_constraints");

char *sql =
"SELECT c.oid, "
" format('%I', c.conname), "
" n.nspname, "
" r.relname, "
" format('%I.%I', n.nspname, r.relname), "
" pg_get_constraintdef(c.oid), "
" c.condeferrable, "
" c.condeferred, "
" c.convalidated "
" FROM pg_constraint c "
" JOIN pg_class r ON c.conrelid = r.oid "
" JOIN pg_namespace n ON r.relnamespace = n.oid "
" JOIN pg_class ref ON c.confrelid = ref.oid "
" JOIN pg_namespace rn ON ref.relnamespace = rn.oid "
" WHERE c.contype = 'f' "
" AND n.nspname !~ '^pg_' "
" AND n.nspname <> 'information_schema' "
" AND rn.nspname !~ '^pg_' "
" AND rn.nspname <> 'information_schema' "
" ORDER BY n.nspname, r.relname, c.conname";
if (filters->type != SOURCE_FILTER_TYPE_NONE)
{
if (!prepareFilters(pgsql, filters))
{
log_error("Failed to prepare pgcopydb filters, "
"see above for details");
return false;
}
}

if (!pgsql_execute_with_params(pgsql, sql, 0, NULL, NULL,
&context, &getFKConstraintArray))
log_debug("listSourceFKConstraintsSQL[%s]", filterTypeToString(filters->type));

char *sql = prepareFilteredSQL(filters,
listSourceFKConstraintsSQL[filters->type].sql);

if (sql == NULL)
{
log_error("Failed to prepare filtered SQL for FK constraints query");
return false;
}

bool ok = pgsql_execute_with_params(pgsql, sql, 0, NULL, NULL,
&context, &getFKConstraintArray);

if (filters->ctePreamble != NULL && sql != NULL)
{
free(sql);
}

if (!ok)
{
log_error("Failed to list FK constraints");
return false;
Expand Down
1 change: 1 addition & 0 deletions tests/filtering/exclude.ini
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ excluded_test
foo.test_tbl_2
app.foo
foo.matview_1_exclude_as_table
foo.tbl_ref_target

[exclude-table-data]
foo.matview_1_exclude_data
Expand Down
6 changes: 6 additions & 0 deletions tests/filtering/exclude/expected/5-fk-constraints.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-[ RECORD 1 ]---------------+--
fk_to_excluded_schema_count | 0

-[ RECORD 1 ]--------------+--
fk_to_excluded_table_count | 0

17 changes: 17 additions & 0 deletions tests/filtering/exclude/sql/5-fk-constraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- FK referencing an excluded schema should NOT exist on target
select count(*) as fk_to_excluded_schema_count
from pg_constraint c
join pg_class cl on c.conrelid = cl.oid
join pg_namespace n on cl.relnamespace = n.oid
where c.contype = 'f'
and n.nspname = 'foo'
and cl.relname = 'tbl_with_cross_fk';

-- FK referencing an excluded table should NOT exist on target
select count(*) as fk_to_excluded_table_count
from pg_constraint c
join pg_class cl on c.conrelid = cl.oid
join pg_namespace n on cl.relnamespace = n.oid
where c.contype = 'f'
and n.nspname = 'foo'
and cl.relname = 'tbl_with_fk_to_excluded_tbl';
12 changes: 12 additions & 0 deletions tests/filtering/extra.sql
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@ create table foo.tbl_with_cross_fk (
user_id bigint references excluded_test.users(id)
);

-- Table to be excluded by table-level filter (referenced by FK below)
create table foo.tbl_ref_target (
id bigserial primary key,
name text
);

-- Table with FK referencing the excluded table above
create table foo.tbl_with_fk_to_excluded_tbl (
id bigserial primary key,
ref_id bigint references foo.tbl_ref_target(id)
);

insert into foo.tbl_with_cross_fk (id, user_id) values (1, 1);

-- View in foo schema referencing excluded schema
Expand Down
Loading