-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfrontend.cpp
More file actions
11673 lines (10663 loc) · 543 KB
/
frontend.cpp
File metadata and controls
11673 lines (10663 loc) · 543 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// frontend.cpp — Web UI server, log aggregator, service manager, and test runner.
//
// The frontend is the central control plane for the Prodigy system. It:
// - Serves a single-page web application (SPA) at http://127.0.0.1:8080/ (loopback only)
// - Manages the lifecycle of all 7 pipeline services (start/stop/restart/config)
// - Aggregates structured log entries from all services via UDP on port 22022
// - Stores logs in SQLite and exposes them via REST API and SSE stream
// - Provides test infrastructure for Whisper ASR accuracy and pipeline WER testing
// - Provides the IAP audio quality test (offline G.711 codec round-trip check)
//
// HTTP API index (all endpoints on port 8080):
//
// Service management:
// GET /api/services — list all managed services + status
// POST /api/services/start — start a service {name, args}
// POST /api/services/stop — stop a service {name}
// POST /api/services/restart — restart a service {name}
// GET/POST /api/services/config — read/write per-service config in SQLite
//
// Logging:
// GET /api/logs — paginated log query {limit, offset, service, level}
// GET /api/logs/recent — last N log entries from in-memory ring buffer
// GET /api/logs/stream — Server-Sent Events (SSE) live log stream
// POST /api/settings/log_level — set per-service log level; propagates to running service
//
// Database:
// POST /api/db/query — execute arbitrary SELECT query (read-only guard)
// POST /api/db/write_mode — toggle write mode for unsafe queries
// GET /api/db/schema — return SQLite schema
//
// Whisper / ASR:
// GET /api/whisper/models — list available GGML model files in models/
// POST /api/whisper/accuracy_test — run offline Whisper accuracy test on a WAV file
// POST /api/whisper/hallucination_filter — enable/disable hallucination filter on running service
//
// VAD:
// GET/POST /api/vad/config — read/write VAD parameters; propagates to running service
//
// SIP:
// POST /api/sip/add-line — register a new SIP account (calls ADD_LINE on service)
// POST /api/sip/remove-line — remove a SIP account
// GET /api/sip/lines — list registered SIP lines
// GET /api/sip/stats — RTP counters per active call
//
// IAP:
// POST /api/iap/quality_test — offline G.711 round-trip codec quality test
//
// Test files:
// GET /api/testfiles — list WAV+TXT sample pairs in Testfiles/
// POST /api/testfiles/scan — rescan Testfiles/ directory
//
// Test infrastructure:
// GET /api/tests — list available test binaries
// POST /api/tests/start — run a test binary
// POST /api/tests/stop — kill a running test
// GET /api/tests/*/history — test run history
// GET /api/tests/*/log — test stdout/stderr log
// GET /api/test_results — pipeline WER test results from /tmp/pipeline_results_*.json
//
// Dashboard / aggregated:
// GET /api/dashboard — service statuses, recent logs, test summary, uptime, pipeline topology
// GET /api/test_results_summary — aggregated test results (service_test_runs, whisper_accuracy_tests, ...)
//
// Misc:
// GET /api/status — system uptime, service health summary
//
// Log processing flow:
// 1. UDP recv on port 22022: log_receiver_loop() (on log_thread_) reads datagrams of up to UDP_BUFFER_SIZE (4096) bytes.
// 2. process_log_message() parses "<SERVICE> <LEVEL> <CALL_ID> <message>".
// Malformed datagrams are silently dropped (no crash).
// 3. LogEntry is queued for persistence (enqueue_log()).
// 4. The main event loop calls flush_log_queue() every LOG_FLUSH_INTERVAL_MS (500 ms),
//    which batch-INSERTs the queued entries into the `logs` table.
// 5. In-memory ring buffer (recent_logs_, MAX_RECENT_LOGS entries) is updated.
// 6. SSE broadcast notifies all open /api/logs/stream connections.
//
// Service start / log-level persistence:
// Service configs (args, log level) are stored in SQLite table `service_config`.
// start_service() reads log_level_<NAME> from DB and appends --log-level to args
// (only when the start request supplies no args override).
// handle_log_level_settings() writes to DB then sends SET_LOG_LEVEL:<LEVEL> to
// the service's cmd port (if running) — no restart needed.
//
// LLaMA quality test (score_llama_response):
// Scores generated responses by keyword match%, brevity (vs. max_words), and German
// language detection. Used by the frontend test panel to evaluate LLM quality.
#include "interconnect.h"
#include "mongoose.h"
#include "sqlite3.h"

#include <dirent.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <climits>
#include <cmath>
#include <cstring>
#include <ctime>
#include <deque>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <map>
#include <mutex>
#include <queue>
#include <regex>
#include <sstream>
#include <unordered_set>
#include <vector>
// ---------------------------------------------------------------------------
// Tunable constants for the event loop, log pipeline, process management and
// test runners (previously magic numbers scattered through the file).
// Unit suffixes: _MS = milliseconds, _S = seconds, _US = microseconds,
// _DAYS = days. Names without a time suffix are sizes or counts.
// ---------------------------------------------------------------------------

// Log pipeline and log-facing API limits.
static constexpr int LOG_FLUSH_INTERVAL_MS = 500;        // batch-INSERT cadence for log writer
static constexpr int UDP_BUFFER_SIZE = 4096;             // max datagram size for log receiver
static constexpr int LOG_RETENTION_DAYS = 30;            // log rotation: delete entries older than this
static constexpr int RECENT_LOGS_API_LIMIT = 100;        // /api/logs/recent returns at most this many
static constexpr int DASHBOARD_RECENT_LOGS_LIMIT = 10;   // /api/dashboard activity feed entry count

// HTTP server and database queries.
static constexpr int DB_QUERY_ROW_LIMIT = 10'000;        // max rows returned by /api/db/query
static constexpr int MG_POLL_TIMEOUT_MS = 100;           // mongoose event-loop poll timeout

// Main-loop housekeeping cadence.
static constexpr int SERVICE_CHECK_INTERVAL_S = 2;       // how often to reap dead child processes
static constexpr int ASYNC_CLEANUP_INTERVAL_S = 30;      // how often to clean up finished async tasks

