Skip to content

Commit

Permalink
#106 Support labelSelector, fieldSelector and resourceVersion in the … (
Browse files Browse the repository at this point in the history
  • Loading branch information
jkobejs authored Feb 9, 2022
1 parent f0a0407 commit 18fae66
Show file tree
Hide file tree
Showing 2 changed files with 596 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -1,35 +1,25 @@
package com.coralogix.zio.k8s.client.test

import com.coralogix.zio.k8s.client.model.K8sObject._
import com.coralogix.zio.k8s.client.model.{
Added,
Deleted,
FieldSelector,
K8sNamespace,
K8sObject,
K8sResourceType,
LabelSelector,
ListResourceVersion,
Modified,
Optional,
PropagationPolicy,
TypedWatchEvent
import com.coralogix.zio.k8s.client.model.LabelSelector.{ And, LabelEquals, LabelIn, LabelNotIn }
import com.coralogix.zio.k8s.client.model.ListResourceVersion.{
Any,
Exact,
MostRecent,
NotOlderThan
}
import com.coralogix.zio.k8s.client.{
DecodedFailure,
K8sFailure,
K8sRequestInfo,
NotFound,
Resource,
ResourceDelete,
ResourceDeleteAll
}
import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.{ DeleteOptions, Status }
import com.coralogix.zio.k8s.client.model._
import com.coralogix.zio.k8s.client.test.TestResourceClient._
import com.coralogix.zio.k8s.client._
import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.{ DeleteOptions, ObjectMeta, Status }
import io.circe.{ Json, JsonNumber }
import sttp.model.StatusCode
import zio.duration.Duration
import zio.stm.{ TMap, TQueue, ZSTM }
import zio.stream._
import zio.{ IO, ZIO }
import zio.{ Chunk, IO, ZIO }

import scala.annotation.tailrec

/** Implementation of [[Resource]] and [[ResourceDeleteAll]] to be used from unit tests
* @param store
Expand All @@ -42,7 +32,7 @@ import zio.{ IO, ZIO }
* Result of the delete operation
*/
final class TestResourceClient[T: K8sObject, DeleteResult] private (
store: TMap[String, T],
store: TMap[String, Chunk[T]],
events: TQueue[TypedWatchEvent[T]],
createDeleteResult: () => DeleteResult
) extends Resource[T] with ResourceDelete[T, DeleteResult] with ResourceDeleteAll[T] {
Expand All @@ -54,18 +44,22 @@ final class TestResourceClient[T: K8sObject, DeleteResult] private (
labelSelector: Option[LabelSelector] = None,
resourceVersion: ListResourceVersion = ListResourceVersion.MostRecent
): Stream[K8sFailure, T] = {
// TODO: support fieldSelector, labelSelector and resourceVersion

val prefix = keyPrefix(namespace)
ZStream.unwrap {
filterByResourceVersion(ZStream.unwrap {
store.toList.commit.map { items =>
ZStream
.fromIterable(items)
.filter { case (key, _) => if (namespace.isDefined) key.startsWith(prefix) else true }
.map { case (_, value) => value }
.chunkN(chunkSize)
.filter { case (key, _) =>
if (namespace.isDefined) key.startsWith(prefix)
else true
}
.map(_._2)
}
}
})(resourceVersion)
.filter(filterByLabelSelector(labelSelector))
.filter(filterByFieldSelector(fieldSelector))
.chunkN(chunkSize)

}

override def watch(
Expand All @@ -74,13 +68,29 @@ final class TestResourceClient[T: K8sObject, DeleteResult] private (
fieldSelector: Option[FieldSelector] = None,
labelSelector: Option[LabelSelector] = None
): Stream[K8sFailure, TypedWatchEvent[T]] =
// TODO: support fieldSelector, labelSelector
ZStream.fromTQueue(events)
ZStream.fromTQueue(events).filter {
case Reseted() => true
case Added(item) =>
filterByLabelSelector(labelSelector)(item) && filterByFieldSelector(fieldSelector)(
item
) && resourceVersion
.forall(version => item.metadata.flatMap(_.resourceVersion).contains(version))
case Modified(item) =>
filterByLabelSelector(labelSelector)(item) && filterByFieldSelector(fieldSelector)(
item
) && resourceVersion
.forall(version => item.metadata.flatMap(_.resourceVersion).contains(version))
case Deleted(item) =>
filterByLabelSelector(labelSelector)(item) && filterByFieldSelector(fieldSelector)(
item
) && resourceVersion
.forall(version => item.metadata.flatMap(_.resourceVersion).contains(version))
}

override def get(name: String, namespace: Option[K8sNamespace]): IO[K8sFailure, T] = {
val prefix = keyPrefix(namespace)
store.get(prefix + name).commit.flatMap {
case Some(value) => ZIO.succeed(value)
case Some(value) => ZIO.succeed(value.last)
case None => ZIO.fail(NotFound)
}
}
Expand All @@ -107,19 +117,23 @@ final class TestResourceClient[T: K8sObject, DeleteResult] private (
for {
name <- finalResource.getName
stm = for {
_ <- store.contains(prefix + name).flatMap {
case true =>
ZSTM.fail(
DecodedFailure(
K8sRequestInfo(K8sResourceType("test", "group", "version"), "create"),
Status(),
StatusCode.Conflict
)
)
case false => ZSTM.unit
}
_ <- store.put(prefix + name, finalResource)
_ <- events.offer(Added(finalResource))
_ <- store.contains(prefix + name).flatMap {
case true =>
ZSTM.fail(
DecodedFailure(
K8sRequestInfo(K8sResourceType("test", "group", "version"), "create"),
Status(),
StatusCode.Conflict
)
)
case false => ZSTM.unit
}
maybeChunks <- store.get(prefix + name)
_ <- store.put(
prefix + name,
maybeChunks.getOrElse(Chunk.empty) ++ Chunk(finalResource)
)
_ <- events.offer(Added(finalResource))
} yield ()
_ <- stm.commit
} yield finalResource
Expand All @@ -138,21 +152,22 @@ final class TestResourceClient[T: K8sObject, DeleteResult] private (
if (!dryRun) {
val finalResource = increaseResourceVersion(updatedResource)
val stm = for {
_ <- store.get(prefix + name).flatMap {
case Some(existing)
if existing.metadata.flatMap(_.resourceVersion) != updatedResource.metadata
.flatMap(_.resourceVersion) =>
ZSTM.fail(
DecodedFailure(
K8sRequestInfo(K8sResourceType("test", "group", "version"), "replace"),
Status(),
StatusCode.Conflict
)
)
case _ => ZSTM.unit
}
_ <- store.put(prefix + name, finalResource)
_ <- events.offer(Modified(finalResource))
_ <- store.get(prefix + name).flatMap {
case Some(items)
if items.last.metadata.flatMap(_.resourceVersion) != updatedResource.metadata
.flatMap(_.resourceVersion) =>
ZSTM.fail(
DecodedFailure(
K8sRequestInfo(K8sResourceType("test", "group", "version"), "replace"),
Status(),
StatusCode.Conflict
)
)
case _ => ZSTM.unit
}
maybeChunks <- store.get(prefix + name)
_ <- store.put(prefix + name, maybeChunks.getOrElse(Chunk.empty) ++ Chunk(finalResource))
_ <- events.offer(Modified(finalResource))
} yield finalResource
stm.commit
} else {
Expand All @@ -175,7 +190,7 @@ final class TestResourceClient[T: K8sObject, DeleteResult] private (
_ <- ZSTM.foreach_(item) { item =>
for {
_ <- store.delete(prefix + name)
_ <- events.offer(Deleted(item))
_ <- events.offer(Deleted(item.last))
} yield ()
}
} yield createDeleteResult()
Expand All @@ -194,21 +209,25 @@ final class TestResourceClient[T: K8sObject, DeleteResult] private (
fieldSelector: Option[FieldSelector],
labelSelector: Option[LabelSelector]
): IO[K8sFailure, Status] = {
// TODO: support fieldSelector, labelSelector
val prefix = keyPrefix(namespace)
if (!dryRun) {
val stm = for {
keys <- store.keys
filteredKeys = keys.filter(_.startsWith(prefix))
_ <- ZSTM.foreach_(filteredKeys) { key =>
for {
item <- store.get(key)
_ <- ZSTM.foreach_(item) { item =>
for {
_ <- store.delete(key)
_ <- events.offer(Deleted(item))
} yield ()
}
items <- store.get(key)
_ <- ZSTM.foreach_(
items
.flatMap(_.lastOption)
.filter(filterByLabelSelector(labelSelector))
.filter(filterByFieldSelector(fieldSelector))
) { item =>
for {
_ <- store.delete(key)
_ <- events.offer(Deleted(item))
} yield ()
}
} yield ()
}
} yield Status()
Expand All @@ -220,6 +239,7 @@ final class TestResourceClient[T: K8sObject, DeleteResult] private (

private def keyPrefix(namespace: Option[K8sNamespace]): String =
namespace.map(_.value).getOrElse("") + ":"

}

object TestResourceClient {
Expand All @@ -237,7 +257,92 @@ object TestResourceClient {
createDeleteResult: () => DeleteResult
): ZIO[Any, Nothing, TestResourceClient[T, DeleteResult]] =
for {
store <- TMap.empty[String, T].commit
store <- TMap.empty[String, Chunk[T]].commit
events <- TQueue.unbounded[TypedWatchEvent[T]].commit
} yield new TestResourceClient[T, DeleteResult](store, events, createDeleteResult)

@tailrec
private def getValue(json: Json, fieldPath: List[String]): Json =
fieldPath match {
case head :: tail =>
json.asObject.flatMap(_.apply(head)) match {
case Some(value) => getValue(value, tail)
case None => Json.Null
}
case Nil => json
}

private def jsonEqualsTo(json: Json, value: String): Boolean =
json.asBoolean.exists(_.toString == value) || json.asString.contains(
value
) || (json.asNumber.nonEmpty && json.asNumber == JsonNumber.fromString(value))

// works for metadata only
private[test] def selectableByField(json: Json)(fieldSelector: FieldSelector): Boolean =
fieldSelector match {
case FieldSelector.FieldEquals(fieldPath, value) =>
fieldPath.toList match {
case "metadata" :: tail => jsonEqualsTo(getValue(json, tail), value)
case _ => false
}
case FieldSelector.FieldNotEquals(fieldPath, value) =>
(fieldPath.toList match {
case "metadata" :: tail => !jsonEqualsTo(getValue(json, tail), value)
case _ => false
})
case FieldSelector.And(selectors) => selectors.forall(selectableByField(json))
}

private[test] def filterByFieldSelector[T: K8sObject](
fieldSelector: Option[FieldSelector]
)(item: T): Boolean =
if (fieldSelector.isDefined) {
item.metadata
.map(ObjectMeta.ObjectMetaEncoder.apply)
.exists(json => selectableByField(json)(fieldSelector.get))
} else true

private def selectableByLabel(
labels: Map[String, String]
)(labelSelector: LabelSelector): Boolean =
labelSelector match {
case LabelEquals(label, value) => labels.get(label).contains(value)
case LabelIn(label, values) => labels.get(label).exists(values.contains)
case LabelNotIn(label, values) => !labels.get(label).exists(values.contains)
case And(selectors) => selectors.forall(selectableByLabel(labels))
}

private[test] def filterByLabelSelector[T: K8sObject](
labelSelector: Option[LabelSelector]
)(item: T): Boolean =
if (labelSelector.isDefined)
item.metadata
.flatMap(_.labels.map(labels => selectableByLabel(labels)(labelSelector.get)))
.getOrElse(false)
else true

private[test] def filterByResourceVersion[T: K8sObject](
stream: Stream[K8sFailure, Chunk[T]]
)(resourceVersion: ListResourceVersion): ZStream[Any, K8sFailure, T] =
resourceVersion match {
case Exact(version) =>
stream.flattenChunks.filter(_.metadata.flatMap(_.resourceVersion).contains(version))
case NotOlderThan(version) =>
stream.map { items =>
val generation = items
.find(_.metadata.flatMap(_.resourceVersion).contains(version))
.map(_.generation)
.getOrElse(0L)
items
.partition(
_.generation >= generation
)
._1
.sortBy(_.generation)
.takeRight(1)
}.flattenChunks

case MostRecent | Any =>
stream.map(_.takeRight(1)).flattenChunks
}
}
Loading

0 comments on commit 18fae66

Please sign in to comment.