OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
broker/impl.hpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#ifndef OPENCBDC_TX_SRC_PARSEC_BROKER_IMPL_H_
7#define OPENCBDC_TX_SRC_PARSEC_BROKER_IMPL_H_
8
9#include "interface.hpp"
12
13#include <memory>
14
15namespace cbdc::parsec::broker {
// Implementation of a broker.
//
// NOTE(review): this is a rendered listing; each line carries its own
// embedded number and the numbering skips wherever an original line is not
// shown. Declarations that straddle such a gap are incomplete as displayed
// and must be read against the real header.
18 class impl : public interface {
19 public:
// Constructor. NOTE(review): the opening line of this declaration is not
// shown (numbering jumps from 19 to 27). Per the member summary it is
// `impl(runtime_locking_shard::broker_id_type broker_id, ...`, followed by
// the shard list, ticket machine, directory and logger parameters below.
27 std::vector<std::shared_ptr<runtime_locking_shard::interface>>
28 shards,
29 std::shared_ptr<ticket_machine::interface> ticketer,
30 std::shared_ptr<directory::interface> directory,
31 std::shared_ptr<logging::log> logger);
32
// Requests a new ticket number from the ticket machine.
37 auto begin(begin_callback_type result_callback) -> bool override;
38
// Determines the shard responsible for the given key and issues a try
// lock request for the key.
48 auto try_lock(ticket_number_type ticket_number,
49 key_type key,
50 lock_type locktype,
51 try_lock_callback_type result_callback) -> bool override;
52
// Commits the ticket on all shards involved in the ticket.
58 auto commit(ticket_number_type ticket_number,
59 state_update_type state_updates,
60 commit_callback_type result_callback) -> bool override;
61
// Finishes the ticket on all shards involved in the ticket.
66 auto finish(ticket_number_type ticket_number,
67 finish_callback_type result_callback) -> bool override;
68
// Rolls back the ticket on all shards involved in the ticket.
73 auto rollback(ticket_number_type ticket_number,
74 rollback_callback_type result_callback) -> bool override;
75
// Requests tickets managed by this broker ID from all shards and completes
// partially committed tickets (the member summary available for this
// function is truncated after that point).
81 auto recover(recover_callback_type result_callback) -> bool override;
82
// Get the highest ticket number that was used.
88 auto highest_ticket() -> ticket_number_type override;
89
90 private:
// Overall state of a ticket; stored in `state::m_state` below.
91 enum class ticket_state : uint8_t {
92 begun,
93 prepared,
94 committed,
95 aborted
96 };
97
// Handles to the shards, ticket machine, directory and logger.
// Presumably populated from the constructor arguments of the same types
// -- confirm in the implementation file.
99 std::vector<std::shared_ptr<runtime_locking_shard::interface>>
100 m_shards;
101 std::shared_ptr<ticket_machine::interface> m_ticketer;
102 std::shared_ptr<directory::interface> m_directory;
103 std::shared_ptr<logging::log> m_log;
104
// Recursive mutex; `mutable` so it can be locked from const members.
// NOTE(review): which members it guards is not visible from this header.
105 mutable std::recursive_mutex m_mut;
// Value-initialized ticket number; presumably the value reported by
// highest_ticket() -- confirm in the implementation file.
106 ticket_number_type m_highest_ticket{};
107
// Per-shard progress of a ticket; stored in `shard_state::m_state` below.
108 enum class shard_state_type : uint8_t {
109 begun,
110 preparing,
111 prepared,
112 wounded,
113 committing,
114 committed,
115 rolling_back,
116 rolled_back,
117 finishing,
118 finished
119 };
120
// Whether a key's lock is still being acquired or is already held;
// stored in `key_state_type::m_key_state` below.
121 enum class key_state : uint8_t {
122 locking,
123 locked
124 };
125
// Per-key record: lock progress, the lock type, and an optional value.
126 struct key_state_type {
127 key_state m_key_state{};
128 lock_type m_locktype{};
129 std::optional<value_type> m_value;
130 };
131
// Per-shard record: the state of each key plus the shard's overall state.
// NOTE(review): the numbering skips 135, so the remaining template
// argument(s) and closing `>` of the map type are not shown.
132 struct shard_state {
133 std::unordered_map<key_type,
134 key_state_type,
136 m_key_states;
137 shard_state_type m_state{};
138 };
139
// Map from a size_t key (presumably the shard index -- confirm) to that
// shard's state.
140 using shard_states = std::unordered_map<size_t, shard_state>;
141
// Per-ticket record: overall ticket state and the per-shard states.
142 struct state {
143 ticket_state m_state{};
144 shard_states m_shard_states;
145 };
146
// Ticket number -> shared per-ticket record.
147 std::unordered_map<ticket_number_type, std::shared_ptr<state>>
148 m_tickets;
149
// Nested map keyed by a uint64_t and then by ticket number.
// NOTE(review): the numbering skips 153, so the inner mapped type is not
// shown; the outer key is presumably the shard index -- confirm.
150 std::unordered_map<
151 uint64_t,
152 std::unordered_map<ticket_number_type,
154 m_recovery_tickets;
155
// The handle_* members below presumably receive the responses to the
// asynchronous requests issued by the public methods -- confirm in the
// implementation file. Several declarations are cut short where the
// embedded numbering skips a line (160, 166, 172-173, 177, 184, 193, 199,
// 213, 218); the omitted parameters cannot be recovered from this listing.
156 void handle_prepare(
157 const commit_callback_type& commit_cb,
158 ticket_number_type ticket_number,
159 uint64_t shard_idx,
161
162 void handle_commit(
163 const commit_callback_type& commit_cb,
164 ticket_number_type ticket_number,
165 uint64_t shard_idx,
167
168 void handle_lock(ticket_number_type ticket_number,
169 key_type key,
170 uint64_t shard_idx,
171 const try_lock_callback_type& result_callback,
174
175 void handle_ticket_number(
176 begin_callback_type result_callback,
178 get_ticket_number_return_type> res);
179
180 void handle_rollback(
181 const rollback_callback_type& result_callback,
182 ticket_number_type ticket_number,
183 uint64_t shard_idx,
185 res);
186
187 void handle_find_key(
188 ticket_number_type ticket_number,
189 key_type key,
190 lock_type locktype,
191 try_lock_callback_type result_callback,
192 std::optional<
194
195 void handle_finish(
196 const finish_callback_type& result_callback,
197 ticket_number_type ticket_number,
198 uint64_t shard_idx,
200
// Takes the recovery callback, a shard index and a shard's
// get_tickets_return_type result.
201 void handle_get_tickets(const recover_callback_type& result_callback,
202 uint64_t shard_idx,
203 const parsec::runtime_locking_shard::
204 interface::get_tickets_return_type& res);
205
// Recovery-path handlers: each takes the recovery callback; the visible
// parameter lists of the finish and rollback variants are incomplete.
206 void
207 handle_recovery_commit(const recover_callback_type& result_callback,
208 ticket_number_type ticket_number,
209 const commit_return_type& res);
210
211 void
212 handle_recovery_finish(const recover_callback_type& result_callback,
214
215 void
216 handle_recovery_rollback(const recover_callback_type& result_callback,
217 ticket_number_type ticket_number,
219
// The do_* helpers return an optional value. NOTE(review): presumably an
// empty optional means "no error / nothing to report yet" -- confirm in
// the implementation file.
220 auto do_commit(const commit_callback_type& commit_cb,
221 ticket_number_type ticket_number,
222 const std::shared_ptr<state>& ts)
223 -> std::optional<error_code>;
224
// NOTE(review): the numbering skips 229, so the leading part of the type
// of `res` is not shown.
225 auto do_handle_prepare(const commit_callback_type& commit_cb,
226 ticket_number_type ticket_number,
227 const std::shared_ptr<state>& ts,
228 uint64_t shard_idx,
230 prepare_return_type& res)
231 -> std::optional<commit_return_type>;
232
233 auto do_prepare(const commit_callback_type& result_callback,
234 ticket_number_type ticket_number,
235 const std::shared_ptr<state>& t_state,
236 const state_update_type& state_updates)
237 -> std::optional<error_code>;
238
239 auto do_recovery(const recover_callback_type& result_callback)
240 -> std::optional<error_code>;
241 };
242}
243
244#endif
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
Implementation of a broker.
auto finish(ticket_number_type ticket_number, finish_callback_type result_callback) -> bool override
Finishes the ticket on all shards involved in the ticket.
auto try_lock(ticket_number_type ticket_number, key_type key, lock_type locktype, try_lock_callback_type result_callback) -> bool override
Determines the shard responsible for the given key and issues a try lock request for the key.
auto highest_ticket() -> ticket_number_type override
Get the highest ticket number that was used.
auto rollback(ticket_number_type ticket_number, rollback_callback_type result_callback) -> bool override
Rolls back the ticket on all shards involved in the ticket.
auto recover(recover_callback_type result_callback) -> bool override
Requests tickets managed by this broker ID from all shards and completes partially committed tickets,...
auto commit(ticket_number_type ticket_number, state_update_type state_updates, commit_callback_type result_callback) -> bool override
Commits the ticket on all shards involved in the ticket.
impl(runtime_locking_shard::broker_id_type broker_id, std::vector< std::shared_ptr< runtime_locking_shard::interface > > shards, std::shared_ptr< ticket_machine::interface > ticketer, std::shared_ptr< directory::interface > directory, std::shared_ptr< logging::log > logger)
Constructor.
auto begin(begin_callback_type result_callback) -> bool override
Requests a new ticket number from the ticket machine.
std::function< void(recover_return_type)> recover_callback_type
Callback function type for a recovery operation.
std::function< void(rollback_return_type)> rollback_callback_type
Callback function type for a rollback operation.
std::function< void(ticketnum_or_errcode_type)> begin_callback_type
Callback function type for a begin operation.
std::variant< value_type, error_code, runtime_locking_shard::shard_error > try_lock_return_type
Return type from a try lock operation.
std::optional< std::variant< error_code, runtime_locking_shard::shard_error > > commit_return_type
Return type from a commit operation.
std::optional< error_code > finish_return_type
Return type from a finish operation.
std::function< void(finish_return_type)> finish_callback_type
Callback function type for a finish operation.
std::function< void(commit_return_type)> commit_callback_type
Callback function type for a commit operation.
std::function< void(try_lock_return_type)> try_lock_callback_type
Callback function type for a try lock operation.
std::optional< std::variant< error_code, runtime_locking_shard::error_code > > rollback_return_type
Return type from a rollback operation.
uint64_t key_location_return_type
Key location return type. Shard ID where key is located.
std::optional< shard_error > prepare_return_type
Return type from a prepare operation. An error, if applicable.
std::optional< shard_error > commit_return_type
Return type from a commit operation. An error code, if applicable.
std::optional< shard_error > finish_return_type
Return type from a finish operation. An error code, if applicable.
std::optional< shard_error > rollback_return_type
Return type from a rollback operation.
runtime_locking_shard::state_update_type state_update_type
Shard state updates type.
ticket_machine::ticket_number_type ticket_number_type
Ticket number type.
runtime_locking_shard::key_type key_type
Shard key type.
ticket_state
Ticket states returned by shards for broker recovery purposes.
SipHash function to generate STL data structure hash keys for system IDs.
Definition hashmap.hpp:27