User Guide

This section of the documentation provides information about how to use aioprometheus.

Install

The current release of aioprometheus is available from PyPI. Use pip to install it.

$ pip install aioprometheus

Instrumenting

Prometheus provides four kinds of metrics that can be used to instrument your software:

  • Counter,
  • Gauge,
  • Summary and
  • Histogram.

More details on the metric types can be found here.
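
All four metric types are imported directly from the aioprometheus package, as the snippets in the following sections show:

from aioprometheus import Counter, Gauge, Histogram, Summary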

Counter

A counter is a cumulative metric that represents a single numerical value that only ever goes up. A counter is typically used to count requests served, tasks completed, errors that occurred, etc. Counters should not be used to expose current counts of items whose number can also go down, e.g. the number of currently running coroutines. Use a gauge for that use case.

from aioprometheus import Counter

uploads_metric = Counter("file_uploads_total", "File total uploads.")
uploads_metric.inc({'type': "png"})

Gauge

A Gauge is a metric that represents a single numerical value that can arbitrarily go up and down.

from aioprometheus import Gauge

ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)
ram_metric.inc({'type': "virtual"})
ram_metric.dec({'type': "virtual"})
ram_metric.add({'type': "virtual"}, 5)
ram_metric.sub({'type': "virtual"}, 5)

Summary

A Summary captures individual observations from an event or sample stream and summarizes them in a manner similar to traditional summary statistics.

A summary metric provides:

  1. sum of observations,
  2. observation count,
  3. rank estimations.

from aioprometheus import Summary

http_access = Summary("http_access_time", "HTTP access time")
http_access.observe({'time': '/static'}, 3.142)

The default invariants ([(0.50, 0.05), (0.90, 0.01), (0.99, 0.001)]) can be overridden by passing the invariants keyword argument to Summary.

from aioprometheus import Summary

http_access = Summary(
    "http_access_time",
    "HTTP access time",
    invariants=[(0.50, 0.05), (0.99, 0.001)])

Histogram

A Histogram tracks the size and number of events in buckets.

You can use Histograms for aggregatable calculation of quantiles.

from aioprometheus import Histogram

http_access = Histogram("http_access_time", "HTTP access time")
http_access.observe({'time': '/static'}, 3.142)

The default buckets are: 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0 and 10.0. A +Inf bucket is always added to capture observations that fall above the largest bucket. The buckets can be overridden by passing the buckets keyword argument to Histogram.

from aioprometheus import Histogram

http_access = Histogram(
    "http_access_time",
    "HTTP access time",
    buckets=[0.1, 0.5, 1.0, 5.0])

Labels

All metrics have labels which allow the grouping of related time series.

See best practices and labels for more information on this topic.

To add a metric to a collector you first identify it with a label. In the following example a Gauge collector is created for tracking memory usage. Then a specific metric is created within the collector to track virtual memory usage:

ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)

A single collector is capable of storing multiple metric instances. For example, the swap memory could also be monitored using this collector:

ram_metric.set({'type': "swap"}, 100.1)
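
Each label set within a collector tracks its own value. As an illustrative sketch (assuming the collector's get method accepts the same labels dict used when setting a value), the individual metric instances can be read back like this:

# Read back the value tracked for each label set (illustrative sketch).
virtual_used = ram_metric.get({'type': "virtual"})  # 100
swap_used = ram_metric.get({'type': "swap"})        # 100.1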

Const labels

When you create a collector you can also add constant labels. These constant labels will be included with all the metrics gathered by that collector. This avoids needing to constantly add extra labels when updating the metric.

So this example without const labels

host = socket.gethostname()
ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual", 'host': host}, 100)
ram_metric.set({'type': "swap", 'host': host}, 100)

is the same as this one with const labels:

host = socket.gethostname()
ram_metric = Gauge(
    "memory_usage_bytes", "Memory usage in bytes.",
    const_labels={'host': host})
ram_metric.set({'type': "virtual"}, 100)
ram_metric.set({'type': "swap"}, 100)

Exporting Metrics

Metrics are exposed to the Prometheus server via an HTTP endpoint. The metrics can be retrieved in two different formats: text and binary.

Simple Example

The example below shows a single Counter metric collector being created and exposed via an HTTP endpoint.

#!/usr/bin/env python
"""
This example demonstrates how aioprometheus can be used to expose metrics on
a HTTP endpoint that is provided by the aioprometheus.Service object.

.. code-block:: console

    (env) $ python simple-example.py
    Serving prometheus metrics on: http://127.0.0.1:5000/metrics

In another terminal fetch the metrics using the ``curl`` command line tool
to verify they can be retrieved by Prometheus server.
"""

import asyncio
import socket
from aioprometheus import Counter, Service


if __name__ == "__main__":

    async def main(svr: Service) -> None:

        events_counter = Counter(
            "events", "Number of events.", const_labels={"host": socket.gethostname()}
        )
        svr.register(events_counter)
        await svr.start(addr="127.0.0.1", port=5000)
        print(f"Serving prometheus metrics on: {svr.metrics_url}")

        # Now start another coroutine to periodically update a metric to
        # simulate the application making some progress.
        async def updater(c: Counter):
            while True:
                c.inc({"kind": "timer_expiry"})
                await asyncio.sleep(1.0)

        await updater(events_counter)

    loop = asyncio.get_event_loop()
    svr = Service()
    try:
        loop.run_until_complete(main(svr))
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.close()

In this simple example the counter metric is tracking the number of while loop iterations executed by the updater coroutine. In a realistic application a metric might track the number of requests, etc.

Following typical asyncio usage, an event loop is obtained first and then a metrics service is instantiated. The metrics service is responsible for managing metric collectors and responding to metrics requests.

The service accepts various arguments such as the interface and port to bind to. A collector registry is used within the service to hold metrics collectors that will be exposed by the service. The service will create a new collector registry if one is not passed in.
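
For example, a registry can be created explicitly and handed to the service. This is a sketch that assumes the Service constructor accepts a registry keyword argument, as implied above:

from aioprometheus import Counter, Registry, Service

registry = Registry()
events_counter = Counter("events", "Number of events.")
registry.register(events_counter)

# Pass the registry in explicitly; otherwise the service creates its own.
svr = Service(registry=registry)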

A counter metric is created and registered with the service. The service is started and then a coroutine is started to periodically update the metric to simulate progress.

The example script can be run using:

(venv) $ cd examples
(venv) $ python simple-example.py
Serving prometheus metrics on: http://127.0.0.1:5000/metrics

In another terminal fetch the metrics using the curl command line tool to verify they can be retrieved by a Prometheus server.

By default metrics will be returned in plain text format.

$ curl http://127.0.0.1:5000/metrics
# HELP events Number of events.
# TYPE events counter
events{host="alpha",kind="timer_expiry"} 33

$ curl http://127.0.0.1:5000/metrics -H 'Accept: text/plain; version=0.0.4'
# HELP events Number of events.
# TYPE events counter
events{host="alpha",kind="timer_expiry"} 36

Similarly, you can request metrics in binary format, though this will be hard to read on the command line.

$ curl http://127.0.0.1:5000/metrics -H "ACCEPT: application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited"

The metrics service also responds to requests sent to its / route. The response is simple HTML. This route can be useful as a Kubernetes /healthz style health indicator as it does not incur any overhead within the service to serialize a full metrics response.

$ curl http://127.0.0.1:5000/
<html><body><a href='/metrics'>metrics</a></body></html>

Application Example

A more representative example is shown below. It implements an application class that uses the aioprometheus package to collect application metrics and expose them on a service endpoint.

#!/usr/bin/env python
"""
This more complicated example implements an application that exposes
application metrics obtained from the psutil package.

This example requires the ``psutil`` package which can be installed
using ``pip install psutil``.
"""

import asyncio
import logging
import psutil
import random
import socket
import uuid

from aioprometheus import Counter, Gauge, Histogram, Service, Summary, formats

from asyncio.base_events import BaseEventLoop


class ExampleApp(object):
    """
    An example application that demonstrates how ``aioprometheus`` can be
    integrated and used within a Python application built upon asyncio.

    This application attempts to simulate a long running distributed system
    process, say a socket relay or some kind of message adapter. It is
    intentionally not hosting an existing web service in the application.

    In this case the aioprometheus.Service object is used to provide a
    new HTTP endpoint that can be used to expose Prometheus metrics on.

    If this application was a web service (i.e. already had an existing web
    interface) then the aioprometheus.Service object could be used as before
    to add another web interface or a different approach could be used that
    provides a metrics handler function for use with the existing web service.
    """

    def __init__(
        self,
        metrics_host="127.0.0.1",
        metrics_port: int = 5000,
        loop: BaseEventLoop = None,
    ):

        self.metrics_host = metrics_host
        self.metrics_port = metrics_port
        self.loop = loop or asyncio.get_event_loop()
        self.timer = None  # type: asyncio.Handle

        ######################################################################
        # Create application metrics and metrics service

        # Create a metrics server. The server will create a metrics collector
        # registry if one is not specifically created and passed in.
        self.msvr = Service()

        # Define some constant labels that need to be added to all metrics
        const_labels = {
            "host": socket.gethostname(),
            "app": f"{self.__class__.__name__}-{uuid.uuid4().hex}",
        }

        # Create metrics collectors

        # Create a counter metric to track requests
        self.requests_metric = Counter(
            "requests", "Number of requests.", const_labels=const_labels
        )

        # Collectors must be registered with the registry before they
        # get exposed.
        self.msvr.register(self.requests_metric)

        # Create a gauge metric to track memory usage.
        self.ram_metric = Gauge(
            "memory_usage_bytes", "Memory usage in bytes.", const_labels=const_labels
        )
        self.msvr.register(self.ram_metric)

        # Create a gauge metric to track CPU.
        self.cpu_metric = Gauge(
            "cpu_usage_percent", "CPU usage percent.", const_labels=const_labels
        )
        self.msvr.register(self.cpu_metric)

        self.payload_metric = Summary(
            "request_payload_size_bytes",
            "Request payload size in bytes.",
            const_labels=const_labels,
            invariants=[(0.50, 0.05), (0.99, 0.001)],
        )
        self.msvr.register(self.payload_metric)

        self.latency_metric = Histogram(
            "request_latency_seconds",
            "Request latency in seconds",
            const_labels=const_labels,
            buckets=[0.1, 0.5, 1.0, 5.0],
        )
        self.msvr.register(self.latency_metric)

    async def start(self):
        """ Start the application """
        await self.msvr.start(addr=self.metrics_host, port=self.metrics_port)
        logger.debug("Serving prometheus metrics on: %s", self.msvr.metrics_url)

        # Schedule a timer to update internal metrics. In a realistic
        # application metrics would be updated as needed. In this example
        # application a simple timer is used to emulate things happening,
        # which conveniently allows all metrics to be updated at once.
        self.timer = self.loop.call_later(1.0, self.on_timer_expiry)

    async def stop(self):
        """ Stop the application """
        await self.msvr.stop()
        if self.timer:
            self.timer.cancel()
        self.timer = None

    def on_timer_expiry(self):
        """ Update application to simulate work """

        # Update memory metrics
        self.ram_metric.set({"type": "virtual"}, psutil.virtual_memory().used)
        self.ram_metric.set({"type": "swap"}, psutil.swap_memory().used)

        # Update cpu metrics
        for c, p in enumerate(psutil.cpu_percent(interval=1, percpu=True)):
            self.cpu_metric.set({"core": c}, p)

        # Incrementing a requests counter to emulate webserver app
        self.requests_metric.inc({"path": "/"})

        # Monitor request payload data to emulate webserver app
        self.payload_metric.add({"path": "/data"}, random.random() * 2 ** 10)

        # Monitor request latency to emulate webserver app
        self.latency_metric.add({"path": "/data"}, random.random() * 5)

        # re-schedule another metrics update
        self.timer = self.loop.call_later(1.0, self.on_timer_expiry)


if __name__ == "__main__":

    logging.basicConfig(level=logging.DEBUG)
    # Silence asyncio and aiohttp loggers
    logging.getLogger("asyncio").setLevel(logging.ERROR)
    logging.getLogger("aiohttp").setLevel(logging.ERROR)
    logger = logging.getLogger(__name__)

    loop = asyncio.get_event_loop()

    app = ExampleApp()
    loop.run_until_complete(app.start())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(app.stop())
    loop.close()

The example can be run using:

(env) $ python app-example.py
Serving prometheus metrics on: http://127.0.0.1:5000/metrics

You can use the curl command line tool to fetch metrics manually or use the helper script described in the next section.

Frameworks Example

The aioprometheus package can also be used within other web framework based applications such as aiohttp, quart and vibora applications. This usage approach removes the need to create a separate server endpoint to handle metrics. The vibora example is shown below.

#!/usr/bin/env python
"""
Sometimes you want to expose Prometheus metrics from within an existing web
service and don't want to start a separate Prometheus metrics server.

This example uses the aioprometheus package to add Prometheus instrumentation
to a Vibora application. In this example a registry and a counter metric is
instantiated. A '/metrics' route is added to the application and the render
function from aioprometheus is called to format the metrics into the
appropriate format.
"""

from aioprometheus import render, Counter, Registry
from vibora import Vibora, Request, Response


app = Vibora(__name__)
app.registry = Registry()
app.events_counter = Counter("events", "Number of events.")
app.registry.register(app.events_counter)


@app.route("/")
async def hello(request: Request):
    app.events_counter.inc({"path": "/"})
    return Response(b"hello")


@app.route("/metrics")
async def handle_metrics(request: Request):
    """
    Negotiate a response format by inspecting the ACCEPTS headers and selecting
    the most efficient format. Render metrics in the registry into the chosen
    format and return a response.
    """
    content, http_headers = render(app.registry, [request.headers.get("accept")])
    return Response(content, headers=http_headers)


app.run()
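
A similar approach works with the other frameworks mentioned above. The sketch below shows the same idea using aiohttp; the handler and route names are illustrative, and the key step is again calling render with the request's Accept header values:

from aiohttp import web
from aioprometheus import render, Counter, Registry

registry = Registry()
events_counter = Counter("events", "Number of events.")
registry.register(events_counter)


async def handle_root(request):
    events_counter.inc({"path": "/"})
    return web.Response(text="hello")


async def handle_metrics(request):
    # Render the registry contents into the format negotiated from the
    # Accept headers and return them with the matching content headers.
    content, http_headers = render(registry, request.headers.getall("Accept", []))
    return web.Response(body=content, headers=http_headers)


app = web.Application()
app.add_routes([web.get("/", handle_root), web.get("/metrics", handle_metrics)])
web.run_app(app)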

Checking examples using helper script

There is a script in the examples directory that emulates a Prometheus server scraping a metrics service endpoint. You can specify a particular format to use (e.g. text or binary). If no format is specified then it will randomly choose a format each time it requests metrics.

usage: metrics-fetcher.py [-h] [--url URL] [--format FORMAT]
                        [--interval INTERVAL] [--debug]

Metrics Fetcher

optional arguments:
-h, --help           show this help message and exit
--url URL            The metrics URL
--format FORMAT      Metrics response format (i.e. 'text' or 'binary')
--interval INTERVAL  The number of seconds between metrics requests
--debug              Show debug output

Example:

$ python metrics-fetcher.py --url=http://127.0.0.1:5000/metrics --format=text --interval=2.0

Checking Example using Prometheus

Once an example is running you can configure Prometheus to begin scraping its metrics by creating or updating the configuration file passed to Prometheus. Using the official Prometheus documentation we can create a minimal configuration file to scrape the example application.

global:
  scrape_interval:     15s # By default, scrape targets every 15 seconds.
  evaluation_interval: 15s # By default, evaluate rules every 15 seconds.

scrape_configs:
  - job_name:       'test-app'

    # Override the global default and scrape targets from this job every
    # 5 seconds.
    scrape_interval: 5s
    scrape_timeout: 5s

    static_configs:
      - targets: ['localhost:5000']
        labels:
          group: 'dev'

We can then run Prometheus and configure it using the configuration file.

$ ./prometheus -config.file my-prom-config.yaml

Once Prometheus is running you can access its web interface at localhost:9090 and observe the metrics from the example.

Decorators

A number of different decorators are provided to help simplify the process of instrumenting your code. As the aioprometheus library targets use in long running asyncio based applications, the decorators return a coroutine object. However, the wrapped function does not have to be a coroutine.
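
To illustrate the last point, a regular (non-async) function can also be decorated; calling it then returns a coroutine that must be awaited. A minimal sketch, with an illustrative metric name:

from aioprometheus import Summary, timer

FUNC_TIME = Summary("func_duration_seconds", "Time spent in func")

@timer(FUNC_TIME)
def compute():
    # A plain function; the decorator wraps it so the call must be awaited.
    return sum(range(1000))

# Elsewhere, inside a coroutine:
#     result = await compute()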

The example below demonstrates how the @timer decorator can be used to time how long it takes to run a function.

#!/usr/bin/env python
"""
Usage:

.. code-block:: python

    $ python decorator_timer.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_processing_seconds Time spent processing request
    # TYPE request_processing_seconds summary
    request_processing_seconds_count 77
    request_processing_seconds_sum 38.19072341918945
    request_processing_seconds{quantile="0.5"} 0.27150511741638184
    request_processing_seconds{quantile="0.9"} 0.5016570091247559
    request_processing_seconds{quantile="0.99"} 0.6077709197998047

"""

import asyncio
import random

from aioprometheus import Service, Summary, timer


# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary("request_processing_seconds", "Time spent processing request")


# Decorate function with metric.
@timer(REQUEST_TIME)
async def handle_request(duration):
    """ A dummy function that takes some time """
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)
    svr.register(REQUEST_TIME)

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()
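
The inprogress and count_exceptions decorators shown below take a labels dict as a second argument. Assuming the timer decorator accepts the same optional labels argument (an assumption to verify against the API reference), the observed durations can be attributed to a particular label set:

# Hypothetical: attribute the observed durations to a specific route label.
@timer(REQUEST_TIME, {"route": "/"})
async def handle_request(duration):
    await asyncio.sleep(duration)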

The following example demonstrates how the @inprogress decorator can be used to track how many requests are in progress.

#!/usr/bin/env python
"""

.. code-block:: python

    $ python decorator_inprogress.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_in_progress Number of requests in progress
    # TYPE request_in_progress gauge
    request_in_progress{route="/"} 1

"""

import asyncio
import random

from aioprometheus import Service, Gauge, inprogress


# Create a metric to track requests currently in progress.
REQUESTS = Gauge("request_in_progress", "Number of requests in progress")


# Decorate function with metric.
@inprogress(REQUESTS, {"route": "/"})
async def handle_request(duration):
    """ A dummy function that takes some time """
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)
    svr.register(REQUESTS)

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

The next example demonstrates how the @count_exceptions decorator can be used to track the number of exceptions that occur in a function block.

#!/usr/bin/env python
"""

.. code-block:: python

    $ python decorator_count_exceptions.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_handler_exceptions Number of exceptions in requests
    # TYPE request_handler_exceptions counter
    request_handler_exceptions{route="/"} 3

You may need to Ctrl+C twice to exit the example script.

"""

import asyncio
import random

from aioprometheus import Service, Counter, count_exceptions


# Create a metric to track requests currently in progress.
REQUESTS = Counter("request_handler_exceptions", "Number of exceptions in requests")


# Decorate function with metric.
@count_exceptions(REQUESTS, {"route": "/"})
async def handle_request(duration):
    """ A dummy function that occasionally raises an exception """
    if duration < 0.3:
        raise Exception("Ooops")
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        try:
            await handle_request(random.random())
        except Exception:
            pass  # keep handling


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)
    svr.register(REQUESTS)

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

Push Gateway

Another method of exposing metrics is to push them to a gateway that will get scraped by Prometheus.

Prometheus provides a push gateway intermediary that can be used to help monitor components that cannot be scraped directly. They might be behind a firewall or might be too short lived. The push gateway allows you to push time series data to it, which ensures that data is always exposed reliably via the pull model.

from aioprometheus import Counter, Pusher, Registry

p = Pusher("my-job", "http://127.0.0.1:61423")
registry = Registry()
c = Counter("total_requests", "Total requests.", {})
registry.register(c)

c.inc({'url': "/p/user"})

# Push to the pushgateway
resp = await p.replace(registry)
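
The replace call shown above overwrites any metrics previously pushed for the job. Assuming the Pusher also exposes add and delete coroutines that mirror the push gateway's POST and DELETE semantics (an assumption worth checking against the API reference), the other operations look similar:

# Add to (rather than replace) the metrics already pushed for this job.
resp = await p.add(registry)

# Remove all metrics for this job from the push gateway.
resp = await p.delete(registry)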