[SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV #29516

Closed · wants to merge 4 commits
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -200,7 +200,7 @@ stream/2.9.6//stream-2.9.6.jar
 stringtemplate/3.2.1//stringtemplate-3.2.1.jar
 super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
 xercesImpl/2.12.0//xercesImpl-2.12.0.jar
 xml-apis/1.4.01//xml-apis-1.4.01.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -213,7 +213,7 @@ stream/2.9.6//stream-2.9.6.jar
 super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
 transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 velocity/1.5//velocity-1.5.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
 xercesImpl/2.12.0//xercesImpl-2.12.0.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -229,7 +229,7 @@ super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
 token-provider/1.0.1//token-provider-1.0.1.jar
 transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 velocity/1.5//velocity-1.5.jar
 woodstox-core/5.0.3//woodstox-core-5.0.3.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -2348,7 +2348,7 @@
       <dependency>
         <groupId>com.univocity</groupId>
         <artifactId>univocity-parsers</artifactId>
-        <version>2.8.3</version>
+        <version>2.9.0</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hive</groupId>
CSVExprUtils.scala
@@ -25,16 +25,21 @@ object CSVExprUtils {
    * This is currently being used in CSV reading path and CSV schema inference.
    */
   def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    iter.filter { line =>
-      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
+    if (options.isCommentSet) {
+      val commentPrefix = options.comment.toString
+      iter.filter { line =>
+        line.trim.nonEmpty && !line.startsWith(commentPrefix)
+      }
+    } else {
+      iter.filter(_.trim.nonEmpty)
     }
   }

   def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
     if (options.isCommentSet) {
       val commentPrefix = options.comment.toString
       iter.dropWhile { line =>
-        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+        line.trim.isEmpty || line.startsWith(commentPrefix)
Member Author: I think it's correct not to trim the string that's checked to see whether it starts with a comment, which is a slightly separate issue. \u0000 can't be used as a comment char, but other non-printable chars could.
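To make the trim distinction concrete, here is a tiny illustration (the values are invented for the example, assuming a comment char of '#'):

// Old skipComments check: trims before testing, so indented comments matched.
"  #indented".trim.startsWith("#")  // true  -> line was skipped
// New check: no trim, so only a comment char in column one marks a comment.
"  #indented".startsWith("#")       // false -> line is kept as data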

       }
     } else {
       iter.dropWhile(_.trim.isEmpty)
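The user-visible effect of the CSVExprUtils change, as a hypothetical spark-shell session (the sample rows are invented; implicits are in scope as in spark-shell):

// No "comment" option is set, so nothing should be treated as a comment.
val ds = Seq("\u0000foo", "bar", "baz").toDS()
val df = spark.read.csv(ds)

// After the fix this returns 3; before, it returned 2, because "\u0000foo"
// matched the \u0000 "unset" sentinel and was silently dropped as a comment.
df.count()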
CSVOptions.scala
@@ -220,7 +220,9 @@ class CSVOptions(
     format.setQuote(quote)
     format.setQuoteEscape(escape)
     charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
+    if (isCommentSet) {
Member Author: Arguably we should rework the handling of 'optional' configs so they don't use this default of \u0000 to mean "none", but I avoided that here. One consequence is that you cannot use \u0000 as a comment char right now.

@ghost (Aug 23, 2020): If we change it that way, it might impact existing users for whom \u0000 is a comment character by default, so I would say a separate optional config is a better solution. What I'm saying is that we need to wait for univocity 3.0.0, where the new changes will be available; then we can add the Spark changes properly.

Member Author: You are correct, but this has never been a valid comment character, and the flip side is the bug you describe: it's always a comment character. I think it's reasonable to fix this as a bug. I don't think we need yet another config, as it would be quite obscure to use this non-printing control code for comments in a CSV file.

Reviewer: I agree, but once the changes are done, \u0000 won't be treated as a comment character, which resolves this bug. But then the default comment character will be #, since that's univocity's default. So if a data row starts with #, will that row be processed now? If not, this will break most existing jobs.

Member Author: I agree; I'll fix that in the next commit - we need to set the comment char to whatever Spark is using no matter what. However, it looks like we are going to need your univocity fix to really fix this; it looks like that was just released in 2.9.0: uniVocity/univocity-parsers@f392311 - let me try that.

@dongjoon-hyun it is a correctness issue, but I wouldn't hold up a release for it. We should address it, but it doesn't absolutely have to happen in 2.4.7 or 3.0.1. It's not a regression.

Reviewer: Thanks; if you're fine with it, I can also raise a PR for this.

Member: I would think this is rather a bug fix. If comment is not set, it shouldn't assume anything else is a comment.

Member: That's also what we documented; see also DataFrameReader.csv.

Member Author: Right, yeah - so we have to use the new method in univocity 2.9.0 to turn off its comment handling if it's unset in Spark (= \u0000).

Member Author: Oh right, this stanza is for writer settings. There is no setCommentProcessingEnabled for writers in univocity, and comments aren't generated on write, so the comment setting doesn't really matter here.

+      format.setComment(comment)
+    }
     lineSeparatorInWrite.foreach(format.setLineSeparator)

     writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
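For readers following the thread above, here is a minimal sketch of the sentinel convention under discussion. This is an assumption about how CSVOptions derives these fields, not a copy of the real class; `parameters` is a stand-in for the user-supplied options map:

// Stand-in for the CSV options supplied by the user, e.g. Map("comment" -> "#").
val parameters: Map[String, String] = Map.empty

// "comment" defaults to the \u0000 sentinel, which is meant to signify "unset".
val comment: Char = parameters.get("comment").map(_.charAt(0)).getOrElse('\u0000')

// A comment char is considered set only if it differs from the sentinel - which
// is also why \u0000 itself cannot currently be used as a comment character.
val isCommentSet: Boolean = comment != '\u0000'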
@@ -242,7 +244,11 @@ class CSVOptions(
     format.setQuoteEscape(escape)
     lineSeparator.foreach(format.setLineSeparator)
     charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
+    if (isCommentSet) {
+      format.setComment(comment)
+    } else {
+      settings.setCommentProcessingEnabled(false)
+    }

     settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
     settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
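A self-contained sketch of what setCommentProcessingEnabled(false) buys on the read path (requires univocity 2.9.0+; the sample input is invented):

import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
// univocity's own default comment char is '#'. Disabling comment processing
// entirely - rather than leaving the default in place - matches Spark's
// semantics when no "comment" option is set: every line is data.
settings.setCommentProcessingEnabled(false)  // added in univocity 2.9.0

val parser = new CsvParser(settings)
val row = parser.parseLine("#looks,like,a,comment")
// row: Array("#looks", "like", "a", "comment") - parsed as data, not skipped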
CSVSuite.scala
@@ -1902,25 +1902,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa

test("SPARK-25387: bad input should not cause NPE") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))
Member Author: I think this test was wrong in two ways. First, it relied on ignoring lines starting with \u0000 - the very bug we're fixing. You can see below that it asserted there was no result at all, when there should be some result.

     checkAnswer(spark.read.schema(schema).csv(input), Row(null))
     checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
-    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+    assert(spark.read.schema(schema).csv(input).collect().toSet == Set(Row(null)))
   }

test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
val schema = StructType(
StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))

     checkAnswer(
       spark.read
         .option("columnNameOfCorruptRecord", "_corrupt_record")
         .schema(schema)
         .csv(input),
-      Row(null, null))
-    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+      Row(null, "\u0001\u0000\u0001234"))
Member Author: The other problem, I think, is that this was asserting there is no corrupt record - no result at all - when clearly the test should produce a single row with a corrupt record.

+    assert(spark.read.schema(schema).csv(input).collect().toSet ==
+      Set(Row(null, "\u0001\u0000\u0001234")))
   }

test("field names of inferred schema shouldn't compare to the first row") {
@@ -2366,6 +2367,17 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
     }
   }

+  test("SPARK-32614: don't treat rows starting with null char as comment") {
+    withTempPath { path =>
+      Seq("\u0000foo", "bar", "baz").toDS.write.text(path.getCanonicalPath)
+      val df = spark.read.format("csv")
+        .option("header", "false")
+        .option("inferSchema", "true")
+        .load(path.getCanonicalPath)
+      assert(df.count() == 3)
+    }
+  }
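For contrast, a hypothetical companion check (not part of this PR, assuming the same suite helpers such as withTempPath): an explicitly set comment character still filters rows as before; only the "unset" case changes behavior.

// Hypothetical: explicit comment chars keep working as they always have.
withTempPath { path =>
  Seq("#skipped", "bar", "baz").toDS.write.text(path.getCanonicalPath)
  val df = spark.read.format("csv")
    .option("comment", "#")
    .load(path.getCanonicalPath)
  assert(df.count() == 2)  // "#skipped" is treated as a comment and dropped
}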

test("case sensitivity of filters references") {
Seq(true, false).foreach { filterPushdown =>
withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) {