細說linux IPC(九):posix消息隊列
來源:程序員人生 發(fā)布時間:2015-01-12 08:39:37 閱讀次數(shù):4098次
【版權聲明:尊重原創(chuàng),轉載請保存出處:blog.csdn.net/shallnet 或 .../gentleliu,文章僅供學習交換,請勿用于商業(yè)用處】
消息隊列可以看做1系列消息組織成的鏈表,1個程序可以往這個鏈表添加消息,另外的程序可以從這個消息鏈表讀走消息。
- mq_open()函數(shù)打開或創(chuàng)建1個posix消息隊列。
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);
Link with -lrt.
參數(shù)name為posix IPC名字, 行將要被打開或創(chuàng)建的消息隊列對象,為了便于移植,需要指定為“/name”的格式。
參數(shù)oflag必須要有O_RDONLY(只讀)、標志O_RDWR(讀寫), O_WRONLY(只寫)之1,除此以外還可以指定O_CREAT(沒有該對象則創(chuàng)建)、O_EXCL(如果O_CREAT指定,但name不存在,就返回毛病),O_NONBLOCK(以非阻塞方式打開銷息隊列,在正常情況下mq_receive和mq_send函數(shù)會阻塞的地方,使用該標志打開的消息隊列會返回EAGAIN毛病)。
當操作1個新隊列時,使用O_CREAT標識,此時后面兩個參數(shù)需要被指定,參數(shù)mode為指定權限位,attr指定新創(chuàng)建隊列的屬性。
#include <mqueue.h>
int mq_close(mqd_t mqdes);
Link with -lrt.
關閉以后調用進程不在使用該描寫符,但消息隊列不會從系統(tǒng)中刪除,進程終止時,會自動關閉已打開的消息隊列,和調用mq_close1樣。參數(shù)為mq_open()函數(shù)返回的值。
刪除會馬上產(chǎn)生,即便該隊列的描寫符援用計數(shù)依然大于0。參數(shù)為mq_open()函數(shù)第1個參數(shù)。
- mq_setattr()函數(shù)和mq_getattr()函數(shù)分別設置和和獲得消息隊列屬性。
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
Link with -lrt.
參數(shù)mqdes為mq_open()函數(shù)返回的消息隊列描寫符。
參數(shù)attr、newattr、oldattr為消息隊列屬性結構體指針;
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
};
參數(shù)mq_flags在mq_open時被初始化(oflag參數(shù)),其值為0
或 O_NONBLOCK。
參數(shù)mq_maxmsg和mq_msgsize在mq_open時在參數(shù)attr中初始化設置,mq_maxmsg是指隊列的消息個數(shù)最大值;mq_msgsize為隊列每一個消息的最大值。
參數(shù)mq_curmsgs為當前隊列消息。
mq_getattr()函數(shù)把隊列當前屬性填入attr所指向的結構體。
mq_setattr()函數(shù)只能設置mq_flags屬性,另外的域會被自動疏忽,mq_maxmsg和mq_msgsize的設置需要在mq_open當中來完成,
參數(shù)oldattr會和函數(shù)mq_getattr函數(shù)中參數(shù)attr相同的值。
- mq_send() 函數(shù) 和mq_receive()函數(shù)分別用于向消息隊列放置和取走消息。
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
Link with -lrt.
參數(shù)msg_ptr為指向消息的指針。
msg_len為消息長度,該值不能大于屬性值中mq_msgsize的值。
msg_prio為優(yōu)先級,消息在隊列中將依照優(yōu)先級大小順序來排列消息。
如果消息隊列已滿,mq_send()函數(shù)將阻塞,直到隊列有可用空間再次允許放置消息或該調用被信號打斷;如果O_NONBLOCK被指定,mq_send()那末將不會阻塞,而是返回EAGAIN毛病。如果隊列空,mq_receive()函數(shù)將阻塞,直到消息隊列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那末將不會阻塞,而是返回EAGAIN毛病。
示例:
服務進程:
int sln_ipc_mq_loop(void)
{
mqd_t mqd;
struct mq_attr setattr, attr;
char *recvbuf = NULL;
unsigned int prio;
int recvlen;
setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;
mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr); //創(chuàng)建消息隊列并設置消息隊列屬性
if ((mqd < 0) && (errno != EEXIST)) {
fprintf(stderr, "mq_open: %s
", strerror(errno));
return ⑴;
}
if ((mqd < 0) && (errno == EEXIST)) { // 消息隊列存在則打開
mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
if (mqd < 0) {
fprintf(stderr, "mq_open: %s
", strerror(errno));
return ⑴;
}
}
if (mq_getattr(mqd, &attr) < 0) { //獲得消息隊列屬性
fprintf(stderr, "mq_getattr: %s
", strerror(errno));
return ⑴;
}
printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld
",
attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
recvbuf = malloc(attr.mq_msgsize); //為讀取消息隊列分配當前系統(tǒng)允許的每條消息的最大大小的內(nèi)存空間
if (NULL == recvbuf) {
return ⑴;
}
for (;;) {
recvlen = mq_receive(mqd, recvbuf, attr.mq_msgsize, &prio); //從消息隊列中讀取消息
if (recvlen < 0) {
fprintf(stderr, "mq_receive: %s
", strerror(errno));
continue;
}
printf("recvive length: %d, prio: %d, recvbuf: %s
", recvlen, prio, recvbuf);
}
return 0;
}
客戶進程:
int sln_ipc_mq_send(const char *sendbuf, int sendlen, int prio)
{
mqd_t mqd;
mqd = mq_open(SLN_IPC_MQ_NAME, O_WRONLY); //客戶進程打開銷息隊列
if (mqd < 0) {
fprintf(stderr, "mq_open: %s
", strerror(errno));
return ⑴;
}
if (mq_send(mqd, sendbuf, sendlen, prio) < 0) { //客戶進程網(wǎng)消息隊列中添加1條消息
fprintf(stderr, "mq_send: %s
", strerror(errno));
return ⑴;
}
return 0;
}
程序運行時,服務進程阻塞于mq_receive,客戶進程每發(fā)1條消息隊列,服務進程都會從mq_receive處返回,但不1定接收到的消息就是客戶進程最近發(fā)送的那1條消息,由于客戶進程往消息隊列中添加消息時會依照優(yōu)先級來排序,如果客戶進程同時向消息隊列添加多條消息,服務進程還未來得及讀取,那末當服務進程開始讀取的消息1定是優(yōu)先級最高的那條消息,而不是客戶進程最早發(fā)送的那1條消息。
我們將服務進程稍作修改來試1下:
int sln_ipc_mq_loop(void)
{
mqd_t mqd;
struct mq_attr setattr, attr;
char *recvbuf = NULL;
unsigned int prio;
int recvlen;
memset(&setattr, 0, sizeof(setattr));
setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;
mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr);
//mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, NULL);
if ((mqd < 0) && (errno != EEXIST)) {
fprintf(stderr, "mq_open: %s
", strerror(errno));
return ⑴;
}
if ((mqd < 0) && (errno == EEXIST)) { // name is exist
mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
if (mqd < 0) {
fprintf(stderr, "mq_open: %s
", strerror(errno));
return ⑴;
}
}
if (mq_getattr(mqd, &attr) < 0) {
fprintf(stderr, "mq_getattr: %s
", strerror(errno));
return ⑴;
}
printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld
",
attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
recvbuf = malloc(attr.mq_msgsize);
if (NULL == recvbuf) {
return ⑴;
}
sleep(10); //此處等待10秒,此時客戶進程1次性向消息隊列加入多條消息
for (;;) {
if (mq_getattr(mqd, &attr) < 0) {
fprintf(stderr, "mq_getattr: %s
", strerror(errno));
return ⑴;
}
printf("msgsize: %ld, curmsgs: %ld
", attr.mq_msgsize, attr.mq_curmsgs);
recvlen = mq_receive(mqd, recvbuf, attr.mq_msgsize, &prio);
if (recvlen < 0) {
fprintf(stderr, "mq_receive: %s
", strerror(errno));
continue;
}
printf("recvive-> prio: %d, recvbuf: %s
", prio, recvbuf);
sleep(1); //每秒處理1個消息
}
mq_close(mqd);
return 0;
}
服務進程先運行,然后客戶進程立即向消息隊列加入12消息,每條消息優(yōu)先級從1到12,,以后服務進程運行,程序運行以下:
# ./server
flags: 0, maxmsg: 10, msgsize: 1024, curmsgs: 0
msgsize: 1024, curmsgs: 10
recvive-> prio: 10, recvbuf: asdf
msgsize: 1024, curmsgs: 10
recvive-> prio: 11, recvbuf: 1234
msgsize: 1024, curmsgs: 10
recvive-> prio: 12, recvbuf: asdf
msgsize: 1024, curmsgs: 9
recvive-> prio: 9, recvbuf: 1234
msgsize: 1024, curmsgs: 8
recvive-> prio: 8, recvbuf: asdf
msgsize: 1024, curmsgs: 7
recvive-> prio: 7, recvbuf: 1234
msgsize: 1024, curmsgs: 6
recvive-> prio: 6, recvbuf: asdf
msgsize: 1024, curmsgs: 5
recvive-> prio: 5, recvbuf: 1234
msgsize: 1024, curmsgs: 4
recvive-> prio: 4, recvbuf: asdf
msgsize: 1024, curmsgs: 3
recvive-> prio: 3, recvbuf: 1234
msgsize: 1024, curmsgs: 2
recvive-> prio: 2, recvbuf: asdf
msgsize: 1024, curmsgs: 1
recvive-> prio: 1, recvbuf: 1234
msgsize: 1024, curmsgs: 0
可以看到,系統(tǒng)允許最大消息數(shù)量是10條,當客戶進程1次性加入12條消息時,客戶進程在加入最后兩條會阻塞在那里,直到服務進程取出消息以后,最后兩天消息才能順次加入到消息隊列。并且服務進程取出消息時從優(yōu)先級從高到低取出:10->11->12->9->8->...
->1
本節(jié)源碼下載:
點擊打開鏈接
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學習有所幫助,可以手機掃描二維碼進行捐贈