1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use chrono::Local;
5use futures_util::future::join_all;
6use http::Uri;
7use regex::Regex;
8use tokio::sync::{mpsc, oneshot};
9use vector_lib::api_client::{Client, RECONNECT_DELAY_MS};
10
11use vector_lib::top::{
12 dashboard::{init_dashboard, is_tty},
13 metrics,
14 state::{self, ConnectionStatus, EventType, State},
15};
16
17#[allow(clippy::print_stderr)]
20pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
21 if !is_tty() {
23 eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
24 return exitcode::IOERR;
25 }
26
27 let url = opts.url();
28
29 let Ok(uri) = url.as_str().parse::<Uri>() else {
31 eprintln!("Invalid API URL: {url}");
32 return exitcode::USAGE;
33 };
34 let mut client = Client::new(uri.clone());
35
36 if client.connect().await.is_err() || client.health().await.is_err() {
37 eprintln!(
38 indoc::indoc! {"
39 Vector API server isn't reachable ({}).
40
41 Have you enabled the API?
42
43 To enable the API, add the following to your Vector config file:
44
45 [api]
46 enabled = true"},
47 url
48 );
49 return exitcode::UNAVAILABLE;
50 }
51
52 top(opts, uri, "Vector").await
53}
54
55pub async fn top(opts: &super::Opts, uri: Uri, dashboard_title: &str) -> exitcode::ExitCode {
57 let (tx, rx) = tokio::sync::mpsc::channel(20);
59 let mut starting_state = State::new(BTreeMap::new());
60 starting_state.sort_state.column = opts.sort_field;
61 starting_state.sort_state.reverse = opts.sort_desc;
62 starting_state.filter_state.column = opts.filter_field;
63 starting_state.filter_state.pattern = opts
64 .filter_value
65 .as_deref()
66 .map(Regex::new)
67 .and_then(Result::ok);
68 let state_rx = state::updater(rx, starting_state).await;
69 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
71
72 let connection = tokio::spawn(subscription(opts.clone(), uri, tx.clone(), shutdown_tx));
73
74 match init_dashboard(
76 dashboard_title,
77 opts.url().as_str(),
78 opts.interval,
79 opts.human_metrics,
80 tx,
81 state_rx,
82 shutdown_rx,
83 )
84 .await
85 {
86 Ok(_) => {
87 connection.abort();
88 exitcode::OK
89 }
90 Err(err) => {
91 #[allow(clippy::print_stderr)]
92 {
93 eprintln!("[top] Encountered shutdown error: {err}");
94 }
95 connection.abort();
96 exitcode::IOERR
97 }
98 }
99}
100
101async fn subscription(
104 opts: super::Opts,
105 uri: Uri,
106 tx: mpsc::Sender<EventType>,
107 shutdown_tx: oneshot::Sender<()>,
108) {
109 loop {
110 let state = match metrics::init_components(uri.clone(), &opts.components).await {
114 Ok(state) => state,
115 Err(_) => {
116 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
117 continue;
118 }
119 };
120 let initial_components = state
121 .components
122 .keys()
123 .map(|k| k.id().to_string())
124 .collect();
125 _ = tx.send(EventType::InitializeState(state)).await;
126
127 let handles = match metrics::subscribe(
129 uri.clone(),
130 tx.clone(),
131 opts.interval as i64,
132 opts.components.clone(),
133 initial_components,
134 )
135 .await
136 {
137 Ok(handles) => handles,
138 Err(_) => {
139 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
140 continue;
141 }
142 };
143
144 _ = tx
145 .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
146 Local::now(),
147 )))
148 .await;
149
150 _ = join_all(handles.metric_handles).await;
154 handles.poll_handle.abort();
155
156 _ = tx
157 .send(EventType::ConnectionUpdated(
158 ConnectionStatus::Disconnected(RECONNECT_DELAY_MS),
159 ))
160 .await;
161
162 if opts.no_reconnect {
163 _ = shutdown_tx.send(());
164 break;
165 }
166 }
167}