1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
#
#
# Nimrod's Runtime Library
# (c) Copyright 2011 Andreas Rumpf
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## Channel support for threads. **Note**: This is part of the system module.
## Do not import it directly. To activate thread support you need to compile
## with the ``--threads:on`` command line switch.
##
## **Note:** The current implementation of message passing is slow and does
## not work with cyclic data structures.
type
  pbytes = ptr array[0..0xffff, byte]  # byte-wise view of the queue buffer
  TRawChannel {.pure, final.} = object ## msg queue for a thread
    rd: int             # slot index the next receive reads from
    wr: int             # slot index the next send writes to
    count: int          # number of messages currently queued
    mask: int           # capacity - 1; -1 until the first send allocates a
                        # buffer, ChannelDeadMask after deinitialization
    data: pbytes        # ring buffer holding the queued messages
    lock: TSysLock      # held while the queue is read or modified
    cond: TSysCond      # signalled by senders, waited on by receivers
    elemType: PNimType  # type info recorded by the most recent send
    ready: bool         # set while a receiver is inside llRecv's wait
    region: TMemRegion  # region the buffer and message copies come from
  PRawChannel = ptr TRawChannel
  TLoadStoreMode = enum
    mStore,             ## copy a message into the channel
    mLoad               ## copy a message out of the channel
  TChannel*[TMsg] = TRawChannel ## a channel for thread communication
# Sentinel written to a channel's `mask` by deinitRawChannel; the send and
# peek paths compare `mask` against it to detect a torn-down channel.
const ChannelDeadMask = -2
proc initRawChannel(p: pointer) =
  ## Prepares the raw channel stored at `p` for use: initializes its lock
  ## and condition variable and marks the queue as having no slots yet.
  ## No other field is written here.
  var chan = cast[PRawChannel](p)
  initSysLock(chan.lock)
  initSysCond(chan.cond)
  # capacity is computed as mask+1, so -1 stands for "no buffer allocated"
  chan.mask = -1
proc deinitRawChannel(p: pointer) =
  ## Tears down the raw channel stored at `p`: marks it dead, releases its
  ## memory region with `deallocOsPages` and destroys the lock and the
  ## condition variable.
  var chan = cast[PRawChannel](p)
  # take the lock first to be safe against threads that are sending
  acquireSys(chan.lock)
  chan.mask = ChannelDeadMask
  deallocOsPages(chan.region)
  # NOTE(review): the lock is still held at this point and is never released
  # before being destroyed -- confirm every TSysLock backend tolerates that.
  deinitSys(chan.lock)
  deinitSysCond(chan.cond)
# Forward declaration of the per-type overload: the TNimNode overload that
# follows calls it, while its definition only comes afterwards.
proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel,
              mode: TLoadStoreMode)
proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PRawChannel,
              mode: TLoadStoreMode) =
  ## Copies the fields described by the type node `n` from the object at
  ## `src` to the object at `dest`. `t` and `mode` are handed on unchanged
  ## to the per-type overload that does the actual copying.
  case n.kind
  of nkNone:
    sysAssert(false)
  of nkSlot:
    # one field: copy it according to its own type
    var fieldDest = cast[pointer](cast[TAddress](dest) +% n.offset)
    var fieldSrc = cast[pointer](cast[TAddress](src) +% n.offset)
    storeAux(fieldDest, fieldSrc, n.typ, t, mode)
  of nkList:
    # several nodes: process each child against the same object
    var i = 0
    while i < n.len:
      storeAux(dest, src, n.sons[i], t, mode)
      inc i
  of nkCase:
    # copy the `n.typ.size` raw bytes at the node's offset first (presumably
    # the discriminator -- confirm), then the fields of the branch that
    # `selectBranch` picks for the source object
    var tagOffset = n.offset
    copyMem(cast[pointer](cast[TAddress](dest) +% tagOffset),
            cast[pointer](cast[TAddress](src) +% tagOffset), n.typ.size)
    var branch = selectBranch(src, n)
    if branch != nil: storeAux(dest, src, branch, t, mode)
proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel,
              mode: TLoadStoreMode) =
  ## Deep-copies one value of runtime type `mt` from `src` to `dest`.
  ##
  ## With `mode == mStore` the copies of strings, sequences and referenced
  ## objects are allocated from `t.region` and the pointers are written raw.
  ## With the other mode (`mLoad`) they are recreated with `copyString` /
  ## `newObj`, assigned through `unsureAsgnRef`, and the source block is
  ## given back to `t.region` with `Dealloc`.
  var
    d = cast[TAddress](dest)
    s = cast[TAddress](src)
  sysAssert(mt != nil)
  case mt.Kind
  of tyString:
    if mode == mStore:
      var x = cast[ppointer](dest)
      var s2 = cast[ppointer](s)[]
      if s2 == nil:
        x[] = nil
      else:
        # duplicate the whole string block inside the channel's region;
        # the size is len+1 plus GenericSeqSize (presumably characters,
        # trailing zero and header -- confirm)
        var ss = cast[NimString](s2)
        var ns = cast[NimString](Alloc(t.region, ss.len+1 + GenericSeqSize))
        copyMem(ns, ss, ss.len+1 + GenericSeqSize)
        x[] = ns
    else:
      var x = cast[ppointer](dest)
      var s2 = cast[ppointer](s)[]
      if s2 == nil:
        unsureAsgnRef(x, s2)
      else:
        # make a fresh copy for the receiver, then free the region copy
        unsureAsgnRef(x, copyString(cast[NimString](s2)))
        Dealloc(t.region, s2)
  of tySequence:
    var s2 = cast[ppointer](src)[]
    var seq = cast[PGenericSeq](s2)
    var x = cast[ppointer](dest)
    if s2 == nil:
      if mode == mStore:
        x[] = nil
      else:
        unsureAsgnRef(x, nil)
    else:
      sysAssert(dest != nil)
      # allocate room for GenericSeqSize bytes plus `seq.len` elements
      if mode == mStore:
        x[] = Alloc(t.region, seq.len *% mt.base.size +% GenericSeqSize)
      else:
        unsureAsgnRef(x, newObj(mt, seq.len * mt.base.size + GenericSeqSize))
      var dst = cast[taddress](cast[ppointer](dest)[])
      # copy element by element so nested strings/seqs/refs are deep-copied
      for i in 0..seq.len-1:
        storeAux(
          cast[pointer](dst +% i*% mt.base.size +% GenericSeqSize),
          cast[pointer](cast[TAddress](s2) +% i *% mt.base.size +%
                        GenericSeqSize),
          mt.Base, t, mode)
      var dstseq = cast[PGenericSeq](dst)
      dstseq.len = seq.len
      dstseq.space = seq.len
      # when loading, the region copy is no longer needed
      if mode != mStore: Dealloc(t.region, s2)
  of tyObject:
    # copy type field:
    var pint = cast[ptr PNimType](dest)
    # XXX use dynamic type here!
    pint[] = mt
    storeAux(dest, src, mt.node, t, mode)
  of tyTuple, tyPureObject:
    # no type field to write; just copy the fields described by the node
    storeAux(dest, src, mt.node, t, mode)
  of tyArray, tyArrayConstr:
    # element count is derived from total size / element size
    for i in 0..(mt.size div mt.base.size)-1:
      storeAux(cast[pointer](d +% i*% mt.base.size),
               cast[pointer](s +% i*% mt.base.size), mt.base, t, mode)
  of tyRef:
    # note: this `s` is the referenced pointer and shadows the address `s`
    # declared at the top of the proc
    var s = cast[ppointer](src)[]
    var x = cast[ppointer](dest)
    if s == nil:
      if mode == mStore:
        x[] = nil
      else:
        unsureAsgnRef(x, nil)
    else:
      if mode == mStore:
        x[] = Alloc(t.region, mt.base.size)
      else:
        # XXX we should use the dynamic type here too, but that is not stored
        # in the inbox at all --> use source[]'s object type? but how? we need
        # a tyRef to the object!
        var obj = newObj(mt.base, mt.base.size)
        unsureAsgnRef(x, obj)
      # copy the referenced object itself, then free the region copy on load
      storeAux(x[], s, mt.base, t, mode)
      if mode != mStore: Dealloc(t.region, s)
  else:
    copyMem(dest, src, mt.size) # copy raw bits
proc rawSend(q: PRawChannel, data: pointer, typ: PNimType) =
  ## Appends a copy of the message at `data` (runtime type `typ`) to the end
  ## of the queue `q`, doubling the ring buffer first when it is full.
  ## Takes no lock itself; `sendImpl` calls it with `q.lock` held.
  var elemSize = typ.size
  var capacity = q.mask + 1
  if q.count >= capacity:
    # an unallocated queue starts out with capacity for 2 entries
    if capacity == 0: capacity = 1
    var newCap = capacity * 2
    var grown = cast[pbytes](Alloc0(q.region, newCap * elemSize))
    # move the queued entries to the front of the new buffer, oldest first
    var srcIdx = q.rd
    for dstIdx in 0..q.count-1:
      copyMem(addr(grown[dstIdx * elemSize]), addr(q.data[srcIdx * elemSize]),
              elemSize)
      srcIdx = (srcIdx + 1) and q.mask
    if q.data != nil: Dealloc(q.region, q.data)
    q.data = grown
    q.mask = newCap - 1
    q.wr = q.count
    q.rd = 0
  storeAux(addr(q.data[q.wr * elemSize]), data, typ, q, mStore)
  inc q.count
  q.wr = (q.wr + 1) and q.mask
