Changeset 45

Show
Ignore:
Timestamp:
06/30/08 11:57:16 (5 months ago)
Author:
jtang
Message:

Add sipc_ioctl() call to enable non-blocking read.
(This does not address interrupted read/writes, nor race condition during read, nor Java bindings.)

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libsipc/include/sipc/sipc.h

    r41 r45  
    4040#define SIPC_NUM_TYPES 2 
    4141 
     42/* SIPC behaviors, for sipc_ioctl() */ 
     43#define SIPC_BLOCK 0 
     44#define SIPC_NOBLOCK 1 
     45 
    4246/* sipc_open(char *key, int role, int ipc_type, size_t size) 
    4347 *      key             Unique key to identify IPC communication channel, 
     
    6266void sipc_unlink(const char *key, int ipc_type); 
    6367 
     68/* Modify the behavior of an open SIPC channel.  Returns 0 on success, 
     69 * <0 on failure. 
     70 */ 
     71int sipc_ioctl(sipc_t *sipc, int request); 
     72 
    6473/* Blocking call to send msglen bytes of data. 
    6574 * This can only be called if sender was specified when sipc_init was called. 
     
    6776int sipc_send_data(sipc_t *sipc, int msg_len); 
    6877 
    69 /* Blocking call to recieve data.  Data will be allocated and filled and 
    70  * len will be set to the length. Returns 0 on success, <0 on failure */ 
     78/* Call to receive data.  Data will be allocated and filled and len 
     79 * will be set to the length.  Returns 0 on success, <0 on failure. */ 
    7180int sipc_recv_data(sipc_t *sipc, char **data, int *len); 
    7281 
  • trunk/libsipc/man/man3/sipc_open.3

    r38 r45  
    1 .\" Copyright (C) 2006, 2007 Tresys Technology, LLC 
     1.\" Copyright (C) 2006 - 2008 Tresys Technology, LLC 
    22.\" This file is distributed according to the GNU Lesser General Public License 
    33.TH "SIPC_OPEN" 3 "2006-08-10" "Linux 2.6" "Linux Programmer's Manual" 
     
    4343This function returns a pointer to the new IPC handle on success, NULL on error. 
    4444.SH "SEE ALSO" 
    45 \fBsipc_close(3)\fR, \fBsipc_unlink(3)\fR 
     45\fBsipc_close(3)\fR, \fBsipc_unlink(3)\fR, \fBsipc_ioctl(3)\fR 
    4646.SH AUTHOR 
    4747David Windsor <dwindsor@tresys.com> 
  • trunk/libsipc/man/man3/sipc_recv_data.3

    r40 r45  
    2424.TP 
    2525.I len 
    26 This will hold the total number of bytes received 
     26Pointer to the total number of bytes received 
    2727.PP 
    2828If the IPC channel is a message queue, \fBsipc_recv_data\fR() 
     
    4141\fBsipc_recv_data\fR(), else undefined behavior results. 
    4242.P 
    43 This function blocks if no data is in the IPC channel. 
     43By default, this function blocks if no data is in the IPC channel. 
     44This can be changed to non-blocking via \fBsipc_ioctl()\fR. 
    4445.SH "RETURN VALUE" 
    45 This function returns 0 on success, \-1 on error. 
    46 Upon error, 
     46This function returns 0 on success.  On error, \-1 is returned, 
    4747.I *data 
    48 will be set to NULL and 
     48will be set to NULL, 
    4949.I msg_len 
    50 will be set to zero. 
     50will be set to zero, and \fIerrno\fR is set appropriately. 
     51.SH "ERRORS" 
     52.TP 
     53.B EAGAIN 
     54The channel is set to non-blocking and nothing was available to be 
     55read. 
     56.TP 
     57.B EBADF 
     58The channel is not open for receiving. 
     59.TP 
     60.B EIO 
     61Message was corrupted during transmission, and is now lost. 
    5162.SH "EXAMPLES" 
    5263.nf 
     
    6172.fi 
    6273.SH "SEE ALSO" 
    63 \fBsipc_send_data(3)\fR, \fBsipc_shm_recv_done(3)\fR 
     74\fBsipc_send_data(3)\fR, \fBsipc_shm_recv_done(3)\fR, \fBsipc_ioctl(3)\fR 
    6475.SH AUTHOR 
    6576David Windsor <dwindsor@tresys.com> 
  • trunk/libsipc/src/libsipc.map

    r44 r45  
    1111  local: *; 
    1212}; 
     13 
     14VERS_1.1{ 
     15  global: 
     16        sipc_ioctl; 
     17}; 
  • trunk/libsipc/src/sipc.c

    r41 r45  
    5353        new_sipc->ipc_type = ipc_type; 
    5454        new_sipc->role = role; 
     55        new_sipc->non_blocking = 0; 
    5556        new_sipc->len = len; 
    5657        new_sipc->msg_len = SIPC_MSGLEN_NOT_SET; 
     
    116117} 
    117118 
     119int sipc_ioctl(sipc_t *sipc, int request) 
     120{ 
     121        if (!sipc) { 
     122                errno = EINVAL; 
     123                return -1; 
     124        } 
     125 
     126        switch (request) { 
     127        case SIPC_BLOCK: 
     128                sipc->non_blocking = 0; 
     129                break; 
     130        case SIPC_NOBLOCK: 
     131                sipc->non_blocking = IPC_NOWAIT; 
     132                break; 
     133        default: 
     134                errno = EINVAL; 
     135                return -1; 
     136        } 
     137        return 0; 
     138} 
     139 
    118140char *sipc_get_data_ptr(sipc_t *sipc) 
    119141{ 
  • trunk/libsipc/src/sipc_internal.h

    r44 r45  
    7676        int ipc_type; 
    7777        int role; 
     78        int non_blocking;              /* set to IPC_NOWAIT if reads should be 
     79                                          nonblocking, 0 to block (default) */ 
    7880        union 
    7981        { 
  • trunk/libsipc/src/sipc_mqueue.c

    r44 r45  
    193193        int retv = -1, alloc_sz = 0, idx = 0, recv_sz; 
    194194        int msgtxt_sz = SIPC_MQUEUE_MSG_SZ - sizeof(struct msgbuf); 
    195  
    196         if (!sipc || !data || !len) 
     195        int error = 0; 
     196 
     197        if (data) 
     198                *data = NULL;          /* Pointer to a buffer created here */ 
     199        if (len) 
     200                *len = 0;              /* Bytes received so far */ 
     201        if (!sipc || !data || !len) { 
     202                error = EINVAL; 
    197203                goto err; 
    198  
    199         *data = NULL;                  /* Pointer to a buffer created here */ 
     204        } 
     205 
    200206        idx = 0;                       /* Current index into the buffer */ 
    201         *len = 0;                      /* Bytes received so far */ 
    202207 
    203208        if (sipc->role != SIPC_RECEIVER) { 
     209                error = EBADF; 
    204210                sipc_error(sipc, "sipc_send_data called without receiver flag enabled in IPC structure\n"); 
    205211                goto err; 
     
    212218        /* Receive and validate the length of message marker */ 
    213219        if (sipc->msg_len == SIPC_MSGLEN_NOT_SET) { 
    214                 if (msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, 0) < 0) { 
    215                         sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 
    216                         goto err; 
    217                 } 
    218                 if (is_end_xmit(mbuf)) 
     220                if (msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking) < 0) { 
     221                        error = errno; 
     222                        if (error == ENOMSG || error == EAGAIN) 
     223                                error = EAGAIN; 
     224                        else 
     225                                sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 
     226                        goto err; 
     227                } 
     228                if (is_end_xmit(mbuf)) { 
     229                        errno = 0; 
    219230                        return 0; 
     231                } 
    220232 
    221233                if (is_msg_len(mbuf)) { 
    222234                        sipc->msg_len = atoi(mbuf->mtext); 
    223235                        if (sipc->msg_len < 0) { 
     236                                error = EIO; 
    224237                                sipc_error(sipc, "libsipc: bad message length\n"); 
    225238                                goto err; 
    226239                        } 
    227240                        if (sipc->msg_len > sipc->len) { 
     241                                error = EIO; 
    228242                                sipc_error(sipc, "libsipc: cannot receive buffer of size %zd, can only receive %zd\n", 
    229243                                           sipc->msg_len, sipc->len); 
     
    231245                        } 
    232246                } else { 
     247                        error = EIO; 
    233248                        sipc_error(sipc, "libsipc: did not receive length of message, received %ld\n", mbuf->mtype); 
    234249                        goto err; 
     
    239254         * Stop when we have received the total number of bytes 
    240255         * or upon receiving the end of message marker. */ 
    241         if ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, 0)) < 0) { 
    242                 sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 
     256        if ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 
     257                error = errno; 
     258                if (error == ENOMSG || error == EAGAIN) 
     259                        error = EAGAIN; 
     260                else 
     261                        sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 
    243262                goto err; 
    244263        } 
     
    253272                char *tdata = realloc(*data, alloc_sz); 
    254273                if (!tdata) { 
     274                        error = errno; 
    255275                        sipc_error(sipc, "%s\n", "Out of memory"); 
    256                         free(*data); 
    257                         *data = NULL; 
    258                         *len = 0; 
    259276                        goto err; 
    260277                } 
     
    266283 
    267284                /* Receive the next message */ 
    268                 if ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, 0)) < 0) { 
    269                         sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 
    270                         free(*data); 
    271                         *data = NULL; 
    272                         *len = 0; 
     285                if ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 
     286                        error = errno; 
     287                        if (error == ENOMSG || error == EAGAIN) 
     288                                error = EAGAIN; 
     289                        else 
     290                                sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 
     291                        /* Note that it is possible for a non-blocking 
     292                         * SIPC to fail while receiving a chunked message, 
     293                         * in which case the entire message becomes lost. 
     294                         * This is probably not the best design possible. 
     295                         */ 
    273296                        goto err; 
    274297                } 
     
    278301      err: 
    279302        sipc->msg_len = SIPC_MSGLEN_NOT_SET;    /* Unset message length */ 
     303        errno = error; 
     304        if (retv) { 
     305                free(*data); 
     306                *data = NULL; 
     307                *len = 0; 
     308        } 
    280309        return retv; 
    281310} 
  • trunk/libsipc/src/sipc_shm.c

    r43 r45  
    188188/* Receive a DATA_READY marker from sender and pass the shm pointer to 
    189189 * the caller.  This function will block if there is nothing on the 
    190  * message queue
     190 * message queue, if the handle is set to blocking mode (default)
    191191 */ 
    192192int sipc_shm_recv_data(sipc_t *sipc, char **data, int *len) 
    193193{ 
    194194        int mtype = 0; 
    195  
    196         if (!sipc || !data || !len) 
    197                 return -1; 
    198  
    199         *data = NULL; 
    200         *len = 0; 
     195        int block; 
     196 
     197        if (data) 
     198                *data = NULL;          /* Pointer to a buffer created here */ 
     199        if (len) 
     200                *len = 0;              /* Bytes received so far */ 
     201        if (!sipc || !data || !len) { 
     202                errno = EINVAL; 
     203                return -1; 
     204        } 
     205        block = (sipc->non_blocking == 0 ? 1 : 0); 
    201206 
    202207        /* Get a message from the side channel. */ 
    203         mtype = mqueue_get_msg_type(sipc->s.msqid, SIPC_ANY, MQ_BLOCK); 
     208        mtype = mqueue_get_msg_type(sipc->s.msqid, SIPC_ANY, block); 
    204209        if (mtype == SIPC_DATA_READY) { 
    205210                /* It is now OK to read shared memory */ 
     
    208213                return 0; 
    209214        } else { 
    210                 sipc_error(sipc, "Received a message of unknown type\n"); 
    211                 return -1; 
    212         } 
    213 
     215                int error = errno; 
     216                if (error == ENOMSG || error == EAGAIN) 
     217                        errno = EAGAIN; 
     218                else { 
     219                        errno = error; 
     220                        sipc_error(sipc, "Received a message of unknown type\n"); 
     221                } 
     222                return -1; 
     223        } 
     224
  • trunk/libsipc/tests/mqueue.c

    r39 r45  
    4848static void do_parent_chunked(void); 
    4949static void do_child_chunked(void); 
     50static void do_blocked_parent(void); 
     51static void do_blocked_child(void); 
    5052static int verify_data(char *filename, char *recv_data); 
    5153static inline int is_end_xmit(char *data); 
     
    229231} 
    230232 
     233static void test_blocked_mqueue(void) 
     234{ 
     235        pid_t pid; 
     236        int status = 0; 
     237 
     238        switch ((pid = fork())) { 
     239        case -1: 
     240                fprintf(stderr, "fork: %s\n", strerror(errno)); 
     241                return; 
     242        case 0: 
     243                sipc_close(reader2_ipc); 
     244                do_blocked_child(); 
     245                exit(0); 
     246                break; 
     247        default: 
     248                do_blocked_parent(); 
     249                break; 
     250        } 
     251 
     252        wait(&status); 
     253} 
     254 
    231255CU_TestInfo mqueue_tests[] = { 
    232256        {"unforked mqueue", test_unforked_mqueue} 
     
    235259        , 
    236260        {"forked chunked mqueue", test_chunked_mqueue} 
     261        , 
     262        {"forked blocked mqueue", test_blocked_mqueue} 
    237263        , 
    238264        CU_TEST_INFO_NULL 
     
    341367} 
    342368 
     369/* 
     370 * Test blocking and non-blocking mode. 
     371 */ 
     372static void do_blocked_child(void) 
     373{ 
     374        char *shm_data = sipc_get_data_ptr(writer_ipc); 
     375        if (!shm_data) { 
     376                sipc_error(writer_ipc, "Unable to get pointer to data.\n"); 
     377                return; 
     378        } 
     379        sleep(2); 
     380        shm_data[0] = 0x42; 
     381        if (sipc_send_data(writer_ipc, 1) < 0) { 
     382                sipc_error(writer_ipc, "Unable to send data\n"); 
     383                return; 
     384        } 
     385        sleep(2); 
     386        shm_data[0] = 0x43; 
     387        if (sipc_send_data(writer_ipc, 1) < 0) { 
     388                sipc_error(writer_ipc, "Unable to send data\n"); 
     389                return; 
     390        } 
     391        shm_data[0] = 0x44; 
     392        if (sipc_send_data(writer_ipc, 1) < 0) { 
     393                sipc_error(writer_ipc, "Unable to send data\n"); 
     394                return; 
     395        } 
     396        sipc_close(writer_ipc); 
     397} 
     398 
     399static void do_blocked_parent(void) 
     400{ 
     401        char *data = NULL; 
     402        int len = 0, retv, error; 
     403 
     404        printf("This should pause for 2 seconds (default read)...\n"); 
     405        if (sipc_recv_data(reader_ipc, &data, &len) < 0) { 
     406                CU_FAIL("Error during first read"); 
     407        } 
     408        CU_ASSERT(len == 1 && data[0] == 0x42); 
     409        free(data); 
     410 
     411        printf("No blocking here.\n"); 
     412        CU_ASSERT(sipc_ioctl(reader_ipc, SIPC_NOBLOCK) == 0); 
     413        retv = sipc_recv_data(reader_ipc, &data, &len); 
     414        error = errno; 
     415        CU_ASSERT(retv == -1); 
     416        CU_ASSERT(data == NULL); 
     417        CU_ASSERT(len == 0); 
     418        CU_ASSERT(error == EAGAIN); 
     419 
     420        printf("This should pause for 3 seconds... (non-blocking read)\n"); 
     421        sleep(3); 
     422        if (sipc_recv_data(reader_ipc, &data, &len) < 0) { 
     423                CU_FAIL("Error during first read"); 
     424        } 
     425        CU_ASSERT(len == 1 && data[0] == 0x43); 
     426        free(data); 
     427 
     428        CU_ASSERT(sipc_ioctl(reader_ipc, -1) < 0); 
     429        CU_ASSERT(sipc_ioctl(reader_ipc, SIPC_BLOCK) == 0); 
     430 
     431        printf("This should pause for 1 second... (blocked read)\n"); 
     432        if (sipc_recv_data(reader_ipc, &data, &len) < 0) { 
     433                CU_FAIL("Error during first read"); 
     434        } 
     435        CU_ASSERT(len == 1 && data[0] == 0x44); 
     436        free(data); 
     437} 
     438 
    343439static int send_end_xmit(sipc_t *ipc) 
    344440{ 
  • trunk/libsipc/tests/shm.c

    r39 r45  
    4646static void do_binary_child(void); 
    4747static void do_binary_parent(void); 
     48static void do_blocked_child(void); 
     49static void do_blocked_parent(void); 
    4850static void send_end_xmit(sipc_t *sipc); 
    4951static int verify_data(char *filename, char *recv_data); 
     
    175177} 
    176178 
     179static void test_blocked_shm(void) 
     180{ 
     181        int status; 
     182        pid_t pid; 
     183 
     184        switch (pid = fork()) { 
     185        case -1: 
     186                fprintf(stderr, "fork: %s\n", strerror(errno)); 
     187                return; 
     188        case 0: 
     189                sipc_close(reader_ipc); 
     190                do_blocked_child(); 
     191                /* Exit here to prevent displaying results 2x */ 
     192                exit(0); 
     193                break; 
     194        default: 
     195                do_blocked_parent(); 
     196                break; 
     197        } 
     198 
     199        wait(&status); 
     200} 
     201 
    177202CU_TestInfo shm_tests[] = { 
    178203        {"forked shm", test_shm} 
    179204        , 
    180205        {"binary shm", test_binary_shm} 
     206        , 
     207        {"blocked shm", test_blocked_shm} 
    181208        , 
    182209        CU_TEST_INFO_NULL 
     
    313340} 
    314341 
     342/* 
     343 * Test blocking and non-blocking mode. 
     344 */ 
     345static void do_blocked_child(void) 
     346{ 
     347        char *shm_data = sipc_get_data_ptr(writer_ipc); 
     348        if (!shm_data) { 
     349                sipc_error(writer_ipc, "Unable to get pointer to data.\n"); 
     350                return; 
     351        } 
     352        sleep(2); 
     353        shm_data[0] = 0x42; 
     354        if (sipc_send_data(writer_ipc, 1) < 0) { 
     355                sipc_error(writer_ipc, "Unable to send data\n"); 
     356                return; 
     357        } 
     358        sleep(2); 
     359        shm_data[0] = 0x43; 
     360        if (sipc_send_data(writer_ipc, 1) < 0) { 
     361                sipc_error(writer_ipc, "Unable to send data\n"); 
     362                return; 
     363        } 
     364        shm_data[0] = 0x44; 
     365        if (sipc_send_data(writer_ipc, 1) < 0) { 
     366                sipc_error(writer_ipc, "Unable to send data\n"); 
     367                return; 
     368        } 
     369        sipc_close(writer_ipc); 
     370} 
     371 
     372static void do_blocked_parent(void) 
     373{ 
     374        char *data = NULL; 
     375        int len = 0, retv, error; 
     376 
     377        printf("This should pause for 2 seconds (default read)...\n"); 
     378        if (sipc_recv_data(reader_ipc, &data, &len) < 0) { 
     379                CU_FAIL("Error during first read"); 
     380        } 
     381        CU_ASSERT(len == DATA_LEN && data[0] == 0x42); 
     382        sipc_shm_recv_done(reader_ipc); 
     383 
     384        printf("No blocking here.\n"); 
     385        CU_ASSERT(sipc_ioctl(reader_ipc, SIPC_NOBLOCK) == 0); 
     386        retv = sipc_recv_data(reader_ipc, &data, &len); 
     387        error = errno; 
     388        CU_ASSERT(retv == -1); 
     389        CU_ASSERT(data == NULL); 
     390        CU_ASSERT(len == 0); 
     391        CU_ASSERT(error == EAGAIN); 
     392 
     393        printf("This should pause for 3 seconds... (non-blocking read)\n"); 
     394        sleep(3); 
     395        if (sipc_recv_data(reader_ipc, &data, &len) < 0) { 
     396                CU_FAIL("Error during first read"); 
     397        } 
     398        CU_ASSERT(len == DATA_LEN && data[0] == 0x43); 
     399        sipc_shm_recv_done(reader_ipc); 
     400 
     401        CU_ASSERT(sipc_ioctl(reader_ipc, -1) < 0); 
     402        CU_ASSERT(sipc_ioctl(reader_ipc, SIPC_BLOCK) == 0); 
     403 
     404        printf("This should pause for 1 second... (blocked read)\n"); 
     405        if (sipc_recv_data(reader_ipc, &data, &len) < 0) { 
     406                CU_FAIL("Error during first read"); 
     407        } 
     408        CU_ASSERT(len == DATA_LEN && data[0] == 0x44); 
     409        sipc_shm_recv_done(reader_ipc); 
     410} 
     411 
    315412static void send_end_xmit(sipc_t *sipc) 
    316413{