Skip to content

Commit

Permalink
[fix](multicatlog) fix read hive/iceberg catalog on cosn & fix read d…
Browse files Browse the repository at this point in the history
…ata via broker (#22087)

* [fix](multicatlog) fix read hive/iceberg catalog on cosn & fix read data via broker

* Update FileSystemFactory.java
  • Loading branch information
Yulei-Yang authored Aug 10, 2023
1 parent f2658dc commit f7d00d4
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class FeConstants {
public static String FS_PREFIX_GCS = "gs";
public static String FS_PREFIX_BOS = "bos";
public static String FS_PREFIX_COS = "cos";
public static String FS_PREFIX_COSN = "cosn";
public static String FS_PREFIX_OBS = "obs";
public static String FS_PREFIX_OFS = "ofs";
public static String FS_PREFIX_GFS = "gfs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ public class S3Util {

public static boolean isObjStorage(String location) {
return isObjStorageUseS3Client(location)
|| location.startsWith(FeConstants.FS_PREFIX_COS)
|| location.startsWith(FeConstants.FS_PREFIX_OSS)
|| location.startsWith(FeConstants.FS_PREFIX_OBS);
// if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
|| (location.startsWith(FeConstants.FS_PREFIX_COS) && !location.startsWith(FeConstants.FS_PREFIX_COSN))
|| location.startsWith(FeConstants.FS_PREFIX_OSS)
|| location.startsWith(FeConstants.FS_PREFIX_OBS);
}

private static boolean isObjStorageUseS3Client(String location) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public static Pair<FileSystemType, String> getFSIdentity(String location) {
fsType = FileSystemType.S3;
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) {
fsType = FileSystemType.DFS;
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS) || location.startsWith(FeConstants.FS_PREFIX_COSN)) {
// ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) {
fsType = FileSystemType.OFS;
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
fsType = FileSystemType.JFS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,16 @@ private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit
params.setHdfsParams(tHdfsParams);
}

if (locationType == TFileType.FILE_BROKER && !params.isSetBrokerAddresses()) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
if (locationType == TFileType.FILE_BROKER) {
params.setProperties(locationProperties);

if (!params.isSetBrokerAddresses()) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
}
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
} else if (locationType == TFileType.FILE_S3 && !params.isSetProperties()) {
params.setProperties(locationProperties);
Expand Down Expand Up @@ -450,6 +454,8 @@ protected static Optional<TFileType> getTFileType(String location) {
return Optional.of(TFileType.FILE_LOCAL);
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
return Optional.of(TFileType.FILE_BROKER);
} else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) {
return Optional.of(TFileType.FILE_BROKER);
} else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
return Optional.of(TFileType.FILE_BROKER);
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
Expand Down

0 comments on commit f7d00d4

Please sign in to comment.