bd_www.matomo

"""
***Functions to collect web analytics data for scraping and reporting.***

The main functions of this module are:

- `period_metrics` for getting web statistics
- `period_feedback` for getting user feedback

The `update_analytics` function bundles these two main functions with
default parameters for convenience.
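
A minimal usage sketch for a daily update run (assuming the package has been
installed and the [MATOMO] configuration is in place):

    from bd_www.matomo import update_analytics

    if not update_analytics():
        print('analytics update skipped or not completed')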

Functions used by the `bd_www.scrape` module are:

- `matomo_available` to test the operational status of the web analytics server
- `urls_last_days` to get all urls that were recently requested

***Note***
*All functions of this module are specific to the Matomo implementation and
configuration on the Belastingdienst infrastructure for
www.belastingdienst.nl. If new web analytics tooling ever replaces Matomo
and the function signatures can remain the same, code changes can be
limited to this module only.*
"""

import datetime as dt
import logging
import matomo_api as ma
import requests
import re
import sys
import time
from datetime import date, timedelta
from typing import Callable, Union
from urllib.parse import urlparse

from bd_www import matomo_conf, mst_conn, mst_dir
from bd_www.constants import WWW_SITE, ROOT_PATH, ROOT_URL, DL_SITE

METRICS = ['nb_visits', 'nb_hits', 'entry_nb_visits',
           'entry_bounce_count', 'exit_nb_visits']
"Matomo metrics that will be requested for all pages."
VISIT_SEGMENTS = {
    'organic_entries': ['entry_nb_visits', (60,)],
    'call_visits': ['nb_visits', (722,)]
}
"Dictionary constant guiding the retrieval of the segmented metrics. The " \
    "key refers to the fieldname in the `daily` table of the metrics " \
    "database. The list value contains the (Matomo) name of the metric and a " \
    "tuple with segment id's that will be used to retrieve the highest " \
    "metric value. Segments used are: " \
    "**Id.60**: *Bron - zoekmachine*; " \
    "**Id.722**: *Bellen pagina bezocht*."
CUST_REP_IDS = [133]
"Id's of the Matomo custom reports used for retrieving user feedback: " \
    "**Id.133**: *ja/nee and textual feedback all content (data since " \
    "28-11-2021)*"
DL_DATA = ['url', 'nb_visits', 'nb_hits']
"Matomo data that will be requested for all downloads."

logger = logging.getLogger()


def update_analytics() -> bool:
    """
    **Update the analytics data.**

    Returns:

        success of the execution (False if Matomo is not available or
            another metrics request is still in progress)

    Convenience bundling of functions with which the analytics data in the
    metrics database is updated.

    Basically this function executes the `period_metrics`, `period_feedback`
    and `period_downloads` functions consecutively with default parameters.
    This will result in completing the data from the first day for which no
    data has been stored yet, until and including yesterday.

    Log messages will be written to the configured log file (see the [MATOMO]
    section of `bd_www.Config`).

    ***Implementation note:***
    *Since it is hard to predict when Matomo has finished processing
    previously gathered data, the `period_metrics` function will wait until
    yesterday's data has been processed. This approach can add quite some
    time to gathering all analytics data. Typically the total elapsed time
    amounts to a few hours, but exceptionally it can rise to more than
    twelve. To avoid a clash with a next cycle of getting analytics data, a
    flag file is tested: it is raised when not present, otherwise this
    function exits unsuccessfully.*
    """

    # Setup logging
    global logger
    logger = logging.getLogger('matomo')
    logger.setLevel(logging.INFO)
    log_file = str(mst_dir / matomo_conf.log_name)
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)

    if matomo_available():
        # Check if Matomo is being requested already
        flag = mst_dir / 'METRICS_REQUEST_IN_PROGRESS.flag'
        if flag.exists():
            ct = dt.datetime.fromtimestamp(flag.stat().st_ctime)
            logger.warning(f'Another metrics request running since '
                           f'{ct.strftime("%a %d %b %Y / %H:%M")}')
            return False
        else:
            flag.touch()
        period_metrics()
        period_feedback()
        period_downloads()
        flag.unlink(missing_ok=True)
        return True
    else:
        logger.warning(
            'Matomo not available: metrics database not updated')
        return False


def matomo_available() -> bool:
    """
    **Tests whether the Matomo API is available.**

    Returns:

        Availability.
    """

    api = None
    # noinspection PyBroadException
    try:
        api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
        response = mapi_request(api.API().getMatomoVersion, ma.format.json)
        ok = (response is not None
              and response.status_code == 200
              and 'maintenance' not in response.text)
    except Exception:
        ok = False
    finally:
        del api
    return ok


def period_metrics(last_date: date = date.today() - timedelta(days=1),
                   first_date: date = None,
                   replace: bool = False) -> None:
    """
    **Store metrics for all pages alive during a period.**

    Arguments:

        last_date: final date of storing period
        first_date: starting date of storing period
        replace: replace previous metrics

    Page metrics will be requested and stored in the metrics database for all
    living pages on each day in the period bounded by `first_date` and
    `last_date` inclusive. Data that is already available for a specific day
    of the period will be checked for completeness for all living pages and
    complemented where needed.

    The `last_date` parameter defaults to yesterday, which is also the
    maximum allowed value. If it exceeds this date, it will be reset to
    yesterday.

    When requesting metrics for yesterday, a wait/test cycle is used to
    guarantee the availability of these metrics before requesting and using
    them. This will be done for a maximum time of 20 hours. When the
    necessary set of metrics is still not available, this will be logged as
    a warning and no metrics will be stored for that day.

    If `first_date` equals `None` (default), the period will start at the
    first day for which no metrics have been stored yet. If no metrics have
    been stored at all, `first_date` will be set to one week before the date
    of the first scrape.

    When metrics have been stored after the second last scrape (assuming that
    the last scrape has just been run when this function is used), new pages
    might have appeared afterwards for which no metrics are stored yet. In
    this situation the `first_date` is adapted to the date of this second
    last scrape.

    When `replace` is `True`, any previously stored metrics for the period
    will be purged before storing new data.
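
    For example, a sketch of a call that rebuilds the metrics for January
    2022 from scratch (illustrative dates only):

        from datetime import date
        from bd_www.matomo import period_metrics

        period_metrics(first_date=date(2022, 1, 1),
                       last_date=date(2022, 1, 31),
                       replace=True)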

    ***Implementation note:***
    *The metrics database uses `page_id`'s from the scrapes database to store
    the metrics data. To guard against relational errors between these two
    databases, a shadow of the* mst_paths *table of the scrapes database is
    maintained as* shd_paths *table in the metrics database. This shadowed
    table contains all paths that are actually used for storing the metrics
    data. Each time this function is called, this inter-db relation is
    verified to be in sync. Failing this condition is logged as critical and
    no metrics will be added.*
    """

    global logger
    if logger.name == 'root':
        # Logging not set up yet, then only log to console
        logger = logging.getLogger('per_metrics')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # Check if the paths table of the metrics database is a subset of the
    # scrapes database.
    if mst_conn.execute('''
            SELECT page_id
            FROM shd_paths AS shadow
            LEFT JOIN mst_paths AS original USING (page_id)
            WHERE shadow.path != original.path
            ''').fetchall():
        logger.critical('paths in the metrics and scrapes databases are not '
                        'in sync: no metrics were added')
        return

    # Check and set the first and last date for collecting metrics
    yesterday = date.today() - timedelta(days=1)
    last_date = min(last_date, yesterday)
    if not first_date:
        qry = 'SELECT max(date) FROM daily LEFT JOIN dates USING (date_id)'
        qry_result = mst_conn.execute(qry).fetchone()
        if qry_result[0]:
            first_date = date.fromisoformat(qry_result[0]) + timedelta(days=1)
        else:
            qry_result = mst_conn.execute(
                'SELECT min(date) FROM mst_scrapes').fetchone()
            first_scr_date = date.fromisoformat(qry_result[0])
            first_date = first_scr_date - timedelta(days=7)

    # Metrics saved for dates after the scrape previous to the last should be
    # checked on completeness.
    qry = '''
        SELECT max(date)
        FROM mst_scrapes
        WHERE date < (SELECT max(date) FROM mst_scrapes)'''
    prev_last_scr_date = mst_conn.execute(qry).fetchone()[0]
    if prev_last_scr_date:
        first_date = min(date.fromisoformat(prev_last_scr_date), first_date)

    # Get metrics for each day in the date range
    logger.info(f'retrieving metrics started')
    num_days = (last_date - first_date).days + 1
    for md in [first_date + timedelta(days=d) for d in range(num_days)]:

        if md == yesterday:
            ok = wait_for_metrics_yesterday(max_hours=20)
            if not ok:
                logger.warning("yesterday's metrics not fully available")
                return

        # Create view with pages that were alive at the first scrape after the
        # metrics date or died since the previous scrape.
        mst_conn.execute('DROP VIEW IF EXISTS pages_at_date')
        mst_conn.execute(f'''
            CREATE TEMPORARY VIEW pages_at_date AS
            WITH
                -- next timestamp after date
                next (ts) AS (
                    SELECT ifnull(min(timestamp),
                                  (SELECT max(timestamp) FROM mst_scrapes))
                    FROM mst_scrapes
                    WHERE date > '{md}'
                ),
                pages_life_value_at_date AS (
                    SELECT DISTINCT
                        page_id,
                        last_value(alive) OVER win AS alive
                    FROM his_pages_life
                    WHERE timestamp <= (SELECT ts FROM next)
                    WINDOW win AS (
                        PARTITION BY page_id
                        ORDER BY timestamp
                        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
                    )
                )
            -- living pages at next.ts plus the ones that died after the 
            -- previous scrape (since any could still be alive for this date)
            SELECT page_id
            FROM pages_life_value_at_date
            WHERE alive
                UNION
            SELECT page_id
            FROM his_pages_life
            WHERE timestamp = (SELECT ts FROM next) AND NOT alive
            ''')

        # Copy any needed page_id/path combination from the scrapes database
        # to the shadowed paths table of the metrics database.
        mst_conn.execute('''
            WITH
                missing_pages AS (
                    SELECT page_id
                    FROM pages_at_date
                    LEFT JOIN shd_paths USING (page_id)
                    WHERE path ISNULL
                )
            INSERT INTO shd_paths (page_id, path)
            SELECT page_id, path
            FROM mst_paths
            WHERE page_id IN missing_pages
            ''')

        # Get the set of pages for which data is needed
        if replace:
            # Delete all previous data for the date
            mst_conn.execute(f'''
                DELETE FROM daily
                WHERE date_id = (
                    SELECT date_id
                    FROM dates
                    WHERE date = '{md}'
                )
                ''')
            # Get all pages that were alive at the date
            qry = '''
                SELECT page_id, path
                FROM pages_at_date
                LEFT JOIN mst_paths USING (page_id)
                '''
        else:
            # Get all pages that were alive at the date and had no metrics
            # stored yet for that date.
            qry = f'''
            WITH
                -- Metrics stored for the selected date
                stored_metrics AS (
                    SELECT page_id
                    FROM daily
                    LEFT JOIN dates USING (date_id)
                    WHERE date = '{md}'
                )
            SELECT page_id, path
            FROM pages_at_date
            LEFT JOIN mst_paths USING (page_id)
            LEFT JOIN stored_metrics USING (page_id)
            WHERE stored_metrics.page_id IS NULL
            '''
        pages: list[tuple[int, str]] = mst_conn.execute(qry).fetchall()
        if not pages:
            logger.info(f'no (additional) metrics needed for {md}')
            continue

        # Get or set the date_id to store the dataset with
        qry_result = mst_conn.execute(
            f'SELECT date_id FROM dates WHERE date = "{md}"').fetchone()
        if qry_result:
            date_id = qry_result[0]
        else:
            # Seems to be a new date to register
            date_id = mst_conn.execute(
                'INSERT INTO dates (date) VALUES (?)',
                [md.isoformat()]).lastrowid

        # Get and save basic metrics and segmented metrics
        fields = METRICS + list(VISIT_SEGMENTS.keys())
        metrics = day_metrics(pages, md)
        # Reduce the set of pages to only those that got metrics data. The
        # logic behind this reduction is that if a page did not get any
        # metrics, there will be no segmented metrics for the page either.
        pages = [(page_id, path) for (page_id, path)
                 in pages if page_id in metrics]
        if pages:
            seg_metrics = day_segmented_metrics(pages, md)
            values = {page_id: metrics[page_id] + seg_metrics[page_id]
                      for page_id, _ in pages}
            # Implementation note:
            # To insert the metrics into the database a single, rather long
            # sql statement is constructed to insert all rows in one go. That
            # proved to be way faster than using a Python loop to insert them
            # one row at a time.
            save_qry = f'''
                INSERT INTO daily
                    (date_id, page_id, {", ".join(fields)})
                VALUES
                    {','.join([
                        f'({date_id}, {page_id}, ' + 
                        ', '.join(
                            [str(val) if val else 'NULL' for val in vals]
                        ) + ')'
                        for page_id, vals in values.items()
                        ])
                    }'''
            mst_conn.execute(save_qry)
        msg = f'metrics for date {md} saved for {len(pages)} pages'
        logger.info(msg)
        print(msg)

    return


def wait_for_metrics_yesterday(
        interval_minutes: float = 5, max_hours: float = 12) -> bool:
    """
    **Wait until yesterday's metrics are available.**

    Arguments:

        interval_minutes: poll interval for successive tests
        max_hours: maximum time to wait for available metrics

    Returns:

        Success status

    The test/wait approach is necessary because of the unreliable nature of
    the Matomo operations. The function will repeatedly (with an interval
    determined by the `interval_minutes` parameter) request metrics with the
    next segments applied:

    - No segment
    - 'Bron - zoekmachine'
    - 'KPI bellen paginas'
    - 'Bellen pagina bezocht'

    Once the request for each segment returns a reasonable number of records
    (more than 100), the cyclic requesting continues with an interval of 60
    seconds until the last three responses for each segment are identical.
    After that the function will return a positive status.

    When `max_hours` is exceeded without reaching success, the function will
    return a negative status.
    """
    # Initialisations
    segments = {}
    for name in (
            'None',
            'Bron - zoekmachine',
            'KPI bellen paginas',
            'Bellen pagina bezocht'):
        segments[name] = {'definition': '',
                          'records': [0]}
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    yesterday = date.today() - timedelta(days=1)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json \
        | ma.flat() | ma.filter_limit(-1) | ma.period.day \
        | ma.date.YMD(yesterday.isoformat())
    for seg in mapi_request(api.SegmentEditor().getAll, pars).json():
        if seg['name'] in segments:
            segments[seg['name']]['definition'] = seg['definition']
    start = time.time()

    # Initialise test cycle
    cycle = 0
    threshold_records = 100
    threshold = False
    stable = False

    # Test cycle: get metrics until all are above the threshold and stable
    while not stable:
        cycle += 1

        # Query metrics totals and add number of response records to segments
        # library.
        for seg_name in segments:
            mresp = mapi_request(
                api.Actions().getPageUrls, pars
                | {'segment': segments[seg_name]['definition']})
            if mresp is None or 'result' in mresp.json():
                # A failed request or an error response counts as zero records
                records_in_response = 0
            else:
                records_in_response = len(mresp.json())
            segments[seg_name]['records'].append(records_in_response)

        # Print progress
        leader = time.strftime('%H:%M') + ' - records: '
        numbers = ' / '.join(
            [str(segments[n]['records'][-1]) for n in segments])
        print(leader + numbers)

        # Test if threshold reached for all metrics
        if not threshold:
            threshold = True
            for seg_name in segments:
                threshold = threshold and (
                        segments[seg_name]['records'][-1] > threshold_records)
            if threshold:
                logger.info('testing metrics: threshold reached')

        # Check if last metrics are constant
        if cycle >= 3 and threshold:
            stable = True
            for seg_name in segments:
                stable = stable and (
                        len(set(segments[seg_name]['records'][-3:])) == 1)
            if stable:
                logger.info('testing metrics: stable results')

        # Wait for next cycle if needed
        if not stable:
            if threshold:
                wait_secs = 60
            else:
                wait_secs = \
                    (start + cycle * interval_minutes * 60) - time.time()
            if wait_secs > 0:
                time.sleep(wait_secs)

        # Test on max waiting time
        if (time.time() - start) / 3600 > max_hours:
            break

    return stable


def mapi_request(
        request: Callable[[ma.module_methods.QryDict],
                          requests.Response],
        qry_pars: ma.module_methods.QryDict,
        max_retry: int = 4) -> Union[requests.Response, None]:
    """**Fail proof requester for the Matomo API.**

    Arguments:

        request: ModActions method from the matomo_api module
        qry_pars: parameters to complete the request
        max_retry: number of times to retry a failing request

    Returns:

        request response or None if not successful

    Sometimes Matomo refuses an API request for unknown reasons. Experience
    shows that repeating such a request is often enough to circumvent this
    behaviour. This function wraps such an API request, so that it is
    repeated upon failure.

    Upon failure, the first retry of the request is issued after one second.
    Each subsequent retry that is needed will add one second to this interval.
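
    A typical call, mirroring how this module uses the wrapper elsewhere
    (sketch only):

        api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
        resp = mapi_request(api.API().getMatomoVersion, ma.format.json)
        if resp is None:
            ...  # the request failed, even after retrying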
    """

    attempt_interval = 1
    sys.tracebacklimit = 0  # To prohibit a lot of console rubbish
    for attempt in range(max_retry + 1):
        # noinspection PyBroadException
        try:
            req_resp = request(qry_pars)
            return req_resp
        except Exception:
            logger.exception(f'retrying after {attempt_interval} seconds')
            time.sleep(attempt_interval)
            attempt_interval += 1
            continue
    logger.critical(f'Matomo server did not return a successful response '
                    f'after {max_retry} retries')
    sys.tracebacklimit = 1000
    return None


def day_metrics(pages: list[tuple[int, str]], d: date) -> dict[int, tuple]:
    """
    **Retrieve day metrics for requested pages.**

    Arguments:

        pages: list of (page_id, path) tuples for which the metrics are
            requested
        d: date for which the metrics are requested

    Returns:

        dictionary with page_id as key and tuples with metrics as values

    The tuples in the returned dictionary contain the metrics per page
    corresponding with the names in the `METRICS` constant list. Visits with
    a value of zero are returned as `None`.
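
    Illustrative shape of the returned dictionary (page id's and numbers are
    fabricated; values are ordered as in `METRICS`):

        {1234: (25, 31, 7, 2, 12),
         1235: (3, 4, None, None, 1)}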
    """

    metrics: dict[int, tuple] = {}
    "metrics[page_id] = (metric values)"

    # Get metrics with bulk request
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json \
        | ma.flat() | ma.filter_limit(-1) \
        | ma.period.day | ma.date.YMD(d.isoformat())
    resp = mapi_request(api.Actions().getPageUrls, pars)
    if not resp:
        logger.error(f'requesting basic metrics failed for date {d}')
        return {}
    bulk_resp = {row['label']: row for row in resp.json()}

    # Get metrics for each page from bulk response or request them individually
    # otherwise.
    none_result = tuple(None for _ in range(len(METRICS)))
    for i, (page_id, path) in enumerate(pages):
        if i % 100 == 0:
            print(f'getting basic metrics for date {d}: {i}/{len(pages)}')
        full_path = ROOT_PATH + path
        page_resp = bulk_resp.get(full_path)
        if not page_resp:
            resp = mapi_request(api.Actions().getPageUrl,
                                pars | ma.pageUrl(WWW_SITE + full_path))
            if resp:
                if resp.json():
                    page_resp = resp.json()[0]
                else:
                    continue
            else:
                logger.error(
                    f'requesting basic metrics with API Actions.getPageUrl '
                    f'for path {full_path} failed for date {d}')
                continue
        metrics_data = tuple([page_resp.get(m, None) for m in METRICS])
        if metrics_data == none_result:
            continue
        metrics[page_id] = metrics_data

    return metrics


def day_segmented_metrics(
        pages: list[tuple[int, str]], d: date) -> dict[int, tuple]:
    """
    **Retrieve segmented metrics for specified pages.**

    Arguments:

        pages: list of (page_id, path) tuples for which the metrics are
            requested
        d: date for which the metrics are requested

    Returns:

        dictionary with page_id as key and tuples with segmented metrics as
            values

    The tuples in the returned dictionary contain the segmented metrics per
    page corresponding with the keys of the `VISIT_SEGMENTS` constant
    dictionary. Metrics with a value of zero are returned as `None`.
    """

    full_paths = [ROOT_PATH + path for page_id, path in pages]
    seg_metrics: dict[str, dict[int, int]] = {}
    "seg_metrics[field][page_id] = metrics"

    for field, (metric_name, seg_ids) in VISIT_SEGMENTS.items():

        # Initialise all metrics to zero
        metrics = {path: 0 for path in full_paths}
        "metrics[path] = metrics"

        for seg_id in seg_ids:

            # Get segment definition
            api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
            pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json
            resp = mapi_request(api.SegmentEditor().get,
                                pars | {'idSegment': seg_id})
            if not resp:
                logger.error(f'getting definition of segment {seg_id} failed')
                continue
            seg_def = resp.json()['definition']

            # Get segmented metric with bulk request
            pars |= ma.flat() | ma.filter_limit(-1) \
                | ma.period.day | ma.date.YMD(d.isoformat())
            resp = mapi_request(api.Actions().getPageUrls,
                                pars | {'segment': seg_def})
            if not resp:
                logger.error(
                    f'requesting {metric_name} with segment {seg_id} '
                    f'failed for date {d}')
                return {}
            if not resp.json():
                continue
            if 'result' in resp.json() and resp.json()['result'] == 'error':
                logger.error(
                    f'requesting {metric_name} with segment {seg_id} for '
                    f'date {d} returned: {resp.json()["message"]}')
                continue
            bulk_resp = {row['label']: int(row.get(metric_name, 0))
                         for row in resp.json() if row['label'] in full_paths}

            # Get metric for each page from bulk response (if it is greater
            # than what we got already with another segment) or request it
            # individually otherwise.
            for cnt, path in enumerate(full_paths):
                if cnt % 100 == 0:
                    print(f'getting {field} for date {d} '
                          f'with segment {seg_id}: {cnt}/{len(pages)}')
                if path in bulk_resp:
                    metrics[path] = max(metrics[path], bulk_resp[path])
                else:
                    resp = mapi_request(
                        api.Actions().getPageUrl, pars
                        | ma.pageUrl(WWW_SITE + path)
                        | {'segment': seg_def})
                    if not resp:
                        logger.error(
                            f'requesting {metric_name} with API Actions.'
                            f'getPageUrl for path {path} failed for date {d}')
                        continue
                    if resp.json():
                        metrics[path] = max(
                            metrics[path],
                            int(resp.json()[0].get(metric_name, 0)))

        # Convert key from (full) path to page_id and set zero values to None
        seg_metrics[field] = {page_id: None
                              if metrics[full_path := ROOT_PATH + path] == 0
                              else metrics[full_path]
                              for page_id, path in pages}

    # Combine field values per page before returning
    return {page_id: tuple(seg_metrics[field][page_id] for field in seg_metrics)
            for page_id, _ in pages}


def period_feedback(last_date: date = date.today() - timedelta(days=1),
                    first_date: date = None) -> None:
    r"""
    **Store all feedback given during a period.**

    Arguments:

        last_date: final date of storing period
        first_date: starting date of storing period

    For each day in the period bounded by `first_date` and `last_date`
    inclusive, all feedback that is registered by Matomo is requested and
    stored in the `feedback` table of the metrics database. While storing
    data for a specific date, any potentially available feedback data for
    this date will be purged.

    The `last_date` parameter defaults to yesterday, which is also the
    maximum allowed value. If it exceeds this date, it will be reset to
    yesterday.

    If `first_date` equals `None` (default), the period will start at the
    first day for which no feedback has been stored yet. If no feedback has
    been stored at all, `first_date` will be set to one week before the date
    of the first scrape.

    The following data is stored for a specific date and each relevant page:

    - `pos_cnt`: total number of positive feedbacks
    - `neg_cnt`: total number of negative feedbacks
    - `pos_txt`: all positive textual feedback, separated by newlines
    - `neg_txt`: all negative textual feedback, separated by newlines
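
    For example, restoring the feedback for a single day might look like
    this (illustrative date only):

        from datetime import date
        period_feedback(first_date=date(2022, 3, 7),
                        last_date=date(2022, 3, 7))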

    ***Note 1:***
    *Requesting feedback from Matomo using the API Events.getAction method
    for each day suffers dramatically from data loss (most probably as a
    result of daily data aggregation). The custom reports that have
    historically been made to circumvent this problem are much more
    reliable, but suffer from past operational hiccups. For this reason all
    available methods to retrieve the data are used. From these (competing)
    sets of information the highest numbers for positive and negative
    feedback are selected, as well as the longest lists for positive and
    negative textual feedback.*

    *Besides the Events.getAction API method, custom reports are
    used as set by the `CUST_REP_IDS` constant.*

    ***Note 2:***
    *Feedback tagging changed during 2021 as a consequence of the added
    possibility to enter textual feedback. The next examples represent the
    situation before and after the change.*

    *Label before:*

        feedback.nee - /nl/home/content/uitgelogd-mijn-belastingdienst[]
        |>|[bld-dv-content]feedback|feedback.nee|

    *Label after:*

        feedback.nee - /nl/home/content/uitgelogd-mijn-belastingdienst[]
        |>|[bld-dv-content]feedback
        |>|feedback.nee
        |>|example of a feedback text
    """

    def store_fb(fb: dict, p: str, ds: str, ea: str,
                 ec: int, ft: str) -> bool:
        """
        **Store the feedback that is retrieved from some datasource.**

        Arguments:

            fb: dictionary to contain all feedback data
            p: path for which feedback is given
            ds: datasource
            ea: event action
            ec: event count
            ft: feedback text

        Returns:

            True if ea is recognized as valid event action, False otherwise
        """
        # Initialise (sub)dictionary if not available yet
        if p not in fb:
            fb[p] = {ds: {'pos_cnt': 0, 'neg_cnt': 0,
                          'pos_txt': [], 'neg_txt': []}}
        elif ds not in fb[p]:
            fb[p][ds] = {'pos_cnt': 0, 'neg_cnt': 0,
                         'pos_txt': [], 'neg_txt': []}
        # Save the various values in the feedback dictionary
        if ea == 'feedback.ja':
            fb[p][ds]['pos_cnt'] += ec
            if ft:
                fb[p][ds]['pos_txt'].append(ft)
        elif ea == 'feedback.nee':
            fb[p][ds]['neg_cnt'] += ec
            if ft:
                fb[p][ds]['neg_txt'].append(ft)
        else:
            return False
        return True

    global logger
    if logger.name == 'root':
        # Logging not set up yet, then only log to console
        logger = logging.getLogger('per_feedback')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # Check if the paths table of the metrics database is a subset of the
    # scrapes database.
    if mst_conn.execute('''
            SELECT page_id
            FROM shd_paths AS shadow
            LEFT JOIN mst_paths AS original USING (page_id)
            WHERE shadow.path != original.path
            ''').fetchall():
        logger.critical('paths in the metrics and scrapes databases are not '
                        'in sync: no feedback added')
        return

    # Check and set the first and last date for collecting feedback
    last_date = min(last_date, date.today() - timedelta(days=1))
    if not first_date:
        qry = 'SELECT max(date) FROM feedback LEFT JOIN dates USING (date_id)'
        qry_result = mst_conn.execute(qry).fetchone()
        if qry_result[0]:
            first_date = date.fromisoformat(qry_result[0]) + timedelta(days=1)
        else:
            qry_result = mst_conn.execute(
                'SELECT min(date) FROM mst_scrapes').fetchone()
            first_scr_date = date.fromisoformat(qry_result[0])
            first_date = first_scr_date - timedelta(days=7)

    # Get feedback for each day in the period
    logger.info(f'retrieving feedback started')
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.period.day \
        | ma.format.json | ma.flat(True) | ma.filter_limit(-1)
    num_days = (last_date - first_date).days + 1
    for fd in [first_date + timedelta(days=d) for d in range(num_days)]:

        # Set date to use timestamped views ('tsd_...') for specific scrape
        mst_conn.executescript(f'''
            DELETE FROM tsd;
            INSERT INTO tsd
            SELECT ifnull(min(timestamp),
                          (SELECT max(timestamp) FROM mst_scrapes))
            FROM mst_scrapes
            WHERE date > '{fd}'
            ''')

        # Initialise store to gather all relevant feedback data
        feedback = {}

        # Get feedback via filtered Events.getAction API method
        dated_pars = pars | ma.date.YMD(fd.isoformat())
        resp = mapi_request(api.Events().getAction,
                            dated_pars | ma.filter_pattern('^feedback.'))
        if not resp:
            logger.error(f'getting actions via '
                         f'Events.getAction API method failed for {fd}')
        else:
            for row in resp.json():
                # Stripping dot in next line to correct historic tagging
                # mistake.
                evt_act = row['Events_EventAction'].strip('.')
                evt_cnt = row['nb_visits']
                label_parts = row['label'].split('|>|')
                if len(label_parts) > 3:
                    fb_txt = re.sub(r'(\n)+', '/', label_parts[3])
                else:
                    fb_txt = ''
                evt_name = row['Events_EventName']
                path = evt_name.split('|>|')[0].strip('[]')
                path = path.split('#')[0].split('?')[0]
                path_ok = mst_conn.execute(f'''
                    SELECT *
                    FROM shd_paths
                    WHERE path = ?
                    ''', [path]).fetchone()
                if not path_ok:
                    qry_result = mst_conn.execute(f'''
                        SELECT path
                        FROM scr_redirs
                        LEFT JOIN mst_paths ON redir_id = page_id
                        WHERE redir_type = 'alias' AND req_url = ?
                        ''', [ROOT_URL + path]).fetchone()
                    if qry_result:
                        path = qry_result[0]
                        logger.debug(
                            f'getAction - alias path replaced by wcm path '
                            f'for feedback at {fd}: {row["label"]}')
                    else:
                        logger.debug(
                            f'getAction - abnormal path in '
                            f'feedback tag label at {fd}: {row["label"]}')
                        # Do not handle this one, so skip to next row
                        continue
                store_success = store_fb(
                    feedback, path, 'api.ga', evt_act, evt_cnt, fb_txt)
                if not store_success:
                    logger.debug(f'strange action from api.getAction '
                                 f'for {path} at {fd}: {evt_act}')

        # Get feedback data via relevant custom reports
        for cr_id in CUST_REP_IDS:
            resp = mapi_request(api.CustomReports().getCustomReport,
                                dated_pars | ma.idCustomReport(cr_id))
            if not resp:
                logger.error(
                    f'getting custom report {cr_id} failed for {fd}')
                continue
            for row in resp.json():
                try:
                    # Stripping dot in next line to correct historic tagging
                    # error.
                    evt_act = row['Events_EventAction'].strip('.')
                except KeyError:
                    # Not a valid row, probably with label 'Overige'
                    logger.debug(f'invalid row in custom report '
                                 f'{cr_id} for {fd}: {row}')
                    continue
                evt_cnt = row['nb_visits']
                label_parts = row['label'].split('|>|')
                if len(label_parts) > 3:
                    fb_txt = re.sub(r'(\n)+', '/', label_parts[3])
                else:
                    fb_txt = ''
                evt_url = row['Events_EventUrl']
                # Matomo evt_url's start with the domain and can also be
                # url's to be redirected, aliases or short url's. The latter
                # situation implies that the domain can also be
                # 'toeslagen.nl' or 'douane.nl'.
                path = evt_url[evt_url.find('/'):]
                if path.startswith(ROOT_PATH):
                    path = path.replace(ROOT_PATH, '')
                evt_url = 'https://www.' + evt_url
                wcm_path_result = mst_conn.execute(f'''
                    SELECT path
                    FROM tsd_redirs_to_wcm
                    LEFT JOIN mst_paths ON wcm_id = page_id
                    WHERE req_url = ?
                    ''', [evt_url]).fetchone()
                if wcm_path_result:
                    # evt_url is an alias, short_url or url to be redirected
                    path = wcm_path_result[0]
                    logger.debug(
                        f'cr{cr_id} - EventUrl replaced by wcm path '
                        f'for feedback at {fd}: {row["label"]}')
                else:
                    # Check if it is a wcm url
                    path_ok = mst_conn.execute(f'''
                        SELECT page_id
                        FROM shd_paths
                        WHERE path = ?''', [path]).fetchone()
                    if not path_ok:
                        logger.debug(
                            f'cr{cr_id} - strange url in '
                            f'feedback tag at {fd}: {row["label"]}')
                        # Do not handle this one, so skip to next row
                        continue
                store_success = store_fb(
                    feedback, path, f'cr{cr_id}', evt_act, evt_cnt, fb_txt)
                if not store_success:
                    logger.debug(f'strange action from cr{cr_id} '
                                 f'for {path} at {fd}: {evt_act}')

        # Get or set the date_id to store the dataset with and delete any
        # previous feedback data.
        qry_result = mst_conn.execute(
            f'SELECT date_id FROM dates WHERE date = "{fd}"').fetchone()
        if qry_result:
            date_id = qry_result[0]
            mst_conn.execute(
                f'DELETE FROM feedback WHERE date_id = {date_id}')
        else:
            # Seems to be a new date to register
            date_id = mst_conn.execute(
                'INSERT INTO dates (date) VALUES (?)',
                [fd.isoformat()]).lastrowid

        # Select which of the available data from the various data sources
        # will be used for this date and store it in the metrics database.
        # The criterion for selecting the data is the largest count and the
        # most textual feedback.
        for path in feedback:
            # Decide on the number of positive feedbacks
            pos_data = {feedback[path][data_src]['pos_cnt']
                        for data_src in feedback[path]}
            pos_cnt = max(pos_data)
            # Decide on the number of negative feedbacks
            neg_data = {feedback[path][data_src]['neg_cnt']
                        for data_src in feedback[path]}
            neg_cnt = max(neg_data)
            # Decide on the positive textual feedback
            pos_txt = [feedback[path][data_src]['pos_txt']
                       for data_src in feedback[path]]
            pos_txt = sorted(pos_txt, key=len, reverse=True)[0]
            # noinspection PyTypeChecker
            pos_txt = '\n'.join(sorted(pos_txt, key=str.lower))
            # Decide on the negative textual feedback
            neg_txt = [feedback[path][data_src]['neg_txt']
                       for data_src in feedback[path]]
            neg_txt = sorted(neg_txt, key=len, reverse=True)[0]
            # noinspection PyTypeChecker
            neg_txt = '\n'.join(sorted(neg_txt, key=str.lower))

            qry = f'''
                INSERT INTO feedback
                    (date_id, page_id, pos_cnt, neg_cnt, pos_txt, neg_txt)
                SELECT
                    {date_id}, page_id,
                    {pos_cnt}, {neg_cnt},
                    ?, ?
                FROM shd_paths
                WHERE path = '{path}' '''
            mst_conn.execute(qry, [pos_txt if pos_txt else None,
                                   neg_txt if neg_txt else None])

        logger.info(f'page feedback stored for {fd}')

    logger.info(f'retrieving feedback finished')


def period_downloads(last_date: date = date.today() - timedelta(days=1),
                     first_date: date = None) -> None:
    """
    **Store number of downloads during a period.**

    Arguments:

        last_date: final date of storing period
        first_date: starting date of storing period

    For each day in the period bounded by `first_date` and `last_date`
    inclusive, download data is requested from Matomo and
    stored in the `downloads` table of the metrics database. While storing
    data for a specific date, any potentially available download data for
    this date will be purged.

    The `last_date` parameter defaults to yesterday, which is also the
    maximum allowed value. If it exceeds this date, it will be reset to
    yesterday.

    If `first_date` equals `None` (default), the period will start at the
    first day for which no download data has been stored yet. If no download
    data has been stored at all, `first_date` will be set to one week before
    the date of the first scrape.

    The following data is stored for a specific date and each relevant
    download:

    - `nb_visits`: number of unique downloads (per visit)
    - `nb_hits`: number of downloads
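
    A sketch of a call that backfills the download data up to and including
    yesterday, starting at the first day without stored data (the defaults):

        from bd_www.matomo import period_downloads
        period_downloads()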

    """

    global logger
    if logger.name == 'root':
        # Logging not set up yet, then only log to console
        logger = logging.getLogger('per_downloads')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # Check if the links table of the metrics database is a subset of the
    # scrapes database.
    if mst_conn.execute('''
            SELECT link_id
            FROM shd_links AS shadow
            LEFT JOIN mst_links AS original USING (link_id)
            WHERE shadow.url != original.url
            ''').fetchall():
        logger.critical('links in the metrics and scrapes databases are not '
                        'in sync: no download data added')
        return

    # Check and set the first and last date
    last_date = min(last_date, date.today() - timedelta(days=1))
    if not first_date:
        qry = 'SELECT max(date) FROM downloads LEFT JOIN dates USING (date_id)'
        qry_result = mst_conn.execute(qry).fetchone()
        if qry_result[0]:
            first_date = date.fromisoformat(qry_result[0]) + timedelta(days=1)
        else:
            qry_result = mst_conn.execute(
                'SELECT min(date) FROM mst_scrapes').fetchone()
            first_scr_date = date.fromisoformat(qry_result[0])
            first_date = first_scr_date - timedelta(days=7)

    logger.info(f'retrieving download data started')

    # Create temporary table to receive the data for one day
    mst_conn.execute('''
        CREATE TEMPORARY TABLE IF NOT EXISTS dl_data (
            url			TEXT PRIMARY KEY NOT NULL,
            nb_visits	INTEGER,
            nb_hits		INTEGER
        )
        ''')

    # Prepare the Matomo API query
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.period.day \
        | ma.format.json | ma.flat(True) | ma.filter_limit(-1) \
        | ma.filter_pattern_recursive('^' + urlparse(DL_SITE).netloc)

    # Cycle over each day within the period
    num_days = (last_date - first_date).days + 1
    for dd in [first_date + timedelta(days=d) for d in range(num_days)]:

        # Request data for this date
        resp = mapi_request(api.Actions().getDownloads,
                            pars | ma.date.YMD(dd.isoformat()))
        if not resp:
            logger.error(
                f'requesting download data with API Actions.getDownloads '
                f'failed for date {dd}')
            continue  # with next date
        if not resp.json():
            logger.warning(f'no download data for date {dd}')
            continue  # with next date
        # Select relevant data
        values = []
        for dl in resp.json():
            if 'url' not in dl:
                # Reject the aggregate row
                continue
            vals = tuple([dl.get(d, None) for d in DL_DATA])
            values.append(
                '(' + ', '.join([f'"{v}"' if v else 'NULL' for v in vals]) + ')'
            )
        # Save to temporary table
        ins_qry = f'''
            INSERT INTO dl_data
                (url, nb_visits, nb_hits)
            VALUES
                {', '.join(values)}
            '''
        mst_conn.execute(ins_qry)
        # Downloads reported by Matomo can contain links that were not
        # discovered while scraping. Register these new download links in the
        # mst_links table of the scrapes database and its shadow in the
        # metrics database.
        mst_conn.execute('''
            INSERT INTO mst_links (url)
            SELECT url
            FROM dl_data
            LEFT JOIN mst_links USING (url)
            WHERE link_id ISNULL
            ''')
        mst_conn.execute('''
            INSERT INTO shd_links
            SELECT link_id, url
            FROM dl_data
            LEFT JOIN mst_links USING (url)
            WHERE link_id NOT IN (SELECT link_id FROM shd_links)        
            ''')
        # Get or set the date_id, delete any previous data and store the new set
        qry_result = mst_conn.execute(
            f'SELECT date_id FROM dates WHERE date = "{dd}"').fetchone()
        if qry_result:
            date_id = qry_result[0]
            mst_conn.execute(
                f'DELETE FROM downloads WHERE date_id = {date_id}')
        else:
            # Seems to be a new date to register
            date_id = mst_conn.execute(
                'INSERT INTO dates (date) VALUES (?)',
                [dd.isoformat()]).lastrowid
        mst_conn.execute(f'''
            INSERT INTO downloads
            SELECT {date_id}, link_id, nb_visits, nb_hits
            FROM dl_data
            LEFT JOIN mst_links USING (url)
            ''')
        # Purge the temporary table
        mst_conn.execute('DELETE FROM dl_data')

        logger.info(f'download data stored for {dd}')

    logger.info(f'retrieving download data finished')


def urls_last_days(num_days: int = 8) -> set[str]:
    """
    **Return all url's that were recently requested.**

    Arguments:

        num_days: length of the period, including today, for which the url's
            are returned

    Returns:

        requested (fully qualified) url's
    """

    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json \
        | ma.period.range | ma.date.last(num_days) | ma.flat(True) \
        | ma.filter_limit('all')
    resp = mapi_request(api.Actions().getPageUrls, pars)
    if not resp:
        return set()
    return {WWW_SITE + p['label'].split('?')[0] for p in resp.json()}
#   METRICS = ['nb_visits', 'nb_hits', 'entry_nb_visits', 'entry_bounce_count', 'exit_nb_visits']

Matomo metrics that will be requested for all pages.

#   VISIT_SEGMENTS = {'organic_entries': ['entry_nb_visits', (60,)], 'call_visits': ['nb_visits', (722,)]}

Dictionary constant guiding the retrieval of the segmented metrics. The key refers to the fieldname in the daily table of the metrics database. The list value contains the (Matomo) name of the metric and a tuple with segment id's that will be used to retrieve the highest metric value. Segments used are: Id.60: Bron - zoekmachine; Id.722: Bellen pagina bezocht.

#   CUST_REP_IDS = [133]

Id's of the Matomo custom reports used for retrieving user feedback: Id.133: ja/nee and textual feedback all content (data since 28-11-2021)

#   DL_DATA = ['url', 'nb_visits', 'nb_hits']

Matomo data that will be requested for all downloads.

#   def update_analytics() -> bool:
View Source
def update_analytics() -> bool:
    """
    **Update the analytics data.**

    Returns:

        success of the execution (only False if Matomo is not available)

    Convenience bundling of functions with which the analytics data in the
    metrics database is updated.

    Basically this function executes the `period_metrics`, `period_feedback`
    and `period_downloads` functions consecutively with default parameters.
    This will result in completing the data from the first day for which no
    data has been stored yet, until and including yesterday.

    Log messages will be written to the configured log file (see the [MATOMO]
    section of `bd_www.Config`).

    ***Implementation note:***
    *Since the predictability when Matomo has finished processing previously
    gathered data is rather low, the `period_metrics` function will wait
    until yesterday's data has been processed. This approach can add quite
    some time to gather all analytics data. Typically total elapsed time
    accounts for some hours, but exceptionally it can raise to more than
    twelve. To avoid a clash with a next cycle of getting analytics data,
    a flag is tested upon, raised if not present and exits unsuccessfully
    otherwise.*
    """

    # Setup logging
    global logger
    logger = logging.getLogger('matomo')
    logger.setLevel(logging.INFO)
    log_file = str(mst_dir / matomo_conf.log_name)
    fh = logging.FileHandler(log_file)
    fh.setLevel(logging.INFO)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)

    if matomo_available():
        # Check if Matomo is being requested already
        flag = mst_dir / 'METRICS_REQUEST_IN_PROGRESS.flag'
        if flag.exists():
            ct = dt.datetime.fromtimestamp(flag.stat().st_ctime)
            logger.warning(f'Another metrics request running since '
                           f'{ct.strftime("%a %d %b %Y / %H:%M")}')
            return False
        else:
            flag.touch()
        period_metrics()
        period_feedback()
        period_downloads()
        flag.unlink(missing_ok=True)
        return True
    else:
        logger.warning(
            'Matomo not available: metrics database not updated')
        return False

Update the analytics data.

Returns:

success of the execution (only False if Matomo is not available)

Convenience bundling of functions with which the analytics data in the metrics database is updated.

Basically this function executes the period_metrics, period_feedback and period_downloads functions consecutively with default parameters. This will result in completing the data from the first day for which no data has been stored yet, until and including yesterday.

Log messages will be written to the configured log file (see the [MATOMO] section of bd_www.Config).

Implementation note: Since the predictability when Matomo has finished processing previously gathered data is rather low, the period_metrics function will wait until yesterday's data has been processed. This approach can add quite some time to gather all analytics data. Typically total elapsed time accounts for some hours, but exceptionally it can raise to more than twelve. To avoid a clash with a next cycle of getting analytics data, a flag is tested upon, raised if not present and exits unsuccessfully otherwise.

#   def matomo_available() -> bool:
View Source
def matomo_available() -> bool:
    """
    **Tests whether the Matomo API is available.**

    Returns:

        Availability.
    """

    # noinspection PyBroadException
    try:
        api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
        respons = mapi_request(api.API().getMatomoVersion, ma.format.json)
        if respons.status_code == 200 and 'maintenance' not in respons.text:
            ok = True
        else:
            ok = False
    except Exception:
        ok = False
    finally:
        del api
    return ok

Tests whether the Matomo API is available.

Returns:

Availability.
#   def period_metrics( last_date: datetime.date = datetime.date(2023, 2, 1), first_date: datetime.date = None, replace: bool = False ) -> None:
View Source
def period_metrics(last_date: date = date.today() - timedelta(days=1),
                   first_date: date = None,
                   replace: bool = False) -> None:
    """
    **Store metrics for all pages alive during a period.**

    Arguments:

        last_date: final date of storing period
        first_date: starting date of storing period
        replace: replace previous metrics

    Page metrics will be requested and stored in the metrics database for all
    living pages on each day in the period bounded by `first_date` and
    `last_date` inclusive. Data that is already available for a specific day
    of the period, will be checked on completeness for all living pages and
    complemented where needed.

    The `last-date` parameter defaults to yesterday, which is also the
    maximum allowed value. If it exceeds this date, it will be reset to
    yesterday.

    When requesting metrics for yesterday, a wait/test cycle is used to
    guarantee the availability of these metrics before requesting and using
    them. This will be done for a maximum time of 20 hours. When the
    necessary set of metrics is still not available this will be logged as
    warning and no metrics will be stored for that day.

    If `first_date` equals to `None` (default) the period will start at the
    first day for which no metrics have been stored yet. If no metrics have
    been stored at all, `first_date` will be set to one week before the date
    of the first scrape.

    When metrics have been stored after the second last scrape (assuming that
    the last scrape has just been run when this function is used), new pages
    might have appeared afterwards for which no metrics are stored yet. In
    this situation the `first_date` is adapted to the date of this second
    last scrape.

    When `replace` is `True`, any previously stored metrics for the period
    will be purged before storing new data.

    ***Implementation note:***
    *The metrics database uses `page_id`'s from the scrapes database to store
    the metrics data. To guard against relational errors between these two
    databases, a shadow of the* mst_paths *table of the scrapes database is
    maintained as* shd_paths *table in the metrics database. This shadowed
    table contains all paths that are actually used for storing the metrics
    data. Each time this function is called, this inter-db relation is
    verified to be in sync. Failing this condition is logged as critical and
    no metrics will be added.*
    """

    global logger
    if logger.name == 'root':
        # Logging not set up yet, so log to the console only
        logger = logging.getLogger('per_metrics')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # Check if the paths table of the metrics database is a subset of the
    # scrapes database.
    if mst_conn.execute('''
            SELECT page_id
            FROM shd_paths AS shadow
            LEFT JOIN mst_paths AS original USING (page_id)
            WHERE shadow.path != original.path
            ''').fetchall():
        logger.critical('paths in the metrics and scrapes databases are not '
                        'in sync: no metrics were added')
        return

    # Check and set the first and last date for collecting metrics
    yesterday = date.today() - timedelta(days=1)
    last_date = min(last_date, yesterday)
    if not first_date:
        qry = 'SELECT max(date) FROM daily LEFT JOIN dates USING (date_id)'
        qry_result = mst_conn.execute(qry).fetchone()
        if qry_result[0]:
            first_date = date.fromisoformat(qry_result[0]) + timedelta(days=1)
        else:
            qry_result = mst_conn.execute(
                'SELECT min(date) FROM mst_scrapes').fetchone()
            first_scr_date = date.fromisoformat(qry_result[0])
            first_date = first_scr_date - timedelta(days=7)

    # Metrics saved for dates after the second-last scrape should be checked
    # for completeness.
    qry = '''
        SELECT max(date)
        FROM mst_scrapes
        WHERE date < (SELECT max(date) FROM mst_scrapes)'''
    prev_last_scr_date = mst_conn.execute(qry).fetchone()[0]
    if prev_last_scr_date:
        first_date = min(date.fromisoformat(prev_last_scr_date), first_date)

    # Get metrics for each day in the date range
    logger.info(f'retrieving metrics started')
    num_days = (last_date - first_date).days + 1
    for md in [first_date + timedelta(days=d) for d in range(num_days)]:

        if md == yesterday:
            ok = wait_for_metrics_yesterday(max_hours=20)
            if not ok:
                logger.warning("yesterday's metrics not fully available")
                return

        # Create view with pages that were alive at the first scrape after the
        # metrics date or died since the previous scrape.
        mst_conn.execute('DROP VIEW IF EXISTS pages_at_date')
        mst_conn.execute(f'''
            CREATE TEMPORARY VIEW pages_at_date AS
            WITH
                -- next timestamp after date
                next (ts) AS (
                    SELECT ifnull(min(timestamp),
                                  (SELECT max(timestamp) FROM mst_scrapes))
                    FROM mst_scrapes
                    WHERE date > '{md}'
                ),
                pages_life_value_at_date AS (
                    SELECT DISTINCT
                        page_id,
                        last_value(alive) OVER win AS alive
                    FROM his_pages_life
                    WHERE timestamp <= (SELECT ts FROM next)
                    WINDOW win AS (
                        PARTITION BY page_id
                        ORDER BY timestamp
                        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
                    )
                )
            -- living pages at next.ts plus the ones that died after the 
            -- previous scrape (since any could still be alive for this date)
            SELECT page_id
            FROM pages_life_value_at_date
            WHERE alive
                UNION
            SELECT page_id
            FROM his_pages_life
            WHERE timestamp = (SELECT ts FROM next) AND NOT alive
            ''')

        # Copy any needed page_id/path combination from the scrapes database
        # to the shadowed paths table of the metrics database.
        mst_conn.execute('''
            WITH
                missing_pages AS (
                    SELECT page_id
                    FROM pages_at_date
                    LEFT JOIN shd_paths USING (page_id)
                    WHERE path ISNULL
                )
            INSERT INTO shd_paths (page_id, path)
            SELECT page_id, path
            FROM mst_paths
            WHERE page_id IN missing_pages
            ''')

        # Get the set of pages for which data is needed
        if replace:
            # Delete all previous data for the date
            mst_conn.execute(f'''
                DELETE FROM daily
                WHERE date_id = (
                    SELECT date_id
                    FROM dates
                    WHERE date = '{md}'
                )
                ''')
            # Get all pages that were alive at the date
            qry = '''
                SELECT page_id, path
                FROM pages_at_date
                LEFT JOIN mst_paths USING (page_id)
                '''
        else:
            # Get all pages that were alive at the date and had no metrics
            # stored yet for that date.
            qry = f'''
            WITH
                -- Metrics stored for the selected date
                stored_metrics AS (
                    SELECT page_id
                    FROM daily
                    LEFT JOIN dates USING (date_id)
                    WHERE date = '{md}'
                )
            SELECT page_id, path
            FROM pages_at_date
            LEFT JOIN mst_paths USING (page_id)
            LEFT JOIN stored_metrics USING (page_id)
            WHERE stored_metrics.page_id IS NULL
            '''
        pages: list[tuple[int, str]] = mst_conn.execute(qry).fetchall()
        if not pages:
            logger.info(f'no (additional) metrics needed for {md}')
            continue

        # Get or set the date_id to store the dataset with
        qry_result = mst_conn.execute(
            f'SELECT date_id FROM dates WHERE date = "{md}"').fetchone()
        if qry_result:
            date_id = qry_result[0]
        else:
            # Seems to be a new date to register
            date_id = mst_conn.execute(
                'INSERT INTO dates (date) VALUES (?)',
                [md.isoformat()]).lastrowid

        # Get and save basic metrics and segmented metrics
        fields = METRICS + list(VISIT_SEGMENTS.keys())
        metrics = day_metrics(pages, md)
        # Reduce the set of pages to only those that got metrics data. The
        # logic behind this reduction is that if a page did not get any
        # metrics, there will be no segmented metrics for the page either.
        pages = [(page_id, path) for (page_id, path)
                 in pages if page_id in metrics]
        if pages:
            seg_metrics = day_segmented_metrics(pages, md)
            values = {page_id: metrics[page_id] + seg_metrics[page_id]
                      for page_id, _ in pages}
            # Implementation note:
            # To insert the metrics into the database a single, rather long
            # sql statement is constructed to insert all rows in one go. That
            # proved to be way faster than using a Python loop to insert them
            # one row at a time.
            save_qry = f'''
                INSERT INTO daily
                    (date_id, page_id, {", ".join(fields)})
                VALUES
                    {','.join([
                        f'({date_id}, {page_id}, ' + 
                        ', '.join(
                            [str(val) if val else 'NULL' for val in vals]
                        ) + ')'
                        for page_id, vals in values.items()
                        ])
                    }'''
            mst_conn.execute(save_qry)
        msg = f'metrics for date {md} saved for {len(pages)} pages'
        logger.info(msg)
        print(msg)

    return

Store metrics for all pages alive during a period.

Arguments:

last_date: final date of storing period
first_date: starting date of storing period
replace: replace previous metrics

Page metrics will be requested and stored in the metrics database for all living pages on each day in the period bounded by first_date and last_date inclusive. Data that is already available for a specific day of the period, will be checked on completeness for all living pages and complemented where needed.

The last_date parameter defaults to yesterday, which is also the maximum allowed value. If it exceeds this date, it will be reset to yesterday.

When requesting metrics for yesterday, a wait/test cycle is used to guarantee the availability of these metrics before requesting and using them. This will be done for a maximum of 20 hours. If the necessary set of metrics is still not available by then, this is logged as a warning and no metrics will be stored for that day.

If first_date is None (the default), the period will start at the first day for which no metrics have been stored yet. If no metrics have been stored at all, first_date will be set to one week before the date of the first scrape.

When metrics have been stored after the second last scrape (assuming that the last scrape has just been run when this function is used), new pages might have appeared afterwards for which no metrics are stored yet. In this situation the first_date is adapted to the date of this second last scrape.

When replace is True, any previously stored metrics for the period will be purged before storing new data.

Implementation note: The metrics database uses page_id's from the scrapes database to store the metrics data. To guard against relational errors between these two databases, a shadow of the mst_paths table of the scrapes database is maintained as shd_paths table in the metrics database. This shadowed table contains all paths that are actually used for storing the metrics data. Each time this function is called, this inter-db relation is verified to be in sync. Failing this condition is logged as critical and no metrics will be added.
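
For example, re-collecting the metrics for one specific week while purging previously stored data could look like this (the dates are purely illustrative):

from datetime import date

from bd_www.matomo import period_metrics

# Rebuild one specific week of page metrics, replacing any stored data.
period_metrics(first_date=date(2022, 5, 2),
               last_date=date(2022, 5, 8),
               replace=True)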

#   def wait_for_metrics_yesterday(interval_minutes: float = 5, max_hours: float = 12) -> bool:
View Source
def wait_for_metrics_yesterday(
        interval_minutes: float = 5, max_hours: float = 12) -> bool:
    """
    **Wait until yesterday's metrics are available.**

    Arguments:

        interval_minutes: poll interval for successive tests
        max_hours: maximum time to wait for available metrics

    Returns:

        Success status

    The test/wait approach is necessary because of the unreliable nature of
    the Matomo operations. The function will repeatedly (with an interval
    determined by the `interval_minutes` parameter) request metrics with the
    following segments applied:

    - No segment
    - 'Bron - zoekmachine'
    - 'KPI bellen paginas'
    - 'Bellen pagina bezocht'

    Once the metrics request for each segment returns a reasonable number of
    records (more than 100), the cyclic requesting continues with an interval
    of 60 seconds until the last three responses for each segment are the
    same. After that the function will return a positive status.

    When `max_hours` is exceeded without reaching success, the function will
    return a negative status.
    """
    # Initialisations
    segments = {}
    for name in (
            'None',
            'Bron - zoekmachine',
            'KPI bellen paginas',
            'Bellen pagina bezocht'):
        segments[name] = {'definition': '',
                          'records': [0]}
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    yesterday = date.today() - timedelta(days=1)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json \
        | ma.flat() | ma.filter_limit(-1) | ma.period.day \
        | ma.date.YMD(yesterday.isoformat())
    for seg in mapi_request(api.SegmentEditor().getAll, pars).json():
        if seg['name'] in segments:
            segments[seg['name']]['definition'] = seg['definition']
    start = time.time()

    # Initialise test cycle
    cycle = 0
    threshold_records = 100
    threshold = False
    stable = False

    # Test cycle: get metrics until all are above the threshold and stable
    while not stable:
        cycle += 1

        # Query metrics totals and add number of response records to segments
        # library.
        for seg_name in segments:
            mresp = mapi_request(
                api.Actions().getPageUrls, pars
                | {'segment': segments[seg_name]['definition']})
            if 'result' in mresp.json():
                records_in_response = 0
            else:
                records_in_response = len(mresp.json())
            segments[seg_name]['records'].append(records_in_response)

        # Print progress
        leader = time.strftime('%H:%M') + ' - records: '
        numbers = ' / '.join(
            [str(segments[n]['records'][-1]) for n in segments])
        print(leader + numbers)

        # Test if threshold reached for all metrics
        if not threshold:
            threshold = True
            for seg_name in segments:
                threshold = threshold and (
                        segments[seg_name]['records'][-1] > threshold_records)
            if threshold:
                logger.info('testing metrics: threshold reached')

        # Check if last metrics are constant
        if cycle >= 3 and threshold:
            stable = True
            for seg_name in segments:
                stable = stable and (
                        len(set(segments[seg_name]['records'][-3:])) == 1)
            if stable:
                logger.info('testing metrics: stable results')

        # Wait for next cycle if needed
        if not stable:
            if threshold:
                wait_secs = 60
            else:
                wait_secs = \
                    (start + cycle * interval_minutes * 60) - time.time()
            if wait_secs > 0:
                time.sleep(wait_secs)

        # Test on max waiting time
        if (time.time() - start) / 3600 > max_hours:
            break

    return stable

Wait until yesterday's metrics are available.

Arguments:

interval_minutes: poll interval for successive tests
max_hours: maximum time to wait for available metrics

Returns:

Success status

The test/wait approach is necessary because of the unreliable nature of the Matomo operations. The function will repeatedly (with an interval determined by the interval_minutes parameter) request metrics with the following segments applied:

  • No segment
  • 'Bron - zoekmachine'
  • 'KPI bellen paginas'
  • 'Bellen pagina bezocht'

Once the metrics request for each segment returns a reasonable number of records (more than 100), the cyclic requesting continues with an interval of 60 seconds until the last three responses for each segment are the same. After that the function will return a positive status.

When max_hours is exceeded without reaching success, the function will return a negative status.
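
Isolated from the polling loop, the stability check amounts to the last three record counts per segment being identical; a small illustrative helper (the names are not part of the module):

def counts_stable(record_counts: list[int], window: int = 3) -> bool:
    """True when the last `window` polled record counts are identical."""
    if len(record_counts) < window:
        return False
    return len(set(record_counts[-window:])) == 1

# Example: the counts settle once three successive polls return the same number.
assert counts_stable([0, 118, 118, 118]) is True
assert counts_stable([0, 97, 118, 119]) is False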

#   def mapi_request( request: Callable[[matomo_api.parameter_specifications.QryDict], requests.models.Response], qry_pars: matomo_api.parameter_specifications.QryDict, max_retry: int = 4 ) -> Optional[requests.models.Response]:
View Source
def mapi_request(
        request: Callable[[ma.module_methods.QryDict],
                          requests.Response],
        qry_pars: ma.module_methods.QryDict,
        max_retry: int = 4) -> Union[requests.Response, None]:
    """**Fail proof requester for the Matomo API.**

    Arguments:

        request: ModActions method from the matomo_api module
        qry_pars: parameters to complete the request
        max_retry: number of times to retry a failing request

    Returns:

        request response or None if not successful

    Sometimes Matomo will refuse an API request for unknown reasons.
    Experience shows that repeating such a request is often enough to work
    around this behaviour. This function wraps such an API request,
    so that it is repeated on failure.

    Upon failure, the first retry of the request is issued after one second.
    Each subsequent retry that is needed will add one second to this interval.
    """

    attempt_interval = 1
    sys.tracebacklimit = 0  # To prohibit a lot of console rubbish
    for attempt in range(max_retry + 1):
        # noinspection PyBroadException
        try:
            req_resp = request(qry_pars)
            return req_resp
        except Exception:
            logger.exception(f'retrying after {attempt_interval} seconds')
            time.sleep(attempt_interval)
            attempt_interval += 1
            continue
    logger.critical(f'Matomo server did not return a successful response '
                    f'after {max_retry} retries')
    sys.tracebacklimit = 1000
    return None

Fail-proof requester for the Matomo API.

Arguments:

request: ModActions method from the matomo_api module
qry_pars: parameters to complete the request
max_retry: number of times to retry a failing request

Returns:

request response or None if not successful

Sometimes Matomo will refuse an API request for unknown reasons. Experience shows that repeating such a request is often enough to work around this behaviour. This function wraps such an API request, so that it is repeated on failure.

Upon failure, the first retry of the request is issued after one second. Each subsequent retry that is needed will add one second to this interval.
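
The retry behaviour (one second before the first retry, one second longer before each next retry) can be sketched in isolation as a generic wrapper; this is an illustration of the pattern, not the module's implementation:

import time
from typing import Callable, Optional, TypeVar

T = TypeVar('T')

def with_linear_backoff(call: Callable[[], T], max_retry: int = 4) -> Optional[T]:
    """Call `call`, retrying with a linearly growing interval on failure."""
    interval = 1
    for _ in range(max_retry + 1):
        try:
            return call()
        except Exception:
            # Wait 1, 2, 3, ... seconds before the successive retries.
            time.sleep(interval)
            interval += 1
    return None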

#   def day_metrics(pages: list[tuple[int, str]], d: datetime.date) -> dict[int, tuple]:
View Source
def day_metrics(pages: list[tuple[int, str]], d: date) -> dict[int, tuple]:
    """
    **Retrieve day metrics for requested pages.**

    Arguments:

        pages: list of (page_id, path) tuples for which the metrics are
            requested
        d: date for which the metrics are requested

    Returns:

        dictionary with page_id as key and tuples with metrics as values

    The tuples in the returned dictionary contain the metrics per page
    corresponding with the names in the `METRICS` constant list. Visits with
    a value of zero are returned as `None`.
    """

    metrics: dict[int, tuple] = {}
    "metrics[page_id] = (metric values)"

    # Get metrics with bulk request
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json \
        | ma.flat() | ma.filter_limit(-1) \
        | ma.period.day | ma.date.YMD(d.isoformat())
    resp = mapi_request(api.Actions().getPageUrls, pars)
    if not resp:
        logger.error(f'requesting basic metrics failed for date {d}')
        return {}
    bulk_resp = {row['label']: row for row in resp.json()}

    # Get metrics for each page from bulk response or request them individually
    # otherwise.
    none_result = tuple(None for _ in range(len(METRICS)))
    for i, (page_id, path) in enumerate(pages):
        if i % 100 == 0:
            print(f'getting basic metrics for date {d}: {i}/{len(pages)}')
        full_path = ROOT_PATH + path
        page_resp = bulk_resp.get(full_path)
        if not page_resp:
            resp = mapi_request(api.Actions().getPageUrl,
                                pars | ma.pageUrl(WWW_SITE + full_path))
            if resp:
                if resp.json():
                    page_resp = resp.json()[0]
                else:
                    continue
            else:
                logger.error(
                    f'requesting basic metrics with API Actions.getPageUrl '
                    f'for path {full_path} failed for date {d}')
                continue
        metrics_data = tuple([page_resp.get(m, None) for m in METRICS])
        if metrics_data == none_result:
            continue
        metrics[page_id] = metrics_data

    return metrics

Retrieve day metrics for requested pages.

Arguments:

pages: list of (page_id, path) tuples for which the metrics are
    requested
d: date for which the metrics are requested

Returns:

dictionary with page_id as key and tuples with metrics as values

The tuples in the returned dictionary contain the metrics per page corresponding with the names in the METRICS constant list. Visits with a value of zero are returned as None.
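
The tuples follow the order of the METRICS constant, so the values can be paired back with their metric names; a sketch with a hypothetical page_id and path:

from datetime import date

from bd_www.matomo import METRICS, day_metrics

# Hypothetical page_id/path pair; real values come from the scrapes database.
metrics = day_metrics([(42, '/content/some-page')], date(2022, 5, 2))
if 42 in metrics:
    # Pair the tuple values with the metric names they correspond to.
    named = dict(zip(METRICS, metrics[42]))
    print(named)  # e.g. {'nb_visits': 17, 'nb_hits': 23, ...} (values illustrative)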

#   def day_segmented_metrics(pages: list[tuple[int, str]], d: datetime.date) -> dict[int, tuple]:
View Source
def day_segmented_metrics(
        pages: list[tuple[int, str]], d: date) -> dict[int, tuple]:
    """
    **Retrieve segmented metrics for specified pages.**

    Arguments:

        pages: list of (page_id, path) tuples for which the metrics are
            requested
        d: date for which the metrics are requested

    Returns:

        dictionary with page_id as key and tuples with segmented metrics as
            values

    The tuples in the returned dictionary contain the segmented metrics per
    page corresponding with the keys of the `VISIT_SEGMENTS` constant
    dictionary. Metrics with a value of zero are returned as `None`.
    """

    full_paths = [ROOT_PATH + path for page_id, path in pages]
    seg_metrics: dict[str, dict[int, int]] = {}
    "seg_metrics[field][page_id] = metrics"

    for field, (metric_name, seg_ids) in VISIT_SEGMENTS.items():

        # Initialise all metrics to zero
        metrics = {path: 0 for path in full_paths}
        "metrics[path] = metrics"

        for seg_id in seg_ids:

            # Get segment definition
            api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
            pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json
            resp = mapi_request(api.SegmentEditor().get,
                                pars | {'idSegment': seg_id})
            seg_def = resp.json()['definition']

            # Get segmented metric with bulk request
            pars |= ma.flat() | ma.filter_limit(-1) \
                | ma.period.day | ma.date.YMD(d.isoformat())
            resp = mapi_request(api.Actions().getPageUrls,
                                pars | {'segment': seg_def})
            if not resp:
                logger.error(
                    f'requesting {metric_name} with segment {seg_id} '
                    f'failed for date {d}')
                return {}
            if not resp.json():
                continue
            if 'result' in resp.json() and resp.json()['result'] == 'error':
                logger.error(
                    f'requesting {metric_name} with segment {seg_id} for '
                    f'date {d} returned: {resp.json()["message"]}')
                continue
            bulk_resp = {row['label']: int(row.get(metric_name, 0))
                         for row in resp.json() if row['label'] in full_paths}

            # Get metric for each page from bulk response (if it is greater
            # than what we got already with another segment) or request it
            # individually otherwise.
            for cnt, path in enumerate(full_paths):
                if cnt % 100 == 0:
                    print(f'getting {field} for date {d} '
                          f'with segment {seg_id}: {cnt}/{len(pages)}')
                if path in bulk_resp:
                    metrics[path] = max(metrics[path], bulk_resp[path])
                else:
                    resp = mapi_request(
                        api.Actions().getPageUrl, pars
                        | ma.pageUrl(WWW_SITE + path)
                        | {'segment': seg_def})
                    if not resp:
                        logger.error(
                            f'requesting {metric_name} with API Actions.'
                            f'getPageUrl for path {path} failed for date {d}')
                        continue
                    if resp.json():
                        metrics[path] = max(
                            metrics[path],
                            int(resp.json()[0].get(metric_name, 0)))

        # Convert key from (full) path to page_id and set zero values to None
        seg_metrics[field] = {page_id: None
                              if metrics[full_path := ROOT_PATH + path] == 0
                              else metrics[full_path]
                              for page_id, path in pages}

    # Combine field values per page before returning
    return {page_id: tuple(seg_metrics[field][page_id] for field in seg_metrics)
            for page_id, _ in pages}

Retrieve segmented metrics for specified pages.

Arguments:

pages: list of (page_id, path) tuples for which the metrics are
    requested
d: date for which the metrics are requested

Returns:

dictionary with page_id as key and tuples with segmented metrics as
    values

The tuples in the returned dictionary contain the segmented metrics per page corresponding with the keys of the VISIT_SEGMENTS constant dictionary. Metrics with a value of zero are returned as None.
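
The combination rule per field, taking the highest value over the configured segments and storing zero as NULL, can be illustrated with plain values (no Matomo call involved):

# Hypothetical counts for one page, retrieved with two competing segments.
counts_per_segment = [3, 7]
highest = max(counts_per_segment)
# Zero values end up as NULL (None) in the daily table.
stored_value = highest if highest > 0 else None
print(stored_value)  # -> 7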

#   def period_feedback( last_date: datetime.date = datetime.date(2023, 2, 1), first_date: datetime.date = None ) -> None:
View Source
def period_feedback(last_date: date = date.today() - timedelta(days=1),
                    first_date: date = None) -> None:
    r"""
    **Store all feedback given during a period.**

    Arguments:

        last_date: final date of storing period
        first_date: starting date of storing period

    For each day in the period bounded by `first_date` and `last_date`
    inclusive, all feedback that is registered by Matomo is requested and
    stored in the `feedback` table of the metrics database. While storing
    data for a specific date, any potentially available feedback data for
    this date will be purged.

    The `last_date` parameter defaults to yesterday, which is also the
    maximum allowed value. If it exceeds this date, it will be reset to
    yesterday.

    If `first_date` is `None` (the default), the period will start at the
    first day for which no feedback has been stored yet. If no feedback has
    been stored at all, `first_date` will be set to one week before the date
    of the first scrape.

    The following data is stored for a specific date and each relevant page:

    - `pos_cnt`: total number of positive feedbacks
    - `neg_cnt`: total number of negative feedbacks
    - `pos_txt`: all positive textual feedback, separated by newlines
    - `neg_txt`: all negative textual feedback, separated by newlines

    ***Note 1:***
    *Requesting feedback from Matomo using the API Events.getAction method
    for each day suffers dramatically from data loss (most probably as a
    result of daily data aggregation). The custom reports that historically
    have been made to circumvent this problem are much more reliable,
    but suffer from historic operational hiccups. For this reason all
    available methods to retrieve the data are used. From these (competing)
    sets of information the highest numbers for positive and negative
    feedback are selected, as well as the longest lists for positive and
    negative textual feedback.*

    *Besides the Events.getAction API method, custom reports are
    used as set by the `CUST_REP_IDS` constant.*

    ***Note 2:***
    *Feedback tagging changed during 2021 as a consequence of the added
    possibility to enter textual feedback. The following examples represent the
    situation before and after the change.*

    *Label before:*

        feedback.nee - /nl/home/content/uitgelogd-mijn-belastingdienst[]
        |>|[bld-dv-content]feedback|feedback.nee|

    *Label after:*

        feedback.nee - /nl/home/content/uitgelogd-mijn-belastingdienst[]
        |>|[bld-dv-content]feedback
        |>|feedback.nee
        |>|example of a feedback text
    """

    def store_fb(fb: dict, p: str, ds: str, ea: str,
                 ec: int, ft: str) -> bool:
        """
        **Store the feedback that is retrieved from some datasource.**

        Arguments:

            fb: dictionary to contain all feedback data
            p: path for which feedback is given
            ds: datasource
            ea: event action
            ec: event count
            ft: feedback text

        Returns:

            True if ea is recognized as valid event action, False otherwise
        """
        # Initialise (sub)dictionary if not available yet
        if p not in fb:
            fb[p] = {ds: {'pos_cnt': 0, 'neg_cnt': 0,
                          'pos_txt': [], 'neg_txt': []}}
        elif ds not in fb[p]:
            fb[p][ds] = {'pos_cnt': 0, 'neg_cnt': 0,
                         'pos_txt': [], 'neg_txt': []}
        # Save the various values in the feedback dictionary
        if ea == 'feedback.ja':
            fb[p][ds]['pos_cnt'] += ec
            if ft:
                fb[p][ds]['pos_txt'].append(ft)
        elif ea == 'feedback.nee':
            fb[p][ds]['neg_cnt'] += ec
            if ft:
                fb[p][ds]['neg_txt'].append(ft)
        else:
            return False
        return True

    global logger
    if logger.name == 'root':
        # Logging not set up yet, so log to the console only
        logger = logging.getLogger('per_feedback')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # Check if the paths table of the metrics database is a subset of the
    # scrapes database.
    if mst_conn.execute('''
            SELECT page_id
            FROM shd_paths AS shadow
            LEFT JOIN mst_paths AS original USING (page_id)
            WHERE shadow.path != original.path
            ''').fetchall():
        logger.critical('paths in the metrics and scrapes databases are not '
                        'in sync: no feedback added')
        return

    # Check and set the first and last date for collecting feedback
    last_date = min(last_date, date.today() - timedelta(days=1))
    if not first_date:
        qry = 'SELECT max(date) FROM feedback LEFT JOIN dates USING (date_id)'
        qry_result = mst_conn.execute(qry).fetchone()
        if qry_result[0]:
            first_date = date.fromisoformat(qry_result[0]) + timedelta(days=1)
        else:
            qry_result = mst_conn.execute(
                'SELECT min(date) FROM mst_scrapes').fetchone()
            first_scr_date = date.fromisoformat(qry_result[0])
            first_date = first_scr_date - timedelta(days=7)

    # Get feedback for each day in the period
    logger.info(f'retrieving feedback started')
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.period.day \
        | ma.format.json | ma.flat(True) | ma.filter_limit(-1)
    num_days = (last_date - first_date).days + 1
    for fd in [first_date + timedelta(days=d) for d in range(num_days)]:

        # Set date to use timestamped views ('tsd_...') for specific scrape
        mst_conn.executescript(f'''
            DELETE FROM tsd;
            INSERT INTO tsd
            SELECT ifnull(min(timestamp),
                          (SELECT max(timestamp) FROM mst_scrapes))
            FROM mst_scrapes
            WHERE date > '{fd}'
            ''')

        # Initialise store to gather all relevant feedback data
        feedback = {}

        # Get feedback via filtered Events.getAction API method
        dated_pars = pars | ma.date.YMD(fd.isoformat())
        resp = mapi_request(api.Events().getAction,
                            dated_pars | ma.filter_pattern('^feedback.'))
        if not resp:
            logger.error(f'getting actions via '
                         f'Events.getAction API method failed for {fd}')
        else:
            for row in resp.json():
                # Stripping dot in next line to correct historic tagging
                # mistake.
                evt_act = row['Events_EventAction'].strip('.')
                evt_cnt = row['nb_visits']
                label_parts = row['label'].split('|>|')
                if len(label_parts) > 3:
                    fb_txt = re.sub(r'(\n)+', '/', label_parts[3])
                else:
                    fb_txt = ''
                evt_name = row['Events_EventName']
                path = evt_name.split('|>|')[0].strip('[]')
                path = path.split('#')[0].split('?')[0]
                path_ok = mst_conn.execute(f'''
                    SELECT *
                    FROM shd_paths
                    WHERE path = ?
                    ''', [path]).fetchone()
                if not path_ok:
                    qry_result = mst_conn.execute(f'''
                        SELECT path
                        FROM scr_redirs
                        LEFT JOIN mst_paths ON redir_id = page_id
                        WHERE redir_type = 'alias' AND req_url = ?
                        ''', [ROOT_URL + path]).fetchone()
                    if qry_result:
                        path = qry_result[0]
                        logger.debug(
                            f'getAction - alias path replaced by wcm path '
                            f'for feedback at {fd}: {row["label"]}')
                    else:
                        logger.debug(
                            f'getAction - abnormal path in '
                            f'feedback tag label at {fd}: {row["label"]}')
                        # Do not handle this one, so skip to next row
                        continue
                store_success = store_fb(
                    feedback, path, 'api.ga', evt_act, evt_cnt, fb_txt)
                if not store_success:
                    logger.debug(f'strange action from api.getAction '
                                 f'for {path} at {fd}: {evt_act}')

        # Get feedback data via relevant custom reports
        for cr_id in CUST_REP_IDS:
            resp = mapi_request(api.CustomReports().getCustomReport,
                                dated_pars | ma.idCustomReport(cr_id))
            if not resp:
                logger.error(
                    f'getting custom report {cr_id} failed for {fd}')
                continue
            for row in resp.json():
                try:
                    # Stripping dot in next line to correct historic tagging
                    # error.
                    evt_act = row['Events_EventAction'].strip('.')
                except KeyError:
                    # Not a valid row, probably with label 'Overige'
                    logger.debug(f'invalid row in custom report '
                                 f'{cr_id} for {fd}: {row}')
                    continue
                evt_cnt = row['nb_visits']
                label_parts = row['label'].split('|>|')
                if len(label_parts) > 3:
                    fb_txt = re.sub(r'(\n)+', '/', label_parts[3])
                else:
                    fb_txt = ''
                evt_url = row['Events_EventUrl']
                # Matomo evt_urls start with the domain and can also be
                # urls to be redirected, aliases or short urls. The latter
                # situation implies that the domain can also be
                # 'toeslagen.nl' or 'douane.nl'.
                path = evt_url[evt_url.find('/'):]
                if path.startswith(ROOT_PATH):
                    path = path.replace(ROOT_PATH, '')
                evt_url = 'https://www.' + evt_url
                wcm_path_result = mst_conn.execute(f'''
                    SELECT path
                    FROM tsd_redirs_to_wcm
                    LEFT JOIN mst_paths ON wcm_id = page_id
                    WHERE req_url = ?
                    ''', [evt_url]).fetchone()
                if wcm_path_result:
                    # evt_url is an alias, short_url or url to be redirected
                    path = wcm_path_result[0]
                    logger.debug(
                        f'cr{cr_id} - EventUrl replaced by wcm path '
                        f'for feedback at {fd}: {row["label"]}')
                else:
                    # Check if it is a wcm url
                    path_ok = mst_conn.execute(f'''
                        SELECT page_id
                        FROM shd_paths
                        WHERE path = ?''', [path]).fetchone()
                    if not path_ok:
                        logger.debug(
                            f'cr{cr_id} - strange url in '
                            f'feedback tag at {fd}: {row["label"]}')
                        # Do not handle this one, so skip to next row
                        continue
                store_success = store_fb(
                    feedback, path, f'cr{cr_id}', evt_act, evt_cnt, fb_txt)
                if not store_success:
                    logger.debug(f'strange action from cr{cr_id} '
                                 f'for {path} at {fd}: {evt_act}')

        # Get or set the date_id to store the dataset with and delete any
        # previous feedback data.
        qry_result = mst_conn.execute(
            f'SELECT date_id FROM dates WHERE date = "{fd}"').fetchone()
        if qry_result:
            date_id = qry_result[0]
            mst_conn.execute(
                f'DELETE FROM feedback WHERE date_id = {date_id}')
        else:
            # Seems to be a new date to register
            date_id = mst_conn.execute(
                'INSERT INTO dates (date) VALUES (?)',
                [fd.isoformat()]).lastrowid

        # Select which of the available data from the various data sources
        # will be used for this date and store it in the metrics database.
        # The criterion for selecting the data is the largest count and the
        # most textual feedback.
        for path in feedback:
            # Decide on the number of positive feedbacks
            pos_data = {feedback[path][data_src]['pos_cnt']
                        for data_src in feedback[path]}
            pos_cnt = max(pos_data)
            # Decide on the number of negative feedbacks
            neg_data = {feedback[path][data_src]['neg_cnt']
                        for data_src in feedback[path]}
            neg_cnt = max(neg_data)
            # Decide on the positive textual feedback
            pos_txt = [feedback[path][data_src]['pos_txt']
                       for data_src in feedback[path]]
            pos_txt = sorted(pos_txt, key=len, reverse=True)[0]
            # noinspection PyTypeChecker
            pos_txt = '\n'.join(sorted(pos_txt, key=str.lower))
            # Decide on the negative textual feedback
            neg_txt = [feedback[path][data_src]['neg_txt']
                       for data_src in feedback[path]]
            neg_txt = sorted(neg_txt, key=len, reverse=True)[0]
            # noinspection PyTypeChecker
            neg_txt = '\n'.join(sorted(neg_txt, key=str.lower))

            qry = f'''
                INSERT INTO feedback
                    (date_id, page_id, pos_cnt, neg_cnt, pos_txt, neg_txt)
                SELECT
                    {date_id}, page_id,
                    {pos_cnt}, {neg_cnt},
                    ?, ?
                FROM shd_paths
                WHERE path = '{path}' '''
            mst_conn.execute(qry, [pos_txt if pos_txt else None,
                                   neg_txt if neg_txt else None])

        logger.info(f'page feedback stored for {fd}')

    logger.info(f'retrieving feedback finished')

Store all feedback given during a period.

Arguments:

last_date: final date of storing period
first_date: starting date of storing period

For each day in the period bounded by first_date and last_date inclusive, all feedback that is registered by Matomo is requested and stored in the feedback table of the metrics database. While storing data for a specific date, any potentially available feedback data for this date will be purged.

The last_date parameter defaults to yesterday, which is also the maximum allowed value. If it exceeds this date, it will be reset to yesterday.

If first_date is None (the default), the period will start at the first day for which no feedback has been stored yet. If no feedback has been stored at all, first_date will be set to one week before the date of the first scrape.

The following data is stored for a specific date and each relevant page:

  • pos_cnt: total number of positive feedbacks
  • neg_cnt: total number of negative feedbacks
  • pos_txt: all positive textual feedback, separated by newlines
  • neg_txt: all negative textual feedback, separated by newlines

Note 1: Requesting feedback from Matomo using the API Events.getAction method for each day suffers dramatically from data loss (most probably as a result of daily data aggregation). The custom reports that historically have been made to circumvent this problem are much more reliable, but suffer from historic operational hiccups. For this reason all available methods to retrieve the data are used. From these (competing) sets of information the highest numbers for positive and negative feedback are selected, as well as the longest lists for positive and negative textual feedback.

Besides the Events.getAction API method, custom reports are used as set by the CUST_REP_IDS constant.

Note 2: Feedback tagging changed during 2021 as a consequence of the added possibility to enter textual feedback. The following examples represent the situation before and after the change.

Label before:

feedback.nee - /nl/home/content/uitgelogd-mijn-belastingdienst[]
|>|[bld-dv-content]feedback|feedback.nee|

Label after:

feedback.nee - /nl/home/content/uitgelogd-mijn-belastingdienst[]
|>|[bld-dv-content]feedback
|>|feedback.nee
|>|example of a feedback text
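
A minimal sketch of how the textual feedback is extracted from such a post-2021 label (the path and event action are taken from separate Matomo event fields in the code above); the sample label is the one from the docstring:

import re

label = ('feedback.nee - /nl/home/content/uitgelogd-mijn-belastingdienst[]'
         '|>|[bld-dv-content]feedback'
         '|>|feedback.nee'
         '|>|example of a feedback text')

label_parts = label.split('|>|')
# Only labels in the post-2021 format carry a fourth part with free text.
fb_txt = re.sub(r'(\n)+', '/', label_parts[3]) if len(label_parts) > 3 else ''
print(fb_txt)  # -> 'example of a feedback text'
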
#   def period_downloads( last_date: datetime.date = datetime.date(2023, 2, 1), first_date: datetime.date = None ) -> None:
View Source
def period_downloads(last_date: date = date.today() - timedelta(days=1),
                     first_date: date = None) -> None:
    """
    **Store number of downloads during a period.**

    Arguments:

        last_date: final date of storing period
        first_date: starting date of storing period

    For each day in the period bounded by `first_date` and `last_date`
    inclusive, download data is requested from Matomo and
    stored in the `downloads` table of the metrics database. While storing
    data for a specific date, any potentially available download data for
    this date will be purged.

    The `last_date` parameter defaults to yesterday, which is also the
    maximum allowed value. If it exceeds this date, it will be reset to
    yesterday.

    If `first_date` is `None` (the default), the period will start at the
    first day for which no download data has been stored yet. If no download
    data has been stored at all, `first_date` will be set to one week before
    the date of the first scrape.

    The following data is stored for a specific date and each relevant download:

    - `nb_visits`: number of unique downloads (per visit)
    - `nb_hits`: number of downloads

    """

    global logger
    if logger.name == 'root':
        # Logging not set up yet, so log to the console only
        logger = logging.getLogger('per_downloads')
        logger.setLevel(logging.DEBUG)
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        formatter = logging.Formatter(
            fmt='[%(asctime)s] %(levelname)-8s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    # Check if the links table of the metrics database is a subset of the
    # scrapes database.
    if mst_conn.execute('''
            SELECT link_id
            FROM shd_links AS shadow
            LEFT JOIN mst_links AS original USING (link_id)
            WHERE shadow.url != original.url
            ''').fetchall():
        logger.critical('links in the metrics and scrapes databases are not '
                        'in sync: no download data added')
        return

    # Check and set the first and last date
    last_date = min(last_date, date.today() - timedelta(days=1))
    if not first_date:
        qry = 'SELECT max(date) FROM downloads LEFT JOIN dates USING (date_id)'
        qry_result = mst_conn.execute(qry).fetchone()
        if qry_result[0]:
            first_date = date.fromisoformat(qry_result[0]) + timedelta(days=1)
        else:
            qry_result = mst_conn.execute(
                'SELECT min(date) FROM mst_scrapes').fetchone()
            first_scr_date = date.fromisoformat(qry_result[0])
            first_date = first_scr_date - timedelta(days=7)

    logger.info(f'retrieving download data started')

    # Create temporary table to receive the data for one day
    mst_conn.execute('''
        CREATE TEMPORARY TABLE IF NOT EXISTS dl_data (
            url			TEXT PRIMARY KEY NOT NULL,
            nb_visits	INTEGER,
            nb_hits		INTEGER
        )
        ''')

    # Prepare the Matomo API query
    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.period.day \
        | ma.format.json | ma.flat(True) | ma.filter_limit(-1) \
        | ma.filter_pattern_recursive('^' + urlparse(DL_SITE).netloc)

    # Cycle over each day within the period
    num_days = (last_date - first_date).days + 1
    for dd in [first_date + timedelta(days=d) for d in range(num_days)]:

        # Request data for this date
        resp = mapi_request(api.Actions().getDownloads,
                            pars | ma.date.YMD(dd.isoformat()))
        if not resp:
            logger.error(
                f'requesting download data with API Actions.getDownloads '
                f'failed for date {dd}')
            continue  # with next date
        if not resp.json():
            logger.warning(f'no download data for date {dd}')
            continue  # with next date
        # Select relevant data
        values = []
        for dl in resp.json():
            if 'url' not in dl:
                # Reject the aggregate row
                continue
            vals = tuple([dl.get(d, None) for d in DL_DATA])
            values.append(
                '(' + ', '.join([f'"{v}"' if v else 'NULL' for v in vals]) + ')'
            )
        # Save to temporary table
        ins_qry = f'''
            INSERT INTO dl_data
                (url, nb_visits, nb_hits)
            VALUES
                {', '.join(values)}
            '''
        mst_conn.execute(ins_qry)
        # Downloads reported by Matomo can contain links that were not
        # discovered while scraping. Register these new download links in the
        # mst_links table of the scrapes database and its shadow in the
        # metrics database.
        mst_conn.execute('''
            INSERT INTO mst_links (url)
            SELECT url
            FROM dl_data
            LEFT JOIN mst_links USING (url)
            WHERE link_id ISNULL
            ''')
        mst_conn.execute('''
            INSERT INTO shd_links
            SELECT link_id, url
            FROM dl_data
            LEFT JOIN mst_links USING (url)
            WHERE link_id NOT IN (SELECT link_id FROM shd_links)        
            ''')
        # Get or set the date_id, delete any previous data and store the new set
        qry_result = mst_conn.execute(
            f'SELECT date_id FROM dates WHERE date = "{dd}"').fetchone()
        if qry_result:
            date_id = qry_result[0]
            mst_conn.execute(
                f'DELETE FROM downloads WHERE date_id = {date_id}')
        else:
            # Seems to be a new date to register
            date_id = mst_conn.execute(
                'INSERT INTO dates (date) VALUES (?)',
                [dd.isoformat()]).lastrowid
        mst_conn.execute(f'''
            INSERT INTO downloads
            SELECT {date_id}, link_id, nb_visits, nb_hits
            FROM dl_data
            LEFT JOIN mst_links USING (url)
            ''')
        # Purge the temporary table
        mst_conn.execute('DELETE FROM dl_data')

        logger.info(f'download data stored for {dd}')

    logger.info(f'retrieving download data finished')

Store number of downloads during a period.

Arguments:

last_date: final date of storing period
first_date: starting date of storing period

For each day in the period bounded by first_date and last_date inclusive, download data is requested from Matomo and stored in the downloads table of the metrics database. While storing data for a specific date, any potentially available download data for this date will be purged.

The last_date parameter defaults to yesterday, which is also the maximum allowed value. If it exceeds this date, it will be reset to yesterday.

If first_date is None (the default), the period will start at the first day for which no download data has been stored yet. If no download data has been stored at all, first_date will be set to one week before the date of the first scrape.

The following data is stored for a specific date and each relevant download:

  • nb_visits: number of unique downloads (per visit)
  • nb_hits: number of downloads
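
A usage sketch, backfilling download data for a few specific days (the dates are purely illustrative):

from datetime import date

from bd_www.matomo import period_downloads

# Store download counts for three specific days.
period_downloads(first_date=date(2022, 5, 2), last_date=date(2022, 5, 4))
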
#   def urls_last_days(num_days: int = 8) -> set[str]:
View Source
def urls_last_days(num_days: int = 8) -> set[str]:
    """
    **Return all urls that were recently requested.**

    Arguments:

        num_days: length of the period, including today, for which the urls
            are returned

    Returns:

        requested (fully qualified) urls
    """

    api = ma.MatomoApi(matomo_conf.server, matomo_conf.token)
    pars = ma.idSite.one_or_more(matomo_conf.www_id) | ma.format.json \
        | ma.period.range | ma.date.last(num_days) | ma.flat(True) \
        | ma.filter_limit('all')
    json_resp = mapi_request(api.Actions().getPageUrls, pars).json()
    return {WWW_SITE + p['label'].split('?')[0] for p in json_resp}

Return all urls that were recently requested.

Arguments:

num_days: length of the period, including today, for which the urls
    are returned

Returns:

requested (fully qualified) urls
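
A usage sketch of comparing the recently requested urls with a known set (the known_urls set is a placeholder assumption):

from bd_www.matomo import urls_last_days

# Hypothetical set of fully qualified urls known from the last scrape.
known_urls: set[str] = set()

recent = urls_last_days(num_days=8)
new_urls = recent - known_urls
print(f'{len(new_urls)} recently requested urls were not in the last scrape')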