/*
* Copyright (c) 2018 CZ.NIC, z.s.p.o.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* - The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Authors:
* Jiří Zárevúcky (jzr) <zarevucky.jiri@gmail.com>
*/
#include <fibril.h>
#include <fibril_synch.h>
#include <mem.h>
#include <stdlib.h>
#include "../private/fibril.h"
/*
* A multi-producer, single-consumer concurrent FIFO channel with unlimited
* buffering.
*
* The current implementation is based on the super simple two-lock queue
* by Michael and Scott
* (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf)
*
* The original algorithm uses one lock on each side. Since this queue is
* single-consumer, we only use the tail lock.
*/
typedef struct mpsc_node mpsc_node_t;
struct mpsc {
size_t elem_size;
fibril_rmutex_t t_lock;
mpsc_node_t *head;
mpsc_node_t *tail;
mpsc_node_t *close_node;
fibril_event_t event;
};
struct mpsc_node {
mpsc_node_t *next;
unsigned char data[];
};
mpsc_t *mpsc_create(size_t elem_size)
{
mpsc_t *q = calloc(1, sizeof(mpsc_t));
mpsc_node_t *n = calloc(1, sizeof(mpsc_node_t) + elem_size);
mpsc_node_t *c = calloc(1, sizeof(mpsc_node_t) + elem_size);
if (!q || !n || !c) {
free(q);
free(n);
free(c);
return NULL;
}
if (fibril_rmutex_initialize(&q->t_lock) != EOK) {
free(q);
free(n);
free(c);
return NULL;
}
q->elem_size = elem_size;
q->head = q->tail = n;
q->close_node = c;
return q;
}
void mpsc_destroy(mpsc_t *q)
{
mpsc_node_t *n = q->head;
mpsc_node_t *next = NULL;
while (n != NULL) {
next = n->next;
free(n);
n = next;
}
fibril_rmutex_destroy(&q->t_lock);
free(q);
}
static errno_t _mpsc_push(mpsc_t *q, mpsc_node_t *n)
{
fibril_rmutex_lock(&q->t_lock);
if (q->tail == q->close_node) {
fibril_rmutex_unlock(&q->t_lock);
return EINVAL;
}
__atomic_store_n(&q->tail->next, n, __ATOMIC_RELEASE);
q->tail = n;
fibril_rmutex_unlock(&q->t_lock);
fibril_notify(&q->event);
return EOK;
}
/**
* Send data on the channel.
* The length of data is equal to the `elem_size` value set in `mpsc_create`.
*
* This function is safe for use under restricted mutex lock.
*
* @return ENOMEM if allocation failed, EINVAL if the queue is closed.
*/
errno_t mpsc_send(mpsc_t *q, const void *b)
{
mpsc_node_t *n = malloc(sizeof(mpsc_node_t) + q->elem_size);
if (!n)
return ENOMEM;
n->next = NULL;
memcpy(n->data, b, q->elem_size);
return _mpsc_push(q, n);
}
/**
* Receive data from the channel.
*
* @return ETIMEOUT if deadline expires, ENOENT if the queue is closed and
* there is no message left in the queue.
*/
errno_t mpsc_receive(mpsc_t *q, void *b, const struct timespec *expires)
{
mpsc_node_t *n;
mpsc_node_t *new_head;
while (true) {
n = q->head;
new_head = __atomic_load_n(&n->next, __ATOMIC_ACQUIRE);
if (new_head)
break;
errno_t rc = fibril_wait_timeout(&q->event, expires);
if (rc != EOK)
return rc;
}
if (new_head == q->close_node)
return ENOENT;
memcpy(b, new_head->data, q->elem_size);
q->head = new_head;
free(n);
return EOK;
}
/**
* Close the channel.
*
* This function is safe for use under restricted mutex lock.
*/
void mpsc_close(mpsc_t *q)
{
_mpsc_push(q, q->close_node);
}