Skip to content

Commit c0abc8b

Browse files
authored
Merge pull request #207 from sipsma/v2
Upgrade to v2 runc service in VM Agent.
2 parents aba20fb + 3602a89 commit c0abc8b

File tree

5 files changed

+37
-31
lines changed

5 files changed

+37
-31
lines changed

agent/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ func main() {
7878
log.G(shimCtx).Info("creating task service")
7979

8080
eventExchange := &event.ExchangeCloser{Exchange: exchange.NewExchange()}
81-
taskService := NewTaskService(shimCtx, shimCancel, eventExchange)
81+
taskService, err := NewTaskService(shimCtx, shimCancel, eventExchange)
82+
if err != nil {
83+
log.G(shimCtx).WithError(err).Fatal("failed to create task service")
84+
}
8285

8386
server, err := ttrpc.NewServer()
8487
if err != nil {

agent/service.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222

2323
"github.com/containerd/containerd/cio"
2424
"github.com/containerd/containerd/log"
25-
runc "github.com/containerd/containerd/runtime/v2/runc/v1"
25+
runc "github.com/containerd/containerd/runtime/v2/runc/v2"
2626
"github.com/containerd/containerd/runtime/v2/shim"
2727
taskAPI "github.com/containerd/containerd/runtime/v2/task"
2828
"github.com/gogo/protobuf/types"
@@ -67,14 +67,23 @@ type TaskService struct {
6767
}
6868

6969
// NewTaskService creates new runc shim wrapper
70-
func NewTaskService(shimCtx context.Context, shimCancel context.CancelFunc, publisher shim.Publisher) taskAPI.TaskService {
70+
func NewTaskService(shimCtx context.Context, shimCancel context.CancelFunc, publisher shim.Publisher) (taskAPI.TaskService, error) {
71+
// We provide an empty string for "id" as the service manages multiple tasks; there is no single
72+
// "id" being managed. As noted in the comments of the called code, the "id" arg is only used by
73+
// the Cleanup function, so it will never be invoked as part of the task service API, which is all
74+
// we need.
75+
runcService, err := runc.New(shimCtx, "", publisher, shimCancel)
76+
if err != nil {
77+
return nil, err
78+
}
79+
7180
return &TaskService{
72-
taskManager: vm.NewTaskManager(log.G(shimCtx)),
81+
taskManager: vm.NewTaskManager(log.G(shimCtx), runcService),
7382

7483
publisher: publisher,
7584
shimCtx: shimCtx,
7685
shimCancel: shimCancel,
77-
}
86+
}, nil
7887
}
7988

8089
func logPanicAndDie(logger *logrus.Entry) {
@@ -128,11 +137,6 @@ func (ts *TaskService) Create(requestCtx context.Context, req *taskAPI.CreateTas
128137
// TODO if we update to the v2 runc implementation in containerd, we can use a single
129138
// runc service instance to manage all tasks instead of creating a new one for each
130139
taskCtx, taskCancel := context.WithCancel(ts.shimCtx)
131-
runcService, err := runc.New(taskCtx, req.ID, ts.publisher, taskCancel)
132-
if err != nil {
133-
return nil, errors.Wrap(err, "failed to create runc shim for task")
134-
}
135-
136140
defer func() {
137141
if err != nil {
138142
taskCancel()
@@ -159,7 +163,7 @@ func (ts *TaskService) Create(requestCtx context.Context, req *taskAPI.CreateTas
159163
fifoSet.Stderr = ""
160164
}
161165

162-
task, err := ts.taskManager.AddTask(req.ID, runcService, bundleDir, extraData, fifoSet, taskCtx.Done(), taskCancel)
166+
task, err := ts.taskManager.AddTask(req.ID, bundleDir, extraData, fifoSet, taskCtx.Done(), taskCancel)
163167
if err != nil {
164168
return nil, errors.Wrap(err, "failed to add task")
165169
}

internal/vm/task.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,32 +62,33 @@ type VSockConnector func(ctx context.Context, port uint32) (net.Conn, error)
6262
// being executed via a firecracker-containerd runtime. It's intended to be
6363
// abstracted over whether it's being executed on the Host or inside a VM Guest.
6464
type TaskManager interface {
65-
AddTask(string, taskAPI.TaskService, bundle.Dir, *proto.ExtraData, *cio.FIFOSet, <-chan struct{}, context.CancelFunc) (*Task, error)
65+
AddTask(string, bundle.Dir, *proto.ExtraData, *cio.FIFOSet, <-chan struct{}, context.CancelFunc) (*Task, error)
6666
Task(string) (*Task, error)
6767
TaskCount() uint
6868
Remove(string)
6969
RemoveAll()
7070
}
7171

7272
// NewTaskManager initializes a new TaskManager
73-
func NewTaskManager(logger *logrus.Entry) TaskManager {
73+
func NewTaskManager(logger *logrus.Entry, taskService taskAPI.TaskService) TaskManager {
7474
return &taskManager{
75-
tasks: make(map[string]*Task),
76-
logger: logger,
75+
tasks: make(map[string]*Task),
76+
logger: logger,
77+
taskService: taskService,
7778
}
7879
}
7980

8081
type taskManager struct {
81-
mu sync.RWMutex
82-
tasks map[string]*Task
83-
logger *logrus.Entry
82+
mu sync.RWMutex
83+
tasks map[string]*Task
84+
logger *logrus.Entry
85+
taskService taskAPI.TaskService
8486
}
8587

8688
// AddTask registers a task with the provided metadata with the taskManager.
8789
// taskService should implement the TaskService API for the task (i.e. Create, Kill, Exec, etc.).
8890
func (m *taskManager) AddTask(
8991
containerID string,
90-
taskService taskAPI.TaskService,
9192
bundleDir bundle.Dir,
9293
extraData *proto.ExtraData,
9394
fifoSet *cio.FIFOSet,
@@ -107,7 +108,7 @@ func (m *taskManager) AddTask(
107108
}
108109

109110
task := &Task{
110-
TaskService: taskService,
111+
TaskService: m.taskService,
111112
ID: containerID,
112113
logger: m.logger.WithField("id", containerID),
113114

internal/vm/task_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ type mockTaskService struct {
5555
func defaultMockTaskArgs(id string) addTaskArgs {
5656
return addTaskArgs{
5757
id: fmt.Sprintf("container-%s", id),
58-
ts: &mockTaskService{},
5958
bundleDir: mockBundleDir,
6059
extraData: mockExtraData,
6160
fifoSet: mockFIFOSet,
@@ -64,26 +63,24 @@ func defaultMockTaskArgs(id string) addTaskArgs {
6463

6564
type addTaskArgs struct {
6665
id string
67-
ts task.TaskService
6866
bundleDir bundle.Dir
6967
extraData *proto.ExtraData
7068
fifoSet *cio.FIFOSet
7169
}
7270

7371
func addTaskFromArgs(tm TaskManager, args addTaskArgs) (*Task, error) {
74-
return tm.AddTask(args.id, args.ts, args.bundleDir, args.extraData, args.fifoSet, context.Background().Done(), func() {})
72+
return tm.AddTask(args.id, args.bundleDir, args.extraData, args.fifoSet, context.Background().Done(), func() {})
7573
}
7674

7775
func TestTaskManager_AddRemoveTask(t *testing.T) {
7876
logger, _ := test.NewNullLogger()
79-
tm := NewTaskManager(logger.WithField("test", t.Name()))
77+
tm := NewTaskManager(logger.WithField("test", t.Name()), &mockTaskService{})
8078

8179
taskAArgs := defaultMockTaskArgs("A")
8280
taskBArgs := defaultMockTaskArgs("B")
8381
taskCArgs := defaultMockTaskArgs("C")
8482

8583
assertExpectedTask := func(args addTaskArgs, createdTask *Task) {
86-
require.Equal(t, args.ts, createdTask.TaskService, "AddTask sets expected task service")
8784
require.Equal(t, args.id, createdTask.ID, "AddTask sets expected container ID")
8885
require.Equal(t, args.extraData, createdTask.extraData, "AddTask sets expected container ID")
8986
require.Equal(t, args.bundleDir, createdTask.bundleDir, "AddTask sets expected container ID")
@@ -180,7 +177,7 @@ func TestTaskManager_StartStdioProxy_VSockToFIFO(t *testing.T) {
180177

181178
func testStdioProxy(t *testing.T, inputDirection IODirection) {
182179
logger, _ := test.NewNullLogger()
183-
tm := NewTaskManager(logger.WithField("test", t.Name()))
180+
tm := NewTaskManager(logger.WithField("test", t.Name()), &mockTaskService{})
184181

185182
taskArgs := defaultMockTaskArgs("A")
186183

runtime/service.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,6 @@ var (
9292

9393
// implements shimapi
9494
type service struct {
95-
taskManager vm.TaskManager
96-
9795
eventExchange *exchange.Exchange
9896
namespace string
9997

@@ -128,6 +126,10 @@ type service struct {
128126
agentClient taskAPI.TaskService
129127
eventBridgeClient eventbridge.Getter
130128

129+
// taskManager is only instantiated after CreateVM has run successfully.
130+
// Any use of it must be guarded by service.waitVMReady()
131+
taskManager vm.TaskManager
132+
131133
machine *firecracker.Machine
132134
machineConfig *firecracker.Config
133135
machineCID uint32
@@ -172,8 +174,6 @@ func NewService(shimCtx context.Context, id string, remotePublisher shim.Publish
172174
}
173175

174176
s := &service{
175-
taskManager: vm.NewTaskManager(logger),
176-
177177
eventExchange: exchange.NewExchange(),
178178
namespace: namespace,
179179

@@ -494,6 +494,7 @@ func (s *service) createVM(requestCtx context.Context, request *proto.CreateVMRe
494494
rpcClient := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { _ = conn.Close() }))
495495
s.agentClient = taskAPI.NewTaskClient(rpcClient)
496496
s.eventBridgeClient = eventbridge.NewGetterClient(rpcClient)
497+
s.taskManager = vm.NewTaskManager(s.logger, s.agentClient)
497498

498499
s.logger.Info("successfully started the VM")
499500

@@ -687,7 +688,7 @@ func (s *service) Create(requestCtx context.Context, request *taskAPI.CreateTask
687688
}
688689

689690
taskCtx, taskCancel := context.WithCancel(s.shimCtx)
690-
task, err := s.taskManager.AddTask(request.ID, s.agentClient, bundleDir, extraData, cio.NewFIFOSet(cio.Config{
691+
task, err := s.taskManager.AddTask(request.ID, bundleDir, extraData, cio.NewFIFOSet(cio.Config{
691692
Stdin: request.Stdin,
692693
Stdout: request.Stdout,
693694
Stderr: request.Stderr,

0 commit comments

Comments
 (0)