Skip to content

Latest commit

 

History

History
 
 

kafka-to-elasticsearch

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

Streaming data from Kafka to Elasticsearch using Kafka Connect Elasticsearch Sink

kafka to elasticsearch cover

🎥 Check out the video tutorial here: https://rmoff.dev/kafka-elasticsearch-video


This demo uses Docker and Docker Compose to provision the stack, but all you actually need for getting data from Kafka to Elasticsearch is Apache Kafka and the Kafka Connect Elasticsearch Sink connector. It also uses ksqlDB as an easy interface for producing/consuming from Kafka topics, and creating Kafka Connect connectors - but you don’t have to use it in order to use Kafka Connect.

Getting started

  1. Bring the Docker Compose up

    docker-compose up -d
  2. Make sure everything is up and running

    ➜ docker-compose ps
         Name                    Command                  State                    Ports
    --------------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up             0.0.0.0:9092->9092/tcp
    elasticsearch     /usr/local/bin/docker-entr ...   Up             0.0.0.0:9200->9200/tcp, 9300/tcp
    kafka-connect     bash -c echo "Installing c ...   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
    kafkacat          /bin/sh -c apk add jq;           Up
                      wh ...
    kibana            /usr/local/bin/dumb-init - ...   Up             0.0.0.0:5601->5601/tcp
    ksqldb            /usr/bin/docker/run              Up             0.0.0.0:8088->8088/tcp
    schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up             2181/tcp, 2888/tcp, 3888/tcp

    Wait for ksqlDB and Kafka Connect

    echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! \n--------------\n"
    
    docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'

