summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@googlemail.com>2014-03-09 13:49:38 +0000
committerDominik Picheta <dominikpicheta@googlemail.com>2014-03-09 13:49:38 +0000
commita0975269564f2474953d2b12aaaab4ba71cec664 (patch)
treeaa2e55c68a8a53633ac99d15cd96321b37422ef3
parent7704cdc90ec187ef22f259f0e0f89ceeb5c13431 (diff)
downloadNim-a0975269564f2474953d2b12aaaab4ba71cec664.tar.gz
Fixes to asyncio2 on Linux.
-rw-r--r--lib/posix/epoll.nim2
-rw-r--r--lib/pure/asyncio2.nim28
-rw-r--r--lib/pure/selectors.nim26
-rw-r--r--lib/pure/sockets2.nim22
-rw-r--r--tests/async/tasyncawait.nim10
5 files changed, 64 insertions, 24 deletions
diff --git a/lib/posix/epoll.nim b/lib/posix/epoll.nim
index 366521551..57a2f001f 100644
--- a/lib/posix/epoll.nim
+++ b/lib/posix/epoll.nim
@@ -36,7 +36,7 @@ type
   epoll_data* {.importc: "union epoll_data", 
       header: "<sys/epoll.h>", pure, final.} = object # TODO: This is actually a union.
     #thePtr* {.importc: "ptr".}: pointer
-    fd*: cint # \
+    fd* {.importc: "fd".}: cint # \
     #u32*: uint32
     #u64*: uint64
 
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim
index 12d4cb5a3..60d489dda 100644
--- a/lib/pure/asyncio2.nim
+++ b/lib/pure/asyncio2.nim
@@ -473,7 +473,6 @@ else:
 
   proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) =
     assert sock in p.selector
-    echo("Update: ", events)
     if events == {}:
       discard p.selector.unregister(sock)
     else:
@@ -499,23 +498,25 @@ else:
     for info in p.selector.select(timeout):
       let data = PData(info.key.data)
       assert data.sock == info.key.fd
-      echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events)
       
       if EvRead in info.events:
-        var newReadCBs: seq[TCallback] = @[]
-        for cb in data.readCBs:
+        # Callback may add items to ``data.readCBs`` which causes issues if
+        # we are iterating over ``data.readCBs`` at the same time. We therefore
+        # make a copy to iterate over.
+        let currentCBs = data.readCBs
+        data.readCBs = @[]
+        for cb in currentCBs:
           if not cb(data.sock):
             # Callback wants to be called again.
-            newReadCBs.add(cb)
-        data.readCBs = newReadCBs
+            data.readCBs.add(cb)
       
       if EvWrite in info.events:
-        var newWriteCBs: seq[TCallback] = @[]
-        for cb in data.writeCBs:
+        let currentCBs = data.writeCBs
+        data.writeCBs = @[]
+        for cb in currentCBs:
           if not cb(data.sock):
             # Callback wants to be called again.
-            newWriteCBs.add(cb)
-        data.writeCBs = newWriteCBs
+            data.writeCBs.add(cb)
   
       var newEvents: set[TEvent]
       if data.readCBs.len != 0: newEvents = {EvRead}
@@ -615,7 +616,6 @@ else:
           retFuture.complete(0)
     addWrite(p, socket, cb)
     return retFuture
-        
 
   proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): 
       PFuture[tuple[address: string, client: TSocketHandle]] =
@@ -854,7 +854,7 @@ when isMainModule:
   sock.setBlocking false
 
 
-  when false:
+  when true:
     # Await tests
     proc main(p: PDispatcher): PFuture[int] {.async.} =
       discard await p.connect(sock, "irc.freenode.net", TPort(6667))
@@ -880,7 +880,7 @@ when isMainModule:
     
 
   else:
-    when false:
+    when true:
 
       var f = p.connect(sock, "irc.freenode.org", TPort(6667))
       f.callback =
@@ -919,4 +919,4 @@ when isMainModule:
 
   
 
-  
\ No newline at end of file
+  
diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim
index 6482a01a6..e086ee3ab 100644
--- a/lib/pure/selectors.nim
+++ b/lib/pure/selectors.nim
@@ -10,11 +10,13 @@
 # TODO: Docs.
 
 import tables, os, unsigned, hashes
