Taiga Develop 1 week ago
parent
commit
a8212ef51a

+ 1 - 1
source/__init__.py

@@ -9,4 +9,4 @@ from .message_decoder import message_decoder
 from .message_receiver import message_receiver
 from .position_message import position_message
 from .state_message import state_message
-
+from .control_message import control_message

+ 1 - 1
source/control_message.py

@@ -1,7 +1,7 @@
 from .field import field
 from .message import message
 
-class state_message(message):
+class control_message(message):
     _type = 0x0012
     
     def __init__(self) -> None:

+ 1 - 1
source/cutter.py

@@ -6,7 +6,7 @@ class cutter:
         if count == 0:
             raise RuntimeError("Cutter is already empty.")
 
-        if count > len(self.size):
+        if count > self.size:
             raise RuntimeError("Count is higher than size.")
         
     def from_start(self, count: int) -> bytes:

+ 1 - 1
source/message.py

@@ -27,7 +27,7 @@ class message:
 
         title = target.title
         setter = lambda content: target.set(content)
-        setter = lambda : target.get()
+        getter = lambda : target.get()
 
         setattr(self, "set_" + title, setter)
         setattr(self, "get_" + title, getter)

+ 9 - 6
source/message_decoder.py

@@ -23,18 +23,21 @@ class message_decoder(message_coder):
         return result
 
     def fill(self, target: message) -> object:
+        if self.extract_type() != target.get_type():
+            raise TypeError("Received message ID is not correct.")
+
         handler = cutter(self.__content)
+        
+        if target.encoder().size != handler.size:
+            raise ValueError("Received content lenght is not correct")
+        
         crc = handler.cut(self.hash_size())
 
         if not self.__check_crc(handler.content, crc):
             raise ConnectionError("Message CRC checksum is not correct.")
 
-        type_bytes = handler.trim(self.type_size())
-        type_id = int.from_bytes(type_bytes)
-
-        if type_id != target.get_type():
-            raise TypeError("Received message ID is not correct.")
-
+        handler.trim(self.type_size())
+        
         for count in target.fields:
             field_bytes = handler.trim(count.size)
             count.decoder().load(field_bytes)

+ 9 - 1
source/message_encoder.py

@@ -1,7 +1,13 @@
+from .message_coder import message_coder 
+
 class message_encoder(message_coder):
     def __init__(self, target: message) -> None:
         self.__target = target
 
+    @property
+    def size(self) -> int:
+        return self.__target.size + self.hash_size() + self.type_size()
+
     def code(self) -> bytes:
         result = self.__target.get_type().to_bytes(self.type_size())
 
@@ -10,5 +16,7 @@ class message_encoder(message_coder):
 
         result += self._get_hash(result)
         return result
-
+    
+    def __len__(self) -> int:
+        return self.size
 

+ 9 - 0
source/message_receiver.py

@@ -9,6 +9,15 @@ class message_receiver:
         if not issubclass(target, message):
             raise RuntimeError("Can not register type which is not message.")
 
+        if not target.get_type() in self.__types.keys():
+            self.__types[target.get_type()] = target
+            return self
+
+        if self.__types[target.get_type()] != target:
+            raise RuntimeError("That ID already own by other message.")
+
+        return self
+
         self.__types[target.get_type()] = target
         return self
 

+ 29 - 0
tests/001-field.py

@@ -0,0 +1,29 @@
+import pathlib
+
+test_file = pathlib.Path(__file__)
+test_dir = test_file.parent
+project_dir = test_dir.parent
+
+import sys
+sys.path.append(str(project_dir.absolute()))
+
+import source as communication
+
+def main():
+    print("Starting test.")
+
+    sample = communication.field("move_x", int)
+    sample.set(255)
+
+    print("Sample is: " + str(sample.get()))
+    print("Sample as bytes: " + str(sample.encoder().code()))
+
+    encoded = sample.encoder().code()
+
+    sample_2 = communication.field("move_x", int)
+    sample_2.decoder().load(encoded)
+
+    print("Sample 2 (decoded sample): " + str(sample_2.get()))
+
+if __name__ == "__main__":
+    main()

+ 91 - 0
tests/002-message.py

