Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ jobs:
uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 # v6.1.0
with:
python-version: ${{ matrix.python-version }}
- name: Install Redis
run: |
sudo apt-get update && sudo apt-get -y install redis-server
- name: Install dependencies
run: |
pip install --user tox "virtualenv<20.22.0"
Expand Down
14 changes: 14 additions & 0 deletions prometheus_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ def remove(self, *labelvalues: Any) -> None:
warnings.warn(
"Removal of labels has not been implemented in multi-process mode yet.",
UserWarning)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Removal of labels has not been implemented in redis mode yet.",
UserWarning)

if not self._labelnames:
raise ValueError('No label names were set when constructing %s' % self)
Expand All @@ -226,6 +230,10 @@ def remove_by_labels(self, labels: dict[str, str]) -> None:
"Removal of labels has not been implemented in multi-process mode yet.",
UserWarning
)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Removal of labels has not been implemented in redis mode yet.",
UserWarning)

if not self._labelnames:
raise ValueError('No label names were set when constructing %s' % self)
Expand Down Expand Up @@ -258,6 +266,10 @@ def clear(self) -> None:
warnings.warn(
"Clearing labels has not been implemented in multi-process mode yet",
UserWarning)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Clearing of labels has not been implemented in redis mode yet.",
UserWarning)
with self._lock:
self._metrics = {}

