OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
agent/impl.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
6#include "impl.hpp"
7
10
11#include <atomic>
12
13namespace cbdc::parsec::agent {
// Constructs an agent. function, param and result_callback are forwarded to
// the interface base class; the remaining arguments are stored in members.
// A read-only run overrides the requested initial lock type with
// broker::lock_type::read.
// NOTE(review): this listing omits its own lines 15, 16 and 18, so the
// parameter declarations for cfg, runner_factory and function (all used in
// the initializer list below) are not visible in this block.
14 impl::impl(std::shared_ptr<logging::log> logger,
17 std::shared_ptr<broker::interface> broker,
19 parameter_type param,
20 exec_callback_type result_callback,
21 broker::lock_type initial_lock_type,
22 bool is_readonly_run,
23 std::shared_ptr<secp256k1_context> secp,
24 std::shared_ptr<thread_pool> t_pool)
25 : interface(std::move(function),
26 std::move(param),
27 std::move(result_callback)),
28 m_log(std::move(logger)),
29 m_cfg(std::move(cfg)),
30 m_runner_factory(std::move(runner_factory)),
31 m_broker(std::move(broker)),
32 m_initial_lock_type(is_readonly_run ? broker::lock_type::read
33 : initial_lock_type),
34 m_is_readonly_run(is_readonly_run),
35 m_secp(std::move(secp)),
36 m_threads(std::move(t_pool)) {}
37
// Starts, restarts or resumes the agent depending on m_state. Every visible
// path returns true.
// NOTE(review): listing line 97 is missing below, so the header of the
// callback passed to m_broker->begin() is not visible in this block.
38 auto impl::exec() -> bool {
39 std::unique_lock l(m_mut);
40 switch(m_state) {
41 // In these states we can start again from the beginning
42 case state::init:
43 case state::begin_sent:
44 case state::begin_failed:
45 break;
46
47 // We already have a ticket number but need to start again
48 case state::rollback_complete:
49 m_result = std::nullopt;
50 m_wounded = false;
51 m_restarted = true;
52 do_start();
53 return true;
54
55 // Re-run commit
56 case state::commit_failed:
57 [[fallthrough]];
58 case state::commit_sent:
59 do_commit();
60 return true;
61
62 // Re-run rollback with prior error type flag
63 case state::rollback_failed:
64 [[fallthrough]];
65 case state::rollback_sent:
66 do_rollback(m_permanent_error);
67 return true;
68
69 // Rollback first so we can start fresh
70 case state::function_get_sent:
71 case state::function_get_failed:
72 case state::function_failed:
73 case state::function_started:
74 do_rollback(false);
75 return true;
76
77 // Re-run finish
78 case state::finish_sent:
79 [[fallthrough]];
80 case state::finish_failed:
81 // Committed but transient error running finish, cannot
82 // rollback, need to retry finish
83 do_finish();
84 return true;
85
86 // End states, cannot re-run exec
87 case state::function_get_error:
88 case state::commit_error:
89 case state::function_exception:
90 case state::finish_complete:
91 return true;
92 }
93
// Only the init/begin_sent/begin_failed cases reach this point: discard any
// previous result and send a begin request to the broker. The callback
// passes the broker's response to handle_begin().
94 m_result = std::nullopt;
95 m_state = state::begin_sent;
96 auto success = m_broker->begin(
98 handle_begin(res);
99 });
100
// The request could not be issued at all: record the failure and report
// broker_unreachable through do_result().
101 if(!success) {
102 m_state = state::begin_failed;
103 m_log->error("Failed to contact broker to begin");
104 m_result = error_code::broker_unreachable;
105 do_result();
106 }
107
108 return true;
109 }
110
// Handles the broker's response to a begin request. The response is ignored
// (with a warning) unless the agent is still in the begin_sent state.
// One visitor branch stores the value in m_ticket_number and calls
// do_start(); the error branch moves to begin_failed, logs the failure and
// calls do_result().
// NOTE(review): listing lines 118 and 126 are missing, so the header of the
// first visitor lambda and one statement of the error branch are not visible
// in this block.
111 void impl::handle_begin(broker::interface::ticketnum_or_errcode_type res) {
112 std::unique_lock l(m_mut);
113 if(m_state != state::begin_sent) {
114 m_log->warn("handle_begin while not in begin_sent state");
115 return;
116 }
117 std::visit(
119 m_ticket_number = n;
120 do_start();
121 },
122 [&](const broker::interface::error_code& /* e */) {
123 m_state = state::begin_failed;
124 m_log->error(
125 "Broker failed to assign a ticket number");
127 do_result();
128 }},
129 res);
130 }
131
// Moves the agent to function_get_sent and obtains the function to run.
// Requires a ticket number and an entry state of begin_sent or
// rollback_complete (asserted). Depending on the function key, the value is
// passed straight to handle_function() or a try_lock on the key is issued
// through the broker, whose result is passed to handle_function().
// NOTE(review): listing lines 159, 163 and 166 are missing, so the callback
// parameter list and two statements of the failure branch are not visible in
// this block.
132 void impl::do_start() {
133 std::unique_lock l(m_mut);
134 assert(m_ticket_number.has_value());
135 assert(m_state == state::begin_sent
136 || m_state == state::rollback_complete);
137 m_state = state::function_get_sent;
138
139 if(m_is_readonly_run && get_function().size() == 0) {
140 // If this is a read-only run and the function key is empty, the
141 // runner will handle retrieving any keys directly.
142 handle_function(broker::value_type());
143 } else if(get_function().size() == 1) {
144 // for one-byte functions, don't resolve but use the one byte and
145 // pass it along. This is used in the EVM runner to distinguish
146 // between sending a transaction or querying something (account
147 // data for instance). Since we don't know the from here for EVM,
148 // since it relies on the signature check, we only pass the
149 // transaction as m_param and let the runner figure it out.
150 handle_function(broker::value_type(get_function()));
151 } else {
152 m_log->trace("do_start ", get_function().to_hex());
153
// Lock the function key with the initial lock type chosen at construction.
154 auto tl_success = m_broker->try_lock(
155 m_ticket_number.value(),
156 get_function(),
157 m_initial_lock_type,
158 [this](
160 handle_function(lock_res);
161 });
162 if(!tl_success) {
164 m_log->error("Failed to contact broker to retrieve "
165 "function code");
167 do_result();
168 }
169 }
170 }
171
// Relays a try_lock result to res_cb. The result is dropped (with an error
// log) unless the agent is in the function_started state. When the result
// holds a shard_error, its error code is inspected and m_wounded may be set.
// NOTE(review): listing lines 173, 174 and 184 are missing, so the parameter
// list and the right-hand side of the error-code comparison are not visible
// in this block.
172 void impl::handle_try_lock_response(
175 std::unique_lock l(m_mut);
176 if(m_state != state::function_started) {
177 m_log->error("try_lock response while not in "
178 "function_started state");
179 return;
180 }
181 if(std::holds_alternative<runtime_locking_shard::shard_error>(res)) {
182 auto& err = std::get<runtime_locking_shard::shard_error>(res);
183 if(err.m_error_code
185 m_wounded = true;
186 }
187 }
188 res_cb(std::move(res));
189 }
190
// Issues a lock request for key on behalf of the running function.
// Returns false without contacting the broker when the agent is not in
// function_started, or when a write lock is requested during a read-only
// run. If the ticket is already wounded, a shard_error is delivered to
// res_cb via handle_try_lock_response() and true is returned. Otherwise the
// request is recorded in m_requested_locks and forwarded to the broker, whose
// return value is returned.
// NOTE(review): listing lines 194, 217 and 235 are missing, so the res_cb
// parameter declaration, the first member of the shard_error initializer and
// the broker callback's parameter list are not visible in this block.
191 auto
192 impl::do_try_lock_request(broker::key_type key,
193 broker::lock_type locktype,
195 -> bool {
196 // TODO: permissions for keys
197 std::unique_lock l(m_mut);
198 assert(m_ticket_number.has_value());
199 if(m_state != state::function_started) {
200 m_log->warn("do_try_lock_request while not in "
201 "function_started state");
202 return false;
203 }
204
205 if(m_is_readonly_run && locktype == broker::lock_type::write) {
206 m_log->warn("do_try_lock_request of type write when "
207 "m_is_readonly_run = true");
208 return false;
209 }
210
211 if(m_wounded) {
212 m_log->debug(
213 "Skipping lock request because ticket is already wounded");
214 handle_try_lock_response(
215 res_cb,
216 runtime_locking_shard::shard_error{
218 std::nullopt});
219 return true;
220 }
221
// Record the requested lock type for this key. An existing entry is only
// overwritten when it is a read lock, so a recorded non-read entry is kept.
222 auto it = m_requested_locks.find(key);
223 if(it == m_requested_locks.end()
224 || it->second == broker::lock_type::read) {
225 m_requested_locks[key] = locktype;
226 }
227
// Read-only runs always send read locks to the broker.
228 auto actual_lock_type
229 = m_is_readonly_run ? broker::lock_type::read : locktype;
230 return m_broker->try_lock(
231 m_ticket_number.value(),
232 std::move(key),
233 actual_lock_type,
234 [this, cb = std::move(res_cb)](
236 handle_try_lock_response(cb, std::move(res));
237 });
238 }
239
// Handles the result of retrieving the function. The result is ignored
// (with a warning) unless the agent is in function_get_sent.
// On a value: the agent moves to function_started, takes over the contents
// of m_requested_locks, and either starts the runner immediately (no locks
// recorded) or first re-requests every recorded lock and starts the runner
// once all callbacks have fired.
// On a broker error or shard error: the failure is logged and do_result()
// is called.
// NOTE(review): listing lines 249, 295, 296, 303, 305, 310, 311, 315 and 318
// are missing, so the start of the visitor and several statements in the
// failure branches are not visible in this block.
240 void
241 impl::handle_function(const broker::interface::try_lock_return_type& res) {
242 std::unique_lock l(m_mut);
243 if(m_state != state::function_get_sent) {
244 m_log->warn(
245 "handle_function while not in function_get_sent state");
246 return;
247 }
248 std::visit(
250 [&](const broker::value_type& v) {
251 m_state = state::function_started;
// Move the recorded locks into a shared set; m_requested_locks receives the
// (freshly constructed) set's contents in exchange.
252 auto reacq_locks
253 = std::make_shared<broker::held_locks_set_type>();
254 (*reacq_locks).swap(m_requested_locks);
255
256 if(reacq_locks->empty()) {
257 do_runner(v);
258 return;
259 }
260
261 // Re-acquire previously held locks upon retries
262 // immediately
263 m_log->trace("Re-acquiring locks for",
264 m_ticket_number.value());
// Shared counter of lock callbacks received so far.
265 auto reacquired = std::make_shared<std::atomic<size_t>>();
266 for(auto& it : *reacq_locks) {
267 m_log->trace("Re-acquiring lock on",
268 it.first.to_hex(),
269 "type",
270 static_cast<int>(it.second),
271 "for",
272 m_ticket_number.value());
273 auto success = do_try_lock_request(
274 it.first,
275 it.second,
276 [this, reacquired, v, reacq_locks](
277 const broker::interface::
278 try_lock_return_type&) {
279 std::unique_lock ll(m_mut);
280 auto reacq = (*reacquired)++;
281 m_log->trace("Re-acquired",
282 reacq + 1,
283 "of",
284 reacq_locks->size(),
285 "locks for",
286 m_ticket_number.value());
287
// The callback that brings the count up to the set size starts the runner.
288 if(reacq + 1 == reacq_locks->size()) {
289 do_runner(v);
290 }
291 });
292 if(!success) {
293 m_log->error("Try lock request failed for",
294 m_ticket_number.value());
297 do_result();
298 return;
299 }
300 }
301 },
302 [&](broker::interface::error_code /* e */) {
304 m_log->error("Failed to retrieve function");
306 do_result();
307 },
308 [&](const runtime_locking_shard::shard_error& e) {
309 if(e.m_error_code
312 m_log->trace("Shard wounded ticket while "
313 "retrieving function");
314 } else {
316 m_log->error("Shard error retrieving function");
317 }
319 do_result();
320 }},
321 res);
322 }
323
// Creates a runner for the function value v via m_runner_factory and starts
// it. The runner receives a result callback that calls handle_run() and a
// lock callback that calls do_try_lock_request(). The thread pool is only
// passed on when the agent has not been restarted (nullptr otherwise).
// If run() reports failure, the agent moves to function_failed and reports
// function_execution through do_result().
// NOTE(review): listing line 336 is missing, so the last parameter of the
// lock callback (res_cb) is not visible in this block.
324 void impl::do_runner(broker::value_type v) {
325 m_runner = m_runner_factory(
326 m_log,
327 m_cfg,
328 std::move(v),
329 get_param(),
330 m_is_readonly_run,
331 [this](const runner::interface::run_return_type& run_res) {
332 handle_run(run_res);
333 },
334 [this](broker::key_type key,
335 broker::lock_type locktype,
337 return do_try_lock_request(std::move(key),
338 locktype,
339 std::move(res_cb));
340 },
341 m_secp,
342 m_restarted ? nullptr : m_threads,
343 m_ticket_number.value());
344 auto run_res = m_runner->run();
345 if(!run_res) {
346 // telemetry_log("agent_handle_function", 2, start);
347 m_state = state::function_failed;
348 m_log->error("Failed to start contract execution");
349 m_result = error_code::function_execution;
350 do_result();
351 }
352 }
353
354 void impl::do_commit() {
355 std::unique_lock l(m_mut);
356 assert(m_state == state::function_started
357 || m_state == state::commit_failed
358 || m_state == state::commit_sent);
359 assert(m_result.has_value());
360 assert(m_ticket_number.has_value());
361 assert(std::holds_alternative<broker::state_update_type>(
362 m_result.value()));
363 m_state = state::commit_sent;
364 m_log->trace(this,
365 "Agent requesting commit for",
366 m_ticket_number.value());
367 auto payload = return_type();
368 if(!m_is_readonly_run) {
369 payload = std::get<broker::state_update_type>(m_result.value());
370 }
371 auto maybe_success = m_broker->commit(
372 m_ticket_number.value(),
373 payload,
374 [this](broker::interface::commit_return_type commit_res) {
375 handle_commit(std::move(commit_res));
376 });
377 if(!maybe_success) {
378 m_state = state::commit_failed;
379 m_log->error("Failed to contact broker for commit");
380 m_result = error_code::broker_unreachable;
381 do_result();
382 }
383 }
384
// Handles the result reported by the runner. The result is ignored (with a
// warning) unless the agent is in function_started.
// One visitor branch stores the value in m_result and calls do_commit().
// The error branch maps a wounded error to function_failed and every other
// error to function_exception, sets the result to function_execution and
// calls do_result().
// NOTE(review): listing line 392 is missing, so the start of the visitor and
// the header of its first lambda are not visible in this block.
385 void impl::handle_run(const runner::interface::run_return_type& res) {
386 std::unique_lock l(m_mut);
387 if(m_state != state::function_started) {
388 m_log->warn("handle_run while not in function_started state");
389 return;
390 }
391 std::visit(
393 m_result = std::move(states);
394 do_commit();
395 },
396 [&](runner::interface::error_code e) {
397 if(e == runner::interface::error_code::wounded) {
398 m_state = state::function_failed;
399 } else if(e
400 == runner::interface::error_code::
401 internal_error) {
402 // Unexpected exception (e.g. write lock request
403 // within a read-only transaction such as one
404 // invoked via invocation of eth_call)
405 m_state = state::function_exception;
406 m_log->error(
407 this,
408 "Unexpected internal error encountered for",
409 m_ticket_number.value());
410 } else {
411 m_state = state::function_exception;
412 m_log->error(this,
413 "function execution failed for",
414 m_ticket_number.value());
415 }
416 m_result = error_code::function_execution;
417 do_result();
418 }},
419 res);
420 m_log->trace(this,
421 "Agent handle_run complete for",
422 m_ticket_number.value());
423 }
424
// Handles the broker's response to a commit request. The response is
// ignored (with a warning) unless the agent is in commit_sent.
// An empty response means the commit succeeded and do_finish() is called.
// Otherwise the contained error is visited: each visible branch selects
// commit_failed or commit_error, sets the result to commit_error and calls
// do_result().
// NOTE(review): listing lines 437-439, 452 and 464 are missing, so the start
// of the visitor, the conditions that choose between commit_failed and
// commit_error, and the header of the last lambda are not visible in this
// block.
425 void impl::handle_commit(broker::interface::commit_return_type res) {
426 std::unique_lock l(m_mut);
427 if(m_state != state::commit_sent) {
// NOTE(review): the message ends with "actual state:" but no state value is
// passed after it.
428 m_log->warn(
429 this,
430 "Agent handle_commit while not in commit_sent state for",
431 m_ticket_number.value(),
432 "actual state:");
433 return;
434 }
435 if(res.has_value()) {
436 std::visit(
440 // Do not retry
441 m_state = state::commit_error;
442 } else {
443 m_state = state::commit_failed;
444 }
445 m_log->error("Broker error for commit for",
446 m_ticket_number.value());
447 m_result = error_code::commit_error;
448 do_result();
449 },
450 [&](const runtime_locking_shard::shard_error& e) {
451 if(e.m_error_code
453 m_state = state::commit_failed;
454 m_log->trace(m_ticket_number.value(),
455 "wounded during commit");
456 } else {
457 m_state = state::commit_error;
458 m_log->error("Shard error for commit for",
459 m_ticket_number.value());
460 }
461 m_result = error_code::commit_error;
462 do_result();
463 },
465 m_state = state::commit_error;
466 m_log->error("Shard error for commit for",
467 m_ticket_number.value());
468 m_result = error_code::commit_error;
469 do_result();
470 }},
471 res.value());
472 } else {
473 m_log->trace(this,
474 "Agent handled commit for",
475 m_ticket_number.value());
476 do_finish();
477 }
478 }
479
480 void impl::do_result() {
481 std::unique_lock l(m_mut);
482 assert(m_result.has_value());
483 switch(m_state) {
484 // No results should be reported in these states, fatal bugs
485 case state::init:
486 m_log->fatal("Result reported in initial state");
487 // System terminated by fatal()
488 case state::begin_sent:
489 m_log->fatal("Result reported in begin_sent state");
490 // System terminated by fatal()
491 case state::function_get_sent:
492 m_log->fatal("Result reported in function_get_sent state");
493 // System terminated by fatal()
494 case state::commit_sent:
495 m_log->fatal("Result reported in commit_sent state");
496 // System terminated by fatal()
497 case state::finish_sent:
498 m_log->fatal("Result reported in finish_sent state");
499 // System terminated by fatal()
500 case state::function_started:
501 m_log->fatal("Result reported in function_started state");
502 // System terminated by fatal()
503 case state::rollback_sent:
504 m_log->fatal("Result reported in rollback_sent state");
505 // System terminated by fatal()
506 case state::rollback_complete:
507 if(!std::holds_alternative<error_code>(m_result.value())
508 || std::get<error_code>(m_result.value())
509 != error_code::retry) {
510 m_log->fatal("Result reported in rollback_complete state "
511 "when result is not retry");
512 }
513 break;
514
515 // Failure due to transient problems, should retry
516 case state::begin_failed:
517 // Couldn't get a ticket number, no need to rollback
518 break;
519
520 case state::function_get_failed:
521 case state::function_failed:
522 case state::commit_failed:
523 do_rollback(false);
524 return;
525
526 case state::finish_failed:
527 // Committed but transient error running finish, cannot
528 // rollback, need to retry finish
529 [[fallthrough]];
530 case state::rollback_failed:
531 // Need to retry rollback
532 break;
533
534 // Failure due to permanent error, abort completely
535 case state::function_get_error:
536 case state::commit_error:
537 case state::function_exception:
538 do_rollback(true);
539 return;
540
541 // Ran to completion
542 case state::finish_complete:
543 m_log->debug(this, "Agent finished", m_ticket_number.value());
544 break;
545 }
546 get_result_callback()(m_result.value());
547 m_log->trace(this,
548 "Agent handled result for",
549 m_ticket_number.value());
550 }
551
552 void impl::do_finish() {
553 std::unique_lock l(m_mut);
554 assert(m_state == state::commit_sent || m_state == state::finish_failed
555 || m_state == state::finish_sent
556 || m_state == state::rollback_complete);
557 assert(m_ticket_number.has_value());
558 m_state = state::finish_sent;
559 m_log->trace(this,
560 "Agent requesting finish for",
561 m_ticket_number.value());
562 auto maybe_success = m_broker->finish(
563 m_ticket_number.value(),
564 [this](broker::interface::finish_return_type finish_res) {
565 handle_finish(finish_res);
566 });
567 if(!maybe_success) {
568 m_state = state::finish_failed;
569 m_log->error("Error contacting broker for finish");
570 m_result = error_code::broker_unreachable;
571 do_result();
572 }
573 }
574
575 void
576 impl::handle_finish(broker::interface::finish_return_type finish_res) {
577 std::unique_lock l(m_mut);
578 if(m_state != state::finish_sent) {
579 m_log->warn("handle_finish while not in finish_sent state");
580 return;
581 }
582 if(finish_res.has_value()) {
583 m_state = state::finish_failed;
584 m_log->error("Broker error for finish for",
585 m_ticket_number.value());
586 m_result = error_code::finish_error;
587 do_result();
588 } else {
589 m_state = state::finish_complete;
590 m_log->trace(this,
591 "Agent handled finish for",
592 m_ticket_number.value());
593 do_result();
594 }
595 }
596
597 void impl::do_rollback(bool finish) {
598 std::unique_lock l(m_mut);
599 assert(m_state == state::commit_failed
600 || m_state == state::rollback_sent
601 || m_state == state::function_exception
602 || m_state == state::function_failed
603 || m_state == state::commit_error
604 || m_state == state::function_get_failed
605 || m_state == state::function_get_error
606 || m_state == state::function_started
607 || m_state == state::rollback_failed);
608 assert(m_ticket_number.has_value());
609 m_log->trace(this, "Agent rolling back", m_ticket_number.value());
610 m_state = state::rollback_sent;
611 m_permanent_error = finish;
612 auto maybe_success = m_broker->rollback(
613 m_ticket_number.value(),
614 [this](broker::interface::rollback_return_type rollback_res) {
615 handle_rollback(rollback_res);
616 });
617 if(!maybe_success) {
618 m_state = state::rollback_failed;
619 m_log->error("Error contacting broker for rollback");
620 m_result = error_code::broker_unreachable;
621 do_result();
622 }
623 }
624
// Handles the broker's response to a rollback request. The response is
// ignored (with a warning) unless the agent is in rollback_sent.
// A response carrying an error moves the agent to rollback_failed and
// reports rollback_error. Otherwise the agent moves to rollback_complete and
// either finishes the ticket (permanent error) or reports retry.
// NOTE(review): listing line 626 is missing, so the parameter list (the
// declaration of rollback_res) is not visible in this block.
625 void impl::handle_rollback(
627 std::unique_lock l(m_mut);
628 if(m_state != state::rollback_sent) {
629 m_log->warn("handle_rollback while not in rollback_sent state");
630 return;
631 }
632 if(rollback_res.has_value()) {
633 m_state = state::rollback_failed;
634 m_result = error_code::rollback_error;
635 m_log->error("Broker error rolling back", m_ticket_number.value());
636 do_result();
637 return;
638 }
639
640 m_state = state::rollback_complete;
641 m_log->trace(this, "Agent rolled back", m_ticket_number.value());
// m_permanent_error was recorded by do_rollback() when the rollback was
// requested.
642 if(m_permanent_error) {
643 m_log->trace(this,
644 "Agent finishing due to permanent error",
645 m_ticket_number.value());
646 do_finish();
647 } else {
648 // Transient error, try again
649 m_log->debug(this,
650 "Agent should restart",
651 m_ticket_number.value());
652 m_result = error_code::retry;
653 do_result();
654 }
655 }
656
657 impl::~impl() {
658 std::unique_lock l(m_mut);
659 if(m_state != state::finish_complete) {
660 m_log->fatal(
661 this,
662 "Agent state wasn't finished at destruction, state was:",
663 static_cast<int>(m_state));
664 }
665 }
666
667 auto impl::get_ticket_number() const
668 -> std::optional<ticket_machine::ticket_number_type> {
669 std::unique_lock l(m_mut);
670 return m_ticket_number;
671 }
672
673 auto impl::get_state() const -> state {
674 std::unique_lock l(m_mut);
675 return m_state;
676 }
677}
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
state
States for a ticket managed by this agent.
@ begin_sent
Begin request sent to broker.
@ function_get_failed
Function bytecode lock request failed.
@ function_get_error
Broker error during function bytecode lock.
@ function_get_sent
Function bytecode lock sent to broker.
@ function_started
Function execution started.
@ rollback_complete
Rollback complete.
@ begin_failed
Begin request failed.
auto exec() -> bool override
Initiates function execution.
impl(std::shared_ptr< logging::log > logger, cbdc::parsec::config cfg, runner::interface::factory_type runner_factory, std::shared_ptr< broker::interface > broker, runtime_locking_shard::key_type function, parameter_type param, exec_callback_type result_callback, broker::lock_type initial_lock_type, bool is_readonly_run, std::shared_ptr< secp256k1_context > secp, std::shared_ptr< thread_pool > t_pool)
Constructor.
@ function_retrieval
Error retrieving function bytecode.
@ broker_unreachable
Broker was unreachable.
@ ticket_number_assignment
Ticket number assignment failed.
std::function< void(exec_return_type)> exec_callback_type
Callback function type with function execution result.
auto get_function() const -> runtime_locking_shard::key_type
Return the key of the function bytecode managed by this agent.
std::function< std::unique_ptr< interface >( std::shared_ptr< logging::log > logger, const cbdc::parsec::config &cfg, runtime_locking_shard::value_type function, parameter_type param, bool is_readonly_run, runner::interface::run_callback_type result_callback, runner::interface::try_lock_callback_type try_lock_callback, std::shared_ptr< secp256k1_context >, std::shared_ptr< thread_pool > t_pool, ticket_number_type ticket_number)> factory_type
Factory function type for instantiating new runners.
std::variant< value_type, error_code, runtime_locking_shard::shard_error > try_lock_return_type
Return type from a try lock operation.
std::optional< std::variant< error_code, runtime_locking_shard::shard_error > > commit_return_type
Return type from a commit operation.
std::optional< error_code > finish_return_type
Return type from a finish operation.
error_code
Error codes returned by broker operations.
@ commit_hazard
A commit is attempted without associating update keys with ticket.
std::variant< ticket_number_type, error_code > ticketnum_or_errcode_type
Return type from a begin operation.
std::function< void(try_lock_return_type)> try_lock_callback_type
Callback function type for a try lock operation.
std::optional< std::variant< error_code, runtime_locking_shard::error_code > > rollback_return_type
Return type from a rollback operation.
auto to_hex(const evmc::address &addr) -> std::string
broker::state_update_type return_type
Type returned after function execution.
runtime_locking_shard::value_type value_type
Shard value type.
runtime_locking_shard::key_type key_type
Shard key type.
runtime_locking_shard::lock_type lock_type
Shard lock type.
error_code
Error codes returned by methods on shards.
@ wounded
Request invalid because ticket is in the wounded state.
std:: unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_update_type
Type for state updates to a shard. A map of keys and their new values.
uint64_t ticket_number_type
Type alias for a ticket number.
overloaded(Ts...) -> overloaded< Ts... >
Variant handler template.
Configuration parameters for a phase two system.