scrape_and_report

Run-file to scrape and report.

Since scraping and reporting run on a personal VDI instead of a (multi-user) server, the concept of partner production is designed to safeguard daily scraping and reporting by means of this scrape_and_report module.

For this partner production to be successful, the following conditions must be met for each participating (personal) VDI (an illustrative configuration sketch follows the list):

  • home directory listed as a network path (//<VDI name>/users/<userid>) under the partner_homedirs parameter in the [MAIN] section of the configuration file
  • local data store (scrape master directory) located on the C: drive at the location set by the rel_mst_dir parameter in the [MAIN] section of the configuration file
  • local data store shared read/write with the users of all partner VDIs
  • content of the local data store equal to the data stores of the other partner VDIs
  • same Python version in a virtual environment as the other partner VDIs
  • same Python code as the other partner VDIs
  • production of this module scheduled to run at 5:00 AM each day

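For illustration only, the sketch below shows what the relevant entries of the [MAIN] section could look like and how they might be read. The parameter names come from this module; the VDI names, user ids and list syntax are hypothetical, and the actual parsing is done by bd_www.Config.

from configparser import ConfigParser

# purely illustrative [MAIN] section with hypothetical VDI names and user ids
cfg = ConfigParser()
cfg.read_string("""
[MAIN]
partner_homedirs = //VDI-A/users/user_a, //VDI-B/users/user_b
rel_mst_dir = scrape_master
""")
partner_homedirs = [p.strip() for p in cfg['MAIN']['partner_homedirs'].split(',')]
rel_mst_dir = cfg['MAIN']['rel_mst_dir']
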
Upon executing this module, the following actions take place:

  • a flag is raised to signal that this VDI is available for production
  • other partner VDIs are checked for a similar flag signalling their availability
  • one of the available partners is selected for production, rotating per day (see the selection sketch after this list)
  • execution stops if this VDI is not selected
  • if this VDI is selected: 1) scrape the site, 2) update analytics data, 3) produce reports and 4) synchronise the data changes to all other partner VDIs
  • all major steps are logged to a production log, which, as a last action, is copied to all partner VDIs.

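The daily rotation is plain modular arithmetic on the ordinal date, as implemented in chosen_to_produce below; the partner names here are hypothetical:

from datetime import date

# available partners holding the most recent scrape, sorted alphabetically
# (hypothetical network paths)
recent_partners = ['//vdi-a/users/user_a', '//vdi-b/users/user_b', '//vdi-c/users/user_c']

# the ordinal date rotates the choice over the list: with three partners,
# each VDI produces every third day
prod_partner = recent_partners[date.today().toordinal() % len(recent_partners)]
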
The partner production mechanism safeguards against cases where one of the partner VDIs is shut down (occurs after 7 days of inactivity) or not logged on (occurs after a restart following a pushed update). It also prevents a partner VDI from being selected for production if it has an outdated data store (occurs after it has been unavailable for one or more days).
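
The outdated check compares each available partner's latest scrape timestamp against the newest one seen among all of them, as chosen_to_produce does; the timestamps below are made up but follow the %y%m%d-%H%M format the module uses:

# latest scrape timestamp per available partner (hypothetical values)
partners = {
    '//vdi-a/users/user_a': '240502-0500',
    '//vdi-b/users/user_b': '240501-0500',  # missed a day, so outdated
}

# partners lagging behind the newest timestamp are excluded from selection
outdated_partners = sorted(p for p in partners if partners[p] < max(partners.values()))
recent_partners = sorted(p for p in partners if p not in outdated_partners)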

#!/usr/bin/env python

"""
**Run-file to scrape and report.**

Since scraping and reporting run on a personal VDI instead of a (multi-user)
server, the concept of ***partner production*** is designed to safeguard
daily scraping and reporting by means of this scrape_and_report module.

For this partner production to be successful, ***the following conditions
must be met*** for each participating (personal) VDI:

- home directory listed as a network path (`//<VDI name>/users/<userid>`)
  under the `partner_homedirs` parameter in the [MAIN] section of the
  configuration file
- local data store (scrape master directory) located on the C: drive at the
  location set by the `rel_mst_dir` parameter in the [MAIN] section of the
  configuration file
- local data store shared read/write with the users of all partner VDIs
- content of the local data store equal to the data stores of the other
  partner VDIs
- same Python version in a virtual environment as the other partner VDIs
- same Python code as the other partner VDIs
- production of this module scheduled to run at 5:00 AM each day

Upon executing this module, the following actions take place:

- a flag is raised to signal that this VDI is available for production
- other partner VDIs are checked for a similar flag signalling their
  availability
- one of the available partners is selected for production (rotating per day)
- execution stops if this VDI is not selected
- if this VDI *is* selected: 1) scrape the site, 2) update analytics data, 3)
  produce reports and 4) synchronise the data changes to all other partner
  VDIs
- all major steps are logged to a production log, which, as a last action, is
  copied to all partner VDIs.

The ***partner production mechanism*** safeguards against cases where one of
the partner VDIs is shut down (occurs after 7 days of inactivity) or not
logged on (occurs after a restart following a pushed update). It also
prevents a partner VDI from being selected for production if it has an
outdated data store (occurs after it has been unavailable for one or more
days).
"""

import logging
import os
import shutil
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Iterable

from bd_www import scrape_conf, mst_dir
from bd_www.scrape import scrape_site, valid_scrapes
from bd_www.matomo import update_analytics
from bd_www.report import site_report

logger = logging.getLogger()
_log_name = scrape_conf.prod_log_name


def scrape_and_report() -> None:
    """
    **Scrape www.belastingdienst.nl and report about it.**

    First of all this function decides which of the partner VDIs will run
    the actual production. If it is not this VDI, this function returns
    without doing anything.

    If this VDI *is* chosen, it will scrape, collect usage statistics and
    generate reports by consecutively calling the following functions:

    - `bd_www.scrape.scrape_site` with default parameters
    - `bd_www.matomo.update_analytics` with default parameters
    - `bd_www.report.site_report` with the timestamp of the new scrape as
      parameter

    Afterwards the newly generated data will be synchronised to the partner
    VDIs.
    """

    # Change cwd for execution from within another directory (ref. crontab)
    os.chdir(Path(__file__).parent)

    # Setup logging
    global logger
    logger = logging.getLogger('prod')
    logger.setLevel(logging.INFO)
    log_file = str(mst_dir / _log_name)
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)

    # Handle production
    if chosen_to_produce(relax_secs=30):
        logger.info('production started')
        logger.info('starting scraper')
        timestamp = scrape_site()
        if not timestamp:
            logger.error('scraper failed')
        else:
            logger.info('scraper finished')
            logger.info('starting analytics update')
            metrics_updated = update_analytics()
            if metrics_updated:
                logger.info('finished analytics update')
                logger.info('starting reporting')
                site_report(timestamp)
                logger.info('finished reporting')
            else:
                logger.error('failed to update analytics')
                logger.warning('reporting skipped')
            sync_master_directory(cleanup=True)

        logger.info('production finished')

        # Write the production log to all partners
        for partner_homedir in scrape_conf.partner_homedirs:
            if os.getenv('username').lower() in partner_homedir.lower():
                continue
            phd = Path(partner_homedir) / scrape_conf.rel_mst_dir
            shutil.copy2(mst_dir / _log_name, phd / _log_name)
    else:
        logger.info('I am not needed any more, so will cease execution :-(')


def chosen_to_produce(relax_secs: int = 30) -> bool:
    """
    **Arbitrate whether this partner VDI is selected to produce.**

    Arguments:

        relax_secs: number of seconds to wait for partners to execute mirrored
            actions

    Returns:

        True if chosen

    The partner VDIs are read from the `partner_homedirs` parameter in the
    [MAIN] section of the configuration file (see documentation of
    `bd_www.Config`).

    The algorithm to determine which VDIs are available for production is:

    - raise 'available for production' flag for this VDI
    - wait `relax_secs` for others to raise flag (accommodates potential timing
      differences)
    - collect flags of all partners
    - decide on partner that should produce (see below)
    - wait `relax_secs` for others to reach same decision (accommodates
      potential timing differences)
    - remove flag
    - return True if selected

    The selection of the producing partner will rotate per day in
    alphabetical order over the available partners with the most recent scrape.
    """
    # Signal ready for production and wait for others to raise their flag
    flag_name = 'AVAILABLE_FOR_PRODUCTION.flag'
    flag = mst_dir / flag_name
    flag.touch()
    time.sleep(relax_secs)

    # Get partners (self included) with raised flags
    partners = {}
    ts_30_days_ago = (datetime.today()
                      - timedelta(days=30)).strftime('%y%m%d-%H%M')
    for partner_homedir in scrape_conf.partner_homedirs:
        partner_master_dir = Path(partner_homedir) / scrape_conf.rel_mst_dir
        partner_flag = partner_master_dir / flag_name
        if partner_flag.exists():
            partner_scrapes = valid_scrapes(
                from_ts=ts_30_days_ago, data_dir=partner_master_dir)
            latest_ts = partner_scrapes[-1] if partner_scrapes else ''
            partners[partner_homedir.lower()] = latest_ts
    logger.info(f'available partners: {", ".join(partners)}')

    # Select partners with recent and outdated data
    outdated_partners = sorted([
        p for p in partners if partners[p] < max(partners.values())])
    recent_partners = sorted([
        p for p in partners if p not in outdated_partners])
    if outdated_partners:
        logger.info(f'outdated partners: {", ".join(outdated_partners)}')

    # Decide on producing partner while rotating upon the ordinal date
    prod_partner = recent_partners[
        date.today().toordinal() % len(recent_partners)]
    logger.info(f'selected partner: {prod_partner}')

    # Wait for others to reach same decision and remove flag
    time.sleep(relax_secs)
    flag.unlink()

    # Return if this partner is selected to produce
    return os.getenv('username').lower() in prod_partner


def sync_master_directory(cleanup: bool = False) -> None:
    """
    **Synchronise the master directory to configured partners.**

    Arguments:

        cleanup: when True, items in the partner data stores that are not
            present in the leading store will be deleted

    With the actual scrape master directory as leading data store,
    all configured partner data stores (i.e. scrape master directories) are
    synchronised in one direction. Items missing in the partner data stores
    will be copied to the partners and shared items with different content
    will be updated.

    Optionally items in the partner data stores that are not present in the
    leading one can be deleted as well. This option can be dangerous to use,
    since an item can be deleted from all partners when one of them deletes
    an item and synchronises afterwards. When this option is not used,
    warnings will be logged for any such items.

    The partner data stores are configured via the `partner_homedirs` parameter
    in the [MAIN] section of the configuration file (see documentation of
    `bd_www.Config`).

    The configuration file contains names of items that will be ignored
    during synchronisation. These are in the value list of the `sync_ignore`
    parameter of the [MAIN] section.

    All actions are logged to the configured log file, which will be copied
    to all partners after finishing all synchronisations.
    """

    global logger
    if logger.name == 'root':
        # Logging not set up yet: only log to console
        logger = logging.getLogger('sync')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    flag_name = 'SYNC_IN_PROGRESS.flag'
    ignored_items: list[str] = \
        scrape_conf.sync_ignore + [scrape_conf.prod_log_name, flag_name]
    for partner_homedir in scrape_conf.partner_homedirs:
        if os.getenv('username').lower() in partner_homedir.lower():
            continue
        dest_dir = Path(partner_homedir) / scrape_conf.rel_mst_dir
        if not dest_dir.exists():
            logger.warning(f'{dest_dir} not found')
            continue
        logger.info(f'started data sync from {mst_dir} to {dest_dir}')
        flag = dest_dir / flag_name
        flag.touch()
        sync_one_way(mst_dir, dest_dir, ignore=ignored_items, cleanup=cleanup)
        logger.info(f'finished data sync from {mst_dir} to {dest_dir}')
        if flag.exists():
            flag.unlink()


def sync_one_way(src_dir: Path, tgt_dir: Path,
                 ignore: Iterable[str] = (), cleanup: bool = False) -> None:
    """
    **Synchronise directory tree from source to target.**

    Arguments:

        src_dir: source directory
        tgt_dir: target directory
        ignore: filenames to ignore
        cleanup: when True, target items that are not present in the source
            will be deleted

    The directory tree with `src_dir` as root is synchronised in one direction
    to `tgt_dir`. Items missing in the target tree will be copied and shared
    items with different content will be updated (overwritten).

    Optionally items in the target tree that are not present in the source
    tree can be deleted as well. This option can be dangerous to use when an
    item is accidentally deleted in the source: after completing the sync the
    mirrored item in the target tree will be deleted as well. When this
    option is not used, warnings will be logged for such items.

    All relevant actions will be logged to the console. When the module-level
    `logger` has already been configured (as `scrape_and_report` does), that
    logger will be used instead.

    ***Implementation note:***
    *To compare the source and target directory trees, fingerprints are
    created for both trees. These fingerprints are dictionaries with the path
    relative to the topmost directory as key and the modification time as
    value. The various sync actions are performed by comparing the
    fingerprints of these two trees.*
    """

    def root_fingerprint(root: Path) -> dict[str, float]:
        """
        **Creates a fingerprint of an entire directory tree.**

        Arguments:

            root: full path of the top directory of the tree

        Returns:

            dictionary of all files and directories with the path relative to
                root as key and the modification time as value
        """
        def tree_fingerprint(tree: Path) -> dict[str, float]:
            def dir_fingerprint(target: Path) -> dict[str, float]:
                fp = {}
                for obj in target.iterdir():
                    if obj.is_dir():
                        fp |= tree_fingerprint(obj)
                    fp[str(obj.relative_to(root))] = os.stat(obj).st_mtime
                return fp
            return dir_fingerprint(tree)
        return tree_fingerprint(root)

    # Setup logging
    global logger
    if logger.name == 'root':
        # Logging not set up yet: only log to console
        logger = logging.getLogger('sync')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # Do the sync
    logger.info(f'creating fingerprint for {src_dir}')
    src_fp = root_fingerprint(src_dir)
    logger.info(f'creating fingerprint for {tgt_dir}')
    tgt_fp = root_fingerprint(tgt_dir)
    names_to_ignore = set(ignore)

    # Copy items that are missing in target
    items_only_in_src = sorted(src_fp.keys() - tgt_fp.keys())
    for item in items_only_in_src:
        if set(Path(item).parts) & names_to_ignore:
            continue
        src_item = src_dir / item
        tgt_item = tgt_dir / item
        if src_item.is_dir():
            tgt_item.mkdir()
            logger.info(f'added {tgt_item}')
        else:
            shutil.copy2(src_item, tgt_item)
            logger.info(f'added {tgt_item}')

    # Overwrite (update) items that are different in target
    shared_items = src_fp.keys() & tgt_fp.keys()
    for item in shared_items:
        if set(Path(item).parts) & names_to_ignore:
            continue
        if src_fp[item] != tgt_fp[item]:
            src_item = src_dir / item
            tgt_item = tgt_dir / item
            # Do not act on different fingerprints of a directory
            if not src_item.is_dir():
                shutil.copy2(src_item, tgt_item)
                logger.info(f'updated {tgt_item}')

    # Delete or report items in target that are not present in source
    items_only_in_tgt = sorted(tgt_fp.keys() - src_fp.keys())
    for item in items_only_in_tgt:
        if set(Path(item).parts) & names_to_ignore:
            continue
        if cleanup:
            tgt_item = tgt_dir / item
            if tgt_item.is_dir():
                shutil.rmtree(tgt_item)
                logger.info(f'deleted {tgt_item}')
            else:
                if tgt_item.exists():  # Can be deleted with directory
                    tgt_item.unlink()
                    logger.info(f'deleted {tgt_item}')
        else:
            logger.warning(f'only in {tgt_dir}: {item}')


if __name__ == '__main__':
    scrape_and_report()
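
As a usage sketch (not part of the module): because the sync helpers fall back to console logging when the production logger has not been configured, the local data store can also be pushed to the partners on its own, assuming the module is importable as scrape_and_report:

from scrape_and_report import sync_master_directory

# one-way sync of the local scrape master directory to all configured partner
# data stores; cleanup=False never deletes partner-only items, it only logs
# warnings for them
sync_master_directory(cleanup=False)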