Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,16 @@
import org.apache.doris.datasource.property.constants.MCProperties;

import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.Partition;
import com.aliyun.odps.Project;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AccountFormat;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.security.SecurityManager;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.utils.StringUtils;
import com.google.common.collect.ImmutableList;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.log4j.Logger;

import java.time.ZoneId;
Expand All @@ -66,7 +61,6 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
private String defaultProject;
private String quota;
private EnvironmentSettings settings;
private String catalogOwner;

private String splitStrategy;
private SplitOptions splitOptions;
Expand All @@ -81,6 +75,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {

AccountFormat accountFormat = AccountFormat.DISPLAYNAME;

private McStructureHelper mcStructureHelper = null;

private static final Map<String, ZoneId> REGION_ZONE_MAP;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.PROJECT,
Expand Down Expand Up @@ -231,6 +227,10 @@ protected void initLocalObjectsImpl() {
.withQuotaName(quota)
.withRestOptions(restOptions)
.build();

boolean enableNamespaceSchema = Boolean.parseBoolean(
props.getOrDefault(MCProperties.ENABLE_NAMESPACE_SCHEMA, MCProperties.DEFAULT_ENABLE_NAMESPACE_SCHEMA));
mcStructureHelper = McStructureHelper.getHelper(enableNamespaceSchema, defaultProject);
}

public Odps getClient() {
Expand All @@ -239,83 +239,53 @@ public Odps getClient() {
}

protected List<String> listDatabaseNames() {
List<String> result = new ArrayList<>();
result.add(defaultProject);

try {
result.add(defaultProject);
if (StringUtils.isNullOrEmpty(catalogOwner)) {
SecurityManager sm = odps.projects().get().getSecurityManager();
String whoami = sm.runQuery("whoami", false);

JsonObject js = JsonParser.parseString(whoami).getAsJsonObject();
catalogOwner = js.get("DisplayName").getAsString();
}
Iterator<Project> iterator = odps.projects().iterator(catalogOwner);
while (iterator.hasNext()) {
Project project = iterator.next();
if (!project.getName().equals(defaultProject)) {
result.add(project.getName());
}
}
} catch (OdpsException e) {
throw new RuntimeException(e);
}
return result;
makeSureInitialized();
return mcStructureHelper.listDatabaseNames(getClient(), getDefaultProject());
}

@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
try {
return getClient().tables().exists(dbName, tblName);
} catch (OdpsException e) {
throw new RuntimeException(e);
}
return mcStructureHelper.tableExist(getClient(), dbName, tblName);

}

public List<String> listPartitionNames(String dbName, String tbl) {
return listPartitionNames(dbName, tbl, 0, -1);
}

public List<String> listPartitionNames(String dbName, String tbl, long skip, long limit) {
try {
if (getClient().projects().exists(dbName)) {
List<Partition> parts;
if (limit < 0) {
parts = getClient().tables().get(dbName, tbl).getPartitions();
} else {
skip = skip < 0 ? 0 : skip;
parts = new ArrayList<>();
Iterator<Partition> it = getClient().tables().get(dbName, tbl).getPartitionIterator();
int count = 0;
while (it.hasNext()) {
if (count < skip) {
count++;
it.next();
} else if (parts.size() >= limit) {
break;
} else {
parts.add(it.next());
}
if (mcStructureHelper.databaseExist(getClient(), dbName)) {
List<Partition> parts;
if (limit < 0) {
parts = mcStructureHelper.getPartitions(getClient(), dbName, tbl);
} else {
skip = skip < 0 ? 0 : skip;
parts = new ArrayList<>();
Iterator<Partition> it = mcStructureHelper.getPartitionIterator(getClient(), dbName, tbl);
int count = 0;
while (it.hasNext()) {
if (count < skip) {
count++;
it.next();
} else if (parts.size() >= limit) {
break;
} else {
parts.add(it.next());
}
}
return parts.stream().map(p -> p.getPartitionSpec().toString(false, true))
.collect(Collectors.toList());
} else {
throw new OdpsException("Max compute project: " + dbName + " not exists.");
}
} catch (OdpsException e) {
throw new RuntimeException(e);
return parts.stream().map(p -> p.getPartitionSpec().toString(false, true))
.collect(Collectors.toList());
} else {
throw new RuntimeException("MaxCompute schema/project: " + dbName + " not exists.");
}
}

@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
List<String> result = new ArrayList<>();
getClient().tables().iterable(dbName).forEach(e -> result.add(e.getName()));
return result;
return mcStructureHelper.listTableNames(getClient(), dbName);
}

public String getAccessKey() {
Expand Down Expand Up @@ -402,6 +372,14 @@ public long getSplitByteSize() {
return splitByteSize;
}

public com.aliyun.odps.Table getOdpsTable(String dbName, String tableName) {
return mcStructureHelper.getOdpsTable(getClient(), dbName, tableName);
}

public TableIdentifier getOdpsTableIdentifier(String dbName, String tableName) {
return mcStructureHelper.getTableIdentifier(dbName, tableName);
}

@Override
public void checkProperties() throws DdlException {
super.checkProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.CharTypeInfo;
import com.aliyun.odps.type.DecimalTypeInfo;
Expand Down Expand Up @@ -172,9 +173,12 @@ public Map<String, com.aliyun.odps.Column> getColumnNameToOdpsColumn() {
public Optional<SchemaCacheValue> initSchema() {
// this method will be called at semantic parsing.
makeSureInitialized();
Table odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(dbName, name);
List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) catalog;

Table odpsTable = mcCatalog.getOdpsTable(dbName, name);
TableIdentifier tableIdentifier = mcCatalog.getOdpsTableIdentifier(dbName, name);

List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();

for (com.aliyun.odps.Column column : columns) {
columnNameToOdpsColumn.put(column.getName(), column);
Expand Down Expand Up @@ -213,8 +217,8 @@ public Optional<SchemaCacheValue> initSchema() {
partitionSpecs = ImmutableList.of();
}

return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable, partitionColumnNames,
partitionSpecs, partitionDorisColumns, partitionTypes));
return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable, tableIdentifier,
partitionColumnNames, partitionSpecs, partitionDorisColumns, partitionTypes));
}

private Type mcTypeToDorisType(TypeInfo typeInfo) {
Expand Down Expand Up @@ -303,6 +307,13 @@ private Type mcTypeToDorisType(TypeInfo typeInfo) {
}
}

public TableIdentifier getTableIdentifier() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getTableIdentifier())
.orElse(null);
}

@Override
public TTableDescriptor toThrift() {
// ak sk endpoint project quota
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.datasource.SchemaCacheValue;

import com.aliyun.odps.Table;
import com.aliyun.odps.table.TableIdentifier;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -31,15 +32,18 @@
@Setter
public class MaxComputeSchemaCacheValue extends SchemaCacheValue {
private Table odpsTable;
private TableIdentifier tableIdentifier;
private List<String> partitionColumnNames;
private List<String> partitionSpecs;
private List<Column> partitionColumns;
private List<Type> partitionTypes;

public MaxComputeSchemaCacheValue(List<Column> schema, Table odpsTable, List<String> partitionColumnNames,
List<String> partitionSpecs, List<Column> partitionColumns, List<Type> partitionTypes) {
public MaxComputeSchemaCacheValue(List<Column> schema, Table odpsTable, TableIdentifier tableIdentifier,
List<String> partitionColumnNames, List<String> partitionSpecs, List<Column> partitionColumns,
List<Type> partitionTypes) {
super(schema);
this.odpsTable = odpsTable;
this.tableIdentifier = tableIdentifier;
this.partitionSpecs = partitionSpecs;
this.partitionColumnNames = partitionColumnNames;
this.partitionColumns = partitionColumns;
Expand Down
Loading