Support pausing parallelVisit

Pass a channel to visitor functions called by parallelVisit that
allows them to pause the current visitor until a given visitor
has finished.  This allows parallelVisit to work on a dependency
graph while it is being mutated.

Test: Test_parallelVisit
Change-Id: Id8b1542c22ac9914439310e31d992ae0d7318d69
This commit is contained in:
Colin Cross 2020-08-25 17:12:59 -07:00
parent 4b3efdc5b8
commit c4773d90a2
3 changed files with 522 additions and 121 deletions

View file

@ -246,7 +246,7 @@ type moduleInfo struct {
forwardDeps []*moduleInfo forwardDeps []*moduleInfo
directDeps []depInfo directDeps []depInfo
// used by parallelVisitAllBottomUp // used by parallelVisit
waitingCount int waitingCount int
// set during each runMutator // set during each runMutator
@ -1827,7 +1827,7 @@ type visitOrderer interface {
// returns the list of modules that are waiting for this module // returns the list of modules that are waiting for this module
propagate(module *moduleInfo) []*moduleInfo propagate(module *moduleInfo) []*moduleInfo
// visit modules in order // visit modules in order
visit(modules []*moduleInfo, visit func(*moduleInfo) bool) visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool)
} }
type unorderedVisitorImpl struct{} type unorderedVisitorImpl struct{}
@ -1840,9 +1840,9 @@ func (unorderedVisitorImpl) propagate(module *moduleInfo) []*moduleInfo {
return nil return nil
} }
func (unorderedVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo) bool) { func (unorderedVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool) {
for _, module := range modules { for _, module := range modules {
if visit(module) { if visit(module, nil) {
return return
} }
} }
@ -1858,9 +1858,9 @@ func (bottomUpVisitorImpl) propagate(module *moduleInfo) []*moduleInfo {
return module.reverseDeps return module.reverseDeps
} }
func (bottomUpVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo) bool) { func (bottomUpVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool) {
for _, module := range modules { for _, module := range modules {
if visit(module) { if visit(module, nil) {
return return
} }
} }
@ -1876,10 +1876,10 @@ func (topDownVisitorImpl) propagate(module *moduleInfo) []*moduleInfo {
return module.forwardDeps return module.forwardDeps
} }
func (topDownVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo) bool) { func (topDownVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool) {
for i := 0; i < len(modules); i++ { for i := 0; i < len(modules); i++ {
module := modules[len(modules)-1-i] module := modules[len(modules)-1-i]
if visit(module) { if visit(module, nil) {
return return
} }
} }
@ -1890,25 +1890,50 @@ var (
topDownVisitor topDownVisitorImpl topDownVisitor topDownVisitorImpl
) )
// pauseSpec describes a pause that a module needs to occur until another module has been visited,
// at which point the unpause channel will be closed.
type pauseSpec struct {
paused *moduleInfo
until *moduleInfo
unpause unpause
}
type unpause chan struct{}
const parallelVisitLimit = 1000
// Calls visit on each module, guaranteeing that visit is not called on a module until visit on all // Calls visit on each module, guaranteeing that visit is not called on a module until visit on all
// of its dependencies has finished. // of its dependencies has finished. A visit function can write a pauseSpec to the pause channel
func (c *Context) parallelVisit(order visitOrderer, visit func(group *moduleInfo) bool) { // to wait for another dependency to be visited. If a visit function returns true to cancel
// while another visitor is paused, the paused visitor will never be resumed and its goroutine
// will stay paused forever.
func parallelVisit(modules []*moduleInfo, order visitOrderer, limit int,
visit func(module *moduleInfo, pause chan<- pauseSpec) bool) []error {
doneCh := make(chan *moduleInfo) doneCh := make(chan *moduleInfo)
cancelCh := make(chan bool) cancelCh := make(chan bool)
count := 0 pauseCh := make(chan pauseSpec)
cancel := false cancel := false
var backlog []*moduleInfo
const limit = 1000
for _, module := range c.modulesSorted { var backlog []*moduleInfo // Visitors that are ready to start but backlogged due to limit.
var unpauseBacklog []pauseSpec // Visitors that are ready to unpause but backlogged due to limit.
active := 0 // Number of visitors running, not counting paused visitors.
visited := 0 // Number of finished visitors.
pauseMap := make(map[*moduleInfo][]pauseSpec)
for _, module := range modules {
module.waitingCount = order.waitCount(module) module.waitingCount = order.waitCount(module)
} }
visitOne := func(module *moduleInfo) { // Call the visitor on a module if there are fewer active visitors than the parallelism
if count < limit { // limit, otherwise add it to the backlog.
count++ startOrBacklog := func(module *moduleInfo) {
if active < limit {
active++
go func() { go func() {
ret := visit(module) ret := visit(module, pauseCh)
if ret { if ret {
cancelCh <- true cancelCh <- true
} }
@ -1919,34 +1944,190 @@ func (c *Context) parallelVisit(order visitOrderer, visit func(group *moduleInfo
} }
} }
for _, module := range c.modulesSorted { // Unpause the already-started but paused visitor on a module if there are fewer active
if module.waitingCount == 0 { // visitors than the parallelism limit, otherwise add it to the backlog.
visitOne(module) unpauseOrBacklog := func(pauseSpec pauseSpec) {
if active < limit {
active++
close(pauseSpec.unpause)
} else {
unpauseBacklog = append(unpauseBacklog, pauseSpec)
} }
} }
for count > 0 || len(backlog) > 0 { // Start any modules in the backlog up to the parallelism limit. Unpause paused modules first
// since they may already be holding resources.
unpauseOrStartFromBacklog := func() {
for active < limit && len(unpauseBacklog) > 0 {
unpause := unpauseBacklog[0]
unpauseBacklog = unpauseBacklog[1:]
unpauseOrBacklog(unpause)
}
for active < limit && len(backlog) > 0 {
toVisit := backlog[0]
backlog = backlog[1:]
startOrBacklog(toVisit)
}
}
toVisit := len(modules)
// Start or backlog any modules that are not waiting for any other modules.
for _, module := range modules {
if module.waitingCount == 0 {
startOrBacklog(module)
}
}
for active > 0 {
select { select {
case <-cancelCh: case <-cancelCh:
cancel = true cancel = true
backlog = nil backlog = nil
case doneModule := <-doneCh: case doneModule := <-doneCh:
count-- active--
if !cancel { if !cancel {
for count < limit && len(backlog) > 0 { // Mark this module as done.
toVisit := backlog[0] doneModule.waitingCount = -1
backlog = backlog[1:] visited++
visitOne(toVisit)
// Unpause or backlog any modules that were waiting for this one.
if unpauses, ok := pauseMap[doneModule]; ok {
delete(pauseMap, doneModule)
for _, unpause := range unpauses {
unpauseOrBacklog(unpause)
}
} }
// Start any backlogged modules up to limit.
unpauseOrStartFromBacklog()
// Decrement waitingCount on the next modules in the tree based
// on propagation order, and start or backlog them if they are
// ready to start.
for _, module := range order.propagate(doneModule) { for _, module := range order.propagate(doneModule) {
module.waitingCount-- module.waitingCount--
if module.waitingCount == 0 { if module.waitingCount == 0 {
visitOne(module) startOrBacklog(module)
} }
} }
} }
case pauseSpec := <-pauseCh:
if pauseSpec.until.waitingCount == -1 {
// Module being paused for is already finished, resume immediately.
close(pauseSpec.unpause)
} else {
// Register for unpausing.
pauseMap[pauseSpec.until] = append(pauseMap[pauseSpec.until], pauseSpec)
// Don't count paused visitors as active so that this can't deadlock
// if 1000 visitors are paused simultaneously.
active--
unpauseOrStartFromBacklog()
}
} }
} }
if !cancel {
// Invariant check: no backlogged modules, these weren't waiting on anything except
// the parallelism limit so they should have run.
if len(backlog) > 0 {
panic(fmt.Errorf("parallelVisit finished with %d backlogged visitors", len(backlog)))
}
// Invariant check: no backlogged paused modules, these weren't waiting on anything
// except the parallelism limit so they should have run.
if len(unpauseBacklog) > 0 {
panic(fmt.Errorf("parallelVisit finished with %d backlogged unpaused visitors", len(unpauseBacklog)))
}
if len(pauseMap) > 0 {
// Probably a deadlock due to a newly added dependency cycle.
// Start from a semi-random module being paused for and perform a depth-first
// search for the module it is paused on, ignoring modules that are marked as
// done. Note this traverses from modules to the modules that would have been
// unblocked when that module finished.
var start, end *moduleInfo
for _, pauseSpecs := range pauseMap {
for _, pauseSpec := range pauseSpecs {
if start == nil || start.String() > pauseSpec.paused.String() {
start = pauseSpec.paused
end = pauseSpec.until
}
}
}
var check func(group *moduleInfo) []*moduleInfo
check = func(module *moduleInfo) []*moduleInfo {
if module.waitingCount == -1 {
// This module was finished, it can't be part of a loop.
return nil
}
if module == end {
// This module is the end of the loop, start rolling up the cycle.
return []*moduleInfo{module}
}
for _, dep := range order.propagate(module) {
cycle := check(dep)
if cycle != nil {
return append([]*moduleInfo{module}, cycle...)
}
}
for _, depPauseSpec := range pauseMap[module] {
cycle := check(depPauseSpec.paused)
if cycle != nil {
return append([]*moduleInfo{module}, cycle...)
}
}
return nil
}
cycle := check(start)
if cycle != nil {
return cycleError(cycle)
}
}
// Invariant check: if there was no deadlock and no cancellation every module
// should have been visited.
if visited != toVisit {
panic(fmt.Errorf("parallelVisit ran %d visitors, expected %d", visited, toVisit))
}
// Invariant check: if there was no deadlock and no cancellation every module
// should have been visited, so there is nothing left to be paused on.
if len(pauseMap) > 0 {
panic(fmt.Errorf("parallelVisit finished with %d paused visitors", len(pauseMap)))
}
}
return nil
}
func cycleError(cycle []*moduleInfo) (errs []error) {
// The cycle list is in reverse order because all the 'check' calls append
// their own module to the list.
errs = append(errs, &BlueprintError{
Err: fmt.Errorf("encountered dependency cycle:"),
Pos: cycle[len(cycle)-1].pos,
})
// Iterate backwards through the cycle list.
curModule := cycle[0]
for i := len(cycle) - 1; i >= 0; i-- {
nextModule := cycle[i]
errs = append(errs, &BlueprintError{
Err: fmt.Errorf(" %q depends on %q",
curModule.Name(),
nextModule.Name()),
Pos: curModule.pos,
})
curModule = nextModule
}
return errs
} }
// updateDependencies recursively walks the module dependency graph and updates // updateDependencies recursively walks the module dependency graph and updates
@ -1963,30 +2144,6 @@ func (c *Context) updateDependencies() (errs []error) {
var check func(group *moduleInfo) []*moduleInfo var check func(group *moduleInfo) []*moduleInfo
cycleError := func(cycle []*moduleInfo) {
// We are the "start" of the cycle, so we're responsible
// for generating the errors. The cycle list is in
// reverse order because all the 'check' calls append
// their own module to the list.
errs = append(errs, &BlueprintError{
Err: fmt.Errorf("encountered dependency cycle:"),
Pos: cycle[len(cycle)-1].pos,
})
// Iterate backwards through the cycle list.
curModule := cycle[0]
for i := len(cycle) - 1; i >= 0; i-- {
nextModule := cycle[i]
errs = append(errs, &BlueprintError{
Err: fmt.Errorf(" %q depends on %q",
curModule.Name(),
nextModule.Name()),
Pos: curModule.pos,
})
curModule = nextModule
}
}
check = func(module *moduleInfo) []*moduleInfo { check = func(module *moduleInfo) []*moduleInfo {
visited[module] = true visited[module] = true
checking[module] = true checking[module] = true
@ -2022,10 +2179,8 @@ func (c *Context) updateDependencies() (errs []error) {
if cycle != nil { if cycle != nil {
if cycle[0] == module { if cycle[0] == module {
// We are the "start" of the cycle, so we're responsible // We are the "start" of the cycle, so we're responsible
// for generating the errors. The cycle list is in // for generating the errors.
// reverse order because all the 'check' calls append errs = append(errs, cycleError(cycle)...)
// their own module to the list.
cycleError(cycle)
// We can continue processing this module's children to // We can continue processing this module's children to
// find more cycles. Since all the modules that were // find more cycles. Since all the modules that were
@ -2055,7 +2210,7 @@ func (c *Context) updateDependencies() (errs []error) {
if cycle[len(cycle)-1] != module { if cycle[len(cycle)-1] != module {
panic("inconceivable!") panic("inconceivable!")
} }
cycleError(cycle) errs = append(errs, cycleError(cycle)...)
} }
} }
} }
@ -2248,7 +2403,7 @@ func (c *Context) runMutator(config interface{}, mutator *mutatorInfo,
c.depsModified = 0 c.depsModified = 0
visit := func(module *moduleInfo) bool { visit := func(module *moduleInfo, pause chan<- pauseSpec) bool {
if module.splitModules != nil { if module.splitModules != nil {
panic("split module found in sorted module list") panic("split module found in sorted module list")
} }
@ -2259,7 +2414,8 @@ func (c *Context) runMutator(config interface{}, mutator *mutatorInfo,
config: config, config: config,
module: module, module: module,
}, },
name: mutator.name, name: mutator.name,
pauseCh: pause,
} }
func() { func() {
@ -2325,12 +2481,17 @@ func (c *Context) runMutator(config interface{}, mutator *mutatorInfo,
} }
}() }()
var visitErrs []error
if mutator.parallel { if mutator.parallel {
c.parallelVisit(direction.orderer(), visit) visitErrs = parallelVisit(c.modulesSorted, direction.orderer(), parallelVisitLimit, visit)
} else { } else {
direction.orderer().visit(c.modulesSorted, visit) direction.orderer().visit(c.modulesSorted, visit)
} }
if len(visitErrs) > 0 {
return nil, visitErrs
}
done <- true done <- true
if len(errs) > 0 { if len(errs) > 0 {
@ -2445,12 +2606,16 @@ func (c *Context) cloneModules() {
ch := make(chan update) ch := make(chan update)
doneCh := make(chan bool) doneCh := make(chan bool)
go func() { go func() {
c.parallelVisit(unorderedVisitorImpl{}, func(m *moduleInfo) bool { errs := parallelVisit(c.modulesSorted, unorderedVisitorImpl{}, parallelVisitLimit,
origLogicModule := m.logicModule func(m *moduleInfo, pause chan<- pauseSpec) bool {
m.logicModule, m.properties = c.cloneLogicModule(m) origLogicModule := m.logicModule
ch <- update{origLogicModule, m} m.logicModule, m.properties = c.cloneLogicModule(m)
return false ch <- update{origLogicModule, m}
}) return false
})
if len(errs) > 0 {
panic(errs)
}
doneCh <- true doneCh <- true
}() }()
@ -2514,71 +2679,73 @@ func (c *Context) generateModuleBuildActions(config interface{},
} }
}() }()
c.parallelVisit(bottomUpVisitor, func(module *moduleInfo) bool { visitErrs := parallelVisit(c.modulesSorted, bottomUpVisitor, parallelVisitLimit,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
uniqueName := c.nameInterface.UniqueName(newNamespaceContext(module), module.group.name)
sanitizedName := toNinjaName(uniqueName)
uniqueName := c.nameInterface.UniqueName(newNamespaceContext(module), module.group.name) prefix := moduleNamespacePrefix(sanitizedName + "_" + module.variant.name)
sanitizedName := toNinjaName(uniqueName)
prefix := moduleNamespacePrefix(sanitizedName + "_" + module.variant.name) // The parent scope of the moduleContext's local scope gets overridden to be that of the
// calling Go package on a per-call basis. Since the initial parent scope doesn't matter we
// just set it to nil.
scope := newLocalScope(nil, prefix)
// The parent scope of the moduleContext's local scope gets overridden to be that of the mctx := &moduleContext{
// calling Go package on a per-call basis. Since the initial parent scope doesn't matter we baseModuleContext: baseModuleContext{
// just set it to nil. context: c,
scope := newLocalScope(nil, prefix) config: config,
module: module,
mctx := &moduleContext{ },
baseModuleContext: baseModuleContext{ scope: scope,
context: c, handledMissingDeps: module.missingDeps == nil,
config: config,
module: module,
},
scope: scope,
handledMissingDeps: module.missingDeps == nil,
}
func() {
defer func() {
if r := recover(); r != nil {
in := fmt.Sprintf("GenerateBuildActions for %s", module)
if err, ok := r.(panicError); ok {
err.addIn(in)
mctx.error(err)
} else {
mctx.error(newPanicErrorf(r, in))
}
}
}()
mctx.module.logicModule.GenerateBuildActions(mctx)
}()
if len(mctx.errs) > 0 {
errsCh <- mctx.errs
return true
}
if module.missingDeps != nil && !mctx.handledMissingDeps {
var errs []error
for _, depName := range module.missingDeps {
errs = append(errs, c.missingDependencyError(module, depName))
} }
errsCh <- errs
return true
}
depsCh <- mctx.ninjaFileDeps func() {
defer func() {
if r := recover(); r != nil {
in := fmt.Sprintf("GenerateBuildActions for %s", module)
if err, ok := r.(panicError); ok {
err.addIn(in)
mctx.error(err)
} else {
mctx.error(newPanicErrorf(r, in))
}
}
}()
mctx.module.logicModule.GenerateBuildActions(mctx)
}()
newErrs := c.processLocalBuildActions(&module.actionDefs, if len(mctx.errs) > 0 {
&mctx.actionDefs, liveGlobals) errsCh <- mctx.errs
if len(newErrs) > 0 { return true
errsCh <- newErrs }
return true
} if module.missingDeps != nil && !mctx.handledMissingDeps {
return false var errs []error
}) for _, depName := range module.missingDeps {
errs = append(errs, c.missingDependencyError(module, depName))
}
errsCh <- errs
return true
}
depsCh <- mctx.ninjaFileDeps
newErrs := c.processLocalBuildActions(&module.actionDefs,
&mctx.actionDefs, liveGlobals)
if len(newErrs) > 0 {
errsCh <- newErrs
return true
}
return false
})
cancelCh <- struct{}{} cancelCh <- struct{}{}
<-cancelCh <-cancelCh
errs = append(errs, visitErrs...)
return deps, errs return deps, errs
} }

View file

@ -838,3 +838,219 @@ func Test_findVariant(t *testing.T) {
}) })
} }
} }
func Test_parallelVisit(t *testing.T) {
moduleA := &moduleInfo{
group: &moduleGroup{
name: "A",
},
}
moduleB := &moduleInfo{
group: &moduleGroup{
name: "B",
},
}
moduleC := &moduleInfo{
group: &moduleGroup{
name: "C",
},
}
moduleD := &moduleInfo{
group: &moduleGroup{
name: "D",
},
}
moduleA.group.modules = modulesOrAliases{moduleA}
moduleB.group.modules = modulesOrAliases{moduleB}
moduleC.group.modules = modulesOrAliases{moduleC}
moduleD.group.modules = modulesOrAliases{moduleD}
addDep := func(from, to *moduleInfo) {
from.directDeps = append(from.directDeps, depInfo{to, nil})
from.forwardDeps = append(from.forwardDeps, to)
to.reverseDeps = append(to.reverseDeps, from)
}
// A depends on B, B depends on C. Nothing depends on D, and D doesn't depend on anything.
addDep(moduleA, moduleB)
addDep(moduleB, moduleC)
t.Run("no modules", func(t *testing.T) {
errs := parallelVisit(nil, bottomUpVisitorImpl{}, 1,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
panic("unexpected call to visitor")
})
if errs != nil {
t.Errorf("expected no errors, got %q", errs)
}
})
t.Run("bottom up", func(t *testing.T) {
order := ""
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 1,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
order += module.group.name
return false
})
if errs != nil {
t.Errorf("expected no errors, got %q", errs)
}
if g, w := order, "CBA"; g != w {
t.Errorf("expected order %q, got %q", w, g)
}
})
t.Run("pause", func(t *testing.T) {
order := ""
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC, moduleD}, bottomUpVisitorImpl{}, 1,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
if module == moduleC {
// Pause module C on module D
unpause := make(chan struct{})
pause <- pauseSpec{moduleC, moduleD, unpause}
<-unpause
}
order += module.group.name
return false
})
if errs != nil {
t.Errorf("expected no errors, got %q", errs)
}
if g, w := order, "DCBA"; g != w {
t.Errorf("expected order %q, got %q", w, g)
}
})
t.Run("cancel", func(t *testing.T) {
order := ""
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 1,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
order += module.group.name
// Cancel in module B
return module == moduleB
})
if errs != nil {
t.Errorf("expected no errors, got %q", errs)
}
if g, w := order, "CB"; g != w {
t.Errorf("expected order %q, got %q", w, g)
}
})
t.Run("pause and cancel", func(t *testing.T) {
order := ""
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC, moduleD}, bottomUpVisitorImpl{}, 1,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
if module == moduleC {
// Pause module C on module D
unpause := make(chan struct{})
pause <- pauseSpec{moduleC, moduleD, unpause}
<-unpause
}
order += module.group.name
// Cancel in module D
return module == moduleD
})
if errs != nil {
t.Errorf("expected no errors, got %q", errs)
}
if g, w := order, "D"; g != w {
t.Errorf("expected order %q, got %q", w, g)
}
})
t.Run("parallel", func(t *testing.T) {
order := ""
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 3,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
order += module.group.name
return false
})
if errs != nil {
t.Errorf("expected no errors, got %q", errs)
}
if g, w := order, "CBA"; g != w {
t.Errorf("expected order %q, got %q", w, g)
}
})
t.Run("pause existing", func(t *testing.T) {
order := ""
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 3,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
if module == moduleA {
// Pause module A on module B (an existing dependency)
unpause := make(chan struct{})
pause <- pauseSpec{moduleA, moduleB, unpause}
<-unpause
}
order += module.group.name
return false
})
if errs != nil {
t.Errorf("expected no errors, got %q", errs)
}
if g, w := order, "CBA"; g != w {
t.Errorf("expected order %q, got %q", w, g)
}
})
t.Run("cycle", func(t *testing.T) {
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 3,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
if module == moduleC {
// Pause module C on module A (a dependency cycle)
unpause := make(chan struct{})
pause <- pauseSpec{moduleC, moduleA, unpause}
<-unpause
}
return false
})
want := []string{
`encountered dependency cycle`,
`"C" depends on "A"`,
`"A" depends on "B"`,
`"B" depends on "C"`,
}
for i := range want {
if len(errs) <= i {
t.Errorf("missing error %s", want[i])
} else if !strings.Contains(errs[i].Error(), want[i]) {
t.Errorf("expected error %s, got %s", want[i], errs[i])
}
}
if len(errs) > len(want) {
for _, err := range errs[len(want):] {
t.Errorf("unexpected error %s", err.Error())
}
}
})
t.Run("pause cycle", func(t *testing.T) {
errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC, moduleD}, bottomUpVisitorImpl{}, 3,
func(module *moduleInfo, pause chan<- pauseSpec) bool {
if module == moduleC {
// Pause module C on module D
unpause := make(chan struct{})
pause <- pauseSpec{moduleC, moduleD, unpause}
<-unpause
}
if module == moduleD {
// Pause module D on module C (a pause cycle)
unpause := make(chan struct{})
pause <- pauseSpec{moduleD, moduleC, unpause}
<-unpause
}
return false
})
want := []string{
`encountered dependency cycle`,
`"C" depends on "D"`,
`"D" depends on "C"`,
}
for i := range want {
if len(errs) <= i {
t.Errorf("missing error %s", want[i])
} else if !strings.Contains(errs[i].Error(), want[i]) {
t.Errorf("expected error %s, got %s", want[i], errs[i])
}
}
if len(errs) > len(want) {
for _, err := range errs[len(want):] {
t.Errorf("unexpected error %s", err.Error())
}
}
})
}

