HIVE-28533: Fix compaction with custom pools (Dmitriy Fingerman, reviewed by Denys Kuzmenko)

Closes #5466
difin authored Oct 8, 2024
1 parent ea8aa34 commit 13bec9b
Showing 11 changed files with 336 additions and 56 deletions.
3 changes: 2 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -99,7 +99,8 @@ public class Constants {
public static final String ORC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
public static final String ORC_OUTPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";

public static final Pattern COMPACTION_POOLS_PATTERN = Pattern.compile("hive\\.compactor\\.worker\\.(.*)\\.threads");
public static final Pattern COMPACTION_POOLS_PATTERN = Pattern.compile("^hive\\.compactor\\.worker\\.(.+)\\.threads$");
public static final String COMPACTION_DEFAULT_POOL = "default";
public static final String HIVE_COMPACTOR_WORKER_POOL = "hive.compactor.worker.pool";

public static final String HTTP_HEADER_REQUEST_TRACK = "X-Request-ID";
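The tightened pattern is the substantive change in this file. Since callers use matches(), the whole key must already conform, so the practical fixes are that (.+) rejects an empty pool name and the explicit ^/$ anchors keep the pattern safe should it ever be used with find(). A minimal sketch of the new behavior (the demo class and keys below are illustrative, not part of the commit):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PoolPatternDemo {
    // Same regex as the new Constants.COMPACTION_POOLS_PATTERN.
    private static final Pattern POOLS =
            Pattern.compile("^hive\\.compactor\\.worker\\.(.+)\\.threads$");

    public static void main(String[] args) {
        String[] keys = {
                "hive.compactor.worker.pool1.threads", // matches; pool name "pool1"
                "hive.compactor.worker..threads",      // empty pool name; (.+) rejects it
                "hive.compactor.worker.iceberg2"       // no ".threads" suffix; no match
        };
        for (String key : keys) {
            Matcher m = POOLS.matcher(key);
            System.out.println(key + " -> " + (m.matches() ? m.group(1) : "no match"));
        }
    }
}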
12 changes: 0 additions & 12 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -62,7 +62,6 @@
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -7245,15 +7244,4 @@ public void syncFromConf(HiveConf conf) {
set(e.getKey(), e.getValue());
}
}

public List<Map.Entry<String, String>> getMatchingEntries(Pattern regex) {
List<Map.Entry<String, String>> matchingEntries = new ArrayList<>();
for (Map.Entry<String, String> entry : this) {
Matcher matcher = regex.matcher(entry.getKey());
if (matcher.matches()) {
matchingEntries.add(new AbstractMap.SimpleEntry<>(entry.getKey(), matcher.group(0)));
}
}
return matchingEntries;
}
}
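The deleted helper appears to be at the heart of the custom-pool bug: it paired each matching key with matcher.group(0) — the entire property name — instead of group(1), the captured pool name, so callers never received the pool name. Its replacement, CompactorUtil.getPoolConf() later in this diff, reads group(1). A small illustration of the difference (hypothetical demo class, not from the commit):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GroupIndexDemo {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("^hive\\.compactor\\.worker\\.(.+)\\.threads$");
        Matcher m = p.matcher("hive.compactor.worker.pool1.threads");
        if (m.matches()) {
            // group(0) is the entire match -- what the removed helper returned:
            System.out.println(m.group(0)); // hive.compactor.worker.pool1.threads
            // group(1) is the captured pool name -- what getPoolConf() extracts:
            System.out.println(m.group(1)); // pool1
        }
    }
}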
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -491,6 +491,7 @@ public enum ErrorMsg {
UNEXPECTED_PARTITION_TRANSFORM_SPEC(10437, "Partition transforms are only supported by Iceberg storage handler", true),
NONICEBERG_COMPACTION_WITH_FILTER_NOT_SUPPORTED(10440, "Compaction with filter is not allowed on non-Iceberg table {0}.{1}", true),
ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
COMPACTION_THREAD_INITIALIZATION(10442, "Compaction thread failed during initialization", false),

//========================== 20000 range starts here ========================//

@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -27,7 +26,6 @@
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.TxnCommandsBaseForTests;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.junit.Assert;
import org.junit.Test;

@@ -36,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.io.*;

import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactorBase.executeStatementOnDriver;

@@ -171,4 +168,25 @@ public void testShowCompactionsRespectPoolName() throws Exception {
Assert.assertTrue(p.matcher(results.get(1).toString()).matches());
}

@Test
public void testCompactionWithCustomPool() throws Exception {
String poolName = "pool1";
conf.setInt(String.format("hive.compactor.worker.%s.threads", poolName), 1);

Map<String, String> properties = new HashMap<>();
properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, poolName);
provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties);
provider.insertTestData(DEFAULT_TABLE_NAME, false);

TxnCommandsBaseForTests.runInitiator(conf);

checkCompactionRequest("initiated", poolName);

Map<String, Integer> customPools = CompactorUtil.getPoolConf(conf);
Assert.assertEquals(1, customPools.size());
Assert.assertEquals(Integer.valueOf(1), customPools.get(poolName));

TxnCommandsBaseForTests.runWorker(conf, poolName);
checkCompactionRequest("ready for cleaning", poolName);
}
}
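For reference, the test drives the same two settings an operator would use: a per-pool worker thread count and, on the table, the hive.compactor.worker.pool property (Constants.HIVE_COMPACTOR_WORKER_POOL). A minimal configuration sketch using only keys and APIs visible in this diff (the demo class and values are illustrative):

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;

public class CustomPoolConfigSketch {
    public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Size the custom pool: one dedicated worker thread for "pool1".
        conf.setInt("hive.compactor.worker.pool1.threads", 1);
        // Tables opt in by setting hive.compactor.worker.pool=pool1
        // in their table properties, as the test above does.
        Map<String, Integer> pools = CompactorUtil.getPoolConf(conf);
        System.out.println(pools); // {pool1=1}
    }
}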
CompactionException.java (new file, package org.apache.hadoop.hive.ql.txn.compactor)
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.ql.ErrorMsg;

@InterfaceAudience.Public
@InterfaceStability.Stable
public class CompactionException extends RuntimeException {

/**
* Standard predefined message with error code and possibly SQL State, etc.
*/
private final ErrorMsg canonicalErrorMsg;

/**
* Error messages returned from a remote exception (e.g. a Hadoop error).
*/
private final String remoteErrorMsg;

public CompactionException() {
this(null, null, ErrorMsg.GENERIC_ERROR);
}

public CompactionException(String message) {
this(message, null);
}

public CompactionException(Throwable cause) {
this(cause, null, ErrorMsg.GENERIC_ERROR);
}

public CompactionException(String message, Throwable cause) {
super(message, cause);
canonicalErrorMsg = ErrorMsg.GENERIC_ERROR;
remoteErrorMsg = null;
}

public CompactionException(ErrorMsg message, String... msgArgs) {
this(null, null, message, msgArgs);
}

public CompactionException(Throwable cause, ErrorMsg errorMsg, String... msgArgs) {
this(cause, null, errorMsg, msgArgs);
}

public CompactionException(Throwable cause, ErrorMsg errorMsg) {
this(cause, null, errorMsg);
}

public CompactionException(ErrorMsg errorMsg) {
this(null, null, errorMsg);
}

/**
* This is the recommended constructor to use since it encourages canonical
* messages throughout and propagates remote errors.
*
* @param cause underlying cause of the failure; may be {@code null}
* @param remErrMsg error message returned from a remote service; may be {@code null}
* @param errorMsg canonical error message
* @param msgArgs message arguments if the message is parameterized; must be {@code null} if the message takes no arguments
*/
public CompactionException(Throwable cause, String remErrMsg, ErrorMsg errorMsg, String... msgArgs) {
super(errorMsg.format(msgArgs), cause);
canonicalErrorMsg = errorMsg;
remoteErrorMsg = remErrMsg;
}

/**
* @return {@link ErrorMsg#GENERIC_ERROR} by default
*/
public ErrorMsg getCanonicalErrorMsg() {
return canonicalErrorMsg;
}

public String getRemoteErrorMsg() { return remoteErrorMsg; }
}
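Because CompactionException is unchecked and carries the canonical ErrorMsg, callers can start compactor threads without a throws clause yet still react to the specific failure code. A hedged usage sketch (the wrapper class below is illustrative and assumes the same package as the compactor classes; only initializeAndStartThread, getCanonicalErrorMsg, and the new ErrorMsg constant come from this diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactorStartup {
    private static final Logger LOG = LoggerFactory.getLogger(CompactorStartup.class);

    // Start a compactor thread, logging the canonical error code on failure.
    static void startOrFail(CompactorThread thread, Configuration conf) {
        try {
            CompactorThread.initializeAndStartThread(thread, conf);
        } catch (CompactionException ce) {
            if (ce.getCanonicalErrorMsg() == ErrorMsg.COMPACTION_THREAD_INITIALIZATION) {
                LOG.error("Compactor thread failed during initialization (error 10442)", ce);
            }
            throw ce; // initialization failures remain fatal
        }
    }
}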
CompactorThread.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;

import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,11 +112,14 @@ protected String tableName(Table t) {
return Warehouse.getQualifiedName(t);
}

public static void initializeAndStartThread(CompactorThread thread,
Configuration conf) throws Exception {
public static void initializeAndStartThread(CompactorThread thread, Configuration conf) {
LOG.info("Starting compactor thread of type " + thread.getClass().getName());
thread.setConf(conf);
thread.init(new AtomicBoolean());
try {
thread.init(new AtomicBoolean());
} catch (Exception e) {
throw new CompactionException(e, ErrorMsg.COMPACTION_THREAD_INITIALIZATION);
}
thread.start();
}

CompactorUtil.java
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
@@ -76,6 +79,7 @@
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

import static java.lang.String.format;
@@ -549,4 +553,15 @@ public static CompactionResponse initiateCompactionForPartition(Table table, Par
}
return compactionResponse;
}

public static Map<String, Integer> getPoolConf(HiveConf hiveConf) {
Map<String, Integer> poolConf = Maps.newHashMap();
for (Map.Entry<String, String> entry : hiveConf) {
Matcher matcher = Constants.COMPACTION_POOLS_PATTERN.matcher(entry.getKey());
if (matcher.matches()) {
poolConf.put(matcher.group(1), NumberUtils.toInt(entry.getValue(), 0));
}
}
return poolConf;
}
}
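getPoolConf() gives the metastore a map of pool name to thread count. A hypothetical sketch of how that output could drive per-pool workers, using only APIs visible in this diff (Worker#setPoolName and CompactorThread.initializeAndStartThread); the actual HMS wiring may differ:

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;

public class PoolBootstrapSketch {
    // Hypothetical: start one Worker per configured thread, bound to its pool.
    static void startCustomPoolWorkers(HiveConf conf) {
        Map<String, Integer> pools = CompactorUtil.getPoolConf(conf);
        for (Map.Entry<String, Integer> pool : pools.entrySet()) {
            for (int i = 0; i < pool.getValue(); i++) {
                Worker worker = new Worker();
                worker.setPoolName(pool.getKey());
                CompactorThread.initializeAndStartThread(worker, conf);
            }
        }
    }
}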
TxnCommandsBaseForTests.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
@@ -246,12 +247,18 @@ public static void runInitiator(HiveConf hiveConf) throws Exception {
public static void runWorker(HiveConf hiveConf) throws Exception {
runCompactorThread(hiveConf, CompactorThreadType.WORKER);
}
public static void runWorker(HiveConf hiveConf, String poolName) throws Exception {
runCompactorThread(hiveConf, CompactorThreadType.WORKER, poolName);
}
public static void runCleaner(HiveConf hiveConf) throws Exception {
// Wait for the cooldown period so the Cleaner can see the last committed txn as the highest committed watermark
Thread.sleep(MetastoreConf.getTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
runCompactorThread(hiveConf, CompactorThreadType.CLEANER);
}
private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type)
private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type) throws Exception {
runCompactorThread(hiveConf, type, Constants.COMPACTION_DEFAULT_POOL);
}
private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType type, String poolName)
throws Exception {
AtomicBoolean stop = new AtomicBoolean(true);
CompactorThread t;
@@ -261,6 +268,9 @@ private static void runCompactorThread(HiveConf hiveConf, CompactorThreadType ty
break;
case WORKER:
t = new Worker();
if (poolName != null && !poolName.equals(Constants.COMPACTION_DEFAULT_POOL)) {
((Worker)t).setPoolName(poolName);
}
break;
case CLEANER:
t = new Cleaner();
TestCompactorUtil.java (new file)
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

public class TestCompactorUtil {

@Test
public void testGetPoolConf() {
HiveConf conf = new HiveConf();
conf.setInt("hive.compactor.worker.iceberg1.threads", 4);
conf.setInt("hive.compactor.worker.iceberg2", 4);
conf.setInt("hive.compactor.worker.iceberg3.thread.zzz", 4);
conf.setInt("aaa.hive.compactor.worker.iceberg4.threads", 4);

Map<String, Integer> entries = CompactorUtil.getPoolConf(conf);

Assert.assertEquals(1, entries.size());
Assert.assertEquals(Integer.valueOf(4), entries.get("iceberg1"));
}
}
(The remaining 2 of the 11 changed files are not shown.)
