queue.h
#ifndef QUEUE_BUF_H
#define QUEUE_BUF_H

/*
 * Link with -lpthread: the thread library is required at build time.
 */
#include <pthread.h>

/*
 * References:
 * https://www.kernel.org/doc/man-pages/
 * https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
 */

/*
 * Fixed-capacity FIFO of equally sized items, protected by a mutex and two
 * condition variables. Instances are created with queue_buf_alloc() and
 * released with queue_buf_free().
 */
struct queue_buf_t {
    char *data_buf;   /* item storage: item_num slots of item_size bytes */
    int item_num;     /* capacity, in items */
    int item_size;    /* size of one item, in bytes */
    int item_cnt;     /* number of items currently stored */
    int wr;           /* index of the slot the next push writes */
    int rd;           /* index of the slot the next pop/get reads */
    pthread_mutex_t mtx;
    /*
     * Signalled after a successful push, to wake a thread that is blocked
     * popping from an empty queue.
     */
    pthread_cond_t pop_cond;
    /*
     * Signalled after a successful pop, to wake a thread that is blocked
     * pushing to a full queue.
     */
    pthread_cond_t push_cond;
};

/* Returns the capacity of the queue, in items. */
int queue_buf_item_num(struct queue_buf_t *queue);

/* Returns the number of items currently stored in the queue. */
int queue_buf_item_cnt(struct queue_buf_t *queue);

/*
 * Copies item_size bytes from item to the tail of the queue without blocking.
 * Returns 0 on success, -1 if queue is NULL or the queue is full.
 */
int queue_buf_push(struct queue_buf_t *queue, const void *item);

/*
 * Like queue_buf_push(), but waits up to timeout_ms milliseconds for a free
 * slot. Returns 0 on success, -1 if queue is NULL or the wait timed out.
 */
int queue_buf_push_wait(struct queue_buf_t *queue, const void *item, unsigned int timeout_ms);

/*
 * Removes the oldest item and copies item_size bytes of it into item, without
 * blocking. Returns 0 on success, -1 if queue is NULL or the queue is empty.
 */
int queue_buf_pop(struct queue_buf_t *queue, void *item);

/*
 * Like queue_buf_pop(), but waits up to timeout_ms milliseconds for an item.
 * Returns 0 on success, -1 if queue is NULL or the wait timed out.
 */
int queue_buf_pop_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms);

/*
 * Copies the oldest item into item but leaves it in the queue, without
 * blocking. Returns 0 on success, -1 if queue is NULL or the queue is empty.
 */
int queue_buf_get(struct queue_buf_t *queue, void *item);

/*
 * Like queue_buf_get(), but waits up to timeout_ms milliseconds for an item.
 * Returns 0 on success, -1 if queue is NULL or the wait timed out.
 */
int queue_buf_get_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms);

/*
 * Allocates a queue holding up to item_num items of item_size bytes each.
 * Returns NULL if the allocation fails. The caller owns the result and must
 * release it with queue_buf_free().
 */
struct queue_buf_t *queue_buf_alloc(int item_num, int item_size);

/* Destroys a queue returned by queue_buf_alloc(); NULL is ignored. */
void queue_buf_free(struct queue_buf_t *queue_buf);

#endif /* QUEUE_BUF_H */
queue.c
/*
 * Copyright (C) 2021, 2021 huohongpeng
 * Author: huohongpeng <1045338804@qq.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Change logs:
 * Date        Author       Notes
 * 2021-05-15  huohongpeng  Initial version
 * 2021-05-17               Fixed tm_to_ns() overflow on 32-bit platforms
 * 2021-05-22               1. Queue elements changed from long to a buffer
 *                             of configurable size.
 *                          2. Occupancy is tracked in its own variable
 *                             instead of being derived from rd and wr, so
 *                             no slot is wasted.
 */

/*
 * clock_gettime(), CLOCK_MONOTONIC and pthread_condattr_setclock() are POSIX
 * interfaces. Request them explicitly so this file also builds in strict ISO
 * modes such as -std=c99 / -std=c11. Must precede every #include.
 */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif

#include "queue.h"

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <errno.h>

static const long long NSEC_PER_SEC = 1000000000LL;
static const long long NSEC_PER_MSEC = 1000000LL;

