Remove pkl.core.messaging from core APIs (#770)

This commit is contained in:
Josh B
2024-11-01 14:53:16 -07:00
committed by GitHub
parent fa25fb46fd
commit 9692504b5f
8 changed files with 288 additions and 220 deletions

View File

@@ -31,6 +31,8 @@ import org.pkl.core.Duration;
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader;
import org.pkl.core.externalreader.ExternalReaderMessages.*;
import org.pkl.core.messaging.MessageTransport;
import org.pkl.core.messaging.MessageTransportModuleResolver;
import org.pkl.core.messaging.MessageTransportResourceResolver;
import org.pkl.core.messaging.MessageTransports;
import org.pkl.core.messaging.ProtocolException;
import org.pkl.core.module.ExternalModuleResolver;
@@ -79,13 +81,13 @@ final class ExternalReaderProcessImpl implements ExternalReaderProcess {
@Override
public ExternalModuleResolver getModuleResolver(long evaluatorId)
throws ExternalReaderProcessException {
return new ExternalModuleResolver(getTransport(), evaluatorId);
return new MessageTransportModuleResolver(getTransport(), evaluatorId);
}
@Override
public ExternalResourceResolver getResourceResolver(long evaluatorId)
throws ExternalReaderProcessException {
return new ExternalResourceResolver(getTransport(), evaluatorId);
return new MessageTransportResourceResolver(getTransport(), evaluatorId);
}
private MessageTransport getTransport() throws ExternalReaderProcessException {

View File

@@ -0,0 +1,129 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.core.messaging;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.pkl.core.SecurityManager;
import org.pkl.core.SecurityManagerException;
import org.pkl.core.messaging.Messages.ListModulesRequest;
import org.pkl.core.messaging.Messages.ListModulesResponse;
import org.pkl.core.messaging.Messages.ReadModuleRequest;
import org.pkl.core.messaging.Messages.ReadModuleResponse;
import org.pkl.core.module.ExternalModuleResolver;
import org.pkl.core.module.PathElement;
public class MessageTransportModuleResolver implements ExternalModuleResolver {
private final MessageTransport transport;
private final long evaluatorId;
private final Map<URI, Future<String>> readResponses = new ConcurrentHashMap<>();
private final Map<URI, Future<List<PathElement>>> listResponses = new ConcurrentHashMap<>();
private final Random requestIdGenerator = new Random();
public MessageTransportModuleResolver(MessageTransport transport, long evaluatorId) {
this.transport = transport;
this.evaluatorId = evaluatorId;
}
public List<PathElement> listElements(SecurityManager securityManager, URI uri)
throws IOException, SecurityManagerException {
securityManager.checkResolveModule(uri);
return doListElements(uri);
}
public boolean hasElement(SecurityManager securityManager, URI uri)
throws SecurityManagerException {
securityManager.checkResolveModule(uri);
try {
doReadModule(uri);
return true;
} catch (IOException e) {
return false;
}
}
public String resolveModule(SecurityManager securityManager, URI uri)
throws IOException, SecurityManagerException {
securityManager.checkResolveModule(uri);
return doReadModule(uri);
}
private String doReadModule(URI moduleUri) throws IOException {
return MessageTransports.resolveFuture(
readResponses.computeIfAbsent(
moduleUri,
(uri) -> {
var future = new CompletableFuture<String>();
var request = new ReadModuleRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ReadModuleResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else if (resp.contents() != null) {
future.complete(resp.contents());
} else {
future.complete("");
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
private List<PathElement> doListElements(URI baseUri) throws IOException {
return MessageTransports.resolveFuture(
listResponses.computeIfAbsent(
baseUri,
(uri) -> {
var future = new CompletableFuture<List<PathElement>>();
var request = new ListModulesRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ListModulesResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else {
future.complete(
Objects.requireNonNullElseGet(resp.pathElements(), List::of));
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
}

View File

@@ -0,0 +1,129 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.core.messaging;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.pkl.core.SecurityManager;
import org.pkl.core.SecurityManagerException;
import org.pkl.core.messaging.Messages.*;
import org.pkl.core.module.PathElement;
import org.pkl.core.resource.ExternalResourceResolver;
import org.pkl.core.resource.Resource;
public class MessageTransportResourceResolver implements ExternalResourceResolver {
private final MessageTransport transport;
private final long evaluatorId;
private final Map<URI, Future<byte[]>> readResponses = new ConcurrentHashMap<>();
private final Map<URI, Future<List<PathElement>>> listResponses = new ConcurrentHashMap<>();
private final Random requestIdGenerator = new Random();
public MessageTransportResourceResolver(MessageTransport transport, long evaluatorId) {
this.transport = transport;
this.evaluatorId = evaluatorId;
}
public Optional<Object> read(URI uri) throws IOException {
var result = doRead(uri);
return Optional.of(new Resource(uri, result));
}
public boolean hasElement(SecurityManager securityManager, URI elementUri)
throws SecurityManagerException {
securityManager.checkResolveResource(elementUri);
try {
doRead(elementUri);
return true;
} catch (IOException e) {
return false;
}
}
public List<PathElement> listElements(SecurityManager securityManager, URI baseUri)
throws IOException, SecurityManagerException {
securityManager.checkResolveResource(baseUri);
return doListElements(baseUri);
}
public List<PathElement> doListElements(URI baseUri) throws IOException {
return MessageTransports.resolveFuture(
listResponses.computeIfAbsent(
baseUri,
(uri) -> {
var future = new CompletableFuture<List<PathElement>>();
var request =
new ListResourcesRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ListResourcesResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else {
future.complete(
Objects.requireNonNullElseGet(resp.pathElements(), List::of));
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
public byte[] doRead(URI baseUri) throws IOException {
return MessageTransports.resolveFuture(
readResponses.computeIfAbsent(
baseUri,
(uri) -> {
var future = new CompletableFuture<byte[]>();
var request =
new ReadResourceRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ReadResourceResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else if (resp.contents() != null) {
future.complete(resp.contents());
} else {
future.complete(new byte[0]);
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
}

View File

@@ -18,25 +18,12 @@ package org.pkl.core.module;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.pkl.core.SecurityManager;
import org.pkl.core.SecurityManagerException;
import org.pkl.core.messaging.MessageTransport;
import org.pkl.core.messaging.MessageTransports;
import org.pkl.core.messaging.Messages.ListModulesRequest;
import org.pkl.core.messaging.Messages.ListModulesResponse;
import org.pkl.core.messaging.Messages.ReadModuleRequest;
import org.pkl.core.messaging.Messages.ReadModuleResponse;
import org.pkl.core.messaging.ProtocolException;
public class ExternalModuleResolver {
public interface ExternalModuleResolver {
public interface Spec {
interface Spec {
boolean hasHierarchicalUris();
boolean isGlobbable();
@@ -46,96 +33,12 @@ public class ExternalModuleResolver {
String scheme();
}
private final MessageTransport transport;
private final long evaluatorId;
private final Map<URI, Future<String>> readResponses = new ConcurrentHashMap<>();
private final Map<URI, Future<List<PathElement>>> listResponses = new ConcurrentHashMap<>();
private final Random requestIdGenerator = new Random();
String resolveModule(SecurityManager securityManager, URI uri)
throws IOException, SecurityManagerException;
public ExternalModuleResolver(MessageTransport transport, long evaluatorId) {
this.transport = transport;
this.evaluatorId = evaluatorId;
}
boolean hasElement(org.pkl.core.SecurityManager securityManager, URI elementUri)
throws SecurityManagerException;
public List<PathElement> listElements(SecurityManager securityManager, URI uri)
throws IOException, SecurityManagerException {
securityManager.checkResolveModule(uri);
return doListElements(uri);
}
public boolean hasElement(SecurityManager securityManager, URI uri)
throws SecurityManagerException {
securityManager.checkResolveModule(uri);
try {
doReadModule(uri);
return true;
} catch (IOException e) {
return false;
}
}
public String resolveModule(SecurityManager securityManager, URI uri)
throws IOException, SecurityManagerException {
securityManager.checkResolveModule(uri);
return doReadModule(uri);
}
private String doReadModule(URI moduleUri) throws IOException {
return MessageTransports.resolveFuture(
readResponses.computeIfAbsent(
moduleUri,
(uri) -> {
var future = new CompletableFuture<String>();
var request = new ReadModuleRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ReadModuleResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else if (resp.contents() != null) {
future.complete(resp.contents());
} else {
future.complete("");
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
private List<PathElement> doListElements(URI baseUri) throws IOException {
return MessageTransports.resolveFuture(
listResponses.computeIfAbsent(
baseUri,
(uri) -> {
var future = new CompletableFuture<List<PathElement>>();
var request = new ListModulesRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ListModulesResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else {
future.complete(
Objects.requireNonNullElseGet(resp.pathElements(), List::of));
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
List<PathElement> listElements(SecurityManager securityManager, URI baseUri)
throws IOException, SecurityManagerException;
}

View File

@@ -18,24 +18,14 @@ package org.pkl.core.resource;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.pkl.core.SecurityManager;
import org.pkl.core.SecurityManagerException;
import org.pkl.core.messaging.MessageTransport;
import org.pkl.core.messaging.MessageTransports;
import org.pkl.core.messaging.Messages.*;
import org.pkl.core.messaging.ProtocolException;
import org.pkl.core.module.PathElement;
public class ExternalResourceResolver {
public interface ExternalResourceResolver {
public interface Spec {
interface Spec {
boolean hasHierarchicalUris();
boolean isGlobbable();
@@ -43,97 +33,11 @@ public class ExternalResourceResolver {
String scheme();
}
private final MessageTransport transport;
private final long evaluatorId;
private final Map<URI, Future<byte[]>> readResponses = new ConcurrentHashMap<>();
private final Map<URI, Future<List<PathElement>>> listResponses = new ConcurrentHashMap<>();
private final Random requestIdGenerator = new Random();
Optional<Object> read(URI uri) throws IOException;
public ExternalResourceResolver(MessageTransport transport, long evaluatorId) {
this.transport = transport;
this.evaluatorId = evaluatorId;
}
boolean hasElement(SecurityManager securityManager, URI elementUri)
throws SecurityManagerException;
public Optional<Object> read(URI uri) throws IOException {
var result = doRead(uri);
return Optional.of(new Resource(uri, result));
}
public boolean hasElement(SecurityManager securityManager, URI elementUri)
throws SecurityManagerException {
securityManager.checkResolveResource(elementUri);
try {
doRead(elementUri);
return true;
} catch (IOException e) {
return false;
}
}
public List<PathElement> listElements(SecurityManager securityManager, URI baseUri)
throws IOException, SecurityManagerException {
securityManager.checkResolveResource(baseUri);
return doListElements(baseUri);
}
public List<PathElement> doListElements(URI baseUri) throws IOException {
return MessageTransports.resolveFuture(
listResponses.computeIfAbsent(
baseUri,
(uri) -> {
var future = new CompletableFuture<List<PathElement>>();
var request =
new ListResourcesRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ListResourcesResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else {
future.complete(
Objects.requireNonNullElseGet(resp.pathElements(), List::of));
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
public byte[] doRead(URI baseUri) throws IOException {
return MessageTransports.resolveFuture(
readResponses.computeIfAbsent(
baseUri,
(uri) -> {
var future = new CompletableFuture<byte[]>();
var request =
new ReadResourceRequest(requestIdGenerator.nextLong(), evaluatorId, uri);
try {
transport.send(
request,
(response) -> {
if (response instanceof ReadResourceResponse resp) {
if (resp.error() != null) {
future.completeExceptionally(new IOException(resp.error()));
} else if (resp.contents() != null) {
future.complete(resp.contents());
} else {
future.complete(new byte[0]);
}
} else {
future.completeExceptionally(new ProtocolException("unexpected response"));
}
});
} catch (ProtocolException | IOException e) {
future.completeExceptionally(e);
}
return future;
}));
}
List<PathElement> listElements(SecurityManager securityManager, URI baseUri)
throws IOException, SecurityManagerException;
}

View File

@@ -25,11 +25,11 @@ import java.util.concurrent.Future
import kotlin.random.Random
import org.pkl.core.externalreader.ExternalReaderMessages.*
import org.pkl.core.messaging.MessageTransport
import org.pkl.core.messaging.MessageTransportModuleResolver
import org.pkl.core.messaging.MessageTransportResourceResolver
import org.pkl.core.messaging.MessageTransports
import org.pkl.core.messaging.Messages.*
import org.pkl.core.messaging.ProtocolException
import org.pkl.core.module.ExternalModuleResolver
import org.pkl.core.resource.ExternalResourceResolver
class TestExternalReaderProcess(private val transport: MessageTransport) : ExternalReaderProcess {
private val initializeModuleReaderResponses: MutableMap<String, Future<ModuleReaderSpec?>> =
@@ -42,11 +42,11 @@ class TestExternalReaderProcess(private val transport: MessageTransport) : Exter
transport.close()
}
override fun getModuleResolver(evaluatorId: Long): ExternalModuleResolver =
ExternalModuleResolver(transport, evaluatorId)
override fun getModuleResolver(evaluatorId: Long): MessageTransportModuleResolver =
MessageTransportModuleResolver(transport, evaluatorId)
override fun getResourceResolver(evaluatorId: Long): ExternalResourceResolver =
ExternalResourceResolver(transport, evaluatorId)
override fun getResourceResolver(evaluatorId: Long): MessageTransportResourceResolver =
MessageTransportResourceResolver(transport, evaluatorId)
fun run() {
try {

View File

@@ -18,9 +18,9 @@ package org.pkl.server
import java.net.URI
import java.util.Optional
import org.pkl.core.messaging.*
import org.pkl.core.messaging.MessageTransportModuleResolver
import org.pkl.core.messaging.Messages.*
import org.pkl.core.module.*
import org.pkl.core.module.ExternalModuleResolver
internal class ClientModuleKeyFactory(
private val readerSpecs: Collection<ModuleReaderSpec>,
@@ -29,7 +29,8 @@ internal class ClientModuleKeyFactory(
) : ModuleKeyFactory {
private val schemes = readerSpecs.map { it.scheme }
private val resolver: ExternalModuleResolver = ExternalModuleResolver(transport, evaluatorId)
private val resolver: MessageTransportModuleResolver =
MessageTransportModuleResolver(transport, evaluatorId)
override fun create(uri: URI): Optional<ModuleKey> =
when (uri.scheme) {

View File

@@ -27,6 +27,7 @@ import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader
import org.pkl.core.externalreader.ExternalReaderProcess
import org.pkl.core.http.HttpClient
import org.pkl.core.messaging.MessageTransport
import org.pkl.core.messaging.MessageTransportResourceResolver
import org.pkl.core.messaging.MessageTransports
import org.pkl.core.messaging.ProtocolException
import org.pkl.core.module.ModuleKeyFactories
@@ -34,7 +35,6 @@ import org.pkl.core.module.ModuleKeyFactory
import org.pkl.core.module.ModulePathResolver
import org.pkl.core.packages.PackageUri
import org.pkl.core.project.DeclaredDependencies
import org.pkl.core.resource.ExternalResourceResolver
import org.pkl.core.resource.ResourceReader
import org.pkl.core.resource.ResourceReaders
import org.pkl.core.util.IoUtils
@@ -248,7 +248,7 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
add(
ResourceReaders.externalResolver(
readerSpec,
ExternalResourceResolver(transport, evaluatorId)
MessageTransportResourceResolver(transport, evaluatorId)
)
)
}