-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CALCITE-5409] Implement BatchNestedLoopJoin for JDBC #3562
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to you under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.calcite.adapter.jdbc; | ||
|
||
import org.apache.calcite.DataContext; | ||
import org.apache.calcite.adapter.java.JavaTypeFactory; | ||
import org.apache.calcite.linq4j.QueryProvider; | ||
import org.apache.calcite.schema.SchemaPlus; | ||
|
||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
||
/** | ||
* A special DataContext which handles correlation variable for batch nested loop joins. | ||
*/ | ||
public class JdbcCorrelationDataContext implements DataContext { | ||
public static final int OFFSET = 10000; | ||
|
||
private final DataContext delegate; | ||
private final Object[] parameters; | ||
|
||
public JdbcCorrelationDataContext(DataContext delegate, Object[] parameters) { | ||
this.delegate = delegate; | ||
this.parameters = parameters; | ||
} | ||
@Override public @Nullable SchemaPlus getRootSchema() { | ||
return delegate.getRootSchema(); | ||
} | ||
|
||
@Override public JavaTypeFactory getTypeFactory() { | ||
return delegate.getTypeFactory(); | ||
} | ||
|
||
@Override public QueryProvider getQueryProvider() { | ||
return delegate.getQueryProvider(); | ||
} | ||
|
||
@Override public @Nullable Object get(String name) { | ||
if (name.startsWith("?")) { | ||
int index = Integer.parseInt(name.substring(1)); | ||
if (index >= OFFSET && index < OFFSET + parameters.length) { | ||
return parameters[index - OFFSET]; | ||
} | ||
} | ||
return delegate.get(name); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to you under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.calcite.adapter.jdbc; | ||
|
||
import org.apache.calcite.rel.core.CorrelationId; | ||
|
||
import java.lang.reflect.Type; | ||
|
||
/** | ||
* A class to build an object of type JdbcCorrelationDataContext. | ||
*/ | ||
public interface JdbcCorrelationDataContextBuilder { | ||
int add(CorrelationId id, int ordinal, Type type); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to you under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.calcite.adapter.jdbc; | ||
|
||
import org.apache.calcite.DataContext; | ||
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; | ||
import org.apache.calcite.linq4j.tree.BlockBuilder; | ||
import org.apache.calcite.linq4j.tree.Expression; | ||
import org.apache.calcite.linq4j.tree.Expressions; | ||
import org.apache.calcite.linq4j.tree.Types; | ||
import org.apache.calcite.rel.core.CorrelationId; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
|
||
import java.lang.reflect.Constructor; | ||
import java.lang.reflect.Type; | ||
|
||
/** | ||
* An implementation class of JdbcCorrelationDataContext. | ||
*/ | ||
public class JdbcCorrelationDataContextBuilderImpl implements JdbcCorrelationDataContextBuilder { | ||
private static final Constructor NEW = | ||
Types.lookupConstructor(JdbcCorrelationDataContext.class, DataContext.class, Object[].class); | ||
private final ImmutableList.Builder<Expression> parameters = new ImmutableList.Builder<>(); | ||
private int offset = JdbcCorrelationDataContext.OFFSET; | ||
private final EnumerableRelImplementor implementor; | ||
private final BlockBuilder builder; | ||
private final Expression dataContext; | ||
|
||
public JdbcCorrelationDataContextBuilderImpl(EnumerableRelImplementor implementor, | ||
BlockBuilder builder, Expression dataContext) { | ||
this.implementor = implementor; | ||
this.builder = builder; | ||
this.dataContext = dataContext; | ||
} | ||
|
||
@Override public int add(CorrelationId id, int ordinal, Type type) { | ||
parameters.add(implementor.getCorrelVariableGetter(id.getName()).field(builder, ordinal, type)); | ||
return offset++; | ||
} | ||
|
||
public Expression build() { | ||
return Expressions.new_(NEW, dataContext, | ||
Expressions.newArrayInit(Object.class, 1, parameters.build())); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,20 +18,64 @@ | |
|
||
import org.apache.calcite.adapter.java.JavaTypeFactory; | ||
import org.apache.calcite.rel.RelNode; | ||
import org.apache.calcite.rel.core.CorrelationId; | ||
import org.apache.calcite.rel.rel2sql.RelToSqlConverter; | ||
import org.apache.calcite.rel.rel2sql.SqlImplementor; | ||
import org.apache.calcite.rel.type.RelDataTypeField; | ||
import org.apache.calcite.rex.RexCorrelVariable; | ||
import org.apache.calcite.sql.SqlDialect; | ||
import org.apache.calcite.util.Util; | ||
import org.apache.calcite.sql.SqlDynamicParam; | ||
import org.apache.calcite.sql.SqlNode; | ||
import org.apache.calcite.sql.parser.SqlParserPos; | ||
|
||
import java.lang.reflect.Type; | ||
import java.util.List; | ||
|
||
/** | ||
* State for generating a SQL statement. | ||
*/ | ||
public class JdbcImplementor extends RelToSqlConverter { | ||
public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory) { | ||
|
||
private final JdbcCorrelationDataContextBuilder dataContextBuilder; | ||
private final JavaTypeFactory typeFactory; | ||
|
||
public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory, | ||
JdbcCorrelationDataContextBuilder dataContextBuilder) { | ||
super(dialect); | ||
Util.discard(typeFactory); | ||
this. typeFactory = typeFactory; | ||
this.dataContextBuilder = dataContextBuilder; | ||
} | ||
|
||
public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory) { | ||
this(dialect, typeFactory, new JdbcCorrelationDataContextBuilder() { | ||
private int counter = 1; | ||
@Override public int add(CorrelationId id, int ordinal, Type type) { | ||
return counter++; | ||
} | ||
}); | ||
} | ||
|
||
public Result implement(RelNode node) { | ||
return dispatch(node); | ||
} | ||
|
||
@Override protected Context getAliasContext(RexCorrelVariable variable) { | ||
Context context = correlTableMap.get(variable.id); | ||
if (context != null) { | ||
return context; | ||
} | ||
List<RelDataTypeField> fieldList = variable.getType().getFieldList(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I'm just not very familiar with JDBC but could we potentially add a comment or two explaining what is going on here. (If it's obvious to everyone but me then maybe not 🤷 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a comment |
||
return new Context(dialect, fieldList.size()) { | ||
@Override public SqlNode field(int ordinal) { | ||
RelDataTypeField field = fieldList.get(ordinal); | ||
return new SqlDynamicParam( | ||
dataContextBuilder.add(variable.id, ordinal, | ||
typeFactory.getJavaClass(field.getType())), SqlParserPos.ZERO); | ||
} | ||
|
||
@Override public SqlImplementor implementor() { | ||
return JdbcImplementor.this; | ||
} | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,11 +16,16 @@ | |
*/ | ||
package org.apache.calcite.test; | ||
|
||
import org.apache.calcite.adapter.enumerable.EnumerableRules; | ||
import org.apache.calcite.adapter.java.ReflectiveSchema; | ||
import org.apache.calcite.config.CalciteConnectionProperty; | ||
import org.apache.calcite.config.Lex; | ||
import org.apache.calcite.plan.RelOptPlanner; | ||
import org.apache.calcite.runtime.Hook; | ||
import org.apache.calcite.test.CalciteAssert.AssertThat; | ||
import org.apache.calcite.test.CalciteAssert.DatabaseInstance; | ||
import org.apache.calcite.test.schemata.foodmart.FoodmartSchema; | ||
import org.apache.calcite.test.schemata.hr.HrSchema; | ||
import org.apache.calcite.util.Smalls; | ||
import org.apache.calcite.util.TestUtil; | ||
|
||
|
@@ -35,6 +40,7 @@ | |
import java.util.Properties; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.function.Consumer; | ||
|
||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.CoreMatchers.is; | ||
|
@@ -1147,6 +1153,52 @@ private LockWrapper exclusiveCleanDb(Connection c) throws SQLException { | |
}); | ||
} | ||
|
||
@Test void testBatchNestedLoopJoinPlan() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could add comment with link to JIRA There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure |
||
final String sql = "SELECT *\n" | ||
+ "FROM \"s\".\"emps\" A\n" | ||
+ "LEFT OUTER JOIN \"foodmart\".\"store\" B ON A.\"empid\" = B.\"store_id\""; | ||
final String explain = "JdbcFilter(condition=[OR(=($cor0.empid0, $0), =($cor1.empid0, $0)"; | ||
final String jdbcSql = "SELECT *\n" | ||
+ "FROM \"foodmart\".\"store\"\n" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any other edge cases or situations we should test? Asked another way: Is one test enough to ensure that this works the way we intend? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think one test should be enough, as only one additional execution path is added to the code. Context context = correlTableMap.get(variable.id);
if (context != null) {
return context;
} |
||
+ "WHERE ? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR" | ||
+ " (? = \"store_id\" OR ? = \"store_id\")) OR (? = \"store_id\" OR (? = \"store_id\" OR ? " | ||
+ "= \"store_id\") OR (? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\"))) OR (? =" | ||
+ " \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\")) OR (? = \"store_id\" OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\") OR (? = \"store_id\" OR ? = \"store_id\" OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\")))) OR (? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = " | ||
+ "\"store_id\" OR (? = \"store_id\" OR ? = \"store_id\")) OR (? = \"store_id\" OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\"))) OR (? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = " | ||
+ "\"store_id\" OR (? = \"store_id\" OR ? = \"store_id\")) OR (? = \"store_id\" OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR ? = \"store_id\" OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\"))))) OR (? = \"store_id\" OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\") OR (? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\")) OR (? = " | ||
+ "\"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\"))) OR (? = \"store_id\" OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\") OR (? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\")) OR (? = " | ||
+ "\"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\" OR (? = \"store_id\" OR ? = \"store_id\")))) OR (? = \"store_id\" OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\")) OR (? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = " | ||
+ "\"store_id\" OR (? = \"store_id\" OR ? = \"store_id\"))) OR (? = \"store_id\" OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\") OR (? = \"store_id\" OR (? = \"store_id\" OR ? = " | ||
+ "\"store_id\")) OR (? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\") OR (? = " | ||
+ "\"store_id\" OR ? = \"store_id\" OR (? = \"store_id\" OR ? = \"store_id\"))))))"; | ||
CalciteAssert.model(FoodmartSchema.FOODMART_MODEL) | ||
.withSchema("s", new ReflectiveSchema(new HrSchema())) | ||
.withHook(Hook.PLANNER, (Consumer<RelOptPlanner>) planner -> { | ||
planner.addRule(EnumerableRules.ENUMERABLE_BATCH_NESTED_LOOP_JOIN_RULE); | ||
}) | ||
.query(sql) | ||
.explainContains(explain) | ||
.runs() | ||
.enable(CalciteAssert.DB == CalciteAssert.DatabaseInstance.HSQLDB | ||
|| CalciteAssert.DB == DatabaseInstance.POSTGRESQL) | ||
.planHasSql(jdbcSql) | ||
.returnsCount(4); | ||
} | ||
|
||
/** Acquires a lock, and releases it when closed. */ | ||
static class LockWrapper implements AutoCloseable { | ||
private final Lock lock; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs newline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK