Skip to content

Commit

Permalink
[flink] Avoid deprecated usage of DiscardingSink
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Nov 23, 2024
1 parent 559b287 commit e80d4bc
Show file tree
Hide file tree
Showing 23 changed files with 1,280 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

import org.apache.flink.annotation.PublicEvolving;

/** The {@link JobInfo} represents the meta information of current job. */
@PublicEvolving
public interface JobInfo {

/**
* Get the ID of the job.
*
* @return the ID of the job
*/
JobID getJobId();

/**
* Get the name of the job.
*
* @return the name of the job
*/
String getJobName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

import org.apache.flink.annotation.PublicEvolving;

/**
* The interface indicates that it supports multiple attempts executing at the same time.
*
* <p>Currently, the interface is used for speculative execution. If a sink implementation (SinkV2,
* OutputFormat or SinkFunction) inherits this interface, the sink operator would be considered to
* support speculative execution.
*/
@PublicEvolving
public interface SupportsConcurrentExecutionAttempts {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;

import java.util.OptionalLong;

/**
* Common interface which exposes runtime info for creating {@link SinkWriter} and {@link Committer}
* objects.
*/
@Internal
public interface InitContext {
/**
* The first checkpoint id when an application is started and not recovered from a previously
* taken checkpoint or savepoint.
*/
long INITIAL_CHECKPOINT_ID = 1;

/**
* Get the id of task where the committer is running.
*
* @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
* provided uniformly by {@link #getTaskInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default int getSubtaskId() {
return getTaskInfo().getIndexOfThisSubtask();
}

/**
* Get the number of parallel committer tasks.
*
* @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
* provided uniformly by {@link #getTaskInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default int getNumberOfParallelSubtasks() {
return getTaskInfo().getNumberOfParallelSubtasks();
}

/**
* Gets the attempt number of this parallel subtask. First attempt is numbered 0.
*
* @return Attempt number of the subtask.
* @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
* provided uniformly by {@link #getTaskInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default int getAttemptNumber() {
return getTaskInfo().getAttemptNumber();
}

/**
* Returns id of the restored checkpoint, if state was restored from the snapshot of a previous
* execution.
*/
OptionalLong getRestoredCheckpointId();

/**
* The ID of the current job. Note that Job ID can change in particular upon manual restart. The
* returned ID should NOT be used for any job management tasks.
*
* @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be
* provided uniformly by {@link #getJobInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default JobID getJobId() {
return getJobInfo().getJobId();
}

/**
* Get the meta information of current job.
*
* @return the job meta information.
*/
@PublicEvolving
JobInfo getJobInfo();

/**
* Get the meta information of current task.
*
* @return the task meta information.
*/
@PublicEvolving
TaskInfo getTaskInfo();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.sink2;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

import java.util.Optional;
import java.util.function.Consumer;

/** The interface exposes some runtime info for creating a {@link SinkWriter}. */
@Public
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.InitContext {
/**
* Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath, but
* are part of the jar file of a user job.
*
* @see UserCodeClassLoader
*/
UserCodeClassLoader getUserCodeClassLoader();

/**
* Returns the mailbox executor that allows to execute {@link Runnable}s inside the task thread
* in between record processing.
*
* <p>Note that this method should not be used per-record for performance reasons in the same
* way as records should not be sent to the external system individually. Rather, implementers
* are expected to batch records and only enqueue a single {@link Runnable} per batch to handle
* the result.
*/
MailboxExecutor getMailboxExecutor();

/**
* Returns a {@link ProcessingTimeService} that can be used to get the current time and register
* timers.
*/
ProcessingTimeService getProcessingTimeService();

/** @return The metric group this writer belongs to. */
SinkWriterMetricGroup metricGroup();

/** Provides a view on this context as a {@link SerializationSchema.InitializationContext}. */
SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();

/** Returns whether object reuse has been enabled or disabled. */
boolean isObjectReuseEnabled();

/** Creates a serializer for the type of sink's input. */
<IN> TypeSerializer<IN> createInputSerializer();

/**
* Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type
* {@link MetaT} to the consumer.
*
* <p>It is recommended to use a separate thread pool to publish the metadata because enqueuing
* a lot of these messages in the mailbox may lead to a performance decrease. thread, and the
* {@link Consumer#accept} method is executed very fast.
*/
default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.v2;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.io.IOException;

/**
* A special sink that ignores all elements.
*
* @param <IN> The type of elements received by the sink.
*/
@PublicEvolving
public class DiscardingSink<IN> implements Sink<IN>, SupportsConcurrentExecutionAttempts {
private static final long serialVersionUID = 1L;

@Override
public SinkWriter<IN> createWriter(InitContext context) throws IOException {
return new DiscardingElementWriter();
}

private class DiscardingElementWriter implements SinkWriter<IN> {

@Override
public void write(IN element, Context context) throws IOException, InterruptedException {
// discard it.
}

@Override
public void flush(boolean endOfInput) throws IOException, InterruptedException {
// this writer has no pending data.
}

@Override
public void close() throws Exception {
// do nothing.
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

import org.apache.flink.annotation.PublicEvolving;

/** The {@link JobInfo} represents the meta information of current job. */
@PublicEvolving
public interface JobInfo {

/**
* Get the ID of the job.
*
* @return the ID of the job
*/
JobID getJobId();

/**
* Get the name of the job.
*
* @return the name of the job
*/
String getJobName();
}
Loading

0 comments on commit e80d4bc

Please sign in to comment.