From 2e98065ba70c9b47524f2b0ae580b7a35727f8a9 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 22 Aug 2024 22:52:30 +0330 Subject: [PATCH 1/5] Expose query `metadata` in `PostgresRowSequence` --- Sources/PostgresNIO/New/PSQLRowStream.swift | 4 +- .../PostgresNIO/New/PostgresRowSequence.swift | 22 ++++++++--- Tests/IntegrationTests/AsyncTests.swift | 37 +++++++++++++++---- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/Sources/PostgresNIO/New/PSQLRowStream.swift b/Sources/PostgresNIO/New/PSQLRowStream.swift index ee925d0e..6cbbf023 100644 --- a/Sources/PostgresNIO/New/PSQLRowStream.swift +++ b/Sources/PostgresNIO/New/PSQLRowStream.swift @@ -44,7 +44,7 @@ final class PSQLRowStream: @unchecked Sendable { } internal let rowDescription: [RowDescription.Column] - private let lookupTable: [String: Int] + internal let lookupTable: [String: Int] private var downstreamState: DownstreamState init( @@ -114,7 +114,7 @@ final class PSQLRowStream: @unchecked Sendable { self.downstreamState = .consumed(.failure(error)) } - return PostgresRowSequence(producer.sequence, lookupTable: self.lookupTable, columns: self.rowDescription) + return PostgresRowSequence(producer.sequence, stream: self) } func demand() { diff --git a/Sources/PostgresNIO/New/PostgresRowSequence.swift b/Sources/PostgresNIO/New/PostgresRowSequence.swift index 3936b51e..cd9fd75b 100644 --- a/Sources/PostgresNIO/New/PostgresRowSequence.swift +++ b/Sources/PostgresNIO/New/PostgresRowSequence.swift @@ -9,14 +9,18 @@ public struct PostgresRowSequence: AsyncSequence, Sendable { typealias BackingSequence = NIOThrowingAsyncSequenceProducer - let backing: BackingSequence - let lookupTable: [String: Int] - let columns: [RowDescription.Column] + private let backing: BackingSequence + private let stream: PSQLRowStream + var lookupTable: [String: Int] { + self.stream.lookupTable + } + var columns: [RowDescription.Column] { + self.stream.rowDescription + } - init(_ backing: BackingSequence, lookupTable: [String: Int], columns: [RowDescription.Column]) { + init(_ backing: BackingSequence, stream: PSQLRowStream) { self.backing = backing - self.lookupTable = lookupTable - self.columns = columns + self.stream = stream } public func makeAsyncIterator() -> AsyncIterator { @@ -67,6 +71,12 @@ extension PostgresRowSequence { } return result } + + public func collectWithMetadata() async throws -> (metadata: PostgresQueryMetadata?, rows: [PostgresRow]) { + let rows = try await self.collect() + let metadata = PostgresQueryMetadata(string: self.stream.commandTag) + return (metadata, rows) + } } struct AdaptiveRowBuffer: NIOAsyncSequenceProducerBackPressureStrategy { diff --git a/Tests/IntegrationTests/AsyncTests.swift b/Tests/IntegrationTests/AsyncTests.swift index 513157fd..4260a02d 100644 --- a/Tests/IntegrationTests/AsyncTests.swift +++ b/Tests/IntegrationTests/AsyncTests.swift @@ -46,6 +46,31 @@ final class AsyncPostgresConnectionTests: XCTestCase { } } + func testSelect10kRowsAndCollect() async throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let eventLoop = eventLoopGroup.next() + + let start = 1 + let end = 10000 + + try await withTestConnection(on: eventLoop) { connection in + let rows = try await connection.query("SELECT generate_series(\(start), \(end));", logger: .psqlTest) + let (metadata, elements) = try await rows.collectWithMetadata() + var counter = 0 + for row in elements { + let element = try row.decode(Int.self) + XCTAssertEqual(element, counter + 1) + counter += 1 + } + XCTAssertEqual(metadata?.command, "SELECT") + XCTAssertEqual(metadata?.oid, nil) + XCTAssertEqual(metadata?.rows, 10000) + + XCTAssertEqual(counter, end) + } + } + func testSelectActiveConnection() async throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } @@ -207,7 +232,7 @@ final class AsyncPostgresConnectionTests: XCTestCase { try await withTestConnection(on: eventLoop) { connection in // Max binds limit is UInt16.max which is 65535 which is 3 * 5 * 17 * 257 - // Max columns limit is 1664, so we will only make 5 * 257 columns which is less + // Max columns limit appears to be ~1600, so we will only make 5 * 257 columns which is less // Then we will insert 3 * 17 rows // In the insertion, there will be a total of 3 * 17 * 5 * 257 == UInt16.max bindings // If the test is successful, it means Postgres supports UInt16.max bindings @@ -241,13 +266,9 @@ final class AsyncPostgresConnectionTests: XCTestCase { unsafeSQL: "INSERT INTO table1 VALUES \(insertionValues)", binds: binds ) - try await connection.query(insertionQuery, logger: .psqlTest) - - let countQuery = PostgresQuery(unsafeSQL: "SELECT COUNT(*) FROM table1") - let countRows = try await connection.query(countQuery, logger: .psqlTest) - var countIterator = countRows.makeAsyncIterator() - let insertedRowsCount = try await countIterator.next()?.decode(Int.self, context: .default) - XCTAssertEqual(rowsCount, insertedRowsCount) + let result = try await connection.query(insertionQuery, logger: .psqlTest) + let metadata = try await result.collectWithMetadata().metadata + XCTAssertEqual(metadata?.rows, rowsCount) let dropQuery = PostgresQuery(unsafeSQL: "DROP TABLE table1") try await connection.query(dropQuery, logger: .psqlTest) From bedc86a75ea465bec719aad948697cd72a7998ae Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Fri, 23 Aug 2024 12:37:49 +0330 Subject: [PATCH 2/5] refinements --- Sources/PostgresNIO/New/PSQLRowStream.swift | 2 +- .../PostgresNIO/New/PostgresRowSequence.swift | 16 +++++++++------- Tests/IntegrationTests/AsyncTests.swift | 8 ++++---- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/Sources/PostgresNIO/New/PSQLRowStream.swift b/Sources/PostgresNIO/New/PSQLRowStream.swift index 6cbbf023..a7082a90 100644 --- a/Sources/PostgresNIO/New/PSQLRowStream.swift +++ b/Sources/PostgresNIO/New/PSQLRowStream.swift @@ -114,7 +114,7 @@ final class PSQLRowStream: @unchecked Sendable { self.downstreamState = .consumed(.failure(error)) } - return PostgresRowSequence(producer.sequence, stream: self) + return PostgresRowSequence(producer.sequence, rowStream: self) } func demand() { diff --git a/Sources/PostgresNIO/New/PostgresRowSequence.swift b/Sources/PostgresNIO/New/PostgresRowSequence.swift index cd9fd75b..2c3829a8 100644 --- a/Sources/PostgresNIO/New/PostgresRowSequence.swift +++ b/Sources/PostgresNIO/New/PostgresRowSequence.swift @@ -10,17 +10,17 @@ public struct PostgresRowSequence: AsyncSequence, Sendable { typealias BackingSequence = NIOThrowingAsyncSequenceProducer private let backing: BackingSequence - private let stream: PSQLRowStream + private let rowStream: PSQLRowStream var lookupTable: [String: Int] { - self.stream.lookupTable + self.rowStream.lookupTable } var columns: [RowDescription.Column] { - self.stream.rowDescription + self.rowStream.rowDescription } - init(_ backing: BackingSequence, stream: PSQLRowStream) { + init(_ backing: BackingSequence, rowStream: PSQLRowStream) { self.backing = backing - self.stream = stream + self.rowStream = rowStream } public func makeAsyncIterator() -> AsyncIterator { @@ -72,9 +72,11 @@ extension PostgresRowSequence { return result } - public func collectWithMetadata() async throws -> (metadata: PostgresQueryMetadata?, rows: [PostgresRow]) { + public func collectWithMetadata() async throws -> (metadata: PostgresQueryMetadata, rows: [PostgresRow]) { let rows = try await self.collect() - let metadata = PostgresQueryMetadata(string: self.stream.commandTag) + guard let metadata = PostgresQueryMetadata(string: self.rowStream.commandTag) else { + throw PSQLError.invalidCommandTag(self.rowStream.commandTag) + } return (metadata, rows) } } diff --git a/Tests/IntegrationTests/AsyncTests.swift b/Tests/IntegrationTests/AsyncTests.swift index 4260a02d..ee2a6f1f 100644 --- a/Tests/IntegrationTests/AsyncTests.swift +++ b/Tests/IntegrationTests/AsyncTests.swift @@ -63,9 +63,9 @@ final class AsyncPostgresConnectionTests: XCTestCase { XCTAssertEqual(element, counter + 1) counter += 1 } - XCTAssertEqual(metadata?.command, "SELECT") - XCTAssertEqual(metadata?.oid, nil) - XCTAssertEqual(metadata?.rows, 10000) + XCTAssertEqual(metadata.command, "SELECT") + XCTAssertEqual(metadata.oid, nil) + XCTAssertEqual(metadata.rows, 10000) XCTAssertEqual(counter, end) } @@ -268,7 +268,7 @@ final class AsyncPostgresConnectionTests: XCTestCase { ) let result = try await connection.query(insertionQuery, logger: .psqlTest) let metadata = try await result.collectWithMetadata().metadata - XCTAssertEqual(metadata?.rows, rowsCount) + XCTAssertEqual(metadata.rows, rowsCount) let dropQuery = PostgresQuery(unsafeSQL: "DROP TABLE table1") try await connection.query(dropQuery, logger: .psqlTest) From a6dc89424116b5b8c29c6706a2cf09a5ad9c9841 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Fri, 23 Aug 2024 13:35:38 +0330 Subject: [PATCH 3/5] add a consume functions + better docs --- .../PostgresNIO/New/PostgresRowSequence.swift | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Sources/PostgresNIO/New/PostgresRowSequence.swift b/Sources/PostgresNIO/New/PostgresRowSequence.swift index 2c3829a8..c3ea0c68 100644 --- a/Sources/PostgresNIO/New/PostgresRowSequence.swift +++ b/Sources/PostgresNIO/New/PostgresRowSequence.swift @@ -64,6 +64,8 @@ extension PostgresRowSequence { extension PostgresRowSequence.AsyncIterator: Sendable {} extension PostgresRowSequence { + /// Collect and return all rows. + /// - Returns: The rows. public func collect() async throws -> [PostgresRow] { var result = [PostgresRow]() for try await row in self { @@ -72,6 +74,8 @@ extension PostgresRowSequence { return result } + /// Collect and return all rows, alongside the query metadata. + /// - Returns: The query metadata and the rows. public func collectWithMetadata() async throws -> (metadata: PostgresQueryMetadata, rows: [PostgresRow]) { let rows = try await self.collect() guard let metadata = PostgresQueryMetadata(string: self.rowStream.commandTag) else { @@ -79,6 +83,21 @@ extension PostgresRowSequence { } return (metadata, rows) } + + /// Consumes all rows and returns the query metadata. + /// - Parameter onRow: Processes each row. + /// - Returns: The query metadata. + public func consume( + onRow: @Sendable (PostgresRow) throws -> () + ) async throws -> PostgresQueryMetadata { + for try await row in self { + try onRow(row) + } + guard let metadata = PostgresQueryMetadata(string: self.rowStream.commandTag) else { + throw PSQLError.invalidCommandTag(self.rowStream.commandTag) + } + return metadata + } } struct AdaptiveRowBuffer: NIOAsyncSequenceProducerBackPressureStrategy { From d299741488b0ee5c682687ce02108d20b269a354 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Fri, 23 Aug 2024 13:43:04 +0330 Subject: [PATCH 4/5] better docs + a test --- .../PostgresNIO/New/PostgresRowSequence.swift | 8 ++++++ Tests/IntegrationTests/AsyncTests.swift | 28 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/Sources/PostgresNIO/New/PostgresRowSequence.swift b/Sources/PostgresNIO/New/PostgresRowSequence.swift index c3ea0c68..5784c819 100644 --- a/Sources/PostgresNIO/New/PostgresRowSequence.swift +++ b/Sources/PostgresNIO/New/PostgresRowSequence.swift @@ -85,6 +85,14 @@ extension PostgresRowSequence { } /// Consumes all rows and returns the query metadata. + /// + /// If you don't need the query metadata, just use the for-try-await-loop syntax: + /// ```swift + /// for try await row in myPostgresSequence { + /// /// Process each row + /// } + /// ``` + /// /// - Parameter onRow: Processes each row. /// - Returns: The query metadata. public func consume( diff --git a/Tests/IntegrationTests/AsyncTests.swift b/Tests/IntegrationTests/AsyncTests.swift index ee2a6f1f..52233ae8 100644 --- a/Tests/IntegrationTests/AsyncTests.swift +++ b/Tests/IntegrationTests/AsyncTests.swift @@ -1,3 +1,4 @@ +import Atomics import Logging import XCTest import PostgresNIO @@ -46,6 +47,32 @@ final class AsyncPostgresConnectionTests: XCTestCase { } } + func testSelect10kRowsAndConsume() async throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let eventLoop = eventLoopGroup.next() + + let start = 1 + let end = 10000 + + try await withTestConnection(on: eventLoop) { connection in + let rows = try await connection.query("SELECT generate_series(\(start), \(end));", logger: .psqlTest) + + let counter = ManagedAtomic(0) + let metadata = try await rows.consume { row in + let element = try row.decode(Int.self) + let newCounter = counter.wrappingIncrementThenLoad(ordering: .relaxed) + XCTAssertEqual(element, newCounter) + } + + XCTAssertEqual(metadata.command, "SELECT") + XCTAssertEqual(metadata.oid, nil) + XCTAssertEqual(metadata.rows, 10000) + + XCTAssertEqual(counter.load(ordering: .relaxed), end) + } + } + func testSelect10kRowsAndCollect() async throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } @@ -63,6 +90,7 @@ final class AsyncPostgresConnectionTests: XCTestCase { XCTAssertEqual(element, counter + 1) counter += 1 } + XCTAssertEqual(metadata.command, "SELECT") XCTAssertEqual(metadata.oid, nil) XCTAssertEqual(metadata.rows, 10000) From 68084f9486599d15069254161075e8def503b536 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Sat, 24 Aug 2024 13:50:26 +0330 Subject: [PATCH 5/5] Update PostgresRowSequence.swift --- Sources/PostgresNIO/New/PostgresRowSequence.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/PostgresNIO/New/PostgresRowSequence.swift b/Sources/PostgresNIO/New/PostgresRowSequence.swift index 5784c819..9ee06358 100644 --- a/Sources/PostgresNIO/New/PostgresRowSequence.swift +++ b/Sources/PostgresNIO/New/PostgresRowSequence.swift @@ -86,9 +86,9 @@ extension PostgresRowSequence { /// Consumes all rows and returns the query metadata. /// - /// If you don't need the query metadata, just use the for-try-await-loop syntax: + /// If you don't need the returned query metadata, just use the for-try-await-loop syntax: /// ```swift - /// for try await row in myPostgresSequence { + /// for try await row in myRowSequence { /// /// Process each row /// } /// ```