Skip to content

Commit

Permalink
[flink] Extract RefreshBlacklist from FileStoreLookupFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 5, 2024
1 parent 3ff13dc commit c17cdd5
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;

Expand All @@ -56,7 +53,6 @@
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -66,7 +62,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
Expand All @@ -93,9 +88,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private final List<String> projectFields;
private final List<String> joinKeys;
@Nullable private final Predicate predicate;

private final List<Pair<Long, Long>> timePeriodsBlacklist;
private long nextBlacklistCheckTime;
private final RefreshBlacklist refreshBlacklist;

private transient File path;
private transient LookupTable lookupTable;
Expand Down Expand Up @@ -142,44 +135,9 @@ public FileStoreLookupFunction(

this.predicate = predicate;

this.timePeriodsBlacklist =
parseTimePeriodsBlacklist(
this.refreshBlacklist =
new RefreshBlacklist(
table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
this.nextBlacklistCheckTime = -1;
}

private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
return Collections.emptyList();
}
String[] timePeriods = blacklist.split(",");
List<Pair<Long, Long>> result = new ArrayList<>();
for (String period : timePeriods) {
String[] times = period.split("->");
if (times.length != 2) {
throw new IllegalArgumentException(
String.format("Incorrect time periods format: [%s].", blacklist));
}

long left = parseToMillis(times[0]);
long right = parseToMillis(times[1]);
if (left > right) {
throw new IllegalArgumentException(
String.format("Incorrect time period: [%s->%s].", times[0], times[1]));
}
result.add(Pair.of(left, right));
}
return result;
}

private long parseToMillis(String dateTime) {
try {
return DateTimeUtils.parseTimestampData(dateTime + ":00", 3, TimeZone.getDefault())
.getMillisecond();
} catch (DateTimeParseException e) {
throw new IllegalArgumentException(
String.format("Date time format error: [%s].", dateTime), e);
}
}

public void open(FunctionContext context) throws Exception {
Expand Down Expand Up @@ -326,19 +284,7 @@ private void reopen() {
@VisibleForTesting
void tryRefresh() throws Exception {
// 1. check if this time is in black list
long currentTimeMillis = System.currentTimeMillis();
if (nextBlacklistCheckTime > currentTimeMillis) {
return;
}

Pair<Long, Long> period = getFirstTimePeriods(timePeriodsBlacklist, currentTimeMillis);
if (period != null) {
LOG.info(
"Current time {} is in black list {}-{}, so try to refresh cache next time.",
currentTimeMillis,
period.getLeft(),
period.getRight());
nextBlacklistCheckTime = period.getRight() + 1;
if (!refreshBlacklist.canRefresh()) {
return;
}

Expand Down Expand Up @@ -368,16 +314,6 @@ void tryRefresh() throws Exception {
}
}

@Nullable
private Pair<Long, Long> getFirstTimePeriods(List<Pair<Long, Long>> timePeriods, long time) {
for (Pair<Long, Long> period : timePeriods) {
if (period.getLeft() <= time && time <= period.getRight()) {
return period;
}
}
return null;
}

private boolean shouldRefreshLookupTable() {
if (nextRefreshTime > System.currentTimeMillis()) {
return false;
Expand All @@ -399,7 +335,7 @@ LookupTable lookupTable() {

@VisibleForTesting
long nextBlacklistCheckTime() {
return nextBlacklistCheckTime;
return refreshBlacklist.nextBlacklistCheckTime();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TimeZone;

/** Refresh black list for {@link FileStoreLookupFunction}. */
public class RefreshBlacklist {

private static final Logger LOG = LoggerFactory.getLogger(RefreshBlacklist.class);

private final List<Pair<Long, Long>> timePeriodsBlacklist;

private long nextBlacklistCheckTime;

public RefreshBlacklist(String blacklist) {
this.timePeriodsBlacklist = parseTimePeriodsBlacklist(blacklist);
this.nextBlacklistCheckTime = -1;
}

private List<Pair<Long, Long>> parseTimePeriodsBlacklist(String blacklist) {
if (StringUtils.isNullOrWhitespaceOnly(blacklist)) {
return Collections.emptyList();
}
String[] timePeriods = blacklist.split(",");
List<Pair<Long, Long>> result = new ArrayList<>();
for (String period : timePeriods) {
String[] times = period.split("->");
if (times.length != 2) {
throw new IllegalArgumentException(
String.format("Incorrect time periods format: [%s].", blacklist));
}

long left = parseToMillis(times[0]);
long right = parseToMillis(times[1]);
if (left > right) {
throw new IllegalArgumentException(
String.format("Incorrect time period: [%s->%s].", times[0], times[1]));
}
result.add(Pair.of(left, right));
}
return result;
}

private long parseToMillis(String dateTime) {
try {
return DateTimeUtils.parseTimestampData(dateTime + ":00", 3, TimeZone.getDefault())
.getMillisecond();
} catch (DateTimeParseException e) {
throw new IllegalArgumentException(
String.format("Date time format error: [%s].", dateTime), e);
}
}

public boolean canRefresh() {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis < nextBlacklistCheckTime) {
return false;
}

Pair<Long, Long> selectedPeriod = null;
for (Pair<Long, Long> period : timePeriodsBlacklist) {
if (period.getLeft() <= currentTimeMillis && currentTimeMillis <= period.getRight()) {
selectedPeriod = period;
break;
}
}

if (selectedPeriod != null) {
LOG.info(
"Current time {} is in black list {}-{}, so try to refresh cache next time.",
currentTimeMillis,
selectedPeriod.getLeft(),
selectedPeriod.getRight());
nextBlacklistCheckTime = selectedPeriod.getRight() + 1;
return false;
}

return true;
}

public long nextBlacklistCheckTime() {
return nextBlacklistCheckTime;
}
}

0 comments on commit c17cdd5

Please sign in to comment.