From bb22d55a5882eb04ed9ae31dd3c27859ec8bffab Mon Sep 17 00:00:00 2001
From: PHILO-HE
Date: Tue, 25 Jun 2024 10:39:30 +0800
Subject: [PATCH] Initial commit

---
 velox/docs/functions/spark/array.rst  | 39 ++++++++++++++++++++++++++-
 velox/docs/functions/spark/map.rst    | 15 +++++++++++
 velox/functions/sparksql/Register.cpp |  5 ++++
 3 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/velox/docs/functions/spark/array.rst b/velox/docs/functions/spark/array.rst
index 8fa1ec7f028c..6af28105a244 100644
--- a/velox/docs/functions/spark/array.rst
+++ b/velox/docs/functions/spark/array.rst
@@ -15,6 +15,14 @@ Array Functions
 
     SELECT array(1, 2, 3); -- [1,2,3]
 
+.. spark:function:: arrays_zip(array(T), array(U), ...) -> array(row(T, U, ...))
+
+    Merges the given arrays, element-wise, into a single array of rows.
+    The M-th element of the N-th argument will be the N-th field of the M-th output element.
+    If the arguments have uneven lengths, missing values are filled with ``NULL``. ::
+
+        SELECT arrays_zip(ARRAY[1, 2], ARRAY['1b', null, '3b']); -- [ROW(1, '1b'), ROW(2, null), ROW(null, '3b')]
+
 .. spark:function:: array_contains(array(E), value) -> boolean
 
     Returns true if the array contains the value. ::
@@ -124,6 +132,15 @@ Array Functions
 
     SELECT concat(array(1, 2, 3), array(4, 5), array(6)); -- [1, 2, 3, 4, 5, 6]
 
+.. spark:function:: exists(array(T), function(T, boolean)) -> boolean
+
+    Returns whether at least one element of an array matches the given predicate.
+
+    Returns true if one or more elements match the predicate;
+    Returns false if none of the elements matches (a special case is when the array is empty);
+    Returns NULL if the predicate function returns NULL for one or more elements and false for all other elements.
+    Throws an exception if the predicate fails for one or more elements and returns false or NULL for the rest.
+
 .. spark:function:: filter(array(E), func) -> array(E)
 
     Filters the input array using the given predicate. ::
 
        SELECT filter(array(1, 2, 3, 4), x -> x % 2 == 0); -- [2, 4]
        SELECT filter(array(0, 2, 3), (x, i) -> x > i); -- [2, 3]
        SELECT filter(array(0, null, 2, 3, null), x -> x IS NOT NULL); -- [0, 2, 3]
 
-.. function:: flatten(array(array(E))) -> array(E)
+.. spark:function:: flatten(array(array(E))) -> array(E)
 
     Transforms an array of arrays into a single array.
     Returns NULL if the input is NULL or any of the nested arrays is NULL. ::
 
        SELECT flatten(array(array(1, 2), array(3, NULL))); -- [1, 2, 3, NULL]
        SELECT flatten(array(array(1, 2), NULL, array(3, 4))); -- NULL
 
+.. spark:function:: forall(array(T), function(T, boolean)) -> boolean
+
+    Returns whether all elements of an array match the given predicate.
+
+    Returns true if all the elements match the predicate (a special case is when the array is empty);
+    Returns false if one or more elements don't match;
+    Returns NULL if the predicate function returns NULL for one or more elements and true for all other elements.
+    Throws an exception if the predicate fails for one or more elements and returns true or NULL for the rest.
+
 .. spark:function:: get(array(E), index) -> E
 
     Returns an element of the array at the specified 0-based index.
@@ -198,3 +224,14 @@ Array Functions
 
     SELECT transform(array(1, 2, 3), x -> x + 1); -- [2,3,4]
     SELECT transform(array(1, 2, 3), (x, i) -> x + i); -- [1,3,5]
+
+.. spark:function:: zip_with(array(T), array(U), function(T,U,R)) -> array(R)
+
+    Merges the two given arrays, element-wise, into a single array using ``function``.
+    If one array is shorter, nulls are appended at the end to match the length of the
+    longer array, before applying ``function``. ::
+
+        SELECT zip_with(ARRAY[1, 3, 5], ARRAY['a', 'b', 'c'], (x, y) -> (y, x)); -- [ROW('a', 1), ROW('b', 3), ROW('c', 5)]
+        SELECT zip_with(ARRAY[1, 2], ARRAY[3, 4], (x, y) -> x + y); -- [4, 6]
+        SELECT zip_with(ARRAY['a', 'b', 'c'], ARRAY['d', 'e', 'f'], (x, y) -> concat(x, y)); -- ['ad', 'be', 'cf']
+        SELECT zip_with(ARRAY['a'], ARRAY['d', null, 'f'], (x, y) -> coalesce(x, y)); -- ['a', null, 'f']
diff --git a/velox/docs/functions/spark/map.rst b/velox/docs/functions/spark/map.rst
index 798067756834..923de1b45309 100644
--- a/velox/docs/functions/spark/map.rst
+++ b/velox/docs/functions/spark/map.rst
@@ -41,6 +41,21 @@ Map Functions
 
     Returns all the values in the map ``x``.
 
+.. spark:function:: map_zip_with(map(K,V1), map(K,V2), function(K,V1,V2,V3)) -> map(K,V3)
+
+    Merges the two given maps into a single map by applying ``function`` to the pair of values with the same key.
+    For keys present in only one map, NULL is passed as the value for the missing key. ::
+
+        SELECT map_zip_with(MAP(ARRAY[1, 2, 3], ARRAY['a', 'b', 'c']), -- {1 -> ad, 2 -> be, 3 -> cf}
+                            MAP(ARRAY[1, 2, 3], ARRAY['d', 'e', 'f']),
+                            (k, v1, v2) -> concat(v1, v2));
+        SELECT map_zip_with(MAP(ARRAY['k1', 'k2'], ARRAY[1, 2]), -- {k1 -> ROW(1, null), k2 -> ROW(2, 4), k3 -> ROW(null, 9)}
+                            MAP(ARRAY['k2', 'k3'], ARRAY[4, 9]),
+                            (k, v1, v2) -> (v1, v2));
+        SELECT map_zip_with(MAP(ARRAY['a', 'b', 'c'], ARRAY[1, 8, 27]), -- {a -> a1, b -> b4, c -> c9}
+                            MAP(ARRAY['a', 'b', 'c'], ARRAY[1, 2, 3]),
+                            (k, v1, v2) -> k || CAST(v1/v2 AS VARCHAR));
+
 .. spark:function:: size(map(K,V)) -> bigint
    :noindex:
 
diff --git a/velox/functions/sparksql/Register.cpp b/velox/functions/sparksql/Register.cpp
index 0c945a5931be..2dbeda471473 100644
--- a/velox/functions/sparksql/Register.cpp
+++ b/velox/functions/sparksql/Register.cpp
@@ -101,9 +101,14 @@ static void workAroundRegistrationMacro(const std::string& prefix) {
   VELOX_REGISTER_VECTOR_FUNCTION(udf_array_distinct, prefix + "array_distinct");
   VELOX_REGISTER_VECTOR_FUNCTION(udf_array_except, prefix + "array_except");
   VELOX_REGISTER_VECTOR_FUNCTION(udf_array_position, prefix + "array_position");
+  VELOX_REGISTER_VECTOR_FUNCTION(udf_zip_with, prefix + "zip_with");
+  VELOX_REGISTER_VECTOR_FUNCTION(udf_all_match, prefix + "forall");
+  VELOX_REGISTER_VECTOR_FUNCTION(udf_any_match, prefix + "exists");
+  VELOX_REGISTER_VECTOR_FUNCTION(udf_zip, prefix + "arrays_zip");
   VELOX_REGISTER_VECTOR_FUNCTION(udf_map_entries, prefix + "map_entries");
   VELOX_REGISTER_VECTOR_FUNCTION(udf_map_keys, prefix + "map_keys");
   VELOX_REGISTER_VECTOR_FUNCTION(udf_map_values, prefix + "map_values");
+  VELOX_REGISTER_VECTOR_FUNCTION(udf_map_zip_with, prefix + "map_zip_with");
 
   // This is the semantics of spark.sql.ansi.enabled = false.
   registerElementAtFunction(prefix + "element_at", true);
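For orientation only, not part of the patch: a minimal sketch of how an application typically pulls in the Spark function package that the ``Register.cpp`` hunk above extends, assuming the ``registerFunctions(prefix)`` entry point declared in ``velox/functions/sparksql/Register.h``; the ``spark_`` prefix is an arbitrary example value. ::

    #include "velox/functions/sparksql/Register.h"

    int main() {
      // Registers the Spark SQL function package. The prefix is prepended to
      // every registered name, so the functions mapped in the hunk above
      // resolve as "spark_zip_with", "spark_forall", "spark_exists",
      // "spark_arrays_zip" and "spark_map_zip_with".
      facebook::velox::functions::sparksql::registerFunctions("spark_");
      return 0;
    }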