Skip to content

fix MempoolFilter returning pending transactions#7109

Open
Fraccaman wants to merge 4 commits into
ChainSafe:mainfrom
Fraccaman:fix-mempool-filters
Open

fix MempoolFilter returning pending transactions#7109
Fraccaman wants to merge 4 commits into
ChainSafe:mainfrom
Fraccaman:fix-mempool-filters

Conversation

@Fraccaman
Copy link
Copy Markdown
Contributor

@Fraccaman Fraccaman commented May 26, 2026

Summary of changes

MempoolFilter was returning hashes of already-mined transactions instead of pending ones. This PR fixes it by subscribing each mempool filter to the mempool's Add/Remove broadcast channel, so it reports only transactions still in the pool.

Changes introduced in this pull request:

  • MempoolFilterManager::install now create a Receiver on install
  • Added method drain to MempoolFilter to add/remove pending tx hashes
  • Call drain in EthGetFilterChanges::handle method
  • fix eth_new_pending_transaction_filter test + minor unit

Reference issue to close (if applicable)

Closes #7091

Other information and links

  • The implementation uses channels, no background tasks are spawned. I think having channels is easier and less error prone.
  • Claude Opus 4.7 was used to help navigate the codebase and implement the necessary changes.

Change checklist

  • I have performed a self-review of my own code,
  • I have made corresponding changes to the documentation. All new code adheres to the team's documentation standards,
  • I have added tests that prove my fix is effective or that my feature works (if possible),
  • I have made sure the CHANGELOG is up-to-date. All user-facing changes should be reflected in this document.

Outside contributions

  • I have read and agree to the CONTRIBUTING document.
  • I have read and agree to the AI Policy document. I understand that failure to comply with the guidelines will lead to rejection of the pull request.

Summary by CodeRabbit

  • New Features

    • Mempool filters now subscribe to live mempool events for real-time pending-transaction updates.
    • RPCs use a single, consistent transaction-hash derivation for reporting.
  • Bug Fixes

    • Pending-transaction filtering handles add/remove ordering, receiver lag, and result truncation more robustly.
  • Tests

    • Stateful tests now observe mempool presence directly, validating pending-filter behavior prior to mining.

@Fraccaman Fraccaman requested a review from a team as a code owner May 26, 2026 21:59
@Fraccaman Fraccaman requested review from akaladarshi and hanabi1224 and removed request for a team May 26, 2026 21:59
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 26, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: af9165f4-2628-46e6-a883-ad60a22f93ed

📥 Commits

Reviewing files that changed from the base of the PR and between e39aa6f and 5464537.

📒 Files selected for processing (8)
  • src/daemon/mod.rs
  • src/message_pool/msgpool/msg_pool.rs
  • src/message_pool/msgpool/pending_store.rs
  • src/rpc/methods/eth.rs
  • src/rpc/methods/eth/filter/mempool.rs
  • src/rpc/methods/eth/filter/mod.rs
  • src/tool/offline_server/server.rs
  • src/tool/subcommands/api_cmd/stateful_tests.rs
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/daemon/mod.rs
  • src/message_pool/msgpool/msg_pool.rs
  • src/message_pool/msgpool/pending_store.rs
  • src/rpc/methods/eth/filter/mod.rs
  • src/tool/offline_server/server.rs
  • src/rpc/methods/eth.rs

Walkthrough

Refactors mempool filtering to consume MpoolUpdate broadcasts: exposes the message-pool sender, gives each MempoolFilter its own broadcast receiver, wires the sender into EthEventHandler/MempoolFilterManager, updates RPCs to return pending mempool hashes, and adjusts tests to poll mempool presence.

Changes

Mempool-driven pending transaction filter

