OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
parsec/runtime_locking_shard/server.cpp
Go to the documentation of this file.
1// Copyright (c) 2022 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "server.hpp"
7
9
12 std::shared_ptr<logging::log> logger,
13 std::shared_ptr<interface> impl,
14 std::shared_ptr<replicated_shard_interface> repl,
16 : m_log(std::move(logger)),
17 m_impl(std::move(impl)),
18 m_repl(std::move(repl)),
19 m_srv(std::move(srv)) {
20 m_srv->register_handler_callback(
21 [&](const request& req,
22 std::function<void(std::optional<response>)> callback) {
23 return handler_callback(req, std::move(callback));
24 });
25 }
26
27 auto server::handler_callback(
28 const request& req,
29 std::function<void(std::optional<response>)> callback) -> bool {
30 auto success = std::visit(
32 [&](const rpc::try_lock_request& msg) {
33 return m_impl->try_lock(
34 msg.m_ticket_number,
35 msg.m_broker_id,
36 msg.m_key,
37 msg.m_locktype,
38 msg.m_first_lock,
39 [callback](interface::try_lock_return_type ret) {
40 callback(std::move(ret));
41 });
42 },
43 [&](const rpc::prepare_request& msg) {
44 return m_impl->prepare(
45 msg.m_ticket_number,
46 msg.m_broker_id,
47 msg.m_state_updates,
48 [this, callback, msg](
50 handle_prepare(std::move(ret), msg, callback);
51 });
52 },
53 [&](rpc::commit_request msg) {
54 return m_repl->commit(
55 msg.m_ticket_number,
56 [this, callback, msg](
58 handle_commit(ret, msg, callback);
59 });
60 },
61 [&](rpc::rollback_request msg) {
62 return m_repl->finish(
63 msg.m_ticket_number,
64 [this, callback, msg](
66 do_rollback(ret, msg, callback);
67 });
68 },
69 [&](rpc::finish_request msg) {
70 return m_repl->finish(
71 msg.m_ticket_number,
72 [this, callback, msg](
74 do_finish(ret, msg, callback);
75 });
76 },
77 [&](rpc::get_tickets_request msg) {
78 return m_impl->get_tickets(
79 msg.m_broker_id,
81 callback(std::move(ret));
82 });
83 }},
84 req);
85 return success;
86 }
87
88 void server::handle_prepare(interface::prepare_return_type ret,
89 const rpc::prepare_request& msg,
90 const callback_type& callback) {
91 if(ret.has_value()) {
92 m_log->trace("Error response during prepare");
93 callback(std::move(ret));
94 return;
95 }
96
97 auto success = m_repl->prepare(
98 msg.m_ticket_number,
99 msg.m_broker_id,
100 msg.m_state_updates,
101 [this, callback](replicated_shard_interface::return_type res) {
102 if(res.has_value()) {
103 m_log->error("Error response during prepare replication");
104 callback(error_code::internal_error);
105 return;
106 }
108 });
109 if(!success) {
110 m_log->error("Error replicating prepare");
112 }
113 }
114
115 void server::handle_commit(replicated_shard_interface::return_type ret,
116 rpc::commit_request msg,
117 const callback_type& callback) {
118 if(ret.has_value()) {
119 m_log->error("Error response during commit replication");
121 return;
122 }
123
124 auto success
125 = m_impl->commit(msg.m_ticket_number,
126 [callback](interface::commit_return_type res) {
127 callback(std::move(res));
128 });
129 if(!success) {
130 m_log->error("Error initiating commit with internal shard");
132 }
133 }
134
135 void server::do_rollback(replicated_shard_interface::return_type ret,
136 rpc::rollback_request msg,
137 const callback_type& callback) {
138 if(ret.has_value()) {
139 m_log->error("Error response during discard replication",
140 static_cast<int>(ret.value()));
142 return;
143 }
144
145 auto success = m_impl->rollback(
146 msg.m_ticket_number,
147 [callback](interface::rollback_return_type res) {
148 callback(std::move(res));
149 });
150 if(!success) {
151 m_log->error("Error initiating rollback with internal shard");
153 }
154 }
155
156 void server::do_finish(replicated_shard_interface::return_type ret,
157 rpc::finish_request msg,
158 const callback_type& callback) {
159 if(ret.has_value()) {
160 m_log->error("Error response during discard replication");
162 return;
163 }
164
165 auto success
166 = m_impl->finish(msg.m_ticket_number,
167 [callback](interface::finish_return_type res) {
168 callback(std::move(res));
169 });
170 if(!success) {
171 m_log->error("Error initiating rollback with internal shard");
173 }
174 }
175}
Implementation of a runtime locking shard.
std::optional< shard_error > prepare_return_type
Return type from a prepare operation. An error, if applicable.
std::optional< shard_error > commit_return_type
Return type from a commit operation. An error code, if applicable.
std::optional< shard_error > finish_return_type
Return type from a finish operation. An error code, if applicable.
std::variant< value_type, shard_error > try_lock_return_type
Return type from a try lock operation.
std::optional< shard_error > rollback_return_type
Return type from a rollback operation.
std::variant< get_tickets_success_type, error_code > get_tickets_return_type
Return type from a get tickets operation.
std::optional< error_code > return_type
Return type from a prepare operation. An error, if applicable.
server(std::shared_ptr< logging::log > logger, std::shared_ptr< interface > impl, std::shared_ptr< replicated_shard_interface > repl, std::unique_ptr< cbdc::rpc::async_server< request, response > > srv)
Constructor.
Generic asynchronous RPC server.
std::variant< try_lock_request, prepare_request, commit_request, rollback_request, finish_request, get_tickets_request > request
RPC request message type.
@ internal_error
Request failed because of a transient internal error.
Variant handler template.