diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java index f6a92a5add..968d79041a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageService.java @@ -17,6 +17,8 @@ import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceNoStoreException; +import com.linkedin.venice.helix.SafeHelixDataAccessor; +import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.PersistenceType; @@ -42,6 +44,8 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.rocksdb.RocksDBException; @@ -371,6 +375,32 @@ public synchronized AbstractStorageEngine openStore( return engine; } + public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) { + if (manager == null) { + return; + } + for (AbstractStorageEngine storageEngine: getStorageEngineRepository().getAllLocalStorageEngines()) { + String storeName = storageEngine.getStoreVersionName(); + Set storageEnginePartitionIds = new HashSet<>(storageEngine.getPartitionIds()); + String instanceHostName = manager.getInstanceName(); + PropertyKey.Builder propertyKeyBuilder = + new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName()); + SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); + IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName)); + + if (idealState != null) { + Map> mapFields = idealState.getRecord().getMapFields(); + for (Integer partitionId: storageEnginePartitionIds) { + String partitionDbName = storeName + "_" + partitionId; + if (!mapFields.containsKey(partitionDbName) + || !mapFields.get(partitionDbName).containsKey(instanceHostName)) { + storageEngine.dropPartition(partitionId); + } + } + } + } + } + /** * Drops the partition of the specified store version in the storage service. When all data partitions are dropped, * it will also drop the storage engine of the specific store version. diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java index a86a4879be..6f163afff5 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/StorageServiceTest.java @@ -1,11 +1,16 @@ package com.linkedin.davinci.storage; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.linkedin.davinci.config.VeniceClusterConfig; import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -14,6 +19,8 @@ import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.StorageEngineFactory; import com.linkedin.venice.exceptions.VeniceNoStoreException; +import com.linkedin.venice.helix.SafeHelixDataAccessor; +import com.linkedin.venice.helix.SafeHelixManager; import com.linkedin.venice.kafka.protocol.state.PartitionState; import com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.PartitionerConfig; @@ -23,13 +30,21 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.utils.Utils; +import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.mockito.internal.util.collections.Sets; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.Test; @@ -121,4 +136,74 @@ public void testGetStoreAndUserPartitionsMapping() { expectedMapping.put(resourceName, partitionSet); Assert.assertEquals(storageService.getStoreAndUserPartitionsMapping(), expectedMapping); } + + @Test + public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFieldException, IllegalAccessException { + StorageService mockStorageService = mock(StorageService.class); + SafeHelixManager manager = mock(SafeHelixManager.class); + StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class); + AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class); + mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine); + + String resourceName = "test_store_v1"; + String storeName = "test_store"; + + when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName); + abstractStorageEngine.addStoragePartition(0); + abstractStorageEngine.addStoragePartition(1); + + String clusterName = "test_cluster"; + VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class); + VeniceClusterConfig mockClusterConfig = mock(VeniceClusterConfig.class); + when(mockVeniceConfigLoader.getVeniceClusterConfig()).thenReturn(mockClusterConfig); + when(mockVeniceConfigLoader.getVeniceClusterConfig().getClusterName()).thenReturn(clusterName); + + List localStorageEngines = new ArrayList<>(); + localStorageEngines.add(abstractStorageEngine); + + SafeHelixDataAccessor helixDataAccessor = mock(SafeHelixDataAccessor.class); + when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor); + IdealState idealState = mock(IdealState.class); + when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState); + ZNRecord record = new ZNRecord("testId"); + Map> mapFields = new HashMap<>(); + Map testPartitionZero = new HashMap<>(); + Map testPartitionOne = new HashMap<>(); + testPartitionZero.put("host_1430", "LEADER"); + testPartitionZero.put("host_1435", "STANDBY"); + testPartitionZero.put("host_1440", "STANDBY"); + testPartitionOne.put("host_1520", "LEADER"); + testPartitionOne.put("host_1525", "STANDBY"); + testPartitionOne.put("host_1530", "STANDBY"); + mapFields.put("test_store_v1_0", testPartitionZero); + mapFields.put("test_store_v1_1", testPartitionOne); + record.setMapFields(mapFields); + when(idealState.getRecord()).thenReturn(record); + when(manager.getInstanceName()).thenReturn("host_1520"); + + Set partitionSet = new HashSet<>(Arrays.asList(0, 1)); + when(abstractStorageEngine.getPartitionIds()).thenReturn(partitionSet); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + int partitionId = invocation.getArgument(0); + abstractStorageEngine.getPartitionIds().remove(partitionId); + return null; + } + }).when(abstractStorageEngine).dropPartition(anyInt()); + + Field storageEngineRepositoryField = StorageService.class.getDeclaredField("storageEngineRepository"); + storageEngineRepositoryField.setAccessible(true); + storageEngineRepositoryField.set(mockStorageService, mockStorageEngineRepository); + when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository); + when(mockStorageService.getStorageEngineRepository().getAllLocalStorageEngines()).thenReturn(localStorageEngines); + Field configLoaderField = StorageService.class.getDeclaredField("configLoader"); + configLoaderField.setAccessible(true); + configLoaderField.set(mockStorageService, mockVeniceConfigLoader); + + doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager); + mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager); + verify(abstractStorageEngine).dropPartition(0); + Assert.assertFalse(abstractStorageEngine.getPartitionIds().contains(0)); + } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index 0314e0f194..1a8e06e644 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -355,6 +355,11 @@ private List createServices() { return helixData; }); + managerFuture.thenApply(manager -> { + storageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager); + return true; + }); + heartbeatMonitoringService = new HeartbeatMonitoringService( metricsRepository, metadataRepo,