diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 11d1fbb6f6..699da3a78a 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -54,6 +54,16 @@ jobs:
run: |
bazel test --test_tag_filters=-lint //...
+ - name: Cleanup space
+ run: |
+ df -h
+ sudo apt-get autoremove -y
+ sudo apt-get clean
+          docker image prune -af
+ sudo rm -rf /usr/local/share/powershell
+ sudo rm -rf /opt/hostedtoolcache
+ df -h
+
- name: Build all artifacts
run: |
bazel build //...
@@ -64,11 +74,6 @@ jobs:
echo ${{ secrets.PAT }} | docker login ghcr.io -u airydevci --password-stdin
./scripts/push-images.sh
- - name: Install aws cli
- uses: chrislennon/action-aws-cli@v1.1
- env:
- ACTIONS_ALLOW_UNSECURE_COMMANDS: 'true'
-
- name: Upload airy binary to S3
if: ${{ github.ref == 'refs/heads/develop' || startsWith(github.ref, 'refs/heads/release') || github.ref == 'refs/heads/main' }}
run: |
@@ -76,6 +81,7 @@ jobs:
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+ AWS_REGION: ${{ secrets.AWS_REGION }}
GITHUB_BRANCH: ${{ github.ref }}
- name: Publish helm charts
diff --git a/README.md b/README.md
index 92d5b4b5b6..2d80c67605 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
---
-![Airy_Explainer_Highlevel_Readme](https://airy.co/docs/core/img/getting-started/introduction-light.png)
+![Airy_Explainer_Highlevel_Readme](https://airy.co/docs/core/img/getting-started/introduction.png)
Airy Core is an is an open-source streaming app framework to train ML models and supply them with historical and real-time data. With Airy you can process data from a variety of
sources:
diff --git a/VERSION b/VERSION
index 524456c776..316ba4bd9e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.54.0
+0.55.0
diff --git a/backend/components/chat-plugin/helm/templates/backend/deployment.yaml b/backend/components/chat-plugin/helm/templates/backend/deployment.yaml
index 1f8995b817..a82f25fd2f 100644
--- a/backend/components/chat-plugin/helm/templates/backend/deployment.yaml
+++ b/backend/components/chat-plugin/helm/templates/backend/deployment.yaml
@@ -51,6 +51,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.backend.resources | indent 10 }}
initContainers:
@@ -68,3 +73,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/contacts/helm/templates/deployment.yaml b/backend/components/contacts/helm/templates/deployment.yaml
index 5e4c6846a8..b0d2e68d9b 100644
--- a/backend/components/contacts/helm/templates/deployment.yaml
+++ b/backend/components/contacts/helm/templates/deployment.yaml
@@ -45,6 +45,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.resources | indent 10 }}
initContainers:
@@ -62,3 +67,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/facebook/helm/templates/deployments.yaml b/backend/components/facebook/helm/templates/deployments.yaml
index 4d72ef5174..a530a8122f 100644
--- a/backend/components/facebook/helm/templates/deployments.yaml
+++ b/backend/components/facebook/helm/templates/deployments.yaml
@@ -59,6 +59,11 @@ spec:
- name: Health-Check
value: health-check
initialDelaySeconds: 120
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.connector.resources | indent 12 }}
initContainers:
@@ -76,6 +81,11 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
---
apiVersion: apps/v1
kind: Deployment
@@ -124,6 +134,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.eventsRouter.resources | indent 10 }}
initContainers:
@@ -141,3 +156,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/flink-connector/Dockerfile.result-sender b/backend/components/flink-connector/Dockerfile.result-sender
new file mode 100644
index 0000000000..6580476fad
--- /dev/null
+++ b/backend/components/flink-connector/Dockerfile.result-sender
@@ -0,0 +1,16 @@
+FROM golang:1.17
+
+WORKDIR /app
+
+COPY ./src/types.go ./src/tools.go ./src/result-sender.go ./
+
+RUN go mod init main && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
+ go get golang.org/x/net
+
+RUN go build -o app
+
+CMD ["./app"]
diff --git a/backend/components/flink-connector/Dockerfile.statements-executor b/backend/components/flink-connector/Dockerfile.statements-executor
new file mode 100644
index 0000000000..3280b144f7
--- /dev/null
+++ b/backend/components/flink-connector/Dockerfile.statements-executor
@@ -0,0 +1,16 @@
+FROM golang:1.17
+
+WORKDIR /app
+
+COPY ./src/types.go ./src/tools.go ./src/statements-executor.go ./
+
+RUN go mod init main && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
+ go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
+ go get golang.org/x/net
+
+RUN go build -o app
+
+CMD ["./app"]
diff --git a/backend/components/flink-connector/Makefile b/backend/components/flink-connector/Makefile
new file mode 100644
index 0000000000..1e7dcec8d0
--- /dev/null
+++ b/backend/components/flink-connector/Makefile
@@ -0,0 +1,13 @@
+build-statements-executor:
+ docker build -t flink-connector/statements-executor -f Dockerfile.statements-executor .
+
+release-statements-executor: build-statements-executor
+ docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release
+ docker push ghcr.io/airyhq/connectors/flink/statements-executor:release
+
+build-result-sender:
+ docker build -t flink-connector/result-sender -f Dockerfile.result-sender .
+
+release-result-sender: build-result-sender
+ docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release
+ docker push ghcr.io/airyhq/connectors/flink/result-sender:release
diff --git a/backend/components/flink-connector/helm/BUILD b/backend/components/flink-connector/helm/BUILD
new file mode 100644
index 0000000000..45805d5f1c
--- /dev/null
+++ b/backend/components/flink-connector/helm/BUILD
@@ -0,0 +1,3 @@
+load("//tools/build:helm.bzl", "helm_ruleset_core_version")
+
+helm_ruleset_core_version()
diff --git a/backend/components/flink-connector/helm/Chart.yaml b/backend/components/flink-connector/helm/Chart.yaml
new file mode 100644
index 0000000000..4eb993fb25
--- /dev/null
+++ b/backend/components/flink-connector/helm/Chart.yaml
@@ -0,0 +1,6 @@
+
+apiVersion: v2
+appVersion: "1.0"
+description: Flink connector
+name: flink-connector
+version: 1.0
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/configmap.yaml b/backend/components/flink-connector/helm/templates/configmap.yaml
new file mode 100644
index 0000000000..d8301a65d1
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/configmap.yaml
@@ -0,0 +1,10 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: {{ .Values.component }}
+ labels:
+ core.airy.co/managed: "true"
+ core.airy.co/mandatory: "{{ .Values.mandatory }}"
+ core.airy.co/component: "{{ .Values.component }}"
+ annotations:
+ core.airy.co/enabled: "{{ .Values.enabled }}"
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml b/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml
new file mode 100644
index 0000000000..0f50fc5540
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/result-sender/deployment.yaml
@@ -0,0 +1,50 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ labels:
+ app: {{ .Values.component }}
+ core.airy.co/managed: "true"
+ core.airy.co/mandatory: "{{ .Values.mandatory }}"
+ core.airy.co/component: {{ .Values.component }}
+spec:
+ replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
+ selector:
+ matchLabels:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ strategy:
+ rollingUpdate:
+ maxSurge: 1
+ maxUnavailable: 1
+ type: RollingUpdate
+ template:
+ metadata:
+ labels:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ spec:
+ containers:
+ - name: app
+ image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release"
+ imagePullPolicy: Always
+ envFrom:
+ - configMapRef:
+ name: security
+ - configMapRef:
+ name: kafka-config
+ - configMapRef:
+ name: {{ .Values.component }}
+ env:
+ - name: KAFKA_TOPIC_NAME
+ value: {{ .Values.resultSender.topic }}
+ - name: API_COMMUNICATION_URL
+ value: {{ .Values.apiCommunicationUrl }}
+ livenessProbe:
+ httpGet:
+ path: /actuator/health
+ port: {{ .Values.port }}
+ httpHeaders:
+ - name: Health-Check
+ value: health-check
+ initialDelaySeconds: 43200
+ periodSeconds: 10
+ failureThreshold: 3
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/result-sender/service.yaml b/backend/components/flink-connector/helm/templates/result-sender/service.yaml
new file mode 100644
index 0000000000..cdf73d72ba
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/result-sender/service.yaml
@@ -0,0 +1,15 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ labels:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
+spec:
+ type: ClusterIP
+ clusterIP: None
+ ports:
+ - name: {{ .Values.component }}-{{ .Values.resultSender.name }}
+ port: 80
+ targetPort: {{ .Values.port }}
+ selector:
+ app: {{ .Values.component }}-{{ .Values.resultSender.name }}
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml b/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml
new file mode 100644
index 0000000000..44c7b6e59c
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/statements-executor/deployment.yaml
@@ -0,0 +1,50 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: {{ .Values.component }}-{{ .Values.executor.name }}
+ labels:
+ app: {{ .Values.component }}
+ core.airy.co/managed: "true"
+ core.airy.co/mandatory: "{{ .Values.mandatory }}"
+ core.airy.co/component: {{ .Values.component }}
+spec:
+ replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
+ selector:
+ matchLabels:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
+ strategy:
+ rollingUpdate:
+ maxSurge: 1
+ maxUnavailable: 1
+ type: RollingUpdate
+ template:
+ metadata:
+ labels:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
+ spec:
+ containers:
+ - name: app
+ image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release"
+ imagePullPolicy: Always
+ envFrom:
+ - configMapRef:
+ name: security
+ - configMapRef:
+ name: kafka-config
+ - configMapRef:
+ name: {{ .Values.component }}
+ env:
+ - name: KAFKA_TOPIC_NAME
+ value: {{ .Values.executor.topic }}
+ - name: FLINK_GATEWAY_URL
+ value: {{ .Values.gatewayUrl }}
+ livenessProbe:
+ httpGet:
+ path: /actuator/health
+ port: {{ .Values.port }}
+ httpHeaders:
+ - name: Health-Check
+ value: health-check
+ initialDelaySeconds: 43200
+ periodSeconds: 10
+ failureThreshold: 3
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/templates/statements-executor/service.yaml b/backend/components/flink-connector/helm/templates/statements-executor/service.yaml
new file mode 100644
index 0000000000..3e5fbfc30c
--- /dev/null
+++ b/backend/components/flink-connector/helm/templates/statements-executor/service.yaml
@@ -0,0 +1,16 @@
+
+apiVersion: v1
+kind: Service
+metadata:
+ name: {{ .Values.component }}-{{ .Values.executor.name }}
+ labels:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
+spec:
+ type: ClusterIP
+ clusterIP: None
+ ports:
+ - name: {{ .Values.component }}-{{ .Values.executor.name }}
+ port: 80
+ targetPort: {{ .Values.port }}
+ selector:
+ app: {{ .Values.component }}-{{ .Values.executor.name }}
\ No newline at end of file
diff --git a/backend/components/flink-connector/helm/values.yaml b/backend/components/flink-connector/helm/values.yaml
new file mode 100644
index 0000000000..71f4475a38
--- /dev/null
+++ b/backend/components/flink-connector/helm/values.yaml
@@ -0,0 +1,16 @@
+
+component: flink-connector
+mandatory: false
+enabled: false
+port: 80
+resources:
+gatewayUrl: "http://flink-jobmanager:8083"
+apiCommunicationUrl: "http://api-communication/messages.send"
+executor:
+ name: statements-executor
+ image: connectors/flink/statements-executor
+ topic: flink.statements
+resultSender:
+ name: result-sender
+ image: connectors/flink/result-sender
+  topic: flink.outputs
\ No newline at end of file
diff --git a/backend/components/flink-connector/src/result-sender.go b/backend/components/flink-connector/src/result-sender.go
new file mode 100644
index 0000000000..90bfff7381
--- /dev/null
+++ b/backend/components/flink-connector/src/result-sender.go
@@ -0,0 +1,158 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func main() {
+
+ // Create Kafka consumer to read the statements
+ kafkaURL := os.Getenv("KAFKA_BROKERS")
+ schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
+ topicName := os.Getenv("KAFKA_TOPIC_NAME")
+ systemToken := os.Getenv("systemToken")
+ authUsername := os.Getenv("AUTH_JAAS_USERNAME")
+ authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
+ flinkProvider := os.Getenv("provider")
+ groupID := "result-sender"
+ msgNormal := false
+ msgDebug := true
+
+ if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
+ fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
+ return
+ }
+
+ var confluentConnection ConfluentConnection
+ confluentConnection.Token = os.Getenv("confluentToken")
+ confluentConnection.ComputePoolID = os.Getenv("confluentComputePoolID")
+ confluentConnection.Principal = os.Getenv("confluentPrincipal")
+ confluentConnection.SQLCurrentCatalog = os.Getenv("confluentSQLCurrentCatalog")
+ confluentConnection.SQLCurrentDatabase = os.Getenv("confluentSQLCurrentDatabase")
+
+ // Healthcheck
+ http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
+ response := map[string]string{"status": "UP"}
+ jsonResponse, err := json.Marshal(response)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(jsonResponse)
+ })
+
+ go func() {
+ if err := http.ListenAndServe(":80", nil); err != nil {
+ panic(err)
+ }
+ }()
+
+ fmt.Println("Health-check started")
+
+ // Create Kafka consumer configuration
+ fmt.Println("Creating Kafka consumer for topic: ", topicName)
+
+ c, err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": kafkaURL,
+ "group.id": groupID,
+ "auto.offset.reset": "earliest",
+ "security.protocol": "SASL_SSL",
+ "sasl.mechanisms": "PLAIN",
+ "sasl.username": authUsername,
+ "sasl.password": authPassword,
+ })
+ if err != nil {
+ fmt.Printf("Error creating consumer: %v\n", err)
+ return
+ }
+ c.SubscribeTopics([]string{topicName}, nil)
+ // Channel for signals
+ signals := make(chan os.Signal, 1)
+ done := make(chan bool, 1)
+
+ signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+
+ go func() {
+ for {
+ select {
+ case sig := <-signals:
+ // If an interrupt signal is received, break the loop
+ fmt.Printf("Caught signal %v: terminating\n", sig)
+ done <- true
+ return
+ default:
+ msg, err := c.ReadMessage(-1)
+ if err == nil {
+ var flinkOutput FlinkOutput
+ if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil {
+ fmt.Printf("Error unmarshalling message: %v\n", err)
+ continue
+ } else {
+ fmt.Printf("Received message: %+v\n", flinkOutput)
+
+ flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")
+ confluentGatewayURL := os.Getenv("CONFLUENT_GATEWAY_URL")
+
+ var result FlinkResult
+ var headerConfluent []string
+ var resultConfluent string
+
+ if flinkProvider == "flink" {
+ fmt.Println("Flink gateway: ", flinkGatewayURL)
+ result, err = getFlinkResult(flinkGatewayURL, flinkOutput.SessionID)
+ headerConfluent = []string{}
+ } else {
+ fmt.Println("Flink gateway: ", confluentGatewayURL)
+ fmt.Println("Waiting 20 seconds...")
+ time.Sleep(20 * time.Second)
+ headerConfluent, resultConfluent, err = getFlinkResultConfluent(confluentGatewayURL, flinkOutput.SessionID, confluentConnection)
+ }
+ if err != nil {
+ fmt.Println("Unable to get Flink result:", err)
+ sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
+ return
+ }
+ if flinkProvider == "flink" {
+ sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", result), flinkOutput.ConversationID, systemToken, msgDebug)
+ sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
+ response, err := convertResultToMarkdown(result)
+ if err != nil {
+ fmt.Println("Unable to generate Markdown from result:", err)
+ sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
+ sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
+								continue
+ }
+ sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
+ } else {
+ sendMessage("Result retrieved from Flink: "+fmt.Sprintf("%#v", resultConfluent), flinkOutput.ConversationID, systemToken, msgDebug)
+ sendMessage("Now converting the result to Markdown", flinkOutput.ConversationID, systemToken, msgDebug)
+ response, err := convertConfluentResultToMarkdown(headerConfluent, resultConfluent)
+ if err != nil {
+ fmt.Println("Unable to generate Markdown from result:", err)
+ sendMessage("Error: "+err.Error(), flinkOutput.ConversationID, systemToken, msgDebug)
+ sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken, msgNormal)
+								continue
+ }
+ sendMessage(response, flinkOutput.ConversationID, systemToken, msgNormal)
+ }
+ }
+ } else {
+ fmt.Printf("Consumer error: %v\n", err)
+ }
+ }
+ }
+ }()
+ <-done
+ c.Close()
+ fmt.Println("Consumer closed")
+}
diff --git a/backend/components/flink-connector/src/statements-executor.go b/backend/components/flink-connector/src/statements-executor.go
new file mode 100644
index 0000000000..6ccf2e91be
--- /dev/null
+++ b/backend/components/flink-connector/src/statements-executor.go
@@ -0,0 +1,140 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func main() {
+
+ kafkaURL := os.Getenv("KAFKA_BROKERS")
+ schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
+ topicName := os.Getenv("KAFKA_TOPIC_NAME")
+ systemToken := os.Getenv("systemToken")
+ authUsername := os.Getenv("AUTH_JAAS_USERNAME")
+ authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
+ flinkProvider := os.Getenv("provider")
+ groupID := "statement-executor-"
+ msgNormal := false
+ msgDebug := true
+
+ if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
+ fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
+ return
+ }
+
+ var confluentConnection ConfluentConnection
+ confluentConnection.Token = os.Getenv("confluentToken")
+ confluentConnection.ComputePoolID = os.Getenv("confluentComputePoolID")
+ confluentConnection.Principal = os.Getenv("confluentPrincipal")
+ confluentConnection.SQLCurrentCatalog = os.Getenv("confluentSQLCurrentCatalog")
+ confluentConnection.SQLCurrentDatabase = os.Getenv("confluentSQLCurrentDatabase")
+
+ // Healthcheck
+ http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
+ response := map[string]string{"status": "UP"}
+ jsonResponse, err := json.Marshal(response)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(jsonResponse)
+ })
+
+ go func() {
+ if err := http.ListenAndServe(":80", nil); err != nil {
+ panic(err)
+ }
+ }()
+
+ fmt.Println("Health-check started")
+
+ // Create Kafka consumer configuration
+ fmt.Println("Creating Kafka consumer for topic: ", topicName)
+
+ c, err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": kafkaURL,
+ "group.id": groupID,
+ "auto.offset.reset": "earliest",
+ "security.protocol": "SASL_SSL",
+ "sasl.mechanisms": "PLAIN",
+ "sasl.username": authUsername,
+ "sasl.password": authPassword,
+ })
+ if err != nil {
+ fmt.Printf("Error creating consumer: %v\n", err)
+ return
+ }
+ c.SubscribeTopics([]string{topicName}, nil)
+ // Channel for signals
+ signals := make(chan os.Signal, 1)
+ done := make(chan bool, 1)
+
+ signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
+
+ go func() {
+ for {
+ select {
+ case sig := <-signals:
+ fmt.Printf("Caught signal %v: terminating\n", sig)
+ done <- true
+ return
+ default:
+ msg, err := c.ReadMessage(-1)
+ if err == nil {
+ var statementSet FlinkStatementSet
+ if err := json.Unmarshal(msg.Value, &statementSet); err != nil {
+ fmt.Printf("Error unmarshalling message: %v\n", err)
+ continue
+ } else {
+ fmt.Printf("Received message: %+v\n", statementSet)
+
+ flinkGatewayURL := os.Getenv("FLINK_GATEWAY_URL")
+ confluentGatewayURL := os.Getenv("CONFLUENT_GATEWAY_URL")
+ var sessionID string
+ if flinkProvider == "flink" {
+ sessionID, err = sendFlinkSQL(flinkGatewayURL, statementSet)
+ } else {
+ sessionID, err = sendFlinkSQLConfluent(confluentGatewayURL, statementSet, confluentConnection)
+ }
+
+ if err != nil {
+ fmt.Println("Error running Flink statement:", err)
+ sendMessage("Error: "+err.Error(), statementSet.ConversationID, systemToken, msgDebug)
+ sendMessage("I am sorry, I am unable to answer that question.", statementSet.ConversationID, systemToken, msgNormal)
+							continue
+ }
+ fmt.Println("Successfully executed the Flink statement.")
+ sendMessage("FlinkSessionID: "+sessionID, statementSet.ConversationID, systemToken, msgDebug)
+ var flinkOutput FlinkOutput
+ flinkOutput.SessionID = sessionID
+ flinkOutput.Question = statementSet.Question
+ flinkOutput.MessageID = statementSet.MessageID
+ flinkOutput.ConversationID = statementSet.ConversationID
+ err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword)
+ if err != nil {
+
+ fmt.Printf("error producing message to Kafka: %v\n", err)
+ sendMessage("Error: "+err.Error(), statementSet.ConversationID, systemToken, msgDebug)
+ sendMessage("I am sorry, I am unable to answer that question.", statementSet.ConversationID, systemToken, msgNormal)
+ }
+ sendMessage("Message produced to topic: flink.outputs", statementSet.ConversationID, systemToken, msgDebug)
+ }
+ } else {
+ fmt.Printf("Consumer error: %v\n", err)
+ }
+ }
+ }
+ }()
+ <-done
+ c.Close()
+ fmt.Println("Consumer closed")
+}
diff --git a/backend/components/flink-connector/src/tools.go b/backend/components/flink-connector/src/tools.go
new file mode 100644
index 0000000000..be68a63687
--- /dev/null
+++ b/backend/components/flink-connector/src/tools.go
@@ -0,0 +1,503 @@
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "strings"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) {
+ timestamp := time.Now().Unix()
+ strTimestamp := fmt.Sprintf("%d", timestamp)
+ replacements := map[string]string{
+ "{PROPERTIES_GROUP_ID}": "flink-" + strTimestamp,
+ "{PROPERTIES_BOOTSTRAP_SERVERS}": os.Getenv("KAFKA_BROKERS"),
+ "{PROPERTIES_SASL_JAAS_CONFIG}": fmt.Sprintf("org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", os.Getenv("AUTH_JAAS_USERNAME"), os.Getenv("AUTH_JAAS_PASSWORD")),
+ }
+ for i, stmt := range statementSet.Statements {
+ for placeholder, value := range replacements {
+ stmt = strings.Replace(stmt, placeholder, value, -1)
+ }
+ statementSet.Statements[i] = stmt
+ }
+	fmt.Printf("Updated StatementSet: %+v\n", statementSet.Statements)
+
+ req, err := http.NewRequest("POST", url+"/v1/sessions/", bytes.NewReader([]byte("")))
+ if err != nil {
+ return "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+		fmt.Printf("Error reading response body from the API: %v\n", err)
+ }
+ fmt.Println("Response: ", string(body))
+ var sessionResponse FlinkSessionResponse
+ if err := json.Unmarshal(body, &sessionResponse); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return "", err
+ }
+ defer resp.Body.Close()
+
+ fmt.Println("The Flink session is: ", sessionResponse.SessionHandle)
+ for _, statement := range statementSet.Statements {
+ payload := FlinkSQLRequest{
+ Statement: statement,
+ }
+ payloadBytes, err := json.Marshal(payload)
+ if err != nil {
+ return "", err
+ }
+
+ req, err = http.NewRequest("POST", url+"/v1/sessions/"+sessionResponse.SessionHandle+"/statements/", bytes.NewReader(payloadBytes))
+ if err != nil {
+ return "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ client = &http.Client{}
+ resp, err = client.Do(req)
+ if err != nil {
+ return "", err
+ }
+ body, err = io.ReadAll(resp.Body)
+ if err != nil {
+			fmt.Printf("Error reading response body from the API: %v\n", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var statementResponse FlinkStatementResponse
+ if err := json.Unmarshal(body, &statementResponse); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return "", err
+ }
+ fmt.Printf("Check status on: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionResponse.SessionHandle, statementResponse.OperationHandle)
+ defer resp.Body.Close()
+ }
+
+ return sessionResponse.SessionHandle, nil
+}
+
+func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername, authPassword string) error {
+
+ kafkaTopic := "flink.outputs"
+
+ flinkOutputJSON, err := json.Marshal(flinkOutput)
+ if err != nil {
+ return fmt.Errorf("error marshaling query to JSON: %w", err)
+ }
+
+ configMap := kafka.ConfigMap{
+ "bootstrap.servers": kafkaURL,
+ }
+ if authUsername != "" && authPassword != "" {
+ configMap.SetKey("security.protocol", "SASL_SSL")
+ configMap.SetKey("sasl.mechanisms", "PLAIN")
+ configMap.SetKey("sasl.username", authUsername)
+ configMap.SetKey("sasl.password", authPassword)
+ }
+
+ producer, err := kafka.NewProducer(&configMap)
+ if err != nil {
+ return fmt.Errorf("failed to create producer: %w", err)
+ }
+ defer producer.Close()
+
+ // Produce the message
+ message := kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny},
+ Key: []byte(flinkOutput.SessionID),
+ Value: flinkOutputJSON,
+ }
+
+ err = producer.Produce(&message, nil)
+ if err != nil {
+ return fmt.Errorf("failed to produce message: %w", err)
+ }
+ fmt.Println("message scheduled for production")
+ producer.Flush(15 * 1000)
+ fmt.Println("message flushed")
+ return nil
+}
+
+func sendMessage(message string, conversationId string, systemToken string, debug bool) (int, string, error) {
+ messageContent := messageContent{
+ Text: message,
+ Debug: debug,
+ }
+ messageToSend := ApplicationCommunicationSendMessage{
+ ConversationID: conversationId,
+ Message: messageContent,
+ }
+ messageJSON, err := json.Marshal(messageToSend)
+ if err != nil {
+ fmt.Printf("Error encoding response to JSON: %v\n", err)
+ return 0, "", errors.New("The message could not be encoded to JSON for sending.")
+ }
+
+ req, err := http.NewRequest("POST", "http://api-communication/messages.send", bytes.NewReader(messageJSON))
+ if err != nil {
+ fmt.Printf("Error creating request: %v\n", err)
+ return 0, "", errors.New("The message could not be sent.")
+ }
+ req.Header.Add("Authorization", "Bearer "+systemToken)
+ req.Header.Add("Content-Type", "application/json")
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ fmt.Printf("Error sending POST request: %v\n", err)
+ return 0, "", errors.New("Error sending POST request.")
+ }
+ defer resp.Body.Close()
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body:", err)
+ return 0, "", errors.New("Error reading response body.")
+ }
+
+ var response SendMessageResponse
+ err = json.Unmarshal(body, &response)
+ if err != nil {
+ fmt.Println("Error unmarshaling response:", err)
+ return 0, "", errors.New("Response couldn't be unmarshaled.")
+ }
+
+ fmt.Printf("Message sent with status code: %d\n", resp.StatusCode)
+ return resp.StatusCode, response.ID, nil
+}
+
+func sendFlinkSQLConfluent(url string, statementSet FlinkStatementSet, connection ConfluentConnection) (string, error) {
+ timestamp := time.Now().Unix()
+ strTimestamp := fmt.Sprintf("%d", timestamp)
+ statementName := "airy-" + strTimestamp
+ payload := ConfluentFlink{
+ Name: statementName,
+ Spec: ConfluentFlinkSpec{
+ Statement: statementSet.Statements[0],
+ ComputePoolID: connection.ComputePoolID,
+ Principal: connection.Principal,
+ Properties: FlinkSpecProperties{
+ SQLCurrentCatalog: connection.SQLCurrentCatalog,
+ SQLCurrentDatabase: connection.SQLCurrentDatabase,
+ },
+ Stopped: false,
+ },
+ }
+ payloadBytes, err := json.Marshal(payload)
+ if err != nil {
+ return "", err
+ }
+
+ req, err := http.NewRequest("POST", url, bytes.NewReader(payloadBytes))
+ if err != nil {
+ return "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Add("Authorization", "Basic "+connection.Token)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+		fmt.Printf("Error reading response body from the API: %v\n", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var statementResponse ConfluentFlinkStatementResponse
+ if err := json.Unmarshal(body, &statementResponse); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return "", err
+ }
+ fmt.Printf("Check status on: %s/%s\n", url, statementName)
+ defer resp.Body.Close()
+
+ return statementName, nil
+}
+
+func getFlinkResult(url, sessionID string) (FlinkResult, error) {
+ fmt.Println("The Flink session is: ", sessionID)
+ payload := FlinkSQLRequest{
+ Statement: "select * from output;",
+ }
+ payloadBytes, err := json.Marshal(payload)
+ if err != nil {
+ return FlinkResult{}, err
+ }
+
+ req, err := http.NewRequest("POST", url+"/v1/sessions/"+sessionID+"/statements/", bytes.NewReader(payloadBytes))
+ if err != nil {
+ return FlinkResult{}, err
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return FlinkResult{}, err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var statementResponse FlinkStatementResponse
+ if err := json.Unmarshal(body, &statementResponse); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return FlinkResult{}, err
+ }
+
+ fmt.Printf("Fetching result from: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionID, statementResponse.OperationHandle)
+ time.Sleep(20 * time.Second)
+ req, err = http.NewRequest("GET", url+"/v1/sessions/"+sessionID+"/operations/"+statementResponse.OperationHandle+"/result/0", nil)
+ if err != nil {
+ return FlinkResult{}, err
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ client = &http.Client{}
+ resp, err = client.Do(req)
+ if err != nil {
+ return FlinkResult{}, err
+ }
+ body, err = io.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var flinkResultResponse FlinkResultResponse
+ if err := json.Unmarshal(body, &flinkResultResponse); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return FlinkResult{}, err
+ }
+ defer resp.Body.Close()
+
+ if flinkResultResponse.Errors != nil {
+ statementError := errors.New(strings.Join(flinkResultResponse.Errors, ","))
+ return FlinkResult{}, statementError
+ }
+ return flinkResultResponse.Results, nil
+}
+
// markdown is a passthrough formatter: the message is returned unchanged.
// It exists so callers can treat message rendering uniformly; the error is
// always nil but kept for a consistent (value, error) call shape.
func markdown(message string) (string, error) {
	rendered := message
	return rendered, nil
}
+
+func convertResultToMarkdown(result FlinkResult) (string, error) {
+ var builder strings.Builder
+
+ if len(result.Columns) == 0 {
+ return "", errors.New("No columns found for generating the Markdown table.")
+ }
+ for _, col := range result.Columns {
+ builder.WriteString("| " + col.Name + " ")
+ }
+ builder.WriteString("|\n")
+
+ for range result.Columns {
+ builder.WriteString("|---")
+ }
+ builder.WriteString("|\n")
+
+ for _, d := range result.Data {
+ for _, field := range d.Fields {
+ builder.WriteString(fmt.Sprintf("| %v ", field))
+ }
+ builder.WriteString("|\n")
+ }
+
+ return builder.String(), nil
+}
+
+func getFlinkResultConfluent(url, sessionID string, connection ConfluentConnection) ([]string, string, error) {
+ req, err := http.NewRequest("GET", url+"/"+sessionID, bytes.NewReader([]byte("")))
+ if err != nil {
+ return []string{}, "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Add("Authorization", "Basic "+connection.Token)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return []string{}, "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var statementResponse ConfluentFlinkStatementResponse
+ if err := json.Unmarshal(body, &statementResponse); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return []string{}, "", err
+ }
+ fmt.Printf("Received result for statement: %s\n", sessionID)
+ fmt.Println("Phase: ", statementResponse.Status.Phase, " Detail: ", statementResponse.Status.Detail)
+ defer resp.Body.Close()
+
+ if statementResponse.Status.Phase == "RUNNING" || statementResponse.Status.Phase == "COMPLETED" {
+ columns, err := getColumnNames(statementResponse.Status.ResultSchema)
+ if err != nil {
+ fmt.Println("Extracting of the column names failed.")
+ return []string{}, "", err
+ }
+ req, err := http.NewRequest("GET", url+"/"+sessionID+"/results", bytes.NewReader([]byte("")))
+ if err != nil {
+ return []string{}, "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Add("Authorization", "Basic "+connection.Token)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return []string{}, "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var result ConfluentFlinkResultsResponse
+ if err := json.Unmarshal(body, &result); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return []string{}, "", err
+ }
+ nextResult := result.Metadata.Next
+ fmt.Println("Next result: ", nextResult)
+ fmt.Println("Result: ", result.Results.Data)
+ data, err := dataToString(result.Results.Data)
+ if data != "" {
+ return columns, data, nil
+ } else {
+ req, err := http.NewRequest("GET", nextResult, bytes.NewReader([]byte("")))
+ if err != nil {
+ return []string{}, "", err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Add("Authorization", "Basic "+connection.Token)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ if err != nil {
+ return []string{}, "", err
+ }
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println("Error reading response body from the API: %v", err)
+ }
+ fmt.Println("Statement submitted. Response: ", string(body))
+ var result ConfluentFlinkResultsResponse
+ if err := json.Unmarshal(body, &result); err != nil {
+ fmt.Printf("Error unmarshaling message: %v\n", err)
+ return []string{}, "", err
+ }
+ data, err := dataToString(result.Results.Data)
+ return columns, data, err
+ }
+ } else {
+ err := errors.New("Flink statement failed. Status: " + statementResponse.Status.Phase)
+ return []string{}, "", err
+ }
+}
+
// dataToString serializes a non-empty result slice to its JSON text.
// Anything other than a non-empty []interface{} yields "" with a nil error;
// only a JSON marshaling failure produces an error.
func dataToString(data interface{}) (string, error) {
	slice, ok := data.([]interface{})
	if !ok || len(slice) == 0 {
		return "", nil
	}
	encoded, err := json.Marshal(data)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}
+
+func convertConfluentResultToMarkdown(headerNames []string, jsonStr string) (string, error) {
+ var dataRows []ConfluentDataRow
+ err := json.Unmarshal([]byte(jsonStr), &dataRows)
+ if err != nil {
+ return "", err
+ }
+
+ var sb strings.Builder
+
+ header := generateMarkdownHeader(headerNames)
+ sb.WriteString(header)
+ sb.WriteString("\n")
+
+ separator := strings.Repeat("| --- ", strings.Count(header, "|")-1) + "|"
+ sb.WriteString(separator)
+ sb.WriteString("\n")
+
+ for _, dataRow := range dataRows {
+ sb.WriteString("|")
+ for _, cell := range dataRow.Row {
+ sb.WriteString(" ")
+ sb.WriteString(cell)
+ sb.WriteString(" |")
+ }
+ sb.WriteString("\n")
+ }
+
+ return sb.String(), nil
+}
+
+func extractColumnNames(jsonStr string) ([]string, error) {
+ var schema ConfluentResultSchema
+ err := json.Unmarshal([]byte(jsonStr), &schema)
+ if err != nil {
+ return nil, err
+ }
+
+ var columnNames []string
+ for _, column := range schema.Columns {
+ columnNames = append(columnNames, column.Name)
+ }
+
+ return columnNames, nil
+}
+
// generateMarkdownHeader builds the Markdown table header row, e.g.
// ["a", "b"] -> "| a | b |". An empty list yields just "|".
func generateMarkdownHeader(columnNames []string) string {
	var sb strings.Builder
	for _, columnName := range columnNames {
		sb.WriteString("| ")
		sb.WriteString(columnName)
		sb.WriteString(" ")
	}
	sb.WriteString("|")
	return sb.String()
}
+
+func ResultsToString(rs ConfluentResultSchema) string {
+ var columnNames []string
+ for _, column := range rs.Columns {
+ columnNames = append(columnNames, column.Name)
+ }
+ return strings.Join(columnNames, ", ")
+}
+
+func getColumnNames(schema ConfluentResultSchema) ([]string, error) {
+ var columnNames []string
+ for _, column := range schema.Columns {
+ columnNames = append(columnNames, column.Name)
+ }
+ return columnNames, nil
+}
diff --git a/backend/components/flink-connector/src/types.go b/backend/components/flink-connector/src/types.go
new file mode 100644
index 0000000000..c67ee3944f
--- /dev/null
+++ b/backend/components/flink-connector/src/types.go
@@ -0,0 +1,137 @@
package main

// ApplicationCommunicationSendMessage is the payload POSTed to Airy's
// send-message endpoint.
type ApplicationCommunicationSendMessage struct {
	ConversationID string            `json:"conversation_id"`
	Message        messageContent    `json:"message"`
	Metadata       map[string]string `json:"metadata"`
}

// messageContent carries the text of an outbound message; Debug flags it as
// a diagnostic message.
type messageContent struct {
	Text  string `json:"text"`
	Debug bool   `json:"debug"`
}

// SendMessageResponse is the API's reply to a send-message request.
type SendMessageResponse struct {
	ID    string `json:"id"`
	State string `json:"state"`
}

// FlinkOutput associates a Flink session's answer with the originating
// question, message and conversation ids.
type FlinkOutput struct {
	SessionID      string `json:"session_id"`
	Question       string `json:"question"`
	MessageID      string `json:"message_id"`
	ConversationID string `json:"conversation_id"`
}

// FlinkSQLRequest is a single SQL statement submitted to the Flink SQL
// Gateway.
type FlinkSQLRequest struct {
	Statement string `json:"statement"`
}

// FlinkSessionResponse is returned when a SQL Gateway session is opened.
type FlinkSessionResponse struct {
	SessionHandle string `json:"sessionHandle"`
}

// FlinkStatementResponse identifies an operation started on a session.
type FlinkStatementResponse struct {
	OperationHandle string `json:"operationHandle"`
}

// Column describes one column of a Flink result set, including its logical
// type metadata.
type Column struct {
	Name        string `json:"name"`
	LogicalType struct {
		Type     string `json:"type"`
		Nullable bool   `json:"nullable"`
		Length   int    `json:"length,omitempty"`
	} `json:"logicalType"`
	Comment interface{} `json:"comment"`
}

// Data is one row of a Flink result set.
type Data struct {
	Kind   string        `json:"kind"`
	Fields []interface{} `json:"fields"`
}

// FlinkResult is a page of rows plus the column metadata needed to render it.
type FlinkResult struct {
	Columns   []Column `json:"columns"`
	RowFormat string   `json:"rowFormat"`
	Data      []Data   `json:"data"`
}

// FlinkResultResponse wraps a FlinkResult as returned by the SQL Gateway's
// result endpoint; Errors is non-nil when the statement failed.
type FlinkResultResponse struct {
	ResultType    string      `json:"resultType"`
	IsQueryResult bool        `json:"isQueryResult"`
	JobID         string      `json:"jobID"`
	ResultKind    string      `json:"resultKind"`
	Results       FlinkResult `json:"results"`
	NextResultUri string      `json:"nextResultUri"`
	Errors        []string    `json:"errors"`
}

// ConfluentFlink is the request body for creating a statement via Confluent
// Cloud's Flink API.
type ConfluentFlink struct {
	Name string             `json:"name"`
	Spec ConfluentFlinkSpec `json:"spec"`
}

// ConfluentFlinkSpec describes the statement to run and where/as whom to run
// it.
type ConfluentFlinkSpec struct {
	Statement     string              `json:"statement"`
	ComputePoolID string              `json:"compute_pool_id"`
	Principal     string              `json:"principal"`
	Properties    FlinkSpecProperties `json:"properties"`
	Stopped       bool                `json:"stopped"`
}

// FlinkSpecProperties selects the default catalog/database for a statement.
type FlinkSpecProperties struct {
	SQLCurrentCatalog  string `json:"sql.current-catalog"`
	SQLCurrentDatabase string `json:"sql.current-database"`
}

// ConfluentFlinkStatementResponse is the status document for a submitted
// statement.
type ConfluentFlinkStatementResponse struct {
	Name   string                        `json:"name"`
	Status ConfluentFlinkStatementStatus `json:"status"`
}

// ConfluentFlinkStatementStatus reports the statement phase (the code checks
// for "RUNNING"/"COMPLETED") and the schema of its result.
type ConfluentFlinkStatementStatus struct {
	Detail       string                `json:"detail"`
	Phase        string                `json:"phase"`
	ResultSchema ConfluentResultSchema `json:"result_schema"`
}

// ConfluentResultSchema lists the column names of a statement result.
type ConfluentResultSchema struct {
	Columns []struct {
		Name string `json:"name"`
	} `json:"columns"`
}

// ConfluentFlinkResultsResponse is one page of statement results plus
// pagination metadata.
type ConfluentFlinkResultsResponse struct {
	Metadata ResultResponseMetadata `json:"metadata"`
	Results  ResultResponseResults  `json:"results"`
}

// ResultResponseMetadata carries pagination links; Next points to the
// following results page.
type ResultResponseMetadata struct {
	CreatedAt string `json:"created_at"`
	Next      string `json:"next"`
	Self      string `json:"self"`
}

// ResultResponseResults holds the raw row data; its shape depends on the
// statement, so it is left untyped and decoded by dataToString.
type ResultResponseResults struct {
	Data interface{} `json:"data"`
}

// ConfluentDataRow is a single result row: Op (presumably a changelog
// operation code — TODO confirm against the Confluent API) plus the column
// values.
type ConfluentDataRow struct {
	Op  int      `json:"op"`
	Row []string `json:"row"`
}

// FlinkStatementSet is the unit of work consumed by the connector: the SQL
// statements to run and the conversation/message they answer.
type FlinkStatementSet struct {
	Statements     []string `json:"statements"`
	Question       string   `json:"question"`
	MessageID      string   `json:"message_id"`
	ConversationID string   `json:"conversation_id"`
}

// ConfluentConnection bundles the credentials and defaults needed to talk to
// Confluent Cloud's Flink API.
type ConfluentConnection struct {
	Token              string
	ComputePoolID      string
	Principal          string
	SQLCurrentCatalog  string
	SQLCurrentDatabase string
}
diff --git a/backend/components/google/helm/templates/deployments.yaml b/backend/components/google/helm/templates/deployments.yaml
index 7af611787e..80c2af2376 100644
--- a/backend/components/google/helm/templates/deployments.yaml
+++ b/backend/components/google/helm/templates/deployments.yaml
@@ -54,6 +54,11 @@ spec:
- name: Health-Check
value: health-check
initialDelaySeconds: 120
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.connector.resources | indent 12 }}
initContainers:
@@ -71,6 +76,11 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
---
apiVersion: apps/v1
kind: Deployment
@@ -122,6 +132,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.eventsRouter.resources | indent 10 }}
initContainers:
@@ -139,3 +154,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/media-resolver/helm/templates/deployment.yaml b/backend/components/media-resolver/helm/templates/deployment.yaml
index b31af97ffd..3484e0b71f 100644
--- a/backend/components/media-resolver/helm/templates/deployment.yaml
+++ b/backend/components/media-resolver/helm/templates/deployment.yaml
@@ -45,6 +45,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.resources | indent 12 }}
initContainers:
@@ -62,3 +67,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/schema-registry-manager/Dockerfile b/backend/components/schema-registry-manager/Dockerfile
new file mode 100644
index 0000000000..e069954862
--- /dev/null
+++ b/backend/components/schema-registry-manager/Dockerfile
@@ -0,0 +1,19 @@
FROM node:18

