并发元语 CAS (compare and swap, 汇编级的原子操作), rust 对应的是 compare_exchange 和 compare_exchange_weak 参考 11. Memory Order.org
想要安全地改变一个值只能是同一时间可以改变该值的只能有一个线程, 而要达到这一目的可以用互斥锁或 cpu 锁 (原子操作) 互斥锁可以改成是软件保证的更大粒度的 atomic
事实上, 互斥锁也是可以通过原子操作 CAS 来实现, 以下用最简单的自旋实现
use std::sync::atomic::{AtomicBool, Ordering};
let lock = AtomicBool::new(false);
// acquire lock
while lock.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
// cas 比较重
while lock.load(Ordering::Relaxed) {
std::hint::spin_loop();
}
}
// release lock
lock.store(false, Ordering::Release);
spin loop 一般只在短时间内可以得到锁的时候时候, 否则会很浪费资源, 从而导致获得锁的时间更长
更通用的解决方案是: mutex 维护一个等待队列 (本身也是一个类 channel 的结构, 操作系统维护?), 线程获取锁失败就把自己挂起, 放入等待队列, 直到持有锁的线程释放锁后, 通知第一个等待的线程 (FIFO)
缺点: 线程上下文 (用户态和内核态) 切换代价很大, 频繁挂起再唤醒, 开销很大
另一种效率更高的做法 (也是目前最常见的实现) 是把 spin_loop 和 wait_list 结合, 先尝试 spin 一会, 不行就挂起到 wait_list, 这种做法不是严格的 FIFO, 所以有公平性问题
semaphore 可以看成初始值可以更大的 mutex 只有两个操作, 如下: (NOTE: 对于信号量的值的变化是原子操作)
- sem_wait(s): s > 0 时减一返回, s == 0 时阻塞线程
- sem_post(s): s 加一返回, 如果有等待的线程的话唤醒其中一个
可参考 tokio::sync::Semaphore
初始化一个初始值为 1 的信号量, 之后使用的时候在前后包上 sem_wait, sem_post 即可
sem_read |
sem_write |
read_count = 0
sem_wait(sem_read) read_count++ if read_count == 1 { rem_wait(sem_write) } // NOTE: if write, then read will blokc here, and other reads will be blokced at the first line sem_post(sem_read)
do something
sem_wait(sem_read) read_count– if read_count == 0 { rem_post(sem_write) } sem_post(sem_read)
sem_wait(sem_write) do something sem_post(sem_write)
sem_mutex |
sem_slots |
sem_items |
sem_wait(sem_items); sem_wait(sem_mutex); do something sem_post(sem_mutex); sem_post(sme_slots);
sem_wait(sem_slots); sem_wait(sem_mutex); do something sem_post(sem_mutex); sem_post(sme_items);
两类操作
- wait
- notify (one/all)
叫条件变量是因为通常需要和 mutex (或 semaphore) 和一个条件一起使用 wait 只是等待某一事件 (信号) 发生, 它并不是带着 (使那个条件成立) 的目的的 wait 的时候会 block 并暂时释放锁, 让别的线程可以继续从而可以得到修改后的条件 当某一事件或信号发生, 被通知的 condvar 会再次 acquire lock 条件判断的时候是 acquire lock 的状态, 所以不会有 data race 通知也都是需要手动调用 notify 而不是被 lock 的状态改变后自动调用
mpsc, mpmc bounded (sync), unbounded oneshot: 通常是通过存在的 mpsc 把 oneshot 的 sender 传过去 broadcast
- 一种是 read first (or read-preferring), 即只要有读, 写的 acquire 就会被搁置
- 另一种是 write first, 一旦有 write acquire, 之后的读都会被搁置
第一种实现简单, 但可能会导致一直不能更新数据, write starvation 第二种比较复杂, 可以有不同的实现
- 可以是 fairness 的 (完全保持 acquire 顺序, tokio 的 rwlock 就是这样, 具体的实现就是维护一个关于 task (read + writer) 的 queue, 但相对的效率可能不是最好)
- 可以是完全写优先的, 分别维护读和写的 waiting list, 每一个写完成的时候, 检查是否有写等待, 有的话唤醒, 没有就唤醒所有的读等待, 这会导致 read starvation
- 根据 tokio 的文档, rust std 的 rwlock 实现是操作系统相关的