Files
odc-analyzer/app/services/VulnerabilityNotificationService.scala
2016-03-10 16:30:16 +01:00

162 lines
8.0 KiB
Scala
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import _root_.org.joda.time.DateTime
import com.google.inject.Inject
import com.mohiva.play.silhouette.api.LoginInfo
import com.ysoft.odc.SetDiff
import controllers.{ProjectsWithReports, ReportInfo}
import models._
import models.tables._
import play.api.Logger
import play.api.db.slick.{DatabaseConfigProvider, HasDatabaseConfigProvider}
import slick.dbio.FutureAction
import slick.jdbc.TransactionIsolation
import scala.concurrent.{ExecutionContext, Future}
/**
 * Serializes asynchronous work: every operation passed to [[throttle]] is started only after
 * the operation passed in the previous call has completed, whether it succeeded or failed.
 */
final class SingleFutureExecutionThrottler()(implicit executionContext: ExecutionContext) {

  // Tail of the chain: the future returned by the most recent throttle call.
  private var lastScheduled: Future[_] = Future.successful(null)

  /**
   * Queues `f` behind the previously queued operation.
   *
   * @param f by-name operation; it is evaluated only once the previous operation has finished
   * @return a future carrying the outcome of `f`
   */
  def throttle[T](f: => Future[T]): Future[T] = synchronized {
    // A failure of the predecessor must not prevent this operation from running.
    val predecessorSettled = lastScheduled.recover { case _ => null }
    val scheduled = predecessorSettled.flatMap(_ => f)
    lastScheduled = scheduled
    scheduled
  }
}
/**
 * Pass-through counterpart of the throttler: exposes the same `throttle` method but starts the
 * operation immediately.
 */
final class NoThrottler()(implicit executionContext: ExecutionContext) {

  /**
   * Evaluates `f` right away and returns its future unchanged.
   *
   * @param f by-name operation to start
   * @return the future produced by `f`
   */
  def throttle[T](f: => Future[T]): Future[T] = {
    val started = f
    started
  }
}
class VulnerabilityNotificationService @Inject() (protected val dbConfigProvider: DatabaseConfigProvider)(implicit executionContext: ExecutionContext) extends HasDatabaseConfigProvider[models.profile.type]{
import dbConfig.driver.api._
import models.tables._
/** Loads all rows of `vulnerabilitySubscriptions` and groups them by the subscribed user. */
def subscribers = {
  val allSubscriptions = db.run(vulnerabilitySubscriptions.result)
  allSubscriptions.map(rows => rows.groupBy(row => row.user))
}
/**
 * Loads the subscriptions that belong to the given user.
 *
 * @param identity the user whose subscriptions are fetched
 */
def watchedProjectsByUser(identity: LoginInfo) = {
  val ownSubscriptions = vulnerabilitySubscriptions.filter(subscription => subscription.user === identity)
  db.run(ownSubscriptions.result)
}
/**
 * Subscribes `user` to `project`.
 *
 * First makes sure the user has a notification digest status row, then inserts the subscription.
 *
 * @param user    the subscribing user
 * @param project identifier of the project to watch
 */
def subscribe(user: LoginInfo, project: String) = {
  val newSubscription = VulnerabilitySubscription(user = user, project = project)
  val steps = DBIO.seq(
    ensureUserHasNotificationDigestStatus(user),
    vulnerabilitySubscriptions += newSubscription
  )
  db.run(steps)
}
/**
 * Builds an action that creates a notification digest status for `user` unless one already exists.
 *
 * A newly created status gets the result of `changelogTopIdOptionQuery` as its last changelog id.
 * The action requests the Serializable isolation level.
 */
private def ensureUserHasNotificationDigestStatus(user: LoginInfo): DBIOAction[Unit, slick.dbio.NoStream, Effect.Read with Effect.Write] = {
  val statusExists = notificationDigestStatuses.filter(_.user === user).result.map(_.nonEmpty)
  val createIfMissing = statusExists.flatMap { exists =>
    if (exists) DBIO.seq() // nothing to do
    else for {
      topChangelogIdOption <- changelogTopIdOptionQuery
      (inserted: Int) <- notificationDigestStatuses += NotificationDigestStatus(user = user, lastChangelogIdOption = topChangelogIdOption)
    } yield ()
  }
  createIfMissing.withTransactionIsolation(TransactionIsolation.Serializable)
}
/**
 * Deletes the subscription(s) of `user` to `project`.
 *
 * @param user    the user to unsubscribe
 * @param project identifier of the project to stop watching
 */
def unsubscribe(user: LoginInfo, project: String) = {
  val subscriptionToRemove = vulnerabilitySubscriptions.filter { subscription =>
    subscription.user === user && subscription.project === project
  }
  db.run(subscriptionToRemove.delete)
}
/**
 * Loads the users subscribed to any of the given projects.
 *
 * Each project is looked up both by its own `fullId` and by the `fullId` of its `bare` form.
 *
 * @param projects the projects whose subscribers are wanted
 */
def getRecipientsForProjects(projects: Set[ReportInfo]) = {
  val relevantFullIds = (projects ++ projects.map(_.bare)).map(_.fullId)
  val recipientsQuery = vulnerabilitySubscriptions
    .filter(_.project inSet relevantFullIds)
    .map(_.user())
  db.run(recipientsQuery.result)
}
/**
 * Database operations for one export platform, working on the `tickets` and `projects`
 * tables supplied by `ept`.
 */
class ExportPlatform[T, U] private[VulnerabilityNotificationService] (ept: ExportPlatformTables[T, U]) {