WORKDIR /app

# Copy manifests first so the npm install layer is cached when only
# source files change.
COPY ./src/package*.json ./
COPY ./src/tsconfig*.json ./

RUN npm install
# tsc is invoked directly below, so install the compiler globally.
RUN npm install typescript -g

# Copy only the sources the build needs.
COPY ./src/app.ts ./
COPY ./src/types.ts ./
COPY ./src/providers/karapace.ts ./providers/

# Compile TypeScript to JavaScript (output layout per tsconfig.json).
RUN tsc

EXPOSE 3000

CMD [ "node", "app.js" ]
diff --git a/backend/components/schema-registry-manager/Makefile b/backend/components/schema-registry-manager/Makefile
new file mode 100644
index 0000000000..3aa78401e3
--- /dev/null
+++ b/backend/components/schema-registry-manager/Makefile
@@ -0,0 +1,6 @@
# Build the schema-registry-manager container image locally.
build:
	docker build -t schema-registry-manager .

# Tag and push the image to the GitHub Container Registry (depends on build).
release: build
	docker tag schema-registry-manager ghcr.io/airyhq/backend/schema-registry-manager:release
	docker push ghcr.io/airyhq/backend/schema-registry-manager:release
diff --git a/backend/components/schema-registry-manager/helm/BUILD b/backend/components/schema-registry-manager/helm/BUILD
new file mode 100644
index 0000000000..45805d5f1c
--- /dev/null
+++ b/backend/components/schema-registry-manager/helm/BUILD
@@ -0,0 +1,3 @@
# Bazel packaging for this Helm chart via the shared helm ruleset.
load("//tools/build:helm.bzl", "helm_ruleset_core_version")

