From c4773d90a24137b98249e1d5e8974ab09a14dff3 Mon Sep 17 00:00:00 2001 From: Colin Cross Date: Tue, 25 Aug 2020 17:12:59 -0700 Subject: [PATCH] Support pausing parallelVisit Pass a channel to visitor functions called by parallelVisit that allows them to pause the current visitor until a given visitor has finished. This allows parallelVisit to work on a dependency graph while it is being mutated. Test: Test_parallelVisit Change-Id: Id8b1542c22ac9914439310e31d992ae0d7318d69 --- context.go | 409 ++++++++++++++++++++++++++++++++++-------------- context_test.go | 216 +++++++++++++++++++++++++ module_ctx.go | 18 +++ 3 files changed, 522 insertions(+), 121 deletions(-) diff --git a/context.go b/context.go index d6bb851..e48669a 100644 --- a/context.go +++ b/context.go @@ -246,7 +246,7 @@ type moduleInfo struct { forwardDeps []*moduleInfo directDeps []depInfo - // used by parallelVisitAllBottomUp + // used by parallelVisit waitingCount int // set during each runMutator @@ -1827,7 +1827,7 @@ type visitOrderer interface { // returns the list of modules that are waiting for this module propagate(module *moduleInfo) []*moduleInfo // visit modules in order - visit(modules []*moduleInfo, visit func(*moduleInfo) bool) + visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool) } type unorderedVisitorImpl struct{} @@ -1840,9 +1840,9 @@ func (unorderedVisitorImpl) propagate(module *moduleInfo) []*moduleInfo { return nil } -func (unorderedVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo) bool) { +func (unorderedVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool) { for _, module := range modules { - if visit(module) { + if visit(module, nil) { return } } @@ -1858,9 +1858,9 @@ func (bottomUpVisitorImpl) propagate(module *moduleInfo) []*moduleInfo { return module.reverseDeps } -func (bottomUpVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo) bool) { +func (bottomUpVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool) { for _, module := range modules { - if visit(module) { + if visit(module, nil) { return } } @@ -1876,10 +1876,10 @@ func (topDownVisitorImpl) propagate(module *moduleInfo) []*moduleInfo { return module.forwardDeps } -func (topDownVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo) bool) { +func (topDownVisitorImpl) visit(modules []*moduleInfo, visit func(*moduleInfo, chan<- pauseSpec) bool) { for i := 0; i < len(modules); i++ { module := modules[len(modules)-1-i] - if visit(module) { + if visit(module, nil) { return } } @@ -1890,25 +1890,50 @@ var ( topDownVisitor topDownVisitorImpl ) +// pauseSpec describes a pause that a module needs to occur until another module has been visited, +// at which point the unpause channel will be closed. +type pauseSpec struct { + paused *moduleInfo + until *moduleInfo + unpause unpause +} + +type unpause chan struct{} + +const parallelVisitLimit = 1000 + // Calls visit on each module, guaranteeing that visit is not called on a module until visit on all -// of its dependencies has finished. -func (c *Context) parallelVisit(order visitOrderer, visit func(group *moduleInfo) bool) { +// of its dependencies has finished. A visit function can write a pauseSpec to the pause channel +// to wait for another dependency to be visited. If a visit function returns true to cancel +// while another visitor is paused, the paused visitor will never be resumed and its goroutine +// will stay paused forever. +func parallelVisit(modules []*moduleInfo, order visitOrderer, limit int, + visit func(module *moduleInfo, pause chan<- pauseSpec) bool) []error { + doneCh := make(chan *moduleInfo) cancelCh := make(chan bool) - count := 0 + pauseCh := make(chan pauseSpec) cancel := false - var backlog []*moduleInfo - const limit = 1000 - for _, module := range c.modulesSorted { + var backlog []*moduleInfo // Visitors that are ready to start but backlogged due to limit. + var unpauseBacklog []pauseSpec // Visitors that are ready to unpause but backlogged due to limit. + + active := 0 // Number of visitors running, not counting paused visitors. + visited := 0 // Number of finished visitors. + + pauseMap := make(map[*moduleInfo][]pauseSpec) + + for _, module := range modules { module.waitingCount = order.waitCount(module) } - visitOne := func(module *moduleInfo) { - if count < limit { - count++ + // Call the visitor on a module if there are fewer active visitors than the parallelism + // limit, otherwise add it to the backlog. + startOrBacklog := func(module *moduleInfo) { + if active < limit { + active++ go func() { - ret := visit(module) + ret := visit(module, pauseCh) if ret { cancelCh <- true } @@ -1919,34 +1944,190 @@ func (c *Context) parallelVisit(order visitOrderer, visit func(group *moduleInfo } } - for _, module := range c.modulesSorted { - if module.waitingCount == 0 { - visitOne(module) + // Unpause the already-started but paused visitor on a module if there are fewer active + // visitors than the parallelism limit, otherwise add it to the backlog. + unpauseOrBacklog := func(pauseSpec pauseSpec) { + if active < limit { + active++ + close(pauseSpec.unpause) + } else { + unpauseBacklog = append(unpauseBacklog, pauseSpec) } } - for count > 0 || len(backlog) > 0 { + // Start any modules in the backlog up to the parallelism limit. Unpause paused modules first + // since they may already be holding resources. + unpauseOrStartFromBacklog := func() { + for active < limit && len(unpauseBacklog) > 0 { + unpause := unpauseBacklog[0] + unpauseBacklog = unpauseBacklog[1:] + unpauseOrBacklog(unpause) + } + for active < limit && len(backlog) > 0 { + toVisit := backlog[0] + backlog = backlog[1:] + startOrBacklog(toVisit) + } + } + + toVisit := len(modules) + + // Start or backlog any modules that are not waiting for any other modules. + for _, module := range modules { + if module.waitingCount == 0 { + startOrBacklog(module) + } + } + + for active > 0 { select { case <-cancelCh: cancel = true backlog = nil case doneModule := <-doneCh: - count-- + active-- if !cancel { - for count < limit && len(backlog) > 0 { - toVisit := backlog[0] - backlog = backlog[1:] - visitOne(toVisit) + // Mark this module as done. + doneModule.waitingCount = -1 + visited++ + + // Unpause or backlog any modules that were waiting for this one. + if unpauses, ok := pauseMap[doneModule]; ok { + delete(pauseMap, doneModule) + for _, unpause := range unpauses { + unpauseOrBacklog(unpause) + } } + + // Start any backlogged modules up to limit. + unpauseOrStartFromBacklog() + + // Decrement waitingCount on the next modules in the tree based + // on propagation order, and start or backlog them if they are + // ready to start. for _, module := range order.propagate(doneModule) { module.waitingCount-- if module.waitingCount == 0 { - visitOne(module) + startOrBacklog(module) } } } + case pauseSpec := <-pauseCh: + if pauseSpec.until.waitingCount == -1 { + // Module being paused for is already finished, resume immediately. + close(pauseSpec.unpause) + } else { + // Register for unpausing. + pauseMap[pauseSpec.until] = append(pauseMap[pauseSpec.until], pauseSpec) + + // Don't count paused visitors as active so that this can't deadlock + // if 1000 visitors are paused simultaneously. + active-- + unpauseOrStartFromBacklog() + } } } + + if !cancel { + // Invariant check: no backlogged modules, these weren't waiting on anything except + // the parallelism limit so they should have run. + if len(backlog) > 0 { + panic(fmt.Errorf("parallelVisit finished with %d backlogged visitors", len(backlog))) + } + + // Invariant check: no backlogged paused modules, these weren't waiting on anything + // except the parallelism limit so they should have run. + if len(unpauseBacklog) > 0 { + panic(fmt.Errorf("parallelVisit finished with %d backlogged unpaused visitors", len(unpauseBacklog))) + } + + if len(pauseMap) > 0 { + // Probably a deadlock due to a newly added dependency cycle. + // Start from a semi-random module being paused for and perform a depth-first + // search for the module it is paused on, ignoring modules that are marked as + // done. Note this traverses from modules to the modules that would have been + // unblocked when that module finished. + var start, end *moduleInfo + for _, pauseSpecs := range pauseMap { + for _, pauseSpec := range pauseSpecs { + if start == nil || start.String() > pauseSpec.paused.String() { + start = pauseSpec.paused + end = pauseSpec.until + } + } + } + + var check func(group *moduleInfo) []*moduleInfo + check = func(module *moduleInfo) []*moduleInfo { + if module.waitingCount == -1 { + // This module was finished, it can't be part of a loop. + return nil + } + if module == end { + // This module is the end of the loop, start rolling up the cycle. + return []*moduleInfo{module} + } + + for _, dep := range order.propagate(module) { + cycle := check(dep) + if cycle != nil { + return append([]*moduleInfo{module}, cycle...) + } + } + for _, depPauseSpec := range pauseMap[module] { + cycle := check(depPauseSpec.paused) + if cycle != nil { + return append([]*moduleInfo{module}, cycle...) + } + } + + return nil + } + + cycle := check(start) + if cycle != nil { + return cycleError(cycle) + } + } + + // Invariant check: if there was no deadlock and no cancellation every module + // should have been visited. + if visited != toVisit { + panic(fmt.Errorf("parallelVisit ran %d visitors, expected %d", visited, toVisit)) + } + + // Invariant check: if there was no deadlock and no cancellation every module + // should have been visited, so there is nothing left to be paused on. + if len(pauseMap) > 0 { + panic(fmt.Errorf("parallelVisit finished with %d paused visitors", len(pauseMap))) + } + } + + return nil +} + +func cycleError(cycle []*moduleInfo) (errs []error) { + // The cycle list is in reverse order because all the 'check' calls append + // their own module to the list. + errs = append(errs, &BlueprintError{ + Err: fmt.Errorf("encountered dependency cycle:"), + Pos: cycle[len(cycle)-1].pos, + }) + + // Iterate backwards through the cycle list. + curModule := cycle[0] + for i := len(cycle) - 1; i >= 0; i-- { + nextModule := cycle[i] + errs = append(errs, &BlueprintError{ + Err: fmt.Errorf(" %q depends on %q", + curModule.Name(), + nextModule.Name()), + Pos: curModule.pos, + }) + curModule = nextModule + } + + return errs } // updateDependencies recursively walks the module dependency graph and updates @@ -1963,30 +2144,6 @@ func (c *Context) updateDependencies() (errs []error) { var check func(group *moduleInfo) []*moduleInfo - cycleError := func(cycle []*moduleInfo) { - // We are the "start" of the cycle, so we're responsible - // for generating the errors. The cycle list is in - // reverse order because all the 'check' calls append - // their own module to the list. - errs = append(errs, &BlueprintError{ - Err: fmt.Errorf("encountered dependency cycle:"), - Pos: cycle[len(cycle)-1].pos, - }) - - // Iterate backwards through the cycle list. - curModule := cycle[0] - for i := len(cycle) - 1; i >= 0; i-- { - nextModule := cycle[i] - errs = append(errs, &BlueprintError{ - Err: fmt.Errorf(" %q depends on %q", - curModule.Name(), - nextModule.Name()), - Pos: curModule.pos, - }) - curModule = nextModule - } - } - check = func(module *moduleInfo) []*moduleInfo { visited[module] = true checking[module] = true @@ -2022,10 +2179,8 @@ func (c *Context) updateDependencies() (errs []error) { if cycle != nil { if cycle[0] == module { // We are the "start" of the cycle, so we're responsible - // for generating the errors. The cycle list is in - // reverse order because all the 'check' calls append - // their own module to the list. - cycleError(cycle) + // for generating the errors. + errs = append(errs, cycleError(cycle)...) // We can continue processing this module's children to // find more cycles. Since all the modules that were @@ -2055,7 +2210,7 @@ func (c *Context) updateDependencies() (errs []error) { if cycle[len(cycle)-1] != module { panic("inconceivable!") } - cycleError(cycle) + errs = append(errs, cycleError(cycle)...) } } } @@ -2248,7 +2403,7 @@ func (c *Context) runMutator(config interface{}, mutator *mutatorInfo, c.depsModified = 0 - visit := func(module *moduleInfo) bool { + visit := func(module *moduleInfo, pause chan<- pauseSpec) bool { if module.splitModules != nil { panic("split module found in sorted module list") } @@ -2259,7 +2414,8 @@ func (c *Context) runMutator(config interface{}, mutator *mutatorInfo, config: config, module: module, }, - name: mutator.name, + name: mutator.name, + pauseCh: pause, } func() { @@ -2325,12 +2481,17 @@ func (c *Context) runMutator(config interface{}, mutator *mutatorInfo, } }() + var visitErrs []error if mutator.parallel { - c.parallelVisit(direction.orderer(), visit) + visitErrs = parallelVisit(c.modulesSorted, direction.orderer(), parallelVisitLimit, visit) } else { direction.orderer().visit(c.modulesSorted, visit) } + if len(visitErrs) > 0 { + return nil, visitErrs + } + done <- true if len(errs) > 0 { @@ -2445,12 +2606,16 @@ func (c *Context) cloneModules() { ch := make(chan update) doneCh := make(chan bool) go func() { - c.parallelVisit(unorderedVisitorImpl{}, func(m *moduleInfo) bool { - origLogicModule := m.logicModule - m.logicModule, m.properties = c.cloneLogicModule(m) - ch <- update{origLogicModule, m} - return false - }) + errs := parallelVisit(c.modulesSorted, unorderedVisitorImpl{}, parallelVisitLimit, + func(m *moduleInfo, pause chan<- pauseSpec) bool { + origLogicModule := m.logicModule + m.logicModule, m.properties = c.cloneLogicModule(m) + ch <- update{origLogicModule, m} + return false + }) + if len(errs) > 0 { + panic(errs) + } doneCh <- true }() @@ -2514,71 +2679,73 @@ func (c *Context) generateModuleBuildActions(config interface{}, } }() - c.parallelVisit(bottomUpVisitor, func(module *moduleInfo) bool { + visitErrs := parallelVisit(c.modulesSorted, bottomUpVisitor, parallelVisitLimit, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + uniqueName := c.nameInterface.UniqueName(newNamespaceContext(module), module.group.name) + sanitizedName := toNinjaName(uniqueName) - uniqueName := c.nameInterface.UniqueName(newNamespaceContext(module), module.group.name) - sanitizedName := toNinjaName(uniqueName) + prefix := moduleNamespacePrefix(sanitizedName + "_" + module.variant.name) - prefix := moduleNamespacePrefix(sanitizedName + "_" + module.variant.name) + // The parent scope of the moduleContext's local scope gets overridden to be that of the + // calling Go package on a per-call basis. Since the initial parent scope doesn't matter we + // just set it to nil. + scope := newLocalScope(nil, prefix) - // The parent scope of the moduleContext's local scope gets overridden to be that of the - // calling Go package on a per-call basis. Since the initial parent scope doesn't matter we - // just set it to nil. - scope := newLocalScope(nil, prefix) - - mctx := &moduleContext{ - baseModuleContext: baseModuleContext{ - context: c, - config: config, - module: module, - }, - scope: scope, - handledMissingDeps: module.missingDeps == nil, - } - - func() { - defer func() { - if r := recover(); r != nil { - in := fmt.Sprintf("GenerateBuildActions for %s", module) - if err, ok := r.(panicError); ok { - err.addIn(in) - mctx.error(err) - } else { - mctx.error(newPanicErrorf(r, in)) - } - } - }() - mctx.module.logicModule.GenerateBuildActions(mctx) - }() - - if len(mctx.errs) > 0 { - errsCh <- mctx.errs - return true - } - - if module.missingDeps != nil && !mctx.handledMissingDeps { - var errs []error - for _, depName := range module.missingDeps { - errs = append(errs, c.missingDependencyError(module, depName)) + mctx := &moduleContext{ + baseModuleContext: baseModuleContext{ + context: c, + config: config, + module: module, + }, + scope: scope, + handledMissingDeps: module.missingDeps == nil, } - errsCh <- errs - return true - } - depsCh <- mctx.ninjaFileDeps + func() { + defer func() { + if r := recover(); r != nil { + in := fmt.Sprintf("GenerateBuildActions for %s", module) + if err, ok := r.(panicError); ok { + err.addIn(in) + mctx.error(err) + } else { + mctx.error(newPanicErrorf(r, in)) + } + } + }() + mctx.module.logicModule.GenerateBuildActions(mctx) + }() - newErrs := c.processLocalBuildActions(&module.actionDefs, - &mctx.actionDefs, liveGlobals) - if len(newErrs) > 0 { - errsCh <- newErrs - return true - } - return false - }) + if len(mctx.errs) > 0 { + errsCh <- mctx.errs + return true + } + + if module.missingDeps != nil && !mctx.handledMissingDeps { + var errs []error + for _, depName := range module.missingDeps { + errs = append(errs, c.missingDependencyError(module, depName)) + } + errsCh <- errs + return true + } + + depsCh <- mctx.ninjaFileDeps + + newErrs := c.processLocalBuildActions(&module.actionDefs, + &mctx.actionDefs, liveGlobals) + if len(newErrs) > 0 { + errsCh <- newErrs + return true + } + return false + }) cancelCh <- struct{}{} <-cancelCh + errs = append(errs, visitErrs...) + return deps, errs } diff --git a/context_test.go b/context_test.go index 91089e6..dd5ec38 100644 --- a/context_test.go +++ b/context_test.go @@ -838,3 +838,219 @@ func Test_findVariant(t *testing.T) { }) } } + +func Test_parallelVisit(t *testing.T) { + moduleA := &moduleInfo{ + group: &moduleGroup{ + name: "A", + }, + } + moduleB := &moduleInfo{ + group: &moduleGroup{ + name: "B", + }, + } + moduleC := &moduleInfo{ + group: &moduleGroup{ + name: "C", + }, + } + moduleD := &moduleInfo{ + group: &moduleGroup{ + name: "D", + }, + } + moduleA.group.modules = modulesOrAliases{moduleA} + moduleB.group.modules = modulesOrAliases{moduleB} + moduleC.group.modules = modulesOrAliases{moduleC} + moduleD.group.modules = modulesOrAliases{moduleD} + + addDep := func(from, to *moduleInfo) { + from.directDeps = append(from.directDeps, depInfo{to, nil}) + from.forwardDeps = append(from.forwardDeps, to) + to.reverseDeps = append(to.reverseDeps, from) + } + + // A depends on B, B depends on C. Nothing depends on D, and D doesn't depend on anything. + addDep(moduleA, moduleB) + addDep(moduleB, moduleC) + + t.Run("no modules", func(t *testing.T) { + errs := parallelVisit(nil, bottomUpVisitorImpl{}, 1, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + panic("unexpected call to visitor") + }) + if errs != nil { + t.Errorf("expected no errors, got %q", errs) + } + }) + t.Run("bottom up", func(t *testing.T) { + order := "" + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 1, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + order += module.group.name + return false + }) + if errs != nil { + t.Errorf("expected no errors, got %q", errs) + } + if g, w := order, "CBA"; g != w { + t.Errorf("expected order %q, got %q", w, g) + } + }) + t.Run("pause", func(t *testing.T) { + order := "" + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC, moduleD}, bottomUpVisitorImpl{}, 1, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + if module == moduleC { + // Pause module C on module D + unpause := make(chan struct{}) + pause <- pauseSpec{moduleC, moduleD, unpause} + <-unpause + } + order += module.group.name + return false + }) + if errs != nil { + t.Errorf("expected no errors, got %q", errs) + } + if g, w := order, "DCBA"; g != w { + t.Errorf("expected order %q, got %q", w, g) + } + }) + t.Run("cancel", func(t *testing.T) { + order := "" + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 1, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + order += module.group.name + // Cancel in module B + return module == moduleB + }) + if errs != nil { + t.Errorf("expected no errors, got %q", errs) + } + if g, w := order, "CB"; g != w { + t.Errorf("expected order %q, got %q", w, g) + } + }) + t.Run("pause and cancel", func(t *testing.T) { + order := "" + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC, moduleD}, bottomUpVisitorImpl{}, 1, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + if module == moduleC { + // Pause module C on module D + unpause := make(chan struct{}) + pause <- pauseSpec{moduleC, moduleD, unpause} + <-unpause + } + order += module.group.name + // Cancel in module D + return module == moduleD + }) + if errs != nil { + t.Errorf("expected no errors, got %q", errs) + } + if g, w := order, "D"; g != w { + t.Errorf("expected order %q, got %q", w, g) + } + }) + t.Run("parallel", func(t *testing.T) { + order := "" + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 3, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + order += module.group.name + return false + }) + if errs != nil { + t.Errorf("expected no errors, got %q", errs) + } + if g, w := order, "CBA"; g != w { + t.Errorf("expected order %q, got %q", w, g) + } + }) + t.Run("pause existing", func(t *testing.T) { + order := "" + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 3, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + if module == moduleA { + // Pause module A on module B (an existing dependency) + unpause := make(chan struct{}) + pause <- pauseSpec{moduleA, moduleB, unpause} + <-unpause + } + order += module.group.name + return false + }) + if errs != nil { + t.Errorf("expected no errors, got %q", errs) + } + if g, w := order, "CBA"; g != w { + t.Errorf("expected order %q, got %q", w, g) + } + }) + t.Run("cycle", func(t *testing.T) { + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC}, bottomUpVisitorImpl{}, 3, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + if module == moduleC { + // Pause module C on module A (a dependency cycle) + unpause := make(chan struct{}) + pause <- pauseSpec{moduleC, moduleA, unpause} + <-unpause + } + return false + }) + want := []string{ + `encountered dependency cycle`, + `"C" depends on "A"`, + `"A" depends on "B"`, + `"B" depends on "C"`, + } + for i := range want { + if len(errs) <= i { + t.Errorf("missing error %s", want[i]) + } else if !strings.Contains(errs[i].Error(), want[i]) { + t.Errorf("expected error %s, got %s", want[i], errs[i]) + } + } + if len(errs) > len(want) { + for _, err := range errs[len(want):] { + t.Errorf("unexpected error %s", err.Error()) + } + } + }) + t.Run("pause cycle", func(t *testing.T) { + errs := parallelVisit([]*moduleInfo{moduleA, moduleB, moduleC, moduleD}, bottomUpVisitorImpl{}, 3, + func(module *moduleInfo, pause chan<- pauseSpec) bool { + if module == moduleC { + // Pause module C on module D + unpause := make(chan struct{}) + pause <- pauseSpec{moduleC, moduleD, unpause} + <-unpause + } + if module == moduleD { + // Pause module D on module C (a pause cycle) + unpause := make(chan struct{}) + pause <- pauseSpec{moduleD, moduleC, unpause} + <-unpause + } + return false + }) + want := []string{ + `encountered dependency cycle`, + `"C" depends on "D"`, + `"D" depends on "C"`, + } + for i := range want { + if len(errs) <= i { + t.Errorf("missing error %s", want[i]) + } else if !strings.Contains(errs[i].Error(), want[i]) { + t.Errorf("expected error %s, got %s", want[i], errs[i]) + } + } + if len(errs) > len(want) { + for _, err := range errs[len(want):] { + t.Errorf("unexpected error %s", err.Error()) + } + } + }) +} diff --git a/module_ctx.go b/module_ctx.go index 50fbcf0..1e52730 100644 --- a/module_ctx.go +++ b/module_ctx.go @@ -725,6 +725,7 @@ type mutatorContext struct { newVariations modulesOrAliases // new variants of existing modules newModules []*moduleInfo // brand new modules defaultVariation *string + pauseCh chan<- pauseSpec } type BaseMutatorContext interface { @@ -1102,6 +1103,23 @@ func (mctx *mutatorContext) CreateModule(factory ModuleFactory, props ...interfa return module.logicModule } +// pause waits until the given dependency has been visited by the mutator's parallelVisit call. +// It returns true if the pause was supported, false if the pause was not supported and did not +// occur, which will happen when the mutator is not parallelizable. +func (mctx *mutatorContext) pause(dep *moduleInfo) bool { + if mctx.pauseCh != nil { + unpause := make(unpause) + mctx.pauseCh <- pauseSpec{ + paused: mctx.module, + until: dep, + unpause: unpause, + } + <-unpause + return true + } + return false +} + // SimpleName is an embeddable object to implement the ModuleContext.Name method using a property // called "name". Modules that embed it must also add SimpleName.Properties to their property // structure list.