Changeset 59

Show
Ignore:
Timestamp:
09/26/08 12:51:47 (2 months ago)
Author:
jbrindle
Message:

Update to send length across data channel for SHM backend. This does not use the same length passing method as the mq ba
ckend since we didn't have a buffer allocated for the message buffer.

This also changes all send and recv functions to take a size_t instead of an int for the length

and some minor makefile updates for debugging

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • trunk/libsipc/Makefile

    r36 r59  
    99  # if DEBUG is set, then explicitly compile with -O0 
    1010  AM_CFLAGS ?= -O0 -g3 -gdwarf-2 
    11   LDFLAGS ?= -g 
     11  LDFLAGS ?= -g3 
    1212else 
    1313  # if no DEBUG and no CFLAGS are already set, then compile with -O3 
  • trunk/libsipc/bindings/java/sipc.i

    r54 r59  
    122122        }; 
    123123 
    124         void send_data(int msg_len) { 
     124        void send_data(size_t msg_len) { 
    125125                BEGIN_EXCEPTION 
    126126                if (sipc_send_data(self->sipc, msg_len) < 0) { 
     
    146146        jbyteArray recv_data() { 
    147147                char *data = NULL; 
    148                 int len = 0; 
     148                size_t len = 0; 
    149149                jobject jresult = NULL; 
    150150                BEGIN_EXCEPTION 
  • trunk/libsipc/examples/mq_reader.c

    r24 r59  
    4343int main(void) 
    4444{ 
    45         int msglen = 0; 
     45        size_t msglen = 0; 
    4646        char *data = NULL; 
    4747 
  • trunk/libsipc/examples/shm_reader.c

    r24 r59  
    4444{ 
    4545        char *data = NULL; 
    46         int len = 0; 
     46        size_t len = 0; 
    4747 
    4848        /* Initialize the IPC handle */ 
  • trunk/libsipc/include/sipc/sipc.h

    r50 r59  
    7171 * This can only be called if sender was specified when sipc_init was called. 
    7272 * returns 0 on success, <0 on failure */ 
    73 int sipc_send_data(sipc_t *sipc, int msg_len); 
     73int sipc_send_data(sipc_t *sipc, size_t msg_len); 
    7474 
    7575/* Call to receive data.  Data will be allocated and filled and len 
    7676 * will be set to the length.  Returns 0 on success, <0 on failure. */ 
    77 int sipc_recv_data(sipc_t *sipc, char **data, int *len); 
     77int sipc_recv_data(sipc_t *sipc, char **data, size_t *len); 
    7878 
    7979/* Returns a pointer to the data contained within the IPC resource */ 
  • trunk/libsipc/src/mqueue_internal.c

    r48 r59  
    2020 
    2121#include <stdio.h> 
     22#include <stdlib.h> 
    2223#include <sys/types.h> 
    2324#include <sys/ipc.h> 
     
    2829#include "sipc/sipc.h" 
    2930#include "mqueue_internal.h" 
     31#include "sipc_internal.h" 
    3032 
    3133#define hidden __attribute__ ((visibility ("hidden"))) 
     
    9698 
    9799/* Change the capacity of the message queue to contain a user 
    98  * defined number bytes */ 
    99 int hidden mqueue_set_capacity(int msqid, int mqbytes) 
     100 * defined number of messages (not bytes) */ 
     101int hidden mqueue_set_capacity(int msqid, int mq_messages) 
    100102{ 
    101103        struct msqid_ds mqbuf; 
     
    105107 
    106108        /* Change the capacity of the queue */ 
    107         mqbuf.msg_qbytes = mqbytes
     109        mqbuf.msg_qbytes = mq_messages * (sizeof(struct shm_msgbuf) - sizeof(long))
    108110        if (msgctl(msqid, IPC_SET, &mqbuf) < 0) 
    109111                return -1; 
     
    115117 * If block is non-zero, this call will block if no such message exists in the 
    116118 * queue. */ 
    117 int hidden mqueue_get_msg_type(int msqid, int type, int block
     119int hidden mqueue_get_msg_type(sipc_t *sipc, int msqid, int type, int block, size_t *payload
    118120{ 
    119         struct msgbuf mbuf; 
     121        struct shm_msgbuf *mbuf; 
    120122        int flags = block ? 0 : IPC_NOWAIT; 
     123        int ret; 
    121124 
    122         while (msgrcv(msqid, &mbuf, sizeof(mbuf.mtext), type, flags) < 0) { 
     125        mbuf = calloc(1, sizeof(*mbuf)); 
     126        if (!mbuf) { 
     127                sipc_error(sipc, "Out of memory!\n"); 
     128                return -1; 
     129        }        
     130 
     131        while (msgrcv(msqid, mbuf, sizeof(*mbuf) - sizeof(long), type, flags) < 0) { 
    123132                if (errno == EINTR) { 
    124133                        continue; 
     
    129138        } 
    130139 
    131         return mbuf.mtype; 
     140        *payload = mbuf->payload; 
     141        ret = mbuf->mtype; 
     142 
     143        free(mbuf); 
     144 
     145        return ret; 
    132146} 
    133147 
    134148/* This exists solely for the shared memory implementation 
    135149 * to be able to send data-ready markers. 
    136  * If block is non-zero, this call will block if the message queue is full. */ 
    137 int hidden mqueue_send_msg_type(int msqid, int type, int block) 
     150 * If block is non-zero, this call will block if the message queue is full. * 
     151 * payload is currently used for packing the message length in the DATA_READY packet */ 
     152int hidden mqueue_send_msg_type(sipc_t *sipc, int msqid, int type, int block, size_t payload) 
    138153{ 
    139         struct msgbuf mbuf; 
     154        struct shm_msgbuf *mbuf; 
    140155        int flags = block ? 0 : IPC_NOWAIT; 
    141156 
    142         mbuf.mtype = type; 
    143         memset(mbuf.mtext, 1, sizeof(mbuf.mtext)); 
    144         while (msgsnd(msqid, &mbuf, sizeof(mbuf.mtext), flags) < 0) { 
     157        mbuf = calloc(1, sizeof(*mbuf)); 
     158        if (!mbuf) { 
     159                sipc_error(sipc, "Out of memory!\n"); 
     160                return -1; 
     161        }        
     162 
     163        mbuf->mtype = type; 
     164        mbuf->payload = payload; 
     165        while (msgsnd(msqid, mbuf, sizeof(*mbuf) - sizeof(long), flags) < 0) { 
    145166                if (errno != EINTR) { 
    146167                        return -1; 
     
    148169        } 
    149170 
     171        free(mbuf); 
     172 
    150173        return 0; 
    151174} 
  • trunk/libsipc/src/mqueue_internal.h

    r41 r59  
    2525void mqueue_disconnect(int msqid); 
    2626int mqueue_set_capacity(int msqid, int mqbytes); 
    27 int mqueue_get_msg_type(int msqid, int type, int block); 
    28 int mqueue_send_msg_type(int msqid, int type, int block); 
     27int mqueue_get_msg_type(sipc_t *sipc, int msqid, int type, int block, size_t *payload); 
     28int mqueue_send_msg_type(sipc_t *sipc, int msqid, int type, int block, size_t payload); 
    2929int mqueue_create(key_t key); 
    3030void mqueue_destroy_resource(key_t key); 
  • trunk/libsipc/src/shm_internal.c

    r48 r59  
    3636int sipc_shm_recv_done(sipc_t *sipc) 
    3737{ 
     38        size_t payload; 
     39 
    3840        if (!sipc || sipc->ipc_type != SIPC_SYSV_SHM) { 
    3941                errno = EINVAL; 
     
    4143        } 
    4244 
    43         if (mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK) < 0) { 
     45        if (mqueue_get_msg_type(sipc, sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK, &payload) < 0) { 
    4446                return -1; 
    4547        } 
    4648        sipc->len = 0; 
    47         if (mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK) < 0) { 
     49        if (mqueue_get_msg_type(sipc, sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK, &payload) < 0) { 
    4850                return -1; 
    4951        } 
  • trunk/libsipc/src/sipc.c

    r48 r59  
    148148} 
    149149 
    150 int sipc_send_data(sipc_t *sipc, int msg_len) 
     150int sipc_send_data(sipc_t *sipc, size_t msg_len) 
    151151{ 
    152152        if (!sipc) { 
     
    164164        } 
    165165        if (msg_len > sipc->data_size) { 
    166                 sipc_error(sipc, "sipc_send_data: cannot send buffer of size %d, can only receive %zd\n", msg_len, sipc->data_size); 
     166                sipc_error(sipc, "sipc_send_data: cannot send buffer of size %zu, can only receive %zd\n", msg_len, sipc->data_size); 
    167167                errno = ENOMEM; 
    168168                return -1; 
     
    172172} 
    173173 
    174 int sipc_recv_data(sipc_t *sipc, char **data, int *len) 
     174int sipc_recv_data(sipc_t *sipc, char **data, size_t *len) 
    175175{ 
    176176        if (data) 
  • trunk/libsipc/src/sipc_internal.h

    r48 r59  
    3535#define SIPC_END_XMIT   0x0c 
    3636 
     37/* msgbuf wrapper used for shm control messages */ 
     38struct shm_msgbuf { 
     39        long mtype; /* Control message type */ 
     40        size_t payload; /* Currently used for packing len */ 
     41};  
     42 
    3743/* Creation functions */ 
    3844int sipc_mqueue_create(sipc_t *sipc); 
     
    6268        int (*sipc_attach) (sipc_t *sipc); 
    6369        char *(*sipc_get_data_ptr) (sipc_t *sipc); 
    64         int (*sipc_send_data) (sipc_t *sipc, int msg_len); 
    65         int (*sipc_recv_data) (sipc_t *sipc, char **data, int *len); 
     70        int (*sipc_send_data) (sipc_t *sipc, size_t msg_len); 
     71        int (*sipc_recv_data) (sipc_t *sipc, char **data, size_t *len); 
    6672        void (*sipc_detach) (sipc_t *sipc); 
    6773        void (*_sipc_error) (const char *fmt, va_list ap); 
  • trunk/libsipc/src/sipc_mqueue.c

    r48 r59  
    5252/* Forward decls */ 
    5353static int get_max_msg(sipc_t *sipc); 
    54 static int mqueue_send_msg_len(sipc_t *sipc, int msg_len); 
     54static int mqueue_send_msg_len(sipc_t *sipc, size_t msg_len); 
    5555static int mqueue_send_end_xmit(sipc_t *sipc); 
    5656static int is_msg_len(struct msgbuf *mbuf); 
     
    136136/* Sends a message to the queue. 
    137137 * Returns 0 on success, -1 on error */ 
    138 int sipc_mqueue_send_data(sipc_t *sipc, int msg_len) 
     138int sipc_mqueue_send_data(sipc_t *sipc, size_t msg_len) 
    139139{ 
    140140        size_t max_packet_sz = SIPC_MQUEUE_MSG_SZ - sizeof(long); 
     
    179179} 
    180180 
    181 int sipc_mqueue_recv_data(sipc_t *sipc, char **data, int *len) 
     181int sipc_mqueue_recv_data(sipc_t *sipc, char **data, size_t *len) 
    182182{ 
    183183        int retv = -1; 
     
    363363} 
    364364 
    365 static int mqueue_send_msg_len(sipc_t *sipc, int len) 
     365static int mqueue_send_msg_len(sipc_t *sipc, size_t len) 
    366366{ 
    367367        struct msgbuf *mbuf; 
     
    380380        mbuf->mtype = SIPC_MSG_LEN; 
    381381 
    382         sprintf(mbuf->mtext, "%d", len); 
     382        sprintf(mbuf->mtext, "%zu", len); 
    383383 
    384384        while (msgsnd(sipc->msqid, mbuf, strlen(mbuf->mtext) + 1, 0) < 0) { 
  • trunk/libsipc/src/sipc_mqueue.h

    r41 r59  
    3232void sipc_mqueue_disconnect(sipc_t *sipc); 
    3333char *sipc_mqueue_get_data_ptr(sipc_t *sipc); 
    34 int sipc_mqueue_send_data(sipc_t *sipc, int msg_len); 
    35 int sipc_mqueue_recv_data(sipc_t *sipc, char **data, int *len); 
     34int sipc_mqueue_send_data(sipc_t *sipc, size_t msg_len); 
     35int sipc_mqueue_recv_data(sipc_t *sipc, char **data, size_t *len); 
    3636int sipc_mqueue_end_xmit(sipc_t *sipc); 
    3737void sipc_mqueue_destroy_handle(sipc_t *sipc); 
  • trunk/libsipc/src/sipc_shm.c

    r48 r59  
    158158 * before the reader is done with the data. 
    159159 */ 
    160 int sipc_shm_send_data(sipc_t *sipc, int msg_len) 
     160int sipc_shm_send_data(sipc_t *sipc, size_t msg_len) 
    161161{ 
    162162        int error; 
     
    164164        /* Send a DATA_READY marker */ 
    165165        sipc->len = msg_len; 
    166         if (mqueue_send_msg_type(sipc->s.msqid, SIPC_DATA_READY, MQ_BLOCK) < 0) { 
     166        if (mqueue_send_msg_type(sipc, sipc->s.msqid, SIPC_DATA_READY, MQ_BLOCK, msg_len) < 0) { 
    167167                error = errno; 
    168168                sipc_error(sipc, "Could not send DATA_READY marker\n"); 
     
    173173        /* Send a DATA_READING marker; this should block until the 
    174174           reader calls sipc_recv_data(). */ 
    175         if (mqueue_send_msg_type(sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK) < 0) { 
     175        if (mqueue_send_msg_type(sipc, sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK, 0) < 0) { 
    176176                error = errno; 
    177177                sipc_error(sipc, "Could not send DATA_READING marker\n"); 
     
    182182        /* Send a DATA_DONE marker; this should block until the reader 
    183183           calls sipc_shm_recv_done(). */ 
    184         if (mqueue_send_msg_type(sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK) < 0) { 
     184        if (mqueue_send_msg_type(sipc, sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK, 0) < 0) { 
    185185                error = errno; 
    186186                sipc_error(sipc, "Could not send DATA_DONE marker\n"); 
     
    196196 * message queue, if the handle is set to blocking mode (default). 
    197197 */ 
    198 int sipc_shm_recv_data(sipc_t *sipc, char **data, int *len) 
     198int sipc_shm_recv_data(sipc_t *sipc, char **data, size_t *len) 
    199199{ 
    200200        int mtype = 0; 
     
    204204 
    205205        /* Get a message from the side channel. */ 
    206         mtype = mqueue_get_msg_type(sipc->s.msqid, SIPC_ANY, block); 
     206        mtype = mqueue_get_msg_type(sipc, sipc->s.msqid, SIPC_ANY, block, len); 
    207207        if (mtype == SIPC_DATA_READY) { 
    208208                /* It is now OK to read shared memory */ 
    209209                *data = sipc->data; 
    210                 *len = sipc->data_size; 
    211210                return 0; 
    212211        } else { 
  • trunk/libsipc/src/sipc_shm.h

    r41 r59  
    2929void sipc_shm_disconnect(sipc_t *sipc); 
    3030char *sipc_shm_get_data_ptr(sipc_t *sipc); 
    31 int sipc_shm_send_data(sipc_t *sipc, int msg_len); 
    32 int sipc_shm_recv_data(sipc_t *sipc, char **data, int *len); 
     31int sipc_shm_send_data(sipc_t *sipc, size_t msg_len); 
     32int sipc_shm_recv_data(sipc_t *sipc, char **data, size_t *len); 
    3333int sipc_shm_end_xmit(sipc_t *sipc); 
    3434void sipc_shm_destroy_handle(sipc_t *sipc); 
  • trunk/libsipc/tests/Makefile

    r47 r59  
    33AM_LDFLAGS += -lsipc -lcunit -Wl,-rpath=$(TOP_BUILDDIR)/src -L$(TOP_BUILDDIR)/src 
    44TOP_BUILDDIR := $(shell pwd)/.. 
     5CFLAGS += -O0 -g3 -gdwarf-2 
    56 
    67all: 
  • trunk/libsipc/tests/mqueue.c

    r48 r59  
    174174        size_t message_len = sizeof(message); 
    175175        char *data = sipc_get_data_ptr(writer_ipc); 
    176         int recv_len; 
     176        size_t recv_len; 
    177177        char *recv_data; 
    178178 
     
    269269{ 
    270270        char *data = NULL; 
    271         int len = 0; 
     271        size_t len = 0; 
    272272 
    273273        while (!sipc_recv_data(reader_ipc, &data, &len)) { 
     
    328328{ 
    329329        char *data = NULL; 
    330         int len = 0; 
     330        size_t len = 0; 
    331331 
    332332        size_t j = 0; 
     
    427427{ 
    428428        char *data = NULL; 
    429         int len = 0, retv, error; 
     429        size_t len = 0; 
     430        int retv, error; 
    430431 
    431432        printf("This should pause for 2 seconds (default read)...\n"); 
  • trunk/libsipc/tests/shm.c

    r45 r59  
    251251        char *data = NULL; 
    252252        char recv_data[DATA_LEN]; 
    253         int len = 0; 
     253        size_t len = 0; 
    254254 
    255255        memset(recv_data, 0, sizeof(recv_data)); 
     
    308308        char *data = NULL; 
    309309        unsigned char recv_data[DATA_LEN]; 
    310         int len = 0, i; 
     310        size_t len = 0, i; 
    311311 
    312312        memset(recv_data, 0, sizeof(recv_data)); 
     
    352352        sleep(2); 
    353353        shm_data[0] = 0x42; 
    354         if (sipc_send_data(writer_ipc, 1) < 0) { 
     354        if (sipc_send_data(writer_ipc, DATA_LEN) < 0) { 
    355355                sipc_error(writer_ipc, "Unable to send data\n"); 
    356356                return; 
     
    358358        sleep(2); 
    359359        shm_data[0] = 0x43; 
    360         if (sipc_send_data(writer_ipc, 1) < 0) { 
     360        if (sipc_send_data(writer_ipc, DATA_LEN) < 0) { 
    361361                sipc_error(writer_ipc, "Unable to send data\n"); 
    362362                return; 
    363363        } 
    364364        shm_data[0] = 0x44; 
    365         if (sipc_send_data(writer_ipc, 1) < 0) { 
     365        if (sipc_send_data(writer_ipc, DATA_LEN) < 0) { 
    366366                sipc_error(writer_ipc, "Unable to send data\n"); 
    367367                return; 
     
    373373{ 
    374374        char *data = NULL; 
    375         int len = 0, retv, error; 
     375        size_t len = 0, retv, error; 
    376376 
    377377        printf("This should pause for 2 seconds (default read)...\n");