Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[venice-server] Dropping unassigned partitions #1196

Draft
wants to merge 35 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
cc1d7ff
Initial commit
kristyelee Sep 12, 2024
c4b6682
First test for removal of partition
kristyelee Sep 17, 2024
8093581
Write function to remove unsubscribed (unassigned) partitions
kristyelee Sep 18, 2024
c8ebe2d
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 18, 2024
3eaeafb
Write test function to test removing unsubscribed (unassigned) partit…
kristyelee Sep 18, 2024
0e24cec
Update StorageEngine intializer in VeniceServer
kristyelee Sep 23, 2024
ff6a165
StorageService arguments set to initial state in VeniceServer
kristyelee Sep 24, 2024
01ca67f
Standardize code
kristyelee Sep 24, 2024
fd14574
Standardize code
kristyelee Sep 24, 2024
56e7711
Update ideal state
kristyelee Sep 24, 2024
04c4e9e
Update initialized StorageService object [with revised import]
kristyelee Sep 25, 2024
e09f7bf
[Placeholder]
kristyelee Sep 25, 2024
6c50513
Updated StorageService constructor and initializer with functionToChe…
kristyelee Sep 27, 2024
06dd6c2
Updated StorageService constructor and initializer [modified]
kristyelee Sep 27, 2024
53d177c
Revised StorageService constructor
kristyelee Sep 27, 2024
d8bad05
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 27, 2024
df1c0b9
Code restructure: verifying storage partition
kristyelee Sep 30, 2024
54d8708
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 1, 2024
14924e1
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Oct 1, 2024
eb5dd89
[Commented modified code]
kristyelee Oct 2, 2024
d55f704
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 2, 2024
3ffd26d
Code restructure
kristyelee Oct 2, 2024
c63d62f
Retain relevant/used code changes
kristyelee Oct 3, 2024
3979b63
StorageService unit test
kristyelee Oct 7, 2024
b926839
Updates to StorageService unit test
kristyelee Oct 7, 2024
9a8b968
StorageService + unit test update
kristyelee Oct 8, 2024
a32b55c
Update to StorageService unit test
kristyelee Oct 10, 2024
8b27c8d
Apply review comments and code addition for hostname comparison.
kristyelee Oct 11, 2024
3431081
Apply review comments
kristyelee Oct 12, 2024
528f7a9
Apply review comments
kristyelee Oct 14, 2024
3a254ac
Code revision
kristyelee Oct 14, 2024
d86a064
Compare hostname for resource/partition name.
kristyelee Oct 15, 2024
f71bb24
Code revision
kristyelee Oct 15, 2024
377770f
Code revision
kristyelee Oct 16, 2024
2ef3419
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public class StorageService extends AbstractVeniceService {
* @param restoreDataPartitions indicates if store data needs to be restored.
* @param restoreMetadataPartitions indicates if meta data needs to be restored.
* @param checkWhetherStorageEngineShouldBeKeptOrNot check whether the local storage engine should be kept or not.
* @param checkWhetherStoragePartitionsShouldBeKeptOrNot check whether the partition is assigned and thus should be kept or not.
*/
StorageService(
VeniceConfigLoader configLoader,
Expand All @@ -89,7 +88,6 @@ public class StorageService extends AbstractVeniceService {
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionsShouldBeKeptOrNot,
Optional<Map<PersistenceType, StorageEngineFactory>> persistenceTypeToStorageEngineFactoryMapOptional) {
String dataPath = configLoader.getVeniceServerConfig().getDataBasePath();
if (!Utils.directoryExists(dataPath)) {
Expand Down Expand Up @@ -124,97 +122,10 @@ public class StorageService extends AbstractVeniceService {
configLoader,
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot,
checkWhetherStoragePartitionsShouldBeKeptOrNot);
checkWhetherStorageEngineShouldBeKeptOrNot);
}
}

/**
* Allocates a new {@code StorageService} object.
* @param configLoader a config loader to load configs related to cluster and server.
* @param storageEngineStats storage engine related stats.
* @param rocksDBMemoryStats RocksDB memory consumption stats.
* @param storeVersionStateSerializer serializer for translating a store-version level state into avro-format.
* @param partitionStateSerializer serializer for translating a partition state into avro-format.
* @param storeRepository supports readonly operations to access stores
* @param restoreDataPartitions indicates if store data needs to be restored.
* @param restoreMetadataPartitions indicates if meta data needs to be restored.
* @param checkWhetherStorageEngineShouldBeKeptOrNot check whether the local storage engine should be kept or not.
*/
StorageService(
VeniceConfigLoader configLoader,
AggVersionedStorageEngineStats storageEngineStats,
RocksDBMemoryStats rocksDBMemoryStats,
InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
ReadOnlyStoreRepository storeRepository,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Optional<Map<PersistenceType, StorageEngineFactory>> persistenceTypeToStorageEngineFactoryMapOptional) {
String dataPath = configLoader.getVeniceServerConfig().getDataBasePath();
if (!Utils.directoryExists(dataPath)) {
if (!configLoader.getVeniceServerConfig().isAutoCreateDataPath()) {
throw new VeniceException(
"Data directory '" + dataPath + "' does not exist and " + ConfigKeys.AUTOCREATE_DATA_PATH
+ " is disabled.");
}

File dataDir = new File(dataPath);
LOGGER.info("Creating data directory {}", dataDir.getAbsolutePath());
dataDir.mkdirs();
}

this.configLoader = configLoader;
this.serverConfig = configLoader.getVeniceServerConfig();
this.storageEngineRepository = new StorageEngineRepository();

this.aggVersionedStorageEngineStats = storageEngineStats;
this.rocksDBMemoryStats = rocksDBMemoryStats;
this.storeVersionStateSerializer = storeVersionStateSerializer;
this.partitionStateSerializer = partitionStateSerializer;
this.storeRepository = storeRepository;
if (persistenceTypeToStorageEngineFactoryMapOptional.isPresent()) {
this.persistenceTypeToStorageEngineFactoryMap = persistenceTypeToStorageEngineFactoryMapOptional.get();
} else {
this.persistenceTypeToStorageEngineFactoryMap = new HashMap<>();
initInternalStorageEngineFactories();
}
if (restoreDataPartitions || restoreMetadataPartitions) {
restoreAllStores(
configLoader,
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot,
se -> null);
}
}

public StorageService(
VeniceConfigLoader configLoader,
AggVersionedStorageEngineStats storageEngineStats,
RocksDBMemoryStats rocksDBMemoryStats,
InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
ReadOnlyStoreRepository storeRepository,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionsShouldBeKeptOrNot) {
this(
configLoader,
storageEngineStats,
rocksDBMemoryStats,
storeVersionStateSerializer,
partitionStateSerializer,
storeRepository,
restoreDataPartitions,
restoreMetadataPartitions,
checkWhetherStorageEngineShouldBeKeptOrNot,
checkWhetherStoragePartitionsShouldBeKeptOrNot,
Optional.empty());
}

public StorageService(
VeniceConfigLoader configLoader,
AggVersionedStorageEngineStats storageEngineStats,
Expand Down Expand Up @@ -322,8 +233,7 @@ private void restoreAllStores(
VeniceConfigLoader configLoader,
boolean restoreDataPartitions,
boolean restoreMetadataPartitions,
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot,
Function<AbstractStorageEngine, Void> checkWhetherStoragePartitionsShouldBeKeptOrNot) {
Function<String, Boolean> checkWhetherStorageEngineShouldBeKeptOrNot) {
LOGGER.info("Start restoring all the stores persisted previously");
for (Map.Entry<PersistenceType, StorageEngineFactory> entry: persistenceTypeToStorageEngineFactoryMap.entrySet()) {
PersistenceType pType = entry.getKey();
Expand All @@ -344,7 +254,6 @@ private void restoreAllStores(
if (checkWhetherStorageEngineShouldBeKeptOrNot.apply(storeName)) {
try {
storageEngine = openStore(storeConfig, () -> null);
checkWhetherStoragePartitionsShouldBeKeptOrNot.apply(storageEngine);
} catch (Exception e) {
if (ExceptionUtils.recursiveClassEquals(e, RocksDBException.class)) {
LOGGER.warn("Encountered RocksDB error while opening store: {}", storeName, e);
Expand Down Expand Up @@ -558,7 +467,7 @@ public synchronized void closeStorageEngine(String kafkaTopic) {
public void cleanupAllStores(VeniceConfigLoader configLoader) {
// Load local storage and delete them safely.
// TODO Just clean the data dir in case loading and deleting is too slow.
restoreAllStores(configLoader, true, true, s -> true, se -> null);
restoreAllStores(configLoader, true, true, s -> true);
LOGGER.info("Start cleaning up all the stores persisted previously");
storageEngineRepository.getAllLocalStorageEngines().stream().forEach(storageEngine -> {
String storeName = storageEngine.getStoreVersionName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public void testGetStoreAndUserPartitionsMapping() {
true,
true,
(s) -> true,
(se) -> null,
Optional.of(persistenceTypeToStorageEngineFactoryMap));

Map<String, Set<Integer>> expectedMapping = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ private List<AbstractVeniceService> createServices() {
metadataRepo,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot(),
functionToCheckWhetherStoragePartitionsShouldBeKeptOrNot());
functionToCheckWhetherStorageEngineShouldBeKeptOrNot());
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
storageEngineMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
services.add(storageEngineMetadataService);
Expand Down Expand Up @@ -601,6 +600,10 @@ public void start() throws VeniceException {
service.start();
}

for (AbstractStorageEngine storageEngine: storageService.getStorageEngineRepository().getAllLocalStorageEngines()) {
kvargha marked this conversation as resolved.
Show resolved Hide resolved
functionToCheckWhetherStoragePartitionsShouldBeKeptOrNot().apply(storageEngine);
}

for (ServiceDiscoveryAnnouncer serviceDiscoveryAnnouncer: serviceDiscoveryAnnouncers) {
LOGGER.info("Registering to service discovery: {}", serviceDiscoveryAnnouncer);
serviceDiscoveryAnnouncer.register();
Expand Down
Loading