Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f84dd539bf | |||
| 6378734e1c | |||
| b3a617cce1 | |||
| 2080e5e8a9 | |||
| 45ed2bc9b8 | |||
| aa126fd89d | |||
| c612132763 | |||
| 19660f8cd0 | |||
| 89c43e28f5 |
@@ -86,17 +86,12 @@ jobs:
|
|||||||
build-macos-arm64:
|
build-macos-arm64:
|
||||||
needs: create-release
|
needs: create-release
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
container:
|
|
||||||
image: registry.metabarcoding.org/cibuilder/rustcrossosx:latest
|
|
||||||
credentials:
|
|
||||||
username: ${{ github.actor }}
|
|
||||||
password: ${{ secrets.REGISTRYTOKEN }}
|
|
||||||
defaults:
|
|
||||||
run:
|
|
||||||
working-directory: src
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Login to registry
|
||||||
|
run: echo "${{ secrets.REGISTRYTOKEN }}" | docker login registry.metabarcoding.org -u ${{ secrets.REGISTRYUSER }} --password-stdin
|
||||||
|
|
||||||
- name: Cache cargo registry
|
- name: Cache cargo registry
|
||||||
uses: actions/cache@v4
|
uses: actions/cache@v4
|
||||||
with:
|
with:
|
||||||
@@ -108,7 +103,12 @@ jobs:
|
|||||||
restore-keys: macos-arm64-cargo-
|
restore-keys: macos-arm64-cargo-
|
||||||
|
|
||||||
- name: Build macOS binary
|
- name: Build macOS binary
|
||||||
run: cargo build --release --target aarch64-apple-darwin --no-default-features
|
run: |
|
||||||
|
docker run --rm \
|
||||||
|
-v "${{ github.workspace }}:/src" \
|
||||||
|
-w /src/src \
|
||||||
|
registry.metabarcoding.org/cibuilder/rustcrossosx:latest \
|
||||||
|
cargo build --release --target aarch64-apple-darwin --no-default-features
|
||||||
|
|
||||||
- name: Prepare and upload artifact
|
- name: Prepare and upload artifact
|
||||||
env:
|
env:
|
||||||
@@ -116,7 +116,7 @@ jobs:
|
|||||||
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
|
RELEASE_ID: ${{ needs.create-release.outputs.release_id }}
|
||||||
run: |
|
run: |
|
||||||
mkdir -p /tmp/dist
|
mkdir -p /tmp/dist
|
||||||
cp target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
|
cp src/target/aarch64-apple-darwin/release/obikmer /tmp/dist/obikmer-macos-arm64
|
||||||
curl -s -X POST \
|
curl -s -X POST \
|
||||||
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
|
"${{ github.server_url }}/api/v1/repos/${{ github.repository }}/releases/$RELEASE_ID/assets" \
|
||||||
-H "Authorization: token $GITEA_TOKEN" \
|
-H "Authorization: token $GITEA_TOKEN" \
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ data-stress
|
|||||||
*.pb
|
*.pb
|
||||||
./**/*.json
|
./**/*.json
|
||||||
*.bin
|
*.bin
|
||||||
|
*.log
|
||||||
Betula_exilis--IGA-24-33
|
Betula_exilis--IGA-24-33
|
||||||
benchmark/genomes
|
benchmark/genomes
|
||||||
benchmark/simulated_data
|
benchmark/simulated_data
|
||||||
|
|||||||
@@ -162,14 +162,107 @@ A single `PartitionRunner` instance can be built once per command invocation
|
|||||||
and reused across multiple `run()` calls (e.g. `merge` runs
|
and reused across multiple `run()` calls (e.g. `merge` runs
|
||||||
`merge_partitions` then `pack_matrices`).
|
`merge_partitions` then `pack_matrices`).
|
||||||
|
|
||||||
|
## Known issue: CPU-only activation signal stalls on I/O-bound stages
|
||||||
|
|
||||||
|
Observed on a real `filter` run (109 genomes, 256 partitions, 8×24-core NUMA):
|
||||||
|
`rebuild` (CPU-bound — k-mer construction) scales cleanly from 9 to 43 active
|
||||||
|
workers as `CpuSample::do_i_activate` (`obisys::lib.rs`) sees efficiency climb.
|
||||||
|
`pack_matrices` (I/O-bound — reopens and recomposes per-genome column files
|
||||||
|
into `.pbmx`/`.pcmx`) activates one extra worker then flatlines at 10/192 for
|
||||||
|
the rest of the stage, even though 256 partitions keep completing over several
|
||||||
|
minutes. This matches the documented intent (§ Adaptive mechanism — "avoids
|
||||||
|
over-provisioning ... I/O-bound ... workloads") but conflates two different
|
||||||
|
things: *"CPU is not the bottleneck"* and *"more workers would not help"*. On
|
||||||
|
storage with real queue depth (NVMe, RAID, parallel FS) the second stage could
|
||||||
|
still benefit from more concurrent workers even with flat CPU usage — a signal
|
||||||
|
the current mechanism cannot see.
|
||||||
|
|
||||||
|
A one-off artefact was also found in the same log: right after a stage
|
||||||
|
transition, `do_i_activate` produced a physically impossible spike (efficiency
|
||||||
|
~94 cores on a 192-core box) because it has no minimum-window guard — unlike
|
||||||
|
its sibling `cpu_efficiency`, which returns `0.0` if `wall < 0.1s`
|
||||||
|
(`obisys::lib.rs:260`). `do_i_activate` unconditionally overwrites
|
||||||
|
`self.wall`/`self.user_secs`/`self.sys_secs` even when the elapsed window is
|
||||||
|
too short to be meaningful, so a burst of rapid completions right after
|
||||||
|
activating a worker can divide a real CPU delta by a near-zero wall delta.
|
||||||
|
|
||||||
|
### Implemented: I/O signal + shared debounce guard
|
||||||
|
|
||||||
|
`IoSample` (`obisys::lib.rs`, alongside `CpuSample`) is fed by
|
||||||
|
`read_bytes`/`write_bytes` from `/proc/self/io` on Linux (actual bytes
|
||||||
|
submitted to the block layer — not `rchar`/`wchar`, which also count
|
||||||
|
page-cache hits, and not `ru_inblock`/`ru_oublock`, unreliable on macOS), with
|
||||||
|
a `proc_pid_rusage(RUSAGE_INFO_V4)` fallback on macOS
|
||||||
|
(`ri_diskio_bytesread`/`ri_diskio_byteswritten`, FFI only via `libc`, no new
|
||||||
|
dependency — same pattern as the existing `getrusage` bindings). Any other
|
||||||
|
target degrades gracefully to a signal that never triggers (falls back to
|
||||||
|
CPU-only activation), same pattern as `cgroup_v2_available`.
|
||||||
|
|
||||||
|
`maybe_activate` (`numa.rs`) activates a worker if *either* signal still shows
|
||||||
|
headroom, making `PartitionRunner` adapt to whichever resource is actually the
|
||||||
|
bottleneck without per-call configuration. Both samplers are called
|
||||||
|
unconditionally — no `||` short-circuit — so neither window starves behind
|
||||||
|
whichever signal fires first:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let cpu_wants_more = cpu_sample.do_i_activate(CPU_SPAWN_THRESHOLD);
|
||||||
|
let io_wants_more = io_sample.do_i_activate(IO_SPAWN_THRESHOLD);
|
||||||
|
if cpu_wants_more || io_wants_more {
|
||||||
|
activate_tx.send(()).ok();
|
||||||
|
...
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Unlike the CPU signal (an absolute delta in cores — a bounded, portable unit),
|
||||||
|
raw I/O throughput has no natural scale across devices, so `IoSample` uses a
|
||||||
|
**relative** growth threshold instead of an absolute one:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||||
|
let elapsed = self.wall.elapsed().as_secs_f64();
|
||||||
|
if elapsed < 0.1 { return false; } // state untouched — window keeps accumulating
|
||||||
|
|
||||||
|
let n = Self::read_bytes();
|
||||||
|
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
|
||||||
|
let activate = if self.previous_rate == 0.0 {
|
||||||
|
rate > 0.0 // bootstrap: any measured throughput is signal
|
||||||
|
} else {
|
||||||
|
(rate - self.previous_rate) / self.previous_rate >= threshold
|
||||||
|
};
|
||||||
|
|
||||||
|
self.bytes = n;
|
||||||
|
self.wall = Instant::now(); // reset only on a real sample
|
||||||
|
activate
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The `elapsed < 0.1s → return false without mutating state` guard was also
|
||||||
|
back-ported into `CpuSample::do_i_activate` (previously missing — source of
|
||||||
|
the ~94-core artefact above) — one fix for both problems, and it removes the
|
||||||
|
need for any arbitrary I/O-rate floor: a short/noisy window is rejected
|
||||||
|
outright rather than papered over with a hardware-dependent constant.
|
||||||
|
|
||||||
|
Both spawn thresholds (`CPU_SPAWN_THRESHOLD`, `IO_SPAWN_THRESHOLD`, both `0.2`)
|
||||||
|
are defined as `const` in `PartitionRunner::run` (`numa.rs`). The I/O value is
|
||||||
|
a starting point, not a derived one — needs empirical validation against a
|
||||||
|
real `pack` run.
|
||||||
|
|
||||||
|
Starting threshold: `0.2` (20 % relative growth) for `IoSample`, same order of
|
||||||
|
magnitude as the CPU threshold's *implicit* relative sensitivity (in the
|
||||||
|
observed log, an 8→9 worker step raised efficiency by ~12 %). This is a
|
||||||
|
starting point, not a derived value — I/O throughput is lumpier than CPU time
|
||||||
|
(buffered writes flush in bursts), so it needs empirical validation against a
|
||||||
|
real `pack` run before being considered final.
|
||||||
|
|
||||||
## Open questions
|
## Open questions
|
||||||
|
|
||||||
- **Error handling**: `run` currently returns the first error; remaining errors
|
- **Error handling**: `run` currently returns the first error; remaining errors
|
||||||
are dropped. A `Vec<E>` return would give complete diagnostics.
|
are dropped. A `Vec<E>` return would give complete diagnostics.
|
||||||
|
|
||||||
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
|
- **`workers_per_node` tuning**: currently `(cpus / 8).max(3).min(8)`, calibrated
|
||||||
for merge on BeeGFS. I/O-bound commands (`dump`, `select`) may benefit from
|
for merge on BeeGFS. Superseded by the I/O signal above for the "more
|
||||||
a higher value. A per-call override could be added to the API.
|
workers would help despite flat CPU" case — a per-call override may still be
|
||||||
|
worth keeping as a manual escape hatch.
|
||||||
|
|
||||||
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
|
- **`on_done` ordering**: the runner serialises calls to `on_done` via an
|
||||||
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
|
internal `Arc<Mutex<C>>`. `Send` is required (the Arc clone crosses thread
|
||||||
|
|||||||
Generated
+1
-1
@@ -1704,7 +1704,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "obikmer"
|
name = "obikmer"
|
||||||
version = "1.1.27"
|
version = "1.1.33"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"clap",
|
"clap",
|
||||||
"csv",
|
"csv",
|
||||||
|
|||||||
+26
-39
@@ -20,7 +20,7 @@ use hwlocality::cpu::binding::CpuBindingFlags;
|
|||||||
use hwlocality::cpu::cpuset::CpuSet;
|
use hwlocality::cpu::cpuset::CpuSet;
|
||||||
#[cfg(feature = "numa")]
|
#[cfg(feature = "numa")]
|
||||||
use hwlocality::object::types::ObjectType;
|
use hwlocality::object::types::ObjectType;
|
||||||
use obisys::CpuSample;
|
use obisys::{CpuSample, IoSample};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
// ── Public interface ──────────────────────────────────────────────────────────
|
// ── Public interface ──────────────────────────────────────────────────────────
|
||||||
@@ -190,10 +190,13 @@ impl PartitionRunner {
|
|||||||
/// Run `f(i)` for every index in `order`.
|
/// Run `f(i)` for every index in `order`.
|
||||||
///
|
///
|
||||||
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
|
/// Workers are pre-spawned dormant and activated adaptively. A timer thread
|
||||||
/// fires a CPU-efficiency check every `TIMER_SECS` seconds; each completed
|
/// fires an efficiency check every `TIMER_SECS` seconds; each completed
|
||||||
/// partition resets that timer (forcing an immediate check) and also
|
/// partition resets that timer (forcing an immediate check) and also
|
||||||
/// triggers its own inline check. A new worker is activated whenever
|
/// triggers its own inline check. A new worker is activated whenever CPU
|
||||||
/// efficiency falls below `SPAWN_THRESHOLD`.
|
/// efficiency grows by at least `CPU_SPAWN_THRESHOLD` (absolute, in cores)
|
||||||
|
/// or I/O throughput grows by at least `IO_SPAWN_THRESHOLD` (relative) since
|
||||||
|
/// the last check — whichever resource is the actual bottleneck still shows
|
||||||
|
/// headroom.
|
||||||
///
|
///
|
||||||
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
/// `on_done(i, result, elapsed)` is called from the controller thread as
|
||||||
/// each partition completes — suitable for progress bars and result
|
/// each partition completes — suitable for progress bars and result
|
||||||
@@ -217,13 +220,10 @@ impl PartitionRunner {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
const SPAWN_THRESHOLD: f64 = 0.95;
|
const CPU_SPAWN_THRESHOLD: f64 = 0.2;
|
||||||
|
const IO_SPAWN_THRESHOLD: f64 = 0.2;
|
||||||
const TIMER_SECS: u64 = 30;
|
const TIMER_SECS: u64 = 30;
|
||||||
|
|
||||||
let n_cores = std::thread::available_parallelism()
|
|
||||||
.map(|n| n.get())
|
|
||||||
.unwrap_or(1);
|
|
||||||
|
|
||||||
// ── Channels ──────────────────────────────────────────────────────────
|
// ── Channels ──────────────────────────────────────────────────────────
|
||||||
let (part_tx, part_rx) = unbounded::<usize>();
|
let (part_tx, part_rx) = unbounded::<usize>();
|
||||||
let (activate_tx, activate_rx) = unbounded::<()>();
|
let (activate_tx, activate_rx) = unbounded::<()>();
|
||||||
@@ -291,7 +291,7 @@ impl PartitionRunner {
|
|||||||
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
|
for _ in 0..initial_workers { activate_tx.send(()).ok(); }
|
||||||
let mut n_active = initial_workers;
|
let mut n_active = initial_workers;
|
||||||
let mut cpu_sample = CpuSample::now();
|
let mut cpu_sample = CpuSample::now();
|
||||||
let mut eff_at_last_spawn = 0.0f64; // 0 = no previous spawn to evaluate
|
let mut io_sample = IoSample::now();
|
||||||
let mut completed = 0usize;
|
let mut completed = 0usize;
|
||||||
|
|
||||||
while completed < n_total {
|
while completed < n_total {
|
||||||
@@ -308,15 +308,17 @@ impl PartitionRunner {
|
|||||||
// Inline check: same logic as a timer tick.
|
// Inline check: same logic as a timer tick.
|
||||||
maybe_activate(
|
maybe_activate(
|
||||||
&activate_tx, &mut n_active, max_workers,
|
&activate_tx, &mut n_active, max_workers,
|
||||||
&mut cpu_sample, &mut eff_at_last_spawn,
|
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
|
||||||
n_cores, SPAWN_THRESHOLD, completed, n_total,
|
&mut io_sample, IO_SPAWN_THRESHOLD,
|
||||||
|
completed, n_total,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
WorkerEvent::TimerTick => {
|
WorkerEvent::TimerTick => {
|
||||||
maybe_activate(
|
maybe_activate(
|
||||||
&activate_tx, &mut n_active, max_workers,
|
&activate_tx, &mut n_active, max_workers,
|
||||||
&mut cpu_sample, &mut eff_at_last_spawn,
|
&mut cpu_sample, CPU_SPAWN_THRESHOLD,
|
||||||
n_cores, SPAWN_THRESHOLD, completed, n_total,
|
&mut io_sample, IO_SPAWN_THRESHOLD,
|
||||||
|
completed, n_total,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -347,38 +349,23 @@ fn maybe_activate(
|
|||||||
n_active: &mut usize,
|
n_active: &mut usize,
|
||||||
max_workers: usize,
|
max_workers: usize,
|
||||||
cpu_sample: &mut CpuSample,
|
cpu_sample: &mut CpuSample,
|
||||||
eff_at_last_spawn: &mut f64,
|
cpu_threshold: f64,
|
||||||
n_cores: usize,
|
io_sample: &mut IoSample,
|
||||||
threshold: f64,
|
io_threshold: f64,
|
||||||
completed: usize,
|
completed: usize,
|
||||||
n_total: usize,
|
n_total: usize,
|
||||||
) {
|
) {
|
||||||
if *n_active >= max_workers || completed >= n_total { return; }
|
if *n_active >= max_workers || completed >= n_total { return; }
|
||||||
|
|
||||||
let eff = cpu_sample.cpu_efficiency(n_cores);
|
// Call both unconditionally (no `||` short-circuit): each sampler must
|
||||||
if eff >= threshold { return; } // CPU already saturated
|
// advance its own window every tick, regardless of what the other one
|
||||||
|
// reports, or it would starve behind whichever signal fires first.
|
||||||
|
let cpu_wants_more = cpu_sample.do_i_activate(cpu_threshold);
|
||||||
|
let io_wants_more = io_sample.do_i_activate(io_threshold);
|
||||||
|
|
||||||
// Check that the previous activation was beneficial enough.
|
if cpu_wants_more || io_wants_more {
|
||||||
// Going from k-1 → k workers, the minimum acceptable speedup is (k-1+0.2)/(k-1).
|
|
||||||
// For the very first extra worker (n_active == 1, no previous spawn), skip this
|
|
||||||
// check: eff_at_last_spawn == 0 acts as the sentinel.
|
|
||||||
let last_spawn_was_beneficial = if *eff_at_last_spawn < 1e-9 || eff < 1e-9 {
|
|
||||||
true // first additional worker, or measurement too short: no prior data to evaluate
|
|
||||||
} else {
|
|
||||||
let k_new = *n_active as f64; // worker count after the last spawn
|
|
||||||
let min_gain = 0.2 / k_new;
|
|
||||||
let actual_gain = (eff - *eff_at_last_spawn) / eff;
|
|
||||||
actual_gain >= min_gain
|
|
||||||
};
|
|
||||||
|
|
||||||
if last_spawn_was_beneficial {
|
|
||||||
activate_tx.send(()).ok();
|
activate_tx.send(()).ok();
|
||||||
*eff_at_last_spawn = eff;
|
|
||||||
*n_active += 1;
|
*n_active += 1;
|
||||||
*cpu_sample = CpuSample::now();
|
debug!("activated worker {}/{}", n_active, max_workers);
|
||||||
debug!(
|
|
||||||
"activated worker {}/{} — efficiency {:.0}%",
|
|
||||||
n_active, max_workers, eff * 100.0,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "obikmer"
|
name = "obikmer"
|
||||||
version = "1.1.27"
|
version = "1.1.33"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
|
|||||||
+248
-47
@@ -4,7 +4,7 @@ use std::sync::{Condvar, Mutex};
|
|||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
use indicatif::{ProgressBar, ProgressStyle};
|
||||||
use tracing::{info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
|
const BRAILLE: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"];
|
||||||
|
|
||||||
@@ -31,7 +31,8 @@ impl TracedBar {
|
|||||||
let pct10 = (pos * 10) / self.total; // 0..=10
|
let pct10 = (pos * 10) / self.total; // 0..=10
|
||||||
let last = self.last_pct.load(Ordering::Relaxed);
|
let last = self.last_pct.load(Ordering::Relaxed);
|
||||||
if pct10 > last
|
if pct10 > last
|
||||||
&& self.last_pct
|
&& self
|
||||||
|
.last_pct
|
||||||
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
|
.compare_exchange(last, pct10, Ordering::Relaxed, Ordering::Relaxed)
|
||||||
.is_ok()
|
.is_ok()
|
||||||
{
|
{
|
||||||
@@ -49,14 +50,14 @@ impl TracedBar {
|
|||||||
let msg = msg.into();
|
let msg = msg.into();
|
||||||
if self.pb.is_hidden() {
|
if self.pb.is_hidden() {
|
||||||
if self.total > 0 {
|
if self.total > 0 {
|
||||||
// bounded bar: always log (already rate-limited by 10% threshold in inc)
|
debug!(stage = %self.label, "{msg}");
|
||||||
info!(stage = %self.label, "{msg}");
|
|
||||||
} else {
|
} else {
|
||||||
// spinner: throttle to ~10 s
|
// spinner: throttle to ~10 s
|
||||||
let now_ms = self.start.elapsed().as_millis() as u64;
|
let now_ms = self.start.elapsed().as_millis() as u64;
|
||||||
let last = self.last_log_ms.load(Ordering::Relaxed);
|
let last = self.last_log_ms.load(Ordering::Relaxed);
|
||||||
if now_ms >= last + 10_000
|
if now_ms >= last + 10_000
|
||||||
&& self.last_log_ms
|
&& self
|
||||||
|
.last_log_ms
|
||||||
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
|
.compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
|
||||||
.is_ok()
|
.is_ok()
|
||||||
{
|
{
|
||||||
@@ -83,8 +84,13 @@ pub fn spinner(label: &str) -> TracedBar {
|
|||||||
);
|
);
|
||||||
pb.enable_steady_tick(Duration::from_millis(100));
|
pb.enable_steady_tick(Duration::from_millis(100));
|
||||||
TracedBar {
|
TracedBar {
|
||||||
pb, label: label.to_string(), unit: String::new(), total: 0,
|
pb,
|
||||||
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
|
label: label.to_string(),
|
||||||
|
unit: String::new(),
|
||||||
|
total: 0,
|
||||||
|
start: Instant::now(),
|
||||||
|
last_pct: AtomicU64::new(0),
|
||||||
|
last_log_ms: AtomicU64::new(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -101,8 +107,13 @@ pub fn progress_bar(label: &str, n: u64, unit: &str) -> TracedBar {
|
|||||||
);
|
);
|
||||||
pb.enable_steady_tick(Duration::from_millis(100));
|
pb.enable_steady_tick(Duration::from_millis(100));
|
||||||
TracedBar {
|
TracedBar {
|
||||||
pb, label: label.to_string(), unit: unit.to_string(), total: n,
|
pb,
|
||||||
start: Instant::now(), last_pct: AtomicU64::new(0), last_log_ms: AtomicU64::new(0),
|
label: label.to_string(),
|
||||||
|
unit: unit.to_string(),
|
||||||
|
total: n,
|
||||||
|
start: Instant::now(),
|
||||||
|
last_pct: AtomicU64::new(0),
|
||||||
|
last_log_ms: AtomicU64::new(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -204,13 +215,19 @@ fn tv_to_secs(tv: timeval) -> f64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "macos")]
|
#[cfg(target_os = "macos")]
|
||||||
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 }
|
fn rss_to_bytes(ru: &rusage) -> u64 {
|
||||||
|
ru.ru_maxrss as u64
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(not(target_os = "macos"))]
|
#[cfg(not(target_os = "macos"))]
|
||||||
fn rss_to_bytes(ru: &rusage) -> u64 { ru.ru_maxrss as u64 * 1024 }
|
fn rss_to_bytes(ru: &rusage) -> u64 {
|
||||||
|
ru.ru_maxrss as u64 * 1024
|
||||||
|
}
|
||||||
|
|
||||||
// Monotonically increasing counters — negative delta would be a kernel bug.
|
// Monotonically increasing counters — negative delta would be a kernel bug.
|
||||||
fn delta(end: i64, start: i64) -> u64 { (end - start).max(0) as u64 }
|
fn delta(end: i64, start: i64) -> u64 {
|
||||||
|
(end - start).max(0) as u64
|
||||||
|
}
|
||||||
|
|
||||||
// ── CpuSample ─────────────────────────────────────────────────────────────────
|
// ── CpuSample ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
@@ -221,6 +238,7 @@ pub struct CpuSample {
|
|||||||
wall: Instant,
|
wall: Instant,
|
||||||
user_secs: f64,
|
user_secs: f64,
|
||||||
sys_secs: f64,
|
sys_secs: f64,
|
||||||
|
previous: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CpuSample {
|
impl CpuSample {
|
||||||
@@ -230,6 +248,7 @@ impl CpuSample {
|
|||||||
wall: Instant::now(),
|
wall: Instant::now(),
|
||||||
user_secs: tv_to_secs(ru.ru_utime),
|
user_secs: tv_to_secs(ru.ru_utime),
|
||||||
sys_secs: tv_to_secs(ru.ru_stime),
|
sys_secs: tv_to_secs(ru.ru_stime),
|
||||||
|
previous: 0.0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -238,11 +257,129 @@ impl CpuSample {
|
|||||||
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
|
pub fn cpu_efficiency(&self, n_cores: usize) -> f64 {
|
||||||
let ru = get_rusage();
|
let ru = get_rusage();
|
||||||
let wall = self.wall.elapsed().as_secs_f64();
|
let wall = self.wall.elapsed().as_secs_f64();
|
||||||
if wall < 0.1 { return 0.0; }
|
if wall < 0.1 {
|
||||||
let cpu = (tv_to_secs(ru.ru_utime) - self.user_secs)
|
return 0.0;
|
||||||
+ (tv_to_secs(ru.ru_stime) - self.sys_secs);
|
}
|
||||||
|
let cpu =
|
||||||
|
(tv_to_secs(ru.ru_utime) - self.user_secs) + (tv_to_secs(ru.ru_stime) - self.sys_secs);
|
||||||
cpu / (wall * n_cores as f64)
|
cpu / (wall * n_cores as f64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||||
|
let delta_wall = self.wall.elapsed().as_secs_f64();
|
||||||
|
if delta_wall < 0.1 {
|
||||||
|
// Window too short to be meaningful — leave state untouched so it
|
||||||
|
// keeps accumulating until a real sample can be taken.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = CpuSample::now();
|
||||||
|
let delta_ru = (n.user_secs - self.user_secs) + (n.sys_secs - self.sys_secs);
|
||||||
|
|
||||||
|
let efficiency = delta_ru / delta_wall;
|
||||||
|
let activate = 0f64.max(efficiency - self.previous) >= threshold;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"Do I activate : {} -> {} = {} Activate: {}",
|
||||||
|
self.previous,
|
||||||
|
efficiency,
|
||||||
|
0f64.max(efficiency - self.previous),
|
||||||
|
activate
|
||||||
|
);
|
||||||
|
self.previous = efficiency;
|
||||||
|
self.user_secs = n.user_secs;
|
||||||
|
self.sys_secs = n.sys_secs;
|
||||||
|
self.wall = n.wall;
|
||||||
|
|
||||||
|
activate
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── IoSample ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Snapshot of process-wide block I/O (bytes read + written) + wall clock.
|
||||||
|
///
|
||||||
|
/// Same activation protocol as [`CpuSample`], but the growth check in
|
||||||
|
/// [`do_i_activate`](Self::do_i_activate) is *relative* rather than absolute:
|
||||||
|
/// raw I/O throughput has no portable scale across storage devices, unlike a
|
||||||
|
/// core count.
|
||||||
|
pub struct IoSample {
|
||||||
|
wall: Instant,
|
||||||
|
bytes: u64,
|
||||||
|
previous_rate: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IoSample {
|
||||||
|
pub fn now() -> Self {
|
||||||
|
Self {
|
||||||
|
wall: Instant::now(),
|
||||||
|
bytes: Self::read_bytes(),
|
||||||
|
previous_rate: 0.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bytes actually submitted to the block layer (read + write), summed
|
||||||
|
/// process-wide. Returns 0 if unavailable — degrades gracefully to a
|
||||||
|
/// signal that never triggers activation (CPU-only heuristic).
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
fn read_bytes() -> u64 {
|
||||||
|
let Ok(io) = std::fs::read_to_string("/proc/self/io") else {
|
||||||
|
return 0;
|
||||||
|
};
|
||||||
|
io.lines()
|
||||||
|
.filter_map(|l| {
|
||||||
|
l.strip_prefix("read_bytes: ")
|
||||||
|
.or_else(|| l.strip_prefix("write_bytes: "))
|
||||||
|
})
|
||||||
|
.filter_map(|v| v.trim().parse::<u64>().ok())
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "macos")]
|
||||||
|
fn read_bytes() -> u64 {
|
||||||
|
use libc::{RUSAGE_INFO_V4, getpid, proc_pid_rusage, rusage_info_v4};
|
||||||
|
let mut info: rusage_info_v4 = unsafe { std::mem::zeroed() };
|
||||||
|
let ret =
|
||||||
|
unsafe { proc_pid_rusage(getpid(), RUSAGE_INFO_V4, &mut info as *mut _ as *mut _) };
|
||||||
|
if ret != 0 {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
info.ri_diskio_bytesread + info.ri_diskio_byteswritten
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
|
||||||
|
fn read_bytes() -> u64 {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same protocol as [`CpuSample::do_i_activate`] (0.1 s minimum window,
|
||||||
|
/// state untouched on early return), but growth is measured relative to
|
||||||
|
/// the previous rate. `threshold` is a fraction, e.g. `0.2` for a 20 %
|
||||||
|
/// increase in throughput since the last real sample.
|
||||||
|
pub fn do_i_activate(&mut self, threshold: f64) -> bool {
|
||||||
|
let elapsed = self.wall.elapsed().as_secs_f64();
|
||||||
|
if elapsed < 0.1 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = Self::read_bytes();
|
||||||
|
let rate = n.saturating_sub(self.bytes) as f64 / elapsed;
|
||||||
|
let activate = if self.previous_rate == 0.0 {
|
||||||
|
rate > 0.0 // bootstrap: any measured throughput is signal enough
|
||||||
|
} else {
|
||||||
|
(rate - self.previous_rate) / self.previous_rate >= threshold
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"Do I activate (I/O) : {} -> {} Activate: {}",
|
||||||
|
self.previous_rate, rate, activate
|
||||||
|
);
|
||||||
|
self.previous_rate = rate;
|
||||||
|
self.bytes = n;
|
||||||
|
self.wall = Instant::now();
|
||||||
|
|
||||||
|
activate
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── public API ────────────────────────────────────────────────────────────────
|
// ── public API ────────────────────────────────────────────────────────────────
|
||||||
@@ -259,7 +396,11 @@ impl Stage {
|
|||||||
pub fn start(label: impl Into<String>) -> Self {
|
pub fn start(label: impl Into<String>) -> Self {
|
||||||
let label = label.into();
|
let label = label.into();
|
||||||
info!(stage = %label, "started");
|
info!(stage = %label, "started");
|
||||||
Self { label, wall: Instant::now(), ru: get_rusage() }
|
Self {
|
||||||
|
label,
|
||||||
|
wall: Instant::now(),
|
||||||
|
ru: get_rusage(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop(self) -> StageStats {
|
pub fn stop(self) -> StageStats {
|
||||||
@@ -318,8 +459,11 @@ pub struct StageStats {
|
|||||||
impl StageStats {
|
impl StageStats {
|
||||||
/// (user + sys) / wall — effective thread count utilisation.
|
/// (user + sys) / wall — effective thread count utilisation.
|
||||||
pub fn parallelism(&self) -> f64 {
|
pub fn parallelism(&self) -> f64 {
|
||||||
if self.wall_secs > 1e-9 { (self.user_secs + self.sys_secs) / self.wall_secs }
|
if self.wall_secs > 1e-9 {
|
||||||
else { 0.0 }
|
(self.user_secs + self.sys_secs) / self.wall_secs
|
||||||
|
} else {
|
||||||
|
0.0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
|
/// parallelism / n_cores — fraction of available CPU power used (0..1+).
|
||||||
@@ -335,11 +479,19 @@ pub struct Reporter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Reporter {
|
impl Reporter {
|
||||||
pub fn new() -> Self { Self::default() }
|
pub fn new() -> Self {
|
||||||
pub fn push(&mut self, stats: StageStats) { self.stages.push(stats); }
|
Self::default()
|
||||||
pub fn stages(&self) -> &[StageStats] { &self.stages }
|
}
|
||||||
|
pub fn push(&mut self, stats: StageStats) {
|
||||||
|
self.stages.push(stats);
|
||||||
|
}
|
||||||
|
pub fn stages(&self) -> &[StageStats] {
|
||||||
|
&self.stages
|
||||||
|
}
|
||||||
/// Print the summary to stderr.
|
/// Print the summary to stderr.
|
||||||
pub fn print(&self) { eprint!("{self}"); }
|
pub fn print(&self) {
|
||||||
|
eprint!("{self}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── diagnosis ─────────────────────────────────────────────────────────────────
|
// ── diagnosis ─────────────────────────────────────────────────────────────────
|
||||||
@@ -387,26 +539,43 @@ fn diagnose(s: &StageStats, n_cores: usize) -> Diagnosis {
|
|||||||
)),
|
)),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
Diagnosis { tag: "—", detail: None }
|
Diagnosis {
|
||||||
|
tag: "—",
|
||||||
|
detail: None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── display helpers ───────────────────────────────────────────────────────────
|
// ── display helpers ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
fn fmt_secs(s: f64) -> String {
|
fn fmt_secs(s: f64) -> String {
|
||||||
if s >= 100.0 { format!("{:.0}s", s) }
|
if s >= 100.0 {
|
||||||
else if s >= 10.0 { format!("{:.1}s", s) }
|
format!("{:.0}s", s)
|
||||||
else if s >= 1.0 { format!("{:.2}s", s) }
|
} else if s >= 10.0 {
|
||||||
else { format!("{:.0}ms", s * 1000.0) }
|
format!("{:.1}s", s)
|
||||||
|
} else if s >= 1.0 {
|
||||||
|
format!("{:.2}s", s)
|
||||||
|
} else {
|
||||||
|
format!("{:.0}ms", s * 1000.0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fmt_bytes(b: u64) -> String {
|
fn fmt_bytes(b: u64) -> String {
|
||||||
if b >= 1 << 30 { format!("{:.1} GB", b as f64 / (1u64 << 30) as f64) }
|
if b >= 1 << 30 {
|
||||||
else if b >= 1 << 20 { format!("{:.0} MB", b as f64 / (1u64 << 20) as f64) }
|
format!("{:.1} GB", b as f64 / (1u64 << 30) as f64)
|
||||||
else { format!("{:.0} KB", b as f64 / 1024.0) }
|
} else if b >= 1 << 20 {
|
||||||
|
format!("{:.0} MB", b as f64 / (1u64 << 20) as f64)
|
||||||
|
} else {
|
||||||
|
format!("{:.0} KB", b as f64 / 1024.0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
|
fn fmt_efficiency(par: f64, n_cores: usize) -> String {
|
||||||
format!("{:.1}×/{} ({:.0}%)", par, n_cores, par / n_cores as f64 * 100.0)
|
format!(
|
||||||
|
"{:.1}×/{} ({:.0}%)",
|
||||||
|
par,
|
||||||
|
n_cores,
|
||||||
|
par / n_cores as f64 * 100.0
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Display ───────────────────────────────────────────────────────────────────
|
// ── Display ───────────────────────────────────────────────────────────────────
|
||||||
@@ -434,7 +603,11 @@ impl MemoryBudget {
|
|||||||
pub fn new(total: u64) -> Self {
|
pub fn new(total: u64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
total,
|
total,
|
||||||
inner: Mutex::new(BudgetInner { remaining: total, active: 0, peak_active: 0 }),
|
inner: Mutex::new(BudgetInner {
|
||||||
|
remaining: total,
|
||||||
|
active: 0,
|
||||||
|
peak_active: 0,
|
||||||
|
}),
|
||||||
condvar: Condvar::new(),
|
condvar: Condvar::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -459,24 +632,40 @@ impl MemoryBudget {
|
|||||||
self.condvar.notify_all();
|
self.condvar.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn total(&self) -> u64 { self.total }
|
pub fn total(&self) -> u64 {
|
||||||
pub fn active(&self) -> usize { self.inner.lock().unwrap().active }
|
self.total
|
||||||
pub fn remaining(&self) -> u64 { self.inner.lock().unwrap().remaining }
|
}
|
||||||
pub fn peak_active(&self) -> usize { self.inner.lock().unwrap().peak_active }
|
pub fn active(&self) -> usize {
|
||||||
|
self.inner.lock().unwrap().active
|
||||||
|
}
|
||||||
|
pub fn remaining(&self) -> u64 {
|
||||||
|
self.inner.lock().unwrap().remaining
|
||||||
|
}
|
||||||
|
pub fn peak_active(&self) -> usize {
|
||||||
|
self.inner.lock().unwrap().peak_active
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Display ───────────────────────────────────────────────────────────────────
|
// ── Display ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
impl fmt::Display for Reporter {
|
impl fmt::Display for Reporter {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
if self.stages.is_empty() { return Ok(()); }
|
if self.stages.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let n_cores = std::thread::available_parallelism()
|
let n_cores = std::thread::available_parallelism()
|
||||||
.map(|n| n.get())
|
.map(|n| n.get())
|
||||||
.unwrap_or(1);
|
.unwrap_or(1);
|
||||||
|
|
||||||
// column widths
|
// column widths
|
||||||
let nw = self.stages.iter().map(|s| s.label.len()).max().unwrap_or(5).max(5);
|
let nw = self
|
||||||
|
.stages
|
||||||
|
.iter()
|
||||||
|
.map(|s| s.label.len())
|
||||||
|
.max()
|
||||||
|
.unwrap_or(5)
|
||||||
|
.max(5);
|
||||||
// efficiency col: worst-case width for this run's n_cores value
|
// efficiency col: worst-case width for this run's n_cores value
|
||||||
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
|
let ew = format!("{:.1}×/{} (100%)", 99.9f64, n_cores).len();
|
||||||
|
|
||||||
@@ -484,18 +673,21 @@ impl fmt::Display for Reporter {
|
|||||||
let sep = "─".repeat(sep_w);
|
let sep = "─".repeat(sep_w);
|
||||||
|
|
||||||
// header
|
// header
|
||||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} status",
|
writeln!(
|
||||||
"stage", "wall", "efficiency", "peak RSS")?;
|
f,
|
||||||
|
"{:<nw$} {:>7} {:>ew$} {:>8} status",
|
||||||
|
"stage", "wall", "efficiency", "peak RSS"
|
||||||
|
)?;
|
||||||
writeln!(f, "{sep}")?;
|
writeln!(f, "{sep}")?;
|
||||||
|
|
||||||
// compute all diagnoses up front (needed for both table and footnotes)
|
// compute all diagnoses up front (needed for both table and footnotes)
|
||||||
let diagnoses: Vec<Diagnosis> = self.stages.iter()
|
let diagnoses: Vec<Diagnosis> = self.stages.iter().map(|s| diagnose(s, n_cores)).collect();
|
||||||
.map(|s| diagnose(s, n_cores))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// per-stage rows
|
// per-stage rows
|
||||||
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
|
for (s, d) in self.stages.iter().zip(diagnoses.iter()) {
|
||||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8} {}",
|
writeln!(
|
||||||
|
f,
|
||||||
|
"{:<nw$} {:>7} {:>ew$} {:>8} {}",
|
||||||
s.label,
|
s.label,
|
||||||
fmt_secs(s.wall_secs),
|
fmt_secs(s.wall_secs),
|
||||||
fmt_efficiency(s.parallelism(), n_cores),
|
fmt_efficiency(s.parallelism(), n_cores),
|
||||||
@@ -508,11 +700,18 @@ impl fmt::Display for Reporter {
|
|||||||
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
|
let tw = self.stages.iter().map(|s| s.wall_secs).sum::<f64>();
|
||||||
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
|
let tu = self.stages.iter().map(|s| s.user_secs).sum::<f64>();
|
||||||
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
|
let ts = self.stages.iter().map(|s| s.sys_secs).sum::<f64>();
|
||||||
let trss = self.stages.iter().map(|s| s.max_rss_bytes).max().unwrap_or(0);
|
let trss = self
|
||||||
|
.stages
|
||||||
|
.iter()
|
||||||
|
.map(|s| s.max_rss_bytes)
|
||||||
|
.max()
|
||||||
|
.unwrap_or(0);
|
||||||
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
|
let tpar = if tw > 1e-9 { (tu + ts) / tw } else { 0.0 };
|
||||||
|
|
||||||
writeln!(f, "{sep}")?;
|
writeln!(f, "{sep}")?;
|
||||||
writeln!(f, "{:<nw$} {:>7} {:>ew$} {:>8}",
|
writeln!(
|
||||||
|
f,
|
||||||
|
"{:<nw$} {:>7} {:>ew$} {:>8}",
|
||||||
"TOTAL",
|
"TOTAL",
|
||||||
fmt_secs(tw),
|
fmt_secs(tw),
|
||||||
fmt_efficiency(tpar, n_cores),
|
fmt_efficiency(tpar, n_cores),
|
||||||
@@ -520,7 +719,9 @@ impl fmt::Display for Reporter {
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
// bottleneck footnotes (only if at least one anomaly detected)
|
// bottleneck footnotes (only if at least one anomaly detected)
|
||||||
let bottlenecks: Vec<(&str, &str)> = self.stages.iter()
|
let bottlenecks: Vec<(&str, &str)> = self
|
||||||
|
.stages
|
||||||
|
.iter()
|
||||||
.zip(diagnoses.iter())
|
.zip(diagnoses.iter())
|
||||||
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
|
.filter_map(|(s, d)| d.detail.as_deref().map(|det| (s.label.as_str(), det)))
|
||||||
.collect();
|
.collect();
|
||||||
|
|||||||
Reference in New Issue
Block a user