Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions apps/sim/executor/execution/edge-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,171 @@ describe('EdgeManager', () => {
})
expect(readyNodes).toContain(function1Id)
})

/**
* Regression for the substring-match bug in clearDeactivatedEdgesForNodes.
*
* Reproduces the real workflow pattern where an empty upstream loop (e.g. KG) cascade
* deactivates its `loop_exit` edge into the next loop's sentinel-start (e.g. SBJ). When
* SBJ iterates and resets its state between iterations, the old buggy `includes(\`-${nodeId}-\`)`
* check matched edge keys where the sentinel was the TARGET (not the source), wrongly
* reactivating that external edge. That made countActiveIncomingEdges see a phantom pending
* upstream and SBJ's sentinel-start stopped being ready, stalling the loop after iteration 1.
*/
it('should not re-activate external cascade-deactivated edges pointing INTO a loop node', () => {
const externalNodeId = 'external-node'
const sbjSentinelStartId = 'loop-sbj-sentinel-start'
const sbjSentinelEndId = 'loop-sbj-sentinel-end'
const bodyNodeId = 'body-node'

const externalNode = createMockNode(externalNodeId, [
{ target: sbjSentinelStartId, sourceHandle: 'condition-if' },
])
const sbjSentinelStartNode = createMockNode(
sbjSentinelStartId,
[{ target: bodyNodeId }],
[externalNodeId]
)
const bodyNode = createMockNode(
bodyNodeId,
[{ target: sbjSentinelEndId }],
[sbjSentinelStartId]
)
const sbjSentinelEndNode = createMockNode(sbjSentinelEndId, [], [bodyNodeId])

const nodes = new Map<string, DAGNode>([
[externalNodeId, externalNode],
[sbjSentinelStartId, sbjSentinelStartNode],
[bodyNodeId, bodyNode],
[sbjSentinelEndId, sbjSentinelEndNode],
])

const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)

edgeManager.processOutgoingEdges(externalNode, { selectedOption: 'else' })

expect(edgeManager.isNodeReady(sbjSentinelStartNode)).toBe(true)

edgeManager.clearDeactivatedEdgesForNodes(
new Set([sbjSentinelStartId, sbjSentinelEndId, bodyNodeId])
)

expect(edgeManager.isNodeReady(sbjSentinelStartNode)).toBe(true)
})

/**
* End-to-end regression: after a loop reset while an external edge is cascade-deactivated,
* the backwards `loop_continue` edge from sentinel-end must still mark sentinel-start as
* ready. The old code removed the external edge's deactivation entry, leaving a phantom
* active incoming and producing the exact "loop stops after 1 iteration" symptom the user
* hit on the Group A workflow.
*/
it('should leave sbjSentinelStart ready after loop reset when external edge is cascade-deactivated', () => {
const externalNodeId = 'external-node'
const sbjSentinelStartId = 'loop-sbj-sentinel-start'
const sbjSentinelEndId = 'loop-sbj-sentinel-end'
const bodyNodeId = 'body-node'

const externalNode = createMockNode(externalNodeId, [
{ target: sbjSentinelStartId, sourceHandle: 'condition-if' },
])
const sbjSentinelStartNode = createMockNode(
sbjSentinelStartId,
[{ target: bodyNodeId }],
[externalNodeId]
)
const bodyNode = createMockNode(
bodyNodeId,
[{ target: sbjSentinelEndId }],
[sbjSentinelStartId]
)
const sbjSentinelEndNode = createMockNode(
sbjSentinelEndId,
[{ target: sbjSentinelStartId, sourceHandle: 'loop_continue' }],
[bodyNodeId]
)

const nodes = new Map<string, DAGNode>([
[externalNodeId, externalNode],
[sbjSentinelStartId, sbjSentinelStartNode],
[bodyNodeId, bodyNode],
[sbjSentinelEndId, sbjSentinelEndNode],
])

const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)

edgeManager.processOutgoingEdges(externalNode, { selectedOption: 'else' })

edgeManager.clearDeactivatedEdgesForNodes(
new Set([sbjSentinelStartId, sbjSentinelEndId, bodyNodeId])
)

const readyNodes = edgeManager.processOutgoingEdges(sbjSentinelEndNode, {
selectedRoute: 'loop_continue',
})

expect(readyNodes).toContain(sbjSentinelStartId)
})

