diff --git a/.github/workflows/check_unicode.yml b/.github/workflows/check_unicode.yml new file mode 100644 index 00000000000..a01add3f814 --- /dev/null +++ b/.github/workflows/check_unicode.yml @@ -0,0 +1,26 @@ +name: Unicode listing up to date +on: + workflow_dispatch: + schedule: + - cron: '42 3 * * *' + +jobs: + check-unicode: + runs-on: ubuntu-latest + permissions: + issues: write + steps: + - name: Checkout source code + uses: actions/checkout@v4 + - name: Check unicode file state + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + curl --proto '=https' --tlsv1.2 -fsSL -o /tmp/UnicodeData.txt https://www.unicode.org/Public/UCD/latest/ucd/UnicodeData.txt + contrib/gen_unicode_general_category.py /tmp/UnicodeData.txt -o /tmp/unicode.rs + if ! diff -u lightning-types/src/unicode.rs /tmp/unicode.rs; then + TITLE="Unicode listing out of date: ${{ github.workflow }}" + RUN_URL="https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}" + BODY="The unicode character listing is out of date, see $RUN_URL" + gh issue create --title "$TITLE" --body "$BODY" + fi diff --git a/contrib/gen_unicode_general_category.py b/contrib/gen_unicode_general_category.py new file mode 100755 index 00000000000..4871e967b55 --- /dev/null +++ b/contrib/gen_unicode_general_category.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3 +# This file is Copyright its original authors, visible in version control +# history. +# +# This file is licensed under the Apache License, Version 2.0 or the MIT license +# , at your option. +# You may not use this file except in accordance with one or both of these +# licenses. + +"""Generate Unicode general-category predicates from `UnicodeData.txt`. + +Emits two `pub(crate)` functions taking a `char`, split into two disjoint +buckets across the Unicode top-level `C` ("Other") category so callers can +compose them: + + is_unicode_general_category_other — Cc / Cf / Cs / Co (assigned) + is_unicode_general_category_unassigned — Cn (plus codepoints above + U+10FFFF, which aren't + valid codepoints at all) + +`UnicodeData.txt` is the canonical machine-readable listing of every assigned +codepoint in the Unicode Character Database. Each line is `;`-separated; field +0 is the codepoint (hex), field 1 is the name, and field 2 is the two-letter +general category (e.g. `Lu`, `Cf`, `Mn`). Codepoints absent from the file have +category `Cn` (Unassigned) by convention. + +Two encoding details to preserve: + * Large blocks of contiguous same-category codepoints are written as two + consecutive entries whose names end in `, First>` and `, Last>`. Every + codepoint between First and Last (inclusive) shares the listed category. + * The codepoint range is U+0000..=U+10FFFF. + +Each `matches!` arm in the assigned-Other table carries an end-of-line comment +derived from the `UnicodeData.txt` name field — typically the longest common +word prefix or suffix across the names in the range, falling back to the set +of categories when the names share nothing meaningful. The unassigned table +omits per-arm comments since every range there has the same meaning by +construction. + +Usage: + contrib/gen_unicode_general_category.py UnicodeData.txt > out.rs +""" + +import argparse +import sys +from pathlib import Path + +MAX_CODEPOINT = 0x10FFFF + +LICENSE_HEADER = """\ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. +""" + +GENERATED_NOTICE = """\ +// Auto-generated from the Unicode Character Database (UnicodeData.txt) by +// contrib/gen_unicode_general_category.py. Do not edit by hand; rerun the +// generator with an updated UnicodeData.txt to refresh the table. +""" + + +def _normalize_name(name): + """Strip the `<...>` wrapping and `, First` / `, Last` range markers so + that, e.g., `` becomes + `Non Private Use High Surrogate` and `` becomes `control`. + """ + if name.startswith("<") and name.endswith(">"): + inner = name[1:-1] + for suffix in (", First", ", Last"): + if inner.endswith(suffix): + inner = inner[: -len(suffix)] + return inner + return name + + +def parse_categories(path): + """Return `(cats, names)` mapping every codepoint listed in `path` to its + general category and to its (normalised) name. Codepoints absent from the + returned dicts have category `Cn` (Unassigned) and no name. + """ + cats = {} + names = {} + pending_first = None # (first_cp, first_cat, normalised_name) once a range opens. + with path.open() as f: + for lineno, raw in enumerate(f, 1): + line = raw.rstrip("\n") + if not line: + continue + fields = line.split(";") + if len(fields) < 3: + raise ValueError(f"{path}:{lineno}: expected at least 3 fields, got {len(fields)}") + cp = int(fields[0], 16) + name = fields[1] + cat = fields[2] + if pending_first is not None: + first_cp, first_cat, first_name = pending_first + if not name.endswith(", Last>"): + raise ValueError( + f"{path}:{lineno}: expected `, Last>` to close range " + f"opened at U+{first_cp:04X}, got name {name!r}" + ) + if cat != first_cat: + raise ValueError( + f"{path}:{lineno}: range U+{first_cp:04X}..=U+{cp:04X} " + f"has mismatched categories {first_cat!r} / {cat!r}" + ) + for x in range(first_cp, cp + 1): + cats[x] = cat + names[x] = first_name + pending_first = None + elif name.endswith(", First>"): + pending_first = (cp, cat, _normalize_name(name)) + else: + cats[cp] = cat + names[cp] = _normalize_name(name) + if pending_first is not None: + raise ValueError(f"{path}: dangling `, First>` entry at U+{pending_first[0]:04X}") + return cats, names + + +ASSIGNED_OTHER_CATS = frozenset({"Cc", "Cf", "Cs", "Co"}) + + +def coalesce_ranges(cats, names, target_cats, *, label): + """Walk U+0000..=U+10FFFF and return a list of `(start, end, label)` for + every contiguous run of codepoints whose general category is in + `target_cats`. Codepoints absent from `cats` are treated as `Cn`. + + If `label` is `True`, attach a comment summarising the codepoint names in + each range; otherwise every range gets an empty label. + """ + ranges = [] + start = None + for cp in range(MAX_CODEPOINT + 1): + in_target = cats.get(cp, "Cn") in target_cats + if in_target and start is None: + start = cp + elif not in_target and start is not None: + ranges.append((start, cp - 1)) + start = None + if start is not None: + ranges.append((start, MAX_CODEPOINT)) + + if not label: + return [(s, e, "") for s, e in ranges] + + labelled = [] + for s, e in ranges: + range_names = [] + range_cats = set() + for cp in range(s, e + 1): + range_cats.add(cats.get(cp, "Cn")) + n = names.get(cp) + if n is not None: + range_names.append(n) + labelled.append((s, e, _make_label(range_names, range_cats))) + return labelled + + +def _common_word_run(names, *, from_end): + """Return the longest sequence of words shared by every name, taken from + either the start (`from_end=False`) or the end (`from_end=True`) of each + name's whitespace-split tokens. + """ + if not names: + return "" + tokenised = [n.split() for n in names] + if from_end: + tokenised = [list(reversed(t)) for t in tokenised] + limit = min(len(t) for t in tokenised) + common = [] + for i in range(limit): + token = tokenised[0][i] + if all(t[i] == token for t in tokenised): + common.append(token) + else: + break + if from_end: + common.reverse() + return " ".join(common) + + +def _make_label(names, cats_in_range): + """Build a short human-readable label for a coalesced range. Applied to + the assigned-Other buckets only; each range there is `Cc`, `Cf`, `Cs`, + `Co`, or some contiguous union thereof. + + Rules, in order: + 1. All names identical → that name (e.g. `control`). + 2. Common leading or trailing words → the longer of the two. + 3. Otherwise, list the categories present (e.g. `Co / Cs`). + """ + unique = list(dict.fromkeys(names)) + if len(unique) == 1: + return unique[0] + + prefix = _common_word_run(names, from_end=False) + suffix = _common_word_run(names, from_end=True) + # Pick whichever is more informative; when both are non-empty, prefer the + # longer one. A multi-word prefix beats a single-word suffix. + label = prefix if len(prefix) >= len(suffix) else suffix + if label: + return label + return " / ".join(sorted(cats_in_range)) + + +def fmt_codepoint(cp): + # `UnicodeData.txt` uses 4-digit hex for the BMP and wider for higher + # planes; mirror that so the output stays readable next to the source data. + return f"0x{cp:04X}" if cp <= 0xFFFF else f"0x{cp:X}" + + +def _pattern(start, end): + if start == end: + return fmt_codepoint(start) + return f"{fmt_codepoint(start)}..={fmt_codepoint(end)}" + + +def _emit_matches_body(lines, arms): + """Append a `matches!(c as u32, ...)` body to `lines`, with one + `(pattern, label)` tuple per arm. The first arm sits at the `matches!` + argument indent and continuation `| ...` arms indent one level deeper, + matching the rustfmt convention used elsewhere in the tree. + """ + lines.append("\tmatches!(") + lines.append("\t\tc as u32,") + for i, (pattern, label) in enumerate(arms): + prefix = "\t\t" if i == 0 else "\t\t\t| " + comment = f" // {label}" if label else "" + lines.append(f"{prefix}{pattern}{comment}") + lines.append("\t)") + + +def render_rust(other_ranges, unassigned_ranges): + """Render the final Rust source defining both `char`-taking predicates. + + `other_ranges` and `unassigned_ranges` are lists of `(start, end, label)`. + The unassigned function additionally gets a synthetic final arm catching + `u32` values above U+10FFFF — these aren't valid Unicode codepoints, so + by definition they have no general category and the unassigned bucket is + the closest match. + """ + lines = [LICENSE_HEADER, GENERATED_NOTICE] + + lines.append("/// Returns `true` if `c` is in Unicode general category `Cc` (Control), `Cf`") + lines.append("/// (Format), `Cs` (Surrogate), or `Co` (Private Use) — the assigned codepoints") + lines.append("/// in the top-level `C` (\"Other\") category. The `Cs` portion of the table is") + lines.append("/// unreachable for `char` input (a `char` cannot hold a surrogate) but is kept") + lines.append("/// so the table mirrors the source UCD data verbatim. The disjoint `Cn`") + lines.append("/// (Unassigned) bucket is `is_unicode_general_category_unassigned`.") + lines.append("#[allow(dead_code)]") + lines.append("pub(crate) fn is_unicode_general_category_other(c: char) -> bool {") + other_arms = [(_pattern(s, e), label) for s, e, label in other_ranges] + _emit_matches_body(lines, other_arms) + lines.append("}") + lines.append("") + + lines.append("/// Returns `true` if `c` is in Unicode general category `Cn` (Unassigned), or") + lines.append("/// strictly above U+10FFFF. The trailing `0x110000..=u32::MAX` arm is") + lines.append("/// unreachable for `char` input (a `char` is bounded to U+10FFFF) but is kept") + lines.append("/// for defensive coverage of the underlying `u32`. The disjoint Cc / Cf / Cs /") + lines.append("/// Co bucket is `is_unicode_general_category_other`.") + lines.append("#[allow(dead_code)]") + lines.append("pub(crate) fn is_unicode_general_category_unassigned(c: char) -> bool {") + unassigned_arms = [(_pattern(s, e), label) for s, e, label in unassigned_ranges] + unassigned_arms.append(("0x110000..=u32::MAX", "above U+10FFFF — unreachable for `char`")) + _emit_matches_body(lines, unassigned_arms) + lines.append("}") + lines.append("") + + return "\n".join(lines) + + +def main(argv): + ap = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + ap.add_argument("unicode_data", type=Path, help="Path to UnicodeData.txt") + ap.add_argument( + "-o", "--output", type=Path, default=None, + help="Output Rust file (default: stdout)", + ) + args = ap.parse_args(argv) + + cats, names = parse_categories(args.unicode_data) + other = coalesce_ranges(cats, names, ASSIGNED_OTHER_CATS, label=True) + unassigned = coalesce_ranges(cats, names, frozenset({"Cn"}), label=False) + rust = render_rust(other, unassigned) + + if args.output is None: + sys.stdout.write(rust) + else: + args.output.write_text(rust) + print( + f"Wrote {args.output} " + f"({len(other)} assigned-Other ranges, " + f"{len(unassigned)} unassigned ranges).", + file=sys.stderr, + ) + + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 31e519b9f57..cdf1b2e5aa3 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1230,12 +1230,20 @@ where } if let Some(liquidity_manager) = liquidity_manager.as_ref() { - log_trace!(logger, "Persisting LiquidityManager..."); let fut = async { - liquidity_manager.get_lm().persist().await.map_err(|e| { - log_error!(logger, "Persisting LiquidityManager failed: {}", e); - e - }) + liquidity_manager + .get_lm() + .persist() + .await + .map(|did_persist| { + if did_persist { + log_trace!(logger, "Persisted LiquidityManager."); + } + }) + .map_err(|e| { + log_error!(logger, "Persisting LiquidityManager failed: {}", e); + e + }) }; futures.set_e(Box::pin(fut)); } diff --git a/lightning-custom-message/src/lib.rs b/lightning-custom-message/src/lib.rs index 32d5a9e4389..0d70ba06385 100644 --- a/lightning-custom-message/src/lib.rs +++ b/lightning-custom-message/src/lib.rs @@ -312,13 +312,25 @@ macro_rules! composite_custom_message_handler { } fn peer_connected(&self, their_node_id: $crate::bitcoin::secp256k1::PublicKey, msg: &$crate::lightning::ln::msgs::Init, inbound: bool) -> Result<(), ()> { - let mut result = Ok(()); + // Per the `CustomMessageHandler::peer_connected` contract, `peer_disconnected` + // will not be called by `PeerManager` if we return `Err`. To avoid leaking + // per-peer state in sub-handlers that already returned `Ok` when a later one + // errors, record each sub-handler's result and roll back the successful ones + // ourselves before propagating the failure. $( - if let Err(e) = self.$field.peer_connected(their_node_id, msg, inbound) { - result = Err(e); - } + let $field = self.$field.peer_connected(their_node_id, msg, inbound); )* - result + let any_err = false $( || $field.is_err() )*; + if any_err { + $( + if $field.is_ok() { + self.$field.peer_disconnected(their_node_id); + } + )* + Err(()) + } else { + Ok(()) + } } fn provided_node_features(&self) -> $crate::lightning::types::features::NodeFeatures { @@ -376,3 +388,143 @@ macro_rules! composite_custom_message_handler { } } } + +#[cfg(test)] +mod tests { + use bitcoin::secp256k1::PublicKey; + use core::sync::atomic::{AtomicUsize, Ordering}; + use lightning::io; + use lightning::ln::msgs::{DecodeError, Init, LightningError}; + use lightning::ln::peer_handler::CustomMessageHandler; + use lightning::ln::wire::{CustomMessageReader, Type}; + use lightning::types::features::{InitFeatures, NodeFeatures}; + use lightning::util::ser::{LengthLimitedRead, Writeable, Writer}; + + #[derive(Debug)] + pub struct Foo; + impl Type for Foo { + fn type_id(&self) -> u16 { + 32768 + } + } + impl Writeable for Foo { + fn write(&self, _: &mut W) -> Result<(), io::Error> { + Ok(()) + } + } + + pub struct CountingHandler { + pub connect_count: AtomicUsize, + } + impl CustomMessageReader for CountingHandler { + type CustomMessage = Foo; + fn read( + &self, _t: u16, _b: &mut R, + ) -> Result, DecodeError> { + Ok(None) + } + } + impl CustomMessageHandler for CountingHandler { + fn handle_custom_message(&self, _msg: Foo, _: PublicKey) -> Result<(), LightningError> { + Ok(()) + } + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Foo)> { + vec![] + } + fn peer_disconnected(&self, _: PublicKey) { + self.connect_count.fetch_sub(1, Ordering::SeqCst); + } + fn peer_connected(&self, _: PublicKey, _: &Init, _: bool) -> Result<(), ()> { + self.connect_count.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + fn provided_node_features(&self) -> NodeFeatures { + NodeFeatures::empty() + } + fn provided_init_features(&self, _: PublicKey) -> InitFeatures { + InitFeatures::empty() + } + } + + #[derive(Debug)] + pub struct Bar; + impl Type for Bar { + fn type_id(&self) -> u16 { + 32769 + } + } + impl Writeable for Bar { + fn write(&self, _: &mut W) -> Result<(), io::Error> { + Ok(()) + } + } + + pub struct ErroringHandler; + impl CustomMessageReader for ErroringHandler { + type CustomMessage = Bar; + fn read( + &self, _t: u16, _b: &mut R, + ) -> Result, DecodeError> { + Ok(None) + } + } + impl CustomMessageHandler for ErroringHandler { + fn handle_custom_message(&self, _msg: Bar, _: PublicKey) -> Result<(), LightningError> { + Ok(()) + } + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Bar)> { + vec![] + } + fn peer_disconnected(&self, _: PublicKey) { + debug_assert!(false); + } + fn peer_connected(&self, _: PublicKey, _: &Init, _: bool) -> Result<(), ()> { + Err(()) + } + fn provided_node_features(&self) -> NodeFeatures { + NodeFeatures::empty() + } + fn provided_init_features(&self, _: PublicKey) -> InitFeatures { + InitFeatures::empty() + } + } + + composite_custom_message_handler!( + pub struct CompositeHandler { + counting: CountingHandler, + erroring: ErroringHandler, + } + + pub enum CompositeMessage { + Foo(32768), + Bar(32769), + } + ); + + #[test] + fn peer_connected_failure_does_not_leak_subhandler_state() { + let composite = CompositeHandler { + counting: CountingHandler { connect_count: AtomicUsize::new(0) }, + erroring: ErroringHandler, + }; + let pk_bytes = [ + 0x02, 0x79, 0xBE, 0x66, 0x7E, 0xF9, 0xDC, 0xBB, 0xAC, 0x55, 0xA0, 0x62, 0x95, 0xCE, + 0x87, 0x0B, 0x07, 0x02, 0x9B, 0xFC, 0xDB, 0x2D, 0xCE, 0x28, 0xD9, 0x59, 0xF2, 0x81, + 0x5B, 0x16, 0xF8, 0x17, 0x98, + ]; + let pk = PublicKey::from_slice(&pk_bytes).unwrap(); + let init = + Init { features: InitFeatures::empty(), networks: None, remote_network_address: None }; + + let result = composite.peer_connected(pk, &init, true); + assert!(result.is_err(), "Composite must propagate the inner Err"); + + let leaked = composite.counting.connect_count.load(Ordering::SeqCst); + assert_eq!( + leaked, 0, + "CountingHandler tracked {leaked} connected peer(s) after the composite \ + returned Err; this state will never be cleaned up because per the trait \ + contract peer_disconnected won't be called when peer_connected returns Err.", + ); + } +} diff --git a/lightning-dns-resolver/src/lib.rs b/lightning-dns-resolver/src/lib.rs index f5b1d53fc8a..fbafeb76860 100644 --- a/lightning-dns-resolver/src/lib.rs +++ b/lightning-dns-resolver/src/lib.rs @@ -135,8 +135,8 @@ where let contents = DNSResolverMessage::DNSSECProof(DNSSECProof { name: q.0, proof }); let instructions = responder.respond().into_instructions(); us.pending_replies.lock().unwrap().push((contents, instructions)); - us.pending_query_count.fetch_sub(1, Ordering::Relaxed); } + us.pending_query_count.fetch_sub(1, Ordering::Relaxed); }); None } @@ -518,4 +518,94 @@ mod test { ) .await; } + + #[tokio::test] + async fn failed_query_does_not_leak_pending_counter() { + use std::sync::atomic::Ordering; + + let secp_ctx = Secp256k1::new(); + + // Resolver points at a port that should refuse TCP, so build_txt_proof_async + // returns Err quickly. + let resolver_keys = Arc::new(KeysManager::new(&[99; 32], 42, 43, true)); + let resolver_logger = TestLogger { node: "resolver" }; + let resolver = + Arc::new(OMDomainResolver::::ignoring_incoming_proofs( + "127.0.0.1:1".parse().unwrap(), + )); + let resolver_state = Arc::clone(&resolver.state); + let resolver_messenger = OnionMessenger::new( + Arc::clone(&resolver_keys), + Arc::clone(&resolver_keys), + resolver_logger, + DummyNodeLookup {}, + DirectlyConnectedRouter {}, + IgnoringMessageHandler {}, + IgnoringMessageHandler {}, + Arc::clone(&resolver), + IgnoringMessageHandler {}, + ); + let resolver_id = resolver_keys.get_node_id(Recipient::Node).unwrap(); + + let resolver_dest = Destination::Node(resolver_id); + let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + let payment_id = PaymentId([42; 32]); + let name = HumanReadableName::from_encoded("matt@mattcorallo.com").unwrap(); + + let payer_keys = Arc::new(KeysManager::new(&[2; 32], 42, 43, true)); + let payer_logger = TestLogger { node: "payer" }; + let payer_id = payer_keys.get_node_id(Recipient::Node).unwrap(); + let payer = Arc::new(URIResolver { + resolved_uri: Mutex::new(None), + resolver: OMNameResolver::new(now as u32, 1), + pending_messages: Mutex::new(Vec::new()), + }); + let payer_messenger = Arc::new(OnionMessenger::new( + Arc::clone(&payer_keys), + Arc::clone(&payer_keys), + payer_logger, + DummyNodeLookup {}, + DirectlyConnectedRouter {}, + IgnoringMessageHandler {}, + IgnoringMessageHandler {}, + Arc::clone(&payer), + IgnoringMessageHandler {}, + )); + + let init_msg = get_om_init(); + payer_messenger.peer_connected(resolver_id, &init_msg, true).unwrap(); + resolver_messenger.peer_connected(payer_id, &init_msg, false).unwrap(); + + let (msg, context) = + payer.resolver.resolve_name(payment_id, name.clone(), &*payer_keys).unwrap(); + let query_context = MessageContext::DNSResolver(context); + let receive_key = payer_keys.get_receive_auth_key(); + let reply_path = BlindedMessagePath::one_hop( + payer_id, + receive_key, + query_context, + &*payer_keys, + &secp_ctx, + ); + payer.pending_messages.lock().unwrap().push(( + DNSResolverMessage::DNSSECQuery(msg), + MessageSendInstructions::WithSpecifiedReplyPath { + destination: resolver_dest, + reply_path, + }, + )); + + let query = payer_messenger.next_onion_message_for_peer(resolver_id).unwrap(); + resolver_messenger.handle_onion_message(payer_id, &query); + + let start = Instant::now(); + while resolver_state.pending_query_count.load(Ordering::Relaxed) != 0 { + tokio::time::sleep(Duration::from_millis(50)).await; + assert!( + start.elapsed() < Duration::from_secs(10), + "pending_query_count not decremented after failed proof: counter leaks" + ); + } + } } diff --git a/lightning-liquidity/src/events/event_queue.rs b/lightning-liquidity/src/events/event_queue.rs index cd1162cee31..0d6e3a0ec54 100644 --- a/lightning-liquidity/src/events/event_queue.rs +++ b/lightning-liquidity/src/events/event_queue.rs @@ -129,12 +129,12 @@ where EventQueueNotifierGuard(self) } - pub async fn persist(&self) -> Result<(), lightning::io::Error> { + pub async fn persist(&self) -> Result { let fut = { let mut state_lock = self.state.lock().unwrap(); if !state_lock.needs_persist { - return Ok(()); + return Ok(false); } state_lock.needs_persist = false; @@ -153,7 +153,7 @@ where e })?; - Ok(()) + Ok(true) } } diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index dda9922686d..0cc0a4afd2d 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -42,7 +42,7 @@ use crate::utils::async_poll::dummy_waker; use lightning::chain::chaininterface::BroadcasterInterface; use lightning::events::HTLCHandlingFailureType; -use lightning::ln::channelmanager::{AChannelManager, FailureCode, InterceptId}; +use lightning::ln::channelmanager::{AChannelManager, InterceptId}; use lightning::ln::msgs::{ErrorAction, LightningError}; use lightning::ln::types::ChannelId; use lightning::util::errors::APIError; @@ -1375,10 +1375,8 @@ where { let intercepted_htlcs = payment_queue.clear(); for htlc in intercepted_htlcs { - self.channel_manager.get_cm().fail_htlc_backwards_with_reason( - &htlc.payment_hash, - FailureCode::TemporaryNodeFailure, - ); + // A missing intercept has already been released; still reset this LSPS2 state. + let _ = self.channel_manager.get_cm().fail_intercepted_htlc(htlc.intercept_id); } jit_channel.state = OutboundJITChannelState::PendingInitialPayment { @@ -1782,15 +1780,16 @@ where }) } - pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> { + pub(crate) async fn persist(&self) -> Result { // TODO: We should eventually persist in parallel, however, when we do, we probably want to // introduce some batching to upper-bound the number of requests inflight at any given // time. + let mut did_persist = false; if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { // If we're not the first event processor to get here, just return early, the increment // we just did will be treated as "go around again" at the end. - return Ok(()); + return Ok(did_persist); } loop { @@ -1816,6 +1815,7 @@ where for counterparty_node_id in need_persist.into_iter() { debug_assert!(!need_remove.contains(&counterparty_node_id)); self.persist_peer_state(counterparty_node_id).await?; + did_persist = true; } for counterparty_node_id in need_remove { @@ -1850,6 +1850,7 @@ where } if let Some(future) = future_opt { future.await?; + did_persist = true; } else { self.persist_peer_state(counterparty_node_id).await?; } @@ -1864,7 +1865,7 @@ where break; } - Ok(()) + Ok(did_persist) } pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) { diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 8b1f0ec70cb..e19477c187c 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -244,7 +244,7 @@ where }) } - pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> { + pub(crate) async fn persist(&self) -> Result { // TODO: We should eventually persist in parallel, however, when we do, we probably want to // introduce some batching to upper-bound the number of requests inflight at any given // time. @@ -252,80 +252,97 @@ where if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { // If we're not the first event processor to get here, just return early, the increment // we just did will be treated as "go around again" at the end. - return Ok(()); + return Ok(false); } + let mut did_persist = false; + loop { - let mut need_remove = Vec::new(); - let mut need_persist = Vec::new(); + match self.do_persist().await { + Ok(pass_did_persist) => did_persist |= pass_did_persist, + Err(e) => { + self.persistence_in_flight.store(0, Ordering::Release); + return Err(e); + }, + } - self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap()); - { - let outer_state_lock = self.per_peer_state.read().unwrap(); - - for (client_id, peer_state) in outer_state_lock.iter() { - let is_prunable = peer_state.is_prunable(); - let has_open_channel = self.client_has_open_channel(client_id); - if is_prunable && !has_open_channel { - need_remove.push(*client_id); - } else if peer_state.needs_persist { - need_persist.push(*client_id); - } - } + if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.persistence_in_flight.store(1, Ordering::Release); + continue; } + break; + } - for client_id in need_persist.into_iter() { - debug_assert!(!need_remove.contains(&client_id)); - self.persist_peer_state(client_id).await?; + Ok(did_persist) + } + + async fn do_persist(&self) -> Result { + let mut did_persist = false; + let mut need_remove = Vec::new(); + let mut need_persist = Vec::new(); + + self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap()); + { + let outer_state_lock = self.per_peer_state.read().unwrap(); + + for (client_id, peer_state) in outer_state_lock.iter() { + let is_prunable = peer_state.is_prunable(); + let has_open_channel = self.client_has_open_channel(client_id); + if is_prunable && !has_open_channel { + need_remove.push(*client_id); + } else if peer_state.needs_persist { + need_persist.push(*client_id); + } } + } + + for client_id in need_persist.into_iter() { + debug_assert!(!need_remove.contains(&client_id)); + self.persist_peer_state(client_id).await?; + did_persist = true; + } - for client_id in need_remove { - let mut future_opt = None; - { - // We need to take the `per_peer_state` write lock to remove an entry, but also - // have to hold it until after the `remove` call returns (but not through - // future completion) to ensure that writes for the peer's state are - // well-ordered with other `persist_peer_state` calls even across the removal - // itself. - let mut per_peer_state = self.per_peer_state.write().unwrap(); - if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) { - let state = entry.get_mut(); - if state.is_prunable() && !self.client_has_open_channel(&client_id) { - entry.remove(); - let key = client_id.to_string(); - future_opt = Some(self.kv_store.remove( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - true, - )); - } else { - // If the peer was re-added, force a re-persist of the current state. - state.needs_persist = true; - } + for client_id in need_remove { + let mut future_opt = None; + { + // We need to take the `per_peer_state` write lock to remove an entry, but also + // have to hold it until after the `remove` call returns (but not through + // future completion) to ensure that writes for the peer's state are + // well-ordered with other `persist_peer_state` calls even across the removal + // itself. + let mut per_peer_state = self.per_peer_state.write().unwrap(); + if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) { + let state = entry.get_mut(); + if state.is_prunable() && !self.client_has_open_channel(&client_id) { + entry.remove(); + let key = client_id.to_string(); + future_opt = Some(self.kv_store.remove( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + true, + )); } else { - // This should never happen, we can only have one `persist` call - // in-progress at once and map entries are only removed by it. - debug_assert!(false); + // If the peer was re-added, force a re-persist of the current state. + state.needs_persist = true; } - } - if let Some(future) = future_opt { - future.await?; } else { - self.persist_peer_state(client_id).await?; + // This should never happen, we can only have one `persist` call + // in-progress at once and map entries are only removed by it. + debug_assert!(false); } } - - if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 { - // If another thread incremented the state while we were running we should go - // around again, but only once. - self.persistence_in_flight.store(1, Ordering::Release); - continue; + if let Some(future) = future_opt { + future.await?; + did_persist = true; + } else { + self.persist_peer_state(client_id).await?; } - break; } - Ok(()) + Ok(did_persist) } fn check_prune_stale_webhooks<'a>( diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 5d95d32d540..de84ee20897 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -670,23 +670,27 @@ where self.pending_events.get_and_clear_pending_events() } - /// Persists the state of the service handlers towards the given [`KVStore`] implementation. + /// Persists the state of the service handlers towards the given [`KVStore`] implementation if + /// needed. + /// + /// Returns `true` if it persisted sevice handler data. /// /// This will be regularly called by LDK's background processor if necessary and only needs to /// be called manually if it's not utilized. - pub async fn persist(&self) -> Result<(), lightning::io::Error> { + pub async fn persist(&self) -> Result { // TODO: We should eventually persist in parallel. - self.pending_events.persist().await?; + let mut did_persist = false; + did_persist |= self.pending_events.persist().await?; if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() { - lsps2_service_handler.persist().await?; + did_persist |= lsps2_service_handler.persist().await?; } if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() { - lsps5_service_handler.persist().await?; + did_persist |= lsps5_service_handler.persist().await?; } - Ok(()) + Ok(did_persist) } fn handle_lsps_message( @@ -1285,8 +1289,10 @@ where /// Persists the state of the service handlers towards the given [`KVStoreSync`] implementation. /// + /// Returns `true` if it persisted sevice handler data. + /// /// Wraps [`LiquidityManager::persist`]. - pub fn persist(&self) -> Result<(), lightning::io::Error> { + pub fn persist(&self) -> Result { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) { diff --git a/lightning-liquidity/tests/lsps2_integration_tests.rs b/lightning-liquidity/tests/lsps2_integration_tests.rs index 82f93b5990c..9c42ee6908c 100644 --- a/lightning-liquidity/tests/lsps2_integration_tests.rs +++ b/lightning-liquidity/tests/lsps2_integration_tests.rs @@ -8,7 +8,7 @@ use common::{ }; use lightning::check_added_monitors; -use lightning::events::{ClosureReason, Event}; +use lightning::events::{ClosureReason, Event, HTLCHandlingFailureType}; use lightning::get_event_msg; use lightning::ln::channelmanager::PaymentId; use lightning::ln::channelmanager::Retry; @@ -466,6 +466,126 @@ fn channel_open_failed() { }; } +#[test] +fn channel_open_failed_releases_intercepted_htlcs() { + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let mut service_node_config = test_default_channel_config(); + service_node_config.accept_intercept_htlcs = true; + + let mut client_node_config = test_default_channel_config(); + client_node_config.channel_config.accept_underpaying_htlcs = true; + + let node_chanmgrs = create_node_chanmgrs( + 3, + &node_cfgs, + &[Some(service_node_config), Some(client_node_config), None], + ); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + let (lsps_nodes, promise_secret) = setup_test_lsps2_nodes_with_payer(nodes); + let LSPSNodesWithPayer { ref service_node, ref client_node, ref payer_node } = lsps_nodes; + + let payer_node_id = payer_node.node.get_our_node_id(); + let service_node_id = service_node.inner.node.get_our_node_id(); + let client_node_id = client_node.inner.node.get_our_node_id(); + + let service_handler = service_node.liquidity_manager.lsps2_service_handler().unwrap(); + create_chan_between_nodes_with_value(&payer_node, &service_node.inner, 2_000_000, 100_000); + + let intercept_scid = service_node.node.get_intercept_scid(); + let user_channel_id = 42u128; + let cltv_expiry_delta: u32 = 144; + let payment_size_msat = Some(1_000_000); + let fee_base_msat: u64 = 1_000; + + execute_lsps2_dance( + &lsps_nodes, + intercept_scid, + user_channel_id, + cltv_expiry_delta, + promise_secret, + payment_size_msat, + fee_base_msat, + ); + + let invoice = create_jit_invoice( + &client_node, + service_node_id, + intercept_scid, + cltv_expiry_delta, + payment_size_msat, + "channel-open-failed-cleanup", + 3600, + ) + .unwrap(); + + payer_node + .node + .pay_for_bolt11_invoice( + &invoice, + PaymentId(invoice.payment_hash().to_byte_array()), + None, + Default::default(), + Retry::Attempts(0), + ) + .unwrap(); + + check_added_monitors!(payer_node, 1); + let events = payer_node.node.get_and_clear_pending_msg_events(); + let ev = SendEvent::from_event(events[0].clone()); + service_node.inner.node.handle_update_add_htlc(payer_node_id, &ev.msgs[0]); + do_commitment_signed_dance(&service_node.inner, &payer_node, &ev.commitment_msg, false, true); + service_node.inner.node.process_pending_htlc_forwards(); + + let events = service_node.inner.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + let intercept_id = match &events[0] { + Event::HTLCIntercepted { + intercept_id, + requested_next_hop_scid, + payment_hash, + expected_outbound_amount_msat, + .. + } => { + assert_eq!(*requested_next_hop_scid, intercept_scid); + service_handler + .htlc_intercepted( + *requested_next_hop_scid, + *intercept_id, + *expected_outbound_amount_msat, + *payment_hash, + ) + .unwrap(); + *intercept_id + }, + other => panic!("Expected HTLCIntercepted, got {:?}", other), + }; + + match service_node.liquidity_manager.next_event().unwrap() { + LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::OpenChannel { .. }) => {}, + other => panic!("Unexpected event: {:?}", other), + }; + + service_handler.channel_open_failed(&client_node_id, user_channel_id).unwrap(); + + let res = service_node.inner.node.fail_intercepted_htlc(intercept_id); + assert!( + res.is_err(), + "channel_open_failed must release the intercepted HTLC via fail_intercepted_htlc, but the entry is still pending: {:?}", + res, + ); + + let events = service_node.inner.node.get_and_clear_pending_events(); + assert_eq!(events.len(), 1); + match &events[0] { + Event::HTLCHandlingFailed { + failure_type: HTLCHandlingFailureType::InvalidForward { requested_forward_scid }, + .. + } => assert_eq!(*requested_forward_scid, intercept_scid), + other => panic!("Expected HTLCHandlingFailed, got {:?}", other), + } +} + #[test] fn channel_open_failed_nonexistent_channel() { let chanmon_cfgs = create_chanmon_cfgs(2); diff --git a/lightning-liquidity/tests/lsps5_integration_tests.rs b/lightning-liquidity/tests/lsps5_integration_tests.rs index 80707a60774..f144f8b1231 100644 --- a/lightning-liquidity/tests/lsps5_integration_tests.rs +++ b/lightning-liquidity/tests/lsps5_integration_tests.rs @@ -1648,3 +1648,183 @@ fn lsps5_service_handler_persistence_across_restarts() { } } } + +struct FailableKVStore { + inner: TestStore, + fail_lsps5: std::sync::atomic::AtomicBool, +} + +impl FailableKVStore { + fn new() -> Self { + Self { inner: TestStore::new(false), fail_lsps5: std::sync::atomic::AtomicBool::new(false) } + } + + fn set_fail_lsps5(&self, fail: bool) { + self.fail_lsps5.store(fail, std::sync::atomic::Ordering::SeqCst); + } +} + +impl lightning::util::persist::KVStoreSync for FailableKVStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> lightning::io::Result> { + ::read( + &self.inner, + primary_namespace, + secondary_namespace, + key, + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> lightning::io::Result<()> { + if secondary_namespace == "lsps5_service" + && self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst) + { + return Err(lightning::io::Error::new( + lightning::io::ErrorKind::Other, + "intentional failure for lsps5 namespace", + )); + } + ::write( + &self.inner, + primary_namespace, + secondary_namespace, + key, + buf, + ) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> lightning::io::Result<()> { + if secondary_namespace == "lsps5_service" + && self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst) + { + return Err(lightning::io::Error::new( + lightning::io::ErrorKind::Other, + "intentional failure for lsps5 namespace", + )); + } + ::remove( + &self.inner, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> lightning::io::Result> { + ::list( + &self.inner, + primary_namespace, + secondary_namespace, + ) + } +} + +#[test] +fn lsps5_service_persist_resets_in_flight_counter_on_io_error() { + use lightning::ln::peer_handler::CustomMessageHandler; + + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let service_kv_store = Arc::new(FailableKVStore::new()); + let client_kv_store = Arc::new(TestStore::new(false)); + + let service_config = LiquidityServiceConfig { + #[cfg(lsps1_service)] + lsps1_service_config: None, + lsps2_service_config: None, + lsps5_service_config: Some(LSPS5ServiceConfig::default()), + advertise_service: true, + }; + let client_config = LiquidityClientConfig { + lsps1_client_config: None, + lsps2_client_config: None, + lsps5_client_config: Some(LSPS5ClientConfig::default()), + }; + let time_provider: Arc = Arc::new(DefaultTimeProvider); + + let chain_params = ChainParameters { + network: Network::Testnet, + best_block: BestBlock::from_network(Network::Testnet), + }; + + let service_lm = LiquidityManagerSync::new_with_custom_time_provider( + nodes[0].keys_manager, + nodes[0].keys_manager, + nodes[0].node, + None::>, + Some(chain_params), + Arc::clone(&service_kv_store), + nodes[0].tx_broadcaster, + Some(service_config), + None, + Arc::clone(&time_provider), + ) + .unwrap(); + + let client_lm = LiquidityManagerSync::new_with_custom_time_provider( + nodes[1].keys_manager, + nodes[1].keys_manager, + nodes[1].node, + None::>, + Some(chain_params), + client_kv_store, + nodes[1].tx_broadcaster, + None, + Some(client_config), + Arc::clone(&time_provider), + ) + .unwrap(); + + let service_node_id = nodes[0].node.get_our_node_id(); + let client_node_id = nodes[1].node.get_our_node_id(); + + create_chan_between_nodes(&nodes[0], &nodes[1]); + + let client_handler = client_lm.lsps5_client_handler().unwrap(); + client_handler + .set_webhook(service_node_id, "App".to_string(), "https://example.org/hook".to_string()) + .unwrap(); + + let req_msgs = client_lm.get_and_clear_pending_msg(); + assert_eq!(req_msgs.len(), 1); + let (_, request) = req_msgs.into_iter().next().unwrap(); + service_lm.handle_custom_message(request, client_node_id).unwrap(); + + // Consume the SendWebhookNotification event so pending events queue is drained. + let _ = service_lm.next_event(); + let _ = service_lm.get_and_clear_pending_msg(); + + // Initial persist should succeed and clear all needs_persist flags. + service_lm.persist().expect("initial persist should succeed"); + + // Now arrange for lsps5 writes to fail and dirty lsps5 state without dirtying + // pending_events (which lives in a different namespace). + service_kv_store.set_fail_lsps5(true); + service_lm.peer_disconnected(client_node_id); + + // First persist attempt should error out due to the failing kv_store. + let res1 = service_lm.persist(); + assert!(res1.is_err(), "persist should fail when lsps5 kv_store write fails"); + + // Second persist attempt must still attempt the write (and fail again). With the + // bug, the LSPS5 service handler's `persistence_in_flight` counter is left above + // zero on error so this returns Ok(false) immediately, silently dropping the + // pending state and breaking persistence forever. + let res2 = service_lm.persist(); + assert!( + res2.is_err(), + "after a failed persist, subsequent persist calls must still attempt to persist; got {:?}", + res2, + ); +} diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 9b15398d4d1..05530b8fcf0 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -554,20 +554,25 @@ impl KVStore for FilesystemStore { } } +fn dir_entry_is_store_artifact(path: &Path) -> bool { + match path.extension().and_then(|ext| ext.to_str()) { + Some("tmp") => true, + Some("trash") => { + #[cfg(target_os = "windows")] + { + // Clean up any trash files lying around. + fs::remove_file(path).ok(); + } + true + }, + _ => false, + } +} + fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result { let p = dir_entry.path(); - if let Some(ext) = p.extension() { - #[cfg(target_os = "windows")] - { - // Clean up any trash files lying around. - if ext == "trash" { - fs::remove_file(p).ok(); - return Ok(false); - } - } - if ext == "tmp" { - return Ok(false); - } + if dir_entry_is_store_artifact(&p) { + return Ok(false); } let metadata = dir_entry.metadata()?; @@ -654,6 +659,9 @@ impl MigratableKVStore for FilesystemStore { 'primary_loop: for primary_entry in fs::read_dir(prefixed_dest)? { let primary_entry = primary_entry?; let primary_path = primary_entry.path(); + if dir_entry_is_store_artifact(&primary_path) { + continue 'primary_loop; + } if dir_entry_is_key(&primary_entry)? { let primary_namespace = String::new(); @@ -667,6 +675,9 @@ impl MigratableKVStore for FilesystemStore { 'secondary_loop: for secondary_entry in fs::read_dir(&primary_path)? { let secondary_entry = secondary_entry?; let secondary_path = secondary_entry.path(); + if dir_entry_is_store_artifact(&secondary_path) { + continue 'secondary_loop; + } if dir_entry_is_key(&secondary_entry)? { let primary_namespace = @@ -681,6 +692,9 @@ impl MigratableKVStore for FilesystemStore { for tertiary_entry in fs::read_dir(&secondary_path)? { let tertiary_entry = tertiary_entry?; let tertiary_path = tertiary_entry.path(); + if dir_entry_is_store_artifact(&tertiary_path) { + continue; + } if dir_entry_is_key(&tertiary_entry)? { let primary_namespace = @@ -806,6 +820,28 @@ mod tests { assert_eq!(listed_keys.len(), 0); } + #[test] + fn list_all_keys_skips_leftover_store_artifacts() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_list_all_keys_skips_leftover_store_artifacts"); + let fs_store = FilesystemStore::new(temp_path.clone()); + KVStoreSync::write(&fs_store, "primary", "secondary", "key", vec![1]).unwrap(); + + fs::write(temp_path.join("top_level.0.tmp"), b"stale").unwrap(); + fs::write(temp_path.join("top_level.0.trash"), b"stale").unwrap(); + + let primary_path = temp_path.join("primary"); + fs::write(primary_path.join("primary_level.0.tmp"), b"stale").unwrap(); + fs::write(primary_path.join("primary_level.0.trash"), b"stale").unwrap(); + + let secondary_path = primary_path.join("secondary"); + fs::write(secondary_path.join("secondary_level.0.tmp"), b"stale").unwrap(); + fs::write(secondary_path.join("secondary_level.0.trash"), b"stale").unwrap(); + + let keys = fs_store.list_all_keys().unwrap(); + assert_eq!(keys, vec![("primary".to_string(), "secondary".to_string(), "key".to_string())]); + } + #[test] fn test_data_migration() { let mut source_temp_path = std::env::temp_dir(); diff --git a/lightning-transaction-sync/src/electrum.rs b/lightning-transaction-sync/src/electrum.rs index 1162b9c00c9..70e4f79a595 100644 --- a/lightning-transaction-sync/src/electrum.rs +++ b/lightning-transaction-sync/src/electrum.rs @@ -335,11 +335,11 @@ where let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid); if let Some(history) = filtered_history.next() { - let prob_conf_height = history.height as u32; - if prob_conf_height <= 0 { + if history.height <= 0 { // Skip if it's a an unconfirmed entry. continue; } + let prob_conf_height = history.height as u32; let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?; confirmed_txs.push(confirmed_tx); } diff --git a/lightning-transaction-sync/src/esplora.rs b/lightning-transaction-sync/src/esplora.rs index a191260bc01..e96110cc164 100644 --- a/lightning-transaction-sync/src/esplora.rs +++ b/lightning-transaction-sync/src/esplora.rs @@ -367,8 +367,13 @@ where let mut matches = Vec::new(); let mut indexes = Vec::new(); - let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes); - if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid { + let computed_merkle_root = + merkle_block.txn.extract_matches(&mut matches, &mut indexes).ok(); + if computed_merkle_root != Some(block_header.merkle_root) + || indexes.len() != 1 + || matches.len() != 1 + || matches[0] != txid + { log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid); return Err(InternalError::Failed); } diff --git a/lightning-types/src/features.rs b/lightning-types/src/features.rs index 8c512401f58..de8c97e6da6 100644 --- a/lightning-types/src/features.rs +++ b/lightning-types/src/features.rs @@ -649,9 +649,17 @@ mod sealed { supports_payment_metadata, requires_payment_metadata ); - define_feature!(51, ZeroConf, [InitContext, NodeContext, ChannelTypeContext], + define_feature!( + 51, + ZeroConf, + [InitContext, NodeContext, ChannelTypeContext], "Feature flags for accepting channels with zero confirmations. Called `option_zeroconf` in the BOLTs", - set_zero_conf_optional, set_zero_conf_required, supports_zero_conf, requires_zero_conf); + set_zero_conf_optional, + set_zero_conf_required, + clear_zero_conf, + supports_zero_conf, + requires_zero_conf + ); define_feature!( 55, Keysend, diff --git a/lightning-types/src/lib.rs b/lightning-types/src/lib.rs index 7f72d6d2671..6a526adaed2 100644 --- a/lightning-types/src/lib.rs +++ b/lightning-types/src/lib.rs @@ -27,3 +27,4 @@ pub mod features; pub mod payment; pub mod routing; pub mod string; +mod unicode; diff --git a/lightning-types/src/string.rs b/lightning-types/src/string.rs index ae5395a5289..a21cad411be 100644 --- a/lightning-types/src/string.rs +++ b/lightning-types/src/string.rs @@ -12,6 +12,8 @@ use alloc::string::String; use core::fmt; +use crate::unicode::*; + /// Struct to `Display` fields in a safe way using `PrintableString` #[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Default)] pub struct UntrustedString(pub String); @@ -31,7 +33,13 @@ impl<'a> fmt::Display for PrintableString<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { use core::fmt::Write; for c in self.0.chars() { - let c = if c.is_control() { core::char::REPLACEMENT_CHARACTER } else { c }; + let is_other = is_unicode_general_category_other(c); + let is_unassigned = is_unicode_general_category_unassigned(c); + let c = if c.is_control() || is_other || is_unassigned { + core::char::REPLACEMENT_CHARACTER + } else { + c + }; f.write_char(c)?; } @@ -50,4 +58,24 @@ mod tests { "I \u{1F496} LDK!\u{FFFD}\u{26A1}", ); } + + #[test] + fn sanitizes_unicode_bidi_override_characters() { + // U+202E RIGHT-TO-LEFT OVERRIDE and friends are Unicode general category + // `Cf` (Format), not `Cc` (Control). They enable "Trojan Source" / + // bidi-spoofing attacks where an attacker-supplied string (e.g. a node + // alias gossiped from a peer) renders to a human reader as something + // other than its byte content. `PrintableString` is the sanitiser used + // for exactly these untrusted strings, so it must replace them. + let rendered = format!("{}", PrintableString("safe\u{202E}cipsxe.exe")); + assert!( + !rendered.contains('\u{202E}'), + "PrintableString left a U+202E RLO override in its output: {:?}", + rendered + ); + + // U+13440 is in the Egyptian Hieroglyph Format Controls block, but its + // general category is `Mn`, not `Cf`, so the `Cf` range ends at U+1343F. + assert_eq!(format!("{}", PrintableString("x\u{1343F}y\u{13440}z")), "x\u{FFFD}y\u{13440}z"); + } } diff --git a/lightning-types/src/unicode.rs b/lightning-types/src/unicode.rs new file mode 100644 index 00000000000..22b21969365 --- /dev/null +++ b/lightning-types/src/unicode.rs @@ -0,0 +1,799 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +// Auto-generated from the Unicode Character Database (UnicodeData.txt) by +// contrib/gen_unicode_general_category.py. Do not edit by hand; rerun the +// generator with an updated UnicodeData.txt to refresh the table. + +/// Returns `true` if `c` is in Unicode general category `Cc` (Control), `Cf` +/// (Format), `Cs` (Surrogate), or `Co` (Private Use) — the assigned codepoints +/// in the top-level `C` ("Other") category. The `Cs` portion of the table is +/// unreachable for `char` input (a `char` cannot hold a surrogate) but is kept +/// so the table mirrors the source UCD data verbatim. The disjoint `Cn` +/// (Unassigned) bucket is `is_unicode_general_category_unassigned`. +#[allow(dead_code)] +pub(crate) fn is_unicode_general_category_other(c: char) -> bool { + matches!( + c as u32, + 0x0000..=0x001F // control + | 0x007F..=0x009F // control + | 0x00AD // SOFT HYPHEN + | 0x0600..=0x0605 // ARABIC + | 0x061C // ARABIC LETTER MARK + | 0x06DD // ARABIC END OF AYAH + | 0x070F // SYRIAC ABBREVIATION MARK + | 0x0890..=0x0891 // MARK ABOVE + | 0x08E2 // ARABIC DISPUTED END OF AYAH + | 0x180E // MONGOLIAN VOWEL SEPARATOR + | 0x200B..=0x200F // Cf + | 0x202A..=0x202E // Cf + | 0x2060..=0x2064 // Cf + | 0x2066..=0x206F // Cf + | 0xD800..=0xF8FF // Co / Cs + | 0xFEFF // ZERO WIDTH NO-BREAK SPACE + | 0xFFF9..=0xFFFB // INTERLINEAR ANNOTATION + | 0x110BD // KAITHI NUMBER SIGN + | 0x110CD // KAITHI NUMBER SIGN ABOVE + | 0x13430..=0x1343F // EGYPTIAN HIEROGLYPH + | 0x1BCA0..=0x1BCA3 // SHORTHAND FORMAT + | 0x1D173..=0x1D17A // MUSICAL SYMBOL + | 0xE0001 // LANGUAGE TAG + | 0xE0020..=0xE007F // Cf + | 0xF0000..=0xFFFFD // Plane 15 Private Use + | 0x100000..=0x10FFFD // Plane 16 Private Use + ) +} + +/// Returns `true` if `c` is in Unicode general category `Cn` (Unassigned), or +/// strictly above U+10FFFF. The trailing `0x110000..=u32::MAX` arm is +/// unreachable for `char` input (a `char` is bounded to U+10FFFF) but is kept +/// for defensive coverage of the underlying `u32`. The disjoint Cc / Cf / Cs / +/// Co bucket is `is_unicode_general_category_other`. +#[allow(dead_code)] +pub(crate) fn is_unicode_general_category_unassigned(c: char) -> bool { + matches!( + c as u32, + 0x0378..=0x0379 + | 0x0380..=0x0383 + | 0x038B + | 0x038D + | 0x03A2 + | 0x0530 + | 0x0557..=0x0558 + | 0x058B..=0x058C + | 0x0590 + | 0x05C8..=0x05CF + | 0x05EB..=0x05EE + | 0x05F5..=0x05FF + | 0x070E + | 0x074B..=0x074C + | 0x07B2..=0x07BF + | 0x07FB..=0x07FC + | 0x082E..=0x082F + | 0x083F + | 0x085C..=0x085D + | 0x085F + | 0x086B..=0x086F + | 0x0892..=0x0896 + | 0x0984 + | 0x098D..=0x098E + | 0x0991..=0x0992 + | 0x09A9 + | 0x09B1 + | 0x09B3..=0x09B5 + | 0x09BA..=0x09BB + | 0x09C5..=0x09C6 + | 0x09C9..=0x09CA + | 0x09CF..=0x09D6 + | 0x09D8..=0x09DB + | 0x09DE + | 0x09E4..=0x09E5 + | 0x09FF..=0x0A00 + | 0x0A04 + | 0x0A0B..=0x0A0E + | 0x0A11..=0x0A12 + | 0x0A29 + | 0x0A31 + | 0x0A34 + | 0x0A37 + | 0x0A3A..=0x0A3B + | 0x0A3D + | 0x0A43..=0x0A46 + | 0x0A49..=0x0A4A + | 0x0A4E..=0x0A50 + | 0x0A52..=0x0A58 + | 0x0A5D + | 0x0A5F..=0x0A65 + | 0x0A77..=0x0A80 + | 0x0A84 + | 0x0A8E + | 0x0A92 + | 0x0AA9 + | 0x0AB1 + | 0x0AB4 + | 0x0ABA..=0x0ABB + | 0x0AC6 + | 0x0ACA + | 0x0ACE..=0x0ACF + | 0x0AD1..=0x0ADF + | 0x0AE4..=0x0AE5 + | 0x0AF2..=0x0AF8 + | 0x0B00 + | 0x0B04 + | 0x0B0D..=0x0B0E + | 0x0B11..=0x0B12 + | 0x0B29 + | 0x0B31 + | 0x0B34 + | 0x0B3A..=0x0B3B + | 0x0B45..=0x0B46 + | 0x0B49..=0x0B4A + | 0x0B4E..=0x0B54 + | 0x0B58..=0x0B5B + | 0x0B5E + | 0x0B64..=0x0B65 + | 0x0B78..=0x0B81 + | 0x0B84 + | 0x0B8B..=0x0B8D + | 0x0B91 + | 0x0B96..=0x0B98 + | 0x0B9B + | 0x0B9D + | 0x0BA0..=0x0BA2 + | 0x0BA5..=0x0BA7 + | 0x0BAB..=0x0BAD + | 0x0BBA..=0x0BBD + | 0x0BC3..=0x0BC5 + | 0x0BC9 + | 0x0BCE..=0x0BCF + | 0x0BD1..=0x0BD6 + | 0x0BD8..=0x0BE5 + | 0x0BFB..=0x0BFF + | 0x0C0D + | 0x0C11 + | 0x0C29 + | 0x0C3A..=0x0C3B + | 0x0C45 + | 0x0C49 + | 0x0C4E..=0x0C54 + | 0x0C57 + | 0x0C5B + | 0x0C5E..=0x0C5F + | 0x0C64..=0x0C65 + | 0x0C70..=0x0C76 + | 0x0C8D + | 0x0C91 + | 0x0CA9 + | 0x0CB4 + | 0x0CBA..=0x0CBB + | 0x0CC5 + | 0x0CC9 + | 0x0CCE..=0x0CD4 + | 0x0CD7..=0x0CDB + | 0x0CDF + | 0x0CE4..=0x0CE5 + | 0x0CF0 + | 0x0CF4..=0x0CFF + | 0x0D0D + | 0x0D11 + | 0x0D45 + | 0x0D49 + | 0x0D50..=0x0D53 + | 0x0D64..=0x0D65 + | 0x0D80 + | 0x0D84 + | 0x0D97..=0x0D99 + | 0x0DB2 + | 0x0DBC + | 0x0DBE..=0x0DBF + | 0x0DC7..=0x0DC9 + | 0x0DCB..=0x0DCE + | 0x0DD5 + | 0x0DD7 + | 0x0DE0..=0x0DE5 + | 0x0DF0..=0x0DF1 + | 0x0DF5..=0x0E00 + | 0x0E3B..=0x0E3E + | 0x0E5C..=0x0E80 + | 0x0E83 + | 0x0E85 + | 0x0E8B + | 0x0EA4 + | 0x0EA6 + | 0x0EBE..=0x0EBF + | 0x0EC5 + | 0x0EC7 + | 0x0ECF + | 0x0EDA..=0x0EDB + | 0x0EE0..=0x0EFF + | 0x0F48 + | 0x0F6D..=0x0F70 + | 0x0F98 + | 0x0FBD + | 0x0FCD + | 0x0FDB..=0x0FFF + | 0x10C6 + | 0x10C8..=0x10CC + | 0x10CE..=0x10CF + | 0x1249 + | 0x124E..=0x124F + | 0x1257 + | 0x1259 + | 0x125E..=0x125F + | 0x1289 + | 0x128E..=0x128F + | 0x12B1 + | 0x12B6..=0x12B7 + | 0x12BF + | 0x12C1 + | 0x12C6..=0x12C7 + | 0x12D7 + | 0x1311 + | 0x1316..=0x1317 + | 0x135B..=0x135C + | 0x137D..=0x137F + | 0x139A..=0x139F + | 0x13F6..=0x13F7 + | 0x13FE..=0x13FF + | 0x169D..=0x169F + | 0x16F9..=0x16FF + | 0x1716..=0x171E + | 0x1737..=0x173F + | 0x1754..=0x175F + | 0x176D + | 0x1771 + | 0x1774..=0x177F + | 0x17DE..=0x17DF + | 0x17EA..=0x17EF + | 0x17FA..=0x17FF + | 0x181A..=0x181F + | 0x1879..=0x187F + | 0x18AB..=0x18AF + | 0x18F6..=0x18FF + | 0x191F + | 0x192C..=0x192F + | 0x193C..=0x193F + | 0x1941..=0x1943 + | 0x196E..=0x196F + | 0x1975..=0x197F + | 0x19AC..=0x19AF + | 0x19CA..=0x19CF + | 0x19DB..=0x19DD + | 0x1A1C..=0x1A1D + | 0x1A5F + | 0x1A7D..=0x1A7E + | 0x1A8A..=0x1A8F + | 0x1A9A..=0x1A9F + | 0x1AAE..=0x1AAF + | 0x1ADE..=0x1ADF + | 0x1AEC..=0x1AFF + | 0x1B4D + | 0x1BF4..=0x1BFB + | 0x1C38..=0x1C3A + | 0x1C4A..=0x1C4C + | 0x1C8B..=0x1C8F + | 0x1CBB..=0x1CBC + | 0x1CC8..=0x1CCF + | 0x1CFB..=0x1CFF + | 0x1F16..=0x1F17 + | 0x1F1E..=0x1F1F + | 0x1F46..=0x1F47 + | 0x1F4E..=0x1F4F + | 0x1F58 + | 0x1F5A + | 0x1F5C + | 0x1F5E + | 0x1F7E..=0x1F7F + | 0x1FB5 + | 0x1FC5 + | 0x1FD4..=0x1FD5 + | 0x1FDC + | 0x1FF0..=0x1FF1 + | 0x1FF5 + | 0x1FFF + | 0x2065 + | 0x2072..=0x2073 + | 0x208F + | 0x209D..=0x209F + | 0x20C2..=0x20CF + | 0x20F1..=0x20FF + | 0x218C..=0x218F + | 0x242A..=0x243F + | 0x244B..=0x245F + | 0x2B74..=0x2B75 + | 0x2CF4..=0x2CF8 + | 0x2D26 + | 0x2D28..=0x2D2C + | 0x2D2E..=0x2D2F + | 0x2D68..=0x2D6E + | 0x2D71..=0x2D7E + | 0x2D97..=0x2D9F + | 0x2DA7 + | 0x2DAF + | 0x2DB7 + | 0x2DBF + | 0x2DC7 + | 0x2DCF + | 0x2DD7 + | 0x2DDF + | 0x2E5E..=0x2E7F + | 0x2E9A + | 0x2EF4..=0x2EFF + | 0x2FD6..=0x2FEF + | 0x3040 + | 0x3097..=0x3098 + | 0x3100..=0x3104 + | 0x3130 + | 0x318F + | 0x31E6..=0x31EE + | 0x321F + | 0xA48D..=0xA48F + | 0xA4C7..=0xA4CF + | 0xA62C..=0xA63F + | 0xA6F8..=0xA6FF + | 0xA7DD..=0xA7F0 + | 0xA82D..=0xA82F + | 0xA83A..=0xA83F + | 0xA878..=0xA87F + | 0xA8C6..=0xA8CD + | 0xA8DA..=0xA8DF + | 0xA954..=0xA95E + | 0xA97D..=0xA97F + | 0xA9CE + | 0xA9DA..=0xA9DD + | 0xA9FF + | 0xAA37..=0xAA3F + | 0xAA4E..=0xAA4F + | 0xAA5A..=0xAA5B + | 0xAAC3..=0xAADA + | 0xAAF7..=0xAB00 + | 0xAB07..=0xAB08 + | 0xAB0F..=0xAB10 + | 0xAB17..=0xAB1F + | 0xAB27 + | 0xAB2F + | 0xAB6C..=0xAB6F + | 0xABEE..=0xABEF + | 0xABFA..=0xABFF + | 0xD7A4..=0xD7AF + | 0xD7C7..=0xD7CA + | 0xD7FC..=0xD7FF + | 0xFA6E..=0xFA6F + | 0xFADA..=0xFAFF + | 0xFB07..=0xFB12 + | 0xFB18..=0xFB1C + | 0xFB37 + | 0xFB3D + | 0xFB3F + | 0xFB42 + | 0xFB45 + | 0xFDD0..=0xFDEF + | 0xFE1A..=0xFE1F + | 0xFE53 + | 0xFE67 + | 0xFE6C..=0xFE6F + | 0xFE75 + | 0xFEFD..=0xFEFE + | 0xFF00 + | 0xFFBF..=0xFFC1 + | 0xFFC8..=0xFFC9 + | 0xFFD0..=0xFFD1 + | 0xFFD8..=0xFFD9 + | 0xFFDD..=0xFFDF + | 0xFFE7 + | 0xFFEF..=0xFFF8 + | 0xFFFE..=0xFFFF + | 0x1000C + | 0x10027 + | 0x1003B + | 0x1003E + | 0x1004E..=0x1004F + | 0x1005E..=0x1007F + | 0x100FB..=0x100FF + | 0x10103..=0x10106 + | 0x10134..=0x10136 + | 0x1018F + | 0x1019D..=0x1019F + | 0x101A1..=0x101CF + | 0x101FE..=0x1027F + | 0x1029D..=0x1029F + | 0x102D1..=0x102DF + | 0x102FC..=0x102FF + | 0x10324..=0x1032C + | 0x1034B..=0x1034F + | 0x1037B..=0x1037F + | 0x1039E + | 0x103C4..=0x103C7 + | 0x103D6..=0x103FF + | 0x1049E..=0x1049F + | 0x104AA..=0x104AF + | 0x104D4..=0x104D7 + | 0x104FC..=0x104FF + | 0x10528..=0x1052F + | 0x10564..=0x1056E + | 0x1057B + | 0x1058B + | 0x10593 + | 0x10596 + | 0x105A2 + | 0x105B2 + | 0x105BA + | 0x105BD..=0x105BF + | 0x105F4..=0x105FF + | 0x10737..=0x1073F + | 0x10756..=0x1075F + | 0x10768..=0x1077F + | 0x10786 + | 0x107B1 + | 0x107BB..=0x107FF + | 0x10806..=0x10807 + | 0x10809 + | 0x10836 + | 0x10839..=0x1083B + | 0x1083D..=0x1083E + | 0x10856 + | 0x1089F..=0x108A6 + | 0x108B0..=0x108DF + | 0x108F3 + | 0x108F6..=0x108FA + | 0x1091C..=0x1091E + | 0x1093A..=0x1093E + | 0x1095A..=0x1097F + | 0x109B8..=0x109BB + | 0x109D0..=0x109D1 + | 0x10A04 + | 0x10A07..=0x10A0B + | 0x10A14 + | 0x10A18 + | 0x10A36..=0x10A37 + | 0x10A3B..=0x10A3E + | 0x10A49..=0x10A4F + | 0x10A59..=0x10A5F + | 0x10AA0..=0x10ABF + | 0x10AE7..=0x10AEA + | 0x10AF7..=0x10AFF + | 0x10B36..=0x10B38 + | 0x10B56..=0x10B57 + | 0x10B73..=0x10B77 + | 0x10B92..=0x10B98 + | 0x10B9D..=0x10BA8 + | 0x10BB0..=0x10BFF + | 0x10C49..=0x10C7F + | 0x10CB3..=0x10CBF + | 0x10CF3..=0x10CF9 + | 0x10D28..=0x10D2F + | 0x10D3A..=0x10D3F + | 0x10D66..=0x10D68 + | 0x10D86..=0x10D8D + | 0x10D90..=0x10E5F + | 0x10E7F + | 0x10EAA + | 0x10EAE..=0x10EAF + | 0x10EB2..=0x10EC1 + | 0x10EC8..=0x10ECF + | 0x10ED9..=0x10EF9 + | 0x10F28..=0x10F2F + | 0x10F5A..=0x10F6F + | 0x10F8A..=0x10FAF + | 0x10FCC..=0x10FDF + | 0x10FF7..=0x10FFF + | 0x1104E..=0x11051 + | 0x11076..=0x1107E + | 0x110C3..=0x110CC + | 0x110CE..=0x110CF + | 0x110E9..=0x110EF + | 0x110FA..=0x110FF + | 0x11135 + | 0x11148..=0x1114F + | 0x11177..=0x1117F + | 0x111E0 + | 0x111F5..=0x111FF + | 0x11212 + | 0x11242..=0x1127F + | 0x11287 + | 0x11289 + | 0x1128E + | 0x1129E + | 0x112AA..=0x112AF + | 0x112EB..=0x112EF + | 0x112FA..=0x112FF + | 0x11304 + | 0x1130D..=0x1130E + | 0x11311..=0x11312 + | 0x11329 + | 0x11331 + | 0x11334 + | 0x1133A + | 0x11345..=0x11346 + | 0x11349..=0x1134A + | 0x1134E..=0x1134F + | 0x11351..=0x11356 + | 0x11358..=0x1135C + | 0x11364..=0x11365 + | 0x1136D..=0x1136F + | 0x11375..=0x1137F + | 0x1138A + | 0x1138C..=0x1138D + | 0x1138F + | 0x113B6 + | 0x113C1 + | 0x113C3..=0x113C4 + | 0x113C6 + | 0x113CB + | 0x113D6 + | 0x113D9..=0x113E0 + | 0x113E3..=0x113FF + | 0x1145C + | 0x11462..=0x1147F + | 0x114C8..=0x114CF + | 0x114DA..=0x1157F + | 0x115B6..=0x115B7 + | 0x115DE..=0x115FF + | 0x11645..=0x1164F + | 0x1165A..=0x1165F + | 0x1166D..=0x1167F + | 0x116BA..=0x116BF + | 0x116CA..=0x116CF + | 0x116E4..=0x116FF + | 0x1171B..=0x1171C + | 0x1172C..=0x1172F + | 0x11747..=0x117FF + | 0x1183C..=0x1189F + | 0x118F3..=0x118FE + | 0x11907..=0x11908 + | 0x1190A..=0x1190B + | 0x11914 + | 0x11917 + | 0x11936 + | 0x11939..=0x1193A + | 0x11947..=0x1194F + | 0x1195A..=0x1199F + | 0x119A8..=0x119A9 + | 0x119D8..=0x119D9 + | 0x119E5..=0x119FF + | 0x11A48..=0x11A4F + | 0x11AA3..=0x11AAF + | 0x11AF9..=0x11AFF + | 0x11B0A..=0x11B5F + | 0x11B68..=0x11BBF + | 0x11BE2..=0x11BEF + | 0x11BFA..=0x11BFF + | 0x11C09 + | 0x11C37 + | 0x11C46..=0x11C4F + | 0x11C6D..=0x11C6F + | 0x11C90..=0x11C91 + | 0x11CA8 + | 0x11CB7..=0x11CFF + | 0x11D07 + | 0x11D0A + | 0x11D37..=0x11D39 + | 0x11D3B + | 0x11D3E + | 0x11D48..=0x11D4F + | 0x11D5A..=0x11D5F + | 0x11D66 + | 0x11D69 + | 0x11D8F + | 0x11D92 + | 0x11D99..=0x11D9F + | 0x11DAA..=0x11DAF + | 0x11DDC..=0x11DDF + | 0x11DEA..=0x11EDF + | 0x11EF9..=0x11EFF + | 0x11F11 + | 0x11F3B..=0x11F3D + | 0x11F5B..=0x11FAF + | 0x11FB1..=0x11FBF + | 0x11FF2..=0x11FFE + | 0x1239A..=0x123FF + | 0x1246F + | 0x12475..=0x1247F + | 0x12544..=0x12F8F + | 0x12FF3..=0x12FFF + | 0x13456..=0x1345F + | 0x143FB..=0x143FF + | 0x14647..=0x160FF + | 0x1613A..=0x167FF + | 0x16A39..=0x16A3F + | 0x16A5F + | 0x16A6A..=0x16A6D + | 0x16ABF + | 0x16ACA..=0x16ACF + | 0x16AEE..=0x16AEF + | 0x16AF6..=0x16AFF + | 0x16B46..=0x16B4F + | 0x16B5A + | 0x16B62 + | 0x16B78..=0x16B7C + | 0x16B90..=0x16D3F + | 0x16D7A..=0x16E3F + | 0x16E9B..=0x16E9F + | 0x16EB9..=0x16EBA + | 0x16ED4..=0x16EFF + | 0x16F4B..=0x16F4E + | 0x16F88..=0x16F8E + | 0x16FA0..=0x16FDF + | 0x16FE5..=0x16FEF + | 0x16FF7..=0x16FFF + | 0x18CD6..=0x18CFE + | 0x18D1F..=0x18D7F + | 0x18DF3..=0x1AFEF + | 0x1AFF4 + | 0x1AFFC + | 0x1AFFF + | 0x1B123..=0x1B131 + | 0x1B133..=0x1B14F + | 0x1B153..=0x1B154 + | 0x1B156..=0x1B163 + | 0x1B168..=0x1B16F + | 0x1B2FC..=0x1BBFF + | 0x1BC6B..=0x1BC6F + | 0x1BC7D..=0x1BC7F + | 0x1BC89..=0x1BC8F + | 0x1BC9A..=0x1BC9B + | 0x1BCA4..=0x1CBFF + | 0x1CCFD..=0x1CCFF + | 0x1CEB4..=0x1CEB9 + | 0x1CED1..=0x1CEDF + | 0x1CEF1..=0x1CEFF + | 0x1CF2E..=0x1CF2F + | 0x1CF47..=0x1CF4F + | 0x1CFC4..=0x1CFFF + | 0x1D0F6..=0x1D0FF + | 0x1D127..=0x1D128 + | 0x1D1EB..=0x1D1FF + | 0x1D246..=0x1D2BF + | 0x1D2D4..=0x1D2DF + | 0x1D2F4..=0x1D2FF + | 0x1D357..=0x1D35F + | 0x1D379..=0x1D3FF + | 0x1D455 + | 0x1D49D + | 0x1D4A0..=0x1D4A1 + | 0x1D4A3..=0x1D4A4 + | 0x1D4A7..=0x1D4A8 + | 0x1D4AD + | 0x1D4BA + | 0x1D4BC + | 0x1D4C4 + | 0x1D506 + | 0x1D50B..=0x1D50C + | 0x1D515 + | 0x1D51D + | 0x1D53A + | 0x1D53F + | 0x1D545 + | 0x1D547..=0x1D549 + | 0x1D551 + | 0x1D6A6..=0x1D6A7 + | 0x1D7CC..=0x1D7CD + | 0x1DA8C..=0x1DA9A + | 0x1DAA0 + | 0x1DAB0..=0x1DEFF + | 0x1DF1F..=0x1DF24 + | 0x1DF2B..=0x1DFFF + | 0x1E007 + | 0x1E019..=0x1E01A + | 0x1E022 + | 0x1E025 + | 0x1E02B..=0x1E02F + | 0x1E06E..=0x1E08E + | 0x1E090..=0x1E0FF + | 0x1E12D..=0x1E12F + | 0x1E13E..=0x1E13F + | 0x1E14A..=0x1E14D + | 0x1E150..=0x1E28F + | 0x1E2AF..=0x1E2BF + | 0x1E2FA..=0x1E2FE + | 0x1E300..=0x1E4CF + | 0x1E4FA..=0x1E5CF + | 0x1E5FB..=0x1E5FE + | 0x1E600..=0x1E6BF + | 0x1E6DF + | 0x1E6F6..=0x1E6FD + | 0x1E700..=0x1E7DF + | 0x1E7E7 + | 0x1E7EC + | 0x1E7EF + | 0x1E7FF + | 0x1E8C5..=0x1E8C6 + | 0x1E8D7..=0x1E8FF + | 0x1E94C..=0x1E94F + | 0x1E95A..=0x1E95D + | 0x1E960..=0x1EC70 + | 0x1ECB5..=0x1ED00 + | 0x1ED3E..=0x1EDFF + | 0x1EE04 + | 0x1EE20 + | 0x1EE23 + | 0x1EE25..=0x1EE26 + | 0x1EE28 + | 0x1EE33 + | 0x1EE38 + | 0x1EE3A + | 0x1EE3C..=0x1EE41 + | 0x1EE43..=0x1EE46 + | 0x1EE48 + | 0x1EE4A + | 0x1EE4C + | 0x1EE50 + | 0x1EE53 + | 0x1EE55..=0x1EE56 + | 0x1EE58 + | 0x1EE5A + | 0x1EE5C + | 0x1EE5E + | 0x1EE60 + | 0x1EE63 + | 0x1EE65..=0x1EE66 + | 0x1EE6B + | 0x1EE73 + | 0x1EE78 + | 0x1EE7D + | 0x1EE7F + | 0x1EE8A + | 0x1EE9C..=0x1EEA0 + | 0x1EEA4 + | 0x1EEAA + | 0x1EEBC..=0x1EEEF + | 0x1EEF2..=0x1EFFF + | 0x1F02C..=0x1F02F + | 0x1F094..=0x1F09F + | 0x1F0AF..=0x1F0B0 + | 0x1F0C0 + | 0x1F0D0 + | 0x1F0F6..=0x1F0FF + | 0x1F1AE..=0x1F1E5 + | 0x1F203..=0x1F20F + | 0x1F23C..=0x1F23F + | 0x1F249..=0x1F24F + | 0x1F252..=0x1F25F + | 0x1F266..=0x1F2FF + | 0x1F6D9..=0x1F6DB + | 0x1F6ED..=0x1F6EF + | 0x1F6FD..=0x1F6FF + | 0x1F7DA..=0x1F7DF + | 0x1F7EC..=0x1F7EF + | 0x1F7F1..=0x1F7FF + | 0x1F80C..=0x1F80F + | 0x1F848..=0x1F84F + | 0x1F85A..=0x1F85F + | 0x1F888..=0x1F88F + | 0x1F8AE..=0x1F8AF + | 0x1F8BC..=0x1F8BF + | 0x1F8C2..=0x1F8CF + | 0x1F8D9..=0x1F8FF + | 0x1FA58..=0x1FA5F + | 0x1FA6E..=0x1FA6F + | 0x1FA7D..=0x1FA7F + | 0x1FA8B..=0x1FA8D + | 0x1FAC7 + | 0x1FAC9..=0x1FACC + | 0x1FADD..=0x1FADE + | 0x1FAEB..=0x1FAEE + | 0x1FAF9..=0x1FAFF + | 0x1FB93 + | 0x1FBFB..=0x1FFFF + | 0x2A6E0..=0x2A6FF + | 0x2B81E..=0x2B81F + | 0x2CEAE..=0x2CEAF + | 0x2EBE1..=0x2EBEF + | 0x2EE5E..=0x2F7FF + | 0x2FA1E..=0x2FFFF + | 0x3134B..=0x3134F + | 0x3347A..=0xE0000 + | 0xE0002..=0xE001F + | 0xE0080..=0xE00FF + | 0xE01F0..=0xEFFFF + | 0xFFFFE..=0xFFFFF + | 0x10FFFE..=0x10FFFF + | 0x110000..=u32::MAX // above U+10FFFF — unreachable for `char` + ) +} diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index d8d6c90921f..b060c9b0601 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -569,7 +569,7 @@ where channel_id_bytes[2], channel_id_bytes[3], ]); - channel_id_u32.wrapping_add(best_height.unwrap_or_default()) + best_height.map(|height| channel_id_u32.wrapping_add(height)) }; let partition_factor = if channel_count < 15 { @@ -579,7 +579,7 @@ where }; let has_pending_claims = monitor_state.monitor.has_pending_claims(); - if has_pending_claims || get_partition_key(channel_id) % partition_factor == 0 { + if has_pending_claims || get_partition_key(channel_id).is_some_and(|key| key % partition_factor == 0) { log_trace!( logger, "Syncing Channel Monitor for channel {}", diff --git a/lightning/src/events/bump_transaction/mod.rs b/lightning/src/events/bump_transaction/mod.rs index 11f008612a2..e8c1350e862 100644 --- a/lightning/src/events/bump_transaction/mod.rs +++ b/lightning/src/events/bump_transaction/mod.rs @@ -74,6 +74,7 @@ impl AnchorDescriptor { chan_utils::get_keyed_anchor_redeemscript( &channel_params.broadcaster_pubkeys().funding_pubkey, ) + .to_p2wsh() } else { assert!(tx_params.channel_type_features.supports_anchor_zero_fee_commitments()); shared_anchor_script_pubkey() @@ -1383,4 +1384,27 @@ mod tests { pending_htlcs: Vec::new(), }); } + + #[test] + fn test_anchor_descriptor_previous_utxo_script_pubkey_uses_p2wsh() { + let mut transaction_parameters = ChannelTransactionParameters::test_dummy(42_000_000); + transaction_parameters.channel_type_features = + ChannelTypeFeatures::anchors_zero_htlc_fee_and_dependencies(); + + let funding_pubkey = transaction_parameters.holder_pubkeys.funding_pubkey; + let expected_script_pubkey = + chan_utils::get_keyed_anchor_redeemscript(&funding_pubkey).to_p2wsh(); + + let anchor_descriptor = AnchorDescriptor { + channel_derivation_parameters: ChannelDerivationParameters { + value_satoshis: 42_000_000, + keys_id: [42; 32], + transaction_parameters, + }, + outpoint: OutPoint::null(), + value: Amount::from_sat(ANCHOR_OUTPUT_VALUE_SATOSHI), + }; + + assert_eq!(anchor_descriptor.previous_utxo().script_pubkey, expected_script_pubkey); + } } diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs index d97ae6097b6..c0f252dbff1 100644 --- a/lightning/src/events/mod.rs +++ b/lightning/src/events/mod.rs @@ -1049,7 +1049,8 @@ pub enum Event { /// If the recipient or an intermediate node misbehaves and gives us free money, this may /// overstate the amount paid, though this is unlikely. /// - /// This is only `None` for payments initiated on LDK versions prior to 0.0.103. + /// This is only `None` for payments abandoned but ultimately claimed when using LDK versions + /// prior to 0.3, 0.2.3, or 0.1.10. /// /// [`Route::get_total_fees`]: crate::routing::router::Route::get_total_fees fee_paid_msat: Option, diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 816eaee8db2..bd88fa88bc3 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -1280,6 +1280,11 @@ enum BackgroundEvent { channel_id: ChannelId, highest_update_id_completed: u64, }, + /// A channel had blocked monitor updates waiting on startup. If the updates were blocked on + /// an MPP claim blocker not written to disk, we may be able to unblock them now. + /// + /// This event is never written to disk. + AttemptUnblockMonitorUpdates { counterparty_node_id: PublicKey, channel_id: ChannelId }, } /// A pointer to a channel that is unblocked when an event is surfaced @@ -8074,6 +8079,12 @@ where &counterparty_node_id, ); }, + BackgroundEvent::AttemptUnblockMonitorUpdates { + counterparty_node_id, + channel_id, + } => { + self.handle_monitor_update_release(counterparty_node_id, channel_id, None); + }, } } NotifyOption::DoPersist @@ -8357,26 +8368,23 @@ where debug_assert!(false); return false; } - if let OnionPayload::Invoice { .. } = payment.htlcs[0].onion_payload { - // Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat). - // In this case we're not going to handle any timeouts of the parts here. - // This condition determining whether the MPP is complete here must match - // exactly the condition used in `process_pending_htlc_forwards`. - let htlc_total_msat = - payment.htlcs.iter().map(|h| h.sender_intended_value).sum(); - if payment.htlcs[0].total_msat <= htlc_total_msat { - return true; - } else if payment.htlcs.iter_mut().any(|htlc| { - htlc.timer_ticks += 1; - return htlc.timer_ticks >= MPP_TIMEOUT_TICKS; - }) { - let htlcs = payment - .htlcs - .drain(..) - .map(|htlc: ClaimableHTLC| (htlc.prev_hop, *payment_hash)); - timed_out_mpp_htlcs.extend(htlcs); - return false; - } + // Check if we've received all the parts we need for an MPP. + // This condition determining whether the MPP is complete here must match + // exactly the condition used in `process_pending_htlc_forwards`. + let htlc_total_msat = + payment.htlcs.iter().map(|h| h.sender_intended_value).sum(); + if payment.htlcs[0].total_msat <= htlc_total_msat { + return true; + } else if payment.htlcs.iter_mut().any(|htlc| { + htlc.timer_ticks += 1; + return htlc.timer_ticks >= MPP_TIMEOUT_TICKS; + }) { + let htlcs = payment + .htlcs + .drain(..) + .map(|htlc: ClaimableHTLC| (htlc.prev_hop, *payment_hash)); + timed_out_mpp_htlcs.extend(htlcs); + return false; } true }, @@ -9077,12 +9085,12 @@ where { if let Some(peer_state_mtx) = per_peer_state.get(&node_id) { let mut peer_state = peer_state_mtx.lock().unwrap(); - if let Some(blockers) = peer_state + let entry = peer_state .actions_blocking_raa_monitor_updates - .get_mut(&channel_id) - { + .entry(channel_id); + if let btree_map::Entry::Occupied(mut entry) = entry { let mut found_blocker = false; - blockers.retain(|iter| { + entry.get_mut().retain(|iter| { // Note that we could actually be blocked, in // which case we need to only remove the one // blocker which was added duplicatively. @@ -9092,6 +9100,9 @@ where } *iter != blocker || !first_blocker }); + if entry.get().is_empty() { + entry.remove(); + } debug_assert!(found_blocker); } } else { @@ -9359,6 +9370,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ channel_id, .. } => *channel_id == prev_channel_id, + BackgroundEvent::AttemptUnblockMonitorUpdates { .. } => false, } }); assert!( @@ -13587,10 +13599,12 @@ where let peer_state = &mut *peer_state_lck; if let Some(blocker) = completed_blocker.take() { // Only do this on the first iteration of the loop. - if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates - .get_mut(&channel_id) - { - blockers.retain(|iter| iter != &blocker); + let entry = peer_state.actions_blocking_raa_monitor_updates.entry(channel_id); + if let btree_map::Entry::Occupied(mut entry) = entry { + entry.get_mut().retain(|iter| iter != &blocker); + if entry.get().is_empty() { + entry.remove(); + } } } @@ -17366,6 +17380,14 @@ where log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning"); return Err(DecodeError::DangerousValue); } + if funded_chan.blocked_monitor_updates_pending() > 0 { + pending_background_events.push( + BackgroundEvent::AttemptUnblockMonitorUpdates { + counterparty_node_id: *counterparty_id, + channel_id: *chan_id, + }, + ); + } } else { // We shouldn't have persisted (or read) any unfunded channel types so none should have been // created in this `channel_by_id` map. diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 36fb17ff076..54019149409 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -8405,7 +8405,7 @@ pub fn test_inconsistent_mpp_params() { pass_along_path(&nodes[0], path_b, real_amt, hash, Some(payment_secret), event, true, None); do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], &[path_a, path_b], preimage)); - expect_payment_sent(&nodes[0], preimage, Some(None), true, true); + expect_payment_sent(&nodes[0], preimage, Some(Some(2000)), true, true); } #[xtest(feature = "_externalize_tests")] @@ -9999,3 +9999,69 @@ pub fn test_dust_exposure_holding_cell_assertion() { // Now that everything has settled, make sure the channels still work with a simple claim. claim_payment(&nodes[2], &[&nodes[1]], payment_preimage_cb); } + +#[test] +fn test_dup_htlc_claim_onchain_and_offchain() { + // Tests what happens if we receive a claim first offchain, then see a counterparty broadcast + // their commitment transaction and re-claim the same HTLC on-chain. This was never broken, but + // the very specific ordering in this test did hit a debug assertion failure. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let legacy_cfg = test_default_channel_config(); + let node_chanmgrs = create_node_chanmgrs( + 3, + &node_cfgs, + &[Some(legacy_cfg.clone()), Some(legacy_cfg.clone()), Some(legacy_cfg)], + ); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let node_b_id = nodes[1].node.get_our_node_id(); + let node_c_id = nodes[2].node.get_our_node_id(); + + create_announced_chan_between_nodes(&nodes, 0, 1); + let chan_bc = create_announced_chan_between_nodes(&nodes, 1, 2); + + // Route payment A -> B -> C. + let (payment_preimage, payment_hash, _, _) = + route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1_000_000); + + // C claims the payment. + nodes[2].node.claim_funds(payment_preimage); + expect_payment_claimed!(nodes[2], payment_hash, 1_000_000); + check_added_monitors(&nodes[2], 1); + + // Deliver only C's update_fulfill_htlc to B (NOT the commitment_signed). B learns + // the preimage and claims from A (adding an RAA blocker on B-C via + // internal_update_fulfill_htlc, then removing it when the A-B monitor update completes + // and the EmitEventOptionAndFreeOtherChannel action runs). + let cs_updates = get_htlc_update_msgs(&nodes[2], &node_b_id); + nodes[1].node.handle_update_fulfill_htlc(node_c_id, cs_updates.update_fulfill_htlcs[0].clone()); + check_added_monitors(&nodes[1], 1); + + // Ignore B's attempts to claim the HTLC from A. + nodes[1].node.get_and_clear_pending_msg_events(); + + // Get C's commitment transactions. C's commitment includes the HTLC and C has + // an HTLC-success transaction (claiming with preimage). Mine both on B. + let cs_txn = get_local_commitment_txn!(nodes[2], chan_bc.2); + assert!(cs_txn.len() >= 2, "Expected commitment + HTLC-success tx, got {}", cs_txn.len()); + + // Mine C's commitment on B. B sees the counterparty commitment on-chain. + mine_transaction(&nodes[1], &cs_txn[0]); + check_closed_broadcast(&nodes[1], 1, true); + check_added_monitors(&nodes[1], 1); + let events = nodes[1].node.get_and_clear_pending_events(); + assert!( + events.iter().any(|e| matches!(e, Event::ChannelClosed { .. })), + "Expected ChannelClosed event" + ); + + // Mine C's HTLC-success transaction. B's monitor sees the preimage being used on-chain + // and generates an HTLCEvent with the preimage. + mine_transaction(&nodes[1], &cs_txn[1]); + + // Advance past ANTI_REORG_DELAY so the on-chain HTLC resolution matures. This triggers + // the monitor to generate an HTLCEvent with the preimage via process_pending_monitor_events, + // which calls claim_funds_internal a second time. + connect_blocks(&nodes[1], ANTI_REORG_DELAY); +} diff --git a/lightning/src/ln/offers_tests.rs b/lightning/src/ln/offers_tests.rs index b7d64df4063..e8832316b9a 100644 --- a/lightning/src/ln/offers_tests.rs +++ b/lightning/src/ln/offers_tests.rs @@ -53,7 +53,7 @@ use crate::events::{ClosureReason, Event, HTLCHandlingFailureType, PaidBolt12Inv use crate::ln::channelmanager::{Bolt12PaymentError, PaymentId, RecentPaymentDetails, RecipientOnionFields, Retry, self}; use crate::types::features::Bolt12InvoiceFeatures; use crate::ln::functional_test_utils::*; -use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, NodeAnnouncement, OnionMessage, OnionMessageHandler, RoutingMessageHandler, SocketAddress, UnsignedGossipMessage, UnsignedNodeAnnouncement}; +use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, OnionMessage, OnionMessageHandler}; use crate::ln::outbound_payment::IDEMPOTENCY_TIMEOUT_TICKS; use crate::offers::invoice::Bolt12Invoice; use crate::offers::invoice_error::InvoiceError; @@ -115,38 +115,6 @@ fn disconnect_peers<'a, 'b, 'c>(node_a: &Node<'a, 'b, 'c>, peers: &[&Node<'a, 'b } } -fn announce_node_address<'a, 'b, 'c>( - node: &Node<'a, 'b, 'c>, peers: &[&Node<'a, 'b, 'c>], address: SocketAddress, -) { - let features = node.onion_messenger.provided_node_features() - | node.gossip_sync.provided_node_features(); - let rgb = [0u8; 3]; - let announcement = UnsignedNodeAnnouncement { - features, - timestamp: 1000, - node_id: NodeId::from_pubkey(&node.keys_manager.get_node_id(Recipient::Node).unwrap()), - rgb, - alias: NodeAlias([0u8; 32]), - addresses: vec![address], - excess_address_data: Vec::new(), - excess_data: Vec::new(), - }; - let signature = node.keys_manager.sign_gossip_message( - UnsignedGossipMessage::NodeAnnouncement(&announcement) - ).unwrap(); - - let msg = NodeAnnouncement { - signature, - contents: announcement - }; - - let node_pubkey = node.node.get_our_node_id(); - node.gossip_sync.handle_node_announcement(None, &msg).unwrap(); - for peer in peers { - peer.gossip_sync.handle_node_announcement(Some(node_pubkey), &msg).unwrap(); - } -} - fn resolve_introduction_node<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, path: &BlindedMessagePath) -> PublicKey { path.public_introduction_node_id(&node.network_graph.read_only()) .and_then(|node_id| node_id.as_pubkey().ok()) @@ -315,126 +283,6 @@ fn create_refund_with_no_blinded_path() { assert!(refund.paths().is_empty()); } -/// Checks that blinded paths without Tor-only nodes are preferred when constructing an offer. -#[test] -fn prefers_non_tor_nodes_in_blinded_paths() { - let mut accept_forward_cfg = test_default_channel_config(); - accept_forward_cfg.accept_forwards_to_priv_channels = true; - - let mut features = channelmanager::provided_init_features(&accept_forward_cfg); - features.set_onion_messages_optional(); - features.set_route_blinding_optional(); - - let chanmon_cfgs = create_chanmon_cfgs(6); - let node_cfgs = create_node_cfgs(6, &chanmon_cfgs); - - *node_cfgs[1].override_init_features.borrow_mut() = Some(features); - - let node_chanmgrs = create_node_chanmgrs( - 6, &node_cfgs, &[None, Some(accept_forward_cfg), None, None, None, None] - ); - let nodes = create_network(6, &node_cfgs, &node_chanmgrs); - - create_unannounced_chan_between_nodes_with_value(&nodes, 0, 1, 10_000_000, 1_000_000_000); - create_unannounced_chan_between_nodes_with_value(&nodes, 2, 3, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 1, 2, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 1, 4, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 1, 5, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 2, 4, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 2, 5, 10_000_000, 1_000_000_000); - - // Add an extra channel so that more than one of Bob's peers have MIN_PEER_CHANNELS. - create_announced_chan_between_nodes_with_value(&nodes, 4, 5, 10_000_000, 1_000_000_000); - - let (alice, bob, charlie, david) = (&nodes[0], &nodes[1], &nodes[2], &nodes[3]); - let bob_id = bob.node.get_our_node_id(); - let charlie_id = charlie.node.get_our_node_id(); - - disconnect_peers(alice, &[charlie, david, &nodes[4], &nodes[5]]); - disconnect_peers(david, &[bob, &nodes[4], &nodes[5]]); - - let tor = SocketAddress::OnionV2([255, 254, 253, 252, 251, 250, 249, 248, 247, 246, 38, 7]); - announce_node_address(charlie, &[alice, bob, david, &nodes[4], &nodes[5]], tor.clone()); - - let offer = bob.node - .create_offer_builder().unwrap() - .amount_msats(10_000_000) - .build().unwrap(); - assert_ne!(offer.issuer_signing_pubkey(), Some(bob_id)); - assert!(!offer.paths().is_empty()); - for path in offer.paths() { - let introduction_node_id = resolve_introduction_node(david, &path); - assert_ne!(introduction_node_id, bob_id); - assert_ne!(introduction_node_id, charlie_id); - } - - // Use a one-hop blinded path when Bob is announced and all his peers are Tor-only. - announce_node_address(&nodes[4], &[alice, bob, charlie, david, &nodes[5]], tor.clone()); - announce_node_address(&nodes[5], &[alice, bob, charlie, david, &nodes[4]], tor.clone()); - - let offer = bob.node - .create_offer_builder().unwrap() - .amount_msats(10_000_000) - .build().unwrap(); - assert_ne!(offer.issuer_signing_pubkey(), Some(bob_id)); - assert!(!offer.paths().is_empty()); - for path in offer.paths() { - let introduction_node_id = resolve_introduction_node(david, &path); - assert_eq!(introduction_node_id, bob_id); - } -} - -/// Checks that blinded paths prefer an introduction node that is the most connected. -#[test] -fn prefers_more_connected_nodes_in_blinded_paths() { - let mut accept_forward_cfg = test_default_channel_config(); - accept_forward_cfg.accept_forwards_to_priv_channels = true; - - let mut features = channelmanager::provided_init_features(&accept_forward_cfg); - features.set_onion_messages_optional(); - features.set_route_blinding_optional(); - - let chanmon_cfgs = create_chanmon_cfgs(6); - let node_cfgs = create_node_cfgs(6, &chanmon_cfgs); - - *node_cfgs[1].override_init_features.borrow_mut() = Some(features); - - let node_chanmgrs = create_node_chanmgrs( - 6, &node_cfgs, &[None, Some(accept_forward_cfg), None, None, None, None] - ); - let nodes = create_network(6, &node_cfgs, &node_chanmgrs); - - create_unannounced_chan_between_nodes_with_value(&nodes, 0, 1, 10_000_000, 1_000_000_000); - create_unannounced_chan_between_nodes_with_value(&nodes, 2, 3, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 1, 2, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 1, 4, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 1, 5, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 2, 4, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 2, 5, 10_000_000, 1_000_000_000); - - // Add extra channels so that more than one of Bob's peers have MIN_PEER_CHANNELS and one has - // more than the others. - create_announced_chan_between_nodes_with_value(&nodes, 0, 4, 10_000_000, 1_000_000_000); - create_announced_chan_between_nodes_with_value(&nodes, 3, 4, 10_000_000, 1_000_000_000); - - let (alice, bob, charlie, david) = (&nodes[0], &nodes[1], &nodes[2], &nodes[3]); - let bob_id = bob.node.get_our_node_id(); - - disconnect_peers(alice, &[charlie, david, &nodes[4], &nodes[5]]); - disconnect_peers(david, &[bob, &nodes[4], &nodes[5]]); - - let offer = bob.node - .create_offer_builder().unwrap() - .amount_msats(10_000_000) - .build().unwrap(); - assert_ne!(offer.issuer_signing_pubkey(), Some(bob_id)); - assert!(!offer.paths().is_empty()); - for path in offer.paths() { - let introduction_node_id = resolve_introduction_node(david, &path); - assert_eq!(introduction_node_id, nodes[4].node.get_our_node_id()); - } -} - /// Tests the dummy hop behavior of Offers based on the message router used: /// - Compact paths (`DefaultMessageRouter`) should not include dummy hops. /// - Node ID paths (`NodeIdMessageRouter`) may include 0 to [`MAX_DUMMY_HOPS_COUNT`] dummy hops. diff --git a/lightning/src/ln/outbound_payment.rs b/lightning/src/ln/outbound_payment.rs index 60c33b09bea..670d3a18ba9 100644 --- a/lightning/src/ln/outbound_payment.rs +++ b/lightning/src/ln/outbound_payment.rs @@ -156,6 +156,9 @@ pub(crate) enum PendingOutboundPayment { /// The total payment amount across all paths, used to be able to issue `PaymentSent` if /// an HTLC still happens to succeed after we marked the payment as abandoned. total_msat: Option, + /// Preserved from `Retryable` so we can still report `fee_paid_msat` if an HTLC succeeds after + /// the payment was abandoned. Added in 0.3. + pending_fee_msat: Option, }, } @@ -244,6 +247,7 @@ impl PendingOutboundPayment { fn get_pending_fee_msat(&self) -> Option { match self { PendingOutboundPayment::Retryable { pending_fee_msat, .. } => pending_fee_msat.clone(), + PendingOutboundPayment::Abandoned { pending_fee_msat, .. } => pending_fee_msat.clone(), _ => None, } } @@ -300,6 +304,7 @@ impl PendingOutboundPayment { _ => new_hash_set(), }; let total_msat = self.total_msat(); + let pending_fee_msat = self.get_pending_fee_msat(); match self { Self::Retryable { payment_hash, .. } | Self::InvoiceReceived { payment_hash, .. } | @@ -310,6 +315,7 @@ impl PendingOutboundPayment { payment_hash: *payment_hash, reason: Some(reason), total_msat, + pending_fee_msat, }; }, _ => {} @@ -2753,6 +2759,7 @@ impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment, (1, reason, upgradable_option), (2, payment_hash, required), (3, total_msat, option), + (5, pending_fee_msat, option), }, (5, AwaitingInvoice) => { (0, expiration, required), diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index bab2a16bef9..4c6fe963a6f 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -332,7 +332,7 @@ fn mpp_retry_overpay() { expect_payment_sent!(&nodes[0], payment_preimage, Some(expected_total_fee_msat)); } -fn do_mpp_receive_timeout(send_partial_mpp: bool) { +fn do_mpp_receive_timeout(send_partial_mpp: bool, keysend: bool) { let chanmon_cfgs = create_chanmon_cfgs(4); let node_cfgs = create_node_cfgs(4, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(4, &node_cfgs, &[None, None, None, None]); @@ -348,8 +348,12 @@ fn do_mpp_receive_timeout(send_partial_mpp: bool) { let (chan_3_update, _, chan_3_id, _) = create_announced_chan_between_nodes(&nodes, 1, 3); let (chan_4_update, _, _, _) = create_announced_chan_between_nodes(&nodes, 2, 3); - let (mut route, hash, payment_preimage, payment_secret) = - get_route_and_payment_hash!(nodes[0], nodes[3], 100_000); + let (mut route, hash, payment_preimage, payment_secret) = if keysend { + let payment_params = PaymentParameters::for_keysend(node_d_id, TEST_FINAL_CLTV, true); + get_route_and_payment_hash!(nodes[0], nodes[3], payment_params, 100_000) + } else { + get_route_and_payment_hash!(nodes[0], nodes[3], 100_000) + }; let path = route.paths[0].clone(); route.paths.push(path); route.paths[0].hops[0].pubkey = node_b_id; @@ -361,8 +365,23 @@ fn do_mpp_receive_timeout(send_partial_mpp: bool) { // Initiate the MPP payment. let onion = RecipientOnionFields::secret_only(payment_secret); - nodes[0].node.send_payment_with_route(route, hash, onion, PaymentId(hash.0)).unwrap(); - check_added_monitors!(nodes[0], 2); // one monitor per path + if keysend { + let route_params = route.route_params.clone().unwrap(); + nodes[0].router.expect_find_route(route_params.clone(), Ok(route.clone())); + nodes[0] + .node + .send_spontaneous_payment( + Some(payment_preimage), + onion, + PaymentId(hash.0), + route_params, + Retry::Attempts(0), + ) + .unwrap(); + } else { + nodes[0].node.send_payment_with_route(route, hash, onion, PaymentId(hash.0)).unwrap(); + } + check_added_monitors(&nodes[0], 2); // one monitor per path let mut events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(events.len(), 2); @@ -408,7 +427,17 @@ fn do_mpp_receive_timeout(send_partial_mpp: bool) { let node_2_msgs = remove_first_msg_event_to_node(&node_c_id, &mut events); let path = &[&nodes[2], &nodes[3]]; let payment_secret = Some(payment_secret); - pass_along_path(&nodes[0], path, 200_000, hash, payment_secret, node_2_msgs, true, None); + let expected_preimage = if keysend { Some(payment_preimage) } else { None }; + pass_along_path( + &nodes[0], + path, + 200_000, + hash, + payment_secret, + node_2_msgs, + true, + expected_preimage, + ); // Even after MPP_TIMEOUT_TICKS we should not timeout the MPP if we have all the parts for _ in 0..MPP_TIMEOUT_TICKS { @@ -422,8 +451,14 @@ fn do_mpp_receive_timeout(send_partial_mpp: bool) { #[test] fn mpp_receive_timeout() { - do_mpp_receive_timeout(true); - do_mpp_receive_timeout(false); + do_mpp_receive_timeout(true, false); + do_mpp_receive_timeout(false, false); +} + +#[test] +fn keysend_mpp_receive_timeout() { + do_mpp_receive_timeout(true, true); + do_mpp_receive_timeout(false, true); } #[test] @@ -1991,6 +2026,36 @@ fn abandoned_send_payment_idempotent() { claim_payment(&nodes[0], &[&nodes[1]], second_payment_preimage); } +#[test] +fn abandoned_payment_fulfilled_preserves_fee_paid_msat() { + // Previously, if we abandoned a payment with HTLCs in-flight and the payment eventually + // succeeded, we would set the `Event::PaymentSent::fee_paid_msat` to None, even though we had + // docs guaranteeing that it would always be Some after 0.0.103. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + create_announced_chan_between_nodes(&nodes, 0, 1); + create_announced_chan_between_nodes(&nodes, 1, 2); + + let amt_msat = 5_000_000; + let (route, payment_hash, payment_preimage, payment_secret) = + get_route_and_payment_hash!(&nodes[0], nodes[2], amt_msat); + let payment_id = PaymentId(payment_hash.0); + let onion = RecipientOnionFields::secret_only(payment_secret); + nodes[0].node.send_payment_with_route(route, payment_hash, onion, payment_id).unwrap(); + check_added_monitors(&nodes[0], 1); + + let path: &[&Node] = &[&nodes[1], &nodes[2]]; + pass_along_route(&nodes[0], &[path], amt_msat, payment_hash, payment_secret); + + nodes[0].node.abandon_payment(payment_id); + assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + + claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], &[path], payment_preimage)); +} + #[derive(PartialEq)] enum InterceptTest { Forward, diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 9e169d176e6..9669ff5f05f 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -919,6 +919,363 @@ fn test_partial_claim_before_restart() { do_test_partial_claim_before_restart(true, true); } +#[test] +fn test_mpp_claim_htlc_fulfills_unblocked_on_reload() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + // Open two independent channels between the same nodes. The payment below is large enough to + // force the router to split it across both channels, which is what makes the MPP claim depend + // on both ChannelMonitors durably learning the preimage. + let chan_b = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0); + let chan_a = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0); + let chan_id_a = chan_a.2; + let chan_id_b = chan_b.2; + let scid_a = chan_a.0.contents.short_channel_id; + let scid_b = chan_b.0.contents.short_channel_id; + + // Send an MPP payment to nodes[1]. `send_along_route_with_secret` leaves the payment + // claimable but unclaimed, so nodes[1] still has both inbound HTLCs live when we start + // manipulating monitor persistence below. + let amt_msat = 20_000_000; + let (route, payment_hash, payment_preimage, payment_secret) = + get_route_and_payment_hash!(nodes[0], nodes[1], amt_msat); + assert_eq!(route.paths.len(), 2); + send_along_route_with_secret( + &nodes[0], route, &[&[&nodes[1]], &[&nodes[1]]], amt_msat, payment_hash, + payment_secret, + ); + + // Move both channels into `AWAITING_REMOTE_REVOKE` by having nodes[0] send fee updates and + // withholding nodes[1]'s responding `commitment_signed`s. When nodes[1] later claims the + // payment, the fulfill updates cannot be sent immediately and instead sit in each channel's + // holding cell. + { + let mut fee_est = chanmon_cfgs[0].fee_estimator.sat_per_kw.lock().unwrap(); + *fee_est *= 2; + } + nodes[0].node.timer_tick_occurred(); + check_added_monitors(&nodes[0], 2); + + let node_0_id = nodes[0].node.get_our_node_id(); + let node_1_id = nodes[1].node.get_our_node_id(); + + let fee_msgs = nodes[0].node.get_and_clear_pending_msg_events(); + assert_eq!(fee_msgs.len(), 2); + for ev in &fee_msgs { + match ev { + MessageSendEvent::UpdateHTLCs { updates, .. } => { + nodes[1].node.handle_update_fee(node_0_id, updates.update_fee.as_ref().unwrap()); + nodes[1].node.handle_commitment_signed_batch_test( + node_0_id, &updates.commitment_signed, + ); + check_added_monitors(&nodes[1], 1); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + + // nodes[1] responds to each fee update with a `revoke_and_ack` and a new + // `commitment_signed`. Deliver only the `revoke_and_ack`s for now. The held + // `commitment_signed`s are delivered after nodes[1] claims the payment, creating the blocked + // post-claim monitor updates whose release is exercised after reload. + let node_1_msgs = nodes[1].node.get_and_clear_pending_msg_events(); + let mut commitment_signed_msgs = Vec::new(); + for ev in &node_1_msgs { + match ev { + MessageSendEvent::SendRevokeAndACK { msg, .. } => { + nodes[0].node.handle_revoke_and_ack(node_1_id, msg); + check_added_monitors(&nodes[0], 1); + }, + MessageSendEvent::UpdateHTLCs { updates, .. } => { + commitment_signed_msgs.push(updates.commitment_signed.clone()); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + + let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events(); + for ev in &node_0_msgs { + match ev { + MessageSendEvent::SendRevokeAndACK { msg, .. } => { + nodes[1].node.handle_revoke_and_ack(node_0_id, msg); + check_added_monitors(&nodes[1], 1); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + + // Snapshot channel B before the claim. The in-memory ChainMonitor applies updates even when + // the persister returns `InProgress`, so taking this snapshot after the claim would not model a + // crash between two separate monitor writes. + let mon_b_serialized = get_monitor!(nodes[1], chan_id_b).encode(); + + // Make both preimage monitor writes asynchronous. `claim_funds` attaches an in-memory MPP RAA + // blocker so neither channel can release later monitor updates until all channels have the + // preimage durably persisted. + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + nodes[1].node.claim_funds(payment_preimage); + check_added_monitors(&nodes[1], 2); + + // Complete only channel A's preimage update. Channel B will be reloaded from the stale snapshot + // above, simulating a crash where one monitor write completed and the other did not. + let (update_id_a, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_id_a).unwrap().clone(); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id_a, update_id_a); + + // Now finish the fee-update commitment dance we held back. nodes[1] receives nodes[0]'s + // `revoke_and_ack`s while the MPP RAA blocker is still in place, so the resulting monitor + // updates are blocked behind state that is not serialized in the ChannelManager. + for commitment_signed in &commitment_signed_msgs { + nodes[0].node.handle_commitment_signed_batch_test(node_1_id, commitment_signed); + check_added_monitors(&nodes[0], 1); + } + let node_0_msgs = nodes[0].node.get_and_clear_pending_msg_events(); + for ev in &node_0_msgs { + match ev { + MessageSendEvent::SendRevokeAndACK { msg, .. } => { + nodes[1].node.handle_revoke_and_ack(node_0_id, msg); + check_added_monitors(&nodes[1], 0); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + + // Persist the ChannelManager after the blocked post-claim monitor updates have been recorded. + // Reload with channel A's up-to-date monitor and channel B's stale monitor. The preimage update + // for B is replayed during reload, putting both channels' preimages on disk. The remaining state + // under test is the blocked post-claim `revoke_and_ack` monitor updates after the in-memory MPP + // RAA blocker that created them is gone. + let node_1_serialized = nodes[1].node.encode(); + let mon_a_serialized = get_monitor!(nodes[1], chan_id_a).encode(); + + nodes[0].node.peer_disconnected(node_1_id); + reload_node!( + nodes[1], + node_1_serialized, + &[&mon_a_serialized, &mon_b_serialized], + persister, + new_chain_monitor, + nodes_1_deserialized + ); + + // Reconnect both peers by manually exchanging `channel_reestablish`s. This avoids relying on a + // more general reconnect helper while the channels intentionally have asymmetric monitor state. + let node_1_id = nodes[1].node.get_our_node_id(); + nodes[0].node.peer_connected(node_1_id, &msgs::Init { + features: nodes[1].node.init_features(), networks: None, remote_network_address: None, + }, true).unwrap(); + nodes[1].node.peer_connected(node_0_id, &msgs::Init { + features: nodes[0].node.init_features(), networks: None, remote_network_address: None, + }, false).unwrap(); + + let reestablish_0 = nodes[0].node.get_and_clear_pending_msg_events(); + let reestablish_1 = nodes[1].node.get_and_clear_pending_msg_events(); + let mut reestablish_0_chan_ids = Vec::new(); + let mut reestablish_1_chan_ids = Vec::new(); + for ev in &reestablish_1 { + match ev { + MessageSendEvent::SendChannelReestablish { node_id, msg } => { + assert_eq!(*node_id, node_0_id); + reestablish_1_chan_ids.push(msg.channel_id); + nodes[0].node.handle_channel_reestablish(node_1_id, msg); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + for ev in &reestablish_0 { + match ev { + MessageSendEvent::SendChannelReestablish { node_id, msg } => { + assert_eq!(*node_id, node_1_id); + reestablish_0_chan_ids.push(msg.channel_id); + nodes[1].node.handle_channel_reestablish(node_0_id, msg); + }, + _ => panic!("Unexpected message: {:?}", ev), + } + } + assert_eq!(reestablish_0_chan_ids.len(), 2); + assert!(reestablish_0_chan_ids.contains(&chan_id_a)); + assert!(reestablish_0_chan_ids.contains(&chan_id_b)); + assert_eq!(reestablish_1_chan_ids.len(), 2); + assert!(reestablish_1_chan_ids.contains(&chan_id_a)); + assert!(reestablish_1_chan_ids.contains(&chan_id_b)); + // Only nodes[1] was reloaded with stale monitor state. nodes[0] responds to the + // `channel_reestablish`s without touching its monitors. nodes[1] applies the replayed channel B + // preimage update, releases channel A's held RAA update, and frees channel A's held fulfill + // during startup processing. + // Note that unlike the test in 0.3, we only generate the last monitor update for node B after + // get_and_clear_pending_msg_events as we only free the holding cell then. + check_added_monitors(&nodes[0], 0); + check_added_monitors(&nodes[1], 2); + + // The first message batch after reconnect contains channel updates from both nodes. nodes[1] + // also sends the channel A fulfill that startup processing released from the holding cell. + let restart_msgs_0 = nodes[0].node.get_and_clear_pending_msg_events(); + let restart_msgs_1 = nodes[1].node.get_and_clear_pending_msg_events(); + check_added_monitors(&nodes[1], 1); + let mut restart_scids_0 = Vec::new(); + let mut restart_scids_1 = Vec::new(); + let mut startup_fulfill_chan_ids = Vec::new(); + for ev in &restart_msgs_0 { + match ev { + MessageSendEvent::SendChannelUpdate { node_id, msg } => { + assert_eq!(*node_id, node_1_id); + restart_scids_0.push(msg.contents.short_channel_id); + }, + _ => panic!("Unexpected restart message from node 0: {:?}", ev), + } + } + for ev in &restart_msgs_1 { + match ev { + MessageSendEvent::SendChannelUpdate { node_id, msg } => { + assert_eq!(*node_id, node_0_id); + restart_scids_1.push(msg.contents.short_channel_id); + }, + MessageSendEvent::UpdateHTLCs { node_id, channel_id, updates } => { + assert_eq!(*node_id, node_0_id); + startup_fulfill_chan_ids.push(*channel_id); + assert_eq!(updates.update_fulfill_htlcs.len(), 1); + assert!(updates.update_add_htlcs.is_empty()); + assert!(updates.update_fail_htlcs.is_empty()); + assert!(updates.update_fail_malformed_htlcs.is_empty()); + assert!(updates.update_fee.is_none()); + for fulfill in &updates.update_fulfill_htlcs { + nodes[0].node.handle_update_fulfill_htlc(node_1_id, fulfill.clone()); + } + // Complete the standard commitment handshake for the released fulfill. The helper + // checks nodes[0]'s incoming commitment monitor update, nodes[1]'s response monitor + // updates, and nodes[0]'s held final monitor update. + do_commitment_signed_dance( + &nodes[0], &nodes[1], &updates.commitment_signed, false, false, + ); + }, + _ => panic!("Unexpected restart message from node 1: {:?}", ev), + } + } + assert_eq!(restart_scids_0.len(), 2); + assert!(restart_scids_0.contains(&scid_a)); + assert!(restart_scids_0.contains(&scid_b)); + assert_eq!(restart_scids_1.len(), 2); + assert!(restart_scids_1.contains(&scid_a)); + assert!(restart_scids_1.contains(&scid_b)); + assert_eq!(startup_fulfill_chan_ids, vec![chan_id_a]); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + check_added_monitors(&nodes[0], 0); + check_added_monitors(&nodes[1], 0); + + // Receiving the startup-released fulfill gives nodes[0] the payment preimage. That is enough to + // emit `PaymentSent`, even though channel B's path-level success still needs its own fulfill. + let startup_payment_events = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(startup_payment_events.len(), 2); + let mut saw_startup_payment_sent = false; + let mut startup_success_scids = Vec::new(); + for ev in &startup_payment_events { + match ev { + Event::PaymentSent { + payment_preimage: sent_preimage, + payment_hash: sent_hash, + amount_msat: sent_amount, + fee_paid_msat, + .. + } => { + assert_eq!(*sent_preimage, payment_preimage); + assert_eq!(*sent_hash, payment_hash); + assert_eq!(*sent_amount, Some(amt_msat)); + assert_eq!(*fee_paid_msat, Some(0)); + saw_startup_payment_sent = true; + }, + Event::PaymentPathSuccessful { payment_hash: Some(path_hash), path, .. } => { + assert_eq!(*path_hash, payment_hash); + assert_eq!(path.hops.len(), 1); + startup_success_scids.push(path.hops[0].short_channel_id); + }, + _ => panic!("Unexpected startup payment event: {:?}", ev), + } + } + assert!(saw_startup_payment_sent); + assert_eq!(startup_success_scids, vec![scid_a]); + + // Handling the claim event runs the event-completion action that releases the remaining + // RAA-blocked monitor update. The startup unblock path already released channel A, so channel B + // is the only fulfill that should be emitted here. + let claim_events = nodes[1].node.get_and_clear_pending_events(); + assert_eq!(claim_events.len(), 1); + match &claim_events[0] { + Event::PaymentClaimed { payment_hash: claimed_hash, amount_msat, htlcs, .. } => { + assert_eq!(*claimed_hash, payment_hash); + assert_eq!(*amount_msat, amt_msat); + assert_eq!(htlcs.len(), 2); + }, + _ => panic!("Unexpected event: {:?}", claim_events[0]), + } + // The `PaymentSent` event above releases the monitor update that nodes[0] held after the final + // channel A startup revocation. + check_added_monitors(&nodes[0], 1); + // Handling `PaymentClaimed` releases channel B's held revocation update and then the fulfill + // that was waiting behind it (unlike this test in 0.3, after we free the holding cell in + // get_and_clear_pending_msg_events below). + check_added_monitors(&nodes[1], 1); + + // Channel A's fulfill was already sent during startup. The `PaymentClaimed` completion action + // now frees channel B's held fulfill, and no other HTLC update should be bundled with it. + let fulfill_msgs = nodes[1].node.get_and_clear_pending_msg_events(); + check_added_monitors(&nodes[1], 1); + assert_eq!(fulfill_msgs.len(), 1); + match &fulfill_msgs[0] { + MessageSendEvent::UpdateHTLCs { node_id, channel_id, updates } => { + assert_eq!(*node_id, node_0_id); + assert_eq!(*channel_id, chan_id_b); + assert_eq!(updates.update_fulfill_htlcs.len(), 1); + assert!(updates.update_add_htlcs.is_empty()); + assert!(updates.update_fail_htlcs.is_empty()); + assert!(updates.update_fail_malformed_htlcs.is_empty()); + assert!(updates.update_fee.is_none()); + for fulfill in &updates.update_fulfill_htlcs { + nodes[0].node.handle_update_fulfill_htlc(node_1_id, fulfill.clone()); + } + // Complete the same commitment handshake for channel B. Here nodes[0]'s final monitor + // update is persisted immediately because `PaymentSent` already ran for channel A. + do_commitment_signed_dance( + &nodes[0], &nodes[1], &updates.commitment_signed, false, false, + ); + }, + _ => panic!("Unexpected fulfill message: {:?}", fulfill_msgs[0]), + } + check_added_monitors(&nodes[1], 0); + assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + + let final_payment_events = nodes[0].node.get_and_clear_pending_events(); + assert_eq!(final_payment_events.len(), 1); + match &final_payment_events[0] { + Event::PaymentPathSuccessful { payment_hash: Some(path_hash), path, .. } => { + assert_eq!(*path_hash, payment_hash); + assert_eq!(path.hops.len(), 1); + assert_eq!(path.hops[0].short_channel_id, scid_b); + }, + _ => panic!("Unexpected final payment event: {:?}", final_payment_events[0]), + } + check_added_monitors(&nodes[0], 0); + assert!(nodes[1].node.get_and_clear_pending_events().is_empty()); + check_added_monitors(&nodes[0], 0); + check_added_monitors(&nodes[1], 0); + + // Both MPP parts should have been fulfilled back to nodes[0]. If either channel still has a + // pending outbound HTLC, its fulfill remained stuck in nodes[1]'s holding cell after reload. + let pending: Vec<_> = nodes[0].node.list_channels().iter() + .filter(|channel| channel.channel_id == chan_id_a || channel.channel_id == chan_id_b) + .filter(|channel| !channel.pending_outbound_htlcs.is_empty()) + .map(|channel| channel.channel_id) + .collect(); + assert!(pending.is_empty(), "HTLC fulfills remained stuck on channels {:?}", pending); +} + fn do_forwarded_payment_no_manager_persistence(use_cs_commitment: bool, claim_htlc: bool, use_intercept: bool) { if !use_cs_commitment { assert!(!claim_htlc); } // If we go to forward a payment, and the ChannelMonitor persistence completes, but the diff --git a/lightning/src/offers/flow.rs b/lightning/src/offers/flow.rs index 15e744e1a7a..d8fca367dff 100644 --- a/lightning/src/offers/flow.rs +++ b/lightning/src/offers/flow.rs @@ -210,10 +210,12 @@ where /// /// Must be called whenever a new chain tip becomes available. May be skipped /// for intermediary blocks. - pub fn best_block_updated(&self, header: &Header, _height: u32) { + pub fn best_block_updated(&self, header: &Header, height: u32) { let timestamp = &self.highest_seen_timestamp; let block_time = header.time as usize; + *self.best_block.write().unwrap() = BestBlock::new(header.block_hash(), height); + loop { // Update timestamp to be the max of its current value and the block // timestamp. This should keep us close to the current time without relying on @@ -235,7 +237,7 @@ where #[cfg(feature = "dnssec")] { let updated_time = timestamp.load(Ordering::Acquire) as u32; - self.hrn_resolver.new_best_block(_height, updated_time); + self.hrn_resolver.new_best_block(height, updated_time); } } } diff --git a/lightning/src/offers/invoice_request.rs b/lightning/src/offers/invoice_request.rs index d5d3c4d75a8..9d5d09d2a24 100644 --- a/lightning/src/offers/invoice_request.rs +++ b/lightning/src/offers/invoice_request.rs @@ -2127,6 +2127,19 @@ mod tests { Err(e) => assert_eq!(e, Bolt12SemanticError::InvalidQuantity), } + match OfferBuilder::new(recipient_pubkey()) + .amount_msats(1000) + .supported_quantity(Quantity::Bounded(ten)) + .build() + .unwrap() + .request_invoice(&expanded_key, nonce, &secp_ctx, payment_id) + .unwrap() + .quantity(0) + { + Ok(_) => panic!("expected error"), + Err(e) => assert_eq!(e, Bolt12SemanticError::InvalidQuantity), + } + let invoice_request = OfferBuilder::new(recipient_pubkey()) .amount_msats(1000) .supported_quantity(Quantity::Unbounded) diff --git a/lightning/src/offers/offer.rs b/lightning/src/offers/offer.rs index 5e66b1c9924..e9c3deb7d9a 100644 --- a/lightning/src/offers/offer.rs +++ b/lightning/src/offers/offer.rs @@ -978,7 +978,7 @@ impl OfferContents { fn is_valid_quantity(&self, quantity: u64) -> bool { match self.supported_quantity { - Quantity::Bounded(n) => quantity <= n.get(), + Quantity::Bounded(n) => quantity > 0 && quantity <= n.get(), Quantity::Unbounded => quantity > 0, Quantity::One => quantity == 1, } diff --git a/lightning/src/offers/static_invoice.rs b/lightning/src/offers/static_invoice.rs index 77f486a6a06..8d4b75bb627 100644 --- a/lightning/src/offers/static_invoice.rs +++ b/lightning/src/offers/static_invoice.rs @@ -400,7 +400,7 @@ impl StaticInvoice { /// Whether the [`Offer`] that this invoice is based on is expired. #[cfg(feature = "std")] pub fn is_offer_expired(&self) -> bool { - self.contents.is_expired() + self.contents.is_offer_expired() } /// Whether the [`Offer`] that this invoice is based on is expired, given the current time as @@ -993,6 +993,43 @@ mod tests { } } + #[cfg(feature = "std")] + #[test] + fn is_offer_expired_does_not_check_invoice_expiry() { + // Regression test: `StaticInvoice::is_offer_expired` must reflect the offer's expiry, + // not the invoice's own expiry. Build an invoice whose offer has no absolute expiry + // (so the offer never expires) but whose own `created_at + relative_expiry` lies in + // the past (so the invoice itself is expired). + let node_id = recipient_pubkey(); + let payment_paths = payment_paths(); + let expanded_key = ExpandedKey::new([42; 32]); + let entropy = FixedEntropy {}; + let nonce = Nonce::from_entropy_source(&entropy); + let secp_ctx = Secp256k1::new(); + + let offer = OfferBuilder::deriving_signing_pubkey(node_id, &expanded_key, nonce, &secp_ctx) + .path(blinded_path()) + .build() + .unwrap(); + + let invoice = StaticInvoiceBuilder::for_offer_using_derived_keys( + &offer, + payment_paths.clone(), + vec![blinded_path()], + Duration::from_secs(0), + &expanded_key, + nonce, + &secp_ctx, + ) + .unwrap() + .relative_expiry(1) + .build_and_sign(&secp_ctx) + .unwrap(); + + assert!(invoice.is_expired()); + assert!(!invoice.is_offer_expired()); + } + #[test] fn builds_invoice_from_offer_using_derived_key() { let node_id = recipient_pubkey(); diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 9a2c06bb72f..251c2ae84be 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -584,40 +584,10 @@ where // Limit the number of blinded paths that are computed. const MAX_PATHS: usize = 3; - // Ensure peers have at least three channels so that it is more difficult to infer the - // recipient's node_id. - const MIN_PEER_CHANNELS: usize = 3; - let network_graph = network_graph.deref().read_only(); let is_recipient_announced = network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient)); - let has_one_peer = peers.len() == 1; - let mut peer_info = peers - .map(|peer| MessageForwardNode { - short_channel_id: if compact_paths { peer.short_channel_id } else { None }, - ..peer - }) - // Limit to peers with announced channels unless the recipient is unannounced. - .filter_map(|peer| { - network_graph - .node(&NodeId::from_pubkey(&peer.node_id)) - .filter(|info| { - !is_recipient_announced || info.channels.len() >= MIN_PEER_CHANNELS - }) - .map(|info| (peer, info.is_tor_only(), info.channels.len())) - // Allow messages directly with the only peer when unannounced. - .or_else(|| (!is_recipient_announced && has_one_peer).then(|| (peer, false, 0))) - }) - // Exclude Tor-only nodes when the recipient is announced. - .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced)) - .collect::>(); - - // Prefer using non-Tor nodes with the most channels as the introduction node. - peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| { - a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse()) - }); - let build_path = |intermediate_hops: &[MessageForwardNode]| { let dummy_hops_count = if compact_paths { 0 @@ -637,12 +607,37 @@ where ) }; - // Try to create paths from peer info, fall back to direct path if needed - let mut paths = peer_info - .into_iter() - .map(|(peer, _, _)| build_path(&[peer])) - .take(MAX_PATHS) - .collect::>(); + let has_one_peer = peers.len() == 1; + let mut paths = if !is_recipient_announced { + let mut peer_info = peers + .map(|peer| MessageForwardNode { + short_channel_id: if compact_paths { peer.short_channel_id } else { None }, + ..peer + }) + .filter_map(|peer| { + network_graph + .node(&NodeId::from_pubkey(&peer.node_id)) + .map(|info| (peer, info.is_tor_only(), info.channels.len())) + // Allow messages directly with the only peer. + .or_else(|| has_one_peer.then(|| (peer, false, 0))) + }) + .collect::>(); + + // Prefer using non-Tor nodes with the most channels as the introduction node. + peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| { + a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse()) + }); + + // Try to create paths from peer info, fall back to direct path if needed + peer_info + .into_iter() + .map(|(peer, _, _)| build_path(&[peer])) + .take(MAX_PATHS) + .collect::>() + } else { + vec![] + }; + if paths.is_empty() { if is_recipient_announced { paths = vec![build_path(&[])]; diff --git a/lightning/src/util/anchor_channel_reserves.rs b/lightning/src/util/anchor_channel_reserves.rs index e50e103211f..29e24c9f727 100644 --- a/lightning/src/util/anchor_channel_reserves.rs +++ b/lightning/src/util/anchor_channel_reserves.rs @@ -25,7 +25,7 @@ use crate::chain::chainmonitor::ChainMonitor; use crate::chain::chainmonitor::Persist; use crate::chain::Filter; use crate::events::bump_transaction::Utxo; -use crate::ln::chan_utils::max_htlcs; +use crate::ln::chan_utils::{max_htlcs, BASE_INPUT_WEIGHT}; use crate::ln::channelmanager::AChannelManager; use crate::prelude::new_hash_set; use crate::sign::ecdsa::EcdsaChannelSigner; @@ -240,11 +240,11 @@ pub fn get_supportable_anchor_channels( let mut total_fractional_amount = Amount::from_sat(0); let mut num_whole_utxos = 0; for utxo in utxos { - let satisfaction_fee = context + let spend_fee = context .upper_bound_fee_rate - .fee_wu(Weight::from_wu(utxo.satisfaction_weight)) + .fee_wu(Weight::from_wu(BASE_INPUT_WEIGHT + utxo.satisfaction_weight)) .unwrap_or(Amount::MAX); - let amount = utxo.output.value.checked_sub(satisfaction_fee).unwrap_or(Amount::MIN); + let amount = utxo.output.value.checked_sub(spend_fee).unwrap_or(Amount::MIN); if amount >= reserve_per_channel { num_whole_utxos += 1; } else { @@ -260,6 +260,13 @@ pub fn get_supportable_anchor_channels( num_whole_utxos + total_fractional_amount.to_sat() / reserve_per_channel.to_sat() / 2 } +/// Returns whether a channel of the given type requires an on-chain anchor reserve, i.e. uses +/// either the `anchors_zero_fee_htlc_tx` or `anchor_zero_fee_commitments` (TRUC / 0FC) variant. +fn is_anchor_channel_type(channel_type: &ChannelTypeFeatures) -> bool { + channel_type.supports_anchors_zero_fee_htlc_tx() + || channel_type.supports_anchor_zero_fee_commitments() +} + /// Verifies whether the anchor channel reserve provided by `utxos` is sufficient to support /// an additional anchor channel. /// @@ -311,7 +318,7 @@ where } else { continue; }; - if channel_monitor.channel_type_features().supports_anchors_zero_fee_htlc_tx() + if is_anchor_channel_type(&channel_monitor.channel_type_features()) && !channel_monitor.get_claimable_balances().is_empty() { anchor_channels.insert(channel_id); @@ -320,7 +327,7 @@ where // Also include channels that are in the middle of negotiation or anchor channels that don't have // a ChannelMonitor yet. for channel in a_channel_manager.get_cm().list_channels() { - if channel.channel_type.map_or(true, |ct| ct.supports_anchors_zero_fee_htlc_tx()) { + if channel.channel_type.map_or(true, |ct| is_anchor_channel_type(&ct)) { anchor_channels.insert(channel.channel_id); } } @@ -330,6 +337,7 @@ where #[cfg(test)] mod test { use super::*; + use crate::ln::functional_test_utils::*; use bitcoin::{OutPoint, ScriptBuf, TxOut, Txid}; use std::str::FromStr; @@ -376,6 +384,15 @@ mod test { assert_eq!(get_supportable_anchor_channels(&context, utxos.as_slice()), 3); } + #[test] + fn test_get_supportable_anchor_channels_accounts_for_input_weight() { + let context = AnchorChannelReserveContext::default(); + let reserve = get_reserve_per_channel(&context); + let utxo = make_p2wpkh_utxo(reserve - Amount::from_sat(1)); + + assert_eq!(get_supportable_anchor_channels(&context, &[utxo]), 0); + } + #[test] fn test_anchor_output_spend_transaction_weight() { // Example with smaller signatures: @@ -439,4 +456,49 @@ mod test { 1068 ); } + + #[test] + fn test_can_support_additional_anchor_channel_zero_fee_commitments() { + // Regression test: a channel that uses the `anchor_zero_fee_commitments` + // (option 41) variant is just as much an anchor channel — and requires + // the same on-chain reserve — as one using `anchors_zero_fee_htlc_tx`. + // The reserve check must therefore count it as an existing anchor + // channel when deciding whether the wallet can safely support an + // additional one. Currently `can_support_additional_anchor_channel` + // only counts channels whose features set `anchors_zero_fee_htlc_tx`, + // so a node whose reserves are exhausted by zero-fee-commitment + // channels is incorrectly told it can open another anchor channel. + let mut cfg = test_default_channel_config(); + cfg.channel_handshake_config.negotiate_anchor_zero_fee_commitments = true; + cfg.manually_accept_inbound_channels = true; + + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(cfg.clone()), Some(cfg)]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + create_chan_between_nodes(&nodes[0], &nodes[1]); + + let channels = nodes[0].node.list_channels(); + assert_eq!(channels.len(), 1); + let channel_type = channels[0].channel_type.as_ref().unwrap(); + assert!(channel_type.supports_anchor_zero_fee_commitments()); + // Sanity check: a zero-fee-commitments channel does not also set the + // older anchors_zero_fee_htlc_tx feature. + assert!(!channel_type.supports_anchors_zero_fee_htlc_tx()); + + let context = AnchorChannelReserveContext::default(); + let reserve = get_reserve_per_channel(&context); + // Provide a single UTXO with enough value to cover one channel reserve. + let utxos = vec![make_p2wpkh_utxo(reserve * 2)]; + + // We already have one TRUC anchor channel and only enough reserve for + // a single channel; we must not authorize an additional one. + assert!(!can_support_additional_anchor_channel( + &context, + &utxos, + nodes[0].node, + &nodes[0].chain_monitor.chain_monitor, + )); + } } diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index b31dab1ccf6..a10ecdc01e2 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -473,12 +473,18 @@ where return Ok(()); } - let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await; - - // Release the pending sweep flag again, regardless of result. - self.pending_sweep.store(false, Ordering::Release); + // Use an RAII guard so the flag is released even if this future is dropped mid-await + // (e.g. cancelled by `tokio::time::timeout` or `select!`). A bare `store(false)` after + // the await would never run on cancellation, leaving the sweeper permanently disabled. + struct PendingSweepGuard<'a>(&'a AtomicBool); + impl<'a> Drop for PendingSweepGuard<'a> { + fn drop(&mut self) { + self.0.store(false, Ordering::Release); + } + } + let _guard = PendingSweepGuard(&self.pending_sweep); - result + self.regenerate_and_broadcast_spend_if_necessary_internal().await } /// Regenerates and broadcasts the spending transaction for any outputs that are pending @@ -1111,3 +1117,141 @@ where Ok((best_block, OutputSweeperSync { sweeper })) } } + +#[cfg(all(test, feature = "std"))] +mod tests { + use super::*; + use crate::chain::transaction::OutPoint; + use crate::sign::{ChangeDestinationSource, OutputSpender}; + use crate::util::async_poll::dummy_waker; + use crate::util::logger::Record; + + use bitcoin::hashes::Hash as _; + use bitcoin::secp256k1::All; + use bitcoin::transaction::Version; + use bitcoin::{Amount, BlockHash, ScriptBuf, Transaction, TxOut, Txid}; + + use core::future as core_future; + use core::pin::pin; + use core::sync::atomic::Ordering; + use core::task::Poll; + + struct DummyBroadcaster; + impl BroadcasterInterface for DummyBroadcaster { + fn broadcast_transactions(&self, _: &[&Transaction]) {} + } + + struct DummyFeeEstimator; + impl FeeEstimator for DummyFeeEstimator { + fn get_est_sat_per_1000_weight(&self, _: ConfirmationTarget) -> u32 { + 1000 + } + } + + struct DummyFilter; + impl Filter for DummyFilter { + fn register_tx(&self, _: &Txid, _: &bitcoin::Script) {} + fn register_output(&self, _: WatchedOutput) {} + } + + struct DummyLogger; + impl Logger for DummyLogger { + fn log(&self, _: Record) {} + } + + struct DummyOutputSpender; + impl OutputSpender for DummyOutputSpender { + fn spend_spendable_outputs( + &self, _: &[&SpendableOutputDescriptor], _: Vec, _: ScriptBuf, _: u32, + _: Option, _: &Secp256k1, + ) -> Result { + Ok(Transaction { + version: Version::TWO, + lock_time: LockTime::ZERO, + input: Vec::new(), + output: Vec::new(), + }) + } + } + + struct DummyChangeDestSource; + impl ChangeDestinationSource for DummyChangeDestSource { + fn get_change_destination_script<'a>( + &'a self, + ) -> AsyncResult<'a, ScriptBuf, ()> { + Box::pin(core_future::ready(Ok(ScriptBuf::new()))) + } + } + + struct PendingKVStore; + impl KVStore for PendingKVStore { + fn read( + &self, _: &str, _: &str, _: &str, + ) -> AsyncResult<'static, Vec, io::Error> { + Box::pin(core_future::ready(Err(io::Error::new(io::ErrorKind::NotFound, "")))) + } + fn write( + &self, _: &str, _: &str, _: &str, _: Vec, + ) -> AsyncResult<'static, (), io::Error> { + Box::pin(core_future::pending()) + } + fn remove( + &self, _: &str, _: &str, _: &str, _: bool, + ) -> AsyncResult<'static, (), io::Error> { + Box::pin(core_future::ready(Ok(()))) + } + fn list( + &self, _: &str, _: &str, + ) -> AsyncResult<'static, Vec, io::Error> { + Box::pin(core_future::ready(Ok(Vec::new()))) + } + } + + #[test] + fn pending_sweep_flag_resets_after_future_drop() { + let best_block = BestBlock::new(BlockHash::all_zeros(), 1_000); + + let sweeper: OutputSweeper<_, _, _, _, _, _, _> = OutputSweeper::new( + best_block, + &DummyBroadcaster, + &DummyFeeEstimator, + None::<&DummyFilter>, + &DummyOutputSpender, + Box::new(DummyChangeDestSource), + &PendingKVStore, + &DummyLogger, + ); + + // Inject a tracked output directly so the sweep loop has work to do. + let descriptor = SpendableOutputDescriptor::StaticOutput { + outpoint: OutPoint { txid: Txid::all_zeros(), index: 0 }, + output: TxOut { value: Amount::from_sat(100_000), script_pubkey: ScriptBuf::new() }, + channel_keys_id: None, + }; + sweeper.sweeper_state.lock().unwrap().outputs.push(TrackedSpendableOutput { + descriptor, + channel_id: None, + status: OutputSpendStatus::PendingInitialBroadcast { delayed_until_height: None }, + }); + + // Start a sweep, poll once (the persist step stays Pending because our KVStore's + // `write` future is `future::pending()`), then drop the future to mimic + // cancellation - the sort of thing a `tokio::time::timeout` wrapper produces. + { + let mut fut = pin!(sweeper.regenerate_and_broadcast_spend_if_necessary()); + let waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&waker); + assert!(matches!(fut.as_mut().poll(&mut ctx), Poll::Pending)); + } + + // Once the future has been dropped, `pending_sweep` must be cleared. The bug + // is that the flag is only ever cleared after the inner future returns, so a + // dropped future leaves it stuck `true` and every subsequent call to + // `regenerate_and_broadcast_spend_if_necessary` short-circuits with `Ok(())`, + // permanently disabling the sweeper. + assert!( + !sweeper.pending_sweep.load(Ordering::Acquire), + "pending_sweep flag was not reset when the future was dropped", + ); + } +}