Refine external reader API (#762)

* Encapsulate message transport by removing `ExternalReaderProcess.getTransport` and adding `getModuleResolver` and `getResourceResolver` methods
* Reuse `Random` instances within `ExternalReaderProcessImpl` and module/resource resolvers
* Externalize all `ExternalReaderProcessException` messages
* Add some missing doc comments to `ModuleKeyFactories` and `ResourceReaders` methods for external readers
* Move org.pkl.core.util.Readers to org.pkl.core.Readers
This commit is contained in:
Josh B
2024-10-31 16:51:25 -07:00
committed by GitHub
parent 66d751f093
commit e217cfcd6f
16 changed files with 89 additions and 31 deletions

View File

@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.core.util;
package org.pkl.core;
public final class Readers {
private Readers() {}

View File

@@ -17,9 +17,10 @@ package org.pkl.core.externalreader;
import java.io.IOException;
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader;
import org.pkl.core.messaging.MessageTransport;
import org.pkl.core.messaging.Messages.ModuleReaderSpec;
import org.pkl.core.messaging.Messages.ResourceReaderSpec;
import org.pkl.core.module.ExternalModuleResolver;
import org.pkl.core.resource.ExternalResourceResolver;
import org.pkl.core.util.Nullable;
/** An external process that reads Pkl modules and resources. */
@@ -33,13 +34,23 @@ public interface ExternalReaderProcess extends AutoCloseable {
}
/**
* Returns a message transport for communicating with this process.
* Returns a resolver for modules provided via this reader.
*
* <p>Upon first call, this method may allocate resources, including spawning a child process.
*
* @throws IllegalStateException if this process has already been closed
*/
MessageTransport getTransport() throws ExternalReaderProcessException;
ExternalModuleResolver getModuleResolver(long evaluatorId) throws ExternalReaderProcessException;
/**
* Returns a resolver for resources provided via this reader.
*
* <p>Upon first call, this method may allocate resources, including spawning a child process.
*
* @throws IllegalStateException if this process has already been closed
*/
ExternalResourceResolver getResourceResolver(long evaluatorId)
throws ExternalReaderProcessException;
/**
* Returns the spec, if available, of this process's module reader with the given scheme.

View File

@@ -35,6 +35,9 @@ import org.pkl.core.messaging.MessageTransports;
import org.pkl.core.messaging.Messages.ModuleReaderSpec;
import org.pkl.core.messaging.Messages.ResourceReaderSpec;
import org.pkl.core.messaging.ProtocolException;
import org.pkl.core.module.ExternalModuleResolver;
import org.pkl.core.resource.ExternalResourceResolver;
import org.pkl.core.util.ErrorMessages;
import org.pkl.core.util.LateInit;
import org.pkl.core.util.Nullable;
@@ -48,6 +51,7 @@ final class ExternalReaderProcessImpl implements ExternalReaderProcess {
new ConcurrentHashMap<>();
private final Map<String, Future<@Nullable ResourceReaderSpec>>
initializeResourceReaderResponses = new ConcurrentHashMap<>();
private final Random requestIdGenerator = new Random();
private final Object lock = new Object();
private @GuardedBy("lock") boolean closed = false;
@@ -75,7 +79,18 @@ final class ExternalReaderProcessImpl implements ExternalReaderProcess {
}
@Override
public MessageTransport getTransport() throws ExternalReaderProcessException {
public ExternalModuleResolver getModuleResolver(long evaluatorId)
throws ExternalReaderProcessException {
return new ExternalModuleResolver(getTransport(), evaluatorId);
}
@Override
public ExternalResourceResolver getResourceResolver(long evaluatorId)
throws ExternalReaderProcessException {
return new ExternalResourceResolver(getTransport(), evaluatorId);
}
private MessageTransport getTransport() throws ExternalReaderProcessException {
synchronized (lock) {
if (closed) {
throw new IllegalStateException("External reader process has already been closed.");
@@ -83,7 +98,7 @@ final class ExternalReaderProcessImpl implements ExternalReaderProcess {
if (process != null) {
if (!process.isAlive()) {
throw new ExternalReaderProcessException(
"External reader process has already terminated.");
ErrorMessages.create("externalReaderAlreadyTerminated"));
}
return transport;
@@ -182,7 +197,8 @@ final class ExternalReaderProcessImpl implements ExternalReaderProcess {
uriScheme,
(scheme) -> {
var future = new CompletableFuture<@Nullable ModuleReaderSpec>();
var request = new InitializeModuleReaderRequest(new Random().nextLong(), scheme);
var request =
new InitializeModuleReaderRequest(requestIdGenerator.nextLong(), scheme);
try {
getTransport()
.send(
@@ -209,7 +225,8 @@ final class ExternalReaderProcessImpl implements ExternalReaderProcess {
uriScheme,
(scheme) -> {
var future = new CompletableFuture<@Nullable ResourceReaderSpec>();
var request = new InitializeResourceReaderRequest(new Random().nextLong(), scheme);
var request =
new InitializeResourceReaderRequest(requestIdGenerator.nextLong(), scheme);
try {
getTransport()
.send(

View File

@@ -39,6 +39,7 @@ public class ExternalModuleResolver {
private final long evaluatorId;
private final Map<URI, Future<String>> readResponses = new ConcurrentHashMap<>();
private final Map<URI, Future<List<PathElement>>> listResponses = new ConcurrentHashMap<>();
private final Random requestIdGenerator = new Random();
public ExternalModuleResolver(MessageTransport transport, long evaluatorId) {
this.transport = transport;
@@ -74,7 +75,7 @@ public class ExternalModuleResolver {
moduleUri,
(uri) -> {
var future = new CompletableFuture<String>();
var request = new ReadModuleRequest(new Random().nextLong(), evaluatorId, uri);
var request = new ReadModuleRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
@@ -104,7 +105,7 @@ public class ExternalModuleResolver {
baseUri,
(uri) -> {
var future = new CompletableFuture<List<PathElement>>();
var request = new ListModulesRequest(new Random().nextLong(), evaluatorId, uri);
var request = new ListModulesRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,

View File

@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import javax.annotation.concurrent.GuardedBy;
import org.pkl.core.Readers;
import org.pkl.core.externalreader.ExternalReaderProcess;
import org.pkl.core.externalreader.ExternalReaderProcessException;
import org.pkl.core.util.ErrorMessages;
@@ -78,7 +79,7 @@ public final class ModuleKeyFactories {
}
/**
* Returns a factory for external reader module keys
* Returns a factory for external reader module keys.
*
* <p>NOTE: {@code process} needs to be {@link ExternalReaderProcess#close closed} to avoid
* resource leaks.
@@ -87,6 +88,12 @@ public final class ModuleKeyFactories {
return new ExternalProcess(scheme, process, 0);
}
/**
* Returns a factory for external reader module keys.
*
* <p>NOTE: {@code process} needs to be {@link ExternalReaderProcess#close closed} to avoid
* resource leaks.
*/
public static ModuleKeyFactory externalProcess(
String scheme, ExternalReaderProcess process, long evaluatorId) {
return new ExternalProcess(scheme, process, evaluatorId);
@@ -95,7 +102,7 @@ public final class ModuleKeyFactories {
/**
* Closes the given factories, ignoring any exceptions.
*
* @deprecated Replaced by {@link org.pkl.core.util.Readers#closeQuietly}.
* @deprecated Replaced by {@link Readers#closeQuietly}.
*/
@Deprecated(since = "0.27.0", forRemoval = true)
public static void closeQuietly(Iterable<ModuleKeyFactory> factories) {
@@ -272,7 +279,7 @@ public final class ModuleKeyFactories {
return resolver;
}
resolver = new ExternalModuleResolver(process.getTransport(), evaluatorId);
resolver = process.getModuleResolver(evaluatorId);
return resolver;
}

View File

@@ -38,6 +38,7 @@ public class ExternalResourceResolver {
private final long evaluatorId;
private final Map<URI, Future<byte[]>> readResponses = new ConcurrentHashMap<>();
private final Map<URI, Future<List<PathElement>>> listResponses = new ConcurrentHashMap<>();
private final Random requestIdGenerator = new Random();
public ExternalResourceResolver(MessageTransport transport, long evaluatorId) {
this.transport = transport;
@@ -72,7 +73,8 @@ public class ExternalResourceResolver {
baseUri,
(uri) -> {
var future = new CompletableFuture<List<PathElement>>();
var request = new ListResourcesRequest(new Random().nextLong(), evaluatorId, uri);
var request =
new ListResourcesRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
@@ -101,7 +103,8 @@ public class ExternalResourceResolver {
baseUri,
(uri) -> {
var future = new CompletableFuture<byte[]>();
var request = new ReadResourceRequest(new Random().nextLong(), evaluatorId, uri);
var request =
new ReadResourceRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,

View File

@@ -140,16 +140,28 @@ public final class ResourceReaders {
return FromServiceProviders.INSTANCE;
}
public static ResourceReader externalProcess(
String scheme, ExternalReaderProcess externalReaderProcess) {
return new ExternalProcess(scheme, externalReaderProcess, 0);
/**
* Returns a reader for external reader resources.
*
* <p>NOTE: {@code process} needs to be {@link ExternalReaderProcess#close closed} to avoid
* resource leaks.
*/
public static ResourceReader externalProcess(String scheme, ExternalReaderProcess process) {
return new ExternalProcess(scheme, process, 0);
}
/**
* Returns a reader for external reader resources.
*
* <p>NOTE: {@code process} needs to be {@link ExternalReaderProcess#close closed} to avoid
* resource leaks.
*/
public static ResourceReader externalProcess(
String scheme, ExternalReaderProcess externalReaderProcess, long evaluatorId) {
return new ExternalProcess(scheme, externalReaderProcess, evaluatorId);
String scheme, ExternalReaderProcess process, long evaluatorId) {
return new ExternalProcess(scheme, process, evaluatorId);
}
/** Returns a reader for external and client reader resources. */
public static ResourceReader externalResolver(
ResourceReaderSpec spec, ExternalResourceResolver resolver) {
return new ExternalResolver(spec, resolver);
@@ -637,9 +649,7 @@ public final class ResourceReaders {
throw new ExternalReaderProcessException(
ErrorMessages.create("externalReaderDoesNotSupportScheme", "resource", scheme));
}
underlying =
new ExternalResolver(
spec, new ExternalResourceResolver(process.getTransport(), evaluatorId));
underlying = new ExternalResolver(spec, process.getResourceResolver(evaluatorId));
return underlying;
}

View File

@@ -28,12 +28,12 @@ import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.pkl.core.*;
import org.pkl.core.Readers;
import org.pkl.core.http.HttpClient;
import org.pkl.core.module.ModuleKeyFactories;
import org.pkl.core.module.ModulePathResolver;
import org.pkl.core.project.Project;
import org.pkl.core.resource.ResourceReaders;
import org.pkl.core.util.Readers;
import org.pkl.executor.spi.v1.ExecutorSpi;
import org.pkl.executor.spi.v1.ExecutorSpiException;
import org.pkl.executor.spi.v1.ExecutorSpiOptions;

View File

@@ -1117,3 +1117,6 @@ Failed to communicate with external reader process.
externalReaderDoesNotSupportScheme=\
External {0} reader does not support scheme `{1}`.
externalReaderAlreadyTerminated=\
External reader process has already terminated.

View File

@@ -28,6 +28,8 @@ import org.pkl.core.messaging.MessageTransport
import org.pkl.core.messaging.MessageTransports
import org.pkl.core.messaging.Messages.*
import org.pkl.core.messaging.ProtocolException
import org.pkl.core.module.ExternalModuleResolver
import org.pkl.core.resource.ExternalResourceResolver
class TestExternalReaderProcess(private val transport: MessageTransport) : ExternalReaderProcess {
private val initializeModuleReaderResponses: MutableMap<String, Future<ModuleReaderSpec?>> =
@@ -40,7 +42,11 @@ class TestExternalReaderProcess(private val transport: MessageTransport) : Exter
transport.close()
}
override fun getTransport(): MessageTransport = transport
override fun getModuleResolver(evaluatorId: Long): ExternalModuleResolver =
ExternalModuleResolver(transport, evaluatorId)
override fun getResourceResolver(evaluatorId: Long): ExternalResourceResolver =
ExternalResourceResolver(transport, evaluatorId)
fun run() {
try {