1use std::borrow::Cow;
4
5use opentelemetry::KeyValue;
6
/// Trait implemented by every instrument type that can be created from an
/// OpenTelemetry meter.
///
/// It requires `private::Sealed`, which lives in a non-public module, so the
/// trait cannot be implemented from outside this crate.
#[doc(hidden)]
pub trait IsCollector: private::Sealed {
    /// Builder type handed back by [`IsCollector::builder`]. Its lifetime is
    /// the lifetime of the borrowed meter.
    type Builder<'a>;

    /// Starts building an instrument called `name` on `meter`.
    fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_>;
}
14
/// Non-public module that hosts the sealing supertrait of `IsCollector`.
mod private {
    /// Supertrait used to seal `IsCollector`: the trait is `pub`, but the
    /// module around it is not, so other crates cannot name or implement it.
    pub trait Sealed {
        /// Numeric type the instrument records.
        type Value;
    }
}
20
/// Implements `private::Sealed` and `IsCollector` for one instrument type.
///
/// Arguments, in order:
/// 1. the instrument type,
/// 2. the numeric type it records (becomes `Sealed::Value`),
/// 3. the name of the `Meter` method that returns the instrument's builder,
/// 4. the builder type, written in terms of the lifetime `'a`.
macro_rules! impl_collector {
    ($instrument:ty, $number:ty, $meter_method:ident, $builder_ty:ty) => {
        impl private::Sealed for $instrument {
            type Value = $number;
        }

        impl IsCollector for $instrument {
            type Builder<'a> = $builder_ty;

            fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_> {
                // Delegate straight to the meter method selected by the caller of the macro.
                meter.$meter_method(name)
            }
        }
    };
}
36
/// Counter instrument; alias for [`opentelemetry::metrics::Counter`].
pub type Counter<T> = opentelemetry::metrics::Counter<T>;

/// A [`Counter`] that records `f64` values.
pub type CounterF64 = Counter<f64>;

/// A [`Counter`] that records `u64` values.
pub type CounterU64 = Counter<u64>;

// Make both counter aliases usable as collectors; each is built through the
// `Meter` method named in the third macro argument.
impl_collector!(
    CounterF64,
    f64,
    f64_counter,
    opentelemetry::metrics::InstrumentBuilder<'a, CounterF64>
);
impl_collector!(
    CounterU64,
    u64,
    u64_counter,
    opentelemetry::metrics::InstrumentBuilder<'a, CounterU64>
);
64
/// Gauge instrument; alias for [`opentelemetry::metrics::Gauge`].
pub type Gauge<T> = opentelemetry::metrics::Gauge<T>;

/// A [`Gauge`] that records `f64` values.
pub type GaugeF64 = Gauge<f64>;

/// A [`Gauge`] that records `i64` values.
pub type GaugeI64 = Gauge<i64>;

/// A [`Gauge`] that records `u64` values.
pub type GaugeU64 = Gauge<u64>;

// Make all three gauge aliases usable as collectors; each is built through
// the `Meter` method named in the third macro argument.
impl_collector!(
    GaugeF64,
    f64,
    f64_gauge,
    opentelemetry::metrics::InstrumentBuilder<'a, GaugeF64>
);
impl_collector!(
    GaugeI64,
    i64,
    i64_gauge,
    opentelemetry::metrics::InstrumentBuilder<'a, GaugeI64>
);
impl_collector!(
    GaugeU64,
    u64,
    u64_gauge,
    opentelemetry::metrics::InstrumentBuilder<'a, GaugeU64>
);
110
/// Histogram instrument; alias for [`opentelemetry::metrics::Histogram`].
pub type Histogram<T> = opentelemetry::metrics::Histogram<T>;

/// A [`Histogram`] that records `f64` values.
pub type HistogramF64 = Histogram<f64>;