// Process management — usleep() durations.
static constexpr useconds_t SIGTERM_GRACE_US = 500'000;          // 500 ms grace after SIGTERM before SIGKILL
static constexpr useconds_t SERVICE_STARTUP_WAIT_US = 200'000;   // 200 ms delay after killing ghosts
static constexpr useconds_t STOP_POLL_INTERVAL_US = 100'000;     // 100 ms between stop-poll iterations
static constexpr useconds_t SHUTDOWN_GRACE_US = 2'000'000;       // 2 s shutdown grace period
static constexpr useconds_t RESTART_WAIT_US = 500'000;           // 500 ms wait between stop and start

// Test-runner pacing.
static constexpr int TRANSCRIPTION_SETTLE_MS = 5'000;    // settle time before reading transcription
static constexpr int TRANSCRIPTION_POLL_MS = 150;        // poll interval for transcription log check
static constexpr int LLAMA_RESPONSE_POLL_MS = 200;       // poll interval for LLaMA response check
static constexpr int SHUTUP_INTER_ROUND_MS = 100;        // pause between shut-up test rounds
static constexpr int STRESS_POLL_MS = 100;               // poll interval in pipeline stress loop
static constexpr int PIPELINE_ROUND_POLL_MS = 500;       // poll interval for pipeline round-trip test
static constexpr int ACCURACY_INTER_FILE_MS = 2'000;     // pause between accuracy test files
static constexpr int DOWNLOAD_PROGRESS_POLL_MS = 500;    // poll interval for download progress
using namespace whispertalk;
// Set once SIGINT or SIGTERM has been delivered; the event loop in
// FrontendServer::start() polls this flag and shuts down when it becomes true.
static std::atomic<bool> s_sigint_received(false);

