diff --git a/man/man1/pmseries.1 b/man/man1/pmseries.1 index fc79e8a90b5..e6d65902fbd 100644 --- a/man/man1/pmseries.1 +++ b/man/man1/pmseries.1 @@ -18,7 +18,7 @@ \f3pmseries\f1 \- display information about performance metric timeseries .SH SYNOPSIS \fBpmseries\fR -[\fB\-adFiIlLmMnqsStvV?\fR] +[\fB\-adFGiIlLmMnNqsStvV?\fR] [\fB\-c\fR \fIconfig\fR] [\f3\-D\f1 \f2debug\f1] [\fB\-g\fR \fIpattern\fR] @@ -745,6 +745,36 @@ Connect to the key-value server at .IR host , rather than the one the localhost. .TP +\fB\-G\fR, \fB\-\-gc\fR +Scan for expired time series and remove their ancillary data from the +key-value cluster. +A series is considered expired when its +.B pcp:values:series: +stream key no longer exists (i.e. the +.B stream.expire +TTL has elapsed without new data being ingested). +The following keys are removed for each expired series: +.B pcp:desc:series:, +.B pcp:metric.name:series:, +.B pcp:instances:series:, +.B pcp:labelvalue:series:, +and +.B pcp:labelflags:series:. +The series identifier is also removed from the forward-lookup sets +.B pcp:series:metric.name:, +.B pcp:series:inst.name:, +.B pcp:series:label..value:, +and +.B pcp:series:context.name:. +Orphaned +.B pcp:inst:series: +hashes are cleaned up when their associated instance-name set becomes empty. +.TP +\fB\-N\fR, \fB\-\-dryrun\fR +Used with +.BR \-\-gc : +log which series would be removed without actually deleting anything. +.TP \fB\-L\fR, \fB\-\-load\fR Load timeseries metadata and data into the key-value cluster. .TP diff --git a/src/bashrc/pcp_completion.sh b/src/bashrc/pcp_completion.sh index d5cdf61fc86..deaf2feb472 100644 --- a/src/bashrc/pcp_completion.sh +++ b/src/bashrc/pcp_completion.sh @@ -137,7 +137,7 @@ _pcp_complete() arg_regex="-[ahKcoFeDASTOstZiJ489N6XWwP0lEfqbyQBY]" ;; pmseries) - all_args="acdDFghiIlLmMnqpsStvVwZ" + all_args="acdDFgGhiIlLmMnNqpsStvVwZ" arg_regex="-[cDghpwZ]" ;; pmstat) diff --git a/src/include/pcp/pmwebapi.h b/src/include/pcp/pmwebapi.h index e2cb98d10ef..fabbec1af2b 100644 --- a/src/include/pcp/pmwebapi.h +++ b/src/include/pcp/pmwebapi.h @@ -70,6 +70,7 @@ typedef enum pmSeriesFlags { PM_SERIES_FLAG_METADATA = (1 << 0), /* only load metric metadata */ PM_SERIES_FLAG_ACTIVE = (1 << 1), /* continual source updates */ PM_SERIES_FLAG_TEXT = (1 << 2), /* load metric & indom help */ + PM_SERIES_FLAG_DRYRUN = (1 << 3), /* GC: log what would be removed, no writes */ PM_SERIES_FLAG_ALL = ((unsigned int)~PM_SERIES_FLAG_NONE) } pmSeriesFlags; @@ -169,6 +170,7 @@ extern int pmSeriesValues(pmSeriesSettings *, pmSeriesTimeWindow *, int, sds *, extern int pmSeriesWindow(pmSeriesSettings *, sds, pmSeriesTimeWindow *, void *); extern int pmSeriesQuery(pmSeriesSettings *, sds, pmSeriesFlags, void *); extern int pmSeriesLoad(pmSeriesSettings *, sds, pmSeriesFlags, void *); +extern int pmSeriesGC(pmSeriesSettings *, pmSeriesFlags, void *); /* * Timer list interface - global, thread-safe diff --git a/src/libpcp_web/src/batons.h b/src/libpcp_web/src/batons.h index 07b98faf7cf..a2905339ea3 100644 --- a/src/libpcp_web/src/batons.h +++ b/src/libpcp_web/src/batons.h @@ -25,6 +25,7 @@ typedef enum { MAGIC_NAMES, MAGIC_LABELMAP, MAGIC_SEARCH, + MAGIC_GC, MAGIC_COUNT } series_baton_magic; diff --git a/src/libpcp_web/src/exports.in b/src/libpcp_web/src/exports.in index db0b669b770..4c2dae690a0 100644 --- a/src/libpcp_web/src/exports.in +++ b/src/libpcp_web/src/exports.in @@ -294,3 +294,8 @@ PCP_WEB_1.23 { dictSetVal; keySlotsContextFree; } PCP_WEB_1.22; + +PCP_WEB_1.24 { + global: + pmSeriesGC; +} PCP_WEB_1.23; diff --git a/src/libpcp_web/src/schema.c b/src/libpcp_web/src/schema.c index b926db70c42..55b95e36dde 100644 --- a/src/libpcp_web/src/schema.c +++ b/src/libpcp_web/src/schema.c @@ -1685,6 +1685,21 @@ pmSeriesSetupMetrics(pmSeriesModule *module) "calls to /series/load", "total RESTAPI calls to /series/load"); + mmv_stats_add_metric(data->registry, "gc.calls", 10, + MMV_TYPE_U64, MMV_SEM_COUNTER, countunits, MMV_INDOM_NULL, + "garbage collection runs", + "total pmSeriesGC() invocations"); + + mmv_stats_add_metric(data->registry, "gc.scanned", 11, + MMV_TYPE_U64, MMV_SEM_COUNTER, countunits, MMV_INDOM_NULL, + "series examined by GC", + "total series examined across all GC runs"); + + mmv_stats_add_metric(data->registry, "gc.cleaned", 12, + MMV_TYPE_U64, MMV_SEM_COUNTER, countunits, MMV_INDOM_NULL, + "series deleted by GC", + "total series deleted across all GC runs"); + data->map = map = mmv_stats_start(data->registry); metrics = data->metrics; @@ -1706,6 +1721,12 @@ pmSeriesSetupMetrics(pmSeriesModule *module) "labelvalues.calls", NULL); metrics[SERIES_LOAD_CALLS] = mmv_lookup_value_desc(map, "load.calls", NULL); + metrics[SERIES_GC_CALLS] = mmv_lookup_value_desc(map, + "gc.calls", NULL); + metrics[SERIES_GC_SCANNED] = mmv_lookup_value_desc(map, + "gc.scanned", NULL); + metrics[SERIES_GC_CLEANED] = mmv_lookup_value_desc(map, + "gc.cleaned", NULL); } int @@ -1749,6 +1770,741 @@ pmSeriesSetup(pmSeriesModule *module, void *arg) return 0; } +/* + * ============================================================ + * Time series garbage collection + * + * Scans pcp:desc:series:* to find all known series, checks + * whether each stream (pcp:values:series:H) still exists, and + * removes all ancillary keys for series whose streams have expired. + * + * Metrics updated on completion (visible via pmproxy MMV): + * series.gc.calls - total GC invocations + * series.gc.scanned - cumulative series checked + * series.gc.cleaned - cumulative series deleted + * ============================================================ + */ + +/* Forward declarations */ +static void keys_series_gc_scan(seriesGCBaton *); +static void keys_series_gc_sweep(seriesGCEntry *); + +static seriesGCBaton * +initSeriesGCBaton(keySlots *slots, seriesModuleData *data, + pmSeriesSettings *settings, int dryrun, void *arg) +{ + seriesGCBaton *baton; + + if ((baton = calloc(1, sizeof(seriesGCBaton))) == NULL) + return NULL; + initSeriesBatonMagic(baton, MAGIC_GC); + baton->slots = slots; + baton->module = data; + baton->cursor = sdsnew("0"); + baton->dryrun = dryrun; + baton->info = settings->module.on_info; + baton->done = settings->callbacks.on_done; + baton->userdata = arg; + return baton; +} + +static void +freeSeriesGCBaton(seriesGCBaton *baton) +{ + sdsfree(baton->cursor); + memset(baton, 0, sizeof(*baton)); + free(baton); +} + +static void +doneSeriesGCBaton(seriesGCBaton *baton, const char *caller) +{ + seriesModuleData *data; + unsigned long long ll; + sds msg = NULL; + + seriesBatonCheckMagic(baton, MAGIC_GC, caller); + if (seriesBatonDereference(baton, caller)) { + data = (seriesModuleData *)baton->module; + if (data && data->map) { + mmv_inc(data->map, data->metrics[SERIES_GC_CALLS]); + ll = (unsigned long long)baton->nscanned; + mmv_add(data->map, data->metrics[SERIES_GC_SCANNED], &ll); + ll = (unsigned long long)baton->ncleaned; + mmv_add(data->map, data->metrics[SERIES_GC_CLEANED], &ll); + } + infofmt(msg, "GC: %u series scanned, %u cleaned", + baton->nscanned, baton->ncleaned); + batoninfo(baton, PMLOG_INFO, msg); + baton->done(0, baton->userdata); + freeSeriesGCBaton(baton); + } +} + +static seriesGCEntry * +initSeriesGCEntry(seriesGCBaton *parent, const char *hash) +{ + seriesGCEntry *entry; + + if ((entry = calloc(1, sizeof(seriesGCEntry))) == NULL) + return NULL; + initSeriesBatonMagic(entry, MAGIC_GC); + entry->slots = parent->slots; + entry->dryrun = parent->dryrun; + entry->info = parent->info; + entry->userdata = parent->userdata; + entry->parent = parent; + strncpy(entry->hash, hash, sizeof(entry->hash) - 1); + return entry; +} + +static void +freeSeriesGCEntry(seriesGCEntry *entry) +{ + unsigned int i; + + for (i = 0; i < entry->nmetric_ids; i++) + sdsfree(entry->metric_ids[i]); + free(entry->metric_ids); + + for (i = 0; i < entry->ninst_hashes; i++) + sdsfree(entry->inst_hashes[i]); + free(entry->inst_hashes); + + for (i = 0; i < entry->nlabels; i++) { + sdsfree(entry->label_name_ids[i]); + sdsfree(entry->label_val_ids[i]); + } + free(entry->label_name_ids); + free(entry->label_val_ids); + + memset(entry, 0, sizeof(*entry)); + free(entry); +} + +static void +doneSeriesGCEntry(seriesGCEntry *entry, const char *caller) +{ + seriesGCBaton *parent = entry->parent; + + seriesBatonCheckMagic(entry, MAGIC_GC, caller); + if (seriesBatonDereference(entry, caller)) { + freeSeriesGCEntry(entry); + doneSeriesGCBaton(parent, "doneSeriesGCEntry"); + } +} + +/* + * Generic sweep callback: one async write completed, release one entry ref. + */ +static void +keys_series_gc_done_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCEntry *entry = (seriesGCEntry *)arg; + + seriesBatonCheckMagic(entry, MAGIC_GC, "keys_series_gc_done_callback"); + doneSeriesGCEntry(entry, "keys_series_gc_done_callback"); +} + +/* + * Per-instance context for the HGET → SREM → EXISTS → (DEL) chain. + */ +typedef struct { + seriesGCEntry *entry; + char ih_hex[42]; /* instance name.hash in hex */ + char iid_hex[42]; /* instance name.id in hex (filled by HGET) */ +} seriesGCInstCtx; + +static void +keys_series_gc_inst_exists_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCInstCtx *ctx = (seriesGCInstCtx *)arg; + seriesGCEntry *entry = ctx->entry; + respReply *reply = r; + sds cmd, key; + + if (reply && reply->type == RESP_REPLY_INTEGER && reply->integer == 0) { + /* forward set was auto-deleted by SREM - clean the instance hash too */ + key = sdscatfmt(sdsempty(), "pcp:inst:series:%s", ctx->ih_hex); + cmd = resp_command(2); + cmd = resp_param_str(cmd, DEL, DEL_LEN); + cmd = resp_param_sds(cmd, key); + sdsfree(key); + free(ctx); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_done_callback, entry); + sdsfree(cmd); + } else { + /* set still has entries from other series */ + free(ctx); + doneSeriesGCEntry(entry, "keys_series_gc_inst_exists_callback"); + } +} + +static void +keys_series_gc_inst_srem_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCInstCtx *ctx = (seriesGCInstCtx *)arg; + seriesGCEntry *entry = ctx->entry; + sds cmd, key; + + /* check if the forward set was auto-deleted (became empty) */ + key = sdscatfmt(sdsempty(), "pcp:series:inst.name:%s", ctx->iid_hex); + cmd = resp_command(2); + cmd = resp_param_str(cmd, EXISTS, EXISTS_LEN); + cmd = resp_param_sds(cmd, key); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_inst_exists_callback, ctx); + sdsfree(cmd); +} + +static void +keys_series_gc_inst_hget_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCInstCtx *ctx = (seriesGCInstCtx *)arg; + seriesGCEntry *entry = ctx->entry; + respReply *reply = r; + sds cmd, key; + + if (reply == NULL || reply->type != RESP_REPLY_STRING || reply->len != 20) { + /* no name field or unexpected type - skip SREM for this instance */ + free(ctx); + doneSeriesGCEntry(entry, "keys_series_gc_inst_hget_callback skip"); + return; + } + + pmwebapi_hash_str((unsigned char *)reply->str, ctx->iid_hex, sizeof(ctx->iid_hex)); + + key = sdscatfmt(sdsempty(), "pcp:series:inst.name:%s", ctx->iid_hex); + cmd = resp_command(3); + cmd = resp_param_str(cmd, SREM, SREM_LEN); + cmd = resp_param_sds(cmd, key); + cmd = resp_param_str(cmd, entry->hash, 40); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_inst_srem_callback, ctx); + sdsfree(cmd); +} + +/* + * Context source resolution context for SMEMBERS → N SREMs. + */ +typedef struct { + seriesGCEntry *entry; + unsigned int npending; +} seriesGCCtxCtx; + +static void +keys_series_gc_ctx_srem_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCCtxCtx *ctx = (seriesGCCtxCtx *)arg; + seriesGCEntry *entry = ctx->entry; + + if (--ctx->npending == 0) { + free(ctx); + doneSeriesGCEntry(entry, "keys_series_gc_ctx_srem_callback"); + } +} + +static void +keys_series_gc_ctx_smembers_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCCtxCtx *ctx = (seriesGCCtxCtx *)arg; + seriesGCEntry *entry = ctx->entry; + respReply *reply = r; + char cidhex[42]; + sds cmd, key; + unsigned int i, nsrems = 0; + + if (reply == NULL || reply->type != RESP_REPLY_ARRAY || reply->elements == 0) { + free(ctx); + doneSeriesGCEntry(entry, "keys_series_gc_ctx_smembers_callback empty"); + return; + } + + for (i = 0; i < reply->elements; i++) { + if (reply->element[i]->type == RESP_REPLY_STRING && + reply->element[i]->len == 20) + nsrems++; + } + if (nsrems == 0) { + free(ctx); + doneSeriesGCEntry(entry, "keys_series_gc_ctx_smembers_callback no valid"); + return; + } + + ctx->npending = nsrems; + for (i = 0; i < reply->elements; i++) { + if (reply->element[i]->type != RESP_REPLY_STRING || + reply->element[i]->len != 20) + continue; + pmwebapi_hash_str((unsigned char *)reply->element[i]->str, + cidhex, sizeof(cidhex)); + key = sdscatfmt(sdsempty(), "pcp:series:context.name:%s", cidhex); + cmd = resp_command(3); + cmd = resp_param_str(cmd, SREM, SREM_LEN); + cmd = resp_param_sds(cmd, key); + cmd = resp_param_str(cmd, entry->hash, 40); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_ctx_srem_callback, ctx); + sdsfree(cmd); + } +} + +/* + * Sweep phase: issue all deletes and set-removes in parallel. + * + * Operations (all parallel): + * 1 DEL of the 5 per-series keys + * nmetric_ids SREMs from pcp:series:metric.name: + * ninst_hashes per-instance chains (HGET → SREM → EXISTS → maybe DEL) + * nlabels SREMs from pcp:series:label..value: + * 1 SMEMBERS pcp:context.name:source: → N context SREMs + */ +static void +keys_series_gc_sweep(seriesGCEntry *entry) +{ + seriesGCInstCtx *ictx; + seriesGCCtxCtx *cctx; + char hexbuf[42], nhex[42], vhex[42], chhex[42]; + sds cmd, key; + unsigned int refs, i; + + entry->parent->ncleaned++; + + if (entry->dryrun) { + seriesGCBaton *parent = entry->parent; + sds msg = NULL; + + infofmt(msg, "GC dryrun: %s (%u metric names, %u instances, %u labels)", + entry->hash, entry->nmetric_ids, + entry->ninst_hashes, entry->nlabels); + batoninfo(entry, PMLOG_INFO, msg); + /* + * Sweep is entered with entry refcount already at 0 (the last + * collect callback triggered it). Do not go through doneSeriesGCEntry + * which asserts refcount > 0; free directly and notify the parent. + */ + freeSeriesGCEntry(entry); + doneSeriesGCBaton(parent, "keys_series_gc_sweep dryrun"); + return; + } + + /* + * Pre-declare all refs before issuing any async command so no callback + * can race to refcount=0 before we are done issuing. + * + * Refs needed: + * 1 (DEL per-series keys) + * nmetric_ids (SREM metric.name forward sets) + * ninst_hashes (per-instance chains, one ref per instance) + * nlabels (SREM label forward sets) + * 1 (SMEMBERS context.name:source for context SREMs) + */ + refs = 1 + entry->nmetric_ids + entry->ninst_hashes + entry->nlabels + 1; + seriesBatonReferences(entry, refs, "keys_series_gc_sweep"); + + /* 1. DEL all five per-series keys in a single command */ + cmd = resp_command(6); + cmd = resp_param_str(cmd, DEL, DEL_LEN); + key = sdscatfmt(sdsempty(), "pcp:desc:series:%s", entry->hash); + cmd = resp_param_sds(cmd, key); sdsfree(key); + key = sdscatfmt(sdsempty(), "pcp:metric.name:series:%s", entry->hash); + cmd = resp_param_sds(cmd, key); sdsfree(key); + key = sdscatfmt(sdsempty(), "pcp:instances:series:%s", entry->hash); + cmd = resp_param_sds(cmd, key); sdsfree(key); + key = sdscatfmt(sdsempty(), "pcp:labelvalue:series:%s", entry->hash); + cmd = resp_param_sds(cmd, key); sdsfree(key); + key = sdscatfmt(sdsempty(), "pcp:labelflags:series:%s", entry->hash); + cmd = resp_param_sds(cmd, key); sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_done_callback, entry); + sdsfree(cmd); + + /* 2. SREM series H from pcp:series:metric.name: */ + for (i = 0; i < entry->nmetric_ids; i++) { + pmwebapi_hash_str((unsigned char *)entry->metric_ids[i], + hexbuf, sizeof(hexbuf)); + key = sdscatfmt(sdsempty(), "pcp:series:metric.name:%s", hexbuf); + cmd = resp_command(3); + cmd = resp_param_str(cmd, SREM, SREM_LEN); + cmd = resp_param_sds(cmd, key); + cmd = resp_param_str(cmd, entry->hash, 40); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_done_callback, entry); + sdsfree(cmd); + } + + /* 3. Per-instance chain: HGET name_id → SREM → EXISTS → maybe DEL */ + for (i = 0; i < entry->ninst_hashes; i++) { + pmwebapi_hash_str((unsigned char *)entry->inst_hashes[i], + hexbuf, sizeof(hexbuf)); + if ((ictx = calloc(1, sizeof(seriesGCInstCtx))) == NULL) { + doneSeriesGCEntry(entry, "keys_series_gc_sweep ictx OOM"); + continue; + } + ictx->entry = entry; + strncpy(ictx->ih_hex, hexbuf, sizeof(ictx->ih_hex) - 1); + + key = sdscatfmt(sdsempty(), "pcp:inst:series:%s", hexbuf); + cmd = resp_command(3); + cmd = resp_param_str(cmd, HGET, HGET_LEN); + cmd = resp_param_sds(cmd, key); + cmd = resp_param_str(cmd, "name", sizeof("name")-1); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_inst_hget_callback, ictx); + sdsfree(cmd); + } + + /* 4. SREM series H from pcp:series:label..value: */ + for (i = 0; i < entry->nlabels; i++) { + pmwebapi_hash_str((unsigned char *)entry->label_name_ids[i], + nhex, sizeof(nhex)); + pmwebapi_hash_str((unsigned char *)entry->label_val_ids[i], + vhex, sizeof(vhex)); + key = sdscatfmt(sdsempty(), "pcp:series:label.%s.value:%s", nhex, vhex); + cmd = resp_command(3); + cmd = resp_param_str(cmd, SREM, SREM_LEN); + cmd = resp_param_sds(cmd, key); + cmd = resp_param_str(cmd, entry->hash, 40); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_done_callback, entry); + sdsfree(cmd); + } + + /* 5. Resolve context source IDs then SREM from pcp:series:context.name:* */ + if ((cctx = calloc(1, sizeof(seriesGCCtxCtx))) == NULL) { + doneSeriesGCEntry(entry, "keys_series_gc_sweep cctx OOM"); + } else { + cctx->entry = entry; + pmwebapi_hash_str(entry->context_hash, chhex, sizeof(chhex)); + key = sdscatfmt(sdsempty(), "pcp:context.name:source:%s", chhex); + cmd = resp_command(2); + cmd = resp_param_str(cmd, SMEMBERS, SMEMBERS_LEN); + cmd = resp_param_sds(cmd, key); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_ctx_smembers_callback, cctx); + sdsfree(cmd); + } +} + +/* + * Metadata collection callbacks (4 parallel reads). + * Each callback stores its result then decrements the entry refcount. + * The last one to finish (refcount → 0) triggers the sweep. + */ +static void +keys_series_gc_metric_ids_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCEntry *entry = (seriesGCEntry *)arg; + respReply *reply = r; + unsigned int i; + + seriesBatonCheckMagic(entry, MAGIC_GC, "keys_series_gc_metric_ids_callback"); + + if (reply && reply->type == RESP_REPLY_ARRAY && reply->elements > 0) { + if ((entry->metric_ids = calloc(reply->elements, sizeof(sds))) != NULL) { + for (i = 0; i < reply->elements; i++) { + if (reply->element[i]->type == RESP_REPLY_STRING && + reply->element[i]->len == 20) + entry->metric_ids[entry->nmetric_ids++] = + sdsnewlen(reply->element[i]->str, 20); + } + } + } + if (seriesBatonDereference(entry, "keys_series_gc_metric_ids_callback")) + keys_series_gc_sweep(entry); +} + +static void +keys_series_gc_instances_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCEntry *entry = (seriesGCEntry *)arg; + respReply *reply = r; + unsigned int i; + + seriesBatonCheckMagic(entry, MAGIC_GC, "keys_series_gc_instances_callback"); + + if (reply && reply->type == RESP_REPLY_ARRAY && reply->elements > 0) { + if ((entry->inst_hashes = calloc(reply->elements, sizeof(sds))) != NULL) { + for (i = 0; i < reply->elements; i++) { + if (reply->element[i]->type == RESP_REPLY_STRING && + reply->element[i]->len == 20) + entry->inst_hashes[entry->ninst_hashes++] = + sdsnewlen(reply->element[i]->str, 20); + } + } + } + if (seriesBatonDereference(entry, "keys_series_gc_instances_callback")) + keys_series_gc_sweep(entry); +} + +static void +keys_series_gc_labelvalue_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCEntry *entry = (seriesGCEntry *)arg; + respReply *reply = r; + unsigned int i, npairs; + + seriesBatonCheckMagic(entry, MAGIC_GC, "keys_series_gc_labelvalue_callback"); + + /* HGETALL returns alternating field/value pairs */ + if (reply && reply->type == RESP_REPLY_ARRAY && + reply->elements > 0 && (reply->elements % 2) == 0) { + npairs = reply->elements / 2; + entry->label_name_ids = calloc(npairs, sizeof(sds)); + entry->label_val_ids = calloc(npairs, sizeof(sds)); + if (entry->label_name_ids && entry->label_val_ids) { + for (i = 0; i < reply->elements; i += 2) { + if (reply->element[i]->type == RESP_REPLY_STRING && + reply->element[i]->len == 20 && + reply->element[i+1]->type == RESP_REPLY_STRING && + reply->element[i+1]->len == 20) { + entry->label_name_ids[entry->nlabels] = + sdsnewlen(reply->element[i]->str, 20); + entry->label_val_ids[entry->nlabels] = + sdsnewlen(reply->element[i+1]->str, 20); + entry->nlabels++; + } + } + } + } + if (seriesBatonDereference(entry, "keys_series_gc_labelvalue_callback")) + keys_series_gc_sweep(entry); +} + +static void +keys_series_gc_source_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCEntry *entry = (seriesGCEntry *)arg; + respReply *reply = r; + + seriesBatonCheckMagic(entry, MAGIC_GC, "keys_series_gc_source_callback"); + + if (reply && reply->type == RESP_REPLY_STRING && reply->len == 20) + memcpy(entry->context_hash, reply->str, 20); + + if (seriesBatonDereference(entry, "keys_series_gc_source_callback")) + keys_series_gc_sweep(entry); +} + +/* + * Collect metadata for a stale series in four parallel reads. + * + * On entry the baton holds one ref (carried from the EXISTS check callback). + * We add 3 more to reach 4 total — one per collect callback. + * The last callback to fire (refcount → 0) calls keys_series_gc_sweep. + */ +static void +keys_series_gc_collect(seriesGCEntry *entry) +{ + sds cmd, key; + + seriesBatonReferences(entry, 3, "keys_series_gc_collect"); + + /* SMEMBERS pcp:metric.name:series:H → metric name ID hashes */ + key = sdscatfmt(sdsempty(), "pcp:metric.name:series:%s", entry->hash); + cmd = resp_command(2); + cmd = resp_param_str(cmd, SMEMBERS, SMEMBERS_LEN); + cmd = resp_param_sds(cmd, key); sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_metric_ids_callback, entry); + sdsfree(cmd); + + /* SMEMBERS pcp:instances:series:H → instance name hashes */ + key = sdscatfmt(sdsempty(), "pcp:instances:series:%s", entry->hash); + cmd = resp_command(2); + cmd = resp_param_str(cmd, SMEMBERS, SMEMBERS_LEN); + cmd = resp_param_sds(cmd, key); sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_instances_callback, entry); + sdsfree(cmd); + + /* HGETALL pcp:labelvalue:series:H → label name_id/value_id pairs */ + key = sdscatfmt(sdsempty(), "pcp:labelvalue:series:%s", entry->hash); + cmd = resp_command(2); + cmd = resp_param_str(cmd, HGETALL, HGETALL_LEN); + cmd = resp_param_sds(cmd, key); sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_labelvalue_callback, entry); + sdsfree(cmd); + + /* HGET pcp:desc:series:H source → context name hash */ + key = sdscatfmt(sdsempty(), "pcp:desc:series:%s", entry->hash); + cmd = resp_command(3); + cmd = resp_param_str(cmd, HGET, HGET_LEN); + cmd = resp_param_sds(cmd, key); + cmd = resp_param_str(cmd, "source", sizeof("source")-1); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_source_callback, entry); + sdsfree(cmd); +} + +/* + * EXISTS check: is the stream still alive? + */ +static void +keys_series_gc_check_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCEntry *entry = (seriesGCEntry *)arg; + respReply *reply = r; + + seriesBatonCheckMagic(entry, MAGIC_GC, "keys_series_gc_check_callback"); + entry->parent->nscanned++; + + if (reply && reply->type == RESP_REPLY_INTEGER && reply->integer > 0) { + /* stream still alive */ + doneSeriesGCEntry(entry, "keys_series_gc_check_callback alive"); + return; + } + + /* + * Stream gone. The ref carried from keys_series_gc_check is held here; + * pass it into the collect phase as one of the 4 total collect refs. + * keys_series_gc_collect adds 3 more, for 4 total. + */ + keys_series_gc_collect(entry); +} + +static void +keys_series_gc_check(seriesGCEntry *entry) +{ + sds cmd, key; + + seriesBatonReference(entry, "keys_series_gc_check"); + key = sdscatfmt(sdsempty(), "pcp:values:series:%s", entry->hash); + cmd = resp_command(2); + cmd = resp_param_str(cmd, EXISTS, EXISTS_LEN); + cmd = resp_param_sds(cmd, key); + sdsfree(key); + keySlotsRequestFirstNode(entry->slots, cmd, + keys_series_gc_check_callback, entry); + sdsfree(cmd); +} + +/* + * SCAN callback: parse one page of results, kick off EXISTS check per series. + */ +static void +keys_series_gc_scan_callback(keyClusterAsyncContext *c, void *r, void *arg) +{ + seriesGCBaton *baton = (seriesGCBaton *)arg; + respReply *reply = r, *cursor_r, *keys_r, *k; + seriesGCEntry *entry; + unsigned long long cursor; + const char *p; + unsigned int i; + + seriesBatonCheckMagic(baton, MAGIC_GC, "keys_series_gc_scan_callback"); + + if (reply == NULL || reply->type != RESP_REPLY_ARRAY || reply->elements != 2) { + doneSeriesGCBaton(baton, "keys_series_gc_scan_callback bad reply"); + return; + } + + cursor_r = reply->element[0]; + keys_r = reply->element[1]; + + if (cursor_r->type != RESP_REPLY_STRING || keys_r->type != RESP_REPLY_ARRAY) { + doneSeriesGCBaton(baton, "keys_series_gc_scan_callback bad types"); + return; + } + + cursor = strtoull(cursor_r->str, NULL, 10); + sdsfree(baton->cursor); + baton->cursor = sdsnew(cursor_r->str); + + /* For each key, spawn an entry and an EXISTS check */ + for (i = 0; i < keys_r->elements; i++) { + k = keys_r->element[i]; + if (k->type != RESP_REPLY_STRING) + continue; + + /* Key format: "pcp:desc:series:<40-char-hex>" */ + p = strrchr(k->str, ':'); + if (p == NULL || strlen(p + 1) != 40) + continue; + p++; /* advance past the final ':' to the 40-char hash */ + + if ((entry = initSeriesGCEntry(baton, p)) == NULL) + continue; + + /* Hold a parent baton ref until this entry has finished */ + seriesBatonReference(baton, "keys_series_gc_scan_callback entry"); + keys_series_gc_check(entry); + } + + /* If more pages remain, queue the next SCAN */ + if (cursor > 0) + keys_series_gc_scan(baton); + + /* Release the ref held for this SCAN page */ + doneSeriesGCBaton(baton, "keys_series_gc_scan_callback"); +} + +/* + * Issue one SCAN page. + * + * Note: keySlotsRequestFirstNode is used because SCAN is node-specific. + * In Redis Cluster mode this only scans the first node; full cluster + * support (iterating all nodes) is deferred to a future enhancement. + */ +static void +keys_series_gc_scan(seriesGCBaton *baton) +{ + sds cmd; + + seriesBatonReference(baton, "keys_series_gc_scan"); + cmd = resp_command(6); + cmd = resp_param_str(cmd, SCAN, SCAN_LEN); + cmd = resp_param_sds(cmd, baton->cursor); + cmd = resp_param_str(cmd, "MATCH", sizeof("MATCH")-1); + cmd = resp_param_str(cmd, "pcp:desc:series:*", sizeof("pcp:desc:series:*")-1); + cmd = resp_param_str(cmd, "COUNT", sizeof("COUNT")-1); + cmd = resp_param_str(cmd, "200", sizeof("200")-1); + keySlotsRequestFirstNode(baton->slots, cmd, + keys_series_gc_scan_callback, baton); + sdsfree(cmd); +} + +/* + * pmSeriesGC - scan for expired series and clean up ancillary keys. + * + * Connects via the established keySlots, SCAs pcp:desc:series:* and for + * each series whose pcp:values:series:H stream has expired, removes: + * - all per-series descriptor / mapping keys + * - the series' membership from forward-lookup sets + * - orphaned pcp:inst:series:* hashes where the instance set became empty + * + * On completion the on_done callback fires and three MMV counters are updated + * (series.gc.calls, series.gc.scanned, series.gc.cleaned) for pmproxy. + */ +int +pmSeriesGC(pmSeriesSettings *settings, pmSeriesFlags flags, void *arg) +{ + seriesModuleData *data = getSeriesModuleData(&settings->module); + seriesGCBaton *baton; + int dryrun; + + if (data == NULL || data->slots == NULL) + return -ENOTCONN; + + dryrun = (flags & PM_SERIES_FLAG_DRYRUN) ? 1 : 0; + if ((baton = initSeriesGCBaton(data->slots, data, settings, dryrun, arg)) == NULL) + return -ENOMEM; + + /* keys_series_gc_scan adds its own ref; no extra ref needed here */ + keys_series_gc_scan(baton); + return 0; +} + void pmSeriesClose(pmSeriesModule *module) { diff --git a/src/libpcp_web/src/schema.h b/src/libpcp_web/src/schema.h index 160f32e1286..28e028b24c0 100644 --- a/src/libpcp_web/src/schema.h +++ b/src/libpcp_web/src/schema.h @@ -28,8 +28,12 @@ #define COMMAND_LEN (sizeof(COMMAND)-1) #define CLUSTER "CLUSTER" #define CLUSTER_LEN (sizeof(CLUSTER)-1) +#define DEL "DEL" +#define DEL_LEN (sizeof(DEL)-1) #define EVALSHA "EVALSHA" #define EVALSHA_LEN (sizeof(EVALSHA)-1) +#define EXISTS "EXISTS" +#define EXISTS_LEN (sizeof(EXISTS)-1) #define EXPIRE "EXPIRE" #define EXPIRE_LEN (sizeof(EXPIRE)-1) #define GEOADD "GEOADD" @@ -60,10 +64,16 @@ #define PUBLISH_LEN (sizeof(PUBLISH)-1) #define SADD "SADD" #define SADD_LEN (sizeof(SADD)-1) +#define SCAN "SCAN" +#define SCAN_LEN (sizeof(SCAN)-1) +#define SCARD "SCARD" +#define SCARD_LEN (sizeof(SCARD)-1) #define SETS "SET" #define SETS_LEN (sizeof(SETS)-1) #define SMEMBERS "SMEMBERS" #define SMEMBERS_LEN (sizeof(SMEMBERS)-1) +#define SREM "SREM" +#define SREM_LEN (sizeof(SREM)-1) #define XADD "XADD" #define XADD_LEN (sizeof(XADD)-1) #define XRANGE "XRANGE" @@ -176,6 +186,9 @@ enum { SERIES_LABELS_CALLS, SERIES_LABELVALUES_CALLS, SERIES_LOAD_CALLS, + SERIES_GC_CALLS, + SERIES_GC_SCANNED, + SERIES_GC_CLEANED, NUM_SERIES_METRIC }; @@ -199,4 +212,43 @@ extern void pmSeriesStatsSet(pmSeriesModule *, const char *, const char *, doubl extern void keysSchemaLoad(keySlots *, keySlotsFlags, keysInfoCallBack, keysDoneCallBack, void *, void *, void *); +/* + * GC baton tracking one full SCAN pass over pcp:desc:series:* + */ +typedef struct seriesGCBaton { + seriesBatonMagic header; /* MAGIC_GC */ + keySlots *slots; + void *module; /* seriesModuleData * for metric updates */ + sds cursor; /* current SCAN cursor */ + unsigned int nscanned; /* total series checked this run */ + unsigned int ncleaned; /* total series GC'd this run */ + int dryrun; /* 1 = log only, no writes */ + pmLogInfoCallBack info; + pmSeriesDoneCallBack done; + void *userdata; +} seriesGCBaton; + +/* + * Per-series baton; lives for the duration of one series cleanup + */ +typedef struct seriesGCEntry { + seriesBatonMagic header; /* MAGIC_GC */ + keySlots *slots; + char hash[42]; /* series hash hex (NUL-terminated) */ + int dryrun; + pmLogInfoCallBack info; + void *userdata; + seriesGCBaton *parent; + + /* metadata collected in parallel before the sweep phase */ + sds *metric_ids; /* binary SHA20 array */ + unsigned int nmetric_ids; + sds *inst_hashes; /* binary SHA20 array (inst name.hash) */ + unsigned int ninst_hashes; + sds *label_name_ids; /* parallel arrays, binary SHA20 */ + sds *label_val_ids; + unsigned int nlabels; + unsigned char context_hash[20]; /* "source" field from pcp:desc:series:H */ +} seriesGCEntry; + #endif /* SERIES_SCHEMA_H */ diff --git a/src/pmseries/pmseries.c b/src/pmseries/pmseries.c index 5059b9ec68d..79cab0d7075 100644 --- a/src/pmseries/pmseries.c +++ b/src/pmseries/pmseries.c @@ -44,6 +44,8 @@ typedef enum series_flags { PMSERIES_OPT_QUERY = (1<<23), /* -q, --query option (default) */ PMSERIES_OPT_VALUES = (1<<24), /* -v, --values option */ PMSERIES_OPT_WINDOW = (1<<25), /* -w, --window option */ + PMSERIES_OPT_GC = (1<<26), /* -G, --gc option */ + PMSERIES_OPT_DRYRUN = (1<<27), /* -N, --dryrun option */ } series_flags; #define PMSERIES_META_OPTS (PMSERIES_OPT_DESC | PMSERIES_OPT_INSTS | \ @@ -345,6 +347,17 @@ series_type_phrase(const char *type_word) return "???"; } +static void +series_gc(series_data *dp) +{ + pmSeriesFlags flags = dp->flags & PMSERIES_OPT_DRYRUN ? + PM_SERIES_FLAG_DRYRUN : 0; + int sts; + + if ((sts = pmSeriesGC(&dp->settings, flags, dp)) < 0) + on_series_done(sts, dp); +} + static void series_load(series_data *dp) { @@ -1117,7 +1130,9 @@ on_series_setup(void *arg) sdsfree(msg); } - if (flags & PMSERIES_OPT_LOAD) + if (flags & PMSERIES_OPT_GC) + series_gc(dp); + else if (flags & PMSERIES_OPT_LOAD) series_load(dp); else if (flags & PMSERIES_OPT_QUERY) series_query(dp); @@ -1201,8 +1216,8 @@ static int pmseries_overrides(int opt, pmOptions *opts) { switch (opt) { - case 'a': case 'h': case 'g': case 'L': case 'n': - case 'p': case 's': case 'S': case 't': case 'Z': + case 'a': case 'G': case 'h': case 'g': case 'L': case 'n': + case 'N': case 'p': case 's': case 'S': case 't': case 'Z': return 1; } return 0; @@ -1233,6 +1248,8 @@ static pmLongOptions longopts[] = { { "load", 0, 'L', 0, "load time series values and metadata" }, { "query", 0, 'q', 0, "perform a time series query (default)" }, { "values", 0, 'v', 0, "extract values for given series or label(s)" }, + { "gc", 0, 'G', 0, "garbage collect expired time series" }, + { "dryrun", 0, 'N', 0, "with --gc, log what would be removed without writing" }, PMOPT_DEBUG, PMAPI_OPTIONS_HEADER("Reporting Options"), { "all", 0, 'a', 0, "report all metadata (-dilms) for time series" }, @@ -1257,7 +1274,7 @@ static pmLongOptions longopts[] = { static pmOptions opts = { .flags = PM_OPTFLAG_BOUNDARIES, - .short_options = "ac:dD:Fg:h:iIlLmMnqp:sStvVw:Z:?", + .short_options = "ac:dD:Fg:Gh:iIlLmMnNqp:sStvVw:Z:?", .long_options = longopts, .short_usage = "[options] [query ... | labels ... | series ... | source ...]", .override = pmseries_overrides, @@ -1306,6 +1323,10 @@ main(int argc, char *argv[]) flags |= PMSERIES_FAST; break; + case 'G': /* garbage collect expired series from key server */ + flags |= PMSERIES_OPT_GC; + break; + case 'g': match = sdsnew(opts.optarg); break; @@ -1343,6 +1364,10 @@ main(int argc, char *argv[]) flags |= (PMSERIES_OPT_LABELS|PMSERIES_ONLY_NAMES); break; + case 'N': /* dry-run for --gc: log only, no writes */ + flags |= PMSERIES_OPT_DRYRUN; + break; + case 'p': /* key server port to connect to */ keys_port = (unsigned int)strtol(opts.optarg, NULL, 10); break; @@ -1414,7 +1439,18 @@ main(int argc, char *argv[]) if (flags & PMSERIES_OPT_ALL) flags |= PMSERIES_META_OPTS; - if ((flags & PMSERIES_OPT_LOAD) && (flags & (PMSERIES_META_OPTS | + if ((flags & PMSERIES_OPT_GC) && (flags & (PMSERIES_OPT_LOAD | + PMSERIES_OPT_QUERY | PMSERIES_OPT_WINDOW | PMSERIES_OPT_VALUES | + PMSERIES_META_OPTS | PMSERIES_OPT_SOURCE))) { + pmprintf("%s: error - cannot combine --gc with other operation options\n", + pmGetProgname()); + opts.errors++; + } + else if ((flags & PMSERIES_OPT_DRYRUN) && !(flags & PMSERIES_OPT_GC)) { + pmprintf("%s: error - --dryrun requires --gc\n", pmGetProgname()); + opts.errors++; + } + else if ((flags & PMSERIES_OPT_LOAD) && (flags & (PMSERIES_META_OPTS | PMSERIES_OPT_SOURCE | PMSERIES_OPT_VALUES | PMSERIES_OPT_WINDOW))) { pmprintf("%s: error - cannot use load and reporting options together\n", pmGetProgname()); @@ -1450,7 +1486,7 @@ main(int argc, char *argv[]) * If all parameters are series hashes, assume --all metadata * mode otherwise assume its a --query request. */ - if (!(flags & (PMSERIES_META_OPTS | PMSERIES_OPT_LOAD)) && + if (!(flags & (PMSERIES_META_OPTS | PMSERIES_OPT_LOAD | PMSERIES_OPT_GC)) && !(flags & (PMSERIES_OPT_SOURCE | PMSERIES_OPT_VALUES)) && !(flags & (PMSERIES_NEED_DESCS | PMSERIES_NEED_INSTS))) { for (c = opts.optind; c < argc; c++) { @@ -1463,7 +1499,8 @@ main(int argc, char *argv[]) flags |= PMSERIES_OPT_ALL | PMSERIES_META_OPTS | PMSERIES_SERIESID; } - if (opts.optind == argc && !opts.errors && !(opts.flags & PM_OPTFLAG_EXIT)) { + if (opts.optind == argc && !opts.errors && !(opts.flags & PM_OPTFLAG_EXIT) && + !(flags & PMSERIES_OPT_GC)) { if ((flags & PMSERIES_OPT_QUERY)) { pmprintf("%s: error - no query string provided\n", pmGetProgname());