HIVE-28537 Iceberg: allow only partition columns in the WHERE clause
Change-Id: Ic85efd70599413cdb96073c6cb50690fbc1c11b0
zratkai committed Oct 16, 2024
1 parent 13bec9b commit 62b9eaa
Showing 10 changed files with 203 additions and 30 deletions.
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -492,6 +492,7 @@ public enum ErrorMsg {
NONICEBERG_COMPACTION_WITH_FILTER_NOT_SUPPORTED(10440, "Compaction with filter is not allowed on non-Iceberg table {0}.{1}", true),
ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
COMPACTION_THREAD_INITIALIZATION(10442, "Compaction thread failed during initialization", false),
ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED(10443, "Filter expression can contain only partition columns."),

//========================== 20000 range starts here ========================//

@@ -2018,6 +2018,18 @@ public List<String> getPartitionNames(org.apache.hadoop.hive.ql.metadata.Table h
return IcebergTableUtil.getPartitionNames(icebergTable, partitionSpec, true);
}

/**
* Returns the distinct partition key (column) names across all partition specs of the table.
* @param hmsTable A Hive table instance.
* @return A list of partition key names covering every historical partition spec.
*/
@Override
public List<String> getAllPartitionKeyNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return IcebergTableUtil.getAllPartitionKeys(icebergTable).stream()
.map(FieldSchema::getName).distinct().collect(Collectors.toList());
}

/**
* A function to fetch the column information of the underlying column defined by the table format.
* @param hmsTable A Hive table instance
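For context, a hedged usage sketch of the new storage-handler hook; the Hive session object and the helper method below are assumptions for illustration, not part of this commit. For the evolved ice_orc test table further down, the hook reports partition keys from every historical spec, not just the current spec(team_id):

import java.util.List;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;

// Assumed helper: list every column that is legal in a compaction WHERE
// clause for the ice_orc test table defined later in this commit.
static List<String> compactionFilterColumns(Hive db) throws HiveException {
  Table hmsTable = db.getTable("default", "ice_orc");
  // Would include company_id and dept_id (older spec) as well as team_id (current spec).
  return hmsTable.getStorageHandler().getAllPartitionKeyNames(hmsTable);
}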
@@ -484,6 +484,10 @@ public static List<FieldSchema> getPartitionKeys(Table table, int specId) {
String.format("Transform: %s", partField.transform().toString()))).collect(Collectors.toList());
}

public static List<FieldSchema> getAllPartitionKeys(Table table) {
return table.specs().keySet().stream().flatMap(id -> getPartitionKeys(table, id).stream())
.distinct().collect(Collectors.toList());
}

public static List<PartitionField> getPartitionFields(Table table) {
return table.specs().values().stream().flatMap(spec -> spec.fields()
.stream()).distinct().collect(Collectors.toList());
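A hedged illustration of why the flatMap/distinct combination above is needed: partition evolution keeps every historical spec in table.specs(), and the same column can appear in more than one of them. The schema and specs below are invented for illustration:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hypothetical evolution from spec(a, b) to spec(b, c): table.specs() would
// hold both specs, flatMapping their keys yields [a, b, b, c], and
// distinct() reduces that to [a, b, c].
class PartitionEvolutionExample {
  static final Schema SCHEMA = new Schema(
      Types.NestedField.optional(1, "a", Types.LongType.get()),
      Types.NestedField.optional(2, "b", Types.LongType.get()),
      Types.NestedField.optional(3, "c", Types.LongType.get()));
  static final PartitionSpec SPEC_V0 =
      PartitionSpec.builderFor(SCHEMA).identity("a").identity("b").build();
  static final PartitionSpec SPEC_V1 =
      PartitionSpec.builderFor(SCHEMA).identity("b").identity("c").build();
}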
@@ -0,0 +1,11 @@
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc;

insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222);
insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444);
insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666);

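-- Column c is not a partition column (the table is partitioned by d only),
-- so the statement below is expected to fail with error 10443.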
alter table iceberg_orc_compaction COMPACT 'major' and wait where c in ('text1', 'text2');
@@ -65,11 +65,26 @@ delete from ice_orc where last_name in ('ln1', 'ln9');
delete from ice_orc where last_name in ('ln3', 'ln11');
delete from ice_orc where last_name in ('ln5', 'ln13');

alter table ice_orc set partition spec(team_id);
insert into ice_orc VALUES
('fn17', 'ln17', 1, 10, 100),
('fn18','ln18', 1, 10, 100);
insert into ice_orc VALUES
('fn19','ln19', 2, 11, 100),
('fn20','ln20', 2, 11, 100);
insert into ice_orc VALUES
('fn21','ln21', 3, 12, 100),
('fn22','ln22', 3, 12, 100);
insert into ice_orc VALUES
('fn23','ln23', 4, 13, 100),
('fn24','ln24', 4, 13, 100);


select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15');
alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15');
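-- company_id and dept_id belong to an earlier partition spec of ice_orc (the
-- current spec is team_id); filters on any historical partition column are accepted.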
explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2);
alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2);

select * from ice_orc;
describe formatted ice_orc;
@@ -0,0 +1,33 @@
PREHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
FAILED: SemanticException [Error 10443]: Filter expression can contain only partition columns.
@@ -149,6 +149,61 @@ POSTHOOK: query: delete from ice_orc where last_name in ('ln5', 'ln13')
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
PREHOOK: query: alter table ice_orc set partition spec(team_id)
PREHOOK: type: ALTERTABLE_SETPARTSPEC
PREHOOK: Input: default@ice_orc
POSTHOOK: query: alter table ice_orc set partition spec(team_id)
POSTHOOK: type: ALTERTABLE_SETPARTSPEC
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn17', 'ln17', 1, 10, 100),
('fn18','ln18', 1, 10, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn17', 'ln17', 1, 10, 100),
('fn18','ln18', 1, 10, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn19','ln19', 2, 11, 100),
('fn20','ln20', 2, 11, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn19','ln19', 2, 11, 100),
('fn20','ln20', 2, 11, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn21','ln21', 3, 12, 100),
('fn22','ln22', 3, 12, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn21','ln21', 3, 12, 100),
('fn22','ln22', 3, 12, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn23','ln23', 4, 13, 100),
('fn24','ln24', 4, 13, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn23','ln23', 4, 13, 100),
('fn24','ln24', 4, 13, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: select * from ice_orc
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
@@ -162,7 +217,15 @@ fn12 ln12 2 11 100
fn14 ln14 3 12 100
fn15 ln15 4 13 100
fn16 ln16 4 13 100
fn17 ln17 1 10 100
fn18 ln18 1 10 100
fn19 ln19 2 11 100
fn2 ln2 1 10 100
fn20 ln20 2 11 100
fn21 ln21 3 12 100
fn22 ln22 3 12 100
fn23 ln23 4 13 100
fn24 ln24 4 13 100
fn4 ln4 2 11 100
fn6 ln6 3 12 100
fn7 ln7 4 13 100
@@ -182,8 +245,7 @@ company_id bigint

# Partition Transform Information
# col_name transform_type
company_id IDENTITY
dept_id IDENTITY
team_id IDENTITY

# Detailed Table Information
Database: default
@@ -192,24 +254,24 @@ Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"}
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"company_id\":\"true\",\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\",\"team_id\":\"true\"}}
EXTERNAL TRUE
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-position-delete-files\":\"2\",\"added-delete-files\":\"2\",\"added-files-size\":\"#Masked#\",\"added-position-deletes\":\"2\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"6\",\"total-position-deletes\":\"6\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"2\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"24\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"12\",\"total-delete-files\":\"6\",\"total-position-deletes\":\"6\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
default-partition-spec {\"spec-id\":2,\"fields\":[{\"name\":\"team_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1002}]}
format-version 2
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 8
numRows 10
numFiles 12
numRows 18
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 11
snapshot-count 15
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
@@ -226,11 +288,11 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
@@ -246,11 +308,11 @@ STAGE PLANS:
table name: default.ice_orc
blocking: true

PREHOOK: query: alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
PREHOOK: query: alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
@@ -267,7 +329,15 @@ fn12 ln12 2 11 100
fn14 ln14 3 12 100
fn15 ln15 4 13 100
fn16 ln16 4 13 100
fn17 ln17 1 10 100
fn18 ln18 1 10 100
fn19 ln19 2 11 100
fn2 ln2 1 10 100
fn20 ln20 2 11 100
fn21 ln21 3 12 100
fn22 ln22 3 12 100
fn23 ln23 4 13 100
fn24 ln24 4 13 100
fn4 ln4 2 11 100
fn6 ln6 3 12 100
fn7 ln7 4 13 100
@@ -287,8 +357,7 @@ company_id bigint

# Partition Transform Information
# col_name transform_type
company_id IDENTITY
dept_id IDENTITY
team_id IDENTITY

# Detailed Table Information
Database: default
@@ -302,19 +371,19 @@ Table Parameters:
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-data-files\":\"4\",\"deleted-data-files\":\"4\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"5\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"5\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"1\",\"total-position-deletes\":\"1\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-summary {\"added-data-files\":\"4\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"6\",\"removed-delete-files\":\"6\",\"added-records\":\"10\",\"deleted-records\":\"16\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"6\",\"changed-partition-count\":\"9\",\"total-records\":\"18\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
default-partition-spec {\"spec-id\":2,\"fields\":[{\"name\":\"team_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1002}]}
format-version 2
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 8
numRows 10
numRows 18
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 15
snapshot-count 20
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
@@ -336,7 +405,8 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=4 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=10 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=11 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=12 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=13 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
@@ -33,6 +33,8 @@
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.RowResolver;
@@ -93,8 +95,10 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
case HiveParser.TOK_WHERE:
RowResolver rwsch = new RowResolver();
Map<String, String> colTypes = new HashMap<>();
Table table;
try {
for (FieldSchema fs : getDb().getTable(tableName).getCols()) {
table = getDb().getTable(tableName);
for (FieldSchema fs : table.getCols()) {
TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(fs.getType());
rwsch.put(tableName.getTable(), fs.getName(),
new ColumnInfo(fs.getName(), columnType, null, true));
@@ -106,6 +110,9 @@ TypeCheckCtx tcCtx = new TypeCheckCtx(rwsch);
TypeCheckCtx tcCtx = new TypeCheckCtx(rwsch);
ASTNode conds = (ASTNode) node.getChild(0);
filterExpr = ExprNodeTypeCheck.genExprNode(conds, tcCtx).get(conds);
if (!PartitionPruner.onlyContainsPartnCols(table, filterExpr)) {
throw new SemanticException(ErrorMsg.ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED);
}
break;
default:
break;
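The new guard delegates to PartitionPruner.onlyContainsPartnCols. In outline, the check walks the typed filter expression and rejects any column reference that does not name a partition key. The sketch below is a simplified illustration of that idea, not the actual Hive implementation; for Iceberg tables the key set is assumed to be derived from the new getAllPartitionKeyNames hook:

import java.util.Set;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

// Simplified sketch of the partition-column-only rule: recurse through the
// expression tree; every column reference must name a partition key.
static boolean onlyReferencesPartitionCols(ExprNodeDesc expr, Set<String> partitionKeys) {
  if (expr instanceof ExprNodeColumnDesc) {
    return partitionKeys.contains(((ExprNodeColumnDesc) expr).getColumn());
  }
  if (expr.getChildren() != null) {
    for (ExprNodeDesc child : expr.getChildren()) {
      if (!onlyReferencesPartitionCols(child, partitionKeys)) {
        return false;
      }
    }
  }
  return true;
}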
@@ -801,6 +801,11 @@ default List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Ta
"for a table.");
}

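/**
* Returns the distinct partition key (column) names across all partition specs
* of the table. The default implementation throws; storage handlers that
* support partition evolution (e.g. Iceberg) should override this.
* @param hmsTable A Hive table instance.
* @return A list of partition key names.
*/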
default List<String> getAllPartitionKeyNames(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
throw new UnsupportedOperationException("Storage handler does not support getting partition key names " +
"for a table.");
}

/**
* Returns a list of partitions with the latest partition spec which contain any files whose content falls under
* the provided filter condition.
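Finally, a hedged sketch of how calling code can resolve partition column names uniformly; the helper below is an assumption for illustration, not part of this commit:

import java.util.List;
import org.apache.hadoop.hive.ql.metadata.Table;

// Assumed helper: native tables expose partition columns directly, while
// non-native tables (e.g. Iceberg) route through their storage handler.
static List<String> partitionKeyNames(Table table) {
  if (table.isNonNative() && table.getStorageHandler() != null) {
    return table.getStorageHandler().getAllPartitionKeyNames(table);
  }
  return table.getPartColNames();
}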