/* Converts a timespec to nanoseconds; 64-bit math avoids 32-bit overflow. */
static long long tm_to_ns(struct timespec tm)
{
    return (long long)tm.tv_sec * NSEC_PER_SEC + tm.tv_nsec;
}

/* Converts a nanosecond count back to a timespec. */
static struct timespec ns_to_tm(long long ns)
{
    struct timespec tm;

    tm.tv_sec = (time_t)(ns / NSEC_PER_SEC);
    tm.tv_nsec = (long)(ns % NSEC_PER_SEC);
    return tm;
}

/*
 * Stores in *deadline the CLOCK_MONOTONIC time that lies timeout_ms
 * milliseconds in the future. Returns 0 on success, -1 if the clock cannot
 * be read.
 */
static int deadline_after_ms(unsigned int timeout_ms, struct timespec *deadline)
{
    struct timespec now;

    if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) {
        return -1;
    }
    *deadline = ns_to_tm(tm_to_ns(now) + (long long)timeout_ms * NSEC_PER_MSEC);
    return 0;
}

/* Returns the address of slot number index inside the item storage. */
static char *slot_at(const struct queue_buf_t *queue, int index)
{
    return queue->data_buf + (size_t)queue->item_size * (size_t)index;
}

/* Appends one item at the write index. Caller holds mtx; queue is not full. */
static void store_item_locked(struct queue_buf_t *queue, const void *item)
{
    memcpy(slot_at(queue, queue->wr), item, (size_t)queue->item_size);
    queue->wr = (queue->wr + 1) % queue->item_num;
    queue->item_cnt++;
}

/* Copies the oldest item out. Caller holds mtx; queue is not empty. */
static void copy_front_locked(const struct queue_buf_t *queue, void *item)
{
    memcpy(item, slot_at(queue, queue->rd), (size_t)queue->item_size);
}

/* Discards the oldest item. Caller holds mtx; queue is not empty. */
static void drop_front_locked(struct queue_buf_t *queue)
{
    queue->rd = (queue->rd + 1) % queue->item_num;
    queue->item_cnt--;
}

/*
 * Waits on cond, with mtx held, for as long as item_cnt equals blocked_cnt.
 * The wait stops on the first non-zero result of pthread_cond_timedwait()
 * (timeout or any other error) so that an error can never turn into a busy
 * loop. The predicate is re-checked afterwards, so a slot or item that became
 * available right at the deadline is still used.
 * Returns 0 when item_cnt no longer equals blocked_cnt, -1 otherwise.
 */
static int wait_while_cnt_is(struct queue_buf_t *queue, pthread_cond_t *cond,
                             int blocked_cnt, const struct timespec *deadline)
{
    while (queue->item_cnt == blocked_cnt) {
        if (pthread_cond_timedwait(cond, &queue->mtx, deadline) != 0) {
            break;
        }
    }
    return queue->item_cnt == blocked_cnt ? -1 : 0;
}

/* Returns the capacity of the queue in items, or -1 if queue is NULL. */
int queue_buf_item_num(struct queue_buf_t *queue)
{
    if (!queue) {
        return -1;
    }

    pthread_mutex_lock(&queue->mtx);
    int num = queue->item_num;
    pthread_mutex_unlock(&queue->mtx);

    return num;
}

/* Returns the number of stored items, or -1 if queue is NULL. */
int queue_buf_item_cnt(struct queue_buf_t *queue)
{
    if (!queue) {
        return -1;
    }

    pthread_mutex_lock(&queue->mtx);
    int cnt = queue->item_cnt;
    pthread_mutex_unlock(&queue->mtx);

    return cnt;
}

/*
 * Appends a copy of *item without blocking.
 * Returns 0 on success, -1 if an argument is NULL or the queue is full.
 */
int queue_buf_push(struct queue_buf_t *queue, const void *item)
{
    if (!queue || !item) {
        return -1;
    }

    int ret = -1;

    pthread_mutex_lock(&queue->mtx);
    if (queue->item_cnt < queue->item_num) {
        store_item_locked(queue, item);
        ret = 0;
    }
    pthread_mutex_unlock(&queue->mtx);

    if (ret == 0) {
        /* The queue now holds data: wake one thread blocked on reading. */
        pthread_cond_signal(&queue->pop_cond);
    }

    return ret;
}

