Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

提供实时任务限流&执行状态可视化功能 #397

Open
baisui1981 opened this issue Dec 1, 2024 · 4 comments
Open

提供实时任务限流&执行状态可视化功能 #397

baisui1981 opened this issue Dec 1, 2024 · 4 comments
Milestone

Comments

@baisui1981
Copy link
Member

为了对正在执行的实时任务进行更细粒度的控制,需要提供任务限流&执行状态可视化功能,以应对实时任务执行期间执行批量任务构建,或者业务系统中有突发事件需要紧急限流。

需要在控制面中提供两个子功能予以配合:

  1. 任务限流: 可以实时控制每秒表纪录处理量
  2. 执行状态(吞吐速率)可视化: 设置任务限流功能后,可以立即从可视化看板上观察到执行吞吐的变化
@baisui1981
Copy link
Member Author

flink 内实现限流功能参考

为了在 Flink 作业运行时动态设置 updateRate 方法,你需要一个机制来接收外部的配置更新,并将这些更新应用到正在运行的任务实例中。这可以通过多种方式实现,下面我将详细说明一种常见的方法:使用广播变量和 Akka 的 Actor 系统。

使用广播变量和 Akka Actor 系统

  1. 设置广播变量

首先,在你的主程序(即提交 Flink Job 的地方)中,你需要创建一个广播流,用于发送最新的限流配置给所有并行任务实例。你可以使用一个特殊的 SourceFunction 来生成包含新速率值的消息,并将其与你的主要数据流合并。

// 创建一个广播流,用于发送新的限流配置
DataStream<Double> rateConfigStream = env.addSource(new RateConfigSource())
    .broadcast();

// 主要的数据流
DataStream<String> mainDataStream = ...;

// 将两者连接起来
ConnectedStreams<String, Double> connectedStreams = mainDataStream.connect(rateConfigStream);

// 应用自定义的 CoFlatMapFunction 或 RichCoFlatMapFunction 处理逻辑
connectedStreams.flatMap(new DynamicRateControlledFunction());
  1. 修改 DynamicRateControlledFunction
    接下来,修改 DynamicRateControlledFunction 类以支持接收广播变量中的最新配置:
public class DynamicRateControlledFunction extends RichCoFlatMapFunction<String, Double, String> {

    private transient RateLimiter rateLimiter;
    private transient BroadcastState<String, Double> broadcastState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        
        // 获取广播状态
        ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("rate", Double.class);
        broadcastState = getRuntimeContext().getBroadcastState(descriptor);
        
        // 初始化默认限流器
        double initialRate = broadcastState.get("rate") != null ? broadcastState.get("rate") : 10.0;
        rateLimiter = RateLimiter.create(initialRate); // 假设每秒允许10个事件
    }

    @Override
    public void flatMap1(String value, Collector<String> out) throws Exception {
        if (rateLimiter.tryAcquire()) {
            // 如果获取到令牌,则继续处理
            out.collect(value);
        } else {
            // 否则丢弃或者延迟处理
        }
    }

    @Override
    public void flatMap2(Double newRate, Collector<String> out) throws Exception {
        // 更新广播状态中的限流速率
        broadcastState.put("rate", newRate);
        // 更新本地限流器的速率
        rateLimiter.setRate(newRate);
    }
}

在这个版本的 DynamicRateControlledFunction 中,我们重写了 flatMap2 方法来处理来自广播流的新速率值。每当收到新的速率配置时,它会更新广播状态和本地的 RateLimiter 实例。

  1. 使用 Akka Actor 系统接收外部命令

为了让外部系统能够触发速率更新,你可以设置一个 Akka Actor 来监听 HTTP 请求或其他形式的远程调用。这个 Actor 可以直接向广播流发送消息,或者通过其他方式通知广播流有新的速率配置需要传播。

例如,你可以创建一个简单的 REST API 服务器,当接收到 POST 请求时,它会调用 Akka Actor 发送速率更新消息到广播流。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContextExecutor

