diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h
index 6c784f5373f29..6ca0ef513530b 100644
--- a/velox/common/memory/SharedArbitrator.h
+++ b/velox/common/memory/SharedArbitrator.h
@@ -66,7 +66,7 @@ class SharedArbitrator : public memory::MemoryArbitrator {
     static constexpr std::string_view kMemoryPoolTransferCapacity{
         "memory-pool-transfer-capacity"};
     static constexpr std::string_view kDefaultMemoryPoolTransferCapacity{
-        "128MB"};
+        "0B"};
     static uint64_t getMemoryPoolTransferCapacity(
         const std::unordered_map<std::string, std::string>& configs);
 
diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp
index 61a956d7dde62..2e71aa4390a05 100644
--- a/velox/common/memory/tests/MemoryArbitratorTest.cpp
+++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp
@@ -144,6 +144,7 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) {
   ASSERT_FALSE(manager.arbitrator()->growCapacity(rootPool.get(), 6 << 20));
   ASSERT_EQ(rootPool->capacity(), 1 << 20);
   ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 2 << 20));
+  ASSERT_TRUE(manager.arbitrator()->growCapacity(rootPool.get(), 1 << 20));
   ASSERT_EQ(rootPool->capacity(), 4 << 20);
   ASSERT_EQ(manager.arbitrator()->stats().freeCapacityBytes, 2 << 20);
   ASSERT_EQ(manager.arbitrator()->stats().freeReservedCapacityBytes, 2 << 20);
@@ -154,19 +155,19 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) {
       "Exceeded memory pool capacity after attempt to grow capacity through "
       "arbitration. Requestor pool name 'leaf-1.0', request size 7.00MB, "
       "memory pool capacity 4.00MB, memory pool max capacity 8.00MB");
-  ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 1 << 20);
+  ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 0), 0);
   ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 0);
   ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 1), 0);
   ASSERT_EQ(manager.arbitrator()->shrinkCapacity(rootPool.get(), 1), 0);
-  ASSERT_EQ(rootPool->capacity(), 3 << 20);
+  ASSERT_EQ(rootPool->capacity(), 4 << 20);
   static_cast<MemoryPoolImpl*>(rootPool.get())->testingSetReservation(0);
   ASSERT_EQ(
       manager.arbitrator()->shrinkCapacity(leafPool.get(), 1 << 20), 1 << 20);
   ASSERT_EQ(
       manager.arbitrator()->shrinkCapacity(rootPool.get(), 1 << 20), 1 << 20);
-  ASSERT_EQ(rootPool->capacity(), 1 << 20);
-  ASSERT_EQ(leafPool->capacity(), 1 << 20);
-  ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 1 << 20);
+  ASSERT_EQ(rootPool->capacity(), 2 << 20);
+  ASSERT_EQ(leafPool->capacity(), 2 << 20);
+  ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 2 << 20);
   ASSERT_EQ(rootPool->capacity(), 0);
   ASSERT_EQ(leafPool->capacity(), 0);
 }
diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp
index cc32ef111791c..2f31092fe8e4c 100644
--- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp
+++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp
@@ -558,7 +558,7 @@ TEST_F(MockSharedArbitrationTest, extraConfigs) {
   ASSERT_EQ(
       SharedArbitrator::ExtraConfig::getMemoryPoolTransferCapacity(
           emptyConfigs),
-      128 << 20);
+      0);
   ASSERT_EQ(
       SharedArbitrator::ExtraConfig::getMemoryReclaimMaxWaitTimeMs(
           emptyConfigs),
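Note (not part of the patch): the header change above lowers kDefaultMemoryPoolTransferCapacity from "128MB" to "0B", which is why getMemoryPoolTransferCapacity(emptyConfigs) is now expected to return 0 rather than 128 << 20. The sketch below only illustrates how such a capacity string could map to a byte count; parseCapacity is a hypothetical helper for illustration, not the Velox parser.

// Illustrative only. Maps a capacity string like "0B" or "128MB" to bytes.
#include <cstdint>
#include <stdexcept>
#include <string>

uint64_t parseCapacity(const std::string& text) {
  size_t pos = 0;
  // Parse the numeric prefix, e.g. "128" in "128MB" or "0" in "0B".
  const uint64_t value = std::stoull(text, &pos);
  const std::string unit = text.substr(pos);
  if (unit == "B") {
    return value;
  }
  if (unit == "KB") {
    return value << 10;
  }
  if (unit == "MB") {
    return value << 20;
  }
  if (unit == "GB") {
    return value << 30;
  }
  throw std::invalid_argument("Unknown capacity unit: " + unit);
}

// parseCapacity("0B") == 0, matching the updated expectation in
// MockSharedArbitrationTest.extraConfigs; parseCapacity("128MB") == 128 << 20,
// the previous default.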
diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp
index da19a289eb2cc..0630c36a19ae7 100644
--- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp
+++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp
@@ -59,8 +59,6 @@ std::unique_ptr<memory::MemoryManager> createMemoryManager(
   options.extraArbitratorConfigs = {
       {std::string(ExtraConfig::kMemoryPoolInitialCapacity),
        folly::to<std::string>(memoryPoolInitCapacity) + "B"},
-      {std::string(ExtraConfig::kMemoryPoolTransferCapacity),
-       folly::to<std::string>(memoryPoolTransferCapacity) + "B"},
       {std::string(ExtraConfig::kMemoryReclaimMaxWaitTime),
        folly::to<std::string>(maxReclaimWaitMs) + "ms"},
       {std::string(ExtraConfig::kGlobalArbitrationEnabled), "true"},
diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/base.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/base.proto
new file mode 100644
index 0000000000000..65e2493f83687
--- /dev/null
+++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/base.proto
@@ -0,0 +1,817 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+import "google/protobuf/any.proto";
+import "spark/connect/commands.proto";
+import "spark/connect/common.proto";
+import "spark/connect/expressions.proto";
+import "spark/connect/relations.proto";
+import "spark/connect/types.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+option go_package = "internal/generated";
+
+// A [[Plan]] is the structure that carries the runtime information for the execution from the
+// client to the server. A [[Plan]] can either be of the type [[Relation]] which is a reference
+// to the underlying logical plan or it can be of the [[Command]] type that is used to execute
+// commands on the server.
+message Plan {
+  oneof op_type {
+    Relation root = 1;
+    Command command = 2;
+  }
+}
+
+
+
+// User Context is used to refer to one particular user session that is executing
+// queries in the backend.
+message UserContext {
+  string user_id = 1;
+  string user_name = 2;
+
+  // To extend the existing user context message that is used to identify incoming requests,
+  // Spark Connect leverages the Any protobuf type that can be used to inject arbitrary other
+  // messages into this message. Extensions are stored as a `repeated` type to be able to
+  // handle multiple active extensions.
+  repeated google.protobuf.Any extensions = 999;
+}
+
+// Request to perform plan analyze, optionally to explain the plan.
+message AnalyzePlanRequest {
+  // (Required)
+  //
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id). The session_id is set by the client to be able to
+  // collate streaming responses from different queries within the dedicated session.
+ // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string session_id = 1; + + // (Required) User context + UserContext user_context = 2; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 3; + + oneof analyze { + Schema schema = 4; + Explain explain = 5; + TreeString tree_string = 6; + IsLocal is_local = 7; + IsStreaming is_streaming = 8; + InputFiles input_files = 9; + SparkVersion spark_version = 10; + DDLParse ddl_parse = 11; + SameSemantics same_semantics = 12; + SemanticHash semantic_hash = 13; + Persist persist = 14; + Unpersist unpersist = 15; + GetStorageLevel get_storage_level = 16; + } + + message Schema { + // (Required) The logical plan to be analyzed. + Plan plan = 1; + } + + // Explains the input plan based on a configurable mode. + message Explain { + // (Required) The logical plan to be analyzed. + Plan plan = 1; + + // (Required) For analyzePlan rpc calls, configure the mode to explain plan in strings. + ExplainMode explain_mode = 2; + + // Plan explanation mode. + enum ExplainMode { + EXPLAIN_MODE_UNSPECIFIED = 0; + + // Generates only physical plan. + EXPLAIN_MODE_SIMPLE = 1; + + // Generates parsed logical plan, analyzed logical plan, optimized logical plan and physical plan. + // Parsed Logical plan is a unresolved plan that extracted from the query. Analyzed logical plans + // transforms which translates unresolvedAttribute and unresolvedRelation into fully typed objects. + // The optimized logical plan transforms through a set of optimization rules, resulting in the + // physical plan. + EXPLAIN_MODE_EXTENDED = 2; + + // Generates code for the statement, if any and a physical plan. + EXPLAIN_MODE_CODEGEN = 3; + + // If plan node statistics are available, generates a logical plan and also the statistics. + EXPLAIN_MODE_COST = 4; + + // Generates a physical plan outline and also node details. + EXPLAIN_MODE_FORMATTED = 5; + } + } + + message TreeString { + // (Required) The logical plan to be analyzed. + Plan plan = 1; + + // (Optional) Max level of the schema. + optional int32 level = 2; + } + + message IsLocal { + // (Required) The logical plan to be analyzed. + Plan plan = 1; + } + + message IsStreaming { + // (Required) The logical plan to be analyzed. + Plan plan = 1; + } + + message InputFiles { + // (Required) The logical plan to be analyzed. + Plan plan = 1; + } + + message SparkVersion { } + + message DDLParse { + // (Required) The DDL formatted string to be parsed. + string ddl_string = 1; + } + + + // Returns `true` when the logical query plans are equal and therefore return same results. + message SameSemantics { + // (Required) The plan to be compared. + Plan target_plan = 1; + + // (Required) The other plan to be compared. + Plan other_plan = 2; + } + + message SemanticHash { + // (Required) The logical plan to get a hashCode. + Plan plan = 1; + } + + message Persist { + // (Required) The logical plan to persist. + Relation relation = 1; + + // (Optional) The storage level. + optional StorageLevel storage_level = 2; + } + + message Unpersist { + // (Required) The logical plan to unpersist. + Relation relation = 1; + + // (Optional) Whether to block until all blocks are deleted. + optional bool blocking = 2; + } + + message GetStorageLevel { + // (Required) The logical plan to get the storage level. 
+ Relation relation = 1; + } +} + +// Response to performing analysis of the query. Contains relevant metadata to be able to +// reason about the performance. +message AnalyzePlanResponse { + string session_id = 1; + + oneof result { + Schema schema = 2; + Explain explain = 3; + TreeString tree_string = 4; + IsLocal is_local = 5; + IsStreaming is_streaming = 6; + InputFiles input_files = 7; + SparkVersion spark_version = 8; + DDLParse ddl_parse = 9; + SameSemantics same_semantics = 10; + SemanticHash semantic_hash = 11; + Persist persist = 12; + Unpersist unpersist = 13; + GetStorageLevel get_storage_level = 14; + } + + message Schema { + DataType schema = 1; + } + + message Explain { + string explain_string = 1; + } + + message TreeString { + string tree_string = 1; + } + + message IsLocal { + bool is_local = 1; + } + + message IsStreaming { + bool is_streaming = 1; + } + + message InputFiles { + // A best-effort snapshot of the files that compose this Dataset + repeated string files = 1; + } + + message SparkVersion { + string version = 1; + } + + message DDLParse { + DataType parsed = 1; + } + + message SameSemantics { + bool result = 1; + } + + message SemanticHash { + int32 result = 1; + } + + message Persist { } + + message Unpersist { } + + message GetStorageLevel { + // (Required) The StorageLevel as a result of get_storage_level request. + StorageLevel storage_level = 1; + } +} + +// A request to be executed by the service. +message ExecutePlanRequest { + // (Required) + // + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string session_id = 1; + + // (Required) User context + // + // user_context.user_id and session+id both identify a unique remote spark session on the + // server side. + UserContext user_context = 2; + + // (Optional) + // Provide an id for this request. If not provided, it will be generated by the server. + // It is returned in every ExecutePlanResponse.operation_id of the ExecutePlan response stream. + // The id must be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + optional string operation_id = 6; + + // (Required) The logical plan to be executed / analyzed. + Plan plan = 3; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 4; + + // Repeated element for options that can be passed to the request. This element is currently + // unused but allows to pass in an extension value used for arbitrary options. + repeated RequestOption request_options = 5; + + message RequestOption { + oneof request_option { + ReattachOptions reattach_options = 1; + // Extension type for request options + google.protobuf.Any extension = 999; + } + } + + // Tags to tag the given execution with. + // Tags cannot contain ',' character and cannot be empty strings. + // Used by Interrupt with interrupt.tag. + repeated string tags = 7; +} + +// The response of a query, can be one or more for each request. Responses belonging to the +// same input query, carry the same `session_id`. 
+message ExecutePlanResponse { + string session_id = 1; + + // Identifies the ExecutePlan execution. + // If set by the client in ExecutePlanRequest.operationId, that value is returned. + // Otherwise generated by the server. + // It is an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string operation_id = 12; + + // Identified the response in the stream. + // The id is an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string response_id = 13; + + // Union type for the different response messages. + oneof response_type { + ArrowBatch arrow_batch = 2; + + // Special case for executing SQL commands. + SqlCommandResult sql_command_result = 5; + + // Response for a streaming query. + WriteStreamOperationStartResult write_stream_operation_start_result = 8; + + // Response for commands on a streaming query. + StreamingQueryCommandResult streaming_query_command_result = 9; + + // Response for 'SparkContext.resources'. + GetResourcesCommandResult get_resources_command_result = 10; + + // Response for commands on the streaming query manager. + StreamingQueryManagerCommandResult streaming_query_manager_command_result = 11; + + // Response type informing if the stream is complete in reattachable execution. + ResultComplete result_complete = 14; + + // Support arbitrary result objects. + google.protobuf.Any extension = 999; + } + + // Metrics for the query execution. Typically, this field is only present in the last + // batch of results and then represent the overall state of the query execution. + Metrics metrics = 4; + + // The metrics observed during the execution of the query plan. + repeated ObservedMetrics observed_metrics = 6; + + // (Optional) The Spark schema. This field is available when `collect` is called. + DataType schema = 7; + + // A SQL command returns an opaque Relation that can be directly used as input for the next + // call. + message SqlCommandResult { + Relation relation = 1; + } + + // Batch results of metrics. + message ArrowBatch { + int64 row_count = 1; + bytes data = 2; + } + + message Metrics { + + repeated MetricObject metrics = 1; + + message MetricObject { + string name = 1; + int64 plan_id = 2; + int64 parent = 3; + map execution_metrics = 4; + } + + message MetricValue { + string name = 1; + int64 value = 2; + string metric_type = 3; + } + } + + message ObservedMetrics { + string name = 1; + repeated Expression.Literal values = 2; + } + + message ResultComplete { + // If present, in a reattachable execution this means that after server sends onComplete, + // the execution is complete. If the server sends onComplete without sending a ResultComplete, + // it means that there is more, and the client should use ReattachExecute RPC to continue. + } +} + +// The key-value pair for the config request and response. +message KeyValue { + // (Required) The key. + string key = 1; + // (Optional) The value. + optional string value = 2; +} + +// Request to update or fetch the configurations. +message ConfigRequest { + // (Required) + // + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string session_id = 1; + + // (Required) User context + UserContext user_context = 2; + + // (Required) The operation for the config. 
+ Operation operation = 3; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 4; + + message Operation { + oneof op_type { + Set set = 1; + Get get = 2; + GetWithDefault get_with_default = 3; + GetOption get_option = 4; + GetAll get_all = 5; + Unset unset = 6; + IsModifiable is_modifiable = 7; + } + } + + message Set { + // (Required) The config key-value pairs to set. + repeated KeyValue pairs = 1; + } + + message Get { + // (Required) The config keys to get. + repeated string keys = 1; + } + + message GetWithDefault { + // (Required) The config key-value paris to get. The value will be used as the default value. + repeated KeyValue pairs = 1; + } + + message GetOption { + // (Required) The config keys to get optionally. + repeated string keys = 1; + } + + message GetAll { + // (Optional) The prefix of the config key to get. + optional string prefix = 1; + } + + message Unset { + // (Required) The config keys to unset. + repeated string keys = 1; + } + + message IsModifiable { + // (Required) The config keys to check the config is modifiable. + repeated string keys = 1; + } +} + +// Response to the config request. +message ConfigResponse { + string session_id = 1; + + // (Optional) The result key-value pairs. + // + // Available when the operation is 'Get', 'GetWithDefault', 'GetOption', 'GetAll'. + // Also available for the operation 'IsModifiable' with boolean string "true" and "false". + repeated KeyValue pairs = 2; + + // (Optional) + // + // Warning messages for deprecated or unsupported configurations. + repeated string warnings = 3; +} + +// Request to transfer client-local artifacts. +message AddArtifactsRequest { + + // (Required) + // + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string session_id = 1; + + // User context + UserContext user_context = 2; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 6; + + // A chunk of an Artifact. + message ArtifactChunk { + // Data chunk. + bytes data = 1; + // CRC to allow server to verify integrity of the chunk. + int64 crc = 2; + } + + // An artifact that is contained in a single `ArtifactChunk`. + // Generally, this message represents tiny artifacts such as REPL-generated class files. + message SingleChunkArtifact { + // The name of the artifact is expected in the form of a "Relative Path" that is made up of a + // sequence of directories and the final file element. + // Examples of "Relative Path"s: "jars/test.jar", "classes/xyz.class", "abc.xyz", "a/b/X.jar". + // The server is expected to maintain the hierarchy of files as defined by their name. (i.e + // The relative path of the file on the server's filesystem will be the same as the name of + // the provided artifact) + string name = 1; + // A single data chunk. + ArtifactChunk data = 2; + } + + // A number of `SingleChunkArtifact` batched into a single RPC. 
+ message Batch { + repeated SingleChunkArtifact artifacts = 1; + } + + // Signals the beginning/start of a chunked artifact. + // A large artifact is transferred through a payload of `BeginChunkedArtifact` followed by a + // sequence of `ArtifactChunk`s. + message BeginChunkedArtifact { + // Name of the artifact undergoing chunking. Follows the same conventions as the `name` in + // the `Artifact` message. + string name = 1; + // Total size of the artifact in bytes. + int64 total_bytes = 2; + // Number of chunks the artifact is split into. + // This includes the `initial_chunk`. + int64 num_chunks = 3; + // The first/initial chunk. + ArtifactChunk initial_chunk = 4; + } + + // The payload is either a batch of artifacts or a partial chunk of a large artifact. + oneof payload { + Batch batch = 3; + // The metadata and the initial chunk of a large artifact chunked into multiple requests. + // The server side is notified about the total size of the large artifact as well as the + // number of chunks to expect. + BeginChunkedArtifact begin_chunk = 4; + // A chunk of an artifact excluding metadata. This can be any chunk of a large artifact + // excluding the first chunk (which is included in `BeginChunkedArtifact`). + ArtifactChunk chunk = 5; + } +} + +// Response to adding an artifact. Contains relevant metadata to verify successful transfer of +// artifact(s). +message AddArtifactsResponse { + // Metadata of an artifact. + message ArtifactSummary { + string name = 1; + // Whether the CRC (Cyclic Redundancy Check) is successful on server verification. + // The server discards any artifact that fails the CRC. + // If false, the client may choose to resend the artifact specified by `name`. + bool is_crc_successful = 2; + } + + // The list of artifact(s) seen by the server. + repeated ArtifactSummary artifacts = 1; +} + +// Request to get current statuses of artifacts at the server side. +message ArtifactStatusesRequest { + // (Required) + // + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string session_id = 1; + + // User context + UserContext user_context = 2; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 3; + + // The name of the artifact is expected in the form of a "Relative Path" that is made up of a + // sequence of directories and the final file element. + // Examples of "Relative Path"s: "jars/test.jar", "classes/xyz.class", "abc.xyz", "a/b/X.jar". + // The server is expected to maintain the hierarchy of files as defined by their name. (i.e + // The relative path of the file on the server's filesystem will be the same as the name of + // the provided artifact) + repeated string names = 4; +} + +// Response to checking artifact statuses. +message ArtifactStatusesResponse { + message ArtifactStatus { + // Exists or not particular artifact at the server. + bool exists = 1; + } + + // A map of artifact names to their statuses. 
+ map statuses = 1; +} + +message InterruptRequest { + // (Required) + // + // The session_id specifies a spark session for a user id (which is specified + // by user_context.user_id). The session_id is set by the client to be able to + // collate streaming responses from different queries within the dedicated session. + // The id should be an UUID string of the format `00112233-4455-6677-8899-aabbccddeeff` + string session_id = 1; + + // (Required) User context + UserContext user_context = 2; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 3; + + // (Required) The type of interrupt to execute. + InterruptType interrupt_type = 4; + + enum InterruptType { + INTERRUPT_TYPE_UNSPECIFIED = 0; + + // Interrupt all running executions within the session with the provided session_id. + INTERRUPT_TYPE_ALL = 1; + + // Interrupt all running executions within the session with the provided operation_tag. + INTERRUPT_TYPE_TAG = 2; + + // Interrupt the running execution within the session with the provided operation_id. + INTERRUPT_TYPE_OPERATION_ID = 3; + } + + oneof interrupt { + // if interrupt_tag == INTERRUPT_TYPE_TAG, interrupt operation with this tag. + string operation_tag = 5; + + // if interrupt_tag == INTERRUPT_TYPE_OPERATION_ID, interrupt operation with this operation_id. + string operation_id = 6; + } +} + +message InterruptResponse { + // Session id in which the interrupt was running. + string session_id = 1; + + // Operation ids of the executions which were interrupted. + repeated string interrupted_ids = 2; +} + +message ReattachOptions { + // If true, the request can be reattached to using ReattachExecute. + // ReattachExecute can be used either if the stream broke with a GRPC network error, + // or if the server closed the stream without sending a response with StreamStatus.complete=true. + // The server will keep a buffer of responses in case a response is lost, and + // ReattachExecute needs to back-track. + // + // If false, the execution response stream will will not be reattachable, and all responses are + // immediately released by the server after being sent. + bool reattachable = 1; +} + +message ReattachExecuteRequest { + // (Required) + // + // The session_id of the request to reattach to. + // This must be an id of existing session. + string session_id = 1; + + // (Required) User context + // + // user_context.user_id and session+id both identify a unique remote spark session on the + // server side. + UserContext user_context = 2; + + // (Required) + // Provide an id of the request to reattach to. + // This must be an id of existing operation. + string operation_id = 3; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 4; + + // (Optional) + // Last already processed response id from the response stream. + // After reattach, server will resume the response stream after that response. + // If not specified, server will restart the stream from the start. 
+ // + // Note: server controls the amount of responses that it buffers and it may drop responses, + // that are far behind the latest returned response, so this can't be used to arbitrarily + // scroll back the cursor. If the response is no longer available, this will result in an error. + optional string last_response_id = 5; +} + +message ReleaseExecuteRequest { + // (Required) + // + // The session_id of the request to reattach to. + // This must be an id of existing session. + string session_id = 1; + + // (Required) User context + // + // user_context.user_id and session+id both identify a unique remote spark session on the + // server side. + UserContext user_context = 2; + + // (Required) + // Provide an id of the request to reattach to. + // This must be an id of existing operation. + string operation_id = 3; + + // Provides optional information about the client sending the request. This field + // can be used for language or version specific information and is only intended for + // logging purposes and will not be interpreted by the server. + optional string client_type = 4; + + // Release and close operation completely. + // This will also interrupt the query if it is running execution, and wait for it to be torn down. + message ReleaseAll {} + + // Release all responses from the operation response stream up to and including + // the response with the given by response_id. + // While server determines by itself how much of a buffer of responses to keep, client providing + // explicit release calls will help reduce resource consumption. + // Noop if response_id not found in cached responses. + message ReleaseUntil { + string response_id = 1; + } + + oneof release { + ReleaseAll release_all = 5; + ReleaseUntil release_until = 6; + } +} + +message ReleaseExecuteResponse { + // Session id in which the release was running. + string session_id = 1; + + // Operation id of the operation on which the release executed. + // If the operation couldn't be found (because e.g. it was concurrently released), will be unset. + // Otherwise, it will be equal to the operation_id from request. + optional string operation_id = 2; +} + +// Main interface for the SparkConnect service. +service SparkConnectService { + + // Executes a request that contains the query and returns a stream of [[Response]]. + // + // It is guaranteed that there is at least one ARROW batch returned even if the result set is empty. + rpc ExecutePlan(ExecutePlanRequest) returns (stream ExecutePlanResponse) {} + + // Analyzes a query and returns a [[AnalyzeResponse]] containing metadata about the query. + rpc AnalyzePlan(AnalyzePlanRequest) returns (AnalyzePlanResponse) {} + + // Update or fetch the configurations and returns a [[ConfigResponse]] containing the result. + rpc Config(ConfigRequest) returns (ConfigResponse) {} + + // Add artifacts to the session and returns a [[AddArtifactsResponse]] containing metadata about + // the added artifacts. + rpc AddArtifacts(stream AddArtifactsRequest) returns (AddArtifactsResponse) {} + + // Check statuses of artifacts in the session and returns them in a [[ArtifactStatusesResponse]] + rpc ArtifactStatus(ArtifactStatusesRequest) returns (ArtifactStatusesResponse) {} + + // Interrupts running executions + rpc Interrupt(InterruptRequest) returns (InterruptResponse) {} + + // Reattach to an existing reattachable execution. + // The ExecutePlan must have been started with ReattachOptions.reattachable=true. 
+ // If the ExecutePlanResponse stream ends without a ResultComplete message, there is more to + // continue. If there is a ResultComplete, the client should use ReleaseExecute with + rpc ReattachExecute(ReattachExecuteRequest) returns (stream ExecutePlanResponse) {} + + // Release an reattachable execution, or parts thereof. + // The ExecutePlan must have been started with ReattachOptions.reattachable=true. + // Non reattachable executions are released automatically and immediately after the ExecutePlan + // RPC and ReleaseExecute may not be used. + rpc ReleaseExecute(ReleaseExecuteRequest) returns (ReleaseExecuteResponse) {} +} + diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/catalog.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/catalog.proto new file mode 100644 index 0000000000000..5b1b90b0087d0 --- /dev/null +++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/catalog.proto @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +package spark.connect; + +import "spark/connect/common.proto"; +import "spark/connect/types.proto"; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; +option go_package = "internal/generated"; + +// Catalog messages are marked as unstable. 
+message Catalog { + oneof cat_type { + CurrentDatabase current_database = 1; + SetCurrentDatabase set_current_database = 2; + ListDatabases list_databases = 3; + ListTables list_tables = 4; + ListFunctions list_functions = 5; + ListColumns list_columns = 6; + GetDatabase get_database = 7; + GetTable get_table = 8; + GetFunction get_function = 9; + DatabaseExists database_exists = 10; + TableExists table_exists = 11; + FunctionExists function_exists = 12; + CreateExternalTable create_external_table = 13; + CreateTable create_table = 14; + DropTempView drop_temp_view = 15; + DropGlobalTempView drop_global_temp_view = 16; + RecoverPartitions recover_partitions = 17; + IsCached is_cached = 18; + CacheTable cache_table = 19; + UncacheTable uncache_table = 20; + ClearCache clear_cache = 21; + RefreshTable refresh_table = 22; + RefreshByPath refresh_by_path = 23; + CurrentCatalog current_catalog = 24; + SetCurrentCatalog set_current_catalog = 25; + ListCatalogs list_catalogs = 26; + } +} + +// See `spark.catalog.currentDatabase` +message CurrentDatabase { } + +// See `spark.catalog.setCurrentDatabase` +message SetCurrentDatabase { + // (Required) + string db_name = 1; +} + +// See `spark.catalog.listDatabases` +message ListDatabases { + // (Optional) The pattern that the database name needs to match + optional string pattern = 1; +} + +// See `spark.catalog.listTables` +message ListTables { + // (Optional) + optional string db_name = 1; + // (Optional) The pattern that the table name needs to match + optional string pattern = 2; +} + +// See `spark.catalog.listFunctions` +message ListFunctions { + // (Optional) + optional string db_name = 1; + // (Optional) The pattern that the function name needs to match + optional string pattern = 2; +} + +// See `spark.catalog.listColumns` +message ListColumns { + // (Required) + string table_name = 1; + // (Optional) + optional string db_name = 2; +} + +// See `spark.catalog.getDatabase` +message GetDatabase { + // (Required) + string db_name = 1; +} + +// See `spark.catalog.getTable` +message GetTable { + // (Required) + string table_name = 1; + // (Optional) + optional string db_name = 2; +} + +// See `spark.catalog.getFunction` +message GetFunction { + // (Required) + string function_name = 1; + // (Optional) + optional string db_name = 2; +} + +// See `spark.catalog.databaseExists` +message DatabaseExists { + // (Required) + string db_name = 1; +} + +// See `spark.catalog.tableExists` +message TableExists { + // (Required) + string table_name = 1; + // (Optional) + optional string db_name = 2; +} + +// See `spark.catalog.functionExists` +message FunctionExists { + // (Required) + string function_name = 1; + // (Optional) + optional string db_name = 2; +} + +// See `spark.catalog.createExternalTable` +message CreateExternalTable { + // (Required) + string table_name = 1; + // (Optional) + optional string path = 2; + // (Optional) + optional string source = 3; + // (Optional) + optional DataType schema = 4; + // Options could be empty for valid data source format. + // The map key is case insensitive. + map options = 5; +} + +// See `spark.catalog.createTable` +message CreateTable { + // (Required) + string table_name = 1; + // (Optional) + optional string path = 2; + // (Optional) + optional string source = 3; + // (Optional) + optional string description = 4; + // (Optional) + optional DataType schema = 5; + // Options could be empty for valid data source format. + // The map key is case insensitive. 
+ map options = 6; +} + +// See `spark.catalog.dropTempView` +message DropTempView { + // (Required) + string view_name = 1; +} + +// See `spark.catalog.dropGlobalTempView` +message DropGlobalTempView { + // (Required) + string view_name = 1; +} + +// See `spark.catalog.recoverPartitions` +message RecoverPartitions { + // (Required) + string table_name = 1; +} + +// See `spark.catalog.isCached` +message IsCached { + // (Required) + string table_name = 1; +} + +// See `spark.catalog.cacheTable` +message CacheTable { + // (Required) + string table_name = 1; + + // (Optional) + optional StorageLevel storage_level = 2; +} + +// See `spark.catalog.uncacheTable` +message UncacheTable { + // (Required) + string table_name = 1; +} + +// See `spark.catalog.clearCache` +message ClearCache { } + +// See `spark.catalog.refreshTable` +message RefreshTable { + // (Required) + string table_name = 1; +} + +// See `spark.catalog.refreshByPath` +message RefreshByPath { + // (Required) + string path = 1; +} + +// See `spark.catalog.currentCatalog` +message CurrentCatalog { } + +// See `spark.catalog.setCurrentCatalog` +message SetCurrentCatalog { + // (Required) + string catalog_name = 1; +} + +// See `spark.catalog.listCatalogs` +message ListCatalogs { + // (Optional) The pattern that the catalog name needs to match + optional string pattern = 1; +} diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/commands.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/commands.proto new file mode 100644 index 0000000000000..49b25f099bf2e --- /dev/null +++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/commands.proto @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +import "google/protobuf/any.proto"; +import "spark/connect/common.proto"; +import "spark/connect/expressions.proto"; +import "spark/connect/relations.proto"; + +package spark.connect; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; +option go_package = "internal/generated"; + +// A [[Command]] is an operation that is executed by the server that does not directly consume or +// produce a relational result. 
+message Command { + oneof command_type { + CommonInlineUserDefinedFunction register_function = 1; + WriteOperation write_operation = 2; + CreateDataFrameViewCommand create_dataframe_view = 3; + WriteOperationV2 write_operation_v2 = 4; + SqlCommand sql_command = 5; + WriteStreamOperationStart write_stream_operation_start = 6; + StreamingQueryCommand streaming_query_command = 7; + GetResourcesCommand get_resources_command = 8; + StreamingQueryManagerCommand streaming_query_manager_command = 9; + CommonInlineUserDefinedTableFunction register_table_function = 10; + + // This field is used to mark extensions to the protocol. When plugins generate arbitrary + // Commands they can add them here. During the planning the correct resolution is done. + google.protobuf.Any extension = 999; + + } +} + +// A SQL Command is used to trigger the eager evaluation of SQL commands in Spark. +// +// When the SQL provide as part of the message is a command it will be immediately evaluated +// and the result will be collected and returned as part of a LocalRelation. If the result is +// not a command, the operation will simply return a SQL Relation. This allows the client to be +// almost oblivious to the server-side behavior. +message SqlCommand { + // (Required) SQL Query. + string sql = 1; + + // (Optional) A map of parameter names to literal expressions. + map args = 2; + + // (Optional) A sequence of literal expressions for positional parameters in the SQL query text. + repeated Expression.Literal pos_args = 3; +} + +// A command that can create DataFrame global temp view or local temp view. +message CreateDataFrameViewCommand { + // (Required) The relation that this view will be built on. + Relation input = 1; + + // (Required) View name. + string name = 2; + + // (Required) Whether this is global temp view or local temp view. + bool is_global = 3; + + // (Required) + // + // If true, and if the view already exists, updates it; if false, and if the view + // already exists, throws exception. + bool replace = 4; +} + +// As writes are not directly handled during analysis and planning, they are modeled as commands. +message WriteOperation { + // (Required) The output of the `input` relation will be persisted according to the options. + Relation input = 1; + + // (Optional) Format value according to the Spark documentation. Examples are: text, parquet, delta. + optional string source = 2; + + // (Optional) + // + // The destination of the write operation can be either a path or a table. + // If the destination is neither a path nor a table, such as jdbc and noop, + // the `save_type` should not be set. + oneof save_type { + string path = 3; + SaveTable table = 4; + } + + // (Required) the save mode. + SaveMode mode = 5; + + // (Optional) List of columns to sort the output by. + repeated string sort_column_names = 6; + + // (Optional) List of columns for partitioning. + repeated string partitioning_columns = 7; + + // (Optional) Bucketing specification. Bucketing must set the number of buckets and the columns + // to bucket by. + BucketBy bucket_by = 8; + + // (Optional) A list of configuration options. + map options = 9; + + message SaveTable { + // (Required) The table name. + string table_name = 1; + // (Required) The method to be called to write to the table. 
+ TableSaveMethod save_method = 2; + + enum TableSaveMethod { + TABLE_SAVE_METHOD_UNSPECIFIED = 0; + TABLE_SAVE_METHOD_SAVE_AS_TABLE = 1; + TABLE_SAVE_METHOD_INSERT_INTO = 2; + } + } + + message BucketBy { + repeated string bucket_column_names = 1; + int32 num_buckets = 2; + } + + enum SaveMode { + SAVE_MODE_UNSPECIFIED = 0; + SAVE_MODE_APPEND = 1; + SAVE_MODE_OVERWRITE = 2; + SAVE_MODE_ERROR_IF_EXISTS = 3; + SAVE_MODE_IGNORE = 4; + } +} + +// As writes are not directly handled during analysis and planning, they are modeled as commands. +message WriteOperationV2 { + // (Required) The output of the `input` relation will be persisted according to the options. + Relation input = 1; + + // (Required) The destination of the write operation must be either a path or a table. + string table_name = 2; + + // (Optional) A provider for the underlying output data source. Spark's default catalog supports + // "parquet", "json", etc. + optional string provider = 3; + + // (Optional) List of columns for partitioning for output table created by `create`, + // `createOrReplace`, or `replace` + repeated Expression partitioning_columns = 4; + + // (Optional) A list of configuration options. + map options = 5; + + // (Optional) A list of table properties. + map table_properties = 6; + + // (Required) Write mode. + Mode mode = 7; + + enum Mode { + MODE_UNSPECIFIED = 0; + MODE_CREATE = 1; + MODE_OVERWRITE = 2; + MODE_OVERWRITE_PARTITIONS = 3; + MODE_APPEND = 4; + MODE_REPLACE = 5; + MODE_CREATE_OR_REPLACE = 6; + } + + // (Optional) A condition for overwrite saving mode + Expression overwrite_condition = 8; +} + +// Starts write stream operation as streaming query. Query ID and Run ID of the streaming +// query are returned. +message WriteStreamOperationStart { + + // (Required) The output of the `input` streaming relation will be written. + Relation input = 1; + + // The following fields directly map to API for DataStreamWriter(). + // Consult API documentation unless explicitly documented here. + + string format = 2; + map options = 3; + repeated string partitioning_column_names = 4; + + oneof trigger { + string processing_time_interval = 5; + bool available_now = 6; + bool once = 7; + string continuous_checkpoint_interval = 8; + } + + string output_mode = 9; + string query_name = 10; + + // The destination is optional. When set, it can be a path or a table name. + oneof sink_destination { + string path = 11; + string table_name = 12; + } + + StreamingForeachFunction foreach_writer = 13; + StreamingForeachFunction foreach_batch = 14; +} + +message StreamingForeachFunction { + oneof function { + PythonUDF python_function = 1; + ScalarScalaUDF scala_function = 2; + } +} + +message WriteStreamOperationStartResult { + + // (Required) Query instance. See `StreamingQueryInstanceId`. + StreamingQueryInstanceId query_id = 1; + + // An optional query name. + string name = 2; + + // TODO: How do we indicate errors? + // TODO: Consider adding status, last progress etc here. +} + +// A tuple that uniquely identifies an instance of streaming query run. It consists of `id` that +// persists across the streaming runs and `run_id` that changes between each run of the +// streaming query that resumes from the checkpoint. +message StreamingQueryInstanceId { + + // (Required) The unique id of this query that persists across restarts from checkpoint data. + // That is, this id is generated when a query is started for the first time, and + // will be the same every time it is restarted from checkpoint data. 
+ string id = 1; + + // (Required) The unique id of this run of the query. That is, every start/restart of a query + // will generate a unique run_id. Therefore, every time a query is restarted from + // checkpoint, it will have the same `id` but different `run_id`s. + string run_id = 2; +} + +// Commands for a streaming query. +message StreamingQueryCommand { + + // (Required) Query instance. See `StreamingQueryInstanceId`. + StreamingQueryInstanceId query_id = 1; + + // See documentation for the corresponding API method in StreamingQuery. + oneof command { + // status() API. + bool status = 2; + // lastProgress() API. + bool last_progress = 3; + // recentProgress() API. + bool recent_progress = 4; + // stop() API. Stops the query. + bool stop = 5; + // processAllAvailable() API. Waits till all the available data is processed + bool process_all_available = 6; + // explain() API. Returns logical and physical plans. + ExplainCommand explain = 7; + // exception() API. Returns the exception in the query if any. + bool exception = 8; + // awaitTermination() API. Waits for the termination of the query. + AwaitTerminationCommand await_termination = 9; + } + + message ExplainCommand { + // TODO: Consider reusing Explain from AnalyzePlanRequest message. + // We can not do this right now since it base.proto imports this file. + bool extended = 1; + } + + message AwaitTerminationCommand { + optional int64 timeout_ms = 2; + } +} + +// Response for commands on a streaming query. +message StreamingQueryCommandResult { + // (Required) Query instance id. See `StreamingQueryInstanceId`. + StreamingQueryInstanceId query_id = 1; + + oneof result_type { + StatusResult status = 2; + RecentProgressResult recent_progress = 3; + ExplainResult explain = 4; + ExceptionResult exception = 5; + AwaitTerminationResult await_termination = 6; + } + + message StatusResult { + // See documentation for these Scala 'StreamingQueryStatus' struct + string status_message = 1; + bool is_data_available = 2; + bool is_trigger_active = 3; + bool is_active = 4; + } + + message RecentProgressResult { + // Progress reports as an array of json strings. + repeated string recent_progress_json = 5; + } + + message ExplainResult { + // Logical and physical plans as string + string result = 1; + } + + message ExceptionResult { + // (Optional) Exception message as string, maps to the return value of original + // StreamingQueryException's toString method + optional string exception_message = 1; + // (Optional) Exception error class as string + optional string error_class = 2; + // (Optional) Exception stack trace as string + optional string stack_trace = 3; + } + + message AwaitTerminationResult { + bool terminated = 1; + } +} + +// Commands for the streaming query manager. +message StreamingQueryManagerCommand { + + // See documentation for the corresponding API method in StreamingQueryManager. + oneof command { + // active() API, returns a list of active queries. + bool active = 1; + // get() API, returns the StreamingQuery identified by id. + string get_query = 2; + // awaitAnyTermination() API, wait until any query terminates or timeout. + AwaitAnyTerminationCommand await_any_termination = 3; + // resetTerminated() API. + bool reset_terminated = 4; + // addListener API. + StreamingQueryListenerCommand add_listener = 5; + // removeListener API. + StreamingQueryListenerCommand remove_listener = 6; + // listListeners() API, returns a list of streaming query listeners. 
+ bool list_listeners = 7; + } + + message AwaitAnyTerminationCommand { + // (Optional) The waiting time in milliseconds to wait for any query to terminate. + optional int64 timeout_ms = 1; + } + + message StreamingQueryListenerCommand { + bytes listener_payload = 1; + optional PythonUDF python_listener_payload = 2; + string id = 3; + } +} + +// Response for commands on the streaming query manager. +message StreamingQueryManagerCommandResult { + oneof result_type { + ActiveResult active = 1; + StreamingQueryInstance query = 2; + AwaitAnyTerminationResult await_any_termination = 3; + bool reset_terminated = 4; + bool add_listener = 5; + bool remove_listener = 6; + ListStreamingQueryListenerResult list_listeners = 7; + } + + message ActiveResult { + repeated StreamingQueryInstance active_queries = 1; + } + + message StreamingQueryInstance { + // (Required) The id and runId of this query. + StreamingQueryInstanceId id = 1; + // (Optional) The name of this query. + optional string name = 2; + } + + message AwaitAnyTerminationResult { + bool terminated = 1; + } + + message StreamingQueryListenerInstance { + bytes listener_payload = 1; + } + + message ListStreamingQueryListenerResult { + // (Required) Reference IDs of listener instances. + repeated string listener_ids = 1; + } +} + +// Command to get the output of 'SparkContext.resources' +message GetResourcesCommand { } + +// Response for command 'GetResourcesCommand'. +message GetResourcesCommandResult { + map resources = 1; +} diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/common.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/common.proto new file mode 100644 index 0000000000000..5c538cf10825e --- /dev/null +++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/common.proto @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +package spark.connect; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; +option go_package = "internal/generated"; + +// StorageLevel for persisting Datasets/Tables. +message StorageLevel { + // (Required) Whether the cache should use disk or not. + bool use_disk = 1; + // (Required) Whether the cache should use memory or not. + bool use_memory = 2; + // (Required) Whether the cache should use off-heap or not. + bool use_off_heap = 3; + // (Required) Whether the cached data is deserialized or not. + bool deserialized = 4; + // (Required) The number of replicas. + int32 replication = 5; +} + + +// ResourceInformation to hold information about a type of Resource. 
+// The corresponding class is 'org.apache.spark.resource.ResourceInformation' +message ResourceInformation { + // (Required) The name of the resource + string name = 1; + // (Required) An array of strings describing the addresses of the resource. + repeated string addresses = 2; +} diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/example_plugins.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/example_plugins.proto new file mode 100644 index 0000000000000..7ad171d2e8a1e --- /dev/null +++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/example_plugins.proto @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +import "spark/connect/relations.proto"; +import "spark/connect/expressions.proto"; +option go_package = "internal/generated"; + +package spark.connect; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; + +message ExamplePluginRelation { + Relation input = 1; + string custom_field = 2; + +} + +message ExamplePluginExpression { + Expression child = 1; + string custom_field = 2; +} + +message ExamplePluginCommand { + string custom_field = 1; +} \ No newline at end of file diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/expressions.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/expressions.proto new file mode 100644 index 0000000000000..557b9db912376 --- /dev/null +++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/expressions.proto @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +import "google/protobuf/any.proto"; +import "spark/connect/types.proto"; + +package spark.connect; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; +option go_package = "internal/generated"; + +// Expression used to refer to fields, functions and similar. This can be used everywhere +// expressions in SQL appear. 
+message Expression { + + oneof expr_type { + Literal literal = 1; + UnresolvedAttribute unresolved_attribute = 2; + UnresolvedFunction unresolved_function = 3; + ExpressionString expression_string = 4; + UnresolvedStar unresolved_star = 5; + Alias alias = 6; + Cast cast = 7; + UnresolvedRegex unresolved_regex = 8; + SortOrder sort_order = 9; + LambdaFunction lambda_function = 10; + Window window = 11; + UnresolvedExtractValue unresolved_extract_value = 12; + UpdateFields update_fields = 13; + UnresolvedNamedLambdaVariable unresolved_named_lambda_variable = 14; + CommonInlineUserDefinedFunction common_inline_user_defined_function = 15; + CallFunction call_function = 16; + + // This field is used to mark extensions to the protocol. When plugins generate arbitrary + // relations they can add them here. During the planning the correct resolution is done. + google.protobuf.Any extension = 999; + } + + + // Expression for the OVER clause or WINDOW clause. + message Window { + + // (Required) The window function. + Expression window_function = 1; + + // (Optional) The way that input rows are partitioned. + repeated Expression partition_spec = 2; + + // (Optional) Ordering of rows in a partition. + repeated SortOrder order_spec = 3; + + // (Optional) Window frame in a partition. + // + // If not set, it will be treated as 'UnspecifiedFrame'. + WindowFrame frame_spec = 4; + + // The window frame + message WindowFrame { + + // (Required) The type of the frame. + FrameType frame_type = 1; + + // (Required) The lower bound of the frame. + FrameBoundary lower = 2; + + // (Required) The upper bound of the frame. + FrameBoundary upper = 3; + + enum FrameType { + FRAME_TYPE_UNDEFINED = 0; + + // RowFrame treats rows in a partition individually. + FRAME_TYPE_ROW = 1; + + // RangeFrame treats rows in a partition as groups of peers. + // All rows having the same 'ORDER BY' ordering are considered as peers. + FRAME_TYPE_RANGE = 2; + } + + message FrameBoundary { + oneof boundary { + // CURRENT ROW boundary + bool current_row = 1; + + // UNBOUNDED boundary. + // For lower bound, it will be converted to 'UnboundedPreceding'. + // for upper bound, it will be converted to 'UnboundedFollowing'. + bool unbounded = 2; + + // This is an expression for future proofing. We are expecting literals on the server side. + Expression value = 3; + } + } + } + } + + // SortOrder is used to specify the data ordering, it is normally used in Sort and Window. + // It is an unevaluable expression and cannot be evaluated, so can not be used in Projection. + message SortOrder { + // (Required) The expression to be sorted. + Expression child = 1; + + // (Required) The sort direction, should be ASCENDING or DESCENDING. + SortDirection direction = 2; + + // (Required) How to deal with NULLs, should be NULLS_FIRST or NULLS_LAST. + NullOrdering null_ordering = 3; + + enum SortDirection { + SORT_DIRECTION_UNSPECIFIED = 0; + SORT_DIRECTION_ASCENDING = 1; + SORT_DIRECTION_DESCENDING = 2; + } + + enum NullOrdering { + SORT_NULLS_UNSPECIFIED = 0; + SORT_NULLS_FIRST = 1; + SORT_NULLS_LAST = 2; + } + } + + message Cast { + // (Required) the expression to be casted. + Expression expr = 1; + + // (Required) the data type that the expr to be casted to. + oneof cast_to_type { + DataType type = 2; + // If this is set, Server will use Catalyst parser to parse this string to DataType. 
+ string type_str = 3; + } + } + + message Literal { + oneof literal_type { + DataType null = 1; + bytes binary = 2; + bool boolean = 3; + + int32 byte = 4; + int32 short = 5; + int32 integer = 6; + int64 long = 7; + float float = 10; + double double = 11; + Decimal decimal = 12; + + string string = 13; + + // Date in units of days since the UNIX epoch. + int32 date = 16; + // Timestamp in units of microseconds since the UNIX epoch. + int64 timestamp = 17; + // Timestamp in units of microseconds since the UNIX epoch (without timezone information). + int64 timestamp_ntz = 18; + + CalendarInterval calendar_interval = 19; + int32 year_month_interval = 20; + int64 day_time_interval = 21; + Array array = 22; + Map map = 23; + Struct struct = 24; + } + + message Decimal { + // the string representation. + string value = 1; + // The maximum number of digits allowed in the value. + // the maximum precision is 38. + optional int32 precision = 2; + // declared scale of decimal literal + optional int32 scale = 3; + } + + message CalendarInterval { + int32 months = 1; + int32 days = 2; + int64 microseconds = 3; + } + + message Array { + DataType element_type = 1; + repeated Literal elements = 2; + } + + message Map { + DataType key_type = 1; + DataType value_type = 2; + repeated Literal keys = 3; + repeated Literal values = 4; + } + + message Struct { + DataType struct_type = 1; + repeated Literal elements = 2; + } + } + + // An unresolved attribute that is not explicitly bound to a specific column, but the column + // is resolved during analysis by name. + message UnresolvedAttribute { + // (Required) An identifier that will be parsed by Catalyst parser. This should follow the + // Spark SQL identifier syntax. + string unparsed_identifier = 1; + + // (Optional) The id of corresponding connect plan. + optional int64 plan_id = 2; + } + + // An unresolved function is not explicitly bound to one explicit function, but the function + // is resolved during analysis following Sparks name resolution rules. + message UnresolvedFunction { + // (Required) name (or unparsed name for user defined function) for the unresolved function. + string function_name = 1; + + // (Optional) Function arguments. Empty arguments are allowed. + repeated Expression arguments = 2; + + // (Required) Indicate if this function should be applied on distinct values. + bool is_distinct = 3; + + // (Required) Indicate if this is a user defined function. + // + // When it is not a user defined function, Connect will use the function name directly. + // When it is a user defined function, Connect will parse the function name first. + bool is_user_defined_function = 4; + } + + // Expression as string. + message ExpressionString { + // (Required) A SQL expression that will be parsed by Catalyst parser. + string expression = 1; + } + + // UnresolvedStar is used to expand all the fields of a relation or struct. + message UnresolvedStar { + + // (Optional) The target of the expansion. + // + // If set, it should end with '.*' and will be parsed by 'parseAttributeName' + // in the server side. + optional string unparsed_target = 1; + } + + // Represents all of the input attributes to a given relational operator, for example in + // "SELECT `(id)?+.+` FROM ...". + message UnresolvedRegex { + // (Required) The column name used to extract column with regex. + string col_name = 1; + + // (Optional) The id of corresponding connect plan. 
+ optional int64 plan_id = 2; + } + + // Extracts a value or values from an Expression + message UnresolvedExtractValue { + // (Required) The expression to extract value from, can be + // Map, Array, Struct or array of Structs. + Expression child = 1; + + // (Required) The expression to describe the extraction, can be + // key of Map, index of Array, field name of Struct. + Expression extraction = 2; + } + + // Add, replace or drop a field of `StructType` expression by name. + message UpdateFields { + // (Required) The struct expression. + Expression struct_expression = 1; + + // (Required) The field name. + string field_name = 2; + + // (Optional) The expression to add or replace. + // + // When not set, it means this field will be dropped. + Expression value_expression = 3; + } + + message Alias { + // (Required) The expression that alias will be added on. + Expression expr = 1; + + // (Required) a list of name parts for the alias. + // + // Scalar columns only has one name that presents. + repeated string name = 2; + + // (Optional) Alias metadata expressed as a JSON map. + optional string metadata = 3; + } + + message LambdaFunction { + // (Required) The lambda function. + // + // The function body should use 'UnresolvedAttribute' as arguments, the sever side will + // replace 'UnresolvedAttribute' with 'UnresolvedNamedLambdaVariable'. + Expression function = 1; + + // (Required) Function variables. Must contains 1 ~ 3 variables. + repeated Expression.UnresolvedNamedLambdaVariable arguments = 2; + } + + message UnresolvedNamedLambdaVariable { + + // (Required) a list of name parts for the variable. Must not be empty. + repeated string name_parts = 1; + } +} + +message CommonInlineUserDefinedFunction { + // (Required) Name of the user-defined function. + string function_name = 1; + // (Optional) Indicate if the user-defined function is deterministic. + bool deterministic = 2; + // (Optional) Function arguments. Empty arguments are allowed. + repeated Expression arguments = 3; + // (Required) Indicate the function type of the user-defined function. + oneof function { + PythonUDF python_udf = 4; + ScalarScalaUDF scalar_scala_udf = 5; + JavaUDF java_udf = 6; + } +} + +message PythonUDF { + // (Required) Output type of the Python UDF + DataType output_type = 1; + // (Required) EvalType of the Python UDF + int32 eval_type = 2; + // (Required) The encoded commands of the Python UDF + bytes command = 3; + // (Required) Python version being used in the client. + string python_ver = 4; +} + +message ScalarScalaUDF { + // (Required) Serialized JVM object containing UDF definition, input encoders and output encoder + bytes payload = 1; + // (Optional) Input type(s) of the UDF + repeated DataType inputTypes = 2; + // (Required) Output type of the UDF + DataType outputType = 3; + // (Required) True if the UDF can return null value + bool nullable = 4; +} + +message JavaUDF { + // (Required) Fully qualified name of Java class + string class_name = 1; + + // (Optional) Output type of the Java UDF + optional DataType output_type = 2; + + // (Required) Indicate if the Java user-defined function is an aggregate function + bool aggregate = 3; +} + +message CallFunction { + // (Required) Unparsed name of the SQL function. + string function_name = 1; + + // (Optional) Function arguments. Empty arguments are allowed. 
+ repeated Expression arguments = 2; +} diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/relations.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/relations.proto new file mode 100644 index 0000000000000..f7f1315ede0f8 --- /dev/null +++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/relations.proto @@ -0,0 +1,1003 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = 'proto3'; + +package spark.connect; + +import "google/protobuf/any.proto"; +import "spark/connect/expressions.proto"; +import "spark/connect/types.proto"; +import "spark/connect/catalog.proto"; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; +option go_package = "internal/generated"; + +// The main [[Relation]] type. Fundamentally, a relation is a typed container +// that has exactly one explicit relation type set. +// +// When adding new relation types, they have to be registered here. +message Relation { + RelationCommon common = 1; + oneof rel_type { + Read read = 2; + Project project = 3; + Filter filter = 4; + Join join = 5; + SetOperation set_op = 6; + Sort sort = 7; + Limit limit = 8; + Aggregate aggregate = 9; + SQL sql = 10; + LocalRelation local_relation = 11; + Sample sample = 12; + Offset offset = 13; + Deduplicate deduplicate = 14; + Range range = 15; + SubqueryAlias subquery_alias = 16; + Repartition repartition = 17; + ToDF to_df = 18; + WithColumnsRenamed with_columns_renamed = 19; + ShowString show_string = 20; + Drop drop = 21; + Tail tail = 22; + WithColumns with_columns = 23; + Hint hint = 24; + Unpivot unpivot = 25; + ToSchema to_schema = 26; + RepartitionByExpression repartition_by_expression = 27; + MapPartitions map_partitions = 28; + CollectMetrics collect_metrics = 29; + Parse parse = 30; + GroupMap group_map = 31; + CoGroupMap co_group_map = 32; + WithWatermark with_watermark = 33; + ApplyInPandasWithState apply_in_pandas_with_state = 34; + HtmlString html_string = 35; + CachedLocalRelation cached_local_relation = 36; + CachedRemoteRelation cached_remote_relation = 37; + CommonInlineUserDefinedTableFunction common_inline_user_defined_table_function = 38; + + // NA functions + NAFill fill_na = 90; + NADrop drop_na = 91; + NAReplace replace = 92; + + // stat functions + StatSummary summary = 100; + StatCrosstab crosstab = 101; + StatDescribe describe = 102; + StatCov cov = 103; + StatCorr corr = 104; + StatApproxQuantile approx_quantile = 105; + StatFreqItems freq_items = 106; + StatSampleBy sample_by = 107; + + // Catalog API (experimental / unstable) + Catalog catalog = 200; + + // This field is used to mark extensions to the protocol. When plugins generate arbitrary + // relations they can add them here. During the planning the correct resolution is done. 
+ google.protobuf.Any extension = 998; + Unknown unknown = 999; + } +} + +// Used for testing purposes only. +message Unknown {} + +// Common metadata of all relations. +message RelationCommon { + // (Required) Shared relation metadata. + string source_info = 1; + + // (Optional) A per-client globally unique id for a given connect plan. + optional int64 plan_id = 2; +} + +// Relation that uses a SQL query to generate the output. +message SQL { + // (Required) The SQL query. + string query = 1; + + // (Optional) A map of parameter names to literal expressions. + map args = 2; + + // (Optional) A sequence of literal expressions for positional parameters in the SQL query text. + repeated Expression.Literal pos_args = 3; +} + +// Relation that reads from a file / table or other data source. Does not have additional +// inputs. +message Read { + oneof read_type { + NamedTable named_table = 1; + DataSource data_source = 2; + } + + // (Optional) Indicates if this is a streaming read. + bool is_streaming = 3; + + message NamedTable { + // (Required) Unparsed identifier for the table. + string unparsed_identifier = 1; + + // Options for the named table. The map key is case insensitive. + map options = 2; + } + + message DataSource { + // (Optional) Supported formats include: parquet, orc, text, json, parquet, csv, avro. + // + // If not set, the value from SQL conf 'spark.sql.sources.default' will be used. + optional string format = 1; + + // (Optional) If not set, Spark will infer the schema. + // + // This schema string should be either DDL-formatted or JSON-formatted. + optional string schema = 2; + + // Options for the data source. The context of this map varies based on the + // data source format. This options could be empty for valid data source format. + // The map key is case insensitive. + map options = 3; + + // (Optional) A list of path for file-system backed data sources. + repeated string paths = 4; + + // (Optional) Condition in the where clause for each partition. + // + // This is only supported by the JDBC data source. + repeated string predicates = 5; + } +} + +// Projection of a bag of expressions for a given input relation. +// +// The input relation must be specified. +// The projected expression can be an arbitrary expression. +message Project { + // (Optional) Input relation is optional for Project. + // + // For example, `SELECT ABS(-1)` is valid plan without an input plan. + Relation input = 1; + + // (Required) A Project requires at least one expression. + repeated Expression expressions = 3; +} + +// Relation that applies a boolean expression `condition` on each row of `input` to produce +// the output result. +message Filter { + // (Required) Input relation for a Filter. + Relation input = 1; + + // (Required) A Filter must have a condition expression. + Expression condition = 2; +} + +// Relation of type [[Join]]. +// +// `left` and `right` must be present. +message Join { + // (Required) Left input relation for a Join. + Relation left = 1; + + // (Required) Right input relation for a Join. + Relation right = 2; + + // (Optional) The join condition. Could be unset when `using_columns` is utilized. + // + // This field does not co-exist with using_columns. + Expression join_condition = 3; + + // (Required) The join type. + JoinType join_type = 4; + + // Optional. using_columns provides a list of columns that should present on both sides of + // the join inputs that this Join will join on. 
For example A JOIN B USING col_name is + // equivalent to A JOIN B on A.col_name = B.col_name. + // + // This field does not co-exist with join_condition. + repeated string using_columns = 5; + + enum JoinType { + JOIN_TYPE_UNSPECIFIED = 0; + JOIN_TYPE_INNER = 1; + JOIN_TYPE_FULL_OUTER = 2; + JOIN_TYPE_LEFT_OUTER = 3; + JOIN_TYPE_RIGHT_OUTER = 4; + JOIN_TYPE_LEFT_ANTI = 5; + JOIN_TYPE_LEFT_SEMI = 6; + JOIN_TYPE_CROSS = 7; + } + + // (Optional) Only used by joinWith. Set the left and right join data types. + optional JoinDataType join_data_type = 6; + + message JoinDataType { + // If the left data type is a struct. + bool is_left_struct = 1; + // If the right data type is a struct. + bool is_right_struct = 2; + } +} + +// Relation of type [[SetOperation]] +message SetOperation { + // (Required) Left input relation for a Set operation. + Relation left_input = 1; + + // (Required) Right input relation for a Set operation. + Relation right_input = 2; + + // (Required) The Set operation type. + SetOpType set_op_type = 3; + + // (Optional) If to remove duplicate rows. + // + // True to preserve all results. + // False to remove duplicate rows. + optional bool is_all = 4; + + // (Optional) If to perform the Set operation based on name resolution. + // + // Only UNION supports this option. + optional bool by_name = 5; + + // (Optional) If to perform the Set operation and allow missing columns. + // + // Only UNION supports this option. + optional bool allow_missing_columns = 6; + + enum SetOpType { + SET_OP_TYPE_UNSPECIFIED = 0; + SET_OP_TYPE_INTERSECT = 1; + SET_OP_TYPE_UNION = 2; + SET_OP_TYPE_EXCEPT = 3; + } +} + +// Relation of type [[Limit]] that is used to `limit` rows from the input relation. +message Limit { + // (Required) Input relation for a Limit. + Relation input = 1; + + // (Required) the limit. + int32 limit = 2; +} + +// Relation of type [[Offset]] that is used to read rows staring from the `offset` on +// the input relation. +message Offset { + // (Required) Input relation for an Offset. + Relation input = 1; + + // (Required) the limit. + int32 offset = 2; +} + +// Relation of type [[Tail]] that is used to fetch `limit` rows from the last of the input relation. +message Tail { + // (Required) Input relation for an Tail. + Relation input = 1; + + // (Required) the limit. + int32 limit = 2; +} + +// Relation of type [[Aggregate]]. +message Aggregate { + // (Required) Input relation for a RelationalGroupedDataset. + Relation input = 1; + + // (Required) How the RelationalGroupedDataset was built. + GroupType group_type = 2; + + // (Required) Expressions for grouping keys + repeated Expression grouping_expressions = 3; + + // (Required) List of values that will be translated to columns in the output DataFrame. + repeated Expression aggregate_expressions = 4; + + // (Optional) Pivots a column of the current `DataFrame` and performs the specified aggregation. + Pivot pivot = 5; + + enum GroupType { + GROUP_TYPE_UNSPECIFIED = 0; + GROUP_TYPE_GROUPBY = 1; + GROUP_TYPE_ROLLUP = 2; + GROUP_TYPE_CUBE = 3; + GROUP_TYPE_PIVOT = 4; + } + + message Pivot { + // (Required) The column to pivot + Expression col = 1; + + // (Optional) List of values that will be translated to columns in the output DataFrame. + // + // Note that if it is empty, the server side will immediately trigger a job to collect + // the distinct values of the column. + repeated Expression.Literal values = 2; + } +} + +// Relation of type [[Sort]]. +message Sort { + // (Required) Input relation for a Sort. 
+ Relation input = 1; + + // (Required) The ordering expressions + repeated Expression.SortOrder order = 2; + + // (Optional) if this is a global sort. + optional bool is_global = 3; +} + + +// Drop specified columns. +message Drop { + // (Required) The input relation. + Relation input = 1; + + // (Optional) columns to drop. + repeated Expression columns = 2; + + // (Optional) names of columns to drop. + repeated string column_names = 3; +} + + +// Relation of type [[Deduplicate]] which have duplicate rows removed, could consider either only +// the subset of columns or all the columns. +message Deduplicate { + // (Required) Input relation for a Deduplicate. + Relation input = 1; + + // (Optional) Deduplicate based on a list of column names. + // + // This field does not co-use with `all_columns_as_keys`. + repeated string column_names = 2; + + // (Optional) Deduplicate based on all the columns of the input relation. + // + // This field does not co-use with `column_names`. + optional bool all_columns_as_keys = 3; + + // (Optional) Deduplicate within the time range of watermark. + optional bool within_watermark = 4; +} + +// A relation that does not need to be qualified by name. +message LocalRelation { + // (Optional) Local collection data serialized into Arrow IPC streaming format which contains + // the schema of the data. + optional bytes data = 1; + + // (Optional) The schema of local data. + // It should be either a DDL-formatted type string or a JSON string. + // + // The server side will update the column names and data types according to this schema. + // If the 'data' is not provided, then this schema will be required. + optional string schema = 2; +} + +// A local relation that has been cached already. +message CachedLocalRelation { + // `userId` and `sessionId` fields are deleted since the server must always use the active + // session/user rather than arbitrary values provided by the client. It is never valid to access + // a local relation from a different session/user. + reserved 1, 2; + reserved "userId", "sessionId"; + + // (Required) A sha-256 hash of the serialized local relation in proto, see LocalRelation. + string hash = 3; +} + +// Represents a remote relation that has been cached on server. +message CachedRemoteRelation { + // (Required) ID of the remote related (assigned by the service). + string relation_id = 1; +} + +// Relation of type [[Sample]] that samples a fraction of the dataset. +message Sample { + // (Required) Input relation for a Sample. + Relation input = 1; + + // (Required) lower bound. + double lower_bound = 2; + + // (Required) upper bound. + double upper_bound = 3; + + // (Optional) Whether to sample with replacement. + optional bool with_replacement = 4; + + // (Optional) The random seed. + optional int64 seed = 5; + + // (Required) Explicitly sort the underlying plan to make the ordering deterministic or cache it. + // This flag is true when invoking `dataframe.randomSplit` to randomly splits DataFrame with the + // provided weights. Otherwise, it is false. + bool deterministic_order = 6; +} + +// Relation of type [[Range]] that generates a sequence of integers. +message Range { + // (Optional) Default value = 0 + optional int64 start = 1; + + // (Required) + int64 end = 2; + + // (Required) + int64 step = 3; + + // Optional. Default value is assigned by 1) SQL conf "spark.sql.leafNodeDefaultParallelism" if + // it is set, or 2) spark default parallelism. + optional int32 num_partitions = 4; +} + +// Relation alias. 
+message SubqueryAlias { + // (Required) The input relation of SubqueryAlias. + Relation input = 1; + + // (Required) The alias. + string alias = 2; + + // (Optional) Qualifier of the alias. + repeated string qualifier = 3; +} + +// Relation repartition. +message Repartition { + // (Required) The input relation of Repartition. + Relation input = 1; + + // (Required) Must be positive. + int32 num_partitions = 2; + + // (Optional) Default value is false. + optional bool shuffle = 3; +} + +// Compose the string representing rows for output. +// It will invoke 'Dataset.showString' to compute the results. +message ShowString { + // (Required) The input relation. + Relation input = 1; + + // (Required) Number of rows to show. + int32 num_rows = 2; + + // (Required) If set to more than 0, truncates strings to + // `truncate` characters and all cells will be aligned right. + int32 truncate = 3; + + // (Required) If set to true, prints output rows vertically (one line per column value). + bool vertical = 4; +} + +// Compose the string representing rows for output. +// It will invoke 'Dataset.htmlString' to compute the results. +message HtmlString { + // (Required) The input relation. + Relation input = 1; + + // (Required) Number of rows to show. + int32 num_rows = 2; + + // (Required) If set to more than 0, truncates strings to + // `truncate` characters and all cells will be aligned right. + int32 truncate = 3; +} + +// Computes specified statistics for numeric and string columns. +// It will invoke 'Dataset.summary' (same as 'StatFunctions.summary') +// to compute the results. +message StatSummary { + // (Required) The input relation. + Relation input = 1; + + // (Optional) Statistics from to be computed. + // + // Available statistics are: + // count + // mean + // stddev + // min + // max + // arbitrary approximate percentiles specified as a percentage (e.g. 75%) + // count_distinct + // approx_count_distinct + // + // If no statistics are given, this function computes 'count', 'mean', 'stddev', 'min', + // 'approximate quartiles' (percentiles at 25%, 50%, and 75%), and 'max'. + repeated string statistics = 2; +} + +// Computes basic statistics for numeric and string columns, including count, mean, stddev, min, +// and max. If no columns are given, this function computes statistics for all numerical or +// string columns. +message StatDescribe { + // (Required) The input relation. + Relation input = 1; + + // (Optional) Columns to compute statistics on. + repeated string cols = 2; +} + +// Computes a pair-wise frequency table of the given columns. Also known as a contingency table. +// It will invoke 'Dataset.stat.crosstab' (same as 'StatFunctions.crossTabulate') +// to compute the results. +message StatCrosstab { + // (Required) The input relation. + Relation input = 1; + + // (Required) The name of the first column. + // + // Distinct items will make the first item of each row. + string col1 = 2; + + // (Required) The name of the second column. + // + // Distinct items will make the column names of the DataFrame. + string col2 = 3; +} + +// Calculate the sample covariance of two numerical columns of a DataFrame. +// It will invoke 'Dataset.stat.cov' (same as 'StatFunctions.calculateCov') to compute the results. +message StatCov { + // (Required) The input relation. + Relation input = 1; + + // (Required) The name of the first column. + string col1 = 2; + + // (Required) The name of the second column. + string col2 = 3; +} + +// Calculates the correlation of two columns of a DataFrame. 
Currently only supports the Pearson +// Correlation Coefficient. It will invoke 'Dataset.stat.corr' (same as +// 'StatFunctions.pearsonCorrelation') to compute the results. +message StatCorr { + // (Required) The input relation. + Relation input = 1; + + // (Required) The name of the first column. + string col1 = 2; + + // (Required) The name of the second column. + string col2 = 3; + + // (Optional) Default value is 'pearson'. + // + // Currently only supports the Pearson Correlation Coefficient. + optional string method = 4; +} + +// Calculates the approximate quantiles of numerical columns of a DataFrame. +// It will invoke 'Dataset.stat.approxQuantile' (same as 'StatFunctions.approxQuantile') +// to compute the results. +message StatApproxQuantile { + // (Required) The input relation. + Relation input = 1; + + // (Required) The names of the numerical columns. + repeated string cols = 2; + + // (Required) A list of quantile probabilities. + // + // Each number must belong to [0, 1]. + // For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + repeated double probabilities = 3; + + // (Required) The relative target precision to achieve (greater than or equal to 0). + // + // If set to zero, the exact quantiles are computed, which could be very expensive. + // Note that values greater than 1 are accepted but give the same result as 1. + double relative_error = 4; +} + +// Finding frequent items for columns, possibly with false positives. +// It will invoke 'Dataset.stat.freqItems' (same as 'StatFunctions.freqItems') +// to compute the results. +message StatFreqItems { + // (Required) The input relation. + Relation input = 1; + + // (Required) The names of the columns to search frequent items in. + repeated string cols = 2; + + // (Optional) The minimum frequency for an item to be considered `frequent`. + // Should be greater than 1e-4. + optional double support = 3; +} + + +// Returns a stratified sample without replacement based on the fraction +// given on each stratum. +// It will invoke 'Dataset.stat.freqItems' (same as 'StatFunctions.freqItems') +// to compute the results. +message StatSampleBy { + // (Required) The input relation. + Relation input = 1; + + // (Required) The column that defines strata. + Expression col = 2; + + // (Required) Sampling fraction for each stratum. + // + // If a stratum is not specified, we treat its fraction as zero. + repeated Fraction fractions = 3; + + // (Optional) The random seed. + optional int64 seed = 5; + + message Fraction { + // (Required) The stratum. + Expression.Literal stratum = 1; + + // (Required) The fraction value. Must be in [0, 1]. + double fraction = 2; + } +} + + +// Replaces null values. +// It will invoke 'Dataset.na.fill' (same as 'DataFrameNaFunctions.fill') to compute the results. +// Following 3 parameter combinations are supported: +// 1, 'values' only contains 1 item, 'cols' is empty: +// replaces null values in all type-compatible columns. +// 2, 'values' only contains 1 item, 'cols' is not empty: +// replaces null values in specified columns. +// 3, 'values' contains more than 1 items, then 'cols' is required to have the same length: +// replaces each specified column with corresponding value. +message NAFill { + // (Required) The input relation. + Relation input = 1; + + // (Optional) Optional list of column names to consider. + repeated string cols = 2; + + // (Required) Values to replace null values with. + // + // Should contain at least 1 item. 
+ // Only 4 data types are supported now: bool, long, double, string + repeated Expression.Literal values = 3; +} + + +// Drop rows containing null values. +// It will invoke 'Dataset.na.drop' (same as 'DataFrameNaFunctions.drop') to compute the results. +message NADrop { + // (Required) The input relation. + Relation input = 1; + + // (Optional) Optional list of column names to consider. + // + // When it is empty, all the columns in the input relation will be considered. + repeated string cols = 2; + + // (Optional) The minimum number of non-null and non-NaN values required to keep. + // + // When not set, it is equivalent to the number of considered columns, which means + // a row will be kept only if all columns are non-null. + // + // 'how' options ('all', 'any') can be easily converted to this field: + // - 'all' -> set 'min_non_nulls' 1; + // - 'any' -> keep 'min_non_nulls' unset; + optional int32 min_non_nulls = 3; +} + + +// Replaces old values with the corresponding values. +// It will invoke 'Dataset.na.replace' (same as 'DataFrameNaFunctions.replace') +// to compute the results. +message NAReplace { + // (Required) The input relation. + Relation input = 1; + + // (Optional) List of column names to consider. + // + // When it is empty, all the type-compatible columns in the input relation will be considered. + repeated string cols = 2; + + // (Optional) The value replacement mapping. + repeated Replacement replacements = 3; + + message Replacement { + // (Required) The old value. + // + // Only 4 data types are supported now: null, bool, double, string. + Expression.Literal old_value = 1; + + // (Required) The new value. + // + // Should be of the same data type with the old value. + Expression.Literal new_value = 2; + } +} + + +// Rename columns on the input relation by the same length of names. +message ToDF { + // (Required) The input relation of RenameColumnsBySameLengthNames. + Relation input = 1; + + // (Required) + // + // The number of columns of the input relation must be equal to the length + // of this field. If this is not true, an exception will be returned. + repeated string column_names = 2; +} + + +// Rename columns on the input relation by a map with name to name mapping. +message WithColumnsRenamed { + // (Required) The input relation. + Relation input = 1; + + + // (Required) + // + // Renaming column names of input relation from A to B where A is the map key + // and B is the map value. This is a no-op if schema doesn't contain any A. It + // does not require that all input relation column names to present as keys. + // duplicated B are not allowed. + map rename_columns_map = 2; +} + +// Adding columns or replacing the existing columns that have the same names. +message WithColumns { + // (Required) The input relation. + Relation input = 1; + + // (Required) + // + // Given a column name, apply the corresponding expression on the column. If column + // name exists in the input relation, then replace the column. If the column name + // does not exist in the input relation, then adds it as a new column. + // + // Only one name part is expected from each Expression.Alias. + // + // An exception is thrown when duplicated names are present in the mapping. + repeated Expression.Alias aliases = 2; +} + +message WithWatermark { + + // (Required) The input relation + Relation input = 1; + + // (Required) Name of the column containing event time. + string event_time = 2; + + // (Required) + string delay_threshold = 3; +} + +// Specify a hint over a relation. 
Hint should have a name and optional parameters. +message Hint { + // (Required) The input relation. + Relation input = 1; + + // (Required) Hint name. + // + // Supported Join hints include BROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL. + // + // Supported partitioning hints include COALESCE, REPARTITION, REPARTITION_BY_RANGE. + string name = 2; + + // (Optional) Hint parameters. + repeated Expression parameters = 3; +} + +// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns set. +message Unpivot { + // (Required) The input relation. + Relation input = 1; + + // (Required) Id columns. + repeated Expression ids = 2; + + // (Optional) Value columns to unpivot. + optional Values values = 3; + + // (Required) Name of the variable column. + string variable_column_name = 4; + + // (Required) Name of the value column. + string value_column_name = 5; + + message Values { + repeated Expression values = 1; + } +} + +message ToSchema { + // (Required) The input relation. + Relation input = 1; + + // (Required) The user provided schema. + // + // The Sever side will update the dataframe with this schema. + DataType schema = 2; +} + +message RepartitionByExpression { + // (Required) The input relation. + Relation input = 1; + + // (Required) The partitioning expressions. + repeated Expression partition_exprs = 2; + + // (Optional) number of partitions, must be positive. + optional int32 num_partitions = 3; +} + +message MapPartitions { + // (Required) Input relation for a mapPartitions-equivalent API: mapInPandas, mapInArrow. + Relation input = 1; + + // (Required) Input user-defined function. + CommonInlineUserDefinedFunction func = 2; + + // (Optional) Whether to use barrier mode execution or not. + optional bool is_barrier = 3; +} + +message GroupMap { + // (Required) Input relation for Group Map API: apply, applyInPandas. + Relation input = 1; + + // (Required) Expressions for grouping keys. + repeated Expression grouping_expressions = 2; + + // (Required) Input user-defined function. + CommonInlineUserDefinedFunction func = 3; + + // (Optional) Expressions for sorting. Only used by Scala Sorted Group Map API. + repeated Expression sorting_expressions = 4; + + // Below fields are only used by (Flat)MapGroupsWithState + // (Optional) Input relation for initial State. + Relation initial_input = 5; + + // (Optional) Expressions for grouping keys of the initial state input relation. + repeated Expression initial_grouping_expressions = 6; + + // (Optional) True if MapGroupsWithState, false if FlatMapGroupsWithState. + optional bool is_map_groups_with_state = 7; + + // (Optional) The output mode of the function. + optional string output_mode = 8; + + // (Optional) Timeout configuration for groups that do not receive data for a while. + optional string timeout_conf = 9; +} + +message CoGroupMap { + // (Required) One input relation for CoGroup Map API - applyInPandas. + Relation input = 1; + + // Expressions for grouping keys of the first input relation. + repeated Expression input_grouping_expressions = 2; + + // (Required) The other input relation. + Relation other = 3; + + // Expressions for grouping keys of the other input relation. + repeated Expression other_grouping_expressions = 4; + + // (Required) Input user-defined function. + CommonInlineUserDefinedFunction func = 5; + + // (Optional) Expressions for sorting. Only used by Scala Sorted CoGroup Map API. + repeated Expression input_sorting_expressions = 6; + + // (Optional) Expressions for sorting. 
Only used by Scala Sorted CoGroup Map API. + repeated Expression other_sorting_expressions = 7; +} + +message ApplyInPandasWithState { + // (Required) Input relation for applyInPandasWithState. + Relation input = 1; + + // (Required) Expressions for grouping keys. + repeated Expression grouping_expressions = 2; + + // (Required) Input user-defined function. + CommonInlineUserDefinedFunction func = 3; + + // (Required) Schema for the output DataFrame. + string output_schema = 4; + + // (Required) Schema for the state. + string state_schema = 5; + + // (Required) The output mode of the function. + string output_mode = 6; + + // (Required) Timeout configuration for groups that do not receive data for a while. + string timeout_conf = 7; +} + +message CommonInlineUserDefinedTableFunction { + // (Required) Name of the user-defined table function. + string function_name = 1; + + // (Optional) Whether the user-defined table function is deterministic. + bool deterministic = 2; + + // (Optional) Function input arguments. Empty arguments are allowed. + repeated Expression arguments = 3; + + // (Required) Type of the user-defined table function. + oneof function { + PythonUDTF python_udtf = 4; + } +} + +message PythonUDTF { + // (Optional) Return type of the Python UDTF. + optional DataType return_type = 1; + + // (Required) EvalType of the Python UDTF. + int32 eval_type = 2; + + // (Required) The encoded commands of the Python UDTF. + bytes command = 3; + + // (Required) Python version being used in the client. + string python_ver = 4; +} + +// Collect arbitrary (named) metrics from a dataset. +message CollectMetrics { + // (Required) The input relation. + Relation input = 1; + + // (Required) Name of the metrics. + string name = 2; + + // (Required) The metric sequence. + repeated Expression metrics = 3; +} + +message Parse { + // (Required) Input relation to Parse. The input is expected to have single text column. + Relation input = 1; + // (Required) The expected format of the text. + ParseFormat format = 2; + + // (Optional) DataType representing the schema. If not set, Spark will infer the schema. + optional DataType schema = 3; + + // Options for the csv/json parser. The map key is case insensitive. + map options = 4; + enum ParseFormat { + PARSE_FORMAT_UNSPECIFIED = 0; + PARSE_FORMAT_CSV = 1; + PARSE_FORMAT_JSON = 2; + } +} diff --git a/velox/functions/sparksql/fuzzer/proto/spark/connect/types.proto b/velox/functions/sparksql/fuzzer/proto/spark/connect/types.proto new file mode 100644 index 0000000000000..43552381d28f8 --- /dev/null +++ b/velox/functions/sparksql/fuzzer/proto/spark/connect/types.proto @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +syntax = 'proto3'; + +package spark.connect; + +option java_multiple_files = true; +option java_package = "org.apache.spark.connect.proto"; +option go_package = "internal/generated"; + +// This message describes the logical [[DataType]] of something. It does not carry the value +// itself but only describes it. +message DataType { + oneof kind { + NULL null = 1; + + Binary binary = 2; + + Boolean boolean = 3; + + // Numeric types + Byte byte = 4; + Short short = 5; + Integer integer = 6; + Long long = 7; + + Float float = 8; + Double double = 9; + Decimal decimal = 10; + + // String types + String string = 11; + Char char = 12; + VarChar var_char = 13; + + // Datatime types + Date date = 14; + Timestamp timestamp = 15; + TimestampNTZ timestamp_ntz = 16; + + // Interval types + CalendarInterval calendar_interval = 17; + YearMonthInterval year_month_interval = 18; + DayTimeInterval day_time_interval = 19; + + // Complex types + Array array = 20; + Struct struct = 21; + Map map = 22; + + // UserDefinedType + UDT udt = 23; + + // UnparsedDataType + Unparsed unparsed = 24; + } + + message Boolean { + uint32 type_variation_reference = 1; + } + + message Byte { + uint32 type_variation_reference = 1; + } + + message Short { + uint32 type_variation_reference = 1; + } + + message Integer { + uint32 type_variation_reference = 1; + } + + message Long { + uint32 type_variation_reference = 1; + } + + message Float { + uint32 type_variation_reference = 1; + } + + message Double { + uint32 type_variation_reference = 1; + } + + message String { + uint32 type_variation_reference = 1; + } + + message Binary { + uint32 type_variation_reference = 1; + } + + message NULL { + uint32 type_variation_reference = 1; + } + + message Timestamp { + uint32 type_variation_reference = 1; + } + + message Date { + uint32 type_variation_reference = 1; + } + + message TimestampNTZ { + uint32 type_variation_reference = 1; + } + + message CalendarInterval { + uint32 type_variation_reference = 1; + } + + message YearMonthInterval { + optional int32 start_field = 1; + optional int32 end_field = 2; + uint32 type_variation_reference = 3; + } + + message DayTimeInterval { + optional int32 start_field = 1; + optional int32 end_field = 2; + uint32 type_variation_reference = 3; + } + + // Start compound types. + message Char { + int32 length = 1; + uint32 type_variation_reference = 2; + } + + message VarChar { + int32 length = 1; + uint32 type_variation_reference = 2; + } + + message Decimal { + optional int32 scale = 1; + optional int32 precision = 2; + uint32 type_variation_reference = 3; + } + + message StructField { + string name = 1; + DataType data_type = 2; + bool nullable = 3; + optional string metadata = 4; + } + + message Struct { + repeated StructField fields = 1; + uint32 type_variation_reference = 2; + } + + message Array { + DataType element_type = 1; + bool contains_null = 2; + uint32 type_variation_reference = 3; + } + + message Map { + DataType key_type = 1; + DataType value_type = 2; + bool value_contains_null = 3; + uint32 type_variation_reference = 4; + } + + message UDT { + string type = 1; + optional string jvm_class = 2; + optional string python_class = 3; + optional string serialized_python_class = 4; + DataType sql_type = 5; + } + + message Unparsed { + // (Required) The unparsed data type string + string data_type_string = 1; + } +}
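
The proto files above only define the wire format; nothing in the diff itself shows how the messages nest in practice. The sketch below is illustrative and not part of the change: it assumes the vendored protos are compiled with protoc's default Python output (so the `spark.connect.relations_pb2` / `expressions_pb2` module paths are an assumption, as are the `store_sales` / `ss_quantity` identifiers), and it builds the Relation tree that corresponds roughly to `SELECT * FROM store_sales WHERE ss_quantity > 10`.

```python
# Minimal sketch, assuming protoc's default `<file>_pb2` Python module naming
# for the files under spark/connect/. Table and column names are placeholders.
from spark.connect import expressions_pb2, relations_pb2

# Leaf: read a named table (populates the Read.read_type oneof).
read_rel = relations_pb2.Relation()
read_rel.read.named_table.unparsed_identifier = "store_sales"

# Filter: ss_quantity > 10, expressed as an UnresolvedFunction over an
# UnresolvedAttribute and an integer Literal.
filter_rel = relations_pb2.Relation()
filter_rel.filter.input.CopyFrom(read_rel)
cond = filter_rel.filter.condition.unresolved_function
cond.function_name = ">"
cond.arguments.add().unresolved_attribute.unparsed_identifier = "ss_quantity"
cond.arguments.add().literal.integer = 10

# Project: SELECT * over the filtered relation.
project_rel = relations_pb2.Relation()
project_rel.project.input.CopyFrom(filter_rel)
project_rel.project.expressions.add().unresolved_star.SetInParent()

# Every relation variant lives in the rel_type oneof, so a consumer can
# dispatch on WhichOneof() instead of probing each field for presence.
print(project_rel.WhichOneof("rel_type"))   # -> project
payload = project_rel.SerializeToString()   # bytes ready to ship to a server
```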
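
Under the same assumptions, a second sketch covers the typed side of the protocol from expressions.proto and types.proto: a decimal Literal carries its value as a string plus optional precision and scale, a Cast can name its target either as a structured DataType or as an unparsed `type_str`, and a schema is a DataType.Struct of StructField entries.

```python
# Sketch only; module paths follow protoc's default Python naming and the
# column name is a placeholder.
from spark.connect import expressions_pb2, types_pb2

# DECIMAL(12, 2) literal: value travels as a string, precision/scale as ints.
lit = expressions_pb2.Expression()
lit.literal.decimal.value = "1234.56"
lit.literal.decimal.precision = 12
lit.literal.decimal.scale = 2

# Cast an unresolved column to the same decimal type via the structured
# DataType branch (the alternative `type_str` branch would carry a DDL string).
cast = expressions_pb2.Expression()
cast.cast.expr.unresolved_attribute.unparsed_identifier = "ss_net_paid"
cast.cast.type.decimal.precision = 12
cast.cast.type.decimal.scale = 2

# A two-column struct schema, the shape a server would use to describe output.
schema = types_pb2.DataType()
f1 = schema.struct.fields.add()
f1.name = "id"
f1.nullable = False
f1.data_type.long.SetInParent()      # selects the `long` kind in the oneof
f2 = schema.struct.fields.add()
f2.name = "name"
f2.nullable = True
f2.data_type.string.SetInParent()

print(lit.literal.WhichOneof("literal_type"))   # -> decimal
print(len(schema.struct.fields))                # -> 2
```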