make sure init exited event is sent after exec exited events #17

Open
wants to merge 1 commit into base: main
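The diff below enforces the ordering described in the title: the shim holds back the init process's TaskExit event until the exit events for all of the container's running execs have been published. The following is a minimal, self-contained sketch of that pattern, not the shim's actual code; names such as execTracker and the Println stand-ins for event publishing are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// execTracker is an illustrative stand-in for the per-container bookkeeping
// the real change keeps in runningExecs, waitingAllExecExitSent, and
// lastExecEventSent.
type execTracker struct {
	mu           sync.Mutex
	runningExecs int           // execs whose exit event has not been sent yet
	initWaiting  bool          // init has exited and is waiting on the execs
	lastExecSent chan struct{} // signalled after the final exec exit event is sent
}

func newExecTracker() *execTracker {
	return &execTracker{lastExecSent: make(chan struct{})}
}

// execStarted records a newly started exec process.
func (t *execTracker) execStarted() {
	t.mu.Lock()
	t.runningExecs++
	t.mu.Unlock()
}

// execExited publishes the exec's exit event and, if it was the last running
// exec and init is already waiting, unblocks the init exit handler.
func (t *execTracker) execExited(id string) {
	fmt.Println("TaskExit (exec):", id) // stand-in for publishing the event
	t.mu.Lock()
	t.runningExecs--
	last := t.runningExecs == 0 && t.initWaiting
	t.mu.Unlock()
	if last {
		t.lastExecSent <- struct{}{}
	}
}

// initExited blocks until every exec exit event has been published, then
// publishes the init process's exit event.
func (t *execTracker) initExited(id string) {
	t.mu.Lock()
	t.initWaiting = true
	wait := t.runningExecs > 0
	t.mu.Unlock()
	if wait {
		<-t.lastExecSent
	}
	fmt.Println("TaskExit (init):", id) // stand-in for publishing the event
}

func main() {
	t := newExecTracker()
	t.execStarted()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		t.initExited("init") // init died first; blocks until the exec exit is sent
	}()
	t.execExited("exec-1") // always printed before the init exit above
	wg.Wait()
}
```

In the actual change, the wait happens in a goroutine inside handleProcessExit, and the exec side signals the channel stored in lastExecEventSent once runningExecs for the container drains to zero.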
204 changes: 119 additions & 85 deletions cmd/containerd-shim-runc-v2/task/service.go
@@ -74,15 +74,18 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
}
go ep.Run(ctx)
s := &service{
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
pendingExecs: make(map[*runc.Container]int),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
context: ctx,
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
ep: ep,
shutdown: sd,
containers: make(map[string]*runc.Container),
running: make(map[int][]containerProcess),
runningExecs: make(map[*runc.Container][]uint32),
lastExecEventSent: make(map[*runc.Container]chan struct{}),
exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
containerInitProcess: make(map[*runc.Container]initProcess),
waitingAllExecExitSent: make(map[*runc.Container]bool),
}
go s.processExits()
runcC.Monitor = reaper.Default
@@ -115,9 +118,13 @@ type service struct {

containers map[string]*runc.Container

lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
pendingExecs map[*runc.Container]int // container -> num pending execs, guarded by lifecycleMu
lifecycleMu sync.Mutex
running map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
execsMu sync.Mutex
runningExecs map[*runc.Container][]uint32
waitingAllExecExitSent map[*runc.Container]bool
lastExecEventSent map[*runc.Container]chan struct{}
containerInitProcess map[*runc.Container]initProcess
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
// dereferencing the subscription pointers must only be done while holding
// lifecycleMu.
@@ -126,6 +133,11 @@ type service struct {
shutdown shutdown.Service
}

type initProcess struct {
pid uint32
exited bool
}

type containerProcess struct {
Container *runc.Container
Process process.Process
@@ -173,45 +185,7 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
if p != nil {
pid = p.Pid()
}

_, init := p.(*process.Init)
s.lifecycleMu.Lock()

var initExits []runcC.Exit
var initCps []containerProcess
if !init {
s.pendingExecs[c]--

initPid := c.Pid()
iExits, initExited := exits[initPid]
if initExited && s.pendingExecs[c] == 0 {
// c's init process has exited before handleStarted was called and
// this is the last pending exec process start - we need to process
// the exit for the init process after processing this exec, so:
// - delete c from the s.pendingExecs map
// - keep the exits for the init pid to process later (after we process
// this exec's exits)
// - get the necessary containerProcesses for the init process (that we
// need to process the exits), and remove them from s.running (which we skipped
// doing in processExits).
delete(s.pendingExecs, c)
initExits = iExits
var skipped []containerProcess
for _, initPidCp := range s.running[initPid] {
if initPidCp.Container == c {
initCps = append(initCps, initPidCp)
} else {
skipped = append(skipped, initPidCp)
}
}
if len(skipped) == 0 {
delete(s.running, initPid)
} else {
s.running[initPid] = skipped
}
}
}

ees, exited := exits[pid]
delete(s.exitSubscribers, &exits)
exits = nil
@@ -220,11 +194,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
for _, ee := range ees {
s.handleProcessExit(ee, c, p)
}
for _, eee := range initExits {
for _, cp := range initCps {
s.handleProcessExit(eee, cp.Container, cp.Process)
}
}
} else {
// Process start was successful, add to `s.running`.
s.running[pid] = append(s.running[pid], containerProcess{
@@ -262,7 +231,10 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
}

s.containers[r.ID] = container

s.containerInitProcess[container] = initProcess{
pid: uint32(container.Pid()),
exited: false,
}
s.send(&eventstypes.TaskCreate{
ContainerID: r.ID,
Bundle: r.Bundle,
@@ -304,8 +276,11 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
s.lifecycleMu.Lock()
if r.ExecID == "" {
cinit = container
} else {
s.pendingExecs[container]++
}
initPro, ok := s.containerInitProcess[container]
if ok && initPro.exited {
s.lifecycleMu.Unlock()
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
}
handleStarted, cleanup := s.preStart(cinit)
s.lifecycleMu.Unlock()
@@ -347,6 +322,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
Pid: uint32(p.Pid()),
})
default:
s.execsMu.Lock()
_, exist := s.lastExecEventSent[container]
if !exist {
s.lastExecEventSent[container] = make(chan struct{})
}
pids, ok := s.runningExecs[container]
if !ok {
pids = []uint32{}
}
pids = append(pids, uint32(p.Pid()))
s.runningExecs[container] = pids
s.execsMu.Unlock()

s.send(&eventstypes.TaskExecStarted{
ContainerID: container.ID,
ExecID: r.ExecID,
@@ -674,27 +662,25 @@ func (s *service) processExits() {
for subscriber := range s.exitSubscribers {
(*subscriber)[e.Pid] = append((*subscriber)[e.Pid], e)
}
// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
var cps, skipped []containerProcess
for _, cp := range s.running[e.Pid] {
_, init := cp.Process.(*process.Init)
if init && s.pendingExecs[cp.Container] != 0 {
// This exit relates to a container for which we have pending execs. In
// order to ensure order between execs and the init process for a given
// container, skip processing this exit here and let the `handleStarted`
// closure for the pending exec publish it.
skipped = append(skipped, cp)
} else {
cps = append(cps, cp)

init := initProcess{}
var container *runc.Container
for c, initPro := range s.containerInitProcess {
if initPro.pid == uint32(e.Pid) {
container = c
init.pid = uint32(e.Pid)
init.exited = true
}
}
if len(skipped) > 0 {
s.running[e.Pid] = skipped
} else {
delete(s.running, e.Pid)
if container != nil {
s.containerInitProcess[container] = init
}

// Handle the exit for a created/started process. If there's more than
// one, assume they've all exited. One of them will be the correct
// process.
cps := s.running[e.Pid]
delete(s.running, e.Pid)
s.lifecycleMu.Unlock()

for _, cp := range cps {
@@ -709,24 +695,72 @@ func (s *service) send(evt interface{}) {

// s.mu must be locked when calling handleProcessExit
func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
if ip, ok := p.(*process.Init); ok {
if ip, init := p.(*process.Init); init {
// Ensure all children are killed
if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
if err := ip.KillAll(s.context); err != nil {
log.G(s.context).WithError(err).WithField("id", ip.ID()).
Error("failed to kill init's children")
}
}
go func() {
s.execsMu.Lock()
s.waitingAllExecExitSent[c] = true
if execCounts, exist := s.runningExecs[c]; exist && len(execCounts) > 0 {
s.execsMu.Unlock()
if ch, ok := s.lastExecEventSent[c]; ok {
<-ch
}
} else {
s.execsMu.Unlock()
}
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
s.execsMu.Lock()
delete(s.lastExecEventSent, c)
delete(s.containerInitProcess, c)
delete(s.waitingAllExecExitSent, c)
s.execsMu.Unlock()
}()
} else {
p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
s.execsMu.Lock()
pids, ok := s.runningExecs[c]
if ok {
pids = removeExitedPid(pids, uint32(e.Pid))
s.runningExecs[c] = pids
}
if len(s.runningExecs[c]) == 0 && s.waitingAllExecExitSent[c] {
// last exec event is sent
sent, exist := s.lastExecEventSent[c]
if exist {
sent <- struct{}{}
}
}
s.execsMu.Unlock()
}
}

p.SetExited(e.Status)
s.send(&eventstypes.TaskExit{
ContainerID: c.ID,
ID: p.ID(),
Pid: uint32(e.Pid),
ExitStatus: uint32(e.Status),
ExitedAt: protobuf.ToTimestamp(p.ExitedAt()),
})
func removeExitedPid(s []uint32, pid uint32) []uint32 {
for i, item := range s {
if item == pid {
return append(s[:i], s[i+1:]...)
}
}
return s
}

func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {
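For reference, the removeExitedPid helper added at the end of the diff removes the first matching pid from the slice in place. A small standalone check of its behavior follows; the function body is copied from the diff, while the main wrapper and sample values are illustrative only.

```go
package main

import "fmt"

// removeExitedPid removes the first occurrence of pid from s and returns the
// shortened slice; if pid is not present, s is returned unchanged.
func removeExitedPid(s []uint32, pid uint32) []uint32 {
	for i, item := range s {
		if item == pid {
			return append(s[:i], s[i+1:]...)
		}
	}
	return s
}

func main() {
	pids := []uint32{100, 200, 300}
	pids = removeExitedPid(pids, 200)
	fmt.Println(pids) // [100 300]

	pids = removeExitedPid(pids, 999) // pid not present: slice unchanged
	fmt.Println(pids) // [100 300]
}
```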