implicit val system: ActorSystem = ActorSystem("my-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

val route =
  path("update-rate") {
    post {
      entity(as[Double]) { newRate =>
        // 调用广播流的源函数来发送新的速率配置
        RateConfigSource.updateRate(newRate)
        complete(s"Rate updated to $newRate")
      }
    }
  }

Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // 让应用程序保持运行

这里的 RateConfigSource.updateRate(newRate) 是一个假设的方法,它应该负责将新的速率值推送到广播流中。具体的实现取决于你如何设计广播流的源函数。

通过这种方式,你可以在 Flink 作业启动后动态地调整每个任务实例的限流速率,而不需要重启整个作业。这种方法不仅提供了灵活性,还保证了高可用性和实时性。

@baisui1981
Copy link
Member Author

#353

@baisui1981 baisui1981 added this to the v4.2.0 milestone Dec 23, 2024
@baisui1981
Copy link
Member Author

是的,可以通过扩展 Flink 的 REST API 或者通过其他方式将外部请求作为输入流发送到正在运行的 Flink 作业中。以下是几种实现这一目标的方法:

1. 使用 Flink 提供的 REST API 和自定义 SourceFunction

虽然 Flink 自身提供的 REST API 主要用于作业管理(如提交、取消作业等),但你可以创建一个自定义的 SourceFunction 来监听来自外部服务的事件,并将其转换为数据流。然后,你可以部署一个独立的 Web 服务器(如使用 Spring Boot 构建的 REST API)来接收 HTTP 请求,并将这些请求转发给你的自定义 SourceFunction

实现步骤:

  • 自定义 SourceFunction:实现一个可以与外部通信的 SourceFunction,例如通过网络套接字、消息队列(如 Kafka 或 RabbitMQ)或直接通过 HTTP。
  • Web 服务器:设置一个独立的 Web 服务器来接收 REST 请求,并将这些请求的数据传递给 Flink 作业中的自定义 SourceFunction。这可能涉及到使用像 Kafka 这样的中间件来解耦 Web 服务器和 Flink 作业之间的直接依赖关系。

2. 使用 Flink 提供的 Table & SQL API 和 Flink Restful Table API

Flink 提供了 Table & SQL API,以及从 Flink 1.13 开始引入的 Restful Table API,允许用户通过 REST 接口执行 SQL 查询并操作表。如果你的应用场景允许使用 SQL 来处理数据流,那么你可以考虑通过这种方式将外部请求转化为 SQL 操作的一部分。

3. 使用 Flink Connector 或者自定义 Connector

Flink 支持多种内置和社区贡献的连接器,它们可以帮助你将外部系统(如数据库、消息队列等)与 Flink 作业集成起来。如果现有的连接器不能满足需求,你可以开发自己的连接器,使它能够监听特定端点的 HTTP 请求,并将这些请求作为数据源提供给 Flink 作业。

4. 使用 Flink Stateful Functions (StateFun)

Flink Stateful Functions 是一种构建有状态应用程序的方式,它支持通过 gRPC 协议接收外部调用。如果你的应用需要处理有状态的交互式请求,那么 StateFun 可能是一个不错的选择。它可以很容易地接受来自外部系统的请求,并根据这些请求更新内部状态或触发计算逻辑。

5. 直接集成 Jetty 或其他 Web Server 到 Flink Job

理论上,可以在 Flink 作业中直接嵌入一个轻量级的 Web 服务器(如 Jetty),以接收 HTTP 请求并将这些请求的数据作为输入流的一部分。这种方法比较少见,因为它可能会增加代码复杂度并且不容易维护,但它确实是一种可行的技术方案。

推荐做法

在大多数情况下,推荐的做法是通过消息队列(如 Apache Kafka)或者其他中间件来解耦前端 Web 服务器和后端 Flink 作业。这样做不仅简化了架构设计,还提高了系统的可扩展性和可靠性。Web 服务器接收到请求后,将请求内容发布到消息队列中;Flink 作业订阅该消息队列,从而将外部请求作为输入流进行处理。这种模式下,即使 Web 服务器或 Flink 作业出现故障,也不会影响整个系统的正常运作。

选择哪种方法取决于你的具体应用场景、技术栈偏好以及对性能和可靠性的要求。

当然可以。下面我将给出一个简单的例子,展示如何使用 Flink Stateful Functions (StateFun) 来处理来自外部 REST API 的请求,并将其作为输入流发送到 Stateful Function 中进行处理。我们将创建一个简单的应用程序,它接收 HTTP 请求,然后根据请求内容调用相应的 Stateful Function。

环境准备

首先,请确保你已经安装了必要的工具和库:

  • Java 8 或更高版本
  • Maven 构建工具
  • Docker(用于运行 StateFun SDK 和其他依赖服务)

示例代码

1. 创建 Stateful Function

我们将定义一个非常简单的 Stateful Function,该函数接收一个字符串消息并返回一条问候信息。为了简化示例,我们假设这个函数是无状态的,但你可以很容易地扩展它来包含持久化状态。

// GreetingFunction.java
package com.example.statefun;

import io.statefun.flink.core.fn.Context;
import io.statefun.flink.types.Type;
import io.statefun.functions.*;

public class GreetingFunction implements SimpleFunction<String, String> {
    @Override
    public void invoke(Context context, String input, Emitter emitter) {
        // Process the incoming message and emit a greeting response.
        String response = "Hello, " + input + "!";
        emitter.emit(context.self(), response);
    }

    @Override
    public Type<String> inputType() {
        return Types.string();
    }

    @Override
    public Type<String> outputType() {
        return Types.string();
    }
}

2. 配置 StateFun 应用程序

接下来,我们需要配置 StateFun 应用来加载我们的 GreetingFunction。这通常通过编写 YAML 文件或直接在代码中完成。

# statefun.yaml
functions:
  - typename: "com.example/greeter"
    type: "builtin/python" # or "builtin/java" if you're using Java
    spec:
      factory-class: "com.example.statefun.GreetingFunction"

请注意,如果你使用的是 Python 函数,则需要调整配置文件以适应 Python 环境。

3. 设置 REST 接收器

现在,让我们设置一个简单的 REST 接收器,它将把接收到的 HTTP POST 请求转换为 StateFun 消息并发送给 GreetingFunction

// RestReceiver.java
package com.example.receiver;

import org.springframework.web.bind.annotation.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.statefun.clients.StateFunClient;
import io.statefun.generated.Address;
import io.statefun.generated.TypedMessage;

@RestController
@RequestMapping("/api")
public class RestReceiver {

    private final StateFunClient stateFunClient;

    public RestReceiver() {
        // Initialize gRPC channel to connect to StateFun.
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
        this.stateFunClient = new StateFunClient(channel);
    }

    @PostMapping("/greet")
    public String greet(@RequestParam String name) {
        // Prepare the message to send to StateFun.
        Address targetAddress = Address.from("com.example/greeter", "greeter");
        TypedMessage message = TypedMessage.newBuilder()
                .setTypeName("com.example/string")
                .setValue(name)
                .build();

        // Send the message and receive the response.
        TypedMessage response = stateFunClient.invoke(targetAddress, message);

        // Return the response from the Stateful Function.
        return response.getValue();
    }
}

在这个例子中,我们使用 Spring Boot 来搭建 REST API,并且利用了 StateFun 客户端库来与 StateFun 进行交互。你需要确保你的应用能够访问 StateFun 服务(在这里是通过 localhost:8080)。

4. 启动 StateFun 和 REST 接收器

最后,启动 StateFun 和 REST 接收器:

  • 启动 StateFun:按照官方文档中的说明,使用 Docker 或其他方式启动 StateFun。
  • 启动 REST 接收器:编译并运行包含上述代码的 Spring Boot 应用程序。

一旦两者都已启动,你可以通过向 /api/greet?name=YourName 发送 HTTP POST 请求来测试整个流程。你应该会收到类似 "Hello, YourName!" 的响应。

注意事项

  • 实际部署时,请考虑安全性、性能优化以及错误处理等因素。
  • 根据实际情况调整配置和服务地址。
  • 如果你想实现更复杂的功能,比如有状态计算、异步回调等,可以进一步探索 StateFun 的特性。

这段代码只是一个起点,旨在帮助你理解如何将 REST 请求作为输入流发送到 StateFun 中。随着项目的深入,你可以逐步添加更多功能和改进现有逻辑。

@baisui1981
Copy link
Member Author

前端框架可以参考:https://github.com/dagrejs/dagre-d3/tree/master
例子:https://github.com/dagrejs/dagre-d3/wiki
TIS中的例子:src/runtime/corenodemanage.component.ts

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant