import typing

import sqlmodel
import sqlmodel.sql._expression_select_cls

from .product import product
from .product import product_factory
from .validator import barcode_validator
from .validator import name_validator
from .validator import description_validator
from .validator import author_validator
from .exception import validator_exception
from .exception import exists_exception


class product_loader(sqlmodel.Session):
    """
    Session specialised for working with products.

    It extends ``sqlmodel.Session`` with helpers to look up, search, save
    and remove ``product`` rows, validating user supplied values before
    they are used in a query.
    """

    def get_by_barcode(self, target: str) -> product | None:
        """
        Load the product with the given barcode.

        Parameters:
            target (str): Barcode of the product to load

        Returns:
            (product): First product whose barcode equals target
            (None): When no product has that barcode

        Raises:
            validator_exception: When barcode_validator reports the
                barcode as invalid
        """

        if barcode_validator(target).invalid:
            raise validator_exception("get.barcode")

        query = self.__select.where(product.barcode == target)
        return self.exec(query).first()

    def get_by_name(self, target: str) -> product | None:
        """
        Load the product with the given name.

        Parameters:
            target (str): Name of the product to load

        Returns:
            (product): First product whose name equals target
            (None): When no product has that name

        Raises:
            validator_exception: When name_validator reports the name
                as invalid
        """

        if name_validator(target).invalid:
            raise validator_exception("get.name")

        query = self.__select.where(product.name == target)
        return self.exec(query).first()

    def search_by_name(self, target: str) -> typing.Iterable[product]:
        """
        Search products whose name contains the given phrase.

        The phrase must itself pass name_validator. This method is a
        generator, so both the validation and the query run when the
        result is first iterated, not when the method is called.

        Parameters:
            target (str): Phrase to search for in product names

        Returns:
            (Iterable[product]): Generator with products found by name

        Raises:
            validator_exception: On first iteration, when the phrase is
                reported as invalid
        """

        if name_validator(target).invalid:
            raise validator_exception("search.name")

        # Wrap the phrase in wildcards, to match it anywhere in the name.
        pattern = f"%{target}%"
        query = self.__select.where(product.name.like(pattern))

        yield from self.exec(query)

    def search_by_author(self, target: str) -> typing.Iterable[product]:
        """
        Search products whose author contains the given phrase.

        The phrase must pass author_validator. This method is a
        generator, so both the validation and the query run when the
        result is first iterated, not when the method is called.

        Parameters:
            target (str): Author to search products for

        Returns:
            (Iterable[product]): Generator with products of that author

        Raises:
            validator_exception: On first iteration, when the author is
                reported as invalid
        """

        if author_validator(target).invalid:
            raise validator_exception("search.author")

        # Wrap the phrase in wildcards, to match it anywhere in the author.
        pattern = f"%{target}%"
        query = self.__select.where(product.author.like(pattern))

        yield from self.exec(query)

    def load_all(self) -> typing.Iterable[product]:
        """
        Load all products from the database.

        Returns:
            (Iterable[product]): Generator with products from database
        """

        yield from self.exec(self.__select)

    def barcode_in_use(self, target: str) -> bool:
        """
        Check whether the barcode is already used by any product.

        Parameters:
            target (str): Barcode to check

        Returns:
            (bool): True when a product with that barcode exists,
                False when not

        Raises:
            validator_exception: When the barcode is invalid, because the
                check is done with get_by_barcode
        """

        return self.get_by_barcode(target) is not None

    def name_in_use(self, target: str) -> bool:
        """
        Check whether the name is already used by any product.

        Parameters:
            target (str): Name to check

        Returns:
            (bool): True when a product with that name exists,
                False when not

        Raises:
            validator_exception: When the name is invalid, because the
                check is done with get_by_name
        """

        return self.get_by_name(target) is not None

    def __append(self, target: product) -> bool:
        """
        Insert a new product into the database.

        The barcode and the name of the product must not be used by any
        stored product. After the commit the product is refreshed from
        the database.

        Parameters:
            target (product): Product to insert into database

        Returns:
            (bool): Always True, failures are reported by exceptions

        Raises:
            exists_exception: When the barcode or the name is in use
        """

        if self.barcode_in_use(target.barcode):
            raise exists_exception("barcode")

        if self.name_in_use(target.name):
            raise exists_exception("name")

        self.add(target)
        self.commit()
        self.refresh(target)

        return True

    def store(self, target: product) -> bool:
        """
        Save the product into the database.

        A product that is not in the database yet is inserted. A product
        that is already stored is updated, after checking that its name
        and barcode are not used by another product.

        Parameters:
            target (product): Product to insert or update

        Returns:
            (bool): True when saved, False when the update failed and
                was rolled back

        Raises:
            RuntimeError: When target.ready is false
            exists_exception: When the name or the barcode is used by
                another product
            validator_exception: When the name or the barcode is invalid
        """

        if not target.ready:
            raise RuntimeError("Target is not ready yet.")

        if not target.in_database:
            return self.__append(target)

        # A match with the same id is the target itself, so no conflict.
        name_owner = self.get_by_name(target.name)

        if name_owner is not None and name_owner.id != target.id:
            raise exists_exception("name")

        barcode_owner = self.get_by_barcode(target.barcode)

        if barcode_owner is not None and barcode_owner.id != target.id:
            raise exists_exception("barcode")

        try:
            # The exclude option is given as a set, because it expects a
            # set or a mapping of field names, not a bare string.
            target.sqlmodel_update(target.model_dump(exclude={"id"}))
            self.commit()
            self.refresh(target)
        except Exception:
            # Leave the session usable after the failed transaction.
            self.rollback()
            return False

        return True

    def drop(self, target: product) -> bool:
        """
        Remove the product from the database.

        All items of target.reservations are deleted together with the
        product, in the same commit.

        Parameters:
            target (product): Product to delete

        Returns:
            (bool): True when deleted, False when the product is not in
                the database or the delete failed and was rolled back
        """

        if not target.in_database:
            return False

        try:
            for reservation in target.reservations:
                self.delete(reservation)

            self.delete(target)
            self.commit()
        except Exception:
            # Leave the session usable after the failed transaction.
            self.rollback()
            return False

        return True

    @property
    def __select(self) -> sqlmodel.sql._expression_select_cls.SelectOfScalar:
        """ New product selector. """

        return sqlmodel.select(product)