Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](multicatlog) fix read hive/iceberg catalog on cosn & fix read data via broker #22087

Merged
merged 4 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class FeConstants {
public static String FS_PREFIX_GCS = "gs";
public static String FS_PREFIX_BOS = "bos";
public static String FS_PREFIX_COS = "cos";
public static String FS_PREFIX_COSN = "cosn";
public static String FS_PREFIX_OBS = "obs";
public static String FS_PREFIX_OFS = "ofs";
public static String FS_PREFIX_GFS = "gfs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ public class S3Util {

public static boolean isObjStorage(String location) {
return isObjStorageUseS3Client(location)
|| location.startsWith(FeConstants.FS_PREFIX_COS)
|| location.startsWith(FeConstants.FS_PREFIX_OSS)
|| location.startsWith(FeConstants.FS_PREFIX_OBS);
// if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
|| (location.startsWith(FeConstants.FS_PREFIX_COS) && !location.startsWith(FeConstants.FS_PREFIX_COSN))
|| location.startsWith(FeConstants.FS_PREFIX_OSS)
|| location.startsWith(FeConstants.FS_PREFIX_OBS);
}

private static boolean isObjStorageUseS3Client(String location) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public static Pair<FileSystemType, String> getFSIdentity(String location) {
fsType = FileSystemType.S3;
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) {
fsType = FileSystemType.DFS;
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS) || location.startsWith(FeConstants.FS_PREFIX_COSN)) {
// ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) {
fsType = FileSystemType.OFS;
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
fsType = FileSystemType.JFS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,16 @@ private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit
params.setHdfsParams(tHdfsParams);
}

if (locationType == TFileType.FILE_BROKER && !params.isSetBrokerAddresses()) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
if (locationType == TFileType.FILE_BROKER) {
params.setProperties(locationProperties);

if (!params.isSetBrokerAddresses()) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
}
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
} else if (locationType == TFileType.FILE_S3 && !params.isSetProperties()) {
params.setProperties(locationProperties);
Expand Down Expand Up @@ -450,6 +454,8 @@ protected static Optional<TFileType> getTFileType(String location) {
return Optional.of(TFileType.FILE_LOCAL);
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
return Optional.of(TFileType.FILE_BROKER);
} else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) {
return Optional.of(TFileType.FILE_BROKER);
} else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
return Optional.of(TFileType.FILE_BROKER);
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
Expand Down
Loading