helm_ruleset_core_version()
diff --git a/backend/components/schema-registry-manager/helm/Chart.yaml b/backend/components/schema-registry-manager/helm/Chart.yaml
new file mode 100644
index 0000000000..7205b553ce
--- /dev/null
+++ b/backend/components/schema-registry-manager/helm/Chart.yaml
@@ -0,0 +1,5 @@
# Helm chart metadata for the schema-registry-manager component.
apiVersion: v2
appVersion: "1.0"
description: Schema registry component to manage different providers
name: schema-registry-manager
version: 1.0
diff --git a/backend/components/schema-registry-manager/helm/templates/configmap.yaml b/backend/components/schema-registry-manager/helm/templates/configmap.yaml
new file mode 100644
index 0000000000..05de4d5898
--- /dev/null
+++ b/backend/components/schema-registry-manager/helm/templates/configmap.yaml
@@ -0,0 +1,10 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.component }}
  labels:
    # Airy component bookkeeping labels/annotations.
    core.airy.co/managed: "true"
    core.airy.co/mandatory: "{{ .Values.mandatory }}"
    core.airy.co/component: "{{ .Values.component }}"
  annotations:
    core.airy.co/enabled: "{{ .Values.enabled }}"
diff --git a/backend/components/schema-registry-manager/helm/templates/deployment.yaml b/backend/components/schema-registry-manager/helm/templates/deployment.yaml
new file mode 100644
index 0000000000..b902145179
--- /dev/null
+++ b/backend/components/schema-registry-manager/helm/templates/deployment.yaml
@@ -0,0 +1,48 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.component }}
  labels:
    app: {{ .Values.component }}
    core.airy.co/managed: "true"
    core.airy.co/mandatory: "{{ .Values.mandatory }}"
    core.airy.co/component: {{ .Values.component }}
spec:
  # Scale to zero while the component is disabled instead of deleting it.
  replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
  selector:
    matchLabels:
      app: {{ .Values.component }}
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: {{ .Values.component }}
    spec:
      containers:
        - name: app
          image: "ghcr.io/airyhq/{{ .Values.image }}:{{ .Values.imageTag }}"
          imagePullPolicy: Always
          envFrom:
            - configMapRef:
                name: security
            - configMapRef:
                name: kafka-config
            - configMapRef:
                name: {{ .Values.component }}
          env:
            - name: KAFKA_TOPIC_NAME
              value: {{ .Values.kafka.topic }}
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: {{ .Values.port }}
              httpHeaders:
                - name: Health-Check
                  value: health-check
            # NOTE(review): 43200s = 12 hours before the first liveness probe;
            # sibling charts use 120 — confirm this value is intentional.
            initialDelaySeconds: 43200
            periodSeconds: 10
            failureThreshold: 3
diff --git a/backend/components/schema-registry-manager/helm/templates/service.yaml b/backend/components/schema-registry-manager/helm/templates/service.yaml
new file mode 100644
index 0000000000..4d636e8b26
--- /dev/null
+++ b/backend/components/schema-registry-manager/helm/templates/service.yaml
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.component }}
  labels:
    app: {{ .Values.component }}
spec:
  # Headless service (clusterIP: None): DNS resolves directly to pod IPs.
  type: ClusterIP
  clusterIP: None
  ports:
    - name: {{ .Values.component }}
      port: 80
      targetPort: {{ .Values.port }}
  selector:
    app: {{ .Values.component }}
diff --git a/backend/components/schema-registry-manager/helm/values.yaml b/backend/components/schema-registry-manager/helm/values.yaml
new file mode 100644
index 0000000000..200ef3ca36
--- /dev/null
+++ b/backend/components/schema-registry-manager/helm/values.yaml
@@ -0,0 +1,9 @@
component: schema-registry-manager
mandatory: false
# Disabled by default; the deployment scales to 0 replicas while disabled.
enabled: false
image: backend/schema-registry-manager
imageTag: release
# Port the Node.js API listens on (see src/app.ts).
port: 3000
# NOTE(review): empty — no container resource requests/limits are set;
# confirm this is intentional.
resources:
kafka:
  topic: application.communication.messages
\ No newline at end of file
diff --git a/backend/components/schema-registry-manager/src/app.ts b/backend/components/schema-registry-manager/src/app.ts
new file mode 100644
index 0000000000..fb7f384e3e
--- /dev/null
+++ b/backend/components/schema-registry-manager/src/app.ts
@@ -0,0 +1,263 @@
+import dotenv from 'dotenv';
+import express, {Express, Request as ExpressRequest, Response as ExpressResponse} from 'express';
+import http from 'http';
+import cors from 'cors';
+
+import {SchemaProvider} from './types';
+import {
+ checkCompatibilityOfNewSchema,
+ createSchema,
+ deleteSchema,
+ getLastMessage,
+ getSchemaInfo,
+ getSchemaVersions,
+ getSchemas,
+ updateSchema,
+} from './providers/karapace';
+
// Load environment variables from a local .env file before reading config.
dotenv.config();

const app: Express = express();
// HTTP port for the API server (the health-check listens separately on 80).
const port = process.env.PORT || 3000;
const bodyParser = require('body-parser');
// The schema-registry backend in use; only Karapace is implemented so far.
const currentProvider: SchemaProvider = SchemaProvider.karapace;

// Middleware
app.use(bodyParser.json());

// CORS options
const corsOptions = {
  origin: 'http://localhost:8080',
};