  /**
   * Replaces the project links of a ticket in a single transaction: removes every link of
   * `ticketId` and inserts one link per entry of `diff.newSet`.
   * The `projects` parameter is not used by this method.
   */
  def changeProjects(ticketId: Int, diff: SetDiff[String], projects: ProjectsWithReports) = {
    val dropOldLinks = ept.projects.filter(_.exportedVulnerabilityId === ticketId).delete
    val insertNewLinks = ept.projects ++= diff.newSet.map(fullId => ExportedVulnerabilityProject(ticketId, fullId)).toSet
    db.run(DBIO.seq(dropOldLinks, insertNewLinks).transactionally)
  }

  /**
   * Loads the project full ids linked to each of the given tickets.
   *
   * @return map from ticket id to its project full ids; unknown keys yield an empty set
   */
  def projectsForTickets(ticketsIds: Set[Int]): Future[Map[Int, Set[String]]] = {
    val linksQuery = ept.projects.filter(_.exportedVulnerabilityId inSet ticketsIds)
    db.run(linksQuery.result).map { links =>
      val projectsByTicket = links.groupBy(_.exportedVulnerabilityId).map {
        case (ticketId, ticketLinks) => ticketId -> ticketLinks.map(_.projectFullId).toSet
      }
      projectsByTicket.withDefaultValue(Set.empty[String])
    }
  }

  /** Loads the ticket records of the given vulnerabilities, keyed by vulnerability name. */
  def ticketsForVulnerabilities(vulnerabilities: Traversable[String]) = {
    val ticketsQuery = ept.tickets.filter(_.vulnerabilityName inSet vulnerabilities)
    db.run(ticketsQuery.result).map { records =>
      records.map(record => record._2.vulnerabilityName -> record).toMap
    }
  }

  /** Loads the first ticket (its `base` projection) stored for the vulnerability, if any. */
  def ticketForVulnerability(vulnerabilityName: String) = {
    val matchingTickets = ept.tickets.filter(_.vulnerabilityName === vulnerabilityName).map(_.base)
    db.run(matchingTickets.result).map(_.headOption)
  }

