User Guide
Install
The current release of aioprometheus is available from PyPI. Use pip to install it.
$ pip install aioprometheus
The ASGI middleware does not have any external dependencies but the Starlette and Quart convenience functions that handle metrics requests do.
If you plan on using the ASGI middleware in a Starlette / FastAPI application then you can install the extra dependencies alongside aioprometheus by adding extras to the install.
$ pip install aioprometheus[starlette]
If you plan on using the ASGI middleware in a Quart application then you can install the extra dependencies alongside aioprometheus by adding extras to the install.
$ pip install aioprometheus[quart]
A Prometheus Push Gateway client and an HTTP service are included, but their dependencies are not installed by default. You can install them alongside aioprometheus by adding optional extras to the install.
$ pip install aioprometheus[aiohttp]
Prometheus 2.0 removed support for the binary protocol, so as of version 20.0.0 the dependency on prometheus-metrics-proto, which provides binary support, is optional. If you need binary responses for use with an older Prometheus, specify the ‘binary’ optional extra:
$ pip install aioprometheus[binary]
Multiple optional dependencies can be listed at once, such as:
$ pip install aioprometheus[aiohttp,binary,starlette,quart]
Usage
There are two basic steps involved in using aioprometheus; the first is to instrument your software by creating metrics to monitor events and the second is to expose the metrics to a collector.
Creating a new metric is easy. First, import the appropriate metric from aioprometheus. In the example below it’s a Counter metric. Next, instantiate the metric with a name and a help string. Finally, update the metric when an event occurs. In this case the counter is incremented.
from aioprometheus import Counter
events_counter = Counter(
"events_counter",
"Total number of events.",
)
events_counter.inc({"kind": "event A"})
A number of convenience decorator functions are included in aioprometheus that can assist with automatically updating metrics. The examples directory contains various decorator examples.
Once your software is instrumented with metrics you’ll want to expose them to Prometheus or a compatible metrics collector. There are multiple strategies available for this and the right choice depends on the kind of thing being instrumented. See the Exporting Metrics section below.
The following sections describe Instrumenting, Decorators, Exporting Metrics and the Push Gateway in more detail.
Instrumenting
Prometheus provides four kinds of metrics that can be used to instrument your software:
Counter,
Gauge,
Summary and
Histogram.
More details on the metrics types can be found here.
By default, each metric gets registered into the default collector registry, which is available at aioprometheus.REGISTRY. Metrics accept a registry keyword argument if you need to supply your own registry. The registry is important later when you want to expose your metrics to a collector.
Counter
A counter is a cumulative metric that represents a single numerical value that only ever goes up. A counter is typically used to count requests served, tasks completed, errors occurred, etc. Counters should not be used to expose current counts of items whose number can also go down, e.g. the number of currently running coroutines. Use gauges for this use case.
from aioprometheus import Counter
uploads_metric = Counter("file_uploads_total", "File total uploads.")
uploads_metric.inc({'type': "png"})
Gauge
A Gauge is a metric that represents a single numerical value that can arbitrarily go up and down.
from aioprometheus import Gauge
ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)
ram_metric.inc({'type': "virtual"})
ram_metric.dec({'type': "virtual"})
ram_metric.add({'type': "virtual"}, 5)
ram_metric.sub({'type': "virtual"}, 5)
Summary
A Summary captures individual observations from an event or sample stream and summarizes them in a manner similar to traditional summary statistics.
A summary metric provides:
sum of observations,
observation count,
rank estimations.
from aioprometheus import Summary
http_access = Summary("http_access_time", "HTTP access time")
http_access.observe({'time': '/static'}, 3.142)
The default invariants ([(0.50, 0.05), (0.90, 0.01), (0.99, 0.001)]) can be overridden by passing the invariants keyword argument to Summary.
from aioprometheus import Summary
http_access = Summary(
"http_access_time",
"HTTP access time",
invariants=[(0.50, 0.05), (0.99, 0.001)])
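To make the three outputs of a summary concrete, here is a plain-Python sketch. It is not how the library computes them: it keeps every observation and reads quantiles with the nearest-rank method, whereas each invariant pair above gives a quantile together with an error tolerance, which suggests an estimate rather than an exact value. The `summarize` helper is a hypothetical name used only for illustration.

```python
import math


def summarize(observations, quantiles=(0.50, 0.90, 0.99)):
    """Return the sum, count and nearest-rank quantiles of the observations."""
    ordered = sorted(observations)
    count = len(ordered)
    result = {"sum": sum(ordered), "count": count, "quantiles": {}}
    if count == 0:
        # Nothing observed yet, so no quantile can be reported.
        return result
    for q in quantiles:
        # Nearest-rank: the smallest value with at least q * count
        # observations at or below it.
        rank = max(1, math.ceil(q * count))
        result["quantiles"][q] = ordered[rank - 1]
    return result


stats = summarize([1, 2, 3, 4, 5])
print(stats)
```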
Histogram
A Histogram tracks the size and number of events in buckets.
You can use Histograms for aggregatable calculation of quantiles.
from aioprometheus import Histogram
http_access = Histogram("http_access_time", "HTTP access time")
http_access.observe({'time': '/static'}, 3.142)
The default buckets are 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0 and 10.0. A +Inf bucket is always included. The buckets can be overridden by passing the buckets keyword argument to Histogram.
from aioprometheus import Histogram
http_access = Histogram(
"http_access_time",
"HTTP access time",
buckets=[0.1, 0.5, 1.0, 5.0])
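The sketch below illustrates how observations fall into buckets in the Prometheus exposition model, where each bucket counts every observation less than or equal to its upper bound and +Inf therefore equals the total count. It is a standalone illustration in plain Python, not the library's code, and `bucket_counts` is a hypothetical helper.

```python
import math


def bucket_counts(observations, buckets):
    """Count observations per upper bound, cumulatively, with +Inf last."""
    bounds = sorted(buckets) + [math.inf]
    counts = {bound: 0 for bound in bounds}
    for value in observations:
        for bound in bounds:
            # Buckets are cumulative: a value counts toward every bucket
            # whose upper bound is greater than or equal to it.
            if value <= bound:
                counts[bound] += 1
    return counts


counts = bucket_counts([0.05, 0.3, 3.142, 12.0], [0.1, 0.5, 1.0, 5.0])
print(counts)
```

The value 12.0 exceeds every configured bound, so it is only visible in the +Inf bucket. This is why the choice of upper bounds should cover the range of values you expect.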
The aioprometheus.histogram module includes two functions that help define differently spaced buckets. The linearBuckets function returns buckets that are spaced linearly while the exponentialBuckets function returns buckets that are spaced exponentially.
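The difference between the two spacings can be sketched in plain Python. The helpers below are hypothetical stand-ins written for this illustration; their names and their `start`, `width`, `factor` and `count` parameters are assumptions and may not match the signatures of the library's own functions.

```python
def linear_buckets(start, width, count):
    """Return `count` upper bounds, each `width` apart, beginning at `start`."""
    return [start + width * i for i in range(count)]


def exponential_buckets(start, factor, count):
    """Return `count` upper bounds, each `factor` times the previous one."""
    if start <= 0 or factor <= 1:
        raise ValueError("start must be positive and factor greater than 1")
    return [start * factor**i for i in range(count)]


linear = linear_buckets(1.0, 2.0, 3)
exponential = exponential_buckets(1.0, 2.0, 4)
print(linear)
print(exponential)
```

Linear spacing suits values spread evenly over a known range, while exponential spacing keeps resolution at the low end of a range that spans several orders of magnitude, such as request latency.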
Labels
All metrics have labels which allow the grouping of related time series.
See best practices and labels for more information on this topic.
To add a metric to a collector you first identify it with a label. In the following example a Gauge collector is created for tracking memory usage. Then a specific metric is created within the collector to track virtual memory usage:
ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual"}, 100)
A single collector is capable of storing multiple metric instances. For example, the swap memory could also be monitored using this collector:
ram_metric.set({'type': "swap"}, 100.1)
Const labels
When you create a collector you can also add constant labels. These constant labels will be included with all the metrics gathered by that collector. This avoids needing to constantly add extra labels when updating the metric.
So this example without const labels
host = socket.gethostname()
ram_metric = Gauge("memory_usage_bytes", "Memory usage in bytes.")
ram_metric.set({'type': "virtual", 'host': host}, 100)
ram_metric.set({'type': "swap", 'host': host}, 100)
is the same as this one with const labels:
ram_metric = Gauge(
"memory_usage_bytes", "Memory usage in bytes.",
const_labels={'host': socket.gethostname()})
ram_metric.set({'type': "virtual"}, 100)
ram_metric.set({'type': "swap"}, 100)
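One way to picture labels and const labels is as a key into a dictionary of values: every distinct label set is its own time series, and const labels are merged into each key. The toy class below is a sketch of that idea only. `LabelledGauge` is a hypothetical name, the host value "alpha" is a placeholder, and none of this reflects how the library stores its data.

```python
class LabelledGauge:
    """Toy gauge keeping one value per distinct label set."""

    def __init__(self, const_labels=None):
        self.const_labels = dict(const_labels or {})
        self.values = {}

    def _key(self, labels):
        # Constant labels are merged into every update, and sorting makes
        # the key independent of the order the labels were given in.
        merged = {**self.const_labels, **labels}
        return tuple(sorted(merged.items()))

    def set(self, labels, value):
        self.values[self._key(labels)] = value

    def get(self, labels):
        return self.values[self._key(labels)]


gauge = LabelledGauge(const_labels={"host": "alpha"})
gauge.set({"type": "virtual"}, 100)
gauge.set({"type": "swap"}, 100.1)
print(sorted(gauge.values))
```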
Decorators
A number of different decorators are provided to help simplify the process of instrumenting your code. The decorators return a regular function if the wrapped function is a regular function or an awaitable if the wrapped function is a coroutine function.
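The way a single decorator can serve both regular functions and coroutine functions can be sketched with the standard library alone. The `record_duration` decorator below is a hypothetical example that appends call durations to a list; it is not one of the library's decorators and only demonstrates the dispatch on the kind of function being wrapped.

```python
import asyncio
import functools
import inspect
import time


def record_duration(durations):
    """Append the run time of each call of the wrapped function to `durations`."""

    def decorator(func):
        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                start = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    durations.append(time.perf_counter() - start)

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                durations.append(time.perf_counter() - start)

        return sync_wrapper

    return decorator


durations = []


@record_duration(durations)
def add(a, b):
    return a + b


@record_duration(durations)
async def add_later(a, b):
    await asyncio.sleep(0.01)
    return a + b


sync_result = add(1, 2)
async_result = asyncio.run(add_later(3, 4))
print(sync_result, async_result, len(durations))
```

Recording in a `finally` block means the duration is captured even when the wrapped function raises.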
The example below demonstrates how the @timer decorator can be used to time how long it takes to run a function.
#!/usr/bin/env python
"""
Usage:
.. code-block:: python
$ python decorator_timer.py
The example script can be tested using ``curl``.
.. code-block:: console
$ curl :8000/metrics
# HELP request_processing_seconds Time spent processing request
# TYPE request_processing_seconds summary
request_processing_seconds_count 77
request_processing_seconds_sum 38.19072341918945
request_processing_seconds{quantile="0.5"} 0.27150511741638184
request_processing_seconds{quantile="0.9"} 0.5016570091247559
request_processing_seconds{quantile="0.99"} 0.6077709197998047
"""
import asyncio
import random
from aioprometheus import Counter, Summary, timer
from aioprometheus.service import Service
# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary("request_processing_seconds", "Time spent processing request")
REQUESTS = Counter("request_total", "Total number of requests")
# Decorate function with metric.
@timer(REQUEST_TIME)
async def handle_request(duration):
    """A dummy function that takes some time"""
    REQUESTS.inc({"route": "/"})
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    svr = Service()
    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()
The following example demonstrates how the @inprogress decorator can be used to track how many requests are in progress.
#!/usr/bin/env python
"""
.. code-block:: python
$ python decorator_inprogress.py
The example script can be tested using ``curl``.
.. code-block:: console
$ curl :8000/metrics
# HELP request_in_progress Number of requests in progress
# TYPE request_in_progress gauge
request_in_progress{route="/"} 1
"""
import asyncio
import random
from aioprometheus import Counter, Gauge, inprogress
from aioprometheus.service import Service
# Create a metric to track requests currently in progress.
REQUESTS_IN_PROGRESS = Gauge("request_in_progress", "Number of requests in progress")
REQUESTS = Counter("request_total", "Total number of requests")
# Decorate function with metric.
@inprogress(REQUESTS_IN_PROGRESS, {"route": "/"})
async def handle_request(duration):
    """A dummy function that takes some time"""
    REQUESTS.inc({"route": "/"})
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        await handle_request(random.random())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    svr = Service()
    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()
The next example demonstrates how the @count_exceptions decorator can be used to track the number of exceptions that occur in a function block.
#!/usr/bin/env python
"""
.. code-block:: python
$ python decorator_count_exceptions.py
The example script can be tested using ``curl``.
.. code-block:: console
$ curl :8000/metrics
# HELP request_handler_exceptions Number of exceptions in requests
# TYPE request_handler_exceptions counter
request_handler_exceptions{route="/"} 3
You may need to Ctrl+C twice to exit the example script.
"""
import asyncio
import random
from aioprometheus import Counter, count_exceptions
from aioprometheus.service import Service
# Create a metric to track exceptions raised while handling requests.
REQUEST_EXCEPTIONS = Counter(
"request_handler_exceptions", "Number of exceptions in requests"
)
REQUESTS = Counter("request_total", "Total number of requests")
# Decorate function with metric.
@count_exceptions(REQUEST_EXCEPTIONS, {"route": "/"})
async def handle_request(duration):
    """A dummy function that occasionally raises an exception"""
    REQUESTS.inc({"route": "/"})
    if duration < 0.3:
        raise Exception("Ooops")
    await asyncio.sleep(duration)


async def handle_requests():
    # Start up the server to expose the metrics.
    await svr.start(port=8000)
    # Generate some requests.
    while True:
        try:
            await handle_request(random.random())
        except Exception:
            pass  # keep handling


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    svr = Service()
    try:
        loop.run_until_complete(handle_requests())
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(svr.stop())
    loop.stop()
    loop.close()
Exporting Metrics
aioprometheus provides two strategies to expose metrics to Prometheus or a compatible metrics collector via an HTTP endpoint. The right choice depends on the kind of software being instrumented. The first option is aimed at web application frameworks while the second is aimed at asyncio applications.
Web Frameworks
The aioprometheus package can be used within web application frameworks such as Starlette, FastAPI, aiohttp and Quart.
The easiest option for adding Prometheus metrics to a Starlette, FastAPI or Quart application is to use the ASGI Middleware provided by aioprometheus.
The ASGI middleware provides a default set of metrics that include counters for total requests received, total responses sent, exceptions raised and response status codes for route handlers.
Note
Exceptions are not propagated to the ASGI layer by the Quart framework so the default metric tracking exceptions does not work for Quart.
The middleware excludes a set of common paths such as ‘/favicon.ico’, ‘/metrics’ and some others from triggering updates to the default metrics. The complete set is defined in aioprometheus.asgi.middleware.EXCLUDE_PATHS.
Any custom application metrics are automatically included in the exposed metrics.
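The general shape of a metrics-collecting ASGI middleware can be sketched with the standard library alone. The class below is a simplified illustration, not the library's MetricsMiddleware: `CountingMiddleware`, `hello_app`, `drive` and the two-entry exclusion set are all hypothetical, and only total requests per path are counted.

```python
import asyncio

# Hypothetical exclusion set for this sketch; the library defines its own.
EXCLUDE_PATHS = {"/favicon.ico", "/metrics"}


class CountingMiddleware:
    """Count HTTP requests per path, skipping excluded paths."""

    def __init__(self, app):
        self.app = app
        self.requests = {}

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http" and scope["path"] not in EXCLUDE_PATHS:
            path = scope["path"]
            self.requests[path] = self.requests.get(path, 0) + 1
        # Always hand the request on to the wrapped application.
        await self.app(scope, receive, send)


async def hello_app(scope, receive, send):
    """Smallest possible ASGI application: always answers 200."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hello"})


async def drive(app, path):
    """Send one fake GET request through the app and collect what it sends."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": path}, receive, send)
    return sent


middleware = CountingMiddleware(hello_app)
for path in ("/", "/", "/metrics"):
    asyncio.run(drive(middleware, path))
print(middleware.requests)
```

Requests to the excluded '/metrics' path still reach the application but do not change the counts, which keeps scrapes by the collector from inflating the request metrics.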
The example below shows how to use the aioprometheus ASGI middleware in a FastAPI application. FastAPI is built upon Starlette so using the middleware in Starlette would be the same.
#!/usr/bin/env python
"""
This example shows how to use the aioprometheus ASGI middleware in a FastAPI
application. FastAPI is built upon Starlette so using the middleware in
Starlette would be the same.
Run:
(venv) $ pip install fastapi uvicorn
(venv) $ python fastapi-middleware.py
"""
from fastapi import FastAPI, Request, Response
from aioprometheus import Counter, MetricsMiddleware
from aioprometheus.asgi.starlette import metrics
app = FastAPI()
# Any custom application metrics are automatically included in the exposed
# metrics. It is a good idea to attach the metrics to 'app.state' so they
# can easily be accessed in the route handler - as metrics are often
# created in a different module than where they are used.
app.state.users_events_counter = Counter("events", "Number of events.")
app.add_middleware(MetricsMiddleware)
app.add_route("/metrics", metrics)
@app.get("/")
async def root(request: Request):  # pylint: disable=unused-argument
    return Response("FastAPI Middleware Example")


@app.get("/users/{user_id}")
async def get_user(
    request: Request,
    user_id: str,
):
    request.app.state.users_events_counter.inc({"path": request.scope["path"]})
    return Response(f"{user_id}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
The example imports a Starlette specific metrics rendering function from the aioprometheus.asgi.starlette module and attaches it to the ‘/metrics’ route. The rendering function requires Starlette so remember to install aioprometheus with the ‘starlette’ extras.
$ pip install aioprometheus[starlette]
Alternatively, if you don’t want the default metrics provided by the ASGI middleware or want finer control over the metrics export function then you can use the aioprometheus render function to help implement your own metrics handler.
The example below shows how this approach can be implemented with a FastAPI application.
#!/usr/bin/env python
"""
This example adds Prometheus metrics to a FastAPI application. In this
example a counter metric is instantiated and gets updated whenever the "/"
route is accessed.
A '/metrics' route is implemented using the render function and added to
the application using the standard web framework method. The metrics route
renders Prometheus metrics from the default collector registry into the
appropriate format.
Run:
(venv) $ pip install fastapi uvicorn
(venv) $ python fastapi-example.py
"""
from typing import List
from fastapi import FastAPI, Header, Request, Response
from aioprometheus import REGISTRY, Counter, render
app = FastAPI()
app.state.events_counter = Counter("events", "Number of events.")
@app.get("/")
async def hello(request: Request):
    request.app.state.events_counter.inc({"path": "/"})
    return "FastAPI Hello"


@app.get("/metrics")
async def handle_metrics(
    request: Request,  # pylint: disable=unused-argument
    accept: List[str] = Header(None),
) -> Response:
    content, http_headers = render(REGISTRY, accept)
    return Response(content=content, media_type=http_headers["Content-Type"])


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)
There are more examples in the examples/frameworks directory showing how aioprometheus can be used with other web application frameworks.
Other Applications
Other applications such as long running distributed system processes can embed the aioprometheus Service to provide a dedicated metrics HTTP endpoint on which to export metrics.
The Service can be configured to bind to a user defined network interface and port.
When the Service receives a request for metrics it forms a response by rendering the contents of its registry into the appropriate format. By default the Service uses the default collector registry, which is aioprometheus.REGISTRY. The Service can be configured to use a different registry by passing one in as an argument to the Service constructor.
The Service object also responds to requests sent to its / route. The response is simple HTML. This route can be useful as a Kubernetes /healthz style health indicator as it does not incur any overhead within the service to serialize a full metrics response.
$ curl http://127.0.0.1:8000/
<html><body><a href='/metrics'>metrics</a></body></html>
The Service object requires optional extras to be installed so make sure you install aioprometheus with the ‘aiohttp’ extras.
$ pip install aioprometheus[aiohttp]
The first example shows the basics for how to use the Service object. A counter metric is used to track the number of while loop iterations executed by the ‘updater’ coroutine.
#!/usr/bin/env python
"""
This example demonstrates how the ``aioprometheus.Service`` can be used to
expose metrics on a HTTP endpoint.
.. code-block:: console
(env) $ python simple-service-example.py
Serving prometheus metrics on: http://127.0.0.1:8000/metrics
You can open the URL in a browser or use the ``curl`` command line tool to
fetch metrics manually to verify they can be retrieved by Prometheus server.
"""
import asyncio
import socket
from aioprometheus import Counter
from aioprometheus.service import Service
async def main():
    service = Service()
    events_counter = Counter(
        "events", "Number of events.", const_labels={"host": socket.gethostname()}
    )

    await service.start(addr="127.0.0.1", port=8000)
    print(f"Serving prometheus metrics on: {service.metrics_url}")

    # Now start another coroutine to periodically update a metric to
    # simulate the application making some progress.
    async def updater(c: Counter):
        while True:
            c.inc({"kind": "timer_expiry"})
            await asyncio.sleep(1.0)

    await updater(events_counter)

    # Finally stop server
    await service.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
The Service is started and then a coroutine is started to periodically update the metric to simulate progress.
The script can be run from the examples directory using:
(venv) $ python simple-service-example.py
Serving prometheus metrics on: http://127.0.0.1:8000/metrics
You can open the URL in a browser or use the curl command line tool to fetch metrics manually. By default metrics will be returned in plain text format.
$ curl http://127.0.0.1:8000/metrics
# HELP events Number of events.
# TYPE events counter
events{kind="timer_expiry"} 33
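The layout of that text response can be reproduced with a few lines of plain Python. This is a sketch of the Prometheus text exposition format for a single counter, not the library's renderer: `render_counter` is a hypothetical helper, and it skips details such as escaping special characters in label values.

```python
def render_counter(name, doc, samples):
    """Render one counter as Prometheus text-format lines.

    `samples` maps a tuple of (label, value) pairs to the counter value.
    """
    lines = [f"# HELP {name} {doc}", f"# TYPE {name} counter"]
    for labels, value in samples.items():
        label_text = ",".join(f'{key}="{val}"' for key, val in labels)
        lines.append(f"{name}{{{label_text}}} {value}")
    return "\n".join(lines) + "\n"


text = render_counter(
    "events", "Number of events.", {(("kind", "timer_expiry"),): 33}
)
print(text, end="")
```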
The next example uses the Service object in a more representative asyncio application. In this case it is a long running application that monitors various process metrics.
#!/usr/bin/env python
"""
This example implements an application that exposes application metrics
obtained from the psutil package.
This example requires the ``psutil`` package which can be installed
using ``pip install psutil``.
"""
import asyncio
import logging
import random
import socket
import psutil
from aioprometheus import Counter, Gauge, Histogram, Summary
from aioprometheus.service import Service
class ExampleApp:
    """
    This example application attempts to demonstrate how ``aioprometheus``
    can be integrated within a Python application built upon asyncio.

    This application attempts to simulate a long running distributed system
    process. It is intentionally not a web service application.

    In this case the aioprometheus Service object is used to provide a
    new HTTP endpoint that can be used to expose Prometheus metrics on.
    """

    def __init__(
        self,
        metrics_host="127.0.0.1",
        metrics_port: int = 8000,
    ):
        self.metrics_host = metrics_host
        self.metrics_port = metrics_port
        self.timer = None  # type: asyncio.Handle

        ######################################################################
        # Create application metrics and metrics service

        # Create a metrics server. The server uses the default collector
        # registry unless a different one is passed in.
        self.msvr = Service()

        # Define some constant labels that need to be added to all metrics
        const_labels = {
            "host": socket.gethostname(),
            "app": f"{self.__class__.__name__}",
        }

        # Create metrics collectors. No registry is passed when creating the
        # metrics so they get registered with the default registry.

        # Create a counter metric to track requests.
        self.requests_metric = Counter(
            "requests", "Number of requests.", const_labels=const_labels
        )

        # Create a gauge metric to track memory usage.
        self.ram_metric = Gauge(
            "memory_usage_bytes", "Memory usage in bytes.", const_labels=const_labels
        )

        # Create a gauge metric to track CPU.
        self.cpu_metric = Gauge(
            "cpu_usage_percent", "CPU usage percent.", const_labels=const_labels
        )

        # Create a summary metric to track request payload sizes.
        self.payload_metric = Summary(
            "request_payload_size_bytes",
            "Request payload size in bytes.",
            const_labels=const_labels,
            invariants=[(0.50, 0.05), (0.99, 0.001)],
        )

        # Create a histogram metric to track request latency.
        self.latency_metric = Histogram(
            "request_latency_seconds",
            "Request latency in seconds",
            const_labels=const_labels,
            buckets=[0.1, 0.5, 1.0, 5.0],
        )

    async def start(self):
        """Start the application"""
        await self.msvr.start(addr=self.metrics_host, port=self.metrics_port)
        logger.debug(f"Serving prometheus metrics on: {self.msvr.metrics_url}")

        # Schedule a timer to update metrics. In a realistic application
        # the metrics would be updated as needed. In this example, a simple
        # timer is used to emulate things happening, which conveniently
        # allows all metrics to be updated at once.
        self.timer = asyncio.get_event_loop().call_later(1.0, self.on_timer_expiry)

    async def stop(self):
        """Stop the application"""
        await self.msvr.stop()
        if self.timer:
            self.timer.cancel()
        self.timer = None

    def on_timer_expiry(self):
        """Update application to simulate work"""

        # Update memory metrics
        self.ram_metric.set({"type": "virtual"}, psutil.virtual_memory().used)
        self.ram_metric.set({"type": "swap"}, psutil.swap_memory().used)

        # Update cpu metrics. interval=None returns immediately instead of
        # blocking the event loop while psutil samples the CPU.
        for c, p in enumerate(psutil.cpu_percent(interval=None, percpu=True)):
            self.cpu_metric.set({"core": c}, p)

        # Incrementing a requests counter to emulate webserver app
        self.requests_metric.inc({"path": "/"})

        # Monitor request payload data to emulate webserver app
        self.payload_metric.add({"path": "/data"}, random.random() * 2 ** 10)

        # Monitor request latency to emulate webserver app
        self.latency_metric.add({"path": "/data"}, random.random() * 5)

        # re-schedule another metrics update
        self.timer = asyncio.get_event_loop().call_later(1.0, self.on_timer_expiry)
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    # Silence asyncio and aiohttp loggers
    logging.getLogger("asyncio").setLevel(logging.ERROR)
    logging.getLogger("aiohttp").setLevel(logging.ERROR)
    logger = logging.getLogger(__name__)

    loop = asyncio.get_event_loop()
    app = ExampleApp()
    loop.run_until_complete(app.start())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(app.stop())
    loop.close()
The example can be run from the examples directory using:
(env) $ python app-service-example.py
Serving prometheus metrics on: http://127.0.0.1:8000/metrics
You can open the URL in a browser or use the curl command line tool to fetch metrics manually.
Push Gateway
Another method of exposing metrics is to push them to an intermediary that will get scraped by Prometheus. The Prometheus Push Gateway exists for this purpose.
This strategy can be useful to obtain metrics from components that cannot be scraped directly. They might be behind a firewall or might be too short-lived.
The Prometheus Push Gateway allows you to push time series data to it which ensures that data is always exposed reliably via the pull model.
The aioprometheus package provides a Pusher object that can be used within your application to push metrics to a Prometheus Push Gateway. The Pusher allows you to specify a job name as well as additional grouping keys.
The grouping keys get added to the Push Gateway URL using the rules described here. See here for how to configure Prometheus to best scrape metrics from the Push Gateway.
The Pusher object requires optional extras to be installed so make sure you install aioprometheus with the ‘aiohttp’ extras.
$ pip install aioprometheus[aiohttp]
from aioprometheus import REGISTRY, Counter
from aioprometheus.pusher import Pusher
PUSH_GATEWAY_ADDR = "http://127.0.0.1:61423"
pusher = Pusher("my-job", PUSH_GATEWAY_ADDR, grouping_key={"instance": "127.0.0.1:1234"})
c = Counter("total_requests", "Total requests.", {})
c.inc({'url': "/p/user"})
# Push to the push gateway
resp = await pusher.replace(REGISTRY)
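To illustrate how a job name and grouping key end up in the Push Gateway URL, the sketch below follows the Push Gateway's documented path scheme of /metrics/job/<job> followed by /<label>/<value> pairs. It is not the Pusher's own code: `push_url` is a hypothetical helper, percent-encoding each segment is an assumption made for this example, and values containing a slash, which need the gateway's base64 form, are not handled.

```python
from urllib.parse import quote


def push_url(gateway_addr, job, grouping_key=None):
    """Build a Push Gateway URL from a job name and grouping key."""
    parts = [gateway_addr.rstrip("/"), "metrics", "job", quote(job, safe="")]
    for label, value in (grouping_key or {}).items():
        # Each grouping label contributes a /<label>/<value> pair.
        parts.extend([quote(label, safe=""), quote(str(value), safe="")])
    return "/".join(parts)


url = push_url(
    "http://127.0.0.1:61423", "my-job", {"instance": "127.0.0.1:1234"}
)
print(url)
```

Metrics pushed with the same job and grouping key replace or update the same group on the gateway, so the grouping key should identify the source of the metrics.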
Using Prometheus To Check Examples
Prometheus can be configured to scrape metrics from the example scripts so their metrics can be viewed using the Prometheus dashboard. Create a minimal configuration file to scrape metrics from the example scripts.
global:
  scrape_interval: 15s # By default, scrape targets every 15 seconds.
  evaluation_interval: 15s # By default, evaluate rules every 15 seconds.

scrape_configs:
  - job_name: 'test-app'

    # Override the global default and scrape targets from this job every
    # 5 seconds. The timeout must not exceed the scrape interval.
    scrape_interval: 5s
    scrape_timeout: 5s

    static_configs:
      - targets: ['localhost:8000']
        labels:
          group: 'dev'
The official Prometheus configuration documentation can be found here.
Run Prometheus and pass it the configuration file.
$ ./prometheus --config.file=my-prom-config.yaml
Once Prometheus is running you can access it at localhost:9090 and observe the metrics from the example.
Testing
When producing unit tests for software that uses aioprometheus it will likely be necessary to clear the default registry between test runs to get it back to a clean state. Failing to do this will likely result in an error being raised reporting that a metric by the same name already exists.
Resetting the default registry is easily achieved by calling REGISTRY.clear(). See the unit tests of this project for examples of where this is done.