From f1f16202873064cdb76e738fba8bfbb5f2ee530d Mon Sep 17 00:00:00 2001 From: Giovanni Di Sirio Date: Tue, 2 Oct 2018 12:58:00 +0000 Subject: Revision of pipes. git-svn-id: svn://svn.code.sf.net/p/chibios/svn/trunk@12315 110e8d01-0319-4d1e-a829-52ad28d1bb01 --- os/lib/include/chpipes.h | 45 +++++++++------------ os/lib/src/chpipes.c | 100 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 86 insertions(+), 59 deletions(-) (limited to 'os') diff --git a/os/lib/include/chpipes.h b/os/lib/include/chpipes.h index a09b3c72a..8bf26c62b 100644 --- a/os/lib/include/chpipes.h +++ b/os/lib/include/chpipes.h @@ -58,12 +58,16 @@ typedef struct { uint8_t *rdptr; /**< @brief Read pointer. */ size_t cnt; /**< @brief Bytes in the pipe. */ bool reset; /**< @brief True if in reset state. */ - threads_queue_t qw; /**< @brief Queued writers. */ - threads_queue_t qr; /**< @brief Queued readers. */ + thread_reference_t wtr; /**< @brief Waiting writer. */ + thread_reference_t rtr; /**< @brief Waiting reader. */ #if (CH_CFG_USE_MUTEXES == TRUE) || defined(__DOXYGEN__) - mutex_t mtx; /**< @brief Heap access mutex. */ + mutex_t cmtx; /**< @brief Common access mutex. */ + mutex_t wmtx; /**< @brief Write access mutex. */ + mutex_t rmtx; /**< @brief Read access mutex. */ #else - semaphore_t sem; /**< @brief Heap access semaphore. */ + semaphore_t csem; /**< @brief Common access semaphore.*/ + semaphore_t wsem; /**< @brief Write access semaphore. */ + semaphore_t rsem; /**< @brief Read access semaphore. */ #endif } pipe_t; @@ -88,9 +92,11 @@ typedef struct { (uint8_t *)(buffer), \ (size_t)0, \ false, \ - _THREADS_QUEUE_DATA(name.qw), \ - _THREADS_QUEUE_DATA(name.qr), \ - _MUTEX_DATA(name.mtx), \ + NULL, \ + NULL, \ + _MUTEX_DATA(name.cmtx), \ + _MUTEX_DATA(name.wmtx), \ + _MUTEX_DATA(name.rmtx), \ } #else /* CH_CFG_USE_MUTEXES == FALSE */ #define _PIPE_DATA(name, buffer, size) { \ @@ -100,9 +106,11 @@ typedef struct { (uint8_t *)(buffer), \ (size_t)0, \ false, \ - _THREADS_QUEUE_DATA(name.qw), \ - _THREADS_QUEUE_DATA(name.qr), \ - _SEMAPHORE_DATA(name.sem, (cnt_t)1), \ + NULL, \ + NULL, \ + _SEMAPHORE_DATA(name.csem, (cnt_t)1), \ + _SEMAPHORE_DATA(name.wsem, (cnt_t)1), \ + _SEMAPHORE_DATA(name.rsem, (cnt_t)1), \ } #endif /* CH_CFG_USE_MUTEXES == FALSE */ @@ -181,23 +189,6 @@ static inline size_t chPipeGetFreeCount(const pipe_t *pp) { return chPipeGetSize(pp) - chPipeGetUsedCount(pp); } -/** - * @brief Returns the next byte in the queue without removing it. - * @pre A byte must be present in the queue for this function to work - * or it would return garbage. The correct way to use this macro is - * to use @p chPipeGetFullCountI() and then use this macro, all within - * a lock state. - * - * @param[in] pp the pointer to an initialized @p pipe_t object - * @return The next byte in queue. - * - * @api - */ -static inline uint8_t chPipePeek(const pipe_t *pp) { - - return *pp->rdptr; -} - /** * @brief Terminates the reset state. * diff --git a/os/lib/src/chpipes.c b/os/lib/src/chpipes.c index 6ce029205..c18a21597 100644 --- a/os/lib/src/chpipes.c +++ b/os/lib/src/chpipes.c @@ -51,11 +51,25 @@ * Defaults on the best synchronization mechanism available. */ #if (CH_CFG_USE_MUTEXES == TRUE) || defined(__DOXYGEN__) -#define P_LOCK(p) chMtxLock(&(p)->mtx) -#define P_UNLOCK(p) chMtxUnlock(&(p)->mtx) +#define PC_INIT(p) chMtxObjectInit(&(p)->cmtx) +#define PC_LOCK(p) chMtxLock(&(p)->cmtx) +#define PC_UNLOCK(p) chMtxUnlock(&(p)->cmtx) +#define PW_INIT(p) chMtxObjectInit(&(p)->wmtx) +#define PW_LOCK(p) chMtxLock(&(p)->wmtx) +#define PW_UNLOCK(p) chMtxUnlock(&(p)->wmtx) +#define PR_INIT(p) chMtxObjectInit(&(p)->rmtx) +#define PR_LOCK(p) chMtxLock(&(p)->rmtx) +#define PR_UNLOCK(p) chMtxUnlock(&(p)->rmtx) #else -#define P_LOCK(p) (void) chSemWait(&(p)->sem) -#define P_UNLOCK(p) chSemSignal(&(p)->sem) +#define PC_INIT(p) chMtxObjectInit(&(p)->csem, (cnt_t)1) +#define PC_LOCK(p) (void) chSemWait(&(p)->csem) +#define PC_UNLOCK(p) chSemSignal(&(p)->csem) +#define PW_INIT(p) chMtxObjectInit(&(p)->wsem, (cnt_t)1) +#define PW_LOCK(p) (void) chSemWait(&(p)->wsem) +#define PW_UNLOCK(p) chSemSignal(&(p)->wsem) +#define PR_INIT(p) chMtxObjectInit(&(p)->rsem, (cnt_t)1) +#define PR_LOCK(p) (void) chSemWait(&(p)->rsem) +#define PR_UNLOCK(p) chSemSignal(&(p)->rsem) #endif /*===========================================================================*/ @@ -91,15 +105,19 @@ static size_t pipe_write(pipe_t *pp, const uint8_t *bp, size_t n) { size_t s1, s2; + PC_LOCK(pp); + /* Number of bytes that can be written in a single atomic operation.*/ if (n > chPipeGetFreeCount(pp)) { n = chPipeGetFreeCount(pp); } + pp->cnt += n; /* Number of bytes before buffer limit.*/ /*lint -save -e9033 [10.8] Checked to be safe.*/ s1 = (size_t)(pp->top - pp->wrptr); /*lint -restore*/ + if (n < s1) { memcpy((void *)pp->wrptr, (const void *)bp, n); pp->wrptr += n; @@ -116,7 +134,8 @@ static size_t pipe_write(pipe_t *pp, const uint8_t *bp, size_t n) { pp->wrptr = pp->buffer; } - pp->cnt += n; + PC_UNLOCK(pp); + return n; } @@ -137,15 +156,19 @@ static size_t pipe_write(pipe_t *pp, const uint8_t *bp, size_t n) { static size_t pipe_read(pipe_t *pp, uint8_t *bp, size_t n) { size_t s1, s2; + PC_LOCK(pp); + /* Number of bytes that can be read in a single atomic operation.*/ if (n > chPipeGetUsedCount(pp)) { n = chPipeGetUsedCount(pp); } + pp->cnt -= n; /* Number of bytes before buffer limit.*/ /*lint -save -e9033 [10.8] Checked to be safe.*/ s1 = (size_t)(pp->top - pp->rdptr); /*lint -restore*/ + if (n < s1) { memcpy((void *)bp, (void *)pp->rdptr, n); pp->rdptr += n; @@ -162,7 +185,8 @@ static size_t pipe_read(pipe_t *pp, uint8_t *bp, size_t n) { pp->rdptr = pp->buffer; } - pp->cnt -= n; + PC_UNLOCK(pp); + return n; } @@ -190,8 +214,11 @@ void chPipeObjectInit(pipe_t *pp, uint8_t *buf, size_t n) { pp->top = &buf[n]; pp->cnt = (size_t)0; pp->reset = false; - chThdQueueObjectInit(&pp->qw); - chThdQueueObjectInit(&pp->qr); + pp->wtr = NULL; + pp->rtr = NULL; + PC_INIT(pp); + PW_INIT(pp); + PR_INIT(pp); } /** @@ -210,17 +237,20 @@ void chPipeReset(pipe_t *pp) { chDbgCheck(pp != NULL); - P_LOCK(pp); - chSysLock(); + PC_LOCK(pp); + pp->wrptr = pp->buffer; pp->rdptr = pp->buffer; pp->cnt = (size_t)0; pp->reset = true; - chThdDequeueAllI(&pp->qw, MSG_RESET); - chThdDequeueAllI(&pp->qr, MSG_RESET); + + chSysLock(); + chThdResumeI(&pp->wtr, MSG_RESET); + chThdResumeI(&pp->rtr, MSG_RESET); chSchRescheduleS(); chSysUnlock(); - P_UNLOCK(pp); + + PC_UNLOCK(pp); } /** @@ -232,16 +262,16 @@ void chPipeReset(pipe_t *pp) { * * @param[in] pp the pointer to an initialized @p pipe_t object * @param[in] bp pointer to the data buffer - * @param[in] n the maximum amount of data to be transferred, the - * value 0 is reserved + * @param[in] n the number of bytes to be written, the value 0 is + * reserved * @param[in] timeout the number of ticks before the operation timeouts, * the following special values are allowed: * - @a TIME_IMMEDIATE immediate timeout. * - @a TIME_INFINITE no timeout. * . - * @return The number of bytes effectively transferred. - * @retval MSG_RESET if the mailbox has been reset. - * @retval MSG_TIMEOUT if the operation has timed out. + * @return The number of bytes effectively transferred. A number + * lower than @p n means that a timeout occurred or the + * pipe went in reset state. * * @api */ @@ -253,10 +283,10 @@ size_t chPipeWriteTimeout(pipe_t *pp, const uint8_t *bp, /* If the pipe is in reset state then returns immediately.*/ if (pp->reset) { - return MSG_RESET; + return (size_t)0; } - P_LOCK(pp); + PW_LOCK(pp); while (n > 0U) { size_t done; @@ -266,7 +296,7 @@ size_t chPipeWriteTimeout(pipe_t *pp, const uint8_t *bp, msg_t msg; chSysLock(); - msg = chThdEnqueueTimeoutS(&pp->qw, timeout); + msg = chThdSuspendTimeoutS(&pp->wtr, timeout); chSysUnlock(); /* Anything except MSG_OK causes the operation to stop.*/ @@ -277,10 +307,13 @@ size_t chPipeWriteTimeout(pipe_t *pp, const uint8_t *bp, else { n -= done; bp += done; + + /* Resuming the reader, if present.*/ + chThdResume(&pp->rtr, MSG_OK); } } - P_UNLOCK(pp); + PW_UNLOCK(pp); return max - n; } @@ -293,17 +326,17 @@ size_t chPipeWriteTimeout(pipe_t *pp, const uint8_t *bp, * been reset. * * @param[in] pp the pointer to an initialized @p pipe_t object - * @param[out] bp pointer to the data buffer - * @param[in] n the maximum amount of data to be transferred, the - * value 0 is reserved + * @param[out] bp pointer to the data buffer + * @param[in] n the number of bytes to be read, the value 0 is + * reserved * @param[in] timeout the number of ticks before the operation timeouts, * the following special values are allowed: * - @a TIME_IMMEDIATE immediate timeout. * - @a TIME_INFINITE no timeout. * . - * @return The number of bytes effectively transferred. - * @retval MSG_RESET if the mailbox has been reset. - * @retval MSG_TIMEOUT if the operation has timed out. + * @return The number of bytes effectively transferred. A number + * lower than @p n means that a timeout occurred or the + * pipe went in reset state. * * @api */ @@ -315,10 +348,10 @@ size_t chPipeReadTimeout(pipe_t *pp, uint8_t *bp, /* If the pipe is in reset state then returns immediately.*/ if (pp->reset) { - return MSG_RESET; + return (size_t)0; } - P_LOCK(pp); + PR_LOCK(pp); while (n > 0U) { size_t done; @@ -328,7 +361,7 @@ size_t chPipeReadTimeout(pipe_t *pp, uint8_t *bp, msg_t msg; chSysLock(); - msg = chThdEnqueueTimeoutS(&pp->qr, timeout); + msg = chThdSuspendTimeoutS(&pp->rtr, timeout); chSysUnlock(); /* Anything except MSG_OK causes the operation to stop.*/ @@ -339,10 +372,13 @@ size_t chPipeReadTimeout(pipe_t *pp, uint8_t *bp, else { n -= done; bp += done; + + /* Resuming the writer, if present.*/ + chThdResume(&pp->wtr, MSG_OK); } } - P_UNLOCK(pp); + PR_UNLOCK(pp); return max - n; } -- cgit v1.2.3