
aioprometheus

aioprometheus is a Prometheus Python client library for asyncio-based applications.

It provides asyncio-based applications with a metrics collection and serving capability for use with the Prometheus monitoring and alerting system.

It supports text and binary data formats as well as the ability to push metrics to a gateway.

The project source code can be found here.

User Guide

This section of the documentation provides information about how to use aioprometheus.

Install

The current release of aioprometheus is available from PyPI. Use pip to install it.

$ pip install aioprometheus

Instrumenting

Prometheus provides four kinds of metrics that can be used to instrument your software:

  • Counter,
  • Gauge,
  • Summary and
  • Histogram.

More details on the metrics types can be found here.

Counter

A counter is a cumulative metric that represents a single numerical value that only ever goes up. A counter is typically used to count requests served, tasks completed, errors, etc. Counters should not be used to expose current counts of items whose number can also go down, e.g. the number of currently running coroutines. Use gauges for that use case.

from aioprometheus import Counter

uploads_metric = Counter("file_uploads_total", "File total uploads.")
uploads_metric.inc({'type': "png"})

Gauge

A Gauge is a metric that represents a single numerical value that can arbitrarily go up and down.

from aioprometheus import Gauge

ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)
ram_metric.inc({'type': "virtual"})
ram_metric.dec({'type': "virtual"})
ram_metric.add({'type': "virtual"}, 5)
ram_metric.sub({'type': "virtual"}, 5)

Summary

A Summary captures individual observations from an event or sample stream and summarizes them in a manner similar to traditional summary statistics.

A summary metric provides:

  1. sum of observations,
  2. observation count,
  3. rank estimations.

from aioprometheus import Summary

http_access = Summary("http_access_time", "HTTP access time")
http_access.add({'time': '/static'}, 3.142)

Histogram

A Histogram tracks the size and number of events in buckets.

You can use Histograms for aggregatable calculation of quantiles. The set of buckets used can be overridden by passing the buckets keyword argument to Histogram.

from aioprometheus import Histogram

http_access = Histogram("http_access_time", "HTTP access time")
http_access.add({'time': '/static'}, 3.142)

Labels

All metrics have labels which allow the grouping of related time series.

See best practices and labels for more information on this topic.

To add a metric to a collector you first identify it with a label. In the following example a Gauge collector is created for tracking memory usage. Then a specific metric is created within the collector to track virtual memory usage:

ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)

A single collector is capable of storing multiple metric instances. For example, the swap memory could also be monitored using this collector:

ram_metric.set({'type': "swap"}, 100.1)

Const labels

When you create a collector you can also add constant labels. These constant labels will be included with all the metrics gathered by that collector. This avoids needing to constantly add extra labels when updating the metric.

So this example without const labels

host = socket.gethostname()
ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual", 'host': host}, 100)
ram_metric.set({'type': "swap", 'host': host}, 100)

is the same as this one with const labels:

host = socket.gethostname()
ram_metric = Gauge(
    "memory_usage_bytes", "Memory usage in bytes.",
    const_labels={'host': host})
ram_metric.set({'type': "virtual"}, 100)
ram_metric.set({'type': "swap"}, 100)

Exporting Metrics

Simple Example

Metrics are exposed to the Prometheus server via an HTTP endpoint. The metrics can be retrieved in two different formats: text and binary.

The following example shows how a metrics service can be instantiated along with a Counter. Following typical asyncio usage, an event loop is instantiated first then a Prometheus metrics service is instantiated. The server accepts various arguments such as the interface and port to bind to.

The service can also be passed a specific registry to use; if none is provided it will create one. A registry holds the various metrics collectors that will be exposed by the service.

Next, a counter metric is created to track the number of iterations. This example uses a timer callback to periodically increment the metric tracking iterations. In a realistic application a metric might track the number of requests, etc.

#!/usr/bin/env python
'''
This example demonstrates how a single Counter metric collector can be created
and exposed via a HTTP endpoint.

.. code-block:: console

    (env) $ python simple-example.py
    Serving prometheus metrics on: http://127.0.0.1:50624/metrics

In another terminal fetch the metrics using the ``curl`` command line tool
to verify they can be retrieved by Prometheus server.
'''
import asyncio
import socket
from aioprometheus import Counter, Service


