Commit

refactor
deniskuzZ committed Oct 17, 2024
1 parent 39e3e39 commit e03de0e
Showing 11 changed files with 46 additions and 32 deletions.
@@ -426,6 +426,16 @@ public boolean canProvideBasicStatistics() {
    return true;
  }

+ @Override
+ public boolean canProvidePartitionStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+   Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
+   Map<String, String> summary = table.currentSnapshot().summary();
+   if (summary != null) {
+     return Boolean.parseBoolean(summary.get(SnapshotSummary.PARTITION_SUMMARY_PROP));
+   }
+   return false;
+ }
+
  @Override
  public StorageFormatDescriptor getStorageFormatDescriptor(org.apache.hadoop.hive.metastore.api.Table table)
      throws SemanticException {
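For context, Iceberg only sets the summary flag read above when partition summaries are enabled on the table, and currentSnapshot() is null for a table with no snapshots yet, so an empty table would need an extra guard before this method is called. A minimal sketch of enabling and then checking the flag, assuming a HadoopTables-backed table at a placeholder path:

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;

public class PartitionSummaryCheck {
  public static void main(String[] args) {
    // Placeholder location; any existing Iceberg table works.
    Table table = new HadoopTables(new Configuration()).load("hdfs:///warehouse/db/tbl");

    // Partition summaries are only embedded in snapshot metadata when the
    // number of changed partitions stays under this limit (default 0 = off).
    table.updateProperties()
        .set(TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, "100")
        .commit();

    // After the next write, the snapshot summary should carry the flag that
    // canProvidePartitionStatistics above reads.
    Map<String, String> summary = table.currentSnapshot().summary();
    System.out.println("partition summaries included: "
        + Boolean.parseBoolean(summary.get(SnapshotSummary.PARTITION_SUMMARY_PROP)));
  }
}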
5 changes: 2 additions & 3 deletions ql/src/java/org/apache/hadoop/hive/ql/ddl/DDLUtils.java
@@ -204,9 +204,8 @@ public static void setColumnsAndStorePartitionTransformSpecOfTable(
      HiveConf conf, Table tbl) {
    Optional<List<FieldSchema>> cols = Optional.ofNullable(columns);
    Optional<List<FieldSchema>> partCols = Optional.ofNullable(partitionColumns);
-   HiveStorageHandler storageHandler = tbl.getStorageHandler();
-
-   if (storageHandler != null && storageHandler.alwaysUnpartitioned()) {
+
+   if (tbl.alwaysUnpartitioned()) {
      tbl.getSd().setCols(new ArrayList<>());
      cols.ifPresent(c -> tbl.getSd().getCols().addAll(c));
      if (partCols.isPresent() && !partCols.get().isEmpty()) {
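The guard tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned(), repeated across the files below, collapses into a single call throughout this commit. The new helper on org.apache.hadoop.hive.ql.metadata.Table is not part of the hunks shown here; a sketch of its presumed shape, a null-safe delegation to the storage handler:

  // Presumed shape of the helper on ql's Table; not shown in this diff.
  public boolean alwaysUnpartitioned() {
    // Null-safe: native tables have no storage handler and stay on the
    // HMS-backed partition code path.
    HiveStorageHandler storageHandler = getStorageHandler();
    return storageHandler != null && storageHandler.alwaysUnpartitioned();
  }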
@@ -150,11 +150,13 @@ private void getColumnsNoColumnPath(Table table, Partition partition, List<Field

    // Fetch partition statistics only for describe extended or formatted.
    if (desc.isExtended() || desc.isFormatted()) {
-     boolean disablePartitionStats = HiveConf.getBoolVar(context.getConf(), HiveConf.ConfVars.HIVE_DESCRIBE_PARTITIONED_TABLE_IGNORE_STATS);
+     boolean disablePartitionStats = table.alwaysUnpartitioned() ||
+         HiveConf.getBoolVar(context.getConf(), HiveConf.ConfVars.HIVE_DESCRIBE_PARTITIONED_TABLE_IGNORE_STATS);

      if (table.isPartitioned() && partition == null && !disablePartitionStats) {
        // No partition specified for partitioned table, lets fetch all.
        Map<String, String> tblProps = table.getParameters() == null ?
-           new HashMap<String, String>() : table.getParameters();
+           new HashMap<>() : table.getParameters();

        Map<String, Long> valueMap = new HashMap<>();
        Map<String, Boolean> stateMap = new HashMap<>();
@@ -164,7 +166,7 @@ private void getColumnsNoColumnPath(Table table, Partition partition, List<Field
      }

      PartitionIterable partitions = new PartitionIterable(context.getDb(), table, null,
-           MetastoreConf.getIntVar(context.getConf(), MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX));
+         MetastoreConf.getIntVar(context.getConf(), MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX));
      int numParts = 0;
      for (Partition p : partitions) {
        Map<String, String> partitionProps = p.getParameters();
@@ -190,7 +192,7 @@ private void getColumnsNoColumnPath(Table table, Partition partition, List<Field

  private void getColumnDataColPathSpecified(Table table, Partition part, List<FieldSchema> cols,
      List<ColumnStatisticsObj> colStats, Deserializer deserializer)
-     throws SemanticException, HiveException, MetaException {
+     throws HiveException, MetaException {
    // when column name is specified in describe table DDL, colPath will be db_name.table_name.column_name
    String colName = desc.getColumnPath().split("\\.")[2];
    List<String> colNames = Lists.newArrayList(colName.toLowerCase());
@@ -199,7 +201,7 @@ private void getColumnDataColPathSpecified(Table table, Partition part, List<Fie
    if (null == part) {
      if (table.isPartitioned() && !table.alwaysUnpartitioned()) {
        Map<String, String> tableProps = table.getParameters() == null ?
-           new HashMap<String, String>() : table.getParameters();
+           new HashMap<>() : table.getParameters();
        if (table.isPartitionKey(colNames.get(0))) {
          getColumnDataForPartitionKeyColumn(table, cols, colStats, colNames, tableProps);
        } else {
@@ -166,7 +166,7 @@ private void addTruncateTableOutputs(ASTNode root, Table table, Map<String, Stri
      }
    } else {
      if (AlterTableUtils.isFullPartitionSpec(table, partitionSpec)) {
-       if (table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned()) {
+       if (table.alwaysUnpartitioned()) {
          table.getStorageHandler().validatePartSpec(table, partitionSpec);
          try {
            String partName = Warehouse.makePartName(partitionSpec, false);
4 changes: 2 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4301,7 +4301,7 @@ private List<Partition> getPartitionsWithAuth(Table tbl, Map<String, String> par
   */
  public List<Partition> getPartitions(Table tbl, Map<String, String> partialPartSpec)
      throws HiveException {
-   if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
+   if (tbl.alwaysUnpartitioned()) {
      return tbl.getStorageHandler().getPartitions(tbl, partialPartSpec, false);
    } else {
      return getPartitions(tbl, partialPartSpec, (short)-1);
@@ -4554,7 +4554,7 @@ public boolean getPartitionsByExpr(Table tbl, ExprNodeDesc expr, HiveConf conf,
    try {
      Preconditions.checkNotNull(partitions);
      String defaultPartitionName = HiveConf.getVar(conf, ConfVars.DEFAULT_PARTITION_NAME);
-     if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
+     if (tbl.alwaysUnpartitioned()) {
        partitions.addAll(tbl.getStorageHandler().getPartitionsByExpr(tbl, expr));
        return false;
      } else {
@@ -260,6 +260,10 @@ default boolean canProvideBasicStatistics() {
    return false;
  }

+ default boolean canProvidePartitionStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+   return false;
+ }
+
  /**
   * Return some col statistics (Lower bounds, Upper bounds, Null value counts, NaN, total counts) calculated by
   * the underlying storage handler implementation.
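The new interface method defaults to false, so existing storage handlers keep the HMS-based statistics path untouched. A handler opts in by overriding it, as the Iceberg handler in the first hunk does. A hypothetical minimal opt-in (the class name is illustrative, not part of this commit):

import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Table;

public class MyStatsAwareStorageHandler extends DefaultStorageHandler {
  @Override
  public boolean canProvidePartitionStatistics(Table hmsTable) {
    // Advertise handler-computed per-partition statistics, e.g. derived from
    // table-format metadata instead of HMS partition objects.
    return true;
  }
}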
@@ -932,7 +932,7 @@ private Collection<List<ColumnStatisticsObj>> verifyAndGetPartColumnStats(
  private Long getRowCnt(
      ParseContext pCtx, TableScanOperator tsOp, Table tbl) throws HiveException {
    Long rowCnt = 0L;
-   if (tbl.isPartitioned() && !tbl.alwaysUnpartitioned()) {
+   if (tbl.isPartitioned() && StatsUtils.checkCanProvidePartitionStats(tbl)) {
      for (Partition part : pctx.getPrunedPartitions(
          tsOp.getConf().getAlias(), tsOp).getPartitions()) {
        if (!StatsUtils.areBasicStatsUptoDateForQueryAnswering(part.getTable(), part.getParameters())) {
@@ -1256,7 +1256,7 @@ public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit
        validatePartSpec(tableHandle, tmpPartSpec, ast, conf, false);

        List<FieldSchema> parts = tableHandle.getPartitionKeys();
-       if (tableHandle.getStorageHandler() != null && tableHandle.getStorageHandler().alwaysUnpartitioned()) {
+       if (tableHandle.alwaysUnpartitioned()) {
          partSpec = tmpPartSpec;
        } else {
          partSpec = new LinkedHashMap<>(partspec.getChildCount());
@@ -1269,7 +1269,7 @@ public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit
        // check if the partition spec is valid
        if (numDynParts > 0) {
          int numStaPart;
-         if (tableHandle.getStorageHandler() != null && tableHandle.getStorageHandler().alwaysUnpartitioned()) {
+         if (tableHandle.alwaysUnpartitioned()) {
            numStaPart = partSpec.size() - numDynParts;
          } else {
            numStaPart = parts.size() - numDynParts;
@@ -1281,7 +1281,7 @@ public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit

          // Partitions in partSpec is already checked via storage handler.
          // Hence no need to check for cases which are always unpartitioned.
-         if (tableHandle.getStorageHandler() == null || !tableHandle.getStorageHandler().alwaysUnpartitioned()) {
+         if (!tableHandle.alwaysUnpartitioned()) {
            // check the partitions in partSpec be the same as defined in table schema
            if (partSpec.keySet().size() != parts.size()) {
              ErrorPartSpec(partSpec, parts);
@@ -1314,7 +1314,7 @@ public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit
          partitions = db.getPartitions(tableHandle, partSpec);
        } else {
          // this doesn't create partition.
-         if (tableHandle.getStorageHandler() == null || !tableHandle.getStorageHandler().alwaysUnpartitioned()) {
+         if (!tableHandle.alwaysUnpartitioned()) {
            partHandle = db.getPartition(tableHandle, partSpec, false);
          }
          if (partHandle == null) {
@@ -1706,7 +1706,7 @@ public static Map<String, String> getPartSpec(ASTNode node) {

  public static void validatePartSpec(Table tbl, Map<String, String> partSpec,
      ASTNode astNode, HiveConf conf, boolean shouldBeFull) throws SemanticException {
-   if (tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
+   if (tbl.alwaysUnpartitioned()) {
      tbl.getStorageHandler().validatePartSpec(tbl, partSpec, Context.RewritePolicy.get(conf));
    } else {
      tbl.validatePartColumnNames(partSpec, shouldBeFull);
@@ -1725,7 +1725,7 @@ public static void validatePartSpec(Table tbl, Map<String, String> partSpec,
   * @param partitionClausePresent Whether a partition clause is present in the query (e.g. PARTITION(last_name='Don'))
   */
  protected static void validateUnsupportedPartitionClause(Table tbl, boolean partitionClausePresent) {
-   if (partitionClausePresent && tbl.getStorageHandler() != null && tbl.getStorageHandler().alwaysUnpartitioned()) {
+   if (partitionClausePresent && tbl.alwaysUnpartitioned()) {
      throw new UnsupportedOperationException("Using partition spec in query is unsupported for non-native table" +
          " backed by: " + tbl.getStorageHandler().toString());
    }
4 changes: 2 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
@@ -579,8 +579,8 @@ public static Map<Integer, List<ExprNodeGenericFuncDesc>> getFullPartitionSpecs(
      CommonTree ast, Table table, Configuration conf, boolean canGroupExprs) throws SemanticException {
    String defaultPartitionName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULT_PARTITION_NAME);
    Map<String, String> colTypes = new HashMap<>();
-   List<FieldSchema> partitionKeys = table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned() ?
-       table.getStorageHandler().getPartitionKeys(table) : table.getPartitionKeys();
+   List<FieldSchema> partitionKeys = table.alwaysUnpartitioned() ?
+       table.getStorageHandler().getPartitionKeys(table) : table.getPartitionKeys();
    for (FieldSchema fs : partitionKeys) {
      colTypes.put(fs.getName().toLowerCase(), fs.getType());
    }
@@ -319,7 +319,7 @@ private static String normalizeColName(String colName) {
  protected void updateOutputs(Table targetTable) {
    markReadEntityForUpdate();

-   if (targetTable.isPartitioned()) {
+   if (targetTable.isPartitioned() && !targetTable.alwaysUnpartitioned()) {
      List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
      if (!partitionsRead.isEmpty()) {
        // if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
21 changes: 10 additions & 11 deletions ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -276,8 +276,7 @@ private static Statistics collectStatistics(HiveConf conf, PrunedPartitionList p
    boolean estimateStats = HiveConf.getBoolVar(conf, ConfVars.HIVE_STATS_ESTIMATE_STATS);
    boolean metaTable = table.getMetaTable() != null;

-   if (!table.isPartitioned()) {
-
+   if (!table.isPartitioned() || !checkCanProvidePartitionStats(table)) {
      Factory basicStatsFactory = new BasicStats.Factory();

      if (estimateStats) {
@@ -2035,29 +2034,29 @@ public static Range combineRange(Range range1, Range range2) {
  }

  public static boolean checkCanProvideStats(Table table) {
-   if (MetaStoreUtils.isExternalTable(table.getTTable())) {
-     if (MetaStoreUtils.isNonNativeTable(table.getTTable()) && table.getStorageHandler().canProvideBasicStatistics()) {
-       return true;
-     }
-     return false;
-   }
-   return true;
+   return !MetaStoreUtils.isExternalTable(table.getTTable()) || MetaStoreUtils.isNonNativeTable(table.getTTable())
+       && table.getStorageHandler().canProvideBasicStatistics();
  }

+ public static boolean checkCanProvidePartitionStats(Table table) {
+   return !MetaStoreUtils.isExternalTable(table.getTTable()) || MetaStoreUtils.isNonNativeTable(table.getTTable())
+       && table.getStorageHandler().canProvidePartitionStatistics(table);
+ }
+
  /**
   * Are the basic stats for the table up-to-date for query planning.
   * Can run additional checks compared to the version in StatsSetupConst.
   */
  public static boolean areBasicStatsUptoDateForQueryAnswering(Table table, Map<String, String> params) {
-   return checkCanProvideStats(table) == true ? StatsSetupConst.areBasicStatsUptoDate(params) : false;
+   return checkCanProvideStats(table) && StatsSetupConst.areBasicStatsUptoDate(params);
  }

  /**
   * Are the column stats for the table up-to-date for query planning.
   * Can run additional checks compared to the version in StatsSetupConst.
   */
  public static boolean areColumnStatsUptoDateForQueryAnswering(Table table, Map<String, String> params, String colName) {
-   return checkCanProvideStats(table) == true ? StatsSetupConst.areColumnStatsUptoDate(params, colName) : false;
+   return checkCanProvideStats(table) && StatsSetupConst.areColumnStatsUptoDate(params, colName);
  }

  /**
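A readability note on the collapsed returns in checkCanProvideStats and checkCanProvidePartitionStats: Java's && binds tighter than ||, so each one-liner is equivalent to the nested ifs it replaces. A standalone restatement with explicit parentheses and illustrative variable names:

public class StatsGatekeepingNote {
  public static void main(String[] args) {
    boolean external = true;
    boolean nonNative = true;
    boolean handlerProvidesStats = true;

    // Managed (non-external) tables always qualify; external tables qualify
    // only when a non-native storage handler advertises the capability.
    boolean canProvide = !external || (nonNative && handlerProvidesStats);
    System.out.println(canProvide); // prints "true"
  }
}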
