-
Notifications
You must be signed in to change notification settings - Fork 0
/
riverq.cc
60 lines (54 loc) · 1.54 KB
/
riverq.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include "riverq.hh"
namespace riverq {
extern "C" std::atomic<u64>* publish(spsct* q, std::atomic<u64>* producer) {
if (producer == q->end) {
q->producerWrapped++;
producer = q->queue;
}
std::atomic<u64>* consumed;
while (unlikely(producer == (consumed = q->consumed.load(relaxed)))) {
//printf("waiting for consumer to move past %lx\n", u64(consumed));
q->producerStall++;
_mm_pause(); // wait for consumer to catch up
}
q->published.store(producer, release);
return producer;
}
extern "C" std::atomic<u64>* advance(spsct* q, std::atomic<u64>* consumer) {
if (consumer == q->end) {
q->consumerWrapped++;
consumer = q->queue;
}
std::atomic<u64>* published;
while (unlikely((published = q->published.load(acquire)) == consumer)) {
//printf("waiting for producer to move past %lx\n", u64(published));
q->consumerStall++;
_mm_pause(); // let producer get ahead
}
q->consumed.store(consumer, relaxed);
return consumer;
}
extern "C" std::atomic<u64>* wait_for_consumer(spscl* q, std::atomic<u64>* producer) {
if (producer == q->end) {
q->producerWrapped++;
producer = q->queue;
}
while (producer->load(relaxed) != 0) {
q->producerStall++;
pthread_yield();
}
return producer;
}
extern "C" std::atomic<u64>* wait_for_producer(spscl* q, std::atomic<u64>* consumer) {
if (consumer == q->end) {
q->consumerWrapped++;
memset(consumer-16, 0, 128);
consumer = q->queue;
}
while (consumer->load(relaxed) == 0) {
q->consumerStall++;
pthread_yield();
}
return consumer;
}
}