if __name__ == '__main__':

    def on_timer_expiry(loop, events_collector):
        ''' Update the metric periodically '''
        events_collector.inc({'kind': 'timer_expiry'})
        loop.call_later(1.0, on_timer_expiry, loop, events_collector)

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)

    events_collector = Counter(
        "events",
        "Number of events.",
        const_labels={'host': socket.gethostname()})

    svr.registry.register(events_collector)

    loop.run_until_complete(svr.start(addr="127.0.0.1"))
    print('Serving prometheus metrics on: {}'.format(svr.metrics_url))

    loop.call_later(1.0, on_timer_expiry, loop, events_collector)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.close()

The example can be run using

(env) $ python simple-example.py
Serving prometheus metrics on: http://127.0.0.1:50624/metrics

In another terminal fetch the metrics using the curl command line tool.

By default metrics will be returned in plain text format.

$ curl http://127.0.0.1:50624/metrics
# HELP events Number of events.
# TYPE events counter
events{host="alpha",kind="timer_expiry"} 33
$ curl http://127.0.0.1:50624/metrics -H 'Accept: text/plain; version=0.0.4'
# HELP events Number of events.
# TYPE events counter
events{host="alpha",kind="timer_expiry"} 36

Similarly, you can request metrics in binary format, though this will be hard to read on the command line.

$ curl http://127.0.0.1:50624/metrics -H "ACCEPT: application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited"

Application Example

A more representative example is shown below. It implements an application class that uses the aioprometheus package to collect application metrics and expose them on a service endpoint.

#!/usr/bin/env python
'''
This more complicated example implements an application that exposes
application metrics obtained from the psutil package.

This example requires the ``psutil`` package which can be installed
using ``pip install psutil``.
'''

import asyncio
import logging
import psutil
import random
import socket
import uuid

from aioprometheus import (
    Counter,
    Gauge,
    Histogram,
    Service,
    Summary,
    formats,
)

from asyncio.base_events import BaseEventLoop


class ExampleApp(object):
    '''
    An example application that demonstrates how ``aioprometheus`` can be
    used within a Python async application.
    '''

    def __init__(self,
                 metrics_host='127.0.0.1',
                 metrics_port: int = 0,
                 loop: BaseEventLoop = None):

        self.metrics_host = metrics_host
        self.metrics_port = metrics_port
        self.loop = loop or asyncio.get_event_loop()
        self.timer = None  # type: asyncio.Handle

        ######################################################################
        # Create application metrics and metrics service

        # Create a metrics server. The server will create a metrics collector
        # registry if one is not specifically created and passed in.
        self.msvr = Service()

        # Define some constant labels that need to be added to all metrics
        const_labels = {
            'host': socket.gethostname(),
            'app': f'{self.__class__.__name__}-{uuid.uuid4().hex}'}

        # Create metrics collectors

        # Create a counter metric to track requests
        self.requests_metric = Counter(
            "requests",
            "Number of requests.",
            const_labels=const_labels)

        # Collectors must be registered with the registry before they
        # get exposed.
        self.msvr.register(self.requests_metric)

        # Create a gauge metrics to track memory usage.
        self.ram_metric = Gauge(
            "memory_usage_bytes",
            "Memory usage in bytes.",
            const_labels=const_labels)
        self.msvr.register(self.ram_metric)

        # Create a gauge metrics to track CPU.
        self.cpu_metric = Gauge(
            "cpu_usage_percent",
            "CPU usage percent.",
            const_labels=const_labels)
        self.msvr.register(self.cpu_metric)

        self.payload_metric = Summary(
            "request_payload_size_bytes",
            "Request payload size in bytes.",
            const_labels=const_labels,
            invariants=[(0.50, 0.05), (0.99, 0.001)])
        self.msvr.register(self.payload_metric)

        self.latency_metric = Histogram(
            "request_latency_seconds", "Request latency in seconds",
            const_labels=const_labels,
            buckets=[0.1, 0.5, 1.0, 5.0])
        self.msvr.register(self.latency_metric)

    async def start(self):
        ''' Start the application '''
        await self.msvr.start(addr=self.metrics_host, port=self.metrics_port)
        logger.debug('Serving prometheus metrics on: %s', self.msvr.metrics_url)

        # Schedule a timer to update internal metrics. In a realistic
        # application metrics would be updated as needed. In this example
        # application a simple timer is used to emulate things happening,
        # which conveniently allows all metrics to be updated at once.
        self.timer = self.loop.call_later(1.0, self.on_timer_expiry)

    async def stop(self):
        ''' Stop the application '''
        await self.msvr.stop()
        if self.timer:
            self.timer.cancel()
        self.timer = None

    def on_timer_expiry(self):
        ''' Update application to simulate work '''

        # Update memory metrics
        self.ram_metric.set({'type': "virtual", }, psutil.virtual_memory().used)
        self.ram_metric.set({'type': "swap"}, psutil.swap_memory().used)

        # Update cpu metrics
        for c, p in enumerate(psutil.cpu_percent(interval=1, percpu=True)):
            self.cpu_metric.set({'core': c}, p)

        # Incrementing a requests counter to emulate webserver app
        self.requests_metric.inc({'path': "/"})

        # Monitor request payload data to emulate webserver app
        self.payload_metric.add({'path': "/data"}, random.random() * 2**10)

        # Monitor request latency to emulate webserver app
        self.latency_metric.add({'path': "/data"}, random.random() * 5)

        # re-schedule another metrics update
        self.timer = self.loop.call_later(1.0, self.on_timer_expiry)


