Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/include/clone.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@
--defer-indexes Defer index building until after all table data is copied
--defer-analyze Defer ANALYZE until after post-data restore
--use-copy-binary Use the COPY BINARY format for COPY operations
--cleanup-threshold Max size of applied CDC files to retain (e.g. 10GB, 0 to disable)
--cleanup-min-age Min age before applied CDC files can be deleted (e.g. 15m, 2h)

2 changes: 2 additions & 0 deletions docs/include/follow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@
--create-slot Create the replication slot
--origin Use this Postgres replication origin node name
--endpos Stop replaying changes when reaching this LSN
--cleanup-threshold Max size of applied CDC files to retain (e.g. 10GB, 0 to disable)
--cleanup-min-age Min age before applied CDC files can be deleted (e.g. 15m, 2h)

14 changes: 11 additions & 3 deletions src/bin/pgcopydb/cli_clone_follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
" --defer-indexes Defer index building until after all table data is copied\n" \
" --defer-analyze Defer ANALYZE until after post-data restore\n" \
" --use-copy-binary Use the COPY BINARY format for COPY operations\n" \
" --cleanup-threshold Max size of applied CDC files to retain (e.g. 10GB, 0 to disable)\n" \
" --cleanup-min-age Min age before applied CDC files can be deleted (e.g. 15m, 2h)\n" \

CommandLine clone_command =
make_command(
Expand Down Expand Up @@ -109,7 +111,9 @@ CommandLine follow_command =
" --slot-name Use this Postgres replication slot name\n"
" --create-slot Create the replication slot\n"
" --origin Use this Postgres replication origin node name\n"
" --endpos Stop replaying changes when reaching this LSN\n",
" --endpos Stop replaying changes when reaching this LSN\n"
" --cleanup-threshold Max size of applied CDC files to retain (e.g. 10GB, 0 to disable)\n"
" --cleanup-min-age Min age before applied CDC files can be deleted (e.g. 15m, 2h)\n",
cli_copy_db_getopts,
cli_follow);

Expand Down Expand Up @@ -224,7 +228,9 @@ clone_and_follow(CopyDataSpec *copySpecs)
&(copySpecs->filters),
copyDBoptions.stdIn,
copyDBoptions.stdOut,
logSQL))
logSQL,
copyDBoptions.cleanupThresholdBytes,
copyDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down Expand Up @@ -560,7 +566,9 @@ cli_follow(int argc, char **argv)
&(copySpecs.filters),
copyDBoptions.stdIn,
copyDBoptions.stdOut,
logSQL))
logSQL,
copyDBoptions.cleanupThresholdBytes,
copyDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down
58 changes: 58 additions & 0 deletions src/bin/pgcopydb/cli_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ cli_copy_db_getopts(int argc, char **argv)
{ "restore-tolerance", required_argument, NULL, 256 },
{ "defer-indexes", no_argument, NULL, 257 },
{ "defer-analyze", no_argument, NULL, 258 },
{ "cleanup-threshold", required_argument, NULL, 259 },
{ "cleanup-min-age", required_argument, NULL, 260 },
{ "help", no_argument, NULL, 'h' },
{ NULL, 0, NULL, 0 }
};
Expand Down Expand Up @@ -1149,6 +1151,45 @@ cli_copy_db_getopts(int argc, char **argv)
break;
}

case 259:
{
if (!cli_parse_bytes_pretty(
optarg,
&(options.cleanupThresholdBytes),
(char *) &(options.cleanupThresholdPretty),
sizeof(options.cleanupThresholdPretty)))
{
log_fatal("Failed to parse --cleanup-threshold: \"%s\"",
optarg);
++errors;
}

log_trace("--cleanup-threshold %s (%lld)",
options.cleanupThresholdPretty,
(long long) options.cleanupThresholdBytes);
break;
}

case 260:
{
if (!cli_parse_duration(
optarg,
&(options.cleanupMinAgeSeconds)))
{
log_fatal("Failed to parse --cleanup-min-age: \"%s\"",
optarg);
++errors;
}

strlcpy(options.cleanupMinAgePretty, optarg,
sizeof(options.cleanupMinAgePretty));

log_trace("--cleanup-min-age %s (%d seconds)",
options.cleanupMinAgePretty,
options.cleanupMinAgeSeconds);
break;
}

