diff --git a/bitcoin/block.c b/bitcoin/block.c index 5a36a7dfc890..6838f2c39344 100644 --- a/bitcoin/block.c +++ b/bitcoin/block.c @@ -228,16 +228,24 @@ void bitcoin_block_blkid(const struct bitcoin_block *b, *out = b->hdr.hash; } -static bool bitcoin_blkid_to_hex(const struct bitcoin_blkid *blockid, - char *hexstr, size_t hexstr_len) +bool bitcoin_blkid_from_hex(const char *hexstr, size_t hexstr_len, + struct bitcoin_blkid *blkid) { - struct bitcoin_txid fake_txid; - fake_txid.shad = blockid->shad; - return bitcoin_txid_to_hex(&fake_txid, hexstr, hexstr_len); + if (!hex_decode(hexstr, hexstr_len, blkid, sizeof(*blkid))) + return false; + reverse_bytes(blkid->shad.sha.u.u8, sizeof(blkid->shad.sha.u.u8)); + return true; } -char *fmt_bitcoin_blkid(const tal_t *ctx, - const struct bitcoin_blkid *blkid) +bool bitcoin_blkid_to_hex(const struct bitcoin_blkid *blkid, + char *hexstr, size_t hexstr_len) +{ + struct sha256_double rev = blkid->shad; + reverse_bytes(rev.sha.u.u8, sizeof(rev.sha.u.u8)); + return hex_encode(&rev, sizeof(rev), hexstr, hexstr_len); +} + +char *fmt_bitcoin_blkid(const tal_t *ctx, const struct bitcoin_blkid *blkid) { char *hexstr = tal_arr(ctx, char, hex_str_size(sizeof(*blkid))); diff --git a/bitcoin/block.h b/bitcoin/block.h index a3289ee7cc14..8a945ca46af9 100644 --- a/bitcoin/block.h +++ b/bitcoin/block.h @@ -52,6 +52,11 @@ void fromwire_chainparams(const u8 **cursor, size_t *max, const struct chainparams **chainparams); void towire_chainparams(u8 **cursor, const struct chainparams *chainparams); +bool bitcoin_blkid_from_hex(const char *hexstr, size_t hexstr_len, + struct bitcoin_blkid *blkid); +bool bitcoin_blkid_to_hex(const struct bitcoin_blkid *blkid, + char *hexstr, size_t hexstr_len); + char *fmt_bitcoin_blkid(const tal_t *ctx, const struct bitcoin_blkid *blkid); diff --git a/bitcoin/test/run-bitcoin_block_from_hex.c b/bitcoin/test/run-bitcoin_block_from_hex.c index 0dc0bd640fc8..94b26ad5d05f 100644 --- a/bitcoin/test/run-bitcoin_block_from_hex.c +++ b/bitcoin/test/run-bitcoin_block_from_hex.c @@ -62,15 +62,6 @@ static const char block[] = STRUCTEQ_DEF(sha256_double, 0, sha); -static bool bitcoin_blkid_from_hex(const char *hexstr, size_t hexstr_len, - struct bitcoin_blkid *blockid) -{ - struct bitcoin_txid fake_txid; - if (!bitcoin_txid_from_hex(hexstr, hexstr_len, &fake_txid)) - return false; - blockid->shad = fake_txid.shad; - return true; -} int main(int argc, const char *argv[]) { struct bitcoin_blkid prev; diff --git a/ccan/ccan/json_out/json_out.c b/ccan/ccan/json_out/json_out.c index 9e371343ac75..915d525ef406 100644 --- a/ccan/ccan/json_out/json_out.c +++ b/ccan/ccan/json_out/json_out.c @@ -306,7 +306,7 @@ bool json_out_addstrn(struct json_out *jout, struct json_escape *e; if (json_escape_needed(str, len)) { - e = json_escape(NULL, str); + e = json_escape_len(NULL, str, len); str = e->s; len = strlen(str); } else diff --git a/common/json_param.c b/common/json_param.c index 1b529707016f..18bd284c0f22 100644 --- a/common/json_param.c +++ b/common/json_param.c @@ -478,6 +478,22 @@ struct command_result *param_string_or_array(struct command *cmd, const char *na return param_string(cmd, name, buffer, tok, &(*result)->str); } +struct command_result *param_string_array(struct command *cmd, const char *name, + const char *buffer, const jsmntok_t *tok, + const char ***arr) +{ + size_t i; + const jsmntok_t *s; + + if (tok->type != JSMN_ARRAY) + return command_fail_badparam(cmd, name, buffer, tok, + "should be an array"); + *arr = tal_arr(cmd, const char *, tok->size); + json_for_each_arr(i, s, tok) + (*arr)[i] = json_strdup(*arr, buffer, s); + return NULL; +} + struct command_result *param_invstring(struct command *cmd, const char *name, const char * buffer, const jsmntok_t *tok, const char **str) diff --git a/common/json_param.h b/common/json_param.h index a000c71b1179..54d2fa4236b5 100644 --- a/common/json_param.h +++ b/common/json_param.h @@ -206,6 +206,11 @@ struct command_result *param_string_or_array(struct command *cmd, const char *na const char * buffer, const jsmntok_t *tok, struct str_or_arr **result); +/* Array of strings */ +struct command_result *param_string_array(struct command *cmd, const char *name, + const char *buffer, const jsmntok_t *tok, + const char ***arr); + /* Extract an invoice string from a generic string, strip the `lightning:` * prefix from it if needed. */ struct command_result *param_invstring(struct command *cmd, const char *name, diff --git a/common/json_parse.c b/common/json_parse.c index 7841e0827ff9..5c1c68310b58 100644 --- a/common/json_parse.c +++ b/common/json_parse.c @@ -601,6 +601,13 @@ bool json_to_txid(const char *buffer, const jsmntok_t *tok, tok->end - tok->start, txid); } +bool json_to_bitcoin_blkid(const char *buffer, const jsmntok_t *tok, + struct bitcoin_blkid *blkid) +{ + return bitcoin_blkid_from_hex(buffer + tok->start, + tok->end - tok->start, blkid); +} + bool json_to_outpoint(const char *buffer, const jsmntok_t *tok, struct bitcoin_outpoint *op) { diff --git a/common/json_parse.h b/common/json_parse.h index 4706c3775f30..4c739154c4d9 100644 --- a/common/json_parse.h +++ b/common/json_parse.h @@ -108,6 +108,10 @@ bool json_to_msat(const char *buffer, const jsmntok_t *tok, bool json_to_txid(const char *buffer, const jsmntok_t *tok, struct bitcoin_txid *txid); +/* Extract a bitcoin blkid from this */ +bool json_to_bitcoin_blkid(const char *buffer, const jsmntok_t *tok, + struct bitcoin_blkid *blkid); + /* Extract a bitcoin outpoint from this */ bool json_to_outpoint(const char *buffer, const jsmntok_t *tok, struct bitcoin_outpoint *op); diff --git a/common/json_parse_simple.c b/common/json_parse_simple.c index 348be9ef6976..fbb2b12c67b5 100644 --- a/common/json_parse_simple.c +++ b/common/json_parse_simple.c @@ -2,6 +2,7 @@ #include "config.h" #include #include +#include #include #include #include @@ -151,6 +152,18 @@ bool json_to_bool(const char *buffer, const jsmntok_t *tok, bool *b) return false; } +bool json_hex_to_be32(const char *buffer, const jsmntok_t *tok, be32 *val) +{ + return hex_decode(buffer + tok->start, tok->end - tok->start, + val, sizeof(*val)); +} + +bool json_hex_to_be64(const char *buffer, const jsmntok_t *tok, be64 *val) +{ + return hex_decode(buffer + tok->start, tok->end - tok->start, + val, sizeof(*val)); +} + bool json_tok_is_num(const char *buffer, const jsmntok_t *tok) { diff --git a/common/json_parse_simple.h b/common/json_parse_simple.h index 0882812d76d4..56f97e2c14a1 100644 --- a/common/json_parse_simple.h +++ b/common/json_parse_simple.h @@ -2,6 +2,7 @@ #ifndef LIGHTNING_COMMON_JSON_PARSE_SIMPLE_H #define LIGHTNING_COMMON_JSON_PARSE_SIMPLE_H #include "config.h" +#include #include #include @@ -51,6 +52,12 @@ bool json_to_double(const char *buffer, const jsmntok_t *tok, double *num); /* Extract boolean from this */ bool json_to_bool(const char *buffer, const jsmntok_t *tok, bool *b); +/* Extract big-endian 32-bit from hex string (for datastore) */ +bool json_hex_to_be32(const char *buffer, const jsmntok_t *tok, be32 *val); + +/* Extract big-endian 64-bit from hex string (for datastore) */ +bool json_hex_to_be64(const char *buffer, const jsmntok_t *tok, be64 *val); + /* Is this a number? [0..9]+ */ bool json_tok_is_num(const char *buffer, const jsmntok_t *tok); diff --git a/common/json_stream.c b/common/json_stream.c index 6a0746074584..d1ec9d41c771 100644 --- a/common/json_stream.c +++ b/common/json_stream.c @@ -193,13 +193,22 @@ void json_add_primitive(struct json_stream *js, tal_free_if_taken(val); } +void json_add_stringn(struct json_stream *js, + const char *fieldname, + const char *str TAKES, + size_t len) +{ + if (json_filter_ok(js->filter, fieldname)) + json_out_addstrn(js->jout, fieldname, str, len); + if (taken(str)) + tal_free(str); +} + void json_add_string(struct json_stream *js, const char *fieldname, const char *str TAKES) { - if (json_filter_ok(js->filter, fieldname)) - json_out_addstr(js->jout, fieldname, str); - tal_free_if_taken(str); + json_add_stringn(js, fieldname, str, strlen(str)); } static char *json_member_direct(struct json_stream *js, @@ -298,13 +307,6 @@ void json_add_s32(struct json_stream *result, const char *fieldname, json_add_primitive_fmt(result, fieldname, "%d", value); } -void json_add_stringn(struct json_stream *result, const char *fieldname, - const char *value TAKES, size_t value_len) -{ - json_add_str_fmt(result, fieldname, "%.*s", (int)value_len, value); - tal_free_if_taken(value); -} - void json_add_bool(struct json_stream *result, const char *fieldname, bool value) { json_add_primitive(result, fieldname, value ? "true" : "false"); @@ -455,6 +457,15 @@ void json_add_txid(struct json_stream *result, const char *fieldname, json_add_string(result, fieldname, hex); } +void json_add_bitcoin_blkid(struct json_stream *result, const char *fieldname, + const struct bitcoin_blkid *blkid) +{ + char hex[hex_str_size(sizeof(*blkid))]; + + bitcoin_blkid_to_hex(blkid, hex, sizeof(hex)); + json_add_string(result, fieldname, hex); +} + void json_add_outpoint(struct json_stream *result, const char *fieldname, const struct bitcoin_outpoint *out) { diff --git a/common/json_stream.h b/common/json_stream.h index 7756c013d98f..3263dfd96d66 100644 --- a/common/json_stream.h +++ b/common/json_stream.h @@ -31,6 +31,7 @@ struct short_channel_id; struct sha256; struct preimage; struct bitcoin_tx; +struct bitcoin_blkid; struct wally_psbt; struct lease_rates; struct wireaddr; @@ -310,6 +311,10 @@ void json_add_channel_id(struct json_stream *response, void json_add_txid(struct json_stream *result, const char *fieldname, const struct bitcoin_txid *txid); +/* '"fieldname" : ' or "" if fieldname is NULL */ +void json_add_bitcoin_blkid(struct json_stream *result, const char *fieldname, + const struct bitcoin_blkid *blkid); + /* '"fieldname" : "txid:n" */ void json_add_outpoint(struct json_stream *result, const char *fieldname, const struct bitcoin_outpoint *out); diff --git a/doc/lightningd-config.5.md b/doc/lightningd-config.5.md index e85a1a19ce26..b118628405ba 100644 --- a/doc/lightningd-config.5.md +++ b/doc/lightningd-config.5.md @@ -575,6 +575,10 @@ command, so they invoices can also be paid onchain. The *name* is an ISO-4217 name (e.g. USD), which will be passed to *currencyrate* to fetch the exchange rate for that currency on each bookkeeper event. Setting *name* to the empty string is equivalent not setting it. +* **bwatch-poll-interval**=*MILLISECONDS* [plugin `bwatch`] + + Delay between polls for new blocks from `bitcoind` (default: 30000). + ### Networking options Note that for simple setups, the implicit *autolisten* option does the diff --git a/plugins/Makefile b/plugins/Makefile index 6840018d98a6..813626f01a10 100644 --- a/plugins/Makefile +++ b/plugins/Makefile @@ -17,6 +17,18 @@ PLUGIN_TXPREPARE_OBJS := $(PLUGIN_TXPREPARE_SRC:.c=.o) PLUGIN_BCLI_SRC := plugins/bcli.c PLUGIN_BCLI_OBJS := $(PLUGIN_BCLI_SRC:.c=.o) +PLUGIN_BWATCH_SRC := plugins/bwatch/bwatch.c \ + plugins/bwatch/bwatch_store.c \ + plugins/bwatch/bwatch_scanner.c \ + plugins/bwatch/bwatch_interface.c \ + plugins/bwatch/bwatch_wiregen.c +PLUGIN_BWATCH_HEADER := plugins/bwatch/bwatch.h \ + plugins/bwatch/bwatch_store.h \ + plugins/bwatch/bwatch_scanner.h \ + plugins/bwatch/bwatch_interface.h \ + plugins/bwatch/bwatch_wiregen.h +PLUGIN_BWATCH_OBJS := $(PLUGIN_BWATCH_SRC:.c=.o) + PLUGIN_COMMANDO_SRC := plugins/commando.c PLUGIN_COMMANDO_OBJS := $(PLUGIN_COMMANDO_SRC:.c=.o) @@ -82,6 +94,7 @@ PLUGIN_ALL_SRC := \ $(PLUGIN_AUTOCLEAN_SRC) \ $(PLUGIN_chanbackup_SRC) \ $(PLUGIN_BCLI_SRC) \ + $(PLUGIN_BWATCH_SRC) \ $(PLUGIN_COMMANDO_SRC) \ $(PLUGIN_FUNDER_SRC) \ $(PLUGIN_TOPOLOGY_SRC) \ @@ -102,12 +115,14 @@ PLUGIN_ALL_HEADER := \ $(PLUGIN_FUNDER_HEADER) \ $(PLUGIN_PAY_LIB_HEADER) \ $(PLUGIN_OFFERS_HEADER) \ - $(PLUGIN_SPENDER_HEADER) + $(PLUGIN_SPENDER_HEADER) \ + $(PLUGIN_BWATCH_HEADER) C_PLUGINS := \ plugins/autoclean \ plugins/chanbackup \ plugins/bcli \ + plugins/bwatch/bwatch \ plugins/commando \ plugins/funder \ plugins/topology \ @@ -185,6 +200,13 @@ plugins/exposesecret: $(PLUGIN_EXPOSESECRET_OBJS) $(PLUGIN_LIB_OBJS) libcommon.a plugins/bcli: $(PLUGIN_BCLI_OBJS) $(PLUGIN_LIB_OBJS) libcommon.a +plugins/bwatch/bwatch.o: $(PLUGIN_BWATCH_HEADER) +plugins/bwatch/bwatch_store.o: $(PLUGIN_BWATCH_HEADER) +plugins/bwatch/bwatch_scanner.o: $(PLUGIN_BWATCH_HEADER) +plugins/bwatch/bwatch_interface.o: $(PLUGIN_BWATCH_HEADER) +plugins/bwatch/bwatch_wiregen.o: $(PLUGIN_BWATCH_HEADER) +plugins/bwatch/bwatch: $(PLUGIN_BWATCH_OBJS) $(PLUGIN_LIB_OBJS) libcommon.a + plugins/keysend: $(PLUGIN_KEYSEND_OBJS) $(PLUGIN_LIB_OBJS) $(PLUGIN_PAY_LIB_OBJS) libcommon.a $(PLUGIN_KEYSEND_OBJS): $(PLUGIN_PAY_LIB_HEADER) libcommon.a diff --git a/plugins/bkpr/blockheights.c b/plugins/bkpr/blockheights.c index 35aa229e45df..5da70721353e 100644 --- a/plugins/bkpr/blockheights.c +++ b/plugins/bkpr/blockheights.c @@ -98,13 +98,6 @@ u32 find_blockheight(const struct bkpr *bkpr, return e ? e->height : 0; } -static bool json_hex_to_be32(const char *buffer, const jsmntok_t *tok, - be32 *val) -{ - return hex_decode(buffer + tok->start, tok->end - tok->start, - val, sizeof(*val)); -} - struct blockheights *init_blockheights(const tal_t *ctx, struct command *init_cmd) { diff --git a/plugins/bkpr/bookkeeper.c b/plugins/bkpr/bookkeeper.c index 8f61df1783c1..045350e73d71 100644 --- a/plugins/bkpr/bookkeeper.c +++ b/plugins/bkpr/bookkeeper.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -1836,13 +1837,6 @@ static const struct plugin_command commands[] = { }, }; -static bool json_hex_to_be64(const char *buffer, const jsmntok_t *tok, - be64 *val) -{ - return hex_decode(buffer + tok->start, tok->end - tok->start, - val, sizeof(*val)); -} - static void memleak_scan_currencyrates(struct htable *memtable, currencymap_t *currency_rates) { diff --git a/plugins/bwatch/bwatch.c b/plugins/bwatch/bwatch.c new file mode 100644 index 000000000000..4e5486089475 --- /dev/null +++ b/plugins/bwatch/bwatch.c @@ -0,0 +1,494 @@ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct bwatch *bwatch_of(struct plugin *plugin) +{ + return plugin_get_data(plugin, struct bwatch); +} + +/* + * ============================================================================ + * BLOCK PROCESSING: Polling + * + * Each cycle: getchaininfo → if blockcount > current_height, fetch the next + * block via getrawblockbyheight, append it to the in-memory history, persist + * it, and reschedule the next poll once the datastore write completes. + * + * Reorg detection (parent-hash mismatch) and watch matching land in + * subsequent commits. + * ============================================================================ + */ + +static struct command_result *handle_block(struct command *cmd, + const char *method, + const char *buf, + const jsmntok_t *result, + ptrint_t *block_height); + +/* Parse the bitcoin block out of a getrawblockbyheight response. */ +static struct bitcoin_block *block_from_response(const char *buf, + const jsmntok_t *result, + struct bitcoin_blkid *blockhash_out) +{ + const jsmntok_t *blocktok = json_get_member(buf, result, "block"); + struct bitcoin_block *block; + + if (!blocktok) + return NULL; + + block = bitcoin_block_from_hex(tmpctx, chainparams, + buf + blocktok->start, + blocktok->end - blocktok->start); + if (block && blockhash_out) + bitcoin_block_blkid(block, blockhash_out); + + return block; +} + +/* Fetch a block by height for normal polling. */ +static struct command_result *fetch_block_handle(struct command *cmd, + u32 height) +{ + struct out_req *req = jsonrpc_request_start(cmd, "getrawblockbyheight", + handle_block, handle_block, + int2ptr(height)); + json_add_u32(req->js, "height", height); + return send_outreq(req); +} + +/* Reschedule at the configured interval (used when there's nothing new to + * fetch, or on error). Once we're caught up to bitcoind's tip, this is + * what governs the steady-state poll cadence. */ +static struct command_result *poll_finished(struct command *cmd) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + + bwatch->poll_timer = global_timer(cmd->plugin, + time_from_msec(bwatch->poll_interval_ms), + bwatch_poll_chain, NULL); + return timer_complete(cmd); +} + +/* Send watch_revert for every owner affected by losing @removed_height. */ +static void bwatch_notify_reorg_watches(struct command *cmd, + struct bwatch *bwatch, + u32 removed_height) +{ + const char **owners = tal_arr(tmpctx, const char *, 0); + struct watch *w; + + /* Snapshot owners first; revert handlers may call watchman_del and + * mutate these tables. */ + + /* Scriptpubkey watches are perennial: always notify. */ + struct scriptpubkey_watches_iter sit; + for (w = scriptpubkey_watches_first(bwatch->scriptpubkey_watches, &sit); + w; + w = scriptpubkey_watches_next(bwatch->scriptpubkey_watches, &sit)) { + for (size_t i = 0; i < tal_count(w->owners); i++) + tal_arr_expand(&owners, w->owners[i]); + } + + /* Outpoint/scid/blockdepth: only notify watches whose anchor block is + * being torn down (start_block >= removed_height). Older long-lived + * watches stay armed and will refire naturally on the new chain. */ + struct outpoint_watches_iter oit; + for (w = outpoint_watches_first(bwatch->outpoint_watches, &oit); + w; + w = outpoint_watches_next(bwatch->outpoint_watches, &oit)) { + if (w->start_block < removed_height) + continue; + for (size_t i = 0; i < tal_count(w->owners); i++) + tal_arr_expand(&owners, w->owners[i]); + } + + struct scid_watches_iter scit; + for (w = scid_watches_first(bwatch->scid_watches, &scit); + w; + w = scid_watches_next(bwatch->scid_watches, &scit)) { + if (w->start_block < removed_height) + continue; + for (size_t i = 0; i < tal_count(w->owners); i++) + tal_arr_expand(&owners, w->owners[i]); + } + + struct blockdepth_watches_iter bdit; + for (w = blockdepth_watches_first(bwatch->blockdepth_watches, &bdit); + w; + w = blockdepth_watches_next(bwatch->blockdepth_watches, &bdit)) { + if (w->start_block < removed_height) + continue; + for (size_t i = 0; i < tal_count(w->owners); i++) + tal_arr_expand(&owners, w->owners[i]); + } + + for (size_t i = 0; i < tal_count(owners); i++) + bwatch_send_watch_revert(cmd, owners[i], removed_height); +} + +/* Remove tip block on reorg */ +void bwatch_remove_tip(struct command *cmd, struct bwatch *bwatch) +{ + const struct block_record_wire *newtip; + size_t count = tal_count(bwatch->block_history); + + if (count == 0) { + plugin_log(bwatch->plugin, LOG_BROKEN, + "remove_tip called with no block history!"); + return; + } + + plugin_log(bwatch->plugin, LOG_DBG, "Removing stale block %u: %s", + bwatch->current_height, + fmt_bitcoin_blkid(tmpctx, &bwatch->current_blockhash)); + + /* Notify owners of any watch affected by losing this block before we + * tear it down, so they can roll back in the same order things happened. */ + bwatch_notify_reorg_watches(cmd, bwatch, bwatch->current_height); + + /* Delete block from datastore */ + bwatch_delete_block_from_datastore(cmd, bwatch->current_height); + + /* Remove last block from history */ + tal_resize(&bwatch->block_history, count - 1); + + /* Move tip back one */ + newtip = bwatch_last_block(bwatch); + if (newtip) { + assert(newtip->height == bwatch->current_height - 1); + bwatch->current_height = newtip->height; + bwatch->current_blockhash = newtip->hash; + + /* Tell watchman the tip rolled back so it persists the new height+hash. + * If we crash before the ack, watchman's stale height > bwatch's height + * on restart, which naturally retriggers the rollback via getwatchmanheight. */ + bwatch_send_revert_block_processed(cmd, bwatch->current_height, + &bwatch->current_blockhash); + } else { + /* History exhausted: we've rolled back past everything we stored. + * Set current_height to 0 so getwatchmanheight_done can reset it to + * watchman_height. Don't notify watchman — it already knows its own + * height and we're about to resume from there via sequential polling. */ + bwatch->current_height = 0; + memset(&bwatch->current_blockhash, 0, sizeof(bwatch->current_blockhash)); + } +} + +/* Process or initialize from a block. */ +static struct command_result *handle_block(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + ptrint_t *block_heightptr) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + struct bitcoin_blkid blockhash; + struct bitcoin_block *block; + bool is_init = (bwatch->current_height == 0); + u32 block_height = ptr2int(block_heightptr); + + block = block_from_response(buf, result, &blockhash); + if (!block) { + plugin_log(cmd->plugin, LOG_UNUSUAL, + "Failed to get/parse block %u: '%.*s'", + block_height, + json_tok_full_len(result), + json_tok_full(buf, result)); + return poll_finished(cmd); + } + + if (!is_init) { + /* Verify the parent of the new block is our current tip; if + * not, we have a reorg. Pop the tip and refetch the block + * until we find a common ancestor, then roll forward from + * there. Skip when history is empty (rollback exhausted it). */ + if (tal_count(bwatch->block_history) > 0 && + !bitcoin_blkid_eq(&block->hdr.prev_hash, &bwatch->current_blockhash)) { + plugin_log(cmd->plugin, LOG_INFORM, + "Reorg detected at block %u: expected parent %s, got %s (fetched block hash: %s)", + block_height, + fmt_bitcoin_blkid(tmpctx, &bwatch->current_blockhash), + fmt_bitcoin_blkid(tmpctx, &block->hdr.prev_hash), + fmt_bitcoin_blkid(tmpctx, &blockhash)); + bwatch_remove_tip(cmd, bwatch); + return fetch_block_handle(cmd, bwatch->current_height + 1); + } + + /* Depth first: restart-marker watches (e.g. onchaind/ + * channel_close) start subdaemons before outpoint watches + * fire for the same block. */ + bwatch_check_blockdepth_watches(cmd, bwatch, block_height); + bwatch_process_block_txs(cmd, bwatch, block, block_height, + &blockhash, NULL); + } + + /* Update state */ + bwatch->current_height = block_height; + bwatch->current_blockhash = blockhash; + + /* Update in-memory history immediately */ + bwatch_add_block_to_history(bwatch, bwatch->current_height, &blockhash, + &block->hdr.prev_hash); + + struct block_record_wire br = { + bwatch->current_height, + bwatch->current_blockhash, + block->hdr.prev_hash, + }; + return bwatch_add_block_to_datastore(cmd, &br, + bwatch_send_block_processed); +} + +/* getchaininfo response: pick the next block to fetch (or just reschedule). */ +static struct command_result *getchaininfo_done(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + u32 blockheight; + const char *err; + + err = json_scan(tmpctx, buf, result, + "{blockcount:%}", + JSON_SCAN(json_to_number, &blockheight)); + if (err) { + plugin_log(cmd->plugin, LOG_BROKEN, + "getchaininfo parse failed: %s", err); + return poll_finished(cmd); + } + + if (blockheight > bwatch->current_height) { + u32 target_height; + + /* On first init we jump straight to the chain tip; afterwards + * we catch up one block at a time so handle_block can validate + * each parent hash (added in a later commit). */ + if (bwatch->current_height == 0) { + plugin_log(cmd->plugin, LOG_DBG, + "First poll: init at block %u", + blockheight); + target_height = blockheight; + } else { + target_height = bwatch->current_height + 1; + } + + return fetch_block_handle(cmd, target_height); + } + + plugin_log(cmd->plugin, LOG_DBG, + "No block change, current_height remains %u", + bwatch->current_height); + return poll_finished(cmd); +} + +/* Non-fatal: bcli may not have come up yet — log and retry on the next poll. */ +static struct command_result *getchaininfo_failed(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + void *unused UNUSED) +{ + plugin_log(cmd->plugin, LOG_DBG, + "getchaininfo failed (bcli not ready?): %.*s", + json_tok_full_len(result), json_tok_full(buf, result)); + return poll_finished(cmd); +} + +struct command_result *bwatch_poll_chain(struct command *cmd, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + struct out_req *req; + + req = jsonrpc_request_start(cmd, "getchaininfo", + getchaininfo_done, getchaininfo_failed, + NULL); + json_add_u32(req->js, "last_height", bwatch->current_height); + return send_outreq(req); +} + +/* + * ============================================================================ + * RESCAN + * + * When a watch is added with start_block <= current_height, replay the + * historical blocks for that one watch so it sees confirmations that + * happened before it was registered. Bounded by current_height so we + * never race the live polling loop. + * + * Async chain: fetch_block_rescan -> rescan_block_done -> next fetch. + * ============================================================================ + */ + +/* Fetch a single block by height during a rescan. */ +static struct command_result *fetch_block_rescan(struct command *cmd, + u32 height, + struct command_result *(*cb)(struct command *, + const char *, + const char *, + const jsmntok_t *, + struct rescan_state *), + struct rescan_state *rescan) +{ + struct out_req *req = jsonrpc_request_start(cmd, "getrawblockbyheight", + cb, cb, rescan); + json_add_u32(req->js, "height", height); + return send_outreq(req); +} + +/* Finish a rescan chain: RPC commands get a JSON result; aux/timer + * commands just terminate. */ +static struct command_result *rescan_complete(struct command *cmd) +{ + switch (cmd->type) { + case COMMAND_TYPE_NORMAL: + case COMMAND_TYPE_HOOK: + return command_success(cmd, json_out_obj(cmd, NULL, NULL)); + case COMMAND_TYPE_AUX: + return aux_command_done(cmd); + case COMMAND_TYPE_NOTIFICATION: + case COMMAND_TYPE_TIMER: + case COMMAND_TYPE_CHECK: + case COMMAND_TYPE_USAGE_ONLY: + break; + } + abort(); +} + +/* getrawblockbyheight callback for one block of a rescan: process the + * block, then either fetch the next or finish. */ +static struct command_result *rescan_block_done(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + struct rescan_state *rescan) +{ + struct bitcoin_blkid blockhash; + struct bitcoin_block *block = block_from_response(buf, result, &blockhash); + + if (!block) { + /* Chain may have rolled back past this height; stop quietly. */ + plugin_log(cmd->plugin, LOG_DBG, + "Rescan: block %u unavailable (chain rolled back?), stopping", + rescan->current_block); + return rescan_complete(cmd); + } + + /* rescan->watch is forwarded so the scanner only checks that one + * watch (or all watches when watch == NULL). */ + bwatch_process_block_txs(cmd, bwatch_of(cmd->plugin), block, + rescan->current_block, &blockhash, rescan->watch); + + /* Advance the cursor; if we still have blocks to scan, fetch the + * next one and chain back into rescan_block_done. */ + if (++rescan->current_block <= rescan->target_block) + return fetch_block_rescan(cmd, rescan->current_block, + rescan_block_done, rescan); + + plugin_log(cmd->plugin, LOG_INFORM, "Rescan complete"); + return rescan_complete(cmd); +} + +void bwatch_start_rescan(struct command *cmd, + const struct watch *w, + u32 start_block, + u32 target_block) +{ + struct rescan_state *rescan; + + if (w) { + plugin_log(cmd->plugin, LOG_INFORM, + "Starting rescan for %s watch: blocks %u-%u", + bwatch_get_watch_type_name(w->type), + start_block, target_block); + } else { + plugin_log(cmd->plugin, LOG_INFORM, + "Starting rescan for all watches: blocks %u-%u", + start_block, target_block); + } + + /* Owned by `cmd` so it lives across the async chain and gets + * freed automatically when the command completes. */ + rescan = tal(cmd, struct rescan_state); + rescan->watch = w; + rescan->current_block = start_block; + rescan->target_block = target_block; + + /* Fire the first getrawblockbyheight; each response runs + * rescan_block_done, which fetches the next block until we + * pass target_block. */ + fetch_block_rescan(cmd, rescan->current_block, + rescan_block_done, rescan); +} + +static const char *init(struct command *cmd, + const char *buf UNUSED, + const jsmntok_t *config UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + + bwatch->plugin = cmd->plugin; + + bwatch->scriptpubkey_watches = new_htable(bwatch, scriptpubkey_watches); + bwatch->outpoint_watches = new_htable(bwatch, outpoint_watches); + bwatch->scid_watches = new_htable(bwatch, scid_watches); + bwatch->blockdepth_watches = new_htable(bwatch, blockdepth_watches); + + bwatch->block_history = tal_arr(bwatch, struct block_record_wire, 0); + + /* Replay persisted block history. load_block_history sets + * current_height / current_blockhash from the most recent record; + * if there are no records, fall back to zero so the first poll + * initialises us at the chain tip. */ + bwatch_load_block_history(cmd, bwatch); + bwatch_load_watches_from_datastore(cmd, bwatch); + + /* Send chaininfo to watchman first; the ack/err callbacks then + * kick off the chain-poll loop. */ + global_timer(cmd->plugin, time_from_sec(0), + bwatch_send_chaininfo, NULL); + return NULL; +} + +static const struct plugin_command commands[] = { + { "addscriptpubkeywatch", json_bwatch_add_scriptpubkey }, + { "addoutpointwatch", json_bwatch_add_outpoint }, + { "addscidwatch", json_bwatch_add_scid }, + { "addblockdepthwatch", json_bwatch_add_blockdepth }, + { "delscriptpubkeywatch", json_bwatch_del_scriptpubkey }, + { "deloutpointwatch", json_bwatch_del_outpoint }, + { "delscidwatch", json_bwatch_del_scid }, + { "delblockdepthwatch", json_bwatch_del_blockdepth }, + { "listwatch", json_bwatch_list }, +}; + +int main(int argc, char *argv[]) +{ + struct bwatch *bwatch; + + setup_locale(); + bwatch = tal(NULL, struct bwatch); + bwatch->poll_interval_ms = 30000; + + plugin_main(argv, init, take(bwatch), PLUGIN_RESTARTABLE, true, NULL, + commands, ARRAY_SIZE(commands), + NULL, 0, + NULL, 0, + NULL, 0, + plugin_option("bwatch-poll-interval", "int", + "Milliseconds between chain polls (default: 30000)", + u32_option, u32_jsonfmt, &bwatch->poll_interval_ms), + NULL); +} diff --git a/plugins/bwatch/bwatch.h b/plugins/bwatch/bwatch.h new file mode 100644 index 000000000000..ab60286a1552 --- /dev/null +++ b/plugins/bwatch/bwatch.h @@ -0,0 +1,106 @@ +#ifndef LIGHTNING_PLUGINS_BWATCH_BWATCH_H +#define LIGHTNING_PLUGINS_BWATCH_BWATCH_H + +#include "config.h" +#include +#include +#include +#include +#include + +/* Forward declare hash table types (defined in bwatch_store.h) */ +struct scriptpubkey_watches; +struct outpoint_watches; +struct scid_watches; +struct blockdepth_watches; + +/* Timer handle returned by global_timer; defined in libplugin. */ +struct plugin_timer; + +/* Wire-format block record stored in lightningd's datastore. + * Defined by bwatch_wiregen.h; forward-declared here to avoid pulling + * the generated header into every consumer of bwatch.h. */ +struct block_record_wire; + +/* Watch type discriminator. */ +enum watch_type { + WATCH_SCRIPTPUBKEY, + WATCH_OUTPOINT, + WATCH_SCID, + WATCH_BLOCKDEPTH, +}; + +/* Scriptpubkey wrapper: tal-allocated bytes don't carry a length, so we + * keep them in a struct with an explicit length for hashing/equality. */ +struct scriptpubkey { + const u8 *script; + size_t len; +}; + +/* A single watch: one key plus the set of owner ids that registered it. */ +struct watch { + enum watch_type type; + u32 start_block; + wirestring **owners; + union { + struct scriptpubkey scriptpubkey; + struct bitcoin_outpoint outpoint; + struct short_channel_id scid; + } key; +}; + +/* Main bwatch state. + * + * The four watch hash tables are typed (see bwatch_store.h) so each + * lookup hits the right key shape (script bytes / outpoint / scid / + * confirm-height) without dispatching on type at every call site. */ +struct bwatch { + struct plugin *plugin; + u32 current_height; + struct bitcoin_blkid current_blockhash; + /* Oldest first, most recent last. Used to replay a reorg by + * peeling tips off until the parent hash matches the new chain. */ + struct block_record_wire *block_history; + + struct scriptpubkey_watches *scriptpubkey_watches; + struct outpoint_watches *outpoint_watches; + struct scid_watches *scid_watches; + struct blockdepth_watches *blockdepth_watches; + + /* Active poll timer; rescheduled at the end of every poll cycle. */ + struct plugin_timer *poll_timer; + u32 poll_interval_ms; +}; + +/* Helper: get last block_history (or NULL) */ +const struct block_record_wire *bwatch_last_block(const struct bwatch *bwatch); + +/* Helper: retrieve the bwatch state from a plugin handle. */ +struct bwatch *bwatch_of(struct plugin *plugin); + +/* Timer callback: kicks off one chain-poll cycle (getchaininfo → + * getrawblockbyheight → persist → reschedule). Exposed so other modules + * can schedule a poll from their own callbacks. */ +struct command_result *bwatch_poll_chain(struct command *cmd, void *unused); + +/* Pop the current tip from in-memory + persisted history. Exposed so the + * startup chaininfo path can roll back when bitcoind's chain is shorter + * than what we have stored. */ +void bwatch_remove_tip(struct command *cmd, struct bwatch *bwatch); + +/* Per-rescan cursor: which block we're on and how far to go. */ +struct rescan_state { + const struct watch *watch; /* NULL = rescan all watches, non-NULL = single watch */ + u32 current_block; /* Next block to fetch */ + u32 target_block; /* Stop after this block */ +}; + +/* Replay historical blocks for `w` (or all watches if w==NULL) from + * `start_block` up to `target_block` inclusive. Runs asynchronously: + * fetch -> process -> fetch the next block. */ +void bwatch_start_rescan(struct command *cmd, + const struct watch *w, + u32 start_block, + u32 target_block); + +#endif /* LIGHTNING_PLUGINS_BWATCH_BWATCH_H */ diff --git a/plugins/bwatch/bwatch_interface.c b/plugins/bwatch/bwatch_interface.c new file mode 100644 index 000000000000..c713c8494494 --- /dev/null +++ b/plugins/bwatch/bwatch_interface.c @@ -0,0 +1,615 @@ +#include "config.h" +#include +#include +#include +#include +#include +#include + +/* + * ============================================================================ + * SENDING WATCH_FOUND NOTIFICATIONS + * ============================================================================ + */ + +/* Callback for watch_found RPC. + * watch_found notifications are sent on an aux command so they cannot + * interfere with the poll command lifetime. */ +static struct command_result *notify_ack(struct command *cmd, + const char *method UNUSED, + const char *buf UNUSED, + const jsmntok_t *result UNUSED, + void *arg UNUSED) +{ + return aux_command_done(cmd); +} + +/* Send watch_found notification to lightningd. */ +void bwatch_send_watch_found(struct command *cmd, + const struct bitcoin_tx *tx, + u32 blockheight, + const struct watch *w, + u32 txindex, + u32 index) +{ + struct command *aux = aux_command(cmd); + struct out_req *req; + + req = jsonrpc_request_start(aux, "watch_found", + notify_ack, notify_ack, NULL); + /* tx==NULL signals "not found" for WATCH_SCID; omit tx+txindex so + * json_watch_found passes tx=NULL down to the handler. */ + if (tx) { + json_add_tx(req->js, "tx", tx); + json_add_u32(req->js, "txindex", txindex); + if (index != UINT32_MAX) + json_add_u32(req->js, "index", index); + } + json_add_u32(req->js, "blockheight", blockheight); + + /* Add owners array */ + json_array_start(req->js, "owners"); + for (size_t i = 0; i < tal_count(w->owners); i++) + json_add_string(req->js, NULL, w->owners[i]); + json_array_end(req->js); + + /* Tests (and operators) key off this line; keep wording stable. */ + plugin_log(cmd->plugin, LOG_DBG, + "watch_found at block %u", blockheight); + + send_outreq(req); +} + +/* Send a blockdepth depth notification to lightningd: same watch_found + * RPC shape but with depth + blockheight only (no tx). */ +void bwatch_send_blockdepth_found(struct command *cmd, + const struct watch *w, + u32 depth, + u32 blockheight) +{ + struct command *aux = aux_command(cmd); + struct out_req *req; + + req = jsonrpc_request_start(aux, "watch_found", + notify_ack, notify_ack, NULL); + json_add_u32(req->js, "blockheight", blockheight); + json_add_u32(req->js, "depth", depth); + + json_array_start(req->js, "owners"); + for (size_t i = 0; i < tal_count(w->owners); i++) + json_add_string(req->js, NULL, w->owners[i]); + json_array_end(req->js); + + plugin_log(cmd->plugin, LOG_DBG, + "watch_found at block %u (blockdepth depth=%u)", + blockheight, depth); + + send_outreq(req); +} + +/* Tell one owner that a previously-reported watch_found was rolled back. */ +void bwatch_send_watch_revert(struct command *cmd, + const char *owner, + u32 blockheight) +{ + struct command *aux = aux_command(cmd); + struct out_req *req; + + req = jsonrpc_request_start(aux, "watch_revert", + notify_ack, notify_ack, NULL); + json_add_string(req->js, "owner", owner); + json_add_u32(req->js, "blockheight", blockheight); + send_outreq(req); +} + +/* + * ============================================================================ + * SENDING BLOCK_PROCESSED NOTIFICATION + * + * After bwatch has persisted a new tip, it tells watchman by sending the + * block_processed RPC. The next poll is scheduled from the ack callback, + * which guarantees watchman's persisted height is updated before bwatch + * looks for another block — important for crash safety: on restart we + * trust watchman's height as the floor and re-fetch anything above it. + * ============================================================================ + */ + +/* Watchman acked block_processed: safe to poll for the next block. */ +static struct command_result *block_processed_ack(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + u32 acked_height; + const char *err; + + err = json_scan(tmpctx, buf, result, + "{blockheight:%}", + JSON_SCAN(json_to_number, &acked_height)); + if (err) + plugin_err(cmd->plugin, "block_processed ack '%.*s': %s", + json_tok_full_len(result), + json_tok_full(buf, result), err); + + plugin_log(cmd->plugin, LOG_DBG, + "Received block_processed ack for height %u", acked_height); + + bwatch->poll_timer = global_timer(cmd->plugin, time_from_sec(0), + bwatch_poll_chain, NULL); + return timer_complete(cmd); +} + +/* Non-fatal: watchman may not be ready yet (e.g. lightningd still booting). + * Reschedule the poll anyway so we keep retrying without busy-looping. */ +static struct command_result *block_processed_err(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + + plugin_log(cmd->plugin, LOG_DBG, + "block_processed RPC failed (watchman not ready?): %.*s", + json_tok_full_len(result), json_tok_full(buf, result)); + + bwatch->poll_timer = global_timer(cmd->plugin, time_from_sec(0), + bwatch_poll_chain, NULL); + return timer_complete(cmd); +} + +struct command_result *bwatch_send_block_processed(struct command *cmd) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + struct out_req *req; + + req = jsonrpc_request_start(cmd, "block_processed", + block_processed_ack, block_processed_err, + NULL); + json_add_u32(req->js, "blockheight", bwatch->current_height); + json_add_string(req->js, "blockhash", + fmt_bitcoin_blkid(tmpctx, &bwatch->current_blockhash)); + return send_outreq(req); +} + +/* + * ============================================================================ + * REVERT BLOCK NOTIFICATION + * ============================================================================ + */ + +/* Notify watchman that a block was rolled back so it can update and persist + * its tip. Fire-and-forget via aux_command — the poll timer doesn't depend + * on the ack. Crash safety: if we crash before the ack, watchman's stale + * height will be higher than bwatch's on restart, retriggering rollback. */ +void bwatch_send_revert_block_processed(struct command *cmd, u32 new_height, + const struct bitcoin_blkid *new_hash) +{ + struct command *aux = aux_command(cmd); + struct out_req *req; + + req = jsonrpc_request_start(aux, "revert_block_processed", + notify_ack, notify_ack, NULL); + json_add_u32(req->js, "blockheight", new_height); + json_add_string(req->js, "blockhash", + fmt_bitcoin_blkid(tmpctx, new_hash)); + send_outreq(req); +} + +/* + * ============================================================================ + * CHAININFO ON STARTUP + * + * On init bwatch first asks bcli for chain name / IBD state / current + * blockcount, optionally rolls its tip back if bitcoind is shorter than + * what we have on disk, and forwards the result to watchman via the + * `chaininfo` RPC. Whether watchman acks or errors, we then schedule + * the normal chain-poll loop. + * ============================================================================ + */ + +/* Watchman acked chaininfo: kick off normal polling. */ +static struct command_result *chaininfo_ack(struct command *cmd, + const char *method UNUSED, + const char *buf UNUSED, + const jsmntok_t *result UNUSED, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + bwatch->poll_timer = global_timer(cmd->plugin, time_from_sec(0), + bwatch_poll_chain, NULL); + return timer_complete(cmd); +} + +/* Non-fatal: watchman may not be ready yet; poll anyway. */ +static struct command_result *chaininfo_err(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + void *unused UNUSED) +{ + plugin_log(cmd->plugin, LOG_DBG, + "chaininfo RPC failed: %.*s", + json_tok_full_len(result), json_tok_full(buf, result)); + return chaininfo_ack(cmd, method, buf, result, unused); +} + +/* Got chain state from bcli: optionally roll back, then forward to watchman. */ +static struct command_result *chaininfo_getchaininfo_done(struct command *cmd, + const char *method UNUSED, + const char *buf, + const jsmntok_t *result, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + struct out_req *req; + const char *chain; + u32 headercount, blockcount; + bool ibd; + const char *err; + + err = json_scan(tmpctx, buf, result, + "{chain:%,headercount:%,blockcount:%,ibd:%}", + JSON_SCAN_TAL(tmpctx, json_strdup, &chain), + JSON_SCAN(json_to_number, &headercount), + JSON_SCAN(json_to_number, &blockcount), + JSON_SCAN(json_to_bool, &ibd)); + if (err) { + plugin_log(cmd->plugin, LOG_BROKEN, + "getchaininfo parse failed: %s", err); + return timer_complete(cmd); + } + + /* Startup-only rollback: if bitcoind's chain is shorter than our + * stored tip, peel off stale blocks now. During normal polling the + * shorter-chain case is handled by hash-mismatch reorg detection + * inside handle_block. */ + if (blockcount < bwatch->current_height) { + plugin_log(cmd->plugin, LOG_INFORM, + "Startup: chain at %u but bwatch at %u; rolling back", + blockcount, bwatch->current_height); + while (bwatch->current_height > blockcount + && bwatch_last_block(bwatch)) + bwatch_remove_tip(cmd, bwatch); + } + + req = jsonrpc_request_start(cmd, "chaininfo", + chaininfo_ack, chaininfo_err, NULL); + json_add_string(req->js, "chain", chain); + json_add_u32(req->js, "headercount", headercount); + json_add_u32(req->js, "blockcount", blockcount); + json_add_bool(req->js, "ibd", ibd); + return send_outreq(req); +} + +/* bcli unreachable: log and fall back to polling so we don't stall init. */ +static struct command_result *chaininfo_getchaininfo_failed(struct command *cmd, + const char *method UNUSED, + const char *buf UNUSED, + const jsmntok_t *result UNUSED, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + plugin_log(cmd->plugin, LOG_BROKEN, + "getchaininfo failed during chaininfo init"); + bwatch->poll_timer = global_timer(cmd->plugin, time_from_sec(0), + bwatch_poll_chain, NULL); + return timer_complete(cmd); +} + +struct command_result *bwatch_send_chaininfo(struct command *cmd, + void *unused UNUSED) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + struct out_req *req; + + req = jsonrpc_request_start(cmd, "getchaininfo", + chaininfo_getchaininfo_done, + chaininfo_getchaininfo_failed, + NULL); + json_add_u32(req->js, "last_height", bwatch->current_height); + return send_outreq(req); +} + +/* + * ============================================================================ + * RPC COMMAND HANDLERS + * + * Watch RPCs are thin wrappers over bwatch_add_watch / bwatch_del_watch. + * Adding a watch whose start_block is <= our current chain tip needs a + * historical rescan so it sees confirmations that happened before the + * watch was registered; add_watch_and_maybe_rescan handles that. + * ============================================================================ + */ + +/* If this watch's start_block is at or behind our tip, replay the + * historical range for just this watch; otherwise we can return + * success immediately. */ +static struct command_result *add_watch_and_maybe_rescan(struct command *cmd, + struct bwatch *bwatch, + struct watch *w, + u32 scan_start) +{ + if (w && bwatch->current_height > 0 + && scan_start <= bwatch->current_height) { + bwatch_start_rescan(cmd, w, scan_start, bwatch->current_height); + return command_still_pending(cmd); + } + return command_success(cmd, json_out_obj(cmd, NULL, NULL)); +} + +/* Register a scriptpubkey watch for `owner` from `start_block` onwards. */ +struct command_result *json_bwatch_add_scriptpubkey(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + u8 *scriptpubkey; + u32 *start_block; + struct watch *w; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("scriptpubkey", param_bin_from_hex, &scriptpubkey), + p_req("start_block", param_u32, &start_block), + NULL)) + return command_param_failed(); + + /* New owner is appended to the watch's owner list; same owner + * re-adding lowers start_block if needed. */ + w = bwatch_add_watch(cmd, bwatch, WATCH_SCRIPTPUBKEY, + NULL, scriptpubkey, NULL, NULL, + *start_block, owner); + return add_watch_and_maybe_rescan(cmd, bwatch, w, *start_block); +} + +/* Drop one owner from a scriptpubkey watch; the watch itself goes away + * once the last owner is removed. */ +struct command_result *json_bwatch_del_scriptpubkey(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + u8 *scriptpubkey; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("scriptpubkey", param_bin_from_hex, &scriptpubkey), + NULL)) + return command_param_failed(); + + bwatch_del_watch(cmd, bwatch, WATCH_SCRIPTPUBKEY, + NULL, scriptpubkey, NULL, NULL, owner); + return command_success(cmd, json_out_obj(cmd, "removed", "true")); +} + +/* Register an outpoint (txid + outnum) watch for `owner` from + * `start_block` onwards. */ +struct command_result *json_bwatch_add_outpoint(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + struct bitcoin_outpoint *outpoint; + u32 *start_block; + struct watch *w; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("outpoint", param_outpoint, &outpoint), + p_req("start_block", param_u32, &start_block), + NULL)) + return command_param_failed(); + + /* New owner is appended to the watch's owner list; same owner + * re-adding lowers start_block if needed. */ + w = bwatch_add_watch(cmd, bwatch, WATCH_OUTPOINT, + outpoint, NULL, NULL, NULL, + *start_block, owner); + return add_watch_and_maybe_rescan(cmd, bwatch, w, *start_block); +} + +/* Drop one owner from an outpoint watch; the watch itself goes away + * once the last owner is removed. */ +struct command_result *json_bwatch_del_outpoint(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + struct bitcoin_outpoint *outpoint; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("outpoint", param_outpoint, &outpoint), + NULL)) + return command_param_failed(); + + bwatch_del_watch(cmd, bwatch, WATCH_OUTPOINT, + outpoint, NULL, NULL, NULL, owner); + return command_success(cmd, json_out_obj(cmd, "removed", "true")); +} + +/* Register a short_channel_id watch for `owner` from `start_block` + * onwards. The scid pins the watch to one specific (block, txindex, + * outnum). */ +struct command_result *json_bwatch_add_scid(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + struct short_channel_id *scid; + u32 *start_block; + struct watch *w; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("scid", param_short_channel_id, &scid), + p_req("start_block", param_u32, &start_block), + NULL)) + return command_param_failed(); + + /* New owner is appended to the watch's owner list; same owner + * re-adding lowers start_block if needed. */ + w = bwatch_add_watch(cmd, bwatch, WATCH_SCID, + NULL, NULL, scid, NULL, + *start_block, owner); + return add_watch_and_maybe_rescan(cmd, bwatch, w, *start_block); +} + +/* Drop one owner from a scid watch; the watch itself goes away once + * the last owner is removed. */ +struct command_result *json_bwatch_del_scid(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + struct short_channel_id *scid; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("scid", param_short_channel_id, &scid), + NULL)) + return command_param_failed(); + + bwatch_del_watch(cmd, bwatch, WATCH_SCID, + NULL, NULL, scid, NULL, owner); + return command_success(cmd, json_out_obj(cmd, "removed", "true")); +} + +/* Register a blockdepth watch for `owner` anchored at `start_block`. + * Each new block fires a watch_found with depth = tip - start_block + 1. */ +struct command_result *json_bwatch_add_blockdepth(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + u32 *start_block; + struct watch *w; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("start_block", param_u32, &start_block), + NULL)) + return command_param_failed(); + + /* start_block doubles as the watch key (confirm_height) and + * the anchor for depth = tip - start_block + 1. */ + w = bwatch_add_watch(cmd, bwatch, WATCH_BLOCKDEPTH, + NULL, NULL, NULL, start_block, + *start_block, owner); + return add_watch_and_maybe_rescan(cmd, bwatch, w, *start_block); +} + +/* Drop one owner from a blockdepth watch; the watch itself goes away + * once the last owner is removed. */ +struct command_result *json_bwatch_del_blockdepth(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + const char *owner; + u32 *start_block; + + if (!param(cmd, buffer, params, + p_req("owner", param_string, &owner), + p_req("start_block", param_u32, &start_block), + NULL)) + return command_param_failed(); + + bwatch_del_watch(cmd, bwatch, WATCH_BLOCKDEPTH, + NULL, NULL, NULL, start_block, owner); + return command_success(cmd, json_out_obj(cmd, "removed", "true")); +} + +/* Emit type / start_block / owners for one watch. */ +static void json_out_watch_common(struct json_out *jout, + enum watch_type type, + u32 start_block, + wirestring **owners) +{ + json_out_addstr(jout, "type", bwatch_get_watch_type_name(type)); + json_out_add(jout, "start_block", false, "%u", start_block); + json_out_start(jout, "owners", '['); + for (size_t i = 0; i < tal_count(owners); i++) + json_out_addstr(jout, NULL, owners[i]); + json_out_end(jout, ']'); +} + +/* Dump every active watch as a flat array; per-type fields go first + * so the consumer can dispatch on shape. */ +struct command_result *json_bwatch_list(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + struct bwatch *bwatch = bwatch_of(cmd->plugin); + struct json_out *jout; + struct watch *w; + struct scriptpubkey_watches_iter sit; + struct outpoint_watches_iter oit; + struct scid_watches_iter scit; + struct blockdepth_watches_iter bdit; + + if (!param(cmd, buffer, params, NULL)) + return command_param_failed(); + + jout = json_out_new(cmd); + json_out_start(jout, NULL, '{'); + json_out_start(jout, "watches", '['); + + for (w = scriptpubkey_watches_first(bwatch->scriptpubkey_watches, &sit); + w; + w = scriptpubkey_watches_next(bwatch->scriptpubkey_watches, &sit)) { + json_out_start(jout, NULL, '{'); + json_out_addstr(jout, "scriptpubkey", + tal_hexstr(tmpctx, w->key.scriptpubkey.script, + w->key.scriptpubkey.len)); + json_out_watch_common(jout, w->type, w->start_block, w->owners); + json_out_end(jout, '}'); + } + + for (w = outpoint_watches_first(bwatch->outpoint_watches, &oit); + w; + w = outpoint_watches_next(bwatch->outpoint_watches, &oit)) { + json_out_start(jout, NULL, '{'); + json_out_addstr(jout, "outpoint", + fmt_bitcoin_outpoint(tmpctx, &w->key.outpoint)); + json_out_watch_common(jout, w->type, w->start_block, w->owners); + json_out_end(jout, '}'); + } + + for (w = scid_watches_first(bwatch->scid_watches, &scit); + w; + w = scid_watches_next(bwatch->scid_watches, &scit)) { + json_out_start(jout, NULL, '{'); + json_out_add(jout, "blockheight", false, "%u", + short_channel_id_blocknum(w->key.scid)); + json_out_add(jout, "txindex", false, "%u", + short_channel_id_txnum(w->key.scid)); + json_out_add(jout, "outnum", false, "%u", + short_channel_id_outnum(w->key.scid)); + json_out_watch_common(jout, w->type, w->start_block, w->owners); + json_out_end(jout, '}'); + } + + for (w = blockdepth_watches_first(bwatch->blockdepth_watches, &bdit); + w; + w = blockdepth_watches_next(bwatch->blockdepth_watches, &bdit)) { + json_out_start(jout, NULL, '{'); + json_out_add(jout, "blockdepth", false, "%u", w->start_block); + json_out_watch_common(jout, w->type, w->start_block, w->owners); + json_out_end(jout, '}'); + } + + json_out_end(jout, ']'); + json_out_end(jout, '}'); + return command_success(cmd, jout); +} diff --git a/plugins/bwatch/bwatch_interface.h b/plugins/bwatch/bwatch_interface.h new file mode 100644 index 000000000000..1e9543588bd1 --- /dev/null +++ b/plugins/bwatch/bwatch_interface.h @@ -0,0 +1,83 @@ +#ifndef LIGHTNING_PLUGINS_BWATCH_BWATCH_INTERFACE_H +#define LIGHTNING_PLUGINS_BWATCH_BWATCH_INTERFACE_H + +#include "config.h" +#include + +/* Outward-facing interface from bwatch to lightningd. */ + +/* Send watch_found notification to lightningd */ +void bwatch_send_watch_found(struct command *cmd, + const struct bitcoin_tx *tx, + u32 blockheight, + const struct watch *w, + u32 txindex, + u32 index); + +/* Send blockdepth depth notification to lightningd (no tx, just depth + height) */ +void bwatch_send_blockdepth_found(struct command *cmd, + const struct watch *w, + u32 depth, + u32 blockheight); + +void bwatch_send_watch_revert(struct command *cmd, + const char *owner, + u32 blockheight); + +/* Send chain name / IBD status / sync info to watchman on startup. + * Used as a timer callback from init; the ack/err handlers kick the + * normal chain-poll loop afterwards. */ +struct command_result *bwatch_send_chaininfo(struct command *cmd, void *unused); + +/* RPC handlers: add / remove a scriptpubkey watch. */ +struct command_result *json_bwatch_add_scriptpubkey(struct command *cmd, + const char *buffer, + const jsmntok_t *params); +struct command_result *json_bwatch_del_scriptpubkey(struct command *cmd, + const char *buffer, + const jsmntok_t *params); + +/* RPC handlers: add / remove an outpoint watch. */ +struct command_result *json_bwatch_add_outpoint(struct command *cmd, + const char *buffer, + const jsmntok_t *params); +struct command_result *json_bwatch_del_outpoint(struct command *cmd, + const char *buffer, + const jsmntok_t *params); + +/* RPC handlers: add / remove a scid watch. */ +struct command_result *json_bwatch_add_scid(struct command *cmd, + const char *buffer, + const jsmntok_t *params); +struct command_result *json_bwatch_del_scid(struct command *cmd, + const char *buffer, + const jsmntok_t *params); + +/* RPC handlers: add / remove a blockdepth watch. */ +struct command_result *json_bwatch_add_blockdepth(struct command *cmd, + const char *buffer, + const jsmntok_t *params); +struct command_result *json_bwatch_del_blockdepth(struct command *cmd, + const char *buffer, + const jsmntok_t *params); + +/* RPC handler: dump every active watch. */ +struct command_result *json_bwatch_list(struct command *cmd, + const char *buffer, + const jsmntok_t *params); + +/* Send a block_processed RPC to watchman after a new block has been + * persisted. The next poll is started from the ack callback so we don't + * race ahead of watchman's view of the chain. Chains on the same poll + * command so timer_complete fires once watchman has acknowledged. */ +struct command_result *bwatch_send_block_processed(struct command *cmd); + +/* Notify watchman that the tip has been rolled back during a reorg, so + * watchman can update and persist its own height. Fire-and-forget via + * an aux_command — the poll timer doesn't depend on this ack. Crash + * safety: if we crash before the ack lands, watchman's stale height will + * be higher than bwatch's on restart, which retriggers the rollback. */ +void bwatch_send_revert_block_processed(struct command *cmd, u32 new_height, + const struct bitcoin_blkid *new_hash); + +#endif /* LIGHTNING_PLUGINS_BWATCH_BWATCH_INTERFACE_H */ diff --git a/plugins/bwatch/bwatch_scanner.c b/plugins/bwatch/bwatch_scanner.c new file mode 100644 index 000000000000..46619dae09f3 --- /dev/null +++ b/plugins/bwatch/bwatch_scanner.c @@ -0,0 +1,266 @@ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include + +/* + * ============================================================================ + * TRANSACTION WATCH CHECKING + * ============================================================================ + */ + +/* Check all scriptpubkey watches via hash lookup */ +static void check_scriptpubkey_watches(struct command *cmd, + struct bwatch *bwatch, + const struct bitcoin_tx *tx, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + u32 txindex) +{ + struct bitcoin_txid txid; + + bitcoin_txid(tx, &txid); + + for (size_t i = 0; i < tx->wtx->num_outputs; i++) { + struct watch *w; + struct scriptpubkey k = { + .script = tx->wtx->outputs[i].script, + .len = tx->wtx->outputs[i].script_len + }; + + w = scriptpubkey_watches_get(bwatch->scriptpubkey_watches, &k); + if (!w) + continue; + if (w->start_block != UINT32_MAX + && blockheight < w->start_block) { + plugin_log(cmd->plugin, LOG_BROKEN, + "Watch for script %s on height >= %u found on block %u???", + tal_hexstr(tmpctx, k.script, k.len), + w->start_block, blockheight); + continue; + } + bwatch_send_watch_found(cmd, tx, blockheight, w, txindex, i); + } +} + +/* Check all outpoint watches via hash lookup */ +static void check_outpoint_watches(struct command *cmd, + struct bwatch *bwatch, + const struct bitcoin_tx *tx, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + u32 txindex) +{ + for (size_t i = 0; i < tx->wtx->num_inputs; i++) { + struct watch *w; + struct bitcoin_outpoint outpoint; + + bitcoin_tx_input_get_txid(tx, i, &outpoint.txid); + outpoint.n = tx->wtx->inputs[i].index; + + w = outpoint_watches_get(bwatch->outpoint_watches, &outpoint); + if (!w) + continue; + if (w->start_block != UINT32_MAX + && blockheight < w->start_block) { + plugin_log(cmd->plugin, LOG_BROKEN, + "Watch for outpoint %s on height >= %u found on block %u???", + fmt_bitcoin_outpoint(tmpctx, &outpoint), + w->start_block, blockheight); + continue; + } + bwatch_send_watch_found(cmd, tx, blockheight, w, txindex, i); + } +} + +/* Check a tx against all watches (during normal block processing). + * UTXO spend tracking is handled by lightningd via outpoint watches + * (wallet/utxo/ fires wallet_utxo_spent_watch_found). */ +static void check_tx_against_all_watches(struct command *cmd, + struct bwatch *bwatch, + const struct bitcoin_tx *tx, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + u32 txindex) +{ + check_scriptpubkey_watches(cmd, bwatch, tx, blockheight, blockhash, txindex); + check_outpoint_watches(cmd, bwatch, tx, blockheight, blockhash, txindex); +} + +/* Check tx outputs against a single scriptpubkey watch (rescan path). */ +static void check_tx_scriptpubkey(struct command *cmd, + const struct bitcoin_tx *tx, + const struct watch *w, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + u32 txindex) +{ + for (size_t i = 0; i < tx->wtx->num_outputs; i++) { + if (memeq(tx->wtx->outputs[i].script, + tx->wtx->outputs[i].script_len, + w->key.scriptpubkey.script, + w->key.scriptpubkey.len)) { + bwatch_send_watch_found(cmd, tx, blockheight, w, + txindex, i); + /* Same scriptpubkey may appear in multiple outputs. */ + } + } +} + +/* Check tx inputs against a single outpoint watch (rescan path). */ +static void check_tx_outpoint(struct command *cmd, + const struct bitcoin_tx *tx, + const struct watch *w, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + u32 txindex) +{ + for (size_t i = 0; i < tx->wtx->num_inputs; i++) { + struct bitcoin_outpoint outpoint; + + bitcoin_tx_input_get_txid(tx, i, &outpoint.txid); + outpoint.n = tx->wtx->inputs[i].index; + + if (bitcoin_outpoint_eq(&outpoint, &w->key.outpoint)) { + bwatch_send_watch_found(cmd, tx, blockheight, w, + txindex, i); + return; /* an outpoint can only be spent once */ + } + } +} + +/* Dispatch a single watch against one tx (rescan path). */ +static void check_tx_for_single_watch(struct command *cmd, + const struct watch *w, + const struct bitcoin_tx *tx, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + u32 txindex) +{ + switch (w->type) { + case WATCH_SCRIPTPUBKEY: + check_tx_scriptpubkey(cmd, tx, w, blockheight, blockhash, txindex); + break; + case WATCH_OUTPOINT: + check_tx_outpoint(cmd, tx, w, blockheight, blockhash, txindex); + break; + case WATCH_SCID: + /* scid watches don't scan transactions: txindex is encoded in + * the scid key, so bwatch_check_scid_watches handles them + * directly at the block level. */ + break; + case WATCH_BLOCKDEPTH: + /* blockdepth watches fire per block; no per-tx work. */ + break; + } +} + +/* Fire watch_found for a scid watch anchored to this block. */ +static void maybe_fire_scid_watch(struct command *cmd, + const struct bitcoin_block *block, + u32 blockheight, + const struct watch *w) +{ + struct bitcoin_tx *tx; + u32 scid_blockheight, txindex, outnum; + + assert(w->type == WATCH_SCID); + + /* The scid pins the watch to one specific block. */ + scid_blockheight = short_channel_id_blocknum(w->key.scid); + if (scid_blockheight != blockheight) + return; + + txindex = short_channel_id_txnum(w->key.scid); + outnum = short_channel_id_outnum(w->key.scid); + + /* Out-of-range (txindex or outnum) means the scid doesn't match + * anything on this chain; fire watch_found with tx=NULL so + * lightningd cleans the watch up. */ + if (txindex >= tal_count(block->tx)) { + plugin_log(cmd->plugin, LOG_BROKEN, + "scid watch blockheight=%u txindex=%u outnum=%u: txindex out of range (block has %zu txs)", + blockheight, txindex, outnum, tal_count(block->tx)); + bwatch_send_watch_found(cmd, NULL, blockheight, w, txindex, outnum); + return; + } + tx = block->tx[txindex]; + if (outnum >= tx->wtx->num_outputs) { + plugin_log(cmd->plugin, LOG_BROKEN, + "scid watch blockheight=%u txindex=%u outnum=%u: outnum out of range (tx has %zu outputs)", + blockheight, txindex, outnum, tx->wtx->num_outputs); + bwatch_send_watch_found(cmd, NULL, blockheight, w, txindex, outnum); + return; + } + + /* Found it: tell lightningd the scid output is confirmed. */ + bwatch_send_watch_found(cmd, tx, blockheight, w, txindex, outnum); +} + +void bwatch_check_scid_watches(struct command *cmd, + struct bwatch *bwatch, + const struct bitcoin_block *block, + u32 blockheight, + const struct watch *w) +{ + if (w) { + if (w->type == WATCH_SCID) + maybe_fire_scid_watch(cmd, block, blockheight, w); + return; + } + + struct scid_watches_iter it; + struct watch *scid_w; + + for (scid_w = scid_watches_first(bwatch->scid_watches, &it); + scid_w; + scid_w = scid_watches_next(bwatch->scid_watches, &it)) { + maybe_fire_scid_watch(cmd, block, blockheight, scid_w); + } +} + +void bwatch_process_block_txs(struct command *cmd, + struct bwatch *bwatch, + const struct bitcoin_block *block, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + const struct watch *w) +{ + for (size_t i = 0; i < tal_count(block->tx); i++) { + if (w) + check_tx_for_single_watch(cmd, w, block->tx[i], + blockheight, blockhash, i); + else + check_tx_against_all_watches(cmd, bwatch, block->tx[i], + blockheight, blockhash, i); + } + + bwatch_check_scid_watches(cmd, bwatch, block, blockheight, w); +} + +/* Fire depth notifications for every active blockdepth watch. + * A watch with start_block > new_height is stale: its confirming block + * was reorged away, watch_revert has been sent, but the del hasn't + * arrived yet — skip it until deletion clears it from the table. */ +void bwatch_check_blockdepth_watches(struct command *cmd, + struct bwatch *bwatch, + u32 new_height) +{ + struct blockdepth_watches_iter it; + struct watch *w; + + /* We only have one per channel or so in practice, so don't optimize */ + for (w = blockdepth_watches_first(bwatch->blockdepth_watches, &it); + w; + w = blockdepth_watches_next(bwatch->blockdepth_watches, &it)) { + if (w->start_block > new_height) + continue; /* stale — awaiting deletion */ + + u32 depth = new_height - w->start_block + 1; + bwatch_send_blockdepth_found(cmd, w, depth, new_height); + } +} diff --git a/plugins/bwatch/bwatch_scanner.h b/plugins/bwatch/bwatch_scanner.h new file mode 100644 index 000000000000..a8a81f6f9543 --- /dev/null +++ b/plugins/bwatch/bwatch_scanner.h @@ -0,0 +1,33 @@ +#ifndef LIGHTNING_PLUGINS_BWATCH_BWATCH_SCANNER_H +#define LIGHTNING_PLUGINS_BWATCH_BWATCH_SCANNER_H + +#include "config.h" +#include + +/* Scan a block against scriptpubkey and outpoint watches, firing + * watch_found for each match. If `w` is NULL all active watches are + * checked (normal polling); if non-NULL only that watch is checked + * (single-watch rescan). */ +void bwatch_process_block_txs(struct command *cmd, + struct bwatch *bwatch, + const struct bitcoin_block *block, + u32 blockheight, + const struct bitcoin_blkid *blockhash, + const struct watch *w); + +/* Fire watch_found for scid watches anchored to this block. + * w==NULL walks every scid watch (normal polling); w non-NULL + * fires only that watch (single-watch rescan). */ +void bwatch_check_scid_watches(struct command *cmd, + struct bwatch *bwatch, + const struct bitcoin_block *block, + u32 blockheight, + const struct watch *w); + +/* Fire depth notifications for every active blockdepth watch at + * new_height. Called once per new block on the happy path. */ +void bwatch_check_blockdepth_watches(struct command *cmd, + struct bwatch *bwatch, + u32 new_height); + +#endif /* LIGHTNING_PLUGINS_BWATCH_BWATCH_SCANNER_H */ diff --git a/plugins/bwatch/bwatch_store.c b/plugins/bwatch/bwatch_store.c new file mode 100644 index 000000000000..27d7687f6f30 --- /dev/null +++ b/plugins/bwatch/bwatch_store.c @@ -0,0 +1,602 @@ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +const struct scriptpubkey *scriptpubkey_watch_keyof(const struct watch *w) +{ + assert(w->type == WATCH_SCRIPTPUBKEY); + return &w->key.scriptpubkey; +} + +size_t scriptpubkey_hash(const struct scriptpubkey *scriptpubkey) +{ + return siphash24(siphash_seed(), scriptpubkey->script, scriptpubkey->len); +} + +bool scriptpubkey_watch_eq(const struct watch *w, const struct scriptpubkey *scriptpubkey) +{ + return w->key.scriptpubkey.len == scriptpubkey->len && + memeq(w->key.scriptpubkey.script, scriptpubkey->len, + scriptpubkey->script, scriptpubkey->len); +} + +const struct bitcoin_outpoint *outpoint_watch_keyof(const struct watch *w) +{ + assert(w->type == WATCH_OUTPOINT); + return &w->key.outpoint; +} + +size_t outpoint_hash(const struct bitcoin_outpoint *outpoint) +{ + size_t h1 = siphash24(siphash_seed(), &outpoint->txid, sizeof(outpoint->txid)); + size_t h2 = siphash24(siphash_seed(), &outpoint->n, sizeof(outpoint->n)); + return h1 ^ h2; +} + +bool outpoint_watch_eq(const struct watch *w, const struct bitcoin_outpoint *outpoint) +{ + return bitcoin_outpoint_eq(&w->key.outpoint, outpoint); +} + +const struct short_channel_id *scid_watch_keyof(const struct watch *w) +{ + assert(w->type == WATCH_SCID); + return &w->key.scid; +} + +size_t scid_hash(const struct short_channel_id *scid) +{ + return siphash24(siphash_seed(), scid, sizeof(*scid)); +} + +bool scid_watch_eq(const struct watch *w, const struct short_channel_id *scid) +{ + return short_channel_id_eq(w->key.scid, *scid); +} + +const u32 *blockdepth_watch_keyof(const struct watch *w) +{ + assert(w->type == WATCH_BLOCKDEPTH); + return &w->start_block; +} + +size_t u32_hash(const u32 *height) +{ + return siphash24(siphash_seed(), height, sizeof(*height)); +} + +bool blockdepth_watch_eq(const struct watch *w, const u32 *height) +{ + return w->start_block == *height; +} + +const char *bwatch_get_watch_type_name(enum watch_type type) +{ + switch (type) { + case WATCH_SCRIPTPUBKEY: + return "scriptpubkey"; + case WATCH_OUTPOINT: + return "outpoint"; + case WATCH_SCID: + return "scid"; + case WATCH_BLOCKDEPTH: + return "blockdepth"; + } + abort(); +} + +void bwatch_add_watch_to_hash(struct bwatch *bwatch, struct watch *w) +{ + switch (w->type) { + case WATCH_SCRIPTPUBKEY: + scriptpubkey_watches_add(bwatch->scriptpubkey_watches, w); + return; + case WATCH_OUTPOINT: + outpoint_watches_add(bwatch->outpoint_watches, w); + return; + case WATCH_SCID: + scid_watches_add(bwatch->scid_watches, w); + return; + case WATCH_BLOCKDEPTH: + blockdepth_watches_add(bwatch->blockdepth_watches, w); + return; + } + abort(); +} + +struct watch *bwatch_get_watch(struct bwatch *bwatch, + enum watch_type type, + const struct bitcoin_outpoint *outpoint, + const u8 *scriptpubkey, + const struct short_channel_id *scid, + const u32 *confirm_height) +{ + switch (type) { + case WATCH_SCRIPTPUBKEY: { + struct scriptpubkey k = { + .script = scriptpubkey, + .len = tal_bytelen(scriptpubkey), + }; + return scriptpubkey_watches_get(bwatch->scriptpubkey_watches, &k); + } + case WATCH_OUTPOINT: + return outpoint_watches_get(bwatch->outpoint_watches, outpoint); + case WATCH_SCID: + return scid_watches_get(bwatch->scid_watches, scid); + case WATCH_BLOCKDEPTH: + return blockdepth_watches_get(bwatch->blockdepth_watches, confirm_height); + } + abort(); +} + +void bwatch_remove_watch_from_hash(struct bwatch *bwatch, struct watch *w) +{ + switch (w->type) { + case WATCH_SCRIPTPUBKEY: + scriptpubkey_watches_del(bwatch->scriptpubkey_watches, w); + return; + case WATCH_OUTPOINT: + outpoint_watches_del(bwatch->outpoint_watches, w); + return; + case WATCH_SCID: + scid_watches_del(bwatch->scid_watches, w); + return; + case WATCH_BLOCKDEPTH: + blockdepth_watches_del(bwatch->blockdepth_watches, w); + return; + } + abort(); +} + +/* List all datastore entries under a key prefix (up to 2 components). */ +static const jsmntok_t *bwatch_list_datastore(const tal_t *ctx, + struct command *cmd, + const char *key1, const char *key2, + const char **buf_out) +{ + struct json_out *params = json_out_new(tmpctx); + const jsmntok_t *result; + + json_out_start(params, NULL, '{'); + json_out_start(params, "key", '['); + json_out_addstr(params, NULL, key1); + if (key2) + json_out_addstr(params, NULL, key2); + json_out_end(params, ']'); + json_out_end(params, '}'); + + result = jsonrpc_request_sync(ctx, cmd, "listdatastore", params, buf_out); + return json_get_member(*buf_out, result, "datastore"); +} + +/* Datastore write completed (success or expected failure such as duplicate). + * Either way, invoke the caller's continuation to keep the poll chain alive. */ +static struct command_result *block_store_done(struct command *cmd, + const char *method UNNEEDED, + const char *buf UNNEEDED, + const jsmntok_t *result UNNEEDED, + struct command_result *(*done)(struct command *)) +{ + return done(cmd); +} + +struct command_result *bwatch_add_block_to_datastore( + struct command *cmd, + const struct block_record_wire *br, + struct command_result *(*done)(struct command *cmd)) +{ + /* Zero-pad to 10 digits so listdatastore returns blocks in height + * order ("0000000100" < "0000000101"). */ + const char **key = mkdatastorekey(tmpctx, "bwatch", "block_history", + take(tal_fmt(NULL, "%010u", br->height))); + const u8 *data = towire_bwatch_block(tmpctx, br); + + plugin_log(cmd->plugin, LOG_DBG, "Added block %u to datastore", br->height); + + /* Chain `done` as both success and failure continuation so the poll + * cmd is held alive until the write is acknowledged. Write failure + * (e.g. duplicate on restart) is non-fatal — the poll must continue. */ + return jsonrpc_set_datastore_binary(cmd, key, + data, tal_bytelen(data), + "must-create", + block_store_done, block_store_done, + done); +} + +void bwatch_add_block_to_history(struct bwatch *bwatch, u32 height, + const struct bitcoin_blkid *hash, + const struct bitcoin_blkid *prev_hash) +{ + struct block_record_wire br; + + br.height = height; + br.hash = *hash; + br.prev_hash = *prev_hash; + tal_arr_expand(&bwatch->block_history, br); + + plugin_log(bwatch->plugin, LOG_DBG, + "Added block %u to history (now %zu blocks)", + height, tal_count(bwatch->block_history)); +} + +void bwatch_delete_block_from_datastore(struct command *cmd, u32 height) +{ + struct json_out *params = json_out_new(tmpctx); + const char *buf; + + json_out_start(params, NULL, '{'); + json_out_start(params, "key", '['); + json_out_addstr(params, NULL, "bwatch"); + json_out_addstr(params, NULL, "block_history"); + json_out_addstr(params, NULL, tal_fmt(tmpctx, "%010u", height)); + json_out_end(params, ']'); + json_out_end(params, '}'); + + jsonrpc_request_sync(tmpctx, cmd, "deldatastore", params, &buf); + + plugin_log(cmd->plugin, LOG_DBG, "Deleted block %u from datastore", height); +} + +const struct block_record_wire *bwatch_last_block(const struct bwatch *bwatch) +{ + if (tal_count(bwatch->block_history) == 0) + return NULL; + + return &bwatch->block_history[tal_count(bwatch->block_history) - 1]; +} + +void bwatch_load_block_history(struct command *cmd, struct bwatch *bwatch) +{ + const char *buf; + const jsmntok_t *datastore, *t; + size_t i; + const struct block_record_wire *most_recent; + + datastore = bwatch_list_datastore(tmpctx, cmd, "bwatch", "block_history", &buf); + + json_for_each_arr(i, t, datastore) { + const u8 *data = json_tok_bin_from_hex(tmpctx, buf, + json_get_member(buf, t, "hex")); + struct block_record_wire br; + + if (!data) + plugin_err(cmd->plugin, + "Bad block_history hex %.*s", + json_tok_full_len(t), + json_tok_full(buf, t)); + + if (!fromwire_bwatch_block(data, &br)) { + plugin_err(cmd->plugin, + "Bad block_history %.*s", + json_tok_full_len(t), + json_tok_full(buf, t)); + } + tal_arr_expand(&bwatch->block_history, br); + } + + most_recent = bwatch_last_block(bwatch); + if (most_recent) { + bwatch->current_height = most_recent->height; + bwatch->current_blockhash = most_recent->hash; + plugin_log(cmd->plugin, LOG_DBG, + "Restored %zu blocks from datastore, current height=%u", + tal_count(bwatch->block_history), + bwatch->current_height); + } else { + bwatch->current_height = 0; + memset(&bwatch->current_blockhash, 0, + sizeof(bwatch->current_blockhash)); + } +} + +static char *fmt_scriptpubkey(const tal_t *ctx, + const struct scriptpubkey *scriptpubkey) +{ + return tal_hexstr(ctx, scriptpubkey->script, scriptpubkey->len); +} + +/* Build the datastore key path for a watch. All watch types share the + * ["bwatch", , ] layout; only the key payload + * varies. */ +static const char **get_watch_datastore_key(const tal_t *ctx, const struct watch *w) +{ + const char *type_name = bwatch_get_watch_type_name(w->type); + + switch (w->type) { + case WATCH_SCRIPTPUBKEY: { + return mkdatastorekey(ctx, "bwatch", type_name, + take(fmt_scriptpubkey(NULL, &w->key.scriptpubkey))); + } + case WATCH_OUTPOINT: + return mkdatastorekey(ctx, "bwatch", type_name, + take(fmt_bitcoin_outpoint(NULL, &w->key.outpoint))); + case WATCH_SCID: + return mkdatastorekey(ctx, "bwatch", type_name, + take(fmt_short_channel_id(NULL, w->key.scid))); + case WATCH_BLOCKDEPTH: + return mkdatastorekey(ctx, "bwatch", type_name, + take(tal_fmt(NULL, "%u", w->start_block))); + } + abort(); +} + +static struct watch_wire *watch_to_wire(const tal_t *ctx, const struct watch *w) +{ + struct watch_wire *wire = tal(ctx, struct watch_wire); + size_t num_owners; + + wire->type = w->type; + wire->start_block = w->start_block; + + wire->scriptpubkey = NULL; + memset(&wire->outpoint, 0, sizeof(wire->outpoint)); + wire->scid_blockheight = wire->scid_txindex = wire->scid_outnum = 0; + wire->blockdepth = 0; + + switch (w->type) { + case WATCH_SCRIPTPUBKEY: + wire->scriptpubkey = tal_dup_arr(wire, u8, + w->key.scriptpubkey.script, + w->key.scriptpubkey.len, 0); + break; + case WATCH_OUTPOINT: + wire->outpoint = w->key.outpoint; + break; + case WATCH_SCID: + wire->scid_blockheight = short_channel_id_blocknum(w->key.scid); + wire->scid_txindex = short_channel_id_txnum(w->key.scid); + wire->scid_outnum = short_channel_id_outnum(w->key.scid); + break; + case WATCH_BLOCKDEPTH: + wire->blockdepth = w->start_block; + break; + } + + num_owners = tal_count(w->owners); + wire->owners = tal_arr(wire, wirestring *, num_owners); + for (size_t i = 0; i < num_owners; i++) + wire->owners[i] = tal_strdup(wire->owners, w->owners[i]); + + return wire; +} + +static struct watch *watch_from_wire(const tal_t *ctx, const struct watch_wire *wire) +{ + struct watch *w = tal(ctx, struct watch); + size_t num_owners; + + w->type = wire->type; + w->start_block = wire->start_block; + + switch (wire->type) { + case WATCH_SCRIPTPUBKEY: + w->key.scriptpubkey.len = tal_bytelen(wire->scriptpubkey); + w->key.scriptpubkey.script = tal_dup_arr(w, u8, wire->scriptpubkey, + w->key.scriptpubkey.len, 0); + break; + case WATCH_OUTPOINT: + w->key.outpoint = wire->outpoint; + break; + case WATCH_SCID: + if (!mk_short_channel_id(&w->key.scid, + wire->scid_blockheight, + wire->scid_txindex, + wire->scid_outnum)) + return tal_free(w); + break; + case WATCH_BLOCKDEPTH: + w->start_block = wire->blockdepth; + break; + } + + num_owners = tal_count(wire->owners); + w->owners = tal_arr(w, wirestring *, num_owners); + for (size_t i = 0; i < num_owners; i++) + w->owners[i] = tal_strdup(w->owners, wire->owners[i]); + + return w; +} + +static void load_watches_by_type(struct command *cmd, struct bwatch *bwatch, + enum watch_type type) +{ + const char *watch_type_name = bwatch_get_watch_type_name(type); + const char *buf; + const jsmntok_t *datastore, *t; + size_t i, count = 0; + + datastore = bwatch_list_datastore(tmpctx, cmd, "bwatch", watch_type_name, &buf); + + json_for_each_arr(i, t, datastore) { + const u8 *data = json_tok_bin_from_hex(tmpctx, buf, + json_get_member(buf, t, "hex")); + struct watch_wire *wire; + struct watch *w; + + if (!data) + continue; + + if (!fromwire_bwatch_watch(tmpctx, data, &wire)) + continue; + + w = watch_from_wire(bwatch, wire); + if (!w || w->type != type) + continue; + + bwatch_add_watch_to_hash(bwatch, w); + count++; + } + + plugin_log(cmd->plugin, LOG_DBG, "Restored %zu %s from datastore", + count, watch_type_name); +} + +void bwatch_save_watch_to_datastore(struct command *cmd, const struct watch *w) +{ + const u8 *data = towire_bwatch_watch(tmpctx, watch_to_wire(tmpctx, w)); + const char **key = get_watch_datastore_key(tmpctx, w); + struct json_out *params = json_out_new(tmpctx); + const char *buf; + + json_out_start(params, NULL, '{'); + json_out_start(params, "key", '['); + for (size_t i = 0; i < tal_count(key); i++) + json_out_addstr(params, NULL, key[i]); + json_out_end(params, ']'); + json_out_addstr(params, "mode", "create-or-replace"); + json_out_addstr(params, "hex", tal_hex(tmpctx, data)); + json_out_end(params, '}'); + + jsonrpc_request_sync(tmpctx, cmd, "datastore", params, &buf); + + plugin_log(cmd->plugin, LOG_DBG, + "Saved watch to datastore (type=%d, num_owners=%zu)", + w->type, tal_count(w->owners)); +} + +void bwatch_delete_watch_from_datastore(struct command *cmd, const struct watch *w) +{ + const char **key = get_watch_datastore_key(tmpctx, w); + struct json_out *params = json_out_new(tmpctx); + const char *buf; + + json_out_start(params, NULL, '{'); + json_out_start(params, "key", '['); + for (size_t i = 0; i < tal_count(key); i++) + json_out_addstr(params, NULL, key[i]); + json_out_end(params, ']'); + json_out_end(params, '}'); + + jsonrpc_request_sync(tmpctx, cmd, "deldatastore", params, &buf); + + plugin_log(cmd->plugin, LOG_DBG, + "Deleted watch from datastore: ...%s", + key[tal_count(key) - 1]); +} + +void bwatch_load_watches_from_datastore(struct command *cmd, struct bwatch *bwatch) +{ + load_watches_by_type(cmd, bwatch, WATCH_SCRIPTPUBKEY); + load_watches_by_type(cmd, bwatch, WATCH_OUTPOINT); + load_watches_by_type(cmd, bwatch, WATCH_SCID); + load_watches_by_type(cmd, bwatch, WATCH_BLOCKDEPTH); +} + +/* -1 means "not found" */ +static int find_owner(wirestring **owners, const char *owner_id) +{ + for (size_t i = 0; i < tal_count(owners); i++) { + if (streq(owners[i], owner_id)) + return i; + } + return -1; +} + +struct watch *bwatch_add_watch(struct command *cmd, + struct bwatch *bwatch, + enum watch_type type, + const struct bitcoin_outpoint *outpoint, + const u8 *scriptpubkey, + const struct short_channel_id *scid, + const u32 *confirm_height, + u32 start_block, + const char *owner_id TAKES) +{ + struct watch *w = bwatch_get_watch(bwatch, type, outpoint, scriptpubkey, + scid, confirm_height); + + if (w) { + bool lowered = start_block < w->start_block; + bool found_owner = (find_owner(w->owners, owner_id) != -1); + if (lowered) + w->start_block = start_block; + if (!found_owner) + tal_arr_expand(&w->owners, + tal_strdup(w->owners, owner_id)); + bwatch_save_watch_to_datastore(cmd, w); + /* Always rescan even if owner is already registered: stateless + * restarters (e.g. onchaind) re-register on startup and need + * missed spend events replayed. */ + plugin_log(cmd->plugin, LOG_DBG, + found_owner + ? (lowered + ? "Owner %s already watching, lowering start_block to %u" + : "Owner %s already watching, rescanning for missed events at %u") + : "Owner %s added to existing watch, start_block %u", + owner_id, w->start_block); + return w; + } + + w = tal(bwatch, struct watch); + w->type = type; + w->start_block = start_block; + switch (w->type) { + case WATCH_SCRIPTPUBKEY: + w->key.scriptpubkey.len = tal_bytelen(scriptpubkey); + w->key.scriptpubkey.script = tal_dup_talarr(w, u8, scriptpubkey); + break; + case WATCH_OUTPOINT: + w->key.outpoint = *outpoint; + break; + case WATCH_SCID: + w->key.scid = *scid; + break; + case WATCH_BLOCKDEPTH: + /* confirm_height == start_block for blockdepth watches; + * already set from start_block above. */ + break; + } + w->owners = tal_arr(w, wirestring *, 1); + w->owners[0] = tal_strdup(w->owners, owner_id); + bwatch_save_watch_to_datastore(cmd, w); + bwatch_add_watch_to_hash(bwatch, w); + return w; +} + +void bwatch_del_watch(struct command *cmd, + struct bwatch *bwatch, + enum watch_type type, + const struct bitcoin_outpoint *outpoint, + const u8 *scriptpubkey, + const struct short_channel_id *scid, + const u32 *confirm_height, + const char *owner_id) +{ + struct watch *w = bwatch_get_watch(bwatch, type, outpoint, scriptpubkey, + scid, confirm_height); + int owner_off; + + if (!w) { + plugin_log(cmd->plugin, LOG_DBG, + "Attempted to remove non-existent %s watch (already gone)", + bwatch_get_watch_type_name(type)); + return; + } + + owner_off = find_owner(w->owners, owner_id); + if (owner_off < 0) { + plugin_log(cmd->plugin, LOG_BROKEN, + "Attempted to remove watch for owner %s but it wasn't watching", + owner_id); + return; + } + + tal_free(w->owners[owner_off]); + tal_arr_remove(&w->owners, owner_off); + + if (tal_count(w->owners) == 0) { + bwatch_delete_watch_from_datastore(cmd, w); + bwatch_remove_watch_from_hash(bwatch, w); + tal_free(w); + } else { + bwatch_save_watch_to_datastore(cmd, w); + } +} diff --git a/plugins/bwatch/bwatch_store.h b/plugins/bwatch/bwatch_store.h new file mode 100644 index 000000000000..194d2f03ff36 --- /dev/null +++ b/plugins/bwatch/bwatch_store.h @@ -0,0 +1,102 @@ +#ifndef LIGHTNING_PLUGINS_BWATCH_BWATCH_STORE_H +#define LIGHTNING_PLUGINS_BWATCH_BWATCH_STORE_H + +#include "config.h" +#include +#include + +/* + * Per-watch-type key/hash/eq triplets so HTABLE_DEFINE_NODUPS_TYPE can + * generate a typed hash table for each watch type. Lookups then take + * the natural key (raw script bytes, bitcoin_outpoint, short_channel_id, + * or u32 confirm height) instead of dispatching on type at every call. + */ + +const struct scriptpubkey *scriptpubkey_watch_keyof(const struct watch *w); +size_t scriptpubkey_hash(const struct scriptpubkey *scriptpubkey); +bool scriptpubkey_watch_eq(const struct watch *w, const struct scriptpubkey *scriptpubkey); + +const struct bitcoin_outpoint *outpoint_watch_keyof(const struct watch *w); +size_t outpoint_hash(const struct bitcoin_outpoint *outpoint); +bool outpoint_watch_eq(const struct watch *w, const struct bitcoin_outpoint *outpoint); + +const struct short_channel_id *scid_watch_keyof(const struct watch *w); +size_t scid_hash(const struct short_channel_id *scid); +bool scid_watch_eq(const struct watch *w, const struct short_channel_id *scid); + +const u32 *blockdepth_watch_keyof(const struct watch *w); +size_t u32_hash(const u32 *height); +bool blockdepth_watch_eq(const struct watch *w, const u32 *height); + +HTABLE_DEFINE_NODUPS_TYPE(struct watch, scriptpubkey_watch_keyof, + scriptpubkey_hash, scriptpubkey_watch_eq, + scriptpubkey_watches); + +HTABLE_DEFINE_NODUPS_TYPE(struct watch, outpoint_watch_keyof, + outpoint_hash, outpoint_watch_eq, + outpoint_watches); + +HTABLE_DEFINE_NODUPS_TYPE(struct watch, scid_watch_keyof, + scid_hash, scid_watch_eq, + scid_watches); + +HTABLE_DEFINE_NODUPS_TYPE(struct watch, blockdepth_watch_keyof, + u32_hash, blockdepth_watch_eq, + blockdepth_watches); + +/* Human-readable name of a watch type, used as the second datastore key + * component (e.g. ["bwatch", "scriptpubkey", ]). */ +const char *bwatch_get_watch_type_name(enum watch_type type); + +/* Watch hash table operations: dispatch on watch->type. */ +void bwatch_add_watch_to_hash(struct bwatch *bwatch, struct watch *w); +struct watch *bwatch_get_watch(struct bwatch *bwatch, + enum watch_type type, + const struct bitcoin_outpoint *outpoint, + const u8 *scriptpubkey, + const struct short_channel_id *scid, + const u32 *confirm_height); +void bwatch_remove_watch_from_hash(struct bwatch *bwatch, struct watch *w); + +/* Block storage: in-memory history mirrors what's persisted under + * ["bwatch", "block_history", "%010u"]. Writes are async; reads happen + * once at startup. */ +struct command_result *bwatch_add_block_to_datastore( + struct command *cmd, + const struct block_record_wire *br, + struct command_result *(*done)(struct command *cmd)); +void bwatch_add_block_to_history(struct bwatch *bwatch, u32 height, + const struct bitcoin_blkid *hash, + const struct bitcoin_blkid *prev_hash); +void bwatch_delete_block_from_datastore(struct command *cmd, u32 height); +void bwatch_load_block_history(struct command *cmd, struct bwatch *bwatch); + +/* Watch persistence: round-trip via bwatch_wiregen serialisation, + * stored under ["bwatch", , ]. */ +void bwatch_save_watch_to_datastore(struct command *cmd, const struct watch *w); +void bwatch_delete_watch_from_datastore(struct command *cmd, const struct watch *w); +void bwatch_load_watches_from_datastore(struct command *cmd, struct bwatch *bwatch); + +/* High-level add/del that combine hash-table updates and datastore writes, + * and merge owner sets / lower start_block when the same key is registered + * multiple times. */ +struct watch *bwatch_add_watch(struct command *cmd, + struct bwatch *bwatch, + enum watch_type type, + const struct bitcoin_outpoint *outpoint, + const u8 *scriptpubkey, + const struct short_channel_id *scid, + const u32 *confirm_height, + u32 start_block, + const char *owner_id TAKES); + +void bwatch_del_watch(struct command *cmd, + struct bwatch *bwatch, + enum watch_type type, + const struct bitcoin_outpoint *outpoint, + const u8 *scriptpubkey, + const struct short_channel_id *scid, + const u32 *confirm_height, + const char *owner_id); + +#endif /* LIGHTNING_PLUGINS_BWATCH_BWATCH_STORE_H */ diff --git a/plugins/bwatch/bwatch_wire.csv b/plugins/bwatch/bwatch_wire.csv new file mode 100644 index 000000000000..570e0b59da39 --- /dev/null +++ b/plugins/bwatch/bwatch_wire.csv @@ -0,0 +1,36 @@ +#include +#include + +# Block record: complete serializable structure +subtype,block_record_wire +subtypedata,block_record_wire,height,u32, +subtypedata,block_record_wire,hash,bitcoin_blkid, +subtypedata,block_record_wire,prev_hash,bitcoin_blkid, + +# Watch: complete serializable structure +# Type is stored to enable reconstruction of watch key from wire data +subtype,watch_wire +subtypedata,watch_wire,type,u32, +# Scriptpubkey key (for WATCH_SCRIPTPUBKEY) +subtypedata,watch_wire,scriptpubkey_len,u32, +subtypedata,watch_wire,scriptpubkey,u8,scriptpubkey_len +# Outpoint key (for WATCH_OUTPOINT) +subtypedata,watch_wire,outpoint,bitcoin_outpoint, +# SCID key (for WATCH_SCID) +subtypedata,watch_wire,scid_blockheight,u32, +subtypedata,watch_wire,scid_txindex,u32, +subtypedata,watch_wire,scid_outnum,u32, +# Blockdepth key (for WATCH_BLOCKDEPTH): block where the tx confirmed +subtypedata,watch_wire,blockdepth,u32, +# Common fields +subtypedata,watch_wire,start_block,u32, +subtypedata,watch_wire,num_owners,u16, +subtypedata,watch_wire,owners,wirestring,num_owners + +# Messages for datastore persistence - use these to serialize/deserialize +# Each message wraps a single item for storage +msgtype,bwatch_block,1 +msgdata,bwatch_block,block,block_record_wire, + +msgtype,bwatch_watch,2 +msgdata,bwatch_watch,watch,watch_wire, diff --git a/plugins/libplugin.c b/plugins/libplugin.c index b39058d46559..e681eac9d9c9 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -848,9 +848,9 @@ void rpc_scan(struct command *cmd, guide, method, err); } -static void json_add_keypath(struct json_out *jout, - const char *fieldname, - const char **keys) +void json_add_keypath(struct json_out *jout, + const char *fieldname, + const char **keys) { json_out_start(jout, fieldname, '['); for (size_t i = 0; i < tal_count(keys); i++) diff --git a/plugins/libplugin.h b/plugins/libplugin.h index bddba28f7735..68051e965916 100644 --- a/plugins/libplugin.h +++ b/plugins/libplugin.h @@ -696,6 +696,11 @@ struct listpeers_channel **json_to_listpeers_channels(const tal_t *ctx, const char *buffer, const jsmntok_t *tok); +/* Helper to write keys[] array (mainly for datastore ops) */ +void json_add_keypath(struct json_out *jout, + const char *fieldname, + const char **keys); + struct createonion_response { u8 *onion; struct secret *shared_secrets; diff --git a/plugins/xpay/xpay.c b/plugins/xpay/xpay.c index 05d001d80323..8dd38ee47633 100644 --- a/plugins/xpay/xpay.c +++ b/plugins/xpay/xpay.c @@ -1714,22 +1714,6 @@ static struct command_result *populate_private_layer(struct command *cmd, return batch_done(aux_cmd, batch); } -static struct command_result *param_string_array(struct command *cmd, const char *name, - const char *buffer, const jsmntok_t *tok, - const char ***arr) -{ - size_t i; - const jsmntok_t *s; - - if (tok->type != JSMN_ARRAY) - return command_fail_badparam(cmd, name, buffer, tok, - "should be an array"); - *arr = tal_arr(cmd, const char *, tok->size); - json_for_each_arr(i, s, tok) - (*arr)[i] = json_strdup(*arr, buffer, s); - return NULL; -} - static struct command_result * preapproveinvoice_succeed(struct command *cmd, const char *method, diff --git a/tests/test_misc.py b/tests/test_misc.py index e229a3850c43..c1fb3389e298 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -1044,7 +1044,7 @@ def test_cli(node_factory): .format(l1.daemon.lightning_dir), 'help']).decode('utf-8') # Test some known output. - assert 'addgossip message\n\naddpsbtinput' in out + assert 'addgossip message\n\naddoutpointwatch' in out # Check JSON id is as expected l1.daemon.wait_for_log(r'jsonrpc#[0-9]*: "cli:help#[0-9]*"\[IN\]') @@ -1270,7 +1270,7 @@ def test_cli_commando(node_factory): .format(l1.daemon.lightning_dir), 'help']).decode('utf-8') # Test some known output. - assert 'addgossip message\n\naddpsbtinput' in out + assert 'addgossip message\n\naddoutpointwatch' in out # Check JSON id is as expected l1.daemon.wait_for_log(r'jsonrpc#[0-9]*: "cli:help#[0-9]*"\[IN\]') @@ -1377,7 +1377,7 @@ def test_daemon_option(node_factory): '--lightning-dir={}' .format(l1.daemon.lightning_dir), 'help']).decode('utf-8') - assert 'addgossip message\n\naddpsbtinput' in out + assert 'addgossip message\n\naddoutpointwatch' in out subprocess.run(['cli/lightning-cli', '--network={}'.format(TEST_NETWORK), @@ -3932,8 +3932,8 @@ def test_datastore_escapeing(node_factory): def test_datastore(node_factory): - # Suppress xpay and bookkeeper which use the datastore - l1 = node_factory.get_node(options={"disable-plugin": ["cln-xpay", "bookkeeper"]}) + # Suppress plugins that use the datastore (keep list empty for assertions below). + l1 = node_factory.get_node(options={"disable-plugin": ["cln-xpay", "bookkeeper", "bwatch"]}) # Starts empty assert l1.rpc.listdatastore() == {'datastore': []} @@ -4047,8 +4047,8 @@ def test_datastore(node_factory): def test_datastore_keylist(node_factory): - # Suppress xpay and bookkeeper which use the datastore - l1 = node_factory.get_node(options={"disable-plugin": ["cln-xpay", "bookkeeper"]}) + # Suppress plugins that use the datastore (keep list empty for assertions below). + l1 = node_factory.get_node(options={"disable-plugin": ["cln-xpay", "bookkeeper", "bwatch"]}) # Starts empty assert l1.rpc.listdatastore() == {'datastore': []} @@ -4110,8 +4110,8 @@ def test_datastore_keylist(node_factory): def test_datastoreusage(node_factory): - # Suppress xpay and bookkeeper which use the datastore - l1: LightningNode = node_factory.get_node(options={"disable-plugin": ["cln-xpay", "bookkeeper"]}) + # Suppress plugins that use the datastore (same as test_datastore / test_datastore_keylist). + l1: LightningNode = node_factory.get_node(options={"disable-plugin": ["cln-xpay", "bookkeeper", "bwatch"]}) assert l1.rpc.datastoreusage() == {'datastoreusage': {'key': '[]', 'total_bytes': 0}} data = 'somedatatostoreinthedatastore' # len 29 diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 2a40d4757995..fac00a421514 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -32,6 +32,22 @@ import time import unittest +# Fast bwatch polling for tests (plugin default is 30s). Pass explicitly per node. +BWATCH_OPTS = {'bwatch-poll-interval': 500} + + +def wait_bwatch_caught_up(node, timeout=TIMEOUT): + """Wait until bwatch has caught up to the chain tip and is idle. + + After restart, height replayed from the datastore skips the "First poll" + debug line; both paths eventually emit "No block change" once idle. + """ + node.daemon.wait_for_log( + r'First poll: init at block|No block change, current_height remains', + timeout=timeout, + ) + node.daemon.wait_for_log(r'No block change', timeout=timeout) + def test_option_passthrough(node_factory, directory): """ Ensure that registering options works. @@ -5015,3 +5031,877 @@ def test_openchannel_hook_channel_type(node_factory, bitcoind): l2.daemon.wait_for_log(r"plugin-openchannel_hook_accepter.py: accept by design: channel_type {'bits': \[12, 22\], 'names': \['static_remotekey/even', 'anchors/even'\]}") else: l2.daemon.wait_for_log(r"plugin-openchannel_hook_accepter.py: accept by design: channel_type {'bits': \[12\], 'names': \['static_remotekey/even'\]}") + + +def reverse_bitcoin_hash(hash_hex): + """Convert Bitcoin hash between display format and wire format. + + Bitcoin hashes are stored in reverse byte order in the wire protocol + compared to how they're displayed in RPC calls. + """ + return ''.join(reversed([hash_hex[i:i + 2] for i in range(0, len(hash_hex), 2)])) + + +def test_bwatch_add_watch_creates_datastore_entry(node_factory, bitcoind): + """Test that adding a watch creates a datastore entry""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + # Use an outpoint watch (scriptpubkey, outpoint, scid, blockdepth are the + # four watch types; there is no standalone txid type). + test_txid = "0" * 64 + test_outpoint = f"{test_txid}:0" + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=100) + + # Verify it's in the datastore + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + +def test_bwatch_multiple_owners_same_watch(node_factory, bitcoind): + """Test that multiple owners can watch the same thing""" + l1 = node_factory.get_node() + + test_txid = "1" * 64 + test_outpoint = f"{test_txid}:0" + + # Add watch with two different owners for the same outpoint + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=100) + l1.rpc.addoutpointwatch(owner='wallet/p2tr/0', outpoint=test_outpoint, start_block=200) + + # Should still be one datastore entry (one outpoint, two owners) + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert sum(1 for d in ds['datastore'] if d['key'] == ['bwatch', 'outpoint', test_outpoint]) == 1 + + +def test_bwatch_same_owner_adds_twice(node_factory, bitcoind): + """Test that the same owner adding the same watch twice is idempotent""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + test_txid = "6" * 64 + test_outpoint = f"{test_txid}:0" + + # Add watch with start_block 100 + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=100) + + # Add same watch again with different start_block + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=200) + + # Should log that owner already exists + l1.daemon.wait_for_log(r'Owner wallet/p2wpkh/0 already watching') + + # Should still be just one datastore entry + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert sum(1 for d in ds['datastore'] if d['key'] == ['bwatch', 'outpoint', test_outpoint]) == 1 + + # Removing once should delete the watch (only one owner, not two) + l1.rpc.deloutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint) + + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert not any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + +def test_bwatch_remove_one_owner_keeps_watch(node_factory, bitcoind): + """Test that removing one owner doesn't remove the watch if others remain""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + test_txid = "2" * 64 + test_outpoint = f"{test_txid}:0" + + # Add watch with two owners + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=100) + l1.rpc.addoutpointwatch(owner='wallet/p2tr/0', outpoint=test_outpoint, start_block=100) + + # Remove first owner + l1.rpc.deloutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint) + + # Watch should still exist (wallet/p2tr/0 is still watching) + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + +def test_bwatch_remove_last_owner_deletes_watch(node_factory, bitcoind): + """Test that removing the last owner deletes the datastore entry""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + test_txid = "3" * 64 + test_outpoint = f"{test_txid}:0" + + # Add watch with one owner + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=100) + + # Verify it exists + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + # Remove the only owner + l1.rpc.deloutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint) + + # Watch should be gone + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert not any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + +def test_bwatch_scriptpubkey_watch(node_factory, bitcoind): + """Test scriptpubkey watch datastore operations""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # A simple P2PKH scriptpubkey — not used by the wallet (which uses P2WPKH/P2TR/P2SH-P2WPKH) + test_spk = "76a914" + "00" * 20 + "88ac" + expected_key = ['bwatch', 'scriptpubkey', test_spk] + + l1.rpc.addscriptpubkeywatch(owner='wallet/p2wpkh/0', scriptpubkey=test_spk, start_block=100) + + # Verify our specific key is in the datastore (wallet also has scriptpubkey entries) + ds = l1.rpc.listdatastore(['bwatch', 'scriptpubkey']) + assert any(d['key'] == expected_key for d in ds['datastore']) + + # Remove it + l1.rpc.delscriptpubkeywatch(owner='wallet/p2wpkh/0', scriptpubkey=test_spk) + + ds = l1.rpc.listdatastore(['bwatch', 'scriptpubkey']) + assert not any(d['key'] == expected_key for d in ds['datastore']) + + +def test_bwatch_outpoint_watch(node_factory, bitcoind): + """Test outpoint watch datastore operations""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + test_txid = "4" * 64 + test_outpoint = f"{test_txid}:0" + + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=100) + + # Verify it's in the datastore (use any() since wallet may have its own outpoints) + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + # Remove it + l1.rpc.deloutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint) + + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert not any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + +def test_bwatch_rescan_triggered_for_past_start_block(node_factory, bitcoind): + """Test that adding a watch with start_block in the past triggers a rescan""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to fully sync to chain tip + l1.daemon.wait_for_log(r'No block change') + + # Get current height (now bwatch is synced) + info = l1.rpc.getinfo() + current_height = info['blockheight'] + + test_txid = "7" * 64 + test_outpoint = f"{test_txid}:0" + + # Add watch with start_block in the past (before current height) + start_block = current_height - 5 + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=start_block) + + # Should trigger a rescan + l1.daemon.wait_for_log(rf'Starting rescan for outpoint watch: blocks {start_block}-{current_height}') + + # Rescan should complete + l1.daemon.wait_for_log(r'Rescan complete') + + +def test_bwatch_no_rescan_for_future_start_block(node_factory, bitcoind): + """Test that adding a watch with start_block in the future doesn't trigger rescan""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to sync + wait_bwatch_caught_up(l1) + + info = l1.rpc.getinfo() + current_height = info['blockheight'] + + test_txid = "8" * 64 + test_outpoint = f"{test_txid}:0" + + # Add watch with start_block in the future + future_block = current_height + 100 + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=future_block) + + # Should NOT trigger a rescan - give it a moment then check logs + import time + time.sleep(0.5) + + # Check that no rescan was started for this watch + assert not l1.daemon.is_in_log(rf'Starting rescan.*blocks.*{future_block}') + + +def test_bwatch_rescan_scriptpubkey(node_factory, bitcoind): + """Test that scriptpubkey watches also trigger rescan""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to fully sync + l1.daemon.wait_for_log(r'No block change') + + info = l1.rpc.getinfo() + current_height = info['blockheight'] + + test_spk = "76a914" + "11" * 20 + "88ac" + start_block = current_height - 3 + + l1.rpc.addscriptpubkeywatch(owner='wallet/p2wpkh/0', scriptpubkey=test_spk, start_block=start_block) + + l1.daemon.wait_for_log(rf'Starting rescan for scriptpubkey watch: blocks {start_block}-{current_height}') + l1.daemon.wait_for_log(r'Rescan complete') + + +@pytest.mark.slow_test +def test_bwatch_scriptpubkey_watch_notifies_lightningd(node_factory, bitcoind): + """Test that a matching scriptpubkey triggers watch_found to lightningd""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to fully sync + wait_bwatch_caught_up(l1) + + # Get an address and its scriptpubkey + addr = l1.rpc.newaddr('bech32')['bech32'] + addr_info = bitcoind.rpc.getaddressinfo(addr) + scriptpubkey = addr_info['scriptPubKey'] + + # Add a watch for this scriptpubkey with a wallet owner (p2wpkh uses keyindex 0 by default for newaddr) + l1.rpc.addscriptpubkeywatch(owner='wallet/p2wpkh/0', scriptpubkey=scriptpubkey, start_block=100) + + # Send coins to that address (creates tx with matching scriptpubkey) + bitcoind.rpc.sendtoaddress(addr, 0.01) + bitcoind.generate_block(1) + + # Wait for bwatch to process the block and send watch_found notification + l1.daemon.wait_for_log(r'watch_found at block', timeout=60) + + +def test_bwatch_outpoint_watch_notifies_lightningd(node_factory, bitcoind): + """Test that spending a watched outpoint triggers watch_found to lightningd""" + l1, l2 = node_factory.get_nodes(2, opts=[dict(BWATCH_OPTS), dict(BWATCH_OPTS)]) + + # Wait for bwatch to be ready so wallet scriptpubkey watches are active + # before we mine the funding block. + l1.daemon.wait_for_log(r'No block change') + l2.daemon.wait_for_log(r'No block change') + + # Fund l1 manually using listfunds to detect confirmation (bwatch populates + # the outputs table via watch_found when the scriptpubkey watch fires). + addr = l1.rpc.newaddr('p2tr')['p2tr'] + bitcoind.rpc.sendtoaddress(addr, 10.0) + bitcoind.generate_block(1) + wait_for(lambda: len(l1.rpc.listfunds()['outputs']) > 0, timeout=60) + + l1.connect(l2) + l1.fundchannel(l2, 1_000_000) + + # Get the channel's funding outpoint (first channel has dbid 1) + channels = l1.rpc.listpeerchannels(l2.info['id'])['channels'] + ch = only_one([c for c in channels if c['state'] == 'CHANNELD_NORMAL']) + outpoint = f"{ch['funding_txid']}:{ch['funding_outnum']}" + + # channel/funding_spent/ is the real handler — the channel already + # registered it, so addoutpointwatch may report "already watching"; either + # way the watch exists and will fire when the funding is spent. + l1.rpc.addoutpointwatch(owner='channel/funding_spent/1', outpoint=outpoint, start_block=100) + + txid = only_one(l1.rpc.close(l2.info['id'])['txids']) + bitcoind.generate_block(1, wait_for_mempool=txid) + + l1.daemon.wait_for_log(r'watch_found at block', timeout=60) + + +@pytest.mark.slow_test +def test_bwatch_rescan_notifies_lightningd(node_factory, bitcoind): + """Test that matches found during rescan also trigger watch_found""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Get an address + addr = l1.rpc.newaddr('bech32')['bech32'] + addr_info = bitcoind.rpc.getaddressinfo(addr) + scriptpubkey = addr_info['scriptPubKey'] + + # Send coins FIRST (before adding the watch) + bitcoind.rpc.sendtoaddress(addr, 0.01) + bitcoind.generate_block(1) + + # Wait for bwatch to fully sync to the new block + import time + time.sleep(2) + l1.daemon.wait_for_log(r'No block change') + + # Now get current height and add watch with start_block in the past + info = l1.rpc.getinfo() + start_block = info['blockheight'] - 1 # The block we just mined + + # Add watch - should trigger rescan and find the tx + l1.rpc.addscriptpubkeywatch(owner='wallet/p2wpkh/0', scriptpubkey=scriptpubkey, + start_block=start_block) + + # Should trigger rescan + l1.daemon.wait_for_log(r'Starting rescan') + + # Rescan should find the match and notify lightningd + l1.daemon.wait_for_log(r'watch_found at block', timeout=60) + + +def test_bwatch_watches_persist_across_restart(node_factory, bitcoind): + """Test that watches are restored from datastore after restart""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + test_txid = "5" * 64 + test_outpoint = f"{test_txid}:0" + + # Add an outpoint watch (persisted in bwatch datastore) + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint, start_block=500) + + # Restart the node + l1.restart() + + # The watch should still be in the datastore after restart + ds = l1.rpc.listdatastore(['bwatch', 'outpoint']) + assert any(d['key'] == ['bwatch', 'outpoint', test_outpoint] for d in ds['datastore']) + + +def test_bwatch_reorg_1_block(node_factory, bitcoind): + """Test bwatch handles a 1-block reorg correctly""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to initialize and sync to initial tip + wait_bwatch_caught_up(l1) + + # Mine a few blocks to establish history + bitcoind.generate_block(5) + # Wait for bwatch to fully catch up (important: bwatch must have the block + # that will be reorged, otherwise it can't detect the reorg) + l1.daemon.wait_for_log(r'No block change') + + # Get the actual number of blocks bwatch has stored before reorg + ds_before = l1.rpc.listdatastore(['bwatch', 'block_history']) + blocks_before = len(ds_before['datastore']) + + # Get the hash of the last block (the one we'll reorg out) + height = bitcoind.rpc.getblockcount() + old_block_hash = bitcoind.rpc.getblockhash(height) + common_ancestor_hash = bitcoind.rpc.getblockhash(height - 1) + + # Invalidate the last block (1-block reorg) + bitcoind.rpc.invalidateblock(old_block_hash) + + # Mine 2 new blocks on the new chain + bitcoind.generate_block(2) + + # bwatch should detect and handle the reorg + l1.daemon.wait_for_log(r'Reorg detected', timeout=60) + # Persisting rolled-forward blocks is async; don't assert immediately after + # "Reorg detected" (see test_bwatch_reorg_2_blocks). + l1.daemon.wait_for_log(r'No block change', timeout=60) + + # Verify bwatch's block history matches bitcoind's new chain + new_height = bitcoind.rpc.getblockcount() + # After reorg: removed 1 old block, added 2 new blocks, so net +1 + wait_for(lambda: len(l1.rpc.listdatastore(['bwatch', 'block_history'])['datastore']) + == blocks_before + 1, + timeout=60) + + ds = l1.rpc.listdatastore(['bwatch', 'block_history']) + + # Verify the common ancestor is still present with correct hash + ancestor_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{height - 1:010d}"), None) + assert ancestor_entry is not None + # Decode the block record and verify hash matches + assert reverse_bitcoin_hash(common_ancestor_hash) in str(ancestor_entry['hex']) + + # Verify the old block hash is not in datastore + for entry in ds['datastore']: + assert reverse_bitcoin_hash(old_block_hash) not in str(entry['hex']) + + # Verify the new tip matches bitcoind + new_tip_hash = bitcoind.rpc.getblockhash(new_height) + tip_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{new_height:010d}"), None) + assert tip_entry is not None + assert reverse_bitcoin_hash(new_tip_hash) in str(tip_entry['hex']) + + +@pytest.mark.slow_test +def test_bwatch_reorg_2_blocks(node_factory, bitcoind): + """Test bwatch handles a 2-block reorg correctly""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to initialize + wait_bwatch_caught_up(l1) + + # Mine some blocks + bitcoind.generate_block(5) + expected_height = bitcoind.rpc.getblockcount() + l1.daemon.wait_for_log(rf'Added block {expected_height} to history') + + # Get the hash of block to reorg from (2 blocks back) + height = bitcoind.rpc.getblockcount() + old_block1_hash = bitcoind.rpc.getblockhash(height - 1) + old_block2_hash = bitcoind.rpc.getblockhash(height) + common_ancestor_hash = bitcoind.rpc.getblockhash(height - 2) + + # Invalidate to cause 2-block reorg + bitcoind.rpc.invalidateblock(old_block1_hash) + + # Mine longer chain + bitcoind.generate_block(4) + time.sleep(2) + + # bwatch should detect and handle the reorg + l1.daemon.wait_for_log(r'Reorg detected', timeout=60) + # Wait for bwatch to finish processing the new chain + l1.daemon.wait_for_log(r'No block change') + + # Verify bwatch's block history matches bitcoind's new chain + new_height = bitcoind.rpc.getblockcount() + + # Check that bwatch has correct number of blocks (from its start height, not genesis) + initial_height = 101 # regtest starts at this height + expected_blocks = new_height - initial_height + 1 + + # Wait for datastore to be fully updated + wait_for(lambda: len(l1.rpc.listdatastore(['bwatch', 'block_history'])['datastore']) == expected_blocks, timeout=60) + + ds = l1.rpc.listdatastore(['bwatch', 'block_history']) + + # Verify the common ancestor is still present with correct hash + ancestor_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{height - 2:010d}"), None) + assert ancestor_entry is not None + assert reverse_bitcoin_hash(common_ancestor_hash) in str(ancestor_entry['hex']) + + # Verify the old block hashes are not in datastore + for entry in ds['datastore']: + assert reverse_bitcoin_hash(old_block1_hash) not in str(entry['hex']) + assert reverse_bitcoin_hash(old_block2_hash) not in str(entry['hex']) + + # Verify the new tip matches bitcoind + new_tip_hash = bitcoind.rpc.getblockhash(new_height) + tip_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{new_height:010d}"), None) + assert tip_entry is not None + assert reverse_bitcoin_hash(new_tip_hash) in str(tip_entry['hex']) + + +def test_bwatch_reorg_long_chain(node_factory, bitcoind): + """Test bwatch handles a longer reorg (5+ blocks)""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to initialize and sync to initial tip + wait_bwatch_caught_up(l1) + + # Mine 10 blocks and wait for bwatch to fully catch up + bitcoind.generate_block(10) + expected_height = bitcoind.rpc.getblockcount() + l1.daemon.wait_for_log(rf'Added block {expected_height} to history') + + # Reorg 5 blocks - save hashes of blocks that will be reorged out + height = bitcoind.rpc.getblockcount() + common_ancestor_height = height - 5 + common_ancestor_hash = bitcoind.rpc.getblockhash(common_ancestor_height) + old_hashes = [bitcoind.rpc.getblockhash(h) + for h in range(common_ancestor_height + 1, height + 1)] + + reorg_from_hash = bitcoind.rpc.getblockhash( + common_ancestor_height + 1) + bitcoind.rpc.invalidateblock(reorg_from_hash) + + # Mine longer replacement chain + bitcoind.generate_block(8) + time.sleep(3) + + # Should handle the deep reorg + l1.daemon.wait_for_log(r'Reorg detected') + + # Verify bwatch's block history matches bitcoind's new chain + new_height = bitcoind.rpc.getblockcount() + + # Wait for bwatch to fully sync to the new chain height + # bwatch stores blocks from initial height (101), not genesis + initial_height = 101 # regtest starts at this height + expected_blocks = new_height - initial_height + 1 + wait_for(lambda: len(l1.rpc.listdatastore(['bwatch', 'block_history'])['datastore']) == expected_blocks) + + ds = l1.rpc.listdatastore(['bwatch', 'block_history']) + + # Check that bwatch has correct number of blocks + assert len(ds['datastore']) == expected_blocks + + # Verify the common ancestor is still present with correct hash + ancestor_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{common_ancestor_height:010d}"), + None) + assert ancestor_entry is not None + assert reverse_bitcoin_hash(common_ancestor_hash) in str(ancestor_entry['hex']) + + # Verify none of the old block hashes are in datastore + for entry in ds['datastore']: + for old_hash in old_hashes: + assert reverse_bitcoin_hash(old_hash) not in str(entry['hex']) + + # Verify the new tip matches bitcoind + new_tip_hash = bitcoind.rpc.getblockhash(new_height) + tip_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{new_height:010d}"), None) + assert tip_entry is not None + assert reverse_bitcoin_hash(new_tip_hash) in str(tip_entry['hex']) + + +@pytest.mark.slow_test +def test_bwatch_reorg_at_startup(node_factory, bitcoind): + """Test bwatch handles reorg that happened while node was down""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to initialize and fully sync + wait_bwatch_caught_up(l1) + + bitcoind.generate_block(5) + expected_height = bitcoind.rpc.getblockcount() + # Wait for bwatch to catch up to these new blocks + l1.daemon.wait_for_log(rf'Added block {expected_height} to history', timeout=60) + + # Get block hash before stopping + height = bitcoind.rpc.getblockcount() + reorg_from_hash = bitcoind.rpc.getblockhash(height - 2) + + # Stop the node + l1.stop() + + # Cause a reorg while node is down + bitcoind.rpc.invalidateblock(reorg_from_hash) + bitcoind.generate_block(5) + + # Restart the node + l1.start() + + # bwatch should detect the chain changed and handle reorg during catch-up + # (reorg detection happens during catch-up, before "initialized" message) + l1.daemon.wait_for_log(r'Reorg detected', timeout=60) + + # Wait for it to finish syncing + l1.daemon.wait_for_log(r'No block change') + + # Verify the node is tracking the correct chain + info = l1.rpc.getinfo() + assert info['blockheight'] == bitcoind.rpc.getblockcount() + + +@pytest.mark.slow_test +def test_bwatch_block_history_rollback(node_factory, bitcoind): + """Test that block history is correctly rolled back during reorg""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to fully sync + wait_bwatch_caught_up(l1) + + # Mine blocks + bitcoind.generate_block(5) + expected_height = bitcoind.rpc.getblockcount() + + # Wait for bwatch to process the new blocks + l1.daemon.wait_for_log(rf'Added block {expected_height} to history', timeout=60) + + # Check block history exists in datastore + initial_height = 101 # regtest starts at this height + expected_initial_blocks = expected_height - initial_height + 1 + wait_for(lambda: len(l1.rpc.listdatastore(['bwatch', 'block_history'])['datastore']) == expected_initial_blocks, timeout=60) + + ds = l1.rpc.listdatastore(['bwatch', 'block_history']) + initial_count = len(ds['datastore']) + # bwatch stores blocks from its start height (101), so we should have at least 5 new blocks + assert initial_count >= 5 + + # Cause a 3-block reorg - save hashes of blocks that will be reorged + height = bitcoind.rpc.getblockcount() + common_ancestor_height = height - 3 + common_ancestor_hash = bitcoind.rpc.getblockhash(common_ancestor_height) + old_hashes = [bitcoind.rpc.getblockhash(h) + for h in range(common_ancestor_height + 1, height + 1)] + + reorg_from_hash = bitcoind.rpc.getblockhash( + common_ancestor_height + 1) + bitcoind.rpc.invalidateblock(reorg_from_hash) + bitcoind.generate_block(5) + + # Verify reorg was handled + l1.daemon.wait_for_log(r'Reorg detected', timeout=60) + # Wait for bwatch to finish syncing to the new chain + l1.daemon.wait_for_log(r'No block change') + + # Verify block history after reorg + ds = l1.rpc.listdatastore(['bwatch', 'block_history']) + new_height = bitcoind.rpc.getblockcount() + + # Should have correct number of blocks (from its start height, not genesis) + initial_height = 101 # regtest starts at this height + expected_blocks = new_height - initial_height + 1 + assert len(ds['datastore']) == expected_blocks + + # Verify the common ancestor is still present with correct hash + ancestor_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{common_ancestor_height:010d}"), + None) + assert ancestor_entry is not None + assert reverse_bitcoin_hash(common_ancestor_hash) in str(ancestor_entry['hex']) + + # Verify old block hashes are not present + for entry in ds['datastore']: + for old_hash in old_hashes: + assert reverse_bitcoin_hash(old_hash) not in str(entry['hex']) + + # Verify all blocks from common ancestor to tip match bitcoind + for h in range(common_ancestor_height, new_height + 1): + expected_hash = bitcoind.rpc.getblockhash(h) + block_entry = next((e for e in ds['datastore'] + if e['key'][-1] == f"{h:010d}"), None) + assert block_entry is not None + assert reverse_bitcoin_hash(expected_hash) in str(block_entry['hex']) + + +@pytest.mark.slow_test +def test_bwatch_listwatch(node_factory, bitcoind): + """Test that listwatch RPC returns all active watches""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Record the baseline — the wallet registers scriptpubkey watches on startup. + initial_count = len(l1.rpc.listwatch()['watches']) + + # Add an outpoint watch — clearly not a real UTXO. + test_outpoint_a_txid = "a" * 64 + test_outpoint_a = f"{test_outpoint_a_txid}:0" + l1.rpc.addoutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint_a, start_block=100) + + # Add a P2PKH scriptpubkey watch — not used by the wallet (P2WPKH/P2TR/P2SH-P2WPKH only). + test_scriptpubkey = "76a914" + "b" * 40 + "88ac" + l1.rpc.addscriptpubkeywatch(owner='wallet/p2tr/0', scriptpubkey=test_scriptpubkey, start_block=200) + + # Add a second outpoint watch + test_outpoint_c_txid = "c" * 64 + test_outpoint_c = f"{test_outpoint_c_txid}:1" + l1.rpc.addoutpointwatch(owner='wallet/p2sh_p2wpkh/0', outpoint=test_outpoint_c, start_block=150) + + # Add a second owner to the first outpoint watch + l1.rpc.addoutpointwatch(owner='wallet/p2tr/0', outpoint=test_outpoint_a, start_block=50) + + result = l1.rpc.listwatch() + watches = result['watches'] + + # 3 new unique watches added on top of the wallet's initial set + assert len(watches) == initial_count + 3 + + # Find each test watch by its unique identifier + outpoint_a_watch = next((w for w in watches if w.get('outpoint') == test_outpoint_a), None) + scriptpubkey_watch = next((w for w in watches if w.get('scriptpubkey') == test_scriptpubkey), None) + outpoint_c_watch = next((w for w in watches if w.get('outpoint') == test_outpoint_c), None) + + # Verify first outpoint watch (two owners, start_block is the minimum) + assert outpoint_a_watch is not None + assert outpoint_a_watch['start_block'] == 50 # minimum of 100 and 50 + assert len(outpoint_a_watch['owners']) == 2 + assert 'wallet/p2wpkh/0' in outpoint_a_watch['owners'] + assert 'wallet/p2tr/0' in outpoint_a_watch['owners'] + + # Verify scriptpubkey watch + assert scriptpubkey_watch is not None + assert scriptpubkey_watch['start_block'] == 200 + assert len(scriptpubkey_watch['owners']) == 1 + assert scriptpubkey_watch['owners'][0] == 'wallet/p2tr/0' + + # Verify second outpoint watch + assert outpoint_c_watch is not None + assert outpoint_c_watch['start_block'] == 150 + assert len(outpoint_c_watch['owners']) == 1 + assert outpoint_c_watch['owners'][0] == 'wallet/p2sh_p2wpkh/0' + + # Remove one owner from first outpoint watch — watch itself should remain + l1.rpc.deloutpointwatch(owner='wallet/p2wpkh/0', outpoint=test_outpoint_a) + + watches = l1.rpc.listwatch()['watches'] + assert len(watches) == initial_count + 3 + outpoint_a_watch = next(w for w in watches if w.get('outpoint') == test_outpoint_a) + assert len(outpoint_a_watch['owners']) == 1 + assert outpoint_a_watch['owners'][0] == 'wallet/p2tr/0' + + # Remove the last owner — outpoint watch should disappear entirely + l1.rpc.deloutpointwatch(owner='wallet/p2tr/0', outpoint=test_outpoint_a) + + watches = l1.rpc.listwatch()['watches'] + assert len(watches) == initial_count + 2 + assert not any(w.get('outpoint') == test_outpoint_a for w in watches) + + +# ============================================================================= +# Blockdepth watch tests +# ============================================================================= + +def test_bwatch_blockdepth_watch_creates_datastore_entry(node_factory, bitcoind): + """Test that adding a blockdepth watch creates a datastore entry""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + # Wait for bwatch to sync + l1.daemon.wait_for_log(r'No block change') + + info = l1.rpc.getinfo() + start_block = info['blockheight'] + + owner = "channel/funding_depth/999" + l1.rpc.addblockdepthwatch(owner=owner, start_block=start_block) + + # Blockdepth watches are stored under ['bwatch', 'blockdepth', ] + ds = l1.rpc.listdatastore(['bwatch', 'blockdepth']) + blockdepth_keys = [d.get('key') for d in ds['datastore'] if d.get('key', [])[:2] == ['bwatch', 'blockdepth']] + assert any( + k[-1] == str(start_block) for k in blockdepth_keys + ), f"Expected blockdepth watch for start_block {start_block}, got keys: {blockdepth_keys}" + + # Also verify via listwatch + watches = l1.rpc.listwatch()['watches'] + bdw = [w for w in watches if w.get('type') == 'blockdepth' and owner in w.get('owners', [])] + assert len(bdw) == 1, f"Expected one blockdepth watch with owner {owner}, got: {watches}" + assert bdw[0]['start_block'] == start_block + + +def test_bwatch_blockdepth_watch_remove(node_factory, bitcoind): + """Test that deleting a blockdepth watch removes it from the datastore""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + l1.daemon.wait_for_log(r'No block change') + + info = l1.rpc.getinfo() + start_block = info['blockheight'] + + owner = f"channel/funding_depth/888" + l1.rpc.addblockdepthwatch(owner=owner, start_block=start_block) + + # Verify it's present + watches = l1.rpc.listwatch()['watches'] + assert any(owner in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + + # Delete it + l1.rpc.delblockdepthwatch(owner=owner, start_block=start_block) + + watches = l1.rpc.listwatch()['watches'] + assert not any(owner in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + + +def test_bwatch_blockdepth_watch_multiple_owners(node_factory, bitcoind): + """Test that multiple owners can share a blockdepth watch at the same start_block""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + l1.daemon.wait_for_log(r'No block change') + + info = l1.rpc.getinfo() + start_block = info['blockheight'] + + owner_a = "channel/funding_depth/777" + owner_b = "channel/funding_depth/778" + + l1.rpc.addblockdepthwatch(owner=owner_a, start_block=start_block) + l1.rpc.addblockdepthwatch(owner=owner_b, start_block=start_block) + + watches = l1.rpc.listwatch()['watches'] + bdw = [w for w in watches if w.get('type') == 'blockdepth' + and (owner_a in w.get('owners', []) or owner_b in w.get('owners', []))] + # Both owners may share one watch entry or appear in separate entries depending + # on whether bwatch merges same-start_block watches; either way both owners present. + all_owners = [o for w in bdw for o in w.get('owners', [])] + assert owner_a in all_owners + assert owner_b in all_owners + + # Removing one owner keeps the watch (the other remains) + l1.rpc.delblockdepthwatch(owner=owner_a, start_block=start_block) + watches = l1.rpc.listwatch()['watches'] + assert not any(owner_a in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + assert any(owner_b in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + + # Removing the last owner deletes the entry entirely + l1.rpc.delblockdepthwatch(owner=owner_b, start_block=start_block) + watches = l1.rpc.listwatch()['watches'] + assert not any(owner_b in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + + +@pytest.mark.slow_test +def test_bwatch_blockdepth_watch_fires_each_block(node_factory, bitcoind): + """Test that a blockdepth watch fires watch_found for every block >= start_block""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + l1.daemon.wait_for_log(r'No block change') + + info = l1.rpc.getinfo() + start_block = info['blockheight'] + 1 # watch starts at the next block + + # Use a channel/funding_depth/ owner so watchman dispatches it + owner = "channel/funding_depth/42" + l1.rpc.addblockdepthwatch(owner=owner, start_block=start_block) + + # Mine the block that triggers the watch + bitcoind.generate_block(1) + + # watch_found (blockdepth) should be logged — depth==1 at start_block + l1.daemon.wait_for_log(r'watch_found at block.*blockdepth', timeout=60) + + # Mine another block — the watch fires again (it fires every block until deleted) + bitcoind.generate_block(1) + l1.daemon.wait_for_log(r'watch_found at block.*blockdepth', timeout=60) + + # Clean up + l1.rpc.delblockdepthwatch(owner=owner, start_block=start_block) + + +@pytest.mark.slow_test +def test_bwatch_blockdepth_watch_persists_across_restart(node_factory, bitcoind): + """Test that a blockdepth watch survives a node restart""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + l1.daemon.wait_for_log(r'No block change') + + info = l1.rpc.getinfo() + start_block = info['blockheight'] + + owner = "channel/funding_depth/55" + l1.rpc.addblockdepthwatch(owner=owner, start_block=start_block) + + # Verify present before restart + watches = l1.rpc.listwatch()['watches'] + assert any(owner in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + + l1.restart() + l1.daemon.wait_for_log(r'No block change') + + # Watch should be reloaded from bwatch's datastore after restart + watches = l1.rpc.listwatch()['watches'] + assert any(owner in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + + # Clean up + l1.rpc.delblockdepthwatch(owner=owner, start_block=start_block) + + +def test_bwatch_blockdepth_watch_no_fire_before_start_block(node_factory, bitcoind): + """Test that a blockdepth watch with a future start_block doesn't fire early""" + l1 = node_factory.get_node(options=BWATCH_OPTS) + + l1.daemon.wait_for_log(r'No block change') + + info = l1.rpc.getinfo() + # Set start_block well in the future so it cannot fire during this test + future_start = info['blockheight'] + 1000 + + owner = "channel/funding_depth/11" + l1.rpc.addblockdepthwatch(owner=owner, start_block=future_start) + + # Mine a block — watch must NOT fire (start_block is 1000 blocks away) + bitcoind.generate_block(1) + import time + time.sleep(1) + + # No blockdepth watch_found should have been logged for our start_block + assert not l1.daemon.is_in_log( + rf'watch_found at block.*blockdepth.*{future_start}' + ) + + # Verify watch is still present + watches = l1.rpc.listwatch()['watches'] + assert any(owner in w.get('owners', []) for w in watches if w.get('type') == 'blockdepth') + + # Clean up + l1.rpc.delblockdepthwatch(owner=owner, start_block=future_start)