/**
* Guard against an overly narrow fix: edges whose SOURCE is inside the loop (e.g. a body
* node that deactivated its outgoing edge during the previous iteration) must still be
* cleared on reset so the next iteration can traverse them.
*/
it('should re-activate internal loop edges (source inside loop) when resetting loop state', () => {
const sbjSentinelStartId = 'loop-sbj-sentinel-start'
const sbjSentinelEndId = 'loop-sbj-sentinel-end'
const conditionInLoopId = 'condition-in-loop'
const thenBranchId = 'then-branch'

const sbjSentinelStartNode = createMockNode(sbjSentinelStartId, [
{ target: conditionInLoopId },
])
const conditionInLoopNode = createMockNode(
conditionInLoopId,
[
{ target: thenBranchId, sourceHandle: 'condition-if' },
{ target: sbjSentinelEndId, sourceHandle: 'condition-else' },
],
[sbjSentinelStartId]
)
const thenBranchNode = createMockNode(
thenBranchId,
[{ target: sbjSentinelEndId }],
[conditionInLoopId]
)
const sbjSentinelEndNode = createMockNode(
sbjSentinelEndId,
[],
[conditionInLoopId, thenBranchId]
)

const nodes = new Map<string, DAGNode>([
[sbjSentinelStartId, sbjSentinelStartNode],
[conditionInLoopId, conditionInLoopNode],
[thenBranchId, thenBranchNode],
[sbjSentinelEndId, sbjSentinelEndNode],
])

const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)

edgeManager.processOutgoingEdges(conditionInLoopNode, { selectedOption: 'else' })

edgeManager.clearDeactivatedEdgesForNodes(
new Set([sbjSentinelStartId, sbjSentinelEndId, conditionInLoopId, thenBranchId])
)

thenBranchNode.incomingEdges.add(conditionInLoopId)

const readyNodes = edgeManager.processOutgoingEdges(conditionInLoopNode, {
selectedOption: 'if',
})

expect(readyNodes).toContain(thenBranchId)
})
})

