17 std::shared_ptr<broker::interface> broker,
23 std::shared_ptr<secp256k1_context> secp,
24 std::shared_ptr<thread_pool> t_pool)
27 std::move(result_callback)),
28 m_log(std::move(logger)),
29 m_cfg(std::move(cfg)),
30 m_runner_factory(std::move(runner_factory)),
31 m_broker(std::move(broker)),
32 m_initial_lock_type(is_readonly_run ? broker::lock_type::read
34 m_is_readonly_run(is_readonly_run),
35 m_secp(std::move(secp)),
36 m_threads(std::move(t_pool)) {}
// NOTE(review): the enclosing function header is not visible in this listing
// (presumably the agent's exec() entry point — confirm against the full file).
// The fragment takes m_mut and then shows the case labels of a switch over the
// agent state. Case bodies that do not appear here are elided by the listing,
// not necessarily empty.
39 std::unique_lock l(m_mut);
43 case state::begin_sent:
44 case state::begin_failed:
// After a completed rollback the previous result is discarded before the
// (elided) remainder of this case runs.
48 case state::rollback_complete:
49 m_result = std::nullopt;
56 case state::commit_failed:
58 case state::commit_sent:
63 case state::rollback_failed:
// In rollback_sent the rollback is issued again, reusing the stored
// permanent-error flag as the `finish` argument.
65 case state::rollback_sent:
66 do_rollback(m_permanent_error);
70 case state::function_get_sent:
71 case state::function_get_failed:
72 case state::function_failed:
73 case state::function_started:
78 case state::finish_sent:
80 case state::finish_failed:
87 case state::function_get_error:
88 case state::commit_error:
89 case state::function_exception:
90 case state::finish_complete:
// NOTE(review): function header not visible in this listing. Visible steps:
// clear any previous result, move to begin_sent, then call the broker's
// begin() (call arguments are elided).
94 m_result = std::nullopt;
95 m_state = state::begin_sent;
96 auto success = m_broker->begin(
// Error path: the state becomes begin_failed and the result is set to
// broker_unreachable. The guarding condition is on an elided line —
// presumably a falsy `success`; confirm against the full source.
102 m_state = state::begin_failed;
103 m_log->error(
"Failed to contact broker to begin");
104 m_result = error_code::broker_unreachable;
// handle_begin fragment (name taken from the log message below; the header and
// most of the body are elided in this listing). Runs while holding m_mut.
112 std::unique_lock l(m_mut);
// Warning emitted for a call outside the begin_sent state; the state check
// itself is on an elided line.
114 m_log->warn(
"handle_begin while not in begin_sent state");
// Tail of a log call reporting that no ticket number was assigned; the
// surrounding branch is not visible here.
125 "Broker failed to assign a ticket number");
// Issues a try_lock request to the broker for this agent's ticket and routes
// the broker's reply to handle_function. Holds m_mut for the duration.
// Precondition (asserted): a ticket number has already been assigned.
// NOTE(review): several lines of the body are elided in this listing.
132 void impl::do_start() {
133 std::unique_lock l(m_mut);
134 assert(m_ticket_number.has_value());
154 auto tl_success = m_broker->try_lock(
155 m_ticket_number.value(),
// Presumably the body of the response callback passed to try_lock — the
// lambda header is elided.
160 handle_function(lock_res);
// Logged when the request could not be handed to the broker (the check on
// tl_success is presumably on an elided line).
164 m_log->error(
"Failed to contact broker to retrieve "
// Processes a try_lock response and hands it on to the caller-supplied
// callback `res_cb`. Holds m_mut while inspecting the response.
// NOTE(review): the parameter list and parts of the body are elided here.
172 void impl::handle_try_lock_response(
175 std::unique_lock l(m_mut);
// Logged for a response that arrives outside the function_started state; the
// state check itself is elided.
177 m_log->error(
"try_lock response while not in "
178 "function_started state");
// A shard_error alternative is examined before forwarding; what is done with
// `err` is on elided lines.
181 if(std::holds_alternative<runtime_locking_shard::shard_error>(res)) {
182 auto& err = std::get<runtime_locking_shard::shard_error>(res);
// Forward the response to the requester's callback.
188 res_cb(std::move(res));
// do_try_lock_request fragment (name taken from the log messages below; the
// header is elided in this listing). Validates the agent state, records the
// requested lock, and forwards the request to the broker with the reply routed
// through handle_try_lock_response.
197 std::unique_lock l(m_mut);
198 assert(m_ticket_number.has_value());
// Requests are only expected while the function is running; the action taken
// after the warning is on elided lines.
199 if(m_state != state::function_started) {
200 m_log->warn(
"do_try_lock_request while not in "
201 "function_started state");
// A write lock requested during a read-only run is warned about; what follows
// the warning is elided.
205 if(m_is_readonly_run && locktype == broker::lock_type::write) {
206 m_log->warn(
"do_try_lock_request of type write when "
207 "m_is_readonly_run = true");
// For an already-wounded ticket the broker is not contacted:
// handle_try_lock_response is invoked directly with a shard_error (its
// contents are elided).
213 "Skipping lock request because ticket is already wounded");
214 handle_try_lock_response(
216 runtime_locking_shard::shard_error{
// Record the lock type for this key when the key is new or currently recorded
// as a read lock; any other existing entry is left untouched.
222 auto it = m_requested_locks.find(key);
223 if(it == m_requested_locks.end()
224 || it->second == broker::lock_type::read) {
225 m_requested_locks[key] = locktype;
// Read-only runs compute a read lock type here regardless of `locktype`.
228 auto actual_lock_type
229 = m_is_readonly_run ? broker::lock_type::read : locktype;
// The callback captures `this` and takes ownership of res_cb, then relays the
// broker's reply to handle_try_lock_response.
230 return m_broker->try_lock(
231 m_ticket_number.value(),
234 [
this, cb = std::move(res_cb)](
236 handle_try_lock_response(cb, std::move(res));
// handle_function fragment (name taken from the log message below; the header
// is elided in this listing). do_start passes the broker's try_lock reply to
// this function. The visible parts re-acquire previously requested locks,
// handle retrieval errors, and create and start the runner.
242 std::unique_lock l(m_mut);
// Tail of the log call emitted when the agent is not in function_get_sent.
245 "handle_function while not in function_get_sent state");
// Swap the previously requested locks into a fresh shared set; after the swap
// m_requested_locks holds whatever the new set held (presumably nothing, since
// it was just created — the declaration on the preceding line is elided).
253 = std::make_shared<broker::held_locks_set_type>();
254 (*reacq_locks).swap(m_requested_locks);
256 if(reacq_locks->empty()) {
263 m_log->trace(
"Re-acquiring locks for",
264 m_ticket_number.value());
// Shared counter of completed re-acquisitions; each callback below holds a
// copy of the pointer.
265 auto reacquired = std::make_shared<std::atomic<size_t>>();
266 for(
auto& it : *reacq_locks) {
267 m_log->trace(
"Re-acquiring lock on",
270 static_cast<int>(it.second),
272 m_ticket_number.value());
273 auto success = do_try_lock_request(
276 [
this, reacquired, v, reacq_locks](
277 const broker::interface::
278 try_lock_return_type&) {
279 std::unique_lock ll(m_mut);
// Post-increment yields the count before this completion, so reacq + 1 is the
// number of completions including this one.
280 auto reacq = (*reacquired)++;
281 m_log->trace(
"Re-acquired",
286 m_ticket_number.value());
// True only for the callback that completes the final outstanding lock; the
// continuation it triggers is on elided lines.
288 if(reacq + 1 == reacq_locks->size()) {
293 m_log->error(
"Try lock request failed for",
294 m_ticket_number.value());
304 m_log->error(
"Failed to retrieve function");
// Handler for a shard_error reply: the two visible log calls distinguish a
// wounded ticket (trace) from another shard error (error); the condition
// selecting between them is elided.
308 [&](
const runtime_locking_shard::shard_error& e) {
312 m_log->trace(
"Shard wounded ticket while "
313 "retrieving function");
316 m_log->error(
"Shard error retrieving function");
// Build the runner through the injected factory. The visible callbacks receive
// the run result and relay lock requests to do_try_lock_request.
325 m_runner = m_runner_factory(
331 [
this](
const runner::interface::run_return_type& run_res) {
337 return do_try_lock_request(std::move(key),
// A restarted run passes a null thread pool instead of m_threads.
342 m_restarted ? nullptr : m_threads,
343 m_ticket_number.value());
344 auto run_res = m_runner->run();
// Error path: state becomes function_failed and the result is
// function_execution. The guard is elided — presumably a falsy run_res.
347 m_state = state::function_failed;
348 m_log->error(
"Failed to start contract execution");
349 m_result = error_code::function_execution;
// Sends the commit request for this agent's ticket to the broker and routes
// the reply to handle_commit. Holds m_mut for the duration.
// Preconditions (asserted): the state is function_started, commit_failed or
// commit_sent; a result and a ticket number are present; and a variant (the
// argument is elided, presumably the result) holds a state_update_type.
// NOTE(review): several lines of the body are elided in this listing.
354 void impl::do_commit() {
355 std::unique_lock l(m_mut);
356 assert(m_state == state::function_started
357 || m_state == state::commit_failed
358 || m_state == state::commit_sent);
359 assert(m_result.has_value());
360 assert(m_ticket_number.has_value());
361 assert(std::holds_alternative<broker::state_update_type>(
363 m_state = state::commit_sent;
365 "Agent requesting commit for",
366 m_ticket_number.value());
// Only a non-read-only run copies the state updates out of the result into
// `payload`; a read-only run leaves `payload` as declared on the elided line.
368 if(!m_is_readonly_run) {
369 payload = std::get<broker::state_update_type>(m_result.value());
371 auto maybe_success = m_broker->commit(
372 m_ticket_number.value(),
// Presumably the body of the response callback passed to commit().
375 handle_commit(std::move(commit_res));
// Error path: state becomes commit_failed and the result is
// broker_unreachable. The guard is elided — presumably a falsy maybe_success.
378 m_state = state::commit_failed;
379 m_log->error(
"Failed to contact broker for commit");
380 m_result = error_code::broker_unreachable;
// Handles the result produced by the runner. Holds m_mut; a call outside the
// function_started state is logged as a warning (what follows is elided).
//
// @param res value returned by the runner: either the state updates or a
//            runner error code (the two alternatives handled below).
// NOTE(review): several lines of the body are elided in this listing.
385 void impl::handle_run(
const runner::interface::run_return_type& res) {
386 std::unique_lock l(m_mut);
387 if(m_state != state::function_started) {
388 m_log->warn(
"handle_run while not in function_started state");
// Success alternative: the state updates become the agent's result.
393 m_result = std::move(states);
// Error alternative: a wounded runner marks the function as failed; the other
// visible branches mark it as function_exception.
396 [&](runner::interface::error_code e) {
397 if(e == runner::interface::error_code::wounded) {
398 m_state = state::function_failed;
400 == runner::interface::error_code::
405 m_state = state::function_exception;
408 "Unexpected internal error encountered for",
409 m_ticket_number.value());
411 m_state = state::function_exception;
413 "function execution failed for",
414 m_ticket_number.value());
// The result is set to function_execution on this error path.
416 m_result = error_code::function_execution;
421 "Agent handle_run complete for",
422 m_ticket_number.value());
// handle_commit fragment (name taken from the log message below; the header is
// elided in this listing). Processes the broker's reply to a commit request
// while holding m_mut.
426 std::unique_lock l(m_mut);
// Replies arriving outside commit_sent are reported; what follows is elided.
427 if(m_state != state::commit_sent) {
430 "Agent handle_commit while not in commit_sent state for",
431 m_ticket_number.value(),
// A populated reply is treated as an error: every visible branch below sets an
// error state and an error result.
435 if(res.has_value()) {
// Broker error alternative: the state becomes commit_error or commit_failed
// (the selecting condition is elided) and the result is commit_error.
441 m_state = state::commit_error;
443 m_state = state::commit_failed;
445 m_log->error(
"Broker error for commit for",
446 m_ticket_number.value());
447 m_result = error_code::commit_error;
// Shard error alternative: a ticket wounded during commit ends in
// commit_failed; the other visible branch ends in commit_error.
450 [&](
const runtime_locking_shard::shard_error& e) {
453 m_state = state::commit_failed;
454 m_log->trace(m_ticket_number.value(),
455 "wounded during commit");
457 m_state = state::commit_error;
458 m_log->error(
"Shard error for commit for",
459 m_ticket_number.value());
461 m_result = error_code::commit_error;
// NOTE(review): a further branch with the same "Shard error" message; its
// enclosing condition is elided.
465 m_state = state::commit_error;
466 m_log->error(
"Shard error for commit for",
467 m_ticket_number.value());
468 m_result = error_code::commit_error;
474 "Agent handled commit for",
475 m_ticket_number.value());
// Reports the stored result through the result callback. Holds m_mut.
// Precondition (asserted): a result is present.
// The switch logs a fatal message for every state in which a result is not
// expected to be reported (the initial state and all *_sent and
// function_started states).
// NOTE(review): several lines of the body are elided in this listing,
// including the bodies of the failed/error cases.
480 void impl::do_result() {
481 std::unique_lock l(m_mut);
482 assert(m_result.has_value());
486 m_log->fatal(
"Result reported in initial state");
488 case state::begin_sent:
489 m_log->fatal(
"Result reported in begin_sent state");
491 case state::function_get_sent:
492 m_log->fatal(
"Result reported in function_get_sent state");
494 case state::commit_sent:
495 m_log->fatal(
"Result reported in commit_sent state");
497 case state::finish_sent:
498 m_log->fatal(
"Result reported in finish_sent state");
500 case state::function_started:
501 m_log->fatal(
"Result reported in function_started state");
503 case state::rollback_sent:
504 m_log->fatal(
"Result reported in rollback_sent state");
// After a completed rollback the only acceptable result is the retry error
// code; anything else is logged as fatal.
506 case state::rollback_complete:
507 if(!std::holds_alternative<error_code>(m_result.value())
508 || std::get<error_code>(m_result.value())
509 != error_code::retry) {
510 m_log->fatal(
"Result reported in rollback_complete state "
511 "when result is not retry");
516 case state::begin_failed:
520 case state::function_get_failed:
521 case state::function_failed:
522 case state::commit_failed:
526 case state::finish_failed:
530 case state::rollback_failed:
535 case state::function_get_error:
536 case state::commit_error:
537 case state::function_exception:
542 case state::finish_complete:
543 m_log->debug(
this,
"Agent finished", m_ticket_number.value());
// Deliver the result to the callback obtained from get_result_callback().
546 get_result_callback()(m_result.value());
548 "Agent handled result for",
549 m_ticket_number.value());
// Sends the finish request for this agent's ticket to the broker and routes
// the reply to handle_finish. Holds m_mut for the duration.
// Preconditions (asserted): the state is commit_sent, finish_failed,
// finish_sent or rollback_complete, and a ticket number is present.
// NOTE(review): several lines of the body are elided in this listing.
552 void impl::do_finish() {
553 std::unique_lock l(m_mut);
554 assert(m_state == state::commit_sent || m_state == state::finish_failed
555 || m_state == state::finish_sent
556 || m_state == state::rollback_complete);
557 assert(m_ticket_number.has_value());
558 m_state = state::finish_sent;
560 "Agent requesting finish for",
561 m_ticket_number.value());
562 auto maybe_success = m_broker->finish(
563 m_ticket_number.value(),
// Presumably the body of the response callback passed to finish().
565 handle_finish(finish_res);
// Error path: state becomes finish_failed and the result is
// broker_unreachable. The guard is elided — presumably a falsy maybe_success.
568 m_state = state::finish_failed;
569 m_log->error(
"Error contacting broker for finish");
570 m_result = error_code::broker_unreachable;
// handle_finish fragment (name taken from the log message below; the header is
// elided in this listing). Processes the broker's reply to a finish request
// while holding m_mut.
577 std::unique_lock l(m_mut);
// Replies arriving outside finish_sent are warned about; what follows the
// warning is elided.
578 if(m_state != state::finish_sent) {
579 m_log->warn(
"handle_finish while not in finish_sent state");
// A populated reply is treated as an error: the state becomes finish_failed
// and the result finish_error.
582 if(finish_res.has_value()) {
583 m_state = state::finish_failed;
584 m_log->error(
"Broker error for finish for",
585 m_ticket_number.value());
586 m_result = error_code::finish_error;
// Presumably the non-error path (the branch structure is elided): the ticket
// is marked finish_complete.
589 m_state = state::finish_complete;
591 "Agent handled finish for",
592 m_ticket_number.value());
// Sends the rollback request for this agent's ticket to the broker and routes
// the reply to handle_rollback. Holds m_mut for the duration.
//
// @param finish stored in m_permanent_error before the request is sent.
//
// Preconditions (asserted): the state is one of the failed/error states listed
// below, rollback_sent, or function_started, and a ticket number is present.
// NOTE(review): several lines of the body are elided in this listing.
597 void impl::do_rollback(
bool finish) {
598 std::unique_lock l(m_mut);
599 assert(m_state == state::commit_failed
600 || m_state == state::rollback_sent
601 || m_state == state::function_exception
602 || m_state == state::function_failed
603 || m_state == state::commit_error
604 || m_state == state::function_get_failed
605 || m_state == state::function_get_error
606 || m_state == state::function_started
607 || m_state == state::rollback_failed);
608 assert(m_ticket_number.has_value());
609 m_log->trace(
this,
"Agent rolling back", m_ticket_number.value());
610 m_state = state::rollback_sent;
611 m_permanent_error = finish;
612 auto maybe_success = m_broker->rollback(
613 m_ticket_number.value(),
// Presumably the body of the response callback passed to rollback().
615 handle_rollback(rollback_res);
// Error path: state becomes rollback_failed and the result is
// broker_unreachable. The guard is elided — presumably a falsy maybe_success.
618 m_state = state::rollback_failed;
619 m_log->error(
"Error contacting broker for rollback");
620 m_result = error_code::broker_unreachable;
// Processes the broker's reply to a rollback request while holding m_mut.
// NOTE(review): the parameter list and parts of the body are elided in this
// listing.
625 void impl::handle_rollback(
627 std::unique_lock l(m_mut);
// Replies arriving outside rollback_sent are warned about; what follows the
// warning is elided.
628 if(m_state != state::rollback_sent) {
629 m_log->warn(
"handle_rollback while not in rollback_sent state");
// A populated reply is treated as an error: the state becomes rollback_failed
// and the result rollback_error.
632 if(rollback_res.has_value()) {
633 m_state = state::rollback_failed;
634 m_result = error_code::rollback_error;
635 m_log->error(
"Broker error rolling back", m_ticket_number.value());
640 m_state = state::rollback_complete;
641 m_log->trace(
this,
"Agent rolled back", m_ticket_number.value());
// With a permanent error the agent logs that it is finishing (the action taken
// is elided). The "should restart" lines that follow are presumably the
// alternative branch, which sets the result to retry.
642 if(m_permanent_error) {
644 "Agent finishing due to permanent error",
645 m_ticket_number.value());
650 "Agent should restart",
651 m_ticket_number.value());
652 m_result = error_code::retry;
// NOTE(review): header not visible in this listing; the log message indicates
// this runs at destruction. Reports the numeric state value when the agent is
// destroyed in any state other than finish_complete.
658 std::unique_lock l(m_mut);
659 if(m_state != state::finish_complete) {
662 "Agent state wasn't finished at destruction, state was:",
663 static_cast<int>(m_state));
// Returns a copy of the agent's ticket number, read while holding m_mut. The
// optional is empty when no ticket number is stored.
667 auto impl::get_ticket_number() const
668 -> std::optional<ticket_machine::ticket_number_type> {
669 std::unique_lock l(m_mut);
670 return m_ticket_number;
// Accessor for the agent's state; acquires m_mut first. The return statement
// is elided in this listing — presumably it returns m_state.
673 auto impl::get_state() const ->
state {
674 std::unique_lock l(m_mut);
Buffer to store and retrieve byte data.
state
States for a ticket managed by this agent.
@ begin_sent
Begin request sent to broker.
@ function_get_failed
Function bytecode lock request failed.
@ function_get_error
Broker error during function bytecode lock.
@ function_get_sent
Function bytecode lock sent to broker.
@ function_started
Function execution started.
@ rollback_complete
Rollback complete.
@ begin_failed
Begin request failed.
auto exec() -> bool override
Initiates function execution.
impl(std::shared_ptr< logging::log > logger, cbdc::parsec::config cfg, runner::interface::factory_type runner_factory, std::shared_ptr< broker::interface > broker, runtime_locking_shard::key_type function, parameter_type param, exec_callback_type result_callback, broker::lock_type initial_lock_type, bool is_readonly_run, std::shared_ptr< secp256k1_context > secp, std::shared_ptr< thread_pool > t_pool)
Constructor.
@ function_retrieval
Error retrieving function bytecode.
@ broker_unreachable
Broker was unreachable.
@ ticket_number_assignment
Ticket number assignment failed.
std::function< void(exec_return_type)> exec_callback_type
Callback function type with function execution result.
auto get_function() const -> runtime_locking_shard::key_type
Return the key of the function bytecode managed by this agent.
std::function< std::unique_ptr< interface >( std::shared_ptr< logging::log > logger, const cbdc::parsec::config &cfg, runtime_locking_shard::value_type function, parameter_type param, bool is_readonly_run, runner::interface::run_callback_type result_callback, runner::interface::try_lock_callback_type try_lock_callback, std::shared_ptr< secp256k1_context >, std::shared_ptr< thread_pool > t_pool, ticket_number_type ticket_number)> factory_type
Factory function type for instantiating new runners.
std::variant< value_type, error_code, runtime_locking_shard::shard_error > try_lock_return_type
Return type from a try lock operation.
std::optional< std::variant< error_code, runtime_locking_shard::shard_error > > commit_return_type
Return type from a commit operation.
std::optional< error_code > finish_return_type
Return type from a finish operation.
error_code
Error codes returned by broker operations.
@ commit_hazard
A commit is attempted without associating update keys with the ticket.
std::variant< ticket_number_type, error_code > ticketnum_or_errcode_type
Return type from a begin operation.
std::function< void(try_lock_return_type)> try_lock_callback_type
Callback function type for a try lock operation.
std::optional< std::variant< error_code, runtime_locking_shard::error_code > > rollback_return_type
Return type from a rollback operation.
auto to_hex(const evmc::address &addr) -> std::string
broker::state_update_type return_type
Type returned after function execution.
runtime_locking_shard::value_type value_type
Shard value type.
runtime_locking_shard::key_type key_type
Shard key type.
runtime_locking_shard::lock_type lock_type
Shard lock type.
error_code
Error codes returned by methods on shards.
@ wounded
Request invalid because ticket is in the wounded state.
std:: unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_update_type
Type for state updates to a shard. A map of keys and their new values.
lock_type
Types of key lock supported by shards.
uint64_t ticket_number_type
Type alias for a ticket number.
overloaded(Ts...) -> overloaded< Ts... >
Variant handler template.
Configuration parameters for a phase two system.