vector/top/
cmd.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use chrono::Local;
5use futures_util::future::join_all;
6use http::Uri;
7use regex::Regex;
8use tokio::sync::{mpsc, oneshot};
9use vector_lib::api_client::{Client, RECONNECT_DELAY_MS};
10
11use vector_lib::top::{
12    dashboard::{init_dashboard, is_tty},
13    metrics,
14    state::{self, ConnectionStatus, EventType, State},
15};
16
17/// CLI command func for displaying Vector components, and communicating with a local/remote
18/// Vector API server via gRPC
19#[allow(clippy::print_stderr)]
20pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
21    // Exit early if the terminal is not a teletype
22    if !is_tty() {
23        eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
24        return exitcode::IOERR;
25    }
26
27    let url = opts.url();
28
29    // Create a new API client for connecting to the local/remote Vector instance.
30    let Ok(uri) = url.as_str().parse::<Uri>() else {
31        eprintln!("Invalid API URL: {url}");
32        return exitcode::USAGE;
33    };
34    let mut client = Client::new(uri.clone());
35
36    if client.connect().await.is_err() || client.health().await.is_err() {
37        eprintln!(
38            indoc::indoc! {"
39            Vector API server isn't reachable ({}).
40
41            Have you enabled the API?
42
43            To enable the API, add the following to your Vector config file:
44
45            [api]
46                enabled = true"},
47            url
48        );
49        return exitcode::UNAVAILABLE;
50    }
51
52    top(opts, uri, "Vector").await
53}
54
55/// General monitoring
56pub async fn top(opts: &super::Opts, uri: Uri, dashboard_title: &str) -> exitcode::ExitCode {
57    // Channel for updating state via event messages
58    let (tx, rx) = tokio::sync::mpsc::channel(20);
59    let mut starting_state = State::new(BTreeMap::new());
60    starting_state.sort_state.column = opts.sort_field;
61    starting_state.sort_state.reverse = opts.sort_desc;
62    starting_state.filter_state.column = opts.filter_field;
63    starting_state.filter_state.pattern = opts
64        .filter_value
65        .as_deref()
66        .map(Regex::new)
67        .and_then(Result::ok);
68    let state_rx = state::updater(rx, starting_state).await;
69    // Channel for shutdown signal
70    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
71
72    let connection = tokio::spawn(subscription(opts.clone(), uri, tx.clone(), shutdown_tx));
73
74    // Initialize the dashboard
75    match init_dashboard(
76        dashboard_title,
77        opts.url().as_str(),
78        opts.interval,
79        opts.human_metrics,
80        tx,
81        state_rx,
82        shutdown_rx,
83    )
84    .await
85    {
86        Ok(_) => {
87            connection.abort();
88            exitcode::OK
89        }
90        Err(err) => {
91            #[allow(clippy::print_stderr)]
92            {
93                eprintln!("[top] Encountered shutdown error: {err}");
94            }
95            connection.abort();
96            exitcode::IOERR
97        }
98    }
99}
100
101// This task handles reconnecting the gRPC client and all
102// subscriptions in the case of a connection failure
103async fn subscription(
104    opts: super::Opts,
105    uri: Uri,
106    tx: mpsc::Sender<EventType>,
107    shutdown_tx: oneshot::Sender<()>,
108) {
109    loop {
110        // Initialize state. On future reconnects, we re-initialize state in
111        // order to accurately capture added, removed, and edited
112        // components.
113        let state = match metrics::init_components(uri.clone(), &opts.components).await {
114            Ok(state) => state,
115            Err(_) => {
116                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
117                continue;
118            }
119        };
120        let initial_components = state
121            .components
122            .keys()
123            .map(|k| k.id().to_string())
124            .collect();
125        _ = tx.send(EventType::InitializeState(state)).await;
126
127        // Subscribe to updated metrics via gRPC streaming
128        let handles = match metrics::subscribe(
129            uri.clone(),
130            tx.clone(),
131            opts.interval as i64,
132            opts.components.clone(),
133            initial_components,
134        )
135        .await
136        {
137            Ok(handles) => handles,
138            Err(_) => {
139                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
140                continue;
141            }
142        };
143
144        _ = tx
145            .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
146                Local::now(),
147            )))
148            .await;
149
150        // Wait for metric stream tasks to finish. poll_components is intentionally
151        // excluded: it runs indefinitely while get_components succeeds, so joining
152        // it here would prevent reconnection when metric streams fail first.
153        _ = join_all(handles.metric_handles).await;
154        handles.poll_handle.abort();
155
156        _ = tx
157            .send(EventType::ConnectionUpdated(
158                ConnectionStatus::Disconnected(RECONNECT_DELAY_MS),
159            ))
160            .await;
161
162        if opts.no_reconnect {
163            _ = shutdown_tx.send(());
164            break;
165        }
166    }
167}