Pass User token all the way to HttpsDataVault
frcroth committed Jan 13, 2025
1 parent dcd4a37 commit 5779401
Showing 41 changed files with 286 additions and 183 deletions.
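The files shown below all follow one pattern: a TokenContext is threaded as an implicit parameter alongside the ExecutionContext, from the controllers through the bucket providers and chunk/array readers down to the data vaults, so the user token is available where the outgoing HTTP request is built. A minimal sketch of that pattern, using simplified stand-in types rather than the real webknossos classes:

import scala.concurrent.{ExecutionContext, Future}

// Stand-in for com.scalableminds.util.accesscontext.TokenContext:
// wraps an optional user token; TokenContext(None) is an anonymous context.
case class TokenContext(userTokenOpt: Option[String])

trait DataVault {
  def readBytes(path: String)(implicit ec: ExecutionContext, tc: TokenContext): Future[Array[Byte]]
}

class HttpsDataVault extends DataVault {
  override def readBytes(path: String)(implicit ec: ExecutionContext, tc: TokenContext): Future[Array[Byte]] =
    httpGet(path, tc.userTokenOpt) // only the outermost HTTP layer inspects the token

  // placeholder for the real request; intermediate layers never touch the token itself
  private def httpGet(path: String, token: Option[String])(implicit ec: ExecutionContext): Future[Array[Byte]] =
    Future.successful(Array.emptyByteArray)
}

object ReadPath {
  // every intermediate function just declares the implicit and it flows through
  def load(vault: DataVault, key: String)(implicit ec: ExecutionContext, tc: TokenContext): Future[Array[Byte]] =
    vault.readBytes(key)
}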
38 changes: 24 additions & 14 deletions test/backend/DataVaultTestSuite.scala
@@ -1,5 +1,6 @@
package backend

+ import com.scalableminds.util.accesscontext.TokenContext
import com.scalableminds.util.tools.Fox
import org.scalatestplus.play.PlaySpec

@@ -26,6 +27,7 @@ import scala.concurrent.ExecutionContext.{global => globalExecutionContext}
class DataVaultTestSuite extends PlaySpec {

val handleFoxJustification = "Handling Fox in Unit Test Context"
+ val tokenContext = TokenContext(None)

"Data vault" when {
"using Range requests" when {
@@ -36,10 +38,11 @@ class DataVaultTestSuite extends PlaySpec {
"return correct response" in {
WsTestClient.withClient { ws =>
val uri = new URI("http://storage.googleapis.com/")
- val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None), ws))
+ val vaultPath =
+   new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None), ws, "example.com"))
val bytes =
(vaultPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey")
-   .readBytes(Some(range))(globalExecutionContext)
+   .readBytes(Some(range))(globalExecutionContext, tokenContext)
.get(handleFoxJustification)

assert(bytes.length == range.length)
@@ -53,7 +56,9 @@ class DataVaultTestSuite extends PlaySpec {
val vaultPath = new VaultPath(uri, GoogleCloudDataVault.create(RemoteSourceDescriptor(uri, None)))
"return correct response" in {

- val bytes = (vaultPath / dataKey).readBytes(Some(range))(globalExecutionContext).get(handleFoxJustification)
+ val bytes = (vaultPath / dataKey)
+   .readBytes(Some(range))(globalExecutionContext, tokenContext)
+   .get(handleFoxJustification)

assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
@@ -63,15 +68,15 @@ class DataVaultTestSuite extends PlaySpec {
"requesting a non-existent object" in {
val result =
(vaultPath / s"non-existent-key${UUID.randomUUID}")
-   .readBytes()(globalExecutionContext)
+   .readBytes()(globalExecutionContext, tokenContext)
.await(handleFoxJustification)
assertBoxEmpty(result)
}
}
"return failure" when {
"requesting invalid range" in {
val result = (vaultPath / dataKey)
-   .readBytes(Some(Range.Long(-5, -10, 1)))(globalExecutionContext)
+   .readBytes(Some(Range.Long(-5, -10, 1)))(globalExecutionContext, tokenContext)
.await(handleFoxJustification)
assertBoxFailure(result)
}
@@ -83,7 +88,7 @@ class DataVaultTestSuite extends PlaySpec {
uri,
Some(GoogleServiceAccountCredential("name", JsString("secret"), "user", "org")))))
val result = (vaultPath / dataKey)
-   .readBytes(Some(Range.Long(-10, 10, 1)))(globalExecutionContext)
+   .readBytes(Some(Range.Long(-10, 10, 1)))(globalExecutionContext, tokenContext)
.await(handleFoxJustification)
assertBoxFailure(result)
}
@@ -97,7 +102,9 @@ class DataVaultTestSuite extends PlaySpec {
val vaultPath =
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext))
val bytes =
(vaultPath / "s0/5/5/5").readBytes(Some(range))(globalExecutionContext).get(handleFoxJustification)
(vaultPath / "s0/5/5/5")
.readBytes(Some(range))(globalExecutionContext, tokenContext)
.get(handleFoxJustification)
assert(bytes.length == range.length)
assert(bytes.take(10).sameElements(Array(0, 0, 0, 3, 0, 0, 0, 64, 0, 0)))
}
@@ -113,9 +120,10 @@ class DataVaultTestSuite extends PlaySpec {
"return correct response" in {
WsTestClient.withClient { ws =>
val uri = new URI("http://storage.googleapis.com/")
- val vaultPath = new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None), ws))
+ val vaultPath =
+   new VaultPath(uri, HttpsDataVault.create(RemoteSourceDescriptor(uri, None), ws, "example.com"))
val bytes = (vaultPath / s"neuroglancer-fafb-data/fafb_v14/fafb_v14_orig/$dataKey")
-   .readBytes()(globalExecutionContext)
+   .readBytes()(globalExecutionContext, tokenContext)
.get(handleFoxJustification)

assert(bytes.length == dataLength)
@@ -128,7 +136,8 @@ class DataVaultTestSuite extends PlaySpec {
"return correct response" in {
val uri = new URI("gs://neuroglancer-fafb-data/fafb_v14/fafb_v14_orig")
val vaultPath = new VaultPath(uri, GoogleCloudDataVault.create(RemoteSourceDescriptor(uri, None)))
- val bytes = (vaultPath / dataKey).readBytes()(globalExecutionContext).get(handleFoxJustification)
+ val bytes =
+   (vaultPath / dataKey).readBytes()(globalExecutionContext, tokenContext).get(handleFoxJustification)

assert(bytes.length == dataLength)
assert(bytes.take(10).sameElements(Array(-1, -40, -1, -32, 0, 16, 74, 70, 73, 70)))
@@ -143,7 +152,7 @@ class DataVaultTestSuite extends PlaySpec {
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext))
val bytes =
(vaultPath / "33792-34304_29696-30208_3216-3232")
-   .readBytes()(globalExecutionContext)
+   .readBytes()(globalExecutionContext, tokenContext)
.get(handleFoxJustification)
assert(bytes.take(10).sameElements(Array(-87, -95, -85, -94, -101, 124, 115, 100, 113, 111)))
}
@@ -155,7 +164,7 @@ class DataVaultTestSuite extends PlaySpec {
WsTestClient.withClient { ws =>
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)
val vaultPath = new VaultPath(uri, s3DataVault)
- val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification)
+ val result = vaultPath.readBytes()(globalExecutionContext, tokenContext).await(handleFoxJustification)
assertBoxEmpty(result)
}
}
@@ -167,7 +176,7 @@ class DataVaultTestSuite extends PlaySpec {
WsTestClient.withClient { ws =>
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)
val vaultPath = new VaultPath(uri, s3DataVault)
- val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification)
+ val result = vaultPath.readBytes()(globalExecutionContext, tokenContext).await(handleFoxJustification)
assertBoxEmpty(result)
}
}
@@ -207,7 +216,8 @@ class DataVaultTestSuite extends PlaySpec {
"using vault path" when {
class MockDataVault extends DataVault {
override def readBytesAndEncoding(path: VaultPath, range: RangeSpecifier)(
-   implicit ec: ExecutionContext): Fox[(Array[Byte], Encoding.Value)] = ???
+   implicit ec: ExecutionContext,
+   tc: TokenContext): Fox[(Array[Byte], Encoding.Value)] = ???

override def listDirectory(path: VaultPath,
maxItems: Int)(implicit ec: ExecutionContext): Fox[List[VaultPath]] = ???
@@ -1,6 +1,7 @@
package com.scalableminds.webknossos.datastore.controllers

import com.google.inject.Inject
+ import com.scalableminds.util.accesscontext.TokenContext
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.image.{Color, JPEGWriter}
import com.scalableminds.util.time.Instant
@@ -22,6 +23,7 @@ import net.liftweb.common.Box.tryo
import play.api.i18n.Messages
import play.api.libs.json.Json
import play.api.mvc.{AnyContent, _}

import scala.concurrent.duration.DurationInt
import java.io.ByteArrayOutputStream
import java.nio.{ByteBuffer, ByteOrder}
@@ -93,7 +95,7 @@ class BinaryDataController @Inject()(
// If true, use lossy compression by sending only half-bytes of the data
halfByte: Boolean,
mappingName: Option[String]
- ): Action[AnyContent] = Action.async { implicit request =>
+ ): Action[AnyContent] = Action.async { implicit r =>
accessTokenService.validateAccessFromTokenContext(
UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) {
for {
@@ -139,7 +141,7 @@ class BinaryDataController @Inject()(
x: Int,
y: Int,
z: Int,
- cubeSize: Int): Action[AnyContent] = Action.async { implicit request =>
+ cubeSize: Int): Action[AnyContent] = Action.async { implicit r =>
accessTokenService.validateAccessFromTokenContext(
UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) {
for {
@@ -170,7 +172,7 @@ class BinaryDataController @Inject()(
intensityMin: Option[Double],
intensityMax: Option[Double],
color: Option[String],
- invertColor: Option[Boolean]): Action[RawBuffer] = Action.async(parse.raw) { implicit request =>
+ invertColor: Option[Boolean]): Action[RawBuffer] = Action.async(parse.raw) { implicit r =>
accessTokenService.validateAccessFromTokenContext(
UserAccessRequest.readDataSources(DataSourceId(datasetDirectoryName, organizationId))) {
for {
@@ -307,7 +309,7 @@ class BinaryDataController @Inject()(
dataSource: DataSource,
dataLayer: DataLayer,
dataRequests: DataRequestCollection
- ): Fox[(Array[Byte], List[Int])] = {
+ )(implicit tc: TokenContext): Fox[(Array[Byte], List[Int])] = {
val requests =
dataRequests.map(r => DataServiceDataRequest(dataSource, dataLayer, r.cuboid(dataLayer), r.settings))
binaryDataService.handleDataRequests(requests)
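The actions above keep the incoming request bound as implicit r; presumably an implicit derivation in the controller's scope turns that request into the TokenContext consumed by validateAccessFromTokenContext and by requestData's new implicit parameter. A hypothetical sketch of such wiring (the trait, the helper name, and reading the token from a query parameter are illustrative assumptions, not the real webknossos code):

import play.api.mvc.RequestHeader

trait TokenContextFromRequest {
  implicit def tokenContextForRequest(implicit r: RequestHeader): TokenContext =
    TokenContext(r.getQueryString("token")) // assumption: token arrives as a query parameter
}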
@@ -261,7 +261,7 @@ class ZarrStreamingController @Inject()(
dataLayerName: String,
mag: String,
coordinates: String,
- )(implicit m: MessagesProvider): Fox[Result] =
+ )(implicit m: MessagesProvider, tc: TokenContext): Fox[Result] =
for {
(dataSource, dataLayer) <- dataSourceRepository.getDataSourceAndDataLayer(organizationId,
datasetDirectoryName,
@@ -1,9 +1,11 @@
package com.scalableminds.webknossos.datastore.dataformats

+ import com.scalableminds.util.accesscontext.TokenContext
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction

import scala.concurrent.ExecutionContext

trait BucketProvider {
- def load(readInstruction: DataReadInstruction)(implicit ec: ExecutionContext): Fox[Array[Byte]]
+ def load(readInstruction: DataReadInstruction)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Array[Byte]]
}
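With the extra implicit on load, callers need both an ExecutionContext and a TokenContext in scope, or must pass them explicitly as the test suite above does. A brief usage sketch under that assumption:

import scala.concurrent.ExecutionContext

object LoadCallSite {
  implicit val ec: ExecutionContext = ExecutionContext.global
  implicit val tc: TokenContext = TokenContext(None) // anonymous context: no user token
  // with both implicits in scope, bucketProvider.load(readInstruction)
  // compiles without explicit plumbing (bucketProvider/readInstruction are placeholders)
}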
@@ -1,5 +1,6 @@
package com.scalableminds.webknossos.datastore.dataformats

+ import com.scalableminds.util.accesscontext.TokenContext
import com.scalableminds.util.cache.AlfuCache
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.time.Instant
@@ -16,6 +17,7 @@ import com.scalableminds.webknossos.datastore.models.requests.DataReadInstructio
import com.scalableminds.webknossos.datastore.storage.RemoteSourceDescriptorService
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.Empty

import scala.concurrent.duration._
import ucar.ma2.{Array => MultiArray}

@@ -32,7 +34,7 @@ class DatasetArrayBucketProvider(dataLayer: DataLayer,
// Cache the DatasetArrays of all mags of this layer
private lazy val datasetArrayCache = AlfuCache[Vec3Int, DatasetArray](maxCapacity = 50)

- def load(readInstruction: DataReadInstruction)(implicit ec: ExecutionContext): Fox[Array[Byte]] =
+ def load(readInstruction: DataReadInstruction)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Array[Byte]] =
for {
datasetArray <- datasetArrayCache.getOrLoad(readInstruction.bucket.mag,
_ => openDatasetArrayWithTimeLogging(readInstruction))
@@ -45,8 +47,8 @@ class DatasetArrayBucketProvider(dataLayer: DataLayer,
dataLayer.elementClass == ElementClass.uint24)
} yield bucketData

- private def openDatasetArrayWithTimeLogging(readInstruction: DataReadInstruction)(
-   implicit ec: ExecutionContext): Fox[DatasetArray] = {
+ private def openDatasetArrayWithTimeLogging(
+   readInstruction: DataReadInstruction)(implicit ec: ExecutionContext, tc: TokenContext): Fox[DatasetArray] = {
val before = Instant.now
for {
result <- openDatasetArray(readInstruction).futureBox
@@ -59,8 +61,8 @@ class DatasetArrayBucketProvider(dataLayer: DataLayer,
} yield result
}

- private def openDatasetArray(readInstruction: DataReadInstruction)(
-   implicit ec: ExecutionContext): Fox[DatasetArray] = {
+ private def openDatasetArray(readInstruction: DataReadInstruction)(implicit ec: ExecutionContext,
+   tc: TokenContext): Fox[DatasetArray] = {
val magLocatorOpt: Option[MagLocator] =
dataLayer.mags.find(_.mag == readInstruction.bucket.mag)

@@ -1,5 +1,6 @@
package com.scalableminds.webknossos.datastore.datareaders

+ import com.scalableminds.util.accesscontext.TokenContext
import com.scalableminds.util.tools.Fox
import com.scalableminds.util.tools.Fox.box2Fox
import com.scalableminds.webknossos.datastore.datavault.VaultPath
@@ -18,7 +19,7 @@ class ChunkReader(header: DatasetHeader) {
def read(path: VaultPath,
chunkShapeFromMetadata: Array[Int],
range: Option[NumericRange[Long]],
- useSkipTypingShortcut: Boolean)(implicit ec: ExecutionContext): Fox[MultiArray] =
+ useSkipTypingShortcut: Boolean)(implicit ec: ExecutionContext, tc: TokenContext): Fox[MultiArray] =
for {
chunkBytesAndShapeBox: Box[(Array[Byte], Option[Array[Int]])] <- readChunkBytesAndShape(path, range).futureBox
chunkShape: Array[Int] = chunkBytesAndShapeBox.toOption.flatMap(_._2).getOrElse(chunkShapeFromMetadata)
@@ -39,7 +40,8 @@ class ChunkReader(header: DatasetHeader) {
// Returns bytes (optional, Fox.empty may later be replaced with fill value)
// and chunk shape (optional, only for data formats where each chunk reports its own shape, e.g. N5)
protected def readChunkBytesAndShape(path: VaultPath, range: Option[NumericRange[Long]])(
-   implicit ec: ExecutionContext): Fox[(Array[Byte], Option[Array[Int]])] =
+   implicit ec: ExecutionContext,
+   tc: TokenContext): Fox[(Array[Byte], Option[Array[Int]])] =
for {
bytes <- path.readBytes(range)
decompressed <- tryo(header.compressorImpl.decompress(bytes)).toFox ?~> "chunk.decompress.failed"
@@ -1,5 +1,6 @@
package com.scalableminds.webknossos.datastore.datareaders

+ import com.scalableminds.util.accesscontext.TokenContext
import com.scalableminds.util.cache.AlfuCache
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.tools.Fox
@@ -64,10 +65,11 @@ class DatasetArray(vaultPath: VaultPath,
chunkShape // irregular shaped chunk indexes are currently not supported for 2d datasets
}

- def readBytesWithAdditionalCoordinates(shapeXYZ: Vec3Int,
-   offsetXYZ: Vec3Int,
-   additionalCoordinatesOpt: Option[Seq[AdditionalCoordinate]],
-   shouldReadUint24: Boolean)(implicit ec: ExecutionContext): Fox[Array[Byte]] =
+ def readBytesWithAdditionalCoordinates(
+   shapeXYZ: Vec3Int,
+   offsetXYZ: Vec3Int,
+   additionalCoordinatesOpt: Option[Seq[AdditionalCoordinate]],
+   shouldReadUint24: Boolean)(implicit ec: ExecutionContext, tc: TokenContext): Fox[Array[Byte]] =
for {
(shapeArray, offsetArray) <- tryo(constructShapeAndOffsetArrays(
shapeXYZ,
@@ -114,7 +116,8 @@ class DatasetArray(vaultPath: VaultPath,
}

// returns byte array in fortran-order with little-endian values
- private def readBytes(shape: Array[Int], offset: Array[Int])(implicit ec: ExecutionContext): Fox[Array[Byte]] =
+ private def readBytes(shape: Array[Int], offset: Array[Int])(implicit ec: ExecutionContext,
+   tc: TokenContext): Fox[Array[Byte]] =
for {
typedMultiArray <- readAsFortranOrder(shape, offset)
asBytes <- BytesConverter.toByteArray(typedMultiArray, header.resolvedDataType, ByteOrder.LITTLE_ENDIAN)
@@ -147,8 +150,8 @@ class DatasetArray(vaultPath: VaultPath,
// The local variables like chunkIndices are also in this order unless explicitly named.
// Loading data adapts to the array's axis order so that …CXYZ data in fortran-order is
// returned, regardless of the array’s internal storage.
- private def readAsFortranOrder(shape: Array[Int], offset: Array[Int])(
-   implicit ec: ExecutionContext): Fox[MultiArray] = {
+ private def readAsFortranOrder(shape: Array[Int], offset: Array[Int])(implicit ec: ExecutionContext,
+   tc: TokenContext): Fox[MultiArray] = {
val totalOffset: Array[Int] = offset.zip(header.voxelOffset).map { case (o, v) => o - v }.padTo(offset.length, 0)
val chunkIndices = ChunkUtils.computeChunkIndices(datasetShape.map(fullAxisOrder.permuteIndicesArrayToWk),
fullAxisOrder.permuteIndicesArrayToWk(chunkShape),
@@ -185,19 +188,22 @@ class DatasetArray(vaultPath: VaultPath,
s"Copying data from dataset chunk failed. Chunk shape (F): ${printAsOuterF(sourceChunk.getShape)}, target shape (F): ${printAsOuterF(
target.getShape)}, offsetInChunk: ${printAsOuterF(offsetInChunk)}. Axis order (C): $fullAxisOrder (outer: ${fullAxisOrder.toStringWk})"

- protected def getShardedChunkPathAndRange(chunkIndex: Array[Int])(
-   implicit ec: ExecutionContext): Fox[(VaultPath, NumericRange[Long])] = ??? // Defined in subclass
+ protected def getShardedChunkPathAndRange(
+   chunkIndex: Array[Int])(implicit ec: ExecutionContext, tc: TokenContext): Fox[(VaultPath, NumericRange[Long])] =
+   ??? // Defined in subclass

private def chunkContentsCacheKey(chunkIndex: Array[Int]): String =
s"${dataSourceId}__${layerName}__${vaultPath}__chunk_${chunkIndex.mkString(",")}"

private def getSourceChunkDataWithCache(chunkIndex: Array[Int], useSkipTypingShortcut: Boolean = false)(
-   implicit ec: ExecutionContext): Fox[MultiArray] =
+   implicit ec: ExecutionContext,
+   tc: TokenContext): Fox[MultiArray] =
sharedChunkContentsCache.getOrLoad(chunkContentsCacheKey(chunkIndex),
_ => readSourceChunkData(chunkIndex, useSkipTypingShortcut))

private def readSourceChunkData(chunkIndex: Array[Int], useSkipTypingShortcut: Boolean)(
-   implicit ec: ExecutionContext): Fox[MultiArray] =
+   implicit ec: ExecutionContext,
+   tc: TokenContext): Fox[MultiArray] =
if (header.isSharded) {
for {
(shardPath, chunkRange) <- getShardedChunkPathAndRange(chunkIndex) ?~> "chunk.getShardedPathAndRange.failed"
