68 changes: 68 additions & 0 deletions .github/workflows/iceberg.yml
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
Contributor Author: Borrowed from #1541, as it's not merged in yet.

# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: Iceberg

on:
workflow_dispatch:
push:
branches:
- master
- branch-*
pull_request:
branches:
- master
- branch-*

concurrency:
group: iceberg-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
  test-iceberg:
    name: Test Iceberg ${{ matrix.iceberg }} ${{ matrix.sparkver }} javaVersion ${{ matrix.javaver }} scalaVersion ${{ matrix.scalaver }}
runs-on: ubuntu-24.04
strategy:
fail-fast: false
matrix:
iceberg: [ "1.9" ]
javaver: [ "11", "17"]
scalaver: [ "2.12" ]
module: [ "thirdparty/auron-iceberg" ]
sparkver: [ "spark-3.4", "spark-3.5" ]


steps:
- name: Checkout Auron
uses: actions/checkout@v4

- name: Setup Java and Maven cache
uses: actions/setup-java@v4
with:
distribution: 'adopt-hotspot'
java-version: ${{ matrix.javaver }}
cache: 'maven'

- name: Test Iceberg Module
run: ./build/mvn -B test -X -pl ${{ matrix.module }} -am -Pscala-${{ matrix.scalaver }} -Piceberg-${{ matrix.iceberg }} -P${{ matrix.sparkver }} -Prelease

- name: Upload reports
if: failure()
uses: actions/upload-artifact@v4
with:
name: ${{ matrix.module }}-test-report
path: ${{ matrix.module }}/target/surefire-reports
5 changes: 5 additions & 0 deletions thirdparty/auron-iceberg/pom.xml
@@ -31,6 +31,11 @@
<description>Apache Auron Iceberg ${icebergVersion} ${scalaVersion}</description>

<dependencies>
<dependency>
<groupId>org.apache.auron</groupId>
<artifactId>spark-extension-shims-spark_2.12</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-${shortSparkVersion}_${scalaVersion}</artifactId>
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.spark.sql.auron.iceberg.IcebergConvertProvider
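
This single-line resource follows the java.util.ServiceLoader convention: the file presumably lives under META-INF/services/ and is named after the org.apache.spark.sql.auron.AuronConvertProvider interface (the path itself is not visible in this diff), so Auron can discover the Iceberg provider at runtime without a compile-time reference. A minimal discovery sketch under that assumption:

import java.util.ServiceLoader
import org.apache.spark.sql.auron.AuronConvertProvider

// Sketch only: how a ServiceLoader consumer would pick up the entry above.
// Auron's actual wiring is not part of this diff.
def loadProviders(): Seq[AuronConvertProvider] = {
  val it = ServiceLoader.load(classOf[AuronConvertProvider]).iterator()
  val providers = Seq.newBuilder[AuronConvertProvider]
  while (it.hasNext) providers += it.next() // instantiates each registered class
  providers.result().filter(_.isEnabled)    // honor each provider's feature flag
}
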
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iceberg.spark.source

import java.nio.ByteBuffer

import org.apache.iceberg.{FileScanTask, Table}
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

class IcebergPartitionValueConverter(table: Table) {

private case class FieldAccessor(javaClass: Class[_], convert: Any => Any)

// Use Iceberg's partition spec → Spark schema as the single source of truth
private val partitionType = table.spec().partitionType()
private val sparkPartitionSchema: StructType =
SparkSchemaUtil.convert(partitionType.asSchema())

// Fail fast if something is off
require(
partitionType.fields().size() == sparkPartitionSchema.fields.length,
s"Mismatch between Iceberg partition fields (${partitionType.fields().size()}) " +
s"and Spark partition schema (${sparkPartitionSchema.fields.length})")

private val fieldAccessors: Array[FieldAccessor] = {
val sFields = sparkPartitionSchema.fields

def javaClassFor(dt: DataType): Class[_] = dt match {
case BooleanType => classOf[java.lang.Boolean]
case IntegerType | DateType => classOf[java.lang.Integer]
case LongType | TimestampType => classOf[java.lang.Long]
case FloatType => classOf[java.lang.Float]
case DoubleType => classOf[java.lang.Double]
case StringType => classOf[CharSequence]
case BinaryType => classOf[java.nio.ByteBuffer]
case _: DecimalType => classOf[java.math.BigDecimal]
// Partition spec should only use primitives; anything else is a bug
case other =>
throw new UnsupportedOperationException(s"Unsupported Spark partition type: $other")
}

def converterFor(dt: DataType): Any => Any = dt match {
case StringType =>
(raw: Any) =>
if (raw == null) null
else
raw match {
case cs: CharSequence => UTF8String.fromString(cs.toString)
case other => UTF8String.fromString(other.toString)
}

case IntegerType | BooleanType | LongType | FloatType | DoubleType =>
(raw: Any) => raw // already Catalyst-friendly primitives

case DateType =>
(raw: Any) =>
if (raw == null) null
else raw.asInstanceOf[Integer].intValue() // days

case TimestampType =>
(raw: Any) =>
if (raw == null) null
else raw.asInstanceOf[Long] // micros

case BinaryType =>
(raw: Any) =>
if (raw == null) null
else
raw match {
case bb: ByteBuffer =>
val dup = bb.duplicate()
val arr = new Array[Byte](dup.remaining())
dup.get(arr)
arr
case arr: Array[Byte] => arr
case other =>
throw new IllegalArgumentException(
s"Unexpected binary partition value type: ${other.getClass}")
}

case d: DecimalType =>
(raw: Any) =>
if (raw == null) null
else {
val bd: java.math.BigDecimal = raw match {
case bd: java.math.BigDecimal => bd
case s: String => new java.math.BigDecimal(s)
case other => new java.math.BigDecimal(other.toString)
}
Decimal(bd, d.precision, d.scale)
}

case other =>
(_: Any) =>
throw new UnsupportedOperationException(
s"Unsupported Spark partition type in converter: $other")
}

sFields.map { field =>
val dt = field.dataType
FieldAccessor(javaClass = javaClassFor(dt), convert = converterFor(dt))
}
}

def convert(task: FileScanTask): InternalRow = {
val partitionData = task.file().partition()
if (partitionData == null || fieldAccessors.isEmpty) {
InternalRow.empty
} else {
val values = fieldAccessors.indices.map { i =>
val accessor = fieldAccessors(i)
val jcls = accessor.javaClass.asInstanceOf[Class[Any]]
val raw = partitionData.get(i, jcls)
accessor.convert(raw)
}
InternalRow.fromSeq(values)
}
}

def schema: StructType = sparkPartitionSchema
}
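
For orientation, here is a hypothetical caller of the converter over a plain table scan; newScan().planFiles() is the standard Iceberg planning API, while the helper itself is illustrative and not part of this change:

import org.apache.iceberg.Table
import org.apache.spark.sql.catalyst.InternalRow

// Sketch: materialize each planned FileScanTask's partition tuple as an
// InternalRow matching `converter.schema`.
def partitionRows(table: Table): Seq[InternalRow] = {
  val converter = new IcebergPartitionValueConverter(table)
  val tasks = table.newScan().planFiles() // CloseableIterable[FileScanTask]
  try {
    val rows = Seq.newBuilder[InternalRow]
    val it = tasks.iterator()
    while (it.hasNext) rows += converter.convert(it.next())
    rows.result()
  } finally tasks.close()
}
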
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iceberg.spark.source

import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter

import org.apache.iceberg._
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.types.StructType

object IcebergSourceUtil {

def isIcebergScan(scan: Scan): Boolean = {
scan match {
case _: org.apache.iceberg.spark.source.SparkBatchQueryScan => true
case _ => false
}
}

def getScanAsSparkBatchQueryScan(scan: Scan): SparkBatchQueryScan = {
scan match {
case s: SparkBatchQueryScan => s
case _ => throw new IllegalArgumentException("Scan is not a SparkBatchQueryScan")
}
}

def getTableFromScan(scan: Scan): Table = {
getScanAsSparkBatchQueryScan(scan).table()
}

def getInputPartitionAsSparkInputPartition(
inputPartition: InputPartition): SparkInputPartition = {
inputPartition match {
case s: SparkInputPartition => s
case _ => throw new IllegalArgumentException("InputPartition is not a SparkInputPartition")
}
}

def getFileScanTasks(tasks: List[ScanTask]): List[FileScanTask] = tasks match {
case t if t.forall(_.isFileScanTask) =>
t.map(_.asFileScanTask())
case t if t.forall(_.isInstanceOf[CombinedScanTask]) =>
t.iterator.flatMap(_.asCombinedScanTask().tasks().asScala).toList
case _ =>
throw new UnsupportedOperationException("Unsupported iceberg scan task type")
}

// Access Spark private API from within the Iceberg package to avoid accessibility errors
def getReadSchema(scan: Scan): StructType = {
getScanAsSparkBatchQueryScan(scan).readSchema
}

def planInputPartitions(scan: Scan): Array[InputPartition] = {
getScanAsSparkBatchQueryScan(scan).toBatch.planInputPartitions()
}

def getFileScanTasksFromInputPartition(inputPartition: InputPartition): Seq[FileScanTask] = {
val sip = getInputPartitionAsSparkInputPartition(inputPartition)
val tasks = sip.taskGroup[ScanTask]().tasks().asScala
getFileScanTasks(tasks.toList)
}
}
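
Chained together, these helpers walk from a Spark DSv2 Scan down to the underlying Iceberg file tasks. An illustrative sketch (file().location() and length() are standard FileScanTask accessors; the function itself is not part of this change):

import org.apache.spark.sql.connector.read.Scan

// Sketch: enumerate the Iceberg FileScanTasks behind a DSv2 Scan.
def dumpTasks(scan: Scan): Unit = {
  if (IcebergSourceUtil.isIcebergScan(scan)) {
    IcebergSourceUtil.planInputPartitions(scan).foreach { partition =>
      IcebergSourceUtil.getFileScanTasksFromInputPartition(partition).foreach { task =>
        println(s"${task.file().location()}: ${task.length()} bytes")
      }
    }
  }
}
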
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.auron.iceberg

import org.apache.iceberg.spark.source.IcebergSourceUtil
import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider}
import org.apache.spark.sql.auron.util.AuronLogUtils
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.auron.plan.NativeIcebergBatchScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

class IcebergConvertProvider extends AuronConvertProvider {

override def isEnabled: Boolean = {
AuronConverters.getBooleanConf("spark.auron.enable.iceberg.scan", defaultValue = false)
}

override def isSupported(exec: SparkPlan): Boolean = {
exec match {
case e: BatchScanExec =>
IcebergSourceUtil.isIcebergScan(e.scan)
case _ => false
}
}

override def convert(exec: SparkPlan): SparkPlan = {
exec match {
case batchScanExec: BatchScanExec =>
convertIcebergBatchScanExec(batchScanExec)
case _ => exec
}
}

private def convertIcebergBatchScanExec(batchScanExec: BatchScanExec): SparkPlan = {
// TODO: Validate table mode (COW support initially, MOR later)
val scan = IcebergSourceUtil.getScanAsSparkBatchQueryScan(batchScanExec.scan)
val table = IcebergSourceUtil.getTableFromScan(scan)

// Log conversion details
AuronLogUtils.logDebugPlanConversion(
batchScanExec,
Seq("scan" -> scan.getClass, "table" -> table.getClass, "output" -> batchScanExec.output))
AuronConverters.addRenameColumnsExec(NativeIcebergBatchScanExec(batchScanExec))
}
}
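
The provider stays opt-in behind the spark.auron.enable.iceberg.scan flag read in isEnabled. A sketch of enabling it for a session (the session setup and table name are placeholders; only the flag name comes from the code above):

import org.apache.spark.sql.SparkSession

// Sketch: with the flag on, an eligible BatchScanExec over an Iceberg table
// can be converted to NativeIcebergBatchScanExec during plan conversion.
val spark = SparkSession
  .builder()
  .appName("auron-iceberg-smoke")
  .config("spark.auron.enable.iceberg.scan", "true")
  .getOrCreate()

spark.sql("SELECT * FROM my_catalog.db.events LIMIT 10").show() // placeholder table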