Skip to content

Commit

Permalink
Parquet writer: add SORT_BY_BBOX=YES/NO layer creation option
Browse files Browse the repository at this point in the history
Defaults to NO

Documentation:
```
- .. lco:: SORT_BY_BBOX
     :choices: YES, NO
     :default: NO
     :since: 3.9

     Whether features should be sorted based on the bounding box of their
     geometries, before being written in the final file. Sorting them enables
     faster spatial filtering on reading, by grouping together spatially close
     features in the same group of rows.

     Note however that enabling this option involves creating a temporary
     GeoPackage file (in the same directory as the final Parquet file),
     and thus requires temporary storage (possibly up to several times the size
     of the final Parquet file, depending on Parquet compression) and additional
     processing time.

     The efficiency of spatial filtering depends on the ROW_GROUP_SIZE. If it
     is too large, too many features that are not spatially close will be grouped
     together. If it is too small, the file size will increase, and extra
     processing time will be necessary to browse through the row groups.

     Note also that when this option is enabled, the Arrow writing API (which
     is for example triggered when using ogr2ogr to convert from Parquet to Parquet),
     fallbacks to the generic implementation, which does not support advanced
     Arrow types (lists, maps, etc.).
```

Experiments with the canonical
https://storage.googleapis.com/open-geodata/linz-examples/nz-building-outlines.parquet
dataset:

* Generation of datasets:

// Organize in row groups of 65,536 features, no BBOX, no sorting
```
$ time ogr2ogr out_no_bbox.parquet nz-building-outlines.parquet -progress -lco WRITE_COVERING_BBOX=NO
0...10...20...30...40...50...60...70...80...90...100 - done.

real    0m4,457s
```

// Organize in row groups of 65,536 features, add BBOX columns, no sorting
```
$ time ogr2ogr out_unsorted.parquet nz-building-outlines.parquet -progress
0...10...20...30...40...50...60...70...80...90...100 - done.

real    0m5,408s
```

// Organize in row groups of max 65,536 features, add BBOX columns, sort using RTree
```
$ time ogr2ogr out_sorted.parquet nz-building-outlines.parquet -progress -lco SORT_BY_BBOX=YES
0...10...20...30...40...50...60...70...80...90...100 - done.

real    0m40,311s
```

// Organize in row groups of max 16,384 features, add BBOX columns, sort using RTree
```
$ time ogr2ogr out_sorted_16384.parquet nz-building-outlines.parquet -progress -lco SORT_BY_BBOX=YES  -lco ROW_GROUP_SIZE=16384
0...10...20...30...40...50...60...70...80...90...100 - done.

real    0m44,149s
```

* File sizes:

```
out_no_bbox.parquet          436,475,127
out_unsorted.parquet         504,120,728
out_sorted.parquet           489,507,910
out_sorted_16384.parquet     492,760,561
```

* Spatial filter selecting a single feature:
```
$ time ogrinfo out_no_bbox.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount
1

real    0m1,302s

$ time ogrinfo out_unsorted.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount
1

real    0m0,947s

$ time ogrinfo out_sorted.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount
1

real    0m0,278s

$ time ogrinfo out_sorted_16384.parquet -spat 1818654 5546189 1818655 5546190 -al -so -json -noextent | jq .layers[0].featureCount
1

real    0m0,183s
```

* Spatial filter selecting ~ 470,000 features (over a total of 3.2 millions):
```
$ time ogrinfo out_no_bbox.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount
471147

real    0m1,957s

$ time ogrinfo out_unsorted.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount
471147

real    0m1,718s

$ time ogrinfo out_sorted.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount
471147

real    0m1,067s

$ time ogrinfo out_sorted_16384.parquet -spat 1750445 5812014 1912866 5906677 -al -so -json -noextent | jq .layers[0].featureCount
471147

real    0m1,021s
```
  • Loading branch information
