Rework the ThrustCurve motor loading process to be highly parallel. This significantly reduced loading times. In order to achive this, the implementation of ZipDirectoryIterator.getNext() had to change to not close the iterator when it runs out of things.

This commit is contained in:
Kevin Ruland 2012-05-08 17:57:46 +00:00
parent 8c34d8488a
commit 51e2dfe6c7
5 changed files with 307 additions and 103 deletions

View File

@ -150,6 +150,9 @@ public class Database<T extends Comparable<T>> extends AbstractSet<T> {
log.warn("Error loading file " + file + ": " + e.getMessage(), e);
}
}
if ( files != null ) {
files.close();
}
}
/**

View File

@ -96,8 +96,6 @@ public class ZipDirectoryIterator extends FileIterator {
}
}
// No more elements exist
close();
return null;
}

View File

@ -19,13 +19,13 @@ import net.sf.openrocket.startup.Application;
import net.sf.openrocket.util.Pair;
public final class MotorLoaderHelper {
private static final LogHelper log = Application.getLogger();
private MotorLoaderHelper() {
// Prevent construction
}
/**
* Load a file or directory of thrust curves. Directories are loaded
* recursively. Any errors during loading are logged, but otherwise ignored.
@ -35,18 +35,18 @@ public final class MotorLoaderHelper {
*/
public static List<Motor> load(File target) {
GeneralMotorLoader loader = new GeneralMotorLoader();
if (target.isDirectory()) {
try {
return load(new DirectoryIterator(target, new SimpleFileFilter("", loader.getSupportedExtensions()), true));
} catch (IOException e) {
log.warn("Could not read directory " + target, e);
return Collections.emptyList();
}
} else {
InputStream is = null;
try {
is = new FileInputStream(target);
@ -63,11 +63,24 @@ public final class MotorLoaderHelper {
}
}
}
}
}
public static List<Motor> load( InputStream is, String fileName ) {
GeneralMotorLoader loader = new GeneralMotorLoader();
try {
List<Motor> motors = loader.load(is, fileName);
if (motors.size() == 0) {
log.warn("No motors found in file " + fileName);
}
return motors;
} catch (IOException e) {
log.warn("IOException when loading motor file " + fileName, e);
}
return Collections.<Motor>emptyList();
}
/**
* Load motors from files iterated over by a FileIterator. Any errors during
* loading are logged, but otherwise ignored.
@ -78,22 +91,16 @@ public final class MotorLoaderHelper {
* @return a list of all motors loaded.
*/
public static List<Motor> load(FileIterator iterator) {
GeneralMotorLoader loader = new GeneralMotorLoader();
List<Motor> list = new ArrayList<Motor>();
while (iterator.hasNext()) {
final Pair<String, InputStream> input = iterator.next();
log.debug("Loading motors from file " + input.getU());
try {
List<Motor> motors = loader.load(input.getV(), input.getU());
if (motors.size() == 0) {
log.warn("No motors found in file " + input.getU());
}
List<Motor> motors = load(input.getV(), input.getU());
for (Motor m : motors) {
list.add((ThrustCurveMotor) m);
}
} catch (IOException e) {
log.warn("IOException when loading motor file " + input.getU(), e);
} finally {
try {
input.getV().close();
@ -103,8 +110,8 @@ public final class MotorLoaderHelper {
}
}
iterator.close();
return list;
}
}

View File

@ -0,0 +1,268 @@
package net.sf.openrocket.startup;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.sf.openrocket.database.ThrustCurveMotorSet;
import net.sf.openrocket.database.ThrustCurveMotorSetDatabase;
import net.sf.openrocket.file.iterator.DirectoryIterator;
import net.sf.openrocket.file.iterator.FileIterator;
import net.sf.openrocket.file.motor.GeneralMotorLoader;
import net.sf.openrocket.file.motor.MotorLoaderHelper;
import net.sf.openrocket.gui.util.SimpleFileFilter;
import net.sf.openrocket.gui.util.SwingPreferences;
import net.sf.openrocket.logging.LogHelper;
import net.sf.openrocket.motor.Motor;
import net.sf.openrocket.motor.ThrustCurveMotor;
import net.sf.openrocket.util.BugException;
import net.sf.openrocket.util.Pair;
/**
* Load motors in parallel using a three stage pipeline.
*
* Stage 1: single thread managed by the ThrustCurveMotorSetDatabase. This thread generates
* one object for each thrust curve motor file and puts it in the second stage.
*
* Stage 2: multiple threads which process individual files. Each process takes
* a single motor file and parses out the list of motors it contains.
* The list of motors is queued up for the third stage to process.
*
* Stage 3: single thread which processes the list of motors generated in stage 2.
* This thread puts all the motors from the list in the motor set database.
*
* It is important that stage 3 be done with a single thread because ThrustCurveMotorSetDatabase
* is not thread safe. Even if synchronization were to be done, it is unlikely that parallelizing
* this process would improve anything.
*
*
*/
public class ConcurrentLoadingThrustCurveMotorSetDatabase extends ThrustCurveMotorSetDatabase {
private static final LogHelper log = Application.getLogger();
private final String thrustCurveDirectory;
public ConcurrentLoadingThrustCurveMotorSetDatabase(String thrustCurveDirectory) {
// configure ThrustCurveMotorSetDatabase as true so we get our own thread in
// loadMotors.
super(true);
this.thrustCurveDirectory = thrustCurveDirectory;
}
@Override
protected void loadMotors() {
BookKeeping keeper = new BookKeeping();
keeper.start();
try {
keeper.waitForFinish();
}
catch ( InterruptedException iex ) {
throw new BugException(iex);
}
keeper = null;
}
private void addAll( List<Motor> motors ) {
for (Motor m : motors) {
addMotor( (ThrustCurveMotor) m);
}
}
/**
* A class which holds all the threading data.
* Implemented as an inner class so we can easily jettison the references when
* the processing is terminated.
*
*/
private class BookKeeping {
/*
* Executor for Stage 3.
*/
private final ExecutorService writerThread;
/*
* Executor for Stage 2.
*/
private final ExecutorService loaderPool;
/*
* Runnable used for Stage 1.
*/
private final WorkGenerator workGenerator;
private long startTime;
/*
* Number of thrust curves loaded
*/
private int thrustCurveCount = 0;
/*
* Number of files processed.
*/
private int fileCount = 0;
/*
* We have to hold on to the zip file iterator which is used to load
* the system motor files until all processing is done. This is because
* closing the iterator prematurely causes all the InputStreams opened
* with it to close.
*/
private FileIterator iterator;
private BookKeeping() {
writerThread = Executors.newSingleThreadExecutor();
loaderPool = Executors.newFixedThreadPool(25);
workGenerator = new WorkGenerator();
}
private void start() {
startTime = System.currentTimeMillis();
log.info("Starting motor loading from " + thrustCurveDirectory + " in background thread.");
// Run the work generator - in this thread.
workGenerator.run();
}
private void waitForFinish() throws InterruptedException {
try {
loaderPool.shutdown();
loaderPool.awaitTermination(10, TimeUnit.SECONDS);
writerThread.shutdown();
writerThread.awaitTermination(10, TimeUnit.SECONDS);
}
finally {
iterator.close();
}
long endTime = System.currentTimeMillis();
int distinctMotorCount = 0;
int distinctThrustCurveCount = 0;
distinctMotorCount = motorSets.size();
for (ThrustCurveMotorSet set : motorSets) {
distinctThrustCurveCount += set.getMotorCount();
}
log.info("Motor loading done, took " + (endTime - startTime) + " ms to load "
+ fileCount + " files/directories containing "
+ thrustCurveCount + " thrust curves which contained "
+ distinctMotorCount + " distinct motors with "
+ distinctThrustCurveCount + " distinct thrust curves.");
}
private class WorkGenerator implements Runnable {
@Override
public void run() {
// Start loading
log.info("Loading motors from " + thrustCurveDirectory);
iterator = DirectoryIterator.findDirectory(thrustCurveDirectory,
new SimpleFileFilter("", false, "eng", "rse"));
// Load the packaged thrust curves
if (iterator == null) {
throw new IllegalStateException("Thrust curve directory " + thrustCurveDirectory +
"not found, distribution built wrong");
}
while( iterator.hasNext() ) {
Pair<String,InputStream> f = iterator.next();
MotorLoader loader = new MotorLoader( f.getV(), f.getU() );
loaderPool.execute(loader);
fileCount ++;
}
// Load the user-defined thrust curves
for (File file : ((SwingPreferences) Application.getPreferences()).getUserThrustCurveFiles()) {
log.info("Loading motors from " + file);
MotorLoader loader = new MotorLoader( file );
loaderPool.execute(loader);
fileCount++;
}
}
}
private class MotorLoader implements Runnable {
private final InputStream is;
private final String fileName;
private final File file;
public MotorLoader( File file ) {
super();
this.file = file;
this.is = null;
this.fileName = null;
}
public MotorLoader(InputStream is, String fileName) {
super();
this.file = null;
this.is = is;
this.fileName = fileName;
}
@Override
public void run() {
log.debug("Loading motor from " + fileName);
try {
List<Motor> motors;
if ( file == null ) {
motors = MotorLoaderHelper.load(is, fileName);
} else {
motors = MotorLoaderHelper.load(file);
}
writerThread.submit( new MotorInserter(motors));
}
finally {
if ( is != null ) {
try {
is.close();
} catch ( IOException iex ) {
}
}
}
}
}
private class MotorInserter implements Runnable {
private final List<Motor> motors;
MotorInserter( List<Motor> motors ) {
this.motors = motors;
}
@Override
public void run() {
thrustCurveCount += motors.size();
ConcurrentLoadingThrustCurveMotorSetDatabase.this.addAll(motors);
}
}
}
}

View File

@ -49,11 +49,6 @@ public class Startup2 {
private static final String THRUSTCURVE_DIRECTORY = "datafiles/thrustcurves/";
/** Block motor loading for this many milliseconds */
private static AtomicInteger blockLoading = new AtomicInteger(Integer.MAX_VALUE);
/**
* Run when starting up OpenRocket after Application has been set up.
*
@ -98,7 +93,7 @@ public class Startup2 {
// Latch which counts the number of background loading processes we need to complete.
CountDownLatch loading = new CountDownLatch(1);
ExecutorService exec = Executors.newFixedThreadPool(2, new ThreadFactory() {
ExecutorService exec = Executors.newFixedThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -144,7 +139,11 @@ public class Startup2 {
// Load motors etc.
log.info("Loading databases");
loadMotor();
ConcurrentLoadingThrustCurveMotorSetDatabase motorLoader = new ConcurrentLoadingThrustCurveMotorSetDatabase(THRUSTCURVE_DIRECTORY);
motorLoader.startLoading();
Application.setMotorSetDatabase(motorLoader);
Databases.fakeMethod();
try {
@ -163,8 +162,6 @@ public class Startup2 {
log.info("Checking update status");
checkUpdateStatus(updateInfo);
// Block motor loading for 1.5 seconds to allow window painting to be faster
blockLoading.set(1500);
}
@ -185,78 +182,6 @@ public class Startup2 {
}
private static void loadMotor() {
log.info("Starting motor loading from " + THRUSTCURVE_DIRECTORY + " in background thread.");
ThrustCurveMotorSetDatabase db = new ThrustCurveMotorSetDatabase(true) {
@Override
protected void loadMotors() {
// Block loading until timeout occurs or database is taken into use
log.info("Blocking motor loading while starting up");
while (!inUse && blockLoading.addAndGet(-100) > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
log.info("Blocking ended, inUse=" + inUse + " blockLoading=" + blockLoading.get());
// Start loading
log.info("Loading motors from " + THRUSTCURVE_DIRECTORY);
long t0 = System.currentTimeMillis();
int fileCount;
int thrustCurveCount;
// Load the packaged thrust curves
List<Motor> list;
FileIterator iterator = DirectoryIterator.findDirectory(THRUSTCURVE_DIRECTORY,
new SimpleFileFilter("", false, "eng", "rse"));
if (iterator == null) {
throw new IllegalStateException("Thrust curve directory " + THRUSTCURVE_DIRECTORY +
"not found, distribution built wrong");
}
list = MotorLoaderHelper.load(iterator);
for (Motor m : list) {
this.addMotor((ThrustCurveMotor) m);
}
fileCount = iterator.getFileCount();
thrustCurveCount = list.size();
// Load the user-defined thrust curves
for (File file : ((SwingPreferences) Application.getPreferences()).getUserThrustCurveFiles()) {
log.info("Loading motors from " + file);
list = MotorLoaderHelper.load(file);
for (Motor m : list) {
this.addMotor((ThrustCurveMotor) m);
}
fileCount++;
thrustCurveCount += list.size();
}
long t1 = System.currentTimeMillis();
// Count statistics
int distinctMotorCount = 0;
int distinctThrustCurveCount = 0;
distinctMotorCount = motorSets.size();
for (ThrustCurveMotorSet set : motorSets) {
distinctThrustCurveCount += set.getMotorCount();
}
log.info("Motor loading done, took " + (t1 - t0) + " ms to load "
+ fileCount + " files/directories containing "
+ thrustCurveCount + " thrust curves which contained "
+ distinctMotorCount + " distinct motors with "
+ distinctThrustCurveCount + " distinct thrust curves.");
}
};
db.startLoading();
Application.setMotorSetDatabase(db);
}
private static void checkUpdateStatus(final UpdateInfoRetriever updateInfo) {
if (updateInfo == null)
return;
@ -313,8 +238,11 @@ public class Startup2 {
@Override
public Object call() throws Exception {
long start = System.currentTimeMillis();
componentPresetDao.load("datafiles/presets", "(?i).*orc");
latch.countDown();
long end = System.currentTimeMillis();
log.debug("Time to load presets: " + (end-start) + "ms");
return null;
}