Skip to content

Commit

Permalink
Update parquet example
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Feb 9, 2024
1 parent 433f709 commit b316448
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ object ParquetExample {
* These case classes represent both full and projected field mappings from the [[Account]] Avro
* record.
*/
case class AccountFull(id: Int, `type`: String, name: Option[String], amount: Double)
// Full field mapping of the [[Account]] Avro record, including the
// default-valued account_status field. `final` because case classes
// should not be extended.
final case class AccountFull(
  id: Int,
  `type`: String,
  name: Option[String],
  amount: Double,
  accountStatus: Option[AccountStatus]
)
case class AccountProjection(id: Int, name: Option[String])

/**
Expand Down Expand Up @@ -108,21 +114,19 @@ object ParquetExample {

// Reads Account records as Avro SpecificRecords with a column projection and
// a row-group predicate, then writes a text rendering of the projected fields.
//
// NOTE(review): the scraped diff contained two conflicting `val projection`
// definitions (with and without `_.getType`); only the newer one — which the
// surrounding commit intends — is kept here.
private def avroSpecificIn(sc: ScioContext, args: Args): ClosedTap[String] = {
  // Macros for generating column projections and row predicates.
  // account_status is the only field with a default value that can be left out of the projection.
  val projection = Projection[Account](_.getId, _.getType, _.getName, _.getAmount)
  val predicate = Predicate[Account](x => x.getAmount > 0)

  sc.parquetAvroFile[Account](args("input"), projection, predicate)
    // The resulting Account records are not complete Avro objects: only the projected
    // columns are present while the rest are null. These objects may fail serialization,
    // so map them to tuples or case classes right after reading.
    .map(x => AccountProjection(x.getId, Some(x.getName.toString)))
    .saveAsTextFile(args("output"))
}

private def avroGenericIn(sc: ScioContext, args: Args): ClosedTap[String] = {
val schema = Account.getClassSchema
implicit val genericRecordCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)

val parquetIn = sc.parquetAvroFile[GenericRecord](args("input"), schema)
val parquetIn = sc.parquetAvroGenericRecordFile(args("input"), schema)

// Catches a specific bug with encoding GenericRecords read by parquet-avro
parquetIn
Expand All @@ -146,12 +150,19 @@ object ParquetExample {
// but close to `parquet.block.size`, i.e. 1 GiB. This guarantees that each file contains 1 row group only and reduces seeks.
.saveAsParquetAvroFile(args("output"), numShards = 1, conf = fineTunedParquetWriterConfig)

// Converts an Avro [[Account]] record into its full Scala case-class
// counterpart, wrapping the nullable-looking Avro accessors as in the
// original mapping (note: uses Some(...), so a null field would surface
// as Some(null) exactly as before — behavior is preserved).
private[extra] def toScalaFull(account: Account): AccountFull = {
  val accountName = Some(account.getName.toString)
  val status     = Some(account.getAccountStatus)
  AccountFull(
    id = account.getId,
    `type` = account.getType.toString,
    name = accountName,
    amount = account.getAmount,
    accountStatus = status
  )
}

// Writes the in-memory fake Account data as a typed Parquet file of
// [[AccountFull]] rows, reusing the shared toScalaFull conversion.
//
// NOTE(review): the scraped diff contained both the old inline
// `.map(x => AccountFull(...))` and the new `.map(toScalaFull)` stage,
// plus two `.saveAsTypedParquetFile` calls; only the newer form — which
// the commit intends — is kept here.
private def typedOut(sc: ScioContext, args: Args): ClosedTap[AccountFull] =
  sc.parallelize(fakeData)
    .map(toScalaFull)
    .saveAsTypedParquetFile(args("output"))

private[extra] def toExample(account: Account): Example = {
val amount = Feature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class ParquetExampleTest extends PipelineSpec {

"ParquetExample" should "work for SpecificRecord input" in {
val expected = ParquetExample.fakeData
.map(x => AccountProjection(x.getId, Some(x.getName.toString)))
// set default value on field outside projection
.map(x => Account.newBuilder(x).setAccountStatus(null).build())
.map(_.toString)

JobTest[com.spotify.scio.examples.extra.ParquetExample.type]
Expand Down Expand Up @@ -79,8 +80,7 @@ class ParquetExampleTest extends PipelineSpec {
}

it should "work for typed output" in {
val expected = ParquetExample.fakeData
.map(a => AccountFull(a.getId, a.getType.toString, Some(a.getName.toString), a.getAmount))
val expected = ParquetExample.fakeData.map(ParquetExample.toScalaFull)

JobTest[com.spotify.scio.examples.extra.ParquetExample.type]
.args("--output=out.parquet", "--method=typedOut")
Expand Down

0 comments on commit b316448

Please sign in to comment.