# -*- coding: utf-8 -*-
"""
This module implements the following classes:
* **TService**: A singleton for TGlobalService
* **TServiceCompiler**: A Lark compiler for HTML EEZZ extensions
* **TTranslate**: Extract translation info from HTML to create a POT file
* **TQuery**: Class representing the query of an HTML request
"""
from dataclasses import dataclass
from loguru import logger
import re
import itertools
import json
import sys
from bs4 import Tag, BeautifulSoup
from pathlib import Path
from importlib import import_module, reload
from lark import Lark, Transformer, Tree, UnexpectedInput
from lark.exceptions import UnexpectedCharacters
from typing import Any, TypeVar
from Crypto.PublicKey import RSA
T = TypeVar('T')
[docs]
class TService:
""" Container for unique environment as
* path descriptors
* stores assignments of the parser
* stores RSA application key
"""
_mod = int("C5F23FA172317A1F6930C0F9AF79FF044D34BFD1336E5A174155953487A4FF0C744A093CA7044F39842AC685AB37C55F1F01F0055561BAD9C3EEA22B28D09F061875ED5BDB2F1F2B797B1BEF6534C0D4FCEFAFFA8F3A91396961165241564BD6E3CA08023F2A760A0B54A4A6A996CDF7DE3491468C199566EE5993FCFD03A2B285AD6FBBC014A20C801618EE19F88EB8E6359624A35FDD7976F316D6AB225CF85DA5E63AB30248D38297A835CF16B9799973C2F9F05F5F850B3152B3A05F06FEC0FBDA95C70911F59F6A11A1451822ABFE4FE5A021F7EA983BDE9F442302891DCF51B7322EAFB88950F2617B7120F9B87534719DCA27E87D82A183CB37BC7045", 16)
_exp = int("10001", 16)
_private_key: RSA.RsaKey = None #: :meta private:
_public_key: RSA.RsaKey = None #: :meta private:
_root_path: Path = None #: :meta private: Root path for the HTTP server
_resource_path: Path = None #: :meta private:
_global_objects: dict = None #: :meta private:
_host: str = 'localhost' #: :meta private:
_websocket_addr: str = '8100' #: :meta private:
_translate: bool = False #: :meta private:
@property
def private_key(self) -> RSA.RsaKey:
""" :meta private: """
if not TService._private_key:
TService._private_key = RSA.construct((TService._mod, TService._exp))
return TService._private_key
@property
def public_key(self) -> RSA.RsaKey:
""" :meta private: """
if not TService._public_key:
TService._public_key = self.private_key.public_key()
return TService._public_key
@property
def root_path(self) -> Path:
""" :meta private: """
if not TService._root_path:
TService._root_path = Path.cwd()
return TService._root_path
@property
def resource_path(self) -> Path:
""" :meta private: """
return self.root_path / 'resources'
@property
def public_path(self) -> Path:
""" :meta private: """
return self.root_path / 'public'
@property
def application_path(self) -> Path:
""" :meta private: """
return self.root_path / 'applications'
@property
def document_path(self) -> Path:
""" :meta private: """
return self.root_path / 'database'
@property
def database_path(self) -> Path:
""" :meta private: """
return self.document_path / 'eezz.db'
@property
def locales_path(self) -> Path:
""" :meta private: """
return self.resource_path / 'locales'
@property
def logging_path(self) -> Path:
""" :meta private: """
return self.root_path / 'logs'
@property
def host(self) -> str:
""" :meta private: """
return TService._host
@property
def websocket_addr(self) -> str:
""" :meta private: """
return TService._websocket_addr
@property
def objects(self) -> dict:
""" :meta private: """
if not TService._global_objects:
TService._global_objects = dict()
return TService._global_objects
@property
def translate(self):
""" :meta private: """
return TService._translate
@classmethod
def set_environment(cls, root_path: str, host: str = 'localhost', address: str = '8000'):
""" :meta private: """
cls._root_path = Path(root_path).absolute()
cls._host = host
cls._websocket_addr = address
# application_path = cls._root_path / 'applications'
# sys.path.append(application_path.as_posix())
[docs]
def get_method(self, obj_id: str, a_method_name: str) -> tuple:
""" Get a method by name for a given object
:param str obj_id: Unique hash-ID for object as stored in :py:meth:`eezz.service.TService.assign_object`
:param str a_method_name: Name of the method
:return: tuple(object, method, parent-tag)
:raise AttributeError: Class has no method with the given name
"""
try:
x_object, x_tag, x_descr = self.objects[obj_id]
x_method = getattr(x_object, a_method_name)
return x_object, x_method, x_tag, x_descr
except AttributeError as x_except:
logger.exception(x_except)
raise x_except
[docs]
def assign_object(self, obj_id: str, description: str, attrs: dict, a_tag: Tag = None, force_reload: bool = False) -> None:
""" _`assign_object` Assigns an object to an HTML tag
:param str obj_id: Unique object-id
:param str description: Path to the class: <directory>.<module>.<class>
:param dict attrs: Attributes for the constructor
:param bs4.Tag a_tag: Parent tag which handles an instance of this object
:param bool force_reload: Force reloading
:raise AttributeError: Class not found
:raise IndexError: description systax does not match
"""
try:
x_list = description.split('.')
x, y, z = x_list[0], x_list[1], x_list[2]
except IndexError as x_except:
logger.exception(x_except)
raise x_except
x_path = self.application_path / x
if not str(x_path) in sys.path:
sys.path.append(str(x_path))
try:
x_module = import_module(y)
if force_reload:
x_module = reload(x_module)
x_class = getattr(x_module, z)
x_object = x_class(**attrs) if attrs else x_class()
self.objects.update({obj_id: (x_object, a_tag, description)})
logger.debug(f'assign {obj_id} {x}/{y}/{z}')
except AttributeError as x_except:
logger.exception(x_except)
[docs]
def get_object(self, obj_id: str) -> Any:
""" Get the object for a given ID
:param str obj_id: Unique hash-ID for object as stored in :func:`eezz.service.TGlobalService.assign_object`
:return: The assigned object
"""
x_object, x_tag, x_descr = self.objects[obj_id]
return x_object
[docs]
def get_tag_ref(self, obj_id: str) -> Any:
""" Get Tag and descriptor for a given object ID
:param str obj_id: Object ID
:return: Tag and descriptor
:rtype: dict
"""
x_object, x_tag, x_descr = self.objects[obj_id]
return x_tag, x_descr
[docs]
class TServiceCompiler(Transformer):
""" Transforms the parser tree into a list of dictionaries
The transformer output is in json format
:param bs4.Tag a_tag: The parent tag
:param str a_id: A unique object id
:param dict a_query: The URL query part
"""
def __init__(self, a_tag: Tag, a_id: str = '', a_query: dict = None):
super().__init__()
self.m_id = a_id
self.m_tag = a_tag
self.m_query = a_query
self.m_service = TService()
@staticmethod
def simple_str(item):
""" :meta private: Parse a string token """
return ''.join([str(x) for x in item])
@staticmethod
def escaped_str(item):
""" :meta private: Parse an escaped string """
return ''.join([x.strip('"') for x in item])
@staticmethod
def qualified_string(item):
""" :meta private: Parse a qualified string: ``part1.part2.part3`` """
return '.'.join([str(x) for x in item])
@staticmethod
def selector_string(item):
""" :meta private: """
return f'[{'.'.join(item)}]'
@staticmethod
def array_element(item):
""" :meta private: """
return item
def set_style(self, item):
""" :meta private: """
x_style_dict = dict()
if x_style := self.m_tag.attrs.get('style'):
x_style_dict = {y[0]: y[1] for y in [x.split(':') for x in x_style.strip(';').split(';')]}
x_style_dict.update({item[0]: item[1]})
self.m_tag.attrs['style'] = ';'.join([f'{x}:{y}' for x, y in x_style_dict.items()])
return {}
@staticmethod
def setenv(item):
""" :meta private: """
return {item[0]: item[1]}
@staticmethod
def list_updates(item):
""" :meta private: Accumulate 'update' statements """
return list(itertools.accumulate(item, lambda a, b: a | b))[-1]
@staticmethod
def list_arguments(item):
""" :meta private: Accumulate arguments for function call """
return list(itertools.accumulate(item, lambda a, b: a | b))[-1]
def onload_section(self, item):
""" :meta private: """
self.m_tag['data-eezz-onload'] = 'eezzy_onload(this)'
return {'onload': item[0]}
@staticmethod
def update_section(item):
""" :meta private: Parse 'update' section """
return {'update': item[0]}
@staticmethod
def update_item(item):
""" :meta private: Parse 'update' expression"""
return {item[0]: item[1]} if len(item) == 2 else {item[0]: item[0]}
@staticmethod
def update_task(item):
""" :meta private: """
x_function, x_args = item[0].children
return {'call': {'function': x_function, 'args': x_args}}
def update_function(self, item):
""" :meta private: """
x_function, x_args = item[1].children
return {item[0]: {'function': x_function, 'args': x_args, 'id': self.m_id}}
@staticmethod
def assignment(item):
""" :meta private: Parse 'assignment' expression: ``variable = value`` """
return {item[0]: item[1]}
@staticmethod
def format_string(item):
""" :meta private: Create a format string: ``{value}`` """
return f'{{{".".join(item)}}}'
@staticmethod
def format_value(item):
""" :meta private: Create a format string: ``{key.value}`` """
return f'{{{".".join([str(x[0]) for x in item[0].children])}}}'
def template_section(self, item):
""" :meta private: Create tag attributes """
template = {'data-eezz-template': item[0]}
self.m_tag['data-eezz-template'] = item[0]
if len(item) > 1:
template['data-eezz-reference'] = item[1]
self.m_tag['data-eezz-reference'] = item[1]
return template
def parameter_section(self, item):
""" :meta private: Create tag attributes """
if item[0] in ('name', 'match', 'file', 'progress', 'type'):
self.m_tag[f'data-eezz-{item[0]}'] = item[1]
return {item[0]: item[1]}
if item[0] in 'format' and item[1] in ('br', 'p'):
self.m_tag[f'data-eezz-{item[0]}'] = '<br>' if item[1] == 'br' else '</p><p>'
return {item[0]: item[1]}
if item[0] in 'process' and item[1] in 'sync':
return {item[0]: item[1]}
raise UnexpectedInput(f'parameter section: {item[0]} = {item[1]}')
# return {item[0]: item[1]}
def funct_assignment(self, item):
""" :meta private: Parse 'function' section """
x_function, x_args = item[0].children
self.m_tag['onclick'] = 'eezzy_click(event, this)'
return {'call': {'function': x_function, 'args': x_args, 'id': self.m_id}}
def post_init(self, item):
""" :meta private: Parse 'post-init' section for function assignment """
x_method_name, x_method_args = item[0].children
x_obj, x_method, x_tag, x_descr = TService().get_method(self.m_id, x_method_name)
x_method(**x_method_args) if x_method_args else x_method()
return {'oninit': 'done'}
def table_assignment(self, item):
""" :meta private: Parse 'assign' section, assigning a Python object to an HTML-Tag
The table assignment uses TQuery to format arguments
In case the arguments are not all present, the format is broken and process continues with default """
x_function, x_args = item[0].children
try:
x_query = TQuery(self.m_query)
x_args = {x_key: x_val.format(query=x_query) for x_key, x_val in x_args.items()} if x_args else {}
except AttributeError as x_except:
logger.error(f'table_assignment {x_except}: {x_function}, {x_args}')
TService().assign_object(self.m_id, x_function, x_args, self.m_tag)
return {'assign': {'function': x_function, 'args': x_args, 'id': self.m_id}}
[docs]
class TTranslate:
""" The class TTranslate executes the EEZZ grammar and translates the input to a JSON object """
@staticmethod
def generate_pot(a_soup, a_title):
""" :meta private: Generate a POT file from HTML file
:param a_soup: The HTML page for translation
:param a_title: The file name for the POT file
"""
" todo: argument a_tile must be a a simple string "
x_regex = re.compile('[a-zA-Z0-9]+')
x_group = x_regex.findall(a_title)
if not x_group:
logger.error(f'Cannot create file {a_title}')
try:
x_pot_file = TService().locales_path / f'{x_group[0]}.pot'
x_elements = a_soup.find_all(lambda x_tag: x_tag.has_attr('data-eezz-i18n'))
x_path_hdr = TService().locales_path / 'template.pot'
with x_pot_file.open('w', encoding='utf-8') as f:
with x_path_hdr.open('r', encoding='utf-8') as f_hdr:
f.write(f_hdr.read())
for x_elem in x_elements:
f.write(f"msgid \"{x_elem['data-eezz-i18n']}\"\n"
f"msgstr \"{[str(x) for x in x_elem.descendants]}\"\n\n")
except FileNotFoundError as x_except:
logger.exception(x_except)
raise x_except
# @dataclass(kw_only=True)
class TQuery:
""" :meta private: Transfer the HTTP query to class attributes
:param query: The query string in dictionary format
"""
def __init__(self, query: dict):
if query:
for x_key, x_val in query.items():
setattr(self, x_key, ','.join(x_val))
# --- Section for module tests
def test_parser(source: str) -> json:
""" :meta private: """
html = '<td style="font-size: .8em; font-family: monospace; background-color: rgb(244, 244, 244);"></td>'
html = """ <area shape="rect" coords=" 0, 0, 40, 20" data-eezz="event: navigate(where_togo = 3), update: this.tbody"/>"""
# Create soup from html
soup = BeautifulSoup(html, 'html.parser')
x_parent_tag = soup.area
x_parser = Lark.open(str(TService().resource_path / 'eezz.lark'))
try:
x_syntax_tree = x_parser.parse(source)
x_transformer = TServiceCompiler(x_parent_tag, 'Directory')
x_list_json = x_transformer.transform(x_syntax_tree)
logger.debug(x_parent_tag.prettify())
if type(x_list_json) is Tree:
x_result = list(itertools.accumulate(x_list_json.children, lambda a, b: a | b))[-1]
logger.debug(x_result)
return x_result
else:
x_res = dict()
for x_key, x_val in x_list_json.items():
x_result = x_val
# if type(x_val) is Tree:
# x_result = list(itertools.accumulate(x_val.children, lambda a, b: a | b))[-1]
# x_res[x_key] = x_result
logger.debug(x_list_json)
return x_list_json
except UnexpectedCharacters as x_ex:
logger.error(f'invalid expression: parent-tag={x_parent_tag.name}, position={x_ex.pos_in_stream}', stack_info=True, stacklevel=3)
raise x_ex
@dataclass
class TestRow:
""":meta private:"""
path: str = 'test/path/file'
row_id: int = 100
def test_parser_area():
""":meta private:"""
source = """ format: p, event: navigate(where_togo = 3), update: this.tbody """
test_parser(source=source)
if __name__ == '__main__':
""" :meta private: """
# test parser
TService.set_environment(root_path='/Users/alzer/Projects/github/eezz_full/webroot')
logger.debug(f'{TService().resource_path=}')
logger.debug("Test Lark Parser")
test_parser_area()
logger.debug("assign statement")
x_source = """assign: examples.directory.TDirView(title="", path="/Users/alzer/Projects/github/eezz_full/webroot")"""
test_parser(source=x_source)
logger.debug("update statement 1")
x_source = """ event: on_select(row={row.row_id}), update: elem1.innerHTML = {object.path} """
test_parser(source=x_source)
logger.debug("update statement 2")
x_source = """ event: on_select(row={row.row_id}), update: elem1.innerHTML = {object.path}, elem2.innerHTML = {object.row_id} """
x_result = test_parser(source=x_source)
logger.debug(x_result)
x_source = 'name: directory, assign: examples.directory.TDirView(path=".", title="dir"), process:sync'
x_result = test_parser(source=x_source)
logger.debug(x_result)
x_source = "event: FormInput.append(table_row = [field_index.value]), reference: cell.title"
x_result = test_parser(source=x_source)
logger.debug(x_result)
x_source = """
template: cell (main),
onload: this.src = read_file(document_title={cell.attrs},file_name={cell.value}) """
x_result = test_parser(source=x_source)
logger.debug(f'{x_source} ==> {x_result}')
logger.success('test finished')
"""
# test parser exception and logging
logger.debug(msg="Test the parser: wrong download statement:")
logger.debug(msg="download: files(name=test1, author=albert), documents( main=main, prev=prev )")
try:
test_parser(source=""download: files(name=test1, author=albert), documents( main=main, prev=prev )"")
except UnexpectedCharacters as xx_except:
logger.error(msg='Test parser exception successful', stack_info=True)
"""