Skip to content

Commit cea5bde

Browse files
authored
[query] clean-up unused fs methods (#15177)
## Change Description

This PR refactors the filesystem abstraction layer to improve the URL handling interface. The main changes include:

- Convert `FSURL` from a trait to an abstract class with a generic type parameter
- Add a standard path component addition operator `/` to replace `addPathComponent`
- Remove redundant methods like `readNoCompression`, `deleteOnExit`, `stripCodecExtension`, and caching-related methods
- Rename `getPath` to simply `path` for cleaner access
- Update implementations in AzureStorageFS, GoogleStorageFS, HadoopFS, and RouterFS to use the new interface

These changes make the filesystem API more consistent and easier to use while removing unnecessary functionality.

## Security Assessment

This change cannot impact the Hail Batch instance as deployed by Broad Institute in GCP.
1 parent 4f98151 commit cea5bde

File tree

9 files changed

+64
-167
lines changed

9 files changed

+64
-167
lines changed

hail/hail/src/is/hail/io/fs/AzureStorageFS.scala

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,26 @@ import scala.collection.mutable
1717
import scala.collection.mutable.ArrayBuffer
1818
import scala.jdk.CollectionConverters._
1919

20-
import java.io.{ByteArrayOutputStream, FileNotFoundException, OutputStream}
20+
import java.io.{FileNotFoundException, OutputStream}
2121
import java.nio.file.{Path, Paths}
2222
import java.time.Duration
2323

2424
class AzureStorageFSURL(
2525
val account: String,
2626
val container: String,
27-
val path: String,
27+
override val path: String,
2828
val sasToken: Option[String],
29-
) extends FSURL {
29+
) extends FSURL[AzureStorageFSURL] {
3030

31-
def addPathComponent(c: String): AzureStorageFSURL =
32-
if (path == "")
33-
withPath(c)
34-
else
35-
withPath(s"$path/$c")
36-
37-
def fromString(s: String): AzureStorageFSURL = AzureStorageFS.parseUrl(s)
31+
override def /(c: String): AzureStorageFSURL =
32+
if (path == "") withPath(c)
33+
else withPath(s"$path/$c")
3834

3935
def withPath(newPath: String): AzureStorageFSURL =
4036
new AzureStorageFSURL(account, container, newPath, sasToken)
4137

4238
def prefix: String = s"https://$account.blob.core.windows.net/$container"
4339

44-
def getPath: String = path
45-
4640
def base: String = {
4741
val pathPart = if (path == "") "" else s"/$path"
4842
prefix + pathPart
@@ -157,8 +151,6 @@ class AzureStorageFS(val credential: AzureCloudCredentials) extends FS {
157151
case _: IllegalArgumentException => false
158152
}
159153

160-
def urlAddPathComponent(url: URL, component: String): URL = url.addPathComponent(component)
161-
162154
def getConfiguration(): Unit = ()
163155

164156
def setConfiguration(config: Any): Unit = {}
@@ -246,22 +238,6 @@ class AzureStorageFS(val credential: AzureCloudCredentials) extends FS {
246238
new WrappedSeekableDataInputStream(is)
247239
}
248240

249-
override def readNoCompression(url: URL): Array[Byte] = handlePublicAccessError(url) {
250-
val client = getBlobClient(url)
251-
val size = client.getProperties.getBlobSize
252-
if (size < 2L * 1024 * 1024 * 1024) { // https://learn.microsoft.com/en-us/java/api/com.azure.storage.blob.specialized.blobclientbase?view=azure-java-stable#com-azure-storage-blob-specialized-blobclientbase-downloadcontent()
253-
retryTransientErrors {
254-
client.downloadContent().toBytes()
255-
}
256-
} else {
257-
val baos = new ByteArrayOutputStream()
258-
retryTransientErrors {
259-
client.downloadStream(baos)
260-
}
261-
baos.toByteArray()
262-
}
263-
}
264-
265241
def createNoCompression(url: URL): PositionedDataOutputStream = retryTransientErrors {
266242
val blockBlobClient = getBlobClient(url).getBlockBlobClient
267243

@@ -357,12 +333,12 @@ class AzureStorageFS(val credential: AzureCloudCredentials) extends FS {
357333
}
358334

359335
override def fileListEntry(url: URL): FileListEntry = {
360-
if (url.getPath == "")
336+
if (url.path == "")
361337
return AzureStorageFileListEntry.dir(url)
362338

363339
val it = {
364340
val containerClient = getContainerClient(url)
365-
val options = new ListBlobsOptions().setPrefix(dropTrailingSlash(url.getPath))
341+
val options = new ListBlobsOptions().setPrefix(dropTrailingSlash(url.path))
366342
val prefixMatches = containerClient.listBlobsByHierarchy("/", options, timeout)
367343
prefixMatches.iterator()
368344
}.asScala.map(AzureStorageFileListEntry.apply(url, _))

hail/hail/src/is/hail/io/fs/FS.scala

Lines changed: 8 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package is.hail.io.fs
22

33
import is.hail.io.compress.{BGzipInputStream, BGzipOutputStream}
44
import is.hail.io.fs.FSUtil.{containsWildcard, dropTrailingSlash}
5-
import is.hail.services._
65
import is.hail.utils._
76

87
import scala.collection.mutable
@@ -48,8 +47,9 @@ class WrappedPositionOutputStream(os: OutputStream) extends OutputStream with Po
4847
def getPosition: Long = count
4948
}
5049

51-
trait FSURL {
52-
def getPath: String
50+
abstract class FSURL[URL <: FSURL[URL]] {
51+
def path: String
52+
def /(component: String): URL
5353
}
5454

5555
trait FileStatus {
@@ -253,30 +253,13 @@ abstract class FSPositionedOutputStream(val capacity: Int) extends OutputStream
253253
}
254254

255255
trait FS extends Serializable with Logging {
256-
type URL <: FSURL
256+
type URL <: FSURL[URL]
257257

258258
def parseUrl(filename: String): URL
259259

260260
def validUrl(filename: String): Boolean
261261

262-
def urlAddPathComponent(url: URL, component: String): URL
263-
264-
final def openCachedNoCompression(filename: String): SeekableDataInputStream =
265-
openNoCompression(filename)
266-
267-
def openCachedNoCompression(url: URL): SeekableDataInputStream = openNoCompression(url)
268-
269-
final def createCachedNoCompression(filename: String): PositionedDataOutputStream =
270-
createNoCompression(filename)
271-
272-
def createCachedNoCompression(url: URL): PositionedDataOutputStream = createNoCompression(url)
273-
274-
final def writeCached(filename: String)(writer: PositionedDataOutputStream => Unit) =
275-
writePDOS(filename)(writer)
276-
277-
def writeCached(url: URL)(writer: PositionedDataOutputStream => Unit) = writePDOS(url)(writer)
278-
279-
def getCodecFromExtension(extension: String, gzAsBGZ: Boolean = false): CompressionCodec = {
262+
def getCodecFromExtension(extension: String, gzAsBGZ: Boolean = false): CompressionCodec =
280263
extension match {
281264
case ".gz" =>
282265
if (gzAsBGZ)
@@ -290,7 +273,6 @@ trait FS extends Serializable with Logging {
290273
case _ =>
291274
null
292275
}
293-
}
294276

295277
def getCodecFromPath(path: String, gzAsBGZ: Boolean = false): CompressionCodec =
296278
getCodecFromExtension(getExtension(path), gzAsBGZ)
@@ -316,11 +298,6 @@ trait FS extends Serializable with Logging {
316298
throw new AssertionError("unreachable")
317299
}
318300

319-
def stripCodecExtension(path: String): String = {
320-
val ext = getCodecExtension(path)
321-
path.dropRight(ext.length)
322-
}
323-
324301
def getCodecExtension(path: String): String = {
325302
val ext = getExtension(path)
326303
if (ext == ".gz" || ext == ".bgz" || ext == ".tbi")
@@ -334,12 +311,6 @@ trait FS extends Serializable with Logging {
334311

335312
def openNoCompression(url: URL): SeekableDataInputStream
336313

337-
final def readNoCompression(filename: String): Array[Byte] = readNoCompression(parseUrl(filename))
338-
339-
def readNoCompression(url: URL): Array[Byte] = retryTransientErrors {
340-
using(openNoCompression(url))(is => IOUtils.toByteArray(is))
341-
}
342-
343314
final def createNoCompression(filename: String): PositionedDataOutputStream =
344315
createNoCompression(parseUrl(filename))
345316

@@ -399,7 +370,7 @@ trait FS extends Serializable with Logging {
399370
}
400371
}
401372
} else
402-
f(urlAddPathComponent(prefix, c), null, i + 1)
373+
f(prefix / c, null, i + 1)
403374
}
404375
}
405376

@@ -499,13 +470,6 @@ trait FS extends Serializable with Logging {
499470

500471
def makeQualified(path: String): String
501472

502-
final def deleteOnExit(filename: String): Unit = deleteOnExit(parseUrl(filename))
503-
504-
def deleteOnExit(url: URL): Unit =
505-
Runtime.getRuntime.addShutdownHook(
506-
new Thread(() => delete(url, recursive = false))
507-
)
508-
509473
final def open(filename: String, codec: CompressionCodec): InputStream =
510474
open(parseUrl(filename), codec)
511475

@@ -515,7 +479,6 @@ trait FS extends Serializable with Logging {
515479
codec.makeInputStream(is)
516480
else
517481
is
518-
519482
}
520483

521484
final def open(filename: String): InputStream = open(parseUrl(filename))
@@ -527,14 +490,14 @@ trait FS extends Serializable with Logging {
527490
open(parseUrl(filename), gzAsBGZ)
528491

529492
def open(url: URL, gzAsBGZ: Boolean): InputStream =
530-
open(url, getCodecFromPath(url.getPath, gzAsBGZ))
493+
open(url, getCodecFromPath(url.path, gzAsBGZ))
531494

532495
final def create(filename: String): OutputStream = create(parseUrl(filename))
533496

534497
def create(url: URL): OutputStream = {
535498
val os = createNoCompression(url)
536499

537-
val codec = getCodecFromPath(url.getPath, gzAsBGZ = false)
500+
val codec = getCodecFromPath(url.path, gzAsBGZ = false)
538501
if (codec != null)
539502
codec.makeOutputStream(os)
540503
else

hail/hail/src/is/hail/io/fs/GoogleStorageFS.scala

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,17 @@ import com.google.cloud.storage.Storage.{
2121
BlobGetOption, BlobListOption, BlobSourceOption, BlobWriteOption,
2222
}
2323

24-
case class GoogleStorageFSURL(bucket: String, path: String) extends FSURL {
25-
def addPathComponent(c: String): GoogleStorageFSURL =
26-
if (path == "")
27-
withPath(c)
28-
else
29-
withPath(s"$path/$c")
30-
31-
def withPath(newPath: String): GoogleStorageFSURL = GoogleStorageFSURL(bucket, newPath)
32-
def fromString(s: String): GoogleStorageFSURL = GoogleStorageFS.parseUrl(s)
33-
34-
def getPath: String = path
35-
36-
override def toString(): String = if (path.isEmpty) {
37-
s"gs://$bucket"
38-
} else {
39-
s"gs://$bucket/$path"
40-
}
24+
class GoogleStorageFSURL(val bucket: String, override val path: String)
25+
extends FSURL[GoogleStorageFSURL] {
26+
override def /(c: String): GoogleStorageFSURL =
27+
new GoogleStorageFSURL(bucket, if (path == "") c else s"$path/$c")
28+
29+
def withPath(p: String): GoogleStorageFSURL =
30+
new GoogleStorageFSURL(bucket, p)
31+
32+
override def toString: String =
33+
if (path.isEmpty) s"gs://$bucket"
34+
else s"gs://$bucket/$path"
4135
}
4236

4337
object GoogleStorageFS {
@@ -58,7 +52,7 @@ object GoogleStorageFS {
5852
val bucket = m.group(1)
5953
val maybePath = m.group(2)
6054
val path = Paths.get(if (maybePath == null) "" else maybePath.stripPrefix("/"))
61-
GoogleStorageFSURL(bucket, path.normalize().toString)
55+
new GoogleStorageFSURL(bucket, path.normalize().toString)
6256
case None => throw new IllegalArgumentException(
6357
s"GCS URI must be of the form: gs://bucket/path, found $filename"
6458
)
@@ -150,8 +144,6 @@ class GoogleStorageFS(
150144
override def validUrl(filename: String): Boolean =
151145
filename.startsWith("gs://")
152146

153-
def urlAddPathComponent(url: URL, component: String): URL = url.addPathComponent(component)
154-
155147
def getConfiguration(): Option[RequesterPaysConfig] =
156148
requesterPaysConfig
157149

@@ -256,10 +248,6 @@ class GoogleStorageFS(
256248
new WrappedSeekableDataInputStream(is)
257249
}
258250

259-
override def readNoCompression(url: URL): Array[Byte] = retryTransientErrors {
260-
storage.readAllBytes(url.bucket, url.path)
261-
}
262-
263251
def createNoCompression(url: URL): PositionedDataOutputStream = retryTransientErrors {
264252
log.info(f"createNoCompression: $url")
265253

@@ -482,7 +470,7 @@ class GoogleStorageFS(
482470
}
483471

484472
override def fileListEntry(url: URL): FileListEntry = {
485-
if (url.getPath == "") {
473+
if (url.path == "") {
486474
return GoogleStorageFileListEntry.dir(url)
487475
}
488476

@@ -502,17 +490,15 @@ class GoogleStorageFS(
502490
fileListEntryFromIterator(url, it)
503491
}
504492

505-
override def eTag(url: URL): Some[String] = {
506-
val GoogleStorageFSURL(bucket, blob) = url
493+
override def eTag(url: URL): Some[String] =
507494
handleRequesterPays(
508495
(options: Seq[BlobGetOption]) =>
509496
retryTransientErrors {
510-
Some(storage.get(bucket, blob, options: _*).getEtag)
497+
Some(storage.get(url.bucket, url.path, options: _*).getEtag)
511498
},
512499
BlobGetOption.userProject,
513-
bucket,
500+
url.bucket,
514501
)
515-
}
516502

517503
def makeQualified(filename: String): String = {
518504
if (!filename.startsWith("gs://"))

hail/hail/src/is/hail/io/fs/HadoopFS.scala

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import scala.util.Try
88
import java.io._
99

1010
import org.apache.hadoop
11-
import org.apache.hadoop.fs.{EtagSource, FSDataInputStream, FSDataOutputStream}
11+
import org.apache.hadoop.fs.{EtagSource, FSDataInputStream, FSDataOutputStream, Path}
1212

1313
class HadoopFileListEntry(fs: hadoop.fs.FileStatus) extends FileListEntry {
1414
val normalizedPath = fs.getPath
@@ -72,27 +72,27 @@ object HadoopFS {
7272
}
7373
}
7474

75-
case class HadoopFSURL(path: String, conf: SerializableHadoopConfiguration) extends FSURL {
76-
private[this] val unqualifiedHadoopPath = new hadoop.fs.Path(path)
77-
val hadoopFs = unqualifiedHadoopPath.getFileSystem(conf.value)
78-
val hadoopPath = hadoopFs.makeQualified(unqualifiedHadoopPath)
75+
class HadoopFSURL(unqualified: Path, conf: SerializableHadoopConfiguration)
76+
extends FSURL[HadoopFSURL] {
77+
val hadoopFs = unqualified.getFileSystem(conf.value)
78+
val hadoopPath: Path = hadoopFs.makeQualified(unqualified)
7979

80-
def addPathComponent(c: String): HadoopFSURL = HadoopFSURL(s"${hadoopPath.toString}/$c", conf)
81-
def getPath: String = hadoopPath.toString
82-
def fromString(s: String): HadoopFSURL = HadoopFSURL(s, conf)
83-
override def toString(): String = hadoopPath.toString
80+
override def /(c: String): HadoopFSURL =
81+
new HadoopFSURL(new Path(unqualified, c), conf)
82+
83+
override def path: String = hadoopPath.toString
84+
override def toString: String = hadoopPath.toString
8485
}
8586

8687
class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends FS {
8788
type URL = HadoopFSURL
8889

89-
override def parseUrl(filename: String): URL = HadoopFSURL(filename, conf)
90+
override def parseUrl(filename: String): URL =
91+
new HadoopFSURL(new Path(filename), conf)
9092

9193
override def validUrl(filename: String): Boolean =
9294
Try(getFileSystem(filename)).isSuccess
9395

94-
def urlAddPathComponent(url: URL, component: String): URL = url.addPathComponent(component)
95-
9696
def getConfiguration(): SerializableHadoopConfiguration = conf
9797

9898
def setConfiguration(_conf: Any): Unit =
@@ -172,7 +172,7 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends
172172
override def fileStatus(url: URL): FileStatus = {
173173
val fle = fileListEntry(url)
174174
if (fle.isDirectory) {
175-
throw new FileNotFoundException(url.getPath)
175+
throw new FileNotFoundException(url.path)
176176
}
177177
fle
178178
}
@@ -192,9 +192,6 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends
192192
pathFS.makeQualified(ppath).toString
193193
}
194194

195-
override def deleteOnExit(url: URL): Unit =
196-
url.hadoopFs.deleteOnExit(url.hadoopPath): Unit
197-
198195
def supportsScheme(scheme: String): Boolean =
199196
(scheme == "") || {
200197
try {

0 commit comments

Comments
 (0)