vector/internal_telemetry/allocations/
mod.rs1mod allocator;
4use std::{
5 sync::{
6 Mutex,
7 atomic::{AtomicBool, AtomicU64, Ordering},
8 },
9 thread,
10 time::Duration,
11};
12
13use arr_macro::arr;
14use rand_distr::num_traits::ToPrimitive;
15use vector_common::{
16 counter, gauge,
17 internal_event::{CounterName, GaugeName},
18};
19
20use self::allocator::Tracer;
21pub(crate) use self::allocator::{
22 AllocationGroupId, AllocationLayer, GroupedTraceableAllocator, without_allocation_tracing,
23};
24
/// Number of allocation-group slots in every per-group table of this module
/// (the per-thread byte tallies and the `GROUP_INFO` label table).
const NUM_GROUPS: usize = 256;
26
/// Process-wide flag consulted by [`is_allocation_tracing_enabled`]; starts out `false`.
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);

/// Reports the current value of [`TRACK_ALLOCATIONS`], loaded with `Acquire` ordering.
pub fn is_allocation_tracing_enabled() -> bool {
    AtomicBool::load(&TRACK_ALLOCATIONS, Ordering::Acquire)
}
36
37struct GroupMemStatsStorage {
39 allocations: [AtomicU64; NUM_GROUPS],
40 deallocations: [AtomicU64; NUM_GROUPS],
41}
42
43pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);
45
46static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());
48
49struct GroupMemStats {
51 stats: &'static GroupMemStatsStorage,
52}
53
54impl GroupMemStats {
55 pub fn new() -> Self {
58 let mut mutex = THREAD_LOCAL_REFS.lock().unwrap();
59 let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
60 allocations: arr![AtomicU64::new(0) ; 256],
61 deallocations: arr![AtomicU64::new(0) ; 256],
62 }));
63 let group_mem_stats = GroupMemStats { stats: stats_ref };
64 mutex.push(stats_ref);
65 group_mem_stats
66 }
67}
68
69thread_local! {
70 static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
71}
72
/// Component labels attached to the metrics reported for one allocation group.
struct GroupInfo {
    /// Value emitted under the `component_kind` metric label.
    component_kind: String,
    /// Value emitted under the `component_type` metric label.
    component_type: String,
    /// Value emitted under the `component_id` metric label.
    component_id: String,
}

impl GroupInfo {
    /// Builds a `GroupInfo` whose three labels are all empty strings.
    ///
    /// This is a `const fn` so it can be used in the initialiser of a `static`.
    const fn new() -> Self {
        GroupInfo {
            component_kind: String::new(),
            component_type: String::new(),
            component_id: String::new(),
        }
    }
}
88
89static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 256];
90
91pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;
92
93pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
94 GroupedTraceableAllocator::new(allocator, MainTracer)
95}
96
97pub struct MainTracer;
98
99impl Tracer for MainTracer {
100 #[inline(always)]
101 fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
102 _ = GROUP_MEM_STATS.try_with(|t| {
104 t.stats.allocations[group_id.as_raw() as usize]
105 .fetch_add(object_size as u64, Ordering::Relaxed)
106 });
107 }
108
109 #[inline(always)]
110 fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
111 _ = GROUP_MEM_STATS.try_with(|t| {
113 t.stats.deallocations[source_group_id.as_raw() as usize]
114 .fetch_add(object_size as u64, Ordering::Relaxed)
115 });
116 }
117}
118
119pub fn init_allocation_tracing() {
121 for group in &GROUP_INFO {
122 let mut writer = group.lock().unwrap();
123 *writer = GroupInfo {
124 component_id: "root".to_string(),
125 component_kind: "root".to_string(),
126 component_type: "root".to_string(),
127 };
128 }
129 let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
130 alloc_processor
131 .spawn(|| {
132 without_allocation_tracing(|| loop {
133 for (group_idx, group) in GROUP_INFO.iter().enumerate() {
134 let mut allocations_diff = 0;
135 let mut deallocations_diff = 0;
136 let mutex = THREAD_LOCAL_REFS.lock().unwrap();
137 for idx in 0..mutex.len() {
138 allocations_diff +=
139 mutex[idx].allocations[group_idx].swap(0, Ordering::Relaxed);
140 deallocations_diff +=
141 mutex[idx].deallocations[group_idx].swap(0, Ordering::Relaxed);
142 }
143 if allocations_diff == 0 && deallocations_diff == 0 {
144 continue;
145 }
146 let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
147 let group_info = group.lock().unwrap();
148 if allocations_diff > 0 {
149 counter!(
150 CounterName::ComponentAllocatedBytesTotal, "component_kind" => group_info.component_kind.clone(),
151 "component_type" => group_info.component_type.clone(),
152 "component_id" => group_info.component_id.clone()).increment(allocations_diff);
153 }
154 if deallocations_diff > 0 {
155 counter!(
156 CounterName::ComponentDeallocatedBytesTotal, "component_kind" => group_info.component_kind.clone(),
157 "component_type" => group_info.component_type.clone(),
158 "component_id" => group_info.component_id.clone()).increment(deallocations_diff);
159 }
160 if mem_used_diff > 0 {
161 gauge!(
162 GaugeName::ComponentAllocatedBytes, "component_type" => group_info.component_type.clone(),
163 "component_id" => group_info.component_id.clone(),
164 "component_kind" => group_info.component_kind.clone())
165 .increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
166 }
167 if mem_used_diff < 0 {
168 gauge!(
169 GaugeName::ComponentAllocatedBytes, "component_type" => group_info.component_type.clone(),
170 "component_id" => group_info.component_id.clone(),
171 "component_kind" => group_info.component_kind.clone())
172 .decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
173 }
174 }
175 thread::sleep(Duration::from_millis(
176 REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
177 ));
178 })
179 })
180 .unwrap();
181}
182
183pub fn acquire_allocation_group_id(
191 component_id: String,
192 component_type: String,
193 component_kind: String,
194) -> AllocationGroupId {
195 if let Some(group_id) = AllocationGroupId::register()
196 && let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize)
197 {
198 let mut writer = group_lock.lock().unwrap();
199 *writer = GroupInfo {
200 component_id,
201 component_kind,
202 component_type,
203 };
204
205 return group_id;
206 }
207
208 warn!(
209 "Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.",
210 NUM_GROUPS, component_id
211 );
212 AllocationGroupId::ROOT
213}