Skip to content

Commit

Permalink
[FLINK-34668][checkpoint] Report operator state handle of file mergin…
Browse files Browse the repository at this point in the history
…g directory to JM
  • Loading branch information
fredia committed Mar 26, 2024
1 parent 1aa35b9 commit 05b27be
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;

Expand Down Expand Up @@ -117,6 +118,17 @@ FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(
*/
Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);

/**
* Get the {@link DirectoryStreamStateHandle} of the managed directory, created in {@link
* #initFileSystem} or {@link #registerSubtaskForSharedStates}.
*
* <p>This is the state-handle counterpart of {@link #getManagedDir}: it takes the same
* arguments, but returns a handle describing the directory instead of the directory path.
*
* <p>NOTE(review): what is returned for a subtask that has not been registered yet is not
* specified by this declaration - confirm against the implementation before relying on a
* non-null result.
*
* @param subtaskKey the subtask key identifying the subtask.
* @param scope the checkpoint scope.
* @return the {@link DirectoryStreamStateHandle} for one subtask in specified checkpoint scope.
*/
DirectoryStreamStateHandle getManagedDirStateHandle(
SubtaskKey subtaskKey, CheckpointedStateScope scope);

/**
* Notifies the manager that the checkpoint with the given {@code checkpointId} completed and
* was committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
Expand All @@ -37,6 +39,7 @@
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
Expand Down Expand Up @@ -105,12 +108,24 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
*/
private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>();

/**
* The {@link DirectoryStreamStateHandle} for shared state directories, one for each subtask.
*
* <p>An entry is added in {@link #registerSubtaskForSharedStates} right after the matching
* entry of {@link #managedSharedStateDir} is stored; the handle is built from the same managed
* path with a size of zero. {@link #getManagedDirStateHandle} reads from this map when the
* requested scope is {@link CheckpointedStateScope#SHARED}.
*/
private final Map<SubtaskKey, DirectoryStreamStateHandle> managedSharedStateDirHandles =
new ConcurrentHashMap<>();

/**
* The private state files are merged across subtasks, there is only one directory for
* merged-files within one TM per job.
*/
protected Path managedExclusiveStateDir;

/**
* The {@link DirectoryStreamStateHandle} for private state directory, one for each task
* manager.
*
* <p>The field has no initializer; it is assigned in {@link #initFileSystem} as a zero-size
* handle built from the path of {@link #managedExclusiveStateDir}. {@link
* #getManagedDirStateHandle} returns it for every scope other than {@link
* CheckpointedStateScope#SHARED}.
*/
protected DirectoryStreamStateHandle managedExclusiveStateDirHandle;

public FileMergingSnapshotManagerBase(
String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) {
this.id = id;
Expand Down Expand Up @@ -152,6 +167,9 @@ public void initFileSystem(
Path managedExclusivePath = new Path(taskOwnedStateDir, id);
createManagedDirectory(managedExclusivePath);
this.managedExclusiveStateDir = managedExclusivePath;
this.managedExclusiveStateDirHandle =
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(managedExclusivePath.getPath()).toPath());
this.writeBufferSize = writeBufferSize;
}

Expand All @@ -162,6 +180,10 @@ public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
if (!managedSharedStateDir.containsKey(subtaskKey)) {
createManagedDirectory(managedPath);
managedSharedStateDir.put(subtaskKey, managedPath);
managedSharedStateDirHandles.put(
subtaskKey,
DirectoryStreamStateHandle.forPathWithZeroSize(
new File(managedPath.getPath()).toPath()));
}
}

Expand Down Expand Up @@ -477,6 +499,16 @@ public Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope) {
}
}

/**
 * Resolves the directory state handle that belongs to the given scope.
 *
 * <p>For the {@code SHARED} scope the handle is looked up per subtask; for any other scope the
 * single exclusive-directory handle is returned and {@code subtaskKey} is not consulted.
 *
 * @param subtaskKey the subtask whose shared directory handle is requested.
 * @param scope the checkpoint scope selecting shared or exclusive state.
 * @return the matching handle; for the shared scope this is whatever the per-subtask map holds
 *     for {@code subtaskKey}.
 */
