OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
broker/impl.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "impl.hpp"
7
9
10#include <cassert>
11
// Constructor: remaining parameters and the member initializer list.
// NOTE(review): the opening of this signature (including the broker_id
// parameter used below) is not present in this listing.
15 std::vector<std::shared_ptr<runtime_locking_shard::interface>> shards,
16 std::shared_ptr<ticket_machine::interface> ticketer,
17 std::shared_ptr<directory::interface> directory,
18 std::shared_ptr<logging::log> logger)
// broker_id is copied; every other dependency is moved into its member.
19 : m_broker_id(broker_id),
20 m_shards(std::move(shards)),
21 m_ticketer(std::move(ticketer)),
22 m_directory(std::move(directory)),
23 m_log(std::move(logger)) {}
24
// Starts a new ticket by asking the ticket machine for a ticket number.
// The ticket machine's reply is forwarded to handle_ticket_number together
// with result_callback. If the request itself cannot be issued, an error is
// logged and result_callback receives error_code::ticket_machine_unreachable.
// Always returns true; failures are reported only through result_callback.
// NOTE(review): part of the lambda's parameter type is not present in this
// listing.
25 auto impl::begin(begin_callback_type result_callback) -> bool {
26 if(!m_ticketer->get_ticket_number(
27 [this, result_callback](
29 get_ticket_number_return_type> res) {
30 handle_ticket_number(result_callback, res);
31 })) {
32 m_log->error("Failed to request a ticket number");
33 result_callback(error_code::ticket_machine_unreachable);
34 }
35
36 return true;
37 }
38
// Handles the ticket machine's reply to the request issued by begin().
// On a ticket number range, registers a new ticket keyed by the range's first
// value and passes that value to result_callback. On a ticket machine error,
// result_callback is invoked with an error.
// NOTE(review): several lines of this function (the parameter's inner type,
// the statement executed when res is empty, and the error value passed to
// result_callback) are not present in this listing.
39 void impl::handle_ticket_number(
40 begin_callback_type result_callback,
41 std::optional<
43 res) {
// No reply was received.
44 if(!res.has_value()) {
46 return;
47 }
48 std::visit(overloaded{[&](const parsec::ticket_machine::interface::
49 ticket_number_range_type& n) {
// Inner scope: m_mut is held only while the bookkeeping is updated, so
// result_callback below runs without the lock.
50 {
51 std::unique_lock l(m_mut);
// Remember the largest range end seen so far.
52 if(m_highest_ticket < n.second) {
53 m_highest_ticket = n.second;
54 }
// Register the ticket in the begun state with no shard states.
55 m_tickets.emplace(
56 n.first,
57 std::make_shared<state>(
58 state{ticket_state::begun, {}}));
59 }
60 result_callback(n.first);
61 },
62 [&](const parsec::ticket_machine::interface::
63 error_code& /* e */) {
64 result_callback(
66 }},
67 res.value());
68 }
69
70 auto impl::highest_ticket() -> ticket_number_type {
71 return m_highest_ticket;
72 }
73
// Processes a shard's reply to a try_lock request for `key` on shard
// `shard_idx`. Builds a try_lock_return_type by visiting the reply and always
// passes it to result_callback:
//  - value reply: the key must be tracked and in the locking state; it is then
//    marked locked, the value is stored, and the value is returned;
//  - shard_error reply: the error is traced and returned unchanged.
// NOTE(review): the type of `res` and the opening of the first visitor lambda
// (which introduces `v`) are not present in this listing.
74 void impl::handle_lock(
75 ticket_number_type ticket_number,
76 key_type key,
77 uint64_t shard_idx,
78 const try_lock_callback_type& result_callback,
80 res) {
81 auto result = std::visit(
84 -> try_lock_return_type {
85 std::unique_lock l(m_mut);
86 auto it = m_tickets.find(ticket_number);
87 if(it == m_tickets.end()) {
88 return error_code::unknown_ticket;
89 }
90
91 auto t_state = it->second;
92 auto& s_state = t_state->m_shard_states[shard_idx];
93 auto k_it = s_state.m_key_states.find(key);
// A reply for a key the broker is not tracking on this shard is an error.
94 if(k_it == s_state.m_key_states.end()) {
95 m_log->error("Shard state not found for key");
96 return error_code::invalid_shard_state;
97 }
98
// Only a key that is currently in the locking state may become locked.
99 if(k_it->second.m_key_state != key_state::locking) {
100 m_log->error("Shard state not locking");
101 return error_code::invalid_shard_state;
102 }
103
104 k_it->second.m_key_state = key_state::locked;
105 k_it->second.m_value = v;
106
107 m_log->trace(this, "Broker locked key for", ticket_number);
108
109 return v;
110 },
111 [&, key](parsec::runtime_locking_shard::shard_error e)
112 -> try_lock_return_type {
// Trace which ticket did the wounding when the shard supplied that detail.
113 if(e.m_wounded_details.has_value()) {
114 m_log->trace(this,
115 e.m_wounded_details->m_wounding_ticket,
116 "wounded ticket",
117 ticket_number);
118 }
119 m_log->trace(this,
120 "Shard error",
121 static_cast<int>(e.m_error_code),
122 "locking key",
123 key.to_hex(),
124 "for",
125 ticket_number);
126 return e;
127 }},
128 res);
// The callback is invoked after std::visit returns, outside the visitor's
// lock scope.
129 result_callback(result);
130 }
131
132 auto impl::try_lock(ticket_number_type ticket_number,
133 key_type key,
134 lock_type locktype,
135 try_lock_callback_type result_callback) -> bool {
136 auto maybe_error = [&]() -> std::optional<error_code> {
137 std::unique_lock l(m_mut);
138 auto it = m_tickets.find(ticket_number);
139 if(it == m_tickets.end()) {
140 return error_code::unknown_ticket;
141 }
142
143 auto t_state = it->second;
144 switch(t_state->m_state) {
145 case ticket_state::begun:
146 break;
147 case ticket_state::prepared:
148 return error_code::prepared;
149 case ticket_state::committed:
150 return error_code::committed;
151 case ticket_state::aborted:
152 t_state->m_state = ticket_state::begun;
153 t_state->m_shard_states.clear();
154 m_log->trace(this, "broker restarting", ticket_number);
155 break;
156 }
157
158 if(!m_directory->key_location(
159 key,
160 [=, this](std::optional<parsec::directory::interface::
161 key_location_return_type> res) {
162 handle_find_key(ticket_number,
163 key,
164 locktype,
165 result_callback,
166 res);
167 })) {
168 m_log->error("Failed to make key location directory request");
169 return error_code::directory_unreachable;
170 }
171
172 return std::nullopt;
173 }();
174
175 if(maybe_error.has_value()) {
176 result_callback(maybe_error.value());
177 }
178
179 return true;
180 }
181
// Processes a shard's reply to a prepare request for shard `shard_idx`.
// With m_mut held, checks that the ticket exists and is still in the begun
// state, then delegates to do_handle_prepare. Any value produced is passed
// to commit_cb after the lock has been released.
// NOTE(review): the declaration of the `res` parameter is not present in this
// listing.
182 void impl::handle_prepare(
183 const commit_callback_type& commit_cb,
184 ticket_number_type ticket_number,
185 uint64_t shard_idx,
187 auto maybe_error = [&]() -> std::optional<commit_return_type> {
188 std::unique_lock ll(m_mut);
189 auto itt = m_tickets.find(ticket_number);
190 if(itt == m_tickets.end()) {
191 return error_code::unknown_ticket;
192 }
193
194 auto ts = itt->second;
// Prepare replies are only acted upon while the ticket is in the begun state.
195 switch(ts->m_state) {
196 case ticket_state::begun:
197 break;
198 case ticket_state::prepared:
199 return error_code::prepared;
200 case ticket_state::committed:
201 return error_code::committed;
202 case ticket_state::aborted:
203 return error_code::aborted;
204 }
205
206 return do_handle_prepare(commit_cb,
207 ticket_number,
208 ts,
209 shard_idx,
210 res);
211 }();
212
213 m_log->trace(this, "Broker handled prepare for", ticket_number);
214
215 if(maybe_error.has_value()) {
216 m_log->trace(this,
217 "Broker calling prepare callback with error for",
218 ticket_number);
219 commit_cb(maybe_error.value());
220 }
221 }
222
// Applies one shard's prepare reply to the ticket state `ts`.
// Returns std::nullopt when there is nothing to report to the caller yet, or
// a value that handle_prepare forwards to commit_cb.
// Called from handle_prepare while m_mut is held.
// NOTE(review): the declaration of `res` and the second half of the condition
// that separates the two error branches are not present in this listing.
223 auto impl::do_handle_prepare(
224 const commit_callback_type& commit_cb,
225 ticket_number_type ticket_number,
226 const std::shared_ptr<state>& ts,
227 uint64_t shard_idx,
229 res) -> std::optional<commit_return_type> {
230 auto& ss = ts->m_shard_states[shard_idx].m_state;
// Ignore replies for a shard that is not currently in the preparing state.
231 if(ss != shard_state_type::preparing) {
232 m_log->trace(this,
233 "Shard",
234 shard_idx,
235 "not in preparing state for",
236 ticket_number);
237 return std::nullopt;
238 }
239
// An engaged `res` carries a shard error.
240 if(res.has_value()) {
241 if(res.value().m_error_code
243 m_log->error("Shard error with prepare for", ticket_number);
244 } else {
245 m_log->trace("Shard",
246 shard_idx,
247 "wounded ticket",
248 ticket_number);
// If some shard is already marked wounded, return without reporting again.
249 for(auto& [sidx, s] : ts->m_shard_states) {
250 if(s.m_state == shard_state_type::wounded) {
251 return std::nullopt;
252 }
253 }
254 ss = shard_state_type::wounded;
255 }
256 return res.value();
257 }
258
259 m_log->trace(this,
260 "Broker setting shard",
261 shard_idx,
262 "to prepared for",
263 ticket_number);
264 ss = shard_state_type::prepared;
265
// Wait until every shard involved in the ticket has reported prepared.
266 for(auto& shard : ts->m_shard_states) {
267 if(shard.second.m_state != shard_state_type::prepared) {
268 return std::nullopt;
269 }
270 }
271
// All shards are prepared: mark the ticket prepared and start committing.
272 ts->m_state = ticket_state::prepared;
273
274 auto maybe_error = do_commit(commit_cb, ticket_number, ts);
275 if(maybe_error.has_value()) {
276 return maybe_error.value();
277 }
278 return std::nullopt;
279 }
280
281 auto impl::do_commit(const commit_callback_type& commit_cb,
282 ticket_number_type ticket_number,
283 const std::shared_ptr<state>& ts)
284 -> std::optional<error_code> {
285 for(auto& shard : ts->m_shard_states) {
286 if(ts->m_state == ticket_state::aborted) {
287 m_log->trace("Broker aborted during commit for",
288 ticket_number);
289 break;
290 }
291 if(shard.second.m_state == shard_state_type::committed) {
292 continue;
293 }
294 shard.second.m_state = shard_state_type::committing;
295 auto sidx = shard.first;
296 if(!m_shards[sidx]->commit(
297 ticket_number,
298 [=, this](const parsec::runtime_locking_shard::interface::
299 commit_return_type& comm_res) {
300 handle_commit(commit_cb, ticket_number, sidx, comm_res);
301 })) {
302 m_log->error("Failed to make commit shard request");
303 return error_code::shard_unreachable;
304 }
305 }
306 return std::nullopt;
307 }
308
// Processes a shard's reply to a commit request for shard `shard_idx`.
// With m_mut held, validates the ticket and shard state and records the
// shard as committed. Once every shard has committed, the ticket becomes
// committed. commit_cb is invoked after the lock is released: with an error
// code on failure, or with std::nullopt once the whole ticket has committed.
// While other shards are still outstanding, commit_cb is not invoked.
// NOTE(review): the declaration of the `res` parameter is not present in this
// listing.
309 void impl::handle_commit(
310 const commit_callback_type& commit_cb,
311 ticket_number_type ticket_number,
312 uint64_t shard_idx,
// Set when the final shard has committed and success should be reported.
314 auto callback = false;
315 auto maybe_error = [&]() -> std::optional<error_code> {
316 std::unique_lock lll(m_mut);
317 auto ittt = m_tickets.find(ticket_number);
318 if(ittt == m_tickets.end()) {
319 return error_code::unknown_ticket;
320 }
321
322 auto tss = ittt->second;
// Commit replies are only valid while the ticket is in the prepared state.
323 switch(tss->m_state) {
324 case ticket_state::begun:
325 return error_code::not_prepared;
326 case ticket_state::prepared:
327 break;
328 case ticket_state::committed:
329 return error_code::committed;
330 case ticket_state::aborted:
331 return error_code::aborted;
332 }
333
334 if(tss->m_shard_states[shard_idx].m_state
335 != shard_state_type::committing) {
336 m_log->error("Commit result when shard not committing");
337 return error_code::invalid_shard_state;
338 }
339
// An engaged `res` means the shard reported an error.
340 if(res.has_value()) {
341 m_log->error("Error committing on shard");
342 return error_code::commit_error;
343 }
344
345 tss->m_shard_states[shard_idx].m_state
346 = shard_state_type::committed;
347
// Stay silent until every shard has committed.
348 for(auto& shard : tss->m_shard_states) {
349 if(shard.second.m_state != shard_state_type::committed) {
350 return std::nullopt;
351 }
352 }
353
354 tss->m_state = ticket_state::committed;
355 callback = true;
356
357 m_log->trace(this, "Broker handled commit for", ticket_number);
358
359 return std::nullopt;
360 }();
361
362 if(maybe_error.has_value()) {
363 m_log->trace(this,
364 "Broker calling commit callback with error for",
365 ticket_number);
366 commit_cb(maybe_error.value());
367 } else if(callback) {
368 m_log->trace(this,
369 "Broker calling commit callback from handle_commit "
370 "with success for",
371 ticket_number);
372 commit_cb(std::nullopt);
373 }
374 }
375
// Starts committing `state_updates` for a ticket.
// With m_mut held: the ticket must exist and be begun or prepared, no key of
// the ticket may still be in the locking state, and every key in
// state_updates must be a key tracked by the ticket. A prepared ticket goes
// straight to do_commit; otherwise do_prepare is started.
// Always returns true; an error is delivered through result_callback after
// the lock is released.
// NOTE(review): parts of the keys_with_tickets declaration and some lines
// inside the update-key check are not present in this listing.
376 auto impl::commit(ticket_number_type ticket_number,
377 state_update_type state_updates,
378 commit_callback_type result_callback) -> bool {
379 m_log->trace(this, "Broker got commit request for", ticket_number);
380 auto maybe_error = [&]() -> std::optional<error_code> {
381 std::unique_lock l(m_mut);
382 auto it = m_tickets.find(ticket_number);
383 if(it == m_tickets.end()) {
384 return error_code::unknown_ticket;
385 }
386
387 auto t_state = it->second;
388 switch(t_state->m_state) {
389 case ticket_state::begun:
390 [[fallthrough]];
391 case ticket_state::prepared:
392 break;
393 case ticket_state::committed:
394 return error_code::committed;
395 case ticket_state::aborted:
396 return error_code::aborted;
397 }
398
// Collect every key tracked by the ticket across all of its shards.
399 auto keys_with_tickets = std::unordered_map<
401 bool,
403 for(auto& shard : t_state->m_shard_states) {
404 for(auto& key : shard.second.m_key_states) {
// A key whose lock request is still pending blocks the commit.
405 if(key.second.m_key_state == key_state::locking) {
406 m_log->error("Cannot commit, still waiting for locks");
407 return error_code::waiting_for_locks;
408 }
409 keys_with_tickets[key.first] = true;
410 }
411 }
// Reject updates to keys that the ticket does not track.
412 for(auto& update_key : state_updates) {
413 if(keys_with_tickets.find(update_key.first)
414 == keys_with_tickets.end()) {
419 m_log->error("Update map contains keys not associated "
420 "with tickets. Aborting.");
421 return error_code::commit_hazard;
422 }
423 }
424
// A ticket that is already prepared skips the prepare phase.
425 if(t_state->m_state == ticket_state::prepared) {
426 return do_commit(result_callback, ticket_number, t_state);
427 }
428 return do_prepare(result_callback,
429 ticket_number,
430 t_state,
431 state_updates);
432 }();
433
434 if(maybe_error.has_value()) {
435 m_log->trace(
436 this,
437 "Broker calling commit callback with error from commit for",
438 ticket_number);
439 result_callback(maybe_error.value());
440 }
441
442 return true;
443 }
444
445 auto impl::do_prepare(const commit_callback_type& result_callback,
446 ticket_number_type ticket_number,
447 const std::shared_ptr<state>& t_state,
448 const state_update_type& state_updates)
449 -> std::optional<error_code> {
450 for(auto& shard : t_state->m_shard_states) {
451 // Shard states might get nuked in this loop if there's a
452 // rollback Before iterating, make sure we're still good to
453 // do more prepares Only nuke shard states on restart
454 if(t_state->m_state == ticket_state::aborted) {
455 m_log->trace("Broker aborted during prepare for",
456 ticket_number);
457 break;
458 }
459 if(shard.second.m_state == shard_state_type::prepared) {
460 continue;
461 }
462 shard.second.m_state = shard_state_type::preparing;
463 auto shard_updates = state_update_type();
464 for(const auto& update : state_updates) {
465 if(shard.second.m_key_states.find(update.first)
466 != shard.second.m_key_states.end()) {
467 shard_updates.emplace(update);
468 }
469 }
470 if(!m_shards[shard.first]->prepare(
471 ticket_number,
472 m_broker_id,
473 std::move(shard_updates),
474 [this,
475 result_callback,
476 ticket_number,
477 shard_idx
478 = shard.first](const parsec::runtime_locking_shard::
479 interface::prepare_return_type& res) {
480 handle_prepare(result_callback,
481 ticket_number,
482 shard_idx,
483 res);
484 })) {
485 m_log->error("Failed to make prepare shard request");
486 return error_code::shard_unreachable;
487 }
488 }
489 return std::nullopt;
490 }
491
// Finishes a ticket.
// With m_mut held: a ticket in the aborted state is simply erased and success
// is reported; a committed ticket has a finish request sent to each shard
// that is not already finished, with replies routed to handle_finish.
// Tickets in the begun or prepared state are rejected.
// Always returns true; errors and the synchronous success case are delivered
// through result_callback after the lock is released.
// NOTE(review): the parameter declaration of the shard finish lambda is not
// present in this listing.
492 auto impl::finish(ticket_number_type ticket_number,
493 finish_callback_type result_callback) -> bool {
// Set when the ticket could be completed without contacting any shard.
494 auto done = false;
495 auto maybe_error = [&]() -> std::optional<error_code> {
496 std::unique_lock l(m_mut);
497 auto it = m_tickets.find(ticket_number);
498 if(it == m_tickets.end()) {
499 m_log->trace(this,
500 "Broker failing finish: [Unknown ticket] for ",
501 ticket_number);
502 return error_code::unknown_ticket;
503 }
504
505 auto t_state = it->second;
506 switch(t_state->m_state) {
507 case ticket_state::begun:
508 m_log->trace(this,
509 "Broker failing finish: [State = Begun] for ",
510 ticket_number);
511 return error_code::begun;
512 case ticket_state::prepared:
513 m_log->trace(
514 this,
515 "Broker failing finish: [State = Prepared] for ",
516 ticket_number);
517 return error_code::prepared;
518 case ticket_state::committed:
519 break;
520 case ticket_state::aborted:
521 // Ticket already rolled back. Just delete the ticket.
522 m_tickets.erase(it);
523 done = true;
524 return std::nullopt;
525 }
526
527 for(auto& shard : t_state->m_shard_states) {
528 m_log->trace(this,
529 "Broker requesting finish on",
530 shard.first,
531 "for ticket",
532 ticket_number);
533 if(shard.second.m_state == shard_state_type::finished) {
534 m_log->trace(this,
535 "Broker skipping finish on",
536 shard.first,
537 "for ticket",
538 ticket_number,
539 " already finished");
540 continue;
541 }
542 auto sidx = shard.first;
543 assert(sidx < m_shards.size());
// Mark the shard before sending so handle_finish accepts its reply.
544 shard.second.m_state = shard_state_type::finishing;
545 if(!m_shards[sidx]->finish(
546 ticket_number,
547 [=, this](const parsec::runtime_locking_shard::
549 handle_finish(result_callback,
550 ticket_number,
551 sidx,
552 res);
553 })) {
554 m_log->error("Failed to make finish shard request");
555 return error_code::shard_unreachable;
556 }
557 }
558
559 return std::nullopt;
560 }();
561
562 if(maybe_error.has_value()) {
563 result_callback(maybe_error.value());
564 } else if(done) {
565 result_callback(std::nullopt);
566 }
567
568 return true;
569 }
570
// Rolls back a ticket that is still in the begun state.
// With m_mut held: if the ticket has no shard states it is marked aborted
// immediately and success is reported; otherwise a rollback request is sent
// to each shard not already rolled back, with replies routed to
// handle_rollback. Tickets that are prepared, committed or already aborted
// are rejected with the matching error code.
// Always returns true; errors and the synchronous success case are delivered
// through result_callback after the lock is released.
// NOTE(review): the parameter declaration of the shard rollback lambda is not
// present in this listing.
571 auto impl::rollback(ticket_number_type ticket_number,
572 rollback_callback_type result_callback) -> bool {
573 m_log->trace(this, "Broker got rollback request for", ticket_number);
// Set when the rollback completed without contacting any shard.
574 auto callback = false;
575 auto maybe_error = [&]() -> std::optional<error_code> {
576 std::unique_lock l(m_mut);
577 auto it = m_tickets.find(ticket_number);
578 if(it == m_tickets.end()) {
579 return error_code::unknown_ticket;
580 }
581
582 auto t_state = it->second;
583 switch(t_state->m_state) {
584 case ticket_state::begun:
585 break;
586 case ticket_state::prepared:
587 return error_code::prepared;
588 case ticket_state::committed:
589 return error_code::committed;
590 case ticket_state::aborted:
591 return error_code::aborted;
592 }
593
// No shard was ever involved: nothing to undo remotely.
594 if(t_state->m_shard_states.empty()) {
595 callback = true;
596 t_state->m_state = ticket_state::aborted;
597 return std::nullopt;
598 }
599
600 for(auto& shard : t_state->m_shard_states) {
601 m_log->trace(this,
602 "Broker requesting rollback on",
603 shard.first,
604 "for ticket",
605 ticket_number);
606 if(shard.second.m_state == shard_state_type::rolled_back) {
607 m_log->trace(this,
608 "Broker skipping rollback on",
609 shard.first,
610 "for ticket",
611 ticket_number,
612 " already rolled back");
613 continue;
614 }
615 auto sidx = shard.first;
616 assert(sidx < m_shards.size());
// Mark the shard before sending so handle_rollback accepts its reply.
617 shard.second.m_state = shard_state_type::rolling_back;
618 if(!m_shards[sidx]->rollback(
619 ticket_number,
620 [=, this](const parsec::runtime_locking_shard::
622 handle_rollback(result_callback,
623 ticket_number,
624 sidx,
625 res);
626 })) {
627 m_log->error("Failed to make rollback shard request");
628 return error_code::shard_unreachable;
629 }
630 }
631
632 return std::nullopt;
633 }();
634
635 m_log->trace(this,
636 "Broker initiated rollback request for",
637 ticket_number);
638
639 if(maybe_error.has_value()) {
640 result_callback(maybe_error.value());
641 } else if(callback) {
642 result_callback(std::nullopt);
643 }
644
645 m_log->trace(this,
646 "Broker handled rollback request for",
647 ticket_number);
648
649 return true;
650 }
651
// Processes a shard's reply to a rollback request for shard `shard_idx`.
// With m_mut held, validates the ticket and shard state, marks the shard
// rolled back and clears its key states. Once every shard has rolled back,
// the ticket becomes aborted. result_callback is invoked after the lock is
// released: with an error code on failure, or with std::nullopt once all
// shards have rolled back. While other shards are outstanding it is not
// invoked.
// NOTE(review): the declaration of the `res` parameter is not present in this
// listing.
652 void impl::handle_rollback(
653 const rollback_callback_type& result_callback,
654 ticket_number_type ticket_number,
655 uint64_t shard_idx,
// Set when the final shard has rolled back and success should be reported.
657 auto callback = false;
658 auto maybe_error = [&]() -> std::optional<error_code> {
659 std::unique_lock lll(m_mut);
660 auto ittt = m_tickets.find(ticket_number);
661 if(ittt == m_tickets.end()) {
662 return error_code::unknown_ticket;
663 }
664
665 auto tss = ittt->second;
// Rollback replies are only valid while the ticket is in the begun state.
666 switch(tss->m_state) {
667 case ticket_state::begun:
668 break;
669 case ticket_state::prepared:
670 return error_code::prepared;
671 case ticket_state::committed:
672 return error_code::committed;
673 case ticket_state::aborted:
674 return error_code::aborted;
675 }
676
677 if(tss->m_shard_states[shard_idx].m_state
678 != shard_state_type::rolling_back) {
679 m_log->error(
680 "Rollback response for",
681 ticket_number,
682 "when shard",
683 shard_idx,
684 "not in rolling back state. Actual state:",
685 static_cast<int>(tss->m_shard_states[shard_idx].m_state));
686 return error_code::invalid_shard_state;
687 }
688
// An engaged `res` means the shard reported an error.
689 if(res.has_value()) {
690 m_log->error("Shard rollback error");
691 return error_code::rollback_error;
692 }
693
694 auto& s_state = tss->m_shard_states[shard_idx];
695
// Drop the key states tracked for this shard along with the state change.
696 s_state.m_state = shard_state_type::rolled_back;
697 s_state.m_key_states.clear();
698 m_log->trace(this,
699 "Shard",
700 shard_idx,
701 "rolled back for",
702 ticket_number);
703
// Stay silent until every shard has rolled back.
704 for(auto& shard : tss->m_shard_states) {
705 if(shard.second.m_state != shard_state_type::rolled_back) {
706 m_log->trace(this,
707 "Shard",
708 shard.first,
709 "not yet rolled back for",
710 ticket_number,
711 ". Shard state:",
712 static_cast<int>(shard.second.m_state));
713 return std::nullopt;
714 }
715 }
716
717 m_log->trace(this, "All shards rolled back for", ticket_number);
718
719 tss->m_state = ticket_state::aborted;
720 callback = true;
721 return std::nullopt;
722 }();
723
724 if(maybe_error.has_value()) {
725 result_callback(maybe_error.value());
726 } else if(callback) {
727 result_callback(std::nullopt);
728 }
729 }
730
731 void impl::handle_find_key(
732 ticket_number_type ticket_number,
733 key_type key,
734 lock_type locktype,
735 try_lock_callback_type result_callback,
736 std::optional<parsec::directory::interface::key_location_return_type>
737 res) {
738 auto maybe_error = [&]() -> std::optional<try_lock_return_type> {
739 std::unique_lock l(m_mut);
740 assert(res < m_shards.size());
741 auto ticket = m_tickets.find(ticket_number);
742 if(ticket == m_tickets.end()) {
743 m_log->error("Unknown ticket number");
744 return error_code::unknown_ticket;
745 }
746
747 auto tss = ticket->second;
748 switch(tss->m_state) {
749 case ticket_state::begun:
750 break;
751 case ticket_state::prepared:
752 return error_code::prepared;
753 case ticket_state::committed:
754 return error_code::committed;
755 case ticket_state::aborted:
756 return error_code::aborted;
757 }
758
759 if(!res.has_value()) {
760 return error_code::directory_unreachable;
761 }
762
763 auto shard_idx = res.value();
764 auto& ss = tss->m_shard_states[shard_idx];
765 auto first_lock = ss.m_key_states.empty();
766 auto it = ss.m_key_states.find(key);
767 if(it != ss.m_key_states.end()
768 && it->second.m_key_state == key_state::locked
769 && it->second.m_locktype >= locktype) {
770 assert(it->second.m_value.has_value());
771 return it->second.m_value.value();
772 }
773
774 auto& ks = ss.m_key_states[key];
775
776 ks.m_key_state = key_state::locking;
777 ks.m_locktype = locktype;
778
779 if(!m_shards[shard_idx]->try_lock(
780 ticket_number,
781 m_broker_id,
782 key,
783 locktype,
784 first_lock,
785 [=, this](const parsec::runtime_locking_shard::interface::
786 try_lock_return_type& lock_res) {
787 handle_lock(ticket_number,
788 key,
789 shard_idx,
790 result_callback,
791 lock_res);
792 })) {
793 m_log->error("Failed to make try_lock shard request");
794 return error_code::shard_unreachable;
795 }
796 return std::nullopt;
797 }();
798
799 if(maybe_error.has_value()) {
800 result_callback(maybe_error.value());
801 }
802 }
803
// Processes a shard's reply to a finish request for shard `shard_idx`.
// With m_mut held, validates the ticket and shard state and marks the shard
// finished. Once every shard has finished, the ticket is erased from
// m_tickets. result_callback is invoked after the lock is released: with an
// error code on failure, or with std::nullopt once all shards have finished.
// While other shards are outstanding it is not invoked.
// NOTE(review): the declaration of the `res` parameter is not present in this
// listing.
804 void impl::handle_finish(
805 const finish_callback_type& result_callback,
806 ticket_number_type ticket_number,
807 uint64_t shard_idx,
// Set when the final shard has finished and success should be reported.
809 auto callback = false;
810 auto maybe_error = [&]() -> std::optional<error_code> {
811 std::unique_lock lll(m_mut);
812 auto ittt = m_tickets.find(ticket_number);
813 if(ittt == m_tickets.end()) {
814 return error_code::unknown_ticket;
815 }
816
817 auto tss = ittt->second;
// Finish replies are only valid while the ticket is in the committed state.
818 switch(tss->m_state) {
819 case ticket_state::begun:
820 return error_code::begun;
821 case ticket_state::prepared:
822 return error_code::prepared;
823 case ticket_state::committed:
824 break;
825 case ticket_state::aborted:
826 return error_code::aborted;
827 }
828
829 if(tss->m_shard_states[shard_idx].m_state
830 != shard_state_type::finishing) {
831 m_log->error(
832 "Finish response for",
833 ticket_number,
834 "when shard",
835 shard_idx,
836 "not in finishing state. Actual state:",
837 static_cast<int>(tss->m_shard_states[shard_idx].m_state));
838 return error_code::invalid_shard_state;
839 }
840
// An engaged `res` means the shard reported an error.
841 if(res.has_value()) {
842 m_log->error("Shard finish error");
843 return error_code::finish_error;
844 }
845
846 tss->m_shard_states[shard_idx].m_state
847 = shard_state_type::finished;
848 m_log->trace(this,
849 "Shard",
850 shard_idx,
851 "finished for",
852 ticket_number);
853
// Stay silent until every shard has finished.
854 for(auto& shard : tss->m_shard_states) {
855 if(shard.second.m_state != shard_state_type::finished) {
856 m_log->trace(this,
857 "Shard",
858 shard.first,
859 "not yet finished for",
860 ticket_number,
861 ". Shard state:",
862 static_cast<int>(shard.second.m_state));
863 return std::nullopt;
864 }
865 }
866
867 m_log->trace(this, "All shards finished for", ticket_number);
868
// The ticket is complete; the broker stops tracking it.
869 m_tickets.erase(ittt);
870
871 callback = true;
872 return std::nullopt;
873 }();
874
875 if(maybe_error.has_value()) {
876 result_callback(maybe_error.value());
877 } else if(callback) {
878 result_callback(std::nullopt);
879 }
880 }
881
// Starts recovery by asking every shard for the tickets associated with this
// broker's ID. Each shard's reply is routed to handle_get_tickets together
// with the shard's index.
// Returns false if the broker currently tracks any ticket or if a
// get_tickets request could not be issued; true otherwise.
// NOTE(review): part of the reply lambda's parameter type is not present in
// this listing.
882 auto impl::recover(recover_callback_type result_callback) -> bool {
883 // Do not allow recovery when tickets are in-flight
884 auto maybe_tickets = [&]() {
885 std::unique_lock l(m_mut);
886 return !m_tickets.empty();
887 }();
888 if(maybe_tickets) {
889 return false;
890 }
891 for(uint64_t i = 0; i < m_shards.size(); i++) {
892 auto& s = m_shards[i];
893 auto success = s->get_tickets(
894 m_broker_id,
895 [&, result_callback, i](
897 get_tickets_return_type& res) {
898 handle_get_tickets(result_callback, i, res);
899 });
// Requests already sent to earlier shards are not cancelled on failure.
900 if(!success) {
901 return false;
902 }
903 }
904 return true;
905 }
906
// Processes one shard's reply to the get_tickets request issued by recover().
// Successful replies are collected in m_recovery_tickets. Once a reply from
// every shard is present, the per-shard ticket states are merged into
// m_tickets and do_recovery is started. If no tickets were found, success is
// reported straight away. A shard error is reported as
// error_code::get_tickets_error.
// NOTE(review): part of the `res` parameter type and the case labels of the
// switch over the shard-reported ticket state are not present in this
// listing.
907 void
908 impl::handle_get_tickets(const recover_callback_type& result_callback,
909 uint64_t shard_idx,
911 get_tickets_return_type& res) {
// Set when recovery found nothing to do.
912 auto done = false;
913 auto maybe_error = std::visit(
914 overloaded{[&](const runtime_locking_shard::interface::
915 get_tickets_success_type& tickets)
916 -> std::optional<error_code> {
917 std::unique_lock l(m_mut);
918 m_recovery_tickets.emplace(shard_idx, tickets);
// Wait until every shard has replied.
919 if(m_recovery_tickets.size() != m_shards.size()) {
920 return std::nullopt;
921 }
922 for(auto& [s, ts] : m_recovery_tickets) {
923 for(auto& [ticket_number, t_state] : ts) {
// operator[] creates an empty entry for tickets not seen before.
924 auto& ticket = m_tickets[ticket_number];
925 if(!ticket) {
926 ticket = std::make_shared<state>();
927 }
// Record the state this shard reported for the ticket.
928 switch(t_state) {
931 ticket->m_shard_states[s].m_state
932 = shard_state_type::begun;
933 break;
936 ticket->m_shard_states[s].m_state
937 = shard_state_type::committed;
938 break;
941 ticket->m_shard_states[s].m_state
942 = shard_state_type::prepared;
943 break;
946 ticket->m_shard_states[s].m_state
947 = shard_state_type::wounded;
948 break;
949 }
950 }
951 }
// NOTE(review): this early return leaves m_recovery_tickets populated,
// whereas the path below clears it — confirm that is intended.
952 if(m_tickets.empty()) {
953 done = true;
954 return std::nullopt;
955 }
956 m_recovery_tickets.clear();
// do_recovery runs while m_mut is still held by this lambda.
957 return do_recovery(result_callback);
958 },
959 [&](const runtime_locking_shard::error_code& /* e */)
960 -> std::optional<error_code> {
961 return error_code::get_tickets_error;
962 }},
963 res);
964 if(maybe_error.has_value()) {
965 result_callback(maybe_error.value());
966 } else if(done) {
967 result_callback(std::nullopt);
968 }
969
970 m_log->trace(this, "Broker handled get_tickets for shard", shard_idx);
971 }
972
973 auto impl::do_recovery(const recover_callback_type& result_callback)
974 -> std::optional<error_code> {
975 for(auto [ticket_number, ticket] : m_tickets) {
976 size_t committed{};
977 for(auto& [sidx, t_state] : ticket->m_shard_states) {
978 switch(t_state.m_state) {
979 case shard_state_type::begun:
980 case shard_state_type::prepared:
981 case shard_state_type::wounded:
982 break;
983 case shard_state_type::committed:
984 committed++;
985 break;
986 default:
987 m_log->fatal(this,
988 "Found invalid shard "
989 "state during recovery");
990 }
991 }
992 if(committed == ticket->m_shard_states.size()) {
993 ticket->m_state = ticket_state::committed;
994 auto success = finish(
995 ticket_number,
996 [&, result_callback](finish_return_type fin_res) {
997 handle_recovery_finish(result_callback, fin_res);
998 });
999 if(!success) {
1000 return error_code::shard_unreachable;
1001 }
1002 } else if(committed > 0) {
1003 ticket->m_state = ticket_state::prepared;
1004 auto success = commit(
1005 ticket_number,
1006 {},
1007 [&, result_callback, tn = ticket_number](
1008 const commit_return_type& comm_res) {
1009 handle_recovery_commit(result_callback, tn, comm_res);
1010 });
1011 if(!success) {
1012 return error_code::shard_unreachable;
1013 }
1014 } else {
1015 ticket->m_state = ticket_state::begun;
1016 auto success
1017 = rollback(ticket_number,
1018 [&, result_callback, tn = ticket_number](
1019 rollback_return_type roll_res) {
1020 handle_recovery_rollback(result_callback,
1021 tn,
1022 roll_res);
1023 });
1024 if(!success) {
1025 return error_code::shard_unreachable;
1026 }
1027 }
1028 }
1029 return std::nullopt;
1030 }
1031
1032 void
1033 impl::handle_recovery_commit(const recover_callback_type& result_callback,
1034 ticket_number_type ticket_number,
1035 const commit_return_type& res) {
1036 if(res.has_value()) {
1037 result_callback(error_code::commit_error);
1038 return;
1039 }
1040
1041 auto success
1042 = finish(ticket_number,
1043 [&, result_callback](finish_return_type fin_res) {
1044 handle_recovery_finish(result_callback, fin_res);
1045 });
1046 if(!success) {
1047 result_callback(error_code::shard_unreachable);
1048 }
1049 }
1050
1051 void
1052 impl::handle_recovery_finish(const recover_callback_type& result_callback,
1053 finish_return_type res) {
1054 if(res.has_value()) {
1055 result_callback(error_code::finish_error);
1056 return;
1057 }
1058 auto done = [&]() {
1059 std::unique_lock l(m_mut);
1060 return m_tickets.empty();
1061 }();
1062 if(done) {
1063 result_callback(std::nullopt);
1064 }
1065 }
1066
1067 void impl::handle_recovery_rollback(
1068 const recover_callback_type& result_callback,
1069 ticket_number_type ticket_number,
1070 rollback_return_type res) {
1071 if(res.has_value()) {
1072 result_callback(error_code::rollback_error);
1073 return;
1074 }
1075 auto success
1076 = finish(ticket_number,
1077 [&, result_callback](finish_return_type fin_res) {
1078 handle_recovery_finish(result_callback, fin_res);
1079 });
1080 if(!success) {
1081 result_callback(error_code::shard_unreachable);
1082 }
1083 }
1084}
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
auto to_hex() const -> std::string
Returns a hex string representation of the contents of the buffer.
Definition buffer.cpp:62
impl(runtime_locking_shard::broker_id_type broker_id, std::vector< std::shared_ptr< runtime_locking_shard::interface > > shards, std::shared_ptr< ticket_machine::interface > ticketer, std::shared_ptr< directory::interface > directory, std::shared_ptr< logging::log > logger)
Constructor.
auto begin(begin_callback_type result_callback) -> bool override
Requests a new ticket number from the ticket machine.
std::function< void(recover_return_type)> recover_callback_type
Callback function type for a recovery operation.
std::function< void(rollback_return_type)> rollback_callback_type
Callback function type for a rollback operation.
std::function< void(ticketnum_or_errcode_type)> begin_callback_type
Callback function type for a begin operation.
std::optional< error_code > finish_return_type
Return type from a finish operation.
@ ticket_number_assignment
Error during ticket number assignment.
std::function< void(finish_return_type)> finish_callback_type
Callback function type for a finish operation.
std::function< void(commit_return_type)> commit_callback_type
Callback function type for a commit operation.
std::function< void(try_lock_return_type)> try_lock_callback_type
Callback function type for a try lock operation.
std::optional< std::variant< error_code, runtime_locking_shard::error_code > > rollback_return_type
Return type from a rollback operation.
Interface for a directory. Maps keys to shard IDs.
std::optional< shard_error > prepare_return_type
Return type from a prepare operation. An error, if applicable.
std::optional< shard_error > commit_return_type
Return type from a commit operation. An error code, if applicable.
std::optional< shard_error > finish_return_type
Return type from a finish operation. An error code, if applicable.
std::variant< value_type, shard_error > try_lock_return_type
Return type from a try lock operation.
std::optional< shard_error > rollback_return_type
Return type from a rollback operation.
std::variant< ticket_number_range_type, error_code > get_ticket_number_return_type
Return value from the ticket machine.
runtime_locking_shard::state_update_type state_update_type
Shard state updates type.
ticket_machine::ticket_number_type ticket_number_type
Ticket number type.
error_code
Error codes returned by methods on shards.
@ wounded
Request invalid because ticket is in the wounded state.
@ committed
Request invalid because ticket is in the committed state.
@ wounded
Wounded, not holding any locks.
@ begun
Begun, may still hold locks or be rolled-back.
@ committed
Committed, not holding any locks.
@ buffer
A singular RLP value (byte array)
SipHash function to generate STL data structure hash keys for system IDs.
Definition hashmap.hpp:27
Variant handler template.