package services import _root_.org.joda.time.DateTime import com.google.inject.Inject import com.mohiva.play.silhouette.api.LoginInfo import com.ysoft.odc.SetDiff import controllers.{ProjectsWithReports, ReportInfo} import models._ import models.tables._ import play.api.Logger import play.api.db.slick.{DatabaseConfigProvider, HasDatabaseConfigProvider} import slick.dbio.FutureAction import slick.jdbc.TransactionIsolation import scala.concurrent.{ExecutionContext, Future} class VulnerabilityNotificationService @Inject() (protected val dbConfigProvider: DatabaseConfigProvider)(implicit executionContext: ExecutionContext) extends HasDatabaseConfigProvider[models.profile.type]{ import dbConfig.driver.api._ import models.tables._ def subscribers = db.run(vulnerabilitySubscriptions.result).map(_.groupBy(_.user)) def watchedProjectsByUser(identity: LoginInfo) = db.run(vulnerabilitySubscriptions.filter(_.user === identity).result) def subscribe(user: LoginInfo, project: String) = db.run( DBIO.seq( ensureUserHasNotificationDigestStatus(user), vulnerabilitySubscriptions += VulnerabilitySubscription(user = user, project = project) ) ) private def ensureUserHasNotificationDigestStatus(user: LoginInfo): DBIOAction[Unit, slick.dbio.NoStream, Effect.Read with Effect.Write] = ( notificationDigestStatuses.filter(_.user === user).result.map(_.nonEmpty) flatMap { case true => DBIO.seq() case false => for{ changelogTopIdOption <- changelogTopIdOptionQuery (res: Int) <- notificationDigestStatuses += NotificationDigestStatus(user = user, lastChangelogIdOption = changelogTopIdOption) } yield () } ).withTransactionIsolation(TransactionIsolation.Serializable) def unsubscribe(user: LoginInfo, project: String) = db.run(vulnerabilitySubscriptions.filter(vs => vs.user === user && vs.project === project).delete) def getRecipientsForProjects(projects: Set[ReportInfo]) = { val bareProjects = projects.map(_.bare) val expandedProjects = projects ++ bareProjects val relevantFullIds = expandedProjects.map(_.fullId) db.run(vulnerabilitySubscriptions.filter(_.project inSet relevantFullIds).map(_.user()).result) } class ExportPlatform[T, U] private[VulnerabilityNotificationService] (ept: ExportPlatformTables[T, U]) { def loadUnfinishedTickets(): Future[Seq[(Int, ExportedVulnerability[T])]] = db.run( ept.tickets.filter(_.done === false).result ) def loadAllTickets(): Future[Seq[(Int, ExportedVulnerability[T])]] = db.run( ept.tickets.result ) def changeProjects(ticketId: Int, diff: SetDiff[String], projects: ProjectsWithReports): Future[Unit] = db.run( DBIO.seq( ept.tickets.filter(_.id === ticketId).map(_.done).update(diff.newSet.isEmpty), ept.projects.filter(_.exportedVulnerabilityId === ticketId).delete, ept.projects ++= diff.newSet.map(fullId => ExportedVulnerabilityProject(ticketId, fullId)).toSet ).transactionally ) def projectsForTickets(ticketsIds: Set[Int]): Future[Map[Int, Set[String]]] = db.run( ept.projects.filter(_.exportedVulnerabilityId inSet ticketsIds).result ).map{_.groupBy(_.exportedVulnerabilityId).mapValues(_.map(_.projectFullId).toSet).map(identity).withDefaultValue(Set())} def ticketsForVulnerabilities(vulnerabilities: Traversable[String]): Future[Map[String, (Int, ExportedVulnerability[T])]] = db.run( ept.tickets.filter(_.vulnerabilityName inSet vulnerabilities).result ).map(_.map{ rec => rec._2.vulnerabilityName -> rec }.toMap) def ticketForVulnerability(vulnerabilityName: String) = db.run( ept.tickets.filter(_.vulnerabilityName === vulnerabilityName).map(_.base).result ).map(_.headOption) def addTicket(vulnerabilityTicket: ExportedVulnerability[T], projects: Set[ReportInfo]): Future[Any] = db.run( ( ept.tickets.map(_.base).returning(ept.tickets.map(_.id)) += vulnerabilityTicket ).flatMap( id => ept.projects ++= projects.map(ri => ExportedVulnerabilityProject(id, ri.fullId)).toSet ).transactionally ) } /** * The changelogThrottler is a temporary hack than prevents some congestion that seems to occur in HikariCP or maybe in Slick. * It prevents exceptions like “java.sql.SQLException: Timeout of 1001ms encountered waiting for connection”. * It is probably prevented at a wrong level, but it works :) */ private val changelogThrottler = new SingleFutureExecutionThrottler() // private val changelogThrottler = new NoThrottler() def changeAffectedProjects(vulnerabilityName: String, affectedProjectsDiff: SetDiff[String]): Future[Unit] = { val time = DateTime.now() def createRecord(projectName: String, direction: Change.Direction) = Change( time = time, vulnerabilityName = vulnerabilityName, projectName = projectName, direction = direction, notifiedToSomebody = false ) val recordsToAdd = affectedProjectsDiff.added.map(projectName => createRecord(projectName, Change.Direction.Added)) ++ affectedProjectsDiff.removed.map(projectName => createRecord(projectName, Change.Direction.Removed)) /* Transaction: It is essential to ensure that records in changelog appear in order. Low level of isolation might be even worse than running outside of transaction. In longer term, it should be wrapped with transaction *with a proper isolation level* together with the export status modification. */ changelogThrottler.throttle(db.run( (changelog.map(_.base) ++= recordsToAdd).withTransactionIsolation(TransactionIsolation.Serializable) ).map(_ => ())) } private def changelogTopIdOptionQuery = changelog.map(_.id).max.result def sendDigestToSubscriber(subscriber: LoginInfo, subscriptions: Seq[VulnerabilitySubscription])(sendDigest: Seq[Change] => Future[Unit]): Future[Unit] = { def subscriptionCondition(change: Changes, subscription: VulnerabilitySubscription): Rep[Boolean] = { //noinspection ScalaUnnecessaryParentheses – because it looks less confusing (change.projectName === subscription.project) || (if (subscription.project contains '/') (false: Rep[Boolean]) else change.projectName.startsWith(subscription.project + "/")) } val projectCondition: Changes => Rep[Boolean] = (change) => subscriptions.foldLeft(false: Rep[Boolean])((cond, subscription) => cond || subscriptionCondition(change, subscription)) val notificationDigestStatusSelection = notificationDigestStatuses.filter(_.user === subscriber) db.run( ( for{ oldStatus <- notificationDigestStatusSelection.result.map(_.head) _ = println(oldStatus.lastChangelogIdOption.fold(changelog.filter(_ => true: Rep[Boolean]))(lastChangelogId => changelog.filter(_.id > lastChangelogId)).filter(projectCondition).result.statements.mkString("\n")) changelogSinceLastNotified <- oldStatus.lastChangelogIdOption.fold(changelog.filter(_ => true: Rep[Boolean]))(lastChangelogId => changelog.filter(_.id > lastChangelogId)).filter(projectCondition).result changelogIds = changelogSinceLastNotified.map(_._1) changelogTopIdOption <- changelogTopIdOptionQuery newLastChangelogIdOption = changelogTopIdOption.orElse(oldStatus.lastChangelogIdOption) (_: Unit) <- FutureAction(sendDigest(changelogSinceLastNotified.map(_._2))) // Yes, we are waiting for user to be notified over some slow I/O when having an open transaction (_: Int) <- notificationDigestStatusSelection.map(_.lastChangelogId).update(newLastChangelogIdOption) (_: Int) <- changelog.filter(_.id inSet changelogIds).map(_.notifiedToSomebody).update(true) } yield () ).withTransactionIsolation(TransactionIsolation.Serializable) ) } val issueTrackerExport = new ExportPlatform(tables.issueTrackerExportTables) val mailExport = new ExportPlatform(tables.mailExportTables) val diffDbExport = new ExportPlatform(tables.diffDbExportTables) }