Orchestrate Module
The Orchestrate module enables workflow execution and multi-step AI orchestration. It allows you to define complex workflows with conditional logic, parallel execution, and integration with resources, documents, and external APIs.
Basic Usage
import { PlatformClient } from '@enterpriseaigroup/platform-sdk';
const client = new PlatformClient({ /* config */ });
// Execute a workflow
const execution = await client.orchestrate.execute('workflow_id', {
input: { projectId: 'proj_123' }
});
// Check execution status
const status = await client.orchestrate.getStatus(execution.id);
console.log(`Status: ${status.state}`);
// List available workflows
const workflows = await client.orchestrate.listWorkflows();
Methods
execute()
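Execute a workflow with input data. In the examples in this document, the second argument is an object: its input field carries the workflow input, and the optional async and metadata fields sit alongside it.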
execute(
workflowId: string,
input: WorkflowInput
): Promise<WorkflowExecution>
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflowId | string | Yes | Workflow identifier or name |
| input | WorkflowInput | Yes | Input data for the workflow |
| async | boolean | No | Execute asynchronously (default: false) |
| metadata | Record<string, any> | No | Additional execution metadata |
Returns
interface WorkflowExecution {
id: string;
workflowId: string;
state: 'pending' | 'running' | 'completed' | 'failed';
input: WorkflowInput;
output?: any;
error?: string;
steps: StepExecution[];
startedAt: string;
completedAt?: string;
metadata?: Record<string, any>;
}
Example
const execution = await client.orchestrate.execute('analyze_project', {
input: {
projectId: 'proj_123',
analysisType: 'comprehensive',
includeRisks: true
},
metadata: {
requestedBy: 'user_456',
priority: 'high'
}
});
console.log(`Execution ID: ${execution.id}`);
console.log(`State: ${execution.state}`);
if (execution.state === 'completed') {
console.log('Results:', execution.output);
}
getStatus()
Get the current status of a workflow execution.
getStatus(executionId: string): Promise<WorkflowExecution>
Example
const status = await client.orchestrate.getStatus('exec_789');
console.log(`State: ${status.state}`);
console.log(`Progress: ${status.steps.filter(s => s.state === 'completed').length}/${status.steps.length}`);
if (status.state === 'failed') {
console.error(`Error: ${status.error}`);
}
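The progress line in the example above counts only completed steps. A slightly fuller summary can be sketched as follows. `summarizeProgress` and the `StepLike`/`ExecutionLike` types are hypothetical names introduced for illustration, not part of the SDK, and treating skipped and failed steps as "finished" is an assumption of this sketch.

```typescript
// Local copies of the shapes documented on this page, so the sketch stands alone.
type StepState = 'pending' | 'running' | 'completed' | 'failed' | 'skipped';
interface StepLike { name: string; state: StepState }
interface ExecutionLike { state: string; steps: StepLike[] }

interface ProgressSummary {
  counts: Record<StepState, number>;
  finished: number; // completed + failed + skipped (an assumption of this sketch)
  total: number;
  percent: number;  // 0-100, rounded
}

// Hypothetical helper: not part of the SDK.
function summarizeProgress(execution: ExecutionLike): ProgressSummary {
  const counts: Record<StepState, number> = {
    pending: 0, running: 0, completed: 0, failed: 0, skipped: 0,
  };
  for (const step of execution.steps) counts[step.state] += 1;

  const total = execution.steps.length;
  const finished = counts.completed + counts.failed + counts.skipped;
  // Guard against division by zero when no steps have been reported yet.
  const percent = total === 0 ? 0 : Math.round((finished / total) * 100);
  return { counts, finished, total, percent };
}
```

The result of `client.orchestrate.getStatus(...)` could be passed in directly, since only the `state` and `steps` fields are read.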
listWorkflows()
List all available workflows.
listWorkflows(options?: ListOptions): Promise<Workflow[]>
Returns
interface Workflow {
id: string;
name: string;
description?: string;
version: string;
inputSchema: object;
outputSchema?: object;
steps: WorkflowStep[];
tags?: string[];
createdAt: string;
updatedAt: string;
}
Example
const workflows = await client.orchestrate.listWorkflows();
workflows.forEach(workflow => {
console.log(`${workflow.name} (v${workflow.version})`);
console.log(` ${workflow.description}`);
console.log(` Steps: ${workflow.steps.length}`);
});
getWorkflow()
Get detailed information about a specific workflow.
getWorkflow(workflowId: string): Promise<Workflow>
Example
const workflow = await client.orchestrate.getWorkflow('analyze_project');
console.log(`Name: ${workflow.name}`);
console.log(`Description: ${workflow.description}`);
console.log('\nSteps:');
workflow.steps.forEach((step, i) => {
console.log(`${i + 1}. ${step.name} (${step.type})`);
});
cancel()
Cancel a running workflow execution.
cancel(executionId: string): Promise<void>
Example
await client.orchestrate.cancel('exec_789');
console.log('Execution cancelled');
listExecutions()
List workflow executions, optionally filtered by workflow ID or state and paginated with limit and offset.
listExecutions(options?: ExecutionListOptions): Promise<WorkflowExecution[]>
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflowId | string | No | Filter by workflow ID |
| state | ExecutionState | No | Filter by state |
| limit | number | No | Maximum results (default: 50) |
| offset | number | No | Pagination offset |
Example
const executions = await client.orchestrate.listExecutions({
workflowId: 'analyze_project',
state: 'completed',
limit: 10
});
executions.forEach(exec => {
console.log(`${exec.id}: ${exec.state} (${exec.startedAt})`);
});
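The `limit` and `offset` parameters suggest that larger result sets can be read page by page. The sketch below shows one way to do that. `fetchAllPages` is a hypothetical helper, not an SDK method, and it assumes that a page shorter than `limit` marks the end of the results, which this page does not confirm.

```typescript
interface PageOptions { limit: number; offset: number }
type FetchPage<T> = (options: PageOptions) => Promise<T[]>;

// Hypothetical helper: collects pages by advancing `offset` until a short page arrives.
// `maxItems` is a safety cap so a misbehaving endpoint cannot loop forever.
async function fetchAllPages<T>(
  fetchPage: FetchPage<T>,
  limit = 50,
  maxItems = 10000,
): Promise<T[]> {
  const all: T[] = [];
  let offset = 0;
  while (all.length < maxItems) {
    const page = await fetchPage({ limit, offset });
    all.push(...page);
    if (page.length < limit) break; // short or empty page: assume nothing is left
    offset += page.length;
  }
  return all.slice(0, maxItems);
}
```

With the client from this page, usage might look like `fetchAllPages(page => client.orchestrate.listExecutions({ workflowId: 'analyze_project', ...page }))`.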
Workflow Definition
Workflows are defined with a series of steps that can include AI operations, resource queries, document processing, and custom logic.
Workflow Structure
interface Workflow {
id: string;
name: string;
description?: string;
version: string;
inputSchema: object; // JSON Schema for input validation
outputSchema?: object; // JSON Schema for output
steps: WorkflowStep[];
errorHandling?: ErrorHandling;
timeout?: number; // Maximum execution time in seconds
}
Step Types
interface WorkflowStep {
id: string;
name: string;
type: StepType;
config: StepConfig;
dependencies?: string[]; // IDs of steps that must complete first
condition?: string; // Expression to evaluate before executing
retryPolicy?: RetryPolicy;
}
type StepType =
| 'ai_chat' // AI conversation
| 'resource_query' // Query resources
| 'document_process' // Process documents
| 'api_call' // External API call
| 'transform' // Data transformation
| 'condition' // Conditional branching
| 'parallel' // Parallel execution
| 'custom'; // Custom function
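The `dependencies` field implies an ordering between steps. As a rough illustration only (this page does not describe how the platform actually schedules steps), the sketch below groups steps into waves, where every step in a wave depends only on steps from earlier waves. `planWaves` and `StepNode` are hypothetical names.

```typescript
// Only the fields needed for ordering; a WorkflowStep would satisfy this shape.
interface StepNode { id: string; dependencies?: string[] }

// Hypothetical helper: layer-by-layer topological ordering (Kahn's algorithm).
// Throws on duplicate IDs, unknown dependencies, or dependency cycles.
function planWaves(steps: StepNode[]): string[][] {
  const ids = new Set(steps.map(s => s.id));
  if (ids.size !== steps.length) throw new Error('Duplicate step id');
  for (const step of steps) {
    for (const dep of step.dependencies ?? []) {
      if (!ids.has(dep)) {
        throw new Error(`Step "${step.id}" depends on unknown step "${dep}"`);
      }
    }
  }

  const done = new Set<string>();
  const waves: string[][] = [];
  let remaining = steps;
  while (remaining.length > 0) {
    // A step is ready once all of its dependencies are in earlier waves.
    const ready = remaining.filter(s => (s.dependencies ?? []).every(d => done.has(d)));
    if (ready.length === 0) throw new Error('Dependency cycle detected');
    waves.push(ready.map(s => s.id));
    for (const s of ready) done.add(s.id);
    remaining = remaining.filter(s => !done.has(s.id));
  }
  return waves;
}
```

Steps that land in the same wave have no dependency on each other, so in principle they could run in parallel. Running a check like this before submitting a workflow definition can catch misspelled step IDs early.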
Example Workflows
Project Analysis Workflow
const execution = await client.orchestrate.execute('analyze_project', {
input: {
projectId: 'proj_123'
}
});
// Workflow executes:
// 1. Fetch project data
// 2. Query related documents
// 3. AI analysis of project status
// 4. Risk assessment
// 5. Generate recommendations
// 6. Create summary report
console.log('Analysis:', execution.output.analysis);
console.log('Risks:', execution.output.risks);
console.log('Recommendations:', execution.output.recommendations);
Document Processing Pipeline
const execution = await client.orchestrate.execute('process_documents', {
input: {
documentIds: ['doc_1', 'doc_2', 'doc_3']
}
});
// Workflow executes:
// 1. Validate documents
// 2. Extract text (parallel)
// 3. Classify each document
// 4. Index for RAG
// 5. Generate summary
Multi-Step AI Agent
const execution = await client.orchestrate.execute('research_agent', {
input: {
query: 'What are the top risks in our active projects?',
includeRecommendations: true
}
});
// Workflow executes:
// 1. Understand query intent
// 2. Query resources for active projects
// 3. For each project:
// a. Fetch documents
// b. Analyze risks
// 4. Aggregate results
// 5. Generate recommendations
// 6. Format response
Async Execution
For long-running workflows, use async execution:
// Start async execution
const execution = await client.orchestrate.execute('complex_analysis', {
input: { dataSet: 'large' },
async: true
});
console.log(`Started execution: ${execution.id}`);
// Poll for completion
while (true) {
const status = await client.orchestrate.getStatus(execution.id);
console.log(`State: ${status.state}`);
if (status.state === 'completed') {
console.log('Results:', status.output);
break;
} else if (status.state === 'failed') {
console.error('Execution failed:', status.error);
break;
}
await new Promise(resolve => setTimeout(resolve, 2000));
}
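The loop above polls at a fixed interval and never gives up. A bounded variant with a growing delay can be sketched as follows. `waitForExecution`, `WaitOptions`, and the default values are assumptions made for illustration. The status getter is passed in as a function so the sketch does not depend on the SDK.

```typescript
type ExecutionState = 'pending' | 'running' | 'completed' | 'failed';
interface StatusLike { state: ExecutionState; output?: unknown; error?: string }

interface WaitOptions {
  timeoutMs?: number;      // give up after this long (assumed default: 5 minutes)
  initialDelayMs?: number; // first wait between polls (assumed default: 1 second)
  maxDelayMs?: number;     // cap for the doubling delay (assumed default: 10 seconds)
}

// Hypothetical helper: polls until the execution is 'completed' or 'failed',
// or throws once the next wait would pass the deadline.
async function waitForExecution<T extends StatusLike>(
  getStatus: () => Promise<T>,
  { timeoutMs = 300000, initialDelayMs = 1000, maxDelayMs = 10000 }: WaitOptions = {},
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  let delay = initialDelayMs;
  while (true) {
    const status = await getStatus();
    if (status.state === 'completed' || status.state === 'failed') return status;
    if (Date.now() + delay > deadline) {
      throw new Error(`Execution still ${status.state} after ${timeoutMs} ms`);
    }
    await new Promise<void>(resolve => setTimeout(resolve, delay));
    delay = Math.min(delay * 2, maxDelayMs); // exponential backoff with a cap
  }
}
```

With the client from this page, usage might look like `const final = await waitForExecution(() => client.orchestrate.getStatus(execution.id));`, after which `final.state` tells you whether to read `final.output` or `final.error`.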
Step Execution Details
interface StepExecution {
stepId: string;
name: string;
state: 'pending' | 'running' | 'completed' | 'failed' | 'skipped';
input?: any;
output?: any;
error?: string;
startedAt?: string;
completedAt?: string;
duration?: number; // Milliseconds
retries?: number;
}
Viewing Step Progress
const status = await client.orchestrate.getStatus('exec_789');
status.steps.forEach(step => {
console.log(`\n${step.name}:`);
console.log(` State: ${step.state}`);
if (step.duration !== undefined) { // also report steps that took 0 ms
console.log(` Duration: ${step.duration}ms`);
}
if (step.error) {
console.log(` Error: ${step.error}`);
}
});
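The per-step fields can also be aggregated, for example to find where an execution spent its time. The sketch below is one possible approach. `summarizeTimings` and `StepTiming` are hypothetical names, and the total is a plain sum of reported durations, so it may exceed wall-clock time when steps ran in parallel.

```typescript
// Subset of the StepExecution fields used by this sketch.
interface StepTiming { name: string; state: string; duration?: number; retries?: number }
type TimedStep = StepTiming & { duration: number };

// Hypothetical helper: totals reported durations, finds the slowest step,
// and lists the names of steps that were retried at least once.
function summarizeTimings(steps: StepTiming[]) {
  // Keep only steps that actually report a duration (0 ms counts).
  const timed = steps.filter((s): s is TimedStep => typeof s.duration === 'number');
  const totalMs = timed.reduce((sum, s) => sum + s.duration, 0);
  const slowest = timed.reduce<TimedStep | undefined>(
    (best, s) => (best === undefined || s.duration > best.duration ? s : best),
    undefined,
  );
  const retried = steps.filter(s => (s.retries ?? 0) > 0).map(s => s.name);
  return { totalMs, slowest, retried };
}
```

Passing `status.steps` from `getStatus()` should work as-is, since only `name`, `state`, `duration`, and `retries` are read.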
Error Handling
try {
const execution = await client.orchestrate.execute('workflow_id', {
input: { data: 'value' }
});
} catch (error: any) { // typed as any so the property checks below compile under strict TypeScript settings
if (error.code === 'WORKFLOW_NOT_FOUND') {
console.error('Workflow does not exist');
} else if (error.code === 'INVALID_INPUT') {
console.error('Input validation failed:', error.details);
} else if (error.code === 'TIMEOUT') {
console.error('Workflow execution timed out');
} else {
console.error('Execution error:', error.message);
}
}
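In strict TypeScript, a caught value is typed `unknown`, so reading `error.code` directly may not compile. A type guard is one way around that. The sketch below assumes errors carry the `code`, `message`, and optional `details` fields used in the example above. `CodedError`, `isCodedError`, and `describeError` are hypothetical names, not SDK exports.

```typescript
// Error shape assumed from the example above; the SDK's real error class may differ.
interface CodedError { code: string; message: string; details?: unknown }

// Narrows an unknown thrown value to the assumed shape.
function isCodedError(value: unknown): value is CodedError {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return typeof candidate['code'] === 'string' && typeof candidate['message'] === 'string';
}

// Hypothetical helper: turns any thrown value into a short log line.
function describeError(error: unknown): string {
  if (isCodedError(error)) {
    switch (error.code) {
      case 'WORKFLOW_NOT_FOUND':
        return 'Workflow does not exist';
      case 'INVALID_INPUT':
        return `Input validation failed: ${JSON.stringify(error.details)}`;
      case 'TIMEOUT':
        return 'Workflow execution timed out';
      default:
        return `Execution error (${error.code}): ${error.message}`;
    }
  }
  // Plain Error objects and non-object throws fall through to a generic line.
  return error instanceof Error
    ? `Execution error: ${error.message}`
    : `Execution error: ${String(error)}`;
}
```

Inside a `catch (error)` block, `console.error(describeError(error))` would then replace the chain of `if` checks.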
Handling Step Failures
const status = await client.orchestrate.getStatus('exec_789');
if (status.state === 'failed') {
// Find failed step
const failedStep = status.steps.find(s => s.state === 'failed');
if (failedStep) {
console.error(`Failed at step: ${failedStep.name}`);
console.error(`Error: ${failedStep.error}`);
// Optionally retry from failed step
// (if workflow supports it)
}
}
Custom Workflows
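You can define a custom workflow as a configuration object. Values in double curly braces, such as {{input.resourceId}} and {{steps.fetch_resource.output}}, reference the workflow input and the output of earlier steps: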
const customWorkflow = {
name: 'custom_analysis',
description: 'Custom analysis workflow',
inputSchema: {
type: 'object',
properties: {
resourceId: { type: 'string' }
},
required: ['resourceId']
},
steps: [
{
id: 'fetch_resource',
name: 'Fetch Resource',
type: 'resource_query',
config: {
type: 'project',
filter: { 'id.eq': '{{input.resourceId}}' }
}
},
{
id: 'analyze',
name: 'AI Analysis',
type: 'ai_chat',
dependencies: ['fetch_resource'],
config: {
prompt: 'Analyze this project: {{steps.fetch_resource.output}}',
temperature: 0.3
}
},
{
id: 'generate_report',
name: 'Generate Report',
type: 'transform',
dependencies: ['analyze'],
config: {
template: {
resource: '{{steps.fetch_resource.output}}',
analysis: '{{steps.analyze.output.content}}',
timestamp: '{{now}}'
}
}
}
]
};
// Register and execute custom workflow
// (requires admin permissions)
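The configuration above relies on placeholders such as `{{input.resourceId}}` and `{{steps.fetch_resource.output}}`. This page does not specify how the platform resolves them, so the following is only a sketch of one plausible interpretation, useful for reasoning about what a step will receive. `lookup` and `resolveTemplate` are hypothetical names, and the rules (raw value for a lone placeholder, JSON for embedded objects, unknown paths left untouched) are assumptions.

```typescript
type Context = Record<string, unknown>;

// Walks a dotted path such as "steps.analyze.output.content" through nested objects.
function lookup(context: Context, path: string): unknown {
  return path.split('.').reduce<unknown>((current, key) => {
    if (typeof current === 'object' && current !== null && key in current) {
      return (current as Record<string, unknown>)[key];
    }
    return undefined;
  }, context);
}

// Hypothetical resolver (assumed semantics, not the platform's documented behavior):
// - a string that is exactly one placeholder yields the raw referenced value;
// - placeholders embedded in longer text are stringified (objects as JSON);
// - placeholders whose path is not found are left as written.
function resolveTemplate(template: string, context: Context): unknown {
  const whole = template.match(/^\{\{\s*([\w.]+)\s*\}\}$/);
  if (whole) {
    const value = lookup(context, whole[1] ?? '');
    return value === undefined ? template : value;
  }
  return template.replace(/\{\{\s*([\w.]+)\s*\}\}/g, (match, path: string) => {
    const value = lookup(context, path);
    if (value === undefined) return match;
    return typeof value === 'string' ? value : JSON.stringify(value);
  });
}
```

Under these assumptions, the `filter` value in the `fetch_resource` step would resolve to the caller's `resourceId`, and the `prompt` in the `analyze` step would embed the fetched resource as JSON text.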
Complete Example
Full workflow orchestration with monitoring:
async function executeWorkflowWithMonitoring(
workflowId: string,
input: WorkflowInput
) {
console.log(`Starting workflow: ${workflowId}`);
// Start execution
const execution = await client.orchestrate.execute(workflowId, {
input,
async: true
});
console.log(`Execution ID: ${execution.id}`);
// Monitor progress
const reportedStepIds = new Set<string>(); // steps already logged
while (true) {
const status = await client.orchestrate.getStatus(execution.id);
// Show newly completed steps, tracked by ID because steps may not finish in array order
for (const step of status.steps) {
if (step.state === 'completed' && !reportedStepIds.has(step.stepId)) {
console.log(`✓ ${step.name} (${step.duration}ms)`);
reportedStepIds.add(step.stepId);
}
}
// Check if done
if (status.state === 'completed') {
console.log('\n✓ Workflow completed successfully');
return status.output;
} else if (status.state === 'failed') {
const failedStep = status.steps.find(s => s.state === 'failed');
throw new Error(`Workflow failed at step "${failedStep?.name}": ${status.error}`);
}
// Wait before polling again
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
// Usage
try {
const result = await executeWorkflowWithMonitoring('analyze_project', {
projectId: 'proj_123',
depth: 'comprehensive'
});
console.log('\nResults:', result);
} catch (error) {
console.error('Workflow failed:', error instanceof Error ? error.message : error);
}
Next Steps
- Learn about the Chat module for AI steps
- Explore the Resources module for data queries
- See the Workflow examples
- Read the Orchestration guide