Skip to content

Commit

Permalink
Update kafka-connector chart for async invocations with backpressure
Browse files Browse the repository at this point in the history
Signed-off-by: Han Verstraete (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
welteki committed Jan 9, 2025
1 parent a382352 commit 754f128
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 36 deletions.
73 changes: 38 additions & 35 deletions chart/kafka-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ $ helm repo add openfaas https://openfaas.github.io/faas-netes/

Prepare a custom [values.yaml](values.yaml) with:

* brokerHosts - comma separted list of host:port
* topics - the topics to subscribe to
* replicas - this should match the partition size, so if the size is 3, set this to 3
- brokerHosts - comma separted list of host:port
- topics - the topics to subscribe to
- replicas - this should match the partition size, so if the size is 3, set this to 3

Then you will need to read up on the encryption and authentication options and update the settings accordingly.

Expand All @@ -65,46 +65,49 @@ $ helm repo update && \
## Encryption options

1) TLS off (default)
2) TLS on
1. TLS off (default)
2. TLS on

## Authentication options

1) TLS with SASL using CA from the default trust store
3) TLS with SASL using a custom CA
4) TLS with client certificates
1. TLS with SASL using CA from the default trust store
2. TLS with SASL using a custom CA
3. TLS with client certificates

## Configuration

Additional kafka-connector options in `values.yaml`.

| Parameter | Description | Default |
|------------------------|------------------------------------------------------------------------------------------------------------------------------------|--------------------------------|
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
| `asyncInvocation` | For long running or slow functions, offload to asychronous function invocations and carry on processing the stream | `false` |
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
| `printResponse` | Output the response of calling a function in the logs. | `true` |
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `` |
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
| `sasl` | Enable auth with a SASL username/password | `false` |
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
| `group` | Set the Kafka consumer group name. | `""` |
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
| `initialOffset` | Either newest or oldest. | `"oldest"` |
| `logs.debug` | Print debug logs | `false` |
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |
| Parameter | Description | Default |
| ---------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------ |
| `topics` | A single topic or list of comma separated topics to consume. | `faas-request` |
| `replicas` | The number of replicas of this connector, should be set to the size of the partition for the given topic, or a higher lower value. | `1` |
| `brokerHosts` | Host and port for the Kafka bootstrap server, multiple servers can be specified as a comma-separated list. | `kafka:9092` |
| `asyncInvocation` | Invoke function asychronously and carry on processing the stream | `false` |
| `asyncMaxInflight` | Limit the number of inflight async invocations for the connector. A value of 0 indicates no concurrency limit. | `0` |
| `asyncCallbackURL` | Override the callback url passed as the X-Callback-URl header for async invocations to notify the connector an async invocation has completed. | `""` |
| `natsURL` | URL used to connect to nats to count the async concurrency. Override this value if you are using external nats. | `nats://nats.openfaas:4222` |
| `upstreamTimeout` | Maximum timeout for upstream function call, must be a Go formatted duration string. | `2m` |
| `rebuildInterval` | Interval for rebuilding function to topic map, must be a Go formatted duration string. | `30s` |
| `gatewayURL` | The URL for the API gateway. | `http://gateway.openfaas:8080` |
| `printResponse` | Output the response of calling a function in the logs. | `true` |
| `printResponseBody` | Output to the logs the response body when calling a function. | `false` |
| `printRequestBody` | Output to the logs the request body when calling a function. | `false` |
| `fullnameOverride` | Override the name value used for the Connector Deployment object. | `` |
| `tls` | Connect to the broker server(s) using TLS encryption | `true` |
| `sasl` | Enable auth with a SASL username/password | `false` |
| `brokerPasswordSecret` | Name of secret for SASL password | `kafka-broker-password` |
| `brokerUsernameSecret` | Name of secret for SASL username | `kafka-broker-username` |
| `caSecret` | Name secret for TLS CA - leave empty to disable | `kafka-broker-ca` |
| `certSecret` | Name secret for TLS client certificate cert - leave empty to disable | `kafka-broker-cert` |
| `keySecret` | Name secret for TLS client certificate private key - leave empty to disable | `kafka-broker-key` |
| `contentType` | Set a HTTP Content Type during function invocation. | `""` |
| `group` | Set the Kafka consumer group name. | `""` |
| `maxBytes` | Set the maximum size of messages from the Kafka broker. | `1024*1024` |
| `sessionLogging` | Enable detailed logging from the consumer group. | `"false"` |
| `initialOffset` | Either newest or oldest. | `"oldest"` |
| `logs.debug` | Print debug logs | `false` |
| `logs.format` | The log encoding format. Supported values: `json` or `console` | `console` |

