[Bugfix] Always close StarRocksSourceDataReader and check keepAliveMin is valid (#138)
banmoy committed Nov 10, 2022
1 parent 4832bb0 commit 75bf88e
Showing 2 changed files with 35 additions and 14 deletions.
StarRocksDynamicSourceFunction.java
@@ -14,12 +14,11 @@

package com.starrocks.connector.flink.table.source;

import com.google.common.base.Strings;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets;
import com.starrocks.connector.flink.table.source.struct.QueryInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;

import com.google.common.base.Strings;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -28,13 +27,18 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicSourceFunction.class);

private final StarRocksSourceOptions sourceOptions;
private QueryInfo queryInfo;
private Long dataCount;
@@ -45,6 +49,7 @@ public class StarRocksDynamicSourceFunction extends RichParallelSourceFunction<R
private StarRocksSourceQueryType queryType;

private transient Counter counterTotalScannedRows;
private transient AtomicBoolean dataReaderClosed;
private static final String TOTAL_SCANNED_ROWS = "totalScannedRows";

public StarRocksDynamicSourceFunction(TableSchema flinkSchema, StarRocksSourceOptions sourceOptions) {
@@ -125,10 +130,10 @@ private String genSQL(StarRocksSourceQueryType queryType, String columns, String
return sqlSb.toString();
}


@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.dataReaderClosed = new AtomicBoolean(false);
this.counterTotalScannedRows = getRuntimeContext().getMetricGroup().counter(TOTAL_SCANNED_ROWS);

int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
@@ -161,11 +166,23 @@ public void run(SourceContext<RowData> sourceContext) {

@Override
public void cancel() {
this.dataReaderList.parallelStream().forEach(dataReader -> {
if (dataReader != null) {
dataReader.close();
}
});
internalClose();
}

@Override
public void close() {
internalClose();
}

private void internalClose() {
if (dataReaderClosed.compareAndSet(false, true)) {
LOG.info("Close readers");
this.dataReaderList.parallelStream().forEach(dataReader -> {
if (dataReader != null) {
dataReader.close();
}
});
}
}

@Override
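The change above routes both cancel() and close() through a single internalClose() method guarded by an AtomicBoolean, so the readers are released exactly once no matter which of the two callbacks Flink invokes first. A minimal standalone sketch of that guard pattern, assuming a hypothetical Reader interface in place of the connector's StarRocksSourceDataReader:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for StarRocksSourceDataReader, for illustration only.
interface Reader {
    void close();
}

class IdempotentCloseSketch {
    private final List<Reader> readers;
    // Flips to true the first time internalClose() runs, so cancel() and close()
    // can both call it without closing the readers twice.
    private final AtomicBoolean closed = new AtomicBoolean(false);

    IdempotentCloseSketch(Reader... readers) {
        this.readers = Arrays.asList(readers);
    }

    public void cancel() {
        internalClose();
    }

    public void close() {
        internalClose();
    }

    private void internalClose() {
        // compareAndSet succeeds for exactly one caller; later calls become no-ops.
        if (closed.compareAndSet(false, true)) {
            readers.forEach(reader -> {
                if (reader != null) {
                    reader.close();
                }
            });
        }
    }
}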
StarRocksSourceBeReader.java
@@ -15,10 +15,10 @@
package com.starrocks.connector.flink.table.source;

import com.starrocks.connector.flink.row.source.StarRocksSourceFlinkRows;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.Const;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.connector.flink.table.source.struct.StarRocksSchema;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TProtocol;
@@ -31,7 +31,6 @@
import com.starrocks.thrift.TScanOpenResult;
import com.starrocks.thrift.TStarrocksExternalService;
import com.starrocks.thrift.TStatusCode;

import org.apache.flink.table.data.GenericRowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,9 +75,9 @@ public StarRocksSourceBeReader(String beNodeInfo,
throw new RuntimeException("Not find be node info from the be port mappping list");
}
beNodeInfo = mappingMap.get(beNodeInfo);
LOG.info("query data from be by using be-hostname");
LOG.info("query data from be by using be-hostname {}", beNodeInfo);
} else {
LOG.info("query data from be by using be-ip");
LOG.info("query data from be by using be-ip {}", beNodeInfo);
}
String[] beNode = beNodeInfo.split(":");
String ip = beNode[0].trim();
@@ -113,7 +112,8 @@ public void openScanner(List<Long> tablets, String opaqued_query_plan, StarRocks
params.setProperties(sourceOptions.getProperties());
}
// params.setLimit(sourceOptions.getLimit());
params.setKeep_alive_min((short) sourceOptions.getKeepAliveMin());
short keepAliveMin = (short) Math.min(Short.MAX_VALUE, sourceOptions.getKeepAliveMin());
params.setKeep_alive_min(keepAliveMin);
params.setQuery_timeout(sourceOptions.getQueryTimeout());
params.setMem_limit(sourceOptions.getMemLimit());
LOG.info("open Scan params.mem_limit {} B", params.getMem_limit());
@@ -133,6 +133,8 @@ public void openScanner(List<Long> tablets, String opaqued_query_plan, StarRocks
}
this.srSchema = StarRocksSchema.genSchema(result.getSelected_columns());
this.contextId = result.getContext_id();
LOG.info("Open scanner for {}:{} with context id {}, and there are {} tablets {}",
IP, PORT, contextId, tablets.size(), tablets);
}

public void startToRead() {
@@ -189,11 +191,13 @@ private void handleResult(TScanBatchResult result) {

@Override
public void close() {
LOG.info("Close reader for {}:{} with context id {}", IP, PORT, contextId);
TScanCloseParams tScanCloseParams = new TScanCloseParams();
tScanCloseParams.setContext_id(this.contextId);
try {
this.client.close_scanner(tScanCloseParams);
} catch (TException e) {
LOG.error("Failed to close reader {}:{} with context id {}", IP, PORT, contextId, e);
throw new RuntimeException(e.getMessage());
}
}
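The second part of the fix addresses keep_alive_min: the Thrift scan-open parameter is set as a Java short, so a configured value larger than Short.MAX_VALUE would silently wrap to a negative number under a plain narrowing cast. Clamping with Math.min first keeps it in the valid range. A small standalone illustration (the 40000 value is just a hypothetical oversized setting, not a connector default):

public class KeepAliveClampSketch {
    public static void main(String[] args) {
        int configuredKeepAliveMin = 40000; // hypothetical oversized user setting

        // Plain narrowing cast overflows: 40000 does not fit in 16 signed bits and wraps to -25536.
        short unchecked = (short) configuredKeepAliveMin;

        // Clamping first, as the fix does, caps the value at the largest valid short.
        short clamped = (short) Math.min(Short.MAX_VALUE, configuredKeepAliveMin);

        System.out.println(unchecked); // -25536
        System.out.println(clamped);   // 32767
    }
}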
