vector/internal_telemetry/allocations/
mod.rs

1//! Allocation tracking exposed via internal telemetry.
2
3mod allocator;
4use std::{
5    sync::{
6        Mutex,
7        atomic::{AtomicBool, AtomicU64, Ordering},
8    },
9    thread,
10    time::Duration,
11};
12
13use arr_macro::arr;
14use rand_distr::num_traits::ToPrimitive;
15use vector_common::{
16    counter, gauge,
17    internal_event::{CounterName, GaugeName},
18};
19
20use self::allocator::Tracer;
21pub(crate) use self::allocator::{
22    AllocationGroupId, AllocationLayer, GroupedTraceableAllocator, without_allocation_tracing,
23};
24
/// Number of allocation group slots: the length of every per-group table in this module (the
/// per-thread counter arrays and the `GROUP_INFO` label table).
const NUM_GROUPS: usize = 256;
26
/// Whether allocations are currently being tracked. Allocations are not tracked during startup.
///
/// Relaxed ordering is sufficient for stores and loads of this atomic made during startup, as no
/// other threads exist when that code is running, and all future threads will have a
/// happens-after relationship with that thread -- the main thread -- ensuring that they see the
/// latest value of `TRACK_ALLOCATIONS`.
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);

/// Returns `true` when allocation tracing is enabled.
///
/// Reads [`TRACK_ALLOCATIONS`] with an `Acquire` load, which is at least as strong as the
/// `Relaxed` ordering that flag requires.
pub fn is_allocation_tracing_enabled() -> bool {
    TRACK_ALLOCATIONS.load(Ordering::Acquire)
}
36
37/// Track allocations and deallocations separately.
38struct GroupMemStatsStorage {
39    allocations: [AtomicU64; NUM_GROUPS],
40    deallocations: [AtomicU64; NUM_GROUPS],
41}
42
/// Reporting interval in milliseconds.
///
/// The reporting thread started by `init_allocation_tracing` re-reads this value before every
/// sleep.
pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);
45
46/// A registry for tracking each thread's group memory statistics.
47static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());
48
49/// Group memory statistics per thread.
50struct GroupMemStats {
51    stats: &'static GroupMemStatsStorage,
52}
53
54impl GroupMemStats {
55    /// Allocates a [`GroupMemStatsStorage`], and updates the global [`THREAD_LOCAL_REFS`] registry
56    /// with a reference to this newly allocated memory.
57    pub fn new() -> Self {
58        let mut mutex = THREAD_LOCAL_REFS.lock().unwrap();
59        let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
60            allocations: arr![AtomicU64::new(0) ; 256],
61            deallocations: arr![AtomicU64::new(0) ; 256],
62        }));
63        let group_mem_stats = GroupMemStats { stats: stats_ref };
64        mutex.push(stats_ref);
65        group_mem_stats
66    }
67}
68
69thread_local! {
70    static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
71}
72
/// Component labels attached to the metrics reported for one allocation group.
struct GroupInfo {
    component_kind: String,
    component_type: String,
    component_id: String,
}

