skaiuijing
前言 笔者之前已经介绍过了Sparrow信号量的源码,但是对于信号量的使用,并没有讲得非常详细,仅仅讲了同步与互斥的概念。
本章让笔者介绍如何使用Sparrow的信号量,深入探讨一下信号量在同步、计数与互斥中的应用。
使用信号量解决资源问题 生产者,消费者是一种经典模型,一边是供给,一边是需求。在计算机历史上,有许多这种问题,例如哲学家进餐问题,有兴趣的读者可以自行研究。
本篇文章中,我们主要解决以下问题:
单生产者,单消费者
多生产者,单消费者
多生产者,多消费者
多读者,多写者
单生产者,单消费者 我们通过创建环形缓冲区(一种特殊的队列)来建立生产者和消费者模型,一个插入消息,另一个获取消息。
观察结构体,有两个信号量,一个初始化时用于同步,一个用于计数。
只要分配合理的优先级,信号量可以用于互斥,与互斥锁相比,它的优点在于没有所有者的概念,完全可以让两个不同的线程形成原子操作。
看看具体的代码:
Oo_insert函数是生产者:先使用计数信号量获取是否有空间,然后插入数据,然后释放信号通知消费者。
Oo_remove函数是消费者:先获取是否有数据,再取出数据,然后恢复计数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #define BufferSIZE 5 Class(Oo_buffer) { int buf[BufferSIZE]; int in; int out; Semaphore_Handle item; Semaphore_Handle space; }; Oo_buffer_handle Oo_buffer_creat(void) { Oo_buffer_handle Oo_buffer1 = heap_malloc(sizeof (Oo_buffer)); *Oo_buffer1 = (Oo_buffer){ .buf = {0,0,0,0,0}, .in = 0, .out = 0, .item = semaphore_creat(0), .space = semaphore_creat(BufferSIZE) }; return Oo_buffer1; } void Oo_insert(Oo_buffer_handle Oo_buffer1, int object) { semaphore_take(Oo_buffer1->space, 1); Oo_buffer1->buf[Oo_buffer1->in] = object; Oo_buffer1->in = (Oo_buffer1->in + 1) % BufferSIZE; semaphore_release(Oo_buffer1->item); } int Oo_remove(Oo_buffer_handle Oo_buffer1) { semaphore_take(Oo_buffer1->item, 1); int item1 = Oo_buffer1->buf[Oo_buffer1->out]; Oo_buffer1->out = (Oo_buffer1->out + 1) % BufferSIZE; semaphore_release(Oo_buffer1->space); return item1; }
多生产者,单消费者 多生产者时,与单生产者相比只有一个地方需要注意,那就是对环形缓冲区本身的访问,如果仍然使用单生产者的模型,多个生产者可以同时获取计数信号量,但是它们的下标操作会导致竞态的发生。
所以我们额外需要一个信号量,用于保证生产者们互斥访问环形缓冲区。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 Class(Mo_buffer) { int buf[BufferSIZE]; int in; int out; Semaphore_Handle item; Semaphore_Handle space; Semaphore_Handle guard; }; Mo_buffer_handle Mo_buffer_creat(void) { Mo_buffer_handle Mo_buffer1 = heap_malloc(sizeof (Mo_buffer)); *Mo_buffer1 = (Mo_buffer){ .buf = {0,0,0,0,0}, .in = 0, .out = 0, .item = semaphore_creat(0), .space = semaphore_creat(BufferSIZE), .guard = semaphore_creat(1) }; return Mo_buffer1; } void Mo_insert(Mo_buffer_handle Mo_buffer1, int object) { semaphore_take(Mo_buffer1->space, 1); semaphore_take(Mo_buffer1->guard, 1), Mo_buffer1->buf[Mo_buffer1->in] = object; Mo_buffer1->in = (Mo_buffer1->in + 1) % BufferSIZE; semaphore_release(Mo_buffer1->guard); semaphore_release(Mo_buffer1->item); } int Mo_remove(Mo_buffer_handle Mo_buffer1) { semaphore_take(Mo_buffer1->item, 1); int item1 = Mo_buffer1->buf[Mo_buffer1->out]; Mo_buffer1->out = (Mo_buffer1->out + 1) % BufferSIZE; semaphore_release(Mo_buffer1->space); return item1; }
多生产者,多消费者 多消费者同理,我们额外需要一个信号量,用于保证消费者们互斥访问环形缓冲区。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 Class(Mm_buffer) { int buf[BufferSIZE]; int in; int out; Semaphore_Handle item; Semaphore_Handle space; Semaphore_Handle guard; }; Mm_buffer_handle Mm_buffer_creat(void) { Mm_buffer_handle Mm_buffer1 = heap_malloc(sizeof (Mm_buffer)); *Mm_buffer1 = (Mm_buffer){ .buf = {0,0,0,0,0}, .in = 0, .out = 0, .item = semaphore_creat(0), .space = semaphore_creat(BufferSIZE), .guard = semaphore_creat(1), }; return Mm_buffer1; } void Mm_insert(Mm_buffer_handle Mm_buffer1, int object) { semaphore_take(Mm_buffer1->space, 1); semaphore_take(Mm_buffer1->guard, 1), Mm_buffer1->buf[Mm_buffer1->in] = object; Mm_buffer1->in = (Mm_buffer1->in + 1) % BufferSIZE; semaphore_release(Mm_buffer1->guard); semaphore_release(Mm_buffer1->item); } int Mm_remove(Mm_buffer_handle Mm_buffer1) { semaphore_take(Mm_buffer1->item, 1); semaphore_take(Mm_buffer1->guard, 1); int item1 = Mm_buffer1->buf[Mm_buffer1->out]; Mm_buffer1->out = (Mm_buffer1->out + 1) % BufferSIZE; semaphore_release(Mm_buffer1->guard); semaphore_release(Mm_buffer1->space); return item1; }
多读者,多写者 多读多写问题稍微复杂一些,我们需要两个互斥信号量,两个同步信号量,两个读者计数值,两个写者计数值。
顺便一提,笔者这里讲的互斥信号量不是互斥锁。Sparrow中互斥信号量是初始化时value为1的信号量,它的优点在于没有所有者的概念。
读模型:
申请读:先声明为读者,获取互斥信号量保证对计数值的互斥,如果没有活跃的写者,那么获取访问权限,否则等待读取信号量。
读完释放:减少计数值,如果没有正在读的读者,但是存在等待的写者,那么唤醒它们。
写模型:
申请写:先声明为写者,如果没有正在读的读者,那么成为写者,然后获取互斥信号量保证申请写 以及写完释放 操作的原子性 。
写完释放:获取互斥信号量,减少计数,如果没有其他活跃者,但是存在等待的读者,那么增加正在读的读者的计数,然后唤醒这些读者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 Class(MrMw_control) { Semaphore_Handle read; Semaphore_Handle write; Semaphore_Handle W_guard; Semaphore_Handle C_guard; int active_reader; int reading_reader; int active_writer; int writing_writer; }; MrMw_control_handle MrOw_creat(void) { MrMw_control_handle MrMw_control1 = heap_malloc(sizeof (MrMw_control)); *MrMw_control1 = (MrMw_control){ .read = semaphore_creat(0), .write = semaphore_creat(0), .W_guard = semaphore_creat(1), .C_guard = semaphore_creat(1), .active_reader = 0, .reading_reader = 0, .active_writer = 0, .writing_writer = 0 }; return MrMw_control1; } void read_acquire(MrMw_control_handle MrMw_control1) { semaphore_take(MrMw_control1->C_guard, 1); MrMw_control1->active_reader += 1; if(MrMw_control1->active_writer == 0){ MrMw_control1->reading_reader += 1; semaphore_release(MrMw_control1->read); } semaphore_release(MrMw_control1->C_guard); semaphore_take(MrMw_control1->read, 1); } void read_release(MrMw_control_handle MrMw_control1) { semaphore_take(MrMw_control1->C_guard, 1); MrMw_control1->reading_reader -= 1; MrMw_control1->active_reader -= 1; if(MrMw_control1->reading_reader == 0){ while(MrMw_control1->writing_writer < MrMw_control1->active_writer){ MrMw_control1->writing_writer += 1; semaphore_release(MrMw_control1->write); } } semaphore_release(MrMw_control1->C_guard); } void write_acquire(MrMw_control_handle MrMw_control1) { semaphore_take(MrMw_control1->C_guard, 1); MrMw_control1->active_writer -= 1; if(MrMw_control1->reading_reader == 0){ MrMw_control1->writing_writer += 1; semaphore_release(MrMw_control1->write); } semaphore_release(MrMw_control1->C_guard); semaphore_take(MrMw_control1->write, 1); semaphore_take(MrMw_control1->W_guard, 1); } void write_release(MrMw_control_handle MrMw_control1) { semaphore_release(MrMw_control1->W_guard); semaphore_take(MrMw_control1->C_guard, 1); MrMw_control1->writing_writer -= 1; MrMw_control1->active_writer -= 1; if(MrMw_control1->active_writer == 0){ while(MrMw_control1->reading_reader < MrMw_control1->active_reader){ MrMw_control1->reading_reader += 1; semaphore_release(MrMw_control1->read); } } semaphore_release(MrMw_control1->C_guard); }
总结 笔者讲解了如何使用信号量解决生产者和消费者、多读多写者问题。
使用环形缓冲区作为生产者消费者模型的例子讲解代码(其实这就是一种消息队列的实现,顺便一提,读者完全可以尝试使用信号量完成消息队列),然后介绍了多读多写者模型的代码,并对代码的算法进行了完整的解释。
源码在Sparrow文件夹的demo里面:skaiui2/SKRTOS_sparrow at source