Description
The `kick` method is used to unbury several tasks in the tube. After analyzing this method for the `fifo`, `fifottl`, `utube`, and `utubettl` drivers, I came to the conclusion that a data race is still possible, which leads to an incorrect, unexpected result.
queue/queue/abstract/driver/fifo.lua
Lines 158 to 173 in aa7c092

    -- unbury several tasks
    function method.kick(self, count)
        for i = 1, count do
            local task = self.space.index.status:min{ state.BURIED }
            if task == nil then
                return i - 1
            end
            if task[2] ~= state.BURIED then
                return i - 1
            end
            task = self.space:update(task[1], {{ '=', 2, state.READY }})
            self.on_task_change(task, 'kick')
        end
        return count
    end

We take the task with the minimal task_id and `BURIED` status, then update it, which commits the change. However, Lua can yield before the WAL write completes, so a parallel `kick` call could pick up this very same task a second time.
Possible fix: make the changes inside a transaction, as in the `put` and `take` methods. There is a subtle point here, though, which is described in the comment in the `fifo.lua` excerpt (lines 101 to 117) below.
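To make the interleaving concrete, here is a toy model in plain Lua. It is not Tarantool code and not the project's fix. Coroutines stand in for fibers, `coroutine.yield()` stands in for the yield around the WAL write, and the racy variant applies the status change only after the yield, which is an assumption made to mimic a change that a parallel reader cannot see yet. All names (`new_space`, `min_buried`, `kick_racy`, `kick_atomic`, `run_two`) and the status codes are hypothetical.

```lua
-- Toy model: a "space" is a list of {task_id, status} tuples sorted by id.
local BURIED, READY = '!', 'r'   -- placeholder status codes

local function new_space()
    return { {1, BURIED}, {2, BURIED} }
end

-- Return the BURIED task with the minimal id, or nil if there is none.
local function min_buried(space)
    for _, task in ipairs(space) do
        if task[2] == BURIED then return task end
    end
    return nil
end

-- Racy variant: another "fiber" runs between the read and the visible write.
local function kick_racy(space, count)
    for i = 1, count do
        local task = min_buried(space)
        if task == nil then return i - 1 end
        coroutine.yield()        -- stands in for the yield before the change is visible
        task[2] = READY
    end
    return count
end

-- Atomic variant: read and write happen with no yield in between,
-- which is what wrapping both steps in one transaction aims for.
local function kick_atomic(space, count)
    for i = 1, count do
        local task = min_buried(space)
        if task == nil then return i - 1 end
        task[2] = READY
        coroutine.yield()        -- yielding only after the change is visible
    end
    return count
end

-- Run two kicks round-robin on a fresh space; return the sum of their results.
local function run_two(kick, count)
    local space = new_space()
    local fibers = { coroutine.create(kick), coroutine.create(kick) }
    local total, alive = 0, true
    while alive do
        alive = false
        for _, co in ipairs(fibers) do
            if coroutine.status(co) ~= 'dead' then
                local ok, result = coroutine.resume(co, space, count)
                assert(ok, result)
                if coroutine.status(co) == 'dead' then
                    total = total + result   -- the kick returned its count
                else
                    alive = true             -- the kick yielded, keep scheduling
                end
            end
        end
    end
    return total
end

print(run_two(kick_racy, 2))    -- 4: both kicks report the same two tasks
print(run_two(kick_atomic, 2))  -- 2: each task is kicked exactly once
```

In the racy run, the two callers together report four kicked tasks although only two were buried. The model says nothing about isolation levels or nested transactions, which is where the subtle point mentioned above comes in.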
queue/queue/abstract/driver/fifo.lua
Lines 101 to 117 in aa7c092

    -- Taking the minimum is an implicit transactions, so it is
    -- always done with 'read-confirmed' mvcc isolation level.
    -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
    -- It is hapenning because 'min' for several takes in parallel will be the same since
    -- read confirmed isolation level makes visible all transactions that finished the commit.
    -- To fix it we wrap it with box.begin/commit and set right isolation level.
    -- Current fix does not resolve that bug in situations when we already are in transaction
    -- since it will open nested transactions.
    -- See https://github.com/tarantool/queue/issues/207
    -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
    if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
        box.begin({txn_isolation = 'read-committed'})
        task = self.space.index.status:min{state.READY}
        box.commit()
    else
        task = self.space.index.status:min{state.READY}
    end

### Tasks
- [ ] `fifo`
- [ ] `fifottl`
- [ ] `utube`
- [ ] `utubettl`