/*
 * Appends a copy of *item, waiting up to timeout_ms milliseconds for a free
 * slot. Returns 0 on success, -1 if an argument is NULL, the clock cannot be
 * read, or no slot became free before the deadline.
 */
int queue_buf_push_wait(struct queue_buf_t *queue, const void *item, unsigned int timeout_ms)
{
    if (!queue || !item) {
        return -1;
    }

    struct timespec deadline;

    if (deadline_after_ms(timeout_ms, &deadline) != 0) {
        return -1;
    }

    pthread_mutex_lock(&queue->mtx);
    /* A full queue has to wait for push_cond. */
    int ret = wait_while_cnt_is(queue, &queue->push_cond, queue->item_num, &deadline);
    if (ret == 0) {
        store_item_locked(queue, item);
    }
    pthread_mutex_unlock(&queue->mtx);

    if (ret == 0) {
        pthread_cond_signal(&queue->pop_cond);
    }

    return ret;
}

/*
 * Removes the oldest item into *item without blocking.
 * Returns 0 on success, -1 if an argument is NULL or the queue is empty.
 */
int queue_buf_pop(struct queue_buf_t *queue, void *item)
{
    if (!queue || !item) {
        return -1;
    }

    int ret = -1;

    pthread_mutex_lock(&queue->mtx);
    if (queue->item_cnt != 0) {
        copy_front_locked(queue, item);
        drop_front_locked(queue);
        ret = 0;
    }
    pthread_mutex_unlock(&queue->mtx);

    if (ret == 0) {
        /* A slot was freed: wake one thread blocked on writing. */
        pthread_cond_signal(&queue->push_cond);
    }

    return ret;
}

/*
 * Removes the oldest item into *item, waiting up to timeout_ms milliseconds
 * for one to arrive. Returns 0 on success, -1 if an argument is NULL, the
 * clock cannot be read, or the queue stayed empty until the deadline.
 */
int queue_buf_pop_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms)
{
    if (!queue || !item) {
        return -1;
    }

    struct timespec deadline;

    if (deadline_after_ms(timeout_ms, &deadline) != 0) {
        return -1;
    }

    pthread_mutex_lock(&queue->mtx);
    /* An empty queue has to wait for pop_cond. */
    int ret = wait_while_cnt_is(queue, &queue->pop_cond, 0, &deadline);
    if (ret == 0) {
        copy_front_locked(queue, item);
        drop_front_locked(queue);
    }
    pthread_mutex_unlock(&queue->mtx);

    if (ret == 0) {
        /* A slot was freed: wake one thread blocked on writing. */
        pthread_cond_signal(&queue->push_cond);
    }

    return ret;
}

/*
 * Copies the oldest item into *item and leaves it queued, without blocking.
 * Returns 0 on success, -1 if an argument is NULL or the queue is empty.
 */
int queue_buf_get(struct queue_buf_t *queue, void *item)
{
    if (!queue || !item) {
        return -1;
    }

    int ret = -1;

    pthread_mutex_lock(&queue->mtx);
    if (queue->item_cnt != 0) {
        copy_front_locked(queue, item);
        ret = 0;
    }
    pthread_mutex_unlock(&queue->mtx);

    return ret;
}

/*
 * Copies the oldest item into *item and leaves it queued, waiting up to
 * timeout_ms milliseconds for one to arrive. Returns 0 on success, -1 if an
 * argument is NULL, the clock cannot be read, or the queue stayed empty until
 * the deadline.
 */
int queue_buf_get_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms)
{
    if (!queue || !item) {
        return -1;
    }

    struct timespec deadline;

    if (deadline_after_ms(timeout_ms, &deadline) != 0) {
        return -1;
    }

    pthread_mutex_lock(&queue->mtx);
    /* An empty queue has to wait for pop_cond. */
    int ret = wait_while_cnt_is(queue, &queue->pop_cond, 0, &deadline);
    if (ret == 0) {
        copy_front_locked(queue, item);
    }
    pthread_mutex_unlock(&queue->mtx);

    if (ret == 0) {
        /*
         * This call may have consumed the single wake-up issued by a push,
         * yet the item is still queued. Forward the wake-up so that another
         * thread blocked in pop_wait/get_wait does not sleep until its
         * timeout while data is available.
         */
        pthread_cond_signal(&queue->pop_cond);
    }

    return ret;
}

