Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ object Dependencies {
const val CORE = "org.apache.spark:spark-core_${Versions.SCALA_BINARY}:${Versions.SPARK}"
const val SQL = "org.apache.spark:spark-sql_${Versions.SCALA_BINARY}:${Versions.SPARK}"
const val STREAMING = "org.apache.spark:spark-streaming_${Versions.SCALA_BINARY}:${Versions.SPARK}"
const val MLLIB = "org.apache.spark:spark-mllib_${Versions.SCALA_BINARY}:${Versions.SPARK}"
}

object Spring {
Expand Down
4 changes: 4 additions & 0 deletions pipeline/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ dependencies {
implementation(project(":codec-java"))

implementation(Dependencies.Jackson.JACKSON_YAML)

// Spark ML algorithms (ALS, etc.) used by built-in Transform steps. compileOnly because spark-submit provides it.
compileOnly(Dependencies.Spark.MLLIB)
testImplementation(Dependencies.Spark.MLLIB)
}

publishing {
Expand Down
52 changes: 52 additions & 0 deletions pipeline/conf/als-i2i-experiment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# ALS-based item-to-item similarity experiment.
#
# Reads V2 edge dumps stored as line-delimited JSON, treats each edge as an implicit-feedback signal
# (source=user, target=item, rating=1.0), trains an ALS model, and writes the top-20 most similar
# items per item by cosine similarity over the trained item factors.
#
# Run via `StepsRunnerJob` (or copy the `steps:` block under a workflow's `jobs:` definition):
# spark-submit --class com.kakao.actionbase.pipeline.jobs.StepsRunnerJob ... \
# --steps=@als-i2i-experiment.yaml
#
# Tune `rank`, `maxIter`, `regParam`, and `k` per dataset. `seed` is fixed for reproducibility.

steps:
- step: FileSource
args:
path: "s3a://ab-edges/v2/dump/latest/*.json"
format: "json"

# Prep: cast source/target to int (ALS only takes int keys) and synthesize a rating column.
# Implicit feedback ⇒ rating is a constant; the ALS confidence comes from edge presence × alpha.
- step: SqlMerge
args:
query: |
SELECT
CAST(source AS INT) AS user,
CAST(target AS INT) AS item,
1.0 AS rating
FROM _0
WHERE source IS NOT NULL AND target IS NOT NULL

- step: AlsFlow
args:
userCol: "user"
itemCol: "item"
ratingCol: "rating"
rank: 32
maxIter: 15
regParam: 0.05
implicitPrefs: true
alpha: 1.0
coldStartStrategy: "drop"
seed: 42

- step: TopKSimilarityFlow
args:
k: 20

- step: FileSink
args:
path: "s3a://ab-edges/v2/i2i/latest"
format: "parquet"
mode: "overwrite"
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.kakao.actionbase.pipeline.steps.transform

import com.kakao.actionbase.pipeline.dsl.Flow
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Trains Spark MLlib's ALS on a `(userCol, itemCol, ratingCol)` interaction table and returns the fitted model's
  * `itemFactors` — `(id: int, features: array<float>)` — as input for downstream item-to-item similarity.
  *
  * The step is tuned for implicit-feedback data (an action's presence is the signal), hence `implicitPrefs = true`
  * and `coldStartStrategy = "drop"`. Both ID columns are expected to hold `int` values (or values castable to `int`);
  * mapping arbitrary IDs into that space is the job of an upstream step (the example workflow does it in its
  * `SqlMerge` prep step).
  *
  * Every parameter is forwarded unchanged to the same-named setter on `org.apache.spark.ml.recommendation.ALS`.
  * Deliberate choices in the defaults declared here: `implicitPrefs` is `true`, `coldStartStrategy` is `"drop"`
  * (rather than `"nan"`, so downstream Sinks do not receive NaN rows from unseen IDs), and `seed` is pinned to `42`.
  *
  * Only item factors are emitted (single output). Persisting user factors as well is left to a future
  * `AlsFactorSplit` (1→2 Split) once Split built-ins land.
  *
  * @param userCol           name of the user ID column in the input
  * @param itemCol           name of the item ID column in the input
  * @param ratingCol         name of the rating / feedback-strength column in the input
  * @param rank              value passed to `ALS.setRank`
  * @param maxIter           value passed to `ALS.setMaxIter`
  * @param regParam          value passed to `ALS.setRegParam`
  * @param implicitPrefs     value passed to `ALS.setImplicitPrefs`
  * @param alpha             value passed to `ALS.setAlpha`
  * @param coldStartStrategy value passed to `ALS.setColdStartStrategy`
  * @param seed              value passed to `ALS.setSeed`
  */
case class AlsFlow(
  userCol: String = "user",
  itemCol: String = "item",
  ratingCol: String = "rating",
  rank: Int = 10,
  maxIter: Int = 10,
  regParam: Double = 0.1,
  implicitPrefs: Boolean = true,
  alpha: Double = 1.0,
  coldStartStrategy: String = "drop",
  seed: Long = 42L
) extends Flow {

  /** Fits ALS on `in` and returns the trained model's item-factor table. */
  override def apply(in: DataFrame)(implicit spark: SparkSession): DataFrame = {
    val estimator = new ALS()

    // Column bindings: which input columns play the user / item / rating roles.
    estimator.setUserCol(userCol)
    estimator.setItemCol(itemCol)
    estimator.setRatingCol(ratingCol)

    // Model hyperparameters, forwarded as-is.
    estimator.setRank(rank)
    estimator.setMaxIter(maxIter)
    estimator.setRegParam(regParam)
    estimator.setImplicitPrefs(implicitPrefs)
    estimator.setAlpha(alpha)
    estimator.setColdStartStrategy(coldStartStrategy)
    estimator.setSeed(seed)

    val model = estimator.fit(in)
    model.itemFactors
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.kakao.actionbase.pipeline.steps.transform

import com.kakao.actionbase.pipeline.dsl.Flow
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.{DataFrame, SparkSession}

/** Given a `(idCol, featuresCol)` table of factor vectors, emits the top-`k` most similar items per item by cosine
  * similarity. Output schema is `(item_id long, similar_item_id long, score double, rank int)`.
  *
  * Computed via a cross self-join + Window `row_number` — O(N²) item-pairs, fine for experiment-scale catalogues but
  * not for production scale. For larger N, swap for an ANN index (e.g., `BucketedRandomProjectionLSH`) — but that
  * changes the contract (approximate) and belongs in a separate Step.
  *
  * Scores are always finite: an undefined cosine (zero vector, or NaN/infinite components in either vector) is
  * reported as `0.0`. Ties on `score` are broken by ascending `similar_item_id`, so the ranking is deterministic.
  *
  * @param k           maximum number of neighbours kept per item; must be positive
  * @param idCol       name of the input column holding the item ID (cast to `long` in the output)
  * @param featuresCol name of the input column holding the factor vector (read as an array of floats)
  * @throws IllegalArgumentException from `apply` when `k` is not positive
  */
case class TopKSimilarityFlow(
  k: Int = 20,
  idCol: String = "id",
  featuresCol: String = "features"
) extends Flow {

  override def apply(in: DataFrame)(implicit spark: SparkSession): DataFrame = {
    require(k > 0, s"k must be positive, got $k")
    val factors = in.select(col(idCol).as("id"), col(featuresCol).as("features"))

    // UDF: cosine similarity for two float feature arrays, accumulated in double precision. Only the first
    // min(a.size, b.size) components are compared. A plain while loop is used on purpose: this runs once per item
    // pair, i.e. O(N²) times.
    val cosine = udf { (a: Seq[Float], b: Seq[Float]) =>
      val n = math.min(a.size, b.size)
      var dot, na, nb = 0.0
      var i = 0
      while (i < n) {
        val ai = a(i).toDouble
        val bi = b(i).toDouble
        dot += ai * bi
        na += ai * ai
        nb += bi * bi
        i += 1
      }
      val denom = math.sqrt(na) * math.sqrt(nb)
      // Zero vector ⇒ cosine is undefined; report 0.0 instead of dividing by zero.
      val score = if (denom == 0.0) 0.0 else dot / denom
      // NaN / infinite components in either vector also make the result non-finite. Spark orders NaN above every
      // other numeric value, so under `score DESC` such pairs would take the top ranks; map them to 0.0 as well.
      if (java.lang.Double.isNaN(score) || java.lang.Double.isInfinite(score)) 0.0 else score
    }

    // All ordered pairs of distinct items, each with its similarity score.
    val pairs = factors
      .alias("a")
      .crossJoin(factors.alias("b"))
      .where(col("a.id") =!= col("b.id"))
      .select(
        col("a.id").cast(LongType).as("item_id"),
        col("b.id").cast(LongType).as("similar_item_id"),
        cosine(col("a.features"), col("b.features")).as("score")
      )

    // Rank neighbours per item: best score first, smaller neighbour id wins ties.
    val topK = Window.partitionBy("item_id").orderBy(col("score").desc, col("similar_item_id").asc)

    pairs
      .withColumn("rank", row_number().over(topK))
      .filter(col("rank") <= k)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.kakao.actionbase.pipeline

import com.kakao.actionbase.pipeline.runner.StepsBuilder
import com.kakao.actionbase.pipeline.workflow.StepSpec
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

/** Runs the whole ALS i2i experiment through the Step DSL: line-delimited edge JSON → SQL prep → ALS → Top-K cosine
  * similarity → Parquet sink. Also serves as a usage example; the step sequence follows the one in
  * `pipeline/conf/als-i2i-experiment.yaml`.
  */
class AlsI2iWorkflowTest extends SparkTest {

  @Test
  def runsFullWorkflowOnSyntheticEdges(): Unit = {
    // Two clearly separated interaction clusters so ALS recovers the structure deterministically: users {0,1} love
    // items {100,101}, users {2,3} love items {200,201}. Top-1 i2i neighbor of 100 should be 101, of 200 should be 201.
    val payload =
      """{"source":0,"target":100,"version":1}
        |{"source":0,"target":101,"version":2}
        |{"source":1,"target":100,"version":3}
        |{"source":1,"target":101,"version":4}
        |{"source":2,"target":200,"version":5}
        |{"source":2,"target":201,"version":6}
        |{"source":3,"target":200,"version":7}
        |{"source":3,"target":201,"version":8}
        |""".stripMargin

    // Input file and output location each live in their own fresh temp directory.
    val edgesDir = Files.createTempDirectory("als-edges-")
    val edgesPath = edgesDir.resolve("edges.json").toString
    val payloadBytes = payload.getBytes(StandardCharsets.UTF_8)
    Files.write(Paths.get(edgesPath), payloadBytes)

    val outDir = Files.createTempDirectory("als-i2i-out-")
    val outPath = outDir.resolve("topk").toString

    // One named spec per pipeline stage, assembled in execution order below.
    val readEdges = StepSpec(
      "FileSource",
      Map("path" -> edgesPath, "format" -> "json")
    )
    val prepRatings = StepSpec(
      "SqlMerge",
      Map("query" -> "SELECT CAST(source AS INT) AS user, CAST(target AS INT) AS item, 1.0 AS rating FROM _0")
    )
    val trainAls = StepSpec(
      "AlsFlow",
      Map("rank" -> 4, "maxIter" -> 10, "regParam" -> 0.01, "seed" -> 42L)
    )
    val rankNeighbors = StepSpec(
      "TopKSimilarityFlow",
      Map("k" -> 2)
    )
    val writeParquet = StepSpec(
      "FileSink",
      Map("path" -> outPath, "format" -> "parquet", "mode" -> "overwrite")
    )

    val steps = Seq(readEdges, prepRatings, trainAls, rankNeighbors, writeParquet)
    StepsBuilder.build(steps).run()

    val result = spark.read.parquet(outPath)
    val expectedColumns = Set("item_id", "similar_item_id", "score", "rank")
    assertEquals(expectedColumns, result.columns.toSet)

    // Each item's top-1 neighbor should be its cluster sibling. Columns 0 and 1 are item_id and similar_item_id.
    val rankOneRows = result.where("rank = 1").collect()
    val top1 = rankOneRows.map { row => (row.getLong(0), row.getLong(1)) }.toMap

    assertEquals(101L, top1(100L), "100's top neighbor must be 101 (same cluster)")
    assertEquals(100L, top1(101L), "101's top neighbor must be 100 (same cluster)")
    assertEquals(201L, top1(200L), "200's top neighbor must be 201 (same cluster)")
    assertEquals(200L, top1(201L), "201's top neighbor must be 200 (same cluster)")
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.kakao.actionbase.pipeline.steps.transform

import com.kakao.actionbase.pipeline.SparkTest
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

/** Unit tests for [[AlsFlow]]: output shape of the emitted item factors and reproducibility under a fixed seed. */
class AlsFlowTest extends SparkTest {

  /** Builds a small, learnable interaction table: 4 users × 4 items where users {0,1} prefer items {0,1} and users
    * {2,3} prefer items {2,3}. ALS should recover two distinct factor regions.
    */
  private def interactions() = {
    import spark.implicits._
    Seq(
      (0, 0, 1.0f), (0, 1, 1.0f),
      (1, 0, 1.0f), (1, 1, 1.0f),
      (2, 2, 1.0f), (2, 3, 1.0f),
      (3, 2, 1.0f), (3, 3, 1.0f)
    ).toDF("user", "item", "rating")
  }

  @Test
  def fitsAndEmitsItemFactors(): Unit = {
    val out = AlsFlow(rank = 4, maxIter = 5).apply(interactions())

    assertEquals(Set("id", "features"), out.columns.toSet)
    val rows = out.collect()
    assertEquals(4, rows.length, "one factor row per item")
    rows.foreach { r =>
      val f = r.getAs[scala.collection.Seq[Float]]("features")
      assertEquals(4, f.size, "feature dim must equal rank")
    }
  }

  @Test
  def deterministicGivenSeed(): Unit = {
    // Same seed + same data ⇒ same factors. Guards against accidental nondeterminism in step construction.
    // Rows are sorted by item id before the pairwise comparison: `collect()` gives no ordering guarantee, and the
    // property under test is "same factors per item", not "same row order".
    def fitSorted() =
      AlsFlow(rank = 3, maxIter = 5, seed = 7L).apply(interactions()).collect().sortBy(_.getInt(0))

    val a = fitSorted()
    val b = fitSorted()
    assertEquals(a.length, b.length)
    a.zip(b).foreach { case (ra, rb) =>
      assertEquals(ra.getInt(0), rb.getInt(0))
      val fa = ra.getAs[scala.collection.Seq[Float]](1)
      val fb = rb.getAs[scala.collection.Seq[Float]](1)
      // `zip` truncates to the shorter side, so a length mismatch must be caught explicitly.
      assertEquals(fa.size, fb.size, "feature vectors of the same item must have the same length")
      fa.zip(fb).foreach { case (x, y) => assertEquals(x, y, 1e-6f) }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.kakao.actionbase.pipeline.steps.transform

import com.kakao.actionbase.pipeline.SparkTest
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

/** Unit tests for [[TopKSimilarityFlow]]: output schema, neighbour ranking, the `k` limit, argument validation and the
  * zero-vector edge case. Uses hand-crafted factor vectors so nothing here depends on ALS.
  */
class TopKSimilarityFlowTest extends SparkTest {

  /** Hand-crafted 4-item factor table where (0,1) are colinear and (2,3) are colinear, so the expected top-1 neighbor
    * pairs are 0↔1 and 2↔3. Lets us verify both ranking and cosine math without depending on ALS.
    */
  private def factors() = {
    import spark.implicits._
    Seq(
      (0, Seq(1.0f, 0.0f)),
      (1, Seq(2.0f, 0.0f)),
      (2, Seq(0.0f, 1.0f)),
      (3, Seq(0.0f, 3.0f))
    ).toDF("id", "features")
  }

  @Test
  def emitsSchemaContract(): Unit = {
    val out = TopKSimilarityFlow(k = 1).apply(factors())
    assertEquals(Seq("item_id", "similar_item_id", "score", "rank"), out.columns.toSeq)
    // The type checks must be asserted; a bare boolean expression is evaluated and silently discarded.
    assertTrue(
      out.schema("item_id").dataType.toString.toLowerCase.contains("long"),
      "item_id must be a long column"
    )
    assertTrue(
      out.schema("similar_item_id").dataType.toString.toLowerCase.contains("long"),
      "similar_item_id must be a long column"
    )
  }

  @Test
  def computesCorrectTopNeighbors(): Unit = {
    val rows = TopKSimilarityFlow(k = 1).apply(factors()).collect()
    val pairs = rows.map(r => r.getLong(0) -> r.getLong(1)).toSet
    // Each item's top-1 partner: 0→1, 1→0, 2→3, 3→2. Colinear vectors have cosine=1.0 vs. orthogonal=0.0.
    assertEquals(Set(0L -> 1L, 1L -> 0L, 2L -> 3L, 3L -> 2L), pairs)
    rows.foreach { r => assertEquals(1.0, r.getDouble(2), 1e-6, s"score for ${r.getLong(0)}→${r.getLong(1)}") }
  }

  @Test
  def respectsKLimit(): Unit = {
    val out = TopKSimilarityFlow(k = 2).apply(factors())
    val perItem = out.collect().groupBy(_.getLong(0)).map { case (id, rs) => id -> rs.length }
    // Guard against a vacuous pass: an empty output would satisfy the per-item bound trivially.
    assertEquals(4, perItem.size, "every input item must appear as an item_id")
    perItem.values.foreach(n => assertTrue(n <= 2, s"per-item rows must not exceed k=2 (got $n)"))
  }

  @Test
  def rejectsNonPositiveK(): Unit = {
    assertThrows(
      classOf[IllegalArgumentException],
      () => TopKSimilarityFlow(k = 0).apply(factors())
    )
  }

  @Test
  def returnsZeroForZeroVector(): Unit = {
    // Zero-vector input is a legitimate edge case (e.g., cold-start factor before training): cosine is undefined and we
    // contract on returning 0.0 instead of NaN so downstream rank order stays well-defined.
    import spark.implicits._
    val df = Seq(
      (10, Seq(0.0f, 0.0f)),
      (11, Seq(1.0f, 1.0f))
    ).toDF("id", "features")
    val rows = TopKSimilarityFlow(k = 1).apply(df).collect()
    // Two items ⇒ two ordered pairs (10→11, 11→10); asserting the count keeps the loop below from passing on no rows.
    assertEquals(2, rows.length, "each of the two items must get exactly one neighbor row")
    rows.foreach { r =>
      val s = r.getDouble(2)
      assertEquals(0.0, s, 1e-9, "zero-vector cosine must be 0.0, not NaN")
    }
  }
}
Loading