Implementation 805 #905

Open
wants to merge 14 commits into master
363 changes: 363 additions & 0 deletions tests/forte/data/data_pack_profiling_test.py
@@ -0,0 +1,363 @@
# Copyright 2019 The Forte Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Profiling tests for data pack: exercise typical usage scenarios such as POS
tagging, NER, and serialization to check for possible bottlenecks.
"""

import os
import unittest

__all__ = [
"performance_test",
]

from typing import Dict, Set, List

from forte.common.configuration import Config
from forte.common.resources import Resources

from forte.data.readers import OntonotesReader, DirPackReader, StringReader
from forte.processors.writers import PackNameJsonPackWriter

from ft.onto.base_ontology import EntityMention, Token, Sentence
from nltk import ( # type: ignore
pos_tag,
ne_chunk,
PunktSentenceTokenizer,
download,
)

from nltk.tokenize import sent_tokenize, word_tokenize

from forte.processors.base import PackProcessor
from forte.data.data_pack import DataPack

from forte import Pipeline
from nltk.tokenize.treebank import TreebankWordTokenizer


class SentenceAndTokenProcessor(PackProcessor):

    def process_tokens(self, sentences, input_pack: DataPack):
        """Create sentence and token annotations from pre-tokenized input.

        Args:
            sentences: Iterable of (start offset, sentence) pairs, where each
                word in a sentence exposes SpaCy-style `.idx` and `.text`.
            input_pack: input pack which needs to be modified.

        Returns: A list of the Forte `Token` entries created.
        """
        tokens: List[Token] = []

        for s_idx, sentence in sentences:
            # Annotate the sentence span, then one Token per word using the
            # word's own character offset within the document.
            Sentence(input_pack, s_idx, s_idx + len(sentence))

            for word in sentence:
                begin_pos_word = word.idx
                end_pos_word = begin_pos_word + len(word.text)
                token = Token(input_pack, begin_pos_word, end_pos_word)
                tokens.append(token)

        return tokens
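
    # A minimal sketch of the input `process_tokens` expects (hypothetical
    # values): each item pairs a sentence's start offset with its words,
    # where every word exposes SpaCy-style `.idx` and `.text` attributes.
    #
    #   sentences = [(0, first_sent_words), (17, second_sent_words)]
    #   tokens = self.process_tokens(sentences, input_pack)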

    def _process(self, input_pack: DataPack):
        doc = input_pack.text

        sentences = sent_tokenize(doc)

        tokens: List[Token] = []

        # `word_tokenize` does not report character spans, so offsets are
        # approximated by assuming one separator character between consecutive
        # tokens and between sentences. Splits such as "It's" -> ["It", "'s"]
        # can make the spans drift; that is acceptable for profiling purposes.
        s_idx = 0
        for sentence in sentences:
            e_idx = s_idx + len(sentence)
            Sentence(input_pack, s_idx, e_idx)

            last_sentence_word_idx = s_idx
            for word in word_tokenize(sentence):
                begin_pos_word = last_sentence_word_idx
                end_pos_word = begin_pos_word + len(word)
                token = Token(input_pack, begin_pos_word, end_pos_word)
                last_sentence_word_idx = end_pos_word + 1
                tokens.append(token)

            s_idx = e_idx + 1

        return tokens

    def record(self, record_meta: Dict[str, Set[str]]):
        r"""Method to add the output type records of the current processor
        to :attr:`forte.data.data_pack.Meta.record`. This processor produces
        `Sentence` and `Token` annotations.

        Args:
            record_meta: the field in the data pack for type records that
                need to be filled in for consistency checking.
        """
record_meta["ft.onto.base_ontology.Sentence"] = set()
record_meta["ft.onto.base_ontology.Token"] = set()


class NLTKNER(PackProcessor):
r"""A wrapper of NLTK NER."""

def initialize(self, resources: Resources, configs: Config):
super().initialize(resources, configs)
download("maxent_ne_chunker")
download("words")

def __init__(self):
super().__init__()
self.token_component = None

def _process(self, input_pack: DataPack):
for sentence in input_pack.get(Sentence):
token_entries = list(
input_pack.get(
entry_type=Token,
range_annotation=sentence,
components=self.token_component,
)
)
tokens = [(token.text, token.pos) for token in token_entries]
ne_tree = ne_chunk(tokens)

index = 0
for chunk in ne_tree:
if hasattr(chunk, "label"):
# For example:
# chunk: Tree('GPE', [('New', 'NNP'), ('York', 'NNP')])
begin_pos = token_entries[index].span.begin
end_pos = token_entries[index + len(chunk) - 1].span.end
entity = EntityMention(input_pack, begin_pos, end_pos)
entity.ner_type = chunk.label()
index += len(chunk)
else:
# For example:
# chunk: ('This', 'DT')
index += 1

    def record(self, record_meta: Dict[str, Set[str]]):
        r"""Method to add the output type record of `NLTKNER`, which is
        `ft.onto.base_ontology.EntityMention` with attribute `ner_type`,
        to :attr:`forte.data.data_pack.Meta.record`.

        Args:
            record_meta: the field in the datapack for type records that
                need to be filled in for consistency checking.
        """
        record_meta["ft.onto.base_ontology.EntityMention"] = {"ner_type"}

    def expected_types_and_attributes(self):
        r"""Method to add the expected types `ft.onto.base_ontology.Token`
        with attribute `pos` and `ft.onto.base_ontology.Sentence`, which
        would be checked before running the processor if the pipeline is
        initialized with `enforce_consistency=True` or
        :meth:`~forte.pipeline.Pipeline.enforce_consistency` was enabled
        for the pipeline.
        """
return {
"ft.onto.base_ontology.Sentence": set(),
"ft.onto.base_ontology.Token": {"pos"},
}
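
    # A minimal usage sketch (hedged): these expectations are only checked
    # when consistency enforcement is on, e.g.
    #
    #   pipeline = Pipeline[DataPack](enforce_consistency=True)
    #   # or, per the docstring above: pipeline.enforce_consistency(enforce=True)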


class NLTKWordTokenizer(PackProcessor):
r"""A wrapper of NLTK word tokenizer."""

def __init__(self):
super().__init__()
self.tokenizer = TreebankWordTokenizer()

def _process(self, input_pack: DataPack):
for begin, end in self.tokenizer.span_tokenize(input_pack.text):
Token(input_pack, begin, end)

def record(self, record_meta: Dict[str, Set[str]]):
r"""Method to add output type record of `NLTKWordTokenizer`, which is
`ft.onto.base_ontology.Token`,
to :attr:`forte.data.data_pack.Meta.record`.

Args:
            record_meta: the field in the datapack for type records that
                need to be filled in for consistency checking.
"""
record_meta["ft.onto.base_ontology.Token"] = set()


class NLTKSentenceSegmenter(PackProcessor):
r"""A wrapper of NLTK sentence tokenizer."""

def initialize(self, resources: Resources, configs: Config):
super().initialize(resources, configs)
download("punkt")

def __init__(self):
super().__init__()
self.sent_splitter = PunktSentenceTokenizer()

def _process(self, input_pack: DataPack):
for begin, end in self.sent_splitter.span_tokenize(input_pack.text):
Sentence(input_pack, begin, end)

def record(self, record_meta: Dict[str, Set[str]]):
r"""Method to add output type record of `NLTKSentenceSegmenter`, which
is `ft.onto.base_ontology.Sentence`
to :attr:`forte.data.data_pack.Meta.record`.

Args:
            record_meta: the field in the datapack for type records that
                need to be filled in for consistency checking.
"""
record_meta["ft.onto.base_ontology.Sentence"] = set()


class NLTKPOSTagger(PackProcessor):
r"""A wrapper of NLTK pos tagger."""

def initialize(self, resources: Resources, configs: Config):
super().initialize(resources, configs)
download("averaged_perceptron_tagger")

def __init__(self):
super().__init__()
self.token_component = None

def _process(self, input_pack: DataPack):
token_entries = list(
input_pack.get(entry_type=Token, components=self.token_component)
)
token_texts = [token.text for token in token_entries]
taggings = pos_tag(token_texts)
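        # For illustration: `pos_tag` returns (word, tag) pairs aligned with
        # its input, e.g. [("Forte", "NNP"), ("is", "VBZ"), ...]; `tag[1]`
        # below is the POS tag string.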
for token, tag in zip(token_entries, taggings):
token.pos = tag[1]

    def record(self, record_meta: Dict[str, Set[str]]):
        r"""Method to add the output type record of `NLTKPOSTagger`, which
        adds the attribute `pos` to `ft.onto.base_ontology.Token`,
        to :attr:`forte.data.data_pack.Meta.record`.

        Args:
            record_meta: the field in the datapack for type records that
                need to be filled in for consistency checking.
        """
        # Assumes an upstream tokenizer has already recorded the Token type;
        # otherwise this lookup raises a KeyError.
        record_meta["ft.onto.base_ontology.Token"].add("pos")

def expected_types_and_attributes(self):
r"""Method to add expected type `ft.onto.base_ontology.Token` for input
which would be checked before running the processor if
the pipeline is initialized with
`enforce_consistency=True` or
:meth:`~forte.pipeline.Pipeline.enforce_consistency` was enabled for
the pipeline.
"""
return {"ft.onto.base_ontology.Token": set()}


class NLP_Pipeline_Performance_Test(unittest.TestCase):
"""
Test performance for POS, NER tasks.
"""

def setUp(self) -> None:
self.nlp = Pipeline[DataPack]()

def testPOSTaggingNER(self, input_path: str = ""): # input_output_pair ,
"""
Verify the intermediate representation of pipeline.
"""
# input_path = (
# "...path_to_conll ... /Semantic-Role-Labeling-master/conll-formatted-ontonotes-5.0/"
# "data/conll-2012-test/data/english/annotations/bc/phoenix/00/"
# )
if len(input_path) == 0:
self.nlp.set_reader(StringReader())
            input_param = (
                "Forte is a data-centric ML framework. Muad Dib learned "
                "rapidly because his first training was in how to learn. "
                "And the first lesson of all was the basic trust that he "
                "could learn. "
                "It's shocking to find how many people do not believe they "
                "can learn, and how many more believe learning to be difficult."
            )
else:
self.nlp.set_reader(OntonotesReader())
input_param = input_path
        self.nlp.add(NLTKSentenceSegmenter())  # or: SentenceAndTokenProcessor()
self.nlp.add(NLTKWordTokenizer())
self.nlp.add(NLTKPOSTagger())
self.nlp.add(NLTKNER())

        for pack in self.nlp.initialize().process_dataset(input_param):
for sentence in pack.get("ft.onto.base_ontology.Sentence"):
print("The sentence is: ", sentence.text)
print("The POS tags of the tokens are:")
for token in pack.get(Token, sentence):
print(f" {token.text}[{token.pos}]", end=" ")
print()

def testSimpleSerialization(self, input_path: str = "", output_path: str = ""):
"""
Verify the intermediate representation of pipeline.
"""
# input_path = (
# "... path_to_conll ... /Semantic-Role-Labeling-master/conll-formatted-ontonotes-5.0/"
# "data/conll-2012-test/data/english/annotations/bc/phoenix/00/"
# )
# output_path = "./test_simple_pack_output/"

if len(input_path) == 0:
self.nlp.set_reader(StringReader())
            input_param = (
                "Forte is a data-centric ML framework. Muad Dib learned "
                "rapidly because his first training was in how to learn. "
                "And the first lesson of all was the basic trust that he "
                "could learn. "
                "It's shocking to find how many people do not believe they "
                "can learn, and how many more believe learning to be difficult."
            )
else:
self.nlp.set_reader(OntonotesReader())
input_param = input_path
self.nlp.add(
PackNameJsonPackWriter(),
{
"output_dir": output_path,
"indent": 2,
"overwrite": True,
},
)

self.nlp.run(input_param)

coref_pl = Pipeline()
coref_pl.set_reader(DirPackReader())
# coref_pl.add(MultiPackBoxer())
if len(output_path) > 0:
coref_pl.run(output_path)


def define_skip_condition(flag: str, explanation: str):
return unittest.skipUnless(
os.environ.get(flag, 0) or os.environ.get("TEST_ALL", 0),
explanation + f" Set `{flag}=1` or `TEST_ALL=1` to run.",
)


performance_test = define_skip_condition(
"TEST_PERFORMANCE", "Test the performance of Forte modules."
)
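

# A minimal usage sketch (hedged): applying the skip condition to the test
# class, assuming this block is moved above the class definition:
#
#   @performance_test
#   class NLP_Pipeline_Performance_Test(unittest.TestCase):
#       ...
#
# invoked, for example, with:
#
#   TEST_PERFORMANCE=1 pytest tests/forte/data/data_pack_profiling_test.py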