"""A file handler for generic XML files."""
# pylint: disable = too-many-lines
from __future__ import annotations
from os import walk
from os.path import join, dirname, basename, relpath, exists, normpath
from os.path import splitext, isdir
from re import search as re_search, findall as re_findall, sub as re_sub
from tempfile import mkdtemp
from shutil import rmtree
from json import loads
from lxml import etree
from pygments import highlight
from pygments.lexers.html import XmlLexer
from pygments.formatters.html import HtmlFormatter
from chrysalio.lib.utils import normalize_spaces, convert_value
from chrysalio.lib.utils import tounicode, tostr, shorten
from chrysalio.lib.utils import mimetype_get
from chrysalio.lib.form import Form, get_action
from chrysalio.lib.xml import load_xml2, validate_xml, load_xslt
from chrysalio.lib.xml import relaxng4validation
from chrysalio.helpers.literal import Literal
from chrysalio.includes.themes import theme_static_prefix
from cioservice.lib.utils import location2abs_base
from ciowarehouse.lib.utils import EXCLUDED_FILES, THUMBNAILS_DIR
from ciowarehouse.lib.utils import THUMBNAIL_LARGE, THUMBNAIL_SMALL, HERE
from ciowarehouse.lib.utils import MIMETYPES_DIR, NOT_FOUND, apply_regex
from ciowarehouse.lib.utils import html2thumbnails
from ciowarehouse.lib.handler import Handler
from ...lib.i18n import _, translate
from ...lib.utils import special_protect, special_unprotect
MEDIA_HTTP_NOTFOUND = {
'image': '/cioxml/images/notfound.jpg',
'audio': '/cioxml/audios/notfound.ogg',
'video': '/cioxml/videos/notfound.mp4'}
MEDIA_FILE_NOTFOUND = {
'image': normpath(join(
dirname(__file__), '..', '..', 'Static', 'Images', 'notfound.jpg')),
'audio': normpath(join(
dirname(__file__), '..', '..', 'Static', 'Audios', 'notfound.ogg')),
'video': normpath(join(
dirname(__file__), '..', '..', 'Static', 'Videos', 'notfound.mp4'))}
IMAGE_WEB_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.svg', '.gif')
MEDIA_EXTENSIONS = {
'image': ('',) + IMAGE_WEB_EXTENSIONS + (
'.tif', '.tiff', '.eps', '.bmp', '.psd', '.ai'),
'audio': ('', '.ogg', '.wav', '.mp3', '.m4a'),
'video': ('', '.ogv', '.webm', '.mp4')}
ATTRIBUTE_NS_HTML = ' xmlns="http://www.w3.org/1999/xhtml"'
THUMBNAIL_FRAME = """<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml" lang="en" xml:lang="en">
<head>
<meta charset="utf-8"/>
<title>XML</title>
<link rel="StyleSheet" href="{css}"
type="text/css"/>
</head>
<body id="cioFileThumbnail" class="cioHandler-{uid} cioRendering-thumbnail">
{content}
</body>
</html>
"""
EDIT_CXE_FRAME = """
<div class="cxeZone">
<div id="cxeToolbar"><div id="cxeMenu"></div></div>
<div id="cxeWrapper">
<div id="cxeViewZone"><div id="cxeView" class="{class_}"></div></div>
<div id="cxeTrail"></div>
</div>
<textarea id="cxeSource" name="content"{params}>{content}</textarea>
{aside}
</div>
"""
EDIT_XML_FRAME = '<textarea id="cmCode" name="content">{0}</textarea>'
HANDLER_CSS = ('/cioxml/css/handler.css',)
HANDLER_XML_CSS = ('/cioxml/css/handler_xml.css',)
CODEMIRROR_CSS = (
'/ciowarehouse/css/codemirror.css', '/ciowarehouse/css/show-hint.css',
'/cioxml/css/handler_xml.css')
KATEX_CSS = ('{katex_css}',)
KATEX_CSS_CDN = 'https://cdn.jsdelivr.net/npm/katex@0.16.4/dist/katex.min.css'
KATEX_CSS_LOCAL = '/ciokatex/css/katex.css'
KATEX_JS = ('{katex_js}',)
KATEX_JS_CDN = 'https://cdn.jsdelivr.net/npm/katex@0.16.4/dist/katex.min.js'
KATEX_JS_LOCAL = '/ciokatex/js/katex.js'
CIOKATEX_JS = ('/cioxml/js/ciokatex.js',)
# =============================================================================
[docs]
def includeme(configurator):
"""Function to include a CioWarehouse handler.
:type configurator: pyramid.config.Configurator
:param configurator:
Object used to do configuration declaration within the application.
"""
Handler.register(configurator, HandlerXml)
# =============================================================================
[docs]
class HandlerXml(Handler):
"""Class to manage a generic XML file.
``viewings`` and ``editings`` dictionaries have the following additional
keys:
* ``'xsl'``: XSL file to transform the XML file
* ``'image_paths'``: list of paths to dynamically find images
* ``'image_extensions'``: list of authorized extensions for images
* ``'audio_paths'``: list of paths to dynamically find audios
* ``'audio_extensions'``: list of authorized extensions for audio
* ``'video_paths'``: list of paths to dynamically find videos
* ``'video_extensions'``: list of authorized extensions for videos
The three last fields can contain the following tag:
* ``{directory}``: path of the directory containing the input file
"""
uid = 'xml'
label = _('Generic XML file handling')
extensions: tuple = ('.xml', '.rng', '.onix')
viewings: tuple = (
{'name': 'xml', 'label': _('XML'),
'template': 'ciowarehouse:Templates/handler_layout_view.pt',
'css': ('/cioxml/css/handler_xml.css',),
'js': None,
'xsl': None,
'image_paths': (), 'image_extensions': (),
'audio_paths': (), 'audio_extensions': (),
'video_paths': (), 'video_extensions': ()},)
editings: tuple = ()
thumbnailing: dict = {
'static': normpath(join(dirname(__file__), '..', 'Static')),
'css': 'Css/handler_xml.css'}
thumbnail_timeout = 30
relaxng: dict | None = None
class_xpath: str | None = None
_home = dirname(__file__)
_xslt: dict = {}
# -------------------------------------------------------------------------
[docs]
def infos_complete_fields(
self, warehouse, path, abs_path, whoosh_fields, request=None):
"""Complete the ``whoosh_fields`` dictionary with information found
in the metadata file.
See: :meth:`ciowarehouse.lib.handler.Handler.infos_complete_fields`
"""
super(HandlerXml, self).infos_complete_fields(
warehouse, path, abs_path, whoosh_fields, request)
# Look for index field IDs to process
field_ids = self._remaining_fields(warehouse, whoosh_fields)
if not field_ids:
return
# Load file
tree, err = load_xml2(abs_path)
if err is not None:
self._log_error(
_('${w}/${p}: ${e}', {
'w': warehouse.uid, 'p': path, 'e': translate(err)}),
request)
return
# Extract information
for field_id in field_ids:
xpath = self._indexers[field_id].get(
'argument', 'normalize-space()')
namespace = re_search(r'\{([^}]+)\}', xpath)
namespace = namespace and namespace.group(0)
xpath = xpath.replace(namespace, 'ns0:') if namespace else xpath
try:
xresult = tree.xpath(
xpath, namespaces=namespace and {'ns0': namespace[1:-1]})
except etree.XPathEvalError as error:
self._log_error(
'XPath "{0}": {1}'.format(xpath, error), request)
continue
if not xresult:
continue
if isinstance(xresult, list):
# pylint: disable = protected-access
sep = ',' if warehouse.indexfields[
field_id]['field_type'] == 'keyword' else ' '
if isinstance(xresult[0], etree._Element):
field = sep.join([elt.text or '' for elt in xresult])
else:
field = sep.join(xresult)
else:
field = xresult
field = normalize_spaces(field)
if self._indexers[field_id].get('limit'):
field = shorten(field, self._indexers[field_id]['limit'])
if field:
whoosh_fields[field_id] = convert_value(
warehouse.indexfields[field_id]['field_type'], field)
if warehouse.indexfields[field_id]['field_type'] in (
'datetime', 'date'):
whoosh_fields[field_id] = whoosh_fields[
field_id].isoformat()
# -------------------------------------------------------------------------
[docs]
def thumbnails(
self, warehouse, abs_file, thumb_dir, request=None, registry=None):
"""Create the small and large thumbnails representing the file.
See: :meth:`ciowarehouse.handlers.Handler.thumbnails`
"""
if 'css' not in self.thumbnailing:
return
if not warehouse.lock(abs_file):
return
# Create HTML
if self.thumbnailing.get('xsl'):
self.install()
html = self.xml2html(
request, warehouse, relpath(abs_file, warehouse.root), None,
self.thumbnailing, 'thumbnail', file_url=True)[0]
if html is None:
warehouse.unlock(abs_file)
return
html = self._fix_media(
request, warehouse, self.thumbnailing, html,
dirname(relpath(abs_file, warehouse.root)))
html = self._transform_pi(
warehouse, self.thumbnailing, html, request)
html = self._fix_thumbnail_url(warehouse, html, request, registry)
else:
try:
with open(abs_file, 'rb') as hdl:
html = hdl.read().strip()
except IOError:
warehouse.unlock(abs_file)
return
if not html:
warehouse.unlock(abs_file)
return
html = THUMBNAIL_FRAME.format(
uid=self.uid,
css=self.thumbnailing['css'],
content=highlight(html, XmlLexer(), HtmlFormatter()))
# Save HTML
tmp_dir = mkdtemp(
prefix=warehouse.uid.lower(),
dir=request.registry.settings.get('temporary')
if request is not None else None)
with open(join(tmp_dir, 'index.html'), 'wb') as hdl:
hdl.write(html.encode('utf8'))
# Convert HTML into thumbnails
error = html2thumbnails(
hdl.name, thumb_dir, warehouse.thumbnail_sizes,
self.thumbnail_timeout)
if error:
self._log_error(error, request)
rmtree(tmp_dir)
warehouse.unlock(abs_file)
# -------------------------------------------------------------------------
[docs]
def view(self, request, warehouse, content=None, ts_factory=None):
"""Return a string containing HTML to display the file.
See: :meth:`ciowarehouse.lib.handler.Handler.view`
"""
# Create content
viewing = self.current_rendering(request, warehouse, 'viewing')
if not viewing:
return None
if viewing.get('xsl'):
self.install(self._develop)
if self._develop:
self._xslt = {}
content, class_ = self.xml2html(
request, warehouse, join(*request.matchdict['path']), content,
viewing, 'view')[:2]
if content is not None:
content = content.replace(ATTRIBUTE_NS_HTML, '')
content = self._fix_media(request, warehouse, viewing, content)
content = self._transform_pi(
warehouse, viewing, content, request)
else:
if content is None:
try:
with open(join(warehouse.root, *request.matchdict['path']),
'rb') as hdl:
content = hdl.read()
except (OSError, IOError):
return None
class_ = None
content = highlight(content, XmlLexer(), HtmlFormatter())
if content is None:
return None
# Create HTML
html = self._chameleon_render(
request, warehouse, viewing, ts_factory or _, {
'class': class_,
'rendering_num': request.session[
'handlers'][self.uid]['viewing'],
'content': Literal(content)})
if html is None:
return None
return html
# -------------------------------------------------------------------------
[docs]
def edit(self, request, warehouse, content=None, ts_factory=None):
"""Return a string containing HTML to edit the file.
See: :meth:`ciowarehouse.lib.handler.Handler.edit`
"""
# Rendering
editing = self.current_rendering(request, warehouse, 'editing')
if not editing:
return None
# Lock file
if not self.file_lock(request, warehouse):
request.session.flash(
translate(_('The file is locked.'), request=request), 'alert')
return None
# Form, initial content and frame
path = join(*request.matchdict['path'])
if editing['name'].startswith('cxe'):
form, original, frame, class_ = self.edit_cxe(
request, warehouse, path, content, editing)
elif editing.get('xsl'):
form, original, frame, class_ = self.edit_mask(
request, warehouse, path, content, editing)
if frame is not None:
frame = self._fix_media(request, warehouse, editing, frame)
frame = self._transform_pi(warehouse, editing, frame, request)
else:
form, original, frame = self.edit_xml(
request, warehouse, path, content)
class_ = None
if form is None:
self.file_unlock(request, warehouse)
return None
# Action
action = get_action(request)[0]
if action in ('saq!', 'sav!') and form.validate():
error = self.save(
request, warehouse, original, form.values, action == 'sav!')
if error != '':
request.session.flash(translate(
_('"${f}" successfully saved.', {'f': basename(path)})
if error is None else error,
request=request), 'alert' if error is not None else '')
if action == 'saq!' and not error:
return None
# HTML
html = self._chameleon_render(
request, warehouse, editing, ts_factory or _, {
'form': form,
'class': class_,
'rendering_num': request.session[
'handlers'][self.uid]['editing'],
'content': Literal(frame)})
if html is None:
self.file_unlock(request, warehouse)
return html
# -------------------------------------------------------------------------
[docs]
def edit_cxe(self, request, warehouse, path, content=None, editing=None):
"""Return a string containing HTML to edit in a WYSIWYM manner.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:param content: (optional)
Content of the file.
:param dict editing: (optional)
Dictionary representing the editing.
:rtype: tuple
:return:
A tuple such as ``(form, original, frame, class)``.
"""
# Content and aside
self.install(self._develop)
if 'content' in request.POST:
content = self._save_cxe_transform(
request, request.POST['content'], editing)
elif content is None:
with open(join(warehouse.root, path), 'rb') as hdl:
content = hdl.read()
aside = self._edit_cxe_aside(
request, warehouse, path, content, editing)
content, class_ = self._edit_cxe_transform(
request, content, editing)
# Debug
# print('-' * 80, 'IN')
# print(content)
# Parameters
params = 'data-route_preview="{preview}/"'\
' data-route_image="{image}/"'\
' data-route_audio="{audio}/"'\
' data-route_video="{video}/"{i18n}'.format(
preview=request.route_path(
'file_preview', warehouse_id=warehouse.uid,
path=dirname(path) or '.'),
image=request.route_path(
'media_download', handler_id=self.uid, rendering='editing',
file_type='image', warehouse_id=warehouse.uid,
path=dirname(path) or '.'),
audio=request.route_path(
'media_download', handler_id=self.uid, rendering='editing',
file_type='audio', warehouse_id=warehouse.uid,
path=dirname(path) or '.'),
video=request.route_path(
'media_download', handler_id=self.uid, rendering='editing',
file_type='video', warehouse_id=warehouse.uid,
path=dirname(path) or '.'),
i18n=self._edit_cxe_i18n(request))
return Form(request), content, \
EDIT_CXE_FRAME.format(
class_=editing['class'] if editing and 'class' in editing
else self.uid.replace('xml_', ''), params=params,
content=special_protect(
re_sub('<\\?xml[^>]+>', '', content)),
aside=special_protect(
re_sub('<\\?xml[^>]+>', '', aside))), \
class_
# -------------------------------------------------------------------------
def _edit_cxe_transform(self, request, content, editing):
"""Possibly convert the content before use and return it as a string.
:type request: pyramid.request.Request
:param request:
Current request.
:param content:
Content of the file
:param dict editing: (optional)
Dictionary representing the editing.
:rtype: tuple
:return:
A tuple such as ``(content, class_)``
"""
if not content:
return '', None
# Regex
if editing and 'regex' in editing and editing['regex'][0]:
content = apply_regex(
request, self.abspath_from_home(editing['regex'][0]), content)
# XSL
uid = f'{self.uid}:cxe_in'
if uid not in self._xslt or self._develop:
try:
self._xslt[uid] = etree.XSLT(etree.parse(
self.abspath_from_home(editing['xsl'][0])))\
if editing and 'xsl' in editing and editing['xsl'][0] \
else None
except (IOError, etree.XSLTParseError,
etree.XMLSyntaxError) as error:
self._log_error(error, request)
self._xslt[uid] = None
# pylint: disable = protected-access
if self._xslt[uid] is None:
return (tostr(etree.tostring(content, encoding='utf-8')), None) \
if isinstance(content, etree._ElementTree) \
else tostr(content), None
if not isinstance(content, etree._ElementTree):
content = load_xml2('content.xml', data=content)[0]
class_ = None
if self.class_xpath:
class_ = content.xpath(self.class_xpath)
class_ = class_[0] if class_ else None
# pylint: enable = protected-access
try:
content = self._xslt[uid](content)
except (etree.XSLTApplyError, TypeError) as error:
self._log_error(error, request)
return '', class_
return str(content), class_
# -------------------------------------------------------------------------
def _edit_cxe_aside(self, request, warehouse, path, content, editing):
"""Return a possibly string containing additional information.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:param content: (optional)
Content of the file.
:param dict editing: (optional)
Dictionary representing the editing.
:rtype: str
"""
# pylint: disable = unused-argument
return ''
# -------------------------------------------------------------------------
@classmethod
def _edit_cxe_i18n(cls, request):
"""Return a string of data-i18n_* attributes for translations in
CioXmlEditor.
:type request: pyramid.request.Request
:param request:
Current request.
"""
return 'data-i18n_select_parent="{select_parent}"'\
' data-i18n_undo="{undo}"'\
' data-i18n_redo="{redo}"'\
' data-i18n_lift="{lift}"'\
' data-i18n_join_down="{join_down}"'\
' data-i18n_join_up="{join_up}"'\
' data-i18n_ok="{ok}"'\
' data-i18n_cancel="{cancel}"'\
' data-i18n_close="{close}"'\
' data-i18n_required_value="{required}"'\
' data-i18n_excluded_value="{excluded}"'\
' data-i18n_format_not_valid="{not_valid}"'\
' data-i18n_key="{key}"'\
' data-i18n_value="{value}"'\
' data-i18n_click4keyboard="{click4keyboard}"'\
' data-i18n_value="{value}"'\
' data-i18n_dblclick2edit="{dblclick2edit}"'.format(
select_parent=translate(
_('Parent node (Esc)'), request=request),
undo=translate(
_('Undoing last change (Ctrl-z)'), request=request),
redo=translate(
_('Redoing last undone change (Shift-Ctrl-z)'),
request=request),
lift=translate(_(
'Removal of a level (Ctrl-<)'), request=request),
join_down=translate(
_('Merging downwards (Alt-↓)'), request=request),
join_up=translate(
_('Merging upwards (Alt-↑)'), request=request),
ok=translate(_('OK'), request=request),
cancel=translate(_('Cancel'), request=request),
close=translate(_('Close the window'), request=request),
required=translate(_('Required value'), request=request),
excluded=translate(_('Excluded value'), request=request),
not_valid=translate(_('Format not valid'), request=request),
key=translate(_('Key'), request=request),
value=translate(_('Value'), request=request),
click4keyboard=translate(
_('Click to open/close the keyboard'), request=request),
dblclick2edit=translate(
_('Double click to edit'), request=request))
# -------------------------------------------------------------------------
[docs]
def edit_mask(self, request, warehouse, path, content=None, editing=None,
in_panel=False):
"""Return a string containing HTML to edit the file with a mask.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:type content: :class:`str`, :class:`lxml.etree.ElementTree`
:param content: (optional)
Content of the file.
:param dict editing: (optional)
Dictionary representing the editing.
:param bool in_panel: (default=False)
``True`` if it is displayed in a panel.
:rtype: tuple
:return:
A tuple such as ``(form, content, frame, class)``.
"""
# pylint: disable = too-many-arguments
self.install(self._develop)
if editing is None:
editing = self.current_rendering(request, warehouse, 'editing')
if self._develop:
self._xslt = {}
# Frame and initial content
frame, class_, content = self.xml2html(
request, warehouse, path, content, editing, 'frame',
parser=etree.XMLParser(remove_blank_text=True), in_panel=in_panel)
if frame is None: # pragma: nocover
self._log_error(
translate(_('Incorrect XSL for frame'), request=request),
request)
return None, None, None, None
frame = tounicode(frame).replace(ATTRIBUTE_NS_HTML, '')
# Values
try:
values = loads(self.xml2html(
request, warehouse, path, content, editing, 'values',
in_panel=in_panel)[0].replace('\n', '\\n'))
except (TypeError, ValueError):
self._log_error(
translate(_('Incorrect XSL for values'), request=request),
request)
return None, None, None, None
# Form
form, frame = self._edit_mask_form(request, values, frame, in_panel)
return form, content, frame, class_
# -------------------------------------------------------------------------
@classmethod
def _edit_mask_form(cls, request, values, frame, in_panel):
"""Return a string containing HTML to edit the file with a mask.
:type request: pyramid.request.Request
:param request:
Current request.
:param dict values:
Values for fields.
:param str frame:
Frame being processed.
:param bool in_panel:
``True`` if it is displayed in a panel.
:rtype: tuple
:return:
A tuple such as ``(form, frame)``.
"""
# Defaults
defaults = {}
for field in values:
field_id, input_type = field.split('@')
if input_type == 'checkbox':
defaults[field_id] = values[field]
elif input_type != 'options':
values[field] = special_protect(values[field])
defaults[field_id] = values[field].strip() \
if '\n' not in values[field] else '\n'.join([
k.strip() for k in values[field].split('\n')]).rstrip()
# Form
form = Form(request, defaults=defaults, force_defaults=in_panel)
for field in values:
field_id, input_type = field.split('@')
if input_type == 'hidden':
form_input = form.hidden(field_id)
elif input_type == 'checkbox':
form_input = form.custom_checkbox(field_id)
elif input_type == 'textarea':
form_input = form.textarea(field_id)
elif input_type == 'select' and '{0}@options'.format(
field_id) in values:
form_input = form.select(
field_id, None, values['{0}@options'.format(field_id)])
elif input_type != 'options':
form_input = form.text(field_id)
if input_type != 'options':
frame = frame.replace(
'__{0}__'.format(field), tounicode(form_input))
return form, frame
# -------------------------------------------------------------------------
[docs]
@classmethod
def edit_xml(cls, request, warehouse, path, content=None):
"""Return a string containing HTML to edit the file as XML.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:param content: (optional)
Content of the file.
:rtype: tuple
:return:
A tuple such as ``(form, original, frame)``.
"""
if 'content' in request.POST:
content = request.POST['content'].encode('utf8')
if content is None:
with open(join(warehouse.root, path), 'rb') as hdl:
content = hdl.read()
return Form(request), content, EDIT_XML_FRAME.format(
content.decode('utf8').replace('<', '&lt;')
.replace(' ', '‧'))
# -------------------------------------------------------------------------
[docs]
def save(self, request, warehouse, original, values, go_on):
"""Save the XML file.
See: :meth:`ciowarehouse.lib.handler.Handler.save`
"""
editing = self.current_rendering(request, warehouse, 'editing')
path = join(*request.matchdict['path'])
if editing['name'].startswith('cxe'):
return self._save_cxe(
request, warehouse, editing, path, values, go_on)
if editing['name'] == 'mask':
return self._save_mask(
request, warehouse, path, original, values, go_on)
if editing['name'] == 'xml':
return self._save_xml(request, warehouse, path, values, go_on)
self.file_unlock(request, warehouse)
return translate(_('Unknown editing mode'), request=request)
# -------------------------------------------------------------------------
def _save_cxe(self, request, warehouse, editing, path, values, go_on):
"""Update XML in a WYSIWYM manner.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param dict editing:
Dictionary representing the editing.
:param str path:
Relative path to the file.
:param dict values:
Values of the form.
:param bool go_on:
``True`` if the modification continues after saving.
:rtype: bool
"""
# pylint: disable = too-many-arguments
# Get content
self.install(self._develop)
content = self._save_cxe_transform(
request, values.get('content', ''), editing)
# Debug
# tree = load_xml2('content.xml', data=content)[0]
# print('-' * 80, 'OUT')
# print(etree.tostring(
# tree, pretty_print=True, encoding='utf-8',
# xml_declaration=True).decode('utf8'))
# Validate
relaxngs = relaxng4validation(self.relaxng)
tree, err = load_xml2(
'content.xml', relaxngs=relaxngs, data=content, noline=True)
if err is not None:
return translate(err, request=request)
# Save
content = etree.tostring(
tree, pretty_print=True, encoding='utf-8',
xml_declaration=True).decode('utf8')
if editing and 'regex' in editing and editing['regex'][1]:
content = apply_regex(
request, self.abspath_from_home(editing['regex'][1]), content)
tree, err = load_xml2(
'content.xml', relaxngs=relaxngs, data=content, noline=True)
if err is not None:
return translate(err, request=request)
with open(join(warehouse.root, path), 'w', encoding='utf8') as hdl:
hdl.write(content)
# Go on
if not go_on:
self.edit_finalization(
request, warehouse, path,
translate(
_('Online editing in "WYSIWYM" mode'), request=request))
self._update_panel(request, warehouse, path, tree)
return None
# -------------------------------------------------------------------------
def _save_cxe_transform(self, request, content, editing):
"""Possibly convert the content before saving and return it as a
string.
:type request: pyramid.request.Request
:param request:
Current request.
:param content:
Content of the file
:param dict editing: (optional)
Dictionary representing the editing.
:rtype: str
"""
# XSL
content = re_sub(
' cxens="([a-z]+):([a-zA-Z0-9/:._-]+)"', r' xmlns:\1="\2"',
special_unprotect(content))
uid = '{}:cxe_out'.format(self.uid)
if uid not in self._xslt or self._develop:
try:
self._xslt[uid] = etree.XSLT(etree.parse(
self.abspath_from_home(editing['xsl'][1])))\
if editing and 'xsl' in editing and editing['xsl'][1] \
else None
except (IOError, etree.XSLTParseError,
etree.XMLSyntaxError) as error:
self._log_error(error, request)
self._xslt[uid] = None
if self._xslt[uid] is not None:
content, err = load_xml2('content.xml', data=content)
if err is not None:
self._log_error(err, request)
return ''
try:
content = self._xslt[uid](content)
except etree.XSLTApplyError as error:
self._log_error(error, request)
return ''
content = str(content)
# Regular expressions
if editing and 'regex' in editing and editing['regex'][1]:
content = apply_regex(
request, self.abspath_from_home(editing['regex'][1]), content)
return content
# -------------------------------------------------------------------------
def _save_mask(self, request, warehouse, path, original, values, go_on):
"""Update XML according to a mask. This method must be overridden by
the derived class.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: ciowarehouse.lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:type original: lxml.etree._ElementTree
:param original:
Initial content of the file as a XML DOM object.
:param dict values:
Values of the form.
:param bool go_on:
``True`` if the modification continues after saving.
:rtype: bool
"""
# pylint: disable = unused-argument, too-many-arguments
# Validate
if self.relaxng is not None:
error = validate_xml(
original, relaxngs=relaxng4validation(self.relaxng))
if error is not None: # pragma: nocover
return translate(error, request=request)
# Save
original.write(
join(warehouse.root, path), pretty_print=True,
encoding='utf-8', xml_declaration=True)
if not go_on:
self.edit_finalization(
request, warehouse, path,
translate(_('Online editing in "mask" mode'), request=request))
self._update_panel(request, warehouse, path, original)
return None
# -------------------------------------------------------------------------
def _save_xml(self, request, warehouse, path, values, go_on):
"""Update XML.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:param dict values:
Values of the form.
:param bool go_on:
``True`` if the modification continues after saving.
:rtype: bool
"""
# Validate
xml = values.get('content', '').replace('&', '#amp;')\
.replace('<', '#lt;').replace('>', '#gt;')\
.replace('&', '&').replace('#amp;', '&')\
.replace('#lt;', '<').replace('#gt;', '>')\
.replace('’', "'").replace('‧', ' ')
tree, err = load_xml2(
'content.xml', relaxngs=relaxng4validation(self.relaxng),
data=tostr(xml))
if err is not None:
return translate(err, request=request)
# Save
tree.write(
join(warehouse.root, path), pretty_print=True,
encoding='utf-8', xml_declaration=True)
if not go_on:
self.edit_finalization(
request, warehouse, path,
translate(_('Online editing in XML mode'), request=request))
self._update_panel(request, warehouse, path, tree)
return None
# -------------------------------------------------------------------------
[docs]
def xml2html(self, request, warehouse, path, content, rendering, mode,
parser=None, file_url=False, in_panel=False):
"""Thanks to a XSL file, return a piece of HTML to display or edit a
file.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:type content: :class:`str`, :class:`lxml.etree.ElementTree`, ``None``
:param content:
Content of the file.
:param dict rendering:
Dictionary defining the rendering.
:param str mode:
Display mode ('thumbnail', 'view', 'frame' or 'values').
:param parser: (optional)
Parser to load XML.
:param bool file_url: (default=False)
If ``True``, URLs of media gegin with `file://`.
:param bool in_panel: (default=False)
``True`` if it is displayed in a panel.
:rtype: tuple
:return:
A tuple such as ``(html, class, content_as_tree)``.
See: :meth:`.handlers.xml.HandlerXml.view` and
:meth:`.handlers.xml.HandlerXml.edit`
"""
# pylint: disable = too-many-arguments
# Retrieve the XSL
uid = f"{self.uid}:{rendering.get('name')}"
if uid not in self._xslt:
self._xslt[uid] = load_xslt(
self.abspath_from_home(rendering.get('xsl')))
if not isinstance(self._xslt[uid], etree.XSLT):
self._log_error(self._xslt[uid], request)
return None, None, content
# Load the file
content, err = load_xml2(
join(warehouse.root, path), data=content, parser=parser)
if err is not None:
self._log_error(err, request)
return None, None, None
# Get class
class_ = None
if self.class_xpath and mode != 'values':
class_ = content.xpath(self.class_xpath)
class_ = class_[0] if class_ else None
# Prepare parameters
params = {
'mode': '"{0}"'.format(mode),
'theme': '"{0}"'.format(theme_static_prefix(request)),
'handler_uid': '"{0}"'.format(
rendering.get('handler_uid', self.uid)),
'in_panel': '1' if in_panel else '0',
'language': '"{0}"'.format(
request.session.get('lang', 'en') if request else ''),
'warehouse_id': '"{0}"'.format(warehouse.uid),
'input_directory': '"{0}"'.format(normpath(dirname(path)) or '.'),
'input_filename': '"{0}"'.format(normpath(basename(path))),
'input_path': '"{0}"'.format(normpath(path)),
'route_preview': '"{0}/"'.format(request.route_path(
'file_preview', warehouse_id='',
path=())[:-2]) if not file_url else '"preview://"',
'route_view': '"{0}/"'.format(request.route_path(
'file_view', warehouse_id='',
path=())[:-2]) if not file_url else '"download://"',
'route_edit': '"{0}/"'.format(request.route_path(
'file_edit', warehouse_id='',
path=())[:-2]) if not file_url else '"download://"',
'route_download': '"{0}/"'.format(request.route_path(
'file_download', warehouse_id='',
path=())[:-2]) if not file_url else '"download://"'
}
params['route_image'] = '"{0}/"'.format(request.route_path(
'media_download', handler_id=self.uid,
rendering='viewing'
if mode in ('view', 'thumbnail') else 'editing',
file_type='image', warehouse_id=warehouse.uid,
path=dirname(path))) if not file_url else \
'"file://{0}#"'.format(MEDIA_FILE_NOTFOUND['image'])
params['route_audio'] = '"{0}/"'.format(request.route_path(
'media_download', handler_id=self.uid,
rendering='viewing'
if mode in ('view', 'thumbnail') else 'editing',
file_type='audio', warehouse_id=warehouse.uid,
path=dirname(path))) if not file_url else \
'"file://{0}#"'.format(MEDIA_FILE_NOTFOUND['audio'])
params['route_video'] = '"{0}/"'.format(request.route_path(
'media_download', handler_id=self.uid,
rendering='viewing'
if mode in ('view', 'thumbnail') else 'editing',
file_type='video', warehouse_id=warehouse.uid,
path=dirname(path))) if not file_url else \
'"file://{0}#"'.format(MEDIA_FILE_NOTFOUND['video'])
if 'static' in rendering:
params['static'] = '"{0}"'.format(
self.abspath_from_home(rendering['static']))
if 'css' in rendering:
params['css'] = '"{0}"'.format(rendering['css'])
# XSL transform
try:
html = self._xslt[uid](content, **params)
except (etree.XSLTApplyError, TypeError) as error:
self._log_error(error, request)
return None, class_, content
html = etree.tostring(
etree.ElementTree(html.getroot()), encoding='utf-8').decode(
'utf8') if html.getroot() is not None else str(html)
return html, class_, content
# -------------------------------------------------------------------------
def _update_panel(self, request, warehouse, path, content):
"""Update XML according to a mask. This method must be overridden by
the derived class.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: ciowarehouse.lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str path:
Relative path to the file.
:type content: lxml.etree._ElementTree
:param content:
Content of the file as a XML DOM object.
"""
if self.panel is not None:
panel = request.registry['panels'].get(self.panel.uid)
if panel is not None and panel.is_open(request):
content = panel.render_content(
request, warehouse, path, content)
panel.set_value(request, 'content', Literal(content))
panel.set_value(request, 'modified', False)
# -------------------------------------------------------------------------
def _fix_renderings(self, request, available):
"""Possibly Fix URLs.
:type request: pyramid.request.Request
:param request:
Current request.
:param dict available:
Available renderings.
:rtype: dictionary
"""
local = 'modules' in request.registry \
and 'ciokatex' in request.registry['modules'] \
and 'ciokatex' not in request.registry['modules_off']
katex_css = KATEX_CSS_LOCAL if local else KATEX_CSS_CDN
katex_js = KATEX_JS_LOCAL if local else KATEX_JS_CDN
for renderings in ('viewings', 'editings'):
for rendering in available.get(renderings, ''):
urls = []
for url in rendering.get('css') or '':
urls.append(url.format(katex_css=katex_css))
rendering['css'] = tuple(urls)
urls = []
for url in rendering.get('js') or '':
urls.append(url.format(katex_js=katex_js))
rendering['js'] = tuple(urls)
return available
# -------------------------------------------------------------------------
@classmethod
def _fix_media(cls, request, warehouse, rendering, html, directory=None):
"""Fix image, audio and video source if possible.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param dict rendering:
Dictionary defining the rendering.
:param str html:
HTML content to fix.
:param str directory:
Relative path to the directory containing the file.
:rtype: str
:return:
Fixed HTML content
"""
# pylint: disable = too-many-locals, too-many-branches
file_url = directory is not None
theme = theme_static_prefix(request) if not file_url else 'file://'
if request is not None and not file_url:
directory = dirname(join(*request.matchdict['path']))
directory = directory or '.'
for media_type in ('image', 'audio', 'video'):
roots = rendering.get('{0}_paths'.format(media_type))
if not roots:
continue
notfound = MEDIA_FILE_NOTFOUND[media_type] \
if file_url else MEDIA_HTTP_NOTFOUND[media_type]
try:
media_ids = set(re_findall(
'"{0}{1}#([^"]+)'.format(theme, notfound), html))
except TypeError as error:
cls._log_error(error, request)
continue
extensions = rendering.get('{0}_exentions'.format(media_type)) \
or MEDIA_EXTENSIONS[media_type]
names = {
f'{basename(i)}{j}': i for i in media_ids for j in extensions}
for root in roots:
root = join(warehouse.root, root.format(directory=directory))
for path, dirs, files in walk(root):
for name in tuple(dirs):
if name in EXCLUDED_FILES:
dirs.remove(name)
continue
for name in files:
if name not in names:
continue
html = cls._fix_one_media(
request, warehouse, theme, media_type, names[name],
relpath(join(path, name), warehouse.root), html,
file_url)
name = names[name]
names = {
k: names[k] for k in names if names[k] != name}
media_ids.remove(name)
if not media_ids:
break
if not media_ids:
break
if not media_ids:
break
return html
# -------------------------------------------------------------------------
@classmethod
def _fix_one_media(cls, request, warehouse, theme, media_type, media_id,
media_path, html, file_url=False):
"""Fix one media.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str theme:
Theme prefix.
:param str media_type: ('image', 'audio' or 'video')
Type of media.
:param str media_id:
ID of the media with or without its extension.
:param str media_path:
Relative path of the found media.
:param str html:
HTML content to fix.
:param bool file_url: (default=False)
If ``True``, use `file://' for URL.
:rtype: str
:return:
Partially fixed HTML content.
"""
# pylint: disable = too-many-arguments
notfound = MEDIA_FILE_NOTFOUND[media_type] \
if file_url else MEDIA_HTTP_NOTFOUND[media_type]
# Audio or video
if media_type != 'image':
url = request.route_path(
'file_download', warehouse_id=warehouse.uid, path=media_path) \
if not file_url \
else f'file://{normpath(join(warehouse.root, media_path))}'
return html.replace(
f'"{theme}{notfound}#{media_id}"', f'"{url}"')
# Native image
if file_url and splitext(media_path)[1] in (
'.jpg', '.jpeg', '.png', '.gif', '.svg'):
return html.replace(
f'"{theme}{notfound}#{media_id}"',
f'"file://{normpath(join(warehouse.root, media_path))}"')
# Thumbnail
thumbnail_dir = join(warehouse.root, THUMBNAILS_DIR)
abs_thumb = join(
thumbnail_dir, media_path, '{0}.jpg'.format(THUMBNAIL_LARGE))
if not exists(abs_thumb):
abs_thumb = join(
thumbnail_dir, media_path, '{0}.png'.format(THUMBNAIL_LARGE))
if exists(abs_thumb):
url = request.route_path(
'file_thumbnail', warehouse_id=warehouse.uid,
path=relpath(abs_thumb, thumbnail_dir)) \
if not file_url else f'file://{abs_thumb}'
return html.replace(
f'"{theme}{notfound}#{media_id}"', f'"{url}"')
return html
# -------------------------------------------------------------------------
@classmethod
def _transform_pi(cls, warehouse, rendering, html, request=None):
"""Fix processing instructions.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param dict rendering:
Dictionary defining the rendering.
:param str html:
HTML content to fix.
:param str directory:
Relative path to the directory containing the file.
:type request: pyramid.request.Request
:param request: (optional)
Current request.
:rtype: str
:return:
Fixed HTML content
"""
# pylint: disable = unused-argument
return html
# -------------------------------------------------------------------------
@classmethod
def _fix_thumbnail_url(cls, warehouse, html, request=None, registry=None):
"""Fix download and preview URLs for thumbnail rendering.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Object describing the warehouse containing the file.
:param str html:
HTML content to fix.
:type request: pyramid.request.Request
:param request: (optional)
Current request.
:type registry: pyramid.registry.Registry
:param registry: (optional)
Current registry.
:rtype: str
:return:
Fixed HTML content
"""
# Roots of warehouses
locations = dict(registry['modules']['ciowarehouse'].locations) \
if registry is not None else {}
locations['/{0}'.format(warehouse.uid)] = warehouse.root
# Download URLs
try:
urls = set(re_findall('"download://([^"]+)', html))
except TypeError as error:
cls._log_error(error, request)
return html
for url in urls:
root = location2abs_base(
locations, '/{0}'.format(url.partition('/')[0]))
abs_path = join(root, url.partition('/')[2].partition('?')[0]) \
if root is not None else None
if abs_path is None or not exists(abs_path):
abs_path = NOT_FOUND
html = html.replace(f'"download://{url}"', f'"file://{abs_path}"')
# Preview URLs
try:
urls = set(re_findall('"preview://([^"]+)', html))
except TypeError as error:
cls._log_error(error, request)
return html
for url in urls:
root = location2abs_base(
locations, '/{0}'.format(url.partition('/')[0]))
path = url.partition('/')[2].partition('?')[0]
abs_path = join(root, path) if root is not None else None
if abs_path is None or not exists(abs_path):
html = html.replace(f'preview://{url}', f'file://{NOT_FOUND}')
continue
thumb_dir = join(root, THUMBNAILS_DIR, path)
thumb_size = THUMBNAIL_SMALL \
if 'size=small' in url.partition('?')[2] else THUMBNAIL_LARGE
if isdir(abs_path):
thumb_dir = join(thumb_dir, HERE)
if exists(thumb_dir):
if exists(join(thumb_dir, f'{thumb_size}.jpg')):
abs_path = join(thumb_dir, f'{thumb_size}.jpg')
else:
abs_path = join(thumb_dir, f'{thumb_size}.png')
else:
abs_path = join(MIMETYPES_DIR, 'normal', '{0}.svg'.format(
mimetype_get(abs_path)[1]))
html = html.replace(f'"preview://{url}"', f'"file://{abs_path}"')
return html
# -------------------------------------------------------------------------
@classmethod
def _remove_children(cls, element):
"""Remove all children of an XML element.
:type element: lxml.etree.Element
:param element:
XML Element to clean.
"""
for elt in element.getchildren():
element.remove(elt)