Skip to content

Commit 2c2663d

Browse files
fix(public-api): Return all job types when jobType filter is not specified (#18592)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Benoit Moriceau <benoit@airbyte.io>
1 parent 413a89f commit 2c2663d

4 files changed

Lines changed: 80 additions & 6 deletions

File tree

airbyte-api/server-api/src/main/openapi/api_documentation_jobs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ components:
932932
x-speakeasy-component: true
933933
JobTypeEnum:
934934
description: "Enum that describes the different types of jobs that the platform\
935-
\ runs."
935+
\ runs. When not specified, all job types are returned."
936936
enum:
937937
- "sync"
938938
- "reset"

airbyte-api/server-api/src/main/openapi/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20501,7 +20501,7 @@ components:
2050120501
type: string
2050220502
x-sdk-component: true
2050320503
JobTypeEnum:
20504-
description: Enum that describes the different types of jobs that the platform runs.
20504+
description: Enum that describes the different types of jobs that the platform runs. When not specified, all job types are returned.
2050520505
enum:
2050620506
- sync
2050720507
- reset

airbyte-server/src/main/kotlin/io/airbyte/server/apis/publicapi/services/JobService.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import io.airbyte.api.model.generated.JobListForWorkspacesRequestBody.OrderByFie
1212
import io.airbyte.api.model.generated.JobListForWorkspacesRequestBody.OrderByMethodEnum
1313
import io.airbyte.api.model.generated.JobListRequestBody
1414
import io.airbyte.api.model.generated.Pagination
15-
import io.airbyte.api.problems.throwable.generated.UnprocessableEntityProblem
1615
import io.airbyte.commons.server.handlers.JobHistoryHandler
1716
import io.airbyte.commons.server.handlers.SchedulerHandler
1817
import io.airbyte.commons.server.support.CurrentUserService
@@ -220,14 +219,13 @@ class JobServiceImpl(
220219
private fun getJobConfigTypes(jobType: JobTypeEnum?): List<JobConfigType> {
221220
val configTypes: MutableList<JobConfigType> = ArrayList()
222221
if (jobType == null) {
223-
configTypes.addAll(listOf(JobConfigType.SYNC, JobConfigType.RESET_CONNECTION))
222+
configTypes.addAll(listOf(JobConfigType.SYNC, JobConfigType.RESET_CONNECTION, JobConfigType.CLEAR, JobConfigType.REFRESH))
224223
} else {
225224
when (jobType) {
226225
JobTypeEnum.SYNC -> configTypes.add(JobConfigType.SYNC)
227226
JobTypeEnum.RESET -> configTypes.add(JobConfigType.RESET_CONNECTION)
228227
JobTypeEnum.CLEAR -> configTypes.add(JobConfigType.CLEAR)
229228
JobTypeEnum.REFRESH -> configTypes.add(JobConfigType.REFRESH)
230-
else -> throw UnprocessableEntityProblem()
231229
}
232230
}
233231
return configTypes

airbyte-server/src/test/kotlin/io/airbyte/server/apis/publicapi/services/JobServiceTest.kt

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,36 @@
44

55
package io.airbyte.server.apis.publicapi.services
66

7+
import io.airbyte.api.model.generated.JobConfigType
8+
import io.airbyte.api.model.generated.JobReadList
79
import io.airbyte.api.problems.throwable.generated.StateConflictProblem
810
import io.airbyte.api.problems.throwable.generated.TryAgainLaterConflictProblem
911
import io.airbyte.commons.server.errors.ValueConflictKnownException
12+
import io.airbyte.commons.server.handlers.JobHistoryHandler
1013
import io.airbyte.commons.server.handlers.SchedulerHandler
1114
import io.airbyte.micronaut.runtime.AirbyteApiConfig
15+
import io.airbyte.publicApi.server.generated.models.JobTypeEnum
1216
import io.airbyte.server.apis.publicapi.errorHandlers.JOB_NOT_RUNNING_MESSAGE
17+
import io.airbyte.server.apis.publicapi.filters.JobsFilter
1318
import io.mockk.every
1419
import io.mockk.mockk
20+
import io.mockk.slot
21+
import io.mockk.verify
22+
import org.junit.jupiter.api.Assertions.assertEquals
23+
import org.junit.jupiter.api.Assertions.assertTrue
1524
import org.junit.jupiter.api.BeforeEach
1625
import org.junit.jupiter.api.Test
1726
import org.junit.jupiter.api.assertThrows
27+
import org.junit.jupiter.params.ParameterizedTest
28+
import org.junit.jupiter.params.provider.Arguments
29+
import org.junit.jupiter.params.provider.MethodSource
1830
import java.util.UUID
31+
import java.util.stream.Stream
1932

2033
class JobServiceTest {
2134
private lateinit var jobService: JobServiceImpl
2235
private val schedulerHandler: SchedulerHandler = mockk()
36+
private val jobHistoryHandler: JobHistoryHandler = mockk()
2337

2438
private val connectionId = UUID.randomUUID()
2539

@@ -28,7 +42,7 @@ class JobServiceTest {
2842
jobService =
2943
JobServiceImpl(
3044
schedulerHandler = schedulerHandler,
31-
jobHistoryHandler = mockk(),
45+
jobHistoryHandler = jobHistoryHandler,
3246
currentUserService = mockk(),
3347
userService = mockk(),
3448
airbyteApiConfig = AirbyteApiConfig(),
@@ -66,4 +80,66 @@ class JobServiceTest {
6680
every { schedulerHandler.syncConnection(any()) } throws IllegalStateException(failureReason)
6781
assertThrows<StateConflictProblem>(JOB_NOT_RUNNING_MESSAGE) { jobService.sync(connectionId) }
6882
}
83+
84+
@Test
85+
fun `test getJobList with null jobType returns all job types`() {
86+
val requestBodySlot = slot<io.airbyte.api.model.generated.JobListRequestBody>()
87+
every { jobHistoryHandler.listJobsForLight(capture(requestBodySlot)) } returns JobReadList().jobs(emptyList()).totalJobCount(0L)
88+
89+
val jobsFilter =
90+
JobsFilter(
91+
createdAtStart = null,
92+
createdAtEnd = null,
93+
updatedAtStart = null,
94+
updatedAtEnd = null,
95+
limit = 10,
96+
offset = 0,
97+
jobType = null,
98+
status = null,
99+
)
100+
jobService.getJobList(connectionId, jobsFilter)
101+
102+
verify { jobHistoryHandler.listJobsForLight(any()) }
103+
val capturedConfigTypes = requestBodySlot.captured.configTypes
104+
assertEquals(4, capturedConfigTypes.size)
105+
assertTrue(capturedConfigTypes.contains(JobConfigType.SYNC))
106+
assertTrue(capturedConfigTypes.contains(JobConfigType.RESET_CONNECTION))
107+
assertTrue(capturedConfigTypes.contains(JobConfigType.CLEAR))
108+
assertTrue(capturedConfigTypes.contains(JobConfigType.REFRESH))
109+
}
110+
111+
@ParameterizedTest(name = "jobType {0} should return config type {1}")
112+
@MethodSource("jobTypeToConfigTypeProvider")
113+
fun `test getJobList with specific jobType returns only that type`(
114+
jobType: JobTypeEnum,
115+
expectedConfigType: JobConfigType,
116+
) {
117+
val requestBodySlot = slot<io.airbyte.api.model.generated.JobListRequestBody>()
118+
every { jobHistoryHandler.listJobsForLight(capture(requestBodySlot)) } returns JobReadList().jobs(emptyList()).totalJobCount(0L)
119+
120+
val filter =
121+
JobsFilter(
122+
createdAtStart = null,
123+
createdAtEnd = null,
124+
updatedAtStart = null,
125+
updatedAtEnd = null,
126+
limit = 10,
127+
offset = 0,
128+
jobType = jobType,
129+
status = null,
130+
)
131+
jobService.getJobList(connectionId, filter)
132+
assertEquals(listOf(expectedConfigType), requestBodySlot.captured.configTypes)
133+
}
134+
135+
companion object {
136+
@JvmStatic
137+
fun jobTypeToConfigTypeProvider(): Stream<Arguments> =
138+
Stream.of(
139+
Arguments.of(JobTypeEnum.SYNC, JobConfigType.SYNC),
140+
Arguments.of(JobTypeEnum.RESET, JobConfigType.RESET_CONNECTION),
141+
Arguments.of(JobTypeEnum.CLEAR, JobConfigType.CLEAR),
142+
Arguments.of(JobTypeEnum.REFRESH, JobConfigType.REFRESH),
143+
)
144+
}
69145
}

0 commit comments

Comments
 (0)