// Signal handler: records a termination request and does nothing else.
// Any signal other than SIGINT/SIGTERM that reaches it is ignored.
static void signal_handler(int signal) {
    const bool is_termination = (signal == SIGINT) || (signal == SIGTERM);
    if (!is_termination) {
        return;
    }
    s_sigint_received = true;
}
// One structured log record received from a service.
// Scalar members carry default initializers so a default-constructed entry
// never holds indeterminate values (previously only `seq` was initialized).
struct LogEntry {
    std::string timestamp;
    ServiceType service{};   // value-initialized; the producer sets the real origin
    uint32_t call_id = 0;
    std::string level;
    std::string message;
    uint64_t seq = 0;
};
// A runnable test binary discovered under bin/, plus the state of its most
// recent run. Defaults match what discover_tests() assigns to a freshly found
// test (not running, no pid, no timestamps, exit code -1 = "no result"),
// so a default-constructed TestInfo is never left with indeterminate fields.
struct TestInfo {
    std::string name;
    std::string binary_path;
    std::string description;
    std::vector<std::string> default_args;
    bool is_running = false;
    pid_t pid = 0;
    std::string log_file;
    time_t start_time = 0;
    time_t end_time = 0;
    int exit_code = -1;
};
// A pipeline service loaded from the service_config table, plus the state of
// the child process the frontend launched for it (if any). Defaults match
// load_services(): not managed, no pid, no start time.
struct ServiceInfo {
    std::string name;
    std::string binary_path;
    std::string default_args;
    std::string description;
    bool managed = false;
    pid_t pid = 0;
    std::string log_file;
    time_t start_time = 0;
};
// Metadata for one sample audio file in Testfiles/ (mirrors the `testfiles`
// table columns). Numeric fields default to zero so a default-constructed
// instance has no indeterminate values.
struct TestFileInfo {
    std::string name;
    size_t size_bytes = 0;
    double duration_sec = 0.0;
    uint32_t sample_rate = 0;
    uint16_t channels = 0;
    std::string ground_truth;
    time_t last_modified = 0;
};
// escape_json() — Escape `s` for embedding inside a JSON string literal.
// '"' and '\\' plus the named control characters (\b \f \n \r \t) get their
// short escape; any other byte below 0x20 becomes \u00XX (lowercase hex).
// All other bytes, including UTF-8 multibyte sequences, are copied verbatim.
static std::string escape_json(const std::string& s) {
    std::string out;
    out.reserve(s.size());
    for (const char raw : s) {
        const char* named = nullptr;
        switch (raw) {
            case '"':  named = "\\\""; break;
            case '\\': named = "\\\\"; break;
            case '\b': named = "\\b"; break;
            case '\f': named = "\\f"; break;
            case '\n': named = "\\n"; break;
            case '\r': named = "\\r"; break;
            case '\t': named = "\\t"; break;
            default: break;
        }
        if (named != nullptr) {
            out += named;
            continue;
        }
        const unsigned char byte = static_cast<unsigned char>(raw);
        if (byte >= 32) {
            out += raw;
            continue;
        }
        char hex[8];
        snprintf(hex, sizeof(hex), "\\u%04x", byte);
        out += hex;
    }
    return out;
}
// contains_whole_word() — True if `word` occurs in `text` delimited on both
// sides by a non-letter (or the string edge). Matching is byte-exact, so
// callers lowercase both arguments first.
//
// Bytes >= 0x80 count as letter bytes: they belong to UTF-8 multibyte
// characters such as ä/ö/ü/ß. Treating them as boundaries (as plain isalpha()
// in the C locale does) made e.g. "er" match inside "größer".
// An empty `word` never matches.
static bool contains_whole_word(const std::string& text, const std::string& word) {
    if (word.empty()) return false;
    auto is_word_byte = [](char ch) {
        const unsigned char c = static_cast<unsigned char>(ch);
        return std::isalpha(c) != 0 || c >= 0x80;
    };
    for (size_t pos = text.find(word); pos != std::string::npos; pos = text.find(word, pos + 1)) {
        const size_t end = pos + word.size();
        const bool starts_word = (pos == 0) || !is_word_byte(text[pos - 1]);
        const bool ends_word = (end >= text.size()) || !is_word_byte(text[end]);
        if (starts_word && ends_word) return true;
    }
    return false;
}
// detect_german() — Heuristic German-language check for short LLM replies.
// Counts one hit per common German function word found as a whole word, and
// one per umlaut/ß character present. Texts of <= 3 words need 1 hit; longer
// texts need 2.
//
// Lowercasing folds ASCII byte-wise and, additionally, the UTF-8 capitals
// Ä/Ö/Ü (C3 84 / C3 96 / C3 9C) to ä/ö/ü (C3 A4 / C3 B6 / C3 BC). Byte-wise
// tolower() cannot see multibyte letters, so all-caps text such as "ÜBER"
// previously missed the umlaut markers. Words are separated by any isspace()
// character, matching count_words().
static bool detect_german(const std::string& text) {
    static const char* word_markers[] = {
        "ich", "der", "die", "das", "ist", "ein", "und",
        "den", "dem", "des", "von", "als", "auch", "nicht",
        "sich", "wie", "kann", "gerne", "bitte", "danke", "nein",
        "es", "zu", "er", "sie", "wir", "ihr",
        "mir", "dir", "uns", "ihm", "mich", "dich",
        "sind", "oder", "aber", "haben", "werden",
        "schon", "sehr", "noch", "hier", "dort", "diese",
        "einen", "einer", "gute", "guten"
    };
    static const char* substr_markers[] = {"ü", "ö", "ä", "ß"};
    std::string lower = text;
    for (size_t i = 0; i < lower.size(); ++i) {
        const unsigned char c = static_cast<unsigned char>(lower[i]);
        if (c == 0xC3 && i + 1 < lower.size()) {
            const unsigned char next = static_cast<unsigned char>(lower[i + 1]);
            if (next == 0x84 || next == 0x96 || next == 0x9C) {
                lower[i + 1] = static_cast<char>(next + 0x20);  // Ä/Ö/Ü -> ä/ö/ü
            }
        }
        lower[i] = static_cast<char>(std::tolower(c));
    }
    int hits = 0;
    for (const char* m : word_markers) {
        if (contains_whole_word(lower, m)) ++hits;
    }
    for (const char* m : substr_markers) {
        if (lower.find(m) != std::string::npos) ++hits;
    }
    int word_count = 0;
    bool in_word = false;
    for (char ch : text) {
        if (std::isspace(static_cast<unsigned char>(ch))) {
            in_word = false;
        } else if (!in_word) {
            in_word = true;
            ++word_count;
        }
    }
    // Very short replies contain few function words, so one marker suffices.
    if (word_count <= 3) return hits >= 1;
    return hits >= 2;
}
// count_words() — Number of maximal runs of non-whitespace characters.
// Any isspace() character separates words (space, \t, \n, \r, \v, \f).
// Previously only space/\t/\n did, so a stray '\r' counted as a word.
static int count_words(const std::string& text) {
    int count = 0;
    bool in_word = false;
    for (char ch : text) {
        if (std::isspace(static_cast<unsigned char>(ch))) {
            in_word = false;
        } else if (!in_word) {
            in_word = true;
            ++count;
        }
    }
    return count;
}
// count_keyword_matches() — How many entries of `keywords` occur anywhere in
// `text` as a substring, compared case-insensitively (ASCII lowercasing).
// Each keyword counts at most once; an empty keyword always counts as found.
static int count_keyword_matches(const std::string& text, const std::vector<std::string>& keywords) {
    auto to_lower = [](std::string s) {
        std::transform(s.begin(), s.end(), s.begin(),
                       [](unsigned char c) { return static_cast<char>(tolower(c)); });
        return s;
    };
    const std::string haystack = to_lower(text);
    const auto matches = std::count_if(keywords.begin(), keywords.end(),
                                       [&](const std::string& kw) {
                                           return haystack.find(to_lower(kw)) != std::string::npos;
                                       });
    return static_cast<int>(matches);
}
// Result of score_llama_response(): the weighted 0-100 composite plus the raw
// measurements it was derived from. Members default to zero/false so a
// default-constructed result holds no indeterminate values.
struct LlamaScoreResult {
    double score = 0.0;
    int word_count = 0;
    int keywords_found = 0;
    bool is_german = false;
};
// score_llama_response() — Grade a LLaMA reply on a 0-100 scale as the
// weighted sum of three sub-scores, each itself 0-100:
//   * keywords (weight 0.4): percentage of `keywords` found, case-insensitive
//     substring match; an empty keyword list scores 100.
//   * brevity  (weight 0.3): 100 when word_count <= max_words, otherwise
//     reduced by 5 points per excess word, never below 0.
//   * language (weight 0.3): 100 if detect_german() accepts the reply, else 0.
// The raw word count, keyword hits and German flag are returned alongside.
// Used by the LLaMA quality test panel to rate responses without human review.
static LlamaScoreResult score_llama_response(const std::string& response,
        const std::vector<std::string>& keywords, int max_words) {
    constexpr double kKeywordWeight = 0.4;
    constexpr double kBrevityWeight = 0.3;
    constexpr double kLanguageWeight = 0.3;
    constexpr double kPenaltyPerExcessWord = 5.0;

    LlamaScoreResult result;
    result.word_count = count_words(response);
    result.keywords_found = count_keyword_matches(response, keywords);
    result.is_german = detect_german(response);

    double keyword_score = 100.0;
    if (!keywords.empty()) {
        keyword_score = result.keywords_found * 100.0 / keywords.size();
    }

    double brevity_score = 100.0;
    if (result.word_count > max_words) {
        const int excess = result.word_count - max_words;
        brevity_score = std::max(0.0, 100.0 - excess * kPenaltyPerExcessWord);
    }

    const double language_score = result.is_german ? 100.0 : 0.0;

    result.score = keyword_score * kKeywordWeight
                 + brevity_score * kBrevityWeight
                 + language_score * kLanguageWeight;
    return result;
}
class FrontendServer {
public:
FrontendServer(uint16_t http_port, const std::string& project_root)
: http_port_(http_port),
log_port_(0),
interconnect_(ServiceType::FRONTEND),
db_(nullptr),
db_ok_(false),
project_root_(project_root),
db_path_(project_root + "/frontend.db") {
db_ok_ = init_database();
if (db_ok_) {
discover_tests();
load_services();
scan_testfiles_directory();
}
}
// Release the SQLite connection opened by init_database(), if one is open.
~FrontendServer() {
    if (db_ == nullptr) {
        return;
    }
    sqlite3_close(db_);
}
// start() — Run the frontend until SIGINT/SIGTERM.
// Verifies the database, initializes the interconnect, checks the UDP log port
// is free, binds the loopback HTTP listener, spawns the log receiver thread,
// and then drives the mongoose event loop plus periodic housekeeping (log
// flush, child reaping, async-task cleanup, hourly log rotation). Returns
// false if any prerequisite fails, or true after an orderly shutdown.
//
// A NULL result from mg_http_listen() (HTTP port already in use) is a fatal
// error. Previously it was ignored, and the server announced itself and then
// spun with no listener. The log thread is spawned only after all early-return
// paths, so a failed start never leaves a joinable std::thread behind.
bool start() {
    if (!db_ok_) {
        std::cerr << "ERROR: Database initialization failed. Cannot start frontend server.\n";
        std::cerr << "Check that the database path is writable: " << db_path_ << "\n";
        return false;
    }
    if (!interconnect_.initialize()) {
        std::cerr << "Failed to initialize interconnect\n";
        return false;
    }
    log_port_ = whispertalk::FRONTEND_LOG_PORT;
    // Probe-bind the UDP log port so a second frontend instance fails fast.
    // The probe socket is closed again right away (presumably the receiver
    // thread binds the port itself).
    int probe = socket(AF_INET, SOCK_DGRAM, 0);
    if (probe >= 0) {
        struct sockaddr_in pa{};
        pa.sin_family = AF_INET;
        pa.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
        pa.sin_port = htons(log_port_);
        if (bind(probe, (struct sockaddr*)&pa, sizeof(pa)) < 0) {
            std::cerr << "FATAL: Log port " << log_port_ << " is already in use (another frontend running?)\n";
            close(probe);
            interconnect_.shutdown();
            return false;
        }
        close(probe);
    }
    std::cout << "Frontend logging port: " << log_port_ << "\n";
    std::cout << "Frontend HTTP port: " << http_port_ << "\n";
    mg_mgr_init(&mgr_);
    // Security: binds to loopback only — intentionally not exposed to the network.
    // All security assumptions (no auth, no TLS) rely on local-only access.
    std::string listen_addr = "http://127.0.0.1:" + std::to_string(http_port_);
    struct mg_connection* c = mg_http_listen(&mgr_, listen_addr.c_str(), http_handler_static, this);
    if (c == nullptr) {
        std::cerr << "FATAL: cannot listen on " << listen_addr << " (HTTP port already in use?)\n";
        mg_mgr_free(&mgr_);
        interconnect_.shutdown();
        return false;
    }
    c->fn_data = this;
    log_thread_ = std::thread(&FrontendServer::log_receiver_loop, this);
    std::cout << "Frontend web server started on " << listen_addr << "\n";
    std::cout << "Open http://localhost:" << http_port_ << " in your browser\n";
    auto last_flush = std::chrono::steady_clock::now();
    auto last_rotation = last_flush;
    auto last_svc_check = std::chrono::steady_clock::now();
    auto last_async_cleanup = std::chrono::steady_clock::now();
    while (!s_sigint_received) {
        mg_mgr_poll(&mgr_, MG_POLL_TIMEOUT_MS);
        check_test_status();
        flush_sse_queue();
        auto now = std::chrono::steady_clock::now();
        if (now - last_flush >= std::chrono::milliseconds(LOG_FLUSH_INTERVAL_MS)) {
            flush_log_queue();
            last_flush = now;
        }
        if (now - last_svc_check >= std::chrono::seconds(SERVICE_CHECK_INTERVAL_S)) {
            check_service_status();
            last_svc_check = now;
        }
        if (now - last_async_cleanup >= std::chrono::seconds(ASYNC_CLEANUP_INTERVAL_S)) {
            cleanup_old_async_tasks();
            last_async_cleanup = now;
        }
        if (now - last_rotation >= std::chrono::hours(1)) {
            rotate_logs();
            last_rotation = now;
        }
    }
    // Orderly shutdown: persist queued logs, stop children, then tear down
    // the HTTP manager and interconnect before joining the log thread.
    flush_log_queue();
    shutdown_managed_processes();
    mg_mgr_free(&mgr_);
    interconnect_.shutdown();
    if (log_thread_.joinable()) {
        log_thread_.join();
    }
    return true;
}
private:
// --- Configuration and core handles ---
// HTTP port for the web UI / REST API; start() listens on 127.0.0.1 only.
uint16_t http_port_;
// UDP port for incoming service log datagrams; set to FRONTEND_LOG_PORT in start().
uint16_t log_port_;
// Interconnect node for this frontend; initialized in start() and shut down on exit.
InterconnectNode interconnect_;
// SQLite connection opened by init_database(); closed in the destructor.
sqlite3* db_;
// Result of init_database(); start() refuses to run when this is false.
bool db_ok_ = false;
// NOTE(review): presumably the /api/db/write_mode toggle listed in the file
// header (permits non-SELECT statements) — not referenced in the visible code.
bool db_write_mode_ = false;
std::string project_root_;
// Database file path: project_root_ + "/frontend.db" (set in the constructor).
std::string db_path_;
// Mongoose event manager driven by the poll loop in start().
struct mg_mgr mgr_;
// Runs log_receiver_loop(); joined at the end of start().
std::thread log_thread_;
// --- Shared state: each *_mutex_ guards the container declared right after it
// (seen in use for tests_, services_ and async_tasks_; presumably the same for the rest). ---
std::mutex tests_mutex_;
std::vector<TestInfo> tests_;
std::mutex services_mutex_;
std::vector<ServiceInfo> services_;
std::mutex testfiles_mutex_;
std::vector<TestFileInfo> testfiles_;
// In-memory ring buffer of recent log entries (capped at MAX_RECENT_LOGS per the file header).
std::mutex logs_mutex_;
std::deque<LogEntry> recent_logs_;
// NOTE(review): presumably the source of LogEntry::seq values — confirm at the call site.
std::atomic<uint64_t> log_seq_{0};
static constexpr size_t MAX_RECENT_LOGS = 1000;
// HTTP port passed as --http-port to the test_sip_provider test (see discover_tests()).
static constexpr int TEST_SIP_PROVIDER_PORT = 22011;
// Open /api/logs/stream (SSE) connections, capped at MAX_SSE_CONNECTIONS.
std::mutex sse_mutex_;
std::vector<struct mg_connection*> sse_connections_;
static constexpr size_t MAX_SSE_CONNECTIONS = 20;
// Log entries waiting to be pushed to SSE clients by flush_sse_queue() on the main loop.
std::mutex sse_queue_mutex_;
std::vector<LogEntry> sse_queue_;
// Frontend start time (steady clock), e.g. for uptime reporting.
std::chrono::steady_clock::time_point start_time_ = std::chrono::steady_clock::now();
// A long-running background job tracked by id. finish_async_task() stores
// result_json and clears `running`; cleanup_old_async_tasks() removes the task
// (joining `worker`) once it is no longer running AND result_read is set.
struct AsyncTask {
int64_t id;
std::string type;
std::atomic<bool> running{true};
std::atomic<bool> result_read{false};
std::string result_json;
std::thread worker;
};
std::mutex async_mutex_;
std::map<int64_t, std::shared_ptr<AsyncTask>> async_tasks_;
// Monotonic id source for create_async_task() (pre-incremented, so ids start at 1).
std::atomic<int64_t> async_id_counter_{0};
// Progress of one file download; counters are atomics so they can presumably
// be read while the transfer runs. NOTE(review): `mu` presumably guards the
// non-atomic string fields — confirm at the download code.
struct DownloadProgress {
std::atomic<int64_t> bytes_downloaded{0};
std::atomic<int64_t> total_bytes{0};
std::atomic<bool> complete{false};
std::atomic<bool> failed{false};
std::string error;
std::string filename;
std::string local_path;
std::mutex mu;
};
std::mutex downloads_mutex_;
std::map<int64_t, std::shared_ptr<DownloadProgress>> downloads_;
// Live counters for the pipeline stress test (default duration 120 s).
// min_latency_ms starts at a large sentinel so the first sample replaces it.
struct PipelineStressProgress {
std::atomic<bool> running{true};
std::atomic<bool> stop_requested{false};
std::atomic<int> elapsed_s{0};
std::atomic<int> duration_s{120};
std::atomic<int> cycles_completed{0};
std::atomic<int> cycles_ok{0};
std::atomic<int> cycles_fail{0};
std::atomic<int> total_latency_ms{0};
std::atomic<int> min_latency_ms{999999};
std::atomic<int> max_latency_ms{0};
// Per-service health snapshot taken during the stress run.
struct SvcSnap {
std::atomic<int> memory_mb{0};
std::atomic<bool> reachable{true};
std::atomic<int> ping_ok{0};
std::atomic<int> ping_fail{0};
std::atomic<int> total_ping_ms{0};
};
// NOTE(review): presumably one slot per pipeline service (7 in total) — confirm index mapping.
SvcSnap svcs[7];
std::mutex result_mutex;
std::string result_json;
};
std::shared_ptr<PipelineStressProgress> pipeline_stress_;
std::mutex pipeline_stress_mutex_;
// create_async_task() — Register a new, running async task of kind `type`
// and return its id. Ids come from an atomic pre-increment, so they are
// unique and start at 1.
int64_t create_async_task(const std::string& type) {
    const int64_t task_id = ++async_id_counter_;
    auto entry = std::make_shared<AsyncTask>();
    entry->id = task_id;
    entry->type = type;
    std::lock_guard<std::mutex> guard(async_mutex_);
    async_tasks_[task_id] = std::move(entry);
    return task_id;
}
void finish_async_task(int64_t id, const std::string& result) {
std::lock_guard<std::mutex> lock(async_mutex_);
auto it = async_tasks_.find(id);
if (it != async_tasks_.end()) {
it->second->result_json = result;
it->second->running = false;
}
}
void cleanup_old_async_tasks() {
std::lock_guard<std::mutex> lock(async_mutex_);
for (auto it = async_tasks_.begin(); it != async_tasks_.end(); ) {
if (!it->second->running && it->second->result_read) {
if (it->second->worker.joinable()) it->second->worker.join();
it = async_tasks_.erase(it);
} else {
++it;
}
}
}
// init_database() — Open SQLite DB at the absolute path db_path_, verify it
// is writable (not read-only), disable extension loading for security, and
// create all schema tables + run migrations. Called once from constructor.
bool init_database() {
int rc = sqlite3_open(db_path_.c_str(), &db_);
if (rc != SQLITE_OK) {
std::cerr << "Cannot open database: " << sqlite3_errmsg(db_) << "\n";
db_ = nullptr;
return false;
}
if (sqlite3_db_readonly(db_, "main") == 1) {
std::cerr << "Fatal: database is read-only: " << db_path_ << "\n";
sqlite3_close(db_);
db_ = nullptr;
return false;
}
int cfg_rc = sqlite3_db_config(db_, SQLITE_DBCONFIG_ENABLE_LOAD_EXTENSION, 0, nullptr);
if (cfg_rc != SQLITE_OK) {
std::cerr << "Warning: could not disable SQLite extension loading (rc="
<< cfg_rc << ")\n";
}
const char* schema = R"(
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
service TEXT NOT NULL,
call_id INTEGER,
level TEXT,
message TEXT
);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_service ON logs(service);
CREATE INDEX IF NOT EXISTS idx_logs_service_ts ON logs(service, timestamp);
CREATE TABLE IF NOT EXISTS test_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_name TEXT NOT NULL,
start_time INTEGER,
end_time INTEGER,
exit_code INTEGER,
arguments TEXT,
log_file TEXT
);
CREATE TABLE IF NOT EXISTS service_status (
service TEXT PRIMARY KEY,
status TEXT,
last_seen INTEGER,
call_count INTEGER,
ports TEXT
);
CREATE TABLE IF NOT EXISTS service_config (
service TEXT PRIMARY KEY,
binary_path TEXT NOT NULL,
default_args TEXT DEFAULT '',
description TEXT DEFAULT '',
auto_start INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT
);
CREATE TABLE IF NOT EXISTS testfiles (
name TEXT PRIMARY KEY,
size_bytes INTEGER,
duration_sec REAL,
sample_rate INTEGER,
channels INTEGER,
ground_truth TEXT,
last_modified INTEGER
);
CREATE TABLE IF NOT EXISTS whisper_accuracy_tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_run_id INTEGER,
file_name TEXT,
model_name TEXT,
ground_truth TEXT,
transcription TEXT,
similarity_percent REAL,
latency_ms INTEGER,
status TEXT,
timestamp INTEGER
);
CREATE TABLE IF NOT EXISTS iap_quality_tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_name TEXT,
latency_ms REAL,
snr_db REAL,
rms_error_pct REAL,
max_latency_ms REAL,
status TEXT,
timestamp INTEGER
);
CREATE TABLE IF NOT EXISTS models (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT,
name TEXT,
path TEXT,
backend TEXT,
size_mb INTEGER,
config_json TEXT,
added_timestamp INTEGER
);
CREATE TABLE IF NOT EXISTS model_benchmark_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
model_id INTEGER,
model_name TEXT,
model_type TEXT DEFAULT 'whisper',
backend TEXT,
test_files TEXT,
iterations INTEGER,
files_tested INTEGER,
avg_accuracy REAL,
avg_latency_ms INTEGER,
p50_latency_ms INTEGER,
p95_latency_ms INTEGER,
p99_latency_ms INTEGER,
memory_mb INTEGER,
pass_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
avg_tokens REAL,
interrupt_latency_ms REAL,
german_pct REAL,
timestamp INTEGER,
FOREIGN KEY(model_id) REFERENCES models(id)
);
CREATE TABLE IF NOT EXISTS tts_validation_tests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
line1_call_id INTEGER,
line2_call_id INTEGER,
original_text TEXT,
tts_transcription TEXT,
similarity_percent REAL,
phoneme_errors TEXT,
timestamp INTEGER
);
CREATE TABLE IF NOT EXISTS sip_lines (
line_id INTEGER PRIMARY KEY,
username TEXT,
password TEXT,
server TEXT,
port INTEGER,
status TEXT,
last_registered INTEGER
);
CREATE TABLE IF NOT EXISTS service_test_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT,
test_type TEXT,
status TEXT,
metrics_json TEXT,
timestamp INTEGER
);
CREATE TABLE IF NOT EXISTS test_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_name TEXT,
service TEXT,
status TEXT,
details TEXT,
timestamp INTEGER
);
)";
char* errmsg = nullptr;
rc = sqlite3_exec(db_, schema, nullptr, nullptr, &errmsg);
if (rc != SQLITE_OK) {
std::cerr << "SQL error: " << errmsg << "\n";
sqlite3_free(errmsg);
}
const char* migrations[] = {
"ALTER TABLE model_benchmark_runs ADD COLUMN model_name TEXT",
"ALTER TABLE model_benchmark_runs ADD COLUMN model_type TEXT DEFAULT 'whisper'",
"ALTER TABLE model_benchmark_runs ADD COLUMN backend TEXT",
"ALTER TABLE model_benchmark_runs ADD COLUMN files_tested INTEGER",
"ALTER TABLE model_benchmark_runs ADD COLUMN pass_count INTEGER DEFAULT 0",
"ALTER TABLE model_benchmark_runs ADD COLUMN fail_count INTEGER DEFAULT 0",
"ALTER TABLE model_benchmark_runs ADD COLUMN avg_tokens REAL",
"ALTER TABLE model_benchmark_runs ADD COLUMN interrupt_latency_ms REAL",
"ALTER TABLE model_benchmark_runs ADD COLUMN german_pct REAL",
"ALTER TABLE iap_quality_tests ADD COLUMN rms_error_pct REAL",
"ALTER TABLE iap_quality_tests ADD COLUMN max_latency_ms REAL",
"ALTER TABLE iap_quality_tests DROP COLUMN thd_percent",
nullptr
};
for (int i = 0; migrations[i]; i++) {
sqlite3_exec(db_, migrations[i], nullptr, nullptr, nullptr);
}
const char* seed = R"(
INSERT OR IGNORE INTO service_config (service, binary_path, default_args, description) VALUES
('SIP_CLIENT', 'bin/sip-client', '', 'SIP client / RTP gateway'),
('INBOUND_AUDIO_PROCESSOR', 'bin/inbound-audio-processor', '', 'G.711 decode + 8kHz to 16kHz resample'),
('VAD_SERVICE', 'bin/vad-service', '', 'Voice Activity Detection + speech segmentation'),
('WHISPER_SERVICE', 'bin/whisper-service', '--language de --model bin/models/ggml-large-v3-turbo-q5_0.bin', 'Whisper ASR (Metal)'),
('LLAMA_SERVICE', 'bin/llama-service', '', 'LLaMA 3.2-1B response generation'),
('KOKORO_SERVICE', 'bin/kokoro-service', '', 'Kokoro TTS (CoreML)'),
('NEUTTS_SERVICE', 'bin/neutts-service', '', 'NeuTTS Nano German TTS (CoreML)'),
('OUTBOUND_AUDIO_PROCESSOR', 'bin/outbound-audio-processor', '', 'TTS audio to G.711 encode + RTP'),
('TEST_SIP_PROVIDER', 'bin/test_sip_provider', '--port 5060 --http-port 22011 --testfiles-dir Testfiles', 'SIP B2BUA test provider for audio injection');
UPDATE service_config SET default_args='--language de --model bin/models/ggml-large-v3-turbo-q5_0.bin', description='Whisper ASR (Metal)' WHERE service='WHISPER_SERVICE' AND default_args LIKE '%models/ggml%' AND default_args NOT LIKE '%bin/models%';
UPDATE service_config SET default_args='' WHERE service='SIP_CLIENT' AND (default_args='--lines 1 alice 127.0.0.1 5060' OR default_args='--lines 2 alice 127.0.0.1 5060');
)";
sqlite3_exec(db_, seed, nullptr, nullptr, nullptr);
rotate_logs();
return true;
}
// discover_tests() — Populate tests_ with the known test binaries under bin/
// that exist as regular, owner-executable files, each with a description and
// (for test_sip_provider) default arguments. Called once from the constructor.
//
// Requires S_ISREG, consistent with is_allowed_binary(): previously a
// directory carrying the execute bit was listed as a runnable test.
void discover_tests() {
    std::lock_guard<std::mutex> lock(tests_mutex_);
    std::vector<std::pair<std::string, std::string>> test_files = {
        {"test_sanity", "bin/test_sanity"},
        {"test_interconnect", "bin/test_interconnect"},
        {"test_sip_provider_unit", "bin/test_sip_provider_unit"},
        {"test_kokoro_cpp", "bin/test_kokoro_cpp"},
        {"test_integration", "bin/test_integration"},
        {"test_sip_provider", "bin/test_sip_provider"},
    };
    for (const auto& [name, path] : test_files) {
        struct stat st;
        if (stat(path.c_str(), &st) != 0) continue;
        if (!S_ISREG(st.st_mode) || !(st.st_mode & S_IXUSR)) continue;
        TestInfo info;
        info.name = name;
        info.binary_path = path;
        info.is_running = false;
        info.pid = 0;
        info.start_time = 0;
        info.end_time = 0;
        info.exit_code = -1;
        if (name == "test_integration") {
            info.description = "Full pipeline integration test with real services";
        } else if (name == "test_interconnect") {
            info.description = "Interconnect protocol tests (master/slave, heartbeat, crash recovery)";
        } else if (name == "test_sip_provider") {
            info.description = "SIP B2BUA test provider";
            info.default_args = {"--port", "5060", "--http-port", std::to_string(TEST_SIP_PROVIDER_PORT), "--testfiles-dir", "Testfiles"};
        } else if (name == "test_kokoro_cpp") {
            info.description = "Kokoro TTS C++ tests (phonemization, CoreML inference)";
        } else {
            info.description = "Unit test: " + name;
        }
        tests_.push_back(info);
    }
    std::cout << "Discovered " << tests_.size() << " tests\n";
}
// check_test_status() — Non-blocking reap of running test processes. For each
// test that has ended, record end time and exit code (WEXITSTATUS on normal
// exit, -1 otherwise) and persist the run via save_test_run(). Called on
// every iteration of the main event loop.
//
// waitpid() returning -1 with ECHILD means the pid is no longer our child:
// it was already reaped elsewhere (kill_ghost_processes() calls waitpid() on
// pids it kills). Such a test can never be reported here again. It is now
// finalized with exit code -1 instead of staying "running" forever.
void check_test_status() {
    std::lock_guard<std::mutex> lock(tests_mutex_);
    for (auto& test : tests_) {
        if (!test.is_running || test.pid <= 0) continue;
        int status = 0;
        const pid_t result = waitpid(test.pid, &status, WNOHANG);
        const bool reaped_here = (result == test.pid);
        const bool lost_child = (result < 0 && errno == ECHILD);
        if (!reaped_here && !lost_child) continue;  // still running
        test.is_running = false;
        test.end_time = time(nullptr);
        test.exit_code = (reaped_here && WIFEXITED(status)) ? WEXITSTATUS(status) : -1;
        save_test_run(test);
    }
}
void load_services() {
std::lock_guard<std::mutex> lock(services_mutex_);
if (!db_) return;
sqlite3_stmt* stmt;
const char* sql = "SELECT service, binary_path, default_args, description FROM service_config";
if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) return;
while (sqlite3_step(stmt) == SQLITE_ROW) {
ServiceInfo svc;
svc.name = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0));
svc.binary_path = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
const char* args = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 2));
svc.default_args = args ? args : "";
const char* desc = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 3));
svc.description = desc ? desc : "";
svc.managed = false;
svc.pid = 0;
svc.start_time = 0;
services_.push_back(svc);
}
sqlite3_finalize(stmt);
std::cout << "Loaded " << services_.size() << " service configs\n";
}
// check_service_status() — Non-blocking reap of managed service processes.
// A service whose child has exited is marked unmanaged with pid 0. Called
// from the main loop every SERVICE_CHECK_INTERVAL_S seconds.
//
// waitpid() returning -1 with ECHILD means the pid is no longer our child
// (already reaped elsewhere, e.g. by the waitpid() in kill_ghost_processes()).
// It can never be reported here again, so it is treated as exited rather than
// leaving the service "managed" with a stale pid forever.
void check_service_status() {
    std::lock_guard<std::mutex> lock(services_mutex_);
    for (auto& svc : services_) {
        if (!svc.managed || svc.pid <= 0) continue;
        int status = 0;
        const pid_t result = waitpid(svc.pid, &status, WNOHANG);
        const bool gone = (result == svc.pid) || (result < 0 && errno == ECHILD);
        if (!gone) continue;
        svc.managed = false;
        svc.pid = 0;
    }
}
// split_args() — Split a command-line string into whitespace-separated
// tokens. No quoting or escaping is interpreted: a quoted argument containing
// spaces becomes several tokens. Empty/blank input yields an empty vector.
static std::vector<std::string> split_args(const std::string& s) {
    std::vector<std::string> tokens;
    std::istringstream stream(s);
    for (std::string word; stream >> word; ) {
        tokens.push_back(word);
    }
    return tokens;
}
// is_allowed_binary() — Allow-list check for binaries the frontend may exec.
// Accepts only non-empty relative paths beginning with "bin/" that contain no
// ".." sequence and name an existing regular file with the owner-execute bit.
// Note: stat() follows symlinks, so the target's type and mode are checked.
static bool is_allowed_binary(const std::string& path) {
    const bool shape_ok = !path.empty()
                       && path.find("..") == std::string::npos
                       && path[0] != '/'
                       && path.compare(0, 4, "bin/") == 0;
    if (!shape_ok) {
        return false;
    }
    struct stat info;
    if (stat(path.c_str(), &info) != 0) {
        return false;
    }
    return S_ISREG(info.st_mode) && (info.st_mode & S_IXUSR) != 0;
}
// kill_ghost_processes() — SIGTERM, then after SIGTERM_GRACE_US SIGKILL, every
// other process whose command line runs a program named `binary_name`.
// Input is sanitized: only bare names matching [a-zA-Z0-9_.-] are accepted
// (no paths, no shell metacharacters), so the popen() command cannot be
// injected into.
//
// Matching: the pgrep -f extended regex is anchored so the name must appear as
// a whole program token, either at the start of the command line or right
// after a '/', and followed by a space or the end. The former bare-substring
// match also hit unrelated processes:
//  * starting test_sip_provider killed a running test_sip_provider_unit;
//  * an editor open on ".../whisper-service.cpp" matched whisper-service.
// '.' is written as "[.]" so it matches only a literal dot. As a side effect,
// the pattern cannot match the popen() shell's own command line, because there
// the name is preceded by ')'.
void kill_ghost_processes(const std::string& binary_name) {
    static const std::regex valid_name("^[a-zA-Z0-9_.-]+$");
    if (!std::regex_match(binary_name, valid_name)) return;
    std::string pattern = "(^|/)";
    for (char ch : binary_name) {
        if (ch == '.') pattern += "[.]";
        else pattern += ch;
    }
    pattern += "( |$)";
    std::string cmd = "pgrep -f '" + pattern + "' 2>/dev/null";
    FILE* fp = popen(cmd.c_str(), "r");
    if (!fp) return;
    char buf[64];
    std::vector<pid_t> pids;
    while (fgets(buf, sizeof(buf), fp)) {
        pid_t p = atoi(buf);
        if (p > 0 && p != getpid()) pids.push_back(p);
    }
    pclose(fp);
    for (pid_t p : pids) {
        std::cerr << "Killing ghost process " << p << " for " << binary_name << "\n";
        kill(p, SIGTERM);
    }
    if (!pids.empty()) usleep(SIGTERM_GRACE_US);
    for (pid_t p : pids) {
        // Still present (alive or zombie) after the grace period: force it.
        if (kill(p, 0) == 0) {
            kill(p, SIGKILL);
            waitpid(p, nullptr, WNOHANG);
        }
    }
}
// start_service() — Launch service `name` as a managed child process.
// Returns false if the service is unknown, already managed, its binary fails
// is_allowed_binary(), or fork() fails.
// Arguments:
//  * A non-empty args_override replaces the stored default args and is
//    persisted via save_service_config().
//  * Without an override, the saved VAD tuning (VAD_SERVICE only) and the
//    saved log level are appended to the default args.
// The child's stdout/stderr go to logs/<name>.log, truncated on each start.
//
// Fixes:
//  * argv is built BEFORE fork(). This process is multithreaded (log thread,
//    async workers), and between fork() and execv() the child may only call
//    async-signal-safe functions. Heap allocation (the former std::vector
//    push_backs) can deadlock if another thread held the allocator lock at
//    fork time.
//  * The binary is validated before any ghost processes are killed, so a
//    rejected path no longer terminates running instances.
bool start_service(const std::string& name, const std::string& args_override) {
    std::lock_guard<std::mutex> lock(services_mutex_);
    for (auto& svc : services_) {
        if (svc.name != name) continue;
        if (svc.managed && svc.pid > 0) return false;
        if (!is_allowed_binary(svc.binary_path)) return false;
        {
            // Kill stale instances of the same program (e.g. left over from a
            // previous frontend run) before starting a fresh one.
            std::string bin_name = svc.binary_path;
            size_t slash = bin_name.rfind('/');
            if (slash != std::string::npos) bin_name = bin_name.substr(slash + 1);
            kill_ghost_processes(bin_name);
            usleep(SERVICE_STARTUP_WAIT_US);
        }
        std::string use_args = args_override.empty() ? svc.default_args : args_override;
        if (name == "VAD_SERVICE" && args_override.empty()) {
            std::string vad_w = get_setting("vad_window_ms", "");
            std::string vad_t = get_setting("vad_threshold", "");
            std::string vad_s = get_setting("vad_silence_ms", "");
            std::string vad_c = get_setting("vad_max_chunk_ms", "");
            std::string vad_g = get_setting("vad_onset_gap", "");
            if (!vad_w.empty()) use_args += " --vad-window-ms " + vad_w;
            if (!vad_t.empty()) use_args += " --vad-threshold " + vad_t;
            if (!vad_s.empty()) use_args += " --vad-silence-ms " + vad_s;
            if (!vad_c.empty()) use_args += " --vad-max-chunk-ms " + vad_c;
            if (!vad_g.empty()) use_args += " --vad-onset-gap " + vad_g;
        }
        if (args_override.empty()) {
            std::string ll_key = "log_level_" + name;
            std::string ll = get_setting(ll_key, "");
            if (!ll.empty()) use_args += " --log-level " + ll;
        }
        auto argv_strings = split_args(use_args);
        mkdir("logs", 0755);
        svc.log_file = "logs/" + name + ".log";
        // Everything the child touches is prepared here, in the parent.
        std::vector<char*> argv;
        argv.reserve(argv_strings.size() + 2);
        argv.push_back(const_cast<char*>(svc.binary_path.c_str()));
        for (auto& a : argv_strings) {
            argv.push_back(const_cast<char*>(a.c_str()));
        }
        argv.push_back(nullptr);
        const char* const log_path = svc.log_file.c_str();
        const char* const bin_path = svc.binary_path.c_str();
        pid_t pid = fork();
        if (pid < 0) {
            std::cerr << "fork() failed for service " << name << ": " << strerror(errno) << "\n";
            return false;
        }
        if (pid == 0) {
            // Child: async-signal-safe calls only until execv().
            for (int i = 3; i < 1024; ++i) close(i);
            int fd = open(log_path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
            if (fd >= 0) {
                dup2(fd, STDOUT_FILENO);
                dup2(fd, STDERR_FILENO);
                close(fd);
            }
            execv(bin_path, argv.data());
            _exit(1);
        }
        svc.managed = true;
        svc.pid = pid;
        svc.start_time = time(nullptr);
        if (!args_override.empty()) {
            svc.default_args = args_override;
            save_service_config(name, args_override);
        }
        return true;
    }
    return false;
}
bool stop_service(const std::string& name) {
std::lock_guard<std::mutex> lock(services_mutex_);
for (auto& svc : services_) {