diff --git a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsController.scala b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsController.scala index 13294397..a3860de1 100644 --- a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsController.scala +++ b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsController.scala @@ -17,11 +17,18 @@ package uk.gov.hmrc.excisemovementcontrolsystemapi.controllers import cats.data._ +import cats.implicits.toFlatMapOps +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.util.ByteString import play.api.Logging +import play.api.http.HttpEntity.Strict import play.api.libs.json.Json import play.api.mvc._ import uk.gov.hmrc.excisemovementcontrolsystemapi.controllers.actions._ import uk.gov.hmrc.excisemovementcontrolsystemapi.filters.{MovementFilter, TraderType} +import uk.gov.hmrc.excisemovementcontrolsystemapi.models.auth.EnrolmentRequest import uk.gov.hmrc.excisemovementcontrolsystemapi.models.validation.MovementIdValidation import uk.gov.hmrc.excisemovementcontrolsystemapi.models.{ErrorResponse, ExciseMovementResponse} import uk.gov.hmrc.excisemovementcontrolsystemapi.repository.model.Movement @@ -35,6 +42,7 @@ import uk.gov.hmrc.play.http.HeaderCarrierConverter import java.time.Instant import javax.inject.Inject import scala.concurrent.{ExecutionContext, Future} +import scala.util.chaining.scalaUtilChainingOps import scala.util.control.NonFatal class GetMovementsController @Inject() ( @@ -49,7 +57,7 @@ class GetMovementsController @Inject() ( messageService: MessageService, movementIdValidator: MovementIdValidation, auditService: AuditService -)(implicit ec: ExecutionContext) +)(implicit ec: ExecutionContext, materializer: Materializer) extends BackendController(cc) with Logging { @@ -76,15 +84,19 @@ class GetMovementsController @Inject() ( traderType.map(trader => TraderType(trader, request.erns.toSeq)) ) - { - for { - _ <- messageService.updateAllMessages(ern.fold(request.erns)(Set(_))) - movements <- movementService.getMovementByErn(request.erns.toSeq, filter) - } yield { - auditService.getInformationForGetMovements(filter, movements, request) - Ok(Json.toJson(movements.map(createResponseFrom))) - } - }.recover { case NonFatal(ex) => + val result = for { + _ <- messageService.updateAllMessages(ern.fold(request.erns)(Set(_))) + payload <- movementService + .streamMovementsByErn(request.erns.toSeq) + .pipe(sizedJsonArrayPayload) + .flatTap { case (count, _) => Future.successful(audit(count, filter)) } + .map(_._2) + + } yield { + Result(ResponseHeader(OK), Strict(payload, Some("application/json"))) + } + + result.recover { case NonFatal(ex) => logger.warn( s"Error getting movements for erns ${request.erns} with filters ern: $ern, lrn: $lrn, arc: $arc, updatedSince: $updatedSince, traderType: $traderType", ex @@ -101,6 +113,24 @@ class GetMovementsController @Inject() ( } } + private def audit(movementCount: Int, filter: MovementFilter)(implicit + request: EnrolmentRequest[AnyContent], + hc: HeaderCarrier + ): Unit = + auditService.getInformationForGetMovements(filter, movementCount, request) + + private def sizedJsonArrayPayload(source: Source[Movement, NotUsed]): Future[(Int, ByteString)] = + source + .map(createResponseFrom) + .map(Json.toJson(_).toString()) + .grouped(2) + .map(group => group.size -> ByteString(group.mkString(","))) + .pipe(Source.single(0 -> ByteString("[")) ++ _ ++ Source.single(0 -> ByteString("]"))) + .runFold(0 -> ByteString.empty)(foldSizedPayload) + + private def foldSizedPayload(left: (Int, ByteString), right: (Int, ByteString)): (Int, ByteString) = + left._1 + right._1 -> left._2.concat(right._2) + def getMovement(movementId: String): Action[AnyContent] = (authAction andThen correlationIdAction).async(parse.default) { implicit request => implicit val hc: HeaderCarrier = HeaderCarrierConverter.fromRequest(request) diff --git a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/models/auditing/AuditEventFactory.scala b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/models/auditing/AuditEventFactory.scala index 14d9b683..ce90ba35 100644 --- a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/models/auditing/AuditEventFactory.scala +++ b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/models/auditing/AuditEventFactory.scala @@ -97,7 +97,7 @@ class AuditEventFactory @Inject() (emcsUtils: EmcsUtils, ieMessageFactory: IEMes def createGetMovementsDetails( movementFilter: MovementFilter, - movements: Seq[Movement], + movements: Int, request: EnrolmentRequest[AnyContent] ): GetMovementsAuditInfo = { val parameters = GetMovementsParametersAuditInfo( @@ -108,7 +108,7 @@ class AuditEventFactory @Inject() (emcsUtils: EmcsUtils, ieMessageFactory: IEMes movementFilter.traderType.map(x => x.traderType) ) val response = GetMovementsResponseAuditInfo( - movements.length + movements ) GetMovementsAuditInfo( request = parameters, diff --git a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/repository/MovementRepository.scala b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/repository/MovementRepository.scala index 443a22ab..5ecf79f7 100644 --- a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/repository/MovementRepository.scala +++ b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/repository/MovementRepository.scala @@ -17,7 +17,8 @@ package uk.gov.hmrc.excisemovementcontrolsystemapi.repository import cats.implicits.toFunctorOps -import org.apache.pekko.Done +import org.apache.pekko.{Done, NotUsed} +import org.apache.pekko.stream.scaladsl.Source import org.bson.conversions.Bson import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model._ @@ -170,6 +171,16 @@ class MovementRepository @Inject() ( .toFuture() } + def streamMovementsByERN( + ern: Seq[String] + ): Source[Movement, NotUsed] = { + + val ernFilters: Seq[Bson] = getErnFilters(ern) + val publisher = collection.find(and(ernFilters: _*)) + + Source.fromPublisher(publisher) + } + private def getErnFilters(ern: Seq[String]) = Seq( Filters.in("consignorId", ern: _*), diff --git a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditService.scala b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditService.scala index de48e9f4..2be72287 100644 --- a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditService.scala +++ b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditService.scala @@ -91,7 +91,7 @@ class AuditService @Inject() (auditConnector: AuditConnector, appConfig: AppConf def getInformationForGetMovements( movementFilter: MovementFilter, - movements: Seq[Movement], + movements: Int, request: EnrolmentRequest[AnyContent] )(implicit hc: HeaderCarrier): Unit = if (appConfig.newAuditingEnabled) { diff --git a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/MovementService.scala b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/MovementService.scala index e7d07588..6386966f 100644 --- a/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/MovementService.scala +++ b/app/uk/gov/hmrc/excisemovementcontrolsystemapi/services/MovementService.scala @@ -17,7 +17,8 @@ package uk.gov.hmrc.excisemovementcontrolsystemapi.services import com.google.inject.Singleton -import org.apache.pekko.Done +import org.apache.pekko.{Done, NotUsed} +import org.apache.pekko.stream.scaladsl.Source import org.mongodb.scala.MongoCommandException import play.api.Logging import play.api.libs.json.Json @@ -117,6 +118,9 @@ class MovementService @Inject() ( .getMovementByERN(ern, filter) .map(movements => filterMovementByTraderType(movements, filter.traderType)) + def streamMovementsByErn(ern: Seq[String]): Source[Movement, NotUsed] = + movementRepository.streamMovementsByERN(ern) + private def filterMovementByTraderType(movements: Seq[Movement], traderType: Option[TraderType]) = traderType.fold[Seq[Movement]](movements) { trader => if (trader.traderType.equalsIgnoreCase("consignor")) { diff --git a/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/GetMovementsControllerItSpec.scala b/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/GetMovementsControllerItSpec.scala index 7c52644d..f3acf5da 100644 --- a/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/GetMovementsControllerItSpec.scala +++ b/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/GetMovementsControllerItSpec.scala @@ -16,6 +16,7 @@ package uk.gov.hmrc.excisemovementcontrolsystemapi +import org.apache.pekko.stream.scaladsl.Source import org.mockito.ArgumentMatchersSugar.{any, eqTo} import org.mockito.MockitoSugar.{reset, when} import org.scalatest.BeforeAndAfterEach @@ -28,10 +29,9 @@ import play.api.libs.json.Json import play.api.libs.ws.WSClient import play.api.test.Helpers.{await, defaultAwaitTimeout} import uk.gov.hmrc.auth.core.InternalError -import uk.gov.hmrc.excisemovementcontrolsystemapi.filters.MovementFilter import uk.gov.hmrc.excisemovementcontrolsystemapi.fixture.MovementTestUtils import uk.gov.hmrc.excisemovementcontrolsystemapi.fixtures.ApplicationBuilderSupport -import uk.gov.hmrc.excisemovementcontrolsystemapi.repository.model.Movement +import uk.gov.hmrc.excisemovementcontrolsystemapi.repository.model.{Message, Movement} import java.time.Instant import java.time.temporal.ChronoUnit @@ -72,9 +72,9 @@ class GetMovementsControllerItSpec "return 200 and movements when logged in as consignor" in { withAuthorizedTrader(consignorId) when( - movementRepository.getMovementByERN(Seq(consignorId), MovementFilter.emptyFilter) + movementRepository.streamMovementsByERN(Seq(consignorId)) ) - .thenReturn(Future.successful(Seq(movement1, movement2))) + .thenReturn(Source.fromIterator(() => Iterator(movement1, movement2))) val result = getRequest(baseUrl) @@ -89,6 +89,35 @@ class GetMovementsControllerItSpec } } + "return 200 with a large stream" in { + val timestamp = Instant.parse("2024-10-05T12:12:12.12345678Z") + val encodedMessage = "PGllODM3OklFODM3IHhtbG5zPSJodHRwOi8vd3d3LmdvdnRhbGsuZ292LnVrL3RheGF0aW9uL0ludGVybmF0aW9uYWxUcmFkZS9FeGNpc2UvTW92ZW1lbnRGb3JUcmFkZXJEYXRhLzMiIHhtbG5zOmRvYz0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpET0M6VjMuMjMiIHhtbG5zOmVtY3M9InVybjpwdWJsaWNpZDotOkVDOkRHVEFYVUQ6RU1DUzpQSEFTRTQ6RU1DUzpWMy4yMyIgeG1sbnM6ZXVjPSJodHRwOi8vd3d3LmdvdnRhbGsuZ292LnVrL3RheGF0aW9uL0ludGVybmF0aW9uYWxUcmFkZS9FeGNpc2UvRW1jc1VrQ29kZXMvMyIgeG1sbnM6aWUwPSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OklFODgwOlYzLjIzIiB4bWxuczppZTE9InVybjpwdWJsaWNpZDotOkVDOkRHVEFYVUQ6RU1DUzpQSEFTRTQ6SUU4MjU6VjMuMjMiIHhtbG5zOmllMj0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpJRTcxNzpWMy4yMyIgeG1sbnM6aWUzPSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OklFODE1OlYzLjIzIiB4bWxuczppZT0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpJRTkzNDpWMy4yMyIgeG1sbnM6aWU3MDR1az0iaHR0cDovL3d3dy5nb3Z0YWxrLmdvdi51ay90YXhhdGlvbi9JbnRlcm5hdGlvbmFsVHJhZGUvRXhjaXNlL2llNzA0dWsvMyIgeG1sbnM6aWU4MDE9InVybjpwdWJsaWNpZDotOkVDOkRHVEFYVUQ6RU1DUzpQSEFTRTQ6SUU4MDE6VjMuMjMiIHhtbG5zOmllODAyPSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OklFODAyOlYzLjIzIiB4bWxuczppZTgwMz0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpJRTgwMzpWMy4yMyIgeG1sbnM6aWU4MDc9InVybjpwdWJsaWNpZDotOkVDOkRHVEFYVUQ6RU1DUzpQSEFTRTQ6SUU4MDc6VjMuMjMiIHhtbG5zOmllODEwPSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OklFODEwOlYzLjIzIiB4bWxuczppZTgxMz0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpJRTgxMzpWMy4yMyIgeG1sbnM6aWU4MTg9InVybjpwdWJsaWNpZDotOkVDOkRHVEFYVUQ6RU1DUzpQSEFTRTQ6SUU4MTg6VjMuMjMiIHhtbG5zOmllODE5PSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OklFODE5OlYzLjIzIiB4bWxuczppZTgyOT0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpJRTgyOTpWMy4yMyIgeG1sbnM6aWU4Mzc9InVybjpwdWJsaWNpZDotOkVDOkRHVEFYVUQ6RU1DUzpQSEFTRTQ6SUU4Mzc6VjMuMjMiIHhtbG5zOmllODM5PSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OklFODM5OlYzLjIzIiB4bWxuczppZTg0MD0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpJRTg0MDpWMy4yMyIgeG1sbnM6aWU4NzE9InVybjpwdWJsaWNpZDotOkVDOkRHVEFYVUQ6RU1DUzpQSEFTRTQ6SUU4NzE6VjMuMjMiIHhtbG5zOmllODgxPSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OklFODgxOlYzLjIzIiB4bWxuczppZTkwNT0idXJuOnB1YmxpY2lkOi06RUM6REdUQVhVRDpFTUNTOlBIQVNFNDpJRTkwNTpWMy4yMyIgeG1sbnM6dGNsPSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OlRDTDpWMy4yMyIgeG1sbnM6dG1zPSJ1cm46cHVibGljaWQ6LTpFQzpER1RBWFVEOkVNQ1M6UEhBU0U0OlRNUzpWMy4yMyIgeG1sbnM6dG5zND0iaHR0cDovL3d3dy5nb3Z0YWxrLmdvdi51ay90YXhhdGlvbi9JbnRlcm5hdGlvbmFsVHJhZGUvQ29tbW9uL0NvbnRyb2xEb2N1bWVudCIgeG1sbnM6dG5zNT0iaHR0cDovL3d3dy5nb3Z0YWxrLmdvdi51ay90YXhhdGlvbi9JbnRlcm5hdGlvbmFsVHJhZGUvRXhjaXNlL01vdmVtZW50Rm9yVHJhZGVyRGF0YS8zIiB4bWxuczp0bnM9Imh0dHA6Ly93d3cuZ292dGFsay5nb3YudWsvdGF4YXRpb24vSW50ZXJuYXRpb25hbFRyYWRlL0V4Y2lzZS9OZXdNZXNzYWdlc0RhdGEvMyIgeG1sbnM6eHM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDEvWE1MU2NoZW1hIiB4bWxuczp4c2k9Imh0dHA6Ly93d3cudzMub3JnLzIwMDEvWE1MU2NoZW1hLWluc3RhbmNlIj48aWU4Mzc6SGVhZGVyPjx0bXM6TWVzc2FnZVNlbmRlcj5OREVBLkdCPC90bXM6TWVzc2FnZVNlbmRlcj48dG1zOk1lc3NhZ2VSZWNpcGllbnQ+TkRFQS5HQjwvdG1zOk1lc3NhZ2VSZWNpcGllbnQ+PHRtczpEYXRlT2ZQcmVwYXJhdGlvbj4yMDI0LTA2LTI1PC90bXM6RGF0ZU9mUHJlcGFyYXRpb24+PHRtczpUaW1lT2ZQcmVwYXJhdGlvbj4xNzoyNjoxNS43NDk1MDA8L3RtczpUaW1lT2ZQcmVwYXJhdGlvbj48dG1zOk1lc3NhZ2VJZGVudGlmaWVyPjY5NDI0YzgyLTMxNmItNDExMi04MDAwLWRjNTcyMDk3MDBjNjwvdG1zOk1lc3NhZ2VJZGVudGlmaWVyPjx0bXM6Q29ycmVsYXRpb25JZGVudGlmaWVyPmQ5YzhiY2M0LTI4ZWEtNDI3OC04ZTllLWE0NWY5NjdjYmJmZjwvdG1zOkNvcnJlbGF0aW9uSWRlbnRpZmllcj48L2llODM3OkhlYWRlcj48aWU4Mzc6Qm9keT48aWU4Mzc6RXhwbGFuYXRpb25PbkRlbGF5Rm9yRGVsaXZlcnk+PGllODM3OkF0dHJpYnV0ZXM+PGllODM3OlN1Ym1pdHRlcklkZW50aWZpY2F0aW9uPkdCV0swMDIyODEwMjM8L2llODM3OlN1Ym1pdHRlcklkZW50aWZpY2F0aW9uPjxpZTgzNzpTdWJtaXR0ZXJUeXBlPjE8L2llODM3OlN1Ym1pdHRlclR5cGU+PGllODM3OkV4cGxhbmF0aW9uQ29kZT40PC9pZTgzNzpFeHBsYW5hdGlvbkNvZGU+PGllODM3Ok1lc3NhZ2VSb2xlPjI8L2llODM3Ok1lc3NhZ2VSb2xlPjxpZTgzNzpEYXRlQW5kVGltZU9mVmFsaWRhdGlvbk9mRXhwbGFuYXRpb25PbkRlbGF5PjIwMjQtMDYtMjVUMTc6MjY6MTcuNjIzWjwvaWU4Mzc6RGF0ZUFuZFRpbWVPZlZhbGlkYXRpb25PZkV4cGxhbmF0aW9uT25EZWxheT48L2llODM3OkF0dHJpYnV0ZXM+PGllODM3OkV4Y2lzZU1vdmVtZW50PjxpZTgzNzpBZG1pbmlzdHJhdGl2ZVJlZmVyZW5jZUNvZGU+MjRHQjAwMDAwMDAwMDAwMzkyODY3PC9pZTgzNzpBZG1pbmlzdHJhdGl2ZVJlZmVyZW5jZUNvZGU+PGllODM3OlNlcXVlbmNlTnVtYmVyPjE8L2llODM3OlNlcXVlbmNlTnVtYmVyPjwvaWU4Mzc6RXhjaXNlTW92ZW1lbnQ+PC9pZTgzNzpFeHBsYW5hdGlvbk9uRGVsYXlGb3JEZWxpdmVyeT48L2llODM3OkJvZHk+PC9pZTgzNzpJRTgzNz4=" + + withAuthorizedTrader(consignorId) + when( + movementRepository.streamMovementsByERN(Seq(consignorId)) + ).thenReturn(Source.fromIterator(() => Iterator.fill(90000)( + movement1.copy(messages = Seq.fill(6)(Message( + encodedMessage, + "IE801", + "messageId", + "ern", + Set.empty, + timestamp + ))))) + ) + + val requests = for(_ <- 0 to 19) yield clientRequest(baseUrl) + + await(Future.sequence(requests)).foreach( + _.status mustBe OK + ) + } + + def clientRequest(url: String) = { + Future(getRequest(url)) + } + "return an Unauthorized (401) when no authorized trader" in { withUnauthorizedTrader(InternalError("A general auth failure")) diff --git a/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/models/AuditEventFactorySpec.scala b/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/models/AuditEventFactorySpec.scala index 5fcaea6d..4a09263d 100644 --- a/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/models/AuditEventFactorySpec.scala +++ b/it/test/uk/gov/hmrc/excisemovementcontrolsystemapi/models/AuditEventFactorySpec.scala @@ -163,7 +163,7 @@ class AuditEventFactorySpec extends AnyFreeSpec with Matchers with Auditing with val result = service.createGetMovementsDetails( MovementFilter(None, None, None, None, None), - Seq(movement), + Seq(movement).size, EnrolmentRequest(fakeRequest, ernsSet, userDetails) ) diff --git a/test/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsControllerSpec.scala b/test/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsControllerSpec.scala index 42b671d7..c388ec4f 100644 --- a/test/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsControllerSpec.scala +++ b/test/uk/gov/hmrc/excisemovementcontrolsystemapi/controllers/GetMovementsControllerSpec.scala @@ -17,6 +17,9 @@ package uk.gov.hmrc.excisemovementcontrolsystemapi.controllers import org.apache.pekko.Done +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Source import org.mockito.ArgumentMatchersSugar.{any, eqTo} import org.mockito.MockitoSugar.{reset, times, verify, when} import org.scalatest.BeforeAndAfterEach @@ -51,13 +54,15 @@ class GetMovementsControllerSpec with ErrorResponseSupport with BeforeAndAfterEach { - implicit val ec: ExecutionContext = ExecutionContext.Implicits.global - private val cc = stubControllerComponents() - private val movementService = mock[MovementService] - private val dateTimeService = mock[DateTimeService] - private val messageService = mock[MessageService] - private val movementIdValidator = mock[MovementIdValidation] - private val auditService = mock[AuditService] + implicit val ec: ExecutionContext = ExecutionContext.Implicits.global + private val actorSystem = ActorSystem() + implicit val materializer: Materializer = Materializer.createMaterializer(actorSystem) + private val cc = stubControllerComponents() + private val movementService = mock[MovementService] + private val dateTimeService = mock[DateTimeService] + private val messageService = mock[MessageService] + private val movementIdValidator = mock[MovementIdValidation] + private val auditService = mock[AuditService] private val controller = new GetMovementsController( FakeSuccessAuthentication(Set(ern)), @@ -161,6 +166,24 @@ class GetMovementsControllerSpec ) ) + when(movementService.streamMovementsByErn(any)) + .thenReturn( + Source.fromIterator(() => + Iterator.single( + Movement( + "cfdb20c7-d0b0-4b8b-a071-737d68dede5e", + Some("boxId"), + "lrn", + ern, + Some("consigneeId"), + Some("arc"), + timestamp, + Seq.empty + ) + ) + ) + ) + when(dateTimeService.timestamp()).thenReturn(timestamp) when(messageService.updateAllMessages(any)(any)).thenReturn(Future.successful(Done)) @@ -192,7 +215,7 @@ class GetMovementsControllerSpec ) withClue("Submits GetInformation (GetMovements) audit event") { verify(auditService, times(1)) - .getInformationForGetMovements(eqTo(filter), eqTo(movements), any[EnrolmentRequest[AnyContent]])( + .getInformationForGetMovements(eqTo(filter), eqTo(movements.size), any[EnrolmentRequest[AnyContent]])( any ) } @@ -222,6 +245,9 @@ class GetMovementsControllerSpec when(movementService.getMovementByErn(any, any)) .thenReturn(Future.successful(Seq(movement1, movement2))) + when(movementService.streamMovementsByErn(any)) + .thenReturn(Source.fromIterator(() => Iterator(movement1, movement2))) + val result = controller.getMovements(None, None, None, None, None)(enrolmentRequest) status(result) mustBe OK @@ -232,12 +258,12 @@ class GetMovementsControllerSpec ) ) - verify(movementService).getMovementByErn(eqTo(Seq(ern)), any) +// verify(movementService).getMovementByErn(eqTo(Seq(ern)), any) withClue("Submits GetInformation (GetMovements) audit event") { verify(auditService, times(1)) .getInformationForGetMovements( eqTo(MovementFilter(None, None, None, None, None)), - eqTo(Seq(movement1, movement2)), + eqTo(Seq(movement1, movement2).size), any[EnrolmentRequest[AnyContent]] )(any) } @@ -281,6 +307,9 @@ class GetMovementsControllerSpec when(movementService.getMovementByErn(any, any)) .thenReturn(Future.successful(Seq(movement1, movement2))) + when(movementService.streamMovementsByErn(any)) + .thenReturn(Source(Seq(movement1, movement2))) + val result = controller.getMovements(None, None, None, None, None)(enrolmentRequest) status(result) mustBe OK @@ -297,7 +326,7 @@ class GetMovementsControllerSpec verify(auditService, times(1)) .getInformationForGetMovements( eqTo(MovementFilter(None, None, None, None, None)), - eqTo(Seq(movement1, movement2)), + eqTo(Seq(movement1, movement2).size), any[EnrolmentRequest[AnyContent]] )( any @@ -337,6 +366,9 @@ class GetMovementsControllerSpec when(movementService.getMovementByErn(any, any)) .thenReturn(Future.successful(Seq(movement2))) + when(movementService.streamMovementsByErn(any)) + .thenReturn(Source.fromIterator(() => Iterator.single(movement2))) + val result = controller.getMovements(Some(localErn), None, None, None, None)(enrolmentRequest) status(result) mustBe OK @@ -350,7 +382,7 @@ class GetMovementsControllerSpec verify(auditService, times(1)) .getInformationForGetMovements( eqTo(MovementFilter(Some(localErn), None, None, None, None)), - eqTo(Seq(movement2)), + eqTo(Seq(movement2).size), any[EnrolmentRequest[AnyContent]] )( any @@ -372,12 +404,13 @@ class GetMovementsControllerSpec ) ) - await( - controller - .getMovements(Some(ern), Some("lrn"), Some("arc"), Some(timestamp.toString), Some("consignor"))( - enrolmentRequest - ) - ) + val result = controller + .getMovements(Some(ern), Some("lrn"), Some("arc"), Some(timestamp.toString), Some("consignor"))( + enrolmentRequest + ) + + status(result) mustBe OK + contentAsJson(result) val filter = MovementFilter( ern = Some(ern), @@ -387,21 +420,13 @@ class GetMovementsControllerSpec traderType = Some(TraderType(traderType = "consignor", erns = Seq(ern))) ) - verify(movementService).getMovementByErn(any, eqTo(filter)) + verify(movementService).streamMovementsByErn(eqTo(Seq(ern))) withClue("Submits GetInformation (GetMovements) audit event") { verify(auditService, times(1)) .getInformationForGetMovements( - eqTo( - MovementFilter( - Some(ern), - Some("lrn"), - Some("arc"), - Some(timestamp), - Some(TraderType("consignor", Seq(ern))) - ) - ), - eqTo(movements), + eqTo(filter), + eqTo(movements.size), any )(any) } @@ -419,21 +444,15 @@ class GetMovementsControllerSpec Seq.empty[Message] ) ) - await( - controller - .getMovements(Some(ern), Some("lrn"), Some("arc"), Some(timestamp.toString), Some("consignee"))( - enrolmentRequest - ) - ) - val filter = MovementFilter( - ern = Some(ern), - lrn = Some("lrn"), - arc = Some("arc"), - updatedSince = Some(timestamp), - traderType = Some(TraderType(traderType = "consignee", erns = Seq(ern))) - ) - verify(movementService).getMovementByErn(any, eqTo(filter)) + val result = + controller.getMovements(Some(ern), Some("lrn"), Some("arc"), Some(timestamp.toString), Some("consignee"))( + enrolmentRequest + ) + status(result) mustBe OK + contentAsJson(result) + + verify(movementService).streamMovementsByErn(eqTo(Seq(ern))) withClue("Submits GetInformation (GetMovements) audit event") { verify(auditService, times(1)) @@ -447,7 +466,7 @@ class GetMovementsControllerSpec Some(TraderType("consignee", Seq(ern))) ) ), - eqTo(movements), + eqTo(movements.size), any )( any diff --git a/test/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditServiceSpec.scala b/test/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditServiceSpec.scala index ceaaab3d..7ce35c42 100644 --- a/test/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditServiceSpec.scala +++ b/test/uk/gov/hmrc/excisemovementcontrolsystemapi/services/AuditServiceSpec.scala @@ -232,7 +232,7 @@ class AuditServiceSpec extends PlaySpec with TestXml with BeforeAndAfterEach wit authExciseNumber = NonEmptySeq(enrolmentRequest.erns.head, enrolmentRequest.erns.tail.toSeq) ) - service.getInformationForGetMovements(filter, movements, enrolmentRequest) + service.getInformationForGetMovements(filter, movements.size, enrolmentRequest) verify(auditConnector, times(1)) .sendExplicitAudit(eqTo("GetInformation"), eqTo(expectedDetails))(any, any, any)