From 6918b400d3cf6c9151db2104137afe2c52dd68e4 Mon Sep 17 00:00:00 2001 From: lindsay stevens Date: Sat, 30 Nov 2024 20:40:43 +1100 Subject: [PATCH] chg: performance and memory usage improvements - the main problem being addressed is that SurveyElement had a lot of fields which are not relevant to every subtype, and many of these fields are containers, which wastes time and memory. - For example an Option (which represents a choice) would get about 60 attributes (now ~8) - most of which empty strings but many are dicts or lists. For large forms this can really stack up and consume a lot of memory. A lot of code was not tolerant of None as a default value so all the fields were initialised. - The dynamic class approach would have made more sense in the earlier days of XLSForm when the columns and behaviour were changing rapidly and there were not many fields. But now there are more features, and specs, and extensive test suites that make it useful to use a more clearly typed class approach. - Having concrete types also makes development easier, by having missing attribute or type warnings. When looking at the git history, clearly it was also confusing about what is a field - for example "jr:count" (repeat_count) is part of the bind dict, not a top-level attribute of a Section. - The changes in this commit could be taken further, for example: - some SurveyElement funcs relate to certain types and could be moved to those types, or a mixin / intermediate subclass. i.e. a fair bit of awkward "hasattr" or "ininstance" remains. - the builder.py module externalises the initialisation of the SurveyElements but perhaps this could be done by SurveyElements - other issues addressed: - avoid copying dicts unnecessarily - use __slots__ on all SurveyElement types to reduce memory usage - do "or other" question/choice insertion in xls2xform instead of the builder module, so that row-specific errors can be shown, and the choice lists are finalised sooner - potential compatibility issues: - minimal tests fixes were required for these changes, so any issues would be more for projects using pyxform internal classes or APIs. - removed some SurveyElement fields that seem to not be used at all, and SurveyElement classes will not have all 60+ fields anymore. - any extra data passed in as kwargs that is not a declared attribute is put into a new `extra_data` dict, and is excluded from `to_json_dict` output. The `extra_data` gets used when specifying extra choices columns, to generate choices output for that data. --- pyxform/builder.py | 194 ++++------- pyxform/constants.py | 2 + pyxform/entities/entity_declaration.py | 18 +- pyxform/external_instance.py | 17 +- pyxform/question.py | 358 +++++++++++++++----- pyxform/section.py | 117 +++++-- pyxform/survey.py | 377 +++++++++++++++------- pyxform/survey_element.py | 303 ++++++++--------- pyxform/utils.py | 27 +- pyxform/xls2json.py | 46 ++- tests/test_builder.py | 8 +- tests/test_j2x_creation.py | 49 +-- tests/test_j2x_question.py | 6 +- tests/test_j2x_xform_build_preparation.py | 4 +- 14 files changed, 969 insertions(+), 557 deletions(-) diff --git a/pyxform/builder.py b/pyxform/builder.py index df0801fb..86675f14 100644 --- a/pyxform/builder.py +++ b/pyxform/builder.py @@ -2,10 +2,9 @@ Survey builder functionality. """ -import copy import os from collections import defaultdict -from typing import TYPE_CHECKING, Any, Union +from typing import Any from pyxform import constants as const from pyxform import file_utils, utils @@ -15,6 +14,7 @@ from pyxform.question import ( InputQuestion, MultipleChoiceQuestion, + Option, OsmUploadQuestion, Question, RangeQuestion, @@ -24,15 +24,9 @@ from pyxform.question_type_dictionary import QUESTION_TYPE_DICT from pyxform.section import GroupedSection, RepeatingSection from pyxform.survey import Survey +from pyxform.survey_element import SurveyElement from pyxform.xls2json import SurveyReader -if TYPE_CHECKING: - from pyxform.survey_element import SurveyElement - -OR_OTHER_CHOICE = { - const.NAME: "other", - const.LABEL: "Other", -} QUESTION_CLASSES = { "": Question, "action": Question, @@ -64,8 +58,6 @@ def __init__(self, **kwargs): self.setvalues_by_triggering_ref = defaultdict(list) # dictionary of setgeopoint target and value tuple indexed by triggering element self.setgeopoint_by_triggering_ref = defaultdict(list) - # For tracking survey-level choices while recursing through the survey. - self._choices: dict[str, Any] = {} def set_sections(self, sections): """ @@ -78,39 +70,28 @@ def set_sections(self, sections): self._sections = sections def create_survey_element_from_dict( - self, d: dict[str, Any], deep_copy: bool = True - ) -> Union["SurveyElement", list["SurveyElement"]]: + self, d: dict[str, Any], choices: dict[str, tuple[Option, ...]] | None = None + ) -> SurveyElement | list[SurveyElement]: """ Convert from a nested python dictionary/array structure (a json dict I call it because it corresponds directly with a json object) to a survey object :param d: data to use for constructing SurveyElements. - :param deep_copy: Take a copy.deepcopy() of the input data, to avoid changing the - original objects during processing. All methods private to this Builder class - should use "False" since the data is already copied by its public methods. """ - # TODO: ideally avoid copying entirely, but at least try to only copy once. - if deep_copy: - d = copy.deepcopy(d) - if "add_none_option" in d: self._add_none_option = d["add_none_option"] if d[const.TYPE] in SECTION_CLASSES: - if d[const.TYPE] == const.SURVEY: - self._choices = d.get(const.CHOICES, {}) - - section = self._create_section_from_dict(d) + section = self._create_section_from_dict(d=d, choices=choices) if d[const.TYPE] == const.SURVEY: section.setvalues_by_triggering_ref = self.setvalues_by_triggering_ref section.setgeopoint_by_triggering_ref = self.setgeopoint_by_triggering_ref - section.choices = self._choices return section elif d[const.TYPE] == const.LOOP: - return self._create_loop_from_dict(d) + return self._create_loop_from_dict(d=d, choices=choices) elif d[const.TYPE] == "include": section_name = d[const.NAME] if section_name not in self._sections: @@ -120,16 +101,19 @@ def create_survey_element_from_dict( self._sections.keys(), ) d = self._sections[section_name] - full_survey = self.create_survey_element_from_dict(d=d, deep_copy=False) + full_survey = self.create_survey_element_from_dict(d=d, choices=choices) return full_survey.children - elif d[const.TYPE] in ["xml-external", "csv-external"]: + elif d[const.TYPE] in {"xml-external", "csv-external"}: return ExternalInstance(**d) elif d[const.TYPE] == "entity": return EntityDeclaration(**d) else: self._save_trigger(d=d) return self._create_question_from_dict( - d, QUESTION_TYPE_DICT, self._add_none_option + d=d, + question_type_dictionary=QUESTION_TYPE_DICT, + add_none_option=self._add_none_option, + choices=choices, ) def _save_trigger(self, d: dict) -> None: @@ -149,69 +133,35 @@ def _create_question_from_dict( d: dict[str, Any], question_type_dictionary: dict[str, Any], add_none_option: bool = False, - ) -> Question | list[Question]: + choices: dict[str, tuple[Option, ...]] | None = None, + ) -> Question | tuple[Question, ...]: question_type_str = d[const.TYPE] # TODO: Keep add none option? if add_none_option and question_type_str.startswith(const.SELECT_ALL_THAT_APPLY): SurveyElementBuilder._add_none_option_to_select_all_that_apply(d) - # Handle or_other on select type questions - or_other_len = len(const.SELECT_OR_OTHER_SUFFIX) - if question_type_str.endswith(const.SELECT_OR_OTHER_SUFFIX): - question_type_str = question_type_str[: len(question_type_str) - or_other_len] - d[const.TYPE] = question_type_str - SurveyElementBuilder._add_other_option_to_multiple_choice_question(d) - return [ - SurveyElementBuilder._create_question_from_dict( - d, question_type_dictionary, add_none_option - ), - SurveyElementBuilder._create_specify_other_question_from_dict(d), - ] - question_class = SurveyElementBuilder._get_question_class( question_type_str, question_type_dictionary ) - # todo: clean up this spaghetti code - d["question_type_dictionary"] = question_type_dictionary if question_class: - return question_class(**d) - - return [] - - @staticmethod - def _add_other_option_to_multiple_choice_question(d: dict[str, Any]) -> None: - # ideally, we'd just be pulling from children - choice_list = d.get(const.CHOICES, d.get(const.CHILDREN, [])) - if len(choice_list) <= 0: - raise PyXFormError("There should be choices for this question.") - if not any(c[const.NAME] == OR_OTHER_CHOICE[const.NAME] for c in choice_list): - choice_list.append(SurveyElementBuilder._get_or_other_choice(choice_list)) + if const.CHOICES in d and choices: + return question_class( + question_type_dictionary=question_type_dictionary, + choices=choices.get(d[const.ITEMSET], d[const.CHOICES]), + **{k: v for k, v in d.items() if k != const.CHOICES}, + ) + else: + return question_class( + question_type_dictionary=question_type_dictionary, **d + ) - @staticmethod - def _get_or_other_choice( - choice_list: list[dict[str, Any]], - ) -> dict[str, str | dict]: - """ - If the choices have any translations, return an OR_OTHER choice for each lang. - """ - if any(isinstance(c.get(const.LABEL), dict) for c in choice_list): - langs = { - lang - for c in choice_list - for lang in c[const.LABEL] - if isinstance(c.get(const.LABEL), dict) - } - return { - const.NAME: OR_OTHER_CHOICE[const.NAME], - const.LABEL: {lang: OR_OTHER_CHOICE[const.LABEL] for lang in langs}, - } - return OR_OTHER_CHOICE + return () @staticmethod def _add_none_option_to_select_all_that_apply(d_copy): - choice_list = d_copy.get(const.CHOICES, d_copy.get(const.CHILDREN, [])) + choice_list = d_copy.get(const.CHOICES, d_copy.get(const.CHILDREN, ())) if len(choice_list) <= 0: raise PyXFormError("There should be choices for this question.") none_choice = {const.NAME: "none", const.LABEL: "None"} @@ -232,81 +182,65 @@ def _get_question_class(question_type_str, question_type_dictionary): and find what class it maps to going through type_dictionary -> QUESTION_CLASSES """ - question_type = question_type_dictionary.get(question_type_str, {}) - control_dict = question_type.get(const.CONTROL, {}) - control_tag = control_dict.get("tag", "") - if control_tag == "upload" and control_dict.get("mediatype") == "osm/*": - control_tag = "osm" + control_tag = "" + question_type = question_type_dictionary.get(question_type_str) + if question_type: + control_dict = question_type.get(const.CONTROL) + if control_dict: + control_tag = control_dict.get("tag") + if control_tag == "upload" and control_dict.get("mediatype") == "osm/*": + control_tag = "osm" return QUESTION_CLASSES[control_tag] - @staticmethod - def _create_specify_other_question_from_dict(d: dict[str, Any]) -> InputQuestion: - kwargs = { - const.TYPE: "text", - const.NAME: f"{d[const.NAME]}_other", - const.LABEL: "Specify other.", - const.BIND: {"relevant": f"selected(../{d[const.NAME]}, 'other')"}, - } - return InputQuestion(**kwargs) - - def _create_section_from_dict(self, d): - children = d.pop(const.CHILDREN, []) + def _create_section_from_dict( + self, d: dict[str, Any], choices: dict[str, tuple[Option, ...]] | None = None + ) -> Survey | GroupedSection | RepeatingSection: + children = d.get(const.CHILDREN) section_class = SECTION_CLASSES[d[const.TYPE]] if d[const.TYPE] == const.SURVEY and const.TITLE not in d: d[const.TITLE] = d[const.NAME] result = section_class(**d) - for child in children: - # Deep copying the child is a hacky solution to the or_other bug. - # I don't know why it works. - # And I hope it doesn't break something else. - # I think the good solution would be to rewrite this class. - survey_element = self.create_survey_element_from_dict( - d=child, deep_copy=False - ) - if child[const.TYPE].endswith(const.SELECT_OR_OTHER_SUFFIX): - select_question = survey_element[0] - itemset_choices = self._choices.get(select_question[const.ITEMSET], None) - if ( - itemset_choices is not None - and isinstance(itemset_choices, list) - and not any( - c[const.NAME] == OR_OTHER_CHOICE[const.NAME] - for c in itemset_choices + if children: + for child in children: + if isinstance(result, Survey): + survey_element = self.create_survey_element_from_dict( + d=child, choices=result.choices + ) + else: + survey_element = self.create_survey_element_from_dict( + d=child, choices=choices ) - ): - itemset_choices.append(self._get_or_other_choice(itemset_choices)) - # This is required for builder_tests.BuilderTests.test_loop to pass. - self._add_other_option_to_multiple_choice_question(d=child) - if survey_element: result.add_children(survey_element) return result - def _create_loop_from_dict(self, d): + def _create_loop_from_dict( + self, d: dict[str, Any], choices: dict[str, tuple[Option, ...]] | None = None + ): """ Takes a json_dict of "loop" type Returns a GroupedSection """ - children = d.pop(const.CHILDREN, []) - columns = d.pop(const.COLUMNS, []) + children = d.get(const.CHILDREN) result = GroupedSection(**d) # columns is a left over from when this was # create_table_from_dict, I will need to clean this up - for column_dict in columns: + for column_dict in d.get(const.COLUMNS, ()): # If this is a none option for a select all that apply # question then we should skip adding it to the result if column_dict[const.NAME] == "none": continue - column = GroupedSection(**column_dict) - for child in children: - question_dict = self._name_and_label_substitutions(child, column_dict) - question = self.create_survey_element_from_dict( - d=question_dict, deep_copy=False - ) - column.add_child(question) + column = GroupedSection(type=const.GROUP, **column_dict) + if children is not None: + for child in children: + question_dict = self._name_and_label_substitutions(child, column_dict) + question = self.create_survey_element_from_dict( + d=question_dict, choices=choices + ) + column.add_child(question) result.add_child(column) if result.name != "": return result @@ -318,7 +252,7 @@ def _create_loop_from_dict(self, d): def _name_and_label_substitutions(question_template, column_headers): # if the label in column_headers has multiple languages setup a # dictionary by language to do substitutions. - info_by_lang = {} + info_by_lang = None if isinstance(column_headers[const.LABEL], dict): info_by_lang = { lang: { @@ -335,7 +269,7 @@ def _name_and_label_substitutions(question_template, column_headers): elif isinstance(result[key], dict): result[key] = result[key].copy() for key2 in result[key].keys(): - if isinstance(column_headers[const.LABEL], dict): + if info_by_lang and isinstance(column_headers[const.LABEL], dict): result[key][key2] %= info_by_lang.get(key2, column_headers) else: result[key][key2] %= column_headers @@ -344,7 +278,7 @@ def _name_and_label_substitutions(question_template, column_headers): def create_survey_element_from_json(self, str_or_path): d = utils.get_pyobj_from_json(str_or_path) # Loading JSON creates a new dictionary structure so no need to re-copy. - return self.create_survey_element_from_dict(d=d, deep_copy=False) + return self.create_survey_element_from_dict(d=d) def create_survey_element_from_dict(d, sections=None): diff --git a/pyxform/constants.py b/pyxform/constants.py index 21d59bdb..c33323e9 100644 --- a/pyxform/constants.py +++ b/pyxform/constants.py @@ -169,3 +169,5 @@ class EntityColumns(StrEnum): "xmlns:orx": "http://openrosa.org/xforms", "xmlns:odk": "http://www.opendatakit.org/xforms", } +SUPPORTED_MEDIA_TYPES = {"image", "big-image", "audio", "video"} +OR_OTHER_CHOICE = {NAME: "other", LABEL: "Other"} diff --git a/pyxform/entities/entity_declaration.py b/pyxform/entities/entity_declaration.py index 18ae84c6..ed046003 100644 --- a/pyxform/entities/entity_declaration.py +++ b/pyxform/entities/entity_declaration.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING from pyxform import constants as const -from pyxform.survey_element import SurveyElement +from pyxform.survey_element import SURVEY_ELEMENT_FIELDS, SurveyElement from pyxform.utils import node if TYPE_CHECKING: @@ -9,6 +9,11 @@ EC = const.EntityColumns +ENTITY_EXTRA_FIELDS = ( + const.TYPE, + const.PARAMETERS, +) +ENTITY_FIELDS = (*SURVEY_ELEMENT_FIELDS, *ENTITY_EXTRA_FIELDS) class EntityDeclaration(SurveyElement): @@ -29,6 +34,17 @@ class EntityDeclaration(SurveyElement): 0 1 1 error, need id to update """ + __slots__ = ENTITY_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return ENTITY_FIELDS + + def __init__(self, name: str, type: str, parameters: dict, **kwargs): + self.parameters: dict = parameters + self.type: str = type + super().__init__(name=name, **kwargs) + def xml_instance(self, **kwargs): parameters = self.get(const.PARAMETERS, {}) diff --git a/pyxform/external_instance.py b/pyxform/external_instance.py index 4688b4de..50301ddb 100644 --- a/pyxform/external_instance.py +++ b/pyxform/external_instance.py @@ -4,13 +4,28 @@ from typing import TYPE_CHECKING -from pyxform.survey_element import SurveyElement +from pyxform import constants +from pyxform.survey_element import SURVEY_ELEMENT_FIELDS, SurveyElement if TYPE_CHECKING: from pyxform.survey import Survey +EXTERNAL_INSTANCE_EXTRA_FIELDS = (constants.TYPE,) +EXTERNAL_INSTANCE_FIELDS = (*SURVEY_ELEMENT_FIELDS, *EXTERNAL_INSTANCE_EXTRA_FIELDS) + + class ExternalInstance(SurveyElement): + __slots__ = EXTERNAL_INSTANCE_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return EXTERNAL_INSTANCE_FIELDS + + def __init__(self, name: str, type: str, **kwargs): + self.type: str = type + super().__init__(name=name, **kwargs) + def xml_control(self, survey: "Survey"): """ No-op since there is no associated form control to place under . diff --git a/pyxform/question.py b/pyxform/question.py index 1b7c30b4..211626d8 100644 --- a/pyxform/question.py +++ b/pyxform/question.py @@ -3,8 +3,11 @@ """ import os.path +from collections.abc import Iterable +from itertools import chain from typing import TYPE_CHECKING +from pyxform import constants from pyxform.constants import ( EXTERNAL_CHOICES_ITEMSET_REF_LABEL, EXTERNAL_CHOICES_ITEMSET_REF_LABEL_GEOJSON, @@ -14,7 +17,7 @@ ) from pyxform.errors import PyXFormError from pyxform.question_type_dictionary import QUESTION_TYPE_DICT -from pyxform.survey_element import SurveyElement +from pyxform.survey_element import SURVEY_ELEMENT_FIELDS, SurveyElement from pyxform.utils import ( PYXFORM_REFERENCE_REGEX, DetachableElement, @@ -27,15 +30,115 @@ from pyxform.survey import Survey +QUESTION_EXTRA_FIELDS = ( + "_itemset_dyn_label", + "_itemset_has_media", + "_itemset_multi_language", + "_qtd_defaults", + "_qtd_kwargs", + "action", + "default", + "guidance_hint", + "instance", + "query", + "sms_field", + "trigger", + constants.BIND, + constants.CHOICE_FILTER, + constants.COMPACT_TAG, # used for compact (sms) representation + constants.CONTROL, + constants.HINT, + constants.MEDIA, + constants.PARAMETERS, + constants.TYPE, +) +QUESTION_FIELDS = (*SURVEY_ELEMENT_FIELDS, *QUESTION_EXTRA_FIELDS) + +SELECT_QUESTION_EXTRA_FIELDS = ( + constants.CHILDREN, + constants.ITEMSET, + constants.LIST_NAME_U, +) +SELECT_QUESTION_FIELDS = (*QUESTION_FIELDS, *SELECT_QUESTION_EXTRA_FIELDS) + +OSM_QUESTION_EXTRA_FIELDS = (constants.CHILDREN,) +OSM_QUESTION_FIELDS = (*QUESTION_FIELDS, *SELECT_QUESTION_EXTRA_FIELDS) + +OPTION_EXTRA_FIELDS = ( + "_choice_itext_id", + constants.MEDIA, + "sms_option", +) +OPTION_FIELDS = (*SURVEY_ELEMENT_FIELDS, *OPTION_EXTRA_FIELDS) + +TAG_EXTRA_FIELDS = (constants.CHILDREN,) +TAG_FIELDS = (*SURVEY_ELEMENT_FIELDS, *TAG_EXTRA_FIELDS) + + class Question(SurveyElement): - FIELDS = SurveyElement.FIELDS.copy() - FIELDS.update( - { - "_itemset_multi_language": bool, - "_itemset_has_media": bool, - "_itemset_dyn_label": bool, - } - ) + __slots__ = QUESTION_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return QUESTION_FIELDS + + def __init__(self, fields: tuple[str, ...] | None = None, **kwargs): + # Internals + self._qtd_defaults: dict | None = None + self._qtd_kwargs: dict | None = None + + # Structure + self.action: dict[str, str] | None = None + self.bind: dict | None = None + self.control: dict | None = None + self.instance: dict | None = None + self.media: dict | None = None + self.type: str | None = None + + # Common / template settings + self.choice_filter: str | None = None + self.default: str | None = None + self.guidance_hint: str | dict | None = None + self.hint: str | dict | None = None + # constraint_message, required_message are placed in bind dict. + self.parameters: dict | None = None + self.query: str | None = None + self.trigger: str | None = None + + # SMS / compact settings + self.compact_tag: str | None = None + self.sms_field: str | None = None + + qtd = kwargs.pop("question_type_dictionary", QUESTION_TYPE_DICT) + type_arg = kwargs.get("type") + default_type = qtd.get(type_arg) + if default_type is None: + raise PyXFormError(f"Unknown question type '{type_arg}'.") + + # Keeping original qtd_kwargs is only needed if output of QTD data is not + # acceptable in to_json_dict() i.e. to exclude default bind/control values. + self._qtd_defaults = qtd.get(type_arg) + qtd_kwargs = None + for k, v in self._qtd_defaults.items(): + if isinstance(v, dict): + template = v.copy() + if k in kwargs: + template.update(kwargs[k]) + if qtd_kwargs is None: + qtd_kwargs = {} + qtd_kwargs[k] = kwargs[k] + kwargs[k] = template + elif k not in kwargs: + kwargs[k] = v + + if qtd_kwargs: + self._qtd_kwargs = qtd_kwargs + + if fields is None: + fields = QUESTION_EXTRA_FIELDS + else: + fields = chain(QUESTION_EXTRA_FIELDS, fields) + super().__init__(fields=fields, **kwargs) def validate(self): SurveyElement.validate(self) @@ -46,10 +149,12 @@ def validate(self): raise PyXFormError(f"Unknown question type '{self.type}'.") def xml_instance(self, survey: "Survey", **kwargs): - attributes = {} - attributes.update(self.get("instance", {})) - for key, value in attributes.items(): - attributes[key] = survey.insert_xpaths(value, self) + attributes = self.get("instance") + if attributes is None: + attributes = {} + else: + for key, value in attributes.items(): + attributes[key] = survey.insert_xpaths(value, self) if self.get("default") and not default_is_dynamic(self.default, self.type): return node(self.name, str(self.get("default")), **attributes) @@ -57,7 +162,15 @@ def xml_instance(self, survey: "Survey", **kwargs): def xml_control(self, survey: "Survey"): if self.type == "calculate" or ( - ("calculate" in self.bind or self.trigger) and not (self.label or self.hint) + ( + ( + hasattr(self, "bind") + and self.bind is not None + and "calculate" in self.bind + ) + or self.trigger + ) + and not (self.label or self.hint) ): nested_setvalues = survey.get_trigger_values_for_question_name( self.name, "setvalue" @@ -93,6 +206,19 @@ def xml_control(self, survey: "Survey"): return xml_node + def xml_action(self): + """ + Return the action for this survey element. + """ + if self.action: + return node( + self.action["name"], + ref=self.get_xpath(), + **{k: v for k, v in self.action.items() if k != "name"}, + ) + + return None + def nest_set_nodes(self, survey, xml_node, tag, nested_items): for item in nested_items: node_attrs = { @@ -107,6 +233,19 @@ def nest_set_nodes(self, survey, xml_node, tag, nested_items): def build_xml(self, survey: "Survey") -> DetachableElement | None: return None + def to_json_dict(self, delete_keys: Iterable[str] | None = None) -> dict: + to_delete = (k for k in self.get_slot_names() if k.startswith("_")) + if self._qtd_defaults: + to_delete = chain(to_delete, self._qtd_defaults) + if delete_keys is not None: + to_delete = chain(to_delete, delete_keys) + result = super().to_json_dict(delete_keys=to_delete) + if self._qtd_kwargs: + for k, v in self._qtd_kwargs.items(): + if v: + result[k] = v + return result + class InputQuestion(Question): """ @@ -125,15 +264,17 @@ def build_xml(self, survey: "Survey"): result = node(**control_dict) if label_and_hint: for element in self.xml_label_and_hint(survey=survey): - result.appendChild(element) + if element: + result.appendChild(element) # Input types are used for selects with external choices sheets. if self["query"]: - choice_filter = self.get("choice_filter") - query = "instance('" + self["query"] + "')/root/item" - choice_filter = survey.insert_xpaths(choice_filter, self, True) - if choice_filter: - query += "[" + choice_filter + "]" + choice_filter = self.get(constants.CHOICE_FILTER) + if choice_filter is not None: + pred = survey.insert_xpaths(choice_filter, self, True) + query = f"""instance('{self["query"]}')/root/item[{pred}]""" + else: + query = f"""instance('{self["query"]}')/root/item""" result.setAttribute("query", query) return result @@ -163,6 +304,26 @@ def build_xml(self, survey: "Survey"): class Option(SurveyElement): + __slots__ = OPTION_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return OPTION_FIELDS + + def __init__( + self, + name: str, + label: str | dict | None = None, + media: dict | None = None, + sms_option: str | None = None, + **kwargs, + ): + self._choice_itext_id: str | None = None + self.media: dict | None = media + self.sms_option: str | None = sms_option + + super().__init__(name=name, label=label, **kwargs) + def xml_value(self): return node("value", self.name) @@ -180,65 +341,85 @@ def xml_control(self, survey: "Survey"): raise NotImplementedError() def _translation_path(self, display_element): - choice_itext_id = self.get("_choice_itext_id") - if choice_itext_id is not None: - return choice_itext_id + if self._choice_itext_id is not None: + return self._choice_itext_id return super()._translation_path(display_element=display_element) + def to_json_dict(self, delete_keys: Iterable[str] | None = None) -> dict: + to_delete = (k for k in self.get_slot_names() if k.startswith("_")) + if delete_keys is not None: + to_delete = chain(to_delete, delete_keys) + return super().to_json_dict(delete_keys=to_delete) + class MultipleChoiceQuestion(Question): - def __init__(self, **kwargs): + __slots__ = SELECT_QUESTION_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return SELECT_QUESTION_FIELDS + + def __init__( + self, itemset: str | None = None, list_name: str | None = None, **kwargs + ): + # Internals + self._itemset_dyn_label: bool = False + self._itemset_has_media: bool = False + self._itemset_multi_language: bool = False + + # Structure + self.children: tuple[Option, ...] | None = None + self.itemset: str | None = itemset + self.list_name: str | None = list_name + # Notice that choices can be specified under choices or children. # I'm going to try to stick to just choices. # Aliases in the json format will make it more difficult # to use going forward. - kwargs["children"] = [ - Option(**c) - for c in combine_lists( - a=kwargs.pop("choices", None), b=kwargs.pop("children", None) + choices = combine_lists( + a=kwargs.pop(constants.CHOICES, None), b=kwargs.pop(constants.CHILDREN, None) + ) + if choices: + self.children = tuple( + c if isinstance(c, Option) else Option(**c) for c in choices ) - ] super().__init__(**kwargs) def validate(self): Question.validate(self) - descendants = self.iter_descendants() - next(descendants) # iter_descendants includes self; we need to pop it - - for choice in descendants: - choice.validate() + if self.children: + for child in self.children: + child.validate() def build_xml(self, survey: "Survey"): - if self.bind["type"] not in ["string", "odk:rank"]: + if self.bind["type"] not in {"string", "odk:rank"}: raise PyXFormError("""Invalid value for `self.bind["type"]`.""") - control_dict = self.control.copy() + # Resolve field references in attributes - for key, value in control_dict.items(): - control_dict[key] = survey.insert_xpaths(value, self) + control_dict = { + key: survey.insert_xpaths(value, self) for key, value in self.control.items() + } control_dict["ref"] = self.get_xpath() result = node(**control_dict) for element in self.xml_label_and_hint(survey=survey): - result.appendChild(element) + if element: + result.appendChild(element) # itemset are only supposed to be strings, # check to prevent the rare dicts that show up if self["itemset"] and isinstance(self["itemset"], str): - choice_filter = self.get("choice_filter") - itemset, file_extension = os.path.splitext(self["itemset"]) - itemset_value_ref = self.parameters.get( - "value", - EXTERNAL_CHOICES_ITEMSET_REF_VALUE_GEOJSON - if file_extension == ".geojson" - else EXTERNAL_CHOICES_ITEMSET_REF_VALUE, - ) - itemset_label_ref = self.parameters.get( - "label", - EXTERNAL_CHOICES_ITEMSET_REF_LABEL_GEOJSON - if file_extension == ".geojson" - else EXTERNAL_CHOICES_ITEMSET_REF_LABEL, - ) + + if file_extension == ".geojson": + itemset_value_ref = EXTERNAL_CHOICES_ITEMSET_REF_VALUE_GEOJSON + itemset_label_ref = EXTERNAL_CHOICES_ITEMSET_REF_LABEL_GEOJSON + else: + itemset_value_ref = EXTERNAL_CHOICES_ITEMSET_REF_VALUE + itemset_label_ref = EXTERNAL_CHOICES_ITEMSET_REF_LABEL + if hasattr(self, "parameters") and self.parameters is not None: + itemset_value_ref = self.parameters.get("value", itemset_value_ref) + itemset_label_ref = self.parameters.get("label", itemset_label_ref) multi_language = self.get("_itemset_multi_language", False) has_media = self.get("_itemset_has_media", False) @@ -255,9 +436,11 @@ def build_xml(self, survey: "Survey"): itemset = self["itemset"] itemset_label_ref = "jr:itext(itextId)" - choice_filter = survey.insert_xpaths( - choice_filter, self, True, is_previous_question - ) + choice_filter = self.get(constants.CHOICE_FILTER) + if choice_filter is not None: + choice_filter = survey.insert_xpaths( + choice_filter, self, True, is_previous_question + ) if is_previous_question: path = ( survey.insert_xpaths(self["itemset"], self, reference_parent=True) @@ -277,10 +460,10 @@ def build_xml(self, survey: "Survey"): name = path[-1] choice_filter = f"./{name} != ''" else: - nodeset = "instance('" + itemset + "')/root/item" + nodeset = f"instance('{itemset}')/root/item" if choice_filter: - nodeset += "[" + choice_filter + "]" + nodeset += f"[{choice_filter}]" if self["parameters"]: params = self["parameters"] @@ -305,34 +488,38 @@ def build_xml(self, survey: "Survey"): node("label", ref=itemset_label_ref), ] result.appendChild(node("itemset", *itemset_children, nodeset=nodeset)) - else: + elif self.children: for child in self.children: result.appendChild(child.xml(survey=survey)) return result -class SelectOneQuestion(MultipleChoiceQuestion): - def __init__(self, **kwargs): - self._dict[self.TYPE] = "select one" - super().__init__(**kwargs) +class Tag(SurveyElement): + __slots__ = TAG_EXTRA_FIELDS + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return TAG_FIELDS -class Tag(SurveyElement): - def __init__(self, **kwargs): - kwargs["children"] = [ - Option(**c) - for c in combine_lists( - a=kwargs.pop("choices", None), b=kwargs.pop("children", None) + def __init__(self, name: str, label: str | dict | None = None, **kwargs): + self.children: tuple[Option, ...] | None = None + + choices = combine_lists( + a=kwargs.pop(constants.CHOICES, None), b=kwargs.pop(constants.CHILDREN, None) + ) + if choices: + self.children = tuple( + c if isinstance(c, Option) else Option(**c) for c in choices ) - ] - super().__init__(**kwargs) + super().__init__(name=name, label=label, **kwargs) def xml(self, survey: "Survey"): result = node("tag", key=self.name) result.appendChild(self.xml_label(survey=survey)) - for choice in self.children: - result.appendChild(choice.xml(survey=survey)) + if self.children: + for choice in self.children: + result.appendChild(choice.xml(survey=survey)) return result @@ -344,13 +531,21 @@ def xml_control(self, survey: "Survey"): class OsmUploadQuestion(UploadQuestion): + __slots__ = OSM_QUESTION_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return OSM_QUESTION_FIELDS + def __init__(self, **kwargs): - kwargs["children"] = [ - Tag(**c) - for c in combine_lists( - a=kwargs.pop("tags", None), b=kwargs.pop("children", None) - ) - ] + self.children: tuple[Option, ...] | None = None + + choices = combine_lists( + a=kwargs.pop("tags", None), b=kwargs.pop(constants.CHILDREN, None) + ) + if choices: + self.children = tuple(Tag(**c) for c in choices) + super().__init__(**kwargs) def build_xml(self, survey: "Survey"): @@ -359,8 +554,9 @@ def build_xml(self, survey: "Survey"): control_dict["mediatype"] = self._get_media_type() result = node("upload", *self.xml_label_and_hint(survey=survey), **control_dict) - for osm_tag in self.children: - result.appendChild(osm_tag.xml(survey=survey)) + if self.children: + for osm_tag in self.children: + result.appendChild(osm_tag.xml(survey=survey)) return result diff --git a/pyxform/section.py b/pyxform/section.py index b91fea59..2111980c 100644 --- a/pyxform/section.py +++ b/pyxform/section.py @@ -2,19 +2,76 @@ Section survey element module. """ -from collections.abc import Generator +from collections.abc import Generator, Iterable +from itertools import chain from typing import TYPE_CHECKING +from pyxform import constants from pyxform.errors import PyXFormError from pyxform.external_instance import ExternalInstance -from pyxform.survey_element import SurveyElement +from pyxform.survey_element import SURVEY_ELEMENT_FIELDS, SurveyElement from pyxform.utils import DetachableElement, node if TYPE_CHECKING: + from pyxform.question import Question from pyxform.survey import Survey +SECTION_EXTRA_FIELDS = ( + constants.BIND, + constants.CHILDREN, + constants.CONTROL, + constants.HINT, + constants.MEDIA, + constants.TYPE, + "instance", + "flat", + "sms_field", +) +SECTION_FIELDS = (*SURVEY_ELEMENT_FIELDS, *SECTION_EXTRA_FIELDS) + + class Section(SurveyElement): + __slots__ = SECTION_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return SECTION_FIELDS + + def __init__( + self, + name: str, + type: str, + label: str | dict | None = None, + hint: str | dict | None = None, + bind: dict | None = None, + control: dict | None = None, + instance: dict | None = None, + media: dict | None = None, + flat: bool | None = None, + sms_field: str | None = None, + fields: tuple[str, ...] | None = None, + **kwargs, + ): + # Structure + self.bind: dict | None = bind + self.children: list[Section | Question] | None = None + self.control: dict | None = control + # instance is for custom instance attrs from survey e.g. instance::abc:xyz + self.instance: dict | None = instance + # TODO: is media valid for groups? No tests for it, but it behaves like Questions. + self.media: dict | None = media + self.type: str | None = type + + # Group settings are generally put in bind/control dicts. + self.hint: str | dict | None = hint + self.flat: bool | None = flat + self.sms_field: str | None = sms_field + + # Recursively creating child objects currently handled by the builder module. + kwargs.pop(constants.CHILDREN, None) + super().__init__(name=name, label=label, fields=fields, **kwargs) + def validate(self): super().validate() for element in self.children: @@ -42,7 +99,8 @@ def xml_instance(self, survey: "Survey", **kwargs): attributes = {} attributes.update(kwargs) - attributes.update(self.get("instance", {})) + if self.instance: + attributes.update(self.instance) # Resolve field references in attributes for key, value in attributes.items(): attributes[key] = survey.insert_xpaths(value, self) @@ -50,7 +108,7 @@ def xml_instance(self, survey: "Survey", **kwargs): for child in self.children: repeating_template = None - if child.get("flat"): + if hasattr(child, "flat") and child.get("flat"): for grandchild in child.xml_instance_array(survey=survey): result.appendChild(grandchild) elif isinstance(child, ExternalInstance): @@ -82,7 +140,7 @@ def xml_instance_array(self, survey: "Survey"): This method is used for generating flat instances. """ for child in self.children: - if child.get("flat"): + if hasattr(child, "flat") and child.get("flat"): yield from child.xml_instance_array(survey=survey) else: yield child.xml_instance(survey=survey) @@ -113,11 +171,15 @@ def xml_control(self, survey: "Survey"): """ - control_dict = self.control.copy() # Resolve field references in attributes - for key, value in control_dict.items(): - control_dict[key] = survey.insert_xpaths(value, self) - repeat_node = node("repeat", nodeset=self.get_xpath(), **control_dict) + if self.control: + control_dict = { + key: survey.insert_xpaths(value, self) + for key, value in self.control.items() + } + repeat_node = node("repeat", nodeset=self.get_xpath(), **control_dict) + else: + repeat_node = node("repeat", nodeset=self.get_xpath()) for n in Section.xml_control(self, survey=survey): repeat_node.appendChild(n) @@ -128,7 +190,10 @@ def xml_control(self, survey: "Survey"): label = self.xml_label(survey=survey) if label: return node("group", label, repeat_node, ref=self.get_xpath()) - return node("group", repeat_node, ref=self.get_xpath(), **self.control) + if self.control: + return node("group", repeat_node, ref=self.get_xpath(), **self.control) + else: + return node("group", repeat_node, ref=self.get_xpath()) # Get setvalue nodes for all descendants of this repeat that have dynamic defaults and aren't nested in other repeats. def _dynamic_defaults_helper( @@ -165,38 +230,38 @@ class GroupedSection(Section): # super(GroupedSection, self).__init__(kwargs) def xml_control(self, survey: "Survey"): - control_dict = self.control - - if control_dict.get("bodyless"): + if self.control and self.control.get("bodyless"): return None children = [] - attributes = {} - attributes.update(self.control) # Resolve field references in attributes - for key, value in attributes.items(): - attributes[key] = survey.insert_xpaths(value, self) + if self.control: + attributes = { + key: survey.insert_xpaths(value, self) + for key, value in self.control.items() + } + if "appearance" in self.control: + attributes["appearance"] = self.control["appearance"] + else: + attributes = {} if not self.get("flat"): attributes["ref"] = self.get_xpath() - if "label" in self and len(self["label"]) > 0: + if "label" in self and self.label is not None and len(self["label"]) > 0: children.append(self.xml_label(survey=survey)) for n in Section.xml_control(self, survey=survey): children.append(n) - if "appearance" in control_dict: - attributes["appearance"] = control_dict["appearance"] - - if "intent" in control_dict: - attributes["intent"] = survey.insert_xpaths(control_dict["intent"], self) - return node("group", *children, **attributes) - def to_json_dict(self): + def to_json_dict(self, delete_keys: Iterable[str] | None = None) -> dict: + to_delete = (constants.BIND,) + if delete_keys is not None: + to_delete = chain(to_delete, delete_keys) + result = super().to_json_dict(delete_keys=to_delete) # This is quite hacky, might want to think about a smart way # to approach this problem. - result = super().to_json_dict() result["type"] = "group" return result diff --git a/pyxform/survey.py b/pyxform/survey.py index ddcc5d28..a45ca14c 100644 --- a/pyxform/survey.py +++ b/pyxform/survey.py @@ -7,20 +7,22 @@ import tempfile import xml.etree.ElementTree as ETree from collections import defaultdict -from collections.abc import Generator +from collections.abc import Generator, Iterable from datetime import datetime from functools import lru_cache +from itertools import chain from pathlib import Path from pyxform import aliases, constants from pyxform.constants import EXTERNAL_INSTANCE_EXTENSIONS, NSMAP +from pyxform.entities.entity_declaration import EntityDeclaration from pyxform.errors import PyXFormError, ValidationError from pyxform.external_instance import ExternalInstance from pyxform.instance import SurveyInstance from pyxform.parsing import instance_expression -from pyxform.question import Option, Question -from pyxform.section import Section -from pyxform.survey_element import SurveyElement +from pyxform.question import MultipleChoiceQuestion, Option, Question, Tag +from pyxform.section import SECTION_EXTRA_FIELDS, Section +from pyxform.survey_element import SURVEY_ELEMENT_FIELDS, SurveyElement from pyxform.utils import ( BRACKETED_TAG_REGEX, LAST_SAVED_INSTANCE_NAME, @@ -74,7 +76,7 @@ def register_nsmap(): register_nsmap() -@lru_cache(maxsize=65536) # 2^16 +@lru_cache(maxsize=128) def is_parent_a_repeat(survey, xpath): """ Returns the XPATH of the first repeat of the given xpath in the survey, @@ -84,14 +86,14 @@ def is_parent_a_repeat(survey, xpath): if not parent_xpath: return False - for item in survey.iter_descendants(): + for item in survey.iter_descendants(condition=lambda i: isinstance(i, Section)): if item.type == constants.REPEAT and item.get_xpath() == parent_xpath: return parent_xpath return is_parent_a_repeat(survey, parent_xpath) -@lru_cache(maxsize=65536) # 2^16 +@lru_cache(maxsize=128) def share_same_repeat_parent(survey, xpath, context_xpath, reference_parent=False): """ Returns a tuple of the number of steps from the context xpath to the shared @@ -169,7 +171,7 @@ def _get_steps_and_target_xpath(context_parent, xpath_parent, include_parent=Fal return (None, None) -@lru_cache(maxsize=65536) # 2^16 +@lru_cache(maxsize=128) def is_label_dynamic(label: str) -> bool: return ( label is not None @@ -182,44 +184,117 @@ def recursive_dict(): return defaultdict(recursive_dict) +SURVEY_EXTRA_FIELDS = ( + "_created", + "_search_lists", + "_translations", + "_xpath", + "add_none_option", + "clean_text_values", + "attribute", + "auto_delete", + "auto_send", + "choices", + "default_language", + "file_name", + "id_string", + "instance_name", + "instance_xmlns", + "namespaces", + "omit_instanceID", + "public_key", + "setgeopoint_by_triggering_ref", + "setvalues_by_triggering_ref", + "sms_allow_media", + "sms_date_format", + "sms_datetime_format", + "sms_keyword", + "sms_response", + "sms_separator", + "style", + "submission_url", + "title", + "version", + constants.ALLOW_CHOICE_DUPLICATES, + constants.COMPACT_DELIMITER, + constants.COMPACT_PREFIX, + constants.ENTITY_FEATURES, +) +SURVEY_FIELDS = (*SURVEY_ELEMENT_FIELDS, *SECTION_EXTRA_FIELDS, *SURVEY_EXTRA_FIELDS) + + class Survey(Section): """ Survey class - represents the full XForm XML. """ - FIELDS = Section.FIELDS.copy() - FIELDS.update( - { - "_xpath": dict, - "_created": datetime.now, # This can't be dumped to json - "setvalues_by_triggering_ref": dict, - "setgeopoint_by_triggering_ref": dict, - "title": str, - "id_string": str, - "sms_keyword": str, - "sms_separator": str, - "sms_allow_media": bool, - "sms_date_format": str, - "sms_datetime_format": str, - "sms_response": str, - constants.COMPACT_PREFIX: str, - constants.COMPACT_DELIMITER: str, - "file_name": str, - "default_language": str, - "_translations": recursive_dict, - "submission_url": str, - "auto_send": str, - "auto_delete": str, - "public_key": str, - "instance_xmlns": str, - "version": str, - "choices": dict, - "style": str, - "attribute": dict, - "namespaces": str, - constants.ENTITY_FEATURES: list, - } - ) + __slots__ = SURVEY_EXTRA_FIELDS + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + return SURVEY_FIELDS + + def __init__(self, **kwargs): + # Internals + self._created: datetime.now = datetime.now() + self._search_lists: set = set() + self._translations: recursive_dict = recursive_dict() + self._xpath: dict[str, SurveyElement | None] = {} + + # Structure + # attribute is for custom instance attrs from settings e.g. attribute::abc:xyz + self.attribute: dict | None = None + self.choices: dict[str, tuple[Option, ...]] | None = None + self.entity_features: list[str] | None = None + self.setgeopoint_by_triggering_ref: dict[str, list[str]] = {} + self.setvalues_by_triggering_ref: dict[str, list[str]] = {} + + # Common / template settings + self.default_language: str = "" + self.id_string: str = "" + self.instance_name: str = "" + self.style: str | None = None + self.title: str = "" + self.version: str = "" + + # Other settings + self.add_none_option: bool = False + self.allow_choice_duplicates: bool = False + self.auto_delete: str | None = None + self.auto_send: str | None = None + self.clean_text_values: bool = False + self.instance_xmlns: str | None = None + self.namespaces: str | None = None + self.omit_instanceID: bool = False + self.public_key: str | None = None + self.submission_url: str | None = None + + # SMS / compact settings + self.delimiter: str | None = None + self.prefix: str | None = None + self.sms_allow_media: bool | None = None + self.sms_date_format: str | None = None + self.sms_datetime_format: str | None = None + self.sms_keyword: str | None = None + self.sms_response: str | None = None + self.sms_separator: str | None = None + + choices = kwargs.pop("choices", None) + if choices is not None: + self.choices = { + list_name: tuple( + c if isinstance(c, Option) else Option(**c) for c in values + ) + for list_name, values in choices.items() + } + kwargs[constants.TYPE] = constants.SURVEY + super().__init__(fields=SURVEY_EXTRA_FIELDS, **kwargs) + + def to_json_dict(self, delete_keys: Iterable[str] | None = None) -> dict: + to_delete = (k for k in self.get_slot_names() if k.startswith("_")) + if delete_keys is not None: + to_delete = chain(to_delete, delete_keys) + return super().to_json_dict(delete_keys=to_delete) def validate(self): if self.id_string in [None, "None"]: @@ -247,14 +322,17 @@ def _validate_uniqueness_of_section_names(self): def get_nsmap(self): """Add additional namespaces""" - namespaces = getattr(self, constants.NAMESPACES, "") - if len(getattr(self, constants.ENTITY_FEATURES, [])) > 0: - namespaces += " entities=http://www.opendatakit.org/xforms/entities" + if self.entity_features: + entities_ns = " entities=http://www.opendatakit.org/xforms/entities" + if self.namespaces is None: + self.namespaces = entities_ns + else: + self.namespaces += entities_ns - if namespaces and isinstance(namespaces, str): + if self.namespaces: nslist = [ ns.split("=") - for ns in namespaces.split() + for ns in self.namespaces.split() if len(ns.split("=")) == 2 and ns.split("=")[0] != "" ] xmlns = "xmlns:" @@ -287,8 +365,8 @@ def xml(self): self.insert_xpaths(triggering_reference, self) body_kwargs = {} - if hasattr(self, constants.STYLE) and getattr(self, constants.STYLE): - body_kwargs["class"] = getattr(self, constants.STYLE) + if self.style: + body_kwargs["class"] = self.style nsmap = self.get_nsmap() return node( @@ -298,13 +376,11 @@ def xml(self): **nsmap, ) - def get_trigger_values_for_question_name(self, question_name, trigger_type): - trigger_map = { - "setvalue": self.setvalues_by_triggering_ref, - "setgeopoint": self.setgeopoint_by_triggering_ref, - } - - return trigger_map.get(trigger_type, {}).get(f"${{{question_name}}}") + def get_trigger_values_for_question_name(self, question_name: str, trigger_type: str): + if trigger_type == "setvalue": + return self.setvalues_by_triggering_ref.get(f"${{{question_name}}}") + elif trigger_type == "setgeopoint": + return self.setgeopoint_by_triggering_ref.get(f"${{{question_name}}}") def _generate_static_instances(self, list_name, choice_list) -> InstanceInfo: """ @@ -315,34 +391,42 @@ def _generate_static_instances(self, list_name, choice_list) -> InstanceInfo: has_dyn_label = has_dynamic_label(choice_list) multi_language = False if isinstance(self._translations, dict): - choices = tuple( - k + choices = ( + True for items in self._translations.values() for k, v in items.items() if v.get(constants.TYPE, "") == constants.CHOICE and "-".join(k.split("-")[:-1]) == list_name ) - if 0 < len(choices): - multi_language = True + try: + if next(choices): + multi_language = True + except StopIteration: + pass for idx, choice in enumerate(choice_list): choice_element_list = [] # Add a unique id to the choice element in case there are itext references if multi_language or has_media or has_dyn_label: - itext_id = "-".join([list_name, str(idx)]) + itext_id = f"{list_name}-{idx}" choice_element_list.append(node("itextId", itext_id)) for name, value in choice.items(): - if isinstance(value, str) and name != "label": - choice_element_list.append(node(name, str(value))) - if ( + if not value: + continue + elif name != "label" and isinstance(value, str): + choice_element_list.append(node(name, value)) + elif name == "extra_data" and isinstance(value, dict): + for k, v in value.items(): + choice_element_list.append(node(k, v)) + elif ( not multi_language and not has_media and not has_dyn_label and isinstance(value, str) and name == "label" ): - choice_element_list.append(node(name, str(value))) + choice_element_list.append(node(name, value)) instance_element_list.append(node("item", *choice_element_list)) @@ -355,7 +439,7 @@ def _generate_static_instances(self, list_name, choice_list) -> InstanceInfo: ) @staticmethod - def _generate_external_instances(element) -> InstanceInfo | None: + def _generate_external_instances(element: SurveyElement) -> InstanceInfo | None: if isinstance(element, ExternalInstance): name = element["name"] extension = element["type"].split("-")[0] @@ -401,7 +485,7 @@ def _validate_external_instances(instances) -> None: raise ValidationError("\n".join(errors)) @staticmethod - def _generate_pulldata_instances(element) -> list[InstanceInfo] | None: + def _generate_pulldata_instances(element: SurveyElement) -> list[InstanceInfo] | None: def get_pulldata_functions(element): """ Returns a list of different pulldata(... function strings if @@ -412,11 +496,23 @@ def get_pulldata_functions(element): """ functions_present = [] for formula_name in constants.EXTERNAL_INSTANCES: - if "pulldata(" in str(element["bind"].get(formula_name)): + if ( + hasattr(element, "bind") + and element.bind is not None + and "pulldata(" in str(element["bind"].get(formula_name)) + ): functions_present.append(element["bind"][formula_name]) - if "pulldata(" in str(element["choice_filter"]): - functions_present.append(element["choice_filter"]) - if "pulldata(" in str(element["default"]): + if ( + hasattr(element, constants.CHOICE_FILTER) + and element.choice_filter is not None + and "pulldata(" in str(element[constants.CHOICE_FILTER]) + ): + functions_present.append(element[constants.CHOICE_FILTER]) + if ( + hasattr(element, "default") + and element.default is not None + and "pulldata(" in str(element["default"]) + ): functions_present.append(element["default"]) return functions_present @@ -434,6 +530,8 @@ def get_instance_info(element, file_id): instance=node("instance", id=file_id, src=uri), ) + if isinstance(element, Option | ExternalInstance | Tag | Survey): + return None pulldata_usages = get_pulldata_functions(element) if len(pulldata_usages) > 0: pulldata_instances = [] @@ -451,7 +549,9 @@ def get_instance_info(element, file_id): return None @staticmethod - def _generate_from_file_instances(element) -> InstanceInfo | None: + def _generate_from_file_instances(element: SurveyElement) -> InstanceInfo | None: + if not isinstance(element, MultipleChoiceQuestion) or element.itemset is None: + return None itemset = element.get("itemset") file_id, ext = os.path.splitext(itemset) if itemset and ext in EXTERNAL_INSTANCE_EXTENSIONS: @@ -474,16 +574,21 @@ def _generate_last_saved_instance(element) -> bool: """ True if a last-saved instance should be generated, false otherwise. """ + if not hasattr(element, "bind") or element.bind is None: + return False for expression_type in constants.EXTERNAL_INSTANCES: last_saved_expression = re.search( LAST_SAVED_REGEX, str(element["bind"].get(expression_type)) ) if last_saved_expression: return True - return bool( - re.search(LAST_SAVED_REGEX, str(element["choice_filter"])) - or re.search(LAST_SAVED_REGEX, str(element["default"])) + hasattr(element, constants.CHOICE_FILTER) + and element.choice_filter is not None + and re.search(LAST_SAVED_REGEX, str(element.choice_filter)) + or hasattr(element, "default") + and element.default is not None + and re.search(LAST_SAVED_REGEX, str(element.default)) ) @staticmethod @@ -550,10 +655,12 @@ def _generate_instances(self) -> Generator[DetachableElement, None, None]: instances += [self._get_last_saved_instance()] # Append last so the choice instance is excluded on a name clash. - for name, value in self.choices.items(): - instances += [ - self._generate_static_instances(list_name=name, choice_list=value) - ] + if self.choices: + for name, value in self.choices.items(): + if name not in self._search_lists: + instances += [ + self._generate_static_instances(list_name=name, choice_list=value) + ] # Check that external instances have unique names. if instances: @@ -580,14 +687,14 @@ def _generate_instances(self) -> Generator[DetachableElement, None, None]: yield i.instance seen[i.name] = i - def xml_descendent_bindings(self) -> Generator["DetachableElement", None, None]: + def xml_descendent_bindings(self) -> Generator[DetachableElement | None, None, None]: """ Yield bindings for this node and all its descendants. """ - for e in self.iter_descendants(): - xml_bindings = e.xml_bindings(survey=self) - if xml_bindings is not None: - yield from xml_bindings + for e in self.iter_descendants( + condition=lambda i: not isinstance(i, Option | Tag) + ): + yield from e.xml_bindings(survey=self) # dynamic defaults for repeats go in the body. All other dynamic defaults (setvalue actions) go in the model if not next( @@ -616,13 +723,12 @@ def xml_model(self): model_kwargs = {"odk:xforms-version": constants.CURRENT_XFORMS_VERSION} - entity_features = getattr(self, constants.ENTITY_FEATURES, []) - if len(entity_features) > 0: - if "offline" in entity_features: + if self.entity_features: + if "offline" in self.entity_features: model_kwargs["entities:entities-version"] = ( constants.ENTITIES_OFFLINE_VERSION ) - elif "update" in entity_features: + elif "update" in self.entity_features: model_kwargs["entities:entities-version"] = ( constants.ENTITIES_UPDATE_VERSION ) @@ -664,8 +770,9 @@ def xml_instance(self, **kwargs): result = Section.xml_instance(self, survey=self, **kwargs) # set these first to prevent overwriting id and version - for key, value in self.attribute.items(): - result.setAttribute(str(key), value) + if self.attribute: + for key, value in self.attribute.items(): + result.setAttribute(str(key), value) result.setAttribute("id", self.id_string) @@ -696,7 +803,7 @@ def _add_to_nested_dict(self, dicty, path, value): dicty[path[0]] = {} self._add_to_nested_dict(dicty[path[0]], path[1:], value) - def _redirect_is_search_itext(self, element: SurveyElement) -> bool: + def _redirect_is_search_itext(self, element: Question) -> bool: """ For selects using the "search()" function, redirect itext for in-line items. @@ -728,11 +835,12 @@ def _redirect_is_search_itext(self, element: SurveyElement) -> bool: "Remove the 'search()' usage, or change the select type." ) raise PyXFormError(msg) - itemset = element[constants.ITEMSET] - self.choices.pop(itemset, None) - element[constants.ITEMSET] = "" - for i, opt in enumerate(element.get(constants.CHILDREN, [])): - opt["_choice_itext_id"] = f"{element[constants.LIST_NAME_U]}-{i}" + if self.choices: + element.children = self.choices.get(element[constants.ITEMSET], None) + element[constants.ITEMSET] = "" + if element.children is not None: + for i, opt in enumerate(element.children): + opt["_choice_itext_id"] = f"{element[constants.LIST_NAME_U]}-{i}" return is_search def _setup_translations(self): @@ -766,15 +874,19 @@ def get_choices(): for idx, choice in enumerate(choice_list): for col_name, choice_value in choice.items(): lang_choice = None + if not choice_value: + continue if col_name == constants.MEDIA: has_media = True - if isinstance(choice_value, dict): lang_choice = choice_value - multi_language = True elif col_name == constants.LABEL: - lang_choice = {self.default_language: choice_value} - if is_label_dynamic(choice_value): - dyn_label = True + if isinstance(choice_value, dict): + lang_choice = choice_value + multi_language = True + else: + lang_choice = {self.default_language: choice_value} + if is_label_dynamic(choice_value): + dyn_label = True if lang_choice is not None: # e.g. (label, {"default": "Yes"}, "consent", 0) choices.append((col_name, lang_choice, list_name, idx)) @@ -790,27 +902,33 @@ def get_choices(): c[0], c[1], f"{c[2]}-{c[3]}" ) - for path, value in get_choices(): - last_path = path.pop() - leaf_value = {last_path: value, constants.TYPE: constants.CHOICE} - self._add_to_nested_dict(self._translations, path, leaf_value) + if self.choices: + for path, value in get_choices(): + last_path = path.pop() + leaf_value = {last_path: value, constants.TYPE: constants.CHOICE} + self._add_to_nested_dict(self._translations, path, leaf_value) select_types = set(aliases.select.keys()) search_lists = set() non_search_lists = set() - for element in self.iter_descendants(): - itemset = element.get("itemset") - if itemset is not None: - element._itemset_multi_language = itemset in itemsets_multi_language - element._itemset_has_media = itemset in itemsets_has_media - element._itemset_dyn_label = itemset in itemsets_has_dyn_label - - if element[constants.TYPE] in select_types: - select_ref = (element[constants.NAME], element[constants.LIST_NAME_U]) - if self._redirect_is_search_itext(element=element): - search_lists.add(select_ref) - else: - non_search_lists.add(select_ref) + for element in self.iter_descendants( + condition=lambda i: isinstance(i, Question | Section) + ): + if isinstance(element, MultipleChoiceQuestion): + if element.itemset is not None: + element._itemset_multi_language = ( + element.itemset in itemsets_multi_language + ) + element._itemset_has_media = element.itemset in itemsets_has_media + element._itemset_dyn_label = element.itemset in itemsets_has_dyn_label + + if element.type in select_types: + select_ref = (element[constants.NAME], element[constants.LIST_NAME_U]) + if self._redirect_is_search_itext(element=element): + search_lists.add(select_ref) + self._search_lists.add(element[constants.LIST_NAME_U]) + else: + non_search_lists.add(select_ref) # Skip creation of translations for choices in selects. The creation of these # translations is done above in this function. @@ -891,7 +1009,7 @@ def _set_up_media_translations(media_dict, translation_key): media_dict = media_dict_default for media_type, possibly_localized_media in media_dict.items(): - if media_type not in SurveyElement.SUPPORTED_MEDIA: + if media_type not in constants.SUPPORTED_MEDIA_TYPES: raise PyXFormError("Media type: " + media_type + " not supported") if isinstance(possibly_localized_media, dict): @@ -921,7 +1039,9 @@ def _set_up_media_translations(media_dict, translation_key): translations_trans_key[media_type] = media for survey_element in self.iter_descendants( - condition=lambda i: not isinstance(i, Option) + condition=lambda i: not isinstance( + i, Survey | EntityDeclaration | ExternalInstance | Tag | Option + ) ): # Skip set up of media for choices in selects. Translations for their media # content should have been set up in _setup_translations, with one copy of @@ -1027,7 +1147,6 @@ def __unicode__(self): return f"" def _setup_xpath_dictionary(self): - self._xpath: dict[str, SurveyElement | None] = {} # pylint: disable=attribute-defined-outside-init for element in self.iter_descendants(lambda i: isinstance(i, Question | Section)): if element.name in self._xpath: self._xpath[element.name] = None @@ -1144,7 +1263,7 @@ def _is_return_relative_path() -> bool: raise PyXFormError(intro + " There is no survey element with this name.") if self._xpath[name] is None: raise PyXFormError( - intro + " There are multiple survey elements" " with this name." + intro + " There are multiple survey elements with this name." ) if _is_return_relative_path(): @@ -1159,7 +1278,13 @@ def _is_return_relative_path() -> bool: ) return " " + last_saved_prefix + self._xpath[name].get_xpath() + " " - def insert_xpaths(self, text, context, use_current=False, reference_parent=False): + def insert_xpaths( + self, + text: str, + context: SurveyElement, + use_current: bool = False, + reference_parent: bool = False, + ): """ Replace all instances of ${var} with the xpath to var. """ @@ -1169,6 +1294,7 @@ def _var_repl_function(matchobj): matchobj, context, use_current, reference_parent ) + # "text" may actually be a dict, e.g. for custom attributes. return re.sub(BRACKETED_TAG_REGEX, _var_repl_function, str(text)) def _var_repl_output_function(self, matchobj, context): @@ -1227,7 +1353,7 @@ def print_xform_to_file( if warnings is None: warnings = [] if not path: - path = self._print_name + ".xml" + path = self.id_string + ".xml" if pretty_print: xml = self._to_pretty_xml() else: @@ -1272,17 +1398,18 @@ def to_xml(self, validate=True, pretty_print=True, warnings=None, enketo=False): # So it must be explicitly created, opened, closed, and removed. tmp = tempfile.NamedTemporaryFile(delete=False) tmp.close() + tmp_path = Path(tmp.name) try: # this will throw an exception if the xml is not valid xml = self.print_xform_to_file( - path=tmp.name, + path=tmp_path, validate=validate, pretty_print=pretty_print, warnings=warnings, enketo=enketo, ) finally: - Path(tmp.name).unlink(missing_ok=True) + tmp_path.unlink(missing_ok=True) return xml def instantiate(self): diff --git a/pyxform/survey_element.py b/pyxform/survey_element.py index d455729a..f72d4f74 100644 --- a/pyxform/survey_element.py +++ b/pyxform/survey_element.py @@ -4,18 +4,18 @@ import json import re -from collections.abc import Callable, Generator +from collections.abc import Callable, Generator, Iterable, Mapping from itertools import chain -from typing import TYPE_CHECKING, Any, ClassVar, Optional +from typing import TYPE_CHECKING, Optional from pyxform import aliases as alias from pyxform import constants as const from pyxform.errors import PyXFormError from pyxform.parsing.expression import is_xml_tag -from pyxform.question_type_dictionary import QUESTION_TYPE_DICT from pyxform.utils import ( BRACKETED_TAG_REGEX, INVALID_XFORM_TAG_REGEXP, + DetachableElement, default_is_dynamic, node, ) @@ -23,56 +23,21 @@ if TYPE_CHECKING: from pyxform.survey import Survey - from pyxform.utils import DetachableElement + # The following are important keys for the underlying dict that describes SurveyElement -FIELDS = { - "name": str, - const.COMPACT_TAG: str, # used for compact (sms) representation - "sms_field": str, - "sms_option": str, - "label": str, - "hint": str, - "guidance_hint": str, - "default": str, - "type": str, - "appearance": str, - "parameters": dict, - "intent": str, - "jr:count": str, - "bind": dict, - "instance": dict, - "control": dict, - "media": dict, +SURVEY_ELEMENT_FIELDS = ( + "name", + "label", # this node will also have a parent and children, like a tree! - "parent": lambda: None, - "children": list, - "itemset": str, - "choice_filter": str, - "query": str, - "autoplay": str, - "flat": lambda: False, - "action": str, - const.LIST_NAME_U: str, - "trigger": str, -} - - -def _overlay(over, under): - if isinstance(under, dict): - result = under.copy() - result.update(over) - return result - return over if over else under - - -SURVEY_ELEMENT_LOCAL_KEYS = { - "_survey_element_default_type", - "_survey_element_xpath", -} + "parent", + "extra_data", +) +SURVEY_ELEMENT_EXTRA_FIELDS = ("_survey_element_xpath",) +SURVEY_ELEMENT_SLOTS = (*SURVEY_ELEMENT_FIELDS, *SURVEY_ELEMENT_EXTRA_FIELDS) -class SurveyElement(dict): +class SurveyElement(Mapping): """ SurveyElement is the base class we'll looks for the following keys in kwargs: name, label, hint, type, bind, control, parent, @@ -80,70 +45,95 @@ class SurveyElement(dict): """ __name__ = "SurveyElement" - FIELDS: ClassVar[dict[str, Any]] = FIELDS.copy() - - def _dict_get(self, key): - """Built-in dict.get to bypass __getattr__""" - return dict.get(self, key) - - def __getattr__(self, key): - """ - Get attributes from FIELDS rather than the class. - """ - if key in self.FIELDS: - under = None - default = self._dict_get("_survey_element_default_type") - if default is not None: - under = default.get(key, None) - if not under: - return self._dict_get(key) - return _overlay(self._dict_get(key), under) - raise AttributeError(key) + __slots__ = SURVEY_ELEMENT_SLOTS def __hash__(self): return hash(id(self)) + def __getitem__(self, key): + return self.__getattribute__(key) + + @staticmethod + def get_slot_names() -> tuple[str, ...]: + """Each subclass must provide a list of slots from itself and all parents.""" + return SURVEY_ELEMENT_SLOTS + + def __len__(self): + return len(self.get_slot_names()) + + def __iter__(self): + return iter(self.get_slot_names()) + + def __setitem__(self, key, value): + self.__setattr__(key, value) + def __setattr__(self, key, value): if key == "parent": # If object graph position changes then invalidate cached. self._survey_element_xpath = None - self[key] = value - - def __init__(self, **kwargs): - self._survey_element_default_type: dict = kwargs.get( - "question_type_dict", QUESTION_TYPE_DICT - ).get(kwargs.get("type"), {}) + super().__setattr__(key, value) + + def __init__( + self, + name: str, + label: str | dict | None = None, + fields: tuple[str, ...] | None = None, + **kwargs, + ): + # Internals self._survey_element_xpath: str | None = None - for key, default in self.FIELDS.items(): - self[key] = kwargs.get(key, default()) - self._link_children() + + # Structure + self.parent: SurveyElement | None = None + self.extra_data: dict | None = None + + # Settings + self.name: str = name + self.label: str | dict | None = label + + if fields is not None: + for key in fields: + if key not in SURVEY_ELEMENT_FIELDS: + value = kwargs.pop(key, None) + if value or not hasattr(self, key): + self[key] = value + if len(kwargs) > 0: + self.extra_data = kwargs + + if hasattr(self, const.CHILDREN): + self._link_children() # Create a space label for unlabeled elements with the label # appearance tag. # This is because such elements are used to label the # options for selects in a field-list and might want blank labels for # themselves. - if self.control.get("appearance") == "label" and not self.label: - self["label"] = " " + if ( + hasattr(self, "control") + and self.control + and self.control.get("appearance") == "label" + and not self.label + ): + self.label = " " super().__init__() def _link_children(self): - for child in self.children: - child.parent = self + if self.children is not None: + for child in self.children: + child.parent = self def add_child(self, child): + if self.children is None: + self.children = [] self.children.append(child) child.parent = self def add_children(self, children): - if isinstance(children, list): + if isinstance(children, list | tuple): for child in children: self.add_child(child) else: self.add_child(children) - # Supported media types for attaching to questions - SUPPORTED_MEDIA = ("image", "big-image", "audio", "video") - def validate(self): if not is_xml_tag(self.name): invalid_char = re.search(INVALID_XFORM_TAG_REGEXP, self.name) @@ -166,12 +156,13 @@ def iter_descendants( yield self else: yield self - for e in self.children: - yield from e.iter_descendants(condition=condition) + if hasattr(self, const.CHILDREN) and self.children is not None: + for e in self.children: + yield from e.iter_descendants(condition=condition) def iter_ancestors( self, condition: Callable[["SurveyElement"], bool] | None = None - ) -> Generator[tuple["SurvekyElement", int], None, None]: + ) -> Generator[tuple["SurveyElement", int], None, None]: """ Get each self.parent with their distance from self (starting at 1). @@ -250,10 +241,10 @@ def condition(e): # The "flat" setting was added in 2013 to support ODK Tables, and results in # a data instance with no element nesting. Not sure if still needed. return isinstance(e, Survey) or ( - not isinstance(e, Survey) and not e.get("flat") + not isinstance(e, Survey) and not (hasattr(e, "flat") and e.get("flat")) ) - current_value = self._dict_get("_survey_element_xpath") + current_value = self._survey_element_xpath if current_value is None: if condition(self): self_element = (self,) @@ -268,7 +259,7 @@ def condition(e): return new_value return current_value - def _delete_keys_from_dict(self, dictionary: dict, keys: list): + def _delete_keys_from_dict(self, dictionary: dict, keys: Iterable[str]): """ Deletes a list of keys from a dictionary. Credits: https://stackoverflow.com/a/49723101 @@ -281,27 +272,33 @@ def _delete_keys_from_dict(self, dictionary: dict, keys: list): if isinstance(value, dict): self._delete_keys_from_dict(value, keys) - def to_json_dict(self): + def copy(self): + return {k: self[k] for k in self} + + def to_json_dict(self, delete_keys: Iterable[str] | None = None) -> dict: """ Create a dict copy of this survey element by removing inappropriate attributes and converting its children to dicts """ self.validate() result = self.copy() - to_delete = [ - "parent", - "question_type_dictionary", - "_created", - "_xpath", - *SURVEY_ELEMENT_LOCAL_KEYS, - ] + to_delete = chain(SURVEY_ELEMENT_EXTRA_FIELDS, ("extra_data",)) + if delete_keys is not None: + to_delete = chain(to_delete, delete_keys) # Delete all keys that may cause a "Circular Reference" # error while converting the result to JSON self._delete_keys_from_dict(result, to_delete) - children = result.pop("children") - result["children"] = [] - for child in children: - result["children"].append(child.to_json_dict()) + children = result.pop("children", None) + if children: + result["children"] = [ + c.to_json_dict(delete_keys=("parent",)) for c in children + ] + choices = result.pop("choices", None) + if choices: + result["choices"] = { + list_name: [o.to_json_dict(delete_keys=("parent",)) for o in options] + for list_name, options in choices.items() + } # Translation items with "output_context" have circular references. if "_translations" in result: for lang in result["_translations"].values(): @@ -386,7 +383,9 @@ def get_translations(self, default_language): } for display_element in ["label", "hint", "guidance_hint"]: - label_or_hint = self[display_element] + label_or_hint = None + if hasattr(self, display_element): + label_or_hint = self[display_element] if ( display_element == "label" @@ -400,6 +399,7 @@ def get_translations(self, default_language): # how they're defined - https://opendatakit.github.io/xforms-spec/#languages if ( display_element == "guidance_hint" + and label_or_hint is not None and not isinstance(label_or_hint, dict) and len(label_or_hint) > 0 ): @@ -409,8 +409,11 @@ def get_translations(self, default_language): if ( display_element == "hint" and not isinstance(label_or_hint, dict) + and hasattr(self, "hint") + and self.get("hint") is not None and len(label_or_hint) > 0 - and "guidance_hint" in self.keys() + and hasattr(self, "guidance_hint") + and self.get("guidance_hint") is not None and len(self["guidance_hint"]) > 0 ): label_or_hint = {default_language: label_or_hint} @@ -428,11 +431,15 @@ def get_translations(self, default_language): def needs_itext_ref(self): return isinstance(self.label, dict) or ( - isinstance(self.media, dict) and len(self.media) > 0 + hasattr(self, const.MEDIA) and isinstance(self.media, dict) and self.media ) def get_setvalue_node_for_dynamic_default(self, survey: "Survey", in_repeat=False): - if not self.default or not default_is_dynamic(self.default, self.type): + if ( + not hasattr(self, "default") + or not self.default + or not default_is_dynamic(self.default, self.type) + ): return None default_with_xpath_paths = survey.insert_xpaths(self.default, self) @@ -455,17 +462,21 @@ def xml_label(self, survey: "Survey"): # then we need to make a label with an itext ref ref = f"""jr:itext('{self._translation_path("label")}')""" return node("label", ref=ref) - else: + elif self.label: label, output_inserted = survey.insert_output_values(self.label, self) return node("label", label, toParseString=output_inserted) + else: + return node("label") def xml_hint(self, survey: "Survey"): if isinstance(self.hint, dict) or self.guidance_hint: path = self._translation_path("hint") return node("hint", ref=f"jr:itext('{path}')") - else: + elif self.hint: hint, output_inserted = survey.insert_output_values(self.hint, self) return node("hint", hint, toParseString=output_inserted) + else: + return node("hint") def xml_label_and_hint(self, survey: "Survey") -> list["DetachableElement"]: """ @@ -492,48 +503,54 @@ def xml_label_and_hint(self, survey: "Survey") -> list["DetachableElement"]: raise PyXFormError(msg) # big-image must combine with image - if "image" not in self.media and "big-image" in self.media: + if ( + self.media is not None + and "image" not in self.media + and "big-image" in self.media + ): raise PyXFormError( "To use big-image, you must also specify an image for the survey element named {self.name}." ) return result - def xml_bindings(self, survey: "Survey"): + def xml_bindings( + self, survey: "Survey" + ) -> Generator[DetachableElement | None, None, None]: """ Return the binding(s) for this survey element. """ - bind_dict = self.bind.copy() - if self.get("flat"): + if not hasattr(self, "bind") or self.get("bind") is None: + return None + if hasattr(self, "flat") and self.get("flat"): # Don't generate bind element for flat groups. return None - if bind_dict: + + bind_dict = {} + for k, v in self.bind.items(): # the expression goes in a setvalue action - if self.trigger and "calculate" in self.bind: - del bind_dict["calculate"] - - for k, v in bind_dict.items(): - # I think all the binding conversions should be happening on - # the xls2json side. - if ( - hashable(v) - and v in alias.BINDING_CONVERSIONS - and k in const.CONVERTIBLE_BIND_ATTRIBUTES - ): - v = alias.BINDING_CONVERSIONS[v] - elif k == "jr:constraintMsg" and ( - isinstance(v, dict) or re.search(BRACKETED_TAG_REGEX, v) - ): - v = f"""jr:itext('{self._translation_path("jr:constraintMsg")}')""" - elif k == "jr:requiredMsg" and ( - isinstance(v, dict) or re.search(BRACKETED_TAG_REGEX, v) - ): - v = f"""jr:itext('{self._translation_path("jr:requiredMsg")}')""" - elif k == "jr:noAppErrorString" and isinstance(v, dict): - v = f"""jr:itext('{self._translation_path("jr:noAppErrorString")}')""" - bind_dict[k] = survey.insert_xpaths(v, context=self) - return [node("bind", nodeset=self.get_xpath(), **bind_dict)] - return None + if hasattr(self, "trigger") and self.trigger and k == "calculate": + continue + # I think all the binding conversions should be happening on + # the xls2json side. + if ( + hashable(v) + and v in alias.BINDING_CONVERSIONS + and k in const.CONVERTIBLE_BIND_ATTRIBUTES + ): + v = alias.BINDING_CONVERSIONS[v] + elif k == "jr:constraintMsg" and ( + isinstance(v, dict) or re.search(BRACKETED_TAG_REGEX, v) + ): + v = f"""jr:itext('{self._translation_path("jr:constraintMsg")}')""" + elif k == "jr:requiredMsg" and ( + isinstance(v, dict) or re.search(BRACKETED_TAG_REGEX, v) + ): + v = f"""jr:itext('{self._translation_path("jr:requiredMsg")}')""" + elif k == "jr:noAppErrorString" and isinstance(v, dict): + v = f"""jr:itext('{self._translation_path("jr:noAppErrorString")}')""" + bind_dict[k] = survey.insert_xpaths(v, context=self) + yield node("bind", nodeset=self.get_xpath(), **bind_dict) def xml_control(self, survey: "Survey"): """ @@ -542,16 +559,6 @@ def xml_control(self, survey: "Survey"): """ raise NotImplementedError("Control not implemented") - def xml_action(self): - """ - Return the action for this survey element. - """ - if self.action: - action_dict = self.action.copy() - return node(action_dict.pop("name"), ref=self.get_xpath(), **action_dict) - - return None - def hashable(v): """Determine whether `v` can be hashed.""" diff --git a/pyxform/utils.py b/pyxform/utils.py index dcf4414b..66eb771b 100644 --- a/pyxform/utils.py +++ b/pyxform/utils.py @@ -6,8 +6,9 @@ import csv import json import re -from collections.abc import Generator +from collections.abc import Generator, Iterable from io import StringIO +from itertools import chain from json.decoder import JSONDecodeError from typing import Any from xml.dom import Node @@ -116,9 +117,7 @@ def node(*args, **kwargs) -> DetachableElement: # Move node's children to the result Element # discarding node's root for child in parsed_node.childNodes: - # No tests seem to involve moving elements with children. - # Deep clone used anyway in case of unknown edge cases. - result.appendChild(child.cloneNode(deep=True)) + result.appendChild(child.cloneNode(deep=False)) else: result.setAttribute(k, v) @@ -313,16 +312,16 @@ def coalesce(*args): def combine_lists( - a: list[dict] | None = None, - b: list[dict] | None = None, -) -> list[dict]: + a: Iterable | None = None, + b: Iterable | None = None, +) -> Iterable | None: """Get the list that is not None, or both lists combined, or an empty list.""" - if a is None and b is None: - combined = [] + if b is None: + if a is None: + return None + else: + return a elif a is None: - combined = b - elif b is None: - combined = a + return b else: - combined = a + b - return combined + return chain(a, b) diff --git a/pyxform/xls2json.py b/pyxform/xls2json.py index 90699c6e..af281ef9 100644 --- a/pyxform/xls2json.py +++ b/pyxform/xls2json.py @@ -1132,13 +1132,57 @@ def workbook_to_json( specify_other_question = None if parse_dict.get("specify_other") is not None: sheet_translations.or_other_seen = True - select_type += constants.SELECT_OR_OTHER_SUFFIX if row.get(constants.CHOICE_FILTER): msg = ( ROW_FORMAT_STRING % row_number + " Choice filter not supported with or_other." ) raise PyXFormError(msg) + itemset_choices = choices.get(list_name, None) + if not itemset_choices: + msg = ( + ROW_FORMAT_STRING % row_number + + " Please specify choices for this 'or other' question." + ) + raise PyXFormError(msg) + if ( + itemset_choices is not None + and isinstance(itemset_choices, list) + and not any( + c[constants.NAME] == constants.OR_OTHER_CHOICE[constants.NAME] + for c in itemset_choices + ) + ): + if any( + isinstance(c.get(constants.LABEL), dict) + for c in itemset_choices + ): + itemset_choices.append( + { + constants.NAME: constants.OR_OTHER_CHOICE[ + constants.NAME + ], + constants.LABEL: { + lang: constants.OR_OTHER_CHOICE[constants.LABEL] + for lang in { + lang + for c in itemset_choices + for lang in c[constants.LABEL] + if isinstance(c.get(constants.LABEL), dict) + } + }, + } + ) + else: + itemset_choices.append(constants.OR_OTHER_CHOICE) + specify_other_question = { + constants.TYPE: "text", + constants.NAME: f"{row[constants.NAME]}_other", + constants.LABEL: "Specify other.", + constants.BIND: { + "relevant": f"selected(../{row[constants.NAME]}, 'other')" + }, + } new_json_dict = row.copy() new_json_dict[constants.TYPE] = select_type diff --git a/tests/test_builder.py b/tests/test_builder.py index ee27322a..2c5ded36 100644 --- a/tests/test_builder.py +++ b/tests/test_builder.py @@ -32,8 +32,8 @@ class BuilderTests(TestCase): # self.assertTrue(xml_compare(expected, result)) def test_unknown_question_type(self): - survey = utils.build_survey("unknown_question_type.xls") - self.assertRaises(PyXFormError, survey.to_xml) + with self.assertRaises(PyXFormError): + utils.build_survey("unknown_question_type.xls") def test_uniqueness_of_section_names(self): # Looking at the xls file, I think this test might be broken. @@ -43,9 +43,7 @@ def test_uniqueness_of_section_names(self): def setUp(self): self.this_directory = os.path.dirname(__file__) survey_out = Survey(name="age", sms_keyword="age", type="survey") - question = InputQuestion(name="age") - question.type = "integer" - question.label = "How old are you?" + question = InputQuestion(name="age", type="integer", label="How old are you?") survey_out.add_child(question) self.survey_out_dict = survey_out.to_json_dict() print_pyobj_to_json( diff --git a/tests/test_j2x_creation.py b/tests/test_j2x_creation.py index 72761275..f4b69989 100644 --- a/tests/test_j2x_creation.py +++ b/tests/test_j2x_creation.py @@ -16,12 +16,15 @@ def test_survey_can_be_created_in_a_slightly_less_verbose_manner(self): {"name": "blue", "label": "Blue"}, ] - q = MultipleChoiceQuestion(name="Favorite_Color", choices=option_dict_array) - q.type = "select one" - s = Survey(name="Roses_are_Red", children=[q]) + q = MultipleChoiceQuestion( + name="Favorite_Color", type="select one", choices=option_dict_array + ) + s = Survey(name="Roses_are_Red") + s.add_child(q) expected_dict = { "name": "Roses_are_Red", + "type": "survey", "children": [ { "name": "Favorite_Color", @@ -34,36 +37,38 @@ def test_survey_can_be_created_in_a_slightly_less_verbose_manner(self): ], } - self.assertEqual(s.to_json_dict(), expected_dict) + self.assertEqual(expected_dict, s.to_json_dict()) - def allow_surveys_with_comment_rows(self): + def test_allow_surveys_with_comment_rows(self): """assume that a survey with rows that don't have name, type, or label headings raise warning only""" path = utils.path_to_text_fixture("allow_comment_rows_test.xls") survey = create_survey_from_xls(path) expected_dict = { - "default_language": "default", - "id_string": "allow_comment_rows_test", "children": [ { - "name": "farmer_name", "label": {"English": "First and last name of farmer"}, + "name": "farmer_name", "type": "text", - } + }, + { + "children": [ + { + "bind": {"jr:preload": "uid", "readonly": "true()"}, + "name": "instanceID", + "type": "calculate", + } + ], + "control": {"bodyless": True}, + "name": "meta", + "type": "group", + }, ], - "name": "allow_comment_rows_test", - "_translations": { - "English": { - "/allow_comment_rows_test/farmer_name:label": { - "long": "First and last name of farmer" - } - } - }, + "default_language": "default", + "id_string": "allow_comment_rows_test", + "name": "data", + "sms_keyword": "allow_comment_rows_test", "title": "allow_comment_rows_test", - "_xpath": { - "allow_comment_rows_test": "/allow_comment_rows_test", - "farmer_name": "/allow_comment_rows_test/farmer_name", - }, "type": "survey", } - self.assertEqual(survey.to_json_dict(), expected_dict) + self.assertEqual(expected_dict, survey.to_json_dict()) diff --git a/tests/test_j2x_question.py b/tests/test_j2x_question.py index 14c65002..c2229d90 100644 --- a/tests/test_j2x_question.py +++ b/tests/test_j2x_question.py @@ -2,6 +2,7 @@ Testing creation of Surveys using verbose methods """ +from collections.abc import Generator from unittest import TestCase from pyxform import Survey @@ -19,6 +20,8 @@ def ctw(control): """ if isinstance(control, list) and len(control) == 1: control = control[0] + elif isinstance(control, Generator): + control = next(control) return control.toxml() @@ -188,7 +191,8 @@ def test_simple_phone_number_question_type_multilingual(self): "type": "string", "constraint": r"regex(., '^\d*$')", } - observed = {k: v for k, v in q.xml_bindings(survey=self.s)[0].attributes.items()} # noqa: C416 + binding = next(q.xml_bindings(survey=self.s)) + observed = {k: v for k, v in binding.attributes.items()} # noqa: C416 self.assertDictEqual(expected, observed) def test_simple_select_all_question_multilingual(self): diff --git a/tests/test_j2x_xform_build_preparation.py b/tests/test_j2x_xform_build_preparation.py index 062a2280..c83d7440 100644 --- a/tests/test_j2x_xform_build_preparation.py +++ b/tests/test_j2x_xform_build_preparation.py @@ -15,10 +15,10 @@ def test_dictionary_consolidates_duplicate_entries(self): ] first_yesno_question = MultipleChoiceQuestion( - name="yn_q1", options=yes_or_no_dict_array, type="select one" + name="yn_q1", choices=yes_or_no_dict_array, type="select one" ) second_yesno_question = MultipleChoiceQuestion( - name="yn_q2", options=yes_or_no_dict_array, type="select one" + name="yn_q2", choices=yes_or_no_dict_array, type="select one" ) s = Survey(name="yes_or_no_tests")