diff --git a/docs/tutorials/multiple-texts.md b/docs/tutorials/multiple-texts.md index 2d6a2bcc8..f22197da1 100644 --- a/docs/tutorials/multiple-texts.md +++ b/docs/tutorials/multiple-texts.md @@ -102,17 +102,23 @@ There are a few issues with this approach: To efficiently perform the same operations on multiple documents at once, EDS-NLP uses [streams][edsnlp.core.stream.Stream], which record the operations to perform on the documents without actually executing them directly, similar to the way Spark does, or polars with its LazyFrame. -This allows EDS-NLP to distribute these operations on multiple cores or machines when it is time to execute them. We can configure how the collection operations are run (how many jobs/workers, how -many gpus, whether to use the spark engine) via the stream [`.set_processing(...)`][edsnlp.core.stream.Stream.set_processing] method. +This allows EDS-NLP to distribute these operations on multiple cores or machines when it is time to execute them. We can configure how the collection operations are run (how many jobs/workers, how many gpus, whether to use the spark engine) via the stream [`set_processing()`][edsnlp.core.stream.Stream.set_processing] method. For instance, ```python docs = edsnlp.data.from_iterable(corpus) -print(docs) -# +print(docs) # (1)! ``` +1. Printed version of the stream: + ``` + Stream( + reader=IterableReader(data=), + ops=[], + writer=None) + ``` + as well as any `edsnlp.data.read_*` or `edsnlp.data.from_*` return a stream, that we can iterate over or complete with more operations. To apply the model on our collection of documents, we can simply do: @@ -120,8 +126,28 @@ can simply do: docs = docs.map_pipeline(nlp) # or à la spaCy : # docs = nlp.pipe(docs) +print(docs) # (1)! ``` +1. Printed version of the stream: + ``` + Stream( + reader=IterableReader(data=), + ops=[ + map(_ensure_doc[]), + batchify(size=None, fn=None, sentinel_mode=None), + map_batches_op(), + map_batches_op(), + map_batches_op(), + map_batches_op(), + map_batches_op(), + map_batches_op(), + map_batches_op(), + unbatchify() + ], + writer=None) + ``` + ??? warning "SpaCy vs EDS-NLP" SpaCy's `nlp.pipe` method is not the same as EDS-NLP's `nlp.pipe` method, and will iterate over anything you pass to it, therefore executing the operations scheduled in our stream. @@ -174,13 +200,13 @@ df = docs.to_pandas(converter=convert_doc_to_rows) df = docs.to_pandas( converter="ents", span_getter=["ents", "dates"], - span_attributes={ + span_attributes=[ # span._.*** name: column name - "negation": "negation", - "hypothesis": "hypothesis", - "family": "family", - "date.datetime": "datetime", - }, + "negation", + "hypothesis", + "family", + "date.datetime", + ], ) ``` @@ -270,22 +296,22 @@ note_nlp = docs.to_pandas( converter="ents", # Below are the arguments to the converter span_getter=["ents", "dates"], - span_attributes={ # (1) + span_attributes=[ # (1) # span._.*** name: column name - "negation": "negation", - "hypothesis": "hypothesis", - "family": "family", - "date.datetime": "datetime", + "negation", + "hypothesis", + "family", + "date.datetime", # having individual columns for each date part # can be useful for incomplete dates (eg, "in May") - "date.day": "date_day", - "date.month": "date_month", - "date.year": "date_year", - }, + "date.day", + "date.month", + "date.year", + ], ) ``` -1. You can just pass a list if you don't want to rename the attributes. +1. You can just pass a dict if you want to explicitely rename the attributes. The result on the first note: @@ -297,10 +323,6 @@ The result on the first note: ### Locally, using multiple parallel workers -!!! warning "Caveat" - - Since workers can produce their results in any order, the order of the rows in the resulting DataFrame may not be the same as the order of the input data. - ```{ .python hl_lines="8" } # Read from a dataframe & use the omop converter docs = edsnlp.data.from_pandas(data, converter="omop") @@ -316,50 +338,65 @@ docs = docs.set_processing(backend="multiprocessing") note_nlp = docs.to_pandas( converter="ents", span_getter=["ents", "dates"], - span_attributes={ - "negation": "negation", - "hypothesis": "hypothesis", - "family": "family", - "date.datetime": "datetime", + span_attributes=[ + "negation", + "hypothesis", + "family", + "date.datetime", # having individual columns for each date part # can be useful for incomplete dates (eg, "in May") - "date.day": "date_day", - "date.month": "date_month", - "date.year": "date_year", - }, + "date.day", + "date.month", + "date.year", + ], ) ``` +!!! note "Deterministic processing" + + By default, from version 0.14.0, EDS-NLP dispatches tasks to workers in a round-robin fashion to ensure deterministic processing. This mechanism can be disabled to send documents to workers as soon as they are available, which may result in faster processing but out-of-order results. + + To disable processing determinism, use `set_processing(deterministic=False)`. Note that this parameter is only used when using the `multiprocessing` backend. + ### In a distributed fashion with spark To use the Spark engine to distribute the computation, we create our stream from the Spark dataframe directly and write the result to a new Spark dataframe. EDS-NLP will automatically distribute the operations on the cluster (setting `backend="spark"` behind the scenes), but you can change the backend (for instance to `multiprocessing` to run locally). -```{ .python hl_lines="2 9" .no-check } +!!! warning "Spark backend" + + When processing from AND to a Spark DataFrame, the backend is automatically set to "spark". + + We do NOT recommend using other backend when Spark dataframe are involved, as there may be a discrepancy between the time it takes to process the data locally and the timeout of the spark job. + +```{ .python hl_lines="2 12" .no-check } # Read from the pyspark dataframe & use the omop converter docs = edsnlp.data.from_spark(df, converter="omop") # Add the pipeline to operations that will be run -docs = docs.map_pipeline(nlp +docs = docs.map_pipeline(nlp) + +# Backend is set by default to "spark" +# docs = docs.set_processing(backend="spark") # Convert each doc to a list of dicts (one by entity) # and store the result in a pyspark DataFrame note_nlp = docs.to_spark( converter="ents", span_getter=["ents", "dates"], - span_attributes={ - "negation": "negation", - "hypothesis": "hypothesis", - "family": "family", - "date.datetime": "datetime", + span_attributes=[ + "negation", + "hypothesis", + "family", + "date.datetime", # having individual columns for each date part # can be useful for incomplete dates (eg, "in May") - "date.day": "date_day", - "date.month": "date_month", - "date.year": "date_year", - }, + "date.day", + "date.month", + "date.year", + ], dtypes=None, # (1) ) ``` diff --git a/edsnlp/core/stream.py b/edsnlp/core/stream.py index 0b4a57793..cfca4dea4 100644 --- a/edsnlp/core/stream.py +++ b/edsnlp/core/stream.py @@ -1041,10 +1041,12 @@ def validate_ops(self, ops, update: bool = False): def __repr__(self): ops_str = ",\n".join(textwrap.indent(repr(op), " ") for op in self.ops) + if ops_str: + ops_str = "\n" + ops_str + "\n " return ( f"Stream(\n" f" reader={self.reader},\n" - f" ops=[\n{ops_str}\n ],\n" + f" ops=[{ops_str}],\n" f" writer={self.writer})\n" ) diff --git a/edsnlp/data/converters.py b/edsnlp/data/converters.py index 2551e8a8b..c8c262354 100644 --- a/edsnlp/data/converters.py +++ b/edsnlp/data/converters.py @@ -107,12 +107,10 @@ def validate_kwargs(func, kwargs): class AttributesMappingArg(Validated): """ - A mapping from JSON attributes to Span extensions (can be a list too). + A span attribute mapping (can be a list too to keep the same names). For instance: - - `doc_attributes={"datetime": "note_datetime"}` will map the `datetime` JSON - attribute to the `note_datetime` extension. - `doc_attributes="note_datetime"` will map the `note_datetime` JSON attribute to the `note_datetime` extension. - `span_attributes=["negation", "family"]` will map the `negation` and `family` JSON @@ -328,7 +326,7 @@ class StandoffDoc2DictConverter: # Optional parameters span_getter={"ents": True}, - span_attributes={"negation": "negated"}, + span_attributes=["negation"], ) # or docs.to_standoff(...) if it's already a # [Stream][edsnlp.core.stream.Stream] diff --git a/edsnlp/data/standoff.py b/edsnlp/data/standoff.py index 228dcd1e9..b2ec6bca0 100644 --- a/edsnlp/data/standoff.py +++ b/edsnlp/data/standoff.py @@ -446,9 +446,8 @@ def read_standoff( ```{ .python .no-check } doc_iterator = edsnlp.data.read_standoff( "path/to/brat/directory", - # Mapping from 'BRAT attribute name' to 'Doc attribute name' - span_attributes={"Negation": "negated"}, - bool_attributes=["negated"], # Missing values will be set to False + span_attributes=["negation", "family"], + bool_attributes=["negation"], # Missing values will be set to False ) ``` diff --git a/edsnlp/pipes/misc/dates/dates.py b/edsnlp/pipes/misc/dates/dates.py index 345e25c48..3b00719a8 100644 --- a/edsnlp/pipes/misc/dates/dates.py +++ b/edsnlp/pipes/misc/dates/dates.py @@ -104,7 +104,7 @@ class DatesMatcher(BaseNERComponent): docs = docs.map_pipeline(nlp) docs = docs.to_pandas( converter="ents", - span_attributes={"date.datetime": "datetime"}, + span_attributes=["date.datetime"], ) print(docs) # note_id start end label lexical_variant span_type datetime diff --git a/edsnlp/pipes/misc/quantities/quantities.py b/edsnlp/pipes/misc/quantities/quantities.py index bc57d4d0c..ee9c1b66f 100644 --- a/edsnlp/pipes/misc/quantities/quantities.py +++ b/edsnlp/pipes/misc/quantities/quantities.py @@ -449,7 +449,7 @@ class QuantitiesMatcher(BaseNERComponent): docs = docs.map_pipeline(nlp) docs.to_pandas( converter="ents", - span_attributes={"value.unit": "original_unit", "value.kg": "kg"}, + span_attributes=["value.unit", "value.kg"], ) # note_id start end label lexical_variant span_type original_unit kg # 0 None 18 27 weight 40000,0 g ents g 40.0