MacOS.swift 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. //
  2. // MacOS.swift
  3. // Swifter
  4. //
  5. // Copyright © 2016 kolakowski. All rights reserved.
  6. //
  7. #if os(OSX) || os(iOS)
  8. import Foundation
  9. public class MacOSAsyncTCPServer: TcpServer {
  10. private var backlog = Dictionary<Int32, Array<(chunk: [UInt8], done: ((Void) -> TcpWriteDoneAction))>>()
  11. private var peers = Set<Int32>()
  12. private let kernelQueue: KernelQueue
  13. private let server: UInt
  14. public required init(_ port: in_port_t, forceIPv4: Bool, bindAddress: String? = nil) throws {
  15. self.kernelQueue = try KernelQueue()
  16. self.server = UInt(try MacOSAsyncTCPServer.nonBlockingSocketForListenening(port))
  17. self.kernelQueue.subscribe(server, .read)
  18. }
  19. public func write(_ socket: Int32, _ data: Array<UInt8>, _ done: @escaping ((Void) -> TcpWriteDoneAction)) throws {
  20. let result = Darwin.write(socket, data, data.count)
  21. if result == -1 {
  22. defer { self.finish(socket) }
  23. throw AsyncError.writeFailed(Process.error)
  24. }
  25. if result == data.count {
  26. if done() == .terminate {
  27. self.finish(socket)
  28. }
  29. return
  30. }
  31. self.backlog[socket]?.append(([UInt8](data[result..<data.count]), done))
  32. self.kernelQueue.resume(UInt(socket), .write)
  33. }
  34. public func wait(_ callback: ((TcpServerEvent) -> Void)) throws {
  35. try self.kernelQueue.wait { signal in
  36. switch signal.event {
  37. case .read:
  38. if signal.ident == self.server {
  39. let client = try MacOSAsyncTCPServer.acceptAndConfigureClientSocket(Int32(signal.ident))
  40. self.peers.insert(client)
  41. self.backlog[Int32(client)] = []
  42. kernelQueue.subscribe(UInt(client), .read)
  43. kernelQueue.subscribe(UInt(client), .write)
  44. kernelQueue.pause(UInt(client), .write)
  45. callback(.connect("", Int32(client)))
  46. } else {
  47. var chunk = [UInt8](repeating: 0, count: signal.data)
  48. let result = Darwin.read(Int32(signal.ident), &chunk, signal.data)
  49. if result <= 0 {
  50. finish(Int32(signal.ident))
  51. callback(.disconnect("", Int32(signal.ident)))
  52. } else {
  53. callback(.data("", Int32(signal.ident), chunk[0..<result]))
  54. }
  55. }
  56. case .write:
  57. while let backlogElement = self.backlog[Int32(signal.ident)]?.first {
  58. var chunk = backlogElement.chunk
  59. let result = Darwin.write(Int32(signal.ident), &chunk, min(chunk.count, signal.data))
  60. if result == -1 {
  61. finish(Int32(signal.ident))
  62. callback(.disconnect("", Int32(signal.ident)))
  63. return
  64. }
  65. if result < chunk.count {
  66. let leftData = [UInt8](chunk[result..<chunk.count])
  67. self.backlog[Int32(signal.ident)]?.remove(at: 0)
  68. self.backlog[Int32(signal.ident)]?.insert((chunk: leftData, done: backlogElement.done), at: 0)
  69. return
  70. }
  71. self.backlog[Int32(signal.ident)]?.removeFirst()
  72. if backlogElement.done() == .terminate {
  73. self.finish(Int32(signal.ident))
  74. callback(.disconnect("", Int32(signal.ident)))
  75. return
  76. }
  77. }
  78. self.kernelQueue.pause(signal.ident, .write)
  79. case .error:
  80. if signal.ident == self.server {
  81. throw AsyncError.async(Process.error)
  82. } else {
  83. self.finish(Int32(signal.ident))
  84. callback(.disconnect("", Int32(signal.ident)))
  85. }
  86. }
  87. }
  88. }
  89. deinit {
  90. closeAllOpenedSockets()
  91. }
  92. public func finish(_ socket: Int32) {
  93. self.backlog[socket] = []
  94. self.peers.remove(socket)
  95. let _ = Darwin.close(socket)
  96. }
  97. public func closeAllOpenedSockets() {
  98. for client in self.peers {
  99. let _ = Darwin.close(client)
  100. }
  101. self.peers.removeAll(keepingCapacity: true)
  102. let _ = Darwin.close(Int32(server))
  103. }
  104. public static func nonBlockingSocketForListenening(_ port: in_port_t = 8080) throws -> Int32 {
  105. let server = Darwin.socket(AF_INET, SOCK_STREAM, 0)
  106. guard server != -1 else {
  107. throw AsyncError.socketCreation(Process.error)
  108. }
  109. var value: Int32 = 1
  110. if Darwin.setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &value, socklen_t(MemoryLayout<Int32>.size)) == -1 {
  111. defer { let _ = Darwin.close(server) }
  112. throw AsyncError.setReuseAddrFailed(Process.error)
  113. }
  114. try setSocketNonBlocking(server)
  115. try setSocketNoSigPipe(server)
  116. var addr = anyAddrForPort(port)
  117. if withUnsafePointer(to: &addr, { Darwin.bind(server, UnsafePointer<sockaddr>(OpaquePointer($0)), socklen_t(MemoryLayout<sockaddr_in>.size)) }) == -1 {
  118. defer { let _ = Darwin.close(server) }
  119. throw AsyncError.bindFailed(Process.error)
  120. }
  121. if Darwin.listen(server, SOMAXCONN) == -1 {
  122. defer { let _ = Darwin.close(server) }
  123. throw AsyncError.listenFailed(Process.error)
  124. }
  125. return server
  126. }
  127. public static func acceptAndConfigureClientSocket(_ socket: Int32) throws -> Int32 {
  128. guard case let client = Darwin.accept(socket, nil, nil), client != -1 else {
  129. throw AsyncError.acceptFailed(Process.error)
  130. }
  131. try self.setSocketNonBlocking(client)
  132. try self.setSocketNoSigPipe(client)
  133. return client
  134. }
  135. public static func anyAddrForPort(_ port: in_port_t) -> sockaddr_in {
  136. var addr = sockaddr_in()
  137. addr.sin_len = __uint8_t(MemoryLayout<sockaddr_in>.size)
  138. addr.sin_family = sa_family_t(AF_INET)
  139. addr.sin_port = port.bigEndian
  140. addr.sin_addr = in_addr(s_addr: in_addr_t(0))
  141. addr.sin_zero = (0, 0, 0, 0, 0, 0, 0, 0)
  142. return addr
  143. }
  144. public static func setSocketNonBlocking(_ socket: Int32) throws {
  145. if Darwin.fcntl(socket, F_SETFL, Darwin.fcntl(socket, F_GETFL, 0) | O_NONBLOCK) == -1 {
  146. throw AsyncError.setNonBlockFailed(Process.error)
  147. }
  148. }
  149. public static func setSocketNoSigPipe(_ socket: Int32) throws {
  150. var value = 1
  151. if Darwin.setsockopt(socket, SOL_SOCKET, SO_NOSIGPIPE, &value, socklen_t(MemoryLayout<Int32>.size)) == -1 {
  152. throw AsyncError.setNoSigPipeFailed(Process.error)
  153. }
  154. }
  155. }
  156. public class KernelQueue {
  157. private var events = Array<kevent>(repeating: kevent(), count: 256)
  158. private var changes = Array<kevent>()
  159. private let queue: Int32
  160. public enum Subscription { case read, write }
  161. public enum Event { case read, write, error }
  162. public init() throws {
  163. guard case let queue = kqueue(), queue != -1 else {
  164. throw AsyncError.async(Process.error)
  165. }
  166. self.queue = queue
  167. }
  168. public func subscribe(_ ident: UInt, _ event: Subscription) {
  169. switch event {
  170. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_ADD) | UInt16(EV_ENABLE)))
  171. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_ADD) | UInt16(EV_ENABLE)))
  172. }
  173. }
  174. public func unsubscribe(_ ident: UInt, _ event: Subscription) {
  175. switch event {
  176. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_DELETE)))
  177. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_DELETE)))
  178. }
  179. }
  180. public func pause(_ ident: UInt, _ event: Subscription) {
  181. switch event {
  182. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_DISABLE)))
  183. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_DISABLE)))
  184. }
  185. }
  186. public func resume(_ ident: UInt, _ event: Subscription) {
  187. switch event {
  188. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_ENABLE)))
  189. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_ENABLE)))
  190. }
  191. }
  192. private func event(_ ident: UInt, _ filter: Int16, _ flags: UInt16) -> kevent {
  193. return kevent(ident: ident, filter: filter, flags: flags, fflags: 0, data: 0, udata: nil)
  194. }
  195. public func wait(_ callback: (_ tuple: (event: Event, ident: UInt, data: Int)) throws -> (Void)) throws {
  196. if !changes.isEmpty {
  197. if kevent(self.queue, &changes, Int32(changes.count), nil, 0, nil) == -1 {
  198. throw AsyncError.async(Process.error)
  199. }
  200. }
  201. self.changes.removeAll(keepingCapacity: true)
  202. guard case let count = kevent(self.queue, nil, 0, &events, Int32(events.count), nil), count != -1 else {
  203. throw AsyncError.async(Process.error)
  204. }
  205. for event in events[0..<Int(count)] {
  206. if Int32(event.flags) & EV_EOF != 0 || Int32(event.flags) & EV_ERROR != 0 {
  207. try callback((.error, event.ident, 0))
  208. continue
  209. }
  210. if Int32(event.filter) == EVFILT_READ {
  211. try callback((.read, event.ident, event.data))
  212. continue
  213. }
  214. if Int32(event.filter) == EVFILT_WRITE {
  215. try callback((.write, event.ident, event.data))
  216. continue
  217. }
  218. }
  219. }
  220. }
  221. #endif