Skip to content

Commit

Permalink
Update initialized StorageService object [with revised import]
Browse files Browse the repository at this point in the history
  • Loading branch information
kristyelee committed Sep 25, 2024
1 parent 56e7711 commit 04c4e9e
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.linkedin.venice.listener.ServerReadMetadataRepository;
import com.linkedin.venice.listener.ServerStoreAclHandler;
import com.linkedin.venice.listener.StoreValueSchemasCacheService;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand Down Expand Up @@ -317,14 +318,20 @@ private List<AbstractVeniceService> createServices() {
? new RocksDBMemoryStats(metricsRepository, "RocksDBMemoryStats", plainTableEnabled)
: null;

boolean whetherToRestoreDataPartitions = !isIsolatedIngestion()
|| veniceConfigLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists();

// Create and add StorageService. storeRepository will be populated by StorageService
storageService = new StorageService(
veniceConfigLoader,
storageEngineStats,
rocksDBMemoryStats,
storeVersionStateSerializer,
partitionStateSerializer,
metadataRepo);
metadataRepo,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot());
storageEngineMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
services.add(storageEngineMetadataService);
Expand Down Expand Up @@ -699,6 +706,10 @@ protected VeniceConfigLoader getConfigLoader() {
return veniceConfigLoader;
}

protected final boolean isIsolatedIngestion() {
return veniceConfigLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED);
}

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
return storageEngineName -> {
String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);
Expand Down

0 comments on commit 04c4e9e

Please sign in to comment.