Skip to content

Commit

Permalink
Remove TableDataManagerConfig and simplify TableDataManager construction (apache#12189)
Browse files: browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Jan 18, 2024
1 parent 8713dc0 commit 4ad36c3
Show file tree
Hide file tree
Showing 25 changed files with 410 additions and 678 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public static AuthConfig extractAuthConfig(PinotConfiguration pinotConfig, Strin
* @return auth provider
*/
public static AuthProvider extractAuthProvider(PinotConfiguration pinotConfig, String namespace) {
  // A missing configuration yields a NullAuthProvider; otherwise the AuthConfig extracted for the
  // given namespace is handed to makeAuthProvider.
  return pinotConfig == null
      ? new NullAuthProvider()
      : makeAuthProvider(extractAuthConfig(pinotConfig, namespace));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
Expand All @@ -73,6 +71,7 @@
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand All @@ -90,18 +89,20 @@ public abstract class BaseTableDataManager implements TableDataManager {
// Semaphore to restrict the maximum number of parallel segment downloads for a table.
private Semaphore _segmentDownloadSemaphore;

protected TableDataManagerConfig _tableDataManagerConfig;
protected InstanceDataManagerConfig _instanceDataManagerConfig;
protected String _instanceId;
protected TableConfig _tableConfig;
protected HelixManager _helixManager;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
protected ServerMetrics _serverMetrics;
protected String _tableNameWithType;
protected String _tableDataDir;
protected File _indexDir;
protected File _resourceTmpDir;
protected Logger _logger;
protected HelixManager _helixManager;
protected ExecutorService _segmentPreloadExecutor;
protected AuthProvider _authProvider;
protected String _peerDownloadScheme;
protected long _streamSegmentDownloadUntarRateLimitBytesPerSec;
protected boolean _isStreamSegmentDownloadUntar;

Expand All @@ -114,25 +115,22 @@ public abstract class BaseTableDataManager implements TableDataManager {
protected volatile boolean _shutDown;

@Override
public void init(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
TableDataManagerParams tableDataManagerParams) {
LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName());

_tableDataManagerConfig = tableDataManagerConfig;
_instanceId = instanceId;
_propertyStore = propertyStore;
_serverMetrics = serverMetrics;
public void init(InstanceDataManagerConfig instanceDataManagerConfig, TableConfig tableConfig,
HelixManager helixManager, @Nullable ExecutorService segmentPreloadExecutor,
@Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
LOGGER.info("Initializing table data manager for table: {}", tableConfig.getTableName());

_instanceDataManagerConfig = instanceDataManagerConfig;
_instanceId = instanceDataManagerConfig.getInstanceId();
_tableConfig = tableConfig;
_helixManager = helixManager;
_propertyStore = helixManager.getHelixPropertyStore();
_serverMetrics = ServerMetrics.get();
_segmentPreloadExecutor = segmentPreloadExecutor;
_authProvider = AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(), null);

_authProvider =
AuthProviderUtils.extractAuthProvider(toPinotConfiguration(_tableDataManagerConfig.getAuthConfig()), null);

_tableNameWithType = tableDataManagerConfig.getTableName();
_tableDataDir = tableDataManagerConfig.getDataDir();
_tableNameWithType = tableConfig.getTableName();
_tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + File.separator + _tableNameWithType;
_indexDir = new File(_tableDataDir);
if (!_indexDir.exists()) {
Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index directory at %s. "
Expand All @@ -148,18 +146,23 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI
}
_errorCache = errorCache;
_recentlyDeletedSegments =
CacheBuilder.newBuilder().maximumSize(tableDataManagerConfig.getTableDeletedSegmentsCacheSize())
.expireAfterWrite(tableDataManagerConfig.getTableDeletedSegmentsCacheTtlMinutes(), TimeUnit.MINUTES)
.build();
CacheBuilder.newBuilder().maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize())
.expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(), TimeUnit.MINUTES).build();

_peerDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
if (_peerDownloadScheme == null) {
_peerDownloadScheme = instanceDataManagerConfig.getSegmentPeerDownloadScheme();
}

_streamSegmentDownloadUntarRateLimitBytesPerSec =
tableDataManagerParams.getStreamSegmentDownloadUntarRateLimitBytesPerSec();
_isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit();
_isStreamSegmentDownloadUntar = instanceDataManagerConfig.isStreamSegmentDownloadUntar();
if (_isStreamSegmentDownloadUntar) {
LOGGER.info("Using streamed download-untar for segment download! "
+ "The rate limit interval for streamed download-untar is {} bytes/s",
_streamSegmentDownloadUntarRateLimitBytesPerSec);
}
int maxParallelSegmentDownloads = tableDataManagerParams.getMaxParallelSegmentDownloads();
int maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();
if (maxParallelSegmentDownloads > 0) {
LOGGER.info(
"Construct segment download semaphore for Table: {}. Maximum number of parallel segment downloads: {}",
Expand All @@ -178,6 +181,16 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI

/**
 * Initialization hook that every concrete table data manager must implement.
 * NOTE(review): the call site is not visible in this excerpt; presumably invoked from init(...) after the
 * common fields have been populated -- confirm against the full file.
 */
protected abstract void doInit();

/**
 * Returns the instance id held in {@code _instanceId}.
 */
@Override
public String getInstanceId() {
return _instanceId;
}

/**
 * Returns the {@link InstanceDataManagerConfig} held in {@code _instanceDataManagerConfig}.
 */
@Override
public InstanceDataManagerConfig getInstanceDataManagerConfig() {
return _instanceDataManagerConfig;
}

@Override
public synchronized void start() {
_logger.info("Starting table data manager for table: {}", _tableNameWithType);
Expand Down Expand Up @@ -255,7 +268,7 @@ public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s",
indexDir.getName(), _tableNameWithType);
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, indexLoadingConfig.getSchema()));
}

Expand Down Expand Up @@ -382,11 +395,6 @@ public File getTableDataDir() {
return _indexDir;
}

@Override
public TableDataManagerConfig getTableDataManagerConfig() {
return _tableDataManagerConfig;
}

@Override
public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInfo) {
_errorCache.put(Pair.of(_tableNameWithType, segmentName), segmentErrorInfo);
Expand All @@ -413,7 +421,7 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
String segmentTier = getSegmentCurrentTier(segmentName);
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
File indexDir = getSegmentDataDir(segmentName, segmentTier, indexLoadingConfig.getTableConfig());
try {
// Download segment from deep store if CRC changes or forced to download;
Expand Down Expand Up @@ -507,7 +515,7 @@ public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoad
String segmentTier = zkMetadata.getTier();
indexLoadingConfig.setSegmentTier(segmentTier);
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
if (localMetadata == null && tryLoadExistingSegment(segmentName, indexLoadingConfig, zkMetadata)) {
return;
}
Expand Down Expand Up @@ -630,7 +638,7 @@ File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File t
LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", segmentName,
_tableNameWithType, uri, tarFile);
_serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES, 1L);
if (_tableDataManagerConfig.getTablePeerDownloadScheme() == null) {
if (_peerDownloadScheme == null) {
throw e;
}
downloadFromPeersWithoutStreaming(segmentName, zkMetadata, tarFile);
Expand All @@ -649,11 +657,9 @@ File downloadAndDecrypt(String segmentName, SegmentZKMetadata zkMetadata, File t
// not thread safe. Caller should invoke it with safe concurrency control.
protected void downloadFromPeersWithoutStreaming(String segmentName, SegmentZKMetadata zkMetadata, File destTarFile)
throws Exception {
Preconditions.checkArgument(_tableDataManagerConfig.getTablePeerDownloadScheme() != null,
"Download peers require non null peer download scheme");
Preconditions.checkState(_peerDownloadScheme != null, "Download peers require non null peer download scheme");
List<URI> peerSegmentURIs =
PeerServerSegmentFinder.getPeerServerURIs(segmentName, _tableDataManagerConfig.getTablePeerDownloadScheme(),
_helixManager, _tableNameWithType);
PeerServerSegmentFinder.getPeerServerURIs(segmentName, _peerDownloadScheme, _helixManager, _tableNameWithType);
if (peerSegmentURIs.isEmpty()) {
String msg = String.format("segment %s doesn't have any peers", segmentName);
LOGGER.warn(msg);
Expand Down Expand Up @@ -745,7 +751,7 @@ public File getSegmentDataDir(String segmentName, @Nullable String segmentTier,
return getSegmentDataDir(segmentName);
}
String tierDataDir =
TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, _tableDataManagerConfig.getInstanceTierConfigs());
TierConfigUtils.getDataDirForTier(tableConfig, segmentTier, _instanceDataManagerConfig.getTierConfigs());
if (StringUtils.isEmpty(tierDataDir)) {
return getSegmentDataDir(segmentName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
Expand All @@ -45,56 +41,46 @@
* Factory for {@link TableDataManager}.
*/
public class TableDataManagerProvider {
private static Semaphore _segmentBuildSemaphore;
private static TableDataManagerParams _tableDataManagerParams;
private final InstanceDataManagerConfig _instanceDataManagerConfig;
private final Semaphore _segmentBuildSemaphore;

private TableDataManagerProvider() {
public TableDataManagerProvider(InstanceDataManagerConfig instanceDataManagerConfig) {
_instanceDataManagerConfig = instanceDataManagerConfig;
int maxParallelSegmentBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds();
_segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new Semaphore(maxParallelSegmentBuilds, true) : null;
}

public static void init(InstanceDataManagerConfig instanceDataManagerConfig) {
int maxParallelBuilds = instanceDataManagerConfig.getMaxParallelSegmentBuilds();
if (maxParallelBuilds > 0) {
_segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
}
_tableDataManagerParams = new TableDataManagerParams(instanceDataManagerConfig);
}

public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, null,
errorCache, () -> true);
public TableDataManager getTableDataManager(TableConfig tableConfig, HelixManager helixManager) {
return getTableDataManager(tableConfig, helixManager, null, null, () -> true);
}

public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager,
@Nullable ExecutorService segmentPreloadExecutor, LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
public TableDataManager getTableDataManager(TableConfig tableConfig, HelixManager helixManager,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
Supplier<Boolean> isServerReadyToServeQueries) {
TableDataManager tableDataManager;
switch (tableDataManagerConfig.getTableType()) {
switch (tableConfig.getTableType()) {
case OFFLINE:
if (tableDataManagerConfig.isDimTable()) {
tableDataManager = DimensionTableDataManager.createInstanceByTableName(tableDataManagerConfig.getTableName());
if (tableConfig.isDimTable()) {
tableDataManager = DimensionTableDataManager.createInstanceByTableName(tableConfig.getTableName());
} else {
tableDataManager = new OfflineTableDataManager();
}
break;
case REALTIME:
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(
tableDataManagerConfig.getTableConfig());
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
if (Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
&& StringUtils.isEmpty(tableDataManagerConfig.getInstanceDataManagerConfig().getSegmentStoreUri())) {
&& StringUtils.isEmpty(_instanceDataManagerConfig.getSegmentStoreUri())) {
throw new IllegalStateException(String.format("Table has enabled %s config. But the server has not "
+ "configured the segmentstore uri. Configure the server config %s",
+ "configured the segmentstore uri. Configure the server config %s",
StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
}
tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
break;
default:
throw new IllegalStateException();
}
tableDataManager.init(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager,
segmentPreloadExecutor, errorCache, _tableDataManagerParams);
tableDataManager.init(_instanceDataManagerConfig, tableConfig, helixManager, segmentPreloadExecutor, errorCache);
return tableDataManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_isReadyToConsumeData = isReadyToConsumeData;
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getServerInstance();
_instanceId = _realtimeTableDataManager.getInstanceId();
_leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
_protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
CompletionConfig completionConfig = _tableConfig.getValidationConfig().getCompletionConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ protected void doInit() {
Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
// NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to
// load segments into it
_tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig,
_tableDataManagerConfig.getInstanceDataManagerConfig().getUpsertConfigs());
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(tableConfig, _instanceDataManagerConfig.getUpsertConfig());
_tableUpsertMetadataManager.init(tableConfig, schema, this, _helixManager, _segmentPreloadExecutor);
}

Expand Down Expand Up @@ -328,7 +328,7 @@ public Semaphore getSegmentBuildSemaphore() {
}

public String getConsumerDir() {
String consumerDirPath = _tableDataManagerConfig.getConsumerDir();
String consumerDirPath = _instanceDataManagerConfig.getConsumerDir();
File consumerDir;
// If a consumer directory has been configured, use it to create a per-table path under the consumer dir.
// Otherwise, create a sub-dir under the table-specific data director and use it for consumer mmaps
Expand Down Expand Up @@ -391,7 +391,7 @@ public void addSegment(String segmentName, IndexLoadingConfig indexLoadingConfig

// Assign table directory and tier info to not let the segment be moved during loading/preprocessing
indexLoadingConfig.setTableDataDir(_tableDataDir);
indexLoadingConfig.setInstanceTierConfigs(_tableDataManagerConfig.getInstanceTierConfigs());
indexLoadingConfig.setInstanceTierConfigs(_instanceDataManagerConfig.getTierConfigs());
indexLoadingConfig.setSegmentTier(segmentZKMetadata.getTier());

File segmentDir = new File(_indexDir, segmentName);
Expand Down
Loading

0 comments on commit 4ad36c3

Please sign in to comment.