Specify each parameter using the `--set key=value[,key=value]` argument to `helm install`. See `values.yaml` for the default configuration.

Expand Down
17 changes: 17 additions & 0 deletions chart/kafka-connector/templates/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ spec:
app: {{ template "connector.name" . }}
component: kafka-connector
spec:
serviceAccountName: {{ template "connector.fullname" . }}
volumes:
- name: openfaas-license
secret:
Expand Down Expand Up @@ -87,6 +88,12 @@ spec:
- "-key-file=/var/secrets/broker-key/broker-key"
{{- end }}
env:
- name: connector_id
value: "{{template "connector.fullname" . }}"
- name: namespace
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: gateway_url
value: {{ .Values.gatewayURL | quote }}
- name: topics
Expand All @@ -99,6 +106,16 @@ spec:
value: {{ .Values.printRequestBody | quote }}
- name: asynchronous_invocation
value: {{ .Values.asyncInvocation | quote }}
- name: async_max_inflight
value: {{ .Values.asyncMaxInflight | quote }}
- name: async_callback_url
{{- if .Values.asyncCallbackURL }}
value: {{ .Values.asyncCallbackURL | quote }}
{{- else}}
value: "http://{{ template "connector.fullname" . }}.{{ .Release.Namespace }}:8080/api/v1/callback"
{{- end}}
- name: nats_url
value: {{ .Values.natsURL | quote }}
{{- if .Values.basic_auth }}
- name: basic_auth
value: "true"
Expand Down
38 changes: 38 additions & 0 deletions chart/kafka-connector/templates/rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.fullname" . }}
component: kafka-connector
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.name" . }}
component: kafka-connector
rules:
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "create"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
app: {{ template "connector.fullname" . }}
component: kafka-connector
subjects:
- kind: ServiceAccount
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
roleRef:
kind: Role
name: {{ template "connector.fullname" . }}
apiGroup: rbac.authorization.k8s.io
20 changes: 20 additions & 0 deletions chart/kafka-connector/templates/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: v1
kind: Service
metadata:
labels:
app: {{ template "connector.name" . }}
component: kafka-connector
chart: {{ .Chart.Name }}-{{ .Chart.Version }}
heritage: {{ .Release.Service }}
release: {{ .Release.Name }}
name: {{ template "connector.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
spec:
type: ClusterIP
ports:
- name: http
port: 8080
protocol: TCP
targetPort: 8080
selector:
app: kafka-connector
14 changes: 13 additions & 1 deletion chart/kafka-connector/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,21 @@ upstreamTimeout: 2m
# interval for rebuilding the map of functions and topics
rebuildInterval: 30s

# Use with slow consumers or long running functions
# Invoke functions asynchronously.
asyncInvocation: false

# Limit the number of inflight async invocations for the connector.
# A value of 0 indicates no concurrency limit.
asyncMaxInflight: 0

# Override the callback url passed as the X-Callback-URl header for async invocations
# to notify the connector an async invocation has completed.
asyncCallbackURL: ""

# URL used to connect to nats to count the async concurrency.
# Override this value if you are using external nats.
natsURL: "nats://nats.openfaas:4222"

# 1MB = 1024 bytes * 1024
maxBytes: "1048576"

Expand Down

0 comments on commit 754f128

Please sign in to comment.