@@ -2,7 +2,7 @@ package is.hail.expr.ir
22
33import is .hail .backend .spark .SparkBackend
44import is .hail .io .compress .BGzipInputStream
5- import is .hail .io .fs .{BGZipCompressionCodec , FS , FileStatus , Positioned , PositionedInputStream }
5+ import is .hail .io .fs .{getCodecFromPath , BGZipCompressionCodec , FS , FileStatus , Positioned , PositionedInputStream }
66import is .hail .io .tabix .{TabixLineIterator , TabixReader }
77import is .hail .types .virtual .{TBoolean , TInt32 , TInt64 , TString , TStruct , Type }
88import is .hail .utils ._
@@ -43,30 +43,35 @@ object GenericLines {
4343 private var splitCompressed = false
4444 private val is : PositionedInputStream = {
4545 val rawIS = fs.openNoCompression(file)
46- val codec = fs.getCodecFromPath(file, gzAsBGZ)
47- if (codec == null ) {
48- assert(split || filePerPartition)
49- rawIS.seek(start)
50- rawIS
51- } else if (codec == BGZipCompressionCodec ) {
52- assert(split || filePerPartition)
53- splitCompressed = true
54- val bgzIS =
55- new BGzipInputStream (rawIS, start, end, SplittableCompressionCodec .READ_MODE .BYBLOCK )
56- new ProxyInputStream (bgzIS) with Positioned {
57- def getPosition : Long = bgzIS.getVirtualOffset
58- }
59- } else {
60- assert(! split || filePerPartition)
46+ getCodecFromPath(file, gzAsBGZ) match {
47+ case None =>
48+ assert(split || filePerPartition)
49+ rawIS.seek(start)
50+ rawIS
51+ case Some (BGZipCompressionCodec ) =>
52+ assert(split || filePerPartition)
53+ splitCompressed = true
54+ val bgzIS =
55+ new BGzipInputStream (
56+ rawIS,
57+ start,
58+ end,
59+ SplittableCompressionCodec .READ_MODE .BYBLOCK ,
60+ )
61+ new ProxyInputStream (bgzIS) with Positioned {
62+ def getPosition : Long = bgzIS.getVirtualOffset
63+ }
64+ case Some (codec) =>
65+ assert(! split || filePerPartition)
6166
62- val delegate =
63- new BoundedInputStream .Builder ()
64- .setInputStream(codec.makeInputStream(rawIS))
65- .get()
67+ val delegate =
68+ new BoundedInputStream .Builder ()
69+ .setInputStream(codec.makeInputStream(rawIS))
70+ .get()
6671
67- new ProxyInputStream (delegate) with Positioned {
68- override def getPosition : Long = delegate.getCount
69- }
72+ new ProxyInputStream (delegate) with Positioned {
73+ override def getPosition : Long = delegate.getCount
74+ }
7075 }
7176 }
7277
@@ -290,9 +295,9 @@ object GenericLines {
290295
291296 val contexts = fileStatuses.flatMap { case (fileListEntry, fileNum) =>
292297 val size = fileListEntry.getLen
293- val codec = fs. getCodecFromPath(fileListEntry.getPath, gzAsBGZ)
298+ val codec = getCodecFromPath(fileListEntry.getPath, gzAsBGZ)
294299
295- val splittable = codec == null || codec == BGZipCompressionCodec
300+ val splittable = codec.isEmpty || codec.contains( BGZipCompressionCodec )
296301 if (splittable && ! filePerPartition) {
297302 var fileNParts = ((totalPartitions.toDouble * size) / totalSize + 0.5 ).toInt
298303 if (fileNParts == 0 )
@@ -304,7 +309,7 @@ object GenericLines {
304309 .map { i =>
305310 val start = partScan(i)
306311 var end = partScan(i + 1 )
307- if (codec != null )
312+ if (codec.isDefined )
308313 end = makeVirtualOffset(end, 0 )
309314 Row (i, fileNum, fileListEntry.getPath, start, end, true )
310315 }
0 commit comments