/*
 * Allocates a queue for item_num items of item_size bytes each. The control
 * structure and the item storage share one heap block.
 * Returns NULL if an argument is not positive, the total size would overflow,
 * memory is exhausted, or a synchronisation object cannot be initialised.
 * The caller releases the result with queue_buf_free().
 */
struct queue_buf_t *queue_buf_alloc(int item_num, int item_size)
{
    pthread_condattr_t attr;

    if (item_num <= 0 || item_size <= 0) {
        return NULL;
    }

    size_t count = (size_t)item_num;
    size_t size = (size_t)item_size;

    /* Reject sizes for which sizeof(*queue) + count * size would wrap. */
    if (count > ((size_t)-1 - sizeof(struct queue_buf_t)) / size) {
        return NULL;
    }

    struct queue_buf_t *queue = malloc(sizeof *queue + count * size);
    if (!queue) {
        return NULL;
    }

    memset(queue, 0x00, sizeof *queue);
    queue->data_buf = (char *)(queue + 1); /* storage follows the header */
    queue->item_num = item_num;
    queue->item_size = item_size;

    if (pthread_mutex_init(&queue->mtx, NULL) != 0) {
        goto err_free;
    }
    if (pthread_condattr_init(&attr) != 0) {
        goto err_mutex;
    }
    /*
     * pthread_cond_timedwait() uses CLOCK_REALTIME by default, which is
     * affected by system time adjustments, so the condition variables are
     * switched to CLOCK_MONOTONIC. The deadlines computed in this file rely
     * on this, hence a failure here is fatal.
     * See https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
     */
    if (pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) != 0) {
        goto err_attr;
    }
    if (pthread_cond_init(&queue->push_cond, &attr) != 0) {
        goto err_attr;
    }
    if (pthread_cond_init(&queue->pop_cond, &attr) != 0) {
        goto err_push_cond;
    }
    pthread_condattr_destroy(&attr);

    return queue;

err_push_cond:
    pthread_cond_destroy(&queue->push_cond);
err_attr:
    pthread_condattr_destroy(&attr);
err_mutex:
    pthread_mutex_destroy(&queue->mtx);
err_free:
    free(queue);
    return NULL;
}

/* Destroys the synchronisation objects and frees the queue; NULL is ignored. */
void queue_buf_free(struct queue_buf_t *queue)
{
    if (!queue) {
        return;
    }

    pthread_mutex_destroy(&queue->mtx);
    pthread_cond_destroy(&queue->pop_cond);
    pthread_cond_destroy(&queue->push_cond);
    free(queue);
}

/*
 * Manual smoke test: pushes 25 items into a 20-slot queue (the last 5 time
 * out), pops 25 times (the last 5 time out), then peeks 25 times on the empty
 * queue, printing every result.
 */
void test(void)
{
    struct queue_buf_t *q = queue_buf_alloc(20, sizeof(long long));
    if (!q) {
        return;
    }

    long long l = 9999999999LL * 100;
    int i;

    for (i = 0; i < 25; i++) {
        long long t = l + i;
        int ret = queue_buf_push_wait(q, &t, 1000);
        printf("push ret: %d\n", ret);
        printf("num: %d\n", queue_buf_item_num(q));
        printf("cnt: %d\n", queue_buf_item_cnt(q));
    }

    for (i = 0; i < 25; i++) {
        long long t = 0;
        int ret = queue_buf_pop_wait(q, &t, 500);
        printf("pop ret: %d, data: %lld\n", ret, t);
        printf("num: %d\n", queue_buf_item_num(q));
        printf("cnt: %d\n", queue_buf_item_cnt(q));
    }

    for (i = 0; i < 25; i++) {
        long long t = 0;
        int ret = queue_buf_get_wait(q, &t, 100);
        printf("get ret: %d, data: %lld\n", ret, t);
        printf("num: %d\n", queue_buf_item_num(q));
        printf("cnt: %d\n", queue_buf_item_cnt(q));
    }

    queue_buf_free(q);
}