Taiga Develop 2 周之前
父节点
当前提交
84b53d909f

+ 6 - 1
source/__init__.py

@@ -1,5 +1,10 @@
 from .protocol import protocol
 from .message import message
-from .message import message_builder
+from .message_builder import message_builder
+from .message_step_builder import message_step_builder
 from .string_builder import string_builder
 from .encoder import encoder
+from .decoder import decoder
+from .builder_template import builder_template
+from .position_message import position_message
+from .position_message import position_message_builder

+ 62 - 0
source/builder_template.py

@@ -0,0 +1,62 @@
+from .template import template
+from .message import message
+from .message_builder import message_builder
+
+class builder_template(message_builder):
+    def __init__(self, result_type: type) -> None:
+        super().__init__()
+        self.__required = set()
+        self.__result_type = result_type
+
+    def _import_template(self, target: template) -> None:
+        default_keys = target.default_keys
+
+        for key in default_keys:
+            name = "set_" + key
+
+            setattr(self, name, self._create_setter(key))
+
+        for section in target.sections_names:
+            for key in target.get_section_keys(section):
+                name = "set_" + key
+
+                if name in default_keys:
+                    name = "set_" + key + "_" + section
+
+                setattr(self, name, self._create_setter(key, section))
+
+
+    def __create_setter_name(self, key: str, section: str | None) -> str:
+        if section is None:
+            return "kn_" + key
+        
+        return "ks_" + key + section
+
+    def _set_required_key(
+        self,
+        key: str, 
+        target: str, 
+        section: str | None
+    ) -> object:
+        name = self.__create_setter_name(key, section)
+
+        if not name in self.__required:
+            raise RuntimeError("Key " + key + " had been already set.")
+
+        self.__required.remove(name)
+        return self.set_key(key, target, section)
+
+    def _create_setter(self, key: str, section: str | None = None) -> object:
+        self.__required.add(self.__create_setter_name(key, section)) 
+
+        return lambda target: self._set_required_key(key, target, section)
+
+    def build(self) -> message:
+
+        if len(self.__required) > 0:
+            raise RuntimeError("One or more required keys are not set.")
+
+        return super().build(help_type = self.__result_type)
+
+
+

+ 45 - 5
source/decoder.py

@@ -1,15 +1,36 @@
 from .message import message
-from .message import message_builder
+from .message_builder import message_builder
+from .message_step_builder import message_step_builder
 from .protocol import protocol
 
 class decoder(protocol):
     def __init__(self, content: str) -> None:
         self.__content = content
-        self.__section = None
+        self.__builder = message_step_builder()
+
+    def process(self) -> object:
+        if self.__content is None:
+            return self
+
+        for line in self._lines:
+            self.__parse_line(line)
+
+        self.__content = None
+        return self
+
+    def result(self) -> message:
+        if self.__content is not None:
+            self.process()
+
+        return self._builder.build()
 
     @property
-    def _content(self) -> str:
-        return self.__content
+    def _lines(self) -> str:
+        return self.__content.split(self._line_separator)
+    
+    @property 
+    def _builder(self) -> message_step_builder:
+        return self.__builder
 
     def __parse_line(self, line: str) -> None:
         line = line.strip()
@@ -21,6 +42,25 @@ class decoder(protocol):
         self.__parse_key_value(line)
 
     def __is_section(self, line: str) -> bool:
+        if len(line) < 1:
+            return False
+
         return line[0] == self._section_buckles[0]
 
-    def __parse_section(self, line: str) -> 
+    def __parse_key_value(self, line: str) -> None:
+        position = line.find("=")
+        
+        if position == -1:    
+            return
+
+        key = line[:position].strip()
+        position = position + 1
+        value = line[position:].strip()
+
+        self._builder.add(key, value)
+
+    def __parse_section(self, line: str) -> None:
+        for char in self._section_buckles:
+            line = line.replace(char, "")
+        
+        self._builder.change_section(line)

+ 38 - 46
source/message.py

@@ -1,7 +1,18 @@
 import types
 
 class message:
