目录
1、消息队列
2、Posix消息队列
2.1、消息队列属性
2.2、mq_open()函数
2.3、mq_close()函数
2.4、mq_unlink()函数
2.5、mq_getattr()函数
2.6、mq_setattr()函数
2.7、mq_send()函数
2.8、mq_receive()函数
2.9、my_notify()函数
2.9.1、my_notify()函数的若干规则
2.10、消息队列限制
2.10.1、获取能够同时拥有打开着的消息队列的最大数目
2.10.2、获取消息的最大优先级
2.10.3、获取最大消息队列数
2.10.4、获取最大消息队列消息大小
3、System V消息队列
3.1、msgget()函数
3.2、msgsnd()函数
3.3、msgrcv()函数
3.4、msgctl()函数
3.5、消息队列限制
3.5.1、获取每个消息的最大字节数
3.5.2、获取系统范围的最大消息队列数
3.5.3、获取系统范围的最大消息数
3.5.4、获取任何一个消息队列上的最大字节数
1)对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。
2)当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似机制。
注:接收时,接收缓冲区小于实际的消息大小时,Posix消息队列返回EMSGSIZE,System V消息队列返回E2BIG,UDP recvmsg函数返回MSG_TRUNC标志。
注:Posix消息队列和System V消息队列都不具有的特性:向接收者准确的标识每条消息的发送者。
struct mq_attr { long int mq_flags; /* 消息队列标志. 0或o_NONBLOCK */ long int mq_maxmsg; /* 最大消息数. */ long int mq_msgsize; /* 最大消息大小. */ long int mq_curmsgs; /* 当前消息队列消息数量. */ };
1)mq_curmsgs只能由mq_getattr()函数获取,不能设置。
2)mq_maxmsg和mq_msgsize只能由mq_open()函数在创建消息队列时设置(不能指定其中一个而不指定另一个,即两者都得指定)。
3)mq_flags只能由mq_setattr()函数设置。
创建或打开一个消息队列。
#include <mqueue.h> mqd_t mq_open(const char *name, int oflag); mqt_t mq_open(const char *name,int oflag,mode_t mode,struct mq_attr *attr)
参数 name:消息队列名称(以一个’/’开头,名字中不能包含其他的’/’)
参数 oflag:O_RDONLY、O_WRONLY或O_RDWR之一,可以按位或上O_CREAT、O_EXCL或O_NONBLOCK。
参数 mode:指定模式(当创建新消息队列时需要)。
常量 | 说明 |
S_IRUSR | 用户(属主)读 |
S_IWUSR | 用户(属主)写 |
S_IRGRP | (属)组成员读 |
S_IWGRP | (属)组成员写 |
S_IROTH | 其他用户读 |
S_IWOTH | 其他用户写 |
参数 attr:当创建新消息队列时需要。如果为空指针,那就使用默认属性。
返回值:成功则返回消息队列描述符,出错则为-1。
注:创建的消息队列可以在/dev/mqueue中看到。
关闭一个已打开的消息队列。
#include <mqueue.h> int mq_close(mqd_t mqdes);
注:调用此函数后只是关闭消息队列,消息队列并不从系统中删除(删除消息队列需要使用mq_unlink函数)。进程终止时,其所有打开着的消息队列都关闭,就像调用了mq_close函数一样。
参数 mqdes:消息队列描述符。
返回值:成功返回0,出错则为-1。
删除消息队列。
#include <mqueue.h> int mq_unlink(const char *name);
参数 name:消息队列名字。
返回值:成功返回0,出错则为-1。
注:消息队列有一个保存其当前打开着描述符的引用计数器,因而本函数能够实现类似于unlink函数删除一个文件的机制:当一个消息队列的引用计数仍大于0时,其name就能删除,但是该队列的析构要等到最后一个mq_close发生时才进行。
获取消息队列属性。
#include <mqueue.h> int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
参数 mqdes:消息队列描述符。
参数 attr:消息队列属性。
返回值:成功则为0,出错为-1。
设置消息队列属性(只能设置消息队列的mq_flags属性)。
#include <mqueue.h> int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr,struct mq_attr *oldattr);
参数 mqdes:消息队列描述符。
参数 newattr:新设置的消息队列属性。
参数 oldattr:之前的消息队列属性。
返回值:成功则为0,出错为-1。
往消息队列放一条消息。
#include <mqueue.h> int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned int msg_prio);
参数 mqdes:消息队列描述符。
参数 msg_ptr:指向缓冲区指针。
参数 len:缓冲区大小。
参数 msg_prio:待发送消息的优先级,其值必须小于MQ_PRIO_MAX,否则立刻返回EMSGSIZE错误。
返回值:成功为0,出错则为-1.
未设置O_NONBLOCK属性 | 当消息队列满了时,调用此函数会阻塞,直至消息队列有空间存放消息 |
设置O_NONBLOCK属性 | 当消息队列满了时,调用此函数返回错误-1,设置ennor为EAGAIN |
注:如果应用不必使用优先级不同的消息,那就给mq_send指定值为0的优先级,给mq_receive指定一个空指针作为其最后一个参数。
从消息队列取出一条消息。
#include <mqueue.h> ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,size_t msg_len, unsigned int *msg_prio);
参数 mqdes:消息队里描述符。
参数 msg_ptr:指向缓冲区指针。
参数 len:缓冲区大小,不能小于消息队列属性mq_msgsize,否则立刻返回EMSGSIZE错误。
参数 msg_prio:返回消息的优先级。
返回值:成功则为消息中字节数,出错则为-1.
未设置O_NONBLOCK属性 | 当消息队列为空时,调用此函数会阻塞,直至消息队列有消息 |
设置O_NONBLOCK属性 | 当消息队列为空时,调用此函数返回错误-1,设置ennor为EAGAIN |
注:使用Posix消息队列的大多应用程序必须在打开某个消息队列后,调用mq_getattr确定最大消息大小,然后分配那样大小的读缓冲区。
允许异步事件通知事件。
#include <mqueue.h> int mq_notify(mqd_t mqdes, const struct sigevent* notification);
参数 mqdes:消息队列描述符。
参数 notification:sigevent结构体指针。
Posix消息队列容许 异步事件通知,以告知何时有一个消息放置到某个空消息队列中,这种通知有两种方式可以选择:
1)产生一个信号
2)创建一个线程来执行一个指定的函数
union sigval{ int sival_int; /*integer value*/ void *sival_ptr; /*pointer value*/ }; struct sigevent{ int sigev_notify; /*SIGEV_{NONE, SIGNAL, THREAD}*/ int sigev_signo; /*signal number if SIGEV_SIGNAL*/ union sigval sigev_value; void (*sigev_notify_function)(union sigval); pthread_attr_t *sigev_notify_attributes; };
返回值:成功则为0,若出错则为-1。
注:必须小心,保证从队列中读出消息之前(而不是之后)重新注册!!!以防止通知丢失。
1)如果notification参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。
2)如果notification参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。
3)任意时刻只有一个进程可以被注册为接收某个给定队列的通知(出错消息对应的是EBUSY)。
4)当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。
5)当前通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册。
最大消息队列数 | mq_mqxmsg成员 |
最大消息队列消息大小 | mq_msgsize成员 |
一个进程能够同时拥有打开着的消息队列的最大数目 | MQ_OPEN_MAX |
消息的最大优先级 | MQ_PRIO_MAX |
名称 | 获取方式 | Ubuntu测试值 |
MQ_OPEN_MAX | sysconf(_SC_MQ_OPEN_MAX) | -1 |
MQ_OPEN_MAX | mq_open | 9 |
MQ_PRIO_MAX | sysconf(_SC_MQ_PRIO_MAX) | 32768 |
MQ_PRIO_MAX | mq_send | 32768 |
mq_mqxmsg | mq_open | 65535 |
mq_msgsize | mq_open | 65535 |
#include <unistd.h> #include <stdio.h> #include <string.h> #include <mqueue.h> #include <fcntl.h> #include <sys/stat.h> #define MAX_NMESG 64*1024 int main() { char name[256] = {0}; mqd_t id[MAX_NMESG]; int i,j; for(i=0;i<MAX_NMESG;i++){ memset(name,0,sizeof(name)); snprintf(name,sizeof(name),"/tmp_%d",i); if((id[i] = mq_open(name,O_RDWR|O_CREAT|O_NONBLOCK,0666,NULL))==-1){ break; } } printf("open max=%d\n",i); for(j=0;j<i;j++){ mq_close(id[j]); memset(name,0,sizeof(name)); snprintf(name,sizeof(name),"/tmp_%d",j); mq_unlink(name); } return 0; }
#include <unistd.h> #include <stdio.h> #include <string.h> #include <mqueue.h> #include <fcntl.h> #include <sys/stat.h> #include <stdlib.h> #define MAX_PRIORITY 64*1024 int main() { int i,j; struct mq_attr attr; mqd_t mqdes = mq_open("/tmp",O_RDWR|O_CREAT|O_NONBLOCK,0666,NULL); printf("mqdes=%d\n",mqdes); if(mqdes<0) return -1; unsigned int msg_prio; mq_getattr(mqdes,&attr); char *buff = malloc(attr.mq_msgsize); char send[1]; for(i=0;i<MAX_PRIORITY;i++){ if((mq_send(mqdes,send,sizeof(send),i))==-1){ break ; } if((mq_receive(mqdes,buff,attr.mq_msgsize,&msg_prio))!=-1){ mq_getattr(mqdes,&attr); } } printf("max priority=%d\n",i); free(buff); mq_close(mqdes); mq_unlink("/tmp"); return 0; }
#include <unistd.h> #include <stdio.h> #include <string.h> #include <mqueue.h> #include <fcntl.h> #include <sys/stat.h> #define MAX_NMESG 64*1024 int main() { mqd_t mqdes ; int i,j; struct mq_attr attr; for(i=0;i<MAX_NMESG;i++){ attr.mq_maxmsg = i; attr.mq_msgsize = 1; if((mqdes = mq_open("/tmp",O_RDWR|O_CREAT|O_NONBLOCK,0666,NULL))==-1){ break; } mq_close(mqdes); mq_unlink("/tmp"); } printf("maxmsg max=%d\n",i); return 0; }
#include <unistd.h> #include <stdio.h> #include <string.h> #include <mqueue.h> #include <fcntl.h> #include <sys/stat.h> #define MAX_NMESG 64*1024 int main() { mqd_t mqdes ; int i,j; struct mq_attr attr; for(i=0;i<MAX_NMESG;i++){ attr.mq_maxmsg = 1; attr.mq_msgsize = i; if((mqdes = mq_open("/tmp",O_RDWR|O_CREAT|O_NONBLOCK,0666,NULL))==-1){ break; } mq_close(mqdes); mq_unlink("/tmp"); } printf("msgsize max=%d\n",i); return 0; }
对于系统中的每个消息队列,内核维护一个定义在<sys/msg.h>头文件的信息结构。
struct msqid_ds { struct ipc_perm msg_perm; /* Ownership and permissions */ time_t msg_stime; /* Time of last msgsnd(2) */ time_t msg_rtime; /* Time of last msgrcv(2) */ time_t msg_ctime; /* Time of last change */ unsigned long __msg_cbytes; /* Current number of bytes inqueue (nonstandard) */ msgqnum_t msg_qnum; /* Current number of messages in queue */ msglen_t msg_qbytes; /* Maximum number of bytes allowed in queue */ pid_t msg_lspid; /* PID of last msgsnd(2) */ pid_t msg_lrpid; /* PID of last msgrcv(2) */ };
用于创建一个新的消息队列或访问一个已存在的消息队列。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgget(key_t key, int msgflg);
参数 key:IPC键值,既可以是ftok的返回值,也可以是IPC_PRIVATE。
参数 msgflg:除了以下组合,还可以与IPC_CREAT或IPC_CREAT|IPC_EXCL按位或。
数字值(八进制) | 消息队列 | 说明 |
0400 | MSG_R | 由用户(属主)读 |
0200 | MSG_W | 由用户(属主)写 |
0040 | MSG_R>>3 | 由(属)组成员读 |
0020 | MSG_W>>3 | 由(属)组成员写 |
0004 | MSG_R>>6 | 由其他用户读 |
0002 | MSG_W>>6 | 由其他用户写 |
返回值:成功返回消息队列ID;出错返回-1
当创建一个新消息队列时,msqid_ds结构的如下成员被初始化。
1)msg_perm结构的uid和cuid成员被设置成当前进程的有效用户ID,gid和cgid成员被设置成当前进程的有效组ID。
2)oflag中的读写权限位存放在msg_perm.mode中。
3)msg_qnum、msg_lspid、msg_lrpid、msg_stime和msg_rtime被置为0.
4)msg_ctime被设置为当前时间。
5)msg_qbytes被设置成系统限制值。
往msgsnd上放置一个消息。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数 msgid:消息队列标识符。
参数 msgp:结构指针,具有如下模板,定义在<sys/msg.h>中,不过大多数应用并不使用msgbuf 结构这个定义,因为其数据量(1个字节)通常是不够的。大多应用定义自己的消息结构,其数据部分根据应用的需要定义。
struct msgbuf { long mtype; /* 消息类型,必须大于0 */ char mtext[1]; /* 消息数据 */ }
注:消息类型必须大于0,应为对于msgrcv函数来说,非正的消息类型用作特殊的指示器。
参数 msgsz:以字节为单位指定待发送消息的长度,这是位于长整型消息类型之后的用户自定义数据的长度,该长度可以为0。
参数 msgflg:可以是0或IPC_NOWAIT.IPC_NOWAIT标志使得msgsnd调用非阻塞:如果没有存放新消息的可用空间,该函数就马上返回。这个条件可能发生的情况包括:
1 | 在指定的队列中已有太多的字节(对应队列的msqid_ds结构中的msg_qbytes值) |
2 | 在系统范围存在太多的消息 |
返回值:成功返回0;出错返回-1,并设置错误码。
1)如果这两个条件中有一个存在,而且IPC_NOWAIT标志已指定,msgsnd就返回一个EAGAIN错误。
2)如果这两个条件中有一个存在,而且IPC_NOWAIT标志未指定,那么调用线程被投入睡眠,直到:
1 | 具备存放新消息的空间 |
2 | 由msqid标识的消息队列从系统中删除(这种情况返回一个EIDRM错误) |
3 | 调用线程被某个捕获的信号所中断(这种情况返回一个EINTR错误) |
从消息队列中读出一个消息。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,int msgflg);
参数 msgid:消息队列标识符。
参数 msgp:指定所接收消息的存放位置。
参数 msgsz:指定msgp指向的缓冲区中数据部分的大小。这是该函数能返回的最大数据量。该长度不包括长整型类型字段。
参数 msgtyp:指定希望从所给定的队列中读出什么样的消息。
0 | 返回该队列中的第一个消息 |
>0 | 返回其类型值为type的第一个消息 |
<0 | 返回其类型值小于或等于type参数的绝对值的消息类型值最小的第一个消息 |
参数 msgflg:0,IPC_NOWAIT,MSG_NOERROR。在没有消息可得的情况下,如果设置了IPC_NOWAIT,就立即返回一个ENOMSG错误,否则调用者被阻塞到下列某个事件发生为止:
1 | 有一个所请求类型的消息可获取 |
2 | 由msqid标识的消息队列被从系统中删除(这种情况下返回一个EIDRM错误) |
3 | 调用线程被某个捕获的信号所中断(这种情况下返回一个EINTR错误) |
当所接收消息的真正数据部分大于参数 msgsz时,如果设置了MSG_NOERROR,该函数就只是截短数据部分,而不是返回错误。否则,该函数返回E2BIG错误。
返回值:若成功则为读入缓冲区中数据的字节数,出错则为-1(不包括也通过参数 msgp返回的长整型消息类型所需的几个字节).
提供在一个消息队列上的各种控制。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数 msgid:消息队列标识符。
参数 cmd:
IPC_RMID | 从系统删除由参数 msgid指定的消息队列。当前在该队列上的任何消息都被丢弃,对于该命令,参数 buf被忽略。 |
IPC_SET | 给所指定的消息队列设置其msqid_ds结构的以下4个成员:msg_perm.uid、msg_perm.gid、msg_perm.mode、msg_qbytes.值来自参数 buf指向的结构中相应成员 |
IPC_STAT | 通过参数 buf,给调用者返回所指定消息队列对应的当前msqid_ds结构 |
参数 buf:根据cmd不同,意义不同。
返回值:成功返回0,失败返回-1.
名字 | 说明 | Ubuntu测试值 |
msgmax | 每个消息的最大字节数 | 8192 |
msgmni | 系统范围的最大消息队列数 | 32000 |
msgtql | 系统范围的最大消息数 | 16384 |
msgmnb | 任何一个消息队列上的最大字节数 | 16384 |
#include <unistd.h> #include <stdio.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define MAX_DATA 64*1024 struct mymesg { long type; char data[MAX_DATA]; }mesg; int main() { int msqid = msgget(IPC_PRIVATE,0666|IPC_CREAT); mesg.type = 1; for(int i=MAX_DATA;i>0;i-=1){ if(msgsnd(msqid,&mesg,i,0)==0){ printf("msgmax = %d\n",i); break; } } msgctl(msqid,IPC_RMID,NULL); return 0; }
#include <unistd.h> #include <stdio.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define MAX_NIDS 64*1024 int main() { int i,j; int qid[MAX_NIDS]; for(i=0;i<=MAX_NIDS;i++){ if((qid[i]=msgget(IPC_PRIVATE,0666|IPC_CREAT))==-1){ printf("msgmni = %d\n",i); break; } } for(j=0;j<=i;j++){ msgctl(qid[j],IPC_RMID,NULL); } return 0; }
#include <unistd.h> #include <stdio.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define MAX_NMESG 64*1024 #define MAX_DATA 64*1024 struct mymesg { long type; char data[MAX_DATA]; }mesg; int main() { int j; int msqid; mesg.type = 1; msqid = msgget(IPC_PRIVATE,0666|IPC_CREAT); for(j=0;j<=MAX_NMESG;j++){ if(msgsnd(msqid,&mesg,1,IPC_NOWAIT)!=0){ break; } } printf("%d\n",j); msgctl(msqid,IPC_RMID,NULL); return 0; }
#include <unistd.h> #include <stdio.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define MAX_NMESG 64*1024 #define MAX_NIDS 64*1024 #define MAX_DATA 64*1024 struct mymesg { long type; char data[MAX_DATA]; }mesg; int main() { int i,j; int qid[MAX_NIDS]; int msqid; mesg.type = 1; for(i=1;i<=MAX_NIDS;i*=2){ msqid = msgget(IPC_PRIVATE,0666|IPC_CREAT); for(j=0;j<=MAX_NMESG;j++){ if(msgsnd(msqid,&mesg,i,IPC_NOWAIT)!=0){ break; } } if(j==0){ msgctl(msqid,IPC_RMID,NULL); break; }else{ printf("%d-byte,num=%d,total=%d bytes\n",i,j,i*j); msgctl(msqid,IPC_RMID,NULL); } } return 0; }