12 : m_log(std::move(logger)) {}
20 auto callbacks = pending_callbacks_list_type();
21 auto w_details = std::optional<wounded_details>();
22 auto maybe_error = [&]() -> std::optional<error_code> {
23 std::unique_lock<std::mutex> l(m_mut);
25 m_log->trace(ticket_number,
28 static_cast<int>(locktype));
30 auto it = m_tickets.find(ticket_number);
31 if(first_lock && it != m_tickets.end()) {
32 m_log->fatal(ticket_number,
33 "called try_lock with first lock but ticket "
36 if(it == m_tickets.end()) {
38 m_log->error(ticket_number,
39 "called try_lock with unknown ticket");
42 it = m_tickets.emplace(ticket_number, ticket_state_type{})
45 auto& ticket = it->second;
49 m_log->error(ticket_number,
"called try_lock after prepare");
54 m_log->error(ticket_number,
"called try_lock after commit");
61 m_log->trace(ticket_number,
62 "called try_lock after being wounded");
63 w_details = ticket.m_wounded_details;
68 if(
auto lock_it = ticket.m_locks_held.find(key);
69 lock_it != ticket.m_locks_held.end()
70 && lock_it->second >= locktype) {
73 "tried to acquire already held lock");
77 if(ticket.m_queued_locks.find(key)
78 != ticket.m_queued_locks.end()) {
79 m_log->warn(ticket_number,
80 "tried to acquire already queued lock");
84 ticket.m_broker_id = broker_id;
87 auto& state_element = m_state[key];
88 auto& lock = state_element.m_lock;
93 lock_queue_element_type{locktype, std::move(result_callback)});
94 ticket.m_queued_locks.insert(key);
97 auto waiting_on = get_waiting_on(ticket_number, locktype, lock);
99 = wound_tickets(std::move(key), waiting_on, ticket_number);
101 w_details = ticket.m_wounded_details;
103 m_log->trace(
this,
"shard handled try_lock for", ticket_number);
107 if(maybe_error.has_value()) {
108 result_callback(
shard_error{maybe_error.value(), w_details});
111 for(
auto& callback : callbacks) {
112 callback.m_callback(std::move(callback.m_returning));
119 auto impl::wound_tickets(
121 const std::vector<ticket_number_type>& blocking_tickets,
123 auto callbacks = pending_callbacks_list_type();
124 auto keys = key_set_type();
125 for(
auto blocking_ticket_number : blocking_tickets) {
126 auto& blocking_ticket = m_tickets[blocking_ticket_number];
135 blocking_ticket.m_wounded_details = {blocked_ticket, key};
137 auto [wounded_callbacks, affected_keys]
138 = release_locks(blocking_ticket_number, blocking_ticket);
141 std::make_move_iterator(wounded_callbacks.begin()),
142 std::make_move_iterator(wounded_callbacks.end()));
143 keys.merge(affected_keys);
146 keys.insert(std::move(key));
148 auto acquire_callbacks = acquire_locks(keys);
149 callbacks.insert(callbacks.end(),
150 std::make_move_iterator(acquire_callbacks.begin()),
151 std::make_move_iterator(acquire_callbacks.end()));
159 -> std::vector<ticket_number_type> {
160 auto waiting_on = std::vector<ticket_number_type>();
161 auto younger_ticket = [&](
auto blocking_ticket_number) {
162 return ticket_number < blocking_ticket_number;
166 std::copy_if(lock.m_readers.begin(),
167 lock.m_readers.end(),
168 std::back_inserter(waiting_on),
172 if(lock.m_writer.has_value()) {
173 if(younger_ticket(lock.m_writer.value())) {
174 waiting_on.push_back(lock.m_writer.value());
184 auto result = [&]() -> std::optional<shard_error> {
185 std::unique_lock<std::mutex> l(m_mut);
187 auto ticket_it = m_tickets.find(ticket_number);
188 if(ticket_it == m_tickets.end()) {
191 "does not exist on shard for prepare");
194 auto& ticket = ticket_it->second;
198 m_log->warn(ticket_number,
199 "called prepare but already prepared");
204 m_log->warn(ticket_number,
205 "called prepare but already committed");
211 m_log->debug(ticket_number,
212 "called prepare after being wounded");
214 ticket.m_wounded_details};
217 if(!ticket.m_queued_locks.empty()) {
218 m_log->error(ticket_number,
"still has queued locks");
222 for(
auto& [key, value] : state_update) {
223 auto lk_it = ticket.m_locks_held.find(key);
224 if(lk_it == ticket.m_locks_held.end()) {
225 m_log->warn(ticket_number,
226 "wanted state update for unheld lock");
231 m_log->warn(ticket_number,
232 "wanted state update for read lock");
238 ticket.m_state_update = std::move(state_update);
243 result_callback(result);
249 auto callbacks = pending_callbacks_list_type();
250 auto result = [&]() -> std::optional<shard_error> {
251 std::unique_lock<std::mutex> l(m_mut);
253 auto ticket_it = m_tickets.find(ticket_number);
254 if(ticket_it == m_tickets.end()) {
257 "does not exist on shard for commit");
260 auto& ticket = ticket_it->second;
264 m_log->warn(ticket_number,
"called commit but not prepared");
268 for(
auto&& [key, value] : ticket.m_state_update) {
269 m_state[key].m_value = std::move(value);
272 auto [wounded_callbacks, affected_keys]
273 = release_locks(ticket_number, ticket);
274 assert(wounded_callbacks.empty());
275 callbacks = acquire_locks(affected_keys);
278 std::make_move_iterator(wounded_callbacks.begin()),
279 std::make_move_iterator(wounded_callbacks.end()));
283 m_log->trace(
this,
"Shard executed commit for", ticket_number);
287 for(
auto& callback : callbacks) {
289 "Shard calling callback for",
290 callback.m_ticket_number);
291 callback.m_callback(std::move(callback.m_returning));
293 result_callback(result);
296 "Shard called all callbacks for commit on",
302 ticket_state_type& ticket)
303 -> std::pair<pending_callbacks_list_type, key_set_type> {
304 auto callbacks = pending_callbacks_list_type();
306 for(
const auto& lock_key : ticket.m_queued_locks) {
307 auto& queued_element = m_state[lock_key];
308 auto& lk = queued_element.m_lock;
309 auto queue_node = lk.m_queue.extract(ticket_number);
310 auto& queued_lock_element = queue_node.mapped();
312 callbacks.emplace_back(pending_callback_element_type{
313 std::move(queued_lock_element.m_callback),
317 auto keys = std::move(ticket.m_queued_locks);
318 ticket.m_queued_locks = key_set_type();
320 for(
auto& [lock_key, lt] : ticket.m_locks_held) {
322 auto& locked_element = m_state[lock_key];
323 auto& lk = locked_element.m_lock;
326 m_log->trace(
"Releasing read lock on",
330 lk.m_readers.erase(ticket_number);
334 m_log->trace(
"Releasing write lock on",
340 keys.insert(lock_key);
342 ticket.m_locks_held.clear();
344 return {callbacks, keys};
347 auto impl::acquire_locks(
const key_set_type& keys)
348 -> pending_callbacks_list_type {
349 auto callbacks = pending_callbacks_list_type();
350 for(
const auto& key : keys) {
352 while(acquire_lock(key, callbacks)) {}
359 auto callbacks = pending_callbacks_list_type();
360 auto result = [&]() -> std::optional<shard_error> {
361 std::unique_lock<std::mutex> l(m_mut);
363 auto ticket_it = m_tickets.find(ticket_number);
364 if(ticket_it == m_tickets.end()) {
367 "does not exist on shard for rollback");
370 auto& ticket = ticket_it->second;
372 auto [wounded_callbacks, affected_keys]
373 = release_locks(ticket_number, ticket);
374 callbacks = acquire_locks(affected_keys);
377 std::make_move_iterator(wounded_callbacks.begin()),
378 std::make_move_iterator(wounded_callbacks.end()));
383 m_tickets.erase(ticket_it);
385 m_log->trace(
this,
"Shard handled rollback for", ticket_number);
390 for(
auto& callback : callbacks) {
391 callback.m_callback(std::move(callback.m_returning));
393 result_callback(result);
400 auto maybe_error = [&]() -> std::optional<shard_error> {
401 std::unique_lock l(m_mut);
402 auto ticket_it = m_tickets.find(ticket_number);
403 if(ticket_it == m_tickets.end()) {
406 "does not exist on shard for finish");
410 auto& ticket = ticket_it->second;
415 "finish requested but not committed");
419 m_tickets.erase(ticket_it);
421 m_log->trace(
this,
"Shard handled finish for", ticket_number);
426 result_callback(maybe_error);
434 std::unique_lock l(m_mut);
436 for(
auto& [ticket_number, ticket] : m_tickets) {
437 if(ticket.m_broker_id == broker_id) {
438 ret.emplace(ticket_number, ticket.m_state);
444 result_callback(result);
451 std::unique_lock l(m_mut);
452 if(!m_tickets.empty() && !m_state.empty()) {
453 m_log->error(
"Shard state is not empty, cannot recover");
456 m_state.reserve(state.size());
457 for(
auto&& [k, v] : state) {
458 m_state.emplace(k, state_element_type{v, {}});
460 m_tickets.reserve(tickets.size());
461 for(
auto&& [tn, t] : tickets) {
462 auto ticket = ticket_state_type{};
463 ticket.m_broker_id = t.m_broker_id;
470 for(
const auto& [k, v] : t.m_state_update) {
472 m_state[k].m_lock.m_writer = tn;
476 ticket.m_state_update = t.m_state_update;
477 m_tickets.emplace(tn, std::move(ticket));
482 auto impl::acquire_lock(
const key_type& key,
483 pending_callbacks_list_type& callbacks) ->
bool {
484 auto& locked_element = m_state[key];
485 auto& lk = locked_element.m_lock;
486 if(lk.m_queue.empty()) {
489 auto acquire_next =
true;
490 auto queue_node = lk.m_queue.begin();
491 const auto& queued_ticket_number = queue_node->first;
492 auto& queued_lock_element = queue_node->second;
493 auto& queued_ticket = m_tickets[queued_ticket_number];
498 if(lk.m_writer.has_value()) {
501 m_log->trace(
"Assigning read lock on",
504 queued_ticket_number);
505 lk.m_readers.insert(queued_ticket_number);
513 if(lk.m_readers.size() > 1 || lk.m_writer.has_value()) {
516 if(lk.m_readers.size() == 1) {
517 if(*lk.m_readers.begin() != queued_ticket_number) {
522 lk.m_readers.clear();
524 m_log->trace(
"Assigning write lock on",
527 queued_ticket_number);
528 lk.m_writer = queued_ticket_number;
529 acquire_next =
false;
531 queued_ticket.m_queued_locks.erase(key);
532 queued_ticket.m_locks_held[key] = queued_lock_element.m_type;
534 callbacks.emplace_back(pending_callback_element_type{
535 std::move(queued_lock_element.m_callback),
536 locked_element.m_value,
537 queued_ticket_number});
538 lk.m_queue.erase(queue_node);
Buffer to store and retrieve byte data.
auto rollback(ticket_number_type ticket_number, rollback_callback_type result_callback) -> bool override
Rolls back an uncommitted ticket.
auto recover(const replicated_shard::state_type &state, const replicated_shard::tickets_type &tickets) -> bool
Restores the state of another shard instance.
auto finish(ticket_number_type ticket_number, finish_callback_type result_callback) -> bool override
Deletes a committed or rolled-back ticket.
auto prepare(ticket_number_type ticket_number, broker_id_type broker_id, state_update_type state_update, prepare_callback_type result_callback) -> bool override
Prepares a ticket with the given state updates.
auto commit(ticket_number_type ticket_number, commit_callback_type result_callback) -> bool override
Commits a previously prepared ticket.
auto try_lock(ticket_number_type ticket_number, broker_id_type broker_id, key_type key, lock_type locktype, bool first_lock, try_lock_callback_type result_callback) -> bool override
Locks the given key for a ticket and returns the associated value.
auto get_tickets(broker_id_type broker_id, get_tickets_callback_type result_callback) -> bool override
Returns tickets managed by the given broker.
impl(std::shared_ptr< logging::log > logger)
Constructor.
std::function< void(prepare_return_type)> prepare_callback_type
Callback function type for the result of a prepare operation.
std::unordered_map< ticket_number_type, ticket_state > get_tickets_success_type
Return type from a successful get tickets operation.
std::function< void(get_tickets_return_type)> get_tickets_callback_type
Callback function type for the result of a get tickets operation.
std::function< void(rollback_return_type)> rollback_callback_type
Callback function type for the result of a rollback operation.
std::function< void(finish_return_type)> finish_callback_type
Callback function type for the result of a finish operation.
std::function< void(try_lock_return_type)> try_lock_callback_type
Function type for try lock operation results.
std::function< void(commit_return_type)> commit_callback_type
Callback function type for the result of a commit operation.
std::unordered_map< ticket_number_type, ticket_type > tickets_type
Type for the tickets list returned by the state machine.
@ prepared
Prepared, holds locks.
@ committed
Committed, not holding any locks.
std::unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_type
Type for state updates to a shard.
parsec::ticket_machine::ticket_number_type ticket_number_type
Type for a ticket number.
size_t broker_id_type
Type for the ID of a broker.
@ wounded
Request invalid because ticket is in the wounded state.
@ lock_not_held
Cannot apply requested state update because the ticket does not hold a write lock on the given key.
@ not_committed
Request invalid because ticket is not in the committed state.
@ lock_held
The ticket already holds the requested lock.
@ lock_queued
The requested lock is already queued for the given ticket.
@ prepared
Request invalid because ticket is in the prepared state.
@ state_update_with_read_lock
Cannot apply requested state update because the ticket only holds a read lock on the given key.
@ not_prepared
Cannot commit the ticket because the ticket has not been prepared.
@ unknown_ticket
The given ticket number is not known to this shard.
@ committed
Request invalid because ticket is in the committed state.
std:: unordered_map< key_type, value_type, hashing::const_sip_hash< key_type > > state_update_type
Type for state updates to a shard. A map of keys and their new values.
@ wounded
Wounded, not holding any locks.
@ prepared
Prepared, holds locks.
@ committed
Committed, not holding any locks.
lock_type
Types of key lock supported by shards.
@ read
Read lock. Multiple readers can hold a lock for the same key.
@ write
Write lock. Only one ticket can hold this lock at a time.