  /**
   * Inserts a ticket and, using the generated ticket id, a project link for every given
   * project; both inserts run in one transaction.
   */
  def addTicket(vulnerabilityTicket: ExportedVulnerability[T], projects: Set[ReportInfo]): Future[Any] = {
    val insertTicket = (ept.tickets.map(_.base).returning(ept.tickets.map(_.id)) += vulnerabilityTicket)
    val insertTicketWithProjects = insertTicket.flatMap { ticketId =>
      ept.projects ++= projects.map(report => ExportedVulnerabilityProject(ticketId, report.fullId)).toSet
    }
    db.run(insertTicketWithProjects.transactionally)
  }
}
/**
 * The changelogThrottler is a temporary hack that prevents some congestion that seems to occur in HikariCP or maybe in Slick.
 * It prevents exceptions like "java.sql.SQLException: Timeout of 1001ms encountered waiting for connection".
 * It is probably prevented at a wrong level, but it works :)
 */
private val changelogThrottler = new SingleFutureExecutionThrottler()
// Alternative without any throttling (currently disabled):
// private val changelogThrottler = new NoThrottler()
/**
 * Appends changelog records describing which projects started or stopped being affected by a
 * vulnerability.
 *
 * One record is written per entry of `affectedProjectsDiff.added` (direction Added) and per
 * entry of `affectedProjectsDiff.removed` (direction Removed). All records share one timestamp
 * and start with `notifiedToSomebody = false`. The write goes through `changelogThrottler`.
 *
 * @param vulnerabilityName    name of the vulnerability the changes relate to
 * @param affectedProjectsDiff added and removed project names
 */
def changeAffectedProjects(vulnerabilityName: String, affectedProjectsDiff: SetDiff[String]): Future[Unit] = {
  val recordedAt = DateTime.now()
  def changeRecord(affectedProject: String, changeDirection: Change.Direction) = Change(
    time = recordedAt,
    vulnerabilityName = vulnerabilityName,
    projectName = affectedProject,
    direction = changeDirection,
    notifiedToSomebody = false
  )
  val additions = affectedProjectsDiff.added.map(changeRecord(_, Change.Direction.Added))
  val removals = affectedProjectsDiff.removed.map(changeRecord(_, Change.Direction.Removed))
  val recordsToAdd = additions ++ removals
  /*
    Transaction:
    Changelog records must appear in order. A low isolation level might be even worse than
    running outside of a transaction. In the longer term, this write should share a transaction
    *with a proper isolation level* with the export status modification.
  */
  // Kept as a def so that the database call starts only when the throttler evaluates it.
  def appendToChangelog(): Future[Unit] = db.run(
    (changelog.map(_.base) ++= recordsToAdd).withTransactionIsolation(TransactionIsolation.Serializable)
  ).map(_ => ())
  changelogThrottler.throttle(appendToChangelog())
}
/** Action yielding the highest changelog id, or `None` when the changelog has no rows. */
private def changelogTopIdOptionQuery = {
  val highestChangelogId = changelog.map(_.id).max
  highestChangelogId.result
}
/**
 * Sends one subscriber a digest of the changelog entries recorded since their last digest.
 *
 * Steps, all inside one action that requests the Serializable isolation level:
 *  1. read the subscriber's notification digest status (the action fails if there is none);
 *  2. select changelog rows newer than the stored last changelog id (all rows when no id is
 *     stored) that match at least one of `subscriptions`;
 *  3. pass the selected changes to `sendDigest` and wait for it to complete;
 *  4. store the current top changelog id (or keep the old one when the changelog is empty) as
 *     the subscriber's last changelog id and mark the selected rows as `notifiedToSomebody`.
 *
 * A change matches a subscription when the project names are equal or, if the subscribed
 * project name contains no '/', when the change's project name starts with that name plus "/".
 *
 * @param subscriber    the user receiving the digest
 * @param subscriptions the subscriptions used to select relevant changes
 * @param sendDigest    callback that delivers the selected changes
 */
def sendDigestToSubscriber(subscriber: LoginInfo, subscriptions: Seq[VulnerabilitySubscription])(sendDigest: Seq[Change] => Future[Unit]): Future[Unit] = {
  def subscriptionCondition(change: Changes, subscription: VulnerabilitySubscription): Rep[Boolean] = {
    //noinspection ScalaUnnecessaryParentheses because it looks less confusing
    (change.projectName === subscription.project) ||
      (if (subscription.project contains '/') (false: Rep[Boolean]) else change.projectName.startsWith(subscription.project + "/"))
  }
  // Disjunction over all subscriptions; with no subscriptions it stays false and selects nothing.
  val projectCondition: Changes => Rep[Boolean] = (change) =>
    subscriptions.foldLeft(false: Rep[Boolean])((cond, subscription) => cond || subscriptionCondition(change, subscription))
  val notificationDigestStatusSelection = notificationDigestStatuses.filter(_.user === subscriber)
  db.run(
    (
      for {
        oldStatus <- notificationDigestStatusSelection.result.map(_.head)
        // Build the query once; it is used for both the debug log and the execution.
        changelogQuery = oldStatus.lastChangelogIdOption.fold(changelog.filter(_ => true: Rep[Boolean]))(lastChangelogId => changelog.filter(_.id > lastChangelogId)).filter(projectCondition)
        // Debug-level logging; the message is by-name, so the SQL is rendered only when needed.
        _ = Logger.debug(changelogQuery.result.statements.mkString("\n"))
        changelogSinceLastNotified <- changelogQuery.result
        changelogIds = changelogSinceLastNotified.map(_._1)
        changelogTopIdOption <- changelogTopIdOptionQuery
        newLastChangelogIdOption = changelogTopIdOption.orElse(oldStatus.lastChangelogIdOption)
        (_: Unit) <- FutureAction(sendDigest(changelogSinceLastNotified.map(_._2))) // Yes, we are waiting for user to be notified over some slow I/O when having an open transaction
        (_: Int) <- notificationDigestStatusSelection.map(_.lastChangelogId).update(newLastChangelogIdOption)
        (_: Int) <- changelog.filter(_.id inSet changelogIds).map(_.notifiedToSomebody).update(true)
      } yield ()
    ).withTransactionIsolation(TransactionIsolation.Serializable)
  )
}
// One ExportPlatform instance per set of export tables declared in `tables`.
// Backed by tables.issueTrackerExportTables.
val issueTrackerExport = new ExportPlatform(tables.issueTrackerExportTables)
// Backed by tables.mailExportTables.
val mailExport = new ExportPlatform(tables.mailExportTables)
// Backed by tables.diffDbExportTables.
val diffDbExport = new ExportPlatform(tables.diffDbExportTables)
}