Implement SPICE-0009 External Readers (#660)

This adds a new feature, which allows Pkl to read resources and modules from external processes.

Follows the design laid out in SPICE-0009.

Also, this moves most of the messaging API into pkl-core
This commit is contained in:
Josh B
2024-10-28 18:22:14 -07:00
committed by GitHub
parent 466ae6fd4c
commit 666f8c3939
110 changed files with 4368 additions and 1810 deletions
@@ -17,16 +17,17 @@ package org.pkl.server
import org.pkl.core.Logger
import org.pkl.core.StackFrame
import org.pkl.core.messaging.MessageTransport
internal class ClientLogger(
private val evaluatorId: Long,
private val transport: MessageTransport
) : Logger {
override fun trace(message: String, frame: StackFrame) {
transport.send(LogMessage(evaluatorId, level = 0, message, frame.moduleUri))
transport.send(LogMessage(evaluatorId, 0, message, frame.moduleUri))
}
override fun warn(message: String, frame: StackFrame) {
transport.send(LogMessage(evaluatorId, level = 1, message, frame.moduleUri))
transport.send(LogMessage(evaluatorId, 1, message, frame.moduleUri))
}
}
@@ -15,136 +15,27 @@
*/
package org.pkl.server
import java.io.IOException
import java.net.URI
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Future
import kotlin.random.Random
import org.pkl.core.SecurityManager
import org.pkl.core.module.ModuleKey
import org.pkl.core.module.ModuleKeyFactory
import org.pkl.core.module.PathElement
import org.pkl.core.module.ResolvedModuleKey
import org.pkl.core.module.ResolvedModuleKeys
import org.pkl.core.messaging.*
import org.pkl.core.messaging.Messages.*
import org.pkl.core.module.*
import org.pkl.core.module.ExternalModuleResolver
internal class ClientModuleKeyFactory(
private val readerSpecs: Collection<ModuleReaderSpec>,
transport: MessageTransport,
evaluatorId: Long
) : ModuleKeyFactory {
companion object {
private class ClientModuleKeyResolver(
private val transport: MessageTransport,
private val evaluatorId: Long,
) {
private val readResponses: MutableMap<URI, Future<String>> = ConcurrentHashMap()
private val listResponses: MutableMap<URI, Future<List<PathElement>>> = ConcurrentHashMap()
fun listElements(securityManager: SecurityManager, uri: URI): List<PathElement> {
securityManager.checkResolveModule(uri)
return doListElements(uri)
}
fun hasElement(securityManager: SecurityManager, uri: URI): Boolean {
securityManager.checkResolveModule(uri)
return try {
doReadModule(uri)
true
} catch (e: IOException) {
false
}
}
fun resolveModule(securityManager: SecurityManager, uri: URI): String {
securityManager.checkResolveModule(uri)
return doReadModule(uri)
}
private fun doReadModule(uri: URI): String =
readResponses
.computeIfAbsent(uri) {
CompletableFuture<String>().apply {
val request = ReadModuleRequest(Random.nextLong(), evaluatorId, uri)
transport.send(request) { response ->
when (response) {
is ReadModuleResponse -> {
if (response.error != null) {
completeExceptionally(IOException(response.error))
} else {
complete(response.contents ?: "")
}
}
else -> {
completeExceptionally(ProtocolException("unexpected response"))
}
}
}
}
}
.getUnderlying()
private fun doListElements(uri: URI): List<PathElement> =
listResponses
.computeIfAbsent(uri) {
CompletableFuture<List<PathElement>>().apply {
val request = ListModulesRequest(Random.nextLong(), evaluatorId, uri)
transport.send(request) { response ->
when (response) {
is ListModulesResponse -> {
if (response.error != null) {
completeExceptionally(IOException(response.error))
} else {
complete(response.pathElements ?: emptyList())
}
}
else -> completeExceptionally(ProtocolException("unexpected response"))
}
}
}
}
.getUnderlying()
}
/** [ModuleKey] that delegates module reads to the client. */
private class ClientModuleKey(
private val uri: URI,
private val spec: ModuleReaderSpec,
private val resolver: ClientModuleKeyResolver,
) : ModuleKey {
override fun isLocal(): Boolean = spec.isLocal
override fun hasHierarchicalUris(): Boolean = spec.hasHierarchicalUris
override fun isGlobbable(): Boolean = spec.isGlobbable
override fun getUri(): URI = uri
override fun listElements(securityManager: SecurityManager, baseUri: URI): List<PathElement> =
resolver.listElements(securityManager, baseUri)
override fun resolve(securityManager: SecurityManager): ResolvedModuleKey {
val contents = resolver.resolveModule(securityManager, uri)
return ResolvedModuleKeys.virtual(this, uri, contents, true)
}
override fun hasElement(securityManager: SecurityManager, uri: URI): Boolean {
return resolver.hasElement(securityManager, uri)
}
}
}
private val schemes = readerSpecs.map { it.scheme }
private val resolver: ClientModuleKeyResolver = ClientModuleKeyResolver(transport, evaluatorId)
private val resolver: ExternalModuleResolver = ExternalModuleResolver(transport, evaluatorId)
override fun create(uri: URI): Optional<ModuleKey> =
when (uri.scheme) {
in schemes -> {
val readerSpec = readerSpecs.find { it.scheme == uri.scheme }!!
val moduleKey = ClientModuleKey(uri, readerSpec, resolver)
val moduleKey = ModuleKeys.externalResolver(uri, readerSpec, resolver)
Optional.of(moduleKey)
}
else -> Optional.empty()
@@ -1,105 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import java.io.IOException
import java.net.URI
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Future
import kotlin.random.Random
import org.pkl.core.SecurityManager
import org.pkl.core.module.PathElement
import org.pkl.core.resource.Resource
import org.pkl.core.resource.ResourceReader
/** Resource reader that delegates read logic to the client. */
internal class ClientResourceReader(
private val transport: MessageTransport,
private val evaluatorId: Long,
private val readerSpec: ResourceReaderSpec,
) : ResourceReader {
private val readResponses: MutableMap<URI, Future<ByteArray>> = ConcurrentHashMap()
private val listResources: MutableMap<URI, Future<List<PathElement>>> = ConcurrentHashMap()
override fun hasHierarchicalUris(): Boolean = readerSpec.hasHierarchicalUris
override fun isGlobbable(): Boolean = readerSpec.isGlobbable
override fun getUriScheme() = readerSpec.scheme
override fun read(uri: URI): Optional<Any> = Optional.of(Resource(uri, doRead(uri)))
override fun hasElement(securityManager: SecurityManager, elementUri: URI): Boolean {
securityManager.checkResolveResource(elementUri)
return try {
doRead(elementUri)
true
} catch (e: IOException) {
false
}
}
override fun listElements(securityManager: SecurityManager, baseUri: URI): List<PathElement> {
securityManager.checkResolveResource(baseUri)
return doListElements(baseUri)
}
private fun doListElements(baseUri: URI): List<PathElement> =
listResources
.computeIfAbsent(baseUri) {
CompletableFuture<List<PathElement>>().apply {
val request = ListResourcesRequest(Random.nextLong(), evaluatorId, baseUri)
transport.send(request) { response ->
when (response) {
is ListResourcesResponse ->
if (response.error != null) {
completeExceptionally(IOException(response.error))
} else {
complete(response.pathElements ?: emptyList())
}
else -> completeExceptionally(ProtocolException("Unexpected response"))
}
}
}
}
.getUnderlying()
private fun doRead(uri: URI): ByteArray =
readResponses
.computeIfAbsent(uri) {
CompletableFuture<ByteArray>().apply {
val request = ReadResourceRequest(Random.nextLong(), evaluatorId, uri)
transport.send(request) { response ->
when (response) {
is ReadResourceResponse -> {
if (response.error != null) {
completeExceptionally(IOException(response.error))
} else {
complete(response.contents ?: ByteArray(0))
}
}
else -> {
completeExceptionally(ProtocolException("Unexpected response: $response"))
}
}
}
}
}
.getUnderlying()
}
@@ -1,21 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
/** Decodes a stream of messages. */
internal interface MessageDecoder {
fun decode(): Message?
}
@@ -1,31 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import java.io.InputStream
import org.msgpack.core.MessagePack
import org.msgpack.core.MessageUnpacker
/** Factory methods for creating [MessageDecoder]s. */
internal object MessageDecoders {
fun from(stream: InputStream): MessageDecoder =
MessagePackDecoder(MessagePack.newDefaultUnpacker(stream))
fun from(unpacker: MessageUnpacker): MessageDecoder = MessagePackDecoder(unpacker)
fun from(array: ByteArray): MessageDecoder =
MessagePackDecoder(MessagePack.newDefaultUnpacker(array))
}
@@ -1,21 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
/** Encodes a stream of messages. */
internal interface MessageEncoder {
fun encode(msg: Message)
}
@@ -1,28 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import java.io.OutputStream
import org.msgpack.core.MessagePack
import org.msgpack.core.MessagePacker
/** Factory methods for creating [MessageEncoder]s. */
internal object MessageEncoders {
fun into(stream: OutputStream): MessageEncoder =
MessagePackEncoder(MessagePack.newDefaultPacker(stream))
fun into(packer: MessagePacker): MessageEncoder = MessagePackEncoder(packer)
}
@@ -1,292 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import java.net.URI
import java.nio.file.Path
import java.time.Duration
import java.util.regex.Pattern
import org.msgpack.core.MessageTypeException
import org.msgpack.core.MessageUnpacker
import org.msgpack.value.Value
import org.msgpack.value.impl.ImmutableStringValueImpl
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings
import org.pkl.core.module.PathElement
import org.pkl.core.packages.Checksums
internal class MessagePackDecoder(private val unpacker: MessageUnpacker) : MessageDecoder {
override fun decode(): Message? {
if (!unpacker.hasNext()) return null
val code =
try {
val arraySize = unpacker.unpackArrayHeader()
if (arraySize != 2) {
throw DecodeException("Malformed message header (expected size 2, but got $arraySize).")
}
unpacker.unpackInt()
} catch (e: MessageTypeException) {
throw DecodeException("Malformed message header.", e)
}
return try {
val map = unpacker.unpackValue().asMapValue().map()
when (code) {
MessageType.CREATE_EVALUATOR_REQUEST.code -> {
CreateEvaluatorRequest(
requestId = map.get("requestId").asIntegerValue().asLong(),
allowedModules = map.unpackStringListOrNull("allowedModules")?.map(Pattern::compile),
allowedResources =
map.unpackStringListOrNull("allowedResources")?.map(Pattern::compile),
clientModuleReaders = map.unpackModuleReaderSpec(),
clientResourceReaders = map.unpackResourceReaderSpec(),
modulePaths = map.unpackStringListOrNull("modulePaths")?.map(Path::of),
env = map.unpackStringMapOrNull("env"),
properties = map.unpackStringMapOrNull("properties"),
timeout = map.unpackLongOrNull("timeoutSeconds")?.let(Duration::ofSeconds),
rootDir = map.unpackStringOrNull("rootDir")?.let(Path::of),
cacheDir = map.unpackStringOrNull("cacheDir")?.let(Path::of),
outputFormat = map.unpackStringOrNull("outputFormat"),
project = map.unpackProject(),
http = map.unpackHttp(),
)
}
MessageType.CREATE_EVALUATOR_RESPONSE.code -> {
CreateEvaluatorResponse(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLongOrNull("evaluatorId"),
error = map.unpackStringOrNull("error")
)
}
MessageType.CLOSE_EVALUATOR.code -> {
CloseEvaluator(evaluatorId = map.unpackLong("evaluatorId"))
}
MessageType.EVALUATE_REQUEST.code -> {
EvaluateRequest(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
moduleUri = map.unpackString("moduleUri").let(::URI),
moduleText = map.unpackStringOrNull("moduleText"),
expr = map.unpackStringOrNull("expr")
)
}
MessageType.EVALUATE_RESPONSE.code -> {
EvaluateResponse(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
result = map.unpackByteArrayOrNull("result"),
error = map.unpackStringOrNull("error")
)
}
MessageType.LOG_MESSAGE.code -> {
LogMessage(
evaluatorId = map.unpackLong("evaluatorId"),
level = map.unpackIntValue("level"),
message = map.unpackString("message"),
frameUri = map.unpackString("frameUri")
)
}
MessageType.READ_RESOURCE_REQUEST.code -> {
ReadResourceRequest(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
uri = map.unpackString("uri").let(::URI)
)
}
MessageType.READ_RESOURCE_RESPONSE.code -> {
ReadResourceResponse(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
contents = map.unpackByteArrayOrNull("contents"),
error = map.unpackStringOrNull("error")
)
}
MessageType.READ_MODULE_REQUEST.code -> {
ReadModuleRequest(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
uri = map.unpackString("uri").let(::URI)
)
}
MessageType.READ_MODULE_RESPONSE.code -> {
ReadModuleResponse(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
contents = map.unpackStringOrNull("contents"),
error = map.unpackStringOrNull("error")
)
}
MessageType.LIST_MODULES_REQUEST.code -> {
ListModulesRequest(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
uri = map.unpackString("uri").let(::URI)
)
}
MessageType.LIST_MODULES_RESPONSE.code -> {
ListModulesResponse(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
pathElements = map.unpackPathElements("pathElements"),
error = map.unpackStringOrNull("error")
)
}
MessageType.LIST_RESOURCES_REQUEST.code -> {
ListResourcesRequest(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
uri = map.unpackString("uri").let(::URI)
)
}
MessageType.LIST_RESOURCES_RESPONSE.code -> {
ListResourcesResponse(
requestId = map.unpackLong("requestId"),
evaluatorId = map.unpackLong("evaluatorId"),
pathElements = map.unpackPathElements("pathElements"),
error = map.unpackStringOrNull("error")
)
}
else -> throw ProtocolException("Invalid message code: $code")
}
} catch (e: MessageTypeException) {
throw DecodeException("Malformed message body for message with code `$code`.", e)
}
}
private fun Array<Value>.unpackValueOrNull(key: String): Value? {
for (i in indices.step(2)) {
val currKey = this[i].asStringValue().asString()
if (currKey == key) return this[i + 1]
}
return null
}
private fun Map<Value, Value>.getNullable(key: String): Value? =
this[ImmutableStringValueImpl(key)]
private fun Map<Value, Value>.get(key: String): Value =
getNullable(key) ?: throw DecodeException("Missing message parameter `$key`")
private fun Array<Value>.unpackValue(key: String): Value =
unpackValueOrNull(key) ?: throw DecodeException("Missing message parameter `$key`.")
private fun Map<Value, Value>.unpackStringListOrNull(key: String): List<String>? {
val value = getNullable(key) ?: return null
return value.asArrayValue().map { it.asStringValue().asString() }
}
private fun Map<Value, Value>.unpackStringMapOrNull(key: String): Map<String, String>? {
val value = getNullable(key) ?: return null
return value.asMapValue().entrySet().associate { (k, v) ->
k.asStringValue().asString() to v.asStringValue().asString()
}
}
private fun Map<Value, Value>.unpackLong(key: String): Long = get(key).asIntegerValue().asLong()
private fun Map<Value, Value>.unpackBoolean(key: String): Boolean =
get(key).asBooleanValue().boolean
private fun Map<Value, Value>.unpackBooleanOrNull(key: String): Boolean? =
getNullable(key)?.asBooleanValue()?.boolean
private fun Map<Value, Value>.unpackLongOrNull(key: String): Long? =
getNullable(key)?.asIntegerValue()?.asLong()
private fun Map<Value, Value>.unpackIntValue(key: String): Int = get(key).asIntegerValue().asInt()
private fun Map<Value, Value>.unpackString(key: String): String =
get(key).asStringValue().asString()
private fun Map<Value, Value>.unpackStringOrNull(key: String): String? =
getNullable(key)?.asStringValue()?.asString()
private fun Map<Value, Value>.unpackByteArrayOrNull(key: String): ByteArray? =
getNullable(key)?.asBinaryValue()?.asByteArray()
private fun Map<Value, Value>.unpackPathElements(key: String): List<PathElement>? =
getNullable(key)?.asArrayValue()?.map { pathElement ->
val map = pathElement.asMapValue().map()
PathElement(map.unpackString("name"), map.unpackBoolean("isDirectory"))
}
private fun Map<Value, Value>.unpackModuleReaderSpec(): List<ModuleReaderSpec>? {
val keys = getNullable("clientModuleReaders") ?: return null
return keys.asArrayValue().toList().map { value ->
val readerMap = value.asMapValue().map()
ModuleReaderSpec(
scheme = readerMap.unpackString("scheme"),
hasHierarchicalUris = readerMap.unpackBoolean("hasHierarchicalUris"),
isLocal = readerMap.unpackBoolean("isLocal"),
isGlobbable = readerMap.unpackBoolean("isGlobbable")
)
}
}
private fun Map<Value, Value>.unpackResourceReaderSpec(): List<ResourceReaderSpec> {
val keys = getNullable("clientResourceReaders") ?: return emptyList()
return keys.asArrayValue().toList().map { value ->
val readerMap = value.asMapValue().map()
ResourceReaderSpec(
scheme = readerMap.unpackString("scheme"),
hasHierarchicalUris = readerMap.unpackBoolean("hasHierarchicalUris"),
isGlobbable = readerMap.unpackBoolean("isGlobbable")
)
}
}
private fun Map<Value, Value>.unpackProject(): Project? {
val projMap = getNullable("project")?.asMapValue()?.map() ?: return null
val projectFileUri = URI(projMap.unpackString("projectFileUri"))
val dependencies = projMap.unpackDependencies("dependencies")
return Project(projectFileUri, null, dependencies)
}
private fun Map<Value, Value>.unpackHttp(): Http? {
val httpMap = getNullable("http")?.asMapValue()?.map() ?: return null
val proxy = httpMap.unpackProxy()
val caCertificates = httpMap.getNullable("caCertificates")?.asBinaryValue()?.asByteArray()
return Http(caCertificates, proxy)
}
private fun Map<Value, Value>.unpackProxy(): PklEvaluatorSettings.Proxy? {
val proxyMap = getNullable("proxy")?.asMapValue()?.map() ?: return null
val address = proxyMap.unpackString("address")
val noProxy = proxyMap.unpackStringListOrNull("noProxy")
return PklEvaluatorSettings.Proxy.create(address, noProxy)
}
private fun Map<Value, Value>.unpackDependencies(name: String): Map<String, Dependency> {
val mapValue = get(name).asMapValue().map()
return mapValue.entries.associate { (key, value) ->
val dependencyName = key.asStringValue().asString()
val dependencyObj = value.asMapValue().map()
val type = dependencyObj.unpackString("type")
val packageUri = URI(dependencyObj.unpackString("packageUri"))
if (type == DependencyType.REMOTE.value) {
val checksums =
dependencyObj.getNullable("checksums")?.asMapValue()?.map()?.let { obj ->
val sha256 = obj.unpackString("sha256")
Checksums(sha256)
}
return@associate dependencyName to RemoteDependency(packageUri, checksums)
}
val dependencies = dependencyObj.unpackDependencies("dependencies")
val projectFileUri = dependencyObj.unpackString("projectFileUri")
dependencyName to Project(URI(projectFileUri), packageUri, dependencies)
}
}
}
@@ -1,332 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import kotlin.io.path.pathString
import org.msgpack.core.MessagePacker
import org.pkl.core.module.PathElement
import org.pkl.core.packages.Checksums
internal class MessagePackEncoder(private val packer: MessagePacker) : MessageEncoder {
private fun MessagePacker.packModuleReaderSpec(reader: ModuleReaderSpec) {
packMapHeader(4)
packKeyValue("scheme", reader.scheme)
packKeyValue("hasHierarchicalUris", reader.hasHierarchicalUris)
packKeyValue("isLocal", reader.isLocal)
packKeyValue("isGlobbable", reader.isGlobbable)
}
private fun MessagePacker.packResourceReaderSpec(reader: ResourceReaderSpec) {
packMapHeader(3)
packKeyValue("scheme", reader.scheme)
packKeyValue("hasHierarchicalUris", reader.hasHierarchicalUris)
packKeyValue("isGlobbable", reader.isGlobbable)
}
private fun MessagePacker.packPathElement(pathElement: PathElement) {
packMapHeader(2)
packKeyValue("name", pathElement.name)
packKeyValue("isDirectory", pathElement.isDirectory)
}
private fun MessagePacker.packProject(project: Project) {
packMapHeader(2)
packKeyValue("projectFileUri", project.projectFileUri.toString())
packString("dependencies")
packDependencies(project.dependencies)
}
private fun MessagePacker.packHttp(http: Http) {
if ((http.caCertificates ?: http.proxy) == null) {
packMapHeader(0)
return
}
packMapHeader(0, http.caCertificates, http.proxy)
packKeyValue("caCertificates", http.caCertificates)
http.proxy?.let { proxy ->
packString("proxy")
packMapHeader(0, proxy.address, proxy.noProxy)
packKeyValue("address", proxy.address?.toString())
packKeyValue("noProxy", proxy.noProxy)
}
}
private fun MessagePacker.packDependencies(dependencies: Map<String, Dependency>) {
packMapHeader(dependencies.size)
for ((name, dep) in dependencies) {
packString(name)
if (dep is Project) {
packMapHeader(4)
packKeyValue("type", dep.type.value)
packKeyValue("packageUri", dep.packageUri.toString())
packKeyValue("projectFileUri", dep.projectFileUri.toString())
packString("dependencies")
packDependencies(dep.dependencies)
} else {
dep as RemoteDependency
packMapHeader(dep.checksums?.let { 3 } ?: 2)
packKeyValue("type", dep.type.value)
packKeyValue("packageUri", dep.packageUri.toString())
dep.checksums?.let { checksums ->
packString("checksums")
packChecksums(checksums)
}
}
}
}
private fun MessagePacker.packChecksums(checksums: Checksums) {
packMapHeader(1)
packKeyValue("sha256", checksums.sha256)
}
override fun encode(msg: Message) =
with(packer) {
packArrayHeader(2)
packInt(msg.type.code)
@Suppress("DuplicatedCode")
when (msg.type.code) {
MessageType.CREATE_EVALUATOR_REQUEST.code -> {
msg as CreateEvaluatorRequest
packMapHeader(
8,
msg.timeout,
msg.rootDir,
msg.cacheDir,
msg.outputFormat,
msg.project,
msg.http
)
packKeyValue("requestId", msg.requestId)
packKeyValue("allowedModules", msg.allowedModules?.map { it.toString() })
packKeyValue("allowedResources", msg.allowedResources?.map { it.toString() })
if (msg.clientModuleReaders != null) {
packString("clientModuleReaders")
packArrayHeader(msg.clientModuleReaders.size)
for (moduleReader in msg.clientModuleReaders) {
packModuleReaderSpec(moduleReader)
}
}
if (msg.clientResourceReaders != null) {
packString("clientResourceReaders")
packArrayHeader(msg.clientResourceReaders.size)
for (resourceReader in msg.clientResourceReaders) {
packResourceReaderSpec(resourceReader)
}
}
packKeyValue("modulePaths", msg.modulePaths?.map { it.pathString })
packKeyValue("env", msg.env)
packKeyValue("properties", msg.properties)
packKeyValue("timeoutSeconds", msg.timeout?.toSeconds())
packKeyValue("rootDir", msg.rootDir?.pathString)
packKeyValue("cacheDir", msg.cacheDir?.pathString)
packKeyValue("outputFormat", msg.outputFormat)
if (msg.project != null) {
packString("project")
packProject(msg.project)
}
if (msg.http != null) {
packString("http")
packHttp(msg.http)
}
}
MessageType.CREATE_EVALUATOR_RESPONSE.code -> {
msg as CreateEvaluatorResponse
packMapHeader(1, msg.evaluatorId, msg.error)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("error", msg.error)
}
MessageType.CLOSE_EVALUATOR.code -> {
msg as CloseEvaluator
packMapHeader(1)
packKeyValue("evaluatorId", msg.evaluatorId)
}
MessageType.EVALUATE_REQUEST.code -> {
msg as EvaluateRequest
packMapHeader(3, msg.moduleText, msg.expr)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("moduleUri", msg.moduleUri.toString())
packKeyValue("moduleText", msg.moduleText)
packKeyValue("expr", msg.expr)
}
MessageType.EVALUATE_RESPONSE.code -> {
msg as EvaluateResponse
packMapHeader(2, msg.result, msg.error)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("result", msg.result)
packKeyValue("error", msg.error)
}
MessageType.LOG_MESSAGE.code -> {
msg as LogMessage
packMapHeader(4)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("level", msg.level)
packKeyValue("message", msg.message)
packKeyValue("frameUri", msg.frameUri)
}
MessageType.READ_RESOURCE_REQUEST.code -> {
msg as ReadResourceRequest
packMapHeader(3)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("uri", msg.uri.toString())
}
MessageType.READ_RESOURCE_RESPONSE.code -> {
msg as ReadResourceResponse
packMapHeader(2, msg.contents, msg.error)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("contents", msg.contents)
packKeyValue("error", msg.error)
}
MessageType.READ_MODULE_REQUEST.code -> {
msg as ReadModuleRequest
packMapHeader(3)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("uri", msg.uri.toString())
}
MessageType.READ_MODULE_RESPONSE.code -> {
msg as ReadModuleResponse
packMapHeader(2, msg.contents, msg.error)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("contents", msg.contents)
packKeyValue("error", msg.error)
}
MessageType.LIST_MODULES_REQUEST.code -> {
msg as ListModulesRequest
packMapHeader(3)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("uri", msg.uri.toString())
}
MessageType.LIST_MODULES_RESPONSE.code -> {
msg as ListModulesResponse
packMapHeader(2, msg.pathElements, msg.error)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
if (msg.pathElements != null) {
packString("pathElements")
packArrayHeader(msg.pathElements.size)
for (pathElement in msg.pathElements) {
packPathElement(pathElement)
}
}
packKeyValue("error", msg.error)
}
MessageType.LIST_RESOURCES_REQUEST.code -> {
msg as ListResourcesRequest
packMapHeader(3)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("uri", msg.uri.toString())
}
MessageType.LIST_RESOURCES_RESPONSE.code -> {
msg as ListResourcesResponse
packMapHeader(2, msg.pathElements, msg.error)
packKeyValue("requestId", msg.requestId)
packKeyValue("evaluatorId", msg.evaluatorId)
if (msg.pathElements != null) {
packString("pathElements")
packArrayHeader(msg.pathElements.size)
for (pathElement in msg.pathElements) {
packPathElement(pathElement)
}
}
packKeyValue("error", msg.error)
}
else -> {
throw RuntimeException("Missing encoding for ${msg.javaClass.simpleName}")
}
}
flush()
}
private fun MessagePacker.packMapHeader(size: Int, value1: Any?, value2: Any?) =
packMapHeader(size + (if (value1 != null) 1 else 0) + (if (value2 != null) 1 else 0))
private fun MessagePacker.packMapHeader(
size: Int,
value1: Any?,
value2: Any?,
value3: Any?,
value4: Any?,
value5: Any?,
value6: Any?
) =
packMapHeader(
size +
(if (value1 != null) 1 else 0) +
(if (value2 != null) 1 else 0) +
(if (value3 != null) 1 else 0) +
(if (value4 != null) 1 else 0) +
(if (value5 != null) 1 else 0) +
(if (value6 != null) 1 else 0)
)
private fun MessagePacker.packKeyValue(name: String, value: Int?) {
if (value == null) return
packString(name)
packInt(value)
}
private fun MessagePacker.packKeyValue(name: String, value: Long?) {
if (value == null) return
packString(name)
packLong(value)
}
private fun MessagePacker.packKeyValue(name: String, value: String?) {
if (value == null) return
packString(name)
packString(value)
}
private fun MessagePacker.packKeyValue(name: String, value: Collection<String>?) {
if (value == null) return
packString(name)
packArrayHeader(value.size)
for (elem in value) packString(elem)
}
private fun MessagePacker.packKeyValue(name: String, value: Map<String, String>?) {
if (value == null) return
packString(name)
packMapHeader(value.size)
for ((k, v) in value) {
packString(k)
packString(v)
}
}
private fun MessagePacker.packKeyValue(name: String, value: ByteArray?) {
if (value == null) return
packString(name)
packBinaryHeader(value.size)
writePayload(value)
}
private fun MessagePacker.packKeyValue(name: String, value: Boolean) {
packString(name)
packBoolean(value)
}
}
@@ -1,27 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
/** A bidirectional transport for sending and receiving messages. */
interface MessageTransport : AutoCloseable {
fun start(oneWayHandler: (OneWayMessage) -> Unit, requestHandler: (RequestMessage) -> Unit)
fun send(message: OneWayMessage)
fun send(message: RequestMessage, responseHandler: (ResponseMessage) -> Unit)
fun send(message: ResponseMessage)
}
@@ -1,135 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import java.io.InputStream
import java.io.OutputStream
import java.util.concurrent.ConcurrentHashMap
/** Factory methods for creating [MessageTransport]s. */
object MessageTransports {
/** Creates a message transport that reads from [inputStream] and writes to [outputStream]. */
fun stream(inputStream: InputStream, outputStream: OutputStream): MessageTransport {
return EncodingMessageTransport(
MessageDecoders.from(inputStream),
MessageEncoders.into(outputStream)
)
}
/** Creates "client" and "server" transports that are directly connected to each other. */
fun direct(): Pair<MessageTransport, MessageTransport> {
val transport1 = DirectMessageTransport()
val transport2 = DirectMessageTransport()
transport1.other = transport2
transport2.other = transport1
return transport1 to transport2
}
internal class EncodingMessageTransport(
private val decoder: MessageDecoder,
private val encoder: MessageEncoder,
) : AbstractMessageTransport() {
@Volatile private var isClosed: Boolean = false
override fun doStart() {
while (!isClosed) {
val message = decoder.decode() ?: return
accept(message)
}
}
override fun doClose() {
isClosed = true
}
override fun doSend(message: Message) {
encoder.encode(message)
}
}
internal class DirectMessageTransport : AbstractMessageTransport() {
lateinit var other: DirectMessageTransport
override fun doStart() {}
override fun doClose() {}
override fun doSend(message: Message) {
other.accept(message)
}
}
// TODO: clean up callbacks if evaluation fails for some reason (ThreadInterrupt, timeout, etc)
internal abstract class AbstractMessageTransport : MessageTransport {
private lateinit var oneWayHandler: (OneWayMessage) -> Unit
private lateinit var requestHandler: (RequestMessage) -> Unit
private val responseHandlers: MutableMap<Long, (ResponseMessage) -> Unit> = ConcurrentHashMap()
protected abstract fun doStart()
protected abstract fun doClose()
protected abstract fun doSend(message: Message)
protected fun accept(message: Message) {
log("Received message: $message")
when (message) {
is OneWayMessage -> oneWayHandler(message)
is RequestMessage -> requestHandler(message)
is ResponseMessage -> {
val handler =
responseHandlers.remove(message.requestId)
?: throw ProtocolException(
"Received response ${message.javaClass.simpleName} for unknown request ID `${message.requestId}`."
)
handler(message)
}
}
}
final override fun start(
oneWayHandler: (OneWayMessage) -> Unit,
requestHandler: (RequestMessage) -> Unit
) {
log("Starting transport: $this")
this.oneWayHandler = oneWayHandler
this.requestHandler = requestHandler
doStart()
}
final override fun close() {
log("Closing transport: $this")
doClose()
responseHandlers.clear()
}
override fun send(message: OneWayMessage) {
log("Sending message: $message")
doSend(message)
}
override fun send(message: RequestMessage, responseHandler: (ResponseMessage) -> Unit) {
log("Sending message: $message")
responseHandlers[message.requestId] = responseHandler
return doSend(message)
}
override fun send(message: ResponseMessage) {
log("Sending message: $message")
doSend(message)
}
}
}
@@ -15,18 +15,27 @@
*/
package org.pkl.server
import java.io.InputStream
import java.io.OutputStream
import java.net.URI
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.random.Random
import org.pkl.core.*
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader
import org.pkl.core.externalreader.ExternalReaderProcess
import org.pkl.core.externalreader.ExternalReaderProcessImpl
import org.pkl.core.http.HttpClient
import org.pkl.core.messaging.MessageTransport
import org.pkl.core.messaging.MessageTransports
import org.pkl.core.messaging.ProtocolException
import org.pkl.core.module.ModuleKeyFactories
import org.pkl.core.module.ModuleKeyFactory
import org.pkl.core.module.ModulePathResolver
import org.pkl.core.packages.PackageUri
import org.pkl.core.project.DeclaredDependencies
import org.pkl.core.resource.ExternalResourceResolver
import org.pkl.core.resource.ResourceReader
import org.pkl.core.resource.ResourceReaders
import org.pkl.core.util.IoUtils
@@ -37,6 +46,22 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
// https://github.com/jano7/executor would be the perfect executor here
private val executor: ExecutorService = Executors.newSingleThreadExecutor()
// ExternalProcess instances with the same ExternalReader spec are shared per evaluator
private val externalReaderProcesses:
MutableMap<Long, MutableMap<ExternalReader, ExternalReaderProcess>> =
ConcurrentHashMap()
companion object {
fun stream(inputStream: InputStream, outputStream: OutputStream): Server =
Server(
MessageTransports.stream(
ServerMessagePackDecoder(inputStream),
ServerMessagePackEncoder(outputStream),
::log
)
)
}
/** Starts listening to incoming messages */
fun start() {
transport.start(
@@ -71,13 +96,13 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
private fun handleCreateEvaluator(message: CreateEvaluatorRequest) {
val evaluatorId = Random.Default.nextLong()
val baseResponse = CreateEvaluatorResponse(message.requestId, evaluatorId = null, error = null)
val baseResponse = CreateEvaluatorResponse(message.requestId(), null, null)
val evaluator =
try {
createEvaluator(message, evaluatorId)
} catch (e: ServerException) {
transport.send(baseResponse.copy(error = e.message))
} catch (e: ProtocolException) {
transport.send(baseResponse.copy(error = e.message ?: ""))
return
}
@@ -86,7 +111,7 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
}
private fun handleEvaluate(msg: EvaluateRequest) {
val baseResponse = EvaluateResponse(msg.requestId, msg.evaluatorId, result = null, error = null)
val baseResponse = EvaluateResponse(msg.requestId(), msg.evaluatorId, null, null)
val evaluator = evaluators[msg.evaluatorId]
if (evaluator == null) {
@@ -103,7 +128,7 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
} catch (e: PklBugException) {
transport.send(baseResponse.copy(error = e.toString()))
} catch (e: PklException) {
transport.send(baseResponse.copy(error = e.message))
transport.send(baseResponse.copy(error = e.message ?: ""))
}
}
}
@@ -115,6 +140,9 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
return
}
evaluator.close()
// close any running ExternalProcess instances for the closed evaluator
externalReaderProcesses[message.evaluatorId]?.values?.forEach { it.close() }
}
private fun buildDeclaredDependencies(
@@ -167,8 +195,9 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
message.http?.proxy?.let { proxy ->
setProxy(proxy.address, proxy.noProxy ?: listOf())
proxy.address?.let(IoUtils::setSystemProxy)
proxy.noProxy?.let { System.setProperty("http.nonProxyHosts", it.joinToString("|")) }
}
message.http?.caCertificates?.let { caCertificates -> addCertificates(caCertificates) }
message.http?.caCertificates?.let(::addCertificates)
buildLazily()
}
val dependencies =
@@ -210,10 +239,19 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
add(ResourceReaders.pkg())
add(ResourceReaders.projectpackage())
add(ResourceReaders.modulePath(modulePathResolver))
for ((scheme, spec) in message.externalResourceReaders ?: emptyMap()) {
add(
ResourceReaders.externalProcess(scheme, getExternalProcess(evaluatorId, spec), evaluatorId)
)
}
// add client-side resource readers last to ensure they win over builtin ones
for (readerSpec in message.clientResourceReaders ?: emptyList()) {
val resourceReader = ClientResourceReader(transport, evaluatorId, readerSpec)
add(resourceReader)
add(
ResourceReaders.externalResolver(
readerSpec,
ExternalResourceResolver(transport, evaluatorId)
)
)
}
}
@@ -226,6 +264,15 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
if (message.clientModuleReaders?.isNotEmpty() == true) {
add(ClientModuleKeyFactory(message.clientModuleReaders, transport, evaluatorId))
}
for ((scheme, spec) in message.externalModuleReaders ?: emptyMap()) {
add(
ModuleKeyFactories.externalProcess(
scheme,
getExternalProcess(evaluatorId, spec),
evaluatorId
)
)
}
add(ModuleKeyFactories.standardLibrary)
addAll(ModuleKeyFactories.fromServiceProviders())
add(ModuleKeyFactories.file)
@@ -235,4 +282,9 @@ class Server(private val transport: MessageTransport) : AutoCloseable {
add(ModuleKeyFactories.http)
add(ModuleKeyFactories.genericUrl)
}
private fun getExternalProcess(evaluatorId: Long, spec: ExternalReader): ExternalReaderProcess =
externalReaderProcesses
.computeIfAbsent(evaluatorId) { ConcurrentHashMap() }
.computeIfAbsent(spec) { ExternalReaderProcessImpl(it) }
}
@@ -1,24 +0,0 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
sealed class ServerException(msg: String, cause: Throwable?) : Exception(msg, cause)
open class ProtocolException(msg: String, cause: Throwable? = null) : ServerException(msg, cause)
class InvalidCommandException(msg: String, cause: Throwable? = null) : ServerException(msg, cause)
class DecodeException(msg: String, cause: Throwable? = null) : ProtocolException(msg, cause)
@@ -0,0 +1,134 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import java.io.InputStream
import java.net.URI
import java.nio.file.Path
import java.time.Duration
import java.util.regex.Pattern
import org.msgpack.core.MessagePack
import org.msgpack.core.MessageUnpacker
import org.msgpack.value.Value
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader
import org.pkl.core.messaging.BaseMessagePackDecoder
import org.pkl.core.messaging.Message
import org.pkl.core.packages.Checksums
class ServerMessagePackDecoder(unpacker: MessageUnpacker) : BaseMessagePackDecoder(unpacker) {
constructor(stream: InputStream) : this(MessagePack.newDefaultUnpacker(stream))
override fun decodeMessage(msgType: Message.Type, map: Map<Value, Value>): Message? {
return when (msgType) {
Message.Type.CREATE_EVALUATOR_REQUEST ->
CreateEvaluatorRequest(
get(map, "requestId").asIntegerValue().asLong(),
unpackStringListOrNull(map, "allowedModules", Pattern::compile),
unpackStringListOrNull(map, "allowedResources", Pattern::compile),
unpackListOrNull(map, "clientModuleReaders") { unpackModuleReaderSpec(it)!! },
unpackListOrNull(map, "clientResourceReaders") { unpackResourceReaderSpec(it)!! },
unpackStringListOrNull(map, "modulePaths", Path::of),
unpackStringMapOrNull(map, "env"),
unpackStringMapOrNull(map, "properties"),
unpackLongOrNull(map, "timeoutSeconds", Duration::ofSeconds),
unpackStringOrNull(map, "rootDir", Path::of),
unpackStringOrNull(map, "cacheDir", Path::of),
unpackStringOrNull(map, "outputFormat"),
map.unpackProject(),
map.unpackHttp(),
unpackStringMapOrNull(map, "externalModuleReaders", ::unpackExternalReader),
unpackStringMapOrNull(map, "externalResourceReaders", ::unpackExternalReader)
)
Message.Type.CREATE_EVALUATOR_RESPONSE ->
CreateEvaluatorResponse(
unpackLong(map, "requestId"),
unpackLongOrNull(map, "evaluatorId"),
unpackStringOrNull(map, "error")
)
Message.Type.CLOSE_EVALUATOR -> CloseEvaluator(unpackLong(map, "evaluatorId"))
Message.Type.EVALUATE_REQUEST ->
EvaluateRequest(
unpackLong(map, "requestId"),
unpackLong(map, "evaluatorId"),
URI(unpackString(map, "moduleUri")),
unpackStringOrNull(map, "moduleText"),
unpackStringOrNull(map, "expr")
)
Message.Type.EVALUATE_RESPONSE ->
EvaluateResponse(
unpackLong(map, "requestId"),
unpackLong(map, "evaluatorId"),
unpackByteArray(map, "result"),
unpackStringOrNull(map, "error")
)
Message.Type.LOG_MESSAGE ->
LogMessage(
unpackLong(map, "evaluatorId"),
unpackInt(map, "level"),
unpackString(map, "message"),
unpackString(map, "frameUri")
)
else -> super.decodeMessage(msgType, map)
}
}
private fun Map<Value, Value>.unpackProject(): Project? {
val projMap = getNullable(this, "project")?.asMapValue()?.map() ?: return null
val projectFileUri = URI(unpackString(projMap, "projectFileUri"))
val dependencies = projMap.unpackDependencies("dependencies")
return Project(projectFileUri, null, dependencies)
}
private fun Map<Value, Value>.unpackHttp(): Http? {
val httpMap = getNullable(this, "http")?.asMapValue()?.map() ?: return null
val proxy = httpMap.unpackProxy()
val caCertificates = getNullable(httpMap, "caCertificates")?.asBinaryValue()?.asByteArray()
return Http(caCertificates, proxy)
}
private fun Map<Value, Value>.unpackProxy(): PklEvaluatorSettings.Proxy? {
val proxyMap = getNullable(this, "proxy")?.asMapValue()?.map() ?: return null
val address = unpackString(proxyMap, "address")
val noProxy = unpackStringListOrNull(proxyMap, "noProxy")
return PklEvaluatorSettings.Proxy.create(address, noProxy)
}
private fun Map<Value, Value>.unpackDependencies(name: String): Map<String, Dependency> {
val mapValue = get(this, name).asMapValue().map()
return mapValue.entries.associate { (key, value) ->
val dependencyName = key.asStringValue().asString()
val dependencyObj = value.asMapValue().map()
val type = unpackString(dependencyObj, "type")
val packageUri = URI(unpackString(dependencyObj, "packageUri"))
if (type == DependencyType.REMOTE.value) {
val checksums =
getNullable(dependencyObj, "checksums")?.asMapValue()?.map()?.let { obj ->
val sha256 = unpackString(obj, "sha256")
Checksums(sha256)
}
return@associate dependencyName to RemoteDependency(packageUri, checksums)
}
val dependencies = dependencyObj.unpackDependencies("dependencies")
val projectFileUri = unpackString(dependencyObj, "projectFileUri")
dependencyName to Project(URI(projectFileUri), packageUri, dependencies)
}
}
private fun unpackExternalReader(map: Map<Value, Value>): ExternalReader =
ExternalReader(unpackString(map, "executable"), unpackStringListOrNull(map, "arguments")!!)
}
@@ -0,0 +1,197 @@
/*
* Copyright © 2024 Apple Inc. and the Pkl project authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.pkl.server
import java.io.OutputStream
import java.nio.file.Path
import kotlin.io.path.pathString
import org.msgpack.core.MessagePack
import org.msgpack.core.MessagePacker
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader
import org.pkl.core.messaging.BaseMessagePackEncoder
import org.pkl.core.messaging.Message
import org.pkl.core.packages.Checksums
class ServerMessagePackEncoder(packer: MessagePacker) : BaseMessagePackEncoder(packer) {
constructor(stream: OutputStream) : this(MessagePack.newDefaultPacker(stream))
private fun MessagePacker.packProject(project: Project) {
packMapHeader(2)
packKeyValue("projectFileUri", project.projectFileUri.toString())
packString("dependencies")
packDependencies(project.dependencies)
}
private fun MessagePacker.packHttp(http: Http) {
packMapHeader(0, http.caCertificates, http.proxy)
http.caCertificates?.let { packKeyValue("caCertificates", it) }
http.proxy?.let { proxy ->
packString("proxy")
packMapHeader(0, proxy.address, proxy.noProxy)
packKeyValue("address", proxy.address?.toString())
packKeyValue("noProxy", proxy.noProxy)
}
}
private fun MessagePacker.packDependencies(dependencies: Map<String, Dependency>) {
packMapHeader(dependencies.size)
for ((name, dep) in dependencies) {
packString(name)
if (dep is Project) {
packMapHeader(4)
packKeyValue("type", dep.type.value)
packKeyValue("packageUri", dep.packageUri.toString())
packKeyValue("projectFileUri", dep.projectFileUri.toString())
packString("dependencies")
packDependencies(dep.dependencies)
} else {
dep as RemoteDependency
packMapHeader(dep.checksums?.let { 3 } ?: 2)
packKeyValue("type", dep.type.value)
packKeyValue("packageUri", dep.packageUri.toString())
dep.checksums?.let { checksums ->
packString("checksums")
packChecksums(checksums)
}
}
}
}
private fun MessagePacker.packChecksums(checksums: Checksums) {
packMapHeader(1)
packKeyValue("sha256", checksums.sha256)
}
private fun MessagePacker.packExternalReader(spec: ExternalReader) {
packMapHeader(1, spec.arguments)
packKeyValue("executable", spec.executable)
spec.arguments?.let { packKeyValue("arguments", it) }
}
override fun encodeMessage(msg: Message) {
when (msg.type()) {
Message.Type.CREATE_EVALUATOR_REQUEST -> {
msg as CreateEvaluatorRequest
packMapHeader(
1,
msg.allowedModules,
msg.allowedResources,
msg.clientModuleReaders,
msg.clientResourceReaders,
msg.modulePaths,
msg.env,
msg.properties,
msg.timeout,
msg.rootDir,
msg.cacheDir,
msg.outputFormat,
msg.project,
msg.http,
msg.externalModuleReaders,
msg.externalResourceReaders,
)
packKeyValue("requestId", msg.requestId())
packKeyValue("allowedModules", msg.allowedModules?.map { it.toString() })
packKeyValue("allowedResources", msg.allowedResources?.map { it.toString() })
if (msg.clientModuleReaders != null) {
packer.packString("clientModuleReaders")
packer.packArrayHeader(msg.clientModuleReaders.size)
for (moduleReader in msg.clientModuleReaders) {
packModuleReaderSpec(moduleReader)
}
}
if (msg.clientResourceReaders != null) {
packer.packString("clientResourceReaders")
packer.packArrayHeader(msg.clientResourceReaders.size)
for (resourceReader in msg.clientResourceReaders) {
packResourceReaderSpec(resourceReader)
}
}
packKeyValue("modulePaths", msg.modulePaths, Path::toString)
packKeyValue("env", msg.env)
packKeyValue("properties", msg.properties)
packKeyValue("timeoutSeconds", msg.timeout?.toSeconds())
packKeyValue("rootDir", msg.rootDir?.pathString)
packKeyValue("cacheDir", msg.cacheDir?.pathString)
packKeyValue("outputFormat", msg.outputFormat)
if (msg.project != null) {
packer.packString("project")
packer.packProject(msg.project)
}
if (msg.http != null) {
packer.packString("http")
packer.packHttp(msg.http)
}
if (msg.externalModuleReaders != null) {
packer.packString("externalModuleReaders")
packer.packMapHeader(msg.externalModuleReaders.size)
for ((scheme, spec) in msg.externalModuleReaders) {
packer.packString(scheme)
packer.packExternalReader(spec)
}
}
if (msg.externalResourceReaders != null) {
packer.packString("externalResourceReaders")
packer.packMapHeader(msg.externalResourceReaders.size)
for ((scheme, spec) in msg.externalResourceReaders) {
packer.packString(scheme)
packer.packExternalReader(spec)
}
}
return
}
Message.Type.CREATE_EVALUATOR_RESPONSE -> {
msg as CreateEvaluatorResponse
packMapHeader(1, msg.evaluatorId, msg.error)
packKeyValue("requestId", msg.requestId())
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("error", msg.error)
}
Message.Type.CLOSE_EVALUATOR -> {
msg as CloseEvaluator
packer.packMapHeader(1)
packKeyValue("evaluatorId", msg.evaluatorId)
}
Message.Type.EVALUATE_REQUEST -> {
msg as EvaluateRequest
packMapHeader(3, msg.moduleText, msg.expr)
packKeyValue("requestId", msg.requestId())
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("moduleUri", msg.moduleUri.toString())
packKeyValue("moduleText", msg.moduleText)
packKeyValue("expr", msg.expr)
}
Message.Type.EVALUATE_RESPONSE -> {
msg as EvaluateResponse
packMapHeader(2, msg.result, msg.error)
packKeyValue("requestId", msg.requestId())
packKeyValue("evaluatorId", msg.evaluatorId)
msg.result?.let { packKeyValue("result", it) }
packKeyValue("error", msg.error)
}
Message.Type.LOG_MESSAGE -> {
msg as LogMessage
packer.packMapHeader(4)
packKeyValue("evaluatorId", msg.evaluatorId)
packKeyValue("level", msg.level)
packKeyValue("message", msg.message)
packKeyValue("frameUri", msg.frameUri)
}
else -> super.encodeMessage(msg)
}
}
}
@@ -20,98 +20,18 @@ import java.nio.file.Path
import java.time.Duration
import java.util.*
import java.util.regex.Pattern
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.Proxy
import org.pkl.core.module.PathElement
import org.pkl.core.messaging.Message
import org.pkl.core.messaging.Messages.*
import org.pkl.core.packages.Checksums
sealed interface Message {
val type: MessageType
}
sealed interface OneWayMessage : Message
sealed interface RequestMessage : Message {
val requestId: Long
}
sealed interface ResponseMessage : Message {
val requestId: Long
}
sealed class ClientMessage : Message
sealed class ClientRequestMessage : ClientMessage(), RequestMessage
sealed class ClientResponseMessage : ClientMessage(), ResponseMessage
sealed class ClientOneWayMessage : ClientMessage(), OneWayMessage
sealed class ServerMessage : Message
sealed class ServerRequestMessage : ServerMessage(), RequestMessage
sealed class ServerResponseMessage : ServerMessage(), ResponseMessage
sealed class ServerOneWayMessage : ServerMessage(), OneWayMessage
enum class MessageType(val code: Int) {
CREATE_EVALUATOR_REQUEST(0x20),
CREATE_EVALUATOR_RESPONSE(0x21),
CLOSE_EVALUATOR(0x22),
EVALUATE_REQUEST(0x23),
EVALUATE_RESPONSE(0x24),
LOG_MESSAGE(0x25),
READ_RESOURCE_REQUEST(0x26),
READ_RESOURCE_RESPONSE(0x27),
READ_MODULE_REQUEST(0x28),
READ_MODULE_RESPONSE(0x29),
LIST_RESOURCES_REQUEST(0x2a),
LIST_RESOURCES_RESPONSE(0x2b),
LIST_MODULES_REQUEST(0x2c),
LIST_MODULES_RESPONSE(0x2d),
}
data class ModuleReaderSpec(
val scheme: String,
val hasHierarchicalUris: Boolean,
val isLocal: Boolean,
val isGlobbable: Boolean
)
data class ResourceReaderSpec(
val scheme: String,
val hasHierarchicalUris: Boolean,
val isGlobbable: Boolean,
)
private fun <T> T?.equalsNullable(other: Any?): Boolean {
return Objects.equals(this, other)
}
enum class DependencyType(val value: String) {
LOCAL("local"),
REMOTE("remote")
}
sealed interface Dependency {
val type: DependencyType
val packageUri: URI?
}
data class RemoteDependency(override val packageUri: URI, val checksums: Checksums?) : Dependency {
override val type: DependencyType = DependencyType.REMOTE
}
data class Project(
val projectFileUri: URI,
override val packageUri: URI?,
val dependencies: Map<String, Dependency>
) : Dependency {
override val type: DependencyType = DependencyType.LOCAL
}
data class CreateEvaluatorRequest(
override val requestId: Long,
private val requestId: Long,
val allowedModules: List<Pattern>?,
val allowedResources: List<Pattern>?,
val clientModuleReaders: List<ModuleReaderSpec>?,
@@ -124,9 +44,14 @@ data class CreateEvaluatorRequest(
val cacheDir: Path?,
val outputFormat: String?,
val project: Project?,
val http: Http?
) : ClientRequestMessage() {
override val type = MessageType.CREATE_EVALUATOR_REQUEST
val http: Http?,
val externalModuleReaders: Map<String, ExternalReader>?,
val externalResourceReaders: Map<String, ExternalReader>?
) : Message.Client.Request {
override fun type(): Message.Type = Message.Type.CREATE_EVALUATOR_REQUEST
override fun requestId(): Long = requestId
// need to implement this manually because [Pattern.equals] returns false for two patterns
// that have the same underlying pattern string.
@@ -152,7 +77,9 @@ data class CreateEvaluatorRequest(
cacheDir.equalsNullable(other.cacheDir) &&
outputFormat.equalsNullable(other.outputFormat) &&
project.equalsNullable(other.project) &&
http.equalsNullable(other.http)
http.equalsNullable(other.http) &&
externalModuleReaders.equalsNullable(other.externalModuleReaders) &&
externalResourceReaders.equalsNullable(other.externalResourceReaders)
}
@Suppress("DuplicatedCode") // false duplicate within method
@@ -170,8 +97,9 @@ data class CreateEvaluatorRequest(
result = 31 * result + cacheDir.hashCode()
result = 31 * result + outputFormat.hashCode()
result = 31 * result + project.hashCode()
result = 31 * result + type.hashCode()
result = 31 * result + http.hashCode()
result = 31 * result + externalModuleReaders.hashCode()
result = 31 * result + externalResourceReaders.hashCode()
return result
}
}
@@ -200,69 +128,63 @@ data class Http(
}
}
enum class DependencyType(val value: String) {
LOCAL("local"),
REMOTE("remote")
}
sealed interface Dependency {
val type: DependencyType
val packageUri: URI?
}
data class RemoteDependency(override val packageUri: URI, val checksums: Checksums?) : Dependency {
override val type: DependencyType = DependencyType.REMOTE
}
data class Project(
val projectFileUri: URI,
override val packageUri: URI?,
val dependencies: Map<String, Dependency>
) : Dependency {
override val type: DependencyType = DependencyType.LOCAL
}
data class CreateEvaluatorResponse(
override val requestId: Long,
private val requestId: Long,
val evaluatorId: Long?,
val error: String?,
) : ServerResponseMessage() {
override val type
get() = MessageType.CREATE_EVALUATOR_RESPONSE
) : Message.Server.Response {
override fun type(): Message.Type = Message.Type.CREATE_EVALUATOR_RESPONSE
override fun requestId(): Long = requestId
}
data class ListResourcesRequest(override val requestId: Long, val evaluatorId: Long, val uri: URI) :
ServerRequestMessage() {
override val type: MessageType
get() = MessageType.LIST_RESOURCES_REQUEST
}
data class ListResourcesResponse(
override val requestId: Long,
val evaluatorId: Long,
val pathElements: List<PathElement>?,
val error: String?
) : ClientResponseMessage() {
override val type: MessageType
get() = MessageType.LIST_RESOURCES_RESPONSE
}
data class ListModulesRequest(override val requestId: Long, val evaluatorId: Long, val uri: URI) :
ServerRequestMessage() {
override val type: MessageType
get() = MessageType.LIST_MODULES_REQUEST
}
data class ListModulesResponse(
override val requestId: Long,
val evaluatorId: Long,
val pathElements: List<PathElement>?,
val error: String?
) : ClientResponseMessage() {
override val type: MessageType
get() = MessageType.LIST_MODULES_RESPONSE
}
data class CloseEvaluator(val evaluatorId: Long) : ClientOneWayMessage() {
override val type = MessageType.CLOSE_EVALUATOR
data class CloseEvaluator(val evaluatorId: Long) : Message.Client.OneWay {
override fun type(): Message.Type = Message.Type.CLOSE_EVALUATOR
}
data class EvaluateRequest(
override val requestId: Long,
private val requestId: Long,
val evaluatorId: Long,
val moduleUri: URI,
val moduleText: String?,
val expr: String?
) : ClientRequestMessage() {
override val type = MessageType.EVALUATE_REQUEST
) : Message.Client.Request {
override fun type(): Message.Type = Message.Type.EVALUATE_REQUEST
override fun requestId(): Long = requestId
}
data class EvaluateResponse(
override val requestId: Long,
private val requestId: Long,
val evaluatorId: Long,
val result: ByteArray?,
val error: String?
) : ServerResponseMessage() {
override val type
get() = MessageType.EVALUATE_RESPONSE
) : Message.Server.Response {
override fun type(): Message.Type = Message.Type.EVALUATE_RESPONSE
override fun requestId(): Long = requestId
// override to use [ByteArray.contentEquals]
@Suppress("DuplicatedCode")
@@ -291,58 +213,6 @@ data class LogMessage(
val level: Int,
val message: String,
val frameUri: String
) : ServerOneWayMessage() {
override val type
get() = MessageType.LOG_MESSAGE
}
data class ReadResourceRequest(override val requestId: Long, val evaluatorId: Long, val uri: URI) :
ServerRequestMessage() {
override val type
get() = MessageType.READ_RESOURCE_REQUEST
}
data class ReadResourceResponse(
override val requestId: Long,
val evaluatorId: Long,
val contents: ByteArray?,
val error: String?
) : ClientResponseMessage() {
override val type = MessageType.READ_RESOURCE_RESPONSE
// override to use [ByteArray.contentEquals]
@Suppress("DuplicatedCode")
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is ReadResourceResponse) return false
return requestId == other.requestId &&
evaluatorId == other.evaluatorId &&
contents.contentEquals(other.contents) &&
error == other.error
}
// override to use [ByteArray.contentHashCode]
override fun hashCode(): Int {
var result = requestId.hashCode()
result = 31 * result + evaluatorId.hashCode()
result = 31 * result + contents.contentHashCode()
result = 31 * result + error.hashCode()
return result
}
}
data class ReadModuleRequest(override val requestId: Long, val evaluatorId: Long, val uri: URI) :
ServerRequestMessage() {
override val type
get() = MessageType.READ_MODULE_REQUEST
}
data class ReadModuleResponse(
override val requestId: Long,
val evaluatorId: Long,
val contents: String?,
val error: String?
) : ClientResponseMessage() {
override val type = MessageType.READ_MODULE_RESPONSE
) : Message.Server.OneWay {
override fun type(): Message.Type = Message.Type.LOG_MESSAGE
}
@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import org.msgpack.core.MessageBufferPacker
import org.msgpack.core.MessagePack
import org.pkl.core.messaging.Message
internal fun log(msg: String) {
if (System.getenv("PKL_DEBUG") == "1") {
@@ -41,7 +42,7 @@ internal val threadLocalBufferPacker: ThreadLocal<MessageBufferPacker> =
private val threadLocalEncoder: ThreadLocal<(Message) -> ByteArray> =
ThreadLocal.withInitial {
val packer = threadLocalBufferPacker.get()
val encoder = MessageEncoders.into(packer);
val encoder = ServerMessagePackEncoder(packer);
{ message: Message ->
packer.clear()
encoder.encode(message)
@@ -25,10 +25,12 @@ import kotlin.io.path.outputStream
import kotlin.io.path.writeText
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
import org.msgpack.core.MessagePack
import org.pkl.commons.test.PackageServer
import org.pkl.core.messaging.Messages.*
import org.pkl.core.module.PathElement
abstract class AbstractServerTest {
@@ -55,8 +57,8 @@ abstract class AbstractServerTest {
@Test
fun `create and close evaluator`() {
val evaluatorId = client.sendCreateEvaluatorRequest(requestId = 123)
client.send(CloseEvaluator(evaluatorId = evaluatorId))
val evaluatorId = client.sendCreateEvaluatorRequest(123)
client.send(CloseEvaluator(evaluatorId))
}
@Test
@@ -66,24 +68,23 @@ abstract class AbstractServerTest {
client.send(
EvaluateRequest(
requestId = requestId,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText =
"""
requestId,
evaluatorId,
URI("repl:text"),
"""
foo {
bar = "bar"
}
"""
.trimIndent(),
expr = null
.trimIndent(),
null
)
)
val response = client.receive<EvaluateResponse>()
assertThat(response.error).isNull()
assertThat(response.result).isNotNull
assertThat(response.requestId).isEqualTo(requestId)
assertThat(response.requestId()).isEqualTo(requestId)
val unpacker = MessagePack.newDefaultUnpacker(response.result)
val value = unpacker.unpackValue()
@@ -96,15 +97,14 @@ abstract class AbstractServerTest {
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText =
"""
1,
evaluatorId,
URI("repl:text"),
"""
foo = trace(1 + 2 + 3)
"""
.trimIndent(),
expr = null
.trimIndent(),
null
)
)
@@ -121,18 +121,17 @@ abstract class AbstractServerTest {
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText =
"""
1,
evaluatorId,
URI("repl:text"),
"""
@Deprecated { message = "use bar instead" }
function foo() = 5
result = foo()
"""
.trimIndent(),
expr = null
.trimIndent(),
null
)
)
@@ -145,17 +144,16 @@ abstract class AbstractServerTest {
@Test
fun `read resource`() {
val reader =
ResourceReaderSpec(scheme = "bahumbug", hasHierarchicalUris = true, isGlobbable = false)
val reader = ResourceReaderSpec("bahumbug", true, false)
val evaluatorId = client.sendCreateEvaluatorRequest(resourceReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """res = read("bahumbug:/foo.pkl").text""",
expr = "res"
1,
evaluatorId,
URI("repl:text"),
"""res = read("bahumbug:/foo.pkl").text""",
"res"
)
)
@@ -165,10 +163,10 @@ abstract class AbstractServerTest {
client.send(
ReadResourceResponse(
requestId = readResourceMsg.requestId,
evaluatorId = evaluatorId,
contents = "my bahumbug".toByteArray(),
error = null
readResourceMsg.requestId,
evaluatorId,
"my bahumbug".toByteArray(),
null
)
)
@@ -180,10 +178,12 @@ abstract class AbstractServerTest {
assertThat(value.asStringValue().asString()).isEqualTo("my bahumbug")
}
@Disabled(
"Unable to construct ReadResourceResponse with null contents due to Kotlin compiler bug"
)
@Test
fun `read resource -- null contents and null error`() {
val reader =
ResourceReaderSpec(scheme = "bahumbug", hasHierarchicalUris = true, isGlobbable = false)
val reader = ResourceReaderSpec("bahumbug", true, false)
val evaluatorId = client.sendCreateEvaluatorRequest(resourceReaders = listOf(reader))
client.send(
@@ -200,14 +200,11 @@ abstract class AbstractServerTest {
assertThat(readResourceMsg.uri.toString()).isEqualTo("bahumbug:/foo.pkl")
assertThat(readResourceMsg.evaluatorId).isEqualTo(evaluatorId)
client.send(
ReadResourceResponse(
requestId = readResourceMsg.requestId,
evaluatorId = evaluatorId,
contents = null,
error = null
)
)
client.send(ReadResourceResponse(readResourceMsg.requestId, evaluatorId, byteArrayOf(), null))
// for this test to be correct this should actually be:
// client.send(ReadResourceResponse(readResourceMsg.requestId, evaluatorId, null, null))
// this should be evaluated again once https://github.com/apple/pkl/issues/698 is addressed
// see conversation here https://github.com/apple/pkl/pull/660#discussion_r1819545811
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.error).isNull()
@@ -219,17 +216,16 @@ abstract class AbstractServerTest {
@Test
fun `read resource error`() {
val reader =
ResourceReaderSpec(scheme = "bahumbug", hasHierarchicalUris = true, isGlobbable = false)
val reader = ResourceReaderSpec("bahumbug", true, false)
val evaluatorId = client.sendCreateEvaluatorRequest(resourceReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """res = read("bahumbug:/foo.txt").text""",
expr = "res"
1,
evaluatorId,
URI("repl:text"),
"""res = read("bahumbug:/foo.txt").text""",
"res"
)
)
@@ -237,10 +233,10 @@ abstract class AbstractServerTest {
client.send(
ReadResourceResponse(
requestId = readResourceMsg.requestId,
evaluatorId = evaluatorId,
contents = null,
error = "cannot read my bahumbug"
readResourceMsg.requestId,
evaluatorId,
byteArrayOf(),
"cannot read my bahumbug"
)
)
@@ -251,46 +247,44 @@ abstract class AbstractServerTest {
@Test
fun `glob resource`() {
val reader = ResourceReaderSpec(scheme = "bird", hasHierarchicalUris = true, isGlobbable = true)
val reader = ResourceReaderSpec("bird", true, true)
val evaluatorId = client.sendCreateEvaluatorRequest(resourceReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText =
"""
1,
evaluatorId,
URI("repl:text"),
"""
res = read*("bird:/**.txt").keys
"""
.trimIndent(),
expr = "res"
.trimIndent(),
"res"
)
)
val listResourcesRequest = client.receive<ListResourcesRequest>()
assertThat(listResourcesRequest.uri.toString()).isEqualTo("bird:/")
client.send(
ListResourcesResponse(
requestId = listResourcesRequest.requestId,
evaluatorId = listResourcesRequest.evaluatorId,
pathElements = listOf(PathElement("foo.txt", false), PathElement("subdir", true)),
error = null
listResourcesRequest.requestId,
listResourcesRequest.evaluatorId,
listOf(PathElement("foo.txt", false), PathElement("subdir", true)),
null
)
)
val listResourcesRequest2 = client.receive<ListResourcesRequest>()
assertThat(listResourcesRequest2.uri.toString()).isEqualTo("bird:/subdir/")
client.send(
ListResourcesResponse(
requestId = listResourcesRequest2.requestId,
evaluatorId = listResourcesRequest2.evaluatorId,
pathElements =
listOf(
PathElement("bar.txt", false),
),
error = null
listResourcesRequest2.requestId,
listResourcesRequest2.evaluatorId,
listOf(
PathElement("bar.txt", false),
),
null
)
)
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.result!!.debugYaml)
assertThat(evaluateResponse.result?.debugYaml)
.isEqualTo(
"""
- 6
@@ -304,32 +298,31 @@ abstract class AbstractServerTest {
@Test
fun `glob resources -- null pathElements and null error`() {
val reader = ResourceReaderSpec(scheme = "bird", hasHierarchicalUris = true, isGlobbable = true)
val reader = ResourceReaderSpec("bird", true, true)
val evaluatorId = client.sendCreateEvaluatorRequest(resourceReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText =
"""
1,
evaluatorId,
URI("repl:text"),
"""
res = read*("bird:/**.txt").keys
"""
.trimIndent(),
expr = "res"
.trimIndent(),
"res"
)
)
val listResourcesRequest = client.receive<ListResourcesRequest>()
client.send(
ListResourcesResponse(
requestId = listResourcesRequest.requestId,
evaluatorId = listResourcesRequest.evaluatorId,
pathElements = null,
error = null
listResourcesRequest.requestId,
listResourcesRequest.evaluatorId,
null,
null
)
)
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.result!!.debugYaml)
assertThat(evaluateResponse.result?.debugYaml)
.isEqualTo(
"""
- 6
@@ -341,29 +334,28 @@ abstract class AbstractServerTest {
@Test
fun `glob resource error`() {
val reader = ResourceReaderSpec(scheme = "bird", hasHierarchicalUris = true, isGlobbable = true)
val reader = ResourceReaderSpec("bird", true, true)
val evaluatorId = client.sendCreateEvaluatorRequest(resourceReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText =
"""
1,
evaluatorId,
URI("repl:text"),
"""
res = read*("bird:/**.txt").keys
"""
.trimIndent(),
expr = "res"
.trimIndent(),
"res"
)
)
val listResourcesRequest = client.receive<ListResourcesRequest>()
assertThat(listResourcesRequest.uri.toString()).isEqualTo("bird:/")
client.send(
ListResourcesResponse(
requestId = listResourcesRequest.requestId,
evaluatorId = listResourcesRequest.evaluatorId,
pathElements = null,
error = "didnt work"
listResourcesRequest.requestId,
listResourcesRequest.evaluatorId,
null,
"didnt work"
)
)
val evaluateResponse = client.receive<EvaluateResponse>()
@@ -389,22 +381,16 @@ abstract class AbstractServerTest {
@Test
fun `read module`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = true,
isGlobbable = false
)
val reader = ModuleReaderSpec("bird", true, true, false)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """res = import("bird:/pigeon.pkl").value""",
expr = "res"
1,
evaluatorId,
URI("repl:text"),
"""res = import("bird:/pigeon.pkl").value""",
"res"
)
)
@@ -412,14 +398,7 @@ abstract class AbstractServerTest {
assertThat(readModuleMsg.uri.toString()).isEqualTo("bird:/pigeon.pkl")
assertThat(readModuleMsg.evaluatorId).isEqualTo(evaluatorId)
client.send(
ReadModuleResponse(
requestId = readModuleMsg.requestId,
evaluatorId = evaluatorId,
contents = "value = 5",
error = null
)
)
client.send(ReadModuleResponse(readModuleMsg.requestId, evaluatorId, "value = 5", null))
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.error).isNull()
@@ -430,13 +409,7 @@ abstract class AbstractServerTest {
@Test
fun `read module -- null contents and null error`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = true,
isGlobbable = false
)
val reader = ModuleReaderSpec("bird", true, true, false)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
@@ -453,14 +426,7 @@ abstract class AbstractServerTest {
assertThat(readModuleMsg.uri.toString()).isEqualTo("bird:/pigeon.pkl")
assertThat(readModuleMsg.evaluatorId).isEqualTo(evaluatorId)
client.send(
ReadModuleResponse(
requestId = readModuleMsg.requestId,
evaluatorId = evaluatorId,
contents = null,
error = null
)
)
client.send(ReadModuleResponse(readModuleMsg.requestId, evaluatorId, null, null))
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.error).isNull()
@@ -473,22 +439,16 @@ abstract class AbstractServerTest {
@Test
fun `read module error`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = true,
isGlobbable = false
)
val reader = ModuleReaderSpec("bird", true, true, false)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """res = import("bird:/pigeon.pkl").value""",
expr = "res"
1,
evaluatorId,
URI("repl:text"),
"""res = import("bird:/pigeon.pkl").value""",
"res"
)
)
@@ -497,12 +457,7 @@ abstract class AbstractServerTest {
assertThat(readModuleMsg.evaluatorId).isEqualTo(evaluatorId)
client.send(
ReadModuleResponse(
requestId = readModuleMsg.requestId,
evaluatorId = evaluatorId,
contents = null,
error = "Don't know where Pigeon is"
)
ReadModuleResponse(readModuleMsg.requestId, evaluatorId, null, "Don't know where Pigeon is")
)
val evaluateResponse = client.receive<EvaluateResponse>()
@@ -511,22 +466,16 @@ abstract class AbstractServerTest {
@Test
fun `glob module`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = true,
isGlobbable = true
)
val reader = ModuleReaderSpec("bird", true, true, true)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """res = import*("bird:/**.pkl").keys""",
expr = "res"
1,
evaluatorId,
URI("repl:text"),
"""res = import*("bird:/**.pkl").keys""",
"res"
)
)
@@ -535,15 +484,14 @@ abstract class AbstractServerTest {
assertThat(listModulesMsg.uri.path).isEqualTo("/")
client.send(
ListModulesResponse(
requestId = listModulesMsg.requestId,
evaluatorId = evaluatorId,
pathElements =
listOf(
PathElement("birds", true),
PathElement("majesticBirds", true),
PathElement("Person.pkl", false)
),
error = null
listModulesMsg.requestId,
evaluatorId,
listOf(
PathElement("birds", true),
PathElement("majesticBirds", true),
PathElement("Person.pkl", false)
),
null
)
)
val listModulesMsg2 = client.receive<ListModulesRequest>()
@@ -551,14 +499,13 @@ abstract class AbstractServerTest {
assertThat(listModulesMsg2.uri.path).isEqualTo("/birds/")
client.send(
ListModulesResponse(
requestId = listModulesMsg2.requestId,
evaluatorId = listModulesMsg2.evaluatorId,
pathElements =
listOf(
PathElement("pigeon.pkl", false),
PathElement("parrot.pkl", false),
),
error = null
listModulesMsg2.requestId,
listModulesMsg2.evaluatorId,
listOf(
PathElement("pigeon.pkl", false),
PathElement("parrot.pkl", false),
),
null
)
)
val listModulesMsg3 = client.receive<ListModulesRequest>()
@@ -566,19 +513,18 @@ abstract class AbstractServerTest {
assertThat(listModulesMsg3.uri.path).isEqualTo("/majesticBirds/")
client.send(
ListModulesResponse(
requestId = listModulesMsg3.requestId,
evaluatorId = listModulesMsg3.evaluatorId,
pathElements =
listOf(
PathElement("barnOwl.pkl", false),
PathElement("elfOwl.pkl", false),
),
error = null
listModulesMsg3.requestId,
listModulesMsg3.evaluatorId,
listOf(
PathElement("barnOwl.pkl", false),
PathElement("elfOwl.pkl", false),
),
null
)
)
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.result!!.debugRendering)
assertThat(evaluateResponse.result?.debugRendering)
.isEqualTo(
"""
- 6
@@ -595,36 +541,23 @@ abstract class AbstractServerTest {
@Test
fun `glob module -- null pathElements and null error`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = true,
isGlobbable = true
)
val reader = ModuleReaderSpec("bird", true, true, true)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """res = import*("bird:/**.pkl").keys""",
expr = "res"
1,
evaluatorId,
URI("repl:text"),
"""res = import*("bird:/**.pkl").keys""",
"res"
)
)
val listModulesMsg = client.receive<ListModulesRequest>()
client.send(
ListModulesResponse(
requestId = listModulesMsg.requestId,
evaluatorId = evaluatorId,
pathElements = null,
error = null
)
)
client.send(ListModulesResponse(listModulesMsg.requestId, evaluatorId, null, null))
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.result!!.debugRendering)
assertThat(evaluateResponse.result?.debugRendering)
.isEqualTo(
"""
- 6
@@ -636,36 +569,23 @@ abstract class AbstractServerTest {
@Test
fun `glob module error`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = true,
isGlobbable = true
)
val reader = ModuleReaderSpec("bird", true, true, true)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """res = import*("bird:/**.pkl").keys""",
expr = "res"
1,
evaluatorId,
URI("repl:text"),
"""res = import*("bird:/**.pkl").keys""",
"res"
)
)
val listModulesMsg = client.receive<ListModulesRequest>()
assertThat(listModulesMsg.uri.scheme).isEqualTo("bird")
assertThat(listModulesMsg.uri.path).isEqualTo("/")
client.send(
ListModulesResponse(
requestId = listModulesMsg.requestId,
evaluatorId = evaluatorId,
pathElements = null,
error = "nope"
)
)
client.send(ListModulesResponse(listModulesMsg.requestId, evaluatorId, null, "nope"))
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.error)
.isEqualTo(
@@ -699,19 +619,13 @@ abstract class AbstractServerTest {
val evaluatorId = client.sendCreateEvaluatorRequest(modulePaths = listOf(jarFile))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("modulepath:/dir1/module.pkl"),
moduleText = null,
expr = "output.text"
)
EvaluateRequest(1, evaluatorId, URI("modulepath:/dir1/module.pkl"), null, "output.text")
)
val response = client.receive<EvaluateResponse>()
assertThat(response.error).isNull()
val tripleQuote = "\"\"\""
assertThat(response.result!!.debugYaml)
assertThat(response.result?.debugYaml)
.isEqualTo(
"""
|
@@ -741,38 +655,31 @@ abstract class AbstractServerTest {
@Test
fun `import triple-dot path`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = true,
isGlobbable = true
)
val reader = ModuleReaderSpec("bird", true, true, true)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("bird:/foo/bar/baz.pkl"),
moduleText =
"""
1,
evaluatorId,
URI("bird:/foo/bar/baz.pkl"),
"""
import ".../buz.pkl"
res = buz.res
"""
.trimIndent(),
expr = "res"
.trimIndent(),
"res"
)
)
val readModuleRequest = client.receive<ReadModuleRequest>()
assertThat(readModuleRequest.uri).isEqualTo(URI("bird:/foo/buz.pkl"))
client.send(
ReadModuleResponse(
requestId = readModuleRequest.requestId,
evaluatorId = readModuleRequest.evaluatorId,
contents = null,
error = "not here"
readModuleRequest.requestId,
readModuleRequest.evaluatorId,
null,
"not here"
)
)
@@ -780,54 +687,40 @@ abstract class AbstractServerTest {
assertThat(readModuleRequest2.uri).isEqualTo(URI("bird:/buz.pkl"))
client.send(
ReadModuleResponse(
requestId = readModuleRequest2.requestId,
evaluatorId = readModuleRequest2.evaluatorId,
contents = "res = 1",
error = null
readModuleRequest2.requestId,
readModuleRequest2.evaluatorId,
"res = 1",
null
)
)
val evaluatorResponse = client.receive<EvaluateResponse>()
assertThat(evaluatorResponse.result!!.debugYaml).isEqualTo("1")
assertThat(evaluatorResponse.result?.debugYaml).isEqualTo("1")
}
@Test
fun `evaluate error`() {
val evaluatorId = client.sendCreateEvaluatorRequest()
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("repl:text"),
moduleText = """foo = 1""",
expr = "foo as String"
)
)
client.send(EvaluateRequest(1, evaluatorId, URI("repl:text"), """foo = 1""", "foo as String"))
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.requestId).isEqualTo(1)
assertThat(evaluateResponse.requestId()).isEqualTo(1)
assertThat(evaluateResponse.error).contains("Expected value of type")
}
@Test
fun `evaluate client-provided module reader`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = false,
isGlobbable = false
)
val reader = ModuleReaderSpec("bird", true, false, false)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("bird:/pigeon.pkl"),
moduleText = null,
expr = "output.text",
1,
evaluatorId,
URI("bird:/pigeon.pkl"),
null,
"output.text",
)
)
@@ -836,22 +729,21 @@ abstract class AbstractServerTest {
client.send(
ReadModuleResponse(
requestId = readModuleRequest.requestId,
evaluatorId = evaluatorId,
contents =
"""
readModuleRequest.requestId,
evaluatorId,
"""
firstName = "Pigeon"
lastName = "Bird"
fullName = firstName + " " + lastName
"""
.trimIndent(),
error = null
.trimIndent(),
null
)
)
val evaluateResponse = client.receive<EvaluateResponse>()
assertThat(evaluateResponse.result).isNotNull
assertThat(evaluateResponse.result!!.debugYaml)
assertThat(evaluateResponse.result?.debugYaml)
.isEqualTo(
"""
|
@@ -865,33 +757,19 @@ abstract class AbstractServerTest {
@Test
fun `concurrent evaluations`() {
val reader =
ModuleReaderSpec(
scheme = "bird",
hasHierarchicalUris = true,
isLocal = false,
isGlobbable = false
)
val reader = ModuleReaderSpec("bird", true, false, false)
val evaluatorId = client.sendCreateEvaluatorRequest(moduleReaders = listOf(reader))
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = URI("bird:/pigeon.pkl"),
moduleText = null,
expr = "output.text",
1,
evaluatorId,
URI("bird:/pigeon.pkl"),
null,
"output.text",
)
)
client.send(
EvaluateRequest(
requestId = 2,
evaluatorId = evaluatorId,
moduleUri = URI("bird:/parrot.pkl"),
moduleText = null,
expr = "output.text"
)
)
client.send(EvaluateRequest(2, evaluatorId, URI("bird:/parrot.pkl"), null, "output.text"))
// evaluation is single-threaded; `parrot.pkl` gets evaluated after `pigeon.pkl` completes.
val response11 = client.receive<ReadModuleRequest>()
@@ -901,20 +779,19 @@ abstract class AbstractServerTest {
ReadModuleResponse(
response11.requestId,
evaluatorId,
contents =
"""
"""
firstName = "Pigeon"
lastName = "Bird"
fullName = firstName + " " + lastName
"""
.trimIndent(),
error = null
.trimIndent(),
null
)
)
val response12 = client.receive<EvaluateResponse>()
assertThat(response12.result).isNotNull
assertThat(response12.result!!.debugYaml)
assertThat(response12.result?.debugYaml)
.isEqualTo(
"""
|
@@ -932,20 +809,19 @@ abstract class AbstractServerTest {
ReadModuleResponse(
response21.requestId,
evaluatorId,
contents =
"""
"""
firstName = "Parrot"
lastName = "Bird"
fullName = firstName + " " + lastName
"""
.trimIndent(),
error = null
.trimIndent(),
null
)
)
val response22 = client.receive<EvaluateResponse>()
assertThat(response22.result).isNotNull
assertThat(response22.result!!.debugYaml)
assertThat(response22.result?.debugYaml)
.isEqualTo(
"""
|
@@ -1039,34 +915,32 @@ abstract class AbstractServerTest {
cacheDir = cacheDir,
project =
Project(
projectFileUri = projectDir.resolve("PklProject").toUri(),
packageUri = null,
dependencies =
mapOf(
"birds" to
RemoteDependency(packageUri = URI("package://localhost:0/birds@0.5.0"), null),
"lib" to
Project(
projectFileUri = libDir.toUri().resolve("PklProject"),
packageUri = URI("package://localhost:0/lib@5.0.0"),
dependencies = emptyMap()
)
)
projectDir.resolve("PklProject").toUri(),
null,
mapOf(
"birds" to RemoteDependency(URI("package://localhost:0/birds@0.5.0"), null),
"lib" to
Project(
libDir.toUri().resolve("PklProject"),
URI("package://localhost:0/lib@5.0.0"),
emptyMap()
)
)
)
)
client.send(
EvaluateRequest(
requestId = 1,
evaluatorId = evaluatorId,
moduleUri = module.toUri(),
moduleText = null,
expr = "output.text",
1,
evaluatorId,
module.toUri(),
null,
"output.text",
)
)
val resp2 = client.receive<EvaluateResponse>()
assertThat(resp2.error).isNull()
assertThat(resp2.result).isNotNull()
assertThat(resp2.result!!.debugRendering.trim())
assertThat(resp2.result?.debugRendering?.trim())
.isEqualTo(
"""
|
@@ -1098,26 +972,28 @@ abstract class AbstractServerTest {
): Long {
val message =
CreateEvaluatorRequest(
requestId = 123,
allowedResources = listOf(Pattern.compile(".*")),
allowedModules = listOf(Pattern.compile(".*")),
clientResourceReaders = resourceReaders,
clientModuleReaders = moduleReaders,
modulePaths = modulePaths,
env = mapOf(),
properties = mapOf(),
timeout = null,
rootDir = null,
cacheDir = cacheDir,
outputFormat = null,
project = project,
http = http
123,
listOf(Pattern.compile(".*")),
listOf(Pattern.compile(".*")),
moduleReaders,
resourceReaders,
modulePaths,
mapOf(),
mapOf(),
null,
null,
cacheDir,
null,
project,
http,
null,
null
)
send(message)
val response = receive<CreateEvaluatorResponse>()
assertThat(response.requestId).isEqualTo(requestId)
assertThat(response.requestId()).isEqualTo(requestId)
assertThat(response.evaluatorId).isNotNull
assertThat(response.error).isNull()
@@ -66,3 +66,6 @@ class BinaryEvaluatorSnippetTestEngine : InputOutputTestEngine() {
return true to bytes.debugRendering.stripFilePaths()
}
}
val ByteArray.debugRendering
get() = MessagePackDebugRenderer(this).output
@@ -19,17 +19,32 @@ import java.io.PipedInputStream
import java.io.PipedOutputStream
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.pkl.core.messaging.MessageTransport
import org.pkl.core.messaging.MessageTransports
import org.pkl.core.util.Pair
class JvmServerTest : AbstractServerTest() {
private val transports: Pair<MessageTransport, MessageTransport> = run {
if (USE_DIRECT_TRANSPORT) {
MessageTransports.direct()
MessageTransports.direct(::log)
} else {
val in1 = PipedInputStream()
val in1 =
PipedInputStream(10240) // use larger pipe size since large messages can be >1024 bytes
val out1 = PipedOutputStream(in1)
val in2 = PipedInputStream()
val out2 = PipedOutputStream(in2)
MessageTransports.stream(in1, out2) to MessageTransports.stream(in2, out1)
Pair.of(
MessageTransports.stream(
ServerMessagePackDecoder(in1),
ServerMessagePackEncoder(out2),
::log
),
MessageTransports.stream(
ServerMessagePackDecoder(in2),
ServerMessagePackEncoder(out1),
::log
)
)
}
}
@@ -105,6 +105,3 @@ class MessagePackDebugRenderer(bytes: ByteArray) {
sb.toString().removePrefix("\n")
}
}
val ByteArray.debugRendering
get() = MessagePackDebugRenderer(this).output
@@ -18,6 +18,7 @@ package org.pkl.server
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.pkl.commons.test.PklExecutablePaths
import org.pkl.core.messaging.MessageTransports
class NativeServerTest : AbstractServerTest() {
private lateinit var server: Process
@@ -27,7 +28,14 @@ class NativeServerTest : AbstractServerTest() {
fun beforeEach() {
val executable = PklExecutablePaths.firstExisting.toString()
server = ProcessBuilder(executable, "server").start()
client = TestTransport(MessageTransports.stream(server.inputStream, server.outputStream))
client =
TestTransport(
MessageTransports.stream(
ServerMessagePackDecoder(server.inputStream),
ServerMessagePackEncoder(server.outputStream)
) { _ ->
}
)
executor.execute { client.start() }
}
@@ -23,21 +23,25 @@ import java.time.Duration
import java.util.regex.Pattern
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.msgpack.core.MessagePack
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings
import org.pkl.core.module.PathElement
import org.pkl.core.evaluatorSettings.PklEvaluatorSettings.ExternalReader
import org.pkl.core.messaging.Message
import org.pkl.core.messaging.MessageDecoder
import org.pkl.core.messaging.MessageEncoder
import org.pkl.core.messaging.Messages.*
import org.pkl.core.packages.Checksums
class MessagePackCodecTest {
class ServerMessagePackCodecTest {
private val encoder: MessageEncoder
private val decoder: MessageDecoder
init {
val inputStream = PipedInputStream()
val inputStream =
PipedInputStream(10240) // use larger pipe size since large messages can be >1024 bytes
val outputStream = PipedOutputStream(inputStream)
encoder = MessagePackEncoder(MessagePack.newDefaultPacker(outputStream))
decoder = MessagePackDecoder(MessagePack.newDefaultUnpacker(inputStream))
encoder = ServerMessagePackEncoder(MessagePack.newDefaultPacker(outputStream))
decoder = ServerMessagePackDecoder(MessagePack.newDefaultUnpacker(inputStream))
}
private fun roundtrip(message: Message) {
@@ -50,31 +54,19 @@ class MessagePackCodecTest {
fun `round-trip CreateEvaluatorRequest`() {
val resourceReader1 =
ResourceReaderSpec(
scheme = "resourceReader1",
hasHierarchicalUris = true,
isGlobbable = true,
"resourceReader1",
true,
true,
)
val resourceReader2 =
ResourceReaderSpec(
scheme = "resourceReader2",
hasHierarchicalUris = true,
isGlobbable = false,
"resourceReader2",
true,
false,
)
val moduleReader1 =
ModuleReaderSpec(
scheme = "moduleReader1",
hasHierarchicalUris = true,
isGlobbable = true,
isLocal = true
)
val moduleReader2 =
ModuleReaderSpec(
scheme = "moduleReader2",
hasHierarchicalUris = true,
isGlobbable = false,
isLocal = false
)
@Suppress("HttpUrlsUsage")
val moduleReader1 = ModuleReaderSpec("moduleReader1", true, true, true)
val moduleReader2 = ModuleReaderSpec("moduleReader2", true, false, false)
val externalReader = ExternalReader("external-cmd", listOf("arg1", "arg2"))
roundtrip(
CreateEvaluatorRequest(
requestId = 123,
@@ -119,7 +111,9 @@ class MessagePackCodecTest {
Http(
proxy = PklEvaluatorSettings.Proxy(URI("http://foo.com:1234"), listOf("bar", "baz")),
caCertificates = byteArrayOf(1, 2, 3, 4)
)
),
externalModuleReaders = mapOf("external" to externalReader, "external2" to externalReader),
externalResourceReaders = mapOf("external" to externalReader),
)
)
}
@@ -170,113 +164,4 @@ class MessagePackCodecTest {
)
)
}
@Test
fun `round-trip ReadResourceRequest`() {
roundtrip(
ReadResourceRequest(requestId = 123, evaluatorId = 456, uri = URI("some/resource.json"))
)
}
@Test
fun `round-trip ReadResourceResponse`() {
roundtrip(
ReadResourceResponse(
requestId = 123,
evaluatorId = 456,
contents = byteArrayOf(1, 2, 3, 4, 5),
error = null
)
)
}
@Test
fun `round-trip ReadModuleRequest`() {
roundtrip(ReadModuleRequest(requestId = 123, evaluatorId = 456, uri = URI("some/module.pkl")))
}
@Test
fun `round-trip ReadModuleResponse`() {
roundtrip(
ReadModuleResponse(requestId = 123, evaluatorId = 456, contents = "x = 42", error = null)
)
}
@Test
fun `round-trip ListModulesRequest`() {
roundtrip(ListModulesRequest(requestId = 135, evaluatorId = 246, uri = URI("foo:/bar/baz/biz")))
}
@Test
fun `round-trip ListModulesResponse`() {
roundtrip(
ListModulesResponse(
requestId = 123,
evaluatorId = 234,
pathElements = listOf(PathElement("foo", true), PathElement("bar", false)),
error = null
)
)
roundtrip(
ListModulesResponse(
requestId = 123,
evaluatorId = 234,
pathElements = null,
error = "Something dun went wrong"
)
)
}
@Test
fun `round-trip ListResourcesRequest`() {
roundtrip(ListResourcesRequest(requestId = 987, evaluatorId = 1359, uri = URI("bar:/bazzy")))
}
@Test
fun `round-trip ListResourcesResponse`() {
roundtrip(
ListResourcesResponse(
requestId = 3851,
evaluatorId = 3019,
pathElements = listOf(PathElement("foo", true), PathElement("bar", false)),
error = null
)
)
roundtrip(
ListResourcesResponse(
requestId = 3851,
evaluatorId = 3019,
pathElements = null,
error = "something went wrong"
)
)
}
@Test
fun `decode request with missing request ID`() {
val bytes =
MessagePack.newDefaultBufferPacker()
.apply {
packArrayHeader(2)
packInt(MessageType.CREATE_EVALUATOR_REQUEST.code)
packMapHeader(1)
packString("clientResourceSchemes")
packArrayHeader(0)
}
.toByteArray()
val decoder = MessagePackDecoder(MessagePack.newDefaultUnpacker(bytes))
val exception = assertThrows<DecodeException> { decoder.decode() }
assertThat(exception.message).contains("requestId")
}
@Test
fun `decode invalid message header`() {
val bytes = MessagePack.newDefaultBufferPacker().apply { packInt(2) }.toByteArray()
val decoder = MessagePackDecoder(MessagePack.newDefaultUnpacker(bytes))
val exception = assertThrows<DecodeException> { decoder.decode() }
assertThat(exception).hasMessage("Malformed message header.")
assertThat(exception).hasRootCauseMessage("Expected Array, but got Integer (02)")
}
}
@@ -17,7 +17,9 @@ package org.pkl.server
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.*
import org.pkl.core.messaging.Message
import org.pkl.core.messaging.MessageTransport
class TestTransport(private val delegate: MessageTransport) : AutoCloseable {
val incomingMessages: BlockingQueue<Message> = ArrayBlockingQueue(10)
@@ -30,15 +32,15 @@ class TestTransport(private val delegate: MessageTransport) : AutoCloseable {
delegate.close()
}
fun send(message: ClientOneWayMessage) {
fun send(message: Message.Client.OneWay) {
delegate.send(message)
}
fun send(message: ClientRequestMessage) {
fun send(message: Message.Client.Request) {
delegate.send(message) { incomingMessages.put(it) }
}
fun send(message: ClientResponseMessage) {
fun send(message: Message.Client.Response) {
delegate.send(message)
}