Changeset 48
- Timestamp:
- 06/30/08 17:37:24 (2 months ago)
- Files:
-
- trunk/libsipc/src/mqueue_internal.c (modified) (1 diff)
- trunk/libsipc/src/shm_internal.c (modified) (1 diff)
- trunk/libsipc/src/sipc.c (modified) (1 diff)
- trunk/libsipc/src/sipc_internal.h (modified) (2 diffs)
- trunk/libsipc/src/sipc_mqueue.c (modified) (9 diffs)
- trunk/libsipc/src/sipc_shm.c (modified) (4 diffs)
- trunk/libsipc/tests/mqueue.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
trunk/libsipc/src/mqueue_internal.c
r46 r48 120 120 int flags = block ? 0 : IPC_NOWAIT; 121 121 122 if (msgrcv(msqid, &mbuf, sizeof(mbuf.mtext), type, flags) < 0) { 123 if (errno == ENOMSG) 122 while (msgrcv(msqid, &mbuf, sizeof(mbuf.mtext), type, flags) < 0) { 123 if (errno == EINTR) { 124 continue; 125 } 126 if (errno == ENOMSG || errno == EAGAIN) 124 127 return 0; 125 128 return -1; trunk/libsipc/src/shm_internal.c
r41 r48 36 36 int sipc_shm_recv_done(sipc_t *sipc) 37 37 { 38 if (!sipc || sipc->ipc_type != SIPC_SYSV_SHM) 38 if (!sipc || sipc->ipc_type != SIPC_SYSV_SHM) { 39 errno = EINVAL; 39 40 return -1; 41 } 40 42 41 mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK); 42 mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK); 43 if (mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_READING, MQ_BLOCK) < 0) { 44 return -1; 45 } 46 sipc->len = 0; 47 if (mqueue_get_msg_type(sipc->s.msqid, SIPC_DATA_DONE, MQ_BLOCK) < 0) { 48 return -1; 49 } 43 50 return 0; 44 51 } trunk/libsipc/src/sipc.c
r47 r48 54 54 new_sipc->role = role; 55 55 new_sipc->non_blocking = 0; 56 new_sipc->len = len; 57 new_sipc->msg_len = SIPC_MSGLEN_NOT_SET; 56 new_sipc->data_size = len; 57 new_sipc->len = 0; 58 new_sipc->recv_len = 0; 59 new_sipc->recv_data = NULL; 58 60 59 61 /* call backend specific init to fill in function table */ trunk/libsipc/src/sipc_internal.h
r45 r48 34 34 #define SIPC_MSG_LEN 0x09 35 35 #define SIPC_END_XMIT 0x0c 36 37 /* Invalid message length */38 #define SIPC_MSGLEN_NOT_SET -139 36 40 37 /* Creation functions */ … … 90 87 }; 91 88 92 char *data; 89 char *data; /* Buffer for sending data */ 93 90 struct msgbuf *mbuf; 94 size_t len; 95 size_t msg_len; /* Length in bytes of the next message */ 96 size_t copied; /* Number of bytes xmitted so far */ 91 size_t data_size; /* Size of the data buffer, set in sipc_open() */ 92 size_t len; /* Number of bytes xmitted so far, 93 or number of bytes received so far */ 94 size_t recv_len; /* Number of bytes to be received */ 95 char *recv_data; /* Buffer for receiving mqueue data */ 97 96 struct sipc_func_table *funcs; 98 97 }; trunk/libsipc/src/sipc_mqueue.c
r47 r48 56 56 static int is_msg_len(struct msgbuf *mbuf); 57 57 static int is_end_xmit(struct msgbuf *mbuf); 58 static size_t next_alloc_sz(int recv, int recv_sz, int max_packet_sz, int msg_len);59 58 60 59 int sipc_mqueue_init(sipc_t *sipc) … … 104 103 { 105 104 /* Allocate data member */ 106 sipc->data = calloc(1, sipc-> len);105 sipc->data = calloc(1, sipc->data_size); 107 106 if (!sipc->data) { 108 107 sipc_error(sipc, "Out of memory\n"); … … 182 181 int sipc_mqueue_recv_data(sipc_t *sipc, char **data, int *len) 183 182 { 184 int retv = -1 , alloc_sz = 0, idx = 0, recv_sz;185 int msgtxt_sz = SIPC_MQUEUE_MSG_SZ - sizeof(struct msgbuf);183 int retv = -1; 184 size_t idx, recv_sz; 186 185 int error = 0; 187 186 188 if (data)189 *data = NULL; /* Pointer to a buffer created here */190 if (len)191 *len = 0; /* Bytes received so far */192 if (!sipc || !data || !len) {193 error = EINVAL;194 goto err;195 }196 197 187 idx = 0; /* Current index into the buffer */ 198 199 if (sipc->role != SIPC_RECEIVER) {200 error = EBADF;201 sipc_error(sipc, "sipc_send_data called without receiver flag enabled in IPC structure\n");202 goto err;203 }204 188 205 189 struct msgbuf *mbuf = sipc->mbuf; … … 208 192 209 193 /* Receive and validate the length of message marker */ 210 if (sipc-> msg_len == SIPC_MSGLEN_NOT_SET) {211 if(msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking) < 0) {194 if (sipc->recv_len == 0) { 195 while (msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking) < 0) { 212 196 error = errno; 197 if (error == EINTR) { 198 continue; 199 } 213 200 if (error == ENOMSG || error == EAGAIN) 214 201 error = EAGAIN; … … 223 210 224 211 if (is_msg_len(mbuf)) { 225 sipc->msg_len = atoi(mbuf->mtext); 226 if (sipc->msg_len < 0) { 212 errno = 0; 213 sipc->recv_len = (size_t) strtoul(mbuf->mtext, NULL, 10); 214 if (errno != 0) { 227 215 error = EIO; 228 216 sipc_error(sipc, "libsipc: bad message length\n"); 229 217 goto err; 230 218 } 231 if (sipc-> msg_len > sipc->len) {219 if (sipc->recv_len > sipc->data_size) { 232 220 error = EIO; 233 221 sipc_error(sipc, "libsipc: cannot receive buffer of size %zd, can only receive %zd\n", 234 sipc-> msg_len, sipc->len);222 sipc->recv_len, sipc->data_size); 235 223 goto err; 236 224 } … … 240 228 goto err; 241 229 } 230 sipc->len = 0; 231 } 232 233 if (sipc->recv_len == 0) { 234 /* special case if there is no data to be received: *data 235 needs to point to a buffer, but a malloc() of 0 is not 236 defined, so allocate a single byte for it. */ 237 if ((sipc->recv_data = malloc(1)) == NULL) { 238 error = errno; 239 sipc_error(sipc, "%s\n", "Out of memory"); 240 goto err; 241 } 242 242 } 243 243 … … 245 245 * Stop when we have received the total number of bytes 246 246 * or upon receiving the end of message marker. */ 247 if((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) {247 while ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 248 248 error = errno; 249 if (error == EINTR) { 250 continue; 251 } 249 252 if (error == ENOMSG || error == EAGAIN) 250 253 error = EAGAIN; … … 255 258 256 259 while (!is_end_xmit(mbuf)) { 257 *len += recv_sz; 258 259 /* If this isn't the last packet in the message, 260 * resize to +1 message */ 261 alloc_sz += next_alloc_sz(*len, recv_sz, msgtxt_sz, sipc->msg_len); 262 263 char *tdata = realloc(*data, alloc_sz); 264 if (!tdata) { 260 size_t amount_to_copy = sipc->recv_len - sipc->len > recv_sz ? recv_sz : sipc->recv_len - sipc->len; 261 if (amount_to_copy != 0) { 262 char *tdata = realloc(sipc->recv_data, sipc->len + amount_to_copy); 263 if (!tdata) { 264 error = errno; 265 sipc_error(sipc, "%s\n", "Out of memory"); 266 goto err; 267 } 268 sipc->recv_data = tdata; 269 270 /* memset(*data+idx, 0x0, recv_sz); */ 271 memcpy(sipc->recv_data + sipc->len, mbuf->mtext, amount_to_copy); 272 sipc->len += amount_to_copy; 273 } 274 275 /* Receive the next message */ 276 while ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 265 277 error = errno; 266 sipc_error(sipc, "%s\n", "Out of memory"); 267 goto err; 268 } 269 *data = tdata; 270 271 /* memset(*data+idx, 0x0, recv_sz); */ 272 memcpy(*data + idx, mbuf->mtext, recv_sz); 273 idx += recv_sz; 274 275 /* Receive the next message */ 276 if ((recv_sz = msgrcv(sipc->msqid, mbuf, SIPC_MQUEUE_MSG_SZ - sizeof(long), SIPC_ANY, sipc->non_blocking)) < 0) { 277 error = errno; 278 if (error == EINTR) { 279 continue; 280 } 278 281 if (error == ENOMSG || error == EAGAIN) 279 282 error = EAGAIN; 280 283 else 281 284 sipc_error(sipc, "msgrcv: %s\n", strerror(errno)); 282 /* Note that it is possible for a non-blocking283 * SIPC to fail while receiving a chunked message,284 * in which case the entire message becomes lost.285 * This is probably not the best design possible.286 */287 285 goto err; 288 286 } 289 287 } 290 288 289 *data = sipc->recv_data; 290 *len = sipc->len; 291 sipc->recv_len = 0; 292 sipc->len = 0; 293 sipc->recv_data = NULL; 291 294 retv = 0; 295 292 296 err: 293 sipc->msg_len = SIPC_MSGLEN_NOT_SET; /* Unset message length */ 297 if (retv && error != EAGAIN) { 298 free(sipc->recv_data); 299 sipc->recv_data = NULL; 300 sipc->len = 0; 301 sipc->recv_len = 0; 302 } 294 303 errno = error; 295 if (retv) {296 free(*data);297 *data = NULL;298 *len = 0;299 }300 304 return retv; 301 305 } … … 408 412 return 0; 409 413 } 410 411 /* Determine how much memory to allocate for receiving a packet.412 * A packet should not contain extra padding.413 * Returns the size of the next allocation on success, -1 on failure. */414 static size_t next_alloc_sz(int need, int recv_sz, int max_packet_sz, int msg_len)415 {416 if (need < 0 || max_packet_sz <= 0 || msg_len <= 0)417 return -1;418 419 if (need > msg_len)420 return max_packet_sz;421 422 return recv_sz;423 }trunk/libsipc/src/sipc_shm.c
r47 r48 67 67 68 68 /* Create the shm segment */ 69 sipc->s.shmid = shmget(sipc->key, sipc-> len, SHM_CPERMS);69 sipc->s.shmid = shmget(sipc->key, sipc->data_size, SHM_CPERMS); 70 70 if (sipc->s.shmid < 0) { 71 71 sipc_error(sipc, "shmget: %s\n", strerror(errno)); … … 99 99 /* Get the (existing) shm segment */ 100 100 flags = sipc->role == SIPC_SENDER ? SHM_SPERMS : SHM_RPERMS; 101 sipc->s.shmid = shmget(sipc->key, sipc-> len, flags);101 sipc->s.shmid = shmget(sipc->key, sipc->data_size, flags); 102 102 if (sipc->s.shmid < 0) { 103 103 sipc_error(sipc, "shmget: %s\n", strerror(errno)); … … 163 163 164 164 /* Send a DATA_READY marker */ 165 sipc->len = msg_len; 165 166 if (mqueue_send_msg_type(sipc->s.msqid, SIPC_DATA_READY, MQ_BLOCK) < 0) { 166 167 error = errno; … … 207 208 /* It is now OK to read shared memory */ 208 209 *data = sipc->data; 209 *len = sipc-> len;210 *len = sipc->data_size; 210 211 return 0; 211 212 } else { trunk/libsipc/tests/mqueue.c
r47 r48 330 330 int len = 0; 331 331 332 size_t total_read = 0, j = 0; 333 while (!sipc_recv_data(reader2_ipc, &data, &len)) { 334 CU_ASSERT_PTR_NOT_NULL_FATAL(data); 335 CU_ASSERT_FATAL(len > 0); 336 337 size_t i; 338 for (i = 0; i < len; i++) { 339 if (total_read % 512 == 0) { 340 j++; 341 } 342 CU_ASSERT(data[i] == j); 343 total_read++; 332 size_t j = 0; 333 CU_ASSERT(sipc_recv_data(reader2_ipc, &data, &len) == 0); 334 CU_ASSERT_PTR_NOT_NULL_FATAL(data); 335 CU_ASSERT_FATAL(len == BIG_DATA_LEN); 336 337 size_t i; 338 for (i = 0; i < len; i++) { 339 if (i % 512 == 0) { 340 j++; 344 341 } 345 346 free(data); 347 if (total_read >= BIG_DATA_LEN) { 348 return; 342 CU_ASSERT(data[i] == j); 343 } 344 free(data); 345 346 CU_ASSERT(sipc_recv_data(reader2_ipc, &data, &len) == 0); 347 CU_ASSERT_PTR_NOT_NULL_FATAL(data); 348 CU_ASSERT_FATAL(len == BIG_DATA_LEN - 1); 349 350 j = 42; 351 for (i = 0; i < len; i++) { 352 CU_ASSERT(data[i] == j); 353 j++; 354 if (i % 120 == 0) { 355 j = 0; 349 356 } 350 357 } 351 CU_FAIL("Did not receive as much data as expected.");358 free(data); 352 359 } 353 360 … … 366 373 } 367 374 data[i] = j; 375 } 376 377 /* Send data, then end of xmit */ 378 CU_ASSERT(sipc_send_data(writer2_ipc, i) == 0); 379 380 /* This time send data that does not nicely fit on a word boundary */ 381 j = 42; 382 for (i = 0; i < BIG_DATA_LEN - 1; i++) { 383 data[i] = j++; 384 if (i % 120 == 0) { 385 j = 0; 386 } 368 387 } 369 388