@@ -0,0 +1,91 @@
+import pathlib
+
+test_file = pathlib.Path(__file__)
+test_dir = test_file.parent
+project_dir = test_dir.parent
+
+import sys
+sys.path.append(str(project_dir.absolute()))
+
+import source as communication
+
+class sample_message(communication.message):
+    _type = 0x0010
+
+    def __init__(self) -> None:
+        super().__init__()
+
+        self._add_field(communication.field("x_axis", int, 2))
+        self._add_field(communication.field("y_axis", int, 2))
+        self._add_field(communication.field("z_axis", int, 2))
+
+def main():
+    print("Starting test.")
+    
+    print("Sample message size: " + str(sample_message().size))
+    print("Setting test message.")
+
+    testing = sample_message()
+    testing.set_x_axis(10)
+    testing.set_y_axis(20)
+    testing.set_z_axis(30)
+
+    print("Encoding.")
+
+    encoded = testing.encoder().code()
+
+    print("Encoded lenght: " + str(len(encoded)))
+    print("Encoded: " + str(encoded))
+  
+    print("Decoding.")
+
+    decoded = communication.message_decoder(encoded).build(sample_message)
+
+    print("Checking.")
+
+    try:
+        if decoded.get_x_axis() != 10:
+            raise ValueError("X asis not validated.")
+
+        if decoded.get_y_axis() != 20:
+            raise ValueError("Y asis not validated.")
+
+        if decoded.get_z_axis() != 30:
+            raise ValueError("Z asis not validated.")
+
+        print("Validated property.")
+
+    except ValueError as error:
+        print("Not validated, error: " + str(error))
+
+    print("Checking decoder validation.")
+    print("Adding errors to coded message.")
+
+    too_long = encoded + bytes(10)
+    with_bad_id = int(2).to_bytes(2) + encoded[2:]
+    broken_message = encoded[0:5] + int(2).to_bytes(1) + encoded[6:]
+
+    try:
+        print("Testing too long message.")
+        communication.message_decoder(too_long).build(sample_message)
+    except Exception as error:
+        print("Work, raised: " + str(error))
+
+    try:
+        print("Testing message with bad id.")
+        communication.message_decoder(with_bad_id).build(sample_message)
+    except Exception as error:
+        print("Work, raised: " + str(error))
+
+    try:
+        print("Testing message with broken content")
+        communication.message_decoder(broken_message).build(sample_message)
+    except Exception as error:
+        print("Work, raised: " + str(error))
+
+
+
+
+
+if __name__ == "__main__":
+    main()

+ 68 - 0
tests/004-message_receiver.py

@@ -0,0 +1,68 @@
+import pathlib
+
+test_file = pathlib.Path(__file__)
+test_dir = test_file.parent
+project_dir = test_dir.parent
+
+import sys
+sys.path.append(str(project_dir.absolute()))
+
+import source as communication
+
+class fail_message(communication.message):
+    _type = 0x0012
+
+    def __init__(self) -> None:
+        super().__init__()
+
+        self._add_field(communication.field("test", int, 1))
+
+def main():
+    print("Starting test.")
+
+    print("Registering base messages.")
+
+    receiver = communication.message_receiver()
+
+    receiver.register(communication.position_message)
+    receiver.register(communication.state_message)
+    receiver.register(communication.control_message)
+
+    print("Trying to register bad message.")
+
+    try:
+        receiver.register(fail_message)
+
+    except Exception as error:
+        print("Work, raised: " + str(error))
+
+    print("Creating sample message to receive.")
+
+    sample = communication.state_message()
+    sample.set_ignition(True)
+    sample.set_leds(0x20)
+
+    coded = sample.encoder().code()
+
+    print("Encoded.")
+    print("Trying to receive it.")
+
+    received = receiver.receive(coded)
+
+    print("Type of received message: " + str(type(received)))
+    print("Checking fields:")
+
+    try:
+        if received.get_ignition() != True:
+            raise RuntimeError("Bad decoded ignition field.")
+
+        if received.get_leds() != 0x20:
+            raise RuntimeError("Bad decoded leds field.")
+
+        print("All fields are good.")
+
+    except Exception as error:
+        print("Can not check, error: " + str(error))
+
+if __name__ == "__main__":
+    main()