Expand Down Expand Up @@ -652,6 +664,7 @@ def _metric_init(self) -> None:
self._created = time.time()
bucket_labelnames = self._labelnames + ('le',)
self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames, self._labelvalues, self._documentation)
for b in self._upper_bounds:
self._buckets.append(values.ValueClass(
self._type,
Expand All @@ -674,6 +687,7 @@ def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> N
"""
self._raise_if_not_observable()
self._sum.inc(amount)
self._count.inc(1)
for i, bound in enumerate(self._upper_bounds):
if amount <= bound:
self._buckets[i].inc(1)
Expand Down
2 changes: 0 additions & 2 deletions prometheus_client/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ def _accumulate_metrics(metrics, accumulate):
samples[labels][sample_key] = acc
else:
samples[labels][sample_key] = value
if accumulate:
samples[labels][(metric.name + '_count', labels)] = acc

# Convert to correct sample format.
metric.samples = []
Expand Down
107 changes: 107 additions & 0 deletions prometheus_client/redis_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from collections.abc import Iterable
import json
import os
from urllib.parse import urlsplit

from .metrics_core import Metric
from .registry import Collector, CollectorRegistry
from .samples import Sample


def redis_client():
    """
    Create a redis client for PROMETHEUS_REDIS_URL.

    Configure the redis database via a URL in PROMETHEUS_REDIS_URL of the form
    redis://localhost:6379/0

    Raises:
        KeyError: if PROMETHEUS_REDIS_URL is not set in the environment.
        ValueError: if the URL does not use the redis:// scheme or its path
            is not a numeric database index.
    """
    parsed_url = urlsplit(os.environ["PROMETHEUS_REDIS_URL"])
    # Validate with real exceptions rather than `assert`: assertions are
    # stripped under `python -O`, which would silently accept bad URLs.
    if parsed_url.scheme != "redis":
        raise ValueError(
            "PROMETHEUS_REDIS_URL must use the redis:// scheme, got %r"
            % parsed_url.scheme)
    if not (parsed_url.path.startswith("/") and parsed_url.path[1:].isdigit()):
        raise ValueError(
            "PROMETHEUS_REDIS_URL path must be a numeric database index, "
            "e.g. redis://localhost:6379/0")
    port = parsed_url.port or 6379
    db = int(parsed_url.path[1:])

    # Imported lazily (and after validation) so the optional redis dependency
    # is only required when redis mode is actually configured.
    from redis import Redis
    return Redis(host=parsed_url.hostname, port=port, db=db)


class RedisCollector(Collector):
    """Collector for redis mode.

    Scans every ``value:*`` key in the redis database configured via
    PROMETHEUS_REDIS_URL and rebuilds Metric objects from the stored
    type/name/label information encoded in each key.
    """

    def __init__(self, registry: CollectorRegistry | None) -> None:
        """Create a redis client and optionally register with *registry*."""
        self._client = redis_client()
        if registry:
            registry.register(self)

    def _iter_values(self) -> Iterable[tuple[bytes, bytes]]:
        """Yield (key, raw value) pairs for every ``value:*`` key in redis."""
        cursor = 0
        while True:
            cursor, keys = self._client.scan(cursor=cursor, match="value:*")
            # SCAN may legitimately return an empty batch of keys mid-scan;
            # MGET with zero keys is a redis error, so skip those batches.
            if keys:
                values = self._client.mget(keys)
                for key, value in zip(keys, values):
                    # A key can be deleted between SCAN and MGET; its value
                    # comes back as None and is skipped.
                    if value is not None:
                        yield key, value
            if cursor == 0:
                break

    def collect(self) -> Iterable[Metric]:
        """Rebuild all metrics from redis and return them for exposition."""
        metrics: dict[str, Metric] = {}
        histograms: set[str] = set()

        for key, value_s in self._iter_values():
            # FIXME: Catch ValueError here, just in case?
            # Key layout is value:typ:MMAP_KEY, where MMAP_KEY is the JSON
            # array produced by mmap_key() and may itself contain colons.
            prefix_b, typ_b, mmap_key = key.split(b":", 2)
            assert prefix_b == b"value"
            typ = typ_b.decode()
            value = float(value_s)

            metric_name, name, labels, help_text = json.loads(mmap_key)

            metric = metrics.get(metric_name)
            if metric is None:
                metric = Metric(metric_name, help_text, typ)
                metrics[metric_name] = metric
                if typ in ("histogram", "gaugehistogram"):
                    histograms.add(metric_name)

            metric.add_sample(name, labels, value)

        for name in histograms:
            self._fix_histogram(metrics[name])

        return metrics.values()

    def _fix_histogram(self, metric: Metric) -> None:
        """
        Fix-up histogram samples.

        Sort the buckets as expected by a client, and accumulate the values.
        The Histogram class is optimized to only increment the bucket that a
        value first appears in, not larger ones that would also contain it.
        """
        by_label: dict[tuple[tuple[str, ...], str], list[Sample]] = {}

        # Organize into lists of samples by label (ignoring 'le' so all
        # buckets of one series group together).
        for sample in metric.samples:
            if "le" in sample.labels:
                labels_without_le = sample.labels.copy()
                labels_without_le.pop("le")
                key = (tuple(labels_without_le.values()), sample.name)
            else:
                key = (tuple(sample.labels.values()), sample.name)
            by_label.setdefault(key, []).append(sample)

        metric.samples = []

        for (labels, name), samples in sorted(by_label.items()):
            if name.endswith("_bucket"):
                # Sort buckets within each label; float("+Inf") sorts last.
                samples.sort(key=lambda sample: float(sample.labels["le"]))

                # Accumulate values into larger buckets so each bucket is
                # cumulative, as required by the exposition format.
                value = 0.0
                for sample in samples:
                    value += sample.value
                    metric.samples.append(Sample(sample.name, sample.labels, value))

            else:
                metric.samples.extend(samples)
100 changes: 96 additions & 4 deletions prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,48 @@
import os
from threading import Lock
from typing import Any, Protocol
import warnings

from .mmap_dict import mmap_key, MmapedDict
from .redis_collector import redis_client
from .samples import Exemplar


class MutexValue:
class Value(Protocol):
    """Prometheus Client Metric implementation.

    Structural interface for a single metric value: a float that can be
    incremented, set and read, with optional exemplar storage. Implemented
    by the mutex-, mmap- and redis-backed value classes in this module.
    """

    # True for backends whose values are written per-process and aggregated
    # at collection time (the mmap-backed multiprocess mode).
    _multiprocess: bool

    def __init__(
        self,
        typ: str,
        metric_name: str,
        name: str,
        labelnames: list[str],
        labelvalues: list[str],
        help_text: str,
        **kwargs: Any,
    ) -> None:
        """Initialize a metric."""

    def inc(self, amount: float) -> None:
        """Increment the metric by amount."""

    def set(self, value: float, timestamp: float | None = None) -> None:
        """Set the metric to value."""

    def get(self) -> float:
        """Get the current metric value."""

    def set_exemplar(self, exemplar: Exemplar) -> None:
        """Set an exemplar value."""
        exemplar  # For vulture: reference the parameter so it isn't flagged unused.

    def get_exemplar(self) -> Exemplar | None:
        """Get any set exemplar value."""


class MutexValue(Value):
"""A float protected by a mutex."""

_multiprocess = False
Expand Down Expand Up @@ -52,7 +89,7 @@ def MultiProcessValue(process_identifier=os.getpid):
# This avoids the need to also have mutexes in __MmapDict.
lock = Lock()

class MmapedValue:
class MmapedValue(Value):
"""A float protected by a mutex backed by a per-process mmaped file."""

_multiprocess = True
Expand Down Expand Up @@ -125,12 +162,67 @@ def get_exemplar(self):
return MmapedValue


def get_value_class():
def RedisValue():
    """
    A value implementation that stores data in a redis/valkey database.

    Key scheme:
    * value:typ:MMAP_KEY
    """
    # One shared client for every value instance produced by this factory.
    connection = redis_client()

    class RedisValueImpl(Value):
        """A float stored by redis."""

        _multiprocess = False

        def __init__(
            self,
            typ: str,
            metric_name: str,
            name: str,
            labelnames: list[str],
            labelvalues: list[str],
            help_text: str,
            **kwargs: Any,
        ) -> None:
            serialized = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
            self._key = f"value:{typ}:{serialized}"
            self._redis = connection
            # Initialize to zero only if the key does not exist yet, so a
            # restart does not clobber an existing counter.
            self._redis.setnx(self._key, 0.0)

        def inc(self, amount: float) -> None:
            # INCRBYFLOAT is atomic on the redis server.
            self._redis.incrbyfloat(self._key, amount)

        def set(self, value: float, timestamp: float | None = None) -> None:
            # TODO: Implement timestamps
            self._redis.set(self._key, value)

        def get(self) -> float:
            raw = self._redis.get(self._key)
            return 0.0 if raw is None else float(raw)

        def set_exemplar(self, exemplar: Exemplar) -> None:
            # TODO: Implement exemplars for redis.
            return

        def get_exemplar(self) -> Exemplar | None:
            # TODO: Implement exemplars for redis.
            return None

    return RedisValueImpl


def get_value_class() -> type[Value]:
    """Choose the Value backend from environment variables.

    This needs to be chosen before the first metric is constructed, and as
    that may be in some arbitrary library the user/admin has no control over,
    we use environment variables rather than an API call. Redis mode takes
    precedence over multiprocess mode when both are configured.
    """
    if "PROMETHEUS_REDIS_URL" in os.environ:
        return RedisValue()
    elif 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
        return MultiProcessValue()
    else:
        return MutexValue
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ aiohttp = [
django = [
"django",
]
redis = [
"redis",
]

[project.urls]
Homepage = "https://github.com/prometheus/client_python"
Expand Down
7 changes: 4 additions & 3 deletions tests/test_multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ def add_label(key, value):

expected_histogram = [
Sample('h_sum', labels, 6.0),
Sample('h_count', labels, 2.0),
Sample('h_bucket', add_label('le', '0.005'), 0.0),
Sample('h_bucket', add_label('le', '0.01'), 0.0),
Sample('h_bucket', add_label('le', '0.025'), 0.0),
Expand All @@ -293,7 +294,6 @@ def add_label(key, value):
Sample('h_bucket', add_label('le', '7.5'), 2.0),
Sample('h_bucket', add_label('le', '10.0'), 2.0),
Sample('h_bucket', add_label('le', '+Inf'), 2.0),
Sample('h_count', labels, 2.0),
]

self.assertEqual(metrics['h'].samples, expected_histogram)
Expand Down Expand Up @@ -321,6 +321,7 @@ def add_label(key, value):

expected_histogram = [
Sample('h_sum', {'view': 'view1'}, 6.0),
Sample('h_count', {'view': 'view1'}, 2.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.005'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.01'}, 0.0),
Sample('h_bucket', {'view': 'view1', 'le': '0.025'}, 0.0),
Expand All @@ -336,8 +337,8 @@ def add_label(key, value):
Sample('h_bucket', {'view': 'view1', 'le': '7.5'}, 2.0),
Sample('h_bucket', {'view': 'view1', 'le': '10.0'}, 2.0),
Sample('h_bucket', {'view': 'view1', 'le': '+Inf'}, 2.0),
Sample('h_count', {'view': 'view1'}, 2.0),
Sample('h_sum', {'view': 'view2'}, 1.0),
Sample('h_count', {'view': 'view2'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.005'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.01'}, 0.0),
Sample('h_bucket', {'view': 'view2', 'le': '0.025'}, 0.0),
Expand All @@ -353,7 +354,6 @@ def add_label(key, value):
Sample('h_bucket', {'view': 'view2', 'le': '7.5'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '10.0'}, 1.0),
Sample('h_bucket', {'view': 'view2', 'le': '+Inf'}, 1.0),
Sample('h_count', {'view': 'view2'}, 1.0),
]

self.assertEqual(metrics['h'].samples, expected_histogram)
Expand Down Expand Up @@ -435,6 +435,7 @@ def add_label(key, value):

expected_histogram = [
Sample('h_sum', labels, 6.0),
Sample('h_count', labels, 2.0),
Sample('h_bucket', add_label('le', '0.005'), 0.0),
Sample('h_bucket', add_label('le', '0.01'), 0.0),
Sample('h_bucket', add_label('le', '0.025'), 0.0),
Expand Down
Loading