15 std::vector<std::shared_ptr<runtime_locking_shard::interface>> shards,
16 std::shared_ptr<ticket_machine::interface> ticketer,
17 std::shared_ptr<directory::interface> directory,
18 std::shared_ptr<logging::log> logger)
19 : m_broker_id(broker_id),
20 m_shards(std::move(shards)),
21 m_ticketer(std::move(ticketer)),
22 m_directory(std::move(directory)),
23 m_log(std::move(logger)) {}
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the broker's begin entry point -- confirm.
// Asks the ticket machine for a ticket number and forwards the reply to
// handle_ticket_number together with the caller's result_callback.
26 if(!m_ticketer->get_ticket_number(
27 [
this, result_callback](
29 get_ticket_number_return_type> res) {
30 handle_ticket_number(result_callback, res);
// Reached when the request could not be issued (get_ticket_number
// evaluated to false): log and report the failure to the caller.
32 m_log->error(
"Failed to request a ticket number");
33 result_callback(error_code::ticket_machine_unreachable);
// Handles the ticket machine's reply to a ticket-number request.
// For a ticket_number_range_type reply the visible code raises
// m_highest_ticket to n.second when it is larger, builds a new state object
// in ticket_state::begun, and passes n.first to result_callback.
// The empty-reply branch and the second visitor overload are only partly
// visible in this excerpt.
39 void impl::handle_ticket_number(
40 begin_callback_type result_callback,
44 if(!res.has_value()) {
48 std::visit(
overloaded{[&](
const parsec::ticket_machine::interface::
49 ticket_number_range_type& n) {
// Guard the broker's shared ticket bookkeeping.
51 std::unique_lock l(m_mut);
52 if(m_highest_ticket < n.second) {
53 m_highest_ticket = n.second;
// Fresh per-ticket state; presumably stored in m_tickets by the
// surrounding (not visible) call -- confirm.
57 std::make_shared<state>(
58 state{ticket_state::begun, {}}));
60 result_callback(n.first);
62 [&](
const parsec::ticket_machine::interface::
71 return m_highest_ticket;
// Handles a shard's reply to a try_lock request by visiting the reply.
// Success overload: looks the ticket up under m_mut, checks that the key is
// tracked for shard_idx and currently in key_state::locking, then marks it
// locked and stores the returned value v.
// Error overload: receives a shard_error and inspects its wounded details.
// The visit result is finally handed to result_callback.
74 void impl::handle_lock(
78 const try_lock_callback_type& result_callback,
81 auto result = std::visit(
84 -> try_lock_return_type {
85 std::unique_lock l(m_mut);
86 auto it = m_tickets.find(ticket_number);
87 if(it == m_tickets.end()) {
88 return error_code::unknown_ticket;
91 auto t_state = it->second;
92 auto& s_state = t_state->m_shard_states[shard_idx];
93 auto k_it = s_state.m_key_states.find(key);
94 if(k_it == s_state.m_key_states.end()) {
95 m_log->error(
"Shard state not found for key");
96 return error_code::invalid_shard_state;
// A lock reply is only valid for a key with a lock request in flight.
99 if(k_it->second.m_key_state != key_state::locking) {
100 m_log->error(
"Shard state not locking");
101 return error_code::invalid_shard_state;
104 k_it->second.m_key_state = key_state::locked;
105 k_it->second.m_value = v;
107 m_log->trace(
this,
"Broker locked key for", ticket_number);
111 [&, key](parsec::runtime_locking_shard::shard_error e)
112 -> try_lock_return_type {
113 if(e.m_wounded_details.has_value()) {
115 e.m_wounded_details->m_wounding_ticket,
121 static_cast<int>(e.m_error_code),
129 result_callback(result);
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the broker's try_lock entry point -- confirm.
// Validates the ticket under m_mut, then asks the directory where the key
// lives; the directory reply is forwarded to handle_find_key. Any error
// produced here is delivered through result_callback.
136 auto maybe_error = [&]() -> std::optional<error_code> {
137 std::unique_lock l(m_mut);
138 auto it = m_tickets.find(ticket_number);
139 if(it == m_tickets.end()) {
140 return error_code::unknown_ticket;
143 auto t_state = it->second;
144 switch(t_state->m_state) {
145 case ticket_state::begun:
147 case ticket_state::prepared:
148 return error_code::prepared;
149 case ticket_state::committed:
150 return error_code::committed;
// An aborted ticket is reset to begun with its shard states cleared,
// so the ticket is restarted rather than rejected.
151 case ticket_state::aborted:
152 t_state->m_state = ticket_state::begun;
153 t_state->m_shard_states.clear();
154 m_log->trace(
this,
"broker restarting", ticket_number);
158 if(!m_directory->key_location(
161 key_location_return_type> res) {
162 handle_find_key(ticket_number,
168 m_log->error(
"Failed to make key location directory request");
169 return error_code::directory_unreachable;
175 if(maybe_error.has_value()) {
176 result_callback(maybe_error.value());
// Handles a shard's reply to a prepare request.
// Under m_mut it looks up the ticket, rejects tickets that are already
// prepared, committed or aborted with the matching error code, and
// otherwise delegates to do_handle_prepare. If an error results, it is
// passed to commit_cb.
182 void impl::handle_prepare(
183 const commit_callback_type& commit_cb,
187 auto maybe_error = [&]() -> std::optional<commit_return_type> {
188 std::unique_lock ll(m_mut);
189 auto itt = m_tickets.find(ticket_number);
190 if(itt == m_tickets.end()) {
191 return error_code::unknown_ticket;
194 auto ts = itt->second;
195 switch(ts->m_state) {
196 case ticket_state::begun:
198 case ticket_state::prepared:
199 return error_code::prepared;
200 case ticket_state::committed:
201 return error_code::committed;
202 case ticket_state::aborted:
203 return error_code::aborted;
206 return do_handle_prepare(commit_cb,
213 m_log->trace(
this,
"Broker handled prepare for", ticket_number);
215 if(maybe_error.has_value()) {
217 "Broker calling prepare callback with error for",
219 commit_cb(maybe_error.value());
// Applies one shard's prepare reply to the ticket state ts.
// The shard must currently be in shard_state_type::preparing. When res
// holds a value the visible code logs a shard error and marks the shard
// wounded; otherwise the shard is marked prepared. After a scan of all
// shard states for ones that are not yet prepared, the ticket moves to
// ticket_state::prepared and do_commit is started; an error from do_commit
// is returned to the caller.
// NOTE(review): the branch structure between these steps is only partly
// visible in this excerpt -- confirm against the full file.
223 auto impl::do_handle_prepare(
224 const commit_callback_type& commit_cb,
225 ticket_number_type ticket_number,
226 const std::shared_ptr<state>& ts,
229 res) -> std::optional<commit_return_type> {
// ss aliases the stored shard state, so assignments below update ts.
230 auto& ss = ts->m_shard_states[shard_idx].m_state;
231 if(ss != shard_state_type::preparing) {
235 "not in preparing state for",
240 if(res.has_value()) {
241 if(res.value().m_error_code
243 m_log->error(
"Shard error with prepare for", ticket_number);
245 m_log->trace(
"Shard",
249 for(
auto& [sidx, s] : ts->m_shard_states) {
250 if(s.m_state == shard_state_type::wounded) {
254 ss = shard_state_type::wounded;
260 "Broker setting shard",
264 ss = shard_state_type::prepared;
266 for(
auto& shard : ts->m_shard_states) {
267 if(shard.second.m_state != shard_state_type::prepared) {
272 ts->m_state = ticket_state::prepared;
274 auto maybe_error = do_commit(commit_cb, ticket_number, ts);
275 if(maybe_error.has_value()) {
276 return maybe_error.value();
// Issues commit requests to the shards recorded in ts.
// For each shard entry it checks whether the ticket has been aborted and
// whether the shard is already committed, marks the shard as committing and
// sends the commit; the shard's reply is routed to handle_commit.
// Returns error_code::shard_unreachable when a commit request cannot be
// made.
281 auto impl::do_commit(
const commit_callback_type& commit_cb,
282 ticket_number_type ticket_number,
283 const std::shared_ptr<state>& ts)
284 -> std::optional<error_code> {
285 for(
auto& shard : ts->m_shard_states) {
286 if(ts->m_state == ticket_state::aborted) {
287 m_log->trace(
"Broker aborted during commit for",
291 if(shard.second.m_state == shard_state_type::committed) {
294 shard.second.m_state = shard_state_type::committing;
295 auto sidx = shard.first;
296 if(!m_shards[sidx]->commit(
// Captures by copy (plus this) because the callback runs later.
298 [=,
this](
const parsec::runtime_locking_shard::interface::
299 commit_return_type& comm_res) {
300 handle_commit(commit_cb, ticket_number, sidx, comm_res);
302 m_log->error(
"Failed to make commit shard request");
303 return error_code::shard_unreachable;
// Handles a shard's reply to a commit request.
// Under m_mut it validates the ticket state and that the shard is in
// shard_state_type::committing, treats a reply holding a value as a commit
// error, and otherwise marks the shard committed. After a scan of all shard
// states for ones that are not yet committed, the ticket moves to
// ticket_state::committed. commit_cb receives the error if there is one, or
// std::nullopt when the local `callback` flag is set.
309 void impl::handle_commit(
310 const commit_callback_type& commit_cb,
311 ticket_number_type ticket_number,
// Presumably set to true once the whole ticket is committed; the
// assignment is not visible in this excerpt.
314 auto callback =
false;
315 auto maybe_error = [&]() -> std::optional<error_code> {
316 std::unique_lock lll(m_mut);
317 auto ittt = m_tickets.find(ticket_number);
318 if(ittt == m_tickets.end()) {
319 return error_code::unknown_ticket;
322 auto tss = ittt->second;
323 switch(tss->m_state) {
324 case ticket_state::begun:
325 return error_code::not_prepared;
326 case ticket_state::prepared:
328 case ticket_state::committed:
329 return error_code::committed;
330 case ticket_state::aborted:
331 return error_code::aborted;
334 if(tss->m_shard_states[shard_idx].m_state
335 != shard_state_type::committing) {
336 m_log->error(
"Commit result when shard not committing");
337 return error_code::invalid_shard_state;
// A populated res signals that the shard reported an error.
340 if(res.has_value()) {
341 m_log->error(
"Error committing on shard");
342 return error_code::commit_error;
345 tss->m_shard_states[shard_idx].m_state
346 = shard_state_type::committed;
348 for(
auto& shard : tss->m_shard_states) {
349 if(shard.second.m_state != shard_state_type::committed) {
354 tss->m_state = ticket_state::committed;
357 m_log->trace(
this,
"Broker handled commit for", ticket_number);
362 if(maybe_error.has_value()) {
364 "Broker calling commit callback with error for",
366 commit_cb(maybe_error.value());
367 }
else if(callback) {
369 "Broker calling commit callback from handle_commit "
372 commit_cb(std::nullopt);
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the broker's commit entry point -- confirm.
// Under m_mut it validates the ticket, collects every key tracked by the
// ticket's shards, refuses to continue while any key is still locking, and
// returns commit_hazard when state_updates names a key the ticket does not
// track. A ticket that is already prepared goes straight to do_commit;
// otherwise do_prepare is started. Errors are reported via result_callback.
379 m_log->trace(
this,
"Broker got commit request for", ticket_number);
380 auto maybe_error = [&]() -> std::optional<error_code> {
381 std::unique_lock l(m_mut);
382 auto it = m_tickets.find(ticket_number);
383 if(it == m_tickets.end()) {
384 return error_code::unknown_ticket;
387 auto t_state = it->second;
388 switch(t_state->m_state) {
389 case ticket_state::begun:
391 case ticket_state::prepared:
393 case ticket_state::committed:
394 return error_code::committed;
395 case ticket_state::aborted:
396 return error_code::aborted;
// Used as a set of the keys this ticket has lock state for.
399 auto keys_with_tickets = std::unordered_map<
403 for(
auto& shard : t_state->m_shard_states) {
404 for(
auto& key : shard.second.m_key_states) {
405 if(key.second.m_key_state == key_state::locking) {
406 m_log->error(
"Cannot commit, still waiting for locks");
407 return error_code::waiting_for_locks;
409 keys_with_tickets[key.first] =
true;
412 for(
auto& update_key : state_updates) {
413 if(keys_with_tickets.find(update_key.first)
414 == keys_with_tickets.end()) {
419 m_log->error(
"Update map contains keys not associated "
420 "with tickets. Aborting.");
421 return error_code::commit_hazard;
425 if(t_state->m_state == ticket_state::prepared) {
426 return do_commit(result_callback, ticket_number, t_state);
428 return do_prepare(result_callback,
434 if(maybe_error.has_value()) {
437 "Broker calling commit callback with error from commit for",
439 result_callback(maybe_error.value());
// Issues prepare requests to the shards recorded in t_state.
// For each shard entry it checks whether the ticket has been aborted and
// whether the shard is already prepared, marks the shard as preparing,
// selects the entries of state_updates whose keys the shard tracks, and
// sends them with the prepare request; the reply is routed to
// handle_prepare. Returns error_code::shard_unreachable when a prepare
// request cannot be made.
445 auto impl::do_prepare(
const commit_callback_type& result_callback,
447 const std::shared_ptr<state>& t_state,
449 -> std::optional<error_code> {
450 for(
auto& shard : t_state->m_shard_states) {
454 if(t_state->m_state == ticket_state::aborted) {
455 m_log->trace(
"Broker aborted during prepare for",
459 if(shard.second.m_state == shard_state_type::prepared) {
462 shard.second.m_state = shard_state_type::preparing;
// Only forward the updates that belong to this shard's keys.
463 auto shard_updates = state_update_type();
464 for(
const auto& update : state_updates) {
465 if(shard.second.m_key_states.find(update.first)
466 != shard.second.m_key_states.end()) {
467 shard_updates.emplace(update);
470 if(!m_shards[shard.first]->prepare(
473 std::move(shard_updates),
478 = shard.first](
const parsec::runtime_locking_shard::
479 interface::prepare_return_type& res) {
480 handle_prepare(result_callback,
485 m_log->error(
"Failed to make prepare shard request");
486 return error_code::shard_unreachable;
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the broker's finish entry point -- confirm.
// Under m_mut it rejects unknown tickets and tickets still in the begun or
// prepared state, then walks the ticket's shards: shards already finished
// are skipped, the others are marked finishing and sent a finish request
// whose reply is routed to handle_finish. Errors go to result_callback.
495 auto maybe_error = [&]() -> std::optional<error_code> {
496 std::unique_lock l(m_mut);
497 auto it = m_tickets.find(ticket_number);
498 if(it == m_tickets.end()) {
500 "Broker failing finish: [Unknown ticket] for ",
502 return error_code::unknown_ticket;
505 auto t_state = it->second;
506 switch(t_state->m_state) {
507 case ticket_state::begun:
509 "Broker failing finish: [State = Begun] for ",
511 return error_code::begun;
512 case ticket_state::prepared:
515 "Broker failing finish: [State = Prepared] for ",
517 return error_code::prepared;
518 case ticket_state::committed:
520 case ticket_state::aborted:
527 for(
auto& shard : t_state->m_shard_states) {
529 "Broker requesting finish on",
533 if(shard.second.m_state == shard_state_type::finished) {
535 "Broker skipping finish on",
539 " already finished");
542 auto sidx = shard.first;
543 assert(sidx < m_shards.size());
544 shard.second.m_state = shard_state_type::finishing;
545 if(!m_shards[sidx]->finish(
547 [=,
this](
const parsec::runtime_locking_shard::
549 handle_finish(result_callback,
554 m_log->error(
"Failed to make finish shard request");
555 return error_code::shard_unreachable;
562 if(maybe_error.has_value()) {
563 result_callback(maybe_error.value());
// The condition guarding this success callback is not visible here.
565 result_callback(std::nullopt);
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the broker's rollback entry point -- confirm.
// Under m_mut it validates the ticket; a ticket with no shard states is
// marked aborted directly. Otherwise each shard that is not already rolled
// back is marked rolling_back and sent a rollback request whose reply is
// routed to handle_rollback. result_callback receives the error if there is
// one, or std::nullopt when the local `callback` flag is set.
573 m_log->trace(
this,
"Broker got rollback request for", ticket_number);
574 auto callback =
false;
575 auto maybe_error = [&]() -> std::optional<error_code> {
576 std::unique_lock l(m_mut);
577 auto it = m_tickets.find(ticket_number);
578 if(it == m_tickets.end()) {
579 return error_code::unknown_ticket;
582 auto t_state = it->second;
583 switch(t_state->m_state) {
584 case ticket_state::begun:
586 case ticket_state::prepared:
587 return error_code::prepared;
588 case ticket_state::committed:
589 return error_code::committed;
590 case ticket_state::aborted:
591 return error_code::aborted;
// Nothing to undo on any shard, so the ticket is aborted right away.
594 if(t_state->m_shard_states.empty()) {
596 t_state->m_state = ticket_state::aborted;
600 for(
auto& shard : t_state->m_shard_states) {
602 "Broker requesting rollback on",
606 if(shard.second.m_state == shard_state_type::rolled_back) {
608 "Broker skipping rollback on",
612 " already rolled back");
615 auto sidx = shard.first;
616 assert(sidx < m_shards.size());
617 shard.second.m_state = shard_state_type::rolling_back;
618 if(!m_shards[sidx]->rollback(
620 [=,
this](
const parsec::runtime_locking_shard::
622 handle_rollback(result_callback,
627 m_log->error(
"Failed to make rollback shard request");
628 return error_code::shard_unreachable;
636 "Broker initiated rollback request for",
639 if(maybe_error.has_value()) {
640 result_callback(maybe_error.value());
641 }
else if(callback) {
642 result_callback(std::nullopt);
646 "Broker handled rollback request for",
// Handles a shard's reply to a rollback request.
// Under m_mut it validates the ticket state and that the shard is in
// shard_state_type::rolling_back, treats a reply holding a value as a
// rollback error, and otherwise marks the shard rolled_back and clears its
// key states. After a scan of all shard states for ones that are not yet
// rolled back, the ticket moves to ticket_state::aborted. result_callback
// receives the error if there is one, or std::nullopt when the local
// `callback` flag is set.
652 void impl::handle_rollback(
653 const rollback_callback_type& result_callback,
// Presumably set to true once every shard is rolled back; the
// assignment is not visible in this excerpt.
657 auto callback =
false;
658 auto maybe_error = [&]() -> std::optional<error_code> {
659 std::unique_lock lll(m_mut);
660 auto ittt = m_tickets.find(ticket_number);
661 if(ittt == m_tickets.end()) {
662 return error_code::unknown_ticket;
665 auto tss = ittt->second;
666 switch(tss->m_state) {
667 case ticket_state::begun:
669 case ticket_state::prepared:
670 return error_code::prepared;
671 case ticket_state::committed:
672 return error_code::committed;
673 case ticket_state::aborted:
674 return error_code::aborted;
677 if(tss->m_shard_states[shard_idx].m_state
678 != shard_state_type::rolling_back) {
680 "Rollback response for",
684 "not in rolling back state. Actual state:",
685 static_cast<int>(tss->m_shard_states[shard_idx].m_state));
686 return error_code::invalid_shard_state;
// A populated res signals that the shard reported an error.
689 if(res.has_value()) {
690 m_log->error(
"Shard rollback error");
691 return error_code::rollback_error;
694 auto& s_state = tss->m_shard_states[shard_idx];
696 s_state.m_state = shard_state_type::rolled_back;
697 s_state.m_key_states.clear();
704 for(
auto& shard : tss->m_shard_states) {
705 if(shard.second.m_state != shard_state_type::rolled_back) {
709 "not yet rolled back for",
712 static_cast<int>(shard.second.m_state));
717 m_log->trace(
this,
"All shards rolled back for", ticket_number);
719 tss->m_state = ticket_state::aborted;
724 if(maybe_error.has_value()) {
725 result_callback(maybe_error.value());
726 }
else if(callback) {
727 result_callback(std::nullopt);
// Handles the directory's reply to a key-location request.
// Under m_mut it validates the ticket and the reply. If the key is already
// locked for this ticket with a lock type at least as strong as the one
// requested, the stored value is returned without contacting the shard.
// Otherwise the key is recorded as locking with the requested lock type and
// a try_lock request is sent to the shard; the reply is routed to
// handle_lock. Whatever the lambda returns is passed to result_callback.
731 void impl::handle_find_key(
732 ticket_number_type ticket_number,
735 try_lock_callback_type result_callback,
736 std::optional<parsec::directory::interface::key_location_return_type>
738 auto maybe_error = [&]() -> std::optional<try_lock_return_type> {
739 std::unique_lock l(m_mut);
// NOTE(review): res is a std::optional and this assert runs before the
// res.has_value() check further down -- confirm the comparison behaves
// as intended when res is empty.
740 assert(res < m_shards.size());
741 auto ticket = m_tickets.find(ticket_number);
742 if(ticket == m_tickets.end()) {
743 m_log->error(
"Unknown ticket number");
744 return error_code::unknown_ticket;
747 auto tss = ticket->second;
748 switch(tss->m_state) {
749 case ticket_state::begun:
751 case ticket_state::prepared:
752 return error_code::prepared;
753 case ticket_state::committed:
754 return error_code::committed;
755 case ticket_state::aborted:
756 return error_code::aborted;
759 if(!res.has_value()) {
760 return error_code::directory_unreachable;
763 auto shard_idx = res.value();
// operator[] creates the shard entry if this is its first use.
764 auto& ss = tss->m_shard_states[shard_idx];
765 auto first_lock = ss.m_key_states.empty();
766 auto it = ss.m_key_states.find(key);
767 if(it != ss.m_key_states.end()
768 && it->second.m_key_state == key_state::locked
769 && it->second.m_locktype >= locktype) {
770 assert(it->second.m_value.has_value());
771 return it->second.m_value.value();
774 auto& ks = ss.m_key_states[key];
776 ks.m_key_state = key_state::locking;
777 ks.m_locktype = locktype;
779 if(!m_shards[shard_idx]->try_lock(
785 [=,
this](
const parsec::runtime_locking_shard::interface::
786 try_lock_return_type& lock_res) {
787 handle_lock(ticket_number,
793 m_log->error(
"Failed to make try_lock shard request");
794 return error_code::shard_unreachable;
799 if(maybe_error.has_value()) {
800 result_callback(maybe_error.value());
// Handles a shard's reply to a finish request.
// Under m_mut it validates the ticket state and that the shard is in
// shard_state_type::finishing, treats a reply holding a value as a finish
// error, and otherwise marks the shard finished. After a scan of all shard
// states for ones that are not yet finished, the ticket is erased from
// m_tickets. result_callback receives the error if there is one, or
// std::nullopt when the local `callback` flag is set.
804 void impl::handle_finish(
805 const finish_callback_type& result_callback,
806 ticket_number_type ticket_number,
// Presumably set to true once every shard is finished; the assignment
// is not visible in this excerpt.
809 auto callback =
false;
810 auto maybe_error = [&]() -> std::optional<error_code> {
811 std::unique_lock lll(m_mut);
812 auto ittt = m_tickets.find(ticket_number);
813 if(ittt == m_tickets.end()) {
814 return error_code::unknown_ticket;
817 auto tss = ittt->second;
818 switch(tss->m_state) {
819 case ticket_state::begun:
820 return error_code::begun;
821 case ticket_state::prepared:
822 return error_code::prepared;
823 case ticket_state::committed:
825 case ticket_state::aborted:
826 return error_code::aborted;
829 if(tss->m_shard_states[shard_idx].m_state
830 != shard_state_type::finishing) {
832 "Finish response for",
836 "not in finishing state. Actual state:",
837 static_cast<int>(tss->m_shard_states[shard_idx].m_state));
838 return error_code::invalid_shard_state;
// A populated res signals that the shard reported an error.
841 if(res.has_value()) {
842 m_log->error(
"Shard finish error");
843 return error_code::finish_error;
846 tss->m_shard_states[shard_idx].m_state
847 = shard_state_type::finished;
854 for(
auto& shard : tss->m_shard_states) {
855 if(shard.second.m_state != shard_state_type::finished) {
859 "not yet finished for",
862 static_cast<int>(shard.second.m_state));
867 m_log->trace(
this,
"All shards finished for", ticket_number);
// The ticket is complete on every shard; drop its bookkeeping.
869 m_tickets.erase(ittt);
875 if(maybe_error.has_value()) {
876 result_callback(maybe_error.value());
877 }
else if(callback) {
878 result_callback(std::nullopt);
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the broker's recover entry point -- confirm.
// Checks under m_mut whether the broker already tracks any tickets, then
// asks every shard for its tickets; each reply is routed to
// handle_get_tickets together with the shard's index.
884 auto maybe_tickets = [&]() {
885 std::unique_lock l(m_mut);
886 return !m_tickets.empty();
891 for(uint64_t i = 0; i < m_shards.size(); i++) {
892 auto& s = m_shards[i];
893 auto success = s->get_tickets(
// result_callback and i are captured by copy for the deferred reply.
895 [&, result_callback, i](
897 get_tickets_return_type& res) {
898 handle_get_tickets(result_callback, i, res);
// Handles one shard's reply to a get_tickets request during recovery.
// Success overload: under m_mut the reply is stored in m_recovery_tickets
// keyed by shard_idx. Once a reply is present for every shard, the
// collected tickets are merged into m_tickets, setting each ticket's
// per-shard state to begun, committed, prepared or wounded, after which
// m_recovery_tickets is cleared and do_recovery is started.
// Error overload: returns error_code::get_tickets_error.
// An error, if any, is reported through result_callback.
908 impl::handle_get_tickets(
const recover_callback_type& result_callback,
911 get_tickets_return_type& res) {
913 auto maybe_error = std::visit(
914 overloaded{[&](
const runtime_locking_shard::interface::
915 get_tickets_success_type& tickets)
916 -> std::optional<error_code> {
917 std::unique_lock l(m_mut);
918 m_recovery_tickets.emplace(shard_idx, tickets);
// Keep collecting until every shard has answered.
919 if(m_recovery_tickets.size() != m_shards.size()) {
922 for(
auto& [s, ts] : m_recovery_tickets) {
923 for(
auto& [ticket_number, t_state] : ts) {
// operator[] creates the map entry on first sight of the ticket.
924 auto& ticket = m_tickets[ticket_number];
926 ticket = std::make_shared<state>();
931 ticket->m_shard_states[s].m_state
932 = shard_state_type::begun;
936 ticket->m_shard_states[s].m_state
937 = shard_state_type::committed;
941 ticket->m_shard_states[s].m_state
942 = shard_state_type::prepared;
946 ticket->m_shard_states[s].m_state
947 = shard_state_type::wounded;
952 if(m_tickets.empty()) {
956 m_recovery_tickets.clear();
957 return do_recovery(result_callback);
960 -> std::optional<error_code> {
961 return error_code::get_tickets_error;
964 if(maybe_error.has_value()) {
965 result_callback(maybe_error.value());
// The condition guarding this success callback is not visible here.
967 result_callback(std::nullopt);
970 m_log->trace(
this,
"Broker handled get_tickets for shard", shard_idx);
// Drives every ticket in m_tickets to completion after recovery.
// For each ticket the per-shard states are inspected and compared against
// `committed` (presumably a count of committed shards; its declaration is
// not visible in this excerpt):
//   - all shards committed: the ticket is marked committed and finish is
//     requested;
//   - some shards committed: the ticket is marked prepared and commit is
//     requested;
//   - otherwise: the ticket is marked begun and rollback is requested.
// Each request's reply is routed to the matching handle_recovery_* method.
// Returns error_code::shard_unreachable on a failed request and
// std::nullopt at the end of the function.
973 auto impl::do_recovery(
const recover_callback_type& result_callback)
974 -> std::optional<error_code> {
// Iterates by value, so each map entry is copied for the loop body.
975 for(
auto [ticket_number, ticket] : m_tickets) {
977 for(
auto& [sidx, t_state] : ticket->m_shard_states) {
978 switch(t_state.m_state) {
979 case shard_state_type::begun:
980 case shard_state_type::prepared:
981 case shard_state_type::wounded:
983 case shard_state_type::committed:
988 "Found invalid shard "
989 "state during recovery");
992 if(committed == ticket->m_shard_states.size()) {
993 ticket->m_state = ticket_state::committed;
994 auto success = finish(
996 [&, result_callback](finish_return_type fin_res) {
997 handle_recovery_finish(result_callback, fin_res);
1000 return error_code::shard_unreachable;
1002 }
else if(committed > 0) {
1003 ticket->m_state = ticket_state::prepared;
1004 auto success = commit(
1007 [&, result_callback, tn = ticket_number](
1008 const commit_return_type& comm_res) {
1009 handle_recovery_commit(result_callback, tn, comm_res);
1012 return error_code::shard_unreachable;
1015 ticket->m_state = ticket_state::begun;
1017 = rollback(ticket_number,
1018 [&, result_callback, tn = ticket_number](
1019 rollback_return_type roll_res) {
1020 handle_recovery_rollback(result_callback,
1025 return error_code::shard_unreachable;
1029 return std::nullopt;
// Handles the reply to a commit issued during recovery.
// A reply holding a value is reported as error_code::commit_error.
// Otherwise finish is requested for the ticket, with its reply routed to
// handle_recovery_finish; a failed request is reported as
// error_code::shard_unreachable.
1033 impl::handle_recovery_commit(
const recover_callback_type& result_callback,
1034 ticket_number_type ticket_number,
1035 const commit_return_type& res) {
1036 if(res.has_value()) {
1037 result_callback(error_code::commit_error);
1042 = finish(ticket_number,
1043 [&, result_callback](finish_return_type fin_res) {
1044 handle_recovery_finish(result_callback, fin_res);
1047 result_callback(error_code::shard_unreachable);
// Handles the reply to a finish issued during recovery.
// A reply holding a value is reported as error_code::finish_error.
// Otherwise it checks under m_mut whether m_tickets is empty and reports
// success with std::nullopt (presumably only once no tickets remain; the
// guarding condition is not visible in this excerpt).
1052 impl::handle_recovery_finish(
const recover_callback_type& result_callback,
1053 finish_return_type res) {
1054 if(res.has_value()) {
1055 result_callback(error_code::finish_error);
1059 std::unique_lock l(m_mut);
1060 return m_tickets.empty();
1063 result_callback(std::nullopt);
// Handles the reply to a rollback issued during recovery.
// A reply holding a value is reported as error_code::rollback_error.
// Otherwise finish is requested for the ticket, with its reply routed to
// handle_recovery_finish; a failed request is reported as
// error_code::shard_unreachable.
1067 void impl::handle_recovery_rollback(
1068 const recover_callback_type& result_callback,
1069 ticket_number_type ticket_number,
1070 rollback_return_type res) {
1071 if(res.has_value()) {
1072 result_callback(error_code::rollback_error);
1076 = finish(ticket_number,
1077 [&, result_callback](finish_return_type fin_res) {
1078 handle_recovery_finish(result_callback, fin_res);
1081 result_callback(error_code::shard_unreachable);