rouault committed Mar 17, 2024
1 parent 6b4cc47 commit eb3d124
Show file tree
Hide file tree
Showing 11 changed files with 738 additions and 22 deletions.
100 changes: 100 additions & 0 deletions autotest/ogr/ogr_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3383,3 +3383,103 @@ def test_ogr_parquet_get_extent_3d(tmp_vsimem):
lyr = ds.GetLayer(0)
assert lyr.TestCapability(ogr.OLCFastGetExtent3D) == 0
assert lyr.GetExtent3D() == (1.0, 4.0, 2.0, 5.0, 3.0, 6.0)


###############################################################################


@gdaltest.enable_exceptions()
@pytest.mark.require_driver("GPKG")
def test_ogr_parquet_sort_by_bbox(tmp_vsimem):

outfilename = str(tmp_vsimem / "test_ogr_parquet_sort_by_bbox.parquet")
ds = ogr.GetDriverByName("Parquet").CreateDataSource(outfilename)

gpkg_drv = gdal.GetDriverByName("GPKG")
gpkg_drv.Deregister()
ROW_GROUP_SIZE = 100
try:
with pytest.raises(
Exception,
match="Driver GPKG required for SORT_BY_BBOX layer creation option",
):
ds.CreateLayer(
"test",
geom_type=ogr.wkbPoint,
options=["SORT_BY_BBOX=YES", f"ROW_GROUP_SIZE={ROW_GROUP_SIZE}"],
)
finally:
gpkg_drv.Register()

lyr = ds.CreateLayer(
"test",
geom_type=ogr.wkbPoint,
options=["SORT_BY_BBOX=YES", f"ROW_GROUP_SIZE={ROW_GROUP_SIZE}", "FID=fid"],
)
assert lyr.TestCapability(ogr.OLCFastWriteArrowBatch) == 0
lyr.CreateField(ogr.FieldDefn("i", ogr.OFTInteger))
COUNT_NON_SPATIAL = 501
COUNT_SPATIAL = 601
for i in range(COUNT_NON_SPATIAL):
f = ogr.Feature(lyr.GetLayerDefn())
f["i"] = i
lyr.CreateFeature(f)
for i in range(COUNT_SPATIAL):
f = ogr.Feature(lyr.GetLayerDefn())
f["i"] = i + COUNT_NON_SPATIAL
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt(f"POINT({i} {i})"))
lyr.CreateFeature(f)
ds = None

