Linux.swift 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. //
  2. // Linux.swift
  3. // Swifter
  4. //
  5. // Copyright © 2016 kolakowski. All rights reserved.
  6. //
  7. #if os(Linux)
  8. import Glibc
  9. public class LinuxAsyncServer: TcpServer {
  10. private var backlog = [Int32: Array<(chunk: [UInt8], done: ((Void) -> TcpWriteDoneAction))>]()
  11. private var descriptors = [pollfd]()
  12. private let server: Int32
  13. public required init(_ port: in_port_t) throws {
  14. self.server = try LinuxAsyncServer.nonBlockingSocketForListenening(port)
  15. self.descriptors.append(pollfd(fd: self.server, events: Int16(POLLIN), revents: 0))
  16. }
  17. deinit {
  18. cleanup()
  19. }
  20. public func write(_ socket: Int32, _ data: Array<UInt8>, _ done: @escaping ((Void) -> TcpWriteDoneAction)) throws {
  21. let result = Glibc.write(socket, data, data.count)
  22. if result == -1 {
  23. defer { self.finish(socket) }
  24. throw AsyncError.writeFailed(Process.error)
  25. }
  26. if result == data.count {
  27. if done() == .terminate {
  28. self.finish(socket)
  29. }
  30. return
  31. }
  32. let leftData = [UInt8](data[result..<data.count])
  33. for i in 0..<descriptors.count {
  34. if descriptors[i].fd == socket {
  35. self.backlog[socket]?.append((leftData, done))
  36. descriptors[i].events = descriptors[i].events | Int16(POLLOUT)
  37. return
  38. }
  39. }
  40. }
  41. public func wait(_ callback: ((TcpServerEvent) -> Void)) throws {
  42. guard poll(&descriptors, UInt(descriptors.count), -1) != -1 else {
  43. throw AsyncError.async(Process.error)
  44. }
  45. for i in 0..<descriptors.count {
  46. if descriptors[i].revents == 0 {
  47. continue
  48. }
  49. if descriptors[i].fd == server {
  50. while case let client = accept(server, nil, nil), client > 0 {
  51. try LinuxAsyncServer.setSocketNonBlocking(client)
  52. self.backlog[Int32(client)] = []
  53. descriptors.append(pollfd(fd: client, events: Int16(POLLIN), revents: 0))
  54. callback(TcpServerEvent.connect("", Int32(client)))
  55. }
  56. if errno != EWOULDBLOCK { throw AsyncError.acceptFailed(Process.error) }
  57. } else {
  58. if (descriptors[i].revents & Int16(POLLERR) != 0) || (descriptors[i].revents & Int16(POLLHUP) != 0) || (descriptors[i].revents & Int16(POLLNVAL) != 0) {
  59. self.finish(descriptors[i].fd)
  60. callback(TcpServerEvent.disconnect("", descriptors[i].fd))
  61. descriptors[i].fd = -1
  62. continue
  63. }
  64. if descriptors[i].revents & Int16(POLLIN) != 0 {
  65. var buffer = [UInt8](repeating: 0, count: 256)
  66. readLoop: while true {
  67. let result = read(descriptors[i].fd, &buffer, 256)
  68. switch result {
  69. case -1:
  70. if errno != EWOULDBLOCK {
  71. callback(TcpServerEvent.disconnect("", descriptors[i].fd))
  72. self.finish(descriptors[i].fd)
  73. descriptors[i].fd = -1
  74. }
  75. break readLoop
  76. case 0:
  77. callback(TcpServerEvent.disconnect("", descriptors[i].fd))
  78. self.finish(descriptors[i].fd)
  79. descriptors[i].fd = -1
  80. break readLoop
  81. default:
  82. callback(TcpServerEvent.data("", descriptors[i].fd, buffer[0..<result]))
  83. }
  84. }
  85. }
  86. if descriptors[i].revents & Int16(POLLOUT) != 0 {
  87. while let backlogElement = self.backlog[Int32(descriptors[i].fd)]?.first {
  88. var chunk = backlogElement.chunk
  89. let result = Glibc.write(Int32(descriptors[i].fd), chunk, chunk.count)
  90. if result == -1 {
  91. finish(Int32(descriptors[i].fd))
  92. callback(TcpServerEvent.disconnect("", Int32(descriptors[i].fd)))
  93. descriptors[i].fd = -1
  94. return
  95. }
  96. if result < chunk.count {
  97. let leftData = [UInt8](chunk[result..<chunk.count])
  98. self.backlog[Int32(descriptors[i].fd)]?.remove(at: 0)
  99. self.backlog[Int32(descriptors[i].fd)]?.insert((chunk: leftData, done: backlogElement.done), at: 0)
  100. return
  101. }
  102. self.backlog[Int32(descriptors[i].fd)]?.removeFirst()
  103. if backlogElement.done() == .terminate {
  104. finish(Int32(descriptors[i].fd))
  105. callback(TcpServerEvent.disconnect("", Int32(descriptors[i].fd)))
  106. descriptors[i].fd = -1
  107. return
  108. }
  109. }
  110. descriptors[i].events = descriptors[i].events & (~Int16(POLLOUT))
  111. }
  112. }
  113. }
  114. for i in (0..<descriptors.count).reversed() {
  115. if descriptors[i].fd == -1 {
  116. descriptors.remove(at: i)
  117. }
  118. }
  119. }
  120. public func finish(_ socket: Int32) {
  121. self.backlog[socket] = []
  122. let _ = Glibc.close(socket)
  123. }
  124. public func cleanup() {
  125. for client in self.descriptors {
  126. let _ = Glibc.close(client.fd)
  127. }
  128. let _ = Glibc.close(Int32(server))
  129. }
  130. public static func nonBlockingSocketForListenening(_ port: in_port_t = 8080) throws -> Int32 {
  131. let server = Glibc.socket(AF_INET, Int32(SOCK_STREAM.rawValue), 0)
  132. guard server != -1 else {
  133. throw AsyncError.socketCreation(Process.error)
  134. }
  135. var value: Int32 = 1
  136. if Glibc.setsockopt(server, SOL_SOCKET, SO_REUSEADDR, &value, socklen_t(MemoryLayout<Int32>.size)) == -1 {
  137. defer { let _ = Glibc.close(server) }
  138. throw AsyncError.setReuseAddrFailed(Process.error)
  139. }
  140. if Glibc.fcntl(server, F_SETFL, fcntl(server, F_GETFL, 0) | O_NONBLOCK) == -1 {
  141. defer { let _ = Glibc.close(server) }
  142. throw AsyncError.setNonBlockFailed(Process.error)
  143. }
  144. var addr = anyAddrForPort(port)
  145. if withUnsafePointer(to: &addr, { bind(server, UnsafePointer<sockaddr>(OpaquePointer($0)), socklen_t(MemoryLayout<sockaddr_in>.size)) }) == -1 {
  146. defer { let _ = Glibc.close(server) }
  147. throw AsyncError.bindFailed(Process.error)
  148. }
  149. if Glibc.listen(server, SOMAXCONN) == -1 {
  150. defer { let _ = Glibc.close(server) }
  151. throw AsyncError.listenFailed(Process.error)
  152. }
  153. return server
  154. }
  155. public static func anyAddrForPort(_ port: in_port_t) -> sockaddr_in {
  156. var addr = sockaddr_in()
  157. addr.sin_family = sa_family_t(AF_INET)
  158. addr.sin_port = port.bigEndian
  159. addr.sin_addr = in_addr(s_addr: in_addr_t(0))
  160. addr.sin_zero = (0, 0, 0, 0, 0, 0, 0, 0)
  161. return addr
  162. }
  163. public static func setSocketNonBlocking(_ socket: Int32) throws {
  164. if Glibc.fcntl(socket, F_SETFL, fcntl(socket, F_GETFL, 0) | O_NONBLOCK) == -1 {
  165. throw AsyncError.setNonBlockFailed(Process.error)
  166. }
  167. }
  168. }
  169. #endif