Skip to content

Commit

Permalink
[CALCITE-5778] Add ARRAY_JOIN, ARRAYS_OVERLAP, ARRAYS_ZIP function (e…
Browse files Browse the repository at this point in the history
…nabled in Spark library)
  • Loading branch information
liuyongvs authored and NobiGo committed Jun 26, 2023
1 parent 95089c8 commit e969f87
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
import static org.apache.calcite.sql.fun.SqlInternalOperators.THROW_UNLESS;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ACOSH;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAYS_OVERLAP;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAYS_ZIP;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_AGG;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_APPEND;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_COMPACT;
Expand All @@ -126,6 +128,7 @@
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_DISTINCT;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_EXCEPT;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_INTERSECT;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_JOIN;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_LENGTH;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_MAX;
import static org.apache.calcite.sql.fun.SqlLibraryOperators.ARRAY_MIN;
Expand Down Expand Up @@ -725,6 +728,7 @@ Builder populate2() {
defineMethod(ARRAY_CONTAINS, BuiltInMethod.LIST_CONTAINS.method, NullPolicy.ANY);
defineMethod(ARRAY_DISTINCT, BuiltInMethod.ARRAY_DISTINCT.method, NullPolicy.STRICT);
defineMethod(ARRAY_EXCEPT, BuiltInMethod.ARRAY_EXCEPT.method, NullPolicy.ANY);
defineMethod(ARRAY_JOIN, "arrayToString", NullPolicy.STRICT);
defineMethod(ARRAY_INTERSECT, BuiltInMethod.ARRAY_INTERSECT.method, NullPolicy.ANY);
defineMethod(ARRAY_LENGTH, BuiltInMethod.COLLECTION_SIZE.method, NullPolicy.STRICT);
defineMethod(ARRAY_MAX, BuiltInMethod.ARRAY_MAX.method, NullPolicy.STRICT);
Expand All @@ -737,6 +741,8 @@ Builder populate2() {
defineMethod(ARRAY_SIZE, BuiltInMethod.COLLECTION_SIZE.method, NullPolicy.STRICT);
defineMethod(ARRAY_TO_STRING, "arrayToString", NullPolicy.STRICT);
defineMethod(ARRAY_UNION, BuiltInMethod.ARRAY_UNION.method, NullPolicy.ANY);
defineMethod(ARRAYS_OVERLAP, BuiltInMethod.ARRAYS_OVERLAP.method, NullPolicy.ANY);
defineMethod(ARRAYS_ZIP, BuiltInMethod.ARRAYS_ZIP.method, NullPolicy.ANY);
defineMethod(MAP_ENTRIES, BuiltInMethod.MAP_ENTRIES.method, NullPolicy.STRICT);
defineMethod(MAP_KEYS, BuiltInMethod.MAP_KEYS.method, NullPolicy.STRICT);
defineMethod(MAP_VALUES, BuiltInMethod.MAP_VALUES.method, NullPolicy.STRICT);
Expand Down
50 changes: 50 additions & 0 deletions core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3913,6 +3913,56 @@ private static AtomicLong getAtomicLong(String key) {
return atomic;
}

/**
 * Support the ARRAYS_OVERLAP function.
 *
 * <p>The result is three-valued:
 * <ul>
 *   <li>{@code true} if the lists have at least one non-null element in
 *   common;
 *   <li>{@code null} if there is no common non-null element, both lists are
 *   non-empty, and at least one of them contains a null element;
 *   <li>{@code false} otherwise (in particular, whenever either list is
 *   empty).
 * </ul>
 *
 * @param list1 first list
 * @param list2 second list
 * @return whether the lists overlap, or null if that is unknown
 */
public static @Nullable Boolean arraysOverlap(List list1, List list2) {
  // Normalize the argument order so that list1 is never the longer list:
  // the shorter list is hashed, the longer one is scanned.
  if (list1.size() > list2.size()) {
    return arraysOverlap(list2, list1);
  }
  // An empty list overlaps with nothing, and nulls on the other side are
  // irrelevant in that case.
  if (list1.isEmpty() || list2.isEmpty()) {
    return false;
  }
  final Set candidates = new HashSet(list1);
  // Nulls never count as a match; just remember that one was present.
  boolean sawNull = candidates.remove(null);
  for (Object item : list2) {
    if (item == null) {
      sawNull = true;
      continue;
    }
    if (candidates.contains(item)) {
      return true;
    }
  }
  return sawNull ? null : Boolean.FALSE;
}

/**
 * Support the ARRAYS_ZIP function.
 *
 * <p>Returns a list with one row per position, where row {@code i} is a list
 * holding the {@code i}-th element of each input list, in argument order.
 * The number of rows equals the size of the longest input; inputs that are
 * shorter contribute {@code null} for the positions they lack.
 *
 * @param lists the lists to zip; may be empty, in which case the result is
 *     an empty list
 * @return the zipped rows
 */
@SuppressWarnings("argument.type.incompatible")
public static List arraysZip(List... lists) {
  // The result has as many rows as the longest input list.
  int rowCount = 0;
  for (List list : lists) {
    rowCount = Math.max(rowCount, list.size());
  }

  final List zipped = new ArrayList(rowCount);
  for (int ordinal = 0; ordinal < rowCount; ordinal++) {
    final List<Object> row = new ArrayList<>();
    for (List list : lists) {
      // Pad with null once a shorter list has run out of elements.
      row.add(ordinal < list.size() ? list.get(ordinal) : null);
    }
    zipped.add(row);
  }
  return zipped;
}

/** Support the ARRAY_COMPACT function. */
public static List compact(List list) {
final List result = new ArrayList();
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/calcite/sql/SqlKind.java
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,9 @@ public enum SqlKind {
/** {@code ARRAY_INTERSECT} function (Spark semantics). */
ARRAY_INTERSECT,

/** {@code ARRAY_JOIN} function (Spark semantics). */
ARRAY_JOIN,

/** {@code ARRAY_LENGTH} function (Spark semantics). */
ARRAY_LENGTH,

Expand Down Expand Up @@ -740,6 +743,12 @@ public enum SqlKind {
/** {@code ARRAY_UNION} function (Spark semantics). */
ARRAY_UNION,

/** {@code ARRAYS_OVERLAP} function (Spark semantics). */
ARRAYS_OVERLAP,

/** {@code ARRAYS_ZIP} function (Spark semantics). */
ARRAYS_ZIP,

/** {@code SORT_ARRAY} function (Spark semantics). */
SORT_ARRAY,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.calcite.sql.fun.SqlLibrary.ALL;
import static org.apache.calcite.sql.fun.SqlLibrary.BIG_QUERY;
Expand Down Expand Up @@ -1037,6 +1039,13 @@ private static RelDataType arrayAppendPrependReturnType(SqlOperatorBinding opBin
OperandTypes.SAME_SAME,
OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.ARRAY)));

/**
 * The "ARRAY_JOIN(array, delimiter [, nullText ])" function (Spark).
 *
 * <p>Synonym for {@code ARRAY_TO_STRING}. The operator is named after
 * {@link SqlKind#ARRAY_JOIN}, infers its return type with
 * {@link ReturnTypes#VARCHAR_NULLABLE}, and checks its operands with
 * {@link OperandTypes#STRING_ARRAY_CHARACTER_OPTIONAL_CHARACTER}.
 */
@LibraryOperator(libraries = {SPARK})
public static final SqlFunction ARRAY_JOIN =
SqlBasicFunction.create(SqlKind.ARRAY_JOIN,
ReturnTypes.VARCHAR_NULLABLE,
OperandTypes.STRING_ARRAY_CHARACTER_OPTIONAL_CHARACTER);

/** The "ARRAY_LENGTH(array)" function. */
@LibraryOperator(libraries = {BIG_QUERY})
public static final SqlFunction ARRAY_LENGTH =
Expand Down Expand Up @@ -1118,6 +1127,41 @@ private static RelDataType arrayAppendPrependReturnType(SqlOperatorBinding opBin
ReturnTypes.VARCHAR_NULLABLE,
OperandTypes.STRING_ARRAY_CHARACTER_OPTIONAL_CHARACTER);

/**
 * The "ARRAYS_OVERLAP(array1, array2)" function (Spark).
 *
 * <p>Return type: starts from {@link ReturnTypes#BOOLEAN_NULLABLE} and is
 * then passed through
 * {@link SqlTypeTransforms#COLLECTION_ELEMENT_TYPE_NULLABLE}, which makes the
 * result nullable if the element type of any operand is nullable.
 *
 * <p>Operands: exactly two, both in the {@code ARRAY} family, which must also
 * satisfy {@link OperandTypes#SAME_SAME}.
 */
@LibraryOperator(libraries = {SPARK})
public static final SqlFunction ARRAYS_OVERLAP =
SqlBasicFunction.create(SqlKind.ARRAYS_OVERLAP,
ReturnTypes.BOOLEAN_NULLABLE.andThen(SqlTypeTransforms.COLLECTION_ELEMENT_TYPE_NULLABLE),
OperandTypes.and(
OperandTypes.SAME_SAME,
OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.ARRAY)));

/**
 * Derives the return type of a call to ARRAYS_ZIP.
 *
 * <p>The result is an array type whose element is a struct with one field
 * per operand. Field {@code i} has the component type of operand {@code i}
 * and is named by its zero-based ordinal ("0", "1", ...).
 *
 * @param opBinding binding of the call whose type is being derived
 * @return array-of-struct type for the call
 * @throws NullPointerException if an operand type has no component type
 */
private static RelDataType deriveTypeArraysZip(SqlOperatorBinding opBinding) {
  final List<RelDataType> fieldTypes = new ArrayList<>();
  for (RelDataType operandType : opBinding.collectOperandTypes()) {
    fieldTypes.add(requireNonNull(operandType.getComponentType()));
  }

  // Struct fields are named after the ordinal of the operand they come from.
  final List<String> fieldNames =
      IntStream.range(0, fieldTypes.size())
          .mapToObj(String::valueOf)
          .collect(Collectors.toList());

  final RelDataType rowType =
      opBinding.getTypeFactory().createStructType(fieldTypes, fieldNames);
  return SqlTypeUtil.createArrayType(
      opBinding.getTypeFactory(),
      requireNonNull(rowType, "inferred value type"),
      false);
}

/**
 * The "ARRAYS_ZIP(array, ...)" function (Spark).
 *
 * <p>Return type: derived by {@code deriveTypeArraysZip} (an array of
 * structs with one field per operand) and then passed through
 * {@link SqlTypeTransforms#TO_NULLABLE}.
 *
 * <p>Operands: checked with {@link OperandTypes#SAME_VARIADIC}.
 */
@LibraryOperator(libraries = {SPARK})
public static final SqlFunction ARRAYS_ZIP =
SqlBasicFunction.create(SqlKind.ARRAYS_ZIP,
((SqlReturnTypeInference) SqlLibraryOperators::deriveTypeArraysZip)
.andThen(SqlTypeTransforms.TO_NULLABLE),
OperandTypes.SAME_VARIADIC);

/** The "SORT_ARRAY(array)" function (Spark). */
@LibraryOperator(libraries = {SPARK})
public static final SqlFunction SORT_ARRAY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.util.Util;

import java.util.ArrayList;
import java.util.List;

import static org.apache.calcite.sql.type.NonNullableAccessors.getCharset;
Expand Down Expand Up @@ -111,6 +112,26 @@ public abstract class SqlTypeTransforms {
return typeToTransform;
};

/**
 * Type-inference transform strategy where a derived type is transformed into
 * the same type but nullable if the element (component) type of any of the
 * call's operands is nullable. Otherwise the derived type is returned
 * unchanged.
 *
 * <p>Every operand must have a component type; a
 * {@link NullPointerException} is thrown if one does not.
 */
public static final SqlTypeTransform COLLECTION_ELEMENT_TYPE_NULLABLE =
    (opBinding, typeToTransform) -> {
      // Resolve the element type of every operand up front, so that an
      // operand without a component type is always reported.
      final List<RelDataType> elementTypes = new ArrayList<>();
      for (RelDataType operandType : opBinding.collectOperandTypes()) {
        elementTypes.add(requireNonNull(operandType.getComponentType()));
      }

      for (RelDataType elementType : elementTypes) {
        if (elementType.isNullable()) {
          return opBinding.getTypeFactory()
              .createTypeWithNullability(typeToTransform, true);
        }
      }
      return typeToTransform;
    };

/**
* Type-inference strategy whereby the result type of a call is VARYING the
* type given. The length returned is the same as length of the first
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,8 @@ public enum BuiltInMethod {
ARRAY_INTERSECT(SqlFunctions.class, "arrayIntersect", List.class, List.class),
ARRAY_UNION(SqlFunctions.class, "arrayUnion", List.class, List.class),
ARRAY_REVERSE(SqlFunctions.class, "reverse", List.class),
ARRAYS_OVERLAP(SqlFunctions.class, "arraysOverlap", List.class, List.class),
ARRAYS_ZIP(SqlFunctions.class, "arraysZip", List.class, List.class),
SORT_ARRAY(SqlFunctions.class, "sortArray", List.class, boolean.class),
MAP_ENTRIES(SqlFunctions.class, "mapEntries", Map.class),
MAP_KEYS(SqlFunctions.class, "mapKeys", Map.class),
Expand Down
30 changes: 30 additions & 0 deletions core/src/test/java/org/apache/calcite/test/SqlFunctionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate;
import static org.apache.calcite.avatica.util.DateTimeUtils.timeStringToUnixDate;
import static org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate;
import static org.apache.calcite.runtime.SqlFunctions.arraysOverlap;
import static org.apache.calcite.runtime.SqlFunctions.charLength;
import static org.apache.calcite.runtime.SqlFunctions.concat;
import static org.apache.calcite.runtime.SqlFunctions.concatMulti;
Expand Down Expand Up @@ -99,6 +100,35 @@ static <E> List<E> list() {
return ImmutableList.of();
}

/** Tests {@code arraysOverlap} on every ordered pair drawn from four lists:
 * empty, null-only, mixed null/non-null, and non-null only. */
@Test void testArraysOverlap() {
  final List<Object> onlyNull = new ArrayList<>();
  onlyNull.add(null);
  final List<?> empty = list();
  final List<?> oneAndNull = list(1, null);
  final List<?> oneAndTwo = list(1, 2);

  // Second list is empty: the result is always false, nulls or not.
  assertThat(arraysOverlap(empty, empty), is(false));
  assertThat(arraysOverlap(onlyNull, empty), is(false));
  assertThat(arraysOverlap(oneAndNull, empty), is(false));
  assertThat(arraysOverlap(oneAndTwo, empty), is(false));

  // Second list holds only a null: unknown unless the first list is empty.
  assertThat(arraysOverlap(empty, onlyNull), is(false));
  assertThat(arraysOverlap(onlyNull, onlyNull), is(nullValue()));
  assertThat(arraysOverlap(oneAndNull, onlyNull), is(nullValue()));
  assertThat(arraysOverlap(oneAndTwo, onlyNull), is(nullValue()));

  // Second list mixes a null with a non-null value.
  assertThat(arraysOverlap(empty, oneAndNull), is(false));
  assertThat(arraysOverlap(onlyNull, oneAndNull), is(nullValue()));
  assertThat(arraysOverlap(oneAndNull, oneAndNull), is(true));
  assertThat(arraysOverlap(oneAndTwo, oneAndNull), is(true));

  // Second list holds only non-null values.
  assertThat(arraysOverlap(empty, oneAndTwo), is(false));
  assertThat(arraysOverlap(onlyNull, oneAndTwo), is(nullValue()));
  assertThat(arraysOverlap(oneAndNull, oneAndTwo), is(true));
  assertThat(arraysOverlap(oneAndTwo, oneAndTwo), is(true));
}

@Test void testCharLength() {
assertThat(charLength("xyz"), is(3));
}
Expand Down
3 changes: 3 additions & 0 deletions site/_docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2659,6 +2659,7 @@ BigQuery's type system uses confusingly different names for types and functions:
| s | ARRAY_DISTINCT(array) | Removes duplicate values from the *array* that keeps ordering of elements
| s | ARRAY_EXCEPT(array1, array2) | Returns an array of the elements in *array1* but not in *array2*, without duplicates
| s | ARRAY_INTERSECT(array1, array2) | Returns an array of the elements in the intersection of *array1* and *array2*, without duplicates
| s | ARRAY_JOIN(array, delimiter [, nullText ]) | Synonym for `ARRAY_TO_STRING`
| b | ARRAY_LENGTH(array) | Synonym for `CARDINALITY`
| s | ARRAY_MAX(array) | Returns the maximum value in the *array*
| s | ARRAY_MIN(array) | Returns the minimum value in the *array*
Expand All @@ -2670,6 +2671,8 @@ BigQuery's type system uses confusingly different names for types and functions:
| s | ARRAY_SIZE(array) | Synonym for `CARDINALITY`
| b | ARRAY_TO_STRING(array, delimiter [, nullText ])| Returns a concatenation of the elements in *array* as a STRING and take *delimiter* as the delimiter. If the *nullText* parameter is used, the function replaces any `NULL` values in the array with the value of *nullText*. If the *nullText* parameter is not used, the function omits the `NULL` value and its preceding delimiter
| s | ARRAY_UNION(array1, array2) | Returns an array of the elements in the union of *array1* and *array2*, without duplicates
| s | ARRAYS_OVERLAP(array1, array2) | Returns true if *array1* contains at least one non-null element that is also present in *array2*. If the arrays have no common element, are both non-empty, and either of them contains a null element, returns null; otherwise returns false
| s | ARRAYS_ZIP(array [, array ]*) | Returns a merged *array* of structs in which the N-th struct contains all N-th values of input arrays
| s | SORT_ARRAY(array [, ascendingOrder]) | Sorts the *array* in ascending or descending order according to the natural ordering of the array elements. The default order is ascending if *ascendingOrder* is not specified. Null elements will be placed at the beginning of the returned array in ascending order or at the end of the returned array in descending order
| * | ASINH(numeric) | Returns the inverse hyperbolic sine of *numeric*
| * | ATANH(numeric) | Returns the inverse hyperbolic tangent of *numeric*
Expand Down
Loading

0 comments on commit e969f87

Please sign in to comment.