OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
runtime_locking_shard/impl.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "impl.hpp"
7
8#include <cassert>
9
11 impl::impl(std::shared_ptr<logging::log> logger)
12 : m_log(std::move(logger)) {}
13
15 broker_id_type broker_id,
16 key_type key,
17 lock_type locktype,
18 bool first_lock,
19 try_lock_callback_type result_callback) -> bool {
20 auto callbacks = pending_callbacks_list_type();
21 auto w_details = std::optional<wounded_details>();
22 auto maybe_error = [&]() -> std::optional<error_code> {
23 std::unique_lock<std::mutex> l(m_mut);
24
25 m_log->trace(ticket_number,
26 "requesting lock on",
27 key.to_hex(),
28 static_cast<int>(locktype));
29
30 auto it = m_tickets.find(ticket_number);
31 if(first_lock && it != m_tickets.end()) {
32 m_log->fatal(ticket_number,
33 "called try_lock with first lock but ticket "
34 "already exists");
35 }
36 if(it == m_tickets.end()) {
37 if(!first_lock) {
38 m_log->error(ticket_number,
39 "called try_lock with unknown ticket");
41 }
42 it = m_tickets.emplace(ticket_number, ticket_state_type{})
43 .first;
44 }
45 auto& ticket = it->second;
46
47 // Callers shouldn't be using try_lock after prepare
48 if(ticket.m_state == ticket_state::prepared) {
49 m_log->error(ticket_number, "called try_lock after prepare");
51 }
52
53 if(ticket.m_state == ticket_state::committed) {
54 m_log->error(ticket_number, "called try_lock after commit");
56 }
57
58 // If the ticket way wounded don't bother trying to acquire any
59 // locks
60 if(ticket.m_state == ticket_state::wounded) {
61 m_log->trace(ticket_number,
62 "called try_lock after being wounded");
63 w_details = ticket.m_wounded_details;
65 }
66
67 // Make sure the ticket doesn't already hold a lock on the key
68 if(auto lock_it = ticket.m_locks_held.find(key);
69 lock_it != ticket.m_locks_held.end()
70 && lock_it->second >= locktype) {
71 m_log->warn(this,
72 ticket_number,
73 "tried to acquire already held lock");
75 }
76
77 if(ticket.m_queued_locks.find(key)
78 != ticket.m_queued_locks.end()) {
79 m_log->warn(ticket_number,
80 "tried to acquire already queued lock");
82 }
83
84 ticket.m_broker_id = broker_id;
85
86 // Grab the requested state element
87 auto& state_element = m_state[key];
88 auto& lock = state_element.m_lock;
89
90 // Queue the lock
91 lock.m_queue.emplace(
92 ticket_number,
93 lock_queue_element_type{locktype, std::move(result_callback)});
94 ticket.m_queued_locks.insert(key);
95
96 // Determine if the ticket will wait on any locks
97 auto waiting_on = get_waiting_on(ticket_number, locktype, lock);
98 callbacks
99 = wound_tickets(std::move(key), waiting_on, ticket_number);
100
101 w_details = ticket.m_wounded_details;
102
103 m_log->trace(this, "shard handled try_lock for", ticket_number);
104 return std::nullopt;
105 }();
106
107 if(maybe_error.has_value()) {
108 result_callback(shard_error{maybe_error.value(), w_details});
109 } else {
110 // Call all the result callbacks without holding the lock
111 for(auto& callback : callbacks) {
112 callback.m_callback(std::move(callback.m_returning));
113 }
114 }
115
116 return true;
117 }
118
119 auto impl::wound_tickets(
120 key_type key,
121 const std::vector<ticket_number_type>& blocking_tickets,
122 ticket_number_type blocked_ticket) -> pending_callbacks_list_type {
123 auto callbacks = pending_callbacks_list_type();
124 auto keys = key_set_type();
125 for(auto blocking_ticket_number : blocking_tickets) {
126 auto& blocking_ticket = m_tickets[blocking_ticket_number];
127 // Tickets can't be deadlocked by prepared tickets and
128 // we're not allowed to wound them anyway
129 if(blocking_ticket.m_state == ticket_state::prepared) {
130 continue;
131 }
132
133 // Mark the ticket as wounded
134 blocking_ticket.m_state = ticket_state::wounded;
135 blocking_ticket.m_wounded_details = {blocked_ticket, key};
136
137 auto [wounded_callbacks, affected_keys]
138 = release_locks(blocking_ticket_number, blocking_ticket);
139 callbacks.insert(
140 callbacks.end(),
141 std::make_move_iterator(wounded_callbacks.begin()),
142 std::make_move_iterator(wounded_callbacks.end()));
143 keys.merge(affected_keys);
144 }
145
146 keys.insert(std::move(key));
147
148 auto acquire_callbacks = acquire_locks(keys);
149 callbacks.insert(callbacks.end(),
150 std::make_move_iterator(acquire_callbacks.begin()),
151 std::make_move_iterator(acquire_callbacks.end()));
152
153 return callbacks;
154 }
155
156 auto impl::get_waiting_on(ticket_number_type ticket_number,
157 lock_type locktype,
158 rw_lock_type& lock)
159 -> std::vector<ticket_number_type> {
160 auto waiting_on = std::vector<ticket_number_type>();
161 auto younger_ticket = [&](auto blocking_ticket_number) {
162 return ticket_number < blocking_ticket_number;
163 };
164 // Write locks wait on readers
165 if(locktype == lock_type::write) {
166 std::copy_if(lock.m_readers.begin(),
167 lock.m_readers.end(),
168 std::back_inserter(waiting_on),
169 younger_ticket);
170 }
171 // All locks wait on writers
172 if(lock.m_writer.has_value()) {
173 if(younger_ticket(lock.m_writer.value())) {
174 waiting_on.push_back(lock.m_writer.value());
175 }
176 }
177 return waiting_on;
178 }
179
181 broker_id_type /* broker_id */,
182 state_update_type state_update,
183 prepare_callback_type result_callback) -> bool {
184 auto result = [&]() -> std::optional<shard_error> {
185 std::unique_lock<std::mutex> l(m_mut);
186 // Grab the ticket and ensure it exists
187 auto ticket_it = m_tickets.find(ticket_number);
188 if(ticket_it == m_tickets.end()) {
189 m_log->error(this,
190 ticket_number,
191 "does not exist on shard for prepare");
192 return shard_error{error_code::unknown_ticket, std::nullopt};
193 }
194 auto& ticket = ticket_it->second;
195
196 // If the ticket is already prepared, return the result as such
197 if(ticket.m_state == ticket_state::prepared) {
198 m_log->warn(ticket_number,
199 "called prepare but already prepared");
200 return shard_error{error_code::prepared, std::nullopt};
201 }
202
203 if(ticket.m_state == ticket_state::committed) {
204 m_log->warn(ticket_number,
205 "called prepare but already committed");
206 return shard_error{error_code::committed, std::nullopt};
207 }
208
209 // If the ticket was wounded it can't be prepared
210 if(ticket.m_state == ticket_state::wounded) {
211 m_log->debug(ticket_number,
212 "called prepare after being wounded");
214 ticket.m_wounded_details};
215 }
216
217 if(!ticket.m_queued_locks.empty()) {
218 m_log->error(ticket_number, "still has queued locks");
219 return shard_error{error_code::lock_queued, std::nullopt};
220 }
221
222 for(auto& [key, value] : state_update) {
223 auto lk_it = ticket.m_locks_held.find(key);
224 if(lk_it == ticket.m_locks_held.end()) {
225 m_log->warn(ticket_number,
226 "wanted state update for unheld lock");
228 std::nullopt};
229 }
230 if(lk_it->second != lock_type::write) {
231 m_log->warn(ticket_number,
232 "wanted state update for read lock");
234 std::nullopt};
235 }
236 }
237
238 ticket.m_state_update = std::move(state_update);
239 ticket.m_state = ticket_state::prepared;
240 return std::nullopt;
241 }();
242
243 result_callback(result);
244 return true;
245 }
246
248 commit_callback_type result_callback) -> bool {
249 auto callbacks = pending_callbacks_list_type();
250 auto result = [&]() -> std::optional<shard_error> {
251 std::unique_lock<std::mutex> l(m_mut);
252 // Grab the ticket and ensure it exists
253 auto ticket_it = m_tickets.find(ticket_number);
254 if(ticket_it == m_tickets.end()) {
255 m_log->error(this,
256 ticket_number,
257 "does not exist on shard for commit");
258 return shard_error{error_code::unknown_ticket, std::nullopt};
259 }
260 auto& ticket = ticket_it->second;
261
262 // If the ticket is not prepared we can't commit
263 if(ticket.m_state != ticket_state::prepared) {
264 m_log->warn(ticket_number, "called commit but not prepared");
265 return shard_error{error_code::not_prepared, std::nullopt};
266 }
267
268 for(auto&& [key, value] : ticket.m_state_update) {
269 m_state[key].m_value = std::move(value);
270 }
271
272 auto [wounded_callbacks, affected_keys]
273 = release_locks(ticket_number, ticket);
274 assert(wounded_callbacks.empty());
275 callbacks = acquire_locks(affected_keys);
276 callbacks.insert(
277 callbacks.end(),
278 std::make_move_iterator(wounded_callbacks.begin()),
279 std::make_move_iterator(wounded_callbacks.end()));
280
281 ticket.m_state = ticket_state::committed;
282
283 m_log->trace(this, "Shard executed commit for", ticket_number);
284 return std::nullopt;
285 }();
286
287 for(auto& callback : callbacks) {
288 m_log->trace(this,
289 "Shard calling callback for",
290 callback.m_ticket_number);
291 callback.m_callback(std::move(callback.m_returning));
292 }
293 result_callback(result);
294
295 m_log->trace(this,
296 "Shard called all callbacks for commit on",
297 ticket_number);
298 return true;
299 }
300
301 auto impl::release_locks(ticket_number_type ticket_number,
302 ticket_state_type& ticket)
303 -> std::pair<pending_callbacks_list_type, key_set_type> {
304 auto callbacks = pending_callbacks_list_type();
305 // Unqueue any pending locks
306 for(const auto& lock_key : ticket.m_queued_locks) {
307 auto& queued_element = m_state[lock_key];
308 auto& lk = queued_element.m_lock;
309 auto queue_node = lk.m_queue.extract(ticket_number);
310 auto& queued_lock_element = queue_node.mapped();
311 // Notify the ticket the queued lock was aborted
312 callbacks.emplace_back(pending_callback_element_type{
313 std::move(queued_lock_element.m_callback),
314 shard_error{error_code::wounded, ticket.m_wounded_details},
315 ticket_number});
316 }
317 auto keys = std::move(ticket.m_queued_locks);
318 ticket.m_queued_locks = key_set_type();
319
320 for(auto& [lock_key, lt] : ticket.m_locks_held) {
321 // Release any locks held by the blocking ticket
322 auto& locked_element = m_state[lock_key];
323 auto& lk = locked_element.m_lock;
324 // Release the read lock held by the wounded ticket
325 if(lt == lock_type::read) {
326 m_log->trace("Releasing read lock on",
327 lock_key.to_hex(),
328 "held by",
329 ticket_number);
330 lk.m_readers.erase(ticket_number);
331 }
332 // Release the write lock held by the wounded ticket
333 if(lt == lock_type::write) {
334 m_log->trace("Releasing write lock on",
335 lock_key.to_hex(),
336 "held by",
337 ticket_number);
338 lk.m_writer.reset();
339 }
340 keys.insert(lock_key);
341 }
342 ticket.m_locks_held.clear();
343
344 return {callbacks, keys};
345 }
346
347 auto impl::acquire_locks(const key_set_type& keys)
348 -> pending_callbacks_list_type {
349 auto callbacks = pending_callbacks_list_type();
350 for(const auto& key : keys) {
351 // Attempt to allow queued tickets to acquire the lock
352 while(acquire_lock(key, callbacks)) {}
353 }
354 return callbacks;
355 }
356
358 rollback_callback_type result_callback) -> bool {
359 auto callbacks = pending_callbacks_list_type();
360 auto result = [&]() -> std::optional<shard_error> {
361 std::unique_lock<std::mutex> l(m_mut);
362 // Grab the ticket and ensure it exists
363 auto ticket_it = m_tickets.find(ticket_number);
364 if(ticket_it == m_tickets.end()) {
365 m_log->error(this,
366 ticket_number,
367 "does not exist on shard for rollback");
368 return shard_error{error_code::unknown_ticket, std::nullopt};
369 }
370 auto& ticket = ticket_it->second;
371
372 auto [wounded_callbacks, affected_keys]
373 = release_locks(ticket_number, ticket);
374 callbacks = acquire_locks(affected_keys);
375 callbacks.insert(
376 callbacks.end(),
377 std::make_move_iterator(wounded_callbacks.begin()),
378 std::make_move_iterator(wounded_callbacks.end()));
379
380 // We erase the ticket here as we won't need the ticket for
381 // recovery. No need for a "rolled back" state and subsequent
382 // finish.
383 m_tickets.erase(ticket_it);
384
385 m_log->trace(this, "Shard handled rollback for", ticket_number);
386
387 return std::nullopt;
388 }();
389
390 for(auto& callback : callbacks) {
391 callback.m_callback(std::move(callback.m_returning));
392 }
393 result_callback(result);
394
395 return true;
396 }
397
399 finish_callback_type result_callback) -> bool {
400 auto maybe_error = [&]() -> std::optional<shard_error> {
401 std::unique_lock l(m_mut);
402 auto ticket_it = m_tickets.find(ticket_number);
403 if(ticket_it == m_tickets.end()) {
404 m_log->error(this,
405 ticket_number,
406 "does not exist on shard for finish");
407 return shard_error{error_code::unknown_ticket, std::nullopt};
408 }
409
410 auto& ticket = ticket_it->second;
411
412 if(ticket.m_state != ticket_state::committed) {
413 m_log->error(this,
414 ticket_number,
415 "finish requested but not committed");
416 return shard_error{error_code::not_committed, std::nullopt};
417 }
418
419 m_tickets.erase(ticket_it);
420
421 m_log->trace(this, "Shard handled finish for", ticket_number);
422
423 return std::nullopt;
424 }();
425
426 result_callback(maybe_error);
427
428 return true;
429 }
430
432 get_tickets_callback_type result_callback) -> bool {
433 auto result = [&]() -> get_tickets_success_type {
434 std::unique_lock l(m_mut);
435 auto ret = get_tickets_success_type();
436 for(auto& [ticket_number, ticket] : m_tickets) {
437 if(ticket.m_broker_id == broker_id) {
438 ret.emplace(ticket_number, ticket.m_state);
439 }
440 }
441 return ret;
442 }();
443
444 result_callback(result);
445
446 return true;
447 }
448
450 const replicated_shard::tickets_type& tickets) -> bool {
451 std::unique_lock l(m_mut);
452 if(!m_tickets.empty() && !m_state.empty()) {
453 m_log->error("Shard state is not empty, cannot recover");
454 return false;
455 }
456 m_state.reserve(state.size());
457 for(auto&& [k, v] : state) {
458 m_state.emplace(k, state_element_type{v, {}});
459 }
460 m_tickets.reserve(tickets.size());
461 for(auto&& [tn, t] : tickets) {
462 auto ticket = ticket_state_type{};
463 ticket.m_broker_id = t.m_broker_id;
464 switch(t.m_state) {
466 ticket.m_state = ticket_state::committed;
467 break;
469 ticket.m_state = ticket_state::prepared;
470 for(const auto& [k, v] : t.m_state_update) {
471 ticket.m_locks_held.emplace(k, lock_type::write);
472 m_state[k].m_lock.m_writer = tn;
473 }
474 break;
475 }
476 ticket.m_state_update = t.m_state_update;
477 m_tickets.emplace(tn, std::move(ticket));
478 }
479 return true;
480 }
481
482 auto impl::acquire_lock(const key_type& key,
483 pending_callbacks_list_type& callbacks) -> bool {
484 auto& locked_element = m_state[key];
485 auto& lk = locked_element.m_lock;
486 if(lk.m_queue.empty()) {
487 return false;
488 }
489 auto acquire_next = true;
490 auto queue_node = lk.m_queue.begin();
491 const auto& queued_ticket_number = queue_node->first;
492 auto& queued_lock_element = queue_node->second;
493 auto& queued_ticket = m_tickets[queued_ticket_number];
494 // Acquire the read lock if the ticket requested a
495 // read
496 if(queued_lock_element.m_type == lock_type::read) {
497 // If the write lock is held we can't acquire the read lock
498 if(lk.m_writer.has_value()) {
499 return false;
500 }
501 m_log->trace("Assigning read lock on",
502 key.to_hex(),
503 "to",
504 queued_ticket_number);
505 lk.m_readers.insert(queued_ticket_number);
506 }
507 // Acquire the write lock if the ticket requested a
508 // write
509 if(queued_lock_element.m_type == lock_type::write) {
510 // If there are readers holding the lock we
511 // can't acquire the write lock or allow any
512 // more queued tickets to acquire the lock
513 if(lk.m_readers.size() > 1 || lk.m_writer.has_value()) {
514 return false;
515 }
516 if(lk.m_readers.size() == 1) {
517 if(*lk.m_readers.begin() != queued_ticket_number) {
518 return false;
519 }
520
521 // Upgrade from a read to a write lock
522 lk.m_readers.clear();
523 }
524 m_log->trace("Assigning write lock on",
525 key.to_hex(),
526 "to",
527 queued_ticket_number);
528 lk.m_writer = queued_ticket_number;
529 acquire_next = false;
530 }
531 queued_ticket.m_queued_locks.erase(key);
532 queued_ticket.m_locks_held[key] = queued_lock_element.m_type;
533 // Notify the ticket that the lock was acquired
534 callbacks.emplace_back(pending_callback_element_type{
535 std::move(queued_lock_element.m_callback),
536 locked_element.m_value,
537 queued_ticket_number});
538 lk.m_queue.erase(queue_node);
539 return acquire_next;
540 }
541}
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
auto rollback(ticket_number_type ticket_number, rollback_callback_type result_callback) -> bool override
Rolls back an uncommitted ticket.
auto recover(const replicated_shard::state_type &state, const replicated_shard::tickets_type &tickets) -> bool
Restores the state of another shard instance.
auto finish(ticket_number_type ticket_number, finish_callback_type result_callback) -> bool override
Deletes a committed or rolled-back ticket.
auto prepare(ticket_number_type ticket_number, broker_id_type broker_id, state_update_type state_update, prepare_callback_type result_callback) -> bool override
Prepares a ticket with the given state updates.
auto commit(ticket_number_type ticket_number, commit_callback_type result_callback) -> bool override
Commits a previously prepared ticket.
auto try_lock(ticket_number_type ticket_number, broker_id_type broker_id, key_type key, lock_type locktype, bool first_lock, try_lock_callback_type result_callback) -> bool override
Locks the given key for a ticket and returns the associated value.
auto get_tickets(broker_id_type broker_id, get_tickets_callback_type result_callback) -> bool override
Returns tickets managed by the given broker.
impl(std::shared_ptr< logging::log > logger)
Constructor.
std::function< void(prepare_return_type)> prepare_callback_type
Callback function type for the result of a prepare operation.
std::unordered_map< ticket_number_type, ticket_state > get_tickets_success_type
Return type from a successful get tickets operation.
std::function< void(get_tickets_return_type)> get_tickets_callback_type
Callback function type for the result of a get tickets operation.
std::function< void(rollback_return_type)> rollback_callback_type
Callback function type for the result of a rollback operation.
std::function< void(finish_return_type)> finish_callback_type
Callback function type for the result of a finish operation.
std::function< void(try_lock_return_type)> try_lock_callback_type
Function type for try lock operation results.
std::function< void(commit_return_type)> commit_callback_type
Callback function type for the result of a commit operation.
std::unordered_map< ticket_number_type, ticket_type > tickets_type
Type for the tickets list returned by the state machine.
std::unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_type
Type for state updates to a shard.
parsec::ticket_machine::ticket_number_type ticket_number_type
Type for a ticket number.
@ wounded
Request invalid because ticket is in the wounded state.
@ lock_not_held
Cannot apply requested state update because the ticket does not hold a write lock on the given key.
@ not_committed
Request invalid because ticket is not in the committed state.
@ lock_held
The ticket already holds the requested lock.
@ lock_queued
The requested lock is already queued for the given ticket.
@ prepared
Request invalid because ticket is in the prepared state.
@ state_update_with_read_lock
Cannot apply requested state update because the ticket only holds a read lock on the given key.
@ not_prepared
Cannot commit the ticket because the ticket has not been prepared.
@ unknown_ticket
The given ticket number is not known to this shard.
@ committed
Request invalid because ticket is in the committed state.
std:: unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_update_type
Type for state updates to a shard. A map of keys and their new values.
@ wounded
Wounded, not holding any locks.
@ committed
Committed, not holding any locks.
@ read
Read lock. Multiple readers can hold a lock for the same key.
@ write
Write lock. Only one ticket can hold this lock at a time.