case '?':
default:
{
Expand Down Expand Up @@ -1200,6 +1241,23 @@ cli_copy_db_getopts(int argc, char **argv)
exit(EXIT_CODE_BAD_ARGS);
}

if (options.cleanupThresholdBytes == 0 && options.cleanupMinAgeSeconds > 0)
{
log_warn("--cleanup-min-age has no effect without --cleanup-threshold");
}

/*
* When cleanup threshold is set but min-age wasn't explicitly provided,
* default to 15 minutes (900 seconds) for safety.
*/
if (options.cleanupThresholdBytes > 0 && options.cleanupMinAgeSeconds == 0 &&
options.cleanupMinAgePretty[0] == '\0')
{
options.cleanupMinAgeSeconds = 900;
strlcpy(options.cleanupMinAgePretty, "15m",
sizeof(options.cleanupMinAgePretty));
}

if (errors > 0)
{
commandline_help(stderr);
Expand Down
6 changes: 6 additions & 0 deletions src/bin/pgcopydb/cli_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ typedef struct CopyDBOptions

char filterFileName[MAXPGPATH];
char requirementsFileName[MAXPGPATH];

/* CDC file cleanup configuration */
uint64_t cleanupThresholdBytes;
char cleanupThresholdPretty[BUFSIZE];
int cleanupMinAgeSeconds;
char cleanupMinAgePretty[BUFSIZE];
} CopyDBOptions;

extern bool outputJSON;
Expand Down
4 changes: 3 additions & 1 deletion src/bin/pgcopydb/cli_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ cli_create_snapshot(int argc, char **argv)
&(copySpecs.filters),
createSNoptions.stdIn,
createSNoptions.stdOut,
logSQL))
logSQL,
0,
0))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down
82 changes: 76 additions & 6 deletions src/bin/pgcopydb/cli_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ cli_stream_getopts(int argc, char **argv)
{ "debug", no_argument, NULL, 'd' },
{ "trace", no_argument, NULL, 'z' },
{ "quiet", no_argument, NULL, 'q' },
{ "cleanup-threshold", required_argument, NULL, 256 },
{ "cleanup-min-age", required_argument, NULL, 257 },
{ "help", no_argument, NULL, 'h' },
{ NULL, 0, NULL, 0 }
};
Expand Down Expand Up @@ -434,6 +436,45 @@ cli_stream_getopts(int argc, char **argv)
break;
}

case 256:
{
if (!cli_parse_bytes_pretty(
optarg,
&(options.cleanupThresholdBytes),
(char *) &(options.cleanupThresholdPretty),
sizeof(options.cleanupThresholdPretty)))
{
log_fatal("Failed to parse --cleanup-threshold: \"%s\"",
optarg);
++errors;
}

log_trace("--cleanup-threshold %s (%lld)",
options.cleanupThresholdPretty,
(long long) options.cleanupThresholdBytes);
break;
}

case 257:
{
if (!cli_parse_duration(
optarg,
&(options.cleanupMinAgeSeconds)))
{
log_fatal("Failed to parse --cleanup-min-age: \"%s\"",
optarg);
++errors;
}

strlcpy(options.cleanupMinAgePretty, optarg,
sizeof(options.cleanupMinAgePretty));

log_trace("--cleanup-min-age %s (%d seconds)",
options.cleanupMinAgePretty,
options.cleanupMinAgeSeconds);
break;
}

case '?':
default:
{
Expand Down Expand Up @@ -472,6 +513,23 @@ cli_stream_getopts(int argc, char **argv)
exit(EXIT_CODE_BAD_ARGS);
}

if (options.cleanupThresholdBytes == 0 && options.cleanupMinAgeSeconds > 0)
{
log_warn("--cleanup-min-age has no effect without --cleanup-threshold");
}

