diff --git a/autotest/ogr/ogr_parquet.py b/autotest/ogr/ogr_parquet.py index a277424f94db..f6b3d909c585 100755 --- a/autotest/ogr/ogr_parquet.py +++ b/autotest/ogr/ogr_parquet.py @@ -3383,3 +3383,103 @@ def test_ogr_parquet_get_extent_3d(tmp_vsimem): lyr = ds.GetLayer(0) assert lyr.TestCapability(ogr.OLCFastGetExtent3D) == 0 assert lyr.GetExtent3D() == (1.0, 4.0, 2.0, 5.0, 3.0, 6.0) + + +############################################################################### + + +@gdaltest.enable_exceptions() +@pytest.mark.require_driver("GPKG") +def test_ogr_parquet_sort_by_bbox(tmp_vsimem): + + outfilename = str(tmp_vsimem / "test_ogr_parquet_sort_by_bbox.parquet") + ds = ogr.GetDriverByName("Parquet").CreateDataSource(outfilename) + + gpkg_drv = gdal.GetDriverByName("GPKG") + gpkg_drv.Deregister() + ROW_GROUP_SIZE = 100 + try: + with pytest.raises( + Exception, + match="Driver GPKG required for SORT_BY_BBOX layer creation option", + ): + ds.CreateLayer( + "test", + geom_type=ogr.wkbPoint, + options=["SORT_BY_BBOX=YES", f"ROW_GROUP_SIZE={ROW_GROUP_SIZE}"], + ) + finally: + gpkg_drv.Register() + + lyr = ds.CreateLayer( + "test", + geom_type=ogr.wkbPoint, + options=["SORT_BY_BBOX=YES", f"ROW_GROUP_SIZE={ROW_GROUP_SIZE}", "FID=fid"], + ) + assert lyr.TestCapability(ogr.OLCFastWriteArrowBatch) == 0 + lyr.CreateField(ogr.FieldDefn("i", ogr.OFTInteger)) + COUNT_NON_SPATIAL = 501 + COUNT_SPATIAL = 601 + for i in range(COUNT_NON_SPATIAL): + f = ogr.Feature(lyr.GetLayerDefn()) + f["i"] = i + lyr.CreateFeature(f) + for i in range(COUNT_SPATIAL): + f = ogr.Feature(lyr.GetLayerDefn()) + f["i"] = i + COUNT_NON_SPATIAL + f.SetGeometryDirectly(ogr.CreateGeometryFromWkt(f"POINT({i} {i})")) + lyr.CreateFeature(f) + ds = None + + with gdaltest.config_option("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "YES"): + ds = ogr.Open(outfilename) + lyr = ds.GetLayer(0) + theorical_number_of_groups = ( + COUNT_SPATIAL + ROW_GROUP_SIZE - 1 + ) // ROW_GROUP_SIZE + assert lyr.GetFeatureCount() - theorical_number_of_groups <= max( + 1, 0.3 * theorical_number_of_groups + ) + assert sum([f["feature_count"] for f in lyr]) == COUNT_SPATIAL + + def check_file(filename): + ds = ogr.Open(filename) + lyr = ds.GetLayer(0) + + # First features should be non spatial ones + for i in range(COUNT_NON_SPATIAL): + f = lyr.GetNextFeature() + assert f.GetFID() == i + assert f["i"] == i + assert f.GetGeometryRef() is None + + # Now spatial features + count = 0 + foundNonSequential = False + set_i = set() + while True: + f = lyr.GetNextFeature() + if not f: + break + assert f["i"] >= COUNT_NON_SPATIAL + if f["i"] != i + COUNT_NON_SPATIAL: + foundNonSequential = True + assert f["i"] not in set_i + set_i.add(f["i"]) + assert f.GetFID() == f["i"] + assert f.GetGeometryRef().GetX() == f["i"] - COUNT_NON_SPATIAL + count += 1 + + assert count == COUNT_SPATIAL + assert foundNonSequential + + check_file(outfilename) + + # Check that this works also when using the Arrow interface for creation + outfilename2 = str(tmp_vsimem / "test_ogr_parquet_sort_by_bbox2.parquet") + gdal.VectorTranslate( + outfilename2, + outfilename, + layerCreationOptions=["SORT_BY_BBOX=YES", "ROW_GROUP_SIZE=100"], + ) + check_file(outfilename2) diff --git a/doc/source/drivers/vector/parquet.rst b/doc/source/drivers/vector/parquet.rst index b524e7953a0f..687d83f1ddf4 100644 --- a/doc/source/drivers/vector/parquet.rst +++ b/doc/source/drivers/vector/parquet.rst @@ -123,6 +123,32 @@ Layer creation options Whether to write xmin/ymin/xmax/ymax columns with the bounding box of geometries. +- .. lco:: SORT_BY_BBOX + :choices: YES, NO + :default: NO + :since: 3.9 + + Whether features should be sorted based on the bounding box of their + geometries, before being written in the final file. Sorting them enables + faster spatial filtering on reading, by grouping together spatially close + features in the same group of rows. + + Note however that enabling this option involves creating a temporary + GeoPackage file (in the same directory as the final Parquet file), + and thus requires temporary storage (possibly up to several times the size + of the final Parquet file, depending on Parquet compression) and additional + processing time. + + The efficiency of spatial filtering depends on the ROW_GROUP_SIZE. If it + is too large, too many features that are not spatially close will be grouped + together. If it is too small, the file size will increase, and extra + processing time will be necessary to browse through the row groups. + + Note also that when this option is enabled, the Arrow writing API (which + is for example triggered when using ogr2ogr to convert from Parquet to Parquet), + fallbacks to the generic implementation, which does not support advanced + Arrow types (lists, maps, etc.). + SQL support ----------- diff --git a/ogr/ogrsf_frmts/arrow/ogr_feather.h b/ogr/ogrsf_frmts/arrow/ogr_feather.h index 59bc6b51df16..d02c73563d97 100644 --- a/ogr/ogrsf_frmts/arrow/ogr_feather.h +++ b/ogr/ogrsf_frmts/arrow/ogr_feather.h @@ -165,7 +165,7 @@ class OGRFeatherWriterLayer final : public OGRArrowWriterLayer return m_poFileWriter != nullptr; } virtual void CreateWriter() override; - virtual void CloseFileWriter() override; + virtual bool CloseFileWriter() override; virtual void CreateSchema() override; virtual void PerformStepsBeforeFinalFlushGroup() override; diff --git a/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp b/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp index 4bb9db0cf2b2..c9fa9c7f8057 100644 --- a/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp +++ b/ogr/ogrsf_frmts/arrow/ogrfeatherwriterlayer.cpp @@ -201,7 +201,7 @@ bool OGRFeatherWriterLayer::SetOptions(const std::string &osFilename, /* CloseFileWriter() */ /************************************************************************/ -void OGRFeatherWriterLayer::CloseFileWriter() +bool OGRFeatherWriterLayer::CloseFileWriter() { auto status = m_poFileWriter->Close(); if (!status.ok()) @@ -210,6 +210,7 @@ void OGRFeatherWriterLayer::CloseFileWriter() "FileWriter::Close() failed with %s", status.message().c_str()); } + return status.ok(); } /************************************************************************/ diff --git a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h index 016cc1de30d2..ea8375b9da71 100644 --- a/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h +++ b/ogr/ogrsf_frmts/arrow_common/ogr_arrow.h @@ -381,7 +381,7 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer virtual bool IsFileWriterCreated() const = 0; virtual void CreateWriter() = 0; - virtual void CloseFileWriter() = 0; + virtual bool CloseFileWriter() = 0; void CreateSchemaCommon(); void FinalizeSchema(); @@ -396,7 +396,7 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer void ClearArrayBuilers(); virtual bool FlushGroup() = 0; - void FinalizeWriting(); + bool FinalizeWriting(); bool WriteArrays(std::function &, const std::shared_ptr &)> postProcessArray); @@ -468,6 +468,8 @@ class OGRArrowWriterLayer CPL_NON_FINAL : public OGRLayer protected: OGRErr ICreateFeature(OGRFeature *poFeature) override; + + bool FlushFeatures(); }; #endif // OGR_ARROW_H diff --git a/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp b/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp index a07e88395dc6..3f203b265901 100644 --- a/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp +++ b/ogr/ogrsf_frmts/arrow_common/ograrrowwriterlayer.hpp @@ -94,8 +94,10 @@ inline OGRArrowWriterLayer::~OGRArrowWriterLayer() /* FinalizeWriting() */ /************************************************************************/ -inline void OGRArrowWriterLayer::FinalizeWriting() +inline bool OGRArrowWriterLayer::FinalizeWriting() { + bool ret = true; + if (!IsFileWriterCreated()) { CreateWriter(); @@ -105,10 +107,13 @@ inline void OGRArrowWriterLayer::FinalizeWriting() PerformStepsBeforeFinalFlushGroup(); if (!m_apoBuilders.empty() && m_apoFieldsFromArrowSchema.empty()) - FlushGroup(); + ret = FlushGroup(); - CloseFileWriter(); + if (!CloseFileWriter()) + ret = false; } + + return ret; } /************************************************************************/ @@ -1767,20 +1772,32 @@ inline OGRErr OGRArrowWriterLayer::ICreateFeature(OGRFeature *poFeature) // Flush the current row group if reaching the limit of rows per group. if (!m_apoBuilders.empty() && m_apoBuilders[0]->length() == m_nRowGroupSize) { - if (!IsFileWriterCreated()) - { - CreateWriter(); - if (!IsFileWriterCreated()) - return OGRERR_FAILURE; - } - - if (!FlushGroup()) + if (!FlushFeatures()) return OGRERR_FAILURE; } return OGRERR_NONE; } +/************************************************************************/ +/* FlushFeatures() */ +/************************************************************************/ + +inline bool OGRArrowWriterLayer::FlushFeatures() +{ + if (m_apoBuilders.empty() || m_apoBuilders[0]->length() == 0) + return true; + + if (!IsFileWriterCreated()) + { + CreateWriter(); + if (!IsFileWriterCreated()) + return false; + } + + return FlushGroup(); +} + /************************************************************************/ /* GetFeatureCount() */ /************************************************************************/ diff --git a/ogr/ogrsf_frmts/parquet/ogr_parquet.h b/ogr/ogrsf_frmts/parquet/ogr_parquet.h index 9488234a2c99..9c7e6440860d 100644 --- a/ogr/ogrsf_frmts/parquet/ogr_parquet.h +++ b/ogr/ogrsf_frmts/parquet/ogr_parquet.h @@ -197,6 +197,9 @@ class OGRParquetLayer final : public OGRParquetLayerBase OGRFieldType &eType, OGRFieldSubType &eSubType, std::string &osMinTmp, std::string &osMaxTmp) const; + + bool GeomColsBBOXParquet(int iGeom, int &iParquetXMin, int &iParquetYMin, + int &iParquetXMax, int &iParquetYMax) const; }; /************************************************************************/ @@ -282,12 +285,19 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer bool m_bEdgesSpherical = false; parquet::WriterProperties::Builder m_oWriterPropertiesBuilder{}; + //! Temporary GeoPackage dataset. Only used in SORT_BY_BBOX mode + std::unique_ptr m_poTmpGPKG{}; + //! Temporary GeoPackage layer. Only used in SORT_BY_BBOX mode + OGRLayer *m_poTmpGPKGLayer = nullptr; + //! Number of features written by ICreateFeature(). Only used in SORT_BY_BBOX mode + GIntBig m_nTmpFeatureCount = 0; + virtual bool IsFileWriterCreated() const override { return m_poFileWriter != nullptr; } virtual void CreateWriter() override; - virtual void CloseFileWriter() override; + virtual bool CloseFileWriter() override; virtual void CreateSchema() override; virtual void PerformStepsBeforeFinalFlushGroup() override; @@ -312,14 +322,15 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer std::string GetGeoMetadata() const; + //! Copy temporary GeoPackage layer to final Parquet file + bool CopyTmpGpkgLayerToFinalFile(); + public: OGRParquetWriterLayer( OGRParquetWriterDataset *poDS, arrow::MemoryPool *poMemoryPool, const std::shared_ptr &poOutputStream, const char *pszLayerName); - ~OGRParquetWriterLayer() override; - CPLErr SetMetadata(char **papszMetadata, const char *pszDomain) override; bool SetOptions(CSLConstList papszOptions, @@ -355,10 +366,19 @@ class OGRParquetWriterLayer final : public OGRArrowWriterLayer bool IsArrowSchemaSupported(const struct ArrowSchema *schema, CSLConstList papszOptions, std::string &osErrorMsg) const override; + bool + CreateFieldFromArrowSchema(const struct ArrowSchema *schema, + CSLConstList papszOptions = nullptr) override; bool WriteArrowBatch(const struct ArrowSchema *schema, struct ArrowArray *array, CSLConstList papszOptions = nullptr) override; #endif + + protected: + OGRErr ICreateFeature(OGRFeature *poFeature) override; + + friend class OGRParquetWriterDataset; + bool Close(); }; /************************************************************************/ @@ -380,6 +400,8 @@ class OGRParquetWriterDataset final : public GDALPamDataset return m_poMemoryPool.get(); } + CPLErr Close() override; + int GetLayerCount() override; OGRLayer *GetLayer(int idx) override; int TestCapability(const char *pszCap) override; diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp index 9cd1a52559f5..746032728958 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetdriver.cpp @@ -363,6 +363,124 @@ OpenParquetDatasetWithoutMetadata(const std::string &osBasePathIn, #endif +/************************************************************************/ +/* BuildMemDatasetWithRowGroupExtents() */ +/************************************************************************/ + +/** Builds a Memory dataset that contains, for each row-group of the input file, + * the feature count and spatial extent of the features of this row group, + * using Parquet statistics. This assumes that the Parquet file declares + * a "covering":{"bbox":{ ... }} metadata item. + * + * Only for debug purposes. + */ +static GDALDataset *BuildMemDatasetWithRowGroupExtents(OGRParquetLayer *poLayer) +{ + int iParquetXMin = -1; + int iParquetYMin = -1; + int iParquetXMax = -1; + int iParquetYMax = -1; + if (poLayer->GeomColsBBOXParquet(0, iParquetXMin, iParquetYMin, + iParquetXMax, iParquetYMax)) + { + auto poMemDrv = GetGDALDriverManager()->GetDriverByName("Memory"); + if (!poMemDrv) + return nullptr; + auto poMemDS = std::unique_ptr( + poMemDrv->Create("", 0, 0, 0, GDT_Unknown, nullptr)); + if (!poMemDS) + return nullptr; + OGRSpatialReference *poTmpSRS = nullptr; + const auto poSrcSRS = poLayer->GetSpatialRef(); + if (poSrcSRS) + poTmpSRS = poSrcSRS->Clone(); + auto poMemLayer = + poMemDS->CreateLayer("footprint", poTmpSRS, wkbPolygon, nullptr); + if (poTmpSRS) + poTmpSRS->Release(); + if (!poMemLayer) + return nullptr; + poMemLayer->CreateField( + std::make_unique("feature_count", OFTInteger64) + .get()); + + const auto metadata = + poLayer->GetReader()->parquet_reader()->metadata(); + const int numRowGroups = metadata->num_row_groups(); + for (int iRowGroup = 0; iRowGroup < numRowGroups; ++iRowGroup) + { + std::string osMinTmp, osMaxTmp; + OGRField unusedF; + bool unusedB; + OGRFieldSubType unusedSubType; + + OGRField sXMin; + OGR_RawField_SetNull(&sXMin); + bool bFoundXMin = false; + OGRFieldType eXMinType = OFTMaxType; + + OGRField sYMin; + OGR_RawField_SetNull(&sYMin); + bool bFoundYMin = false; + OGRFieldType eYMinType = OFTMaxType; + + OGRField sXMax; + OGR_RawField_SetNull(&sXMax); + bool bFoundXMax = false; + OGRFieldType eXMaxType = OFTMaxType; + + OGRField sYMax; + OGR_RawField_SetNull(&sYMax); + bool bFoundYMax = false; + OGRFieldType eYMaxType = OFTMaxType; + + if (poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetXMin, nullptr, + /* bComputeMin = */ true, sXMin, bFoundXMin, + /* bComputeMax = */ false, unusedF, unusedB, eXMinType, + unusedSubType, osMinTmp, osMaxTmp) && + bFoundXMin && eXMinType == OFTReal && + poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetYMin, nullptr, + /* bComputeMin = */ true, sYMin, bFoundYMin, + /* bComputeMax = */ false, unusedF, unusedB, eYMinType, + unusedSubType, osMinTmp, osMaxTmp) && + bFoundYMin && eYMinType == OFTReal && + poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetXMax, nullptr, + /* bComputeMin = */ false, unusedF, unusedB, + /* bComputeMax = */ true, sXMax, bFoundXMax, eXMaxType, + unusedSubType, osMaxTmp, osMaxTmp) && + bFoundXMax && eXMaxType == OFTReal && + poLayer->GetMinMaxForParquetCol( + iRowGroup, iParquetYMax, nullptr, + /* bComputeMin = */ false, unusedF, unusedB, + /* bComputeMax = */ true, sYMax, bFoundYMax, eYMaxType, + unusedSubType, osMaxTmp, osMaxTmp) && + bFoundYMax && eYMaxType == OFTReal) + { + OGRFeature oFeat(poMemLayer->GetLayerDefn()); + oFeat.SetField(0, + static_cast( + metadata->RowGroup(iRowGroup)->num_rows())); + auto poPoly = std::make_unique(); + auto poLR = std::make_unique(); + poLR->addPoint(sXMin.Real, sYMin.Real); + poLR->addPoint(sXMin.Real, sYMax.Real); + poLR->addPoint(sXMax.Real, sYMax.Real); + poLR->addPoint(sXMax.Real, sYMin.Real); + poLR->addPoint(sXMin.Real, sYMin.Real); + poPoly->addRingDirectly(poLR.release()); + oFeat.SetGeometryDirectly(poPoly.release()); + CPL_IGNORE_RET_VAL(poMemLayer->CreateFeature(&oFeat)); + } + } + + return poMemDS.release(); + } + return nullptr; +} + /************************************************************************/ /* Open() */ /************************************************************************/ @@ -533,6 +651,14 @@ static GDALDataset *OGRParquetDriverOpen(GDALOpenInfo *poOpenInfo) auto poLayer = std::make_unique( poDS.get(), CPLGetBasename(osFilename.c_str()), std::move(arrow_reader), poOpenInfo->papszOpenOptions); + + // For debug purposes: return a layer with the extent of each row group + if (CPLTestBool( + CPLGetConfigOption("OGR_PARQUET_SHOW_ROW_GROUP_EXTENT", "NO"))) + { + return BuildMemDatasetWithRowGroupExtents(poLayer.get()); + } + poDS->SetLayer(std::move(poLayer)); return poDS.release(); } @@ -749,6 +875,16 @@ void OGRParquetDriver::InitMetadata() "geometries"); } + { + auto psOption = CPLCreateXMLNode(oTree.get(), CXT_Element, "Option"); + CPLAddXMLAttributeAndValue(psOption, "name", "SORT_BY_BBOX"); + CPLAddXMLAttributeAndValue(psOption, "type", "boolean"); + CPLAddXMLAttributeAndValue(psOption, "default", "NO"); + CPLAddXMLAttributeAndValue(psOption, "description", + "Whether features should be sorted based on " + "the bounding box of their geometries"); + } + char *pszXML = CPLSerializeXMLTree(oTree.get()); GDALDriver::SetMetadataItem(GDAL_DS_LAYER_CREATIONOPTIONLIST, pszXML); CPLFree(pszXML); diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp index e98d00474bb8..d8c092bb1ffe 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetlayer.cpp @@ -2547,3 +2547,28 @@ bool OGRParquetLayer::GetMinMaxForParquetCol( return bFoundMin || bFoundMax; } + +/************************************************************************/ +/* GeomColsBBOXParquet() */ +/************************************************************************/ + +/** Return for a given geometry column (iGeom: in [0, GetGeomFieldCount()-1] range), + * the Parquet column number of the corresponding xmin,ymin,xmax,ymax bounding + * box columns, if existing. + */ +bool OGRParquetLayer::GeomColsBBOXParquet(int iGeom, int &iParquetXMin, + int &iParquetYMin, int &iParquetXMax, + int &iParquetYMax) const +{ + const auto oIter = m_oMapGeomFieldIndexToGeomColBBOXParquet.find(iGeom); + const bool bFound = + (oIter != m_oMapGeomFieldIndexToGeomColBBOXParquet.end()); + if (bFound) + { + iParquetXMin = oIter->second.iParquetXMin; + iParquetYMin = oIter->second.iParquetYMin; + iParquetXMax = oIter->second.iParquetXMax; + iParquetYMax = oIter->second.iParquetYMax; + } + return bFound; +} diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp index 545b45a420b8..f970c001a243 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetwriterdataset.cpp @@ -41,6 +41,29 @@ OGRParquetWriterDataset::OGRParquetWriterDataset( { } +/************************************************************************/ +/* Close() */ +/************************************************************************/ + +CPLErr OGRParquetWriterDataset::Close() +{ + CPLErr eErr = CE_None; + if (nOpenFlags != OPEN_FLAGS_CLOSED) + { + if (m_poLayer && !m_poLayer->Close()) + { + eErr = CE_Failure; + } + + if (GDALPamDataset::Close() != CE_None) + { + eErr = CE_Failure; + } + } + + return eErr; +} + /************************************************************************/ /* GetLayerCount() */ /************************************************************************/ diff --git a/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp b/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp index 860810e8a4d4..43b496932c5d 100644 --- a/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp +++ b/ogr/ogrsf_frmts/parquet/ogrparquetwriterlayer.cpp @@ -35,6 +35,8 @@ #include "ogr_wkb.h" +#include + /************************************************************************/ /* OGRParquetWriterLayer() */ /************************************************************************/ @@ -51,13 +53,235 @@ OGRParquetWriterLayer::OGRParquetWriterLayer( } /************************************************************************/ -/* ~OGRParquetWriterLayer() */ +/* Close() */ /************************************************************************/ -OGRParquetWriterLayer::~OGRParquetWriterLayer() +bool OGRParquetWriterLayer::Close() { + if (m_poTmpGPKGLayer) + { + if (!CopyTmpGpkgLayerToFinalFile()) + return false; + } + if (m_bInitializationOK) - FinalizeWriting(); + { + if (!FinalizeWriting()) + return false; + } + + return true; +} + +/************************************************************************/ +/* CopyTmpGpkgLayerToFinalFile() */ +/************************************************************************/ + +bool OGRParquetWriterLayer::CopyTmpGpkgLayerToFinalFile() +{ + if (!m_poTmpGPKGLayer) + { + return true; + } + + CPLDebug("PARQUET", "CopyTmpGpkgLayerToFinalFile(): start..."); + + VSIUnlink(m_poTmpGPKG->GetDescription()); + + OGRFeature oFeat(m_poFeatureDefn); + + // Interval in terms of features between 2 debug progress report messages + constexpr int PROGRESS_FC_INTERVAL = 100 * 1000; + + // First, write features without geometries + { + auto poTmpLayer = std::unique_ptr(m_poTmpGPKG->ExecuteSQL( + "SELECT serialized_feature FROM tmp WHERE fid NOT IN (SELECT id " + "FROM rtree_tmp_geom)", + nullptr, nullptr)); + if (!poTmpLayer) + return false; + for (const auto &poSrcFeature : poTmpLayer.get()) + { + int nBytesFeature = 0; + const GByte *pabyFeatureData = + poSrcFeature->GetFieldAsBinary(0, &nBytesFeature); + if (!oFeat.DeserializeFromBinary(pabyFeatureData, nBytesFeature)) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot deserialize feature"); + return false; + } + if (OGRArrowWriterLayer::ICreateFeature(&oFeat) != OGRERR_NONE) + { + return false; + } + + if ((m_nFeatureCount % PROGRESS_FC_INTERVAL) == 0) + { + CPLDebugProgress( + "PARQUET", + "CopyTmpGpkgLayerToFinalFile(): %.02f%% progress", + 100.0 * double(m_nFeatureCount) / + double(m_nTmpFeatureCount)); + } + } + + if (!FlushFeatures()) + { + return false; + } + } + + // Now walk through the GPKG RTree for features with geometries + // Cf https://github.com/sqlite/sqlite/blob/master/ext/rtree/rtree.c + // for the description of the content of the rtree _node table + std::vector> aNodeNoDepthPair; + int nTreeDepth = 0; + // Queue the root node + aNodeNoDepthPair.emplace_back( + std::make_pair(/* nodeNo = */ 1, /* depth = */ 0)); + int nCountWrittenFeaturesSinceLastFlush = 0; + while (!aNodeNoDepthPair.empty()) + { + const auto &oLastPair = aNodeNoDepthPair.back(); + const int64_t nNodeNo = oLastPair.first; + const int nCurDepth = oLastPair.second; + //CPLDebug("PARQUET", "Reading nodeNode=%d, curDepth=%d", int(nNodeNo), nCurDepth); + aNodeNoDepthPair.pop_back(); + + auto poRTreeLayer = std::unique_ptr(m_poTmpGPKG->ExecuteSQL( + CPLSPrintf("SELECT data FROM rtree_tmp_geom_node WHERE nodeno " + "= " CPL_FRMT_GIB, + static_cast(nNodeNo)), + nullptr, nullptr)); + if (!poRTreeLayer) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot read node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + const auto poRTreeFeature = + std::unique_ptr(poRTreeLayer->GetNextFeature()); + if (!poRTreeFeature) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot read node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + + int nNodeBytes = 0; + const GByte *pabyNodeData = + poRTreeFeature->GetFieldAsBinary(0, &nNodeBytes); + constexpr int BLOB_HEADER_SIZE = 4; + if (nNodeBytes < BLOB_HEADER_SIZE) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Not enough bytes when reading node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + if (nNodeNo == 1) + { + // Get the RTree depth from the root node + nTreeDepth = (pabyNodeData[0] << 8) | pabyNodeData[1]; + //CPLDebug("PARQUET", "nTreeDepth = %d", nTreeDepth); + } + + const int nCellCount = (pabyNodeData[2] << 8) | pabyNodeData[3]; + constexpr int SIZEOF_CELL = 24; // int64_t + 4 float + if (nNodeBytes < BLOB_HEADER_SIZE + SIZEOF_CELL * nCellCount) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Not enough bytes when reading node " CPL_FRMT_GIB, + static_cast(nNodeNo)); + return false; + } + + size_t nOffset = BLOB_HEADER_SIZE; + if (nCurDepth == nTreeDepth) + { + // Leaf node: it references feature IDs. + + // If we are about to go above m_nRowGroupSize, flush past + // features now, to improve the spatial compacity of the row group. + if (m_nRowGroupSize > nCellCount && + nCountWrittenFeaturesSinceLastFlush + nCellCount > + m_nRowGroupSize) + { + nCountWrittenFeaturesSinceLastFlush = 0; + if (!FlushFeatures()) + { + return false; + } + } + + for (int i = 0; i < nCellCount; ++i) + { + int64_t nFID; + memcpy(&nFID, pabyNodeData + nOffset, sizeof(int64_t)); + CPL_MSBPTR64(&nFID); + + const auto poSrcFeature = std::unique_ptr( + m_poTmpGPKGLayer->GetFeature(nFID)); + if (!poSrcFeature) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot get feature " CPL_FRMT_GIB, + static_cast(nFID)); + return false; + } + + int nBytesFeature = 0; + const GByte *pabyFeatureData = + poSrcFeature->GetFieldAsBinary(0, &nBytesFeature); + if (!oFeat.DeserializeFromBinary(pabyFeatureData, + nBytesFeature)) + { + CPLError(CE_Failure, CPLE_AppDefined, + "Cannot deserialize feature"); + return false; + } + if (OGRArrowWriterLayer::ICreateFeature(&oFeat) != OGRERR_NONE) + { + return false; + } + + nOffset += SIZEOF_CELL; + + ++nCountWrittenFeaturesSinceLastFlush; + + if ((m_nFeatureCount % PROGRESS_FC_INTERVAL) == 0 || + m_nFeatureCount == m_nTmpFeatureCount / 2) + { + CPLDebugProgress( + "PARQUET", + "CopyTmpGpkgLayerToFinalFile(): %.02f%% progress", + 100.0 * double(m_nFeatureCount) / + double(m_nTmpFeatureCount)); + } + } + } + else + { + // Non-leaf node: it references child nodes. + for (int i = 0; i < nCellCount; ++i) + { + int64_t nNode; + memcpy(&nNode, pabyNodeData + nOffset, sizeof(int64_t)); + CPL_MSBPTR64(&nNode); + aNodeNoDepthPair.emplace_back( + std::make_pair(nNode, nCurDepth + 1)); + nOffset += SIZEOF_CELL; + } + } + } + + CPLDebug("PARQUET", + "CopyTmpGpkgLayerToFinalFile(): 100%%, successfully finished"); + return true; } /************************************************************************/ @@ -99,6 +323,33 @@ bool OGRParquetWriterLayer::SetOptions(CSLConstList papszOptions, papszOptions, "WRITE_COVERING_BBOX", CPLGetConfigOption("OGR_PARQUET_WRITE_COVERING_BBOX", "YES"))); + if (CPLTestBool(CSLFetchNameValueDef(papszOptions, "SORT_BY_BBOX", "NO"))) + { + const std::string osTmpGPKG(std::string(m_poDataset->GetDescription()) + + ".tmp.gpkg"); + auto poGPKGDrv = GetGDALDriverManager()->GetDriverByName("GPKG"); + if (!poGPKGDrv) + { + CPLError( + CE_Failure, CPLE_AppDefined, + "Driver GPKG required for SORT_BY_BBOX layer creation option"); + return false; + } + m_poTmpGPKG.reset(poGPKGDrv->Create(osTmpGPKG.c_str(), 0, 0, 0, + GDT_Unknown, nullptr)); + if (!m_poTmpGPKG) + return false; + m_poTmpGPKG->MarkSuppressOnClose(); + m_poTmpGPKGLayer = m_poTmpGPKG->CreateLayer("tmp"); + if (!m_poTmpGPKGLayer) + return false; + // Serialized feature + m_poTmpGPKGLayer->CreateField( + std::make_unique("serialized_feature", OFTBinary) + .get()); + CPL_IGNORE_RET_VAL(m_poTmpGPKGLayer->StartTransaction()); + } + const char *pszGeomEncoding = CSLFetchNameValue(papszOptions, "GEOMETRY_ENCODING"); m_eGeomEncoding = OGRArrowGeomEncoding::WKB; @@ -234,7 +485,7 @@ bool OGRParquetWriterLayer::SetOptions(CSLConstList papszOptions, /* CloseFileWriter() */ /************************************************************************/ -void OGRParquetWriterLayer::CloseFileWriter() +bool OGRParquetWriterLayer::CloseFileWriter() { auto status = m_poFileWriter->Close(); if (!status.ok()) @@ -243,6 +494,7 @@ void OGRParquetWriterLayer::CloseFileWriter() "FileWriter::Close() failed with %s", status.message().c_str()); } + return status.ok(); } /************************************************************************/ @@ -740,6 +992,68 @@ void OGRParquetWriterLayer::CreateWriter() &m_poKeyValueMetadata)); } +/************************************************************************/ +/* ICreateFeature() */ +/************************************************************************/ + +OGRErr OGRParquetWriterLayer::ICreateFeature(OGRFeature *poFeature) +{ + // If not using SORT_BY_BBOX=YES layer creation option, we can directly + // write features to the final Parquet file + if (!m_poTmpGPKGLayer) + return OGRArrowWriterLayer::ICreateFeature(poFeature); + + // SORT_BY_BBOX=YES case: we write for now a serialized version of poFeature + // in a temporary GeoPackage file. + + GIntBig nFID = poFeature->GetFID(); + if (!m_osFIDColumn.empty() && nFID == OGRNullFID) + { + nFID = m_nTmpFeatureCount; + poFeature->SetFID(nFID); + } + ++m_nTmpFeatureCount; + + std::vector abyBuffer; + // Serialize the source feature as a single array of bytes to preserve it + // fully + if (!poFeature->SerializeToBinary(abyBuffer)) + { + return OGRERR_FAILURE; + } + + // SQLite3 limitation: a row must fit in slightly less than 1 GB. + constexpr int SOME_MARGIN = 128; + if (abyBuffer.size() > 1024 * 1024 * 1024 - SOME_MARGIN) + { + CPLError(CE_Failure, CPLE_NotSupported, + "Features larger than 1 GB are not supported"); + return OGRERR_FAILURE; + } + + OGRFeature oFeat(m_poTmpGPKGLayer->GetLayerDefn()); + oFeat.SetFID(nFID); + oFeat.SetField(0, static_cast(abyBuffer.size()), abyBuffer.data()); + const auto poSrcGeom = poFeature->GetGeometryRef(); + if (poSrcGeom && !poSrcGeom->IsEmpty()) + { + // For the purpose of building an RTree, just use the bounding box of + // the geometry as the geometry. + OGREnvelope sEnvelope; + poSrcGeom->getEnvelope(&sEnvelope); + auto poPoly = std::make_unique(); + auto poLR = std::make_unique(); + poLR->addPoint(sEnvelope.MinX, sEnvelope.MinY); + poLR->addPoint(sEnvelope.MinX, sEnvelope.MaxY); + poLR->addPoint(sEnvelope.MaxX, sEnvelope.MaxY); + poLR->addPoint(sEnvelope.MaxX, sEnvelope.MinY); + poLR->addPoint(sEnvelope.MinX, sEnvelope.MinY); + poPoly->addRingDirectly(poLR.release()); + oFeat.SetGeometryDirectly(poPoly.release()); + } + return m_poTmpGPKGLayer->CreateFeature(&oFeat); +} + /************************************************************************/ /* FlushGroup() */ /************************************************************************/ @@ -832,6 +1146,15 @@ OGRParquetWriterLayer::WriteArrowBatch(const struct ArrowSchema *schema, struct ArrowArray *array, CSLConstList papszOptions) { + if (m_poTmpGPKGLayer) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. Hence we fallback + // to the OGRLayer base implementation, which will ultimately call + // OGRParquetWriterLayer::ICreateFeature() + return OGRLayer::WriteArrowBatch(schema, array, papszOptions); + } + return WriteArrowBatchInternal( schema, array, papszOptions, [this](const std::shared_ptr &poBatch) @@ -869,9 +1192,40 @@ inline int OGRParquetWriterLayer::TestCapability(const char *pszCap) if (EQUAL(pszCap, OLCFastWriteArrowBatch)) return false; #endif + + if (m_poTmpGPKGLayer && EQUAL(pszCap, OLCFastWriteArrowBatch)) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. So this is not + // fast + return false; + } + return OGRArrowWriterLayer::TestCapability(pszCap); } +/************************************************************************/ +/* CreateFieldFromArrowSchema() */ +/************************************************************************/ + +#if PARQUET_VERSION_MAJOR > 10 +bool OGRParquetWriterLayer::CreateFieldFromArrowSchema( + const struct ArrowSchema *schema, CSLConstList papszOptions) +{ + if (m_poTmpGPKGLayer) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. But this process + // only supports the base Arrow types supported by + // OGRLayer::WriteArrowBatch() + return OGRLayer::CreateFieldFromArrowSchema(schema, papszOptions); + } + + return OGRArrowWriterLayer::CreateFieldFromArrowSchema(schema, + papszOptions); +} +#endif + /************************************************************************/ /* IsArrowSchemaSupported() */ /************************************************************************/ @@ -881,6 +1235,16 @@ bool OGRParquetWriterLayer::IsArrowSchemaSupported( const struct ArrowSchema *schema, CSLConstList papszOptions, std::string &osErrorMsg) const { + if (m_poTmpGPKGLayer) + { + // When using SORT_BY_BBOX=YES option, we can't directly write the + // input array, because we need to sort features. But this process + // only supports the base Arrow types supported by + // OGRLayer::WriteArrowBatch() + return OGRLayer::IsArrowSchemaSupported(schema, papszOptions, + osErrorMsg); + } + if (schema->format[0] == 'e' && schema->format[1] == 0) { osErrorMsg = "float16 not supported";