OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
replicated_shard_client.cpp
Go to the documentation of this file.
1// Copyright (c) 2022 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
7
8#include "format.hpp"
9#include "messages.hpp"
10#include "util/raft/util.hpp"
12
15 std::shared_ptr<raft::node> raft_node)
16 : m_raft(std::move(raft_node)) {}
17
19 broker_id_type broker_id,
20 state_type state_update,
21 callback_type result_callback)
22 -> bool {
23 auto req = rpc::replicated_prepare_request{ticket_number,
24 broker_id,
25 std::move(state_update)};
26 auto success = replicate_request(
27 std::move(req),
28 [result_callback](
29 std::optional<rpc::replicated_response> maybe_res) {
30 if(!maybe_res.has_value()) {
31 result_callback(error_code::internal_error);
32 return;
33 }
34 auto&& res = maybe_res.value();
35 assert(std::holds_alternative<
37 auto&& resp_val
38 = std::get<replicated_shard_interface::return_type>(res);
39 result_callback(resp_val);
40 });
41 return success;
42 }
43
45 callback_type result_callback)
46 -> bool {
47 auto req = rpc::commit_request{ticket_number};
48 auto success = replicate_request(
49 req,
50 [result_callback](
51 std::optional<rpc::replicated_response> maybe_res) {
52 if(!maybe_res.has_value()) {
53 result_callback(error_code::internal_error);
54 return;
55 }
56 auto&& res = maybe_res.value();
57 assert(std::holds_alternative<
59 auto&& resp_val
60 = std::get<replicated_shard_interface::return_type>(res);
61 result_callback(resp_val);
62 });
63 return success;
64 }
65
67 callback_type result_callback)
68 -> bool {
69 auto req = rpc::finish_request{ticket_number};
70 auto success = replicate_request(
71 req,
72 [result_callback](
73 std::optional<rpc::replicated_response> maybe_res) {
74 if(!maybe_res.has_value()) {
75 result_callback(error_code::internal_error);
76 return;
77 }
78 auto&& res = maybe_res.value();
79 assert(std::holds_alternative<
81 auto&& resp_val
82 = std::get<replicated_shard_interface::return_type>(res);
83 result_callback(resp_val);
84 });
85 return success;
86 }
87
89 get_tickets_callback_type result_callback) const -> bool {
91 auto success = replicate_request(
92 req,
93 [result_callback](
94 std::optional<rpc::replicated_response> maybe_res) {
95 if(!maybe_res.has_value()) {
96 result_callback(error_code::internal_error);
97 return;
98 }
99 auto&& res = maybe_res.value();
100 assert(std::holds_alternative<
102 res));
103 auto&& resp_val = std::get<
105 result_callback(std::move(resp_val));
106 });
107 return success;
108 }
109
110 auto replicated_shard_client::replicate_request(
111 const rpc::replicated_request& req,
112 const std::function<void(std::optional<rpc::replicated_response>)>&
113 result_callback) const -> bool {
114 if(!m_raft->is_leader()) {
115 return false;
116 }
118 nuraft::ptr<nuraft::buffer>>(req);
119 auto success = m_raft->replicate(
120 std::move(req_buf),
121 [result_callback](raft::result_type& r,
122 nuraft::ptr<std::exception>& err) {
123 if(err) {
124 result_callback(std::nullopt);
125 return;
126 }
127
128 const auto res = r.get();
129 if(!res) {
130 result_callback(std::nullopt);
131 return;
132 }
133
134 auto maybe_resp = from_buffer<rpc::replicated_response>(*res);
135 assert(maybe_resp.has_value());
136 auto&& resp = maybe_resp.value();
137 result_callback(std::move(resp));
138 });
139 return success;
140 }
141}
auto finish(ticket_number_type ticket_number, callback_type result_callback) -> bool override
Replicates a finish request in the state machine and returns the response via a callback function.
auto commit(ticket_number_type ticket_number, callback_type result_callback) -> bool override
Replicates a commit request in the state machine and returns the response via a callback function.
auto get_tickets(get_tickets_callback_type result_callback) const -> bool override
Replicates a get tickets request in the state machine and returns the response via a callback functio...
replicated_shard_client(std::shared_ptr< raft::node > raft_node)
Constructs a shard client.
auto prepare(ticket_number_type ticket_number, broker_id_type broker_id, state_type state_update, callback_type result_callback) -> bool override
Replicates a prepare request in the state machine and returns the response via a callback function.
std::function< void(get_tickets_return_type)> get_tickets_callback_type
Callback function type for the result of a get tickets operation.
std::variant< tickets_type, error_code > get_tickets_return_type
Return type from a get tickets operation.
std::function< void(return_type)> callback_type
Callback function type for the result of a prepare operation.
std::unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_type
Type for state updates to a shard.
std::optional< error_code > return_type
Return type from a prepare operation. An error, if applicable.
std::variant< replicated_prepare_request, commit_request, finish_request, replicated_get_tickets_request > replicated_request
Shard replicated state machine request type.
parsec::ticket_machine::ticket_number_type ticket_number_type
Type for a ticket number.
@ internal_error
Request failed because of a transient internal error.
nuraft::cmd_result< nuraft::ptr< nuraft::buffer > > result_type
A NuRaft state machine execution result.
Definition node.hpp:18
auto from_buffer(nuraft::buffer &buf) -> std::optional< T >
Deserialize object of given type from a nuraft::buffer.
auto make_buffer(const T &obj) -> std::enable_if_t< std::is_same_v< B, nuraft::ptr< nuraft::buffer > >, nuraft::ptr< nuraft::buffer > >
Serialize object into nuraft::buffer using a cbdc::nuraft_serializer.
Message for retrieving unfinished tickets from the replicated state machine.