+import sockets2
 
 when defined(linux): import posix, epoll
 elif defined(windows): import winlean
 
 proc hash*(x: TSocketHandle): THash {.borrow.}
+proc `$`*(x: TSocketHandle): string {.borrow.}
 
 type
   TEvent* = enum
@@ -31,7 +33,7 @@ when defined(linux) or defined(nimdoc):
   type
     PSelector* = ref object
       epollFD: cint
-      events: array[64, ptr epoll_event]
+      events: array[64, epoll_event]
       fds: TTable[TSocketHandle, PSelectorKey]
   
   proc createEventStruct(events: set[TEvent], fd: TSocketHandle): epoll_event =
@@ -66,17 +68,25 @@ when defined(linux) or defined(nimdoc):
     var event = createEventStruct(events, fd)
     
     s.fds[fd].events = events
-    echo("About to update")
     if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
+      if OSLastError().cint == ENOENT:
+        # Socket has been closed. Epoll automatically removes disconnected
+        # sockets.
+        s.fds.del(fd)
+        osError("Socket has been disconnected")
+        
       OSError(OSLastError())
-    echo("finished updating")
     result = s.fds[fd]
   
   proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} =
     if not s.fds.hasKey(fd):
       raise newException(EInvalidValue, "File descriptor not found.")
     if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
-      OSError(OSLastError())
+      if osLastError().cint == ENOENT:
+        # Socket has been closed. Epoll automatically removes disconnected
+        # sockets so its already been removed.
+      else:
+        OSError(OSLastError())
     result = s.fds[fd]
     s.fds.del(fd)
 
@@ -92,21 +102,21 @@ when defined(linux) or defined(nimdoc):
     ## on the ``fd``.
     result = @[]
     
-    let evNum = epoll_wait(s.epollFD, s.events[0], 64.cint, timeout.cint)
+    let evNum = epoll_wait(s.epollFD, addr s.events[0], 64.cint, timeout.cint)
     if evNum < 0: OSError(OSLastError())
     if evNum == 0: return @[]
     for i in 0 .. <evNum:
       var evSet: set[TEvent] = {}
       if (s.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
       if (s.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite}
-      
       let selectorKey = s.fds[s.events[i].data.fd.TSocketHandle]
+      assert selectorKey != nil
       result.add((selectorKey, evSet))
   
   proc newSelector*(): PSelector =
     new result
     result.epollFD = epoll_create(64)
-    result.events = cast[array[64, ptr epoll_event]](alloc0(sizeof(epoll_event)*64))
+    result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
     result.fds = initTable[TSocketHandle, PSelectorKey]()
     if result.epollFD < 0:
       OSError(OSLastError())
@@ -247,4 +257,4 @@ when isMainModule:
   
   
   
-  
\ No newline at end of file
+  
diff --git a/lib/pure/sockets2.nim b/lib/pure/sockets2.nim
index 3542a0694..290f414b4 100644
--- a/lib/pure/sockets2.nim
+++ b/lib/pure/sockets2.nim
@@ -24,6 +24,10 @@ else:
 export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen,
   inet_ntoa, recv, `==`, connect, send, accept
 
+export
+  SO_ERROR,
+  SOL_SOCKET
+
 type
   
   TPort* = distinct uint16  ## port type
@@ -208,6 +212,24 @@ proc htons*(x: int16): int16 =
   ## order, this is a no-op; otherwise, it performs a 2-byte swap operation.
   result = sockets2.ntohs(x)
 
+proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {.
+  tags: [FReadIO].} = 
+  ## getsockopt for integer options.
+  var res: cint
+  var size = sizeof(res).TSocklen
+  if getsockopt(socket, cint(level), cint(optname), 
+                addr(res), addr(size)) < 0'i32:
+    osError(osLastError())
+  result = int(res)
+
+proc setSockOptInt*(socket: TSocketHandle, level, optname, optval: int) {.
+  tags: [FWriteIO].} =
+  ## setsockopt for integer options.
+  var value = cint(optval)
+  if setsockopt(socket, cint(level), cint(optname), addr(value),  
+                sizeof(value).TSocklen) < 0'i32:
+    osError(osLastError())
+
 when defined(Windows):
   var wsa: TWSADATA
   if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError())
diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim
index bde5bf8c8..9e5d270c3 100644
--- a/tests/async/tasyncawait.nim
+++ b/tests/async/tasyncawait.nim
@@ -15,16 +15,24 @@ const
 var clientCount = 0
 
 proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} =
+  echo("entering sendMessages")
   for i in 0 .. <messagesToSend:
-    discard await disp.send(client, "Message " & $i & "\c\L") 
+    discard await disp.send(client, "Message " & $i & "\c\L")
+  echo("returning sendMessages")
 
 proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} =
   for i in 0 .. <swarmSize:
     var sock = socket()
+    # TODO: We may need to explicitly register and unregister the fd.
+    # This is because when the socket is closed, selectors is not aware
+    # that it has been closed. While epoll is. Perhaps we should just unregister
+    # in close()?
+    echo(sock.cint)
     #disp.register(sock)
     discard await disp.connect(sock, "localhost", port)
     when true:
       discard await sendMessages(disp, sock)
+      echo("Calling close")
       sock.close()
     else:
       # Issue #932: https://github.com/Araq/Nimrod/issues/932
+0200 compiler almost free of deprecated expr/stmt names' href='/ahoang/Nim/commit/compiler/vmdeps.nim?h=devel&id=39ebe2175bd4e8e62d03875d06b24feafd36f8f7'>39ebe2175 ^
0356f53b5 ^

39ebe2175 ^
5f4e98bbc ^
39ebe2175 ^
c89397f82 ^
39ebe2175 ^
0356f53b5 ^




39ebe2175 ^
0356f53b5 ^


96a5062b8 ^
dc5e4b018 ^
96a5062b8 ^

752052e90 ^

5f4e98bbc ^





b47d9b7b9 ^

e6c5622aa ^
936265df7 ^
5f4e98bbc ^
a406ebbac ^
0356f53b5 ^






752052e90 ^


936265df7 ^
5f4e98bbc ^
752052e90 ^

5f4e98bbc ^
752052e90 ^
936265df7 ^
70ea45cdb ^
752052e90 ^
35d7a99b6 ^
d462cca21 ^
0356f53b5 ^



35d7a99b6 ^

70ea45cdb ^
0356f53b5 ^
d462cca21 ^
c16639402 ^
ec50dab57 ^

35d7a99b6 ^
ec50dab57 ^

35d7a99b6 ^

ec50dab57 ^
752052e90 ^
f9a3de984 ^
c89397f82 ^
0356f53b5 ^
c89397f82 ^
f9a3de984 ^
e8aa6f6c3 ^
5f4e98bbc ^
c89397f82 ^
dc5e4b018 ^
5f4e98bbc ^
dc5e4b018 ^
752052e90 ^
0356f53b5 ^


e8aa6f6c3 ^
752052e90 ^
0356f53b5 ^


e8aa6f6c3 ^

0356f53b5 ^

752052e90 ^
0356f53b5 ^








dc5e4b018 ^
752052e90 ^
936265df7 ^
3bffb3ba3 ^


0356f53b5 ^

e2567e2e0 ^

47335aab4 ^
e2567e2e0 ^


0755f902d ^
e2567e2e0 ^

0356f53b5 ^
5f4e98bbc ^

0356f53b5 ^




5f4e98bbc ^
0356f53b5 ^




5f4e98bbc ^
822d2b508 ^





e4081a720 ^

5f4e98bbc ^
e879da579 ^
0356f53b5 ^












5f4e98bbc ^

752052e90 ^
936265df7 ^
5f4e98bbc ^
752052e90 ^

5f4e98bbc ^
















5f4e98bbc ^
5f4e98bbc ^


0b0a3e5f2 ^





5f4e98bbc ^





fedc13698 ^
e879da579 ^
d462cca21 ^

5f4e98bbc ^
d462cca21 ^

5f4e98bbc ^
d462cca21 ^

fedc13698 ^
752052e90 ^

0356f53b5 ^
d462cca21 ^
c89397f82 ^

d462cca21 ^
0356f53b5 ^





1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297