with gdaltest.config_option("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "YES"):
ds = ogr.Open(outfilename)
lyr = ds.GetLayer(0)
theorical_number_of_groups = (
COUNT_SPATIAL + ROW_GROUP_SIZE - 1
) // ROW_GROUP_SIZE
assert lyr.GetFeatureCount() - theorical_number_of_groups <= max(
1, 0.3 * theorical_number_of_groups
)
assert sum([f["feature_count"] for f in lyr]) == COUNT_SPATIAL

def check_file(filename):
ds = ogr.Open(filename)
lyr = ds.GetLayer(0)

# First features should be non spatial ones
for i in range(COUNT_NON_SPATIAL):
f = lyr.GetNextFeature()
assert f.GetFID() == i
assert f["i"] == i
assert f.GetGeometryRef() is None

# Now spatial features
count = 0
foundNonSequential = False
set_i = set()
while True:
f = lyr.GetNextFeature()
if not f:
break
assert f["i"] >= COUNT_NON_SPATIAL
if f["i"] != i + COUNT_NON_SPATIAL:
foundNonSequential = True
assert f["i"] not in set_i
set_i.add(f["i"])
assert f.GetFID() == f["i"]
assert f.GetGeometryRef().GetX() == f["i"] - COUNT_NON_SPATIAL
count += 1

assert count == COUNT_SPATIAL
assert foundNonSequential

check_file(outfilename)

# Check that this works also when using the Arrow interface for creation
outfilename2 = str(tmp_vsimem / "test_ogr_parquet_sort_by_bbox2.parquet")
gdal.VectorTranslate(
outfilename2,
outfilename,
layerCreationOptions=["SORT_BY_BBOX=YES", "ROW_GROUP_SIZE=100"],
)
check_file(outfilename2)
26 changes: 26 additions & 0 deletions doc/source/drivers/vector/parquet.rst
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,32 @@ Layer creation options
Whether to write xmin/ymin/xmax/ymax columns with the bounding box of
geometries.

- .. lco:: SORT_BY_BBOX
:choices: YES, NO
:default: NO
:since: 3.9

Whether features should be sorted based on the bounding box of their
geometries, before being written in the final file. Sorting them enables
faster spatial filtering on reading, by grouping together spatially close
features in the same group of rows.

Note however that enabling this option involves creating a temporary
GeoPackage file (in the same directory as the final Parquet file),
and thus requires temporary storage (possibly up to several times the size
of the final Parquet file, depending on Parquet compression) and additional
processing time.

The efficiency of spatial filtering depends on the ROW_GROUP_SIZE. If it
is too large, too many features that are not spatially close will be grouped
together. If it is too small, the file size will increase, and extra
processing time will be necessary to browse through the row groups.

Note also that when this option is enabled, the Arrow writing API (which
is for example triggered when using ogr2ogr to convert from Parquet to Parquet),
fallbacks to the generic implementation, which does not support advanced
Arrow types (lists, maps, etc.).

SQL support
-----------

Expand Down
2 changes: 1 addition & 1 deletion ogr/ogrsf_frmts/arrow/ogr_feather.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class OGRFeatherWriterLayer final : public OGRArrowWriterLayer
return m_poFileWriter != nullptr;
}
virtual void CreateWriter() override;
virtual void CloseFileWriter() override;
virtual bool CloseFileWriter() override;

virtual void CreateSchema() override;
virtual void PerformStepsBeforeFinalFlushGroup() override;
Expand Down
3 changes: 2 additions & 1 deletion ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename,
/* CloseFileWriter() */
/************************************************************************/

void OGRFeatherWriterLayer::CloseFileWriter()
bool OGRFeatherWriterLayer::CloseFileWriter()
{
auto status = m_poFileWriter->Close();
if (!status.ok())
Expand All @@ -210,6 +210,7 @@ void OGRFeatherWriterLayer::CloseFileWriter()
"FileWriter::Close() failed with %s",
status.message().c_str());
}
return status.ok();
}

/************************************************************************/
Expand Down
6 changes: 4 additions & 2 deletions ogr/ogrsf_frmts/arrow_common/ogr_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer

virtual bool IsFileWriterCreated() const = 0;
virtual void CreateWriter() = 0;
virtual void CloseFileWriter() = 0;
virtual bool CloseFileWriter() = 0;

void CreateSchemaCommon();
void FinalizeSchema();
Expand All @@ -396,7 +396,7 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer
void ClearArrayBuilers();

virtual bool FlushGroup() = 0;
void FinalizeWriting();
bool FinalizeWriting();
bool WriteArrays(std::function<bool(const std::shared_ptr<arrow::Field> &,
const std::shared_ptr<arrow::Array> &)>
postProcessArray);
Expand Down Expand Up @@ -468,6 +468,8 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer

protected:
OGRErr ICreateFeature(OGRFeature *poFeature) override;

bool FlushFeatures();
};

#endif // OGR_ARROW_H
39 changes: 28 additions & 11 deletions ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ inline OGRArrowWriterLayer::~OGRArrowWriterLayer()
/* FinalizeWriting() */
/************************************************************************/

inline void OGRArrowWriterLayer::FinalizeWriting()
inline bool OGRArrowWriterLayer::FinalizeWriting()
{
bool ret = true;

if (!IsFileWriterCreated())
{
CreateWriter();
Expand All @@ -105,10 +107,13 @@ inline void OGRArrowWriterLayer::FinalizeWriting()
PerformStepsBeforeFinalFlushGroup();

if (!m_apoBuilders.empty() && m_apoFieldsFromArrowSchema.empty())
FlushGroup();
ret = FlushGroup();

CloseFileWriter();
if (!CloseFileWriter())
ret = false;
}

return ret;
}

