diff --git a/CHANGELOG.md b/CHANGELOG.md index 2aba2028563..2d361feea3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Experimental self-healing-open protocol for automatically transitioning-to-open during a disaster recovery without operator intervention. (#7189) +- Added support for endpoints which only respond once their content is committed by the consensus protocol (#7562). ### Changed diff --git a/doc/schemas/app_openapi.json b/doc/schemas/app_openapi.json index 5c231a6529e..eb5d65322f3 100644 --- a/doc/schemas/app_openapi.json +++ b/doc/schemas/app_openapi.json @@ -257,7 +257,7 @@ "info": { "description": "This CCF sample app implements a simple logging application, securely recording messages at client-specified IDs. It demonstrates most of the features available to CCF apps.", "title": "CCF Sample Logging App", - "version": "2.8.0" + "version": "2.8.1" }, "openapi": "3.0.0", "paths": { @@ -375,6 +375,86 @@ } } }, + "/app/log/blocking/private": { + "get": { + "operationId": "GetAppLogBlockingPrivate", + "parameters": [ + { + "in": "query", + "name": "id", + "required": true, + "schema": { + "$ref": "#/components/schemas/uint64" + } + } + ], + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/LoggingGet__Out" + } + } + }, + "description": "Default response description" + }, + "default": { + "$ref": "#/components/responses/default" + } + }, + "security": [ + { + "jwt": [] + }, + { + "user_cose_sign1": [] + } + ], + "x-ccf-forwarding": { + "$ref": "#/components/x-ccf-forwarding/sometimes" + } + }, + "post": { + "operationId": "PostAppLogBlockingPrivate", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/LoggingRecord__In" + } + } + }, + "description": "Auto-generated request body schema" + }, + "responses": { + "200": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/boolean" + } + } + }, + "description": "Default response description" + }, + "default": { + "$ref": "#/components/responses/default" + } + }, + "security": [ + { + "jwt": [] + }, + { + "user_cose_sign1": [] + } + ], + "x-ccf-forwarding": { + "$ref": "#/components/x-ccf-forwarding/always" + } + } + }, "/app/log/cose_signed_content": { "post": { "operationId": "PostAppLogCoseSignedContent", diff --git a/include/ccf/endpoint.h b/include/ccf/endpoint.h index 601eec8232a..96670187bff 100644 --- a/include/ccf/endpoint.h +++ b/include/ccf/endpoint.h @@ -259,9 +259,16 @@ namespace ccf::endpoints { // Functor which is invoked to process requests for this Endpoint EndpointFunction func; - // Functor which is invoked to modify the response post commit. + + // Functor which is invoked to modify the response after it is locally + // committed (ie - assigned a transaction ID) LocallyCommittedEndpointFunction locally_committed_func; + // Functor which is invoked to modify the response after it reaches a + // terminal consensus state (ie - it is either globally committed, or + // invalidated) + ConsensusCommittedEndpointFunction consensus_committed_func; + struct Installer { virtual void install(Endpoint&) = 0; @@ -488,6 +495,9 @@ namespace ccf::endpoints Endpoint& set_locally_committed_function( const LocallyCommittedEndpointFunction& lcf); + Endpoint& set_consensus_committed_function( + const ConsensusCommittedEndpointFunction& ccf_); + void install(); }; diff --git a/include/ccf/endpoint_context.h b/include/ccf/endpoint_context.h index eca44c2299c..160a7dbe50b 100644 --- a/include/ccf/endpoint_context.h +++ b/include/ccf/endpoint_context.h @@ -3,6 +3,7 @@ #pragma once #include "ccf/endpoints/authentication/authentication_types.h" +#include "ccf/tx_status.h" #include #include @@ -65,6 +66,11 @@ namespace ccf::endpoints using LocallyCommittedEndpointFunction = std::function; + using ConsensusCommittedEndpointFunction = std::function rpc_ctx, + const ccf::TxID& txid, + ccf::FinalTxStatus status)>; + // Read-only endpoints can only get values from the kv, they cannot write struct ReadOnlyEndpointContext : public CommandEndpointContext { diff --git a/include/ccf/endpoint_registry.h b/include/ccf/endpoint_registry.h index b69cc54fe94..cee57da6b8b 100644 --- a/include/ccf/endpoint_registry.h +++ b/include/ccf/endpoint_registry.h @@ -60,6 +60,11 @@ namespace ccf::endpoints void default_locally_committed_func( CommandEndpointContext& ctx, const TxID& tx_id); + void default_respond_on_commit_func( + std::shared_ptr rpc_ctx, + const TxID& tx_id, + ccf::FinalTxStatus status); + template inline bool get_path_param( const ccf::PathParams& params, diff --git a/include/ccf/tx_status.h b/include/ccf/tx_status.h index 6dceea3feac..a2690f40ac1 100644 --- a/include/ccf/tx_status.h +++ b/include/ccf/tx_status.h @@ -31,6 +31,13 @@ namespace ccf Invalid, }; + // Contains only the terminal values of TxStatus + enum class FinalTxStatus : uint8_t + { + Committed = static_cast(TxStatus::Committed), + Invalid = static_cast(TxStatus::Invalid), + }; + constexpr char const* tx_status_to_str(TxStatus status) { switch (status) diff --git a/samples/apps/logging/logging.cpp b/samples/apps/logging/logging.cpp index 28b9fbbee31..3c49b907880 100644 --- a/samples/apps/logging/logging.cpp +++ b/samples/apps/logging/logging.cpp @@ -468,7 +468,7 @@ namespace loggingapp "recording messages at client-specified IDs. It demonstrates most of " "the features available to CCF apps."; - openapi_info.document_version = "2.8.0"; + openapi_info.document_version = "2.8.1"; }; void init_handlers() override @@ -517,6 +517,16 @@ namespace loggingapp .install(); // SNIPPET_END: install_record + make_endpoint( + "/log/blocking/private", + HTTP_POST, + ccf::json_adapter(record), + auth_policies) + .set_auto_schema() + .set_consensus_committed_function( + ccf::endpoints::default_respond_on_commit_func) + .install(); + auto add_txid_in_body_put = [](auto& ctx, const auto& tx_id) { static constexpr auto CCF_TX_ID = "x-ms-ccf-transaction-id"; ctx.rpc_ctx->set_response_header(CCF_TX_ID, tx_id.to_str()); @@ -623,6 +633,17 @@ namespace loggingapp .install(); // SNIPPET_END: install_get + make_read_only_endpoint( + "/log/blocking/private", + HTTP_GET, + ccf::json_read_only_adapter(get), + auth_policies) + .set_auto_schema() + .add_query_parameter("id") + .set_consensus_committed_function( + ccf::endpoints::default_respond_on_commit_func) + .install(); + make_read_only_endpoint( "/log/private/backup", HTTP_GET, diff --git a/src/consensus/aft/impl/state.h b/src/consensus/aft/impl/state.h index 6317e8ce39d..630c2cc8c48 100644 --- a/src/consensus/aft/impl/state.h +++ b/src/consensus/aft/impl/state.h @@ -155,7 +155,7 @@ namespace aft {} State() = default; - ccf::pal::Mutex lock; + std::recursive_mutex lock; ccf::NodeId node_id; ccf::View current_view = 0; diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index dd9a11f978f..8ca94b30e34 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -12,6 +12,7 @@ #include "ds/serialized.h" #include "impl/state.h" #include "kv/kv_types.h" +#include "node/commit_callback_subsystem.h" #include "node/node_client.h" #include "node/node_to_node.h" #include "node/node_types.h" @@ -183,6 +184,8 @@ namespace aft // Used to remove retired nodes from store std::unique_ptr retired_node_cleanup; + std::shared_ptr commit_callbacks; + size_t entry_size_not_limited = 0; size_t entry_count = 0; Index entries_batch_size = 20; @@ -214,6 +217,8 @@ namespace aft std::shared_ptr channels_, std::shared_ptr state_, std::shared_ptr rpc_request_context_, + std::shared_ptr + commit_callbacks_subsystem_ = nullptr, bool public_only_ = false) : store(std::move(store_)), @@ -228,6 +233,7 @@ namespace aft node_client(std::move(rpc_request_context_)), retired_node_cleanup( std::make_unique(node_client)), + commit_callbacks(std::move(commit_callbacks_subsystem_)), public_only(public_only_), @@ -236,7 +242,12 @@ namespace aft ledger(std::move(ledger_)), channels(std::move(channels_)) - {} + { + if (commit_callbacks != nullptr) + { + commit_callbacks->set_consensus(this); + } + } ~Aft() override = default; @@ -262,7 +273,7 @@ namespace aft bool can_replicate() override { - std::unique_lock guard(state->lock); + std::unique_lock guard(state->lock); return can_replicate_unsafe(); } @@ -277,14 +288,14 @@ namespace aft { return false; } - std::unique_lock guard(state->lock); + std::unique_lock guard(state->lock); return state->leadership_state == ccf::kv::LeadershipState::Leader && (state->last_idx - state->commit_idx >= max_uncommitted_tx_count); } Consensus::SignatureDisposition get_signature_disposition() override { - std::unique_lock guard(state->lock); + std::unique_lock guard(state->lock); if (can_sign_unsafe()) { if (should_sign) @@ -389,7 +400,7 @@ namespace aft { // When receiving append entries as a follower, all security domains will // be deserialised - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); public_only = false; } @@ -403,7 +414,7 @@ namespace aft "Can't force leadership if there is already a leader"); } - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); state->current_view += starting_view_change; become_leader(true); } @@ -422,7 +433,7 @@ namespace aft "Can't force leadership if there is already a leader"); } - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); state->current_view = term; state->last_idx = index; state->commit_idx = commit_idx_; @@ -440,7 +451,7 @@ namespace aft { // This should only be called when the node resumes from a snapshot and // before it has received any append entries. - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); state->last_idx = index; state->commit_idx = index; @@ -459,26 +470,26 @@ namespace aft Index get_committed_seqno() override { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); return get_commit_idx_unsafe(); } Term get_view() override { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); return state->current_view; } std::pair get_committed_txid() override { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); ccf::SeqNo commit_idx = get_commit_idx_unsafe(); return {get_term_internal(commit_idx), commit_idx}; } Term get_view(Index idx) override { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); return get_term_internal(idx); } @@ -584,14 +595,14 @@ namespace aft Configuration::Nodes get_latest_configuration() override { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); return get_latest_configuration_unsafe(); } ccf::kv::ConsensusDetails get_details() override { ccf::kv::ConsensusDetails details; - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); details.primary_id = leader_id; details.current_view = state->current_view; details.ticking = ticking; @@ -616,7 +627,7 @@ namespace aft bool replicate(const ccf::kv::BatchVector& entries, Term term) override { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); if (state->leadership_state != ccf::kv::LeadershipState::Leader) { @@ -825,7 +836,7 @@ namespace aft void periodic(std::chrono::milliseconds elapsed) override { - std::unique_lock guard(state->lock); + std::unique_lock guard(state->lock); timeout_elapsed += elapsed; if (state->leadership_state == ccf::kv::LeadershipState::Leader) @@ -1098,7 +1109,7 @@ namespace aft const uint8_t* data, size_t size) { - std::unique_lock guard(state->lock); + std::unique_lock guard(state->lock); RAFT_DEBUG_FMT( "Recv {} to {} from {}: {}.{} to {}.{} in term {}", @@ -1553,7 +1564,7 @@ namespace aft void recv_append_entries_response( const ccf::NodeId& from, AppendEntriesResponse r) { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); auto node = all_other_nodes.find(from); if (node == all_other_nodes.end()) @@ -1832,7 +1843,7 @@ namespace aft void recv_request_vote(const ccf::NodeId& from, RequestVote r) { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); #ifdef CCF_RAFT_TRACING nlohmann::json j = {}; @@ -1849,7 +1860,7 @@ namespace aft void recv_request_pre_vote(const ccf::NodeId& from, RequestPreVote r) { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); #ifdef CCF_RAFT_TRACING nlohmann::json j = {}; @@ -1912,7 +1923,7 @@ namespace aft RequestVoteResponse r, ElectionType election_type) { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); #ifdef CCF_RAFT_TRACING nlohmann::json j = {}; @@ -2038,7 +2049,7 @@ namespace aft void recv_propose_request_vote( const ccf::NodeId& from, ProposeRequestVote r) { - std::lock_guard guard(state->lock); + std::lock_guard guard(state->lock); #ifdef CCF_RAFT_TRACING nlohmann::json j = {}; @@ -2598,6 +2609,12 @@ namespace aft store->compact(idx); ledger->commit(idx); + if (commit_callbacks != nullptr) + { + const auto term = get_term_internal(idx); + commit_callbacks->trigger_callbacks({term, idx}); + } + RAFT_DEBUG_FMT("Commit on {}: {}", state->node_id, idx); // Examine each configuration that is followed by a globally committed diff --git a/src/enclave/enclave.h b/src/enclave/enclave.h index a50c029a7c5..ddd607cbb45 100644 --- a/src/enclave/enclave.h +++ b/src/enclave/enclave.h @@ -15,6 +15,7 @@ #include "interface.h" #include "js/interpreter_cache.h" #include "kv/ledger_chunker.h" +#include "node/commit_callback_subsystem.h" #include "node/historical_queries.h" #include "node/network_state.h" #include "node/node_state.h" @@ -143,6 +144,10 @@ namespace ccf context->install_subsystem( std::make_shared(*node)); + auto commit_callbacks = std::make_shared(); + context->install_subsystem(commit_callbacks); + rpcsessions->set_commit_callbacks_subsystem(commit_callbacks); + LOG_TRACE_FMT("Creating RPC actors / ffi"); rpc_map->register_frontend( std::make_unique(network, *context)); @@ -160,6 +165,7 @@ namespace ccf rpc_map, rpcsessions, indexer, + commit_callbacks, sig_tx_interval, sig_ms_interval); } diff --git a/src/enclave/rpc_sessions.h b/src/enclave/rpc_sessions.h index 7b2458e580e..fc694eeeb56 100644 --- a/src/enclave/rpc_sessions.h +++ b/src/enclave/rpc_sessions.h @@ -60,7 +60,10 @@ namespace ccf ringbuffer::WriterPtr to_host = nullptr; std::shared_ptr rpc_map; std::unordered_map> certs; - std::shared_ptr custom_protocol_subsystem; + std::shared_ptr custom_protocol_subsystem = + nullptr; + std::shared_ptr commit_callbacks_subsystem = + nullptr; ccf::pal::Mutex lock; std::unordered_map< @@ -168,7 +171,8 @@ namespace ccf writer_factory, std::move(ctx), parser_configuration, - shared_from_this()); + shared_from_this(), + commit_callbacks_subsystem); } if (custom_protocol_subsystem) { @@ -186,8 +190,7 @@ namespace ccf ringbuffer::AbstractWriterFactory& writer_factory, std::shared_ptr rpc_map_) : writer_factory(writer_factory), - rpc_map(std::move(rpc_map_)), - custom_protocol_subsystem(nullptr) + rpc_map(std::move(rpc_map_)) { to_host = writer_factory.create_writer_to_outside(); } @@ -198,6 +201,12 @@ namespace ccf custom_protocol_subsystem = cpss; } + void set_commit_callbacks_subsystem( + std::shared_ptr fcss) + { + commit_callbacks_subsystem = fcss; + } + void report_parsing_error(const ccf::ListenInterfaceID& id) override { std::lock_guard guard(lock); @@ -410,7 +419,8 @@ namespace ccf writer_factory, std::move(ctx), per_listen_interface.http_configuration, - shared_from_this()); + shared_from_this(), + commit_callbacks_subsystem); } sessions.insert(std::make_pair( id, std::make_pair(listen_interface_id, std::move(capped_session)))); diff --git a/src/endpoints/base_endpoint_registry.cpp b/src/endpoints/base_endpoint_registry.cpp index dbdf12c5471..aa488342b36 100644 --- a/src/endpoints/base_endpoint_registry.cpp +++ b/src/endpoints/base_endpoint_registry.cpp @@ -69,12 +69,7 @@ namespace ccf { if (consensus != nullptr) { - const auto tx_view = consensus->get_view(seqno); - const auto committed_seqno = consensus->get_committed_seqno(); - const auto committed_view = consensus->get_view(committed_seqno); - - tx_status = ccf::evaluate_tx_status( - view, seqno, tx_view, committed_view, committed_seqno); + tx_status = consensus->evaluate_tx_status(view, seqno); } else { diff --git a/src/endpoints/endpoint.cpp b/src/endpoints/endpoint.cpp index 4556db77955..6cd2287b305 100644 --- a/src/endpoints/endpoint.cpp +++ b/src/endpoints/endpoint.cpp @@ -103,6 +103,13 @@ namespace ccf::endpoints return *this; } + Endpoint& Endpoint::set_consensus_committed_function( + const ConsensusCommittedEndpointFunction& ccf_) + { + consensus_committed_func = ccf_; + return *this; + } + Endpoint& Endpoint::set_openapi_description(const std::string& description) { openapi_description = description; diff --git a/src/endpoints/endpoint_registry.cpp b/src/endpoints/endpoint_registry.cpp index 0affb3346cd..daa5f94c25d 100644 --- a/src/endpoints/endpoint_registry.cpp +++ b/src/endpoints/endpoint_registry.cpp @@ -201,6 +201,24 @@ namespace ccf::endpoints ctx.rpc_ctx->set_response_header(http::headers::CCF_TX_ID, tx_id.to_str()); } + void default_respond_on_commit_func( + std::shared_ptr rpc_ctx, + const TxID& tx_id, + ccf::FinalTxStatus status) + { + if (status == ccf::FinalTxStatus::Invalid) + { + rpc_ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::TransactionInvalid, + fmt::format( + "While waiting for TxID {} to commit, it was invalidated", + tx_id.to_str())); + } + + // Else leave the original response untouched, and return it now + } + Endpoint EndpointRegistry::make_endpoint( const std::string& method, RESTVerb verb, diff --git a/src/http/http_session.h b/src/http/http_session.h index 847ede2d4ac..0a42379d922 100644 --- a/src/http/http_session.h +++ b/src/http/http_session.h @@ -26,6 +26,7 @@ namespace http std::shared_ptr handler; std::shared_ptr session_ctx; std::shared_ptr error_reporter; + std::shared_ptr commit_callbacks; ccf::ListenInterfaceID interface_id; public: @@ -36,11 +37,13 @@ namespace http ringbuffer::AbstractWriterFactory& writer_factory, std::unique_ptr ctx, const ccf::http::ParserConfiguration& configuration, - const std::shared_ptr& error_reporter_ = nullptr) : + const std::shared_ptr& error_reporter_, + const std::shared_ptr& commit_callbacks_) : HTTPSession(session_id_, writer_factory, std::move(ctx)), request_parser(*this, configuration), rpc_map(std::move(rpc_map_)), error_reporter(error_reporter_), + commit_callbacks(commit_callbacks_), interface_id(std::move(interface_id_)) {} @@ -158,6 +161,7 @@ namespace http ccf::errors::InternalError, fmt::format("Error constructing RpcContext: {}", e.what())}); close_session(); + return; } std::shared_ptr search = @@ -172,15 +176,56 @@ namespace http return; } - send_response( - rpc_ctx->get_response_http_status(), - rpc_ctx->get_response_headers(), - rpc_ctx->get_response_trailers(), - std::move(rpc_ctx->take_response_body())); - - if (rpc_ctx->terminate_session) + const auto& respond_on_commit = rpc_ctx->respond_on_commit; + if (respond_on_commit.has_value()) { - close_session(); + auto [tx_id, committed_func] = respond_on_commit.value(); + + // Block any future work from happening on this session, to + // maintain session consistency + ccf::tasks::Resumable paused_task = ccf::tasks::pause_current_task(); + + // shared_from_this returns a base session type + std::shared_ptr self = shared_from_this(); + + // Register for a callback when this TxID is committed (or + // invalidated) + commit_callbacks->add_callback( + tx_id, + [self, rpc_ctx, paused_task, committed_func]( + ccf::TxID transaction_id, ccf::FinalTxStatus status) { + // Let the handler modify the response + committed_func(rpc_ctx, transaction_id, status); + + // Write the response + send_response_impl( + *self, + rpc_ctx->get_response_http_status(), + rpc_ctx->get_response_headers(), + rpc_ctx->get_response_trailers(), + std::move(rpc_ctx->take_response_body())); + + if (rpc_ctx->terminate_session) + { + self->close_session(); + } + + // Resume processing work for this session + ccf::tasks::resume_task(paused_task); + }); + } + else + { + send_response( + rpc_ctx->get_response_http_status(), + rpc_ctx->get_response_headers(), + rpc_ctx->get_response_trailers(), + std::move(rpc_ctx->take_response_body())); + + if (rpc_ctx->terminate_session) + { + close_session(); + } } } catch (const std::exception& e) @@ -198,11 +243,12 @@ namespace http } } - bool send_response( + static bool send_response_impl( + ccf::ThreadedSession& session, ccf::http_status status_code, ccf::http::HeaderMap&& headers, ccf::http::HeaderMap&& trailers, - std::vector&& body) override + std::vector&& body) { if (!trailers.empty()) { @@ -221,9 +267,23 @@ namespace http false /* Don't overwrite any existing content-length header */ ); - send_data(response.build_response()); + session.send_data(response.build_response()); return true; } + + bool send_response( + ccf::http_status status_code, + ccf::http::HeaderMap&& headers, + ccf::http::HeaderMap&& trailers, + std::vector&& body) override + { + return send_response_impl( + *this, + status_code, + std::move(headers), + std::move(trailers), + std::move(body)); + } }; class HTTPClientSession : public HTTPSession, diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index 8134448fb7f..6897742a40e 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -14,6 +14,7 @@ #include "ccf/service/consensus_type.h" #include "ccf/service/reconfiguration_type.h" #include "ccf/tx_id.h" +#include "ccf/tx_status.h" #include "crypto/openssl/ec_key_pair.h" #include "kv/ledger_chunker_interface.h" #include "serialiser_declare.h" @@ -411,6 +412,16 @@ namespace ccf::kv {} virtual void nominate_successor() {}; + + ccf::TxStatus evaluate_tx_status( + ccf::View target_view, ccf::SeqNo target_seqno) + { + const auto local_view = get_view(target_seqno); + const auto [committed_view, committed_seqno] = get_committed_txid(); + + return ccf::evaluate_tx_status( + target_view, target_seqno, local_view, committed_view, committed_seqno); + } }; struct PendingTxInfo diff --git a/src/node/commit_callback_interface.h b/src/node/commit_callback_interface.h new file mode 100644 index 00000000000..0d680839687 --- /dev/null +++ b/src/node/commit_callback_interface.h @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/node_subsystem_interface.h" +#include "ccf/tx_id.h" +#include "ccf/tx_status.h" + +#include + +namespace ccf +{ + using CommitCallback = std::function; + + class CommitCallbackInterface : public AbstractNodeSubSystem + { + public: + ~CommitCallbackInterface() override = default; + + static char const* get_subsystem_name() + { + return "CommitCallback"; + } + + virtual void add_callback(ccf::TxID tx_id, CommitCallback&& callback) = 0; + }; +} diff --git a/src/node/commit_callback_subsystem.h b/src/node/commit_callback_subsystem.h new file mode 100644 index 00000000000..e41594df74d --- /dev/null +++ b/src/node/commit_callback_subsystem.h @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "kv/kv_types.h" +#include "node/commit_callback_interface.h" + +#include +#include + +namespace ccf +{ + class CommitCallbackSubsystem : public CommitCallbackInterface + { + private: + using Callbacks = std::vector>; + std::map pending_callbacks; + + std::optional known_commit = std::nullopt; + + // Use a recursive mutex so that `add_callback` may safely be called while a + // callback is executing (and the mutex is locked) + std::recursive_mutex callbacks_mutex; + + ccf::kv::Consensus* consensus = nullptr; + + public: + CommitCallbackSubsystem() = default; + + void set_consensus(ccf::kv::Consensus* c) + { + consensus = c; + } + + void add_callback(ccf::TxID tx_id, CommitCallback&& callback) override + { + std::lock_guard guard(callbacks_mutex); + + if (known_commit.has_value() && consensus != nullptr) + { + const auto status = + consensus->evaluate_tx_status(tx_id.view, tx_id.seqno); + + if (status == TxStatus::Committed || status == TxStatus::Invalid) + { + // TxID is already known to be in a terminal state - execute callback + // immediately + const auto final_status = static_cast(status); + callback(tx_id, final_status); + return; + } + } + + pending_callbacks[tx_id.seqno].emplace_back( + std::make_pair(tx_id, std::move(callback))); + } + + void trigger_callbacks(ccf::TxID committed) + { + if (consensus == nullptr) + { + throw std::logic_error( + "trigger_callbacks() called before set_consensus()"); + } + + std::lock_guard guard(callbacks_mutex); + + known_commit = committed; + + auto it = pending_callbacks.begin(); + while (it != pending_callbacks.end()) + { + auto [seqno, callbacks] = *it; + if (seqno > committed.seqno) + { + break; + } + + // Have committed to this seqno - terminal status for this transaction + // should now be known + for (auto& [tx_id, callback] : callbacks) + { + const auto status = + consensus->evaluate_tx_status(tx_id.view, tx_id.seqno); + + if (status != TxStatus::Committed && status != TxStatus::Invalid) + { + throw std::logic_error(fmt::format( + "Expected transaction {} evaluated against commit point {} to " + "return terminal TxStatus, instead returned {}", + tx_id.to_str(), + committed.to_str(), + nlohmann::json(status).dump())); + } + + const auto final_status = static_cast(status); + callback(tx_id, final_status); + } + + it = pending_callbacks.erase(it); + } + } + }; +} diff --git a/src/node/historical_queries_adapter.cpp b/src/node/historical_queries_adapter.cpp index 9dbca3a4538..96d4c993623 100644 --- a/src/node/historical_queries_adapter.cpp +++ b/src/node/historical_queries_adapter.cpp @@ -373,12 +373,8 @@ namespace ccf::historical return HistoricalTxStatus::Error; } - const auto tx_view = consensus->get_view(seqno); - const auto committed_seqno = consensus->get_committed_seqno(); - const auto committed_view = consensus->get_view(committed_seqno); + const auto tx_status = consensus->evaluate_tx_status(view, seqno); - const auto tx_status = ccf::evaluate_tx_status( - view, seqno, tx_view, committed_view, committed_seqno); switch (tx_status) { case ccf::TxStatus::Unknown: diff --git a/src/node/node_state.h b/src/node/node_state.h index 8cd04a285c0..559f8032512 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -33,6 +33,7 @@ #include "indexing/indexer.h" #include "js/global_class_ids.h" #include "network_state.h" +#include "node/commit_callback_subsystem.h" #include "node/hooks.h" #include "node/http_node_client.h" #include "node/jwt_key_auto_refresh.h" @@ -132,6 +133,7 @@ namespace ccf std::shared_ptr indexer; std::shared_ptr n2n_channels; std::shared_ptr> cmd_forwarder; + std::shared_ptr commit_callbacks = nullptr; std::shared_ptr rpcsessions; std::shared_ptr history; @@ -253,6 +255,7 @@ namespace ccf std::shared_ptr rpc_map_, std::shared_ptr rpc_sessions_, std::shared_ptr indexer_, + std::shared_ptr commit_callbacks_, size_t sig_tx_interval_, size_t sig_ms_interval_) { @@ -261,7 +264,10 @@ namespace ccf consensus_config = consensus_config_; rpc_map = rpc_map_; + indexer = indexer_; + commit_callbacks = commit_callbacks_; + sig_tx_interval = sig_tx_interval_; sig_ms_interval = sig_ms_interval_; @@ -2551,6 +2557,7 @@ namespace ccf n2n_channels, shared_state, node_client, + commit_callbacks, public_only); network.tables->set_consensus(consensus); diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index 9b41847d3aa..24474f81560 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -16,6 +16,7 @@ #include "enclave/rpc_handler.h" #include "forwarder.h" #include "http/http_jwt.h" +#include "http/http_rpc_context.h" #include "kv/compacted_version_conflict.h" #include "kv/store.h" #include "node/endpoint_context_impl.h" @@ -818,9 +819,11 @@ namespace ccf { case ccf::kv::CommitResult::SUCCESS: { - auto tx_id = tx.get_txid(); - if (tx_id.has_value() && consensus != nullptr) + auto tx_id_opt = tx.get_txid(); + if (tx_id_opt.has_value() && consensus != nullptr) { + ccf::TxID tx_id = tx_id_opt.value(); + try { // Only transactions that acquired one or more map handles @@ -828,14 +831,13 @@ namespace ccf // don't. Also, only report a TxID if the consensus is set, as // the consensus is required to verify that a TxID is valid. endpoints.execute_endpoint_locally_committed( - endpoint, args, tx_id.value()); + endpoint, args, tx_id); } catch (const std::exception& e) { // run default handler to set transaction id in header ctx->clear_response_headers(); - ccf::endpoints::default_locally_committed_func( - args, tx_id.value()); + ccf::endpoints::default_locally_committed_func(args, tx_id); ctx->set_error( HTTP_STATUS_INTERNAL_SERVER_ERROR, ccf::errors::InternalError, @@ -847,13 +849,24 @@ namespace ccf { // run default handler to set transaction id in header ctx->clear_response_headers(); - ccf::endpoints::default_locally_committed_func( - args, tx_id.value()); + ccf::endpoints::default_locally_committed_func(args, tx_id); ctx->set_error( HTTP_STATUS_INTERNAL_SERVER_ERROR, ccf::errors::InternalError, "Failed to execute local commit handler func"); } + + { + const auto* concrete_endpoint = + dynamic_cast(endpoint.get()); + if ( + concrete_endpoint != nullptr && + concrete_endpoint->consensus_committed_func != nullptr) + { + ctx->respond_on_commit = std::make_pair( + tx_id, concrete_endpoint->consensus_committed_func); + } + } } if ( diff --git a/src/node/rpc_context_impl.h b/src/node/rpc_context_impl.h index f9734aff3de..18f6b070b9d 100644 --- a/src/node/rpc_context_impl.h +++ b/src/node/rpc_context_impl.h @@ -3,6 +3,7 @@ #pragma once #include "ccf/claims_digest.h" +#include "ccf/endpoint_context.h" #include "ccf/rpc_context.h" namespace ccf @@ -109,6 +110,10 @@ namespace ccf bool response_is_pending = false; bool terminate_session = false; + std::optional< + std::pair> + respond_on_commit = std::nullopt; + [[nodiscard]] virtual bool should_apply_writes() const = 0; virtual void reset_response() = 0; [[nodiscard]] virtual std::vector serialise_response() const = 0; diff --git a/src/tasks/resumable.h b/src/tasks/resumable.h index 071c598124b..1427b2d4a34 100644 --- a/src/tasks/resumable.h +++ b/src/tasks/resumable.h @@ -7,7 +7,9 @@ namespace ccf::tasks { struct IResumable; - void resume_task(std::unique_ptr&& resumable); + using Resumable = std::shared_ptr; + + void resume_task(Resumable resumable); struct IResumable { @@ -17,12 +19,8 @@ namespace ccf::tasks public: virtual ~IResumable() = default; - friend void ccf::tasks::resume_task( - std::unique_ptr&& resumable); + friend void ccf::tasks::resume_task(Resumable resumable); }; - using Resumable = std::unique_ptr; - Resumable pause_current_task(); - void resume_task(Resumable&& resumable); } \ No newline at end of file diff --git a/src/tasks/task_system.cpp b/src/tasks/task_system.cpp index 83a03530f59..a95f6cff026 100644 --- a/src/tasks/task_system.cpp +++ b/src/tasks/task_system.cpp @@ -103,7 +103,7 @@ namespace ccf::tasks return handle; } - void resume_task(Resumable&& resumable) + void resume_task(Resumable resumable) { resumable->resume(); } diff --git a/tests/e2e_logging.py b/tests/e2e_logging.py index 35c149c676e..96d67bd334b 100644 --- a/tests/e2e_logging.py +++ b/tests/e2e_logging.py @@ -40,6 +40,7 @@ import subprocess import base64 import cbor2 +from datetime import datetime from loguru import logger as LOG @@ -2269,6 +2270,82 @@ def test_cose_config(network, args): return network +def test_blocking_calls(network, args): + primary, _ = network.find_nodes() + + class CommitPoller(infra.concurrency.StoppableThread): + def __init__(self, node): + super().__init__(name="commit poller") + self.node = node + self.known_commit_times = [] + + def run(self): + with self.node.client() as c: + prev_txid = None + while not self.is_stopped(): + r = c.get("/node/commit", log_capture=[]) + assert r.status_code == http.HTTPStatus.OK, r.status_code + txid = TxID.from_str(r.body.json()["transaction_id"]) + if txid != prev_txid: + self.known_commit_times.append((datetime.now(), txid)) + prev_txid = txid + + cp = CommitPoller(primary) + cp.start() + + response_times = [] + + # Make some blocking and some non-blocking requests, in random order, and compare delta to known-commit time + n_requests = 5 + request_order = [True] * n_requests + [False] * n_requests + random.shuffle(request_order) + + with primary.client("user0") as c: + for blocking in request_order: + path = "/log/blocking/private" if blocking else "/log/private" + r = c.post(path, {"id": 42, "msg": "Hello world"}) + assert r.status_code == http.HTTPStatus.OK, r.status_code + + now = datetime.now() + txid = TxID.from_str(r.headers[infra.clients.CCF_TX_ID_HEADER]) + response_times.append((now, (blocking, txid))) + + # Ensure CommitPoller has waited for commit of _all_ entries + c.wait_for_commit(r) + + cp.stop() + cp.join() + + # Measure the delta between when we received a response, and when we knew it was committed + blocking_deltas = [] + non_blocking_deltas = [] + + for response_time, (blocking, txid) in response_times: + for commit_time, commit_txid in cp.known_commit_times: + assert commit_txid.view == txid.view + if commit_txid.seqno >= txid.seqno: + delta = (commit_time - response_time).total_seconds() + if blocking: + blocking_deltas.append(delta) + else: + non_blocking_deltas.append(delta) + break + else: + raise AssertionError(f"No commit found for {txid}") + + blocking_mean = sum(blocking_deltas) / len(blocking_deltas) + non_blocking_mean = sum(non_blocking_deltas) / len(non_blocking_deltas) + + # Over a large-enough sample size, we'd expect: + # - blocking_mean to approach 0. We get a response and see commit advance at exactly the same time + # - non_blocking_mean to approach the signature interval. We get responses eagerly, and they're committed later at regular signature intervals + + # Our actual test has far more variation, so we can only make much broader claims - the blocking_mean is (much) smaller than the non_blocking_mean + assert blocking_mean < non_blocking_mean + + return network + + def run_main_tests(network, args): test_basic_constraints(network, args) test(network, args) @@ -2315,6 +2392,8 @@ def run_main_tests(network, args): if args.package == "samples/apps/logging/logging": test_etags(network, args) test_cose_config(network, args) + if not args.http2: + test_blocking_calls(network, args) def run_parsing_errors(args): diff --git a/tests/partitions_test.py b/tests/partitions_test.py index 48fcd56c48c..a2894a55900 100644 --- a/tests/partitions_test.py +++ b/tests/partitions_test.py @@ -10,6 +10,7 @@ import suite.test_requirements as reqs from datetime import datetime, timedelta from infra.checker import check_can_progress, check_does_not_progress +from infra.log_capture import flush_info import pprint from infra.tx_status import TxStatus import time @@ -22,6 +23,7 @@ from ccf.tx_id import TxID import os from reconfiguration import test_ledger_invariants +import threading from loguru import logger as LOG @@ -543,6 +545,70 @@ def test_forwarding_timeout(network, args): network.wait_for_primary_unanimity(min_view=view) +@reqs.description( + "Respond-on-commit requests get an error response if the operation is lost in an election" +) +@reqs.at_least_n_nodes(3) +def test_invalidated_blocking_calls(network, args): + primary, backups = network.find_nodes() + key = 42 + val_a = "Hello" + + ready_to_go = threading.Event() + partition_created = threading.Event() + + def blocking_send(): + LOG.info("Make a blocking respond-on-commit call to a partitioned primary") + with primary.client("user0") as c: + ready_to_go.set() + partition_created.wait() + r = c.post( + "/app/log/blocking/private", {"id": key, "msg": val_a}, timeout=10 + ) + assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR + tx_id = r.headers[infra.clients.CCF_TX_ID_HEADER] + body = r.body.json() + assert ( + body["error"]["message"] + == f"While waiting for TxID {tx_id} to commit, it was invalidated" + ) + + send_thread = threading.Thread(target=blocking_send, name="blocking") + send_thread.start() + + with network.partitioner.partition(backups): + ready_to_go.wait() + partition_created.set() + + new_primary, new_term = network.wait_for_new_primary( + old_primary=primary, nodes=backups + ) + + LOG.info(f"Polling for commit in {new_term}") + with new_primary.client() as c: + timeout = 3 + end_time = time.time() + timeout + while True: + logs = [] + r = c.get("/node/commit", log_capture=logs) + assert r.status_code == http.HTTPStatus.OK, r + + commit_tx_id = TxID.from_str(r.body.json()["transaction_id"]) + if commit_tx_id.view >= new_term: + flush_info(logs) + break + + if time.time() > end_time: + flush_info(logs) + raise AssertionError( + f"New primary made no commit progress after {timeout}s" + ) + + LOG.info("Drop partition and wait for reunification") + send_thread.join() + network.wait_for_primary_unanimity() + + @reqs.description( "Session consistency is provided, and inconsistencies after elections are replaced by errors" ) @@ -1095,6 +1161,7 @@ def run(args): test_isolate_and_reconnect_primary(network, args, iteration=n) test_election_reconfiguration(network, args) test_forwarding_timeout(network, args) + test_invalidated_blocking_calls(network, args) # HTTP2 doesn't support forwarding if not args.http2: test_session_consistency(network, args)