User Guide

Install

The current release of aioprometheus is available from PyPI. Use pip to install it.

$ pip install aioprometheus

The ASGI middleware does not have any external dependencies, but the Starlette and Quart convenience functions that handle metrics requests do.

If you plan on using the ASGI middleware in a Starlette / FastAPI application then you can install the extra dependencies alongside aioprometheus by adding extras to the install.

$ pip install aioprometheus[starlette]

If you plan on using the ASGI middleware in a Quart application then you can install the extra dependencies alongside aioprometheus by adding extras to the install.

$ pip install aioprometheus[quart]

A Prometheus Push Gateway client and an HTTP service are included, but their dependencies are not installed by default. You can install them alongside aioprometheus by adding optional extras to the install.

$ pip install aioprometheus[aiohttp]

Prometheus 2.0 removed support for the binary protocol, so in version 20.0.0 the dependency on prometheus-metrics-proto, which provides binary support, is now optional. If you need binary response support, for use with an older Prometheus, you will need to specify the ‘binary’ optional extra:

$ pip install aioprometheus[binary]

Multiple optional dependencies can be listed at once, such as:

$ pip install aioprometheus[aiohttp,binary,starlette,quart]

Usage

There are two basic steps involved in using aioprometheus: the first is to instrument your software by creating metrics to monitor events, and the second is to expose those metrics to a collector.

Creating a new metric is easy. First, import the appropriate metric from aioprometheus. In the example below it’s a Counter metric. Next, instantiate the metric with a name and a help string. Finally, update the metric when an event occurs. In this case the counter is incremented.

from aioprometheus import Counter

events_counter = Counter(
    "events_counter",
    "Total number of events.",
)

events_counter.inc({"kind": "event A"})

A number of convenience decorator functions are included in aioprometheus that can assist with automatically updating metrics. The examples directory contains various decorator examples.

Once your software is instrumented with metrics you’ll want to expose them to Prometheus or a compatible metrics collector. There are multiple strategies available for this and the right choice depends on the kind of thing being instrumented. See the Exporting Metrics section below.

The following sections describe Instrumenting, Decorators, Exporting Metrics and the Push Gateway in more detail.

Instrumenting

Prometheus provides four kinds of metrics that can be used to instrument your software:

  • Counter,

  • Gauge,

  • Summary and

  • Histogram.

More details on the metric types can be found in the Prometheus documentation.

By default, each metric gets registered into the default collector registry which is available at aioprometheus.REGISTRY. Metrics accept a registry keyword argument if you need to supply your own registry. The registry is important later when you want to expose your metrics to a collector.
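
For example, a minimal sketch of both approaches (the metric names are illustrative only, and Registry is assumed to be importable from the package top level alongside REGISTRY):

from aioprometheus import Counter, Registry

# Registered with the default registry, aioprometheus.REGISTRY.
default_counter = Counter("default_events", "Events counted in the default registry.")

# Registered with a user supplied registry instead.
my_registry = Registry()
custom_counter = Counter(
    "custom_events",
    "Events counted in a custom registry.",
    registry=my_registry,
)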

Counter

A counter is a cumulative metric that represents a single numerical value that only ever goes up. A counter is typically used to count requests served, tasks completed, errors that have occurred, etc. Counters should not be used to expose current counts of items whose number can also go down, e.g. the number of currently running coroutines. Use gauges for that use case.

from aioprometheus import Counter

uploads_metric = Counter("file_uploads_total", "File total uploads.")
uploads_metric.inc({'type': "png"})

Gauge

A Gauge is a metric that represents a single numerical value that can arbitrarily go up and down.

from aioprometheus import Gauge

ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)
ram_metric.inc({'type': "virtual"})
ram_metric.dec({'type': "virtual"})
ram_metric.add({'type': "virtual"}, 5)
ram_metric.sub({'type': "virtual"}, -5)

Summary

A Summary captures individual observations from an event or sample stream and summarizes them in a manner similar to traditional summary statistics.

A summary metric provides:

  1. sum of observations,

  2. observation count,

  3. rank estimations.

from aioprometheus import Summary

http_access = Summary("http_access_time", "HTTP access time")
http_access.observe({'time': '/static'}, 3.142)

The default invariants ([(0.50, 0.05), (0.90, 0.01), (0.99, 0.001)]) can be overridden by passing an invariants keyword argument to Summary.

from aioprometheus import Summary

http_access = Summary(
    "http_access_time",
    "HTTP access time",
    invariants=[(0.50, 0.05), (0.99, 0.001)])

Histogram

A Histogram tracks the size and number of events in buckets.

You can use Histograms for aggregatable calculation of quantiles.

from aioprometheus import Histogram

http_access = Histogram("http_access_time", "HTTP access time")
http_access.observe({'time': '/static'}, 3.142)

The default buckets are: 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0 and 10.0. A +Inf bucket is always added to the set. The buckets can be overridden by passing a buckets keyword argument to Histogram.

from aioprometheus import Histogram

http_access = Histogram(
    "http_access_time",
    "HTTP access time",
    buckets=[0.1, 0.5, 1.0, 5.0])

The aioprometheus.histogram module includes two functions that help define differently spaced buckets. The linearBuckets function returns buckets that are spaced linearly while the exponentialBuckets function returns buckets that are spaced exponentially.
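
A brief sketch of how these helpers might be used; the parameter order shown (start, width, count for linear and start, factor, count for exponential) is an assumption:

from aioprometheus import Histogram
from aioprometheus.histogram import exponentialBuckets, linearBuckets

# Ten buckets of width 0.1 starting at 0.1: 0.1, 0.2, ..., 1.0 (+Inf is added automatically).
linear_latency = Histogram(
    "linear_latency_seconds",
    "Request latency in seconds.",
    buckets=linearBuckets(0.1, 0.1, 10),
)

# Five buckets starting at 0.1 and doubling each time: 0.1, 0.2, 0.4, 0.8, 1.6.
exponential_latency = Histogram(
    "exponential_latency_seconds",
    "Request latency in seconds.",
    buckets=exponentialBuckets(0.1, 2.0, 5),
)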

Labels

All metrics have labels which allow the grouping of related time series.

See best practices and labels for more information on this topic.

To add a metric to a collector you first identify it with a label. In the following example a Gauge collector is created for tracking memory usage. Then a specific metric is created within the collector to track virtual memory usage:

ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)

A single collector is capable of storing multiple metric instances. For example, the swap memory could also be monitored using this collector:

ram_metric.set({'type': "swap"}, 100.1)

Const labels

When you create a collector you can also add constant labels. These constant labels will be included with all the metrics gathered by that collector. This avoids needing to constantly add extra labels when updating the metric.

So this example without const labels

host = socket.gethostname()
ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual", 'host': host}, 100)
ram_metric.set({'type': "swap", 'host': host}, 100)

is the same as this one with const labels:

ram_metric = Gauge(
    "memory_usage_bytes", "Memory usage in bytes.",
    const_labels={'host': socket.gethostname()})
ram_metric.set({'type': "virtual"}, 100)
ram_metric.set({'type': "swap"}, 100)

Decorators

A number of different decorators are provided to help simplify the process of instrumenting your code. The decorators return a regular function if the wrapped function is a regular function or an awaitable if the wrapped function is a coroutine function.
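
As a quick illustration (a sketch only; the complete runnable examples follow), the same @timer decorator can wrap either kind of function:

from aioprometheus import Summary, timer

SYNC_TIME = Summary("sync_processing_seconds", "Time spent in the regular function")
ASYNC_TIME = Summary("async_processing_seconds", "Time spent in the coroutine")


@timer(SYNC_TIME)
def crunch_numbers():
    # Called like a normal function; the metric is updated when it returns.
    return sum(range(1000))


@timer(ASYNC_TIME)
async def fetch_data():
    # Must be awaited; the metric is updated when the coroutine completes.
    return 42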

The example below demonstrates how the @timer decorator can be used to time how long it takes to run a function.

#!/usr/bin/env python
"""
Usage:

.. code-block:: console

    $ python decorator_timer.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_processing_seconds Time spent processing request
    # TYPE request_processing_seconds summary
    request_processing_seconds_count 77
    request_processing_seconds_sum 38.19072341918945
    request_processing_seconds{quantile="0.5"} 0.27150511741638184
    request_processing_seconds{quantile="0.9"} 0.5016570091247559
    request_processing_seconds{quantile="0.99"} 0.6077709197998047

"""

import asyncio
import random

from aioprometheus import Counter, Summary, timer
from aioprometheus.service import Service

# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary("request_processing_seconds", "Time spent processing request")
REQUESTS = Counter("request_total", "Total number of requests")

# Decorate function with metric.
@timer(REQUEST_TIME)
async def handle_request(duration):
    """A dummy function that takes some time"""
    REQUESTS.inc({"route": "/"})
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    svr = Service()

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

The following example demonstrates how the @inprogress decorator can be used to track how many requests are in progress.

#!/usr/bin/env python
"""

.. code-block:: console

    $ python decorator_inprogress.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_in_progress Number of requests in progress
    # TYPE request_in_progress gauge
    request_in_progress{route="/"} 1

"""

import asyncio
import random

from aioprometheus import Counter, Gauge, inprogress
from aioprometheus.service import Service

# Create a metric to track requests currently in progress.
REQUESTS_IN_PROGRESS = Gauge("request_in_progress", "Number of requests in progress")
REQUESTS = Counter("request_total", "Total number of requests")

# Decorate function with metric.
@inprogress(REQUESTS_IN_PROGRESS, {"route": "/"})
async def handle_request(duration):
    """A dummy function that takes some time"""
    REQUESTS.inc({"route": "/"})
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    svr = Service()

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

The next example demonstrates how the @count_exceptions decorator can be used to track the number of exceptions that occur in a function block.

#!/usr/bin/env python
"""

.. code-block:: console

    $ python decorator_count_exceptions.py

The example script can be tested using ``curl``.

.. code-block:: console

    $ curl :8000/metrics
    # HELP request_handler_exceptions Number of exceptions in requests
    # TYPE request_handler_exceptions counter
    request_handler_exceptions{route="/"} 3

You may need to Ctrl+C twice to exit the example script.

"""

import asyncio
import random

from aioprometheus import Counter, count_exceptions
from aioprometheus.service import Service

# Create a metric to track exceptions raised while handling requests.
REQUEST_EXCEPTIONS = Counter(
    "request_handler_exceptions", "Number of exceptions in requests"
)
REQUESTS = Counter("request_total", "Total number of requests")

# Decorate function with metric.
@count_exceptions(REQUEST_EXCEPTIONS, {"route": "/"})
async def handle_request(duration):
    """A dummy function that occasionally raises an exception"""
    REQUESTS.inc({"route": "/"})
    if duration < 0.3:
        raise Exception("Ooops")
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        try:
            await handle_request(random.random())
        except Exception:
            pass  # keep handling


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    svr = Service()

    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()

Exporting Metrics

aioprometheus provides two strategies to expose metrics to Prometheus or a compatible metrics collector via an HTTP endpoint. The right choice depends on the kind of software being instrumented. The first option is aimed at web application frameworks while the second is aimed at asyncio applications.

Web Frameworks

The aioprometheus package can be used within web application frameworks such as Starlette, FastAPI, aiohttp and Quart.

The easiest option for adding Prometheus metrics to a Starlette, FastAPI or Quart application is to use the ASGI Middleware provided by aioprometheus.

The ASGI middleware provides a default set of metrics that include counters for total requests received, total responses sent, exceptions raised and response status codes for route handlers.

Note

Exceptions are not propagated to the ASGI layer by the Quart framework so the default metric tracking exceptions does not work for Quart.

The middleware excludes a set of common paths such as ‘/favicon.ico’, ‘/metrics’ and some others from triggering updates to the default metrics. The complete set is defined in aioprometheus.asgi.middleware.EXCLUDE_PATHS.

Any custom application metrics are automatically included in the exposed metrics.

The example below shows how to use the aioprometheus ASGI middleware in a FastAPI application. FastAPI is built upon Starlette so using the middleware in Starlette would be the same.

#!/usr/bin/env python
"""
This example shows how to use the aioprometheus ASGI middleware in a FastAPI
application. FastAPI is built upon Starlette so using the middleware in
Starlette would be the same.

Run:

  (venv) $ pip install fastapi uvicorn
  (venv) $ python fastapi-middleware.py

"""

from fastapi import FastAPI, Request, Response

from aioprometheus import Counter, MetricsMiddleware
from aioprometheus.asgi.starlette import metrics

app = FastAPI()

# Any custom application metrics are automatically included in the exposed
# metrics. It is a good idea to attach the metrics to 'app.state' so they
# can easily be accessed in the route handler - as metrics are often
# created in a different module than where they are used.
app.state.users_events_counter = Counter("events", "Number of events.")

app.add_middleware(MetricsMiddleware)
app.add_route("/metrics", metrics)


@app.get("/")
async def root(request: Request):  # pylint: disable=unused-argument
    return Response("FastAPI Middleware Example")


@app.get("/users/{user_id}")
async def get_user(
    request: Request,
    user_id: str,
):
    request.app.state.users_events_counter.inc({"path": request.scope["path"]})
    return Response(f"{user_id}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)

The example imports a Starlette specific metrics rendering function from the aioprometheus.asgi.starlette module and attaches it to the ‘/metrics’ route. The rendering function requires Starlette so remember to install aioprometheus with the ‘starlette’ extras.

$ pip install aioprometheus[starlette]

Alternatively, if you don’t want the default metrics provided by the ASGI middleware or want finer control over the metrics export function then you can use the aioprometheus render function to help implement your own metrics handler.

The example below shows how this approach can be implemented with a FastAPI application.

#!/usr/bin/env python
"""
This example adds Prometheus metrics to a FastAPI application. In this
example a counter metric is instantiated and gets updated whenever the "/"
route is accessed.

A '/metrics' route is implemented using the render function and added to
the application using the standard web framework method. The metrics route
renders Prometheus metrics from the default collector registry into the
appropriate format.

Run:

  (venv) $ pip install fastapi uvicorn
  (venv) $ python fastapi-example.py

"""

from typing import List

from fastapi import FastAPI, Header, Request, Response

from aioprometheus import REGISTRY, Counter, render

app = FastAPI()
app.state.events_counter = Counter("events", "Number of events.")


@app.get("/")
async def hello(request: Request):
    request.app.state.events_counter.inc({"path": "/"})
    return "FastAPI Hello"


@app.get("/metrics")
async def handle_metrics(
    request: Request,  # pylint: disable=unused-argument
    accept: List[str] = Header(None),
) -> Response:
    content, http_headers = render(REGISTRY, accept)
    return Response(content=content, media_type=http_headers["Content-Type"])


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)

There are more examples in the examples/frameworks directory showing how aioprometheus can be used with other web application frameworks.

Other Applications

Other applications such as long running distributed system processes can embed the aioprometheus Service to provide a dedicated metrics HTTP endpoint on which to export metrics.

The Service can be configured to bind to a user defined network interface and port.

When the Service receives a request for metrics it forms a response by rendering the contents of its registry into the appropriate format. By default the Service uses the default collector registry, which is aioprometheus.REGISTRY. The Service can be configured to use a different registry by passing one in as an argument to the Service constructor.
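
For example, a minimal sketch of supplying a custom registry to the Service; the keyword name registry is an assumption based on the description above:

from aioprometheus import Counter, Registry
from aioprometheus.service import Service

app_registry = Registry()
app_events = Counter("app_events", "Application events.", registry=app_registry)

# The Service will render metrics from app_registry rather than the
# default aioprometheus.REGISTRY.
service = Service(registry=app_registry)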

The Service object also responds to requests sent to its / route. The response is simple HTML. This route can be useful as a Kubernetes /healthz style health indicator as it does not incur any overhead within the service to serialize a full metrics response.

$ curl http://127.0.0.1:8000/
<html><body><a href='/metrics'>metrics</a></body></html>

The Service object requires optional extras to be installed so make sure you install aioprometheus with the ‘aiohttp’ extras.

$ pip install aioprometheus[aiohttp]

The first example shows the basics of how to use the Service object. A counter metric is used to track the number of while loop iterations executed by the ‘updater’ coroutine.

#!/usr/bin/env python
"""
This example demonstrates how the ``aioprometheus.Service`` can be used to
expose metrics on a HTTP endpoint.

.. code-block:: console

    (env) $ python simple-service-example.py
    Serving prometheus metrics on: http://127.0.0.1:8000/metrics

You can open the URL in a browser or use the ``curl`` command line tool to
fetch metrics manually to verify they can be retrieved by Prometheus server.

"""

import asyncio
import socket

from aioprometheus import Counter
from aioprometheus.service import Service


async def main():

    service = Service()
    events_counter = Counter(
        "events", "Number of events.", const_labels={"host": socket.gethostname()}
    )

    await service.start(addr="127.0.0.1", port=8000)
    print(f"Serving prometheus metrics on: {service.metrics_url}")

    # Now start another coroutine to periodically update a metric to
    # simulate the application making some progress.
    async def updater(c: Counter):
        while True:
            c.inc({"kind": "timer_expiry"})
            await asyncio.sleep(1.0)

    await updater(events_counter)

    # Finally stop server
    await service.stop()


if __name__ == "__main__":

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

The Service is started and then a coroutine is started to periodically update the metric to simulate progress.

The script can be run from the examples directory using:

(venv) $ python simple-service-example.py
Serving prometheus metrics on: http://127.0.0.1:8000/metrics

You can open the URL in a browser or use the curl command line tool to fetch metrics manually. By default metrics will be returned in plain text format.

$ curl http://127.0.0.1:8000/metrics
# HELP events Number of events.
# TYPE events counter
events{kind="timer_expiry"} 33

The next example uses the Service object in a more representative asyncio application. In this case it is a long running application that monitors various process metrics.

#!/usr/bin/env python
"""
This example implements an application that exposes application metrics
obtained from the psutil package.

This example requires the ``psutil`` package which can be installed
using ``pip install psutil``.
"""

import asyncio
import logging
import random
import socket

import psutil

from aioprometheus import Counter, Gauge, Histogram, Summary
from aioprometheus.service import Service


class ExampleApp:
    """
    This example application attempts to demonstrate how ``aioprometheus``
    can be integrated within a Python application built upon asyncio.

    This application attempts to simulate a long running distributed system
    process. It is intentionally not a web service application.

    In this case the aioprometheus Service object is used to provide a new
    HTTP endpoint on which to expose Prometheus metrics.
    """

    def __init__(
        self,
        metrics_host="127.0.0.1",
        metrics_port: int = 8000,
    ):

        self.metrics_host = metrics_host
        self.metrics_port = metrics_port
        self.timer = None  # type: asyncio.Handle

        ######################################################################
        # Create application metrics and metrics service

        # Create a metrics server. The server will create a metrics collector
        # registry if one is not specifically created and passed in.
        self.msvr = Service()

        # Define some constant labels that need to be added to all metrics
        const_labels = {
            "host": socket.gethostname(),
            "app": f"{self.__class__.__name__}",
        }

        # Create metrics collectors. No registry is passed when creating the
        # metrics so they get registered with the default registry.

        # Create a counter metric to track requests.
        self.requests_metric = Counter(
            "requests", "Number of requests.", const_labels=const_labels
        )

        # Create a gauge metrics to track memory usage.
        self.ram_metric = Gauge(
            "memory_usage_bytes", "Memory usage in bytes.", const_labels=const_labels
        )

        # Create a gauge metrics to track CPU.
        self.cpu_metric = Gauge(
            "cpu_usage_percent", "CPU usage percent.", const_labels=const_labels
        )

        self.payload_metric = Summary(
            "request_payload_size_bytes",
            "Request payload size in bytes.",
            const_labels=const_labels,
            invariants=[(0.50, 0.05), (0.99, 0.001)],
        )

        self.latency_metric = Histogram(
            "request_latency_seconds",
            "Request latency in seconds",
            const_labels=const_labels,
            buckets=[0.1, 0.5, 1.0, 5.0],
        )

    async def start(self):
        """Start the application"""
        await self.msvr.start(addr=self.metrics_host, port=self.metrics_port)
        logger.debug(f"Serving prometheus metrics on: {self.msvr.metrics_url}")

        # Schedule a timer to update metrics. In a realistic application
        # the metrics would be updated as needed. In this example, a simple
        # timer is used to emulate things happening, which conveniently
        # allows all metrics to be updated at once.
        self.timer = asyncio.get_event_loop().call_later(1.0, self.on_timer_expiry)

    async def stop(self):
        """Stop the application"""
        await self.msvr.stop()
        if self.timer:
            self.timer.cancel()
        self.timer = None

    def on_timer_expiry(self):
        """Update application to simulate work"""

        # Update memory metrics
        self.ram_metric.set({"type": "virtual"}, psutil.virtual_memory().used)
        self.ram_metric.set({"type": "swap"}, psutil.swap_memory().used)

        # Update cpu metrics
        for c, p in enumerate(psutil.cpu_percent(interval=1, percpu=True)):
            self.cpu_metric.set({"core": c}, p)

        # Incrementing a requests counter to emulate webserver app
        self.requests_metric.inc({"path": "/"})

        # Monitor request payload data to emulate webserver app
        self.payload_metric.add({"path": "/data"}, random.random() * 2 ** 10)

        # Monitor request latency to emulate webserver app
        self.latency_metric.add({"path": "/data"}, random.random() * 5)

        # re-schedule another metrics update
        self.timer = asyncio.get_event_loop().call_later(1.0, self.on_timer_expiry)


if __name__ == "__main__":

    logging.basicConfig(level=logging.DEBUG)
    # Silence asyncio and aiohttp loggers
    logging.getLogger("asyncio").setLevel(logging.ERROR)
    logging.getLogger("aiohttp").setLevel(logging.ERROR)
    logger = logging.getLogger(__name__)

    loop = asyncio.get_event_loop()

    app = ExampleApp()

    loop.run_until_complete(app.start())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(app.stop())
    loop.close()

The example can be run from the examples directory using:

(env) $ python app-service-example.py
Serving prometheus metrics on: http://127.0.0.1:8000/metrics

You can open the URL in a browser or use the curl command line tool to fetch metrics manually.

Push Gateway

Another method of exposing metrics is to push them to an intermediary that will get scraped by Prometheus. The Prometheus PushGateway exists for this purpose.

This strategy can be useful to obtain metrics from components that cannot be scraped directly. They might be behind a firewall or might be too short-lived.

The Prometheus Push Gateway allows you to push time series data to it which ensures that data is always exposed reliably via the pull model.

The aioprometheus package provides a Pusher object that can be used within your application to push metrics to a Prometheus Push Gateway. The Pusher allows you to specify a job name as well as additional grouping keys.

The grouping keys get added to the Push Gateway URL using the rules described here. See here for how to configure Prometheus to best scrape metrics from the Push Gateway.

The Pusher object requires optional extras to be installed so make sure you install aioprometheus with the ‘aiohttp’ extras.

$ pip install aioprometheus[aiohttp]

from aioprometheus import REGISTRY, Counter
from aioprometheus.pusher import Pusher

PUSH_GATEWAY_ADDR = "http://127.0.0.1:61423"
pusher = Pusher("my-job", PUSH_GATEWAY_ADDR, grouping_key={"instance": "127.0.0.1:1234"})
c = Counter("total_requests", "Total requests.", {})

c.inc({'url': "/p/user"})

# Push to the push gateway
resp = await pusher.replace(REGISTRY)
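
Because replace is a coroutine it must be awaited from within a running event loop. A minimal runnable sketch wrapping the snippet above in an asyncio entry point (the gateway address is the placeholder used above):

import asyncio

from aioprometheus import REGISTRY, Counter
from aioprometheus.pusher import Pusher

PUSH_GATEWAY_ADDR = "http://127.0.0.1:61423"


async def main():
    pusher = Pusher(
        "my-job", PUSH_GATEWAY_ADDR, grouping_key={"instance": "127.0.0.1:1234"}
    )
    c = Counter("total_requests", "Total requests.")
    c.inc({"url": "/p/user"})

    # Push the default registry to the gateway, replacing any metrics
    # previously pushed for this job and grouping key.
    await pusher.replace(REGISTRY)


if __name__ == "__main__":
    asyncio.run(main())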

Using Prometheus To Check Examples

Prometheus can be configured to scrape metrics from the example scripts so their metrics can be viewed in the Prometheus dashboard. Create a minimal configuration file that scrapes the example scripts.

global:
  scrape_interval:     15s # By default, scrape targets every 15 seconds.
  evaluation_interval: 15s # By default, evaluate rules every 15 seconds.

scrape_configs:
  - job_name:       'test-app'

    # Override the global default and scrape targets from this job every
    # 5 seconds.
    scrape_interval: 5s
    scrape_timeout: 5s

    static_configs:
      - targets: ['localhost:8000']
        labels:
          group: 'dev'

The official Prometheus configuration documentation can be found here.

Run Prometheus and pass it the configuration file.

$ ./prometheus --config.file=my-prom-config.yaml

Once Prometheus is running you can access it at http://localhost:9090 and observe the metrics from the examples.

Testing

When producing unit tests for software that uses aioprometheus it will likely be necessary to clear the default registry between test runs to get it back to a clean state. Failing to do this will likely result in an error being raised reporting that a metric by the same name already exists.

Resetting the default registry is easily achieved by calling REGISTRY.clear(). See the unit tests of this project for examples of where this is done.
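
For example, a minimal sketch using a pytest fixture (pytest is an assumption here; any test framework's setup hook works equally well):

import pytest

from aioprometheus import REGISTRY


@pytest.fixture(autouse=True)
def reset_default_registry():
    """Clear the default registry before each test so metric names can be reused."""
    REGISTRY.clear()
    yield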