View file

@ -725,6 +725,7 @@ type mutatorContext struct {
newVariations modulesOrAliases // new variants of existing modules newVariations modulesOrAliases // new variants of existing modules
newModules []*moduleInfo // brand new modules newModules []*moduleInfo // brand new modules
defaultVariation *string defaultVariation *string
pauseCh chan<- pauseSpec
} }
type BaseMutatorContext interface { type BaseMutatorContext interface {
@ -1102,6 +1103,23 @@ func (mctx *mutatorContext) CreateModule(factory ModuleFactory, props ...interfa
return module.logicModule return module.logicModule
} }
// pause waits until the given dependency has been visited by the mutator's parallelVisit call.
// It returns true if the pause was supported, false if the pause was not supported and did not
// occur, which will happen when the mutator is not parallelizable.
func (mctx *mutatorContext) pause(dep *moduleInfo) bool {
if mctx.pauseCh != nil {
unpause := make(unpause)
mctx.pauseCh <- pauseSpec{
paused: mctx.module,
until: dep,
unpause: unpause,
}
<-unpause
return true
}
return false
}
// SimpleName is an embeddable object to implement the ModuleContext.Name method using a property // SimpleName is an embeddable object to implement the ModuleContext.Name method using a property
// called "name". Modules that embed it must also add SimpleName.Properties to their property // called "name". Modules that embed it must also add SimpleName.Properties to their property
// structure list. // structure list.