diff --git a/conventions/src/main/kotlin/actionbase/dependencies/Dependencies.kt b/conventions/src/main/kotlin/actionbase/dependencies/Dependencies.kt index d6271d41..0eb7539f 100644 --- a/conventions/src/main/kotlin/actionbase/dependencies/Dependencies.kt +++ b/conventions/src/main/kotlin/actionbase/dependencies/Dependencies.kt @@ -121,6 +121,7 @@ object Dependencies { const val CORE = "org.apache.spark:spark-core_${Versions.SCALA_BINARY}:${Versions.SPARK}" const val SQL = "org.apache.spark:spark-sql_${Versions.SCALA_BINARY}:${Versions.SPARK}" const val STREAMING = "org.apache.spark:spark-streaming_${Versions.SCALA_BINARY}:${Versions.SPARK}" + const val MLLIB = "org.apache.spark:spark-mllib_${Versions.SCALA_BINARY}:${Versions.SPARK}" } object Spring { diff --git a/pipeline/build.gradle.kts b/pipeline/build.gradle.kts index 25f1bb0f..01673b0a 100644 --- a/pipeline/build.gradle.kts +++ b/pipeline/build.gradle.kts @@ -17,6 +17,10 @@ dependencies { implementation(project(":codec-java")) implementation(Dependencies.Jackson.JACKSON_YAML) + + // Spark ML algorithms (ALS, etc.) used by built-in Transform steps. compileOnly because spark-submit provides it. + compileOnly(Dependencies.Spark.MLLIB) + testImplementation(Dependencies.Spark.MLLIB) } publishing { diff --git a/pipeline/conf/als-i2i-experiment.yaml b/pipeline/conf/als-i2i-experiment.yaml new file mode 100644 index 00000000..a24b7980 --- /dev/null +++ b/pipeline/conf/als-i2i-experiment.yaml @@ -0,0 +1,52 @@ +# ALS-based item-to-item similarity experiment. +# +# Reads V2 edge dumps stored as line-delimited JSON, treats each edge as an implicit-feedback signal +# (source=user, target=item, rating=1.0), trains an ALS model, and writes the top-20 most similar +# items per item by cosine similarity over the trained item factors. +# +# Run via `StepsRunnerJob` (or copy `steps:` block under a workflow `jobs:` definition): +# spark-submit --class com.kakao.actionbase.pipeline.jobs.StepsRunnerJob ... \ +# --steps=@als-i2i-experiment.yaml +# +# Tune `rank`, `maxIter`, `regParam`, and `k` per dataset. `seed` is fixed for reproducibility. + +steps: + - step: FileSource + args: + path: "s3a://ab-edges/v2/dump/latest/*.json" + format: "json" + + # Prep: cast source/target to int (ALS only takes int keys) and synthesize a rating column. + # Implicit feedback ⇒ rating is a constant; the ALS confidence comes from edge presence × alpha. + - step: SqlMerge + args: + query: | + SELECT + CAST(source AS INT) AS user, + CAST(target AS INT) AS item, + 1.0 AS rating + FROM _0 + WHERE source IS NOT NULL AND target IS NOT NULL + + - step: AlsFlow + args: + userCol: "user" + itemCol: "item" + ratingCol: "rating" + rank: 32 + maxIter: 15 + regParam: 0.05 + implicitPrefs: true + alpha: 1.0 + coldStartStrategy: "drop" + seed: 42 + + - step: TopKSimilarityFlow + args: + k: 20 + + - step: FileSink + args: + path: "s3a://ab-edges/v2/i2i/latest" + format: "parquet" + mode: "overwrite" diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/steps/transform/AlsFlow.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/steps/transform/AlsFlow.scala new file mode 100644 index 00000000..19ea34e2 --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/steps/transform/AlsFlow.scala @@ -0,0 +1,49 @@ +package com.kakao.actionbase.pipeline.steps.transform + +import com.kakao.actionbase.pipeline.dsl.Flow +import org.apache.spark.ml.recommendation.ALS +import org.apache.spark.sql.{DataFrame, SparkSession} + +/** Fits Spark MLlib's ALS to a `(userCol, itemCol, ratingCol)` triple table and emits the trained model's `itemFactors` + * — `(id: int, features: array)` — for downstream i2i similarity. + * + * Defaults target implicit-feedback workloads (action presence ⇒ confidence 1.0): `implicitPrefs=true`, + * `coldStartStrategy="drop"`. The two ID columns must already be `int` (or castable); upstream is responsible for + * indexing arbitrary IDs (see the example workflow's `SqlMerge` prep step). + * + * Hyperparameters mirror `org.apache.spark.ml.recommendation.ALS`'s setters; defaults match Spark's defaults except + * `implicitPrefs` (flipped to `true`) and `coldStartStrategy` (`"drop"` instead of `"nan"` so downstream Sinks don't + * receive NaN rows from unseen IDs). + * + * Emits item factors only (single output). To persist both item and user factors, swap for a future `AlsFactorSplit` + * (1→2 Split) once Split built-ins land. + */ +case class AlsFlow( + userCol: String = "user", + itemCol: String = "item", + ratingCol: String = "rating", + rank: Int = 10, + maxIter: Int = 10, + regParam: Double = 0.1, + implicitPrefs: Boolean = true, + alpha: Double = 1.0, + coldStartStrategy: String = "drop", + seed: Long = 42L +) extends Flow { + + override def apply(in: DataFrame)(implicit spark: SparkSession): DataFrame = { + val als = new ALS() + .setUserCol(userCol) + .setItemCol(itemCol) + .setRatingCol(ratingCol) + .setRank(rank) + .setMaxIter(maxIter) + .setRegParam(regParam) + .setImplicitPrefs(implicitPrefs) + .setAlpha(alpha) + .setColdStartStrategy(coldStartStrategy) + .setSeed(seed) + + als.fit(in).itemFactors + } +} diff --git a/pipeline/src/main/scala/com/kakao/actionbase/pipeline/steps/transform/TopKSimilarityFlow.scala b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/steps/transform/TopKSimilarityFlow.scala new file mode 100644 index 00000000..98695e1e --- /dev/null +++ b/pipeline/src/main/scala/com/kakao/actionbase/pipeline/steps/transform/TopKSimilarityFlow.scala @@ -0,0 +1,60 @@ +package com.kakao.actionbase.pipeline.steps.transform + +import com.kakao.actionbase.pipeline.dsl.Flow +import org.apache.spark.sql.expressions.Window +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.LongType +import org.apache.spark.sql.{DataFrame, SparkSession} + +/** Given a `(idCol, featuresCol)` table of factor vectors, emits the top-`k` most similar items per item by cosine + * similarity. Output schema is `(item_id long, similar_item_id long, score double, rank int)`. + * + * Computed via a cross self-join + Window `row_number` — O(N²) item-pairs, fine for experiment-scale catalogues but + * not for production scale. For larger N, swap for an ANN index (e.g., `BucketedRandomProjectionLSH`) — but that + * changes the contract (approximate) and belongs in a separate Step. + */ +case class TopKSimilarityFlow( + k: Int = 20, + idCol: String = "id", + featuresCol: String = "features" +) extends Flow { + + override def apply(in: DataFrame)(implicit spark: SparkSession): DataFrame = { + require(k > 0, s"k must be positive, got $k") + val factors = in.select(col(idCol).as("id"), col(featuresCol).as("features")) + + // UDF: cosine similarity for two float feature arrays. Returns 0.0 when either vector is the zero vector to avoid + // division-by-zero polluting the output with NaN. + val cosine = udf { (a: Seq[Float], b: Seq[Float]) => + val n = math.min(a.size, b.size) + var dot, na, nb = 0.0 + var i = 0 + while (i < n) { + val ai = a(i).toDouble + val bi = b(i).toDouble + dot += ai * bi + na += ai * ai + nb += bi * bi + i += 1 + } + val denom = math.sqrt(na) * math.sqrt(nb) + if (denom == 0.0) 0.0 else dot / denom + } + + val pairs = factors + .alias("a") + .crossJoin(factors.alias("b")) + .where(col("a.id") =!= col("b.id")) + .select( + col("a.id").cast(LongType).as("item_id"), + col("b.id").cast(LongType).as("similar_item_id"), + cosine(col("a.features"), col("b.features")).as("score") + ) + + val topK = Window.partitionBy("item_id").orderBy(col("score").desc, col("similar_item_id").asc) + + pairs + .withColumn("rank", row_number().over(topK)) + .filter(col("rank") <= k) + } +} diff --git a/pipeline/src/test/scala/com/kakao/actionbase/pipeline/AlsI2iWorkflowTest.scala b/pipeline/src/test/scala/com/kakao/actionbase/pipeline/AlsI2iWorkflowTest.scala new file mode 100644 index 00000000..362ec7e4 --- /dev/null +++ b/pipeline/src/test/scala/com/kakao/actionbase/pipeline/AlsI2iWorkflowTest.scala @@ -0,0 +1,79 @@ +package com.kakao.actionbase.pipeline + +import com.kakao.actionbase.pipeline.runner.StepsBuilder +import com.kakao.actionbase.pipeline.workflow.StepSpec +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} + +/** End-to-end exercise of the Step DSL on the ALS i2i experiment: line-delimited edge JSON → SQL prep → ALS → + * Top-K cosine similarity → Parquet sink. Doubles as a documentation example: the steps below mirror the YAML in + * `pipeline/conf/als-i2i-experiment.yaml`. + */ +class AlsI2iWorkflowTest extends SparkTest { + + @Test + def runsFullWorkflowOnSyntheticEdges(): Unit = { + // Two clear interaction clusters so ALS recovers structure ALS deterministically: users {0,1} love items {100,101}, + // users {2,3} love items {200,201}. Top-1 i2i neighbor of 100 should be 101, of 200 should be 201. + val edgesPath = Files.createTempDirectory("als-edges-").resolve("edges.json").toString + val payload = + """{"source":0,"target":100,"version":1} + |{"source":0,"target":101,"version":2} + |{"source":1,"target":100,"version":3} + |{"source":1,"target":101,"version":4} + |{"source":2,"target":200,"version":5} + |{"source":2,"target":201,"version":6} + |{"source":3,"target":200,"version":7} + |{"source":3,"target":201,"version":8} + |""".stripMargin + Files.write(Paths.get(edgesPath), payload.getBytes(StandardCharsets.UTF_8)) + + val outPath = Files.createTempDirectory("als-i2i-out-").resolve("topk").toString + + val steps = Seq( + StepSpec( + "FileSource", + Map("path" -> edgesPath, "format" -> "json") + ), + StepSpec( + "SqlMerge", + Map("query" -> "SELECT CAST(source AS INT) AS user, CAST(target AS INT) AS item, 1.0 AS rating FROM _0") + ), + StepSpec( + "AlsFlow", + Map("rank" -> 4, "maxIter" -> 10, "regParam" -> 0.01, "seed" -> 42L) + ), + StepSpec( + "TopKSimilarityFlow", + Map("k" -> 2) + ), + StepSpec( + "FileSink", + Map("path" -> outPath, "format" -> "parquet", "mode" -> "overwrite") + ) + ) + + StepsBuilder.build(steps).run() + + val result = spark.read.parquet(outPath) + assertEquals( + Set("item_id", "similar_item_id", "score", "rank"), + result.columns.toSet + ) + + // Each item's top-1 neighbor should be its cluster sibling. + val top1 = result + .where("rank = 1") + .collect() + .map(r => r.getLong(0) -> r.getLong(1)) + .toMap + + assertEquals(101L, top1(100L), "100's top neighbor must be 101 (same cluster)") + assertEquals(100L, top1(101L), "101's top neighbor must be 100 (same cluster)") + assertEquals(201L, top1(200L), "200's top neighbor must be 201 (same cluster)") + assertEquals(200L, top1(201L), "201's top neighbor must be 200 (same cluster)") + } +} diff --git a/pipeline/src/test/scala/com/kakao/actionbase/pipeline/steps/transform/AlsFlowTest.scala b/pipeline/src/test/scala/com/kakao/actionbase/pipeline/steps/transform/AlsFlowTest.scala new file mode 100644 index 00000000..8952c3fe --- /dev/null +++ b/pipeline/src/test/scala/com/kakao/actionbase/pipeline/steps/transform/AlsFlowTest.scala @@ -0,0 +1,48 @@ +package com.kakao.actionbase.pipeline.steps.transform + +import com.kakao.actionbase.pipeline.SparkTest +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +class AlsFlowTest extends SparkTest { + + /** Builds a small, learnable interaction table: 4 users × 4 items where users {0,1} prefer items {0,1} and users + * {2,3} prefer items {2,3}. ALS should recover two distinct factor regions. + */ + private def interactions() = { + import spark.implicits._ + Seq( + (0, 0, 1.0f), (0, 1, 1.0f), + (1, 0, 1.0f), (1, 1, 1.0f), + (2, 2, 1.0f), (2, 3, 1.0f), + (3, 2, 1.0f), (3, 3, 1.0f) + ).toDF("user", "item", "rating") + } + + @Test + def fitsAndEmitsItemFactors(): Unit = { + val out = AlsFlow(rank = 4, maxIter = 5).apply(interactions()) + + assertEquals(Set("id", "features"), out.columns.toSet) + val rows = out.collect() + assertEquals(4, rows.length, "one factor row per item") + rows.foreach { r => + val f = r.getAs[scala.collection.Seq[Float]]("features") + assertEquals(4, f.size, "feature dim must equal rank") + } + } + + @Test + def deterministicGivenSeed(): Unit = { + // Same seed + same data ⇒ same factors. Guards against accidental nondeterminism in step construction. + val a = AlsFlow(rank = 3, maxIter = 5, seed = 7L).apply(interactions()).collect() + val b = AlsFlow(rank = 3, maxIter = 5, seed = 7L).apply(interactions()).collect() + assertEquals(a.length, b.length) + a.zip(b).foreach { case (ra, rb) => + assertEquals(ra.getInt(0), rb.getInt(0)) + val fa = ra.getAs[scala.collection.Seq[Float]](1) + val fb = rb.getAs[scala.collection.Seq[Float]](1) + fa.zip(fb).foreach { case (x, y) => assertEquals(x, y, 1e-6f) } + } + } +} diff --git a/pipeline/src/test/scala/com/kakao/actionbase/pipeline/steps/transform/TopKSimilarityFlowTest.scala b/pipeline/src/test/scala/com/kakao/actionbase/pipeline/steps/transform/TopKSimilarityFlowTest.scala new file mode 100644 index 00000000..b9c2cb96 --- /dev/null +++ b/pipeline/src/test/scala/com/kakao/actionbase/pipeline/steps/transform/TopKSimilarityFlowTest.scala @@ -0,0 +1,69 @@ +package com.kakao.actionbase.pipeline.steps.transform + +import com.kakao.actionbase.pipeline.SparkTest +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +class TopKSimilarityFlowTest extends SparkTest { + + /** Hand-crafted 4-item factor table where (0,1) are colinear and (2,3) are colinear, so the expected top-1 neighbor + * pairs are 0↔1 and 2↔3. Lets us verify both ranking and cosine math without depending on ALS. + */ + private def factors() = { + import spark.implicits._ + Seq( + (0, Seq(1.0f, 0.0f)), + (1, Seq(2.0f, 0.0f)), + (2, Seq(0.0f, 1.0f)), + (3, Seq(0.0f, 3.0f)) + ).toDF("id", "features") + } + + @Test + def emitsSchemaContract(): Unit = { + val out = TopKSimilarityFlow(k = 1).apply(factors()) + assertEquals(Seq("item_id", "similar_item_id", "score", "rank"), out.columns.toSeq) + out.schema("item_id").dataType.toString.toLowerCase.contains("long") + out.schema("similar_item_id").dataType.toString.toLowerCase.contains("long") + } + + @Test + def computesCorrectTopNeighbors(): Unit = { + val rows = TopKSimilarityFlow(k = 1).apply(factors()).collect() + val pairs = rows.map(r => r.getLong(0) -> r.getLong(1)).toSet + // Each item's top-1 partner: 0→1, 1→0, 2→3, 3→2. Colinear vectors have cosine=1.0 vs. orthogonal=0.0. + assertEquals(Set(0L -> 1L, 1L -> 0L, 2L -> 3L, 3L -> 2L), pairs) + rows.foreach { r => assertEquals(1.0, r.getDouble(2), 1e-6, s"score for ${r.getLong(0)}→${r.getLong(1)}") } + } + + @Test + def respectsKLimit(): Unit = { + val out = TopKSimilarityFlow(k = 2).apply(factors()) + val perItem = out.collect().groupBy(_.getLong(0)).map { case (id, rs) => id -> rs.length } + perItem.values.foreach(n => assertTrue(n <= 2, s"per-item rows must not exceed k=2 (got $n)")) + } + + @Test + def rejectsNonPositiveK(): Unit = { + assertThrows( + classOf[IllegalArgumentException], + () => TopKSimilarityFlow(k = 0).apply(factors()) + ) + } + + @Test + def returnsZeroForZeroVector(): Unit = { + // Zero-vector input is a legitimate edge case (e.g., cold-start factor before training): cosine is undefined and we + // contract on returning 0.0 instead of NaN so downstream rank order stays well-defined. + import spark.implicits._ + val df = Seq( + (10, Seq(0.0f, 0.0f)), + (11, Seq(1.0f, 1.0f)) + ).toDF("id", "features") + val rows = TopKSimilarityFlow(k = 1).apply(df).collect() + rows.foreach { r => + val s = r.getDouble(2) + assertEquals(0.0, s, 1e-9, "zero-vector cosine must be 0.0, not NaN") + } + } +}