[#4139] Add Flink components #4140

Closed · wants to merge 3 commits
32 changes: 32 additions & 0 deletions backend/components/flink/helm/BUILD
@@ -0,0 +1,32 @@
load("@rules_pkg//:pkg.bzl", "pkg_tar")
load("@com_github_airyhq_bazel_tools//helm:helm.bzl", "helm_template_test")
load("//tools/build:helm.bzl", "helm_push_develop", "helm_push_release")

filegroup(
name = "files",
srcs = glob(
["**/*"],
exclude = ["BUILD"],
),
visibility = ["//visibility:public"],
)

pkg_tar(
name = "package",
srcs = [":files"],
extension = "tgz",
strip_prefix = "./",
)

helm_template_test(
name = "template",
chart = ":package",
)

helm_push_develop(
chart = ":package",
)

helm_push_release(
chart = ":package",
)
5 changes: 5 additions & 0 deletions backend/components/flink/helm/Chart.yaml
@@ -0,0 +1,5 @@
apiVersion: v2
appVersion: "1.0"
description: Flink connector
name: flink-connector
version: 1.0
10 changes: 10 additions & 0 deletions backend/components/flink/helm/templates/configmap.yaml
@@ -0,0 +1,10 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .Values.component }}
labels:
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: "{{ .Values.component }}"
annotations:
core.airy.co/enabled: "{{ .Values.enabled }}"
50 changes: 50 additions & 0 deletions backend/components/flink/helm/templates/result-sender/deployment.yaml
@@ -0,0 +1,50 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.resultSender.topic }}
- name: API_COMMUNICATION_URL
value: {{ .Values.apiCommunicationUrl }}
livenessProbe:
httpGet:
path: /actuator/health
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
15 changes: 15 additions & 0 deletions backend/components/flink/helm/templates/result-sender/service.yaml
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.resultSender.name }}
port: 80
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
50 changes: 50 additions & 0 deletions backend/components/flink/helm/templates/statements-executor/deployment.yaml
@@ -0,0 +1,50 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.executor.topic }}
- name: FLINK_GATEWAY_URL
value: {{ .Values.gatewayUrl }}
livenessProbe:
httpGet:
path: /actuator/health
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
15 changes: 15 additions & 0 deletions backend/components/flink/helm/templates/statements-executor/service.yaml
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.executor.name }}
port: 80
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.executor.name }}
15 changes: 15 additions & 0 deletions backend/components/flink/helm/values.yaml
@@ -0,0 +1,15 @@
component: flink-connector
mandatory: false
enabled: false
port: 8080
resources:
gatewayUrl: "http://flink-jobmanager:8083"
apiCommunicationUrl: "http://api-communication/messages.send"
executor:
name: statements-executor
image: connectors/flink/statements-executor
topic: flink.statements
resultSender:
name: result-sender
image: connectors/flink/result-sender
topic: flink.output
22 changes: 22 additions & 0 deletions backend/components/flink/result-sender/Dockerfile
@@ -0,0 +1,22 @@
# Use the official Golang image as the base image
FROM golang:1.17

# Set the working directory inside the container
WORKDIR /app

# Copy the Go source code into the container
COPY ./src/*.go ./

# Install the required libraries
RUN go mod init main && \
go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
go get golang.org/x/net

# Build the Go program
RUN go build -o app

# Command to run the Go program
CMD ["./app"]
6 changes: 6 additions & 0 deletions backend/components/flink/result-sender/Makefile
@@ -0,0 +1,6 @@
build:
docker build -t flink-connector/result-sender .

release: build
docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release
docker push ghcr.io/airyhq/connectors/flink/result-sender:release
111 changes: 111 additions & 0 deletions backend/components/flink/result-sender/src/main.go
@@ -0,0 +1,111 @@
package main

import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

kafkaURL := os.Getenv("KAFKA_BROKERS")
schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
topicName := os.Getenv("KAFKA_TOPIC_NAME")
systemToken := os.Getenv("systemToken")
authUsername := os.Getenv("AUTH_JAAS_USERNAME")
authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
groupID := "result-sender"
flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")

if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
return
}

http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
response := map[string]string{"status": "UP"}
jsonResponse, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonResponse)
})

go func() {
if err := http.ListenAndServe(":80", nil); err != nil {
panic(err)
}
}()

fmt.Println("Health-check started")

fmt.Println("Creating Kafka consumer for topic: ", topicName)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": kafkaURL,
"group.id": groupID,
"auto.offset.reset": "earliest",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": authUsername,
"sasl.password": authPassword,
})
if err != nil {
fmt.Printf("Error creating consumer: %v\n", err)
return
}
	if err := c.SubscribeTopics([]string{topicName}, nil); err != nil {
		fmt.Printf("Error subscribing to topic: %v\n", err)
		return
	}
signals := make(chan os.Signal, 1)
done := make(chan bool, 1)

signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

go func() {
for {
select {
case sig := <-signals:
fmt.Printf("Caught signal %v: terminating\n", sig)
done <- true
return
default:
msg, err := c.ReadMessage(-1)
if err == nil {
var flinkOutput FlinkOutput
if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil {
fmt.Printf("Error unmarshalling message: %v\n", err)
continue
} else {
fmt.Printf("Received message: %+v\n", flinkOutput)

fmt.Println("Flink gateway: ", flinkGatewayURL)
						result, err := getFlinkResult(flinkGatewayURL, flinkOutput.SessionID)
						if err != nil {
							fmt.Println("Unable to get Flink result:", err)
							// Skip this message; returning here would silently stop the consumer.
							continue
						}
						response, err := convertResultToMarkdown(result)
						if err != nil {
							fmt.Println("Unable to generate Markdown from result:", err)
							sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken)
							continue
						}
sendMessage(response, flinkOutput.ConversationID, systemToken)
}
} else {
fmt.Printf("Consumer error: %v\n", err)
}
}
}
}()
<-done
c.Close()
fmt.Println("Consumer closed")
}
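
For reference, the /actuator/health handler above is what the liveness probe in the Helm deployment template calls, including the custom Health-Check header. A minimal standalone sketch of that request (the localhost URL is illustrative; the header name and path mirror the template):

package main

import (
	"fmt"
	"net/http"
)

// probeHealth issues the same request the liveness probe sends:
// GET /actuator/health with a "Health-Check: health-check" header.
func probeHealth(baseURL string) error {
	req, err := http.NewRequest("GET", baseURL+"/actuator/health", nil)
	if err != nil {
		return err
	}
	req.Header.Set("Health-Check", "health-check")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}

func main() {
	if err := probeHealth("http://localhost:80"); err != nil {
		fmt.Println("health check failed:", err)
		return
	}
	fmt.Println("health check passed")
}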
160 changes: 160 additions & 0 deletions backend/components/flink/result-sender/src/tools.go
@@ -0,0 +1,160 @@
package main

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"time"
)

// getFlinkResult sends an SQL statement to the Flink Gateway
func getFlinkResult(url, sessionID string) (FlinkResult, error) {
// Create the SQL Statement
fmt.Println("The Flink session is: ", sessionID)
payload := FlinkSQLRequest{
Statement: "select * from output;",
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return FlinkResult{}, err
}

req, err := http.NewRequest("POST", url+"/v1/sessions/"+sessionID+"/statements/", bytes.NewReader(payloadBytes))
if err != nil {
return FlinkResult{}, err
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return FlinkResult{}, err
}
	body, err := io.ReadAll(resp.Body)
	resp.Body.Close() // close the first response; the deferred close below only covers the second request
	if err != nil {
		fmt.Printf("Error reading response body from the API: %v\n", err)
	}
fmt.Println("Statement submitted. Response: ", string(body))
var statementResponse FlinkStatementResponse
if err := json.Unmarshal(body, &statementResponse); err != nil {
fmt.Printf("Error unmarshaling message: %v\n", err)
return FlinkResult{}, err
}

// Fetch the results from the operationHandle
fmt.Printf("Fetching result from: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionID, statementResponse.OperationHandle)
	// Crude wait for the gateway to materialize results before the single fetch below.
	time.Sleep(10 * time.Second)
req, err = http.NewRequest("GET", url+"/v1/sessions/"+sessionID+"/operations/"+statementResponse.OperationHandle+"/result/0", nil)
if err != nil {
return FlinkResult{}, err
}
req.Header.Set("Content-Type", "application/json")

client = &http.Client{}
resp, err = client.Do(req)
if err != nil {
return FlinkResult{}, err
}
	body, err = io.ReadAll(resp.Body)
	if err != nil {
		fmt.Printf("Error reading response body from the API: %v\n", err)
	}
	fmt.Println("Result fetched. Response: ", string(body))
var flinkResultResponse FlinkResultResponse
if err := json.Unmarshal(body, &flinkResultResponse); err != nil {
fmt.Printf("Error unmarshaling message: %v\n", err)
return FlinkResult{}, err
}
defer resp.Body.Close()

// Handle the response (check if the request was successful)
return flinkResultResponse.Results, nil
}

func sendMessage(message string, conversationId string, systemToken string) (int, string, error) {
apiCommunicationUrl := os.Getenv("API_COMMUNICATION_URL")
messageContent := messageContent{
Text: message,
}
messageToSend := ApplicationCommunicationSendMessage{
ConversationID: conversationId,
Message: messageContent,
}
messageJSON, err := json.Marshal(messageToSend)
if err != nil {
fmt.Printf("Error encoding response to JSON: %v\n", err)
return 0, "", errors.New("The message could not be encoded to JSON for sending.")
}

req, err := http.NewRequest("POST", apiCommunicationUrl, bytes.NewReader(messageJSON))
if err != nil {
fmt.Printf("Error creating request: %v\n", err)
return 0, "", errors.New("The message could not be sent.")
}
req.Header.Add("Authorization", "Bearer "+systemToken)
req.Header.Add("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("Error sending POST request: %v\n", err)
return 0, "", errors.New("Error sending POST request.")
}
defer resp.Body.Close()

// Read the response body
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error reading response body:", err)
return 0, "", errors.New("Error reading response body.")
}

var response SendMessageResponse
err = json.Unmarshal(body, &response)
if err != nil {
fmt.Println("Error unmarshaling response:", err)
return 0, "", errors.New("Response couldn't be unmarshaled.")
}

fmt.Printf("Message sent with status code: %d\n", resp.StatusCode)
return resp.StatusCode, response.ID, nil
}

// markdown is a placeholder that currently returns the message unchanged.
func markdown(message string) (string, error) {
return message, nil
}

func convertResultToMarkdown(result FlinkResult) (string, error) {
var builder strings.Builder

// Add the header row
if len(result.Columns) == 0 {
return "", errors.New("No columns found for generating the Markdown table.")
}
for _, col := range result.Columns {
builder.WriteString("| " + col.Name + " ")
}
builder.WriteString("|\n")

// Add the separator row
for range result.Columns {
builder.WriteString("|---")
}
builder.WriteString("|\n")

// Add the data rows
for _, d := range result.Data {
for _, field := range d.Fields {
builder.WriteString(fmt.Sprintf("| %v ", field))
}
builder.WriteString("|\n")
}

return builder.String(), nil
}
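
To make convertResultToMarkdown concrete, here is a self-contained sketch with the same table-building logic and invented sample data (the type stand-ins are trimmed to the fields the function touches):

package main

import (
	"fmt"
	"strings"
)

// Trimmed stand-ins for Column, Data and FlinkResult from types.go.
type column struct{ Name string }
type row struct{ Fields []interface{} }
type flinkResult struct {
	Columns []column
	Data    []row
}

// toMarkdown mirrors convertResultToMarkdown: header row, separator row, data rows.
func toMarkdown(r flinkResult) string {
	var b strings.Builder
	for _, c := range r.Columns {
		b.WriteString("| " + c.Name + " ")
	}
	b.WriteString("|\n")
	for range r.Columns {
		b.WriteString("|---")
	}
	b.WriteString("|\n")
	for _, d := range r.Data {
		for _, f := range d.Fields {
			b.WriteString(fmt.Sprintf("| %v ", f))
		}
		b.WriteString("|\n")
	}
	return b.String()
}

func main() {
	res := flinkResult{
		Columns: []column{{Name: "name"}, {Name: "count"}},
		Data:    []row{{Fields: []interface{}{"alice", 3}}},
	}
	fmt.Print(toMarkdown(res))
	// Prints:
	// | name | count |
	// |---|---|
	// | alice | 3 |
}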
66 changes: 66 additions & 0 deletions backend/components/flink/result-sender/src/types.go
@@ -0,0 +1,66 @@
package main

type ApplicationCommunicationSendMessage struct {
ConversationID string `json:"conversation_id"`
Message messageContent `json:"message"`
Metadata map[string]string `json:"metadata"`
}

type messageContent struct {
Text string `json:"text"`
}

type SendMessageResponse struct {
ID string `json:"id"`
State string `json:"state"`
}

type FlinkOutput struct {
SessionID string `json:"session_id"`
Question string `json:"question"`
MessageID string `json:"message_id"`
ConversationID string `json:"conversation_id"`
}

// FlinkSQLRequest represents the payload for a Flink SQL request
type FlinkSQLRequest struct {
Statement string `json:"statement"`
}

type FlinkSessionResponse struct {
SessionHandle string `json:"sessionHandle"`
}

type FlinkStatementResponse struct {
OperationHandle string `json:"operationHandle"`
}

type Column struct {
Name string `json:"name"`
LogicalType struct {
Type string `json:"type"`
Nullable bool `json:"nullable"`
Length int `json:"length,omitempty"`
} `json:"logicalType"`
Comment interface{} `json:"comment"`
}

type Data struct {
Kind string `json:"kind"`
Fields []interface{} `json:"fields"`
}

type FlinkResult struct {
Columns []Column `json:"columns"`
RowFormat string `json:"rowFormat"`
Data []Data `json:"data"`
}

type FlinkResultResponse struct {
ResultType string `json:"resultType"`
IsQueryResult bool `json:"isQueryResult"`
JobID string `json:"jobID"`
ResultKind string `json:"resultKind"`
Results FlinkResult `json:"results"`
NextResultUri string `json:"nextResultUri"`
}
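
For illustration, a hedged example of the gateway JSON these structs decode — the shapes follow the struct tags above, while every value is invented:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical /result/0 response; values are illustrative only.
	raw := `{
	  "resultType": "PAYLOAD",
	  "isQueryResult": true,
	  "jobID": "b2f9d4c0",
	  "resultKind": "SUCCESS_WITH_CONTENT",
	  "results": {
	    "columns": [{"name": "name", "logicalType": {"type": "VARCHAR", "nullable": true}, "comment": null}],
	    "rowFormat": "JSON",
	    "data": [{"kind": "INSERT", "fields": ["alice"]}]
	  },
	  "nextResultUri": "/v1/sessions/abc/operations/def/result/1"
	}`

	var resp FlinkResultResponse // defined in types.go above
	if err := json.Unmarshal([]byte(raw), &resp); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Printf("%d column(s), %d row(s)\n", len(resp.Results.Columns), len(resp.Results.Data))
}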
22 changes: 22 additions & 0 deletions backend/components/flink/statements-executor/Dockerfile
@@ -0,0 +1,22 @@
# Use the official Golang image as the base image
FROM golang:1.17

# Set the working directory inside the container
WORKDIR /app

# Copy the Go source code into the container
COPY ./src/*.go ./

# Install the required libraries
RUN go mod init main && \
go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
go get golang.org/x/net

# Build the Go program
RUN go build -o app

# Command to run the Go program
CMD ["./app"]
6 changes: 6 additions & 0 deletions backend/components/flink/statements-executor/Makefile
@@ -0,0 +1,6 @@
build:
docker build -t flink-connector/statements-executor .

release: build
docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release
docker push ghcr.io/airyhq/connectors/flink/statements-executor:release
115 changes: 115 additions & 0 deletions backend/components/flink/statements-executor/src/main.go
@@ -0,0 +1,115 @@
package main

import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

kafkaURL := os.Getenv("KAFKA_BROKERS")
schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
topicName := os.Getenv("KAFKA_TOPIC_NAME")
authUsername := os.Getenv("AUTH_JAAS_USERNAME")
authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
timestamp := time.Now().Unix()
strTimestamp := fmt.Sprintf("%d", timestamp)
groupID := "statement-executor-" + strTimestamp
flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")

if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
return
}

http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
response := map[string]string{"status": "UP"}
jsonResponse, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonResponse)
})

go func() {
if err := http.ListenAndServe(":80", nil); err != nil {
panic(err)
}
}()

fmt.Println("Health-check started")

fmt.Println("Creating Kafka consumer for topic: ", topicName)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": kafkaURL,
"group.id": groupID,
"auto.offset.reset": "earliest",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": authUsername,
"sasl.password": authPassword,
})
if err != nil {
fmt.Printf("Error creating consumer: %v\n", err)
return
}
	if err := c.SubscribeTopics([]string{topicName}, nil); err != nil {
		fmt.Printf("Error subscribing to topic: %v\n", err)
		return
	}
signals := make(chan os.Signal, 1)
done := make(chan bool, 1)

signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

go func() {
for {
select {
case sig := <-signals:
fmt.Printf("Caught signal %v: terminating\n", sig)
done <- true
return
default:
msg, err := c.ReadMessage(-1)
if err == nil {
var statementSet FlinkStatementSet
if err := json.Unmarshal(msg.Value, &statementSet); err != nil {
fmt.Printf("Error unmarshalling message: %v\n", err)
continue
} else {
fmt.Printf("Received message: %+v\n", statementSet)
fmt.Println("Flink gateway: ", flinkGatewayURL)
						sessionID, err := sendFlinkSQL(flinkGatewayURL, statementSet)
						if err != nil {
							fmt.Println("Error running Flink statements:", err)
							// Skip this message; returning here would silently stop the consumer.
							continue
						}
						fmt.Println("Successfully executed the Flink statements.")
var flinkOutput FlinkOutput
flinkOutput.SessionID = sessionID
flinkOutput.Question = statementSet.Question
flinkOutput.MessageID = statementSet.MessageID
flinkOutput.ConversationID = statementSet.ConversationID
err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword)
if err != nil {
fmt.Printf("error producing message to Kafka: %v\n", err)
}
}
} else {
fmt.Printf("Consumer error: %v\n", err)
}
}
}
}()
<-done
c.Close()
fmt.Println("Consumer closed")
}
134 changes: 134 additions & 0 deletions backend/components/flink/statements-executor/src/tools.go
@@ -0,0 +1,134 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) {
timestamp := time.Now().Unix()
strTimestamp := fmt.Sprintf("%d", timestamp)
replacements := map[string]string{
"{PROPERTIES_GROUP_ID}": "flink-" + strTimestamp,
"{PROPERTIES_BOOTSTRAP_SERVERS}": os.Getenv("KAFKA_BROKERS"),
"{PROPERTIES_SASL_JAAS_CONFIG}": fmt.Sprintf("org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", os.Getenv("AUTH_JAAS_USERNAME"), os.Getenv("AUTH_JAAS_PASSWORD")),
}
for i, stmt := range statementSet.Statements {
for placeholder, value := range replacements {
stmt = strings.Replace(stmt, placeholder, value, -1)
}
statementSet.Statements[i] = stmt
}
fmt.Println("Updated StatementSet: %+v\n", statementSet.Statements)

// Create a sessionHandle
req, err := http.NewRequest("POST", url+"/v1/sessions/", bytes.NewReader([]byte("")))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", err
}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Printf("Error reading response body from the API: %v\n", err)
	}
fmt.Println("Response: ", string(body))
var sessionResponse FlinkSessionResponse
if err := json.Unmarshal(body, &sessionResponse); err != nil {
fmt.Printf("Error unmarshaling message: %v\n", err)
return "", err
}
defer resp.Body.Close()

// Create the SQL Statement
fmt.Println("The Flink session is: ", sessionResponse.SessionHandle)
for _, statement := range statementSet.Statements {
payload := FlinkSQLRequest{
Statement: statement,
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return "", err
}

req, err = http.NewRequest("POST", url+"/v1/sessions/"+sessionResponse.SessionHandle+"/statements/", bytes.NewReader(payloadBytes))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")

client = &http.Client{}
resp, err = client.Do(req)
if err != nil {
return "", err
}
		body, err = io.ReadAll(resp.Body)
		if err != nil {
			fmt.Printf("Error reading response body from the API: %v\n", err)
		}
fmt.Println("Statement submitted. Response: ", string(body))
var statementResponse FlinkStatementResponse
if err := json.Unmarshal(body, &statementResponse); err != nil {
fmt.Printf("Error unmarshaling message: %v\n", err)
return "", err
}
fmt.Printf("Check status on: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionResponse.SessionHandle, statementResponse.OperationHandle)
		// Close inside the loop; defer here would keep every response open until the function returns.
		resp.Body.Close()
}

return sessionResponse.SessionHandle, nil
}

func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername, authPassword string) error {

	// Must match resultSender.topic in helm/values.yaml ("flink.output"), which the result-sender consumes.
	kafkaTopic := "flink.output"

flinkOutputJSON, err := json.Marshal(flinkOutput)
if err != nil {
return fmt.Errorf("error marshaling query to JSON: %w", err)
}

configMap := kafka.ConfigMap{
"bootstrap.servers": kafkaURL,
}
if authUsername != "" && authPassword != "" {
configMap.SetKey("security.protocol", "SASL_SSL")
configMap.SetKey("sasl.mechanisms", "PLAIN")
configMap.SetKey("sasl.username", authUsername)
configMap.SetKey("sasl.password", authPassword)
}

producer, err := kafka.NewProducer(&configMap)
if err != nil {
return fmt.Errorf("failed to create producer: %w", err)
}
defer producer.Close()

message := kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny},
Key: []byte(flinkOutput.SessionID),
Value: flinkOutputJSON,
}

err = producer.Produce(&message, nil)
if err != nil {
return fmt.Errorf("failed to produce message: %w", err)
}
fmt.Println("message scheduled for production")
producer.Flush(15 * 1000)
fmt.Println("message flushed")
return nil
}
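
The placeholder convention in sendFlinkSQL is easiest to see on a concrete statement. A hedged sketch (table, topic, and credentials invented) of one statement before and after the replacement loop:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// A hypothetical statement as it might arrive on the statements topic.
	stmt := `CREATE TABLE orders (id STRING) WITH (
	  'connector' = 'kafka',
	  'topic' = 'orders',
	  'properties.bootstrap.servers' = '{PROPERTIES_BOOTSTRAP_SERVERS}',
	  'properties.group.id' = '{PROPERTIES_GROUP_ID}',
	  'properties.sasl.jaas.config' = '{PROPERTIES_SASL_JAAS_CONFIG}'
	)`

	// Same substitution sendFlinkSQL performs, with invented values.
	replacements := map[string]string{
		"{PROPERTIES_GROUP_ID}":          "flink-1700000000",
		"{PROPERTIES_BOOTSTRAP_SERVERS}": "kafka:9092",
		"{PROPERTIES_SASL_JAAS_CONFIG}":  `org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="u" password="p";`,
	}
	for placeholder, value := range replacements {
		stmt = strings.Replace(stmt, placeholder, value, -1)
	}
	fmt.Println(stmt)
}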
28 changes: 28 additions & 0 deletions backend/components/flink/statements-executor/src/types.go
@@ -0,0 +1,28 @@
package main

type FlinkStatementSet struct {
Statements []string `json:"statements"`
Question string `json:"question"`
MessageID string `json:"message_id"`
ConversationID string `json:"conversation_id"`
}

type FlinkOutput struct {
SessionID string `json:"session_id"`
Question string `json:"question"`
MessageID string `json:"message_id"`
ConversationID string `json:"conversation_id"`
}

// FlinkSQLRequest represents the payload for a Flink SQL request
type FlinkSQLRequest struct {
Statement string `json:"statement"`
}

type FlinkSessionResponse struct {
SessionHandle string `json:"sessionHandle"`
}

type FlinkStatementResponse struct {
OperationHandle string `json:"operationHandle"`
}
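
End to end, the executor consumes a FlinkStatementSet from the statements topic and, once the gateway session is created, emits a FlinkOutput keyed by session ID for the result-sender. A sketch of both payloads using the types above (all values invented):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical input message on the statements topic.
	in := FlinkStatementSet{
		Statements:     []string{"CREATE TABLE orders (id STRING) WITH (...)", "SELECT COUNT(*) FROM orders"},
		Question:       "How many orders today?",
		MessageID:      "msg-123",
		ConversationID: "conv-456",
	}

	// What produceFlinkOutput publishes after sendFlinkSQL succeeds.
	out := FlinkOutput{
		SessionID:      "session-789",
		Question:       in.Question,
		MessageID:      in.MessageID,
		ConversationID: in.ConversationID,
	}

	inJSON, _ := json.Marshal(in)
	outJSON, _ := json.Marshal(out)
	fmt.Println(string(inJSON))
	fmt.Println(string(outJSON))
}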
32 changes: 32 additions & 0 deletions infrastructure/helm-chart/charts/tools/charts/flink/BUILD
@@ -0,0 +1,32 @@
load("@rules_pkg//:pkg.bzl", "pkg_tar")
load("@com_github_airyhq_bazel_tools//helm:helm.bzl", "helm_template_test")
load("//tools/build:helm.bzl", "helm_push_develop", "helm_push_release")

filegroup(
name = "files",
srcs = glob(
["**/*"],
exclude = ["BUILD"],
),
visibility = ["//visibility:public"],
)

pkg_tar(
name = "package",
srcs = [":files"],
extension = "tgz",
strip_prefix = "./",
)

helm_template_test(
name = "template",
chart = ":package",
)

helm_push_develop(
chart = ":package",
)

helm_push_release(
chart = ":package",
)
5 changes: 5 additions & 0 deletions infrastructure/helm-chart/charts/tools/charts/flink/Chart.yaml
@@ -0,0 +1,5 @@
apiVersion: v2
appVersion: "1.0"
description: Flink
name: flink
version: 1.0
61 changes: 61 additions & 0 deletions infrastructure/helm-chart/charts/tools/charts/flink/templates/configmap.yaml
@@ -0,0 +1,61 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.pekko.name = org.apache.pekko
logger.pekko.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
18 changes: 18 additions & 0 deletions infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager-service.yaml
@@ -0,0 +1,18 @@
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
- name: restapi
port: 8083
selector:
app: flink
component: jobmanager
46 changes: 46 additions & 0 deletions infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager-deployment.yaml
@@ -0,0 +1,46 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: ghcr.io/airyhq/flink:release
args: ["jobmanager"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
42 changes: 42 additions & 0 deletions infrastructure/helm-chart/charts/tools/charts/flink/templates/taskmanager-deployment.yaml
@@ -0,0 +1,42 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: ghcr.io/airyhq/flink:release
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
3 changes: 3 additions & 0 deletions infrastructure/helm-chart/charts/tools/charts/flink/values.yaml
@@ -0,0 +1,3 @@
name: flink
mandatory: false
enabled: true
4 changes: 4 additions & 0 deletions infrastructure/images/flink/Dockerfile
@@ -0,0 +1,4 @@
FROM flink:1.17.2-scala_2.12-java8

# Copy the Kafka connector
COPY files/flink-sql-connector-kafka-1.17.2.jar /opt/flink/lib/flink-sql-connector-kafka-1.17.2.jar
6 changes: 6 additions & 0 deletions infrastructure/images/flink/Makefile
@@ -0,0 +1,6 @@
build:
docker build -t flink .

release: build
docker tag flink ghcr.io/airyhq/flink:release
docker push ghcr.io/airyhq/flink:release
Binary file added (not shown): infrastructure/images/flink/files/flink-sql-connector-kafka-1.17.2.jar