mirror of
https://github.com/apple/pkl.git
synced 2026-03-22 17:19:13 +01:00
Polish external reader API/implementation (#759)
- keep implementation classes internal to their packages - make classes final if possible - make namespace classes non-instantiable - throw IllegalStateException instead of ExternalReaderProcessException for use after close - common convention already used by HttpClient etc. - programming errors should be signaled with unchecked exceptions - use private instead of public lock object - polish Javadoc - delete commented out code - don't use star import for a single class
This commit is contained in:
@@ -21,7 +21,7 @@ import java.util.regex.Pattern
|
||||
import kotlin.io.path.isRegularFile
|
||||
import org.pkl.core.*
|
||||
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings
|
||||
import org.pkl.core.externalreader.ExternalReaderProcessImpl
|
||||
import org.pkl.core.externalreader.ExternalReaderProcess
|
||||
import org.pkl.core.http.HttpClient
|
||||
import org.pkl.core.module.ModuleKeyFactories
|
||||
import org.pkl.core.module.ModuleKeyFactory
|
||||
@@ -185,12 +185,11 @@ abstract class CliCommand(protected val cliOptions: CliBaseOptions) {
|
||||
}
|
||||
|
||||
private val externalProcesses by lazy {
|
||||
// share ExternalProcessImpl instances between configured external resource/module readers with
|
||||
// the same spec
|
||||
// this avoids spawning multiple subprocesses if the same reader implements both reader types
|
||||
// and/or multiple schemes
|
||||
// Share ExternalReaderProcess instances between configured external resource/module readers
|
||||
// with the same spec. This avoids spawning multiple subprocesses if the same reader implements
|
||||
// both reader types and/or multiple schemes.
|
||||
(externalModuleReaders + externalResourceReaders).values.toSet().associateWith {
|
||||
ExternalReaderProcessImpl(it)
|
||||
ExternalReaderProcess.of(it)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ import java.util.regex.Pattern;
|
||||
import org.pkl.core.SecurityManagers.StandardBuilder;
|
||||
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader;
|
||||
import org.pkl.core.externalreader.ExternalReaderProcess;
|
||||
import org.pkl.core.externalreader.ExternalReaderProcessImpl;
|
||||
import org.pkl.core.http.HttpClient;
|
||||
import org.pkl.core.module.ModuleKeyFactories;
|
||||
import org.pkl.core.module.ModuleKeyFactory;
|
||||
@@ -482,14 +481,14 @@ public final class EvaluatorBuilder {
|
||||
setModuleCacheDir(settings.moduleCacheDir());
|
||||
}
|
||||
|
||||
// this isn't ideal as project and non-project ExternalProcessImpl instances can be dupes
|
||||
// this isn't ideal as project and non-project ExternalReaderProcess instances can be dupes
|
||||
var procs = new HashMap<ExternalReader, ExternalReaderProcess>();
|
||||
if (settings.externalModuleReaders() != null) {
|
||||
for (var entry : settings.externalModuleReaders().entrySet()) {
|
||||
addModuleKeyFactory(
|
||||
ModuleKeyFactories.externalProcess(
|
||||
entry.getKey(),
|
||||
procs.computeIfAbsent(entry.getValue(), ExternalReaderProcessImpl::new)));
|
||||
procs.computeIfAbsent(entry.getValue(), ExternalReaderProcess::of)));
|
||||
}
|
||||
}
|
||||
if (settings.externalResourceReaders() != null) {
|
||||
@@ -497,7 +496,7 @@ public final class EvaluatorBuilder {
|
||||
addResourceReader(
|
||||
ResourceReaders.externalProcess(
|
||||
entry.getKey(),
|
||||
procs.computeIfAbsent(entry.getValue(), ExternalReaderProcessImpl::new)));
|
||||
procs.computeIfAbsent(entry.getValue(), ExternalReaderProcess::of)));
|
||||
}
|
||||
}
|
||||
return this;
|
||||
|
||||
@@ -28,7 +28,7 @@ import org.pkl.core.messaging.Message;
|
||||
import org.pkl.core.messaging.Message.Type;
|
||||
import org.pkl.core.util.Nullable;
|
||||
|
||||
public class ExternalReaderMessagePackDecoder extends BaseMessagePackDecoder {
|
||||
final class ExternalReaderMessagePackDecoder extends BaseMessagePackDecoder {
|
||||
|
||||
public ExternalReaderMessagePackDecoder(MessageUnpacker unpacker) {
|
||||
super(unpacker);
|
||||
|
||||
@@ -25,7 +25,7 @@ import org.pkl.core.messaging.Message;
|
||||
import org.pkl.core.messaging.ProtocolException;
|
||||
import org.pkl.core.util.Nullable;
|
||||
|
||||
public class ExternalReaderMessagePackEncoder extends BaseMessagePackEncoder {
|
||||
final class ExternalReaderMessagePackEncoder extends BaseMessagePackEncoder {
|
||||
|
||||
public ExternalReaderMessagePackEncoder(MessagePacker packer) {
|
||||
super(packer);
|
||||
|
||||
@@ -20,7 +20,8 @@ import org.pkl.core.messaging.Messages.ModuleReaderSpec;
|
||||
import org.pkl.core.messaging.Messages.ResourceReaderSpec;
|
||||
import org.pkl.core.util.Nullable;
|
||||
|
||||
public class ExternalReaderMessages {
|
||||
final class ExternalReaderMessages {
|
||||
private ExternalReaderMessages() {}
|
||||
|
||||
public record InitializeModuleReaderRequest(long requestId, String scheme)
|
||||
implements Server.Request {
|
||||
|
||||
@@ -16,38 +16,57 @@
|
||||
package org.pkl.core.externalreader;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader;
|
||||
import org.pkl.core.messaging.MessageTransport;
|
||||
import org.pkl.core.messaging.Messages.ModuleReaderSpec;
|
||||
import org.pkl.core.messaging.Messages.ResourceReaderSpec;
|
||||
import org.pkl.core.util.Nullable;
|
||||
|
||||
/** An interface for interacting with external module/resource processes. */
|
||||
/** An external process that reads Pkl modules and resources. */
|
||||
public interface ExternalReaderProcess extends AutoCloseable {
|
||||
/**
|
||||
* Creates a new {@link ExternalReaderProcess} from the given spec. No resources are allocated at
|
||||
* this time.
|
||||
*/
|
||||
static ExternalReaderProcess of(ExternalReader spec) {
|
||||
return new ExternalReaderProcessImpl(spec);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the process's underlying {@link MessageTransport} for sending reader-specific message
|
||||
* Returns a message transport for communicating with this process.
|
||||
*
|
||||
* <p>May allocate resources upon first call, including spawning a child process. Must not be
|
||||
* called after {@link ExternalReaderProcess#close} has been called.
|
||||
* <p>Upon first call, this method may allocate resources, including spawning a child process.
|
||||
*
|
||||
* @throws IllegalStateException if this process has already been closed
|
||||
*/
|
||||
MessageTransport getTransport() throws ExternalReaderProcessException;
|
||||
|
||||
/** Retrieve the spec, if available, of the process's module reader with the given scheme. */
|
||||
/**
|
||||
* Returns the spec, if available, of this process's module reader with the given scheme.
|
||||
*
|
||||
* @throws IllegalStateException if this process has already been {@linkplain #close closed}
|
||||
* @throws IOException if an I/O error occurs
|
||||
*/
|
||||
@Nullable
|
||||
ModuleReaderSpec getModuleReaderSpec(String scheme) throws IOException;
|
||||
|
||||
/** Retrieve the spec, if available, of the process's resource reader with the given scheme. */
|
||||
/**
|
||||
* Returns the spec, if available, of this process's resource reader with the given scheme.
|
||||
*
|
||||
* @throws IllegalStateException if this process has already been {@linkplain #close closed}
|
||||
* @throws IOException if an I/O error occurs
|
||||
*/
|
||||
@Nullable
|
||||
ResourceReaderSpec getResourceReaderSpec(String scheme) throws IOException;
|
||||
|
||||
/**
|
||||
* Close the external process, cleaning up any resources.
|
||||
* Closes this process, releasing any associated resources.
|
||||
*
|
||||
* <p>The {@link MessageTransport} is sent the {@link ExternalReaderMessages.CloseExternalProcess}
|
||||
* message to request a graceful stop. A bespoke (empty) message type is used here instead of an
|
||||
* OS mechanism like signals to avoid forcing external reader implementers needing to handle many
|
||||
* OS-specific mechanisms. Implementations may then forcibly clean up resources after a timeout.
|
||||
* Must be safe to call multiple times.
|
||||
* <p>This method can be safely called multiple times. Subsequent calls have no effect.
|
||||
*
|
||||
* @implNote Implementers should request a graceful termination by sending a {@link
|
||||
* ExternalReaderMessages.CloseExternalProcess CloseExternalProcess} message to the process
|
||||
* before terminating it forcibly.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
|
||||
@@ -38,7 +38,7 @@ import org.pkl.core.messaging.ProtocolException;
|
||||
import org.pkl.core.util.LateInit;
|
||||
import org.pkl.core.util.Nullable;
|
||||
|
||||
public class ExternalReaderProcessImpl implements ExternalReaderProcess {
|
||||
final class ExternalReaderProcessImpl implements ExternalReaderProcess {
|
||||
|
||||
private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(3);
|
||||
|
||||
@@ -49,14 +49,15 @@ public class ExternalReaderProcessImpl implements ExternalReaderProcess {
|
||||
private final Map<String, Future<@Nullable ResourceReaderSpec>>
|
||||
initializeResourceReaderResponses = new ConcurrentHashMap<>();
|
||||
|
||||
private @GuardedBy("this") boolean closed = false;
|
||||
private final Object lock = new Object();
|
||||
private @GuardedBy("lock") boolean closed = false;
|
||||
|
||||
@LateInit
|
||||
@GuardedBy("this")
|
||||
@GuardedBy("lock")
|
||||
private Process process;
|
||||
|
||||
@LateInit
|
||||
@GuardedBy("this")
|
||||
@GuardedBy("lock")
|
||||
private MessageTransport transport;
|
||||
|
||||
private void log(String msg) {
|
||||
@@ -65,7 +66,7 @@ public class ExternalReaderProcessImpl implements ExternalReaderProcess {
|
||||
}
|
||||
}
|
||||
|
||||
public ExternalReaderProcessImpl(ExternalReader spec) {
|
||||
ExternalReaderProcessImpl(ExternalReader spec) {
|
||||
this.spec = spec;
|
||||
logPrefix =
|
||||
Objects.equals(System.getenv("PKL_DEBUG"), "1")
|
||||
@@ -74,16 +75,19 @@ public class ExternalReaderProcessImpl implements ExternalReaderProcess {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized MessageTransport getTransport() throws ExternalReaderProcessException {
|
||||
if (closed) {
|
||||
throw new ExternalReaderProcessException("ExternalProcessImpl has already been closed");
|
||||
}
|
||||
if (process != null) {
|
||||
if (!process.isAlive()) {
|
||||
throw new ExternalReaderProcessException("ExternalProcessImpl process is no longer alive");
|
||||
public MessageTransport getTransport() throws ExternalReaderProcessException {
|
||||
synchronized (lock) {
|
||||
if (closed) {
|
||||
throw new IllegalStateException("External reader process has already been closed.");
|
||||
}
|
||||
if (process != null) {
|
||||
if (!process.isAlive()) {
|
||||
throw new ExternalReaderProcessException(
|
||||
"External reader process has already terminated.");
|
||||
}
|
||||
|
||||
return transport;
|
||||
return transport;
|
||||
}
|
||||
}
|
||||
|
||||
// This relies on Java/OS behavior around PATH resolution, absolute/relative paths, etc.
|
||||
@@ -104,7 +108,7 @@ public class ExternalReaderProcessImpl implements ExternalReaderProcess {
|
||||
new ExternalReaderMessagePackEncoder(process.getOutputStream()),
|
||||
this::log);
|
||||
|
||||
var rxThread = new Thread(this::runTransport, "ExternalProcessImpl rxThread for " + spec);
|
||||
var rxThread = new Thread(this::runTransport, "ExternalReaderProcessImpl rxThread for " + spec);
|
||||
rxThread.setDaemon(true);
|
||||
rxThread.start();
|
||||
|
||||
@@ -131,41 +135,43 @@ public class ExternalReaderProcessImpl implements ExternalReaderProcess {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
closed = true;
|
||||
if (process == null || !process.isAlive()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (transport != null) {
|
||||
transport.send(new CloseExternalProcess());
|
||||
transport.close();
|
||||
public void close() {
|
||||
synchronized (lock) {
|
||||
closed = true;
|
||||
if (process == null || !process.isAlive()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// forcefully stop the process after the timeout
|
||||
// note that both transport.close() and process.destroy() are safe to call multiple times
|
||||
new Timer()
|
||||
.schedule(
|
||||
new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (process != null) {
|
||||
transport.close();
|
||||
process.destroyForcibly();
|
||||
}
|
||||
}
|
||||
},
|
||||
CLOSE_TIMEOUT.inWholeMillis());
|
||||
try {
|
||||
if (transport != null) {
|
||||
transport.send(new CloseExternalProcess());
|
||||
transport.close();
|
||||
}
|
||||
|
||||
// block on process exit
|
||||
process.onExit().get();
|
||||
} catch (Exception e) {
|
||||
transport.close();
|
||||
process.destroyForcibly();
|
||||
} finally {
|
||||
process = null;
|
||||
transport = null;
|
||||
// forcefully stop the process after the timeout
|
||||
// note that both transport.close() and process.destroy() are safe to call multiple times
|
||||
new Timer()
|
||||
.schedule(
|
||||
new TimerTask() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (process != null) {
|
||||
transport.close();
|
||||
process.destroyForcibly();
|
||||
}
|
||||
}
|
||||
},
|
||||
CLOSE_TIMEOUT.inWholeMillis());
|
||||
|
||||
// block on process exit
|
||||
process.onExit().get();
|
||||
} catch (Exception e) {
|
||||
transport.close();
|
||||
process.destroyForcibly();
|
||||
} finally {
|
||||
process = null;
|
||||
transport = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,8 @@ import org.pkl.core.util.ErrorMessages;
|
||||
import org.pkl.core.util.Pair;
|
||||
|
||||
/** Factory methods for creating [MessageTransport]s. */
|
||||
public class MessageTransports {
|
||||
public final class MessageTransports {
|
||||
private MessageTransports() {}
|
||||
|
||||
public interface Logger {
|
||||
void log(String msg);
|
||||
|
||||
@@ -23,7 +23,8 @@ import org.pkl.core.messaging.Message.*;
|
||||
import org.pkl.core.module.PathElement;
|
||||
import org.pkl.core.util.Nullable;
|
||||
|
||||
public class Messages {
|
||||
public final class Messages {
|
||||
private Messages() {}
|
||||
|
||||
public record ModuleReaderSpec(
|
||||
String scheme, boolean hasHierarchicalUris, boolean isLocal, boolean isGlobbable) {}
|
||||
@@ -78,12 +79,6 @@ public class Messages {
|
||||
long requestId, long evaluatorId, byte @Nullable [] contents, @Nullable String error)
|
||||
implements Client.Response {
|
||||
|
||||
// workaround for kotlin bridging issue where `byte @Nullable [] contents` isn't detected as
|
||||
// nullable
|
||||
// public ReadResourceResponse(long requestId, long evaluatorId, @Nullable String error) {
|
||||
// this(requestId, evaluatorId, null, error);
|
||||
// }
|
||||
|
||||
public Type type() {
|
||||
return Type.READ_RESOURCE_RESPONSE;
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ import java.util.Map;
|
||||
import org.pkl.core.SecurityManager;
|
||||
import org.pkl.core.SecurityManagerException;
|
||||
import org.pkl.core.externalreader.ExternalReaderProcessException;
|
||||
import org.pkl.core.messaging.Messages.*;
|
||||
import org.pkl.core.messaging.Messages.ModuleReaderSpec;
|
||||
import org.pkl.core.packages.Dependency;
|
||||
import org.pkl.core.packages.Dependency.LocalDependency;
|
||||
import org.pkl.core.packages.PackageAssetUri;
|
||||
@@ -131,7 +131,7 @@ public final class ModuleKeys {
|
||||
|
||||
/** Creates a module key for an externally read module. */
|
||||
public static ModuleKey externalResolver(
|
||||
URI uri, ModuleReaderSpec spec, ExternalModuleResolver resolver) throws URISyntaxException {
|
||||
URI uri, ModuleReaderSpec spec, ExternalModuleResolver resolver) {
|
||||
return new ExternalResolver(uri, spec, resolver);
|
||||
}
|
||||
|
||||
|
||||
@@ -49,7 +49,7 @@ public class ExternalResourceResolver {
|
||||
return Optional.of(new Resource(uri, result));
|
||||
}
|
||||
|
||||
public boolean hasElement(org.pkl.core.SecurityManager securityManager, URI elementUri)
|
||||
public boolean hasElement(SecurityManager securityManager, URI elementUri)
|
||||
throws SecurityManagerException {
|
||||
securityManager.checkResolveResource(elementUri);
|
||||
try {
|
||||
|
||||
@@ -31,7 +31,7 @@ import org.pkl.core.SecurityManager;
|
||||
import org.pkl.core.SecurityManagerException;
|
||||
import org.pkl.core.externalreader.ExternalReaderProcess;
|
||||
import org.pkl.core.externalreader.ExternalReaderProcessException;
|
||||
import org.pkl.core.messaging.Messages.*;
|
||||
import org.pkl.core.messaging.Messages.ResourceReaderSpec;
|
||||
import org.pkl.core.module.FileResolver;
|
||||
import org.pkl.core.module.ModulePathResolver;
|
||||
import org.pkl.core.module.PathElement;
|
||||
|
||||
@@ -15,7 +15,9 @@
|
||||
*/
|
||||
package org.pkl.core.util;
|
||||
|
||||
public class Readers {
|
||||
public final class Readers {
|
||||
private Readers() {}
|
||||
|
||||
/** Closes the given readers, ignoring any exceptions. */
|
||||
public static void closeQuietly(Iterable<? extends AutoCloseable> readers) {
|
||||
for (var reader : readers) {
|
||||
|
||||
@@ -25,7 +25,6 @@ import kotlin.random.Random
|
||||
import org.pkl.core.*
|
||||
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader
|
||||
import org.pkl.core.externalreader.ExternalReaderProcess
|
||||
import org.pkl.core.externalreader.ExternalReaderProcessImpl
|
||||
import org.pkl.core.http.HttpClient
|
||||
import org.pkl.core.messaging.MessageTransport
|
||||
import org.pkl.core.messaging.MessageTransports
|
||||
@@ -286,5 +285,5 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
|
||||
private fun getExternalProcess(evaluatorId: Long, spec: ExternalReader): ExternalReaderProcess =
|
||||
externalReaderProcesses
|
||||
.computeIfAbsent(evaluatorId) { ConcurrentHashMap() }
|
||||
.computeIfAbsent(spec) { ExternalReaderProcessImpl(it) }
|
||||
.computeIfAbsent(spec) { ExternalReaderProcess.of(it) }
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user