Changeset 48

Show
Ignore:
Timestamp:
06/30/08 17:37:24 (2 months ago)
Author:
jtang
Message:

Modified sipc_recv_data() and sipc_shm_recv_done() to be signal-safe.
sipc_recv_data() can now be [re-]invoked without loss of data.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libsipc/src/mqueue_internal.c

    r46 r48  
    120120        int flags = block ? 0 : IPC_NOWAIT; 
    121121 
    122         if (msgrcv(msqid, &mbuf, sizeof(mbuf.mtext), type, flags) < 0) { 
    123                 if (errno == ENOMSG) 
     122        while (msgrcv(msqid, &mbuf, sizeof(mbuf.mtext), type, flags) < 0) { 
     123                if (errno == EINTR) { 
     124                        continue; 
     125                } 
     126                if (errno == ENOMSG || errno == EAGAIN) 
    124127                        return 0; 
    125128                return -1; 
  • trunk/libsipc/src/shm_internal.c

    r41 r48  
    3636int sipc_shm_recv_done(sipc_t *sipc) 
    3737{ 
    38         if (!sipc || sipc->ipc_type != SIPC_SYSV_SHM) 
     38        if (!sipc || sipc->ipc_type != SIPC_SYSV_SHM) { 
     39                errno = EINVAL; 
    3940                return -1; 
     41        } 
    4042 
    41         mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK); 
    42         mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK); 
     43        if (mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK) < 0) { 
     44                return -1; 
     45        } 
     46        sipc->len = 0; 
     47        if (mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK) < 0) { 
     48                return -1; 
     49        } 
    4350        return 0; 
    4451} 
  • trunk/libsipc/src/sipc.c

    r47 r48  
    5454        new_sipc->role = role; 
    5555        new_sipc->non_blocking = 0; 
    56         new_sipc->len = len; 
    57         new_sipc->msg_len = SIPC_MSGLEN_NOT_SET; 
     56        new_sipc->data_size = len; 
     57        new_sipc->len = 0; 
     58        new_sipc->recv_len = 0; 
     59        new_sipc->recv_data = NULL; 
    5860 
    5961        /* call backend specific init to fill in function table */ 
  • trunk/libsipc/src/sipc_internal.h

    r45 r48  
    3434#define SIPC_MSG_LEN    0x09 
    3535#define SIPC_END_XMIT   0x0c 
    36  
    37 /* Invalid message length */ 
    38 #define SIPC_MSGLEN_NOT_SET -1 
    3936 
    4037/* Creation functions */ 
     
    9087        }; 
    9188 
    92         char *data; 
     89        char *data;                   /* Buffer for sending data */ 
    9390        struct msgbuf *mbuf; 
    94         size_t len; 
    95         size_t msg_len;                /* Length in bytes of the next message */ 
    96         size_t copied;                 /* Number of bytes xmitted so far */ 
     91        size_t data_size;              /* Size of the data buffer, set in sipc_open() */ 
     92        size_t len;                    /* Number of bytes xmitted so far, 
     93                                          or number of bytes received so far */ 
     94        size_t recv_len;               /* Number of bytes to be received */ 
     95        char *recv_data;               /* Buffer for receiving mqueue data */ 
    9796        struct sipc_func_table *funcs; 
    9897}; 
  • trunk/libsipc/src/sipc_mqueue.c

    r47 r48  
    5656static int is_msg_len(struct msgbuf *mbuf); 
    5757static int is_end_xmit(struct msgbuf *mbuf); 
    58 static size_t next_alloc_sz(int recv, int recv_sz, int max_packet_sz, int msg_len); 
    5958 
    6059int sipc_mqueue_init(sipc_t *sipc) 
     
    104103{ 
    105104        /* Allocate data member */ 
    106         sipc->data = calloc(1, sipc->len); 
     105        sipc->data = calloc(1, sipc->data_size); 
    107106        if (!sipc->data) { 
    108107                sipc_error(sipc, "Out of memory\n"); 
     
    182181int sipc_mqueue_recv_data(sipc_t *sipc, char **data, int *len) 
    183182{ 
    184         int retv = -1, alloc_sz = 0, idx = 0, recv_sz
    185         int msgtxt_sz = SIPC_MQUEUE_MSG_SZ - sizeof(struct msgbuf)
     183        int retv = -1
     184        size_t idx, recv_sz
    186185        int error = 0; 
    187186 
    188         if (data) 
    189                 *data = NULL;          /* Pointer to a buffer created here */ 
    190         if (len) 
    191                 *len = 0;              /* Bytes received so far */ 
    192         if (!sipc || !data || !len) { 
    193                 error = EINVAL; 
    194                 goto err; 
    195         } 
    196  
    197187        idx = 0;                       /* Current index into the buffer */ 
    198  
    199         if (sipc->role != SIPC_RECEIVER) { 
    200                 error = EBADF; 
    201                 sipc_error(sipc, "sipc_send_data called without receiver flag enabled in IPC structure\n"); 
    202                 goto err; 
    203         } 
    204188 
    205189        struct msgbuf *mbuf = sipc->mbuf; 
     
    208192 
    209193        /* Receive and validate the length of message marker */ 
    210         if (sipc->msg_len == SIPC_MSGLEN_NOT_SET) { 
    211                 if (msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking) < 0) { 
     194        if (sipc->recv_len == 0) { 
     195                while (msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking) < 0) { 
    212196                        error = errno; 
     197                        if (error == EINTR) { 
     198                                continue; 
     199                        } 
    213200                        if (error == ENOMSG || error == EAGAIN) 
    214201                                error = EAGAIN; 
     
    223210 
    224211                if (is_msg_len(mbuf)) { 
    225                         sipc->msg_len = atoi(mbuf->mtext); 
    226                         if (sipc->msg_len < 0) { 
     212                        errno = 0; 
     213                        sipc->recv_len = (size_t) strtoul(mbuf->mtext, NULL, 10); 
     214                        if (errno != 0) { 
    227215                                error = EIO; 
    228216                                sipc_error(sipc, "libsipc: bad message length\n"); 
    229217                                goto err; 
    230218                        } 
    231                         if (sipc->msg_len > sipc->len) { 
     219                        if (sipc->recv_len > sipc->data_size) { 
    232220                                error = EIO; 
    233221                                sipc_error(sipc, "libsipc: cannot receive buffer of size %zd, can only receive %zd\n", 
    234                                            sipc->msg_len, sipc->len); 
     222                                           sipc->recv_len, sipc->data_size); 
    235223                                goto err; 
    236224                        } 
     
    240228                        goto err; 
    241229                } 
     230                sipc->len = 0; 
     231        } 
     232 
     233        if (sipc->recv_len == 0) { 
     234                /* special case if there is no data to be received: *data 
     235                   needs to point to a buffer, but a malloc() of 0 is not 
     236                   defined, so allocate a single byte for it. */ 
     237                if ((sipc->recv_data = malloc(1)) == NULL) { 
     238                        error = errno; 
     239                        sipc_error(sipc, "%s\n", "Out of memory"); 
     240                        goto err; 
     241                } 
    242242        } 
    243243 
     
    245245         * Stop when we have received the total number of bytes 
    246246         * or upon receiving the end of message marker. */ 
    247         if ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 
     247        while ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 
    248248                error = errno; 
     249                if (error == EINTR) { 
     250                        continue; 
     251                } 
    249252                if (error == ENOMSG || error == EAGAIN) 
    250253                        error = EAGAIN; 
     
    255258 
    256259        while (!is_end_xmit(mbuf)) { 
    257                 *len += recv_sz; 
    258  
    259                 /* If this isn't the last packet in the message, 
    260                  * resize to +1 message */ 
    261                 alloc_sz += next_alloc_sz(*len, recv_sz, msgtxt_sz, sipc->msg_len); 
    262  
    263                 char *tdata = realloc(*data, alloc_sz); 
    264                 if (!tdata) { 
     260                size_t amount_to_copy = sipc->recv_len - sipc->len > recv_sz ? recv_sz : sipc->recv_len - sipc->len; 
     261                if (amount_to_copy != 0) { 
     262                        char *tdata = realloc(sipc->recv_data, sipc->len + amount_to_copy); 
     263                        if (!tdata) { 
     264                                error = errno; 
     265                                sipc_error(sipc, "%s\n", "Out of memory"); 
     266                                goto err; 
     267                        } 
     268                        sipc->recv_data = tdata; 
     269 
     270                        /* memset(*data+idx, 0x0, recv_sz); */ 
     271                        memcpy(sipc->recv_data + sipc->len, mbuf->mtext, amount_to_copy); 
     272                        sipc->len += amount_to_copy; 
     273                } 
     274 
     275                /* Receive the next message */ 
     276                while ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 
    265277                        error = errno; 
    266                         sipc_error(sipc, "%s\n", "Out of memory"); 
    267                         goto err; 
    268                 } 
    269                 *data = tdata; 
    270  
    271                 /* memset(*data+idx, 0x0, recv_sz); */ 
    272                 memcpy(*data + idx, mbuf->mtext, recv_sz); 
    273                 idx += recv_sz; 
    274  
    275                 /* Receive the next message */ 
    276                 if ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 
    277                         error = errno; 
     278                        if (error == EINTR) { 
     279                                continue; 
     280                        } 
    278281                        if (error == ENOMSG || error == EAGAIN) 
    279282                                error = EAGAIN; 
    280283                        else 
    281284                                sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 
    282                         /* Note that it is possible for a non-blocking 
    283                          * SIPC to fail while receiving a chunked message, 
    284                          * in which case the entire message becomes lost. 
    285                          * This is probably not the best design possible. 
    286                          */ 
    287285                        goto err; 
    288286                } 
    289287        } 
    290288 
     289        *data = sipc->recv_data; 
     290        *len = sipc->len; 
     291        sipc->recv_len = 0; 
     292        sipc->len = 0; 
     293        sipc->recv_data = NULL; 
    291294        retv = 0; 
     295 
    292296      err: 
    293         sipc->msg_len = SIPC_MSGLEN_NOT_SET;    /* Unset message length */ 
     297        if (retv && error != EAGAIN) { 
     298                free(sipc->recv_data); 
     299                sipc->recv_data = NULL; 
     300                sipc->len = 0; 
     301                sipc->recv_len = 0; 
     302        } 
    294303        errno = error; 
    295         if (retv) { 
    296                 free(*data); 
    297                 *data = NULL; 
    298                 *len = 0; 
    299         } 
    300304        return retv; 
    301305} 
     
    408412        return 0; 
    409413} 
    410  
    411 /* Determine how much memory to allocate for receiving a packet. 
    412  * A packet should not contain extra padding. 
    413  * Returns the size of the next allocation on success, -1 on failure. */ 
    414 static size_t next_alloc_sz(int need, int recv_sz, int max_packet_sz, int msg_len) 
    415 { 
    416         if (need < 0 || max_packet_sz <= 0 || msg_len <= 0) 
    417                 return -1; 
    418  
    419         if (need > msg_len) 
    420                 return max_packet_sz; 
    421  
    422         return recv_sz; 
    423 } 
  • trunk/libsipc/src/sipc_shm.c

    r47 r48  
    6767 
    6868        /* Create the shm segment */ 
    69         sipc->s.shmid = shmget(sipc->key, sipc->len, SHM_CPERMS); 
     69        sipc->s.shmid = shmget(sipc->key, sipc->data_size, SHM_CPERMS); 
    7070        if (sipc->s.shmid < 0) { 
    7171                sipc_error(sipc, "shmget: %s\n", strerror(errno)); 
     
    9999        /* Get the (existing) shm segment */ 
    100100        flags = sipc->role == SIPC_SENDER ? SHM_SPERMS : SHM_RPERMS; 
    101         sipc->s.shmid = shmget(sipc->key, sipc->len, flags); 
     101        sipc->s.shmid = shmget(sipc->key, sipc->data_size, flags); 
    102102        if (sipc->s.shmid < 0) { 
    103103                sipc_error(sipc, "shmget: %s\n", strerror(errno)); 
     
    163163 
    164164        /* Send a DATA_READY marker */ 
     165        sipc->len = msg_len; 
    165166        if (mqueue_send_msg_type(sipc->s.msqid, SIPC_DATA_READY, MQ_BLOCK) < 0) { 
    166167                error = errno; 
     
    207208                /* It is now OK to read shared memory */ 
    208209                *data = sipc->data; 
    209                 *len = sipc->len
     210                *len = sipc->data_size
    210211                return 0; 
    211212        } else { 
  • trunk/libsipc/tests/mqueue.c

    r47 r48  
    330330        int len = 0; 
    331331 
    332         size_t total_read = 0, j = 0; 
    333         while (!sipc_recv_data(reader2_ipc, &data, &len)) { 
    334                 CU_ASSERT_PTR_NOT_NULL_FATAL(data); 
    335                 CU_ASSERT_FATAL(len > 0); 
    336  
    337                 size_t i; 
    338                 for (i = 0; i < len; i++) { 
    339                         if (total_read % 512 == 0) { 
    340                                 j++; 
    341                         } 
    342                         CU_ASSERT(data[i] == j); 
    343                         total_read++; 
     332        size_t j = 0; 
     333        CU_ASSERT(sipc_recv_data(reader2_ipc, &data, &len) == 0); 
     334        CU_ASSERT_PTR_NOT_NULL_FATAL(data); 
     335        CU_ASSERT_FATAL(len == BIG_DATA_LEN); 
     336 
     337        size_t i; 
     338        for (i = 0; i < len; i++) { 
     339                if (i % 512 == 0) { 
     340                        j++; 
    344341                } 
    345  
    346                 free(data); 
    347                 if (total_read >= BIG_DATA_LEN) { 
    348                         return; 
     342                CU_ASSERT(data[i] == j); 
     343        } 
     344        free(data); 
     345 
     346        CU_ASSERT(sipc_recv_data(reader2_ipc, &data, &len) == 0); 
     347        CU_ASSERT_PTR_NOT_NULL_FATAL(data); 
     348        CU_ASSERT_FATAL(len == BIG_DATA_LEN - 1); 
     349 
     350        j = 42; 
     351        for (i = 0; i < len; i++) { 
     352                CU_ASSERT(data[i] == j); 
     353                j++; 
     354                if (i % 120 == 0) { 
     355                        j = 0; 
    349356                } 
    350357        } 
    351         CU_FAIL("Did not receive as much data as expected."); 
     358        free(data); 
    352359} 
    353360 
     
    366373                } 
    367374                data[i] = j; 
     375        } 
     376 
     377        /* Send data, then end of xmit */ 
     378        CU_ASSERT(sipc_send_data(writer2_ipc, i) == 0); 
     379 
     380        /* This time send data that does not nicely fit on a word boundary */ 
     381        j = 42; 
     382        for (i = 0; i < BIG_DATA_LEN - 1; i++) { 
     383                data[i] = j++; 
     384                if (i % 120 == 0) { 
     385                        j = 0; 
     386                } 
    368387        } 
    369388