Layer / File(s) Summary
Mempool event broadcast exposure
src/message_pool/msgpool/pending_store.rs, src/message_pool/msgpool/msg_pool.rs
PendingStore::event_sender() and MessagePool::mpool_event_sender() expose a clone of the internal broadcast::Sender<MpoolUpdate>.
Mempool filter broadcast-driven refactor
src/rpc/methods/eth/filter/mempool.rs
MempoolFilter drops epoch-based collection and now owns a broadcast::Receiver<MpoolUpdate> (guarded by Mutex); drain() consumes Add/Remove events, maintains an insertion-ordered deduplicated set truncated to max_results. MempoolFilterManager subscribes to a shared sender for each installed filter; tests updated for broadcast semantics.
Event handler broadcast sender wiring
src/rpc/methods/eth/filter/mod.rs
EthEventHandler::from_config accepts an mpool_event_sender and forwards it to MempoolFilterManager; EthEventHandler::new() creates a dummy sender for standalone usage.
Service initialization and wiring
src/daemon/mod.rs, src/tool/offline_server/server.rs
Daemon and offline server obtain message_pool.mpool_event_sender() and pass it into EthEventHandler::from_config, embedding the handler into RPCState.
RPC integration and hash helpers
src/rpc/methods/eth.rs
Added eth_hash_from_signed_message; removed message-CID-to-hash helpers; EthGetTransactionHashByCid uses the new helper; EthGetFilterChanges returns mempool hashes by draining MempoolFilter::drain(chain_id) as EthFilterResult::Hashes.
Stateful test mempool-only waiting
src/tool/subcommands/api_cmd/stateful_tests.rs
Adds wait_in_mempool that polls MpoolPending for a CID and updates eth_newPendingTransactionFilter to use it before re-reading filter changes.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant RPC as Eth RPC
  participant FilterManager
  participant MempoolFilter
  participant MessagePool
  Client->>RPC: eth_getFilterChanges request
  RPC->>FilterManager: lookup & drain(filter_id, chain_id)
  FilterManager->>MempoolFilter: drain(chain_id)
  MempoolFilter->>MessagePool: consume MpoolUpdate via Receiver.try_recv()
  MempoolFilter->>RPC: return EthFilterResult::Hashes
  RPC->>Client: respond with hashes
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • ChainSafe/forest#6965: Introduced MpoolUpdate and broadcast-capable PendingStore that this PR wires into mempool filtering.
  • ChainSafe/forest#6361: Updated eth_newPendingTransactionFilter stateful test sequencing related to mempool pending transactions.

Suggested reviewers

  • hanabi1224
  • akaladarshi
  • LesnyRumcajs
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The pull request title accurately describes the main change: fixing MempoolFilter to properly return pending transactions instead of already-mined ones.
Linked Issues check ✅ Passed All coding requirements from issue #7091 are met: MempoolFilter now subscribes to mempool Add/Remove events, returns only pending transactions, ignores removed messages, and does not collect on-chain hashes.
Out of Scope Changes check ✅ Passed All changes are directly related to fixing MempoolFilter functionality: message pool event broadcasting, filter state management, RPC handler updates, and test corrections are all in scope.
Docstring Coverage ✅ Passed Docstring coverage is 94.12% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/rpc/methods/eth.rs (1)

1106-1120: ⚡ Quick win

Consolidate the signed-message hash derivation into one helper.

eth_tx_hash_from_signed_message below still reimplements the same delegated/secp/BLS branches, so this extraction can drift from the event/log path. Please make one helper delegate to the other and keep the hash rules in one place.

♻️ Minimal consolidation
 fn eth_tx_hash_from_signed_message(
     message: &SignedMessage,
     eth_chain_id: EthChainIdType,
 ) -> anyhow::Result<EthHash> {
-    if message.is_delegated() {
-        let (_, tx) = eth_tx_from_signed_eth_message(message, eth_chain_id)?;
-        Ok(tx.eth_hash()?.into())
-    } else if message.is_secp256k1() {
-        Ok(message.cid().into())
-    } else {
-        Ok(message.message().cid().into())
-    }
+    eth_hash_from_signed_message(message, eth_chain_id)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rpc/methods/eth.rs` around lines 1106 - 1120, The logic that derives an
Ethereum-style hash from a SignedMessage has been factored into
eth_hash_from_signed_message but eth_tx_hash_from_signed_message still
duplicates the delegated/secp256k1/BLS branching; consolidate by making
eth_tx_hash_from_signed_message call eth_hash_from_signed_message (or
vice‑versa) so the hash rules live in one place. Update
eth_tx_hash_from_signed_message to remove its own branching and delegate to
eth_hash_from_signed_message (passing the same SignedMessage and chain_id),
ensuring all callers use the single helper to avoid drift between paths.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/rpc/methods/eth/filter/mempool.rs`:
- Line 67: The current drain uses
pending.into_iter().take(self.max_results).collect() which returns an empty vec
when self.max_results == 0; change the logic so a zero max_results means “no
cap”: check self.max_results and if it is 0 collect the full pending.into_iter()
otherwise collect after take(self.max_results). Use the existing pending
iterator and self.max_results symbol to implement the conditional branch so the
returned collection contains all items when max_results == 0.

In `@src/tool/subcommands/api_cmd/stateful_tests.rs`:
- Around line 300-311: The mempool polling in wait_in_mempool has a very short
timeout (retries = 100 with 10ms sleeps); increase the retry budget or
per-iteration sleep to reduce flakiness in CI by changing the retries or
Duration::from_millis call in the wait_in_mempool function (e.g., bump retries
to a larger value like 1000 or increase the sleep to 50–100ms) so the loop has a
longer total wait before the ensure! triggers; update the retries initialization
and/or the tokio::time::sleep(Duration::from_millis(...)).

---

Nitpick comments:
In `@src/rpc/methods/eth.rs`:
- Around line 1106-1120: The logic that derives an Ethereum-style hash from a
SignedMessage has been factored into eth_hash_from_signed_message but
eth_tx_hash_from_signed_message still duplicates the delegated/secp256k1/BLS
branching; consolidate by making eth_tx_hash_from_signed_message call
eth_hash_from_signed_message (or vice‑versa) so the hash rules live in one
place. Update eth_tx_hash_from_signed_message to remove its own branching and
delegate to eth_hash_from_signed_message (passing the same SignedMessage and
chain_id), ensuring all callers use the single helper to avoid drift between
paths.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 15f882bf-4135-452b-b120-7f6d544dc8b8

📥 Commits

Reviewing files that changed from the base of the PR and between a3caf3e and 0f8af7d.

📒 Files selected for processing (8)
  • src/daemon/mod.rs
  • src/message_pool/msgpool/msg_pool.rs
  • src/message_pool/msgpool/pending_store.rs
  • src/rpc/methods/eth.rs
  • src/rpc/methods/eth/filter/mempool.rs
  • src/rpc/methods/eth/filter/mod.rs
  • src/tool/offline_server/server.rs
  • src/tool/subcommands/api_cmd/stateful_tests.rs

}
}
}
pending.into_iter().take(self.max_results).collect()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Handle max_results == 0 as “no cap” in drain output.

