Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[venice-server] Dropping unassigned partitions #1196

Draft
wants to merge 35 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
cc1d7ff
Initial commit
kristyelee Sep 12, 2024
c4b6682
First test for removal of partition
kristyelee Sep 17, 2024
8093581
Write function to remove unsubscribed (unassigned) partitions
kristyelee Sep 18, 2024
c8ebe2d
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 18, 2024
3eaeafb
Write test function to test removing unsubscribed (unassigned) partit…
kristyelee Sep 18, 2024
0e24cec
Update StorageEngine intializer in VeniceServer
kristyelee Sep 23, 2024
ff6a165
StorageService arguments set to initial state in VeniceServer
kristyelee Sep 24, 2024
01ca67f
Standardize code
kristyelee Sep 24, 2024
fd14574
Standardize code
kristyelee Sep 24, 2024
56e7711
Update ideal state
kristyelee Sep 24, 2024
04c4e9e
Update initialized StorageService object [with revised import]
kristyelee Sep 25, 2024
e09f7bf
[Placeholder]
kristyelee Sep 25, 2024
6c50513
Updated StorageService constructor and initializer with functionToChe…
kristyelee Sep 27, 2024
06dd6c2
Updated StorageService constructor and initializer [modified]
kristyelee Sep 27, 2024
53d177c
Revised StorageService constructor
kristyelee Sep 27, 2024
d8bad05
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 27, 2024
df1c0b9
Code restructure: verifying storage partition
kristyelee Sep 30, 2024
54d8708
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 1, 2024
14924e1
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Oct 1, 2024
eb5dd89
[Commented modified code]
kristyelee Oct 2, 2024
d55f704
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 2, 2024
3ffd26d
Code restructure
kristyelee Oct 2, 2024
c63d62f
Retain relevant/used code changes
kristyelee Oct 3, 2024
3979b63
StorageService unit test
kristyelee Oct 7, 2024
b926839
Updates to StorageService unit test
kristyelee Oct 7, 2024
9a8b968
StorageService + unit test update
kristyelee Oct 8, 2024
a32b55c
Update to StorageService unit test
kristyelee Oct 10, 2024
8b27c8d
Apply review comments and code addition for hostname comparison.
kristyelee Oct 11, 2024
3431081
Apply review comments
kristyelee Oct 12, 2024
528f7a9
Apply review comments
kristyelee Oct 14, 2024
3a254ac
Code revision
kristyelee Oct 14, 2024
d86a064
Compare hostname for resource/partition name.
kristyelee Oct 15, 2024
f71bb24
Code revision
kristyelee Oct 15, 2024
377770f
Code revision
kristyelee Oct 16, 2024
2ef3419
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ public Instance getInstance() {
return instance;
}

public SafeHelixManager getHelixManager() {
return helixManager;
}

public VeniceOfflinePushMonitorAccessor getVeniceOfflinePushMonitorAccessor() {
return veniceOfflinePushMonitorAccessor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.acl.StaticAccessController;
import com.linkedin.venice.cleaner.BackupVersionOptimizationService;
Expand All @@ -42,10 +43,12 @@
import com.linkedin.venice.listener.ServerReadMetadataRepository;
import com.linkedin.venice.listener.ServerStoreAclHandler;
import com.linkedin.venice.listener.StoreValueSchemasCacheService;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.StaticClusterInfoProvider;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.security.SSLFactory;
Expand All @@ -66,11 +69,16 @@
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -310,14 +318,20 @@ private List<AbstractVeniceService> createServices() {
? new RocksDBMemoryStats(metricsRepository, "RocksDBMemoryStats", plainTableEnabled)
: null;

boolean whetherToRestoreDataPartitions = !isIsolatedIngestion()
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
|| veniceConfigLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists();

// Create and add StorageService. storeRepository will be populated by StorageService
storageService = new StorageService(
veniceConfigLoader,
storageEngineStats,
rocksDBMemoryStats,
storeVersionStateSerializer,
partitionStateSerializer,
metadataRepo);
metadataRepo,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot());
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
storageEngineMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
services.add(storageEngineMetadataService);
Expand Down Expand Up @@ -586,6 +600,10 @@ public void start() throws VeniceException {
service.start();
}

for (AbstractStorageEngine storageEngine: storageService.getStorageEngineRepository().getAllLocalStorageEngines()) {
kvargha marked this conversation as resolved.
Show resolved Hide resolved
functionToCheckWhetherStoragePartitionsShouldBeKeptOrNot().apply(storageEngine);
}

for (ServiceDiscoveryAnnouncer serviceDiscoveryAnnouncer: serviceDiscoveryAnnouncers) {
LOGGER.info("Registering to service discovery: {}", serviceDiscoveryAnnouncer);
serviceDiscoveryAnnouncer.register();
Expand Down Expand Up @@ -693,6 +711,40 @@ protected VeniceConfigLoader getConfigLoader() {
return veniceConfigLoader;
}

protected final boolean isIsolatedIngestion() {
return veniceConfigLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED);
}

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
return storageEngineName -> true;
}

private Function<AbstractStorageEngine, Void> functionToCheckWhetherStoragePartitionsShouldBeKeptOrNot() {
return storageEngine -> {
String storageEngineName = storageEngine.toString();
String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);
PropertyKey.Builder propertyKeyBuilder =
new PropertyKey.Builder(veniceConfigLoader.getVeniceClusterConfig().getClusterName());
IdealState idealState = getHelixParticipationService().getHelixManager()
.getHelixDataAccessor()
.getProperty(propertyKeyBuilder.idealStates(storeName));

Set<Integer> idealStatePartitionIds = new HashSet<>();
idealState.getPartitionSet().stream().forEach(partitionId -> {
idealStatePartitionIds.add(Integer.parseInt(partitionId));
});
Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();

for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
if (idealStatePartitionIds.contains(storageEnginePartitionId)) {
continue;
}
storageEngine.dropPartition(storageEnginePartitionId);
}
return null;
};
}

public MetricsRepository getMetricsRepository() {
return metricsRepository;
}
Expand Down
Loading