Source code for eezz.filesrv

# -*- coding: utf-8 -*-
"""
    This module implements the following classes:

    * :py:class:`eezz.filesrv.TFile`:        Takes a chunk of data and merges it to a file
    * :py:class:`eezz.filesrv.TEezzFile`:    Extends TFile and implements encryption and decryption for transmitted data
    * :py:class:`eezz.filesrv.TFileMode`:    Enum file-mode for TEezzFiles
    * :py:class:`eezz.filesrv.TFileEncryptionError`:  Detect an error during reading an encrypted file

    This module supports the download of big files in chunks and ensures that the incoming fragments are
    put together in the correct order again. Furthermore, a hash is calculated for each chunk, so that the
    data consistency of a file can be verified during reading.
"""
from    loguru          import logger
import  os
import  mmap
from    typing         import Any, List

from    Crypto.Hash    import SHA256
from    Crypto.Cipher  import AES
from    Crypto         import Random

from    dataclasses    import dataclass
from    pathlib        import Path
from    service        import TService
from    io             import BufferedReader
from    enum           import Enum


class TFileEncryptionError(Exception):
    """ Error type used for failures while handling encrypted files

    :param message: Description of the failure; also available as attribute ``message``
    """

    def __init__(self, message):
        # Keep the text on the instance as well as in Exception.args
        self.message = message
        super().__init__(message)
class TFileMode(Enum):
    """ File mode: Determine how to handle incoming stream

    :param NORMAL:  Write through
    :param ENCRYPT: Encrypt and write
    :param DECRYPT: Decrypt and write
    """
    NORMAL  = 0  #: :meta private:
    ENCRYPT = 1  #: :meta private:
    DECRYPT = 2  #: :meta private:
@dataclass(kw_only=True)
class TFile:
    """ TFile supports chunked file transfer.

    On construction the destination file is created (an existing file is truncated) and extended
    to ``size`` bytes, so that chunks can be written at their final position in any order.

    :param file_type:   User defined file type
    :param destination: Path to store the file
    :param size:        The size of the file
    :param chunk_size:  Fixed size for each chunk of data, except the last element
    :param transferred: Number of bytes passed to :py:meth:`write` so far; reset to 0 on construction
    """
    file_type:   str      #: :meta private:
    destination: Path     #: :meta private:
    size:        int      #: :meta private:
    chunk_size:  int      #: :meta private:
    transferred: int = 0  #: :meta private:

    def __post_init__(self):
        # NOTE(review): for a size that is an exact multiple of chunk_size this is one more than the
        # number of chunks actually needed; left unchanged because callers may rely on the value.
        self.chunk_count = divmod(self.size, self.chunk_size)[0] + 1
        self.hash_chain  = ['' for _ in range(self.chunk_count)]
        self.transferred = 0

        with self.destination.open('w+b') as x_output:
            # Writing a single byte at the last position extends the file to its final size.
            # An empty file needs no pre-allocation, and seeking to -1 would fail.
            if self.size > 0:
                x_output.seek(self.size - 1)
                x_output.write(b'\x00')

    @property
    def name(self) -> str:
        """ Returns the file name including extension """
        return self.destination.name

    def write(self, raw_data: Any, sequence_nr: int, mode: TFileMode = TFileMode.NORMAL) -> str:
        """ Write constant chunks of raw data to file. Only the last chunk might be smaller.
        The sequence number is passed along, because we cannot guarantee that elements are received
        in the same order as they are sent.

        :param Any raw_data:    Raw chunk of data (bytes-like)
        :param int sequence_nr: Sequence number to insert chunks at the right place
        :param TFileMode mode:  Set signature for derived classes, not evaluated here
        :return:                Signature for derived classes, always an empty string here
        :raise ValueError:      If the chunk does not fit into the destination file at its position
        """
        x_offset          = sequence_nr * self.chunk_size
        self.transferred += len(raw_data)

        if self.transferred > self.size:
            # The bound is negative here: cut the bytes exceeding the file size off the end
            raw_data = raw_data[:self.size - self.transferred]

        x_length = len(raw_data)
        if x_length == 0:
            # Nothing to store; an empty file could not be memory-mapped anyway
            return ''

        # Accept any chunk any time: map the file and copy the chunk to its position.
        # The context managers release the view and the mapping even if the copy fails.
        with self.destination.open('r+b') as x_out:
            with mmap.mmap(x_out.fileno(), 0) as x_memory_map:
                if x_offset < 0 or x_offset + x_length > len(x_memory_map):
                    raise ValueError(
                        f'chunk #{sequence_nr} ({x_length} bytes at offset {x_offset}) '
                        f'does not fit into {self.destination.name}')
                with memoryview(x_memory_map) as x_memory_view:
                    x_memory_view[x_offset: x_offset + x_length] = raw_data
        return ''
@dataclass(kw_only=True)
class TEezzFile(TFile):
    """ Derived from TFile, this class allows encryption and decryption using AES key.
    Each chunk generates a hash, which could be collected and saved to validate the encrypted data stream.

    NOTE(review): a single cipher object is created per instance and reused for every chunk. With CBC
    chaining this presumably requires the chunks to be processed in stream order -- confirm.

    :param Crypto.Random.new(16) key:       AES key for cypher
    :param Crypto.Random.new(16) vector:    AES vector for cypher
    :param dict hash_chain:                 Maps the sequence number of each processed chunk to its hash value;
                                            reset to an empty dict on construction
    """
    key:        bytes        #: :meta private:
    vector:     bytes        #: :meta private:
    hash_chain: dict = None  #: :meta private:

    def __post_init__(self):
        super().__post_init__()
        self.cypher     = AES.new(self.key, AES.MODE_CBC, self.vector)
        # Replaces the list created by TFile with a mapping sequence_nr -> hash
        self.hash_chain = dict()

    def write(self, raw_data: Any, sequence_nr: int, mode: TFileMode = TFileMode.ENCRYPT) -> str:
        """ Write a chunk of data

        :param raw_data:    The data chunk to write
        :param sequence_nr: Sequence of the data chunk in the stream
        :param mode:        The mode used to en- or decrypt the data or pass through
        :return:            The hash value of the data after encryption/before decryption
        :rtype:             SHA256.hexdigest
        """
        if mode == TFileMode.ENCRYPT:
            return self.encrypt(raw_data=raw_data, sequence_nr=sequence_nr)
        if mode == TFileMode.DECRYPT:
            return self.decrypt(raw_data=raw_data, sequence_nr=sequence_nr)

        # Pass through: store the data unchanged and return its hash
        super().write(raw_data, sequence_nr)
        return SHA256.new(raw_data).hexdigest()

    def read(self, source: BufferedReader, hash_list: List[str] = None) -> None:
        """ Read an encrypted file from source input stream and create a decrypted version

        :param BufferedReader source:               Input stream
        :param List[SHA256.hexdigest] hash_list:    A hash list to check the stream. If omitted, the
                                                    chunks are decrypted without verification
        :raise TFileEncryptionError:                If a chunk does not match its hash, if the hash list has
                                                    no entry for a chunk, or if the stream ends too early
        """
        x_sequence_nr = 0
        while True:
            x_raw_data = source.read(self.chunk_size)
            if not x_raw_data:
                # End of input: without this check a short stream would loop forever
                if self.transferred < self.size:
                    raise TFileEncryptionError(
                        f'unexpected end of stream at segment #{x_sequence_nr}')
                break

            x_hash = self.write(x_raw_data, x_sequence_nr, TFileMode.DECRYPT)

            if hash_list is not None:
                try:
                    x_expected = hash_list[x_sequence_nr]
                except (IndexError, KeyError):
                    raise TFileEncryptionError(
                        f'no hash for segment #{x_sequence_nr}') from None
                if x_hash != x_expected:
                    raise TFileEncryptionError(f'no match for hash segment #{x_sequence_nr}')

            if self.transferred >= self.size:
                break
            x_sequence_nr += 1

    def encrypt(self, raw_data: Any, sequence_nr: int) -> str:
        """ Encrypt the incoming data stream

        Only files of type ``main`` are encrypted, other file types are stored unchanged.

        NOTE(review): :py:meth:`TFile.write` cuts off data exceeding ``size``. If ``size`` is not a
        multiple of the AES block size, the padded final block is stored truncated -- confirm how
        such files are expected to be decrypted.

        :param Any raw_data:    Data chunk of the stream
        :param int sequence_nr: Sequence number in the stream
        :return:                Hash value of the chunk as written (after encryption)
        :rtype:                 SHA256.hexdigest
        """
        x_stream = raw_data

        if self.file_type == 'main':
            # Work on a private copy, so that the buffer of the caller is never modified
            x_stream    = bytes(raw_data)
            x_remainder = len(x_stream) % AES.block_size
            if x_remainder > 0:
                # for encryption all chunks have to be extended to 16-byte mod length
                x_stream += (AES.block_size - x_remainder) * b'\x00'
            # Encrypt the padded data, not the original chunk
            x_stream = self.cypher.encrypt(x_stream)

        x_hash = SHA256.new(x_stream).hexdigest()
        self.hash_chain[sequence_nr] = x_hash
        super().write(x_stream, sequence_nr)
        return x_hash

    def decrypt(self, raw_data: Any, sequence_nr: int) -> str:
        """ Decrypt the incoming stream

        Only files of type ``main`` are decrypted, other file types are stored unchanged.

        :param Any raw_data:    Data chunk of the stream
        :param int sequence_nr: Sequence number in the stream
        :return:                Hash value of the chunk as received (before decryption)
        :rtype:                 SHA256.hexdigest
        """
        x_stream = raw_data
        x_hash   = SHA256.new(raw_data).hexdigest()
        self.hash_chain[sequence_nr] = x_hash

        if self.file_type == 'main':
            x_stream = self.cypher.decrypt(bytes(raw_data))

        super().write(x_stream, sequence_nr)
        return x_hash
# --- Section for module tests
def test_file_reader():
    """ Test the TFile interfaces: copy a test image chunk by chunk into the document path

    :meta private:
    """
    # Read a file and test the TFile downloader
    x_source = Path(TService().root_path) / 'testdata/bird.jpg'
    x_size   = x_source.stat().st_size
    x_dest   = TService().document_path / x_source.with_suffix('.eezz').name

    # Start from a clean state; a missing file is not an error
    x_dest.unlink(missing_ok=True)

    print(f'{x_source} --> {x_dest}')
    x_file = TFile(destination=x_dest, size=x_size, chunk_size=1024, file_type='doc')

    # The source is only read, so no write access is requested
    with x_source.open('rb') as x_input:
        x_sequence = 0
        while x_chunk := x_input.read(1024):
            x_file.write(raw_data=x_chunk, sequence_nr=x_sequence)
            x_sequence += 1
def test_eezzfile_reader():
    """ Test the TEezzFile(TFile) interfaces

    The first pass writes the test image encrypted, the second pass reads the encrypted file and
    writes it back in DECRYPT mode using the same key and vector.

    :meta private:
    """
    x_source = Path(TService().root_path) / 'testdata/bird.jpg'
    x_size   = x_source.stat().st_size
    x_dest   = TService().document_path / x_source.with_suffix('.crypt').name
    x_decr   = TService().document_path / x_source.name
    x_key    = Random.new().read(AES.block_size)
    x_vector = Random.new().read(AES.block_size)

    # Start from a clean state; a missing file is not an error
    x_dest.unlink(missing_ok=True)

    logger.debug(f'--- Test TEezzFile: {x_source} --> {x_dest}, file-type=main')
    x_file = TEezzFile(destination=x_dest, size=x_size, chunk_size=1024, file_type='main',
                       key=x_key, vector=x_vector)

    # Input files are only read, so no write access is requested
    with x_source.open('rb') as x_input:
        x_sequence = 0
        while x_chunk := x_input.read(1024):
            x_file.write(raw_data=x_chunk, sequence_nr=x_sequence)
            x_sequence += 1
    logger.debug(f'hash-list = {x_file.hash_chain}')

    logger.debug(f'--- Test TEezzFile: {x_dest} --> {x_decr}, file-type=main')
    # A new instance creates a fresh cipher from the same key and vector
    x_file = TEezzFile(destination=x_decr, size=x_size, chunk_size=1024, file_type='main',
                       key=x_key, vector=x_vector)

    with x_dest.open('rb') as x_input:
        x_sequence = 0
        while x_chunk := x_input.read(1024):
            x_file.write(raw_data=x_chunk, sequence_nr=x_sequence, mode=TFileMode.DECRYPT)
            x_sequence += 1
    logger.debug(f'hash-list = {x_file.hash_chain}')
    logger.success('test TFile')


if __name__ == '__main__':
    """ :meta private: """
    # NOTE(review): the root path is hard-coded to a developer machine
    x_service  = TService.set_environment(root_path=r'C:\Users\alzer\Projects\github\eezz_full\webroot')
    x_log_path = x_service.logging_path / 'app.log'
    test_file_reader()
    test_eezzfile_reader()