Skip to content

Commit 38bd775

Browse files
committed
[query] clean-up unused fs methods
1 parent 052c838 commit 38bd775

File tree

7 files changed: 60 additions & 156 deletions

hail/hail/src/is/hail/io/fs/AzureStorageFS.scala

Lines changed: 8 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -18,32 +18,26 @@ import scala.collection.JavaConverters._
1818
import scala.collection.mutable
1919
import scala.collection.mutable.ArrayBuffer
2020

21-
import java.io.{ByteArrayOutputStream, FileNotFoundException, OutputStream}
21+
import java.io.{FileNotFoundException, OutputStream}
2222
import java.nio.file.{Path, Paths}
2323
import java.time.Duration
2424

2525
class AzureStorageFSURL(
2626
val account: String,
2727
val container: String,
28-
val path: String,
28+
override val path: String,
2929
val sasToken: Option[String],
30-
) extends FSURL {
30+
) extends FSURL[AzureStorageFSURL] {
3131

32-
def addPathComponent(c: String): AzureStorageFSURL =
33-
if (path == "")
34-
withPath(c)
35-
else
36-
withPath(s"$path/$c")
37-
38-
def fromString(s: String): AzureStorageFSURL = AzureStorageFS.parseUrl(s)
32+
override def /(c: String): AzureStorageFSURL =
33+
if (path == "") withPath(c)
34+
else withPath(s"$path/$c")
3935

4036
def withPath(newPath: String): AzureStorageFSURL =
4137
new AzureStorageFSURL(account, container, newPath, sasToken)
4238

4339
def prefix: String = s"https://$account.blob.core.windows.net/$container"
4440

45-
def getPath: String = path
46-
4741
def base: String = {
4842
val pathPart = if (path == "") "" else s"/$path"
4943
prefix + pathPart
@@ -158,8 +152,6 @@ class AzureStorageFS(val credential: AzureCloudCredentials) extends FS {
158152
case _: IllegalArgumentException => false
159153
}
160154

161-
def urlAddPathComponent(url: URL, component: String): URL = url.addPathComponent(component)
162-
163155
def getConfiguration(): Unit = ()
164156

165157
def setConfiguration(config: Any): Unit = {}
@@ -247,22 +239,6 @@ class AzureStorageFS(val credential: AzureCloudCredentials) extends FS {
247239
new WrappedSeekableDataInputStream(is)
248240
}
249241

250-
override def readNoCompression(url: URL): Array[Byte] = handlePublicAccessError(url) {
251-
val client = getBlobClient(url)
252-
val size = client.getProperties.getBlobSize
253-
if (size < 2L * 1024 * 1024 * 1024) { // https://learn.microsoft.com/en-us/java/api/com.azure.storage.blob.specialized.blobclientbase?view=azure-java-stable#com-azure-storage-blob-specialized-blobclientbase-downloadcontent()
254-
retryTransientErrors {
255-
client.downloadContent().toBytes()
256-
}
257-
} else {
258-
val baos = new ByteArrayOutputStream()
259-
retryTransientErrors {
260-
client.downloadStream(baos)
261-
}
262-
baos.toByteArray()
263-
}
264-
}
265-
266242
def createNoCompression(url: URL): PositionedDataOutputStream = retryTransientErrors {
267243
val blockBlobClient = getBlobClient(url).getBlockBlobClient
268244

@@ -358,12 +334,12 @@ class AzureStorageFS(val credential: AzureCloudCredentials) extends FS {
358334
}
359335

360336
override def fileListEntry(url: URL): FileListEntry = {
361-
if (url.getPath == "")
337+
if (url.path == "")
362338
return AzureStorageFileListEntry.dir(url)
363339

364340
val it = {
365341
val containerClient = getContainerClient(url)
366-
val options = new ListBlobsOptions().setPrefix(dropTrailingSlash(url.getPath))
342+
val options = new ListBlobsOptions().setPrefix(dropTrailingSlash(url.path))
367343
val prefixMatches = containerClient.listBlobsByHierarchy("/", options, timeout)
368344
prefixMatches.iterator()
369345
}.asScala.map(AzureStorageFileListEntry.apply(url, _))

hail/hail/src/is/hail/io/fs/FS.scala

Lines changed: 8 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@ package is.hail.io.fs
22

33
import is.hail.io.compress.{BGzipInputStream, BGzipOutputStream}
44
import is.hail.io.fs.FSUtil.{containsWildcard, dropTrailingSlash}
5-
import is.hail.services._
65
import is.hail.utils._
76

87
import scala.collection.mutable
@@ -48,8 +47,9 @@ class WrappedPositionOutputStream(os: OutputStream) extends OutputStream with Po
4847
def getPosition: Long = count
4948
}
5049

51-
trait FSURL {
52-
def getPath: String
50+
abstract class FSURL[URL <: FSURL[URL]] {
51+
def path: String
52+
def /(component: String): URL
5353
}
5454

5555
trait FileStatus {
@@ -253,30 +253,13 @@ abstract class FSPositionedOutputStream(val capacity: Int) extends OutputStream
253253
}
254254

255255
trait FS extends Serializable with Logging {
256-
type URL <: FSURL
256+
type URL <: FSURL[URL]
257257

258258
def parseUrl(filename: String): URL
259259

260260
def validUrl(filename: String): Boolean
261261

262-
def urlAddPathComponent(url: URL, component: String): URL
263-
264-
final def openCachedNoCompression(filename: String): SeekableDataInputStream =
265-
openNoCompression(filename)
266-
267-
def openCachedNoCompression(url: URL): SeekableDataInputStream = openNoCompression(url)
268-
269-
final def createCachedNoCompression(filename: String): PositionedDataOutputStream =
270-
createNoCompression(filename)
271-
272-
def createCachedNoCompression(url: URL): PositionedDataOutputStream = createNoCompression(url)
273-
274-
final def writeCached(filename: String)(writer: PositionedDataOutputStream => Unit) =
275-
writePDOS(filename)(writer)
276-
277-
def writeCached(url: URL)(writer: PositionedDataOutputStream => Unit) = writePDOS(url)(writer)
278-
279-
def getCodecFromExtension(extension: String, gzAsBGZ: Boolean = false): CompressionCodec = {
262+
def getCodecFromExtension(extension: String, gzAsBGZ: Boolean = false): CompressionCodec =
280263
extension match {
281264
case ".gz" =>
282265
if (gzAsBGZ)
@@ -290,7 +273,6 @@ trait FS extends Serializable with Logging {
290273
case _ =>
291274
null
292275
}
293-
}
294276

295277
def getCodecFromPath(path: String, gzAsBGZ: Boolean = false): CompressionCodec =
296278
getCodecFromExtension(getExtension(path), gzAsBGZ)
@@ -316,11 +298,6 @@ trait FS extends Serializable with Logging {
316298
throw new AssertionError("unreachable")
317299
}
318300

319-
def stripCodecExtension(path: String): String = {
320-
val ext = getCodecExtension(path)
321-
path.dropRight(ext.length)
322-
}
323-
324301
def getCodecExtension(path: String): String = {
325302
val ext = getExtension(path)
326303
if (ext == ".gz" || ext == ".bgz" || ext == ".tbi")
@@ -334,12 +311,6 @@ trait FS extends Serializable with Logging {
334311

335312
def openNoCompression(url: URL): SeekableDataInputStream
336313

337-
final def readNoCompression(filename: String): Array[Byte] = readNoCompression(parseUrl(filename))
338-
339-
def readNoCompression(url: URL): Array[Byte] = retryTransientErrors {
340-
using(openNoCompression(url))(is => IOUtils.toByteArray(is))
341-
}
342-
343314
final def createNoCompression(filename: String): PositionedDataOutputStream =
344315
createNoCompression(parseUrl(filename))
345316

@@ -399,7 +370,7 @@ trait FS extends Serializable with Logging {
399370
}
400371
}
401372
} else
402-
f(urlAddPathComponent(prefix, c), null, i + 1)
373+
f(prefix / c, null, i + 1)
403374
}
404375
}
405376

@@ -499,13 +470,6 @@ trait FS extends Serializable with Logging {
499470

500471
def makeQualified(path: String): String
501472

502-
final def deleteOnExit(filename: String): Unit = deleteOnExit(parseUrl(filename))
503-
504-
def deleteOnExit(url: URL): Unit =
505-
Runtime.getRuntime.addShutdownHook(
506-
new Thread(() => delete(url, recursive = false))
507-
)
508-
509473
final def open(filename: String, codec: CompressionCodec): InputStream =
510474
open(parseUrl(filename), codec)
511475

@@ -515,7 +479,6 @@ trait FS extends Serializable with Logging {
515479
codec.makeInputStream(is)
516480
else
517481
is
518-
519482
}
520483

521484
final def open(filename: String): InputStream = open(parseUrl(filename))
@@ -527,14 +490,14 @@ trait FS extends Serializable with Logging {
527490
open(parseUrl(filename), gzAsBGZ)
528491

529492
def open(url: URL, gzAsBGZ: Boolean): InputStream =
530-
open(url, getCodecFromPath(url.getPath, gzAsBGZ))
493+
open(url, getCodecFromPath(url.path, gzAsBGZ))
531494

532495
final def create(filename: String): OutputStream = create(parseUrl(filename))
533496

534497
def create(url: URL): OutputStream = {
535498
val os = createNoCompression(url)
536499

537-
val codec = getCodecFromPath(url.getPath, gzAsBGZ = false)
500+
val codec = getCodecFromPath(url.path, gzAsBGZ = false)
538501
if (codec != null)
539502
codec.makeOutputStream(os)
540503
else

hail/hail/src/is/hail/io/fs/GoogleStorageFS.scala

Lines changed: 16 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -21,23 +21,17 @@ import com.google.cloud.storage.Storage.{
2121
BlobGetOption, BlobListOption, BlobSourceOption, BlobWriteOption,
2222
}
2323

24-
case class GoogleStorageFSURL(bucket: String, path: String) extends FSURL {
25-
def addPathComponent(c: String): GoogleStorageFSURL =
26-
if (path == "")
27-
withPath(c)
28-
else
29-
withPath(s"$path/$c")
30-
31-
def withPath(newPath: String): GoogleStorageFSURL = GoogleStorageFSURL(bucket, newPath)
32-
def fromString(s: String): GoogleStorageFSURL = GoogleStorageFS.parseUrl(s)
33-
34-
def getPath: String = path
35-
36-
override def toString(): String = if (path.isEmpty) {
37-
s"gs://$bucket"
38-
} else {
39-
s"gs://$bucket/$path"
40-
}
24+
class GoogleStorageFSURL(val bucket: String, override val path: String)
25+
extends FSURL[GoogleStorageFSURL] {
26+
override def /(c: String): GoogleStorageFSURL =
27+
new GoogleStorageFSURL(bucket, if (path == "") c else s"$path/$c")
28+
29+
def withPath(p: String): GoogleStorageFSURL =
30+
new GoogleStorageFSURL(bucket, p)
31+
32+
override def toString: String =
33+
if (path.isEmpty) s"gs://$bucket"
34+
else s"gs://$bucket/$path"
4135
}
4236

4337
object GoogleStorageFS {
@@ -58,7 +52,7 @@ object GoogleStorageFS {
5852
val bucket = m.group(1)
5953
val maybePath = m.group(2)
6054
val path = Paths.get(if (maybePath == null) "" else maybePath.stripPrefix("/"))
61-
GoogleStorageFSURL(bucket, path.normalize().toString)
55+
new GoogleStorageFSURL(bucket, path.normalize().toString)
6256
case None => throw new IllegalArgumentException(
6357
s"GCS URI must be of the form: gs://bucket/path, found $filename"
6458
)
@@ -150,8 +144,6 @@ class GoogleStorageFS(
150144
override def validUrl(filename: String): Boolean =
151145
filename.startsWith("gs://")
152146

153-
def urlAddPathComponent(url: URL, component: String): URL = url.addPathComponent(component)
154-
155147
def getConfiguration(): Option[RequesterPaysConfig] =
156148
requesterPaysConfig
157149

@@ -256,10 +248,6 @@ class GoogleStorageFS(
256248
new WrappedSeekableDataInputStream(is)
257249
}
258250

259-
override def readNoCompression(url: URL): Array[Byte] = retryTransientErrors {
260-
storage.readAllBytes(url.bucket, url.path)
261-
}
262-
263251
def createNoCompression(url: URL): PositionedDataOutputStream = retryTransientErrors {
264252
log.info(f"createNoCompression: $url")
265253

@@ -482,7 +470,7 @@ class GoogleStorageFS(
482470
}
483471

484472
override def fileListEntry(url: URL): FileListEntry = {
485-
if (url.getPath == "") {
473+
if (url.path == "") {
486474
return GoogleStorageFileListEntry.dir(url)
487475
}
488476

@@ -502,17 +490,15 @@ class GoogleStorageFS(
502490
fileListEntryFromIterator(url, it)
503491
}
504492

505-
override def eTag(url: URL): Some[String] = {
506-
val GoogleStorageFSURL(bucket, blob) = url
493+
override def eTag(url: URL): Some[String] =
507494
handleRequesterPays(
508495
(options: Seq[BlobGetOption]) =>
509496
retryTransientErrors {
510-
Some(storage.get(bucket, blob, options: _*).getEtag)
497+
Some(storage.get(url.bucket, url.path, options: _*).getEtag)
511498
},
512499
BlobGetOption.userProject,
513-
bucket,
500+
url.bucket,
514501
)
515-
}
516502

517503
def makeQualified(filename: String): String = {
518504
if (!filename.startsWith("gs://"))

hail/hail/src/is/hail/io/fs/HadoopFS.scala

Lines changed: 13 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ import scala.util.Try
88
import java.io._
99

1010
import org.apache.hadoop
11-
import org.apache.hadoop.fs.{EtagSource, FSDataInputStream, FSDataOutputStream}
11+
import org.apache.hadoop.fs.{EtagSource, FSDataInputStream, FSDataOutputStream, Path}
1212

1313
class HadoopFileListEntry(fs: hadoop.fs.FileStatus) extends FileListEntry {
1414
val normalizedPath = fs.getPath
@@ -72,27 +72,27 @@ object HadoopFS {
7272
}
7373
}
7474

75-
case class HadoopFSURL(path: String, conf: SerializableHadoopConfiguration) extends FSURL {
76-
private[this] val unqualifiedHadoopPath = new hadoop.fs.Path(path)
77-
val hadoopFs = unqualifiedHadoopPath.getFileSystem(conf.value)
78-
val hadoopPath = hadoopFs.makeQualified(unqualifiedHadoopPath)
75+
class HadoopFSURL(unqualified: Path, conf: SerializableHadoopConfiguration)
76+
extends FSURL[HadoopFSURL] {
77+
val hadoopFs = unqualified.getFileSystem(conf.value)
78+
val hadoopPath: Path = hadoopFs.makeQualified(unqualified)
7979

80-
def addPathComponent(c: String): HadoopFSURL = HadoopFSURL(s"${hadoopPath.toString}/$c", conf)
81-
def getPath: String = hadoopPath.toString
82-
def fromString(s: String): HadoopFSURL = HadoopFSURL(s, conf)
83-
override def toString(): String = hadoopPath.toString
80+
override def /(c: String): HadoopFSURL =
81+
new HadoopFSURL(new Path(unqualified, c), conf)
82+
83+
override def path: String = hadoopPath.toString
84+
override def toString: String = hadoopPath.toString
8485
}
8586

8687
class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends FS {
8788
type URL = HadoopFSURL
8889

89-
override def parseUrl(filename: String): URL = HadoopFSURL(filename, conf)
90+
override def parseUrl(filename: String): URL =
91+
new HadoopFSURL(new Path(filename), conf)
9092

9193
override def validUrl(filename: String): Boolean =
9294
Try(getFileSystem(filename)).isSuccess
9395

94-
def urlAddPathComponent(url: URL, component: String): URL = url.addPathComponent(component)
95-
9696
def getConfiguration(): SerializableHadoopConfiguration = conf
9797

9898
def setConfiguration(_conf: Any): Unit =
@@ -172,7 +172,7 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends
172172
override def fileStatus(url: URL): FileStatus = {
173173
val fle = fileListEntry(url)
174174
if (fle.isDirectory) {
175-
throw new FileNotFoundException(url.getPath)
175+
throw new FileNotFoundException(url.path)
176176
}
177177
fle
178178
}
@@ -192,9 +192,6 @@ class HadoopFS(private[this] var conf: SerializableHadoopConfiguration) extends
192192
pathFS.makeQualified(ppath).toString
193193
}
194194

195-
override def deleteOnExit(url: URL): Unit =
196-
url.hadoopFs.deleteOnExit(url.hadoopPath): Unit
197-
198195
def supportsScheme(scheme: String): Boolean =
199196
(scheme == "") || {
200197
try {

0 commit comments

Comments
 (0)