impl GroupInfo {
    /// Creates a `GroupInfo` whose three labels are all empty strings.
    ///
    /// This is a `const fn` so it can be used in a static initializer.
    const fn new() -> Self {
        Self {
            component_kind: String::new(),
            component_type: String::new(),
            component_id: String::new(),
        }
    }
}
88
89static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 256];
90
91pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;
92
93pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
94    GroupedTraceableAllocator::new(allocator, MainTracer)
95}
96
97pub struct MainTracer;
98
99impl Tracer for MainTracer {
100    #[inline(always)]
101    fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
102        // Handle the case when thread local destructor is ran.
103        _ = GROUP_MEM_STATS.try_with(|t| {
104            t.stats.allocations[group_id.as_raw() as usize]
105                .fetch_add(object_size as u64, Ordering::Relaxed)
106        });
107    }
108
109    #[inline(always)]
110    fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
111        // Handle the case when thread local destructor is ran.
112        _ = GROUP_MEM_STATS.try_with(|t| {
113            t.stats.deallocations[source_group_id.as_raw() as usize]
114                .fetch_add(object_size as u64, Ordering::Relaxed)
115        });
116    }
117}
118
119/// Initializes allocation tracing.
120pub fn init_allocation_tracing() {
121    for group in &GROUP_INFO {
122        let mut writer = group.lock().unwrap();
123        *writer = GroupInfo {
124            component_id: "root".to_string(),
125            component_kind: "root".to_string(),
126            component_type: "root".to_string(),
127        };
128    }
129    let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
130    alloc_processor
131        .spawn(|| {
132            without_allocation_tracing(|| loop {
133                for (group_idx, group) in GROUP_INFO.iter().enumerate() {
134                    let mut allocations_diff = 0;
135                    let mut deallocations_diff = 0;
136                    let mutex = THREAD_LOCAL_REFS.lock().unwrap();
137                    for idx in 0..mutex.len() {
138                        allocations_diff +=
139                            mutex[idx].allocations[group_idx].swap(0, Ordering::Relaxed);
140                        deallocations_diff +=
141                            mutex[idx].deallocations[group_idx].swap(0, Ordering::Relaxed);
142                    }
143                    if allocations_diff == 0 && deallocations_diff == 0 {
144                        continue;
145                    }
146                    let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
147                    let group_info = group.lock().unwrap();
148                    if allocations_diff > 0 {
149                        counter!(
150                            CounterName::ComponentAllocatedBytesTotal, "component_kind" => group_info.component_kind.clone(),
151                            "component_type" => group_info.component_type.clone(),
152                            "component_id" => group_info.component_id.clone()).increment(allocations_diff);
153                    }
154                    if deallocations_diff > 0 {
155                        counter!(
156                            CounterName::ComponentDeallocatedBytesTotal, "component_kind" => group_info.component_kind.clone(),
157                            "component_type" => group_info.component_type.clone(),
158                            "component_id" => group_info.component_id.clone()).increment(deallocations_diff);
159                    }
160                    if mem_used_diff > 0 {
161                        gauge!(
162                            GaugeName::ComponentAllocatedBytes, "component_type" => group_info.component_type.clone(),
163                            "component_id" => group_info.component_id.clone(),
164                            "component_kind" => group_info.component_kind.clone())
165                            .increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
166                    }
167                    if mem_used_diff < 0 {
168                        gauge!(
169                            GaugeName::ComponentAllocatedBytes, "component_type" => group_info.component_type.clone(),
170                            "component_id" => group_info.component_id.clone(),
171                            "component_kind" => group_info.component_kind.clone())
172                            .decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
173                    }
174                }
175                thread::sleep(Duration::from_millis(
176                    REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
177                ));
178            })
179        })
180        .unwrap();
181}
182
183/// Acquires an allocation group ID.
184///
185/// This creates an allocation group which allows callers to enter/exit the allocation group context, associating all
186/// (de)allocations within the context with that group. An allocation group ID must be "attached" to
187/// a [`tracing::Span`] to achieve this" we utilize the logical invariants provided by spans --
188/// entering, exiting, and how spans exist as a stack -- in order to handle keeping the "current
189/// allocation group" accurate across all threads.
190pub fn acquire_allocation_group_id(
191    component_id: String,
192    component_type: String,
193    component_kind: String,
194) -> AllocationGroupId {
195    if let Some(group_id) = AllocationGroupId::register()
196        && let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize)
197    {
198        let mut writer = group_lock.lock().unwrap();
199        *writer = GroupInfo {
200            component_id,
201            component_kind,
202            component_type,
203        };
204
205        return group_id;
206    }
207
208    warn!(
209        "Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.",
210        NUM_GROUPS, component_id
211    );
212    AllocationGroupId::ROOT
213}