if __name__ == '__main__':

    logging.basicConfig(level=logging.DEBUG)
    # Silence asyncio and aiohttp loggers
    logging.getLogger('asyncio').setLevel(logging.ERROR)
    logging.getLogger('aiohttp').setLevel(logging.ERROR)
    logger = logging.getLogger(__name__)

    loop = asyncio.get_event_loop()

    app = ExampleApp()
    loop.run_until_complete(app.start())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(app.stop())
    loop.close()

The example can be run using

(env) $ python app-example.py
Serving prometheus metrics on: http://127.0.0.1:50624/metrics

You can use the curl command line tool to fetch metrics manually or use the helper script described in the next section.

Checking Example using helper script

There is a script in the examples directory that emulates a Prometheus server scraping a metrics service endpoint. You can specify a particular format to use (e.g. text or binary). If no format is specified then it will randomly choose a format each time it requests metrics.

usage: metrics-fetcher.py [-h] [--url URL] [--format FORMAT]
                        [--interval INTERVAL] [--debug]

Metrics Fetcher

optional arguments:
-h, --help           show this help message and exit
--url URL            The metrics URL
--format FORMAT      Metrics response format (i.e. 'text' or 'binary')
--interval INTERVAL  The number of seconds between metrics requests
--debug              Show debug output

Example:

$ python metrics-fetcher.py --url=http://127.0.0.1:50624/metrics --format=text --interval=2.0

Checking Example using Prometheus

Once an example is running you can configure Prometheus to begin scraping its metrics by creating or updating the configuration file passed to Prometheus. Using the official Prometheus documentation we can create a minimal configuration file to scrape the example application.

global:
  scrape_interval:     15s # By default, scrape targets every 15 seconds.
  evaluation_interval: 15s # By default, evaluate rules every 15 seconds.

scrape_configs:
  - job_name:       'test-app'

    # Override the global default and scrape targets from this job every
    # 5 seconds.
    scrape_interval: 5s
    scrape_timeout: 10s

    target_groups:
      - targets: ['localhost:50624']
        labels:
          group: 'dev'

We can then run Prometheus and configure it using the configuration file.

$ ./prometheus -config.file my-prom-config.yaml

Once Prometheus is running you can access it at localhost:9090 and observe the metrics from the example.

Decorators

A number of different decorators are provided to help simplify the process of instrumenting your code. As the aioprometheus library targets use in long-running asyncio-based applications, the decorators return a coroutine object. However, the wrapped function does not have to be a coroutine.

The example below demonstrates how the @timer decorator can be used to time how long it takes to run a function.

#!/usr/bin/env python
'''
Usage:

.. code-block:: python

    $ python decorator_timer.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_processing_seconds Time spent processing request
    # TYPE request_processing_seconds summary
    request_processing_seconds_count 77
    request_processing_seconds_sum 38.19072341918945
    request_processing_seconds{quantile="0.5"} 0.27150511741638184
    request_processing_seconds{quantile="0.9"} 0.5016570091247559
    request_processing_seconds{quantile="0.99"} 0.6077709197998047

'''

import asyncio
import random

from aioprometheus import Service, Summary, timer


# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary(
    'request_processing_seconds', 'Time spent processing request')


# Decorate function with metric.
@timer(REQUEST_TIME)
async def handle_request(duration):
    ''' A dummy function that takes some time '''
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == '__main__':

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)
    svr.registry.register(REQUEST_TIME)

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

The following example demonstrates how the @inprogress decorator can be used to track how many requests are in progress.

#!/usr/bin/env python
'''

.. code-block:: python

    $ python decorator_inprogress.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_in_progress Number of requests in progress
    # TYPE request_in_progress gauge
    request_in_progress{route="/"} 1

'''

