Skip to content

Commit

Permalink
pipeline: outputs: es: support overriding the most plugin parameters …
Browse files Browse the repository at this point in the history
…with Upstream node configuration

For Elastic cloud authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: cloud_id.

For AWS authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: http_proxy, no_proxy, tls*.

Signed-off-by: Marat Abrarov <[email protected]>
  • Loading branch information
mabrarov committed Jul 7, 2023
1 parent 3434d61 commit 333d579
Show file tree
Hide file tree
Showing 6 changed files with 702 additions and 136 deletions.
73 changes: 37 additions & 36 deletions plugins/out_es/es.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "es.h"
#include "es_conf.h"
#include "es_conf_prop.h"
#include "es_bulk.h"
#include "murmur3.h"

Expand Down Expand Up @@ -1030,86 +1031,86 @@ static int cb_es_exit(void *data, struct flb_config *config)
/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_INDEX, FLB_ES_DEFAULT_INDEX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index),
"Set an index name"
},
{
FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TYPE, FLB_ES_DEFAULT_TYPE,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type),
"Set the document type property"
},
{
FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, suppress_type_name),
"If true, mapping types is removed. (for v7.0.0 or later)"
},

/* HTTP Authentication */
{
FLB_CONFIG_MAP_STR, "http_user", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_USER, NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_user),
"Optional username credential for Elastic X-Pack access"
},
{
FLB_CONFIG_MAP_STR, "http_passwd", "",
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, "",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_passwd),
"Password for user defined in HTTP_User"
},

/* HTTP Compression */
{
FLB_CONFIG_MAP_STR, "compress", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_COMPRESS, NULL,
0, FLB_FALSE, 0,
"Set payload compression mechanism. Option available is 'gzip'"
},

/* Cloud Authentication */
{
FLB_CONFIG_MAP_STR, "cloud_id", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_ID, NULL,
0, FLB_FALSE, 0,
"Elastic cloud ID of the cluster to connect to"
},
{
FLB_CONFIG_MAP_STR, "cloud_auth", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, NULL,
0, FLB_FALSE, 0,
"Elastic cloud authentication credentials"
},

/* AWS Authentication */
#ifdef FLB_HAVE_AWS
{
FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_AWS_AUTH, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, has_aws_auth),
"Enable AWS Sigv4 Authentication"
},
{
FLB_CONFIG_MAP_STR, "aws_region", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_REGION, NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_region),
"AWS Region of your Amazon OpenSearch Service cluster"
},
{
FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_sts_endpoint),
"Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option"
},
{
FLB_CONFIG_MAP_STR, "aws_role_arn", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, NULL,
0, FLB_FALSE, 0,
"AWS IAM Role to assume to put records to your Amazon OpenSearch cluster"
},
{
FLB_CONFIG_MAP_STR, "aws_external_id", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, NULL,
0, FLB_FALSE, 0,
"External ID for the AWS IAM Role specified with `aws_role_arn`"
},
{
FLB_CONFIG_MAP_STR, "aws_service_name", "es",
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, "es",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_service_name),
"AWS Service Name"
},
{
FLB_CONFIG_MAP_STR, "aws_profile", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_profile),
"AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
"$HOME/.aws/ directory."
Expand All @@ -1118,68 +1119,68 @@ static struct flb_config_map config_map[] = {

/* Logstash compatibility */
{
FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_format),
"Enable Logstash format compatibility"
},
{
FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, FLB_ES_DEFAULT_PREFIX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix),
"When Logstash_Format is enabled, the Index name is composed using a prefix "
"and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
"become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
"when the data is being generated"
},
{
FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-",
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, "-",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_separator),
"Set a separator between logstash_prefix and date."
},
{
FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key),
"When included: the value in the record that belongs to the key will be looked "
"up and over-write the Logstash_Prefix for index generation. If the key/value "
"is not found in the record then the Logstash_Prefix option will act as a "
"fallback. Nested keys are supported through record accessor pattern"
},
{
FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, FLB_ES_DEFAULT_TIME_FMT,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat),
"Time format (based on strftime) to generate the second part of the Index name"
},

