Skip to content

Commit

Permalink
[fix][fn] Align WindowContext with BaseContext (apache#23628)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng authored Nov 25, 2024
1 parent 2126d40 commit 7909d2d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,16 @@
*/
package org.apache.pulsar.functions.api;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.slf4j.Logger;

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WindowContext {

/**
* The tenant this function belongs to.
*
* @return the tenant this function belongs to
*/
String getTenant();

/**
* The namespace this function belongs to.
*
* @return the namespace this function belongs to
*/
String getNamespace();

public interface WindowContext extends BaseContext {
/**
* The name of the function that we are executing.
*
Expand All @@ -59,20 +42,6 @@ public interface WindowContext {
*/
String getFunctionId();

/**
* The id of the instance that invokes this function.
*
* @return the instance id
*/
int getInstanceId();

/**
* Get the number of instances that invoke this function.
*
* @return the number of instances that invoke this function.
*/
int getNumInstances();

/**
* The version of the function that we are executing.
*
Expand Down Expand Up @@ -101,45 +70,6 @@ public interface WindowContext {
*/
String getOutputSchemaType();

/**
* The logger object that can be used to log in a function.
*
* @return the logger object
*/
Logger getLogger();

/**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);

/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);

/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);

/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);

/**
* Get a map of all user-defined key/value configs for the function.
*
Expand All @@ -164,14 +94,6 @@ public interface WindowContext {
*/
Object getUserConfigValueOrDefault(String key, Object defaultValue);

/**
* Record a user defined metric.
*
* @param metricName The name of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);

/**
* Publish an object using serDe for serializing to the topic.
*
Expand All @@ -194,4 +116,4 @@ public interface WindowContext {
* @return A future that completes when the framework is done publishing the message
*/
<T> CompletableFuture<Void> publish(String topicName, T object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,61 @@ public Logger getLogger() {
return this.context.getLogger();
}

@Override
public String getSecret(String secretName) {
return this.context.getSecret(secretName);
}

@Override
public void incrCounter(String key, long amount) {
this.context.incrCounter(key, amount);
}

@Override
public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
return this.context.incrCounterAsync(key, amount);
}

@Override
public long getCounter(String key) {
return this.context.getCounter(key);
}

@Override
public CompletableFuture<Long> getCounterAsync(String key) {
return this.context.getCounterAsync(key);
}

@Override
public void putState(String key, ByteBuffer value) {
this.context.putState(key, value);
}

@Override
public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
return this.context.putStateAsync(key, value);
}

@Override
public ByteBuffer getState(String key) {
return this.context.getState(key);
}

@Override
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return this.context.getStateAsync(key);
}

@Override
public void deleteState(String key) {
this.context.deleteState(key);
}

@Override
public CompletableFuture<Void> deleteStateAsync(String key) {
return this.context.deleteStateAsync(key);
}

@Override
public Map<String, Object> getUserConfigMap() {
return this.context.getUserConfigMap();
Expand All @@ -130,6 +165,11 @@ public void recordMetric(String metricName, double value) {
this.context.recordMetric(metricName, value);
}

@Override
public void fatal(Throwable t) {
this.context.fatal(t);
}

@Override
public <T> CompletableFuture<Void> publish(String topicName, T object) {
return this.context.publish(topicName, object);
Expand Down

0 comments on commit 7909d2d

Please sign in to comment.