生产者消费者问题是一个经典的、多线程同步问题。
有两个线程: 一个生产者线程和一个消费者线程。两个线程共享一个初始为空、固定大小为n的缓存区。
生产着的工作是“生产”一段数据,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待,如此反复;
同时,只有缓冲区非空时,消费者才能从中取出数据,一次“消费”一段数据,否则必须等待,如此反复。
问题的核心是:
解决生产者消费者问题实际上是要解决线程间互斥关系问题和同步关系问题
由于缓冲区是临界资源,它一个时刻只允许一个生产者放入消息,或者一个消费者从中取出消息,所以这里我们需要解决一个互斥访问的问题。同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,所以我们还需要解决一个同步的问题。
/* * 程序清单:生产者消费者例子 * * 这个例子中将创建两个线程用于实现生产者消费者问题 *(1)生产者线程将cnt值每次加1并循环存入array数组的5个成员内; *(2)消费者线程将生产者中生产的数值打印出来,并累加求和 */ #include <rtthread.h> #define THREAD_PRIORITY 6 #define THREAD_STACK_SIZE 512 #define THREAD_TIMESLICE 5 /* 定义最大5个元素能够被产生 */ #define MAXSEM 5 /* 用于放置生产的整数数组 */ rt_uint32_t array[MAXSEM]; /* 指向生产者、消费者在array数组中的读写位置 */ static rt_uint32_t set, get; /* 指向线程控制块的指针 */ static rt_thread_t producer_tid = RT_NULL; static rt_thread_t consumer_tid = RT_NULL; /* 定义二值信号量 缓冲区空信号量 缓冲区满信号量 */ struct rt_semaphore sem_lock; struct rt_semaphore sem_empty, sem_full; /* 生产者线程入口 */ void producer_thread_entry(void *parameter) { int cnt = 0; /* 运行10次 */ while (cnt < 10) { /* 获取一个空位 */ rt_sem_take(&sem_empty, RT_WAITING_FOREVER); /* 修改array内容,上锁 */ rt_sem_take(&sem_lock, RT_WAITING_FOREVER); array[set % MAXSEM] = cnt + 1; rt_kprintf("the producer generates a number: %d\n", array[set % MAXSEM]); set++; rt_sem_release(&sem_lock); /* 释放一个满位 */ rt_sem_release(&sem_full); cnt++; /* 暂停一段时间 */ rt_thread_mdelay(20); } rt_kprintf("the producer exit!\n"); } /* 消费者线程入口 */ void consumer_thread_entry(void *parameter) { rt_uint32_t sum = 0; while (1) { /* 获取一个满位 */ rt_sem_take(&sem_full, RT_WAITING_FOREVER); /* 临界区,上锁进行操作 */ rt_sem_take(&sem_lock, RT_WAITING_FOREVER); sum += array[get % MAXSEM]; rt_kprintf("the consumer[%d] get a number: %d\n", (get % MAXSEM), array[get % MAXSEM]); get++; rt_sem_release(&sem_lock); /* 释放一个空位 */ rt_sem_release(&sem_empty); /* 生产者生产到10个数目,停止,消费者线程相应停止 */ if (get == 10) break; /* 暂停一段时间 生产速度大于消费速度 */ rt_thread_mdelay(50); } rt_kprintf("the consumer sum is: %d\n", sum); rt_kprintf("the consumer exit!\n"); } int producer_consumer(void) { set = 0; get = 0; /* 初始化3个信号量 */ rt_sem_init(&sem_lock, "lock", 1, RT_IPC_FLAG_FIFO); rt_sem_init(&sem_empty, "empty", MAXSEM, RT_IPC_FLAG_FIFO); rt_sem_init(&sem_full, "full", 0, RT_IPC_FLAG_FIFO); /* 创建生产者线程 */ producer_tid = rt_thread_create("producer", producer_thread_entry, RT_NULL, THREAD_STACK_SIZE, THREAD_PRIORITY - 1, THREAD_TIMESLICE); if (producer_tid != RT_NULL) { rt_thread_startup(producer_tid); } else { rt_kprintf("create thread producer failed"); return -1; } /* 创建消费者线程 */ consumer_tid = rt_thread_create("consumer", consumer_thread_entry, RT_NULL, THREAD_STACK_SIZE, THREAD_PRIORITY + 1, THREAD_TIMESLICE); if (consumer_tid != RT_NULL) { rt_thread_startup(consumer_tid); } else { rt_kprintf("create thread consumer failed"); return -1; } return 0; } /* 导出到 msh 命令列表中 */ MSH_CMD_EXPORT(producer_consumer, producer_consumer sample);