/* Custom Time and Tag keys */
{
FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY, FLB_ES_DEFAULT_TIME_KEY,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key),
"When Logstash_Format is enabled, each record will get a new timestamp field. "
"The Time_Key property defines the name of that field"
},
{
FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, FLB_ES_DEFAULT_TIME_KEYF,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format),
"When Logstash_Format is enabled, this property defines the format of the "
"timestamp"
},
{
FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_nanos),
"When Logstash_Format is enabled, enabling this property sends nanosecond "
"precision timestamps"
},
{
FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, include_tag_key),
"When enabled, it append the Tag name to the record"
},
{
FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TAG_KEY, FLB_ES_DEFAULT_TAG_KEY,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key),
"When Include_Tag_Key is enabled, this property defines the key name for the tag"
},
{
FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
FLB_CONFIG_MAP_SIZE, FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, FLB_ES_DEFAULT_HTTP_MAX,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, buffer_size),
"Specify the buffer size used to read the response from the Elasticsearch HTTP "
"service. This option is useful for debugging purposes where is required to read "
Expand All @@ -1190,64 +1191,64 @@ static struct flb_config_map config_map[] = {

/* Elasticsearch specifics */
{
FLB_CONFIG_MAP_STR, "path", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PATH, NULL,
0, FLB_FALSE, 0,
"Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also "
"possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
"option defines such path on the fluent-bit side. It simply adds a path "
"prefix in the indexing HTTP POST URI"
},
{
FLB_CONFIG_MAP_STR, "pipeline", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PIPELINE, NULL,
0, FLB_FALSE, 0,
"Newer versions of Elasticsearch allows to setup filters called pipelines. "
"This option allows to define which pipeline the database should use. For "
"performance reasons is strongly suggested to do parsing and filtering on "
"Fluent Bit side, avoid pipelines"
},
{
FLB_CONFIG_MAP_BOOL, "generate_id", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_GENERATE_ID, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, generate_id),
"When enabled, generate _id for outgoing records. This prevents duplicate "
"records when retrying ES"
},
{
FLB_CONFIG_MAP_STR, "write_operation", "create",
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, "create",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, write_operation),
"Operation to use to write in bulk requests"
},
{
FLB_CONFIG_MAP_STR, "id_key", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_ID_KEY, NULL,
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, id_key),
"If set, _id will be the value of the key from incoming record."
},
{
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, replace_dots),
"When enabled, replace field name dots with underscore, required by Elasticsearch "
"2.0-2.3."
},

{
FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, current_time_index),
"Use current time for index generation instead of message record"
},

/* Trace */
{
FLB_CONFIG_MAP_BOOL, "trace_output", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_output),
"When enabled print the Elasticsearch API calls to stdout (for diag only)"
},
{
FLB_CONFIG_MAP_BOOL, "trace_error", "false",
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, "false",
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error),
"When enabled print the Elasticsearch exception to stderr (for diag only)"
},

{
FLB_CONFIG_MAP_STR, "upstream", NULL,
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_UPSTREAM, NULL,
0, FLB_FALSE, 0,
"Path to 'upstream' configuration file (define multiple nodes)"
},
Expand Down
26 changes: 20 additions & 6 deletions plugins/out_es/es.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,24 @@
#define FLB_ES_DEFAULT_TAG_KEY "flb-key"
#define FLB_ES_DEFAULT_HTTP_MAX "512k"
#define FLB_ES_DEFAULT_HTTPS_PORT 443
#define FLB_ES_WRITE_OP_INDEX "index"
#define FLB_ES_WRITE_OP_CREATE "create"
#define FLB_ES_WRITE_OP_UPDATE "update"
#define FLB_ES_WRITE_OP_UPSERT "upsert"

struct flb_elasticsearch_config {
/* Elasticsearch index (database) and type (table) */
char *index;
int own_index;
char *type;
int own_type;
char suppress_type_name;
int suppress_type_name;

/* HTTP Auth */
char *http_user;
char *http_passwd;

/* Elastic Cloud Auth */
char *cloud_user;
int own_cloud_user;
char *cloud_passwd;
int own_cloud_passwd;

/* AWS Auth */
#ifdef FLB_HAVE_AWS
Expand All @@ -62,12 +60,17 @@ struct flb_elasticsearch_config {
char *aws_sts_endpoint;
char *aws_profile;
struct flb_aws_provider *aws_provider;
int own_aws_provider;
struct flb_aws_provider *base_aws_provider;
int own_base_aws_provider;
/* tls instances can't be re-used; aws provider requires a separate one */
struct flb_tls *aws_tls;
int own_aws_tls;
struct flb_tls *aws_sts_tls;
int own_aws_sts_tls;
char *aws_service_name;
struct mk_list *aws_unsigned_headers;
int own_aws_unsigned_headers;
#endif

/* HTTP Client Setup */
Expand All @@ -94,40 +97,51 @@ struct flb_elasticsearch_config {

/* prefix */
flb_sds_t logstash_prefix;
int own_logstash_prefix;
flb_sds_t logstash_prefix_separator;
int own_logstash_prefix_separator;

/* prefix key */
flb_sds_t logstash_prefix_key;
int own_logstash_prefix_key;

/* date format */
flb_sds_t logstash_dateformat;
int own_logstash_dateformat;

/* time key */
flb_sds_t time_key;
int own_time_key;

/* time key format */
flb_sds_t time_key_format;
int own_time_key_format;

/* time key nanoseconds */
int time_key_nanos;

/* write operation */
flb_sds_t write_operation;
int own_write_operation;
/* write operation elasticsearch operation */
flb_sds_t es_action;
const char *es_action;

/* id_key */
flb_sds_t id_key;
int own_id_key;
struct flb_record_accessor *ra_id_key;
int own_ra_id_key;

/* include_tag_key */
int include_tag_key;
flb_sds_t tag_key;
int own_tag_key;

/* Elasticsearch HTTP API */
char uri[256];

struct flb_record_accessor *ra_prefix_key;
int own_ra_prefix_key;

/* Compression mode (gzip) */
int compress_gzip;
Expand Down
Loading

0 comments on commit 333d579

Please sign in to comment.