/*
* When cleanup threshold is set but min-age wasn't explicitly provided,
* default to 15 minutes (900 seconds) for safety.
*/
if (options.cleanupThresholdBytes > 0 && options.cleanupMinAgeSeconds == 0 &&
options.cleanupMinAgePretty[0] == '\0')
{
options.cleanupMinAgeSeconds = 900;
strlcpy(options.cleanupMinAgePretty, "15m",
sizeof(options.cleanupMinAgePretty));
}

if (errors > 0)
{
commandline_help(stderr);
Expand Down Expand Up @@ -585,7 +643,9 @@ cli_stream_setup(int argc, char **argv)
&(copySpecs.filters),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
logSQL,
streamDBoptions.cleanupThresholdBytes,
streamDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down Expand Up @@ -729,7 +789,9 @@ cli_stream_catchup(int argc, char **argv)
&(copySpecs.filters),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
logSQL,
streamDBoptions.cleanupThresholdBytes,
streamDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down Expand Up @@ -813,7 +875,9 @@ cli_stream_replay(int argc, char **argv)
&(copySpecs.filters),
true, /* stdin */
true, /* stdout */
logSQL))
logSQL,
streamDBoptions.cleanupThresholdBytes,
streamDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down Expand Up @@ -939,7 +1003,9 @@ cli_stream_transform(int argc, char **argv)
&(copySpecs.filters),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
logSQL,
streamDBoptions.cleanupThresholdBytes,
streamDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down Expand Up @@ -1102,7 +1168,9 @@ cli_stream_apply(int argc, char **argv)
&(copySpecs.filters),
true, /* streamDBoptions.stdIn */
false, /* streamDBoptions.stdOut */
logSQL))
logSQL,
streamDBoptions.cleanupThresholdBytes,
streamDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down Expand Up @@ -1215,7 +1283,9 @@ stream_start_in_mode(LogicalStreamMode mode)
&(copySpecs.filters),
streamDBoptions.stdIn,
streamDBoptions.stdOut,
logSQL))
logSQL,
streamDBoptions.cleanupThresholdBytes,
streamDBoptions.cleanupMinAgeSeconds))
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
Expand Down
36 changes: 34 additions & 2 deletions src/bin/pgcopydb/follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cli_common.h"
#include "cli_root.h"
#include "ld_cleanup.h"
#include "ld_stream.h"
#include "log.h"
#include "progress.h"
Expand Down Expand Up @@ -607,6 +608,23 @@ followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
}
}

/*
* When cleanup threshold is configured, start the cleanup watchdog
* to periodically remove old applied CDC files.
*/
if (streamSpecs->cleanupThresholdBytes > 0)
{
FollowSubProcess *cleanup = &(streamSpecs->cleanup);

if (!follow_start_subprocess(streamSpecs, cleanup))
{
log_error("Failed to start the %s process", cleanup->name);

(void) follow_exit_early(streamSpecs);
return false;
}
}

/*
* Close pipe ends which follow is not using. Otherwise the processes
* like transform and apply which reads from the pipe during replay
Expand Down Expand Up @@ -830,6 +848,18 @@ follow_start_catchup(StreamSpecs *specs)
}


/*
* follow_start_cleanup starts a sub-process that cleans up old CDC files.
* The catalog is already opened by follow_start_subprocess before this is
* called.
*/
bool
follow_start_cleanup(StreamSpecs *specs)
{
return cdc_cleanup_loop(specs);
}


/*
* follow_start_subprocess forks a subprocess and calls the given function.
*/
Expand Down Expand Up @@ -946,7 +976,8 @@ follow_wait_subprocesses(StreamSpecs *specs)
FollowSubProcess *processArray[] = {
&(specs->prefetch),
&(specs->transform),
&(specs->catchup)
&(specs->catchup),
&(specs->cleanup)
};

int count = sizeof(processArray) / sizeof(processArray[0]);
Expand Down Expand Up @@ -1136,7 +1167,8 @@ follow_terminate_subprocesses(StreamSpecs *specs)
FollowSubProcess *processArray[] = {
&(specs->prefetch),
&(specs->transform),
&(specs->catchup)
&(specs->catchup),
&(specs->cleanup)
};
int count = sizeof(processArray) / sizeof(processArray[0]);

Expand Down
Loading
Loading