@Override
public DirectoryStreamStateHandle getManagedDirStateHandle(
        SubtaskKey subtaskKey, CheckpointedStateScope scope) {
    final boolean sharedScope = scope.equals(CheckpointedStateScope.SHARED);
    return sharedScope
            ? managedSharedStateDirHandles.get(subtaskKey)
            : managedExclusiveStateDirHandle;
}

static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) {
// Currently, we do file sync regardless of the file system.
// TODO: Determine whether do file sync more wisely. Add an interface to FileSystem if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.CollectionUtil;

Expand Down Expand Up @@ -121,7 +124,17 @@ public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(

if (registeredBroadcastStatesDeepCopies.isEmpty()
&& registeredOperatorStatesDeepCopies.isEmpty()) {
return snapshotCloseableRegistry -> SnapshotResult.empty();
if (streamFactory instanceof FsMergingCheckpointStorageLocation) {
FsMergingCheckpointStorageLocation location =
(FsMergingCheckpointStorageLocation) streamFactory;
return snapshotCloseableRegistry ->
SnapshotResult.of(
EmptyFileMergingOperatorStreamStateHandle.create(
location.getExclusiveStateHandle(),
location.getSharedStateHandle()));
} else {
return snapshotCloseableRegistry -> SnapshotResult.empty();
}
}

return (snapshotCloseableRegistry) -> {
Expand Down Expand Up @@ -204,7 +217,17 @@ public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
StreamStateHandle stateHandle = localOut.closeAndGetHandle();
if (stateHandle != null) {
retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
retValue =
streamFactory instanceof FsMergingCheckpointStorageLocation
? new FileMergingOperatorStreamStateHandle(
((FsMergingCheckpointStorageLocation) streamFactory)
.getExclusiveStateHandle(),
((FsMergingCheckpointStorageLocation) streamFactory)
.getSharedStateHandle(),
writtenStatesMetaData,
stateHandle)
: new OperatorStreamStateHandle(
writtenStatesMetaData, stateHandle);
}
return SnapshotResult.of(retValue);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.filemerging;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;

import javax.annotation.Nonnull;

import java.nio.file.Path;
import java.util.Optional;

/**
 * Wraps a {@link DirectoryStateHandle} so that it can be used wherever a {@link StreamStateHandle}
 * is expected.
 *
 * <p>The handle only identifies a directory; it carries no readable stream content, so {@link
 * #openInputStream()} is not supported and {@link #asBytesIfInMemory()} is always empty.
 * Equality and hash code are based on the directory alone.
 */
public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {

    private static final long serialVersionUID = 1L;

    /**
     * Creates a handle for the given directory.
     *
     * @param directory the directory this handle points to.
     * @param directorySize the size to report for the directory.
     */
    public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
        super(directory, directorySize);
    }

    /**
     * Always fails: a directory handle has no stream content to read.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public FSDataInputStream openInputStream() {
        throw new UnsupportedOperationException(
                "Cannot open an input stream from a DirectoryStreamStateHandle.");
    }

    /** @return always {@link Optional#empty()}, the handle holds no in-memory bytes. */
    @Override
    public Optional<byte[]> asBytesIfInMemory() {
        return Optional.empty();
    }

    /** @return a physical handle id derived from the string form of the directory. */
    @Override
    public PhysicalStateHandleID getStreamStateHandleID() {
        return new PhysicalStateHandleID(getDirectory().toString());
    }

    /** @return a shared-state registry key derived from the URI of the directory. */
    public SharedStateRegistryKey createStateRegistryKey() {
        return new SharedStateRegistryKey(getDirectory().toUri().toString());
    }

    /** Two handles are equal when they are of the same class and point to the same directory. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;

        return getDirectory().equals(that.getDirectory());
    }

    /**
     * Hash code derived from the directory only, so that it is guaranteed to agree with {@link
     * #equals(Object)} regardless of how the superclass computes its hash.
     */
    @Override
    public int hashCode() {
        return getDirectory().hashCode();
    }

    @Override
    public String toString() {
        return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + '}';
    }

    /**
     * Returns a {@link DirectoryStreamStateHandle} with zero size. Such a handle is usually
     * registered to {@link org.apache.flink.runtime.state.SharedStateRegistry} to track the life
     * cycle of the directory, therefore a fake size is provided.
     *
     * @param directory the directory.
     * @return DirectoryStreamStateHandle with zero size.
     */
    public static DirectoryStreamStateHandle forPathWithZeroSize(@Nonnull Path directory) {
        return new DirectoryStreamStateHandle(directory, 0);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.filemerging;

import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;

import java.util.Collections;
import java.util.Map;

/**
 * A {@link FileMergingOperatorStreamStateHandle} without any operator state. It serves purely as
 * a placeholder that keeps the file merging directory from being deleted.
 */
public class EmptyFileMergingOperatorStreamStateHandle
        extends FileMergingOperatorStreamStateHandle {

    private static final long serialVersionUID = 1L;

    /**
     * Forwards all arguments unchanged to the {@link FileMergingOperatorStreamStateHandle}
     * constructor.
     *
     * @param taskOwnedDirHandle handle of the task-owned directory.
     * @param sharedDirHandle handle of the shared directory.
     * @param stateNameToPartitionOffsets meta information per state name.
     * @param delegateStateHandle the stream state handle to delegate to.
     */
    public EmptyFileMergingOperatorStreamStateHandle(
            DirectoryStreamStateHandle taskOwnedDirHandle,
            DirectoryStreamStateHandle sharedDirHandle,
            Map<String, StateMetaInfo> stateNameToPartitionOffsets,
            StreamStateHandle delegateStateHandle) {
        super(
                taskOwnedDirHandle,
                sharedDirHandle,
                stateNameToPartitionOffsets,
                delegateStateHandle);
    }

    /**
     * Builds a placeholder handle: no state meta data, and {@link
     * EmptySegmentFileStateHandle#INSTANCE} as the delegate.
     *
     * @param taskownedDirHandle the directory where operator state is stored.
     * @param sharedDirHandle the directory where shared state is stored.
     * @return a new empty handle referencing the two given directories.
     */
    public static EmptyFileMergingOperatorStreamStateHandle create(
            DirectoryStreamStateHandle taskownedDirHandle,
            DirectoryStreamStateHandle sharedDirHandle) {
        return new EmptyFileMergingOperatorStreamStateHandle(
                taskownedDirHandle,
                sharedDirHandle,
                Collections.<String, OperatorStateHandle.StateMetaInfo>emptyMap(),
                EmptySegmentFileStateHandle.INSTANCE);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.filemerging;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointedStateScope;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * An empty {@link SegmentFileStateHandle} that is only used as a placeholder.
 *
 * <p>The single {@link #INSTANCE} points to the dummy path {@code "empty"} with start position
 * and size of zero in the {@code EXCLUSIVE} scope. It has no content, so it cannot be opened for
 * reading.
 */
public class EmptySegmentFileStateHandle extends SegmentFileStateHandle {
    private static final long serialVersionUID = 1L;

    /** The shared placeholder instance. */
    public static final EmptySegmentFileStateHandle INSTANCE =
            new EmptySegmentFileStateHandle(
                    new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE);

    private EmptySegmentFileStateHandle(
            Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
        super(filePath, startPos, stateSize, scope);
    }

    /**
     * Always fails, because the placeholder has no backing data.
     *
     * @throws IOException always. A plain {@link IOException} is used rather than an
     *     encoding-specific subclass: the failure has nothing to do with character encodings,
     *     and callers that handle {@link IOException} keep working unchanged.
     */
    @Override
    public FSDataInputStream openInputStream() throws IOException {
        throw new IOException("Cannot open input stream from an EmptySegmentFileStateHandle.");
    }
}
Loading

0 comments on commit 05b27be

Please sign in to comment.