-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
387 lines (326 loc) · 10.3 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
package main
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"os"
"os/exec"
"sync"
"time"
)
func getenvWithDefault(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}
// SignedURLResponse represents the JSON response for obtaining a signed URL
type SignedURLResponse struct {
UploadURL string `json:"upload_url"`
Filename string `json:"filename"`
AppBucketId int `json:"app_bucket_id"`
IsHealthy bool `json:"is_healthy"`
}
type HealthCheckResponse struct {
IsHealthy bool `json:"is_healthy"`
}
var (
httpClient *http.Client
logCounter int
counterMu sync.Mutex // Mutex to protect access to logCounter
)
func init() {
// Initialize logCounter with the current Unix time
logCounter = int(time.Now().Unix())
// Create a transport with connection pooling
transport := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
}
// Create a client with the transport
httpClient = &http.Client{
Transport: transport,
}
}
func main() {
if err := updateJobStatus("PROCESSING"); err != nil {
fmt.Println("Error updating status to PROCESSING:", err)
os.Exit(1)
}
if len(os.Args) < 2 {
fmt.Println("Usage: go run main.go <command>")
os.Exit(1)
}
// Get the command-line argument
command := os.Args[1]
// Execute the command with /bin/sh
cmd := exec.Command("/bin/sh", "-c", command)
// Create pipes for stdout and stderr
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
fmt.Println("Error creating stdout pipe:", err)
os.Exit(1)
}
stderrPipe, err := cmd.StderrPipe()
if err != nil {
fmt.Println("Error creating stderr pipe:", err)
os.Exit(1)
}
// Start the command
if err := cmd.Start(); err != nil {
fmt.Println("Error starting command:", err)
os.Exit(1)
}
// Create scanners for stdout and stderr
stdoutScanner := bufio.NewScanner(stdoutPipe)
stderrScanner := bufio.NewScanner(stderrPipe)
var wg sync.WaitGroup
// Start goroutines to read stdout and stderr and send chunks over HTTP
wg.Add(1)
go func() {
defer wg.Done()
sendChunks(stdoutScanner)
}()
wg.Add(1)
go func() {
defer wg.Done()
sendChunks(stderrScanner)
}()
// Wait for the goroutines to finish
wg.Wait()
// Wait for the command to finish
if err := cmd.Wait(); err != nil {
if err := updateJobStatus("ERROR"); err != nil {
fmt.Println("Error updating status to ERROR:", err)
os.Exit(1)
}
fmt.Println("Error waiting for command:", err)
os.Exit(1)
}
if err := updateJobStatus("DONE"); err != nil {
fmt.Println("Error updating status to DONE:", err)
os.Exit(1)
}
}
func sendChunks(scanner *bufio.Scanner) {
var lines []byte
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if len(lines) > 0 {
counterMu.Lock()
logFilename := fmt.Sprintf("wkube%d", logCounter)
logCounter++
counterMu.Unlock()
if err := sendBatch(lines, logFilename); err != nil {
fmt.Println("Error sending batch:", err)
os.Exit(1)
}
lines = lines[:0]
} else {
if err := checkHealth(); err != nil {
fmt.Println("Error checking health", err)
os.Exit(1)
}
}
default:
if scanner.Scan() {
chunk := scanner.Bytes()
lines = append(lines, chunk...)
lines = append(lines, '\n')
} else {
if err := scanner.Err(); err != nil {
fmt.Println("Scanner error:", err)
}
// Send any remaining lines when the scanner is done
if len(lines) > 0 {
counterMu.Lock()
logFilename := fmt.Sprintf("wkube%d", logCounter)
logCounter++
counterMu.Unlock()
if err := sendBatch(lines, logFilename); err != nil {
fmt.Println("Error sending batch:", err)
os.Exit(1)
}
} else {
if err := checkHealth(); err != nil {
fmt.Println("Error checking health", err)
os.Exit(1)
}
}
return
}
}
}
}
type StatusEventDataType struct {
NewStatus string `json:"new_status"`
}
// NestedData represents the nested data structure
type UpdateStatusEventPostData struct {
Type string `json:"type"`
StatusEventData StatusEventDataType `json:"data"`
}
func updateJobStatus(newStatus string) error {
gatewayServer := getenvWithDefault(
"ACC_JOB_GATEWAY_SERVER",
"https://accelerator-api.iiasa.ac.at",
)
authToken := os.Getenv("ACC_JOB_TOKEN")
if authToken == "" {
fmt.Println("AUTH_TOKEN environment variable not set")
os.Exit(1)
}
statusEventData := StatusEventDataType{
NewStatus: newStatus,
}
eventData := UpdateStatusEventPostData{
Type: "STATUS_UPDATE",
StatusEventData: statusEventData,
}
// Marshal the struct into JSON bytes
jsonEventData, err := json.Marshal(eventData)
if err != nil {
return fmt.Errorf("error marshaling event data json request: %v", err)
}
// Define the URL for the HTTP POST request
url := fmt.Sprintf("%s/v1/ajob-cli/webhook-event/", gatewayServer)
statusUpdateReq, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonEventData))
if err != nil {
return fmt.Errorf("error creating status update HTTP request: %v", err)
}
statusUpdateReq.Header.Set("Content-Type", "application/json")
// Add custom header
statusUpdateReq.Header.Set("X-Authorization", authToken)
// Send HTTP POST request with nested JSON payload
resp, err := httpClient.Do(statusUpdateReq)
if err != nil {
fmt.Println("Error sending status update HTTP request:", err)
os.Exit(1)
}
defer resp.Body.Close()
// Check the response status code
if resp.StatusCode != http.StatusOK {
fmt.Println("Unexpected status update response status code:", resp.StatusCode)
os.Exit(1)
}
// Optionally, you can process the response body here
return nil
}
func sendBatch(lines []byte, logFilename string) error {
gatewayServer := getenvWithDefault(
"ACC_JOB_GATEWAY_SERVER",
"https://accelerator-api.iiasa.ac.at",
)
authToken := os.Getenv("ACC_JOB_TOKEN")
if authToken == "" {
fmt.Println("AUTH_TOKEN environment variable not set")
os.Exit(1)
}
// Connect to the remote server to get signed URL
remoteServer := fmt.Sprintf("%s/v1/ajob-cli/presigned-log-upload-url/?filename=%s.log", gatewayServer, logFilename)
req, err := http.NewRequest("GET", remoteServer, nil)
if err != nil {
return fmt.Errorf("error creating HTTP request: %v", err)
}
// Set authorization token in request header
req.Header.Set("X-Authorization", authToken)
// First, make a GET request to obtain a signed URL
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("error getting signed URL: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected response code while getting signed URL: %d", resp.StatusCode)
}
var signedURLResponse SignedURLResponse
if err := json.NewDecoder(resp.Body).Decode(&signedURLResponse); err != nil {
return fmt.Errorf("error decoding signed URL response: %v", err)
}
// Now, PUT the chunk to the signed URL
putReq, err := http.NewRequest("PUT", signedURLResponse.UploadURL, bytes.NewReader(lines))
if err != nil {
return fmt.Errorf("error creating PUT request for chunk: %v", err)
}
putResp, err := httpClient.Do(putReq)
if err != nil {
return fmt.Errorf("error sending batch over HTTP: %v", err)
}
defer putResp.Body.Close()
if putResp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected response code while sending chunk: %d", putResp.StatusCode)
}
// Finally, make a POST request to register the chunk with the bucket ID in the request body
postURL := fmt.Sprintf("%s/v1/ajob-cli/register-log-file/", gatewayServer)
postData := map[string]interface{}{"filename": signedURLResponse.Filename, "app_bucket_id": signedURLResponse.AppBucketId}
postDataBytes, err := json.Marshal(postData)
if err != nil {
return fmt.Errorf("error encoding post data: %v", err)
}
postReq, err := http.NewRequest("POST", postURL, bytes.NewReader(postDataBytes))
if err != nil {
return fmt.Errorf("error creating POST request to register chunk: %v", err)
}
postReq.Header.Set("X-Authorization", authToken)
postReq.Header.Set("Content-Type", "application/json")
postResp, err := httpClient.Do(postReq)
if err != nil {
return fmt.Errorf("error sending POST request to register chunk: %v", err)
}
defer postResp.Body.Close()
if postResp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected response code while registering chunk: %d", postResp.StatusCode)
}
if !signedURLResponse.IsHealthy {
fmt.Println("Process is not healthy. Exiting...")
os.Exit(1) // Exit the program with a non-zero status code
}
return nil
}
func checkHealth() error {
gatewayServer := getenvWithDefault(
"ACC_JOB_GATEWAY_SERVER",
"https://accelerator-api.iiasa.ac.at",
)
authToken := os.Getenv("ACC_JOB_TOKEN")
if authToken == "" {
fmt.Println("AUTH_TOKEN environment variable not set")
os.Exit(1)
}
// Connect to the remote server to get signed URL
remoteServer := fmt.Sprintf("%s/v1/ajob-cli/is-healthy/", gatewayServer)
req, err := http.NewRequest("GET", remoteServer, nil)
if err != nil {
return fmt.Errorf("error creating HTTP request: %v", err)
}
// Set authorization token in request header
req.Header.Set("X-Authorization", authToken)
// First, make a GET request to obtain a signed URL
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("error checking health: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected response code while checking health: %d", resp.StatusCode)
}
var healthCheckResponse HealthCheckResponse
if err := json.NewDecoder(resp.Body).Decode(&healthCheckResponse); err != nil {
return fmt.Errorf("error decoding signed URL response: %v", err)
}
if !healthCheckResponse.IsHealthy {
fmt.Println("Process is not healthy. Exiting...")
os.Exit(1) // Exit the program with a non-zero status code
}
return nil
}