Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
package uk.gov.hmrc.excisemovementcontrolsystemapi.controllers

import cats.data._
import cats.implicits.toFlatMapOps
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import play.api.Logging
import play.api.http.HttpEntity.Strict
import play.api.libs.json.Json
import play.api.mvc._
import uk.gov.hmrc.excisemovementcontrolsystemapi.controllers.actions._
import uk.gov.hmrc.excisemovementcontrolsystemapi.filters.{MovementFilter, TraderType}
import uk.gov.hmrc.excisemovementcontrolsystemapi.models.auth.EnrolmentRequest
import uk.gov.hmrc.excisemovementcontrolsystemapi.models.validation.MovementIdValidation
import uk.gov.hmrc.excisemovementcontrolsystemapi.models.{ErrorResponse, ExciseMovementResponse}
import uk.gov.hmrc.excisemovementcontrolsystemapi.repository.model.Movement
Expand All @@ -35,6 +42,7 @@ import uk.gov.hmrc.play.http.HeaderCarrierConverter
import java.time.Instant
import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.scalaUtilChainingOps
import scala.util.control.NonFatal

class GetMovementsController @Inject() (
Expand All @@ -49,7 +57,7 @@ class GetMovementsController @Inject() (
messageService: MessageService,
movementIdValidator: MovementIdValidation,
auditService: AuditService
)(implicit ec: ExecutionContext)
)(implicit ec: ExecutionContext, materializer: Materializer)
extends BackendController(cc)
with Logging {

Expand All @@ -76,15 +84,19 @@ class GetMovementsController @Inject() (
traderType.map(trader => TraderType(trader, request.erns.toSeq))
)

{
for {
_ <- messageService.updateAllMessages(ern.fold(request.erns)(Set(_)))
movements <- movementService.getMovementByErn(request.erns.toSeq, filter)
} yield {
auditService.getInformationForGetMovements(filter, movements, request)
Comment thread
hmrc-spifix marked this conversation as resolved.
Ok(Json.toJson(movements.map(createResponseFrom)))
}
}.recover { case NonFatal(ex) =>
val result = for {
_ <- messageService.updateAllMessages(ern.fold(request.erns)(Set(_)))
payload <- movementService
.streamMovementsByErn(request.erns.toSeq)
.pipe(sizedJsonArrayPayload)
.flatTap { case (count, _) => Future.successful(audit(count, filter)) }
.map(_._2)

} yield {
Result(ResponseHeader(OK), Strict(payload, Some("application/json")))
}

result.recover { case NonFatal(ex) =>
logger.warn(
s"Error getting movements for erns ${request.erns} with filters ern: $ern, lrn: $lrn, arc: $arc, updatedSince: $updatedSince, traderType: $traderType",
ex
Expand All @@ -101,6 +113,24 @@ class GetMovementsController @Inject() (
}
}

private def audit(movementCount: Int, filter: MovementFilter)(implicit
request: EnrolmentRequest[AnyContent],
hc: HeaderCarrier
): Unit =
auditService.getInformationForGetMovements(filter, movementCount, request)

private def sizedJsonArrayPayload(source: Source[Movement, NotUsed]): Future[(Int, ByteString)] =
source
.map(createResponseFrom)
.map(Json.toJson(_).toString())
.grouped(2)
.map(group => group.size -> ByteString(group.mkString(",")))
.pipe(Source.single(0 -> ByteString("[")) ++ _ ++ Source.single(0 -> ByteString("]")))
.runFold(0 -> ByteString.empty)(foldSizedPayload)

private def foldSizedPayload(left: (Int, ByteString), right: (Int, ByteString)): (Int, ByteString) =
left._1 + right._1 -> left._2.concat(right._2)

def getMovement(movementId: String): Action[AnyContent] =
(authAction andThen correlationIdAction).async(parse.default) { implicit request =>
implicit val hc: HeaderCarrier = HeaderCarrierConverter.fromRequest(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class AuditEventFactory @Inject() (emcsUtils: EmcsUtils, ieMessageFactory: IEMes

def createGetMovementsDetails(
movementFilter: MovementFilter,
movements: Seq[Movement],
movements: Int,
request: EnrolmentRequest[AnyContent]
): GetMovementsAuditInfo = {
val parameters = GetMovementsParametersAuditInfo(
Expand All @@ -108,7 +108,7 @@ class AuditEventFactory @Inject() (emcsUtils: EmcsUtils, ieMessageFactory: IEMes
movementFilter.traderType.map(x => x.traderType)
)
val response = GetMovementsResponseAuditInfo(
movements.length
movements
)
GetMovementsAuditInfo(
request = parameters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package uk.gov.hmrc.excisemovementcontrolsystemapi.repository

import cats.implicits.toFunctorOps
import org.apache.pekko.Done
import org.apache.pekko.{Done, NotUsed}
import org.apache.pekko.stream.scaladsl.Source
import org.bson.conversions.Bson
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model._
Expand Down Expand Up @@ -170,6 +171,16 @@ class MovementRepository @Inject() (
.toFuture()
}

def streamMovementsByERN(
ern: Seq[String]
): Source[Movement, NotUsed] = {

val ernFilters: Seq[Bson] = getErnFilters(ern)
val publisher = collection.find(and(ernFilters: _*))

Source.fromPublisher(publisher)
}

private def getErnFilters(ern: Seq[String]) =
Seq(
Filters.in("consignorId", ern: _*),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class AuditService @Inject() (auditConnector: AuditConnector, appConfig: AppConf

def getInformationForGetMovements(
movementFilter: MovementFilter,
movements: Seq[Movement],
movements: Int,
request: EnrolmentRequest[AnyContent]
)(implicit hc: HeaderCarrier): Unit =
if (appConfig.newAuditingEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package uk.gov.hmrc.excisemovementcontrolsystemapi.services

import com.google.inject.Singleton
import org.apache.pekko.Done
import org.apache.pekko.{Done, NotUsed}
import org.apache.pekko.stream.scaladsl.Source
import org.mongodb.scala.MongoCommandException
import play.api.Logging
import play.api.libs.json.Json
Expand Down Expand Up @@ -117,6 +118,9 @@ class MovementService @Inject() (
.getMovementByERN(ern, filter)
.map(movements => filterMovementByTraderType(movements, filter.traderType))

def streamMovementsByErn(ern: Seq[String]): Source[Movement, NotUsed] =
movementRepository.streamMovementsByERN(ern)

private def filterMovementByTraderType(movements: Seq[Movement], traderType: Option[TraderType]) =
traderType.fold[Seq[Movement]](movements) { trader =>
if (trader.traderType.equalsIgnoreCase("consignor")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package uk.gov.hmrc.excisemovementcontrolsystemapi

import org.apache.pekko.stream.scaladsl.Source
import org.mockito.ArgumentMatchersSugar.{any, eqTo}
import org.mockito.MockitoSugar.{reset, when}
import org.scalatest.BeforeAndAfterEach
Expand Down Expand Up @@ -72,9 +73,9 @@ class GetMovementsControllerItSpec
"return 200 and movements when logged in as consignor" in {
withAuthorizedTrader(consignorId)
when(
movementRepository.getMovementByERN(Seq(consignorId), MovementFilter.emptyFilter)
movementRepository.streamMovementsByERN(Seq(consignorId))
)
.thenReturn(Future.successful(Seq(movement1, movement2)))
.thenReturn(Source.fromIterator(() => Iterator(movement1, movement2)))

val result = getRequest(baseUrl)

Expand All @@ -89,6 +90,23 @@ class GetMovementsControllerItSpec
}
}

"return 200 with a large stream" in {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Profiled with -Xmx512m and gracefully handles all 20 concurrent client requests for 90K movements

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Image

withAuthorizedTrader(consignorId)
when(
movementRepository.streamMovementsByERN(Seq(consignorId))
).thenReturn(Source.fromIterator(() => Iterator.fill(90000)(movement1)))

val requests = for(_ <- 0 to 19) yield clientRequest(baseUrl)

await(Future.sequence(requests)).foreach(
_.status mustBe OK
)
}

def clientRequest(url: String) = {
Future(getRequest(url))
}

"return an Unauthorized (401) when no authorized trader" in {
withUnauthorizedTrader(InternalError("A general auth failure"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class AuditEventFactorySpec extends AnyFreeSpec with Matchers with Auditing with

val result = service.createGetMovementsDetails(
MovementFilter(None, None, None, None, None),
Seq(movement),
Seq(movement).size,
EnrolmentRequest(fakeRequest, ernsSet, userDetails)
)

Expand Down
Loading