-    def __init__(self, default: dict, sections: dict) -> None:
+    def __init__(self, *args, **kwargs) -> None:
+        if "clone_it" in kwargs:
+            self.__from_clone(kwargs["clone_it"])
+            return 
+
+        self.__from_dicts(args[0], args[1])
+
+    def __from_clone(self, clone_it: object) -> None:
+        self.__sections = clone_it.sections
+        self.__default = clone_it.default
+
+    def __from_dicts(self, default: dict, sections: dict) -> None:
         sections_proxy = dict()
 
         for key, value in sections.items():
@@ -16,10 +27,34 @@ class message:
 
         return self.get_from_section(section, key)
 
-    def get_from_default(self, key: str) -> any:
+    def key_exists(self, key: str, section: str | None = None) -> bool:
+        if section is None:
+            return key in self.default
+        
+        if not section in self.sections:
+            return False
+
+        return key in self.sections[section]
+    
+    def get_from_default(self, key: str) -> str:
+        if not key in self.__default:
+            raise KeyError("Not found key " + key + " in message.")
+
         return self.__default[key]
 
-    def get_from_section(self, section: str, key: str) -> any:
+    def get_from_section(self, section: str, key: str) -> str:
+        if not section in self.__sections:
+            raise KeyError("Not found section " + section + " in message.")
+
+        if not key in self.__sections[section]:
+            raise KeyError( \
+                "Not found key " \
+                + key \
+                + " in section " \
+                + section \
+                + " of message." \
+            )
+
         return self.__sections[section][key]
 
     @property
@@ -30,46 +65,3 @@ class message:
     def sections(self) -> types.MappingProxyType:
         return self.__sections
 
-class message_builder:
-    def __init__(self) -> None:
-        self.__sections = dict()
-        self.__default = dict()
-
-    def __add_section(self, section: str) -> object:
-        if not section in self.__sections.keys():
-            self.__sections[section] = dict()
-
-    def __set_section_key(self, section: str, key: str, value: str) -> None:
-        self.__sections[section][key] = value
-
-    def __set_default_key(self, key: str, value: str) -> None:
-        self.__default[key] = value
-
-    def set_key(
-        self, 
-        key: str, 
-        value: str, 
-        section: str | None = None
-    ) -> object:
-        key = key.strip()
-        value = value.strip()
-
-        if len(key) == 0:
-            raise RuntimeError("Key nam can nob being empty.")
-
-        if section is None:
-            self.__set_default_key(key, value)
-            return self
-
-        section = section.strip()
-        
-        if len(section) == 0:
-            raise RuntimeError("Section name can not being empty.")
-
-        self.__add_section(section)
-        self.__set_section_key(section, key, value)
-
-        return self
-
-    def build(self) -> message:
-        return message(self.__default, self.__sections)

+ 50 - 0
source/message_builder.py

@@ -0,0 +1,50 @@
+from .message import message
+
+class message_builder:
+    def __init__(self) -> None:
+        self.__sections = dict()
+        self.__default = dict()
+
+    def __add_section(self, section: str) -> object:
+        if not section in self.__sections.keys():
+            self.__sections[section] = dict()
+
+    def __set_section_key(self, section: str, key: str, value: str) -> None:
+        self.__sections[section][key] = value
+
+    def __set_default_key(self, key: str, value: str) -> None:
+        self.__default[key] = value
+
+    def set_key(
+        self, 
+        key: str, 
+        value: str, 
+        section: str | None = None
+    ) -> object:
+        key = key.strip()
+        value = value.strip()
+
+        if len(key) == 0:
+            raise RuntimeError("Key name can nob being empty.")
+
+        if section is None:
+            self.__set_default_key(key, value)
+            return self
+
+        section = section.strip()
+        
+        if len(section) == 0:
+            raise RuntimeError("Section name can not being empty.")
+
+        self.__add_section(section)
+        self.__set_section_key(section, key, value)
+
+        return self
+
+    def build(self, help_type: type | None = None) -> message:
+        result = message(self.__default, self.__sections)
+
+        if help_type is None:
+            return result
+
+        return help_type(result)

+ 15 - 0
source/message_step_builder.py

@@ -0,0 +1,15 @@
+from .message import message
+from .message_builder import message_builder
+
+class message_step_builder(message_builder):
+    def __init__(self):
+        super().__init__()
+        self.__current_section = None
+
+    def add(self, key: str, value: str) -> object:
+        return self.set_key(key, value, self.__current_section)
+   
+    def change_section(self, section: str) -> object:
+        self.__current_section = section
+        return self
+

