From d508405f3862d10735187a6ad269b9edb4c135c3 Mon Sep 17 00:00:00 2001 From: "wenzhuang.zwz" Date: Wed, 30 Aug 2023 21:04:43 +0800 Subject: [PATCH] [CALCITE-5968] Provide an interface class for RexExecutable to decouple janino runtime binding --- .../calcite/plan/RexImplicationChecker.java | 3 +- .../org/apache/calcite/rex/RexExecutable.java | 114 ++++---------- .../apache/calcite/rex/RexExecutableImpl.java | 117 +++++++++++++++ .../org/apache/calcite/rex/RexExecutor.java | 11 ++ .../apache/calcite/rex/RexExecutorImpl.java | 6 +- .../apache/calcite/rex/RexExecutorTest.java | 142 ++++++++++++++++++ 6 files changed, 300 insertions(+), 93 deletions(-) create mode 100644 core/src/main/java/org/apache/calcite/rex/RexExecutableImpl.java diff --git a/core/src/main/java/org/apache/calcite/plan/RexImplicationChecker.java b/core/src/main/java/org/apache/calcite/plan/RexImplicationChecker.java index f5230dd0913..6bdfbdc08fe 100644 --- a/core/src/main/java/org/apache/calcite/plan/RexImplicationChecker.java +++ b/core/src/main/java/org/apache/calcite/plan/RexImplicationChecker.java @@ -22,7 +22,6 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexExecutable; import org.apache.calcite.rex.RexExecutor; -import org.apache.calcite.rex.RexExecutorImpl; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; @@ -256,7 +255,7 @@ private boolean isSatisfiable(RexNode second, @Nullable DataContext dataValues) } ImmutableList constExps = ImmutableList.of(second); - final RexExecutable exec = RexExecutorImpl.getExecutable(builder, constExps, rowType); + final RexExecutable exec = executor.getExecutable(builder, constExps, rowType); @Nullable Object[] result; exec.setDataContext(dataValues); diff --git a/core/src/main/java/org/apache/calcite/rex/RexExecutable.java b/core/src/main/java/org/apache/calcite/rex/RexExecutable.java index cc8d7876864..5d1f716d989 100644 --- a/core/src/main/java/org/apache/calcite/rex/RexExecutable.java +++ b/core/src/main/java/org/apache/calcite/rex/RexExecutable.java @@ -17,100 +17,38 @@ package org.apache.calcite.rex; import org.apache.calcite.DataContext; -import org.apache.calcite.linq4j.function.Function1; -import org.apache.calcite.runtime.Hook; -import org.apache.calcite.runtime.Utilities; -import org.apache.calcite.util.Pair; import org.checkerframework.checker.nullness.qual.Nullable; -import org.codehaus.commons.compiler.CompileException; -import org.codehaus.janino.ClassBodyEvaluator; -import org.codehaus.janino.Scanner; -import java.io.IOException; -import java.io.Serializable; -import java.io.StringReader; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Arrays; import java.util.List; -import static java.util.Objects.requireNonNull; - /** - * Result of compiling code generated from a {@link RexNode} expression. + * Executable interface for a {@link RexNode} expression. */ -public class RexExecutable { - private static final String GENERATED_CLASS_NAME = "Reducer"; - - private final Function1 compiledFunction; - private final String code; - private @Nullable DataContext dataContext; - - public RexExecutable(String code, Object reason) { - this.code = code; - this.compiledFunction = compile(code, reason); - } - - private static Function1 compile(String code, - Object reason) { - try { - final ClassBodyEvaluator cbe = new ClassBodyEvaluator(); - cbe.setClassName(GENERATED_CLASS_NAME); - cbe.setExtendedClass(Utilities.class); - cbe.setImplementedInterfaces(new Class[] {Function1.class, Serializable.class}); - cbe.setParentClassLoader(RexExecutable.class.getClassLoader()); - cbe.cook(new Scanner(null, new StringReader(code))); - Class c = cbe.getClazz(); - //noinspection unchecked - final Constructor> constructor = - c.getConstructor(); - return constructor.newInstance(); - } catch (CompileException | IOException | InstantiationException - | IllegalAccessException | InvocationTargetException - | NoSuchMethodException e) { - throw new RuntimeException("While compiling " + reason, e); - } - } - - public void setDataContext(DataContext dataContext) { - this.dataContext = dataContext; - } - - public void reduce(RexBuilder rexBuilder, List constExps, - List reducedValues) { - @Nullable Object[] values; - try { - values = execute(); - if (values == null) { - reducedValues.addAll(constExps); - values = new Object[constExps.size()]; - } else { - assert values.length == constExps.size(); - final List<@Nullable Object> valueList = Arrays.asList(values); - for (Pair value : Pair.zip(constExps, valueList)) { - reducedValues.add( - rexBuilder.makeLiteral(value.right, value.left.getType(), true)); - } - } - } catch (RuntimeException e) { - // One or more of the expressions failed. - // Don't reduce any of the expressions. - reducedValues.addAll(constExps); - values = new Object[constExps.size()]; - } - Hook.EXPRESSION_REDUCER.run(Pair.of(code, values)); - } - - public Function1 getFunction() { - return compiledFunction; - } - - public @Nullable Object @Nullable [] execute() { - return compiledFunction.apply(requireNonNull(dataContext, "dataContext")); - } +public interface RexExecutable { + + /** Reduces expressions, and writes their results into {@code reducedValues}. + * + *

If an expression cannot be reduced, writes the original expression. + * For example, {@code CAST('abc' AS INTEGER)} gives an error when executed, so the executor + * ignores the error and writes the original expression. + * + * @param rexBuilder Rex builder + * @param constExps Expressions to be reduced + * @param reducedValues List to which reduced expressions are appended + */ + void reduce(RexBuilder rexBuilder, List constExps, List reducedValues); + + /** Set input data for the {@link RexNode} expression. + * + * @param dataContext input data + */ + void setDataContext(DataContext dataContext); + + /** Execute the {@link RexNode} expression. + * + * @return execute result + */ + @Nullable Object @Nullable [] execute(); - public String getSource() { - return code; - } } diff --git a/core/src/main/java/org/apache/calcite/rex/RexExecutableImpl.java b/core/src/main/java/org/apache/calcite/rex/RexExecutableImpl.java new file mode 100644 index 00000000000..da5bfc302d3 --- /dev/null +++ b/core/src/main/java/org/apache/calcite/rex/RexExecutableImpl.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rex; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.function.Function1; +import org.apache.calcite.runtime.Hook; +import org.apache.calcite.runtime.Utilities; +import org.apache.calcite.util.Pair; + +import org.checkerframework.checker.nullness.qual.Nullable; +import org.codehaus.commons.compiler.CompileException; +import org.codehaus.janino.ClassBodyEvaluator; +import org.codehaus.janino.Scanner; + +import java.io.IOException; +import java.io.Serializable; +import java.io.StringReader; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Result of compiling code generated from a {@link RexNode} expression. + */ +public class RexExecutableImpl implements RexExecutable { + private static final String GENERATED_CLASS_NAME = "Reducer"; + + private final Function1 compiledFunction; + private final String code; + private @Nullable DataContext dataContext; + + public RexExecutableImpl(String code, Object reason) { + this.code = code; + this.compiledFunction = compile(code, reason); + } + + private static Function1 compile(String code, + Object reason) { + try { + final ClassBodyEvaluator cbe = new ClassBodyEvaluator(); + cbe.setClassName(GENERATED_CLASS_NAME); + cbe.setExtendedClass(Utilities.class); + cbe.setImplementedInterfaces(new Class[] {Function1.class, Serializable.class}); + cbe.setParentClassLoader(RexExecutableImpl.class.getClassLoader()); + cbe.cook(new Scanner(null, new StringReader(code))); + Class c = cbe.getClazz(); + //noinspection unchecked + final Constructor> constructor = + c.getConstructor(); + return constructor.newInstance(); + } catch (CompileException | IOException | InstantiationException + | IllegalAccessException | InvocationTargetException + | NoSuchMethodException e) { + throw new RuntimeException("While compiling " + reason, e); + } + } + + @Override public void setDataContext(DataContext dataContext) { + this.dataContext = dataContext; + } + + @Override public void reduce(RexBuilder rexBuilder, List constExps, + List reducedValues) { + @Nullable Object[] values; + try { + values = execute(); + if (values == null) { + reducedValues.addAll(constExps); + values = new Object[constExps.size()]; + } else { + assert values.length == constExps.size(); + final List<@Nullable Object> valueList = Arrays.asList(values); + for (Pair value : Pair.zip(constExps, valueList)) { + reducedValues.add( + rexBuilder.makeLiteral(value.right, value.left.getType(), true)); + } + } + } catch (RuntimeException e) { + // One or more of the expressions failed. + // Don't reduce any of the expressions. + reducedValues.clear(); + reducedValues.addAll(constExps); + values = new Object[constExps.size()]; + } + Hook.EXPRESSION_REDUCER.run(Pair.of(code, values)); + } + + public Function1 getFunction() { + return compiledFunction; + } + + @Override public @Nullable Object @Nullable [] execute() { + return compiledFunction.apply(requireNonNull(dataContext, "dataContext")); + } + + public String getSource() { + return code; + } +} diff --git a/core/src/main/java/org/apache/calcite/rex/RexExecutor.java b/core/src/main/java/org/apache/calcite/rex/RexExecutor.java index b9b3c3781f2..56d8584fa92 100644 --- a/core/src/main/java/org/apache/calcite/rex/RexExecutor.java +++ b/core/src/main/java/org/apache/calcite/rex/RexExecutor.java @@ -16,6 +16,8 @@ */ package org.apache.calcite.rex; +import org.apache.calcite.rel.type.RelDataType; + import java.util.List; /** Can reduce expressions, writing a literal for each into a list. */ @@ -32,4 +34,13 @@ public interface RexExecutor { * @param reducedValues List to which reduced expressions are appended */ void reduce(RexBuilder rexBuilder, List constExps, List reducedValues); + + + /** Creates an {@link RexExecutable} that allows to execute rex expressions. + * + * @param rexBuilder Rex builder + * @param exps Expressions + * @param rowType describes the structure of the input row. + */ + RexExecutable getExecutable(RexBuilder rexBuilder, List exps, RelDataType rowType); } diff --git a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java index eaa3c7dd46c..0d006289f15 100644 --- a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java +++ b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java @@ -115,13 +115,13 @@ private static String compile(RexBuilder rexBuilder, List constExps, * @param exps Expressions * @param rowType describes the structure of the input row. */ - public static RexExecutable getExecutable(RexBuilder rexBuilder, List exps, + @Override public RexExecutable getExecutable(RexBuilder rexBuilder, List exps, RelDataType rowType) { final JavaTypeFactoryImpl typeFactory = new JavaTypeFactoryImpl(rexBuilder.getTypeFactory().getTypeSystem()); final InputGetter getter = new DataContextInputGetter(rowType, typeFactory); final String code = compile(rexBuilder, exps, getter, rowType); - return new RexExecutable(code, "generated Rex code"); + return new RexExecutableImpl(code, "generated Rex code"); } /** @@ -134,7 +134,7 @@ public static RexExecutable getExecutable(RexBuilder rexBuilder, List e throw new UnsupportedOperationException(); }); - final RexExecutable executable = new RexExecutable(code, constExps); + final RexExecutable executable = new RexExecutableImpl(code, constExps); executable.setDataContext(dataContext); executable.reduce(rexBuilder, constExps, reducedValues); } diff --git a/core/src/test/java/org/apache/calcite/rex/RexExecutorTest.java b/core/src/test/java/org/apache/calcite/rex/RexExecutorTest.java index b1cb8041752..af4e3351c0f 100644 --- a/core/src/test/java/org/apache/calcite/rex/RexExecutorTest.java +++ b/core/src/test/java/org/apache/calcite/rex/RexExecutorTest.java @@ -17,9 +17,12 @@ package org.apache.calcite.rex; import org.apache.calcite.DataContext; import org.apache.calcite.DataContexts; +import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.avatica.util.ByteString; +import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlBinaryOperator; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; @@ -33,6 +36,7 @@ import org.apache.calcite.tools.Frameworks; import org.apache.calcite.util.DateString; import org.apache.calcite.util.NlsString; +import org.apache.calcite.util.Pair; import org.apache.calcite.util.TestUtil; import org.apache.calcite.util.Util; @@ -43,6 +47,7 @@ import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.function.Function; @@ -378,4 +383,141 @@ public void run() { interface Action { void check(RexBuilder rexBuilder, RexExecutorImpl executor); } + + /** + * ArrayList-based DataContext to check Rex execution. + */ + public static class TestDataContext extends SingleValueDataContext { + private TestDataContext(Object[] values) { + super("inputRecord", values); + } + } + + /** + * Context that holds a value for a particular context name. + */ + static class SingleValueDataContext implements DataContext { + private final String name; + private final Object value; + + SingleValueDataContext(String name, Object value) { + this.name = name; + this.value = value; + } + + public SchemaPlus getRootSchema() { + throw new RuntimeException("Unsupported"); + } + + public JavaTypeFactory getTypeFactory() { + throw new RuntimeException("Unsupported"); + } + + public QueryProvider getQueryProvider() { + throw new RuntimeException("Unsupported"); + } + + public Object get(String name) { + if (this.name.equals(name)) { + return value; + } else { + throw new RuntimeException("Wrong DataContext access"); + } + } + } + + /** + * User defined executable. + */ + static class UserDefinedExecutableImpl implements RexExecutable { + final RexBuilder rexBuilder; + final List exps; + DataContext dataContext; + UserDefinedExecutableImpl(final RexBuilder rexBuilder, final List exps) { + assert exps != null && exps.size() == 1; + this.rexBuilder = rexBuilder; + this.exps = exps; + } + @Override public void reduce(RexBuilder rexBuilder, List constExps, + List reducedValues) { + Object[] values; + try { + values = execute(); + assert values.length == constExps.size(); + final List valueList = Arrays.asList(values); + for (Pair value : Pair.zip(constExps, valueList)) { + reducedValues.add( + rexBuilder.makeLiteral(value.right, value.left.getType(), true)); + } + } catch (RuntimeException e) { + // One or more of the expressions failed. + // Don't reduce any of the expressions. + reducedValues.clear(); + reducedValues.addAll(constExps); + } + } + @Override public void setDataContext(DataContext dataContext) { + this.dataContext = dataContext; + } + @Override public Object[] execute() { + final RexNode exp = exps.get(0); + assert exp instanceof RexCall; + RexCall call = (RexCall) exp; + // NOTE: this isn't a correct substring function implementation, only for testing + if (!call.op.getName().equals("SUBSTRING")) { + throw new RuntimeException("not supported"); + } + assert call.getOperands().get(0) instanceof RexInputRef; + assert call.getOperands().get(1) instanceof RexLiteral; + String str = (String) ((Object[]) dataContext.get("inputRecord"))[0]; + Integer startIndex = ((RexLiteral) call.getOperands().get(1)).getValueAs(Integer.class); + Object[] values = new Object[1]; + values[0] = str.substring(startIndex - 1); + return values; + } + } + /** + * User defined executor. + */ + static class UserDefinedExecutorImpl implements RexExecutor { + @Override public void reduce(RexBuilder rexBuilder, List constExps, + List reducedValues) { + this.getExecutable(rexBuilder, constExps, null).reduce(rexBuilder, constExps, reducedValues); + } + @Override public RexExecutable getExecutable(RexBuilder rexBuilder, List exps, + RelDataType rowType) { + return new UserDefinedExecutableImpl(rexBuilder, exps); + } + } + @Test public void userExecutorTest() throws Exception { + check((rexBuilder, executor) -> { + Object[] values = new Object[1]; + final DataContext testContext = new TestDataContext(values); + final RelDataTypeFactory typeFactory = rexBuilder.getTypeFactory(); + final RelDataType varchar = + typeFactory.createSqlType(SqlTypeName.VARCHAR); + final RelDataType integer = + typeFactory.createSqlType(SqlTypeName.INTEGER); + // Calcite is internally creating the input ref via a RexRangeRef + // which eventually leads to a RexInputRef. So we are good. + final RexInputRef input = rexBuilder.makeInputRef(varchar, 0); + final RexNode lengthArg = rexBuilder.makeLiteral(3, integer, true); + final RexNode substr = + rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING, input, + lengthArg); + ImmutableList constExps = ImmutableList.of(substr); + final RelDataType rowType = typeFactory.builder() + .add("someStr", varchar) + .build(); + RexExecutor useExecutor = new UserDefinedExecutorImpl(); + final RexExecutable exec = + useExecutor.getExecutable(rexBuilder, constExps, rowType); + exec.setDataContext(testContext); + values[0] = "Hello World"; + Object[] result = exec.execute(); + assertTrue(result[0] instanceof String); + assertThat((String) result[0], equalTo("llo World")); + }); + } + }