From b00857368a56474e818fabe8a06b4873ab57d234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=A0est=C3=A1k=20V=C3=ADt?= Date: Wed, 21 Jun 2017 10:18:39 +0200 Subject: [PATCH] Added throttling to reduce Bamboo peak load and number of concurrent connections --- app/com/ysoft/odc/BambooDownloader.scala | 7 ++++-- .../VulnerabilityNotificationService.scala | 15 ------------- app/services/throttlers.scala | 22 +++++++++++++++++++ 3 files changed, 27 insertions(+), 17 deletions(-) create mode 100644 app/services/throttlers.scala diff --git a/app/com/ysoft/odc/BambooDownloader.scala b/app/com/ysoft/odc/BambooDownloader.scala index 5f2ad1c..38b647b 100644 --- a/app/com/ysoft/odc/BambooDownloader.scala +++ b/app/com/ysoft/odc/BambooDownloader.scala @@ -4,6 +4,7 @@ import com.google.inject.Inject import com.google.inject.name.Named import org.ccil.cowan.tagsoup.jaxp.SAXFactoryImpl import play.api.libs.ws.{WS, WSClient} +import services.SingleFutureExecutionThrottler import upickle.default._ import scala.concurrent.{ExecutionContext, Future} @@ -64,6 +65,8 @@ final case class FlatArtifactDirectory(name: String, items: Seq[(String, String) final class BambooDownloader @Inject()(@Named("bamboo-server-url") val server: String, @Named("bamboo-authentication") auth: AtlassianAuthentication)(implicit executionContext: ExecutionContext, wSClient: WSClient) extends Downloader { + private val throttler = new SingleFutureExecutionThrottler() + private object ArtifactKeys{ val BuildLog = "Build log" val ResultsHtml = "Report results-HTML" @@ -76,7 +79,7 @@ final class BambooDownloader @Inject()(@Named("bamboo-server-url") val server: S } private def downloadArtifact(url: String, name: String)(implicit wSClient: WSClient): Future[FlatArtifactItem] = { - bambooUrl(url).get().map{response => + throttler.throttle(bambooUrl(url).get()).map{response => response.header("Content-Disposition") match{ case Some(_) => ArtifactFile(name = name, data = response.bodyAsBytes) case None => @@ -149,7 +152,7 @@ final class BambooDownloader @Inject()(@Named("bamboo-server-url") val server: S private def downloadProjectReport(project: String, versionOption: Option[Int]): Future[(String, Try[(Build, ArtifactItem, ArtifactFile)])] = { val url = s"$server/rest/api/latest/result/$project-${versionOption.getOrElse("latest")}.json?expand=artifacts" - val resultFuture = (bambooUrl(url).get().flatMap { response => + val resultFuture = (throttler.throttle(bambooUrl(url).get()).flatMap { response => val build = read[Build](response.body) val artifactMap: Map[String, Artifact] = build.artifacts.artifact.map(x => x.name -> x).toMap val logFuture = downloadArtifact(artifactMap, ArtifactKeys.BuildLog).map(_.asInstanceOf[ArtifactFile]) diff --git a/app/services/VulnerabilityNotificationService.scala b/app/services/VulnerabilityNotificationService.scala index 6f9e074..a205624 100644 --- a/app/services/VulnerabilityNotificationService.scala +++ b/app/services/VulnerabilityNotificationService.scala @@ -14,21 +14,6 @@ import slick.jdbc.TransactionIsolation import scala.concurrent.{ExecutionContext, Future} -final class SingleFutureExecutionThrottler() (implicit executionContext: ExecutionContext){ - private var nextFuture: Future[_] = Future.successful(null) - - def throttle[T](f: => Future[T]): Future[T] = synchronized{ - val newFuture = nextFuture.recover{ case _ => null}.flatMap(_ => f) - nextFuture = newFuture - newFuture - } - -} - -final class NoThrottler() (implicit executionContext: ExecutionContext){ - def throttle[T](f: => Future[T]): Future[T] = f -} - class VulnerabilityNotificationService @Inject() (protected val dbConfigProvider: DatabaseConfigProvider)(implicit executionContext: ExecutionContext) extends HasDatabaseConfigProvider[models.profile.type]{ import dbConfig.driver.api._ import models.tables._ diff --git a/app/services/throttlers.scala b/app/services/throttlers.scala new file mode 100644 index 0000000..9dddb80 --- /dev/null +++ b/app/services/throttlers.scala @@ -0,0 +1,22 @@ +package services + +import scala.concurrent.{ExecutionContext, Future} + +trait Throttler { + def throttle[T](f: => Future[T]): Future[T] +} + +final class SingleFutureExecutionThrottler() (implicit executionContext: ExecutionContext) extends Throttler{ + private var nextFuture: Future[_] = Future.successful(null) + + def throttle[T](f: => Future[T]): Future[T] = synchronized{ + val newFuture = nextFuture.recover{ case _ => null}.flatMap(_ => f) + nextFuture = newFuture + newFuture + } + +} + +final class NoThrottler() (implicit executionContext: ExecutionContext) extends Throttler{ + def throttle[T](f: => Future[T]): Future[T] = f +}