NvdCveUpdater: Refactor thread pool concept

- Make thread pools members of the class to facilitate reuse
- Increase default max download thread pool size from 3 to 50 (should be fine for mostly blocking tasks like downloading)
This commit is contained in:
Stefan Neuhaus
2017-02-11 20:16:24 +01:00
parent a0198e34e7
commit cd4f09dc86
4 changed files with 38 additions and 25 deletions

View File

@@ -54,9 +54,21 @@ public class NvdCveUpdater extends BaseUpdater implements CachedWebDataSource {
*/ */
private static final Logger LOGGER = LoggerFactory.getLogger(NvdCveUpdater.class); private static final Logger LOGGER = LoggerFactory.getLogger(NvdCveUpdater.class);
/** /**
* The max thread pool size to use when downloading files. * The thread pool size to use for CPU-intense tasks.
*/ */
public static final int MAX_THREAD_POOL_SIZE = Settings.getInt(Settings.KEYS.MAX_DOWNLOAD_THREAD_POOL_SIZE, 3); private static final int PROCESSING_THREAD_POOL_SIZE = 1;
/**
* The thread pool size to use when downloading files.
*/
private static final int DOWNLOAD_THREAD_POOL_SIZE = Settings.getInt(Settings.KEYS.MAX_DOWNLOAD_THREAD_POOL_SIZE, 50);
/**
* ExecutorService for CPU-intense processing tasks.
*/
private ExecutorService processingExecutorService = null;
/**
* ExecutorService for tasks that involve blocking activities and are not very CPU-intense, e.g. downloading files.
*/
private ExecutorService downloadExecutorService = null;
/** /**
* Downloads the latest NVD CVE XML file from the web and imports it into * Downloads the latest NVD CVE XML file from the web and imports it into
@@ -72,10 +84,11 @@ public class NvdCveUpdater extends BaseUpdater implements CachedWebDataSource {
return; return;
} }
} catch (InvalidSettingException ex) { } catch (InvalidSettingException ex) {
LOGGER.trace("inavlid setting UPDATE_NVDCVE_ENABLED", ex); LOGGER.trace("invalid setting UPDATE_NVDCVE_ENABLED", ex);
} }
try { try {
initializeExecutorServices();
openDataStores(); openDataStores();
boolean autoUpdate = true; boolean autoUpdate = true;
try { try {
@@ -101,10 +114,27 @@ public class NvdCveUpdater extends BaseUpdater implements CachedWebDataSource {
} }
throw new UpdateException("Unable to download the NVD CVE data.", ex); throw new UpdateException("Unable to download the NVD CVE data.", ex);
} finally { } finally {
shutdownExecutorServices();
closeDataStores(); closeDataStores();
} }
} }
private void initializeExecutorServices() {
processingExecutorService = Executors.newFixedThreadPool(PROCESSING_THREAD_POOL_SIZE);
downloadExecutorService = Executors.newFixedThreadPool(DOWNLOAD_THREAD_POOL_SIZE);
LOGGER.debug("#download threads: {}", DOWNLOAD_THREAD_POOL_SIZE);
LOGGER.debug("#processing threads: {}", PROCESSING_THREAD_POOL_SIZE);
}
private void shutdownExecutorServices() {
if (processingExecutorService != null) {
processingExecutorService.shutdownNow();
}
if (downloadExecutorService != null) {
downloadExecutorService.shutdownNow();
}
}
/** /**
* Checks if the NVD CVE XML files were last checked recently. As an * Checks if the NVD CVE XML files were last checked recently. As an
* optimization, we can avoid repetitive checks against the NVD. Setting * optimization, we can avoid repetitive checks against the NVD. Setting
@@ -178,18 +208,13 @@ public class NvdCveUpdater extends BaseUpdater implements CachedWebDataSource {
LOGGER.info("NVD CVE requires several updates; this could take a couple of minutes."); LOGGER.info("NVD CVE requires several updates; this could take a couple of minutes.");
} }
final int poolSize = (MAX_THREAD_POOL_SIZE < maxUpdates) ? MAX_THREAD_POOL_SIZE : maxUpdates;
final ExecutorService downloadExecutors = Executors.newFixedThreadPool(poolSize);
final ExecutorService processExecutor = Executors.newSingleThreadExecutor();
final Set<Future<Future<ProcessTask>>> downloadFutures = new HashSet<Future<Future<ProcessTask>>>(maxUpdates); final Set<Future<Future<ProcessTask>>> downloadFutures = new HashSet<Future<Future<ProcessTask>>>(maxUpdates);
for (NvdCveInfo cve : updateable) { for (NvdCveInfo cve : updateable) {
if (cve.getNeedsUpdate()) { if (cve.getNeedsUpdate()) {
final DownloadTask call = new DownloadTask(cve, processExecutor, getCveDB(), Settings.getInstance()); final DownloadTask call = new DownloadTask(cve, processingExecutorService, getCveDB(), Settings.getInstance());
downloadFutures.add(downloadExecutors.submit(call)); downloadFutures.add(downloadExecutorService.submit(call));
} }
} }
downloadExecutors.shutdown();
//next, move the future future processTasks to just future processTasks //next, move the future future processTasks to just future processTasks
final Set<Future<ProcessTask>> processFutures = new HashSet<Future<ProcessTask>>(maxUpdates); final Set<Future<ProcessTask>> processFutures = new HashSet<Future<ProcessTask>>(maxUpdates);
@@ -198,21 +223,13 @@ public class NvdCveUpdater extends BaseUpdater implements CachedWebDataSource {
try { try {
task = future.get(); task = future.get();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
downloadExecutors.shutdownNow();
processExecutor.shutdownNow();
LOGGER.debug("Thread was interrupted during download", ex); LOGGER.debug("Thread was interrupted during download", ex);
throw new UpdateException("The download was interrupted", ex); throw new UpdateException("The download was interrupted", ex);
} catch (ExecutionException ex) { } catch (ExecutionException ex) {
downloadExecutors.shutdownNow();
processExecutor.shutdownNow();
LOGGER.debug("Thread was interrupted during download execution", ex); LOGGER.debug("Thread was interrupted during download execution", ex);
throw new UpdateException("The execution of the download was interrupted", ex); throw new UpdateException("The execution of the download was interrupted", ex);
} }
if (task == null) { if (task == null) {
downloadExecutors.shutdownNow();
processExecutor.shutdownNow();
LOGGER.debug("Thread was interrupted during download"); LOGGER.debug("Thread was interrupted during download");
throw new UpdateException("The download was interrupted; unable to complete the update"); throw new UpdateException("The download was interrupted; unable to complete the update");
} else { } else {
@@ -227,15 +244,11 @@ public class NvdCveUpdater extends BaseUpdater implements CachedWebDataSource {
throw task.getException(); throw task.getException();
} }
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
processExecutor.shutdownNow();
LOGGER.debug("Thread was interrupted during processing", ex); LOGGER.debug("Thread was interrupted during processing", ex);
throw new UpdateException(ex); throw new UpdateException(ex);
} catch (ExecutionException ex) { } catch (ExecutionException ex) {
processExecutor.shutdownNow();
LOGGER.debug("Execution Exception during process", ex); LOGGER.debug("Execution Exception during process", ex);
throw new UpdateException(ex); throw new UpdateException(ex);
} finally {
processExecutor.shutdown();
} }
} }

View File

@@ -1,7 +1,7 @@
application.name=${pom.name} application.name=${pom.name}
application.version=${pom.version} application.version=${pom.version}
autoupdate=true autoupdate=true
max.download.threads=3 max.download.threads=50
# the url to obtain the current engine version from # the url to obtain the current engine version from
engine.version.url=https://jeremylong.github.io/DependencyCheck/current.txt engine.version.url=https://jeremylong.github.io/DependencyCheck/current.txt

View File

@@ -1,7 +1,7 @@
application.name=${pom.name} application.name=${pom.name}
application.version=${pom.version} application.version=${pom.version}
autoupdate=true autoupdate=true
max.download.threads=3 max.download.threads=50
# the url to obtain the current engine version from # the url to obtain the current engine version from
engine.version.url=https://jeremylong.github.io/DependencyCheck/current.txt engine.version.url=https://jeremylong.github.io/DependencyCheck/current.txt

View File

@@ -1,7 +1,7 @@
application.name=${pom.name} application.name=${pom.name}
application.version=${pom.version} application.version=${pom.version}
autoupdate=true autoupdate=true
max.download.threads=3 max.download.threads=50
# the url to obtain the current engine version from # the url to obtain the current engine version from
engine.version.url=http://jeremylong.github.io/DependencyCheck/current.txt engine.version.url=http://jeremylong.github.io/DependencyCheck/current.txt