Basics

  1. Create test topic + data using ksqlDB (but it’s still just a Kafka topic under the covers)

    docker exec -it ksqldb ksql http://ksqldb:8088
    CREATE STREAM TEST01 (ALPHA VARCHAR KEY,COL1 INT, COL2 VARCHAR)
      WITH (KAFKA_TOPIC='test01', PARTITIONS=1, FORMAT='AVRO');
    INSERT INTO TEST01 (ALPHA, COL1, COL2) VALUES ('X',1,'FOO');
    INSERT INTO TEST01 (ALPHA, COL1, COL2) VALUES ('Y',2,'BAR');
    SHOW TOPICS;
    PRINT test01 FROM BEGINNING LIMIT 2;
     Kafka Topic                           | Partitions | Partition Replicas
    -------------------------------------------------------------------------
     confluent_rmoff_01ksql_processing_log | 1          | 1
     test01                                | 1          | 1
    -------------------------------------------------------------------------
    Key format: AVRO or KAFKA_STRING
    Value format: AVRO or KAFKA_STRING
    rowtime: 2021/02/18 15:38:38.411 Z, key: X, value: {"COL1": 1, "COL2": "FOO"}, partition: 0
    rowtime: 2021/02/18 15:38:38.482 Z, key: Y, value: {"COL1": 2, "COL2": "BAR"}, partition: 0
    Topic printing ceased
    ksql>
  2. Stream the data to Elasticsearch with Kafka Connect

    I’m using ksqlDB to create the connector but you can use the Kafka Connect REST API directly if you want to. Kafka Connect is part of Apache Kafka and you don’t have to use ksqlDB to use Kafka Connect.

    CREATE SINK CONNECTOR SINK_ELASTIC_TEST_01 WITH (
      'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
      'connection.url'                      = 'http://elasticsearch:9200',
      'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
      'value.converter.schema.registry.url' = 'http://schema-registry:8081',
      'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
      'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
      'type.name'                           = '_doc',
      'topics'                              = 'test01',
      'key.ignore'                          = 'true',
      'schema.ignore'                       = 'false'
    );
  3. Check the data in Elasticsearch

    curl -s http://localhost:9200/test01/_search \
        -H 'content-type: application/json' \
        -d '{ "size": 42  }' | jq -c '.hits.hits[]'
    {"_index":"test01","_type":"_doc","_id":"test01+0+1","_score":1,"_source":{"COL1":2,"COL2":"BAR"}}
    {"_index":"test01","_type":"_doc","_id":"test01+0+0","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}

    Check the mapping

    curl -s http://localhost:9200/test01/_mapping | jq '.'
    {
      "test01": {
        "mappings": {
          "properties": {
            "COL1": {
              "type": "integer"
            },
            "COL2": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      }
    }

Key handling

Updating documents in Elasticsearch

  1. But where did our ALPHA key column go? And what happens if we insert new data against the same key and a new one?

    -- New key ('Z')
    INSERT INTO TEST01 (ALPHA, COL1, COL2) VALUES ('Z',1,'WOO');
    -- New value for existing key ('Y')
    INSERT INTO TEST01 (ALPHA, COL1, COL2) VALUES ('Y',4,'PFF');

    Elasticsearch:

    curl -s http://localhost:9200/test01/_search \
        -H 'content-type: application/json' \
        -d '{ "size": 42  }' | jq -c '.hits.hits[]'
    {"_index":"test01","_type":"_doc","_id":"test01+0+1","_score":1,"_source":{"COL1":2,"COL2":"BAR"}}
    {"_index":"test01","_type":"_doc","_id":"test01+0+0","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}
    {"_index":"test01","_type":"_doc","_id":"test01+0+3","_score":1,"_source":{"COL1":4,"COL2":"PFF"}}
    {"_index":"test01","_type":"_doc","_id":"test01+0+2","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}

    Note that the _id is made up of <topic><partition><offset>, which we can prove with kafkacat:

    docker exec kafkacat kafkacat \
            -b broker:29092 \
            -r http://schema-registry:8081 -s avro \
            -C -o beginning -e -q \
            -t test01 \
            -f 'Topic+Partition+Offset: %t+%p+%o\tKey: %k\tValue: %s\n'
    Topic+Partition+Offset: test01+0+0      Key: "X"  Value: {"COL1": {"int": 1}, "COL2": {"string": "FOO"}}
    Topic+Partition+Offset: test01+0+1      Key: "Y"  Value: {"COL1": {"int": 2}, "COL2": {"string": "BAR"}}
    Topic+Partition+Offset: test01+0+2      Key: "Z"  Value: {"COL1": {"int": 1}, "COL2": {"string": "WOO"}}
    Topic+Partition+Offset: test01+0+3      Key: "Y"  Value: {"COL1": {"int": 4}, "COL2": {"string": "PFF"}}
  2. Let’s recreate the connector and use the Kafka message key as the document ID to enable updates & deletes against existing documents.

    • ksqlDB - drop the connector

      • DROP CONNECTOR SINK_ELASTIC_TEST_01;

    • bash - delete the existing index in Elasticsearch (drop the connector first otherwise you’ll see the index get recreated)

      • docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test01"

    • In ksqlDB create the connector as before but with key.ignore=false.

      Note
      The connector is given a new name. If you give it the same as before then Kafka Connect will assume it’s the same connector and not re-send any of the existing records.
      CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02 WITH (
        'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
        'connection.url'                      = 'http://elasticsearch:9200',
        'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
        'value.converter.schema.registry.url' = 'http://schema-registry:8081',
        'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
        'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
        'type.name'                           = '_doc',
        'topics'                              = 'test01',
        'key.ignore'                          = 'false',
        'schema.ignore'                       = 'false'
      );

      Check the new data in Elasticsearch:

      curl -s http://localhost:9200/test01/_search \
          -H 'content-type: application/json' \
          -d '{ "size": 42  }' | jq -c '.hits.hits[]'
      {"_index":"test01","_type":"_doc","_id":"X","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}
      {"_index":"test01","_type":"_doc","_id":"Y","_score":1,"_source":{"COL1":4,"COL2":"PFF"}}
      {"_index":"test01","_type":"_doc","_id":"Z","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}

      Note that _id now maps the key of the Kafka message, and that the value for message key/document id Y has been updated in place. Here’s the data in the Kafka topic in ksqlDB:

      ksql> SET 'auto.offset.reset' = 'earliest';
      ksql> SELECT ALPHA, COL1, COL2 FROM TEST01 EMIT CHANGES LIMIT 4;
      +-------+------+-----+
      |ALPHA  |COL1  |COL2 |
      +-------+------+-----+
      |X      |1     |FOO  |
      |Y      |2     |BAR  |
      |Z      |1     |WOO  |
      |Y      |4     |PFF  |

Deleting documents in Elasticsearch with Tombstone messages

What about deletes? We can do those too, using tombstone (null value) messages. By default the connector will ignore these but there’s an option to process them as deletes - behavior.on.null.values.

  • ksqlDB - drop the connector

    DROP CONNECTOR SINK_ELASTIC_TEST_02;
  • bash - delete the existing index in Elasticsearch (drop the connector first otherwise you’ll see the index get recreated)

    docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test01"

In ksqlDB create the connector as before but with behavior.on.null.values=delete.

Note
The connector is given a new name. If you give it the same as before then Kafka Connect will assume it’s the same connector and not re-send any of the existing records.
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_03 WITH (
  'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'                      = 'http://elasticsearch:9200',
  'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081',
  'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
  'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
  'type.name'                           = '_doc',
  'topics'                              = 'test01',
  'key.ignore'                          = 'false',
  'schema.ignore'                       = 'false',
  'behavior.on.null.values'             = 'delete'
);

Remind ourselves of source data in ksqlDB:

PRINT test01 FROM BEGINNING;
rowtime: 4/30/20 4:24:12 PM UTC, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 4/30/20 4:24:12 PM UTC, key: Y, value: {"COL1": 2, "COL2": "BAR"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Z, value: {"COL1": 1, "COL2": "WOO"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Y, value: {"COL1": 4, "COL2": "PFF"}

Current Elasticsearch state:

curl -s http://localhost:9200/test01/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test01","_type":"_doc","_id":"X","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}
{"_index":"test01","_type":"_doc","_id":"Y","_score":1,"_source":{"COL1":4,"COL2":"PFF"}}
{"_index":"test01","_type":"_doc","_id":"Z","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}

Now send a tombstone message by writing a NULL value to the underlying topic:

CREATE STREAM TEST01_TOMBSTONE (ALPHA VARCHAR KEY,COL1 VARCHAR)
  WITH (KAFKA_TOPIC='test01', VALUE_FORMAT='KAFKA', KEY_FORMAT='AVRO');

INSERT INTO TEST01_TOMBSTONE (ALPHA, COL1) VALUES ('Y',CAST(NULL AS VARCHAR));

Check the topic:

PRINT test01 FROM BEGINNING;
rowtime: 4/30/20 4:24:12 PM UTC, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 4/30/20 4:24:12 PM UTC, key: Y, value: {"COL1": 2, "COL2": "BAR"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Z, value: {"COL1": 1, "COL2": "WOO"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Y, value: {"COL1": 4, "COL2": "PFF"}
rowtime: 4/30/20 4:27:50 PM UTC, key: Y, value: <null>

Check Elasticsearch to see that document with key Y has been deleted:

curl -s http://localhost:9200/test01/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test01","_type":"_doc","_id":"X","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}
{"_index":"test01","_type":"_doc","_id":"Z","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}

Schemas (& general troubleshooting)

  • schemas.ignore=false means that Kafka Connect will define the index mapping based on the schema of the source data

    • If you use this it is mandatory to have a source schema (e.g. Avro, Protobuf, JSON Schema etc — NOT plain JSON)

  • schemas.ignore=true means Kafka Connect will just send the values and let Elasticsearch figure out how to map them using dynamic field mapping and optionally dynamic templates that you define in advance.

Set up some JSON data in a topic:

CREATE STREAM TEST_JSON (COL1 INT, COL2 VARCHAR) WITH (KAFKA_TOPIC='TEST_JSON', PARTITIONS=1, VALUE_FORMAT='JSON');
INSERT INTO TEST_JSON (COL1, COL2) VALUES (1,'FOO');
INSERT INTO TEST_JSON (COL1, COL2) VALUES (2,'BAR');

Error 1 (reading JSON data with Avro converter)

Try streaming this JSON data to to Elasticsearch

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://elasticsearch:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'type.name'               = '_doc',
  'topics'                  = 'TEST_JSON',
  'key.ignore'              = 'true',
  'schema.ignore'           = 'false'
);

Connector fails. Why?

DESCRIBE CONNECTOR SINK_ELASTIC_TEST_JSON_A;

Name                 : SINK_ELASTIC_TEST_JSON_A
Class                : io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
----------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic TEST_JSON to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:125)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

----------------------------------------------------------------------------------------------------------------------------------

Error within this is:

org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic TEST_JSON to Avro:
…
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

We’re reading JSON data but using the Avro converter (as specified as the default converter for the worker) in the Docker Compose:

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.1.0

    environment:
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

Error 2 (reading JSON data and expecting a schema)

So recreate the connector and specify JSON converter (because we’re reading JSON data from the topic)

DROP CONNECTOR SINK_ELASTIC_TEST_JSON_A;
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://elasticsearch:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'         = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'true',
  'type.name'               = '_doc',
  'topics'                  = 'TEST_JSON',
  'key.ignore'              = 'true',
  'schema.ignore'           = 'false'
);

Fails

DESCRIBE CONNECTOR SINK_ELASTIC_TEST_JSON_A;

Name                 : SINK_ELASTIC_TEST_JSON_A
Class                : io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
----------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:370)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 13 more

----------------------------------------------------------------------------------------------------------------------------------

Nested error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

We’re reading JSON data but have told the converter to look for a schema (schemas.enable) which we don’t have.

Error 3 (Connector requires a schema but there isn’t one)

Recreate the connector and set the converter to not expect a schema embedded in the JSON data (value.converter.schemas.enable' = 'false'):

DROP CONNECTOR SINK_ELASTIC_TEST_JSON_A;
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://elasticsearch:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'         = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'type.name'               = '_doc',
  'topics'                  = 'TEST_JSON',
  'key.ignore'              = 'true',
  'schema.ignore'           = 'false'
);

Connector fails

ksql> DESCRIBE CONNECTOR SINK_ELASTIC_TEST_JSON_A;

Name                 : SINK_ELASTIC_TEST_JSON_A
Class                : io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State  | Error Trace
------------------------------------------------------------------------------------------------------------------------------
 0       | FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
        at io.confluent.connect.elasticsearch.Mapping.buildMapping(Mapping.java:81)
        at io.confluent.connect.elasticsearch.Mapping.buildMapping(Mapping.java:68)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.createMapping(ElasticsearchClient.java:212)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.checkMapping(ElasticsearchSinkTask.java:121)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:91)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        ... 10 more

------------------------------------------------------------------------------------------------------------------------------

Nested error:

org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.

The connector is being told that we will supply a schema with the data that will be used to create the Elasticsearch mapping:

'schema.ignore'           = 'false'

BUT we do not have a declared schema in the data.

Success!

DROP CONNECTOR SINK_ELASTIC_TEST_JSON_A;
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://elasticsearch:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'         = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'type.name'               = '_doc',
  'topics'                  = 'TEST_JSON',
  'key.ignore'              = 'true',
  'schema.ignore'           = 'true'
);
ksql> DESCRIBE CONNECTOR SINK_ELASTIC_TEST_JSON_A;

Name                 : SINK_ELASTIC_TEST_JSON_A
Class                : io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Type                 : sink
State                : RUNNING
WorkerId             : kafka-connect:8083

 Task ID | State   | Error Trace
---------------------------------
 0       | RUNNING |
---------------------------------

Data is in Elasticsearch:

➜ curl -s http://localhost:9200/test_json/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test_json","_type":"_doc","_id":"TEST_JSON+0+0","_score":1,"_source":{"COL2":"FOO","COL1":1}}
{"_index":"test_json","_type":"_doc","_id":"TEST_JSON+0+1","_score":1,"_source":{"COL2":"BAR","COL1":2}}

Timestamps

CREATE STREAM TEST02 (COL0 VARCHAR KEY, COL1 INT, ORDER_TS_EPOCH BIGINT, SHIP_TS_STR VARCHAR)
  WITH (KAFKA_TOPIC='test02', PARTITIONS=1, VALUE_FORMAT='AVRO');

INSERT INTO TEST02 (COL0, COL1, ORDER_TS_EPOCH, SHIP_TS_STR)
  VALUES ('MY_KEY__X',
          1,
          STRINGTOTIMESTAMP('2020-02-17T15:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'),
          '2020-02-17T15:22:00Z');

INSERT INTO TEST02 (COL0, COL1, ORDER_TS_EPOCH, SHIP_TS_STR)
  VALUES ('MY_KEY__Y',
          1,
          STRINGTOTIMESTAMP('2020-02-17T15:26:00Z','yyyy-MM-dd''T''HH:mm:ssX'),
          '2020-02-17T15:26:00Z');
PRINT test02 FROM BEGINNING;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 5/4/20 10:24:46 AM UTC, key: [M@6439948753387347800/-], value: {"COL1": 1, "ORDER_TS_EPOCH": 1581952920000, "SHIP_TS_STR": "2020-02-17T15:22:00Z"}
rowtime: 5/4/20 10:24:47 AM UTC, key: [M@6439948753387347801/-], value: {"COL1": 1, "ORDER_TS_EPOCH": 1581953160000, "SHIP_TS_STR": "2020-02-17T15:26:00Z"}
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_A WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://elasticsearch:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'= 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url'= 'http://schema-registry:8081',
  'type.name'               = '_doc',
  'topics'                  = 'test02',
  'key.ignore'              = 'false',
  'schema.ignore'           = 'false'
);

Check we’ve got data:

curl -s http://localhost:9200/test02/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test02","_type":"_doc","_id":"MY_KEY__Y","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581953160000,"SHIP_TS_STR":"2020-02-17T15:26:00Z"}}
{"_index":"test02","_type":"_doc","_id":"MY_KEY__X","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581952920000,"SHIP_TS_STR":"2020-02-17T15:22:00Z"}}

Check the mappings - note neither of the timestamps are date types

curl -s http://localhost:9200/test02/_mapping | jq '.'
{
  "test02": {
    "mappings": {
      "properties": {
        "COL1": {
          "type": "integer"
        },
        "ORDER_TS_EPOCH": {
          "type": "long"
        },
        "SHIP_TS_STR": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}

Drop the connector

DROP CONNECTOR SINK_ELASTIC_TEST_02_A;

Drop the index

docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"

Let Elasticsearch guess at the data types (dynamic field mapping)

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_B WITH (
  'connector.class'         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'          = 'http://elasticsearch:9200',
  'key.converter'           = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'= 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url'= 'http://schema-registry:8081',
  'type.name'               = '_doc',
  'topics'                  = 'test02',
  'key.ignore'              = 'false',
  'schema.ignore'           = 'true'
);

Picks up string (SHIP_TS_STR) because it looks like one, but not the epoch (ORDER_TS_EPOCH)

curl -s http://localhost:9200/test02/_mapping | jq '.'
{
  "test02": {
    "mappings": {
      "properties": {
        "COL1": {
          "type": "long"
        },
        "ORDER_TS_EPOCH": {
          "type": "long"
        },
        "SHIP_TS_STR": {
          "type": "date"
        }
      }
    }
  }
}

Drop the connector

DROP CONNECTOR SINK_ELASTIC_TEST_02_B;

Drop the index

docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"

Specify field as a Timestamp using a Single Message Transform

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_C WITH (
  'connector.class'                          = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'                           = 'http://elasticsearch:9200',
  'key.converter'                            = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                          = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url'      = 'http://schema-registry:8081',
  'type.name'                                = '_doc',
  'topics'                                   = 'test02',
  'key.ignore'                               = 'false',
  'schema.ignore'                            = 'false',
  'transforms'                               = 'setTimestampType0',
  'transforms.setTimestampType0.type'        = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
  'transforms.setTimestampType0.field'       = 'ORDER_TS_EPOCH',
  'transforms.setTimestampType0.target.type' = 'Timestamp'
);
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
  "test02": {
    "mappings": {
      "properties": {
        "COL1": {
          "type": "integer"
        },
        "ORDER_TS_EPOCH": {
          "type": "date"
        },
        "SHIP_TS_STR": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}

Drop the connector

DROP CONNECTOR SINK_ELASTIC_TEST_02_C;

Drop the index

docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"

Declare the timestamp type in Elasticsearch in advance with Dynamic Template

Create dynamic template

curl -s -XPUT "http://localhost:9200/_template/rmoff/" -H 'Content-Type: application/json' -d'
          {
            "template": "*",
            "mappings": { "dynamic_templates": [ { "dates": { "match": "*_TS_*", "mapping": { "type": "date" } } } ]  }
          }'

Create the connector

Note
schema.ignore is set to true, since we want Elasticsearch to use its dynamic field mapping and thus dynamic templates to determine the mapping types.
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_D WITH (
  'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'                      = 'http://elasticsearch:9200',
  'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081',
  'type.name'                           = '_doc',
  'topics'                              = 'test02',
  'key.ignore'                          = 'false',
  'schema.ignore'                       = 'true'
);
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
  "test02": {
    "mappings": {
      "dynamic_templates": [
        {
          "dates": {
            "match": "*_TS_*",
            "mapping": {
              "type": "date"
            }
          }
        }
      ],
      "properties": {
        "COL1": {
          "type": "long"
        },
        "ORDER_TS_EPOCH": {
          "type": "date"
        },
        "SHIP_TS_STR": {
          "type": "date"
        }
      }
    }
  }
}

Drop connector :

DROP CONNECTOR SINK_ELASTIC_TEST_02_D;

Drop index

docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"

Drop dynamic template

docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/_template/rmoff/"

Add Kafka message timestamp as Elasticsearch timestamp field

What about if we want to use the Kafka message’s timestamp? Producer can set this, no point duplicating it in the message value itself.

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_E WITH (
  'connector.class'                             = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'                              = 'http://elasticsearch:9200',
  'key.converter'                               = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                             = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url'         = 'http://schema-registry:8081',
  'type.name'                                   = '_doc',
  'topics'                                      = 'test02',
  'key.ignore'                                  = 'false',
  'schema.ignore'                               = 'false',
  'transforms'                                  = 'ExtractTimestamp',
  'transforms.ExtractTimestamp.type'            = 'org.apache.kafka.connect.transforms.InsertField$Value',
  'transforms.ExtractTimestamp.timestamp.field' = 'MSG_TS'
);

Elasticsearch data:

curl -s http://localhost:9200/test02/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test02","_type":"_doc","_id":"MY_KEY__X","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581952920000,"SHIP_TS_STR":"2020-02-17T15:22:00Z","MSG_TS":1588587886954}}
{"_index":"test02","_type":"_doc","_id":"MY_KEY__Y","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581953160000,"SHIP_TS_STR":"2020-02-17T15:26:00Z","MSG_TS":1588587887036}}

Mapping for MSG_TS is date but since dynamic mapping is in use and there’s no dynamic template the other two date fields are not seen as date:

curl -s http://localhost:9200/test02/_mapping | jq '.'
{
  "test02": {
    "mappings": {
      "properties": {
        "COL1": {
          "type": "integer"
        },
        "MSG_TS": {
          "type": "date"
        },
        "ORDER_TS_EPOCH": {
          "type": "long"
        },
        "SHIP_TS_STR": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  }
}

Alternatives include:

  1. schema.ignore=false and SMT to set timestamp types (`org.apache.kafka.connect.transforms.TimestampConverter)

  2. schema.ignore=true and use a dynamic template

  3. schema.ignore=true and SMT to force MSG_TS to string so that Elasticsearch can guess at it correctly - see below

Drop connector

DROP CONNECTOR SINK_ELASTIC_TEST_02_E;

Drop index

docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"

Create connector

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_F WITH (
  'connector.class'                             = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'                              = 'http://elasticsearch:9200',
  'key.converter'                               = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                             = 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url'         = 'http://schema-registry:8081',
  'type.name'                                   = '_doc',
  'topics'                                      = 'test02',
  'key.ignore'                                  = 'false',
  'schema.ignore'                               = 'true',
  'transforms'                                  = 'ExtractTimestamp, setTimestampType',
  'transforms.ExtractTimestamp.type'            = 'org.apache.kafka.connect.transforms.InsertField$Value',
  'transforms.ExtractTimestamp.timestamp.field' = 'MSG_TS',
  'transforms.setTimestampType.type'            = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
  'transforms.setTimestampType.field'           = 'MSG_TS',
  'transforms.setTimestampType.target.type'     = 'string',
  'transforms.setTimestampType.format'          = 'yyyy-MM-dd\''T\''HH:mm:ssX'
);
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
  "test02": {
    "mappings": {
      "properties": {
        "COL1": {
          "type": "long"
        },
        "MSG_TS": {
          "type": "date"
        },
        "ORDER_TS_EPOCH": {
          "type": "long"
        },
        "SHIP_TS_STR": {
          "type": "date"
        }
      }
    }
  }
}

Index naming and partitioning

Index name by default is the topic name, forced to lowercase automagically if necessary:

docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02                   2

Change target index name with RegEx

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_04 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'  = 'http://elasticsearch:9200',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'type.name'       = '_doc',
  'topics'          = 'test02',
  'key.ignore'      = 'true',
  'schema.ignore'   = 'true',
  'transforms'      = 'changeIndexname',
  'transforms.changeIndexname.type'        = 'org.apache.kafka.connect.transforms.RegexRouter',
  'transforms.changeIndexname.regex'       = '(.*)02',
  'transforms.changeIndexname.replacement' = 'foo-$1'
);
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02                   2
foo-test                 2

Use date / time in the target index name

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_05 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'  = 'http://elasticsearch:9200',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'type.name'       = '_doc',
  'topics'          = 'test02',
  'key.ignore'      = 'true',
  'schema.ignore'   = 'true',
  'transforms'      = 'appendTimestampToIX',
  'transforms.appendTimestampToIX.type'        = 'org.apache.kafka.connect.transforms.TimestampRouter',
  'transforms.appendTimestampToIX.topic.format' = '${topic}-${timestamp}',
  'transforms.appendTimestampToIX.timestamp.format' = 'yyyy-MM-dd'
);
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02                   2
test02-2020-05-01        2
foo-test                 2

Use both regex and date/time in target index name

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_06 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'  = 'http://elasticsearch:9200',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'type.name'       = '_doc',
  'topics'          = 'test02',
  'key.ignore'      = 'true',
  'schema.ignore'   = 'true',
  'transforms'      = 'changeIndexname,appendTimestampToIX',
  'transforms.changeIndexname.type'        = 'org.apache.kafka.connect.transforms.RegexRouter',
  'transforms.changeIndexname.regex'       = '(.*)02',
  'transforms.changeIndexname.replacement' = 'foo-$1',
  'transforms.appendTimestampToIX.type'        = 'org.apache.kafka.connect.transforms.TimestampRouter',
  'transforms.appendTimestampToIX.topic.format' = '${topic}-${timestamp}',
  'transforms.appendTimestampToIX.timestamp.format' = 'yyyy-MM-dd'
);
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02                   2
test02-2020-05-01        2
foo-test                 2
foo-test-2020-05-01      2

Error Handling in Kafka Connect and Elasticsearch Sink connector

Note
This section also illustrates working with Kafka Connect using the REST API directly instead of the ksqlDB interface as shown above.

Write to a topic:

echo '1:{"a":1}' | \ docker exec -i kafkacat kafkacat \ -b broker:29092 \ -P -t test03 -Z -K:

For info you can read from the topic if you want to:

docker exec kafkacat kafkacat \
        -b broker:29092 \
        -C -o beginning -u -q \
        -t test03 \
        -f 'Topic+Partition+Offset: %t+%p+%o\tKey: %k\tValue: %s\n'

Create the connector:

curl -i -X PUT -H  "Content-Type:application/json" \
  http://localhost:8083/connectors/sink-elastic-test03/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter"                   : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                 : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable"  : "false",
    "topics"                          : "test03",
    "connection.url"                  : "http://elasticsearch:9200",
    "type.name"                       : "_doc",
    "key.ignore"                      : "false",
    "schema.ignore"                   : "true"
}'

Works as designed

curl -s http://localhost:9200/test03/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}

Now send a bad message (malformed JSON)

echo '1:{"fieldnamewithoutclosingquote:1}' | \
  docker exec -i kafkacat kafkacat \
          -b broker:29092 \
          -P -t test03 -Z -K:

Check connector status

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
sink  |  sink-elastic-test03   |  RUNNING  |  FAILED   |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

Check error

curl -s http://localhost:8083/connectors/sink-elastic-test03/status | jq -r '.tasks[].trace'
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
 at [Source: (byte[])"{"fieldnamewithoutclosingquote:1}"; line: 1, column: 34]

Ignore messages that cannot be deserialised

Add error handling

"errors.tolerance"                : "all",
"errors.log.enable"               : "true"
"errors.log.include.messages"     : "true"

This uses a PUT which creates the config if not there, and updates it if it is. Much easier than delete/create each time.

curl -i -X PUT -H  "Content-Type:application/json" \
  http://localhost:8083/connectors/sink-elastic-test03/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter"                   : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                 : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable"  : "false",
    "topics"                          : "test03",
    "connection.url"                  : "http://elasticsearch:9200",
    "type.name"                       : "_doc",
    "key.ignore"                      : "false",
    "schema.ignore"                   : "true",
    "errors.tolerance"                : "all",
    "errors.log.enable"               : "true",
    "errors.log.include.messages"     : "true"
}'

Connector runs:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
sink  |  sink-elastic-test03  |  RUNNING  |  RUNNING  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

Logs an message for the malformed message:

docker logs kafka-connect

Validate that the pipeline is running by sending a good message

echo '3:{"a":3}' | \
  docker exec -i kafkacat kafkacat \
          -b broker:29092 \
          -P -t test03 -Z -K:

Verify it’s present in Elasticsearch:

curl -s http://localhost:9200/test03/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}
{"_index":"test03","_type":"_doc","_id":"3","_score":1,"_source":{"a":3}}

Setting up a dead letter queue for Elasticsearch sink

curl -i -X PUT -H  "Content-Type:application/json" \
  http://localhost:8083/connectors/sink-elastic-test03/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter"                   : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                 : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable"  : "false",
    "topics"                          : "test03",
    "connection.url"                  : "http://elasticsearch:9200",
    "type.name"                       : "_doc",
    "key.ignore"                      : "false",
    "schema.ignore"                   : "true",
    "errors.tolerance"                : "all",
    "errors.log.enable"               : "true",
    "errors.log.include.messages"     : "true",
    "errors.deadletterqueue.topic.name":"dlq_sink-elastic-test03",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable":true
}'

Send a badly-formed message

echo '4:{never gonna give you up}' | \
  docker exec -i kafkacat kafkacat \
          -b broker:29092 \
          -P -t test03 -Z -K:

Look at the dead letter queue topic:

docker exec kafkacat kafkacat \
        -b broker:29092 \
        -C -o beginning -u -q \
        -t dlq_sink-elastic-test03 \
        -f '%t\tKey: %k\tValue: %s\nHeaders: %h\n'
dlq_sink-elastic-test03 Key: 4  Value: {never gonna give you up}
Headers: __connect.errors.topic=test03,__connect.errors.partition=0,__connect.errors.offset=3,__connect.errors.connector.name=sink-elastic-te
st03,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverte
r,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Converting byte[] to
 Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException:
Converting byte[] to Kafka Connect data failed due to serialization error:
…
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('n' (c
ode 110)): was expecting double-quote to start field name
 at [Source: (byte[])"{never gonna give you up}"; line: 1, column: 3]

Note how the full stack trace for the error is available from the header of the Kafka message, along with details of its source message offset etc

Dealing with correctly-formed messages that are invalid for Elasticsearch

Target mapping has field a with type long:

curl -s http://localhost:9200/test03/_mapping | jq '.'
{
  "test03": {
    "mappings": {
      "properties": {
        "a": {
          "type": "long"
        }
      }
    }
  }
}

What if you send through a value that’s not long?

echo '5:{"a":"this is valid JSON but is string content"}' | \
  docker exec -i kafkacat kafkacat \
          -b broker:29092 \
          -P -t test03 -Z -K:

Message doesn’t arrive in Elasticsearch:

➜ curl -s http://localhost:9200/test03/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}
{"_index":"test03","_type":"_doc","_id":"3","_score":1,"_source":{"a":3}}

Check connector status

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
sink  |  sink-elastic-test03  |  RUNNING  |  FAILED  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

Why’s it crashed?

curl -s http://localhost:8083/connectors/sink-elastic-test03/status | jq -r '.tasks[].trace'
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse field [a] of type [long] in document with id '5'. Preview of field's value: 'this is valid JSON but is string content'","caused_by":{"type":"illegal_argument_exception","reason":"For input string: \"this is valid JSON but is string content\""}}]
…

Set "behavior.on.malformed.documents" : "warn":

curl -i -X PUT -H  "Content-Type:application/json" \
  http://localhost:8083/connectors/sink-elastic-test03/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter"                   : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                 : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable"  : "false",
    "topics"                          : "test03",
    "connection.url"                  : "http://elasticsearch:9200",
    "type.name"                       : "_doc",
    "key.ignore"                      : "false",
    "schema.ignore"                   : "true",
    "errors.tolerance"                : "all",
    "errors.log.enable"               : "true",
    "errors.log.include.messages"     : "true",
    "errors.deadletterqueue.topic.name":"dlq_sink-elastic-test03",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable":true,
    "behavior.on.malformed.documents" : "warn"
}'

Send some more data through

echo '6:{"a":42}' | \
  docker exec -i kafkacat kafkacat \
          -b broker:29092 \
          -P -t test03 -Z -K:

Pipeline is working

curl -s http://localhost:9200/test03/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 42  }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}
{"_index":"test03","_type":"_doc","_id":"3","_score":1,"_source":{"a":3}}
{"_index":"test03","_type":"_doc","_id":"6","_score":1,"_source":{"a":42}}

Video Tutorial

🎥 Check out the video tutorial here: https://rmoff.dev/kafka-elasticsearch-video