Changed order to prevent blocking, CallableDownloadTasks automatically submit the results to the process executor queue now

Former-commit-id: 1d15e48e0485817c4fad1fb7e117e03fd21d43ce
This commit is contained in:
Jeremy Long
2013-12-07 12:04:08 -05:00
parent 6640df18ac
commit 63848e815f
2 changed files with 65 additions and 70 deletions

View File

@@ -21,8 +21,11 @@ package org.owasp.dependencycheck.data.update;
import java.io.File;
import java.net.URL;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.owasp.dependencycheck.data.nvdcve.CveDB;
import org.owasp.dependencycheck.utils.DownloadFailedException;
import org.owasp.dependencycheck.utils.Downloader;
@@ -31,20 +34,37 @@ import org.owasp.dependencycheck.utils.Downloader;
*
* @author Jeremy Long (jeremy.long@owasp.org)
*/
public class CallableDownloadTask implements Callable<CallableDownloadTask> {
public class CallableDownloadTask implements Callable<Future<ProcessTask>> {
/**
* Simple constructor for the callable download task.
*
* @param nvdCveInfo the nvd cve info
* @param nvdCveInfo the NVD CVE info
* @param first the first file
* @param second the second file
* @param processor the processor service to submit the downloaded files to
* @param cveDB the CVE DB to use to store the vulnerability data
*/
public CallableDownloadTask(NvdCveInfo nvdCveInfo, File first, File second) {
public CallableDownloadTask(NvdCveInfo nvdCveInfo, File first, File second, ExecutorService processor, CveDB cveDB, DataStoreMetaInfo properties) {
this.nvdCveInfo = nvdCveInfo;
this.first = first;
this.second = second;
this.processorService = processor;
this.cveDB = cveDB;
this.properties = properties;
}
/**
* The DataStoreMeta information.
*/
private DataStoreMetaInfo properties;
/**
* The CVE DB to use when processing the files.
*/
private CveDB cveDB;
/**
* The processor service to pass the results of the download to.
*/
private ExecutorService processorService;
/**
* The NVD CVE Meta Data.
*/
@@ -135,7 +155,7 @@ public class CallableDownloadTask implements Callable<CallableDownloadTask> {
}
@Override
public CallableDownloadTask call() throws Exception {
public Future<ProcessTask> call() throws Exception {
try {
final URL url1 = new URL(nvdCveInfo.getUrl());
final URL url2 = new URL(nvdCveInfo.getOldSchemaVersionUrl());
@@ -145,10 +165,14 @@ public class CallableDownloadTask implements Callable<CallableDownloadTask> {
Downloader.fetchFile(url2, second);
msg = String.format("Download Complete for NVD CVE - %s", nvdCveInfo.getId());
Logger.getLogger(CallableDownloadTask.class.getName()).log(Level.INFO, msg);
final ProcessTask task = new ProcessTask(cveDB, properties, this);
return this.processorService.submit(task);
} catch (DownloadFailedException ex) {
this.exception = ex;
Logger.getLogger(CallableDownloadTask.class.getName()).log(Level.FINE, "Download Task Failed", ex);
}
return this;
return null;
}
/**

View File

@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
@@ -140,15 +141,13 @@ public class StandardUpdate {
openDataStores();
}
final int poolSize = (MAX_THREAD_POOL_SIZE > maxUpdates) ? MAX_THREAD_POOL_SIZE : maxUpdates;
final ExecutorService downloadExecutor = Executors.newFixedThreadPool(poolSize);
final int poolSize = (MAX_THREAD_POOL_SIZE < maxUpdates) ? MAX_THREAD_POOL_SIZE : maxUpdates;
final ExecutorService downloadExecutors = Executors.newFixedThreadPool(poolSize);
final ExecutorService processExecutor = Executors.newSingleThreadExecutor();
final Set<Future<CallableDownloadTask>> downloadFutures = new HashSet<Future<CallableDownloadTask>>(maxUpdates);
final Set<Future<ProcessTask>> processFutures = new HashSet<Future<ProcessTask>>(maxUpdates);
int ctr = 0;
final Set<Future<Future<ProcessTask>>> downloadFutures = new HashSet<Future<Future<ProcessTask>>>(maxUpdates);
for (NvdCveInfo cve : updateable) {
if (cve.getNeedsUpdate()) {
ctr += 1;
final File file1;
final File file2;
try {
@@ -157,56 +156,39 @@ public class StandardUpdate {
} catch (IOException ex) {
throw new UpdateException(ex);
}
final CallableDownloadTask call = new CallableDownloadTask(cve, file1, file2);
downloadFutures.add(downloadExecutor.submit(call));
boolean waitForFuture = ctr % 2 == 0;
final Iterator<Future<CallableDownloadTask>> itr = downloadFutures.iterator();
while (itr.hasNext()) {
final Future<CallableDownloadTask> future = itr.next();
if (waitForFuture) { //only allow two NVD/CVE files to be downloaded at a time
spinWaitForFuture(future);
}
if (future.isDone()) { //if we find something complete, add it to the process queue
try {
final CallableDownloadTask filePair = future.get();
itr.remove();
final ProcessTask task = new ProcessTask(cveDB, properties, filePair);
processFutures.add(processExecutor.submit(task));
} catch (InterruptedException ex) {
downloadExecutor.shutdownNow();
Logger.getLogger(StandardUpdate.class.getName()).log(Level.FINE, "Thread was interupted", ex);
throw new UpdateException(ex);
} catch (ExecutionException ex) {
downloadExecutor.shutdownNow();
Logger.getLogger(StandardUpdate.class.getName()).log(Level.SEVERE, null, ex);
throw new UpdateException(ex);
}
}
}
final CallableDownloadTask call = new CallableDownloadTask(cve, file1, file2, processExecutor, cveDB, properties);
downloadFutures.add(downloadExecutors.submit(call));
}
}
downloadExecutors.shutdown();
try {
final Iterator<Future<CallableDownloadTask>> itr = downloadFutures.iterator();
while (itr.hasNext()) {
final Future<CallableDownloadTask> future = itr.next();
final CallableDownloadTask filePair = future.get();
final ProcessTask task = new ProcessTask(cveDB, properties, filePair);
processFutures.add(processExecutor.submit(task));
//next, move the future future processTasks to just future processTasks
final Set<Future<ProcessTask>> processFutures = new HashSet<Future<ProcessTask>>(maxUpdates);
for (Future<Future<ProcessTask>> future : downloadFutures) {
Future<ProcessTask> task = null;
try {
task = future.get();
} catch (InterruptedException ex) {
downloadExecutors.shutdownNow();
processExecutor.shutdownNow();
Logger.getLogger(StandardUpdate.class.getName()).log(Level.FINE, "Thread was interupted during download", ex);
throw new UpdateException("The download was interupted", ex);
} catch (ExecutionException ex) {
downloadExecutors.shutdownNow();
processExecutor.shutdownNow();
Logger.getLogger(StandardUpdate.class.getName()).log(Level.FINE, "Thread was interupted during download execution", ex);
throw new UpdateException("The execution of the download was interupted", ex);
}
if (task == null) {
downloadExecutors.shutdownNow();
processExecutor.shutdownNow();
Logger.getLogger(StandardUpdate.class.getName()).log(Level.FINE, "Thread was interupted during download");
throw new UpdateException("The download was interupted; unable to complete the update");
} else {
processFutures.add(task);
}
} catch (InterruptedException ex) {
downloadExecutor.shutdownNow();
Logger.getLogger(StandardUpdate.class.getName()).log(Level.FINE, "Thread was interupted during download", ex);
throw new UpdateException(ex);
} catch (ExecutionException ex) {
downloadExecutor.shutdownNow();
Logger.getLogger(StandardUpdate.class.getName()).log(Level.FINE, "Execution Exception during download", ex);
throw new UpdateException(ex);
} finally {
downloadExecutor.shutdown();
}
for (Future<ProcessTask> future : processFutures) {
@@ -228,7 +210,7 @@ public class StandardUpdate {
}
}
if (maxUpdates >= 1) { //ensure the modified file date gets written
if (maxUpdates >= 1) { //ensure the modified file date gets written (we may not have actually updated it)
properties.save(updateable.get(MODIFIED));
cveDB.cleanupDatabase();
}
@@ -543,15 +525,4 @@ public class StandardUpdate {
final double differenceInDays = (compareTo - date) / 1000.0 / 60.0 / 60.0 / 24.0;
return differenceInDays < range;
}
private void spinWaitForFuture(final Future<CallableDownloadTask> future) {
//then wait for downloads to finish
while (!future.isDone()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Logger.getLogger(StandardUpdate.class.getName()).log(Level.FINE, null, ex);
}
}
}
}