proc rawRecv(q: PRawChannel, data: pointer, typ: PNimType) =
  ## Removes the oldest message (runtime type `typ`) from the queue `q` and
  ## loads it into `data`. The queue must not be empty. Takes no lock
  ## itself; `llRecv` calls it with `q.lock` held.
  assert q.count > 0
  dec q.count
  var slot = addr(q.data[q.rd * typ.size])
  storeAux(data, slot, typ, q, mLoad)
  q.rd = (q.rd + 1) and q.mask
template lockChannel(q: expr, action: stmt) =
  # Runs `action` while holding `q.lock`.
  # The release sits in a `finally` section so that the lock is given back
  # even when `action` raises or leaves the enclosing proc early; without
  # it a failing `action` would leave the channel locked for good.
  acquireSys(q.lock)
  try:
    action
  finally:
    releaseSys(q.lock)
template sendImpl(q: expr) =
  # Body shared by `send`. Besides `q` it uses `msg` and `TMsg`, which must
  # be visible where the template is expanded.
  if q.mask == ChannelDeadMask:
    raise newException(EDeadThread, "cannot send message; thread died")
  acquireSys(q.lock)
  var msgType = cast[PNimType](getTypeInfo(msg))
  var msgCopy: TMsg
  shallowCopy(msgCopy, msg)
  rawSend(q, addr(msgCopy), msgType)
  # remember what was sent so the receiving side can check the type
  q.elemType = msgType
  releaseSys(q.lock)
  # signal the condition only after the lock has been released
  SignalSysCond(q.cond)
proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) =
  ## sends a message to a thread. `msg` is deeply copied.
  ## Raises `EDeadThread` if the channel has already been deinitialized.
  # `sendImpl` is a template: it picks up `msg` and `TMsg` from this scope.
  var q = cast[PRawChannel](addr(c))
  sendImpl(q)
proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) =
  # Non-generic core of `recv`; it lives here so that the generic proc
  # stays as small as possible.
  # Blocks until `q` holds a message, then loads the oldest one into `res`.
  # Raises EInvalidValue when `typ` differs from the type recorded by the
  # most recent send.
  acquireSys(q.lock)
  q.ready = true
  while not (q.count > 0):
    WaitSysCond(q.cond, q.lock)
  q.ready = false
  if typ == q.elemType:
    rawRecv(q, res, typ)
    releaseSys(q.lock)
  else:
    # give the lock back before raising
    releaseSys(q.lock)
    raise newException(EInvalidValue, "cannot receive message of wrong type")
proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =
  ## Takes the next message out of the channel `c` and returns it. The call
  ## blocks until a message has arrived; use ``peek`` beforehand to avoid
  ## blocking.
  var msgType = cast[PNimType](getTypeInfo(result))
  var raw = cast[PRawChannel](addr(c))
  llRecv(raw, addr(result), msgType)
proc peek*[TMsg](c: var TChannel[TMsg]): int =
  ## Reports how many messages are currently queued in the channel `c`.
  ## The count is read while holding the channel lock; for a channel that
  ## has been deinitialized the result is 0 and the lock is not touched.
  var raw = cast[PRawChannel](addr(c))
  if raw.mask == ChannelDeadMask:
    result = 0
  else:
    lockChannel(raw):
      result = raw.count
proc open*[TMsg](c: var TChannel[TMsg]) =
  ## opens a channel `c` for inter thread communication.
  # delegates to initRawChannel, which sets up the lock and the condition
  # variable and marks the queue as not yet allocated
  initRawChannel(addr(c))
proc close*[TMsg](c: var TChannel[TMsg]) =
  ## closes a channel `c` and frees its associated resources.
  # delegates to deinitRawChannel, which also marks the channel as dead so
  # that a later `send` raises and `peek` skips the lock
  deinitRawChannel(addr(c))
proc ready*[TMsg](c: var TChannel[TMsg]): bool =
  ## Tells whether some thread is currently waiting on the channel `c` for
  ## new messages. The flag is read without taking the channel lock.
  var raw = cast[PRawChannel](addr(c))
  return raw.ready
|