/************************************************************************/
Expand Down Expand Up @@ -1767,20 +1772,32 @@ inline OGRErr OGRArrowWriterLayer::ICreateFeature(OGRFeature *poFeature)
// Flush the current row group if reaching the limit of rows per group.
if (!m_apoBuilders.empty() && m_apoBuilders[0]->length() == m_nRowGroupSize)
{
if (!IsFileWriterCreated())
{
CreateWriter();
if (!IsFileWriterCreated())
return OGRERR_FAILURE;
}

if (!FlushGroup())
if (!FlushFeatures())
return OGRERR_FAILURE;
}

return OGRERR_NONE;
}

/************************************************************************/
/* FlushFeatures() */
/************************************************************************/

inline bool OGRArrowWriterLayer::FlushFeatures()
{
if (m_apoBuilders.empty() || m_apoBuilders[0]->length() == 0)
return true;

if (!IsFileWriterCreated())
{
CreateWriter();
if (!IsFileWriterCreated())
return false;
}

return FlushGroup();
}

/************************************************************************/
/* GetFeatureCount() */
/************************************************************************/
Expand Down
28 changes: 25 additions & 3 deletions ogr/ogrsf_frmts/parquet/ogr_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ class OGRParquetLayer final : public OGRParquetLayerBase
OGRFieldType &eType, OGRFieldSubType &eSubType,
std::string &osMinTmp,
std::string &osMaxTmp) const;

bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin,
int &iParquetXMax, int &iParquetYMax) const;
};

/************************************************************************/
Expand Down Expand Up @@ -282,12 +285,19 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer
bool m_bEdgesSpherical = false;
parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{};

//! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode
std::unique_ptr<GDALDataset> m_poTmpGPKG{};
//! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode
OGRLayer *m_poTmpGPKGLayer = nullptr;
//! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode
GIntBig m_nTmpFeatureCount = 0;

virtual bool IsFileWriterCreated() const override
{
return m_poFileWriter != nullptr;
}
virtual void CreateWriter() override;
virtual void CloseFileWriter() override;
virtual bool CloseFileWriter() override;

virtual void CreateSchema() override;
virtual void PerformStepsBeforeFinalFlushGroup() override;
Expand All @@ -312,14 +322,15 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer

std::string GetGeoMetadata() const;

//! Copy temporary GeoPackage layer to final Parquet file
bool CopyTmpGpkgLayerToFinalFile();

public:
OGRParquetWriterLayer(
OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool,
const std::shared_ptr<arrow::io::OutputStream> &poOutputStream,
const char *pszLayerName);

~OGRParquetWriterLayer() override;

CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override;

bool SetOptions(CSLConstList papszOptions,
Expand Down Expand Up @@ -355,10 +366,19 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer
bool IsArrowSchemaSupported(const struct ArrowSchema *schema,
CSLConstList papszOptions,
std::string &osErrorMsg) const override;
bool
CreateFieldFromArrowSchema(const struct ArrowSchema *schema,
CSLConstList papszOptions = nullptr) override;
bool WriteArrowBatch(const struct ArrowSchema *schema,
struct ArrowArray *array,
CSLConstList papszOptions = nullptr) override;
#endif

protected:
OGRErr ICreateFeature(OGRFeature *poFeature) override;

friend class OGRParquetWriterDataset;
bool Close();
};

/************************************************************************/
Expand All @@ -380,6 +400,8 @@ class OGRParquetWriterDataset final : public GDALPamDataset
return m_poMemoryPool.get();
}

CPLErr Close() override;

int GetLayerCount() override;
OGRLayer *GetLayer(int idx) override;
int TestCapability(const char *pszCap) override;
Expand Down
Loading

0 comments on commit eb3d124

Please sign in to comment.