@@ -31,11 +31,11 @@ import (
31
31
32
32
"github.com/paypal/dce-go/config"
33
33
"github.com/paypal/dce-go/dce/monitor"
34
+ _ "github.com/paypal/dce-go/dce/monitor/plugin/default"
34
35
"github.com/paypal/dce-go/plugin"
35
36
_ "github.com/paypal/dce-go/pluginimpl/example"
36
37
_ "github.com/paypal/dce-go/pluginimpl/general"
37
38
"github.com/paypal/dce-go/types"
38
- "github.com/paypal/dce-go/utils"
39
39
fileUtils "github.com/paypal/dce-go/utils/file"
40
40
"github.com/paypal/dce-go/utils/pod"
41
41
"github.com/paypal/dce-go/utils/wait"
@@ -71,7 +71,8 @@ func (exec *dockerComposeExecutor) Disconnected(exec.ExecutorDriver) {
71
71
}
72
72
73
73
func (exec * dockerComposeExecutor ) LaunchTask (driver exec.ExecutorDriver , taskInfo * mesos.TaskInfo ) {
74
- log .SetOutput (config .CreateFileAppendMode (types .DCE_OUT ))
74
+ ctx := context .Background ()
75
+ initlogger ()
75
76
appStartTime := time .Now ()
76
77
77
78
log .Println ("====================Mesos LaunchTask====================" )
@@ -104,7 +105,7 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
104
105
pod .SetPodStatus (types .POD_STARTING )
105
106
106
107
// Update mesos state TO STARTING
107
- pod .SendMesosStatus (driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_STARTING .Enum ())
108
+ pod .SendMesosStatus (ctx , driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_STARTING .Enum ())
108
109
109
110
// Get required compose file list
110
111
pod .ComposeFiles , _ = fileUtils .GetFiles (taskInfo )
@@ -120,13 +121,11 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
120
121
121
122
// Create context with timeout
122
123
// Wait for pod launching until timeout
123
- var ctx context.Context
124
124
125
125
var cancel context.CancelFunc
126
- ctx = context .Background ()
127
126
ctx , cancel = context .WithTimeout (ctx , config .GetLaunchTimeout ())
128
127
129
- go pod .WaitOnPod (& ctx )
128
+ go pod .WaitOnPod (ctx )
130
129
131
130
// Get order of plugins from config or mesos labels
132
131
pluginOrder , err := fileUtils .GetPluginOrder (taskInfo )
@@ -141,50 +140,50 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
141
140
extpoints = plugin .GetOrderedExtpoints (pluginOrder )
142
141
143
142
// Executing LaunchTaskPreImagePull in order
144
- if _ , err := utils .PluginPanicHandler (utils .ConditionFunc (func () (string , error ) {
143
+ if _ , err := pod .PluginPanicHandler (pod .ConditionFunc (func () (string , error ) {
145
144
for i , ext := range extpoints {
146
145
147
146
if ext == nil {
148
147
logger .Errorln ("Error getting plugins from plugin registration pools" )
149
148
return "" , errors .New ("plugin is nil" )
150
149
}
151
150
granularMetricStepName := fmt .Sprintf ("%s_LaunchTaskPreImagePull" , ext .Name ())
152
- utils . SetStepData (pod .StepMetrics , time . Now (). Unix (), 0 , granularMetricStepName , "Starting" )
151
+ pod . StartStep (pod .StepMetrics , granularMetricStepName )
153
152
154
- err = ext .LaunchTaskPreImagePull (& ctx , & pod .ComposeFiles , executorId , taskInfo )
153
+ err = ext .LaunchTaskPreImagePull (ctx , & pod .ComposeFiles , executorId , taskInfo )
155
154
if err != nil {
156
155
logger .Errorf ("Error executing LaunchTaskPreImagePull of plugin : %v" , err )
157
- utils . SetStepData (pod .StepMetrics , 0 , time . Now (). Unix (), granularMetricStepName , "Error" )
156
+ pod . EndStep (pod .StepMetrics , granularMetricStepName , nil , err )
158
157
return "" , err
159
158
}
160
- utils . SetStepData (pod .StepMetrics , 0 , time . Now (). Unix (), granularMetricStepName , "Success" )
159
+ pod . EndStep (pod .StepMetrics , granularMetricStepName , nil , nil )
161
160
162
161
if config .EnableComposeTrace () {
163
- fileUtils .DumpPluginModifiedComposeFiles (ctx , pluginOrder [i ], "LaunchTaskPreImagePull" , i )
162
+ fileUtils .DumpPluginModifiedComposeFiles (pluginOrder [i ], "LaunchTaskPreImagePull" , i )
164
163
}
165
164
}
166
165
return "" , err
167
166
})); err != nil {
168
167
logger .Errorf ("error while executing task pre image pull: %s" , err )
169
168
pod .SetPodStatus (types .POD_FAILED )
170
169
cancel ()
171
- pod .SendMesosStatus (driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
170
+ pod .SendMesosStatus (ctx , driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
172
171
return
173
172
}
174
173
// Write updated compose files into pod folder
175
- err = fileUtils .WriteChangeToFiles (ctx )
174
+ err = fileUtils .WriteChangeToFiles ()
176
175
if err != nil {
177
176
logger .Errorf ("Failure writing updated compose files : %v" , err )
178
177
pod .SetPodStatus (types .POD_FAILED )
179
178
cancel ()
180
- pod .SendMesosStatus (driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
179
+ pod .SendMesosStatus (ctx , driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
181
180
}
182
181
183
182
//Validate Compose files
184
183
if err := validateComposeFiles (); err != nil {
185
184
pod .SetPodStatus (types .POD_COMPOSE_CHECK_FAILED )
186
185
cancel ()
187
- pod .SendMesosStatus (driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
186
+ pod .SendMesosStatus (ctx , driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
188
187
return
189
188
}
190
189
@@ -193,98 +192,92 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
193
192
if err != nil {
194
193
pod .SetPodStatus (types .POD_PULL_FAILED )
195
194
cancel ()
196
- pod .SendMesosStatus (driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
195
+ pod .SendMesosStatus (ctx , driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
197
196
return
198
197
}
199
198
200
199
timeElapsed := time .Since (appStartTime )
201
200
logger .Printf ("Time elapsed since App launch: %.3fs" , timeElapsed .Seconds ())
202
201
203
202
// Executing LaunchTaskPostImagePull in order
204
- if _ , err := utils .PluginPanicHandler (utils .ConditionFunc (func () (string , error ) {
203
+ if _ , err := pod .PluginPanicHandler (pod .ConditionFunc (func () (string , error ) {
205
204
for i , ext := range extpoints {
206
205
if ext == nil {
207
206
logger .Errorln ("Error getting plugins from plugin registration pools" )
208
207
return "" , errors .New ("plugin is nil" )
209
208
}
210
209
granularMetricStepName := fmt .Sprintf ("%s_LaunchTaskPostImagePull" , ext .Name ())
211
- utils . SetStepData (pod .StepMetrics , time . Now (). Unix (), 0 , granularMetricStepName , "Starting" )
210
+ pod . StartStep (pod .StepMetrics , granularMetricStepName )
212
211
213
- err = ext .LaunchTaskPostImagePull (& ctx , & pod .ComposeFiles , executorId , taskInfo )
212
+ err = ext .LaunchTaskPostImagePull (ctx , & pod .ComposeFiles , executorId , taskInfo )
214
213
if err != nil {
215
214
logger .Errorf ("Error executing LaunchTaskPreImagePull of plugin : %v" , err )
216
- utils . SetStepData (pod .StepMetrics , 0 , time . Now (). Unix (), granularMetricStepName , "Error" )
215
+ pod . EndStep (pod .StepMetrics , granularMetricStepName , nil , err )
217
216
return "" , err
218
217
}
219
218
220
- utils . SetStepData (pod .StepMetrics , 0 , time . Now (). Unix (), granularMetricStepName , "Success" )
219
+ pod . EndStep (pod .StepMetrics , granularMetricStepName , nil , nil )
221
220
222
221
if config .EnableComposeTrace () {
223
- fileUtils .DumpPluginModifiedComposeFiles (ctx , pluginOrder [i ], "LaunchTaskPostImagePull" , i )
222
+ fileUtils .DumpPluginModifiedComposeFiles (pluginOrder [i ], "LaunchTaskPostImagePull" , i )
224
223
}
225
224
}
226
225
return "" , err
227
226
})); err != nil {
228
227
pod .SetPodStatus (types .POD_FAILED )
229
228
cancel ()
230
- pod .SendMesosStatus (driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
229
+ pod .SendMesosStatus (ctx , driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
231
230
return
232
231
}
233
232
234
233
// Service list from all compose files
235
- podServices := getServices (ctx )
234
+ podServices := getServices ()
236
235
logger .Printf ("pod service list: %v" , podServices )
237
236
238
237
// Write updated compose files into pod folder
239
- err = fileUtils .WriteChangeToFiles (ctx )
238
+ err = fileUtils .WriteChangeToFiles ()
240
239
if err != nil {
241
240
logger .Errorf ("Failure writing updated compose files : %v" , err )
242
241
pod .SetPodStatus (types .POD_FAILED )
243
242
cancel ()
244
- pod .SendMesosStatus (driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
243
+ pod .SendMesosStatus (ctx , driver , taskInfo .GetTaskId (), mesos .TaskState_TASK_FAILED .Enum ())
245
244
}
246
245
247
- utils . SetStepData (pod .StepMetrics , time . Now (). Unix (), 0 , "Launch_Pod" , "Starting " )
246
+ pod . StartStep (pod .StepMetrics , "Launch_Pod" )
248
247
249
248
// Launch pod
250
249
replyPodStatus , err := pod .LaunchPod (pod .ComposeFiles )
251
- if err != nil {
252
- utils .SetStepData (pod .StepMetrics , 0 , time .Now ().Unix (), "Launch_Pod" , "Error" )
253
- } else {
254
- utils .SetStepData (pod .StepMetrics , 0 , time .Now ().Unix (), "Launch_Pod" , "Success" )
255
- }
250
+ pod .EndStep (pod .StepMetrics , "Launch_Pod" , nil , err )
256
251
257
252
logger .Printf ("Pod status returned by LaunchPod : %s" , replyPodStatus .String ())
258
253
259
254
// Take an action depends on different status
260
255
switch replyPodStatus {
261
256
case types .POD_FAILED :
262
257
cancel ()
263
- pod .SendPodStatus (types .POD_FAILED )
258
+ pod .SendPodStatus (ctx , types .POD_FAILED )
264
259
265
260
case types .POD_STARTING :
266
261
// Initial health check
267
262
res , err := initHealthCheck (podServices )
268
263
if err != nil || res == types .POD_FAILED {
269
264
cancel ()
270
- pod .SendPodStatus (types .POD_FAILED )
265
+ pod .SendPodStatus (ctx , types .POD_FAILED )
271
266
}
272
267
273
268
// Temp status keeps the pod status returned by PostLaunchTask
274
- tempStatus , err := utils .PluginPanicHandler (utils .ConditionFunc (func () (string , error ) {
269
+ tempStatus , err := pod .PluginPanicHandler (pod .ConditionFunc (func () (string , error ) {
275
270
var tempStatus string
276
271
for _ , ext := range extpoints {
277
272
logger .Println ("Executing post launch task plugin" )
278
273
279
274
granularMetricStepName := fmt .Sprintf ("%s_PostLaunchTask" , ext .Name ())
280
- utils . SetStepData (pod .StepMetrics , time . Now (). Unix (), 0 , granularMetricStepName , "Starting" )
275
+ pod . StartStep (pod .StepMetrics , granularMetricStepName )
281
276
282
- tempStatus , err = ext .PostLaunchTask (& ctx , pod .ComposeFiles , taskInfo )
277
+ tempStatus , err = ext .PostLaunchTask (ctx , pod .ComposeFiles , taskInfo )
278
+ pod .EndStep (pod .StepMetrics , granularMetricStepName , nil , err )
283
279
if err != nil {
284
280
logger .Errorf ("Error executing PostLaunchTask : %v" , err )
285
- utils .SetStepData (pod .StepMetrics , 0 , time .Now ().Unix (), granularMetricStepName , "Error" )
286
- } else {
287
- utils .SetStepData (pod .StepMetrics , 0 , time .Now ().Unix (), granularMetricStepName , "Success" )
288
281
}
289
282
290
283
logger .Printf ("Get pod status : %s returned by PostLaunchTask" , tempStatus )
@@ -300,20 +293,26 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
300
293
}
301
294
if tempStatus == types .POD_FAILED .String () {
302
295
cancel ()
303
- pod .SendPodStatus (types .POD_FAILED )
296
+ pod .SendPodStatus (ctx , types .POD_FAILED )
304
297
return
305
298
}
306
299
if res == types .POD_RUNNING {
307
300
cancel ()
308
301
if pod .GetPodStatus () != types .POD_RUNNING {
309
- pod .SendPodStatus (types .POD_RUNNING )
310
- go monitor .MonitorPoller ()
302
+ pod .SendPodStatus (ctx , types .POD_RUNNING )
303
+ go func () {
304
+ status , err := monitor .MonitorPoller (ctx )
305
+ if err != nil {
306
+ log .Errorf ("failure from monitor: %s" , err )
307
+ }
308
+ pod .SendPodStatus (ctx , status )
309
+ }()
311
310
}
312
311
}
313
312
//For adhoc job, send finished to mesos if job already finished during init health check
314
313
if res == types .POD_FINISHED {
315
314
cancel ()
316
- pod .SendPodStatus (types .POD_FINISHED )
315
+ pod .SendPodStatus (ctx , types .POD_FINISHED )
317
316
}
318
317
319
318
default :
@@ -324,6 +323,7 @@ func (exec *dockerComposeExecutor) LaunchTask(driver exec.ExecutorDriver, taskIn
324
323
}
325
324
326
325
func (exec * dockerComposeExecutor ) KillTask (driver exec.ExecutorDriver , taskId * mesos.TaskID ) {
326
+ ctx := context .Background ()
327
327
log .Println ("====================Mesos KillTask====================" )
328
328
329
329
defer func () {
@@ -345,12 +345,12 @@ func (exec *dockerComposeExecutor) KillTask(driver exec.ExecutorDriver, taskId *
345
345
logKill .Printf ("Mesos Kill Task : Current task status is %s , continue killTask" , status )
346
346
pod .SetPodStatus (types .POD_KILLED )
347
347
348
- err := pod .StopPod (pod .ComposeFiles )
348
+ err := pod .StopPod (ctx , pod .ComposeFiles )
349
349
if err != nil {
350
350
logKill .Errorf ("Error cleaning up pod : %v" , err .Error ())
351
351
}
352
352
353
- err = pod .SendMesosStatus (driver , taskId , mesos .TaskState_TASK_KILLED .Enum ())
353
+ err = pod .SendMesosStatus (ctx , driver , taskId , mesos .TaskState_TASK_KILLED .Enum ())
354
354
if err != nil {
355
355
logKill .Errorf ("Error during kill Task : %v" , err .Error ())
356
356
}
@@ -370,7 +370,7 @@ func (exec *dockerComposeExecutor) FrameworkMessage(driver exec.ExecutorDriver,
370
370
func (exec * dockerComposeExecutor ) Shutdown (driver exec.ExecutorDriver ) {
371
371
// Execute shutdown plugin extensions in order
372
372
for _ , ext := range extpoints {
373
- ext .Shutdown (pod .ComposeExecutorDriver )
373
+ ext .Shutdown (pod .ComposeTaskInfo , pod . ComposeExecutorDriver )
374
374
}
375
375
log .Println ("====================Stop ExecutorDriver====================" )
376
376
driver .Stop ()
@@ -395,18 +395,11 @@ func pullImage() error {
395
395
logger .Println ("====================Pulling Image====================" )
396
396
397
397
if ! config .SkipPullImages () {
398
- count := 0
399
398
err := wait .PollRetry (config .GetPullRetryCount (), config .GetPollInterval (), func () (string , error ) {
400
- utils . SetStepData (pod .StepMetrics , time . Now (). Unix (), 0 , fmt . Sprintf ( "Image_Pull_%v" , count ), "Starting " )
399
+ pod . StartStep (pod .StepMetrics , "Image_Pull " )
401
400
err := pod .PullImage (pod .ComposeFiles )
402
- if err != nil {
403
- utils .SetStepData (pod .StepMetrics , 0 , time .Now ().Unix (), fmt .Sprintf ("Image_Pull_%v" , count ), "Error" )
404
- } else {
405
- utils .SetStepData (pod .StepMetrics , 0 , time .Now ().Unix (), fmt .Sprintf ("Image_Pull_%v" , count ), "Success" )
406
- }
407
- count ++
401
+ pod .EndStep (pod .StepMetrics , "Image_Pull" , nil , err )
408
402
return "" , err
409
-
410
403
})
411
404
412
405
if err != nil {
@@ -420,19 +413,21 @@ func pullImage() error {
420
413
421
414
func initHealthCheck (podServices map [string ]bool ) (types.PodStatus , error ) {
422
415
res , err := wait .WaitUntil (config .GetLaunchTimeout (), func (healthCheckReply chan string ) {
416
+ // wait until timeout or receive any result from healthCheckReply
417
+ // healthCheckReply is to stored the pod status
423
418
pod .HealthCheck (pod .ComposeFiles , podServices , healthCheckReply )
424
419
})
425
420
426
421
if err != nil {
427
422
log .Printf ("POD_INIT_HEALTH_CHECK_TIMEOUT -- %v" , err )
428
423
return types .POD_FAILED , err
429
424
}
430
- return utils .ToPodStatus (res ), err
425
+ return pod .ToPodStatus (res ), err
431
426
}
432
427
433
- func getServices (ctx context.Context ) map [string ]bool {
428
+ func getServices () map [string ]bool {
429
+ filesMap := pod .GetServiceDetail ()
434
430
podService := make (map [string ]bool )
435
- filesMap := ctx .Value (types .SERVICE_DETAIL ).(types.ServiceDetail )
436
431
437
432
for _ , file := range pod .ComposeFiles {
438
433
servMap := filesMap [file ][types.SERVICES ].(map [interface {}]interface {})
@@ -507,3 +502,15 @@ func switchDebugMode() {
507
502
log .SetLevel (log .DebugLevel )
508
503
}
509
504
}
505
+
506
+ // redirect the output, and set the loglevel
507
+ func initlogger () {
508
+ log .SetOutput (config .CreateFileAppendMode (types .DCE_OUT ))
509
+ loglevel := config .GetConfig ().GetString (types .LOGLEVEL )
510
+ ll , err := log .ParseLevel (loglevel )
511
+ if err == nil {
512
+ log .SetLevel (ll )
513
+ } else {
514
+ log .SetLevel (log .InfoLevel )
515
+ }
516
+ }
0 commit comments