@@ -45,142 +45,19 @@ func NewPostgreSQLCollector(connectionString string) (*PostgreSQLCollector, erro
4545
4646// initSchema creates the necessary tables if they don't exist
4747func (p * PostgreSQLCollector ) initSchema () error {
48- queries := []string {
49- `CREATE TABLE IF NOT EXISTS connections (
50- id SERIAL PRIMARY KEY,
51- connection_uuid TEXT,
52- client_ip INET NOT NULL,
53- target_host TEXT NOT NULL,
54- target_port INTEGER NOT NULL,
55- protocol TEXT NOT NULL,
56- started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
57- ended_at TIMESTAMP WITH TIME ZONE,
58- bytes_sent BIGINT DEFAULT 0,
59- bytes_received BIGINT DEFAULT 0,
60- duration_ms INTEGER,
61- close_reason TEXT
62- )` ,
63- `CREATE INDEX IF NOT EXISTS idx_connections_client_ip ON connections(client_ip)` ,
64- `CREATE INDEX IF NOT EXISTS idx_connections_target_host ON connections(target_host)` ,
65- `CREATE INDEX IF NOT EXISTS idx_connections_started_at ON connections(started_at)` ,
66- `CREATE INDEX IF NOT EXISTS idx_connections_uuid ON connections(connection_uuid)` ,
67-
68- `CREATE TABLE IF NOT EXISTS http_requests (
69- id SERIAL PRIMARY KEY,
70- connection_id INTEGER NOT NULL REFERENCES connections(id) ON DELETE CASCADE,
71- method TEXT NOT NULL,
72- url TEXT NOT NULL,
73- host TEXT NOT NULL,
74- user_agent TEXT,
75- content_length BIGINT DEFAULT 0,
76- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
77- )` ,
78- `CREATE INDEX IF NOT EXISTS idx_http_requests_connection_id ON http_requests(connection_id)` ,
79- `CREATE INDEX IF NOT EXISTS idx_http_requests_timestamp ON http_requests(timestamp)` ,
80-
81- `CREATE TABLE IF NOT EXISTS http_responses (
82- id SERIAL PRIMARY KEY,
83- connection_id INTEGER NOT NULL REFERENCES connections(id) ON DELETE CASCADE,
84- request_id INTEGER REFERENCES http_requests(id) ON DELETE CASCADE,
85- status_code INTEGER NOT NULL,
86- content_length BIGINT DEFAULT 0,
87- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
88- )` ,
89- `CREATE INDEX IF NOT EXISTS idx_http_responses_connection_id ON http_responses(connection_id)` ,
90-
91- `CREATE TABLE IF NOT EXISTS errors (
92- id SERIAL PRIMARY KEY,
93- connection_id INTEGER REFERENCES connections(id) ON DELETE CASCADE,
94- error_type TEXT NOT NULL,
95- error_message TEXT NOT NULL,
96- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
97- )` ,
98- `CREATE INDEX IF NOT EXISTS idx_errors_timestamp ON errors(timestamp)` ,
99-
100- `CREATE TABLE IF NOT EXISTS security_events (
101- id SERIAL PRIMARY KEY,
102- client_ip INET NOT NULL,
103- target_host TEXT NOT NULL,
104- event_type TEXT NOT NULL,
105- reason TEXT,
106- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
107- )` ,
108- `CREATE INDEX IF NOT EXISTS idx_security_events_client_ip ON security_events(client_ip)` ,
109- `CREATE INDEX IF NOT EXISTS idx_security_events_timestamp ON security_events(timestamp)` ,
110-
111- `CREATE TABLE IF NOT EXISTS data_transfers (
112- id SERIAL PRIMARY KEY,
113- connection_id INTEGER NOT NULL REFERENCES connections(id) ON DELETE CASCADE,
114- bytes_sent BIGINT DEFAULT 0,
115- bytes_received BIGINT DEFAULT 0,
116- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
117- )` ,
118- `CREATE INDEX IF NOT EXISTS idx_data_transfers_connection_id ON data_transfers(connection_id)` ,
119-
120- `CREATE TABLE IF NOT EXISTS recorded_http_requests (
121- id SERIAL PRIMARY KEY,
122- connection_id INTEGER NOT NULL REFERENCES connections(id) ON DELETE CASCADE,
123- method TEXT NOT NULL,
124- url TEXT NOT NULL,
125- host TEXT NOT NULL,
126- user_agent TEXT,
127- request_headers JSONB, -- JSON encoded headers
128- request_body BYTEA,
129- request_body_size INTEGER DEFAULT 0,
130- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
131- )` ,
132- `CREATE INDEX IF NOT EXISTS idx_recorded_http_requests_connection_id ON recorded_http_requests(connection_id)` ,
133- `CREATE INDEX IF NOT EXISTS idx_recorded_http_requests_timestamp ON recorded_http_requests(timestamp)` ,
134-
135- `CREATE TABLE IF NOT EXISTS recorded_http_responses (
136- id SERIAL PRIMARY KEY,
137- connection_id INTEGER NOT NULL REFERENCES connections(id) ON DELETE CASCADE,
138- status_code INTEGER NOT NULL,
139- response_headers JSONB, -- JSON encoded headers
140- response_body BYTEA,
141- response_body_size INTEGER DEFAULT 0,
142- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
143- )` ,
144- `CREATE INDEX IF NOT EXISTS idx_recorded_http_responses_connection_id ON recorded_http_responses(connection_id)` ,
145- `CREATE INDEX IF NOT EXISTS idx_recorded_http_responses_timestamp ON recorded_http_responses(timestamp)` ,
146-
147- // New: request body parts table for streaming
148- `CREATE TABLE IF NOT EXISTS recorded_http_request_body_parts (
149- id SERIAL PRIMARY KEY,
150- request_id INTEGER NOT NULL REFERENCES recorded_http_requests(id) ON DELETE CASCADE,
151- seq_no INTEGER NOT NULL,
152- data BYTEA,
153- part_size INTEGER DEFAULT 0,
154- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
155- )` ,
156- `CREATE INDEX IF NOT EXISTS idx_req_body_parts_request_id ON recorded_http_request_body_parts(request_id)` ,
157- `CREATE INDEX IF NOT EXISTS idx_req_body_parts_req_seq ON recorded_http_request_body_parts(request_id, seq_no)` ,
158-
159- // New: response body parts table for streaming
160- `CREATE TABLE IF NOT EXISTS recorded_http_response_body_parts (
161- id SERIAL PRIMARY KEY,
162- response_id INTEGER NOT NULL REFERENCES recorded_http_responses(id) ON DELETE CASCADE,
163- seq_no INTEGER NOT NULL,
164- data BYTEA,
165- part_size INTEGER DEFAULT 0,
166- timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
167- )` ,
168- `CREATE INDEX IF NOT EXISTS idx_resp_body_parts_response_id ON recorded_http_response_body_parts(response_id)` ,
169- `CREATE INDEX IF NOT EXISTS idx_resp_body_parts_resp_seq ON recorded_http_response_body_parts(response_id, seq_no)` ,
170- }
171-
172- for _ , query := range queries {
173- if _ , err := p .db .Exec (query ); err != nil {
174- return fmt .Errorf ("failed to execute schema query: %w" , err )
175- }
176- }
48+ logger .Debug ("Initializing PostgreSQL schema using schema-driven approach" )
49+ initializer := NewSchemaInitializer (p .db , "postgres" )
17750
178- // Add migration for existing tables to add connection_uuid column
179- _ , _ = p .db .Exec (`ALTER TABLE connections ADD COLUMN connection_uuid TEXT` )
51+ if err := initializer .ValidateAndInitialize (); err != nil {
52+ return fmt .Errorf ("schema initialization failed: %w" , err )
53+ }
18054
55+ logger .Info ("Schema initialization completed using new schema-driven approach" )
18156 return nil
18257}
18358
59+
60+
18461// StartConnection records the start of a connection (legacy method for backward compatibility)
18562func (p * PostgreSQLCollector ) StartConnection (ctx context.Context , clientIP , targetHost string , targetPort int , protocol string ) (int64 , error ) {
18663 var id int64
0 commit comments