/// A [`Histogram`] that records `u64` values.
pub type HistogramU64 = Histogram<u64>;
125
// Seals `HistogramF64` so that it can implement `IsCollector`. Written by
// hand rather than through `impl_collector!` because the histogram builder
// impl below is also hand-written.
impl private::Sealed for HistogramF64 {
    // The instrument records `f64` values.
    type Value = f64;
}
129
/// Bucket boundaries passed to `with_boundaries` by both histogram builders
/// in this file.
///
/// NOTE(review): these are the same eleven values as the Prometheus client
/// default buckets, so they are presumably tuned for durations in seconds —
/// confirm.
const DEFAULT_BOUNDARIES: [f64; 11] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
132
133impl IsCollector for HistogramF64 {
134 type Builder<'a> = opentelemetry::metrics::HistogramBuilder<'a, HistogramF64>;
135
136 fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_> {
137 meter.f64_histogram(name).with_boundaries(DEFAULT_BOUNDARIES.into())
138 }
139}
140
// Seals `HistogramU64` so that it can implement `IsCollector`.
impl private::Sealed for HistogramU64 {
    // The instrument records `u64` values.
    type Value = u64;
}
144
145impl IsCollector for HistogramU64 {
146 type Builder<'a> = opentelemetry::metrics::HistogramBuilder<'a, HistogramU64>;
147
148 fn builder(meter: &opentelemetry::metrics::Meter, name: impl Into<Cow<'static, str>>) -> Self::Builder<'_> {
149 meter.u64_histogram(name).with_boundaries(DEFAULT_BOUNDARIES.into())
150 }
151}
152
/// Up/down counter instrument; alias for
/// [`opentelemetry::metrics::UpDownCounter`].
pub type UpDownCounter<T> = opentelemetry::metrics::UpDownCounter<T>;

/// An [`UpDownCounter`] that records `i64` values.
pub type UpDownCounterI64 = UpDownCounter<i64>;

/// An [`UpDownCounter`] that records `f64` values.
pub type UpDownCounterF64 = UpDownCounter<f64>;

// Make both up/down counter aliases usable as collectors; each is built
// through the `Meter` method named in the third macro argument.
impl_collector!(
    UpDownCounterI64,
    i64,
    i64_up_down_counter,
    opentelemetry::metrics::InstrumentBuilder<'a, UpDownCounterI64>
);
impl_collector!(
    UpDownCounterF64,
    f64,
    f64_up_down_counter,
    opentelemetry::metrics::InstrumentBuilder<'a, UpDownCounterF64>
);
181
/// Numeric types that know their own unit value.
///
/// Used by the `incr`/`decr` helpers to add or subtract "one" without caring
/// whether the instrument is integer or floating point.
trait Number {
    /// The value `1` expressed in the implementing type.
    const ONE: Self;
}

/// Implements [`Number`] for each `type => literal-one` pair.
macro_rules! impl_number {
    ($($num:ty => $one:expr),+ $(,)?) => {
        $(
            impl Number for $num {
                const ONE: Self = $one;
            }
        )+
    };
}

impl_number!(f64 => 1.0, u64 => 1, i64 => 1);
199
/// A borrowed instrument paired with the attributes to attach to each value
/// recorded through it.
///
/// The type-specific methods (`incr`, `record`, `observe`, ...) defined
/// further down pass the value and these attributes to the instrument.
#[must_use = "Collectors do nothing by themselves, you must call them"]
pub struct Collector<'a, T: IsCollector> {
    /// Attributes sent along with every recorded value.
    attributes: Vec<KeyValue>,
    /// The instrument that receives the values.
    collector: &'a T,
}
208
209impl<'a, T: IsCollector> Collector<'a, T> {
210 pub fn new(attributes: Vec<KeyValue>, collector: &'a T) -> Self {
215 Self { attributes, collector }
216 }
217
218 pub fn inner(&self) -> &'a T {
220 self.collector
221 }
222}
223
224macro_rules! impl_counter {
225 ($t:ty) => {
226 impl<'a> Collector<'a, opentelemetry::metrics::Counter<$t>> {
227 #[inline]
229 pub fn incr(&self) {
230 self.incr_by(<$t as Number>::ONE);
231 }
232
233 pub fn incr_by(&self, value: $t) {
235 self.collector.add(value, &self.attributes);
236 }
237 }
238 };
239}
240
241impl_counter!(u64);
242impl_counter!(f64);
243
244macro_rules! impl_gauge {
245 ($t:ty) => {
246 impl<'a> Collector<'a, opentelemetry::metrics::Gauge<$t>> {
247 pub fn record(&self, value: $t) {
249 self.collector.record(value, &self.attributes);
250 }
251 }
252 };
253}
254
255impl_gauge!(u64);
256impl_gauge!(f64);
257impl_gauge!(i64);
258
259macro_rules! impl_histogram {
260 ($t:ty) => {
261 impl<'a> Collector<'a, opentelemetry::metrics::Histogram<$t>> {
262 pub fn observe(&self, value: $t) {
264 self.collector.record(value, &self.attributes);
265 }
266 }
267 };
268}
269
270impl_histogram!(u64);
271impl_histogram!(f64);
272
273macro_rules! impl_updowncounter {
274 ($t:ty) => {
275 impl<'a> Collector<'a, opentelemetry::metrics::UpDownCounter<$t>> {
276 pub fn incr(&self) {
278 self.incr_by(<$t as Number>::ONE);
279 }
280
281 pub fn incr_by(&self, value: $t) {
283 self.collector.add(value, &self.attributes);
284 }
285
286 pub fn decr(&self) {
288 self.decr_by(<$t as Number>::ONE);
289 }
290
291 pub fn decr_by(&self, value: $t) {
293 self.collector.add(-value, &self.attributes);
294 }
295 }
296 };
297}
298
299impl_updowncounter!(i64);
300impl_updowncounter!(f64);
301
302#[cfg(test)]
303#[cfg_attr(all(test, coverage_nightly), coverage(off))]
304mod tests {
305 use std::sync::Arc;
306
307 use opentelemetry::{KeyValue, Value};
308 use opentelemetry_sdk::Resource;
309 use opentelemetry_sdk::metrics::data::{Histogram, ResourceMetrics, Sum};
310 use opentelemetry_sdk::metrics::reader::MetricReader;
311 use opentelemetry_sdk::metrics::{ManualReader, ManualReaderBuilder, SdkMeterProvider};
312
313 use crate::HistogramF64;
314 use crate::collector::{Collector, IsCollector};
315
316 #[derive(Debug, Clone)]
317 struct TestReader(Arc<ManualReader>);
318
319 impl TestReader {
320 fn new() -> Self {
321 Self(Arc::new(ManualReaderBuilder::new().build()))
322 }
323
324 fn read(&self) -> ResourceMetrics {
325 let mut metrics = ResourceMetrics {
326 resource: Resource::builder_empty().build(),
327 scope_metrics: vec![],
328 };
329
330 self.0.collect(&mut metrics).expect("collect");
331
332 metrics
333 }
334 }
335
336 impl opentelemetry_sdk::metrics::reader::MetricReader for TestReader {
337 fn register_pipeline(&self, pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>) {
338 self.0.register_pipeline(pipeline)
339 }
340
341 fn collect(
342 &self,
343 rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
344 ) -> opentelemetry_sdk::metrics::MetricResult<()> {
345 self.0.collect(rm)
346 }
347
348 fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
349 self.0.force_flush()
350 }
351
352 fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult {
353 self.0.shutdown()
354 }
355
356 fn temporality(&self, kind: opentelemetry_sdk::metrics::InstrumentKind) -> opentelemetry_sdk::metrics::Temporality {
357 self.0.temporality(kind)
358 }
359 }
360
361 fn setup_reader() -> TestReader {
362 let reader = TestReader::new();
363 let provider = SdkMeterProvider::builder()
364 .with_resource(
365 Resource::builder()
366 .with_attribute(KeyValue::new("service.name", "test_service"))
367 .build(),
368 )
369 .with_reader(reader.clone())
370 .build();
371 opentelemetry::global::set_meter_provider(provider);
372 reader
373 }
374
375 fn find_metric<'a>(metrics: &'a ResourceMetrics, name: &str) -> Option<&'a opentelemetry_sdk::metrics::data::Metric> {
376 metrics
377 .scope_metrics
378 .iter()
379 .find(|sm| sm.scope.name() == "scuffle-metrics")
380 .and_then(|sm| sm.metrics.iter().find(|m| m.name == name))
381 }
382
383 fn get_data_point_value<T: PartialEq + std::fmt::Debug + Clone>(
384 data_points: &[opentelemetry_sdk::metrics::data::SumDataPoint<T>],
385 attr_key: &str,
386 attr_value: &str,
387 ) -> T {
388 data_points
389 .iter()
390 .find(|dp| {
391 dp.attributes
392 .iter()
393 .any(|kv| kv.key.as_str() == attr_key && kv.value.as_str() == attr_value)
394 })
395 .map(|dp| dp.value.clone())
396 .expect("Data point not found")
397 }
398
399 fn get_histogram_sum(
400 data_points: &[opentelemetry_sdk::metrics::data::HistogramDataPoint<u64>],
401 attr_key: &str,
402 attr_value: &str,
403 ) -> u64 {
404 data_points
405 .iter()
406 .find(|dp| {
407 dp.attributes
408 .iter()
409 .any(|kv| kv.key.as_str() == attr_key && kv.value.as_str() == attr_value)
410 })
411 .map(|dp| dp.sum)
412 .expect("Histogram data point not found")
413 }
414
415 fn get_data_point_value_with_two_attrs<T: PartialEq + std::fmt::Debug + Clone>(
416 data_points: &[opentelemetry_sdk::metrics::data::SumDataPoint<T>],
417 key1: &str,
418 val1: &str,
419 key2: &str,
420 val2: impl Into<Value> + Clone,
421 ) -> T {
422 data_points
423 .iter()
424 .find(|dp| {
425 dp.attributes
426 .iter()
427 .any(|kv| kv.key.as_str() == key1 && kv.value.as_str() == val1)
428 && dp
429 .attributes
430 .iter()
431 .any(|kv| kv.key.as_str() == key2 && kv.value == val2.clone().into())
432 })
433 .map(|dp| dp.value.clone())
434 .expect("Data point not found")
435 }
436
437 #[test]
438 fn test_counter_metric() {
439 #[crate::metrics(crate_path = "crate")]
440 mod example {
441 use crate::{CounterU64, MetricEnum};
442
443 #[derive(MetricEnum)]
444 #[metrics(crate_path = "crate")]
445 pub enum Kind {
446 Http,
447 Grpc,
448 }
449
450 #[metrics(unit = "requests")]
451 pub fn request(kind: Kind) -> CounterU64;
452 }
453
454 let reader = setup_reader();
455 example::request(example::Kind::Http).incr();
456 example::request(example::Kind::Http).incr();
457 example::request(example::Kind::Grpc).incr();
458
459 let metrics = reader.read();
460 let metric = find_metric(&metrics, "example_request").unwrap();
461 assert_eq!(metric.unit, "requests");
462
463 let sum: &Sum<u64> = metric.data.as_any().downcast_ref().unwrap();
464 assert_eq!(sum.data_points.len(), 2);
465 assert_eq!(get_data_point_value(&sum.data_points, "kind", "Http"), 2);
466 assert_eq!(get_data_point_value(&sum.data_points, "kind", "Grpc"), 1);
467 }
468
469 #[test]
470 fn test_gauge_metric() {
471 #[crate::metrics(crate_path = "crate")]
472 mod example {
473 use crate::GaugeU64;
474
475 #[metrics(unit = "connections")]
476 pub fn current_connections() -> GaugeU64;
477 }
478
479 let reader = setup_reader();
480 example::current_connections().record(10);
481 example::current_connections().record(20);
482
483 let metrics = reader.read();
484 let metric = find_metric(&metrics, "example_current_connections").unwrap();
485 assert_eq!(metric.unit, "connections");
486
487 let gauge = metric
488 .data
489 .as_any()
490 .downcast_ref::<opentelemetry_sdk::metrics::data::Gauge<u64>>()
491 .unwrap();
492 assert_eq!(gauge.data_points.len(), 1);
493 assert_eq!(gauge.data_points[0].value, 20);
494 assert_eq!(gauge.data_points[0].attributes.len(), 0);
495 }
496
497 #[test]
498 fn test_histogram_metric() {
499 #[crate::metrics(crate_path = "crate")]
500 mod example {
501 use crate::{HistogramU64, MetricEnum};
502
503 #[derive(MetricEnum)]
504 #[metrics(crate_path = "crate")]
505 pub enum Kind {
506 Http,
507 Grpc,
508 }
509
510 #[metrics(unit = "bytes")]
511 pub fn data_transfer(kind: Kind) -> HistogramU64;
512 }
513
514 let reader = setup_reader();
515 example::data_transfer(example::Kind::Http).observe(100);
516 example::data_transfer(example::Kind::Http).observe(200);
517 example::data_transfer(example::Kind::Grpc).observe(150);
518
519 let metrics = reader.read();
520 let metric = find_metric(&metrics, "example_data_transfer").unwrap();
521 assert_eq!(metric.unit, "bytes");
522
523 let histogram = metric
524 .data
525 .as_any()
526 .downcast_ref::<opentelemetry_sdk::metrics::data::Histogram<u64>>()
527 .unwrap();
528 assert_eq!(histogram.data_points.len(), 2);
529 assert_eq!(get_histogram_sum(&histogram.data_points, "kind", "Http"), 300);
530 assert_eq!(get_histogram_sum(&histogram.data_points, "kind", "Grpc"), 150);
531 }
532
533 #[test]
534 fn test_updowncounter_metric() {
535 #[crate::metrics(crate_path = "crate")]
536 mod example {
537 use crate::{MetricEnum, UpDownCounterI64};
538
539 #[derive(MetricEnum)]
540 #[metrics(crate_path = "crate")]
541 pub enum Kind {
542 Http,
543 Grpc,
544 }
545
546 #[metrics(unit = "requests")]
547 pub fn active_requests(kind: Kind) -> UpDownCounterI64;
548 }
549
550 let reader = setup_reader();
551 example::active_requests(example::Kind::Http).incr();
552 example::active_requests(example::Kind::Http).incr();
553 example::active_requests(example::Kind::Http).decr();
554 example::active_requests(example::Kind::Grpc).incr();
555
556 let metrics = reader.read();
557 let metric = find_metric(&metrics, "example_active_requests").unwrap();
558 assert_eq!(metric.unit, "requests");
559
560 let sum: &Sum<i64> = metric.data.as_any().downcast_ref().unwrap();
561 assert_eq!(sum.data_points.len(), 2);
562 assert_eq!(get_data_point_value(&sum.data_points, "kind", "Http"), 1);
563 assert_eq!(get_data_point_value(&sum.data_points, "kind", "Grpc"), 1);
564 }
565
566 #[test]
567 fn test_metric_with_multiple_attributes() {
568 #[crate::metrics(crate_path = "crate")]
569 mod example {
570 use crate::{CounterU64, MetricEnum};
571
572 #[derive(MetricEnum)]
573 #[metrics(crate_path = "crate")]
574 pub enum Kind {
575 Http,
576 Grpc,
577 }
578
579 #[metrics(unit = "requests")]
580 pub fn request_with_status(kind: Kind, status: u32) -> CounterU64;
581 }
582
583 let reader = setup_reader();
584 example::request_with_status(example::Kind::Http, 200).incr();
585 example::request_with_status(example::Kind::Http, 404).incr();
586 example::request_with_status(example::Kind::Grpc, 200).incr();
587
588 let metrics = reader.read();
589 let metric = find_metric(&metrics, "example_request_with_status").unwrap();
590 assert_eq!(metric.unit, "requests");
591
592 let sum: &Sum<u64> = metric.data.as_any().downcast_ref().unwrap();
593 assert_eq!(sum.data_points.len(), 3);
594 assert_eq!(
595 get_data_point_value_with_two_attrs(&sum.data_points, "kind", "Http", "status", 200),
596 1
597 );
598 assert_eq!(
599 get_data_point_value_with_two_attrs(&sum.data_points, "kind", "Http", "status", 404),
600 1
601 );
602 assert_eq!(
603 get_data_point_value_with_two_attrs(&sum.data_points, "kind", "Grpc", "status", 200),
604 1
605 );
606 }
607
608 #[test]
609 fn test_metric_with_string_attribute() {
610 #[crate::metrics(crate_path = "crate")]
611 mod example {
612 use crate::{CounterU64, MetricEnum};
613
614 #[derive(MetricEnum)]
615 #[metrics(crate_path = "crate")]
616 pub enum Kind {
617 Http,
618 Grpc,
619 }
620
621 #[metrics(unit = "requests")]
622 pub fn request_with_method(kind: Kind, method: &str) -> CounterU64;
623 }
624
625 let reader = setup_reader();
626 example::request_with_method(example::Kind::Http, "GET").incr();
627 example::request_with_method(example::Kind::Http, "POST").incr();
628 example::request_with_method(example::Kind::Grpc, "GET").incr();
629
630 let metrics = reader.read();
631 let metric = find_metric(&metrics, "example_request_with_method").unwrap();
632 assert_eq!(metric.unit, "requests");
633
634 let sum: &Sum<u64> = metric.data.as_any().downcast_ref().unwrap();
635 assert_eq!(sum.data_points.len(), 3);
636 assert_eq!(
637 get_data_point_value_with_two_attrs(&sum.data_points, "kind", "Http", "method", "GET"),
638 1
639 );
640 assert_eq!(
641 get_data_point_value_with_two_attrs(&sum.data_points, "kind", "Http", "method", "POST"),
642 1
643 );
644 assert_eq!(
645 get_data_point_value_with_two_attrs(&sum.data_points, "kind", "Grpc", "method", "GET"),
646 1
647 );
648 }
649
650 #[test]
651 fn test_metric_with_no_attributes() {
652 #[crate::metrics(crate_path = "crate")]
653 mod example {
654 use crate::CounterU64;
655
656 #[metrics(unit = "events")]
657 pub fn total_events() -> CounterU64;
658 }
659
660 let reader = setup_reader();
661 example::total_events().incr();
662 example::total_events().incr();
663
664 let metrics = reader.read();
665 let metric = find_metric(&metrics, "example_total_events").unwrap();
666 assert_eq!(metric.unit, "events");
667
668 let sum: &Sum<u64> = metric.data.as_any().downcast_ref().unwrap();
669 assert_eq!(sum.data_points.len(), 1);
670 assert_eq!(sum.data_points[0].value, 2);
671 assert_eq!(sum.data_points[0].attributes.len(), 0);
672 }
673
674 #[test]
675 fn test_metric_with_zero_values() {
676 #[crate::metrics(crate_path = "crate")]
677 mod example {
678 use crate::GaugeU64;
679
680 #[metrics(unit = "connections")]
681 pub fn current_connections() -> GaugeU64;
682 }
683
684 let reader = setup_reader();
685 example::current_connections().record(0);
686
687 let metrics = reader.read();
688 let metric = find_metric(&metrics, "example_current_connections").unwrap();
689 assert_eq!(metric.unit, "connections");
690
691 let gauge = metric
692 .data
693 .as_any()
694 .downcast_ref::<opentelemetry_sdk::metrics::data::Gauge<u64>>()
695 .unwrap();
696 assert_eq!(gauge.data_points.len(), 1);
697 assert_eq!(gauge.data_points[0].value, 0);
698 assert_eq!(gauge.data_points[0].attributes.len(), 0);
699 }
700
701 #[test]
702 fn test_metric_with_negative_increments() {
703 #[crate::metrics(crate_path = "crate")]
704 mod example {
705 use crate::{MetricEnum, UpDownCounterI64};
706
707 #[derive(MetricEnum)]
708 #[metrics(crate_path = "crate")]
709 pub enum Kind {
710 Http,
711 Grpc,
712 }
713
714 #[metrics(unit = "requests")]
715 pub fn active_requests(kind: Kind) -> UpDownCounterI64;
716 }
717
718 let reader = setup_reader();
719 example::active_requests(example::Kind::Http).incr();
720 example::active_requests(example::Kind::Http).decr();
721 example::active_requests(example::Kind::Http).decr();
722
723 let metrics = reader.read();
724 let metric = find_metric(&metrics, "example_active_requests").unwrap();
725 assert_eq!(metric.unit, "requests");
726
727 let sum: &Sum<i64> = metric.data.as_any().downcast_ref().unwrap();
728 assert_eq!(sum.data_points.len(), 1);
729 assert_eq!(get_data_point_value(&sum.data_points, "kind", "Http"), -1);
730 }
731
732 #[test]
733 fn test_histogram_f64_builder() {
734 let reader = setup_reader();
735 let meter = opentelemetry::global::meter("scuffle-metrics");
736 let name = "test_histogram_f64";
737
738 let builder = HistogramF64::builder(&meter, name);
739 let histogram = builder.build();
740
741 histogram.record(1.5, &[]);
742
743 let metrics = reader.read();
744 let metric = find_metric(&metrics, name).expect("histogram metric not found");
745
746 assert_eq!(metric.name, name);
747 assert_eq!(metric.unit, "");
748
749 let histogram_data = metric
750 .data
751 .as_any()
752 .downcast_ref::<Histogram<f64>>()
753 .expect("expected histogram data");
754
755 assert_eq!(histogram_data.data_points.len(), 1);
756 assert_eq!(histogram_data.data_points[0].sum, 1.5);
757 assert_eq!(histogram_data.data_points[0].attributes.len(), 0);
758 }
759
760 #[test]
761 fn test_collector_inner() {
762 let meter = opentelemetry::global::meter("test_meter");
763 let histogram = HistogramF64::builder(&meter, "inner_test_histogram").build();
764
765 let attributes = vec![KeyValue::new("key", "value")];
766 let collector = Collector::new(attributes.clone(), &histogram);
767
768 assert_eq!(collector.inner() as *const HistogramF64, &histogram as *const HistogramF64);
769 }
770}