vector_common/internal_event/
events_sent.rs1use std::sync::Arc;
2
3use metrics::Counter;
4
5use crate::counter;
6use tracing::trace;
7
8use super::{CountByteSize, CounterName, OptionalTag, Output, SharedString};
9use crate::config::ComponentKey;
10
/// The literal output name `"_default"`.
///
/// NOTE(review): presumably the name given to a component's unnamed (default) output; it is
/// not referenced by the code visible in this part of the file — confirm against callers.
pub const DEFAULT_OUTPUT: &str = "_default";
12
13crate::registered_event!(
14 EventsSent {
15 output: Option<SharedString>,
16 } => {
17 events: Counter = if let Some(output) = &self.output {
18 counter!(CounterName::ComponentSentEventsTotal, "output" => output.clone())
19 } else {
20 counter!(CounterName::ComponentSentEventsTotal)
21 },
22 event_bytes: Counter = if let Some(output) = &self.output {
23 counter!(CounterName::ComponentSentEventBytesTotal, "output" => output.clone())
24 } else {
25 counter!(CounterName::ComponentSentEventBytesTotal)
26 },
27 output: Option<SharedString> = self.output,
28 }
29
30 fn emit(&self, data: CountByteSize) {
31 let CountByteSize(count, byte_size) = data;
32
33 match &self.output {
34 Some(output) => {
35 trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
36 }
37 None => {
38 trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
39 }
40 }
41
42 self.events.increment(count as u64);
43 self.event_bytes.increment(byte_size.get() as u64);
44 }
45);
46
47impl From<Output> for EventsSent {
48 fn from(output: Output) -> Self {
49 Self { output: output.0 }
50 }
51}
52
53fn make_tags(
55 source: &OptionalTag<Arc<ComponentKey>>,
56 service: &OptionalTag<String>,
57) -> Vec<(&'static str, String)> {
58 let mut tags = Vec::new();
59 if let OptionalTag::Specified(tag) = source {
60 tags.push((
61 "source",
62 tag.as_ref()
63 .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
64 ));
65 }
66
67 if let OptionalTag::Specified(tag) = service {
68 tags.push(("service", tag.clone().unwrap_or("-".to_string())));
69 }
70
71 tags
72}
73
74crate::registered_event!(
75 TaggedEventsSent {
76 source: OptionalTag<Arc<ComponentKey>>,
77 service: OptionalTag<String>,
78 } => {
79 events: Counter = {
80 counter!(CounterName::ComponentSentEventsTotal, &make_tags(&self.source, &self.service))
81 },
82 event_bytes: Counter = {
83 counter!(CounterName::ComponentSentEventBytesTotal, &make_tags(&self.source, &self.service))
84 },
85 }
86
87 fn emit(&self, data: CountByteSize) {
88 let CountByteSize(count, byte_size) = data;
89 trace!(message = "Events sent.", %count, %byte_size);
90
91 self.events.increment(count as u64);
92 self.event_bytes.increment(byte_size.get() as u64);
93 }
94
95 fn register(_fixed: (), tags: TaggedEventsSent) {
96 super::register(tags)
97 }
98);
99
100impl TaggedEventsSent {
101 #[must_use]
102 pub fn new_empty() -> Self {
103 Self {
104 source: OptionalTag::Specified(None),
105 service: OptionalTag::Specified(None),
106 }
107 }
108
109 #[must_use]
110 pub fn new_unspecified() -> Self {
111 Self {
112 source: OptionalTag::Ignored,
113 service: OptionalTag::Ignored,
114 }
115 }
116}