diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala
index dd768c84ee..9a251ab2de 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/S3FileOperations.scala
@@ -19,8 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
 import ch.epfl.bluebrain.nexus.delta.sdk.stream.StreamConverter
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException
 
-import scala.concurrent.duration.DurationInt
-
 trait S3FileOperations {
 
   def checkBucketExists(bucket: String): IO[Unit]
@@ -34,8 +32,7 @@ trait S3FileOperations {
 object S3FileOperations {
   final case class S3FileMetadata(contentType: Option[ContentType], metadata: FileStorageMetadata)
 
-  private val log       = Logger[S3FileOperations]
-  private val ChunkSize = 8 * 1024
+  private val log = Logger[S3FileOperations]
 
   def mk(client: S3StorageClient, locationGenerator: S3LocationGenerator)(implicit
       as: ActorSystem,
@@ -55,8 +52,7 @@ object S3FileOperations {
     StreamConverter(
       client
         .readFile(bucket, UrlUtils.decode(path))
-        .groupWithin(ChunkSize, 1.second)
-        .map(bytes => ByteString(bytes.toArray))
+        .map(bytes => ByteString(bytes))
         .adaptError {
           case _: NoSuchKeyException => FetchFileRejection.FileNotFound(path.toString)
           case err                   => UnexpectedFetchError(path.toString, err.getMessage)
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
index 50ef149549..385dbac695 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClient.scala
@@ -22,7 +22,7 @@ trait S3StorageClient {
 
   def listObjectsV2(bucket: String, prefix: String): IO[ListObjectsV2Response]
 
-  def readFile(bucket: String, fileKey: String): Stream[IO, Byte]
+  def readFile(bucket: String, fileKey: String): Stream[IO, ByteBuffer]
 
   def readFileMultipart(bucket: String, fileKey: String): Stream[IO, Byte]
 
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientDisabled.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientDisabled.scala
index 07767f356b..92cb21f807 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientDisabled.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientDisabled.scala
@@ -16,7 +16,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
 
   override def listObjectsV2(bucket: String, prefix: String): IO[ListObjectsV2Response] = raiseDisabledErr
 
-  override def readFile(bucket: String, fileKey: String): Stream[IO, Byte] = Stream.raiseError[IO](disabledErr)
+  override def readFile(bucket: String, fileKey: String): Stream[IO, ByteBuffer] = Stream.raiseError[IO](disabledErr)
 
   override def headObject(bucket: String, key: String): IO[HeadObject] = raiseDisabledErr
 
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientImpl.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientImpl.scala
index fd760078c7..4e6d23d41b 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientImpl.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/operations/s3/client/S3StorageClientImpl.scala
@@ -8,10 +8,10 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejec
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{checksumAlgorithm, CopyOptions, HeadObject, PutObjectRequest}
 import eu.timepit.refined.refineMV
 import eu.timepit.refined.types.string.NonEmptyString
+import fs2.Stream
 import fs2.aws.s3.S3
 import fs2.aws.s3.models.Models.{BucketName, FileKey, PartSizeMB}
 import fs2.interop.reactivestreams.{PublisherOps, _}
-import fs2.{Chunk, Stream}
 import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
 import org.reactivestreams.Subscriber
 import software.amazon.awssdk.core.async.AsyncRequestBody
@@ -31,11 +31,10 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
   override def listObjectsV2(bucket: String, prefix: String): IO[ListObjectsV2Response] =
     client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build())
 
-  override def readFile(bucket: String, fileKey: String): Stream[IO, Byte] = {
+  override def readFile(bucket: String, fileKey: String): Stream[IO, ByteBuffer] =
     Stream
       .eval(client.getObject(getObjectRequest(bucket, fileKey), new Fs2StreamAsyncResponseTransformer))
-      .flatMap(_.toStreamBuffered[IO](2).flatMap(bb => Stream.chunk(Chunk.byteBuffer(bb))))
-  }
+      .flatMap(_.toStreamBuffered[IO](2))
 
   override def readFileMultipart(bucket: String, fileKey: String): Stream[IO, Byte] = {
     val bucketName = BucketName(NonEmptyString.unsafeFrom(bucket))
diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala
index 4a992342f0..4c4da452aa 100644
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/config/ShipConfig.scala
@@ -6,13 +6,14 @@ import ch.epfl.bluebrain.nexus.delta.kernel.config.Configs
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.DatabaseConfig
 import com.typesafe.config.Config
-import fs2.Stream
+import fs2.{Chunk, Stream}
 import fs2.io.file.Path
 import pureconfig.ConfigReader
 import pureconfig.backend.ConfigFactoryWrapper
 import pureconfig.error.ConfigReaderException
 import pureconfig.generic.semiauto.deriveReader
 
+import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets.UTF_8
 
 final case class ShipConfig(database: DatabaseConfig, s3: S3Config, input: InputConfig)
@@ -28,7 +29,7 @@ object ShipConfig {
     externalConfig.flatMap(mergeFromConfig)
   }
 
-  def merge(externalConfigStream: Stream[IO, Byte]): IO[(ShipConfig, Config)] = {
+  def merge(externalConfigStream: Stream[IO, ByteBuffer]): IO[(ShipConfig, Config)] = {
     val externalConfig = configFromStream(externalConfigStream)
     externalConfig.flatMap(mergeFromConfig)
   }
@@ -51,9 +52,9 @@
     * Loads a config from a stream. Taken from
     * https://github.com/pureconfig/pureconfig/tree/master/modules/fs2/src/main/scala/pureconfig/module/fs2
     */
-  private def configFromStream(configStream: Stream[IO, Byte]): IO[Config] =
+  private def configFromStream(configStream: Stream[IO, ByteBuffer]): IO[Config] =
     for {
-      bytes         <- configStream.compile.to(Array)
+      bytes         <- configStream.flatMap(bb => Stream.chunk(Chunk.byteBuffer(bb))).compile.to(Array)
       string         = new String(bytes, UTF_8)
       configOrError <- IO.delay(ConfigFactoryWrapper.parseString(string))
       config        <- IO.fromEither(configOrError.leftMap(ConfigReaderException[Config]))
diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala
index b58b6df5f0..2ac8e65718 100644
--- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala
+++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/files/S3StorageSpec.scala
@@ -337,8 +337,8 @@
     }
   }
 
-  "Uploading a large file" should {
-    "succeed" ignore {
+  "Uploading and downloading a large file" should {
+    "succeed" in {
       val content = {
         val sb = new StringBuilder
         (1 to 100_000_000).foreach(_ => sb.append('1'))
@@ -351,11 +351,15 @@
         content
       )
       for {
-        _ <- IO.println("Starting the upload")
-        _ <- deltaClient.uploadFile(projectRef, storageId, fileInput, None) { expectCreated }
-        _ <- deltaClient.get[ByteString](s"/files/$projectRef/${fileInput.fileId}", Coyote, acceptAll) {
-               expectOk
-             }
+        _           <- IO.println("Starting the upload")
+        startUpload <- IO.delay(System.currentTimeMillis())
+        _           <- deltaClient.uploadFile(projectRef, storageId, fileInput, None) { expectCreated }
+        endUpload   <- IO.delay(System.currentTimeMillis())
+        _           <- IO.println(s"End of upload after ${endUpload - startUpload} ms")
+        _           <- IO.println("Starting the download")
+        _           <- deltaClient.get[ByteString](s"/files/$projectRef/${fileInput.fileId}", Coyote, acceptAll) { expectOk }
+        endDownload <- IO.delay(System.currentTimeMillis())
+        _           <- IO.println(s"End of download after ${endDownload - endUpload} ms")
       } yield succeed
     }
   }