import asyncio
import random

from aioprometheus import Service, Gauge, inprogress


# Create a metric to track requests currently in progress.
REQUESTS = Gauge(
    'request_in_progress', 'Number of requests in progress')


# Decorate function with metric.
@inprogress(REQUESTS, {'route': '/'})
async def handle_request(duration):
    ''' A dummy function that takes some time '''
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == '__main__':

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)
    svr.registry.register(REQUESTS)

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

The next example demonstrates how the @count_exceptions decorator can be used to track the number of exceptions that occur in a function block.

#!/usr/bin/env python
'''

.. code-block:: python

    $ python decorator_count_exceptions.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_handler_exceptions Number of exceptions in requests
    # TYPE request_handler_exceptions counter
    request_handler_exceptions{route="/"} 3

You may need to Ctrl+C twice to exit the example script.

'''

import asyncio
import random

from aioprometheus import Service, Counter, count_exceptions


# Create a metric to track requests currently in progress.
REQUESTS = Counter(
    'request_handler_exceptions', 'Number of exceptions in requests')


# Decorate function with metric.
@count_exceptions(REQUESTS, {'route': '/'})
async def handle_request(duration):
    ''' A dummy function that occasionally raises an exception '''
    if duration < 0.3:
        raise Exception('Ooops')
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        try:
            await handle_request(random.random())
        except Exception:
            pass  # keep handling


if __name__ == '__main__':

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)
    svr.registry.register(REQUESTS)

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

Push Gateway

Another method of exposing metrics is to push them to a gateway that will get scraped by Prometheus.

Prometheus provides a push gateway intermediary that can be used to help monitor components that cannot be scraped directly. They might be behind a firewall or might be too short lived. The push gateway allows you to push time series data to it, ensuring that the data is always reliably exposed to Prometheus via the pull model.

from aioprometheus import Counter, Pusher, Registry

p = Pusher("my-job", "http://127.0.0.1:61423")
registry = Registry()
c = Counter("total_requests", "Total requests.", {})
registry.register(c)

c.inc({'url': "/p/user"})

# Push to the pushgateway. This call must be awaited from within a coroutine.
resp = await p.replace(registry)

Developers Guide

The project is hosted on GitHub and uses Travis for continuous integration.

If you have found a bug or have an idea for an enhancement that would improve the library, use the bug tracker.

To develop aioprometheus you’ll need Python 3.6, some dependencies and the source code.

Get the source

$ git clone git@github.com:claws/aioprometheus.git

Setup

The best way to work on aioprometheus is to create a virtual env. This isolates your work from other projects' dependencies and ensures that any commands are pointing at the correct tools.

Note

In the following example python is assumed to be the Python 3.6 executable. You may need to explicitly specify this (e.g. use python3) if you have multiple Python versions available on your system.

$ python -m venv myvenv
$ cd myvenv
$ source bin/activate
$ cd ..

To exit the virtual environment simply type deactivate.

Note

The following steps assume you are operating in a virtual environment.

Install Dependencies

Install the development dependencies using pip.

$ cd aioprometheus
$ pip install -r requirements.dev.txt

Some of the rules in the convenience Makefile, such as dist and style, only work if the development dependencies have been installed.

Install aioprometheus

Use pip to perform a development install of aioprometheus. This installs the package in a way that allows you to edit the code after it is installed so that any changes take effect immediately.

$ pip install -e .

Test

The easiest method to run all of the unit tests is to run the make test rule from the top level directory. This runs the standard library unittest tool which discovers all the unit tests and runs them.

$ make test

Or, you can call the standard library unittest module directly.

$ python -m unittest discover -s tests -v

Individual unit tests can be run using the standard library unittest package too.

$ cd aioprometheus/tests
$ python -m unittest test_negotiate

Type Annotations

The code base has been updated with type annotations. These provide gradual typing information that improves how easily the code is understood and helps with future enhancements.

The type annotations checker mypy should run cleanly with no warnings.

Use the Makefile convenience rule to check no issues are reported.

$ make check_types

Coverage

The test code coverage report can be found here.

Documentation

To rebuild the project documentation, developers should run the make docs rule from the top level directory. It performs a number of steps to create a new set of sphinx html content.

$ make docs

To quickly view the rendered docs locally as you are working you can use the simple Python web server.

$ cd docs
$ python -m http.server

Then open a browser to the docs content.

Version

