diff --git a/backend/components/flink/helm/BUILD b/backend/components/flink/helm/BUILD new file mode 100644 index 000000000..5bb7f8f59 --- /dev/null +++ b/backend/components/flink/helm/BUILD @@ -0,0 +1,32 @@ +load("@rules_pkg//:pkg.bzl", "pkg_tar") +load("@com_github_airyhq_bazel_tools//helm:helm.bzl", "helm_template_test") +load("//tools/build:helm.bzl", "helm_push_develop", "helm_push_release") + +filegroup( + name = "files", + srcs = glob( + ["**/*"], + exclude = ["BUILD"], + ), + visibility = ["//visibility:public"], +) + +pkg_tar( + name = "package", + srcs = [":files"], + extension = "tgz", + strip_prefix = "./", +) + +helm_template_test( + name = "template", + chart = ":package", +) + +helm_push_develop( + chart = ":package", +) + +helm_push_release( + chart = ":package", +) diff --git a/backend/components/flink/helm/Chart.yaml b/backend/components/flink/helm/Chart.yaml new file mode 100644 index 000000000..e260114c6 --- /dev/null +++ b/backend/components/flink/helm/Chart.yaml @@ -0,0 +1,5 @@ +apiVersion: v2 +appVersion: "1.0" +description: Flink connector +name: flink-connector +version: 1.0 diff --git a/backend/components/flink/helm/templates/configmap.yaml b/backend/components/flink/helm/templates/configmap.yaml new file mode 100644 index 000000000..05de4d589 --- /dev/null +++ b/backend/components/flink/helm/templates/configmap.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.component }} + labels: + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: "{{ .Values.component }}" + annotations: + core.airy.co/enabled: "{{ .Values.enabled }}" diff --git a/backend/components/flink/helm/templates/result-sender/deployment.yaml b/backend/components/flink/helm/templates/result-sender/deployment.yaml new file mode 100644 index 000000000..b35f3bafd --- /dev/null +++ b/backend/components/flink/helm/templates/result-sender/deployment.yaml @@ -0,0 +1,50 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Values.component }}-{{ .Values.resultSender.name }} + labels: + app: {{ .Values.component }} + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: {{ .Values.component }} +spec: + replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }} + selector: + matchLabels: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} + strategy: + rollingUpdate: + maxSurge: 1 + maxUnavailable: 1 + type: RollingUpdate + template: + metadata: + labels: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} + spec: + containers: + - name: app + image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release" + imagePullPolicy: Always + envFrom: + - configMapRef: + name: security + - configMapRef: + name: kafka-config + - configMapRef: + name: {{ .Values.component }} + env: + - name: KAFKA_TOPIC_NAME + value: {{ .Values.resultSender.topic }} + - name: API_COMMUNICATION_URL + value: {{ .Values.apiCommunicationUrl }} + livenessProbe: + httpGet: + path: /actuator/health + port: {{ .Values.port }} + httpHeaders: + - name: Health-Check + value: health-check + initialDelaySeconds: 43200 + periodSeconds: 10 + failureThreshold: 3 diff --git a/backend/components/flink/helm/templates/result-sender/service.yaml b/backend/components/flink/helm/templates/result-sender/service.yaml new file mode 100644 index 000000000..b5ef5f7f7 --- /dev/null +++ b/backend/components/flink/helm/templates/result-sender/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.component }}-{{ .Values.resultSender.name }} + labels: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} +spec: + type: ClusterIP + clusterIP: None + ports: + - name: {{ .Values.component }}-{{ .Values.resultSender.name }} + port: 80 + targetPort: {{ .Values.port }} + selector: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} diff --git a/backend/components/flink/helm/templates/statements-executor/deployment.yaml b/backend/components/flink/helm/templates/statements-executor/deployment.yaml new file mode 100644 index 000000000..5d6d4c1dc --- /dev/null +++ b/backend/components/flink/helm/templates/statements-executor/deployment.yaml @@ -0,0 +1,50 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Values.component }}-{{ .Values.executor.name }} + labels: + app: {{ .Values.component }} + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: {{ .Values.component }} +spec: + replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }} + selector: + matchLabels: + app: {{ .Values.component }}-{{ .Values.executor.name }} + strategy: + rollingUpdate: + maxSurge: 1 + maxUnavailable: 1 + type: RollingUpdate + template: + metadata: + labels: + app: {{ .Values.component }}-{{ .Values.executor.name }} + spec: + containers: + - name: app + image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release" + imagePullPolicy: Always + envFrom: + - configMapRef: + name: security + - configMapRef: + name: kafka-config + - configMapRef: + name: {{ .Values.component }} + env: + - name: KAFKA_TOPIC_NAME + value: {{ .Values.executor.topic }} + - name: FLINK_GATEWAY_URL + value: {{ .Values.gatewayUrl }} + livenessProbe: + httpGet: + path: /actuator/health + port: {{ .Values.port }} + httpHeaders: + - name: Health-Check + value: health-check + initialDelaySeconds: 43200 + periodSeconds: 10 + failureThreshold: 3 diff --git a/backend/components/flink/helm/templates/statements-executor/service.yaml b/backend/components/flink/helm/templates/statements-executor/service.yaml new file mode 100644 index 000000000..e0cbbd5e8 --- /dev/null +++ b/backend/components/flink/helm/templates/statements-executor/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.component }}-{{ .Values.executor.name }} + labels: + app: {{ .Values.component }}-{{ .Values.executor.name }} +spec: + type: ClusterIP + clusterIP: None + ports: + - name: {{ .Values.component }}-{{ .Values.executor.name }} + port: 80 + targetPort: {{ .Values.port }} + selector: + app: {{ .Values.component }}-{{ .Values.executor.name }} diff --git a/backend/components/flink/helm/values.yaml b/backend/components/flink/helm/values.yaml new file mode 100644 index 000000000..61480685f --- /dev/null +++ b/backend/components/flink/helm/values.yaml @@ -0,0 +1,15 @@ +component: flink-connector +mandatory: false +enabled: false +port: 8080 +resources: +gatewayUrl: "http://flink-jobmanager:8083" +apiCommunicationUrl: "http://api-communication/messages.send" +executor: + name: statements-executor + image: connectors/flink/statements-executor + topic: flink.statements +resultSender: + name: result-sender + image: connectors/flink/result-sender + topic: flink.output diff --git a/backend/components/flink/result-sender/Dockerfile b/backend/components/flink/result-sender/Dockerfile new file mode 100644 index 000000000..13e3499ac --- /dev/null +++ b/backend/components/flink/result-sender/Dockerfile @@ -0,0 +1,22 @@ +# Use the official Golang image as the base image +FROM golang:1.17 + +# Set the working directory inside the container +WORKDIR /app + +# Copy the Go source code into the container +COPY ./src/*.go ./ + +# Install the required libraries +RUN go mod init main && \ + go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \ + go get golang.org/x/net + +# Build the Go program +RUN go build -o app + +# Command to run the Go program +CMD ["./app"] diff --git a/backend/components/flink/result-sender/Makefile b/backend/components/flink/result-sender/Makefile new file mode 100644 index 000000000..3b5059783 --- /dev/null +++ b/backend/components/flink/result-sender/Makefile @@ -0,0 +1,6 @@ +build: + docker build -t flink-connector/result-sender . + +release: build + docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release + docker push ghcr.io/airyhq/connectors/flink/result-sender:release diff --git a/backend/components/flink/result-sender/src/main.go b/backend/components/flink/result-sender/src/main.go new file mode 100644 index 000000000..6f5fc32a6 --- /dev/null +++ b/backend/components/flink/result-sender/src/main.go @@ -0,0 +1,111 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func main() { + + kafkaURL := os.Getenv("KAFKA_BROKERS") + schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL") + topicName := os.Getenv("KAFKA_TOPIC_NAME") + systemToken := os.Getenv("systemToken") + authUsername := os.Getenv("AUTH_JAAS_USERNAME") + authPassword := os.Getenv("AUTH_JAAS_PASSWORD") + groupID := "result-sender" + flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL") + + if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" { + fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set") + return + } + + http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) { + response := map[string]string{"status": "UP"} + jsonResponse, err := json.Marshal(response) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(jsonResponse) + }) + + go func() { + if err := http.ListenAndServe(":80", nil); err != nil { + panic(err) + } + }() + + fmt.Println("Health-check started") + + fmt.Println("Creating Kafka consumer for topic: ", topicName) + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": kafkaURL, + "group.id": groupID, + "auto.offset.reset": "earliest", + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "PLAIN", + "sasl.username": authUsername, + "sasl.password": authPassword, + }) + if err != nil { + fmt.Printf("Error creating consumer: %v\n", err) + return + } + c.SubscribeTopics([]string{topicName}, nil) + signals := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(signals, os.Interrupt, syscall.SIGTERM) + + go func() { + for { + select { + case sig := <-signals: + fmt.Printf("Caught signal %v: terminating\n", sig) + done <- true + return + default: + msg, err := c.ReadMessage(-1) + if err == nil { + var flinkOutput FlinkOutput + if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil { + fmt.Printf("Error unmarshalling message: %v\n", err) + continue + } else { + fmt.Printf("Received message: %+v\n", flinkOutput) + + fmt.Println("Flink gateway: ", flinkGatewayURL) + result, err := getFlinkResult(flinkGatewayURL, flinkOutput.SessionID) + if err != nil { + fmt.Println("Unable to get Flink result:", err) + return + } + response, err := convertResultToMarkdown(result) + if err != nil { + fmt.Println("Unable to generate Markdown from result:", err) + sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken) + return + } + sendMessage(response, flinkOutput.ConversationID, systemToken) + } + } else { + fmt.Printf("Consumer error: %v\n", err) + } + } + } + }() + <-done + c.Close() + fmt.Println("Consumer closed") +} diff --git a/backend/components/flink/result-sender/src/tools.go b/backend/components/flink/result-sender/src/tools.go new file mode 100644 index 000000000..6be5dee2f --- /dev/null +++ b/backend/components/flink/result-sender/src/tools.go @@ -0,0 +1,160 @@ +package main + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "strings" + "time" +) + +// getFlinkResult sends an SQL statement to the Flink Gateway +func getFlinkResult(url, sessionID string) (FlinkResult, error) { + // Create the SQL Statement + fmt.Println("The Flink session is: ", sessionID) + payload := FlinkSQLRequest{ + Statement: "select * from output;", + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return FlinkResult{}, err + } + + req, err := http.NewRequest("POST", url+"/v1/sessions/"+sessionID+"/statements/", bytes.NewReader(payloadBytes)) + if err != nil { + return FlinkResult{}, err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return FlinkResult{}, err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var statementResponse FlinkStatementResponse + if err := json.Unmarshal(body, &statementResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return FlinkResult{}, err + } + + // Fetch the results from the operationHandle + fmt.Printf("Fetching result from: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionID, statementResponse.OperationHandle) + time.Sleep(10 * time.Second) + req, err = http.NewRequest("GET", url+"/v1/sessions/"+sessionID+"/operations/"+statementResponse.OperationHandle+"/result/0", nil) + if err != nil { + return FlinkResult{}, err + } + req.Header.Set("Content-Type", "application/json") + + client = &http.Client{} + resp, err = client.Do(req) + if err != nil { + return FlinkResult{}, err + } + body, err = io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var flinkResultResponse FlinkResultResponse + if err := json.Unmarshal(body, &flinkResultResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return FlinkResult{}, err + } + defer resp.Body.Close() + + // Handle the response (check if the request was successful) + return flinkResultResponse.Results, nil +} + +func sendMessage(message string, conversationId string, systemToken string) (int, string, error) { + apiCommunicationUrl := os.Getenv("API_COMMUNICATION_URL") + messageContent := messageContent{ + Text: message, + } + messageToSend := ApplicationCommunicationSendMessage{ + ConversationID: conversationId, + Message: messageContent, + } + messageJSON, err := json.Marshal(messageToSend) + if err != nil { + fmt.Printf("Error encoding response to JSON: %v\n", err) + return 0, "", errors.New("The message could not be encoded to JSON for sending.") + } + + req, err := http.NewRequest("POST", apiCommunicationUrl, bytes.NewReader(messageJSON)) + if err != nil { + fmt.Printf("Error creating request: %v\n", err) + return 0, "", errors.New("The message could not be sent.") + } + req.Header.Add("Authorization", "Bearer "+systemToken) + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + fmt.Printf("Error sending POST request: %v\n", err) + return 0, "", errors.New("Error sending POST request.") + } + defer resp.Body.Close() + + // Read the response body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + return 0, "", errors.New("Error reading response body.") + } + + var response SendMessageResponse + err = json.Unmarshal(body, &response) + if err != nil { + fmt.Println("Error unmarshaling response:", err) + return 0, "", errors.New("Response couldn't be unmarshaled.") + } + + fmt.Printf("Message sent with status code: %d\n", resp.StatusCode) + return resp.StatusCode, response.ID, nil +} + +func markdown(message string) (string, error) { + return message, nil +} + +func convertResultToMarkdown(result FlinkResult) (string, error) { + var builder strings.Builder + + // Add the header row + if len(result.Columns) == 0 { + return "", errors.New("No columns found for generating the Markdown table.") + } + for _, col := range result.Columns { + builder.WriteString("| " + col.Name + " ") + } + builder.WriteString("|\n") + + // Add the separator row + for range result.Columns { + builder.WriteString("|---") + } + builder.WriteString("|\n") + + // Add the data rows + for _, d := range result.Data { + for _, field := range d.Fields { + builder.WriteString(fmt.Sprintf("| %v ", field)) + } + builder.WriteString("|\n") + } + + return builder.String(), nil +} diff --git a/backend/components/flink/result-sender/src/types.go b/backend/components/flink/result-sender/src/types.go new file mode 100644 index 000000000..7c751feb0 --- /dev/null +++ b/backend/components/flink/result-sender/src/types.go @@ -0,0 +1,66 @@ +package main + +type ApplicationCommunicationSendMessage struct { + ConversationID string `json:"conversation_id"` + Message messageContent `json:"message"` + Metadata map[string]string `json:"metadata"` +} + +type messageContent struct { + Text string `json:"text"` +} + +type SendMessageResponse struct { + ID string `json:"id"` + State string `json:"state"` +} + +type FlinkOutput struct { + SessionID string `json:"session_id"` + Question string `json:"question"` + MessageID string `json:"message_id"` + ConversationID string `json:"conversation_id"` +} + +// FlinkSQLRequest represents the payload for a Flink SQL request +type FlinkSQLRequest struct { + Statement string `json:"statement"` +} + +type FlinkSessionResponse struct { + SessionHandle string `json:"sessionHandle"` +} + +type FlinkStatementResponse struct { + OperationHandle string `json:"operationHandle"` +} + +type Column struct { + Name string `json:"name"` + LogicalType struct { + Type string `json:"type"` + Nullable bool `json:"nullable"` + Length int `json:"length,omitempty"` + } `json:"logicalType"` + Comment interface{} `json:"comment"` +} + +type Data struct { + Kind string `json:"kind"` + Fields []interface{} `json:"fields"` +} + +type FlinkResult struct { + Columns []Column `json:"columns"` + RowFormat string `json:"rowFormat"` + Data []Data `json:"data"` +} + +type FlinkResultResponse struct { + ResultType string `json:"resultType"` + IsQueryResult bool `json:"isQueryResult"` + JobID string `json:"jobID"` + ResultKind string `json:"resultKind"` + Results FlinkResult `json:"results"` + NextResultUri string `json:"nextResultUri"` +} diff --git a/backend/components/flink/statements-executor/Dockerfile b/backend/components/flink/statements-executor/Dockerfile new file mode 100644 index 000000000..13e3499ac --- /dev/null +++ b/backend/components/flink/statements-executor/Dockerfile @@ -0,0 +1,22 @@ +# Use the official Golang image as the base image +FROM golang:1.17 + +# Set the working directory inside the container +WORKDIR /app + +# Copy the Go source code into the container +COPY ./src/*.go ./ + +# Install the required libraries +RUN go mod init main && \ + go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \ + go get golang.org/x/net + +# Build the Go program +RUN go build -o app + +# Command to run the Go program +CMD ["./app"] diff --git a/backend/components/flink/statements-executor/Makefile b/backend/components/flink/statements-executor/Makefile new file mode 100644 index 000000000..0a969da1d --- /dev/null +++ b/backend/components/flink/statements-executor/Makefile @@ -0,0 +1,6 @@ +build: + docker build -t flink-connector/statements-executor . + +release: build + docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release + docker push ghcr.io/airyhq/connectors/flink/statements-executor:release diff --git a/backend/components/flink/statements-executor/src/main.go b/backend/components/flink/statements-executor/src/main.go new file mode 100644 index 000000000..89aea27e3 --- /dev/null +++ b/backend/components/flink/statements-executor/src/main.go @@ -0,0 +1,115 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func main() { + + kafkaURL := os.Getenv("KAFKA_BROKERS") + schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL") + topicName := os.Getenv("KAFKA_TOPIC_NAME") + authUsername := os.Getenv("AUTH_JAAS_USERNAME") + authPassword := os.Getenv("AUTH_JAAS_PASSWORD") + timestamp := time.Now().Unix() + strTimestamp := fmt.Sprintf("%d", timestamp) + groupID := "statement-executor-" + strTimestamp + flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL") + + if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" { + fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set") + return + } + + http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) { + response := map[string]string{"status": "UP"} + jsonResponse, err := json.Marshal(response) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(jsonResponse) + }) + + go func() { + if err := http.ListenAndServe(":80", nil); err != nil { + panic(err) + } + }() + + fmt.Println("Health-check started") + + fmt.Println("Creating Kafka consumer for topic: ", topicName) + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": kafkaURL, + "group.id": groupID, + "auto.offset.reset": "earliest", + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "PLAIN", + "sasl.username": authUsername, + "sasl.password": authPassword, + }) + if err != nil { + fmt.Printf("Error creating consumer: %v\n", err) + return + } + c.SubscribeTopics([]string{topicName}, nil) + signals := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(signals, os.Interrupt, syscall.SIGTERM) + + go func() { + for { + select { + case sig := <-signals: + fmt.Printf("Caught signal %v: terminating\n", sig) + done <- true + return + default: + msg, err := c.ReadMessage(-1) + if err == nil { + var statementSet FlinkStatementSet + if err := json.Unmarshal(msg.Value, &statementSet); err != nil { + fmt.Printf("Error unmarshalling message: %v\n", err) + continue + } else { + fmt.Printf("Received message: %+v\n", statementSet) + fmt.Println("Flink gateway: ", flinkGatewayURL) + sessionID, err := sendFlinkSQL(flinkGatewayURL, statementSet) + if err != nil { + fmt.Println("Error running Flink statement:", err) + return + } + fmt.Println("Successfully executing the Flink statement.") + var flinkOutput FlinkOutput + flinkOutput.SessionID = sessionID + flinkOutput.Question = statementSet.Question + flinkOutput.MessageID = statementSet.MessageID + flinkOutput.ConversationID = statementSet.ConversationID + err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword) + if err != nil { + fmt.Printf("error producing message to Kafka: %v\n", err) + } + } + } else { + fmt.Printf("Consumer error: %v\n", err) + } + } + } + }() + <-done + c.Close() + fmt.Println("Consumer closed") +} diff --git a/backend/components/flink/statements-executor/src/tools.go b/backend/components/flink/statements-executor/src/tools.go new file mode 100644 index 000000000..57ec01fda --- /dev/null +++ b/backend/components/flink/statements-executor/src/tools.go @@ -0,0 +1,134 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) { + timestamp := time.Now().Unix() + strTimestamp := fmt.Sprintf("%d", timestamp) + replacements := map[string]string{ + "{PROPERTIES_GROUP_ID}": "flink-" + strTimestamp, + "{PROPERTIES_BOOTSTRAP_SERVERS}": os.Getenv("KAFKA_BROKERS"), + "{PROPERTIES_SASL_JAAS_CONFIG}": fmt.Sprintf("org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", os.Getenv("AUTH_JAAS_USERNAME"), os.Getenv("AUTH_JAAS_PASSWORD")), + } + for i, stmt := range statementSet.Statements { + for placeholder, value := range replacements { + stmt = strings.Replace(stmt, placeholder, value, -1) + } + statementSet.Statements[i] = stmt + } + fmt.Println("Updated StatementSet: %+v\n", statementSet.Statements) + + // Create a sessionHandle + req, err := http.NewRequest("POST", url+"/v1/sessions/", bytes.NewReader([]byte(""))) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return "", err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Response: ", string(body)) + var sessionResponse FlinkSessionResponse + if err := json.Unmarshal(body, &sessionResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return "", err + } + defer resp.Body.Close() + + // Create the SQL Statement + fmt.Println("The Flink session is: ", sessionResponse.SessionHandle) + for _, statement := range statementSet.Statements { + payload := FlinkSQLRequest{ + Statement: statement, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return "", err + } + + req, err = http.NewRequest("POST", url+"/v1/sessions/"+sessionResponse.SessionHandle+"/statements/", bytes.NewReader(payloadBytes)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + client = &http.Client{} + resp, err = client.Do(req) + if err != nil { + return "", err + } + body, err = io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var statementResponse FlinkStatementResponse + if err := json.Unmarshal(body, &statementResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return "", err + } + fmt.Printf("Check status on: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionResponse.SessionHandle, statementResponse.OperationHandle) + defer resp.Body.Close() + } + + return sessionResponse.SessionHandle, nil +} + +func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername, authPassword string) error { + + kafkaTopic := "flink.outputs" + + flinkOutputJSON, err := json.Marshal(flinkOutput) + if err != nil { + return fmt.Errorf("error marshaling query to JSON: %w", err) + } + + configMap := kafka.ConfigMap{ + "bootstrap.servers": kafkaURL, + } + if authUsername != "" && authPassword != "" { + configMap.SetKey("security.protocol", "SASL_SSL") + configMap.SetKey("sasl.mechanisms", "PLAIN") + configMap.SetKey("sasl.username", authUsername) + configMap.SetKey("sasl.password", authPassword) + } + + producer, err := kafka.NewProducer(&configMap) + if err != nil { + return fmt.Errorf("failed to create producer: %w", err) + } + defer producer.Close() + + message := kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny}, + Key: []byte(flinkOutput.SessionID), + Value: flinkOutputJSON, + } + + err = producer.Produce(&message, nil) + if err != nil { + return fmt.Errorf("failed to produce message: %w", err) + } + fmt.Println("message scheduled for production") + producer.Flush(15 * 1000) + fmt.Println("message flushed") + return nil +} diff --git a/backend/components/flink/statements-executor/src/types.go b/backend/components/flink/statements-executor/src/types.go new file mode 100644 index 000000000..6c9f04287 --- /dev/null +++ b/backend/components/flink/statements-executor/src/types.go @@ -0,0 +1,28 @@ +package main + +type FlinkStatementSet struct { + Statements []string `json:"statements"` + Question string `json:"question"` + MessageID string `json:"message_id"` + ConversationID string `json:"conversation_id"` +} + +type FlinkOutput struct { + SessionID string `json:"session_id"` + Question string `json:"question"` + MessageID string `json:"message_id"` + ConversationID string `json:"conversation_id"` +} + +// FlinkSQLRequest represents the payload for a Flink SQL request +type FlinkSQLRequest struct { + Statement string `json:"statement"` +} + +type FlinkSessionResponse struct { + SessionHandle string `json:"sessionHandle"` +} + +type FlinkStatementResponse struct { + OperationHandle string `json:"operationHandle"` +} diff --git a/infrastructure/helm-chart/charts/tools/charts/flink/BUILD b/infrastructure/helm-chart/charts/tools/charts/flink/BUILD new file mode 100644 index 000000000..5bb7f8f59 --- /dev/null +++ b/infrastructure/helm-chart/charts/tools/charts/flink/BUILD @@ -0,0 +1,32 @@ +load("@rules_pkg//:pkg.bzl", "pkg_tar") +load("@com_github_airyhq_bazel_tools//helm:helm.bzl", "helm_template_test") +load("//tools/build:helm.bzl", "helm_push_develop", "helm_push_release") + +filegroup( + name = "files", + srcs = glob( + ["**/*"], + exclude = ["BUILD"], + ), + visibility = ["//visibility:public"], +) + +pkg_tar( + name = "package", + srcs = [":files"], + extension = "tgz", + strip_prefix = "./", +) + +helm_template_test( + name = "template", + chart = ":package", +) + +helm_push_develop( + chart = ":package", +) + +helm_push_release( + chart = ":package", +) diff --git a/infrastructure/helm-chart/charts/tools/charts/flink/Chart.yaml b/infrastructure/helm-chart/charts/tools/charts/flink/Chart.yaml new file mode 100644 index 000000000..f6a32ed28 --- /dev/null +++ b/infrastructure/helm-chart/charts/tools/charts/flink/Chart.yaml @@ -0,0 +1,5 @@ +apiVersion: v2 +appVersion: "1.0" +description: Flink +name: flink +version: 1.0 diff --git a/infrastructure/helm-chart/charts/tools/charts/flink/templates/configuration.yaml b/infrastructure/helm-chart/charts/tools/charts/flink/templates/configuration.yaml new file mode 100644 index 000000000..0e2edb551 --- /dev/null +++ b/infrastructure/helm-chart/charts/tools/charts/flink/templates/configuration.yaml @@ -0,0 +1,61 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: flink-config + labels: + app: flink +data: + flink-conf.yaml: |+ + jobmanager.rpc.address: flink-jobmanager + taskmanager.numberOfTaskSlots: 2 + blob.server.port: 6124 + jobmanager.rpc.port: 6123 + taskmanager.rpc.port: 6122 + jobmanager.memory.process.size: 1600m + taskmanager.memory.process.size: 1728m + parallelism.default: 2 + log4j-console.properties: |+ + # This affects logging for both user code and Flink + rootLogger.level = INFO + rootLogger.appenderRef.console.ref = ConsoleAppender + rootLogger.appenderRef.rolling.ref = RollingFileAppender + + # Uncomment this if you want to _only_ change Flink's logging + #logger.flink.name = org.apache.flink + #logger.flink.level = INFO + + # The following lines keep the log level of common libraries/connectors on + # log level INFO. The root logger does not override this. You have to manually + # change the log levels here. + logger.pekko.name = org.apache.pekko + logger.pekko.level = INFO + logger.kafka.name= org.apache.kafka + logger.kafka.level = INFO + logger.hadoop.name = org.apache.hadoop + logger.hadoop.level = INFO + logger.zookeeper.name = org.apache.zookeeper + logger.zookeeper.level = INFO + + # Log all infos to the console + appender.console.name = ConsoleAppender + appender.console.type = CONSOLE + appender.console.layout.type = PatternLayout + appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n + + # Log all infos in the given rolling file + appender.rolling.name = RollingFileAppender + appender.rolling.type = RollingFile + appender.rolling.append = false + appender.rolling.fileName = ${sys:log.file} + appender.rolling.filePattern = ${sys:log.file}.%i + appender.rolling.layout.type = PatternLayout + appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n + appender.rolling.policies.type = Policies + appender.rolling.policies.size.type = SizeBasedTriggeringPolicy + appender.rolling.policies.size.size=100MB + appender.rolling.strategy.type = DefaultRolloverStrategy + appender.rolling.strategy.max = 10 + + # Suppress the irrelevant (wrong) warnings from the Netty channel handler + logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline + logger.netty.level = OFF diff --git a/infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager-service.yaml b/infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager-service.yaml new file mode 100644 index 000000000..38a92b7df --- /dev/null +++ b/infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager-service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + name: flink-jobmanager +spec: + type: ClusterIP + ports: + - name: rpc + port: 6123 + - name: blob-server + port: 6124 + - name: webui + port: 8081 + - name: restapi + port: 8083 + selector: + app: flink + component: jobmanager diff --git a/infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager.yaml b/infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager.yaml new file mode 100644 index 000000000..6a76b5f94 --- /dev/null +++ b/infrastructure/helm-chart/charts/tools/charts/flink/templates/jobmanager.yaml @@ -0,0 +1,46 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: flink-jobmanager +spec: + replicas: 1 + selector: + matchLabels: + app: flink + component: jobmanager + template: + metadata: + labels: + app: flink + component: jobmanager + spec: + containers: + - name: jobmanager + image: ghcr.io/airyhq/flink:release + args: ["jobmanager"] + ports: + - containerPort: 6123 + name: rpc + - containerPort: 6124 + name: blob-server + - containerPort: 8081 + name: webui + livenessProbe: + tcpSocket: + port: 6123 + initialDelaySeconds: 30 + periodSeconds: 60 + volumeMounts: + - name: flink-config-volume + mountPath: /opt/flink/conf + securityContext: + runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary + volumes: + - name: flink-config-volume + configMap: + name: flink-config + items: + - key: flink-conf.yaml + path: flink-conf.yaml + - key: log4j-console.properties + path: log4j-console.properties diff --git a/infrastructure/helm-chart/charts/tools/charts/flink/templates/taskmanager.yaml b/infrastructure/helm-chart/charts/tools/charts/flink/templates/taskmanager.yaml new file mode 100644 index 000000000..5f6d68e46 --- /dev/null +++ b/infrastructure/helm-chart/charts/tools/charts/flink/templates/taskmanager.yaml @@ -0,0 +1,42 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: flink-taskmanager +spec: + replicas: 2 + selector: + matchLabels: + app: flink + component: taskmanager + template: + metadata: + labels: + app: flink + component: taskmanager + spec: + containers: + - name: taskmanager + image: ghcr.io/airyhq/flink:release + args: ["taskmanager"] + ports: + - containerPort: 6122 + name: rpc + livenessProbe: + tcpSocket: + port: 6122 + initialDelaySeconds: 30 + periodSeconds: 60 + volumeMounts: + - name: flink-config-volume + mountPath: /opt/flink/conf/ + securityContext: + runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary + volumes: + - name: flink-config-volume + configMap: + name: flink-config + items: + - key: flink-conf.yaml + path: flink-conf.yaml + - key: log4j-console.properties + path: log4j-console.properties diff --git a/infrastructure/helm-chart/charts/tools/charts/flink/values.yaml b/infrastructure/helm-chart/charts/tools/charts/flink/values.yaml new file mode 100644 index 000000000..7843d48d6 --- /dev/null +++ b/infrastructure/helm-chart/charts/tools/charts/flink/values.yaml @@ -0,0 +1,3 @@ +name: flink +mandatory: false +enabled: true diff --git a/infrastructure/images/flink/Dockerfile b/infrastructure/images/flink/Dockerfile new file mode 100644 index 000000000..03b915a41 --- /dev/null +++ b/infrastructure/images/flink/Dockerfile @@ -0,0 +1,4 @@ +FROM flink:1.17.2-scala_2.12-java8 + +# Copy the Kafka connector +COPY files/flink-sql-connector-kafka-1.17.2.jar /opt/flink/lib/flink-sql-connector-kafka-1.17.2.jar diff --git a/infrastructure/images/flink/Makefile b/infrastructure/images/flink/Makefile new file mode 100644 index 000000000..ed9e561dc --- /dev/null +++ b/infrastructure/images/flink/Makefile @@ -0,0 +1,6 @@ +build: + docker build -t flink . + +release: build + docker tag flink ghcr.io/airyhq/flink:release + docker push ghcr.io/airyhq/flink:release diff --git a/infrastructure/images/flink/files/flink-sql-connector-kafka-1.17.2.jar b/infrastructure/images/flink/files/flink-sql-connector-kafka-1.17.2.jar new file mode 100644 index 000000000..013845f7d Binary files /dev/null and b/infrastructure/images/flink/files/flink-sql-connector-kafka-1.17.2.jar differ