+ 33 - 0
source/message_template.py

@@ -0,0 +1,33 @@
+from .message import message
+from .template import template
+
+class message_template(message):
+    def __init__(self, target: message) -> None:
+        super().__init__(clone_it = target)
+
+    def _import_template(self, target: template) -> None:
+        default_keys = target.default_keys
+
+        for key in default_keys:
+            name = "get_" + key
+
+            setattr(self, name, self._create_getter(key))
+
+        for section in target.sections_names:
+            for key in target.get_section_keys(section):
+                name = "get_" + key
+
+                if name in default_keys:
+                    name = "get_" + key + "_" + section
+
+                setattr(self, name, self._create_getter(key, section))
+
+    def _create_getter(self, key: str, section: str | None = None) -> object:
+        if not self.key_exists(key, section):
+            raise RuntimeError("Message not have required key " + key + ".")
+
+        return lambda: self.get_key(key, section)
+
+
+   
+

+ 26 - 0
source/position_message.py

@@ -0,0 +1,26 @@
+from .template import template
+from .message_template import message_template
+from .builder_template import builder_template
+
+class position_template(template):
+    def __init__(self) -> None:
+        super().__init__()
+
+        self._add_key("x", "move")
+        self._add_key("y", "move")
+        self._add_key("z", "move")
+
+        self._add_key("yaw", "rotate")
+        self._add_key("roll", "rotate")
+        self._add_key("pitch", "rotate")
+
+class position_message(message_template):
+    def __init__(self, target: message) -> None:
+        super().__init__(target)
+        self._import_template(position_template())
+
+class position_message_builder(builder_template):
+    def __init__(self) -> None:
+        super().__init__(position_message)
+        self._import_template(position_template())
+

+ 29 - 0
source/template.py

@@ -0,0 +1,29 @@
+class template:
+    def __init__(self) -> None:
+        self.__default = set()
+        self.__sections = dict()
+
+    def _add_key(self, key: str, section: str | None = None) -> None:
+        if section is None:
+            self.__default.add(key)
+            return
+            
+        if not section in self.__sections:
+            self.__sections[section] = set()
+
+        self.__sections[section].add(key)
+        
+    @property
+    def default_keys(self) -> set:
+        return self.__default.copy()
+
+    @property
+    def sections_names(self) -> set:
+        return set(self.__sections.keys())
+
+    def get_section_keys(self, section: str) -> set:
+        if not section in self.__sections:
+            raise RuntimeError("Section " + section + " not exists.")
+
+        return self.__sections[section].copy()
+

+ 42 - 0
tests/003-decoder.py

@@ -0,0 +1,42 @@
+import pathlib
+
+test_file = pathlib.Path(__file__)
+test_dir = test_file.parent
+project_dir = test_dir.parent
+
+import sys
+sys.path.append(str(project_dir.absolute()))
+
+import source as communication
+
+def main():
+    message = "\
+width=200\n\
+height=100\n\
+\n\
+[first]\n\
+my=iam\n\
+\n\
+[conn]\n\
+ip=dhcp\n\
+ssid=name\n\
+"
+
+    result = communication.decoder(message).process().result()
+
+    def check(key: str, value: str, section: str | None = None) -> None:
+        if result.get_key(key, section) == value:
+            print("Key " + key + " work.")
+            return
+        
+        section = "default" if section is None else section
+        print("Key " + key + " in section " + section + " NOT WORK.")
+    
+    check("width", "200")
+    check("height", "100")
+    check("my", "iam", "first")
+    check("ip", "dhcp", "conn")
+    check("ssid", "name", "conn")
+    
+if __name__ == "__main__":
+    main()

+ 28 - 0
tests/004-template.py

@@ -0,0 +1,28 @@
+import pathlib
+
+test_file = pathlib.Path(__file__)
+test_dir = test_file.parent
+project_dir = test_dir.parent
+
+import sys
+sys.path.append(str(project_dir.absolute()))
+
+import source as communication
+
+def main():
+    helper = communication.position_message_builder()
+    
+    helper.set_x("10")
+    helper.set_y("15")
+    helper.set_z("11")
+    
+    helper.set_yaw("0")
+    helper.set_roll("90")
+    helper.set_pitch("70")
+
+    result = helper.build()
+
+    print(type(result)) 
+
+if __name__ == "__main__":
+    main()