Skip to content

Commit

Permalink
making nonLeaderForTables exhaustive (apache#12345)
Browse files Browse the repository at this point in the history
  • Loading branch information
gortiz authored Feb 1, 2024
1 parent 17db0fd commit 76379fb
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh

private TableSizeReader _tableSizeReader;

private Set<String> _cachedTableNamesWithType = new HashSet<>();

/**
* Constructs the segment status checker.
* @param pinotHelixResourceManager The resource checker used to interact with Helix
Expand Down Expand Up @@ -152,12 +150,6 @@ protected void postprocess(Context context) {
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_DISABLED, 0);
}
});

// Remove metrics for tables that are no longer in the cluster
_cachedTableNamesWithType.removeAll(context._processedTables);
_cachedTableNamesWithType.forEach(this::removeMetricsForTable);
_cachedTableNamesWithType.clear();
_cachedTableNamesWithType.addAll(context._processedTables);
}

/**
Expand Down Expand Up @@ -350,6 +342,7 @@ protected void nonLeaderCleanup(List<String> tableNamesWithType) {
}

private void removeMetricsForTable(String tableNameWithType) {
LOGGER.info("Removing metrics from {} given it is not a table known by Helix", tableNameWithType);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.periodictask;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
Expand All @@ -46,6 +51,7 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
protected final PinotHelixResourceManager _pinotHelixResourceManager;
protected final LeadControllerManager _leadControllerManager;
protected final ControllerMetrics _controllerMetrics;
protected Set<String> _prevLeaderOfTables = new HashSet<>();

public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager,
Expand All @@ -63,30 +69,23 @@ protected final void runTask(Properties periodicTaskProperties) {
// Check if we have a specific table against which this task needs to be run.
String propTableNameWithType = (String) periodicTaskProperties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
// Process the tables that are managed by this controller
List<String> tablesToProcess = new ArrayList<>();
List<String> nonLeaderForTables = new ArrayList<>();
if (propTableNameWithType == null) {
// Table name is not available, so task should run on all tables for which this controller is the lead.
for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
tablesToProcess.add(tableNameWithType);
} else {
nonLeaderForTables.add(tableNameWithType);
}
}
} else {
// Table name is available, so task should run only on the specified table.
if (_leadControllerManager.isLeaderForTable(propTableNameWithType)) {
tablesToProcess.add(propTableNameWithType);
}
}
List<String> allTables = propTableNameWithType == null
? _pinotHelixResourceManager.getAllTables()
: Collections.singletonList(propTableNameWithType);

Set<String> currentLeaderOfTables = allTables.stream()
.filter(_leadControllerManager::isLeaderForTable)
.collect(Collectors.toSet());

if (!tablesToProcess.isEmpty()) {
processTables(tablesToProcess, periodicTaskProperties);
if (!currentLeaderOfTables.isEmpty()) {
processTables(new ArrayList<>(currentLeaderOfTables), periodicTaskProperties);
}

Set<String> nonLeaderForTables = Sets.difference(_prevLeaderOfTables, currentLeaderOfTables);
if (!nonLeaderForTables.isEmpty()) {
nonLeaderCleanup(nonLeaderForTables);
nonLeaderCleanup(new ArrayList<>(nonLeaderForTables));
}
_prevLeaderOfTables = currentLeaderOfTables;
} catch (Exception e) {
LOGGER.error("Caught exception while running task: {}", _taskName, e);
_controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
Expand All @@ -98,9 +97,12 @@ public final ControllerMetrics getControllerMetrics() {
}

/**
* Processes the given list of tables, and returns the number of tables processed.
* Processes the given list of tables led by the current controller.
* <p>
* Override one of the following: this method, {@link #processTable(String)} or {@link #processTable(String, C)}.
* <p>
* Note: This method is called each time the task is executed <b>if and only if</b> the current controller is the
* leader of at least one table. As a consequence, it may be skipped on some executions of the task.
*/
protected void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) {
int numTables = tableNamesWithType.size();
Expand Down Expand Up @@ -128,14 +130,14 @@ protected void processTables(List<String> tableNamesWithType, Properties periodi
}

/**
* Can be overridden to provide context before processing the tables.
* Can be overridden to provide context before processing the tables led by the current controller.
*/
protected C preprocess(Properties periodicTaskProperties) {
// Default implementation produces no context.
return null;
}

/**
* Processes the given table.
* Processes the given table led by the current controller.
* <p>
* Override one of the following: this method, {@link #processTable(String)} or {@link #processTables(List, Properties)}.
*/
Expand All @@ -144,28 +146,31 @@ protected void processTable(String tableNameWithType, C context) {
}

/**
* Processes the given table.
* Processes the given table led by the current controller.
* <p>
* Override one of the following: this method, {@link #processTable(String, C)} or {@link #processTables(List, Properties)}.
*/
protected void processTable(String tableNameWithType) {
// Intentionally empty: the default implementation does nothing.
}

/**
* Can be overridden to perform cleanups after processing the tables.
* Can be overridden to perform cleanups after processing the tables led by the current controller.
*/
protected void postprocess(C context) {
// Default implementation ignores the context and delegates to the no-argument variant.
postprocess();
}

/**
* Can be overridden to perform cleanups after processing the tables.
* Can be overridden to perform cleanups after processing the tables led by the current controller.
*/
protected void postprocess() {
// Intentionally empty: the default implementation does nothing.
}

/**
* Can be overridden to perform cleanups for tables that the current controller isn't the leader.
* Can be overridden to perform cleanups for tables for which the current controller has lost leadership.
* <p>
* Note: This method is only called when there is at least one table in the given list. As a consequence, it may be
* skipped on some executions of the task.
*
* @param tableNamesWithType the tables that the current controller led on the previous run but no longer leads
*/
Expand Down

0 comments on commit 76379fb

Please sign in to comment.