docs: update training, stream and torch components docs
percevalw committed Nov 4, 2024
1 parent cbc9f43 commit fc1f894
Showing 8 changed files with 254 additions and 93 deletions.
22 changes: 11 additions & 11 deletions docs/assets/templates/python/material/docstring.html
{{ log.debug("Rendering docstring") }}
{% for section in docstring_sections %}
{% if not config.only_parameters %}
{% if section.kind.value == "text" %}
{% if section.kind.value == "text" and (not config.sections or "text" in config.sections) %}
{{ section.value|convert_markdown(heading_level - 1, html_id) }}
{% elif section.kind.value == "attributes" %}
{% elif section.kind.value == "attributes" and (not config.sections or "attributes" in config.sections) %}
{% include "docstring/attributes.html" with context %}
{% elif section.kind.value == "parameters" %}
{% elif section.kind.value == "parameters" and (not config.sections or "parameters" in config.sections) %}
{% include "docstring/parameters.html" with context %}
{% elif section.kind.value == "other parameters" %}
{% elif section.kind.value == "other parameters" and (not config.sections or "parameters" in config.sections) %}
{% include "docstring/other_parameters.html" with context %}
{% elif section.kind.value == "raises" %}
{% elif section.kind.value == "raises" and (not config.sections or "raises" in config.sections) %}
{% include "docstring/raises.html" with context %}
{% elif section.kind.value == "warns" %}
{% elif section.kind.value == "warns" and (not config.sections or "warns" in config.sections) %}
{% include "docstring/warns.html" with context %}
{% elif section.kind.value == "yields" %}
{% elif section.kind.value == "yields" and (not config.sections or "yields" in config.sections) %}
{% include "docstring/yields.html" with context %}
{% elif section.kind.value == "receives" %}
{% elif section.kind.value == "receives" and (not config.sections or "receives" in config.sections) %}
{% include "docstring/receives.html" with context %}
{% elif section.kind.value == "returns" %}
{% elif section.kind.value == "returns" and (not config.sections or "returns" in config.sections) %}
{% include "docstring/returns.html" with context %}
{% elif section.kind.value == "examples" %}
{% elif section.kind.value == "examples" and (not config.sections or "examples" in config.sections) %}
{% include "docstring/examples.html" with context %}
{% elif section.kind.value == "admonition" %}
{% elif section.kind.value == "admonition" and (not config.sections or "admonition" in config.sections) %}
{% include "docstring/admonition.html" with context %}
{% endif %}
{% elif section.kind.value == "parameters" %}

docs/assets/templates/python/material/docstring/parameters.html

{% if config.only_parameters != "no-header" %}
{% if config.only_parameters != "no-header" and config.header != false %}
{{ "# Parameters\n"|convert_markdown(heading_level, html_id) }}
{% endif %}
{% if config.docstring_section_style == "table" %}
112 changes: 95 additions & 17 deletions docs/concepts/inference.md

To leverage multiple GPUs when processing multiple documents, refer to the [multiprocessing backend][edsnlp.processing.multiprocessing.execute_multiprocessing_backend] description below.

## Streams {: #edsnlp.core.stream.Stream }

When processing multiple documents, we can optimize the inference by parallelizing the computation on a single core, on multiple cores and GPUs, or even across multiple machines.

These optimizations are enabled by performing *lazy inference*: the operations (e.g., reading a document, converting it to a Doc, running the different pipes of a model or writing the result somewhere) are not executed immediately but are instead scheduled in a [Stream][edsnlp.core.stream.Stream] object. The stream can then be executed by calling its `execute` method, by iterating over it, or by calling a writing method (e.g., `to_pandas`). In fact, data connectors like `edsnlp.data.read_json` return a stream, as does the `nlp.pipe` method.

A stream contains:

- a `reader`: the source of the data (e.g., a file, a database, a list of strings, etc.)
- the list of operations to perform (`stream.ops`), each holding the function / pipe, its keyword arguments and its context
- an optional `writer`: the destination of the data (e.g., a file, a database, a list of strings, etc.)
- the execution `config`, containing the backend to use and its configuration such as the number of workers, the batch size, etc.

All methods (`map()`, `map_batches()`, `map_gpu()`, `map_pipeline()`, `set_processing()`) of the stream are chainable, meaning that they return a new stream object rather than modifying the stream in place.

For instance, the following code will load a model, read a folder of JSON files, apply the model to each document and write the result in a Parquet folder, using 4 CPUs and 2 GPUs.

```{ .python .no-check }
# Load the model and read the input JSON files (not shown), then:
data = data.set_processing(
    num_cpu_workers=4,
    # 2 GPUs to accelerate deep-learning pipes
    num_gpu_workers=2,
    # Show the progress bar
    show_progress=True,
)
# Write the result, this will execute the stream
data.write_parquet("path/to/output_folder", converter="...", write_in_worker=True)
```

Streams support a variety of operations, such as applying a function to each element of the stream, batching the elements, applying a model to the elements, etc. In each case, the operations are not executed immediately but are scheduled to run when iterating over the stream, or when calling its `execute()`, `to_*()` or `write_*()` methods.
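
For instance, here is a minimal sketch of this lazy behaviour, assuming a small list of strings as input:

```{ .python .no-check }
import edsnlp

stream = edsnlp.data.from_iterable(["first note", "second note"])
stream = stream.map(str.lower)  # scheduled, nothing is executed yet
results = list(stream)          # iterating triggers the execution
```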

### `map()` {: #edsnlp.core.stream.Stream.map }

To apply a function to each element of a stream, use the `map()` method. It takes a callable and an optional dictionary of keyword arguments; the function is applied to every element of the stream.

::: edsnlp.core.stream.Stream.map
    options:
        sections: ['text', 'parameters']
        header: false
        show_source: false
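
For example, a short sketch with a hypothetical `redact` helper, passing extra arguments through `kwargs`:

```{ .python .no-check }
def redact(text, placeholder="[REDACTED]"):
    return text.replace("SECRET", placeholder)

# `kwargs` are passed to the function when the stream is executed
stream = stream.map(redact, kwargs={"placeholder": "***"})
```
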
### `map_batches()` {: #edsnlp.core.stream.Stream.map_batches }

To apply an operation to a stream in batches, use the `map_batches()` method. It takes a callable, an optional dictionary of keyword arguments, and batching arguments. The function is applied to each batch (a list of elements) and should return a list of results, which are concatenated across batches.

::: edsnlp.core.stream.Stream.map_batches
    options:
        heading_level: 3
        sections: ['text', 'parameters']
        header: false
        show_source: false
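
For example, a minimal sketch that computes one value per batch of elements:

```{ .python .no-check }
# Each call receives a list of up to 32 elements and returns a list,
# here with a single count per batch
counts = stream.map_batches(
    lambda batch: [len(batch)],
    batch_size=32,
)
```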

### `map_pipeline()` {: #edsnlp.core.stream.Stream.map_pipeline }

To apply a model, use the `map_pipeline()` method. It takes a model as input and adds every pipe of the model to the scheduled operations.

::: edsnlp.core.stream.Stream.map_pipeline
    options:
        heading_level: 3
        sections: ['text', 'parameters']
        header: false
        show_source: false
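
For example, a minimal sketch with a small rule-based model:

```{ .python .no-check }
import edsnlp, edsnlp.pipes as eds

nlp = edsnlp.blank("eds")
nlp.add_pipe(eds.sentences())
# Every pipe of `nlp` is added to the scheduled operations
stream = stream.map_pipeline(nlp)
```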

### `map_gpu()` {: #edsnlp.core.stream.Stream.map_gpu }

To run a specific function on a GPU (for advanced users; `map_pipeline()` should accommodate most use cases), use the `map_gpu()` method. It takes two or three callables: the first one (`prepare_batches`) takes a batch of inputs and returns tensors that will be sent to the GPU and passed to the second one (`forward`), which applies the deep-learning operations and returns the results. The optional third one (`postprocess`) receives the batch of inputs as well as the `forward` results and returns the final output (for instance, the input documents annotated with the predictions).

::: edsnlp.core.stream.Stream.map_gpu
    options:
        heading_level: 3
        sections: ['text', 'parameters']
        header: false
        show_source: false
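
A minimal sketch, assuming documents as inputs and a purely illustrative scoring function:

```{ .python .no-check }
import torch

def prepare_batches(batch):
    # Build the tensors that will be moved to the GPU
    return {"lengths": torch.as_tensor([len(doc) for doc in batch])}

def forward(tensors):
    # Runs on the GPU worker
    return {"scores": tensors["lengths"].float().log()}

def postprocess(batch, results):
    # Attach the predictions to the input documents (assumes a custom
    # `score` extension has been declared beforehand)
    for doc, score in zip(batch, results["scores"].tolist()):
        doc._.score = score
    return batch

stream = stream.map_gpu(prepare_batches, forward, postprocess)
```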

### `loop()` {: #edsnlp.core.stream.Stream.loop }

::: edsnlp.core.stream.Stream.loop
    options:
        heading_level: 3
        sections: ['text', 'parameters']
        header: false
        show_source: false
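
For example, a minimal sketch:

```{ .python .no-check }
# Cycle over the data indefinitely; batches are then drawn from an
# endless stream, which is convenient in a training loop
training_stream = stream.loop()
```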

### `shuffle()` {: #edsnlp.core.stream.Stream.shuffle }

::: edsnlp.core.stream.Stream.shuffle
    options:
        heading_level: 3
        sections: ['text', 'parameters']
        header: false
        show_source: false
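
For example, a minimal sketch combining it with `loop()`:

```{ .python .no-check }
# Reshuffle the whole dataset at the beginning of each epoch,
# typically combined with loop() when training
training_stream = stream.shuffle("dataset").loop()
```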

### Configure the execution with `set_processing()` {: #edsnlp.core.stream.Stream.set_processing }

You can configure how the operations scheduled in the stream are executed by calling its `set_processing(...)` method. The following options are available:

::: edsnlp.core.stream.Stream.set_processing
    options:
        heading_level: 3
        only_parameters: "no-header"
        sections: ['text', 'parameters']
        header: false
        show_source: false

## Backends

The `backend` parameter of `set_processing()` supports the following values:

### `simple` {: #edsnlp.processing.simple.execute_simple_backend }

::: edsnlp.processing.simple.execute_simple_backend
    options:
        heading_level: 3
        show_source: false

### `multiprocessing` {: #edsnlp.processing.multiprocessing.execute_multiprocessing_backend }

::: edsnlp.processing.multiprocessing.execute_multiprocessing_backend
    options:
        heading_level: 3
        show_source: false

### `spark` {: #edsnlp.processing.spark.execute_spark_backend }

::: edsnlp.processing.spark.execute_spark_backend
    options:
        heading_level: 3
        show_source: false
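
For example, a minimal sketch of selecting a backend explicitly (when it is not set, a backend is typically inferred from the other options):

```{ .python .no-check }
stream = stream.set_processing(backend="multiprocessing", num_cpu_workers=4)
```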

## Batching

Many operations rely on batching, either to be more efficient or because they require a fixed-size input. The `batch_size` and `batch_by` arguments of the `map_batches()` method let you specify the size of the batches and which function to use to compute the size of the batches.

```{ .python .no-check }
# Accumulate in chunks of 1024 documents
lengths = data.map_batches(len, batch_size=1024)
# Accumulate in chunks of 100 000 words
lengths = data.map_batches(len, batch_size=100_000, batch_by="words")
# or
lengths = data.map_batches(len, batch_size="100_000 words")
```

We also support special values for `batch_size`, which use "sentinels" (i.e., markers inserted in the stream) to delimit the batches.

```{ .python .no-check }
# Accumulate every element of the input in a single batch
# which is useful when looping over the data in training
lengths = data.map_batches(len, batch_size="dataset")
# Accumulate in chunks of fragments, in the case of parquet datasets
lengths = data.map_batches(len, batch_size="fragments")
```

Note that these batch functions are only available under specific conditions:

- either `backend="simple"`, or `deterministic=True` (the default) when `backend="multiprocessing"`; otherwise elements might be processed out of order
- every preceding op must be elementwise (e.g., `map()`, `map_gpu()`, `map_pipeline()`, and no generator function), or `sentinel_mode` must have been explicitly set to `"split"` in `map_batches()`; otherwise the sentinels are dropped by default when the user requires batching
90 changes: 81 additions & 9 deletions docs/concepts/torch-component.md

## Implementation example

Here is a draft of a trainable component:

```python
from typing import Any, Dict, Iterable, List, Sequence, Set

import torch
from tqdm import tqdm
class MyComponent(TorchComponent):
    def __init__(
        self,
        nlp: Pipeline,
        name: str,
        *,
        embedding: TorchComponent,  # A subcomponent
    ):
        super().__init__(nlp=nlp, name=name)
        self.embedding = embedding

    def post_init(self, gold_data: Iterable["spacy.tokens.Doc"], exclude: Set[str]):
        """
        This method completes the attributes of the component by looking at some
        documents. It is especially useful to build vocabularies or detect the
        labels of a classification task.

        Parameters
        ----------
        gold_data: Iterable[Doc]
            The documents to use for initialization.
        exclude: Set
            The names of components to exclude from initialization.
            This argument will be gradually updated with the names of initialized
            components.
        """
        super().post_init(gold_data, exclude)

        # Initialize the component with the gold documents
        ...

        # Initialize any layer that might be missing from the module
        self.classifier = torch.nn.Linear(...)

    def preprocess(self, doc: "spacy.tokens.Doc", **kwargs) -> Dict[str, Any]:
        """
        Preprocess the document to extract the features that will be used by the
        neural network and its subcomponents to perform their predictions.

        Parameters
        ----------
        doc: Doc
            Document to preprocess

        Returns
        -------
        Dict[str, Any]
            Dictionary (optionally nested) containing the features extracted from
            the document.
        """
        return {
            "embedding": self.embedding.preprocess(doc),
            "my-feature": ...,
        }

    def collate(self, batch) -> Dict:
        """
        Collate the batch of features into a single batch of tensors that can be
        used by the forward method of the component.

        Parameters
        ----------
        batch: Dict[str, Any]
            Batch of features

        Returns
        -------
        BatchInput
            Dictionary (optionally nested) containing the collated tensors
        """
        return {
            "embedding": self.embedding.collate(batch["embedding"]),
            "my-feature": torch.as_tensor(batch["my-feature"]),
        }

    def forward(self, batch: Dict) -> Dict:
        """
        Perform the forward pass of the neural network.

        Parameters
        ----------
        batch: BatchInput
            Batch of tensors (nested dictionary) computed by the collate method

        Returns
        -------
        BatchOutput
            Dict of scores, losses, embeddings tensors, etc.
        """
        # Call the embedding subcomponent
        embeds = self.embedding(batch["embedding"])

        # Compute the scores and losses from the embeddings
        output = ...
        return output

    def postprocess(
        self,
        docs: Sequence["spacy.tokens.Doc"],
        results: Dict,
        inputs: List[Dict[str, Any]],
    ) -> Sequence["spacy.tokens.Doc"]:
        """
        Update the documents with the predictions of the neural network.
        By default, this is a no-op.

        Parameters
        ----------
        docs: Sequence[Doc]
            List of documents to update
        results: BatchOutput
            Batch of predictions, as returned by the forward method
        inputs: BatchInput
            List of preprocessed features, as returned by the preprocess method

        Returns
        -------
        Sequence[Doc]
        """
        ...
        return docs
```
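
To make the data flow concrete, here is a hedged sketch of how these methods chain together at inference time. It is illustrative only: the real batching logic (including the recursive collation of nested feature dicts, device placement and caching) is handled by the library.

```{ .python .no-check }
docs = [nlp.make_doc(t) for t in ["first text", "second text"]]
feats = [component.preprocess(doc) for doc in docs]
# Merge the list of feature dicts into a dict of lists
# (simplified: the library merges nested dicts recursively)
batch = {key: [f[key] for f in feats] for key in feats[0]}
tensors = component.collate(batch)
output = component.forward(tensors)
docs = component.postprocess(docs, output, feats)
```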
