TrinityCore
Loading...
Searching...
No Matches
MPSCQueue.h
Go to the documentation of this file.
1/*
2 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License as published by the
6 * Free Software Foundation; either version 2 of the License, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18#ifndef MPSCQueue_h__
19#define MPSCQueue_h__
20
21#include <atomic>
22#include <utility>
23
24namespace Trinity
25{
26namespace Impl
27{
28// C++ implementation of Dmitry Vyukov's lock free MPSC queue
29// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
30template<typename T>
32{
33public:
34 MPSCQueueNonIntrusive() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed))
35 {
36 Node* front = _head.load(std::memory_order_relaxed);
37 front->Next.store(nullptr, std::memory_order_relaxed);
38 }
39
41 {
42 T* output;
43 while (Dequeue(output))
44 delete output;
45
46 Node* front = _head.load(std::memory_order_relaxed);
47 delete front;
48 }
49
50 void Enqueue(T* input)
51 {
52 Node* node = new Node(input);
53 Node* prevHead = _head.exchange(node, std::memory_order_acq_rel);
54 prevHead->Next.store(node, std::memory_order_release);
55 }
56
57 bool Dequeue(T*& result)
58 {
59 Node* tail = _tail.load(std::memory_order_relaxed);
60 Node* next = tail->Next.load(std::memory_order_acquire);
61 if (!next)
62 return false;
63
64 result = next->Data;
65 _tail.store(next, std::memory_order_release);
66 delete tail;
67 return true;
68 }
69
70private:
71 struct Node
72 {
73 Node() = default;
74 explicit Node(T* data) : Data(data)
75 {
76 Next.store(nullptr, std::memory_order_relaxed);
77 }
78
79 T* Data;
80 std::atomic<Node*> Next;
81 };
82
83 std::atomic<Node*> _head;
84 std::atomic<Node*> _tail;
85
88};
89
90// C++ implementation of Dmitry Vyukov's lock free MPSC queue
91// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
92template<typename T, std::atomic<T*> T::* IntrusiveLink>
94{
95public:
96 MPSCQueueIntrusive() : _dummyPtr(reinterpret_cast<T*>(std::addressof(_dummy))), _head(_dummyPtr), _tail(_dummyPtr)
97 {
98 // _dummy is constructed from aligned_storage and is intentionally left uninitialized (it might not be default constructible)
99 // so we init only its IntrusiveLink here
100 std::atomic<T*>* dummyNext = new (&(_dummyPtr->*IntrusiveLink)) std::atomic<T*>();
101 dummyNext->store(nullptr, std::memory_order_relaxed);
102 }
103
105 {
106 T* output;
107 while (Dequeue(output))
108 delete output;
109 }
110
111 void Enqueue(T* input)
112 {
113 (input->*IntrusiveLink).store(nullptr, std::memory_order_release);
114 T* prevHead = _head.exchange(input, std::memory_order_acq_rel);
115 (prevHead->*IntrusiveLink).store(input, std::memory_order_release);
116 }
117
118 bool Dequeue(T*& result)
119 {
120 T* tail = _tail.load(std::memory_order_relaxed);
121 T* next = (tail->*IntrusiveLink).load(std::memory_order_acquire);
122 if (tail == _dummyPtr)
123 {
124 if (!next)
125 return false;
126
127 _tail.store(next, std::memory_order_release);
128 tail = next;
129 next = (next->*IntrusiveLink).load(std::memory_order_acquire);
130 }
131
132 if (next)
133 {
134 _tail.store(next, std::memory_order_release);
135 result = tail;
136 return true;
137 }
138
139 T* head = _head.load(std::memory_order_acquire);
140 if (tail != head)
141 return false;
142
144 next = (tail->*IntrusiveLink).load(std::memory_order_acquire);
145 if (next)
146 {
147 _tail.store(next, std::memory_order_release);
148 result = tail;
149 return true;
150 }
151 return false;
152 }
153
154private:
155 std::aligned_storage_t<sizeof(T), alignof(T)> _dummy;
157 std::atomic<T*> _head;
158 std::atomic<T*> _tail;
159
162};
163}
164}
165
166template<typename T, std::atomic<T*> T::* IntrusiveLink = nullptr>
167using MPSCQueue = std::conditional_t<IntrusiveLink != nullptr, Trinity::Impl::MPSCQueueIntrusive<T, IntrusiveLink>, Trinity::Impl::MPSCQueueNonIntrusive<T>>;
168
169#endif // MPSCQueue_h__
std::conditional_t< IntrusiveLink !=nullptr, Trinity::Impl::MPSCQueueIntrusive< T, IntrusiveLink >, Trinity::Impl::MPSCQueueNonIntrusive< T > > MPSCQueue
Definition MPSCQueue.h:167
Data
MPSCQueueIntrusive(MPSCQueueIntrusive const &)=delete
MPSCQueueIntrusive & operator=(MPSCQueueIntrusive const &)=delete
std::aligned_storage_t< sizeof(T), alignof(T)> _dummy
Definition MPSCQueue.h:155
MPSCQueueNonIntrusive & operator=(MPSCQueueNonIntrusive const &)=delete
std::atomic< Node * > _head
Definition MPSCQueue.h:83
std::atomic< Node * > _tail
Definition MPSCQueue.h:84
MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const &)=delete
STL namespace.