OpenCBDC Transaction Processor
Loading...
Searching...
No Matches
util.cpp
Go to the documentation of this file.
1// Copyright (c) 2021 MIT Digital Currency Initiative,
2// Federal Reserve Bank of Boston
3// Distributed under the MIT software license, see the accompanying
4// file COPYING or http://www.opensource.org/licenses/mit-license.php.
5
#include "util.hpp"

#include <future>
#include <stdexcept>
#include <unordered_map>
10
11namespace cbdc::parsec {
/// Splits a string into the tokens found between occurrences of a delimiter.
///
/// Leading, trailing and adjacent delimiters produce empty tokens, so the
/// result always contains at least one element.
/// \param s string to split.
/// \param delim delimiter to split on. If empty, no splitting is performed
///              and the result is the single token s.
/// \return the tokens in order of appearance.
auto split(const std::string& s, const std::string& delim)
    -> std::vector<std::string> {
    std::vector<std::string> ret;

    // An empty delimiter matches at every position without consuming any
    // input, so the search loop below would never advance.
    if(delim.empty()) {
        ret.emplace_back(s);
        return ret;
    }

    size_t pos_start{};
    size_t pos_end{};
    while((pos_end = s.find(delim, pos_start)) != std::string::npos) {
        // Build the token directly inside the vector instead of copying a
        // temporary substring.
        ret.emplace_back(s, pos_start, pos_end - pos_start);
        pos_start = pos_end + delim.size();
    }

    // Whatever follows the last delimiter (possibly nothing) is the final
    // token.
    ret.emplace_back(s.substr(pos_start));
    return ret;
}
27
28 auto parse_args(int argc, char** argv)
29 -> std::optional<std::unordered_map<std::string, std::string>> {
30 auto opts = std::unordered_map<std::string, std::string>();
31 auto args = cbdc::config::get_args(argc, argv);
32 for(size_t i = 1; i < args.size(); i++) {
33 const auto& arg = args[i];
34 auto arr = split(arg, "--");
35 if(arr.size() != 2 || !arr[0].empty() || arr[1].empty()) {
36 return std::nullopt;
37 }
38
39 auto elems = split(arr[1], "=");
40 if(elems.size() != 2) {
41 return std::nullopt;
42 }
43
44 opts.emplace(elems[0], elems[1]);
45 }
46 return opts;
47 }
48
49 auto
50 read_endpoints(const std::unordered_map<std::string, std::string>& opts,
51 const std::string& component_name)
52 -> std::optional<std::vector<network::endpoint_t>> {
53 auto ret = std::vector<network::endpoint_t>();
54
55 auto count_key = component_name + "_count";
56 auto it = opts.find(count_key);
57 if(it == opts.end()) {
58 return std::nullopt;
59 }
60 auto count = std::stoull(it->second);
61
62 for(size_t i = 0; i < count; i++) {
63 auto ep_key = component_name + std::to_string(i) + "_endpoint";
64 it = opts.find(ep_key);
65 if(it == opts.end()) {
66 return std::nullopt;
67 }
68 auto ep = cbdc::config::parse_ip_port(it->second);
69 ret.emplace_back(ep);
70 }
71
72 return ret;
73 }
74
76 const std::unordered_map<std::string, std::string>& opts,
77 const std::string& component_name)
78 -> std::optional<std::vector<std::vector<network::endpoint_t>>> {
79 auto ret = std::vector<std::vector<network::endpoint_t>>();
80
81 auto count_key = component_name + "_count";
82 auto it = opts.find(count_key);
83 if(it == opts.end()) {
84 return std::nullopt;
85 }
86 auto count = std::stoull(it->second);
87
88 for(size_t i = 0; i < count; i++) {
89 auto node_name = component_name + std::to_string(i);
90 auto eps = read_endpoints(opts, node_name);
91 if(!eps.has_value()) {
92 return std::nullopt;
93 }
94 ret.emplace_back(eps.value());
95 }
96
97 return ret;
98 }
99
100 auto read_config(int argc, char** argv) -> std::optional<config> {
101 // TODO: refactor, make config parsing generic
102 auto opts = parse_args(argc, argv);
103 if(!opts.has_value()) {
104 return std::nullopt;
105 }
106
107 auto cfg = config{};
108
109 constexpr auto component_id_key = "component_id";
110 auto it = opts->find(component_id_key);
111 if(it == opts->end()) {
112 return std::nullopt;
113 }
114 cfg.m_component_id = std::stoull(it->second);
115
116 cfg.m_loglevel = logging::log_level::trace;
117 constexpr auto loglevel_key = "loglevel";
118 it = opts->find(loglevel_key);
119 if(it != opts->end()) {
120 auto maybe_loglevel = logging::parse_loglevel(it->second);
121 if(maybe_loglevel.has_value()) {
122 cfg.m_loglevel = maybe_loglevel.value();
123 }
124 }
125
126 auto ticket_machine_endpoints
127 = read_endpoints(opts.value(), "ticket_machine");
128 if(!ticket_machine_endpoints.has_value()) {
129 return std::nullopt;
130 }
131 cfg.m_ticket_machine_endpoints = ticket_machine_endpoints.value();
132
133 constexpr auto node_id_key = "node_id";
134 it = opts->find(node_id_key);
135 if(it != opts->end()) {
136 cfg.m_node_id = std::stoull(it->second);
137 }
138
139 auto shard_endpoints = read_cluster_endpoints(opts.value(), "shard");
140 if(!shard_endpoints.has_value()) {
141 return std::nullopt;
142 }
143 cfg.m_shard_endpoints = shard_endpoints.value();
144
145 auto agent_endpoints = read_endpoints(opts.value(), "agent");
146 if(!agent_endpoints.has_value()) {
147 return std::nullopt;
148 }
149 cfg.m_agent_endpoints = agent_endpoints.value();
150
151 constexpr auto loadgen_txtype_key = "loadgen_txtype";
152 it = opts->find(loadgen_txtype_key);
153 if(it != opts->end()) {
154 const auto& val = it->second;
155 if(val == "transfer") {
156 cfg.m_load_type = load_type::transfer;
157 } else if(val == "erc20") {
158 cfg.m_load_type = load_type::erc20;
159 } else {
160 return std::nullopt;
161 }
162 }
163
164 cfg.m_contention_rate = 0.0;
165 constexpr auto contention_rate_key = "contention_rate";
166 it = opts->find(contention_rate_key);
167 if(it != opts->end()) {
168 cfg.m_contention_rate = std::stod(it->second);
169 }
170
171 constexpr auto default_loadgen_accounts = 1000;
172 cfg.m_loadgen_accounts = default_loadgen_accounts;
173 constexpr auto loadgen_accounts_key = "loadgen_accounts";
174 it = opts->find(loadgen_accounts_key);
175 if(it != opts->end()) {
176 cfg.m_loadgen_accounts = std::stoull(it->second);
177 }
178
179 constexpr auto runner_type_key = "runner_type";
180 it = opts->find(runner_type_key);
181 if(it != opts->end()) {
182 const auto& val = it->second;
183 if(val == "evm") {
184 cfg.m_runner_type = runner_type::evm;
185 } else if(val == "lua") {
186 cfg.m_runner_type = runner_type::lua;
187 } else {
188 return std::nullopt;
189 }
190 }
191
192 return cfg;
193 }
194
195 auto put_row(const std::shared_ptr<broker::interface>& broker,
197 broker::value_type value,
198 const std::function<void(bool)>& result_callback) -> bool {
199 auto begin_res = broker->begin([=](auto begin_ret) {
200 if(!std::holds_alternative<
202 begin_ret)) {
203 result_callback(false);
204 return;
205 }
206
207 auto ticket_number
208 = std::get<cbdc::parsec::ticket_machine::ticket_number_type>(
209 begin_ret);
210 auto lock_res = broker->try_lock(
211 ticket_number,
212 key,
214 [=](auto try_lock_res) {
215 if(!std::holds_alternative<cbdc::buffer>(try_lock_res)) {
216 result_callback(false);
217 return;
218 }
219 auto commit_res = broker->commit(
220 ticket_number,
221 {{key, value}},
222 [=](auto commit_ret) {
223 if(commit_ret.has_value()) {
224 result_callback(false);
225 return;
226 }
227 auto finish_res = broker->finish(
228 ticket_number,
229 [=](auto finish_ret) {
230 result_callback(!finish_ret.has_value());
231 });
232 if(!finish_res) {
233 result_callback(false);
234 return;
235 }
236 });
237 if(!commit_res) {
238 result_callback(false);
239 return;
240 }
241 });
242 if(!lock_res) {
243 result_callback(false);
244 return;
245 }
246 });
247 return begin_res;
248 }
249
250 auto get_row(const std::shared_ptr<broker::interface>& broker,
252 const std::function<void(
254 result_callback)
256 std::promise<cbdc::parsec::broker::interface::try_lock_return_type>
257 res_promise;
258 auto res_future = res_promise.get_future();
259
260 auto finish_cb = [=](auto finish_ret) {
261 if(finish_ret.has_value()) {
262 result_callback(finish_ret.value());
263 }
264 };
265
266 auto begin_cb = [&](auto begin_ret) {
267 if(!std::holds_alternative<
269 begin_ret)) {
270 res_promise.set_value(
272 ticket_number_assignment);
274 ticket_number_assignment);
275 return;
276 }
277
278 auto ticket_number
279 = std::get<cbdc::parsec::ticket_machine::ticket_number_type>(
280 begin_ret);
281 auto lock_res = broker->try_lock(
282 ticket_number,
283 key,
285 [&](auto try_lock_res) {
286 if(!std::holds_alternative<cbdc::buffer>(try_lock_res)) {
287 res_promise.set_value(
289 shard_unreachable);
290 result_callback(cbdc::parsec::broker::interface::
291 error_code::shard_unreachable);
292 return;
293 }
294 res_promise.set_value(try_lock_res);
295 result_callback(try_lock_res);
296
297 auto commit_cb = [=](auto commit_ret) {
298 if(commit_ret.has_value()) {
299 if(std::holds_alternative<
301 error_code>(commit_ret.value())) {
302 result_callback(
304 error_code>(
305 commit_ret.value()));
306 return;
307 }
308 result_callback(
310 shard_error>(commit_ret.value()));
311 return;
312 }
313 auto finish_res
314 = broker->finish(ticket_number, finish_cb);
315 if(!finish_res) {
316 result_callback(cbdc::parsec::broker::interface::
317 error_code::finish_error);
318 return;
319 }
320 };
321
322 auto commit_res = broker->commit(
323 ticket_number,
325 commit_cb);
326
327 if(!commit_res) {
328 result_callback(cbdc::parsec::broker::interface::
329 error_code::commit_error);
330 return;
331 }
332 });
333 if(!lock_res) {
335 shard_unreachable);
336 return;
337 }
338 };
339 // This never returns false. We don't need the return
340 // value in this context.
341 [[maybe_unused]] auto begin_success = broker->begin(begin_cb);
342 return res_future.get();
343 }
344}
Buffer to store and retrieve byte data.
Definition buffer.hpp:15
std::variant< value_type, error_code, runtime_locking_shard::shard_error > try_lock_return_type
Return type from a try lock operation.
error_code
Error codes returned by broker operations.
auto parse_ip_port(const std::string &in_str) -> network::endpoint_t
Definition config.cpp:13
auto get_args(int argc, char **argv) -> std::vector< std::string >
Converts c-args from an executable's main function into a vector of strings.
Definition config.cpp:751
@ trace
Fine-grained, fully verbose operating information.
auto parse_loglevel(const std::string &level) -> std::optional< log_level >
Parses a capitalized string into a log level.
Definition logging.cpp:70
std:: unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_update_type
Type for state updates to a shard. A map of keys and their new values.
@ read
Read lock. Multiple readers can hold a lock for the same key.
@ write
Write lock. Only one ticket can hold this lock at a time.
uint64_t ticket_number_type
Type alias for a ticket number.
@ evm
Ethereum-style transactions using EVM.
@ lua
Transaction semantics defined using Lua.
auto split(const std::string &s, const std::string &delim) -> std::vector< std::string >
Definition util.cpp:12
auto put_row(const std::shared_ptr< broker::interface > &broker, broker::key_type key, broker::value_type value, const std::function< void(bool)> &result_callback) -> bool
Asynchronously inserts the given row into the cluster.
Definition util.cpp:195
@ erc20
ERC20 token transfer.
@ transfer
Base token transfer.
auto read_endpoints(const std::unordered_map< std::string, std::string > &opts, const std::string &component_name) -> std::optional< std::vector< network::endpoint_t > >
Definition util.cpp:50
auto parse_args(int argc, char **argv) -> std::optional< std::unordered_map< std::string, std::string > >
Definition util.cpp:28
auto read_config(int argc, char **argv) -> std::optional< config >
Reads the configuration parameters from the program arguments.
Definition util.cpp:100
auto get_row(const std::shared_ptr< broker::interface > &broker, broker::key_type key, const std::function< void(cbdc::parsec::broker::interface::try_lock_return_type)> &result_callback) -> cbdc::parsec::broker::interface::try_lock_return_type
Asynchronously get the value stored at key from the cluster.
Definition util.cpp:250
auto read_cluster_endpoints(const std::unordered_map< std::string, std::string > &opts, const std::string &component_name) -> std::optional< std::vector< std::vector< network::endpoint_t > > >
Definition util.cpp:75
Configuration parameters for a phase two system.
size_t m_component_id
ID of the component the instance should be.