// Use cors middleware with the specified options
app.use(cors(corsOptions));
+
+app.get('/schemas.provider', (req: ExpressRequest, res: ExpressResponse) => {
+ res.status(200).send(currentProvider);
+});
+
+app.get('/schemas.list', (req: ExpressRequest, res: ExpressResponse) => {
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ getSchemas(req.get('host') as string)
+ .then((response: string[]) => {
+ res.send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+app.get('/schemas.versions', (req: ExpressRequest, res: ExpressResponse) => {
+ if (!req.query.topicName) {
+ res.status(400).send('Missing topicName');
+ return;
+ }
+
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ getSchemaVersions(req.get('host') as string, req.query.topicName as string)
+ .then((response: any) => {
+ res.status(200).send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+app.get('/schemas.info', (req: ExpressRequest, res: ExpressResponse) => {
+ if (!req.query.topicName) {
+ res.status(400).send('Missing topicName');
+ return;
+ }
+
+ let version = 'latest';
+ if (req.query.version) {
+ version = req.query.version as string;
+ }
+
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ getSchemaInfo(req.get('host') as string, req.query.topicName as string, version)
+ .then((response: any) => {
+ res.status(200).send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+app.post('/schemas.update', (req: ExpressRequest, res: ExpressResponse) => {
+ if (!req.query.topicName) {
+ res.status(400).send('Missing topicName');
+ return;
+ }
+ if (!req.body.schema) {
+ res.status(400).send('Missing schema');
+ return;
+ }
+
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ updateSchema(req.get('host') as string, req.query.topicName as string, req.body.schema as string)
+ .then((response: any) => {
+ res.status(200).send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+app.post('/schemas.create', (req: ExpressRequest, res: ExpressResponse) => {
+ if (!req.query.topicName) {
+ res.status(400).send('Missing topicName');
+ return;
+ }
+ if (!req.body.schema) {
+ res.status(400).send('Missing schema');
+ return;
+ }
+
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ createSchema(req.get('host') as string, req.query.topicName as string, req.body.schema as string)
+ .then((response: any) => {
+ res.status(200).send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+app.post('/schemas.compatibility', (req: ExpressRequest, res: ExpressResponse) => {
+ if (!req.query.topicName) {
+ res.status(400).send('Missing topicName');
+ return;
+ }
+ if (!req.query.version) {
+ res.status(400).send('Missing version');
+ return;
+ }
+ if (!req.body.schema) {
+ res.status(400).send('Missing schema');
+ return;
+ }
+
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ checkCompatibilityOfNewSchema(
+ req.get('host') as string,
+ req.query.topicName as string,
+ req.body.schema as string,
+ req.query.version as string
+ )
+ .then((response: any) => {
+ res.status(200).send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+app.post('/schemas.delete', (req: ExpressRequest, res: ExpressResponse) => {
+ if (!req.query.topicName) {
+ res.status(400).send('Missing topicName');
+ return;
+ }
+
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ deleteSchema(req.get('host') as string, req.query.topicName as string)
+ .then((response: any) => {
+ res.status(200).send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+app.get('/schemas.lastMessage', (req: ExpressRequest, res: ExpressResponse) => {
+ if (!req.query.topicName) {
+ res.status(400).send('Missing topicName');
+ return;
+ }
+
+ switch (currentProvider) {
+ case SchemaProvider.karapace:
+ getLastMessage(req.get('host') as string, req.query.topicName as string)
+ .then((response: any) => {
+ res.status(200).send(response);
+ })
+ .catch((e: any) => {
+ res.status(500).send(e);
+ });
+ break;
+ default:
+ res.status(404).send('Provider Not Found');
+ break;
+ }
+});
+
+async function startHealthcheck() {
+ const server = http.createServer((req: any, res: any) => {
+ if (req.url === '/actuator/health' && req.method === 'GET') {
+ const response = {status: 'UP'};
+ const jsonResponse = JSON.stringify(response);
+
+ res.writeHead(200, {'Content-Type': 'application/json'});
+ res.end(jsonResponse);
+ } else {
+ res.writeHead(404, {'Content-Type': 'text/plain'});
+ res.end('Not Found');
+ }
+ });
+
+ server.listen(80, () => {
+ console.log('Health-check started');
+ });
+}
+
+async function main() {
+ startHealthcheck();
+ app.listen(port, () => {
+ console.log(`Server is running on http://localhost:${port}`);
+ });
+}
+
+main().catch(console.error);
diff --git a/backend/components/schema-registry-manager/src/package.json b/backend/components/schema-registry-manager/src/package.json
new file mode 100644
index 0000000000..970c2ea5f3
--- /dev/null
+++ b/backend/components/schema-registry-manager/src/package.json
@@ -0,0 +1,17 @@
{
  "dependencies": {
    "cors": "^2.8.5",
    "dotenv": "^16.4.2",
    "express": "^4.18.2",
    "node-fetch": "^2.6.1"
  },
  "devDependencies": {
    "@types/cors": "^2.8.17",
    "@types/express": "^4.17.21",
    "@types/node": "^20.10.3",
    "ts-node": "^10.9.2",
    "typescript": "^5.3.3"
  }
}
diff --git a/backend/components/schema-registry-manager/src/providers/karapace.ts b/backend/components/schema-registry-manager/src/providers/karapace.ts
new file mode 100644
index 0000000000..487aa31a8a
--- /dev/null
+++ b/backend/components/schema-registry-manager/src/providers/karapace.ts
@@ -0,0 +1,121 @@
+export async function getSchemas(host: string) {
+ return getData(host, 'subjects').then(response => {
+ return response;
+ });
+}
+
+export async function getSchemaVersions(host: string, topicName: string) {
+ return getData(host, `subjects/${topicName}/versions`).then(response => {
+ if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
+ return Promise.reject('404 Not Found');
+ }
+ return response;
+ });
+}
+
+export async function getSchemaInfo(host: string, topicName: string, version: string) {
+ return getData(host, `subjects/${topicName}/versions/${version}`).then(response => {
+ if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
+ return Promise.reject('404 Not Found');
+ }
+ return response;
+ });
+}
+
+export async function updateSchema(host: string, topicName: string, schema: string) {
+ const body = {
+ schema: JSON.stringify({...JSON.parse(schema)}),
+ };
+ return postData(host, `subjects/${topicName}/versions`, body).then(response => {
+ if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
+ return Promise.reject('404 Not Found');
+ }
+ if (response.id) return response;
+ if (response.message) return Promise.reject(response.message);
+ return Promise.reject('Unknown Error');
+ });
+}
+
+export async function createSchema(host: string, topicName: string, schema: string) {
+ const body = {
+ schema: JSON.stringify({...JSON.parse(schema)}),
+ };
+ return postData(host, `subjects/${topicName}/versions`, body)
+ .then(response => {
+ if (response.id) return response;
+ if (response.message) return Promise.reject(response.message);
+ return Promise.reject('Unknown Error');
+ })
+ .catch(e => {
+ return Promise.reject(e);
+ });
+}
+
+export async function checkCompatibilityOfNewSchema(host: string, topicName: string, schema: string, version: string) {
+ const body = {
+ schema: JSON.stringify({...JSON.parse(schema)}),
+ };
+
+ return postData(host, `compatibility/subjects/${topicName}/versions/${version}`, body)
+ .then(response => {
+ if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
+ return Promise.reject('404 Not Found');
+ }
+ if (response.is_compatible !== undefined) {
+ if (response.is_compatible === true) {
+ return response;
+ }
+ return Promise.reject('Schema Not Compatible');
+ }
+ if (response.message) return Promise.reject(response.message);
+ return Promise.reject('Unknown Error');
+ })
+ .catch(e => {
+ return Promise.reject(e);
+ });
+}
+
+export async function deleteSchema(host: string, topicName: string) {
+ return deleteData(host, `subjects/${topicName}`).then(response => {
+ if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
+ return Promise.reject('404 Not Found');
+ }
+ return response;
+ });
+}
+
+export async function getLastMessage(host: string, topicName: string) {
+ const body = {
+ ksql: `PRINT '${topicName}' FROM BEGINNING LIMIT 1;`,
+ streamsProperties: {},
+ };
+ return postData(host, 'query', body).then(response => {
+ return response;
+ });
+}
+
+async function getData(host: string, url: string) {
+ const response = await fetch('https://' + host + '/' + url, {
+ method: 'GET',
+ });
+ return response.json();
+}
+
+async function deleteData(host: string, url: string) {
+ const response = await fetch('https://' + host + '/' + url, {
+ method: 'DELETE',
+ });
+ return response.json();
+}
+
+async function postData(host: string, url: string, body: any) {
+ const response = await fetch('https://' + host + '/' + url, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/vnd.schemaregistry.v1+json',
+ },
+ body: JSON.stringify(body),
+ });
+
+ return response.json();
+}
diff --git a/backend/components/schema-registry-manager/src/tsconfig.json b/backend/components/schema-registry-manager/src/tsconfig.json
new file mode 100644
index 0000000000..8975f66043
--- /dev/null
+++ b/backend/components/schema-registry-manager/src/tsconfig.json
@@ -0,0 +1,10 @@
+{
+ "compilerOptions": {
+ "target": "ES2016",
+ "module": "commonjs",
+ "strict": true,
+ "esModuleInterop": true,
+ "skipLibCheck": true,
+ "forceConsistentCasingInFileNames": true
+ }
+}
\ No newline at end of file
diff --git a/backend/components/schema-registry-manager/src/types.ts b/backend/components/schema-registry-manager/src/types.ts
new file mode 100644
index 0000000000..53776abb6d
--- /dev/null
+++ b/backend/components/schema-registry-manager/src/types.ts
@@ -0,0 +1,4 @@
+export enum SchemaProvider {
+ karapace = 'karapace',
+ confluentCloud = 'confluent-cloud',
+}
diff --git a/backend/components/sources-api/helm/templates/deployment.yaml b/backend/components/sources-api/helm/templates/deployment.yaml
index 9ce7cda665..4d23b16dea 100644
--- a/backend/components/sources-api/helm/templates/deployment.yaml
+++ b/backend/components/sources-api/helm/templates/deployment.yaml
@@ -50,6 +50,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.resources | indent 10 }}
initContainers:
@@ -67,3 +72,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/streams/helm/templates/deployment.yaml b/backend/components/streams/helm/templates/deployment.yaml
index 067e9bf6c9..b0cd1dfb3f 100644
--- a/backend/components/streams/helm/templates/deployment.yaml
+++ b/backend/components/streams/helm/templates/deployment.yaml
@@ -48,6 +48,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.resources | indent 10 }}
initContainers:
@@ -67,3 +72,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/twilio/helm/templates/deployments.yaml b/backend/components/twilio/helm/templates/deployments.yaml
index 0e3a6a5706..461dd6c30d 100644
--- a/backend/components/twilio/helm/templates/deployments.yaml
+++ b/backend/components/twilio/helm/templates/deployments.yaml
@@ -54,6 +54,11 @@ spec:
- name: Health-Check
value: health-check
initialDelaySeconds: 120
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.connector.resources | indent 12 }}
initContainers:
@@ -141,3 +146,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/viber/helm/templates/deployments.yaml b/backend/components/viber/helm/templates/deployments.yaml
index ebb0d5254b..55fbb10d72 100644
--- a/backend/components/viber/helm/templates/deployments.yaml
+++ b/backend/components/viber/helm/templates/deployments.yaml
@@ -49,6 +49,11 @@ spec:
- name: Health-Check
value: health-check
initialDelaySeconds: 120
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.connector.resources | indent 12 }}
initContainers:
@@ -66,3 +71,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/webhook/helm/templates/deployments.yaml b/backend/components/webhook/helm/templates/deployments.yaml
index 814a41c9ae..6c6cb5b1b1 100644
--- a/backend/components/webhook/helm/templates/deployments.yaml
+++ b/backend/components/webhook/helm/templates/deployments.yaml
@@ -55,6 +55,11 @@ spec:
- name: Health-Check
value: health-check
initialDelaySeconds: 120
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.consumer.resources | indent 10 }}
initContainers:
@@ -157,3 +162,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/backend/components/whatsapp/helm/templates/deployments.yaml b/backend/components/whatsapp/helm/templates/deployments.yaml
index 1fcbff4b11..4c5e04b48a 100644
--- a/backend/components/whatsapp/helm/templates/deployments.yaml
+++ b/backend/components/whatsapp/helm/templates/deployments.yaml
@@ -113,6 +113,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
+{{ if .Values.global.kafkaCertAuth }}
+ volumeMounts:
+ - name: kafka-config-certs
+ mountPath: /opt/kafka/certs
+{{ end }}
resources:
{{ toYaml .Values.eventsRouter.resources | indent 10 }}
initContainers:
@@ -130,3 +135,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
+{{ if .Values.global.kafkaCertAuth }}
+ - name: kafka-config-certs
+ configMap:
+ name: kafka-config-certs
+{{ end }}
\ No newline at end of file
diff --git a/docs/docs/changelog.md b/docs/docs/changelog.md
index 0b0cb793ab..9d4aa94e5c 100644
--- a/docs/docs/changelog.md
+++ b/docs/docs/changelog.md
@@ -3,6 +3,64 @@ title: Changelog
sidebar_label: 📝 Changelog
---
+## 0.55.0
+
+#### Changes
+
+- [[#4167](https://github.com/airyhq/airy/issues/4167)] Kafka certificate authentication [[#4168](https://github.com/airyhq/airy/pull/4168)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Change name for Pinecone connector [[#4127](https://github.com/airyhq/airy/pull/4127)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Add description for components [[#4126](https://github.com/airyhq/airy/pull/4126)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Add LLM filter in Catalog [[#4125](https://github.com/airyhq/airy/pull/4125)]
+
+#### 🚀 Features
+
+- [[#4139](https://github.com/airyhq/airy/issues/4139)] Add Flink connector [[#4147](https://github.com/airyhq/airy/pull/4147)]
+- [[#3945](https://github.com/airyhq/airy/issues/3945)] Slack connector [[#4146](https://github.com/airyhq/airy/pull/4146)]
+- [[#4144](https://github.com/airyhq/airy/issues/4144)] Adapt Frontend with Schema Manager [[#4145](https://github.com/airyhq/airy/pull/4145)]
+- [[#4141](https://github.com/airyhq/airy/issues/4141)] Schema Registry Manager [[#4143](https://github.com/airyhq/airy/pull/4143)]
+- [[#4137](https://github.com/airyhq/airy/issues/4137)] Improve Kafka Sections [[#4138](https://github.com/airyhq/airy/pull/4138)]
+- [[#4132](https://github.com/airyhq/airy/issues/4132)] Added Copilot source to libs and apps [[#4133](https://github.com/airyhq/airy/pull/4133)]
+
+#### 🐛 Bug Fixes
+
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Fix selecting fields in streams [[#4123](https://github.com/airyhq/airy/pull/4123)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Fix symbol [[#4122](https://github.com/airyhq/airy/pull/4122)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Update icons for LLM components [[#4120](https://github.com/airyhq/airy/pull/4120)]
+- [[#4108](https://github.com/airyhq/airy/issues/4108)] Fix schemas and topics screens [[#4109](https://github.com/airyhq/airy/pull/4109)]
+
+#### 📚 Documentation
+
+- [[#4134](https://github.com/airyhq/airy/issues/4134)] Update the diagram in the README file [[#4136](https://github.com/airyhq/airy/pull/4136)]
+- [[#4134](https://github.com/airyhq/airy/issues/4134)] Update main diagram [[#4135](https://github.com/airyhq/airy/pull/4135)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Improve docs [[#4131](https://github.com/airyhq/airy/pull/4131)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Improve docs [[#4128](https://github.com/airyhq/airy/pull/4128)]
+- [[#4156](https://github.com/airyhq/airy/issues/4156)] Docs - Remove Solutions link [[#4115](https://github.com/airyhq/airy/pull/4115)]
+
+#### 🧰 Maintenance
+
+- Bump google.golang.org/protobuf from 1.28.0 to 1.33.0 [[#4155](https://github.com/airyhq/airy/pull/4155)]
+- Bump webpack-dev-middleware from 5.3.3 to 5.3.4 in /docs [[#4154](https://github.com/airyhq/airy/pull/4154)]
+- Bump google.golang.org/protobuf from 1.28.0 to 1.33.0 in /infrastructure/lib/go/k8s/handler [[#4152](https://github.com/airyhq/airy/pull/4152)]
+- Bump webpack-dev-middleware from 5.3.3 to 5.3.4 [[#4153](https://github.com/airyhq/airy/pull/4153)]
+- Bump express from 4.18.2 to 4.19.2 [[#4151](https://github.com/airyhq/airy/pull/4151)]
+- Bump express from 4.18.2 to 4.19.2 in /docs [[#4150](https://github.com/airyhq/airy/pull/4150)]
+- Bump follow-redirects from 1.15.2 to 1.15.6 [[#4149](https://github.com/airyhq/airy/pull/4149)]
+- Bump follow-redirects from 1.15.2 to 1.15.6 in /docs [[#4148](https://github.com/airyhq/airy/pull/4148)]
+- Bump github.com/cyphar/filepath-securejoin from 0.2.3 to 0.2.4 [[#4118](https://github.com/airyhq/airy/pull/4118)]
+- Bump @adobe/css-tools from 4.0.1 to 4.3.1 [[#4116](https://github.com/airyhq/airy/pull/4116)]
+- Bump @svgr/plugin-svgo from 6.5.1 to 8.1.0 [[#4114](https://github.com/airyhq/airy/pull/4114)]
+- Bump semver from 5.7.1 to 5.7.2 in /docs [[#4110](https://github.com/airyhq/airy/pull/4110)]
+- Bump sass-loader from 13.1.0 to 13.3.2 [[#4103](https://github.com/airyhq/airy/pull/4103)]
+- Bump word-wrap from 1.2.3 to 1.2.4 [[#4112](https://github.com/airyhq/airy/pull/4112)]
+
+#### Airy CLI
+
+You can download the Airy CLI for your operating system from the following links:
+
+[MacOS](https://airy-core-binaries.s3.amazonaws.com/0.55.0/darwin/amd64/airy)
+[Linux](https://airy-core-binaries.s3.amazonaws.com/0.55.0/linux/amd64/airy)
+[Windows](https://airy-core-binaries.s3.amazonaws.com/0.55.0/windows/amd64/airy.exe)
+
## 0.54.0
#### 🚀 Features
@@ -1348,38 +1406,3 @@ You can download the Airy CLI for your operating system from the following links
[Linux](https://airy-core-binaries.s3.amazonaws.com/0.35.0/linux/amd64/airy)
[Windows](https://airy-core-binaries.s3.amazonaws.com/0.35.0/windows/amd64/airy.exe)
-## 0.34.0
-
-#### Changes
-
-#### 🚀 Features
-
-- [[#2518](https://github.com/airyhq/airy/issues/2518)] Add fargate annotation [[#2540](https://github.com/airyhq/airy/pull/2540)]
-- [[#2305](https://github.com/airyhq/airy/issues/2305)] Add CLI outdated version warning [[#2529](https://github.com/airyhq/airy/pull/2529)]
-
-#### 🐛 Bug Fixes
-
-- [[#2434](https://github.com/airyhq/airy/issues/2434)] Fix broken instagram Facebook inbox ingestion [[#2535](https://github.com/airyhq/airy/pull/2535)]
-- [[#2457](https://github.com/airyhq/airy/issues/2457)] Fix upgrade to same version [[#2538](https://github.com/airyhq/airy/pull/2538)]
-- [[#2510](https://github.com/airyhq/airy/issues/2510)] Improve error logging for helm install [[#2522](https://github.com/airyhq/airy/pull/2522)]
-- [[#2255](https://github.com/airyhq/airy/issues/2255)] Fix helm chart url [[#2525](https://github.com/airyhq/airy/pull/2525)]
-- [[#2523](https://github.com/airyhq/airy/issues/2523)] Fix VERSION and add changelog [[#2524](https://github.com/airyhq/airy/pull/2524)]
-- [[#2473](https://github.com/airyhq/airy/issues/2473)] fix failing cypress test [[#2507](https://github.com/airyhq/airy/pull/2507)]
-
-#### 🧰 Maintenance
-
-- Bump react-redux from 7.2.5 to 7.2.6 [[#2539](https://github.com/airyhq/airy/pull/2539)]
-- Bump reselect from 4.0.0 to 4.1.1 [[#2533](https://github.com/airyhq/airy/pull/2533)]
-- Bump sass-loader from 12.1.0 to 12.3.0 [[#2534](https://github.com/airyhq/airy/pull/2534)]
-- Bump @types/react-dom from 17.0.9 to 17.0.10 [[#2526](https://github.com/airyhq/airy/pull/2526)]
-- Bump react-markdown from 7.0.1 to 7.1.0 [[#2527](https://github.com/airyhq/airy/pull/2527)]
-- Bump webpack from 5.54.0 to 5.59.1 [[#2517](https://github.com/airyhq/airy/pull/2517)]
-
-#### Airy CLI
-
-You can download the Airy CLI for your operating system from the following links:
-
-[MacOS](https://airy-core-binaries.s3.amazonaws.com/0.34.0/darwin/amd64/airy)
-[Linux](https://airy-core-binaries.s3.amazonaws.com/0.34.0/linux/amd64/airy)
-[Windows](https://airy-core-binaries.s3.amazonaws.com/0.34.0/windows/amd64/airy.exe)
-
diff --git a/docs/docs/connectors/conversational-ai/introduction.md b/docs/docs/connectors/conversational-ai/introduction.md
index df111a622c..151d33b063 100644
--- a/docs/docs/connectors/conversational-ai/introduction.md
+++ b/docs/docs/connectors/conversational-ai/introduction.md
@@ -9,7 +9,7 @@ import ButtonBox from "@site/src/components/ButtonBox";
Level up your channels' communication with Airy Core's conversational AI [connectors](/concepts/architecture#components).
-Airy Core features conversational AI [connectors](/concepts/architecture#components) that you can easily install and configure on your instance.
+Airy Core features conversational AI [connectors](/concepts/architecture#components) that you can easily install and configure on your instance. For all of the LLM connectors that Airy supports, please refer to our [Enterprise Docs](https://airy.co/docs/enterprise/).
}
iconInvertible={true}
title='WebSockets to power real-time applications'
- description="A WebSocket server that allows clients to receive near real-time updates about data flowing through the system."
+ description="A WebSocket server that allows clients to receive near real-time updates about data flowing through the system. Particularly useful in combination with our LLM connectors and apps, which can send real-time data to enrich the interaction with your customers."
link='/api/websocket'
/>
}
iconInvertible={true}
title='UI: From a control center to dashboards'
- description="No-code interfaces to manage and control Airy, your connectors and your streams."
+ description="No-code interfaces to manage and control Airy, your connectors, your LLM integrations and your streams."
link='/ui/inbox/introduction'
/>
@@ -96,5 +96,6 @@ Here is a list of the open source components which can be added to `Airy Core`:
- sources-twilio
- sources-viber
- sources-whatsapp
+- flink-connector
More information about the components API can be found [here](/api/endpoints/components).
diff --git a/docs/docs/getting-started/glossary.md b/docs/docs/getting-started/glossary.md
index d6060e5d34..f995120149 100644
--- a/docs/docs/getting-started/glossary.md
+++ b/docs/docs/getting-started/glossary.md
@@ -118,21 +118,39 @@ A tag is a special use case of metadata, which is used to tag
common, Airy Core provides specialized endpoints and filters for tagging
conversations.
+## AI & ML
+
+## Large language model
+
+A type of artificial intelligence model designed to understand and generate human-like text based on vast amounts of data. It's trained on diverse internet text to predict the next word in a sequence, enabling it to answer questions, generate content, and assist with various tasks. Airy allows a plug-able interface into different LLMs.
+
+## Vector database
+
+A high-dimensional database store which is suitable for persistent storage for natural language processing or images. The data is represented as vectors and retrieval is based on similarity, allowing for efficient similarity searches and context creation. Vector databases are very convenient for storing vector representations of streaming data that can be queried and add context to questions that are sent to LLMs, in real time.
+
+## Automation
+
+The ability of an Airy component to react and simulate human-like conversations and automate specific tasks, in real time. It aims to provide users with immediate, consistent responses, reducing the need for human intervention in customer support, inquiries, and other conversational scenarios.
+
## Source
A source represents a system that generates messaging data that a user wants to
process with Airy Core.
-## Stream
-
-The whole Airy platform is based on Kafka and real-time streaming of messages. In the context of `streams` feature that Airy supports, a `stream` is the process of joining two or multiple Kafka topics, combining the data and creating an outout topic where the result of the streaming operation will be stored. It is based on KSQL.
-
### Provider
Source providers are API platforms that allow Airy Core to connect to one or
more of their sources typically via a webhook. E.g. Twilio is a source provider
for the Twilio SMS and WhatsApp sources.
+## App
+
+Third party open-source packages that can be installed alongside Airy, in the same Kubernetes cluster, to provide a more robust and powerful application development environment. These `Apps` can vary from databases (ex. PostgreSQL or Redis) to LLM implementations and vector databases (ex. Llama2 or FAISS).
+
+## Stream
+
+The whole Airy platform is based on Kafka and real-time streaming of messages. In the context of `streams` feature that Airy supports, a `stream` is the process of joining two or multiple Kafka topics, combining the data and creating an output topic where the result of the streaming operation will be stored. It is based on KSQL.
+
## User
A user represents one authorized agent in Airy Core, which is different from a Contact
diff --git a/docs/docs/getting-started/installation/helm.md b/docs/docs/getting-started/installation/helm.md
index 03dcc7038f..4d3738cc0f 100644
--- a/docs/docs/getting-started/installation/helm.md
+++ b/docs/docs/getting-started/installation/helm.md
@@ -290,6 +290,49 @@ Run the following command to create the `Airy` platform without the bundled inst
helm install airy airy/airy --timeout 10m --set prerequisites.kafka.enabled=false --values ./airy.yaml
```
+#### Confluent
+
+To connect to a Kafka instance in Confluent cloud, setting the `config.kafka.brokers` and `config.kafka.authJaas` values is enough, prior to deploying the Helm chart.
+
+#### Aiven
+
+Aiven cloud uses keystore and truststore certificates that need to be loaded on the workloads that are connecting to Kafka. Get the necessary certificates and connection files from Aiven using the `avn` CLI and place them in a separate directory.
+
+```
+avn service user-kafka-java-creds {KAFKA_INSTANCE} --username {USERNAME} -d ./aiven/ --password {PASSWORD}
+```
+
+Create a Kubernetes ConfigMap that contains the contents of the created directory:
+
+```
+kubectl create configmap kafka-config-certs --from-file aiven/
+```
+
+Set the appropriate connection parameters in your `airy.yaml` file:
+
+```yaml
+config:
+ kafka:
+ brokers: "the-aiven-kafka-broker-url"
+ keyTrustSecret: "the-key-trust-secret"
+```
+
+Then install Airy with the following command:
+
+```sh
+helm install airy airy/airy --timeout 10m --set prerequisites.kafka.enabled=false --set global.kafkaCertAuth=true --values ./airy.yaml
+```
+
+### Kafka partitions per topic
+
+Currently all the default topics in the Airy instance are created with 10 partitions. To create these topics with a different number of partitions, add the following to your `airy.yaml` file before running `helm install` (before the initial creation of the topics):
+
+```
+provisioning:
+ kafka:
+ partitions: 2
+```
+
### Beanstalkd
The default installation creates its own [Beanstalkd](https://beanstalkd.github.io/) deployment, as it is a prerequisite for using the `integration/webhook` component.
diff --git a/docs/docs/getting-started/installation/minikube.md b/docs/docs/getting-started/installation/minikube.md
index bf9a7912dc..9feddc0fc1 100644
--- a/docs/docs/getting-started/installation/minikube.md
+++ b/docs/docs/getting-started/installation/minikube.md
@@ -14,6 +14,11 @@ Run Airy on minikube with one command.
The goal of this document is to provide an overview of how to run Airy Core on
your local machine using [minikube](https://minikube.sigs.k8s.io/).
+## Requirements
+
+- [Terraform](https://learn.hashicorp.com/tutorials/terraform/install-cli) v1.2.0+
+- [Kubectl](https://kubernetes.io/docs/tasks/tools/)
+
## Install
:::note
diff --git a/docs/docs/getting-started/introduction.md b/docs/docs/getting-started/introduction.md
index ea4ecebc36..52e28db5f8 100644
--- a/docs/docs/getting-started/introduction.md
+++ b/docs/docs/getting-started/introduction.md
@@ -23,7 +23,7 @@ Airy Core is an is an **open-source** **streaming** **app framework** to train M
-
+
Get Airy up and running with one command
diff --git a/docs/docusaurus.config.js b/docs/docusaurus.config.js
index 1f1fbae64a..84987c3900 100644
--- a/docs/docusaurus.config.js
+++ b/docs/docusaurus.config.js
@@ -66,12 +66,6 @@ module.exports = {
position: 'left',
to: 'https://airy.co/docs/enterprise/',
},
- {
- target: '_self',
- label: 'Solutions',
- position: 'left',
- href: 'https://airy.co/solutions',
- },
{
target: '_self',
label: 'Customer Stories',
diff --git a/docs/static/img/getting-started/introduction.png b/docs/static/img/getting-started/introduction.png
new file mode 100644
index 0000000000..2391349407
Binary files /dev/null and b/docs/static/img/getting-started/introduction.png differ
diff --git a/docs/yarn.lock b/docs/yarn.lock
index b57c514a24..89bc9054dc 100644
--- a/docs/yarn.lock
+++ b/docs/yarn.lock
@@ -2620,13 +2620,13 @@ binary-extensions@^2.0.0:
resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-2.2.0.tgz#75f502eeaf9ffde42fc98829645be4ea76bd9e2d"
integrity sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA==
-body-parser@1.20.1:
- version "1.20.1"
- resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.20.1.tgz#b1812a8912c195cd371a3ee5e66faa2338a5c668"
- integrity sha512-jWi7abTbYwajOytWCQc37VulmWiRae5RyTpaCyDcS5/lMdtwSz5lOpDE67srw/HYe35f1z3fDQw+3txg7gNtWw==
+body-parser@1.20.2:
+ version "1.20.2"
+ resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.20.2.tgz#6feb0e21c4724d06de7ff38da36dad4f57a747fd"
+ integrity sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA==
dependencies:
bytes "3.1.2"
- content-type "~1.0.4"
+ content-type "~1.0.5"
debug "2.6.9"
depd "2.0.0"
destroy "1.2.0"
@@ -2634,7 +2634,7 @@ body-parser@1.20.1:
iconv-lite "0.4.24"
on-finished "2.4.1"
qs "6.11.0"
- raw-body "2.5.1"
+ raw-body "2.5.2"
type-is "~1.6.18"
unpipe "1.0.0"
@@ -3057,7 +3057,7 @@ content-disposition@0.5.4:
dependencies:
safe-buffer "5.2.1"
-content-type@~1.0.4:
+content-type@~1.0.4, content-type@~1.0.5:
version "1.0.5"
resolved "https://registry.yarnpkg.com/content-type/-/content-type-1.0.5.tgz#8b773162656d1d1086784c8f23a54ce6d73d7918"
integrity sha512-nTjqfcBFEipKdXCv4YDQWCfmcLZKm81ldF0pAopTvyrFGVbcR6P/VAAd5G7N+0tTr8QqiU0tFadD6FK4NtJwOA==
@@ -3072,10 +3072,10 @@ cookie-signature@1.0.6:
resolved "https://registry.yarnpkg.com/cookie-signature/-/cookie-signature-1.0.6.tgz#e303a882b342cc3ee8ca513a79999734dab3ae2c"
integrity sha512-QADzlaHc8icV8I7vbaJXJwod9HWYp8uCqf1xa4OfNu1T7JVxQIrUgOWtHdNDtPiywmFbiS12VjotIXLrKM3orQ==
-cookie@0.5.0:
- version "0.5.0"
- resolved "https://registry.yarnpkg.com/cookie/-/cookie-0.5.0.tgz#d1f5d71adec6558c58f389987c366aa47e994f8b"
- integrity sha512-YZ3GUyn/o8gfKJlnlX7g7xq4gyO6OSuhGPKaaGssGB2qgDUS0gPgtTvoyZLTt9Ab6dC4hfc9dV5arkvc/OCmrw==
+cookie@0.6.0:
+ version "0.6.0"
+ resolved "https://registry.yarnpkg.com/cookie/-/cookie-0.6.0.tgz#2798b04b071b0ecbff0dbb62a505a8efa4e19051"
+ integrity sha512-U71cyTamuh1CRNCfpGY6to28lxvNwPG4Guz/EVjgf3Jmzv0vlDp1atT9eS5dDjMYHucpHbWns6Lwf3BKz6svdw==
copy-text-to-clipboard@^3.0.1:
version "3.1.0"
@@ -3713,16 +3713,16 @@ execa@^5.0.0:
strip-final-newline "^2.0.0"
express@^4.17.3:
- version "4.18.2"
- resolved "https://registry.yarnpkg.com/express/-/express-4.18.2.tgz#3fabe08296e930c796c19e3c516979386ba9fd59"
- integrity sha512-5/PsL6iGPdfQ/lKM1UuielYgv3BUoJfz1aUwU9vHZ+J7gyvwdQXFEBIEIaxeGf0GIcreATNyBExtalisDbuMqQ==
+ version "4.19.2"
+ resolved "https://registry.yarnpkg.com/express/-/express-4.19.2.tgz#e25437827a3aa7f2a827bc8171bbbb664a356465"
+ integrity sha512-5T6nhjsT+EOMzuck8JjBHARTHfMht0POzlA60WV2pMD3gyXw2LZnZ+ueGdNxG+0calOJcWKbpFcuzLZ91YWq9Q==
dependencies:
accepts "~1.3.8"
array-flatten "1.1.1"
- body-parser "1.20.1"
+ body-parser "1.20.2"
content-disposition "0.5.4"
content-type "~1.0.4"
- cookie "0.5.0"
+ cookie "0.6.0"
cookie-signature "1.0.6"
debug "2.6.9"
depd "2.0.0"
@@ -3909,9 +3909,9 @@ flux@^4.0.1:
fbjs "^3.0.1"
follow-redirects@^1.0.0, follow-redirects@^1.14.7:
- version "1.15.2"
- resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.2.tgz#b460864144ba63f2681096f274c4e57026da2c13"
- integrity sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==
+ version "1.15.6"
+ resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.6.tgz#7f815c0cda4249c74ff09e95ef97c23b5fd0399b"
+ integrity sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==
fork-ts-checker-webpack-plugin@^6.5.0:
version "6.5.3"
@@ -5990,10 +5990,10 @@ range-parser@^1.2.1, range-parser@~1.2.1:
resolved "https://registry.yarnpkg.com/range-parser/-/range-parser-1.2.1.tgz#3cf37023d199e1c24d1a55b84800c2f3e6468031"
integrity sha512-Hrgsx+orqoygnmhFbKaHE6c296J+HTAQXoxEF6gNupROmmGJRoyzfG3ccAveqCBrwr/2yxQ5BVd/GTl5agOwSg==
-raw-body@2.5.1:
- version "2.5.1"
- resolved "https://registry.yarnpkg.com/raw-body/-/raw-body-2.5.1.tgz#fe1b1628b181b700215e5fd42389f98b71392857"
- integrity sha512-qqJBtEyVgS0ZmPGdCFPWJ3FreoqvG4MVQln/kCgF7Olq95IbOp0/BWyMwbdtn4VTvkM8Y7khCQ2Xgk/tcrCXig==
+raw-body@2.5.2:
+ version "2.5.2"
+ resolved "https://registry.yarnpkg.com/raw-body/-/raw-body-2.5.2.tgz#99febd83b90e08975087e8f1f9419a149366b68a"
+ integrity sha512-8zGqypfENjCIqGhgXToC8aB2r7YrBX+AQAfIPs/Mlk+BtPTztOvTS01NRW/3Eh60J+a48lt8qsCzirQ6loCVfA==
dependencies:
bytes "3.1.2"
http-errors "2.0.0"
@@ -6522,19 +6522,19 @@ semver-diff@^3.1.1:
semver "^6.3.0"
semver@^5.4.1:
- version "5.7.1"
- resolved "https://registry.yarnpkg.com/semver/-/semver-5.7.1.tgz#a954f931aeba508d307bbf069eff0c01c96116f7"
- integrity sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==
+ version "5.7.2"
+ resolved "https://registry.yarnpkg.com/semver/-/semver-5.7.2.tgz#48d55db737c3287cd4835e17fa13feace1c41ef8"
+ integrity sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==
semver@^6.0.0, semver@^6.1.1, semver@^6.1.2, semver@^6.2.0, semver@^6.3.0:
- version "6.3.0"
- resolved "https://registry.yarnpkg.com/semver/-/semver-6.3.0.tgz#ee0a64c8af5e8ceea67687b133761e1becbd1d3d"
- integrity sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==
+ version "6.3.1"
+ resolved "https://registry.yarnpkg.com/semver/-/semver-6.3.1.tgz#556d2ef8689146e46dcea4bfdd095f3434dffcb4"
+ integrity sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==
semver@^7.3.2, semver@^7.3.4, semver@^7.3.7, semver@^7.3.8:
- version "7.3.8"
- resolved "https://registry.yarnpkg.com/semver/-/semver-7.3.8.tgz#07a78feafb3f7b32347d725e33de7e2a2df67798"
- integrity sha512-NB1ctGL5rlHrPJtFDVIVzTyQylMLu9N9VICA6HSFJo8MCGVTMW6gfpicwKmmK/dAjTOrqu5l63JJOpDSrAis3A==
+ version "7.5.4"
+ resolved "https://registry.yarnpkg.com/semver/-/semver-7.5.4.tgz#483986ec4ed38e1c6c48c34894a9182dbff68a6e"
+ integrity sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==
dependencies:
lru-cache "^6.0.0"
@@ -7378,9 +7378,9 @@ webpack-bundle-analyzer@^4.5.0:
ws "^7.3.1"
webpack-dev-middleware@^5.3.1:
- version "5.3.3"
- resolved "https://registry.yarnpkg.com/webpack-dev-middleware/-/webpack-dev-middleware-5.3.3.tgz#efae67c2793908e7311f1d9b06f2a08dcc97e51f"
- integrity sha512-hj5CYrY0bZLB+eTO+x/j67Pkrquiy7kWepMHmUMoPsmcUaeEnQJqFzHJOyxgWlq746/wUuA64p9ta34Kyb01pA==
+ version "5.3.4"
+ resolved "https://registry.yarnpkg.com/webpack-dev-middleware/-/webpack-dev-middleware-5.3.4.tgz#eb7b39281cbce10e104eb2b8bf2b63fce49a3517"
+ integrity sha512-BVdTqhhs+0IfoeAf7EoH5WE+exCmqGerHfDM0IL096Px60Tq2Mn9MAbnaGUe6HiMa41KMCYF19gyzZmBcq/o4Q==
dependencies:
colorette "^2.0.10"
memfs "^3.4.3"
diff --git a/frontend/control-center/src/App.tsx b/frontend/control-center/src/App.tsx
index 294aabaae9..4788b33dbb 100644
--- a/frontend/control-center/src/App.tsx
+++ b/frontend/control-center/src/App.tsx
@@ -17,6 +17,8 @@ import {
STREAMS_ROUTE,
TOPICS_ROUTE,
SCHEMAS_ROUTE,
+ LLMS_ROUTE,
+ LLM_CONSUMERS_ROUTE,
} from './routes/routes';
import NotFound from './pages/NotFound';
import ConnectorsOutlet from './pages/Connectors/ConnectorsOutlet';
@@ -36,6 +38,8 @@ import {getAppExternalURL} from './services/getAppExternalURL';
import Streams from './pages/Streams';
import Topics from './pages/Topics';
import Schemas from './pages/Schemas';
+import LLMs from './pages/LLMs';
+import LLMConsumers from './pages/LLMConsumers';
const mapDispatchToProps = {
getClientConfig,
@@ -111,6 +115,8 @@ const App = (props: ConnectedProps) => {
} />
} />
+ } />
+ } />
} />
diff --git a/frontend/control-center/src/actions/streams/index.ts b/frontend/control-center/src/actions/streams/index.ts
index 3d47b61e03..2813ef3e8d 100644
--- a/frontend/control-center/src/actions/streams/index.ts
+++ b/frontend/control-center/src/actions/streams/index.ts
@@ -10,6 +10,7 @@ const SET_TOPIC_INFO = '@@metadata/SET_TOPIC_INFO';
const SET_TOPIC_SCHEMAS = '@@metadata/SET_TOPIC_SCHEMAS';
const SET_STREAMS = '@@metadata/SET_STREAMS';
const SET_SCHEMAS_INFO = '@@metadata/SET_SCHEMAS_INFO';
+const SET_SCHEMAS_VERSIONS = '@@metadata/SET_SCHEMAS_VERSIONS';
const SET_STREAM_INFO = '@@metadata/SET_STREAM_INFO';
const SET_LAST_MESSAGE = '@@metadata/SET_LAST_MESSAGRE';
@@ -68,14 +69,30 @@ export const getTopicInfo = (topicName: string) => async (dispatch: Dispatch async (dispatch: Dispatch) => {
- return getData('subjects').then(response => {
+ return getData('schemas.list').then(response => {
+ console.log(response);
dispatch(setTopicSchemasAction(response));
return Promise.resolve(true);
});
};
-export const getSchemaInfo = (topicName: string) => async (dispatch: Dispatch) => {
- return getData(`subjects/${topicName}/versions/latest`).then(response => {
+export const getSchemaVersions = (topicName: string) => async (dispatch: Dispatch) => {
+ return getData(`schemas.versions?topicName=${topicName}`).then(response => {
+ if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
+ return Promise.reject('404 Not Found');
+ } else {
+ dispatch(setCurrentSchemaVersionsAction({name: topicName, versions: response}));
+ }
+ return Promise.resolve(true);
+ });
+};
+
+export const getSchemaInfo = (topicName: string, version?: string) => async (dispatch: Dispatch) => {
+ let v = 'latest';
+ if (version) {
+ v = version;
+ }
+ return getData(`schemas.info?topicName=${topicName}&version=${v}`).then(response => {
if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
return Promise.reject('404 Not Found');
} else {
@@ -89,7 +106,7 @@ export const setSchemaSchema = (topicName: string, schema: string) => async () =
const body = {
schema: JSON.stringify({...JSON.parse(schema)}),
};
- return postData(`subjects/${topicName}/versions`, body).then(response => {
+ return postData(`schemas.update?topicName=${topicName}`, body).then(response => {
if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
return Promise.reject('404 Not Found');
}
@@ -103,7 +120,7 @@ export const createSchema = (topicName: string, schema: string) => async () => {
const body = {
schema: JSON.stringify({...JSON.parse(schema)}),
};
- return postData(`subjects/${topicName}/versions`, body)
+ return postData(`schemas.create?topicName=${topicName}`, body)
.then(response => {
if (response.id) return Promise.resolve(true);
if (response.message) return Promise.reject(response.message);
@@ -118,7 +135,7 @@ export const checkCompatibilityOfNewSchema = (topicName: string, schema: string,
const body = {
schema: JSON.stringify({...JSON.parse(schema)}),
};
- return postData(`compatibility/subjects/${topicName}/versions/${version}`, body)
+ return postData(`schemas.compatibility?topicName=${topicName}&version=${version}`, body)
.then(response => {
if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
return Promise.reject('404 Not Found');
@@ -138,7 +155,7 @@ export const checkCompatibilityOfNewSchema = (topicName: string, schema: string,
};
export const deleteSchema = (topicName: string) => async () => {
- return deleteData(`subjects/${topicName}`).then(response => {
+ return deleteData(`schemas.delete?topicName=${topicName}`).then(response => {
if (response.error_code && response.error_code.toString().includes('404') && !topicName.includes('-value')) {
return Promise.reject('404 Not Found');
}
@@ -147,11 +164,7 @@ export const deleteSchema = (topicName: string) => async () => {
};
export const getLastMessage = (topicName: string) => async (dispatch: Dispatch) => {
- const body = {
- ksql: `PRINT '${topicName}' FROM BEGINNING LIMIT 1;`,
- streamsProperties: {},
- };
- return postData('query', body).then(response => {
+ return getData(`schemas.lastMessage?topicName=${topicName}`).then(response => {
dispatch(setLastMessage(response));
return Promise.resolve(true);
});
@@ -177,11 +190,10 @@ async function postData(url: string, body: any) {
const response = await fetch(apiHostUrl + '/' + url, {
method: 'POST',
headers: {
- 'Content-Type': 'application/vnd.schemaregistry.v1+json',
+ 'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
-
return response.json();
}
@@ -197,6 +209,11 @@ export const setStreamsAction = createAction(SET_STREAMS, (streams: Stream[]) =>
export const setCurrentSchemaInfoAction = createAction(SET_SCHEMAS_INFO, (topicInfo: Schema) => topicInfo)();
+export const setCurrentSchemaVersionsAction = createAction(
+ SET_SCHEMAS_VERSIONS,
+ (topicInfo: {name: string; versions: []}) => topicInfo
+)<{name: string; versions: []}>();
+
export const setCurrentStreamInfoAction = createAction(
SET_STREAM_INFO,
(streamInfo: StreamInfo) => streamInfo
diff --git a/frontend/control-center/src/components/ChannelAvatar/index.tsx b/frontend/control-center/src/components/ChannelAvatar/index.tsx
index 7a26ed24fc..6894e2c241 100644
--- a/frontend/control-center/src/components/ChannelAvatar/index.tsx
+++ b/frontend/control-center/src/components/ChannelAvatar/index.tsx
@@ -19,6 +19,15 @@ import {ReactComponent as IbmWatsonAssistantAvatar} from 'assets/images/icons/ib
import {ReactComponent as RedisAvatar} from 'assets/images/icons/redisLogo.svg';
import {ReactComponent as PostgresAvatar} from 'assets/images/icons/postgresLogo.svg';
import {ReactComponent as FeastAvatar} from 'assets/images/icons/feastLogo.svg';
+import {ReactComponent as MetaAvatar} from 'assets/images/icons/meta.svg';
+import {ReactComponent as OpenaiAvatar} from 'assets/images/icons/openai.svg';
+import {ReactComponent as PineconeAvatar} from 'assets/images/icons/pinecone.svg';
+import {ReactComponent as ChromaAvatar} from 'assets/images/icons/chroma.svg';
+import {ReactComponent as MosaicAvatar} from 'assets/images/icons/mosaic.svg';
+import {ReactComponent as WeaviateAvatar} from 'assets/images/icons/weaviate.svg';
+import {ReactComponent as GmailAvatar} from 'assets/images/icons/gmail.svg';
+import {ReactComponent as SlackAvatar} from 'assets/images/icons/slack.svg';
+import {ReactComponent as FlinkAvatar} from 'assets/images/icons/flink.svg';
import {Channel, Source} from 'model';
import styles from './index.module.scss';
@@ -98,6 +107,37 @@ export const getChannelAvatar = (source: string) => {
case Source.feast:
case 'Feast':
return ;
+ case Source.faiss:
+ case 'FAISS':
+ return ;
+ case Source.faissConnector:
+ case 'FAISS connector':
+ return ;
+ case Source.llama2:
+ case 'LLama2':
+ return ;
+ case Source.openaiConnector:
+ case 'OpenAI connector':
+ return ;
+ case Source.pineconeConnector:
+ case 'Pinecone connector':
+ return ;
+ case Source.chroma:
+ case 'Chroma':
+ return ;
+ case Source.mosaic:
+ case 'Mosaic':
+ return ;
+ case Source.weaviate:
+ case 'Weaviate':
+ return ;
+ case Source.gmail:
+ case 'GMail connector':
+ return ;
+ case 'Slack connector':
+ return ;
+ case 'Flink connector':
+ return ;
default:
return ;
diff --git a/frontend/control-center/src/components/Sidebar/index.tsx b/frontend/control-center/src/components/Sidebar/index.tsx
index 043513f45f..95285c91e9 100644
--- a/frontend/control-center/src/components/Sidebar/index.tsx
+++ b/frontend/control-center/src/components/Sidebar/index.tsx
@@ -15,6 +15,8 @@ import {
STREAMS_ROUTE,
TOPICS_ROUTE,
SCHEMAS_ROUTE,
+ LLMS_ROUTE,
+ LLM_CONSUMERS_ROUTE,
} from '../../routes/routes';
import {ReactComponent as ConnectorsIcon} from 'assets/images/icons/gitMerge.svg';
@@ -31,18 +33,18 @@ type SideBarProps = {} & ConnectedProps;
const mapStateToProps = (state: StateModel) => ({
version: state.data.config.clusterVersion,
components: state.data.config.components,
+ connectors: state.data.catalog,
});
const connector = connect(mapStateToProps);
const Sidebar = (props: SideBarProps) => {
- const {version, components} = props;
+ const {version, components, connectors} = props;
const componentInfo = useCurrentComponentForSource(Source.airyWebhooks);
-
const webhooksEnabled = componentInfo.installationStatus === InstallationStatus.installed;
const inboxEnabled = components[Source.frontendInbox]?.enabled || false;
- const showLine = inboxEnabled || webhooksEnabled;
-
+ const llmsEnabled = connectors['llm-controller']?.installationStatus === 'installed' || false;
+ const showLine = inboxEnabled || webhooksEnabled || llmsEnabled;
const isActive = (route: string) => {
return useMatch(`${route}/*`);
};
@@ -51,6 +53,9 @@ const Sidebar = (props: SideBarProps) => {
const [kafkaSectionOpen, setKafkaSectionOpen] = useState(
href.includes(TOPICS_ROUTE) || href.includes(STREAMS_ROUTE)
);
+ const [llmSectionOpen, setLlmSectionOpen] = useState(
+ href.includes(LLMS_ROUTE) || href.includes(LLM_CONSUMERS_ROUTE)
+ );
return (
diff --git a/frontend/control-center/src/pages/Connectors/ConnectorWrapper/index.module.scss b/frontend/control-center/src/pages/Connectors/ConnectorWrapper/index.module.scss
index ae262da930..91129c4325 100644
--- a/frontend/control-center/src/pages/Connectors/ConnectorWrapper/index.module.scss
+++ b/frontend/control-center/src/pages/Connectors/ConnectorWrapper/index.module.scss
@@ -114,9 +114,12 @@
.connectorIcon {
display: flex;
width: 75px;
+ height: 75px;
margin-right: 15px;
svg {
+ width: 75px;
+ height: 75px;
fill: var(--color-text-contrast);
}
}
diff --git a/frontend/control-center/src/pages/LLMConsumers/EmptyState/index.module.scss b/frontend/control-center/src/pages/LLMConsumers/EmptyState/index.module.scss
new file mode 100644
index 0000000000..39684b4f2b
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMConsumers/EmptyState/index.module.scss
@@ -0,0 +1,52 @@
+@import 'assets/scss/colors.scss';
+@import 'assets/scss/fonts.scss';
+
+.container {
+ display: flex;
+ justify-content: center;
+ align-items: center;
+ width: 100%;
+ height: calc(100% - 88px);
+}
+
+.contentContainer {
+ display: flex;
+ flex-direction: column;
+ align-items: center;
+ justify-content: center;
+
+ h1 {
+ @include font-m;
+ font-weight: 800;
+ color: var(--color-text-contrast);
+ margin: 31px 0;
+ }
+
+ span {
+ @include font-base;
+ color: var(--color-text-gray);
+ }
+
+ .subscribeButton {
+ color: var(--color-airy-blue);
+ &:hover {
+ cursor: pointer;
+ text-decoration: underline;
+ }
+ }
+}
+
+.iconContainer {
+ display: flex;
+ justify-content: center;
+ align-items: center;
+ background: var(--color-background-gray);
+ height: 95px;
+ width: 105px;
+}
+
+.searchIcon {
+ height: 45px;
+ width: 45px;
+ color: var(--color-airy-blue);
+}
diff --git a/frontend/control-center/src/pages/LLMConsumers/EmptyState/index.tsx b/frontend/control-center/src/pages/LLMConsumers/EmptyState/index.tsx
new file mode 100644
index 0000000000..1a542500b8
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMConsumers/EmptyState/index.tsx
@@ -0,0 +1,30 @@
+import React, {Dispatch, SetStateAction} from 'react';
+import styles from './index.module.scss';
+import {ReactComponent as SearchIcon} from 'assets/images/icons/search.svg';
+import {useTranslation} from 'react-i18next';
+
+type EmptyStateProps = {
+ createNewLLM: Dispatch>;
+};
+
+export const EmptyState = (props: EmptyStateProps) => {
+ const {createNewLLM} = props;
+ const {t} = useTranslation();
+
+ return (
+
+
+
+
+
+
{t('noLLMConsumers')}
+
+ {t('noLLMConsumersText')}
+ createNewLLM(true)} className={styles.subscribeButton}>
+ {t('create') + ' one'}
+
+
+
+
+ );
+};
diff --git a/frontend/control-center/src/pages/LLMConsumers/LLMConsumerItem/index.module.scss b/frontend/control-center/src/pages/LLMConsumers/LLMConsumerItem/index.module.scss
new file mode 100644
index 0000000000..b4f9b0d97a
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMConsumers/LLMConsumerItem/index.module.scss
@@ -0,0 +1,47 @@
+@import 'assets/scss/fonts.scss';
+@import 'assets/scss/colors.scss';
+
+.container {
+ display: flex;
+ flex-direction: row;
+ height: 50px;
+ align-items: center;
+ justify-content: flex-start;
+
+ p {
+ @include font-base;
+ color: var(--color-text-contrast);
+ font-weight: bold;
+ width: 25%;
+ }
+
+ p:first-child {
+ width: 30%;
+ }
+
+ p:nth-child(4) {
+ width: 15%;
+ }
+}
+
+.actionButton {
+ width: 2%;
+ outline: none;
+ cursor: pointer;
+ border: none;
+ background: none;
+ padding: 0;
+}
+
+.actionSVG {
+ width: 16px;
+ height: 18px;
+ path {
+ fill: var(--color-dark-elements-gray);
+ }
+ &:hover {
+ path {
+ fill: var(--color-airy-blue);
+ }
+ }
+}
diff --git a/frontend/control-center/src/pages/LLMConsumers/LLMConsumerItem/index.tsx b/frontend/control-center/src/pages/LLMConsumers/LLMConsumerItem/index.tsx
new file mode 100644
index 0000000000..b59ce205bc
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMConsumers/LLMConsumerItem/index.tsx
@@ -0,0 +1,38 @@
+import React from 'react';
+import {ReactComponent as TrashIcon} from 'assets/images/icons/trash.svg';
+import {useTranslation} from 'react-i18next';
+import {HttpClientInstance} from '../../../httpClient';
+import styles from './index.module.scss';
+import {NotificationModel} from 'model';
+
+type EmptyStateProps = {
+ item: {name: string; topic: string; status: string; lag: number};
+ setNotification: (object: NotificationModel) => void;
+};
+
+export const LLMConsumerItem = (props: EmptyStateProps) => {
+ const {item, setNotification} = props;
+ const {t} = useTranslation();
+
+ const deleteConsumer = () => {
+ HttpClientInstance.deleteLLMConsumer({name: item.name})
+ .then(() => {
+ setNotification({show: true, successful: true, text: 'Consumer Deleted'});
+ })
+ .catch(() => {
+ setNotification({show: true, successful: false, text: t('errorOccurred')});
+ });
+ };
+
+ return (
+
+
{item.name}
+
{item.topic}
+
{item.status}
+
{item.lag}
+
+
+ );
+};
diff --git a/frontend/control-center/src/pages/LLMConsumers/index.module.scss b/frontend/control-center/src/pages/LLMConsumers/index.module.scss
new file mode 100644
index 0000000000..8eb971f0db
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMConsumers/index.module.scss
@@ -0,0 +1,136 @@
+@import 'assets/scss/fonts.scss';
+@import 'assets/scss/colors.scss';
+@import 'assets/scss/animations.scss';
+
+.llmsWrapper {
+ background: var(--color-background-white);
+ border-top-right-radius: 10px;
+ border-top-left-radius: 10px;
+ padding: 32px;
+ margin: 88px 1.5em 0 191px;
+ height: calc(100vh - 88px);
+ overflow-y: scroll;
+ overflow-x: hidden;
+ width: 100%;
+}
+
+.headlineContainer {
+ display: flex;
+ flex-direction: row;
+ justify-content: space-between;
+ width: 100%;
+}
+
+.llmsHeadline {
+ @include font-xl;
+ font-weight: 900;
+ letter-spacing: 0;
+ display: flex;
+ justify-content: space-between;
+ color: var(--color-text-contrast);
+ margin-bottom: 14px;
+}
+
+.llmsHeadlineText {
+ @include font-xl;
+ font-weight: 900;
+}
+
+.wrapper {
+ display: flex;
+ flex-direction: row;
+ flex-wrap: wrap;
+}
+
+.listHeader {
+ display: flex;
+ flex-direction: row;
+ height: 50px;
+ align-items: center;
+ justify-content: flex-start;
+
+ h2 {
+ @include font-base;
+ color: var(--color-text-gray);
+ font-weight: bold;
+ width: 25%;
+ }
+
+ h2:first-child {
+ width: 30%;
+ }
+
+ h2:last-child {
+ width: 10%;
+ }
+}
+
+.successfullySubscribed {
+ @include font-base;
+ color: white;
+}
+
+@keyframes translateYIn {
+ 0% {
+ transform: translateY(-50px);
+ opacity: 0;
+ }
+
+ 50% {
+ transform: translateY(16px);
+ opacity: 1;
+ }
+
+ 100% {
+ transform: translateY(-50px);
+ opacity: 0;
+ }
+}
+
+.translateYAnimIn {
+ animation: translateYIn 4s ease-in-out;
+}
+
+.animateIn {
+ animation: fadeInTranslateXLeft 3000ms ease;
+}
+
+.animateOut {
+ animation: fadeInTranslateXLeft 3000ms ease;
+}
+
+.llmCreateContainer {
+ display: flex;
+ flex-direction: column;
+ justify-content: space-between;
+ width: 100%;
+ margin-top: 32px;
+ button {
+ @include font-base;
+ font-weight: bold;
+ align-self: center;
+ color: white;
+ border-radius: 5px;
+ border: none;
+ padding: 8px 16px;
+ margin-top: 16px;
+ width: 60%;
+ cursor: pointer;
+ transition: all 0.2s ease-in-out;
+ &:hover {
+ background: var(--color-background-blue);
+ color: var(--color-text-contrast);
+ }
+ }
+ label {
+ margin-bottom: 12px;
+ }
+}
+
+.dropdownContainer {
+ button {
+ border: 1px solid gray;
+ width: 100%;
+ color: black;
+ }
+}
diff --git a/frontend/control-center/src/pages/LLMConsumers/index.tsx b/frontend/control-center/src/pages/LLMConsumers/index.tsx
new file mode 100644
index 0000000000..d4d86eed5d
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMConsumers/index.tsx
@@ -0,0 +1,216 @@
+import React, {useEffect, useState} from 'react';
+import {Dropdown, Input, NotificationComponent} from 'components';
+import {SettingsModal} from 'components/alerts/SettingsModal';
+import {Button} from 'components/cta/Button';
+import {useTranslation} from 'react-i18next';
+import {connect, ConnectedProps} from 'react-redux';
+import {setPageTitle} from '../../services/pageTitle';
+import {NotificationModel} from 'model';
+import {AiryLoader} from 'components/loaders/AiryLoader';
+import {EmptyState} from './EmptyState';
+import {HttpClientInstance} from '../../httpClient';
+import {LLMConsumerItem} from './LLMConsumerItem';
+import {getValidTopics} from '../../selectors';
+import {StateModel} from '../../reducers';
+import styles from './index.module.scss';
+import {getSchemaInfo, getSchemas} from '../../actions';
+
+type LLMConsumersProps = {} & ConnectedProps;
+
+const mapDispatchToProps = {
+ getSchemas,
+ getSchemaInfo,
+};
+
+const mapStateToProps = (state: StateModel) => {
+ return {
+ topics: getValidTopics(state),
+ schemas: state.data.streams.schemas,
+ };
+};
+
+const connector = connect(mapStateToProps, mapDispatchToProps);
+
+const LLMConsumers = (props: LLMConsumersProps) => {
+ const {topics, getSchemas} = props;
+
+ const [consumers, setConsumers] = useState([]);
+ const [notification, setNotification] = useState(null);
+ const [dataFetched, setDataFetched] = useState(false);
+ const [showSettingsModal, setShowSettingsModal] = useState(false);
+ const [name, setName] = useState('');
+ const [topic, setTopic] = useState('');
+ const [type, setType] = useState('');
+ const [textfield, setTextfield] = useState('');
+ const [metadataFields, setMetadataFields] = useState('');
+ const {t} = useTranslation();
+
+ useEffect(() => {
+ setPageTitle('LLM Consumers');
+ getSchemas();
+ }, []);
+
+ useEffect(() => {
+ HttpClientInstance.listLLMConsumers()
+ .then((response: any) => {
+ setConsumers(response);
+ setDataFetched(true);
+ })
+ .catch(() => {
+ handleNotification(true);
+ });
+ }, []);
+
+ const handleNotification = (show: boolean) => {
+ setNotification({show: show, successful: false, text: t('errorOccurred')});
+ };
+
+ const toggleCreateView = () => {
+ setShowSettingsModal(!showSettingsModal);
+ };
+
+ const createNewLLM = () => {
+ const metadataFieldsArray = metadataFields.replace(' ', '').split(',');
+ HttpClientInstance.createLLMConsumer({
+ name: name.trim(),
+ topic: topic.trim(),
+ textField: textfield.trim(),
+ metadataFields: metadataFieldsArray,
+ })
+ .then(() => {
+ setNotification({show: true, successful: true, text: t('llmConsumerCreatedSuccessfully')});
+ toggleCreateView();
+ setName('');
+ setTopic('');
+ setTextfield('');
+ setMetadataFields('');
+ setType('');
+ })
+ .catch(() => {
+ handleNotification(true);
+ });
+ };
+
+ return (
+ <>
+ {' '}
+ {showSettingsModal && (
+
+
+
+ )}
+
+
+
+
LLM Consumers
+
+
+
+
+
+ {consumers?.length === 0 && dataFetched ? (
+
+ ) : consumers?.length === 0 ? (
+
toggleCreateView()} />
+ ) : (
+ <>
+
+
Name
+ Topic
+ Status
+ Lag
+
+
+ {consumers &&
+ consumers.map((consumer: any) => (
+
+ ))}
+
+ {notification?.show && (
+
+ )}
+ >
+ )}
+
+ >
+ );
+};
+
+export default connector(LLMConsumers);
diff --git a/frontend/control-center/src/pages/LLMs/EmptyState/index.module.scss b/frontend/control-center/src/pages/LLMs/EmptyState/index.module.scss
new file mode 100644
index 0000000000..39684b4f2b
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMs/EmptyState/index.module.scss
@@ -0,0 +1,52 @@
+@import 'assets/scss/colors.scss';
+@import 'assets/scss/fonts.scss';
+
+.container {
+ display: flex;
+ justify-content: center;
+ align-items: center;
+ width: 100%;
+ height: calc(100% - 88px);
+}
+
+.contentContainer {
+ display: flex;
+ flex-direction: column;
+ align-items: center;
+ justify-content: center;
+
+ h1 {
+ @include font-m;
+ font-weight: 800;
+ color: var(--color-text-contrast);
+ margin: 31px 0;
+ }
+
+ span {
+ @include font-base;
+ color: var(--color-text-gray);
+ }
+
+ .subscribeButton {
+ color: var(--color-airy-blue);
+ &:hover {
+ cursor: pointer;
+ text-decoration: underline;
+ }
+ }
+}
+
+.iconContainer {
+ display: flex;
+ justify-content: center;
+ align-items: center;
+ background: var(--color-background-gray);
+ height: 95px;
+ width: 105px;
+}
+
+.searchIcon {
+ height: 45px;
+ width: 45px;
+ color: var(--color-airy-blue);
+}
diff --git a/frontend/control-center/src/pages/LLMs/EmptyState/index.tsx b/frontend/control-center/src/pages/LLMs/EmptyState/index.tsx
new file mode 100644
index 0000000000..2f1f7d7373
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMs/EmptyState/index.tsx
@@ -0,0 +1,30 @@
+import React, {Dispatch, SetStateAction} from 'react';
+import styles from './index.module.scss';
+import {ReactComponent as SearchIcon} from 'assets/images/icons/search.svg';
+import {useTranslation} from 'react-i18next';
+
+type EmptyStateProps = {
+ createNewLLM: Dispatch>;
+};
+
+export const EmptyState = (props: EmptyStateProps) => {
+ const {createNewLLM} = props;
+ const {t} = useTranslation();
+
+ return (
+
+
+
+
+
+
{t('noLLMs')}
+
+ {t('noLLMsText')}
+ createNewLLM(true)} className={styles.subscribeButton}>
+ {t('create') + ' one'}
+
+
+
+
+ );
+};
diff --git a/frontend/control-center/src/pages/LLMs/LLMInfoItem/index.module.scss b/frontend/control-center/src/pages/LLMs/LLMInfoItem/index.module.scss
new file mode 100644
index 0000000000..331d24a20e
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMs/LLMInfoItem/index.module.scss
@@ -0,0 +1,21 @@
+@import 'assets/scss/fonts.scss';
+@import 'assets/scss/colors.scss';
+
+.container {
+ display: flex;
+ flex-direction: row;
+ height: 50px;
+ align-items: center;
+ justify-content: flex-start;
+
+ p {
+ @include font-base;
+ color: var(--color-text-contrast);
+ font-weight: bold;
+ width: 25%;
+ }
+
+ p:first-child {
+ width: 30%;
+ }
+}
diff --git a/frontend/control-center/src/pages/LLMs/LLMInfoItem/index.tsx b/frontend/control-center/src/pages/LLMs/LLMInfoItem/index.tsx
new file mode 100644
index 0000000000..bcea927a86
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMs/LLMInfoItem/index.tsx
@@ -0,0 +1,18 @@
+import React from 'react';
+import styles from './index.module.scss';
+
+type EmptyStateProps = {
+ item: {llm: string; vectorDatabase: string; llmModel: string};
+};
+
+export const LLMInfoItem = (props: EmptyStateProps) => {
+ const {item} = props;
+
+ return (
+
+
{item.llm}
+
{item.vectorDatabase}
+
{item.llmModel}
+
+ );
+};
diff --git a/frontend/control-center/src/pages/LLMs/index.module.scss b/frontend/control-center/src/pages/LLMs/index.module.scss
new file mode 100644
index 0000000000..31195db2da
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMs/index.module.scss
@@ -0,0 +1,106 @@
+@import 'assets/scss/fonts.scss';
+@import 'assets/scss/colors.scss';
+@import 'assets/scss/animations.scss';
+
+.webhooksWrapper {
+ background: var(--color-background-white);
+ border-top-right-radius: 10px;
+ border-top-left-radius: 10px;
+ padding: 32px;
+ margin: 88px 1.5em 0 191px;
+ height: calc(100vh - 88px);
+ overflow-y: scroll;
+ overflow-x: hidden;
+ width: 100%;
+}
+
+.headlineContainer {
+ display: flex;
+ flex-direction: row;
+ justify-content: space-between;
+ width: 100%;
+}
+
+.webhooksHeadline {
+ @include font-xl;
+ font-weight: 900;
+ letter-spacing: 0;
+ display: flex;
+ justify-content: space-between;
+ color: var(--color-text-contrast);
+ margin-bottom: 14px;
+}
+
+.webhooksHeadlineText {
+ @include font-xl;
+ font-weight: 900;
+}
+
+.wrapper {
+ display: flex;
+ flex-direction: row;
+ flex-wrap: wrap;
+}
+
+.listHeader {
+ display: flex;
+ flex-direction: row;
+ height: 50px;
+ align-items: center;
+ justify-content: flex-start;
+
+ h2 {
+ @include font-base;
+ color: var(--color-text-gray);
+ font-weight: bold;
+ width: 25%;
+ }
+
+ h2:first-child {
+ width: 30%;
+ }
+
+ h2:last-child {
+ width: 10%;
+ }
+}
+
+.successfullySubscribed {
+ @include font-base;
+ color: white;
+}
+
+@keyframes translateYIn {
+ 0% {
+ transform: translateY(-50px);
+ opacity: 0;
+ }
+
+ 50% {
+ transform: translateY(16px);
+ opacity: 1;
+ }
+
+ 100% {
+ transform: translateY(-50px);
+ opacity: 0;
+ }
+}
+
+.translateYAnimIn {
+ animation: translateYIn 4s ease-in-out;
+}
+
+.animateIn {
+ animation: fadeInTranslateXLeft 3000ms ease;
+}
+
+.animateOut {
+ animation: fadeInTranslateXLeft 3000ms ease;
+}
+
+.embeddingsSection {
+ color: var(--color-text-contrast);
+ font-weight: bold;
+ margin-top: 64px;
+}
diff --git a/frontend/control-center/src/pages/LLMs/index.tsx b/frontend/control-center/src/pages/LLMs/index.tsx
new file mode 100644
index 0000000000..385cdd1cb4
--- /dev/null
+++ b/frontend/control-center/src/pages/LLMs/index.tsx
@@ -0,0 +1,92 @@
+import React, {useEffect, useState} from 'react';
+import {NotificationComponent} from 'components';
+import {useTranslation} from 'react-i18next';
+import {connect} from 'react-redux';
+import {setPageTitle} from '../../services/pageTitle';
+import {NotificationModel} from 'model';
+import {AiryLoader} from 'components/loaders/AiryLoader';
+import styles from './index.module.scss';
+import {EmptyState} from './EmptyState';
+import {HttpClientInstance} from '../../httpClient';
+import {LLMSStatsPayload} from 'httpclient/src';
+import {LLMInfoItem} from './LLMInfoItem';
+
+const mapDispatchToProps = {};
+
+const connector = connect(null, mapDispatchToProps);
+
+const LLMs = () => {
+ const [llms, setLlms] = useState([]);
+ const [embeddings, setEmbeddings] = useState(0);
+ const [notification, setNotification] = useState(null);
+ const [dataFetched, setDataFetched] = useState(false);
+ const {t} = useTranslation();
+
+ useEffect(() => {
+ setPageTitle('LLMs');
+ }, []);
+
+ useEffect(() => {
+ HttpClientInstance.getLLMInfo()
+ .then((response: any) => {
+ setLlms(response);
+ setDataFetched(true);
+ })
+ .catch(() => {
+ handleNotification(true);
+ });
+ HttpClientInstance.getLLMStats()
+ .then((response: LLMSStatsPayload) => {
+ setEmbeddings(response.embeddings);
+ })
+ .catch(() => {
+ handleNotification(true);
+ });
+ }, []);
+
+ const handleNotification = (show: boolean) => {
+ setNotification({show: show, successful: false, text: t('errorOccurred')});
+ };
+
+ const createNewLLM = () => {
+ console.log('create new LLM');
+ };
+
+ return (
+ <>
+
+
+ {llms?.length === 0 && dataFetched ? (
+
+ ) : llms?.length === 0 ? (
+
createNewLLM()} />
+ ) : (
+ <>
+
+
LLM Provider
+ Vector Database
+ Model
+
+ {llms && llms.map((llm: any) => )}
+ Embeddings: {embeddings}
+ {notification?.show && (
+
+ )}
+ >
+ )}
+
+ >
+ );
+};
+
+export default connector(LLMs);
diff --git a/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/EnrichedSchemaSection/index.tsx b/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/EnrichedSchemaSection/index.tsx
new file mode 100644
index 0000000000..7811f24b5c
--- /dev/null
+++ b/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/EnrichedSchemaSection/index.tsx
@@ -0,0 +1,296 @@
+import React, {MutableRefObject, useEffect, useRef, useState} from 'react';
+import MonacoEditor from '@uiw/react-monacoeditor';
+import {calculateHeightOfCodeString, isJSON} from '../../../../../services';
+import {HttpClientInstance} from '../../../../../httpClient';
+import styles from '../index.module.scss';
+import {Button} from 'components';
+import {ConnectedProps, connect} from 'react-redux';
+import {checkCompatibilityOfNewSchema, setSchemaSchema} from '../../../../../actions';
+import {useTranslation} from 'react-i18next';
+
+type EnrichedSchemaSectionProps = {
+ schemaName: string;
+ code: string;
+ setCode: (code: string) => void;
+ setFirstTabSelected: (flag: boolean) => void;
+ editorMode: string;
+ wrapperSection: MutableRefObject;
+ isEditMode: boolean;
+ setIsEditMode: (flag: boolean) => void;
+ setErrorMessage: (error: string) => void;
+ setShowErrorPopUp: (flag: boolean) => void;
+ version: number;
+} & ConnectedProps;
+
+const mapDispatchToProps = {
+ setSchemaSchema,
+ checkCompatibilityOfNewSchema,
+};
+
+const connector = connect(null, mapDispatchToProps);
+
+const EnrichedSchemaSection = (props: EnrichedSchemaSectionProps) => {
+ const {
+ schemaName,
+ code,
+ setCode,
+ setFirstTabSelected,
+ editorMode,
+ wrapperSection,
+ setSchemaSchema,
+ isEditMode,
+ setIsEditMode,
+ checkCompatibilityOfNewSchema,
+ setErrorMessage,
+ setShowErrorPopUp,
+ version,
+ } = props;
+
+ const [localCode, setLocalCode] = useState(undefined);
+ const [hasBeenChanged, setHasBeenChanged] = useState(false);
+ const codeRef = useRef(null);
+ const {t} = useTranslation();
+
+ useEffect(() => {
+ if (isEnrichmentAvailable(code)) {
+ setTimeout(() => {
+ const enriched = localStorage.getItem(schemaName);
+ if (enriched) {
+ setLocalCode(enriched);
+ recalculateContainerHeight(enriched);
+ }
+ }, 100);
+ enrichCode(code);
+ } else {
+ wrapperSection.current.style.height = '156px';
+ }
+ }, [code]);
+
+ const resetCodeAndEndEdition = () => {
+ setIsEditMode(false);
+ setFirstTabSelected(true);
+ };
+
+ const recalculateContainerHeight = (code: string) => {
+ const basicHeight = 220;
+ if (wrapperSection && wrapperSection.current) {
+ wrapperSection.current.style.height = `${calculateHeightOfCodeString(code) + basicHeight}px`;
+ } else {
+ wrapperSection.current.style.height = `${basicHeight}px`;
+ }
+ if ((wrapperSection.current.style.height.replace('px', '') as number) > 700) {
+ wrapperSection.current.style.height = '700px';
+ }
+ };
+
+ const recalculateCodeHeight = (code: string) => {
+ const codeHeight = calculateHeightOfCodeString(code);
+ if (codeHeight > 478) {
+ return 478;
+ }
+ return codeHeight;
+ };
+
+ const isEnrichmentAvailable = (code: string): boolean => {
+ let needsEnrichment = false;
+ const parsedCode = JSON.parse(code);
+ (parsedCode.fields || []).map(field => {
+ if (typeof field.type === 'object' && !Array.isArray(field.type)) {
+ if (!field.type.doc) {
+ needsEnrichment = true;
+ }
+ } else if (!field.doc) {
+ needsEnrichment = true;
+ }
+ });
+ return needsEnrichment;
+ };
+
+ const enrichCode = async (code: string) => {
+ let enrichedSchema = localStorage.getItem(schemaName);
+
+ if (!enrichedSchema) {
+ const enrichedCode = JSON.parse(code);
+
+ // Use map to create an array of promises
+ const promises = (enrichedCode.fields || []).map(async field => {
+ console.log(typeof field.type);
+ if (typeof field.type === 'object' && !Array.isArray(field.type)) {
+ if (!field.type.doc) {
+ const doc = await generateDocForField(field);
+ field.type.doc = doc;
+ }
+ } else if (!field.doc) {
+ const doc = await generateDocForField(field);
+ field.doc = doc;
+ }
+ });
+
+ // Wait for all promises to resolve
+ await Promise.all(promises);
+
+ enrichedSchema = JSON.stringify(enrichedCode, null, 2);
+ localStorage.setItem(schemaName, enrichedSchema);
+ }
+
+ setLocalCode(enrichedSchema);
+ recalculateContainerHeight(enrichedSchema);
+ };
+
+ const saveEnrichedSchema = () => {
+ setSchemaSchema(schemaName, JSON.stringify(localCode, null, 2));
+ };
+
+ const checkCompatibility = (_schemaName: string, _code: string, _version: number) => {
+ checkCompatibilityOfNewSchema(_schemaName, _code, _version)
+ .then(() => {
+ setSchemaSchema(_schemaName, _code)
+ .then(() => {
+ setCode(localCode);
+ setHasBeenChanged(false);
+ })
+ .catch((e: string) => {
+ setIsEditMode(true);
+ setErrorMessage(e);
+ setShowErrorPopUp(true);
+ setTimeout(() => setShowErrorPopUp(false), 5000);
+ });
+ })
+ .catch((e: string) => {
+ if (e.includes('404')) {
+ checkCompatibility(_schemaName + '-value', _code, _version);
+ } else {
+ setIsEditMode(true);
+ setErrorMessage(e);
+ setShowErrorPopUp(true);
+ setTimeout(() => setShowErrorPopUp(false), 5000);
+ }
+ });
+ };
+
+ const generateDocForField = async (field: any): Promise => {
+ try {
+ const response = await HttpClientInstance.llmQuery({
+ query: `This is the payload of a metadata field of a Kafka Schema ${JSON.stringify(
+ field
+ )}. A the name of the schema is ${schemaName}. This is the whole schema: ${code}. Give an accurante description of the field, so the users can understand what it is and what it is used for.`,
+ });
+ return response.answer.result;
+ } catch (error) {
+ console.error('Error in generateDocForField:', error);
+ return '';
+ }
+ };
+
+ return (
+ <>
+
+
+
+
+
+
+ {isEnrichmentAvailable(code) ? (
+ <>
+
+
+
+ This schema can be automatically enriched with documentation and saved as a new version as follows.
+
+
+
+
+
New schema:
+
+
+ {hasBeenChanged && (
+
+ )}
+
+
+
+ {localCode && localCode !== '{}' && (
+
+ {
+ if (value !== code) {
+ setHasBeenChanged(true);
+ } else {
+ setHasBeenChanged(false);
+ }
+ }}
+ onBlur={() => {
+ setLocalCode(codeRef.current.editor.getModel().getValue());
+ }}
+ options={{
+ scrollBeyondLastLine: isEditMode,
+ readOnly: !isEditMode,
+ theme: editorMode,
+ }}
+ />
+
+ )}
+ >
+ ) : (
+
+
+
This schema has been enriched already with documentation.
+
+
+ )}
+ >
+ );
+};
+
+export default connector(EnrichedSchemaSection);
diff --git a/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaDescription.tsx b/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaDescription.tsx
index 673d8dda08..36ff6cf9ad 100644
--- a/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaDescription.tsx
+++ b/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaDescription.tsx
@@ -1,14 +1,14 @@
import React, {MutableRefObject, useEffect, useState} from 'react';
-import {getSchemaInfo} from '../../../../actions';
+import {getSchemaInfo, getSchemaVersions} from '../../../../actions';
import {connect, ConnectedProps} from 'react-redux';
import {ErrorPopUp} from 'components';
-import {calculateHeightOfCodeString} from '../../../../services';
import SchemaSection from './SchemaSection';
import styles from './index.module.scss';
-import {MessageSection, lastMessageMock} from './MessageSection';
+import EnrichedSchemaSection from './EnrichedSchemaSection';
const mapDispatchToProps = {
getSchemaInfo,
+ getSchemaVersions,
};
const connector = connect(null, mapDispatchToProps);
@@ -19,15 +19,17 @@ type SchemaDescriptionProps = {
setCode: (code: string) => void;
wrapperSection: MutableRefObject;
version: number;
+ versions: string[];
} & ConnectedProps;
const SchemaDescription = (props: SchemaDescriptionProps) => {
- const {schemaName, code, setCode, getSchemaInfo, wrapperSection, version} = props;
+ const {schemaName, code, setCode, getSchemaInfo, getSchemaVersions, wrapperSection, version, versions} = props;
useEffect(() => {
getSchemaInfo(schemaName).catch(() => {
getSchemaInfo(schemaName + '-value');
});
+ getSchemaVersions(schemaName);
}, []);
useEffect(() => {
@@ -43,26 +45,14 @@ const SchemaDescription = (props: SchemaDescriptionProps) => {
const [errorMessage, setErrorMessage] = useState('');
const [editorMode, setEditorMode] = useState(localStorage.getItem('theme') === 'dark' ? 'vs-dark' : 'vs');
- useEffect(() => {
- if (firstTabSelected) {
- recalculateContainerHeight(code);
- } else {
- recalculateContainerHeight(lastMessageMock);
- }
- }, [firstTabSelected, code]);
-
const setNewSchemaCode = (text: string) => {
setCode(text);
};
- const recalculateContainerHeight = (code: string) => {
- const basicHeight = 50;
- const headerHeight = 32;
- if (wrapperSection && wrapperSection.current) {
- wrapperSection.current.style.height = `${calculateHeightOfCodeString(code) + headerHeight + basicHeight}px`;
- } else {
- wrapperSection.current.style.height = `${basicHeight}px`;
- }
+ const loadSchemaVersion = (version: string) => {
+ getSchemaInfo(schemaName, version).catch(() => {
+ getSchemaInfo(schemaName + '-value', version);
+ });
};
return (
@@ -76,17 +66,26 @@ const SchemaDescription = (props: SchemaDescriptionProps) => {
setIsEditMode={setIsEditMode}
setFirstTabSelected={setFirstTabSelected}
editorMode={editorMode}
- recalculateContainerHeight={recalculateContainerHeight}
+ wrapperSection={wrapperSection}
setErrorMessage={setErrorMessage}
setShowErrorPopUp={setShowErrorPopUp}
version={version}
+ versions={versions}
+ loadSchemaVersion={loadSchemaVersion}
/>
) : (
-
)}
{showErrorPopUp && setShowErrorPopUp(false)} />}
diff --git a/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaSection/index.tsx b/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaSection/index.tsx
index 701b6f136c..fad1bef22d 100644
--- a/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaSection/index.tsx
+++ b/frontend/control-center/src/pages/Schemas/SchemaItem/SchemaDescription/SchemaSection/index.tsx
@@ -1,11 +1,11 @@
-import React, {useEffect, useRef, useState} from 'react';
+import React, {MutableRefObject, useEffect, useRef, useState} from 'react';
import MonacoEditor from '@uiw/react-monacoeditor';
import {calculateHeightOfCodeString, isJSON} from '../../../../../services';
import {useTranslation} from 'react-i18next';
-import {Button} from 'components';
-import styles from '../index.module.scss';
+import {Button, Dropdown} from 'components';
import {checkCompatibilityOfNewSchema, setSchemaSchema} from '../../../../../actions';
import {ConnectedProps, connect} from 'react-redux';
+import styles from '../index.module.scss';
const mapDispatchToProps = {
setSchemaSchema,
@@ -22,10 +22,12 @@ type SchemaSectionProps = {
setIsEditMode: (flag: boolean) => void;
setFirstTabSelected: (flag: boolean) => void;
editorMode: string;
- recalculateContainerHeight: (text: string) => void;
+ wrapperSection: MutableRefObject;
setErrorMessage: (error: string) => void;
setShowErrorPopUp: (flag: boolean) => void;
version: number;
+ loadSchemaVersion: (version: string) => void;
+ versions: string[];
} & ConnectedProps;
const SchemaSection = (props: SchemaSectionProps) => {
@@ -37,12 +39,14 @@ const SchemaSection = (props: SchemaSectionProps) => {
setIsEditMode,
setFirstTabSelected,
editorMode,
- recalculateContainerHeight,
+ wrapperSection,
checkCompatibilityOfNewSchema,
setSchemaSchema,
setErrorMessage,
setShowErrorPopUp,
version,
+ loadSchemaVersion,
+ versions,
} = props;
const [localCode, setLocalCode] = useState(code);
@@ -60,6 +64,51 @@ const SchemaSection = (props: SchemaSectionProps) => {
setIsEditMode(!isEditMode);
};
+ const recalculateContainerHeight = (code: string) => {
+ const basicHeight = 220;
+ if (!wrapperSection || !wrapperSection.current) {
+ return;
+ }
+ wrapperSection.current.style.height = `${calculateHeightOfCodeString(code) + basicHeight}px`;
+ // early return above guarantees .current is set for the height clamping below
+ if (!isEnrichmentAvailable(code)) {
+ if (parseInt(wrapperSection.current.style.height, 10) > 600) {
+ wrapperSection.current.style.height = '600px';
+ }
+ } else {
+ if (parseInt(wrapperSection.current.style.height, 10) > 700) {
+ wrapperSection.current.style.height = '700px';
+ }
+ }
+ };
+
+ const recalculateCodeHeight = (code: string) => {
+ const codeHeight = calculateHeightOfCodeString(code);
+ let height = 478;
+ if (!isEnrichmentAvailable(code)) {
+ height = 510;
+ }
+ if (codeHeight > height) {
+ return height;
+ }
+ return codeHeight;
+ };
+
+ const isEnrichmentAvailable = (code: string): boolean => {
+ let needsEnrichment = false;
+ const parsedCode = isJSON(code) ? JSON.parse(code) : {};
+ (parsedCode.fields || []).forEach(field => {
+ if (typeof field.type === 'object' && !Array.isArray(field.type)) {
+ if (!field.type.doc) {
+ needsEnrichment = true;
+ }
+ } else if (!field.doc) {
+ needsEnrichment = true;
+ }
+ });
+ return needsEnrichment;
+ };
+
const checkCompatibility = (_schemaName: string, _code: string, _version: number) => {
checkCompatibilityOfNewSchema(_schemaName, _code, _version)
.then(() => {
@@ -99,14 +148,17 @@ const SchemaSection = (props: SchemaSectionProps) => {
>
Schema
- {/*