Line 67 currently truncates with take(0), which returns an empty vector when max_results is zero; this conflicts with the filter subsystem’s “0 disables cap” convention.

Proposed fix
-        pending.into_iter().take(self.max_results).collect()
+        if self.max_results == 0 {
+            return pending.into_iter().collect();
+        }
+        pending.into_iter().take(self.max_results).collect()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pending.into_iter().take(self.max_results).collect()
if self.max_results == 0 {
return pending.into_iter().collect();
}
pending.into_iter().take(self.max_results).collect()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/rpc/methods/eth/filter/mempool.rs` at line 67, The current drain uses
pending.into_iter().take(self.max_results).collect() which returns an empty vec
when self.max_results == 0; change the logic so a zero max_results means “no
cap”: check self.max_results and if it is 0 collect the full pending.into_iter()
otherwise collect after take(self.max_results). Use the existing pending
iterator and self.max_results symbol to implement the conditional branch so the
returned collection contains all items when max_results == 0.

Comment thread src/tool/subcommands/api_cmd/stateful_tests.rs
@Fraccaman Fraccaman force-pushed the fix-mempool-filters branch from 0f8af7d to 4f45897 Compare May 26, 2026 22:13
@Fraccaman Fraccaman changed the title refactor(eth-rpc): extract eth_hash_from_signed_message helper fix MempoolFilter returning pending transactions May 27, 2026
@akaladarshi akaladarshi added the RPC requires calibnet RPC checks to run on CI label May 27, 2026
@codecov
Copy link
Copy Markdown

codecov Bot commented May 27, 2026

Codecov Report

❌ Patch coverage is 88.28125% with 15 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.41%. Comparing base (39a6969) to head (5464537).
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
src/rpc/methods/eth.rs 62.50% 3 Missing and 3 partials ⚠️
src/rpc/methods/eth/filter/mempool.rs 93.25% 3 Missing and 3 partials ⚠️
src/daemon/mod.rs 0.00% 3 Missing ⚠️
Additional details and impacted files
Files with missing lines Coverage Δ
src/message_pool/msgpool/msg_pool.rs 87.96% <100.00%> (+0.21%) ⬆️
src/message_pool/msgpool/pending_store.rs 97.12% <100.00%> (+0.04%) ⬆️
src/rpc/methods/eth/filter/mod.rs 89.19% <100.00%> (+0.05%) ⬆️
src/tool/offline_server/server.rs 28.24% <100.00%> (+1.01%) ⬆️
src/daemon/mod.rs 24.81% <0.00%> (-0.08%) ⬇️
src/rpc/methods/eth.rs 66.60% <62.50%> (+1.33%) ⬆️
src/rpc/methods/eth/filter/mempool.rs 93.75% <93.25%> (-2.17%) ⬇️

... and 19 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 39a6969...5464537. Read the comment docs.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@akaladarshi
Copy link
Copy Markdown
Collaborator

@Fraccaman CI linter seems to be failing please fix it, you can use mise run lint locally to verify as well.

Also all the commits needs to signed, so please do that as well.

@Fraccaman Fraccaman force-pushed the fix-mempool-filters branch 2 times, most recently from 53c3804 to e39aa6f Compare May 27, 2026 10:41
@Fraccaman
Copy link
Copy Markdown
Contributor Author

Fraccaman commented May 27, 2026

@akaladarshi I signed the commits and ran cargo fmt. I couldn't get mise to work:

mise run lint                                                     ok  at 13:49:43
mise config files in ~/forest are not trusted. Trust them? Yes
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: keydb_get_keyblock failed: Value not found
gpg: error writing keyring '[keyboxd]': SQL library used incorrectly
gpg: error reading '[stdin]': SQL library used incorrectly
gpg: import from '[stdin]' failed: SQL library used incorrectly
mise ERROR gpg failed
mise 2026.5.13 by @jdx                                                                                              [4/4]
go@1.26.3       go version go1.26.3 darwin/arm64                                                                        ✔
pnpm@11.3.0     extract pnpm-darwin-arm64.tar.gz                                                                        ✔
node@24.16.0    download node-v24.16.0-darwin-arm64.tar.gz                             119 B 5m18s [>                 ] ◠
cargo-binstall@1.17.9 extract cargo-binstall-aarch64-apple-darwin.zip                                                   ✔
mise ERROR Failed to install core:node@24: gpg exited with non-zero status: exit code 2
mise ERROR Run with --verbose or MISE_VERBOSE=1 for more information

@akaladarshi
Copy link
Copy Markdown
Collaborator

@akaladarshi I signed the commits and ran cargo fmt. I couldn't get mise to work:

mise run lint                                                     ok  at 13:49:43
mise config files in ~/forest are not trusted. Trust them? Yes
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: Note: database_open 134217901 waiting for lock (held by 32019) ...
gpg: keydb_get_keyblock failed: Value not found
gpg: error writing keyring '[keyboxd]': SQL library used incorrectly
gpg: error reading '[stdin]': SQL library used incorrectly
gpg: import from '[stdin]' failed: SQL library used incorrectly
mise ERROR gpg failed
mise 2026.5.13 by @jdx                                                                                              [4/4]
go@1.26.3       go version go1.26.3 darwin/arm64                                                                        ✔
pnpm@11.3.0     extract pnpm-darwin-arm64.tar.gz                                                                        ✔
node@24.16.0    download node-v24.16.0-darwin-arm64.tar.gz                             119 B 5m18s [>                 ] ◠
cargo-binstall@1.17.9 extract cargo-binstall-aarch64-apple-darwin.zip                                                   ✔
mise ERROR Failed to install core:node@24: gpg exited with non-zero status: exit code 2
mise ERROR Run with --verbose or MISE_VERBOSE=1 for more information

Hey @Fraccaman, Sorry for delayed response.
There was another PR related to PendingTransactions that was being worked on and now that is merged can you please rebase those changes and resolve the conflicts. So we can have proper review.
Thanks.

Also you need to first run the mise run install-lint-tools to download all the required tools.

@Fraccaman Fraccaman force-pushed the fix-mempool-filters branch from e39aa6f to 5464537 Compare June 2, 2026 19:48
@Fraccaman
Copy link
Copy Markdown
Contributor Author

Fraccaman commented Jun 2, 2026

I rebased but mise run install-lint-tools is failing for me.

Copy link
Copy Markdown
Collaborator

@akaladarshi akaladarshi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to add tests for the EthGetFilterChanges where we call it multiple times and see if each call returns the new pending transaction since the last poll.


/// Clone of the [`MpoolUpdate`] broadcast sender. New independent
/// receivers can be derived by calling `subscribe()` on the clone.
pub(in crate::message_pool) fn event_sender(&self) -> broadcast::Sender<MpoolUpdate> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We made the events private for a specific reason, events should only be sent by the message pool module and every other module will read the data through subscribe method receiver.
This breaks the encapsulation.


/// Clone of the [`MpoolUpdate`] sender. Each call to `subscribe()` on
/// the returned sender yields an independent receiver.
pub fn mpool_event_sender(&self) -> broadcast::Sender<MpoolUpdate> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here this breaks the encapsulation, event sender is send only should not shared.

Comment thread src/rpc/methods/eth.rs
}

/// Derive the Ethereum-style hash for a `SignedMessage`.
pub fn eth_hash_from_signed_message(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have similar fn eth_tx_hash_from_signed_message, please reuse it.

Comment on lines +56 to +58
Ok(MpoolUpdate::Remove(m)) => {
if let Some(h) = hash_or_log(&m, chain_id) {
pending.shift_remove(&h);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the actual purpose doing this ?

@Fraccaman
Copy link
Copy Markdown
Contributor Author

hey @akaladarshi im on vacation, ill fix this as soon as im back

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

RPC requires calibnet RPC checks to run on CI

Projects

None yet

Development

Successfully merging this pull request may close these issues.

MempoolFilter should collect pending transaction data

2 participants