Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth indexer: subscribe to log events instead of pulling blocks #129

Merged
merged 6 commits into from
Jan 24, 2025

Conversation

ppca
Copy link
Contributor

@ppca ppca commented Jan 22, 2025

This PR changes the way we get sign requests from ethereum chain. Previously, we pull for latest block every 5 seconds, if there is a request in the block, we process it. The problem with that is that there's always a delay between block becoming available and us pulling for it no matter how frequent we pull.
The approach in this PR is, we subscribe to the logs in the sign request via web socket, and those logs will be pushed to us when they become available. This will minimize delay in previous approach.
There is much less ethereum RPC calls to be made because we essentially have one web socket and that's it.

@ppca ppca force-pushed the xiangyi/subscribe_to_eth_logs branch from 0997638 to 53e2feb Compare January 23, 2025 01:30
@ppca ppca marked this pull request as ready for review January 23, 2025 05:39
@ppca ppca requested review from ailisp and ChaoticTempest January 23, 2025 05:39
eth_rpc_url: String,
#[arg(long, default_value = "0x5FbDB2315678afecb367f032d93F642f64180aa3")]
eth_rpc_http_url: String,
#[arg(long, default_value = "99bbA657f2BbC93c02D617f8bA121cB8Fc104Acf")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we revert this one? Current deploy script is expected to deterministically deploy local contract to 0x5FbDB2315678afecb367f032d93F642f64180aa3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the localhost network to automine and add a step to find address of contract on localhost network, and pass that contract address in when running the mpc nodes.

ailisp
ailisp previously approved these changes Jan 23, 2025
Copy link
Contributor

@ailisp ailisp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! A huge enhancement!

ChaoticTempest
ChaoticTempest previously approved these changes Jan 23, 2025
Copy link
Contributor

@ChaoticTempest ChaoticTempest left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM besides a couple nits

Comment on lines 273 to 277
tracing::info!("protocol thread spawned");
let web_handle = tokio::spawn(web::run(web_port, sender, state, indexer));
let eth_indexer_handle = tokio::spawn(async move {
indexer_eth::run(&indexer_eth_options, sign_tx, &account_id).await
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need to have async move { ... } if you move all the items to the function or if the functions owns all the item:

Suggested change
tracing::info!("protocol thread spawned");
let web_handle = tokio::spawn(web::run(web_port, sender, state, indexer));
let eth_indexer_handle = tokio::spawn(async move {
indexer_eth::run(&indexer_eth_options, sign_tx, &account_id).await
});
tracing::info!("protocol thread spawned");
let web_handle = tokio::spawn(web::run(web_port, sender, state, indexer));
let eth_indexer_handle = tokio::spawn(
indexer_eth::run(indexer_eth_options, sign_tx, account_id)
);

Comment on lines +167 to +169
match web3::transports::WebSocket::new(&options.eth_rpc_ws_url).await {
Ok(ws) => {
let web3_ws = web3::Web3::new(ws);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this part need to be in the loop in every time? Can't we just create it the first time outside?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah since this loop is trying to recreate the socket endlessly if it fails to be created

Ok(mut filtered_logs_sub) => {
tracing::info!("Ethereum indexer connected and listening for logs");

let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(30));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe every 1min would be better

@@ -121,7 +116,6 @@ async fn state(Extension(state): Extension<Arc<AxumState>>) -> Result<Json<State
tracing::debug!("fetching state");
// TODO: rename to last_processed_block when making other breaking changes
let latest_block_height = state.indexer.last_processed_block().await.unwrap_or(0);
let latest_eth_block_height = state.eth_indexer.last_processed_block().await.unwrap_or(0);
let is_stable = state.indexer.is_stable().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also now that we're publishing to multiple chains, the stable participants is gonna be funky. We need to refactor our code so that the stable check is checked per chain instead of only near chain. Issue created: #137

Comment on lines 179 to 198
match log {
Ok(log) => {
tracing::info!("Received new Ethereum sign request: {:?}", log);
crate::metrics::NUM_SIGN_REQUESTS_ETH
.with_label_values(&[node_near_account_id.as_str()])
.inc();
let sign_tx = sign_tx.clone();
if let Ok(sign_request) = sign_request_from_filtered_log(log) {
tokio::spawn(async move {
if let Err(err) = sign_tx.send(sign_request).await {
tracing::error!(?err, "Failed to send ETH sign request into queue");
}
});
}
}
Err(err) => {
tracing::warn!("Ethereum log subscription error: {:?}", err);
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this reduces the need for scoping by a bit:

Suggested change
match log {
Ok(log) => {
tracing::info!("Received new Ethereum sign request: {:?}", log);
crate::metrics::NUM_SIGN_REQUESTS_ETH
.with_label_values(&[node_near_account_id.as_str()])
.inc();
let sign_tx = sign_tx.clone();
if let Ok(sign_request) = sign_request_from_filtered_log(log) {
tokio::spawn(async move {
if let Err(err) = sign_tx.send(sign_request).await {
tracing::error!(?err, "Failed to send ETH sign request into queue");
}
});
}
}
Err(err) => {
tracing::warn!("Ethereum log subscription error: {:?}", err);
break;
}
}
let Ok(log) = log.inspect_err(|err| {
tracing::warn!("Ethereum log subscription error: {:?}", err);
}) else {
break;
};
tracing::info!("Received new Ethereum sign request: {:?}", log);
crate::metrics::NUM_SIGN_REQUESTS_ETH
.with_label_values(&[node_near_account_id.as_str()])
.inc();
if let Ok(sign_request) = sign_request_from_filtered_log(log) {
let sign_tx = sign_tx.clone();
tokio::spawn(async move {
if let Err(err) = sign_tx.send(sign_request).await {
tracing::error!(?err, "Failed to send ETH sign request into queue");
}
});
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks much nicer!!

@ppca ppca dismissed stale reviews from ChaoticTempest and ailisp via fe1651b January 24, 2025 01:14
@ppca ppca merged commit 4e18971 into develop Jan 24, 2025
2 of 3 checks passed
@ppca ppca deleted the xiangyi/subscribe_to_eth_logs branch January 24, 2025 02:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

eth indexer: subscribe to blocks or logs instead of actively pulling each block every few second
3 participants