diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 465973cabe587..575571b6b8f42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxBytes, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, FileIndexOptions, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ArrayImplicits._
@@ -73,8 +73,9 @@ class FileStreamSource(
 
   private val optionsForInnerDataSource = sourceOptions.optionMapWithoutPath ++ {
     val pathOption =
-      if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
-        Map("basePath" -> path)
+      if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path") &&
+          !CaseInsensitiveMap(options).contains(FileIndexOptions.BASE_PATH_PARAM)) {
+        Map(FileIndexOptions.BASE_PATH_PARAM -> path)
       } else {
         Map()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a753da116924d..becaade94e009 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2619,6 +2619,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
     }
   }
+
+  test("SPARK-50603: respect user-provided basePath without globbing") {
+    withTempDirs { case (dir, tmp) =>
+      val partitionFooSubDir = new File(dir, "partition=foo")
+      partitionFooSubDir.mkdir()
+
+      val schema = new StructType().add("value", StringType).add("partition", StringType)
+      val fileStream = createFileStream("json", s"${dir.getCanonicalPath}/partition=foo",
+        Some(schema), Map("basePath" -> dir.getCanonicalPath()))
+      testStream(fileStream)(
+        // Add data to partition dir
+        AddTextFileData("{'value': 'abc'}", partitionFooSubDir, tmp),
+        CheckAnswer(("abc", "foo")),
+
+        // Add more data to same partition=foo sub dir
+        AddTextFileData("{'value': 'def'}", partitionFooSubDir, tmp),
+        CheckAnswer(("abc", "foo"), ("def", "foo"))
+      )
+    }
+  }
 }
 
 @SlowSQLTest
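
The change above makes `FileStreamSource` keep a user-supplied `basePath` instead of unconditionally overwriting it with the source path. As a minimal user-facing sketch of what this enables (the `/data/events` layout is hypothetical; the `readStream`/`writeStream` calls are standard Spark API), a streaming query can now point at a single partition subdirectory while still deriving the partition column from the directory name:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object BasePathStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BasePathStreamExample")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical layout: /data/events/partition=foo/*.json
    val schema = new StructType()
      .add("value", StringType)
      .add("partition", StringType)

    // Stream only the partition=foo subdirectory, but keep /data/events as
    // the base path so "partition" is still resolved as a partition column.
    // Before this fix, the user-provided basePath was replaced with the
    // source path itself, dropping the partition column from the output.
    val stream = spark.readStream
      .schema(schema)
      .option("basePath", "/data/events")
      .json("/data/events/partition=foo")

    val query = stream.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Note that the new condition is only reached for non-glob paths: for glob paths the `basePath` entry was never injected in the first place, and the `CaseInsensitiveMap` lookup mirrors how `FileIndexOptions.BASE_PATH_PARAM` is resolved elsewhere, so `basePath` and `BASEPATH` are treated the same.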