aioprometheus uses a three segment CalVer versioning scheme comprising a short year, a zero padded month and then a micro version. The YY.MM part of the version is treated similarly to a SemVer major version. So when backwards-incompatible or major functional changes occur, the YY.MM will be rolled up. For all other minor changes only the micro part will be incremented.

Release Process

Assuming that the tests pass, the docs build without warnings and the type annotations check passes without warnings, a release can be made.

The following steps are used to make a new software release:

  • Ensure that the version label in __init__.py is correct. It must comply with the Version scheme.

  • Create the distribution. This project produces an artefact called a pure Python wheel. Only Python 3 is supported by this package.

    make dist
    
  • Test the distribution. This involves creating a virtual environment, installing the distribution into it and running the tests. These steps have been captured for convenience in a Makefile rule.

    make dist.test
    
  • Upload to PyPI using

    make dist.upload
    
  • Create and push a repo tag to GitHub.

    git tag YY.MM.MICRO -m "A meaningful release tag comment"
    git tag  # check release tag is in list
    git push --tags origin master
    
    • GitHub will create a release tarball at:

      https://github.com/{username}/{repo}/tarball/{tag}.tar.gz
      

API

If you are looking for information on a specific function, class or method, this part of the documentation is for you.

Example

#!/usr/bin/env python
'''
This example demonstrates how a single Counter metric collector can be created
and exposed via a HTTP endpoint.

.. code-block:: console

    (env) $ python simple-example.py
    Serving prometheus metrics on: http://127.0.0.1:50624/metrics

In another terminal fetch the metrics using the ``curl`` command line tool
to verify they can be retrieved by Prometheus server.
'''
import asyncio
import socket
from aioprometheus import Counter, Service


if __name__ == '__main__':

    def on_timer_expiry(loop, events_collector):
        ''' Update the metric periodically '''
        events_collector.inc({'kind': 'timer_expiry'})
        loop.call_later(1.0, on_timer_expiry, loop, events_collector)

    loop = asyncio.get_event_loop()

    svr = Service(loop=loop)

    events_collector = Counter(
        "events",
        "Number of events.",
        const_labels={'host': socket.gethostname()})

    svr.registry.register(events_collector)

    loop.run_until_complete(svr.start(addr="127.0.0.1"))
    print('Serving prometheus metrics on: {}'.format(svr.metrics_url))

    loop.call_later(1.0, on_timer_expiry, loop, events_collector)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.close()

The example above shows a single Counter metric collector being created and exposed via an HTTP endpoint.

The counter metric is created to track the number of iterations. This example uses a timer callback to periodically increment the metric tracking iterations. In a realistic application a metric might track the number of requests, etc.

Following typical asyncio usage, an event loop is instantiated first then a metrics service is instantiated. The metrics service is responsible for managing the various metrics collectors and responding to Prometheus server when it requests metrics.

The server accepts various arguments such as the interface and port to bind to. The service will create a new collector registry if one is not passed in. A collector registry holds the various metrics collectors that will be exposed by the service.

The example script can be run using:

(env) $ cd examples
(env) $ python simple-example.py
Serving prometheus metrics on: http://127.0.0.1:50624/metrics

In another terminal fetch the metrics using the curl command line tool to verify they can be retrieved by Prometheus server.

By default metrics will be returned in plain text format.

$ curl http://127.0.0.1:50624/metrics
# HELP events Number of events.
# TYPE events counter
events{host="alpha",kind="timer_expiry"} 33
$ curl http://127.0.0.1:50624/metrics -H 'Accept: text/plain; version=0.0.4'
# HELP events Number of events.
# TYPE events counter
events{host="alpha",kind="timer_expiry"} 36

Similarly, you can request metrics in binary format, though this will be hard to read on the command line.

$ curl http://127.0.0.1:50624/metrics -H "ACCEPT: application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited"

There are more examples in the examples directory. The app-example.py file will likely be of interest as it provides a more representative application example.

A number of convenience decorator functions are also available to assist with updating metrics.

License

aioprometheus is released under the MIT license.

aioprometheus originates from the (now deprecated) prometheus python package which was released under the MIT license. aioprometheus continues to use the MIT license and contains a copy of the original MIT license from the prometheus-python project as instructed by the original license.

Origins

aioprometheus originates from the (now deprecated) prometheus python package. Many thanks to slok for developing prometheus-python. I have taken the original work and modified it to meet the needs of my asyncio-based applications, added the histogram metric, integrated support for binary format, updated and extended tests, added docs, decorators, etc.