describe('restoreIncomingEdge', () => {
Expand Down
12 changes: 10 additions & 2 deletions apps/sim/executor/execution/edge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,21 @@ export class EdgeManager {

/**
* Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
*
* Only clears edges whose SOURCE is in the provided set. Edges pointing INTO a node in the set
* whose source lives outside (e.g. an external branch whose path was cascade-deactivated) must
* remain deactivated — otherwise `countActiveIncomingEdges` would count a source that will never
* fire again, stalling the loop on its next iteration.
*
* Edge-key format is `${sourceId}-${targetId}-${handle}`, so `startsWith("${nodeId}-")` uniquely
* matches "node is source". An `includes("-${nodeId}-")` check would also match "node is target"
* and is unsafe for the reset semantics.
*/
clearDeactivatedEdgesForNodes(nodeIds: Set<string>): void {
const edgesToRemove: string[] = []
for (const edgeKey of this.deactivatedEdges) {
for (const nodeId of nodeIds) {
if (edgeKey.startsWith(`${nodeId}-`) || edgeKey.includes(`-${nodeId}-`)) {
if (edgeKey.startsWith(`${nodeId}-`)) {
edgesToRemove.push(edgeKey)
break
}
Expand All @@ -142,7 +151,6 @@ export class EdgeManager {
for (const edgeKey of edgesToRemove) {
this.deactivatedEdges.delete(edgeKey)
}
// Also clear activated edge tracking for these nodes
for (const nodeId of nodeIds) {
this.nodesWithActivatedEdge.delete(nodeId)
}
Expand Down
34 changes: 3 additions & 31 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,8 @@ import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import type { LoopScope } from '@/executor/execution/state'
import type { BlockStateController, ContextExtensions } from '@/executor/execution/types'
import {
type ExecutionContext,
getNextExecutionOrder,
type NormalizedBlockOutput,
} from '@/executor/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { LoopConfigWithNodes } from '@/executor/types/loop'
import { buildContainerIterationContext } from '@/executor/utils/iteration-context'
import { replaceValidReferences } from '@/executor/utils/reference-validation'
import {
addSubflowErrorLog,
Expand All @@ -22,6 +17,7 @@ import {
buildSentinelEndId,
buildSentinelStartId,
emitEmptySubflowEvents,
emitSubflowSuccessEvents,
extractBaseBlockId,
resolveArrayInput,
validateMaxCount,
Expand Down Expand Up @@ -319,31 +315,7 @@ export class LoopOrchestrator {
const output = { results }
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)

if (this.contextExtensions?.onBlockComplete) {
const now = new Date().toISOString()
const iterationContext = buildContainerIterationContext(ctx, loopId)

try {
await this.contextExtensions.onBlockComplete(
loopId,
'Loop',
'loop',
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
} catch (error) {
logger.warn('Loop completion callback failed', {
loopId,
error: error instanceof Error ? error.message : String(error),
})
}
}
await emitSubflowSuccessEvents(ctx, loopId, 'loop', output, this.contextExtensions)

return {
shouldContinue: false,
Expand Down
37 changes: 3 additions & 34 deletions apps/sim/executor/orchestrators/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ import { DEFAULTS } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { ParallelScope } from '@/executor/execution/state'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import {
type ExecutionContext,
getNextExecutionOrder,
type NormalizedBlockOutput,
} from '@/executor/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { ParallelConfigWithNodes } from '@/executor/types/parallel'
import { buildContainerIterationContext } from '@/executor/utils/iteration-context'
import { ParallelExpander } from '@/executor/utils/parallel-expansion'
import {
addSubflowErrorLog,
emitEmptySubflowEvents,
emitSubflowSuccessEvents,
extractBranchIndex,
resolveArrayInput,
validateMaxCount,
Expand Down Expand Up @@ -318,34 +314,7 @@ export class ParallelOrchestrator {
const output = { results }
this.state.setBlockOutput(parallelId, output)

// Emit onBlockComplete for the parallel container so the UI can track it.
// When this parallel is nested inside a parent subflow (parallel or loop), emit
// iteration context so the terminal can group this event under the parent container.
if (this.contextExtensions?.onBlockComplete) {
const now = new Date().toISOString()
const iterationContext = buildContainerIterationContext(ctx, parallelId)

try {
await this.contextExtensions.onBlockComplete(
parallelId,
'Parallel',
'parallel',
{
output,
executionTime: 0,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
} catch (error) {
logger.warn('Parallel completion callback failed', {
parallelId,
error: error instanceof Error ? error.message : String(error),
})
}
}
await emitSubflowSuccessEvents(ctx, parallelId, 'parallel', output, this.contextExtensions)

return {
allBranchesComplete: true,
Expand Down
57 changes: 57 additions & 0 deletions apps/sim/executor/utils/subflow-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,60 @@ export async function emitEmptySubflowEvents(
}
}
}

/**
* Emits the BlockLog + onBlockComplete callback for a loop/parallel container that
* finished successfully with at least one iteration. Without this, successful container
* runs produce no top-level BlockLog, which forces the trace-span builder to fall back
* to generic counter-based names ("Loop 1", "Parallel 1") instead of the user-configured
* block name.
*/
export async function emitSubflowSuccessEvents(
ctx: ExecutionContext,
blockId: string,
blockType: 'loop' | 'parallel',
output: { results: any[] },
contextExtensions: ContextExtensions | null
): Promise<void> {
const now = new Date().toISOString()
const executionOrder = getNextExecutionOrder(ctx)
const block = ctx.workflow?.blocks.find((b) => b.id === blockId)
const blockName = block?.metadata?.name ?? blockType
const iterationContext = buildContainerIterationContext(ctx, blockId)

ctx.blockLogs.push({
blockId,
blockName,
blockType,
startedAt: now,
endedAt: now,
durationMs: DEFAULTS.EXECUTION_TIME,
success: true,
output,
executionOrder,
})

if (contextExtensions?.onBlockComplete) {
try {
await contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder,
endedAt: now,
},
iterationContext
)
} catch (error) {
logger.warn('Subflow success completion callback failed', {
blockId,
blockType,
error: error instanceof Error ? error.message : String(error),
})
}
}
}
Loading
Loading