1
0

MacOS.swift 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. //
  2. // MacOS.swift
  3. // Swifter
  4. //
  5. // Copyright © 2016 kolakowski. All rights reserved.
  6. //
  7. #if os(OSX) || os(iOS)
  8. import Foundation
  9. public class MacOSIO: IO {
  10. private var backlog = Dictionary<Int32, Array<(chunk: [UInt8], done: ((Void) -> IODoneAction))>>()
  11. private var peers = Set<Int32>()
  12. private let kernelQueue: KernelQueue
  13. private let server: UInt
  14. public required init(_ port: in_port_t, forceIPv4: Bool, bindAddress: String? = nil) throws {
  15. self.kernelQueue = try KernelQueue()
  16. self.server = UInt(
  17. try MacOSIO.nonBlockingSocketForListenening(port, forceIPv4: forceIPv4, address: bindAddress)
  18. )
  19. self.kernelQueue.subscribe(server, .read)
  20. }
  21. public func write(_ socket: Int32, _ data: Array<UInt8>, _ done: @escaping ((Void) -> IODoneAction)) throws {
  22. let result = Darwin.write(socket, data, data.count)
  23. if result == -1 {
  24. defer { self.finish(socket) }
  25. throw SwifterError.writeFailed(Process.error)
  26. }
  27. if result == data.count {
  28. if done() == .terminate {
  29. self.finish(socket)
  30. }
  31. return
  32. }
  33. self.backlog[socket]?.append(([UInt8](data[result..<data.count]), done))
  34. self.kernelQueue.resume(UInt(socket), .write)
  35. }
  36. public func wait(_ callback: ((IOEvent) -> Void)) throws {
  37. try self.kernelQueue.wait { signal in
  38. switch signal.event {
  39. case .read:
  40. if signal.ident == self.server {
  41. let client = try MacOSIO.acceptAndConfigureClientSocket(Int32(signal.ident))
  42. self.peers.insert(client)
  43. self.backlog[Int32(client)] = []
  44. kernelQueue.subscribe(UInt(client), .read)
  45. kernelQueue.subscribe(UInt(client), .write)
  46. kernelQueue.pause(UInt(client), .write)
  47. callback(.connect("", Int32(client)))
  48. } else {
  49. var chunk = [UInt8](repeating: 0, count: signal.data)
  50. let result = Darwin.read(Int32(signal.ident), &chunk, signal.data)
  51. if result <= 0 {
  52. finish(Int32(signal.ident))
  53. callback(.disconnect("", Int32(signal.ident)))
  54. } else {
  55. callback(.data("", Int32(signal.ident), chunk[0..<result]))
  56. }
  57. }
  58. case .write:
  59. while let backlogElement = self.backlog[Int32(signal.ident)]?.first {
  60. var chunk = backlogElement.chunk
  61. let result = Darwin.write(Int32(signal.ident), &chunk, min(chunk.count, signal.data))
  62. if result == -1 {
  63. finish(Int32(signal.ident))
  64. callback(.disconnect("", Int32(signal.ident)))
  65. return
  66. }
  67. if result < chunk.count {
  68. let leftData = [UInt8](chunk[result..<chunk.count])
  69. self.backlog[Int32(signal.ident)]?.remove(at: 0)
  70. self.backlog[Int32(signal.ident)]?.insert((chunk: leftData, done: backlogElement.done), at: 0)
  71. return
  72. }
  73. self.backlog[Int32(signal.ident)]?.removeFirst()
  74. if backlogElement.done() == .terminate {
  75. self.finish(Int32(signal.ident))
  76. callback(.disconnect("", Int32(signal.ident)))
  77. return
  78. }
  79. }
  80. self.kernelQueue.pause(signal.ident, .write)
  81. case .error:
  82. if signal.ident == self.server {
  83. throw SwifterError.async(Process.error)
  84. } else {
  85. self.finish(Int32(signal.ident))
  86. callback(.disconnect("", Int32(signal.ident)))
  87. }
  88. }
  89. }
  90. }
  91. deinit {
  92. closeAllOpenedSockets()
  93. }
  94. public func finish(_ socket: Int32) {
  95. self.backlog[socket] = []
  96. self.peers.remove(socket)
  97. let _ = Darwin.close(socket)
  98. }
  99. public func closeAllOpenedSockets() {
  100. for client in self.peers {
  101. let _ = Darwin.close(client)
  102. }
  103. self.peers.removeAll(keepingCapacity: true)
  104. let _ = Darwin.close(Int32(server))
  105. }
  106. public static func nonBlockingSocketForListenening(_ port: in_port_t = 8080, forceIPv4: Bool = false, address: String? = nil) throws -> Int32 {
  107. let server = Darwin.socket(forceIPv4 ? AF_INET : AF_INET6, SOCK_STREAM, 0)
  108. guard server != -1 else {
  109. throw SwifterError.socketCreation(Process.error)
  110. }
  111. var value: Int32 = 1
  112. if Darwin.setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &value, socklen_t(MemoryLayout<Int32>.size)) == -1 {
  113. defer { let _ = Darwin.close(server) }
  114. throw SwifterError.setReuseAddrFailed(Process.error)
  115. }
  116. do {
  117. try setSocketNonBlocking(server)
  118. try setSocketNoSigPipe(server)
  119. if forceIPv4 {
  120. try bind(toSocket: server, port: port, andIPv4Address: address)
  121. } else {
  122. try bind(toSocket: server, port: port, andAddress: address)
  123. }
  124. } catch {
  125. let _ = Darwin.close(server)
  126. throw error
  127. }
  128. if Darwin.listen(server, SOMAXCONN) == -1 {
  129. defer { let _ = Darwin.close(server) }
  130. throw SwifterError.listenFailed(Process.error)
  131. }
  132. return server
  133. }
  134. public static func acceptAndConfigureClientSocket(_ socket: Int32) throws -> Int32 {
  135. guard case let client = Darwin.accept(socket, nil, nil), client != -1 else {
  136. throw SwifterError.acceptFailed(Process.error)
  137. }
  138. try self.setSocketNonBlocking(client)
  139. try self.setSocketNoSigPipe(client)
  140. return client
  141. }
  142. public static func bind(toSocket socket: Int32, port: in_port_t, andIPv4Address address: String? = nil) throws {
  143. var addr = sockaddr_in()
  144. addr.sin_len = __uint8_t(MemoryLayout<sockaddr_in>.size)
  145. addr.sin_family = sa_family_t(AF_INET)
  146. addr.sin_port = port.bigEndian
  147. if let addressFound = address {
  148. guard addressFound.withCString({ inet_pton(AF_INET, $0, &addr.sin_addr) }) == 1 else {
  149. throw SwifterError.inetPtonFailed(Errno.description())
  150. }
  151. } else {
  152. addr.sin_addr = in_addr(s_addr: in_addr_t(0))
  153. }
  154. addr.sin_zero = (0, 0, 0, 0, 0, 0, 0, 0)
  155. let bindResult = withUnsafePointer(to: &addr) {
  156. Darwin.bind(socket, UnsafePointer<sockaddr>(OpaquePointer($0)), socklen_t(MemoryLayout<sockaddr_in>.size))
  157. }
  158. guard bindResult != -1 else {
  159. throw SwifterError.bindFailed(Errno.description())
  160. }
  161. }
  162. public static func bind(toSocket socket: Int32, port: in_port_t, andAddress address: String? = nil) throws {
  163. var addr = sockaddr_in6()
  164. addr.sin6_len = __uint8_t(MemoryLayout<sockaddr_in6>.size)
  165. addr.sin6_family = sa_family_t(AF_INET6)
  166. addr.sin6_port = port.bigEndian
  167. if let addressFound = address {
  168. guard addressFound.withCString({ inet_pton(AF_INET6, $0, &addr.sin6_addr) }) == 1 else {
  169. throw SwifterError.inetPtonFailed(Errno.description())
  170. }
  171. } else {
  172. addr.sin6_addr = in6addr_any
  173. }
  174. addr.sin6_scope_id = 0
  175. let bindResult = withUnsafePointer(to: &addr) {
  176. Darwin.bind(socket, UnsafePointer<sockaddr>(OpaquePointer($0)), socklen_t(MemoryLayout<sockaddr_in6>.size))
  177. }
  178. guard bindResult != -1 else {
  179. throw SwifterError.bindFailed(Errno.description())
  180. }
  181. }
  182. public static func setSocketNonBlocking(_ socket: Int32) throws {
  183. if Darwin.fcntl(socket, F_SETFL, Darwin.fcntl(socket, F_GETFL, 0) | O_NONBLOCK) == -1 {
  184. throw SwifterError.setNonBlockFailed(Process.error)
  185. }
  186. }
  187. public static func setSocketNoSigPipe(_ socket: Int32) throws {
  188. var value = 1
  189. if Darwin.setsockopt(socket, SOL_SOCKET, SO_NOSIGPIPE, &value, socklen_t(MemoryLayout<Int32>.size)) == -1 {
  190. throw SwifterError.setNoSigPipeFailed(Process.error)
  191. }
  192. }
  193. }
  194. public class KernelQueue {
  195. private var events = Array<kevent>(repeating: kevent(), count: 256)
  196. private var changes = Array<kevent>()
  197. private let queue: Int32
  198. public enum Subscription { case read, write }
  199. public enum Event { case read, write, error }
  200. public init() throws {
  201. guard case let queue = kqueue(), queue != -1 else {
  202. throw SwifterError.async(Process.error)
  203. }
  204. self.queue = queue
  205. }
  206. public func subscribe(_ ident: UInt, _ event: Subscription) {
  207. switch event {
  208. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_ADD) | UInt16(EV_ENABLE)))
  209. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_ADD) | UInt16(EV_ENABLE)))
  210. }
  211. }
  212. public func unsubscribe(_ ident: UInt, _ event: Subscription) {
  213. switch event {
  214. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_DELETE)))
  215. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_DELETE)))
  216. }
  217. }
  218. public func pause(_ ident: UInt, _ event: Subscription) {
  219. switch event {
  220. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_DISABLE)))
  221. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_DISABLE)))
  222. }
  223. }
  224. public func resume(_ ident: UInt, _ event: Subscription) {
  225. switch event {
  226. case .read : changes.append(self.event(UInt(ident), Int16(EVFILT_READ), UInt16(EV_ENABLE)))
  227. case .write : changes.append(self.event(UInt(ident), Int16(EVFILT_WRITE), UInt16(EV_ENABLE)))
  228. }
  229. }
  230. private func event(_ ident: UInt, _ filter: Int16, _ flags: UInt16) -> kevent {
  231. return kevent(ident: ident, filter: filter, flags: flags, fflags: 0, data: 0, udata: nil)
  232. }
  233. public func wait(_ callback: (_ tuple: (event: Event, ident: UInt, data: Int)) throws -> (Void)) throws {
  234. if !changes.isEmpty {
  235. if kevent(self.queue, &changes, Int32(changes.count), nil, 0, nil) == -1 {
  236. throw SwifterError.async(Process.error)
  237. }
  238. }
  239. self.changes.removeAll(keepingCapacity: true)
  240. guard case let count = kevent(self.queue, nil, 0, &events, Int32(events.count), nil), count != -1 else {
  241. throw SwifterError.async(Process.error)
  242. }
  243. for event in events[0..<Int(count)] {
  244. if Int32(event.flags) & EV_EOF != 0 || Int32(event.flags) & EV_ERROR != 0 {
  245. try callback((.error, event.ident, 0))
  246. continue
  247. }
  248. if Int32(event.filter) == EVFILT_READ {
  249. try callback((.read, event.ident, event.data))
  250. continue
  251. }
  252. if Int32(event.filter) == EVFILT_WRITE {
  253. try callback((.write, event.ident, event.data))
  254. continue
  255. }
  256. }
  257. }
  258. }
  259. #endif