Skip to content

Commit

Permalink
Pass extrametadata as record instead of keys and values
Browse files Browse the repository at this point in the history
We define extrametadata as record schema. Currently it has only json schema and its length. We can add more if we need any useful thing.

We needed to pass json schema since avro writer omits field-ids while converting schema to json. Now, when we pass the json schema, it will use it instead with filled field-ids.
  • Loading branch information
aykut-bozkurt authored Jul 24, 2024
2 parents bffea61 + 0b86a09 commit fb5b0f7
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions lang/c/src/datafile.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ static int write_header(avro_file_writer_t w, avro_value_t *extra_meta)
avro_writer_t schema_writer;
const avro_encoding_t *enc = &avro_binary_encoding;
int64_t schema_len;
size_t extra_meta_index;
size_t extra_meta_size = 0;

const char *json_schema = NULL;
size_t json_schema_len = 0;
if (extra_meta != NULL)
{
check(rval, avro_value_get_size(extra_meta, &extra_meta_size));
avro_value_t meta_schema = {0};
check(rval, avro_value_get_by_name(extra_meta, "avroschema", &meta_schema, NULL));
check(rval, avro_value_get_string(&meta_schema, &json_schema, &json_schema_len));
}

/* Generate random sync */
Expand All @@ -95,34 +97,30 @@ static int write_header(avro_file_writer_t w, avro_value_t *extra_meta)
check(rval, avro_write(w->writer, "Obj", 3));
check(rval, avro_write(w->writer, &version, 1));

check(rval, enc->write_long(w->writer, 2 + extra_meta_size));
check(rval, enc->write_long(w->writer, 2));
check(rval, enc->write_string(w->writer, "avro.codec"));
check(rval, enc->write_bytes(w->writer, w->codec->name, strlen(w->codec->name)));
check(rval, enc->write_string(w->writer, "avro.schema"));
schema_writer =
avro_writer_memory(&w->schema_buf[0], sizeof(w->schema_buf));
rval = avro_schema_to_json(w->writers_schema, schema_writer);

if (json_schema) {
rval = avro_write(schema_writer, (char *)json_schema, strlen(json_schema));
}
else {
rval = avro_schema_to_json(w->writers_schema, schema_writer);
}

if (rval) {
avro_writer_free(schema_writer);
return rval;
}

schema_len = avro_writer_tell(schema_writer);
avro_writer_free(schema_writer);
check(rval,
enc->write_bytes(w->writer, w->schema_buf, schema_len));

for (extra_meta_index = 0; extra_meta_index < extra_meta_size; extra_meta_index++) {
avro_value_t child;
const char *key;
const void *value;
size_t value_len;

check(rval, avro_value_get_by_index(extra_meta, extra_meta_index, &child, &key));
check(rval, avro_value_get_bytes(&child, &value, &value_len));
check(rval, enc->write_string(w->writer, key));
check(rval, enc->write_bytes(w->writer, value, value_len));
}

check(rval, enc->write_long(w->writer, 0));
return write_sync(w);
}
Expand Down

0 comments on commit fb5b0f7

Please sign in to comment.