12 std::shared_ptr<logging::log> logger,
13 std::shared_ptr<interface>
impl,
14 std::shared_ptr<replicated_shard_interface> repl,
16 : m_log(std::move(logger)),
17 m_impl(std::move(
impl)),
18 m_repl(std::move(repl)),
19 m_srv(std::move(srv)) {
20 m_srv->register_handler_callback(
22 std::function<
void(std::optional<response>)> callback) {
23 return handler_callback(req, std::move(callback));
27 auto server::handler_callback(
29 std::function<
void(std::optional<response>)> callback) ->
bool {
30 auto success = std::visit(
33 return m_impl->try_lock(
40 callback(std::move(ret));
44 return m_impl->prepare(
48 [
this, callback, msg](
50 handle_prepare(std::move(ret), msg, callback);
53 [&](rpc::commit_request msg) {
54 return m_repl->commit(
56 [
this, callback, msg](
58 handle_commit(ret, msg, callback);
61 [&](rpc::rollback_request msg) {
62 return m_repl->finish(
64 [
this, callback, msg](
66 do_rollback(ret, msg, callback);
69 [&](rpc::finish_request msg) {
70 return m_repl->finish(
72 [
this, callback, msg](
74 do_finish(ret, msg, callback);
77 [&](rpc::get_tickets_request msg) {
78 return m_impl->get_tickets(
81 callback(std::move(ret));
89 const rpc::prepare_request& msg,
90 const callback_type& callback) {
92 m_log->trace(
"Error response during prepare");
93 callback(std::move(ret));
97 auto success = m_repl->prepare(
102 if(res.has_value()) {
103 m_log->error(
"Error response during prepare replication");
104 callback(error_code::internal_error);
110 m_log->error(
"Error replicating prepare");
116 rpc::commit_request msg,
117 const callback_type& callback) {
118 if(ret.has_value()) {
119 m_log->error(
"Error response during commit replication");
125 = m_impl->commit(msg.m_ticket_number,
127 callback(std::move(res));
130 m_log->error(
"Error initiating commit with internal shard");
136 rpc::rollback_request msg,
137 const callback_type& callback) {
138 if(ret.has_value()) {
139 m_log->error(
"Error response during discard replication",
140 static_cast<int>(ret.value()));
145 auto success = m_impl->rollback(
148 callback(std::move(res));
151 m_log->error(
"Error initiating rollback with internal shard");
157 rpc::finish_request msg,
158 const callback_type& callback) {
159 if(ret.has_value()) {
160 m_log->error(
"Error response during discard replication");
166 = m_impl->finish(msg.m_ticket_number,
168 callback(std::move(res));
171 m_log->error(
"Error initiating rollback with internal shard");
Implementation of a runtime locking shard.
std::optional< shard_error > prepare_return_type
Return type from a prepare operation. An error, if applicable.
std::optional< shard_error > commit_return_type
Return type from a commit operation. An error code, if applicable.
std::optional< shard_error > finish_return_type
Return type from a finish operation. An error code, if applicable.
std::variant< value_type, shard_error > try_lock_return_type
Return type from a try lock operation.
std::optional< shard_error > rollback_return_type
Return type from a rollback operation.
std::variant< get_tickets_success_type, error_code > get_tickets_return_type
Return type from a get tickets operation.
std::optional< error_code > return_type
Return type from a prepare operation. An error, if applicable.
server(std::shared_ptr< logging::log > logger, std::shared_ptr< interface > impl, std::shared_ptr< replicated_shard_interface > repl, std::unique_ptr< cbdc::rpc::async_server< request, response > > srv)
Constructor.
Generic asynchronous RPC server.
std::variant< try_lock_request, prepare_request, commit_request, rollback_request, finish_request, get_tickets_request > request
RPC request message type.
@ internal_error
Request failed because of a transient internal error.
Variant handler template.
Try lock request message.