diff --git a/avro_to_bigquery/converter.py b/avro_to_bigquery/converter.py index 015324b..38c814a 100644 --- a/avro_to_bigquery/converter.py +++ b/avro_to_bigquery/converter.py @@ -91,6 +91,23 @@ def _convert_complex_type(avro_type): field_type = AVRO_TO_BIGQUERY_TYPES[avro_type["items"]] elif avro_type["type"] == "enum": field_type = AVRO_TO_BIGQUERY_TYPES[avro_type["type"]] + elif avro_type["type"] == "map": + field_type = "RECORD" + mode = "REPEATED" + # Create artificial fields to represent map in BQ + key_field = { + "name": "key", + "type": "string", + "doc": "Key for map avro field", + } + value_field = { + "name": "value", + "type": avro_type["values"], + "doc": "Value for map avro field", + } + fields = tuple( + map(lambda f: _convert_field(f), [key_field, value_field]) + ) elif "logicalType" in avro_type: field_type = AVRO_TO_BIGQUERY_TYPES[avro_type["logicalType"]] else: diff --git a/tests/unit/test_converter.py b/tests/unit/test_converter.py index 805d5fe..ea4e243 100644 --- a/tests/unit/test_converter.py +++ b/tests/unit/test_converter.py @@ -100,6 +100,14 @@ def test_convert_avro_schema_to_bigquery_schema(): }, ], }, + {"name": "map_field", "type": {"type": "map", "values": "int"}}, + { + "name": "complex_map", + "type": { + "type": "map", + "values": {"type": "array", "items": "int"}, + }, + }, ], } @@ -107,7 +115,7 @@ def test_convert_avro_schema_to_bigquery_schema(): s = convert_schema(avs) # assert - assert len(s) == 15 + assert len(s) == 17 assert s[0].name == "full_name" assert s[1].field_type == "INTEGER" assert s[2].description == "Just a boolean tester" @@ -132,6 +140,16 @@ def test_convert_avro_schema_to_bigquery_schema(): assert s[13].field_type == "DATE" assert s[13].mode == "REPEATED" assert s[14].field_type == "STRING" + assert s[15].name == "map_field" + assert s[15].field_type == "RECORD" + assert s[15].mode == "REPEATED" + assert s[15].fields[0].field_type == "STRING" + assert s[15].fields[1].field_type == "INTEGER" + assert s[15].fields[0].name == "key" + assert s[15].fields[1].name == "value" + assert s[16].fields[0].field_type == "STRING" + assert s[16].fields[1].field_type == "INTEGER" + assert s[16].fields[1].mode == "REPEATED" def test_incorrect_nullable_field():