telega/flow
Type-safe, persistent flow system for building multi-step conversational interactions.
Core Concepts
- Flow - State machine for multi-step conversations with compile-time validated transitions
- FlowRegistry - Central container managing all flows, supporting triggers and manual calls
- Storage - Pluggable persistence (database, memory, Redis) with automatic state recovery
Quick Start
// 1. Define flow steps
pub type OnboardingStep {
Welcome
CollectName
CollectEmail
Complete
}
// 2. Build flow with handlers
let onboarding_flow =
flow.new("onboarding", storage, step_to_string, string_to_step)
|> flow.add_step(Welcome, welcome_handler)
|> flow.add_step(CollectName, name_handler)
|> flow.add_step(CollectEmail, email_handler)
|> flow.add_step(Complete, complete_handler)
|> flow.build(initial: Welcome)
// 3. Register and apply to router
let flow_registry =
flow.new_registry()
|> flow.register(flow.OnCommand("/start"), onboarding_flow)
router |> flow.apply_to_router(flow_registry)
Advanced Usage
// Call flow from any handler
fn my_handler(ctx, _data) {
let initial_data = dict.from_list([#("product_id", "123")])
flow.call_flow(ctx, flow_registry, "checkout", initial_data)
}
// Conditional transitions
|> flow.add_conditional(
from: CollectAge,
condition: fn(instance) {
case flow.get_data(instance, "age") {
Some(age) -> int.parse(age) |> result.unwrap(0) >= 18
None -> False
}
},
true: AdultFlow,
false: MinorFlow
)
// Parallel step execution (add_parallel_steps is deprecated; prefer flow.parallel)
|> flow.add_parallel_steps(
trigger_step: StartVerification,
parallel_steps: [EmailVerify, PhoneVerify, DocumentVerify],
join_at: VerificationComplete
)
Types
pub type ComposedStep {
ComposedFlowStep(Int)
ComposedSelectFlow
ComposedStartParallel
ComposedParallelFlow(Int)
ComposedMergeResults
}
Constructors
- ComposedFlowStep(Int)
- ComposedSelectFlow
- ComposedStartParallel
- ComposedParallelFlow(Int)
- ComposedMergeResults
pub type ConditionalTransition(step_type) {
ConditionalTransition(
from: String,
conditions: List(#(fn(FlowInstance) -> Bool, step_type)),
default: step_type,
)
}
Constructors
-
ConditionalTransition( from: String, conditions: List(#(fn(FlowInstance) -> Bool, step_type)), default: step_type, )
pub type FlowAction(step_type) {
Next(step_type)
NextString(String)
Back
Complete(dict.Dict(String, String))
Cancel
Wait(String)
WaitCallback(String)
GoTo(step_type)
Exit(option.Option(dict.Dict(String, String)))
ReturnFromSubflow(result: dict.Dict(String, String))
StartParallel(steps: List(step_type), join_at: step_type)
CompleteParallelStep(
step: step_type,
result: dict.Dict(String, String),
)
}
Constructors
- Next(step_type) - Move to the next step
- NextString(String) - Move to next step by string name (for dynamic navigation)
- Back - Go back to previous step
- Complete(dict.Dict(String, String)) - Complete the flow with data
- Cancel - Cancel the flow
- Wait(String) - Wait for user input
- WaitCallback(String) - Wait for callback query
- GoTo(step_type) - Jump to any step (clears scene data)
- Exit(option.Option(dict.Dict(String, String))) - Exit flow with result
- ReturnFromSubflow(result: dict.Dict(String, String)) - Return from subflow
- StartParallel(steps: List(step_type), join_at: step_type) - Start parallel execution
- CompleteParallelStep( step: step_type, result: dict.Dict(String, String), ) - Complete a parallel step
pub opaque type FlowBuilder(step_type, session, error)
Persistent flow instance with unique ID
pub type FlowInstance {
FlowInstance(
id: String,
flow_name: String,
user_id: Int,
chat_id: Int,
state: FlowState,
scene_data: dict.Dict(String, String),
wait_token: option.Option(String),
created_at: Int,
updated_at: Int,
)
}
Constructors
-
FlowInstance( id: String, flow_name: String, user_id: Int, chat_id: Int, state: FlowState, scene_data: dict.Dict(String, String), wait_token: option.Option(String), created_at: Int, updated_at: Int, )
Flow registry for centralized flow management
pub opaque type FlowRegistry(session, error)
Flow state representing current step and data
pub type FlowState {
FlowState(
current_step: String,
data: dict.Dict(String, String),
history: List(String),
flow_stack: List(FlowStackFrame),
parallel_state: option.Option(ParallelState),
)
}
Constructors
-
FlowState( current_step: String, data: dict.Dict(String, String), history: List(String), flow_stack: List(FlowStackFrame), parallel_state: option.Option(ParallelState), )
Storage interface for persistent flows
pub type FlowStorage(error) {
FlowStorage(
save: fn(FlowInstance) -> Result(Nil, error),
load: fn(String) -> Result(option.Option(FlowInstance), error),
delete: fn(String) -> Result(Nil, error),
list_by_user: fn(Int, Int) -> Result(
List(FlowInstance),
error,
),
)
}
Constructors
-
FlowStorage( save: fn(FlowInstance) -> Result(Nil, error), load: fn(String) -> Result(option.Option(FlowInstance), error), delete: fn(String) -> Result(Nil, error), list_by_user: fn(Int, Int) -> Result(List(FlowInstance), error), )
Trigger type for flow registration
pub type FlowTrigger {
OnCommand(command: String)
OnText(pattern: router.Pattern)
OnCallback(pattern: router.Pattern)
OnFiltered(filter: router.Filter)
OnPhoto
OnVideo
OnAudio
OnVoice
OnAnyText
}
Constructors
- OnCommand(command: String)
- OnText(pattern: router.Pattern)
- OnCallback(pattern: router.Pattern)
- OnFiltered(filter: router.Filter)
- OnPhoto
- OnVideo
- OnAudio
- OnVoice
- OnAnyText
pub type ParallelConfig(step_type) {
ParallelConfig(
trigger_step: String,
parallel_steps: List(step_type),
join_step: step_type,
)
}
Constructors
-
ParallelConfig( trigger_step: String, parallel_steps: List(step_type), join_step: step_type, )
pub type StepConfig(step_type, session, error) {
StepConfig(
handler: fn(bot.Context(session, error), FlowInstance) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
middlewares: List(
fn(
bot.Context(session, error),
FlowInstance,
fn() -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
),
)
}
Constructors
-
StepConfig( handler: fn(bot.Context(session, error), FlowInstance) -> Result( #( bot.Context(session, error), FlowAction(step_type), FlowInstance, ), error, ), middlewares: List( fn( bot.Context(session, error), FlowInstance, fn() -> Result( #( bot.Context(session, error), FlowAction(step_type), FlowInstance, ), error, ), ) -> Result( #( bot.Context(session, error), FlowAction(step_type), FlowInstance, ), error, ), ), )
pub type StepHandler(step_type, session, error) =
fn(bot.Context(session, error), FlowInstance) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
pub type StepMiddleware(step_type, session, error) =
fn(
bot.Context(session, error),
FlowInstance,
fn() -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Result type for flow steps
pub type StepResult(step_type, session, error) =
Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Sub-flow configuration
pub type SubflowConfig(step_type, session, error) {
SubflowConfig(
trigger_step: String,
flow: Flow(dynamic.Dynamic, session, error),
return_step: step_type,
map_args: fn(FlowInstance) -> dict.Dict(String, String),
map_result: fn(dict.Dict(String, String), FlowInstance) -> FlowInstance,
)
}
Constructors
-
SubflowConfig( trigger_step: String, flow: Flow(dynamic.Dynamic, session, error), return_step: step_type, map_args: fn(FlowInstance) -> dict.Dict(String, String), map_result: fn(dict.Dict(String, String), FlowInstance) -> FlowInstance, )
Values
pub fn add_conditional(
builder: FlowBuilder(step_type, session, error),
from: step_type,
condition: fn(FlowInstance) -> Bool,
true on_true: step_type,
false on_false: step_type,
) -> FlowBuilder(step_type, session, error)
Add conditional transition
pub fn add_global_middleware(
builder: FlowBuilder(step_type, session, error),
middleware: fn(
bot.Context(session, error),
FlowInstance,
fn() -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> FlowBuilder(step_type, session, error)
Add global middleware that applies to all steps
pub fn add_multi_conditional(
builder: FlowBuilder(step_type, session, error),
from: step_type,
conditions: List(#(fn(FlowInstance) -> Bool, step_type)),
default: step_type,
) -> FlowBuilder(step_type, session, error)
Add multi-way conditional
pub fn add_parallel_steps(
builder: FlowBuilder(step_type, session, error),
trigger_step: step_type,
parallel_steps: List(step_type),
join_at: step_type,
) -> FlowBuilder(step_type, session, error)
Add parallel step execution.
Deprecated: use parallel() instead for a cleaner API.
pub fn add_step(
builder: FlowBuilder(step_type, session, error),
step: step_type,
handler: fn(bot.Context(session, error), FlowInstance) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> FlowBuilder(step_type, session, error)
Add a step to the flow
pub fn add_step_with_middleware(
builder: FlowBuilder(step_type, session, error),
step: step_type,
middlewares: List(
fn(
bot.Context(session, error),
FlowInstance,
fn() -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
),
handler: fn(bot.Context(session, error), FlowInstance) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> FlowBuilder(step_type, session, error)
Add a step with middleware
pub fn add_subflow(
builder: FlowBuilder(step_type, session, error),
trigger_step: step_type,
subflow subflow: Flow(dynamic.Dynamic, session, error),
return_to return_to: step_type,
map_args map_args: fn(FlowInstance) -> dict.Dict(String, String),
map_result map_result: fn(
dict.Dict(String, String),
FlowInstance,
) -> FlowInstance,
) -> FlowBuilder(step_type, session, error)
Add sub-flow
pub fn apply_to_router(
router: router.Router(session, error),
registry: FlowRegistry(session, error),
) -> router.Router(session, error)
Apply all registered flows to a router
pub fn back(
ctx: bot.Context(session, error),
instance: FlowInstance,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Go back to previous step
pub fn build(
builder: FlowBuilder(step_type, session, error),
initial initial_step: step_type,
) -> Flow(step_type, session, error)
Build the flow
pub fn call_flow(
ctx ctx: bot.Context(session, error),
registry registry: FlowRegistry(session, error),
name flow_name: String,
initial initial_data: dict.Dict(String, String),
) -> Result(bot.Context(session, error), error)
Call a registered flow from any handler
Parameters
- ctx: Current context
- registry: The flow registry containing the flow
- flow_name: Name of the flow to call
- initial_data: Initial data to pass to the flow
Example
fn my_handler(ctx, registry, _data) {
let initial_data = dict.from_list([
#("user_name", "John"),
#("product_id", "123")
])
flow.call_flow(ctx, registry, "checkout", initial_data)
}
pub fn cancel(
ctx: bot.Context(session, error),
instance: FlowInstance,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Cancel the flow
pub fn clear_scene_data(instance: FlowInstance) -> FlowInstance
Clear all scene data
pub fn clear_scene_data_key(
instance: FlowInstance,
key key: String,
) -> FlowInstance
Clear specific scene data key
pub fn complete(
ctx: bot.Context(session, error),
instance: FlowInstance,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Complete the flow
pub fn compose_conditional(
name: String,
condition: fn(FlowInstance) -> String,
flows: dict.Dict(String, Flow(dynamic.Dynamic, session, error)),
storage: FlowStorage(error),
) -> Flow(ComposedStep, session, error)
Compose flows with conditional selection
pub fn compose_parallel(
name: String,
flows: List(Flow(dynamic.Dynamic, session, error)),
merge_results: fn(List(dict.Dict(String, String))) -> dict.Dict(
String,
String,
),
storage: FlowStorage(error),
) -> Flow(ComposedStep, session, error)
Compose flows for parallel execution
pub fn compose_sequential(
name: String,
flows: List(Flow(dynamic.Dynamic, session, error)),
storage: FlowStorage(error),
) -> Flow(ComposedStep, session, error)
Compose flows sequentially
pub fn create_memory_storage() -> FlowStorage(error)
Create in-memory storage (for testing)
pub fn create_resume_handler(
flow: Flow(step_type, session, error),
) -> fn(bot.Context(session, error), update.Update) -> Result(
bot.Context(session, error),
error,
)
Create a router handler for resuming flows from callback queries
pub fn create_resume_handler_with_keyboard(
flow: Flow(step_type, session, error),
callback_data: keyboard.KeyboardCallbackData(String),
) -> fn(bot.Context(session, error), update.Update) -> Result(
bot.Context(session, error),
error,
)
Create a router handler for resuming flows from callback queries with keyboard parsing
pub fn create_text_handler(
flow: Flow(step_type, session, error),
) -> fn(bot.Context(session, error), update.Update) -> Result(
bot.Context(session, error),
error,
)
Create a text handler for resuming flows
pub fn exit(
ctx: bot.Context(session, error),
instance: FlowInstance,
result result: option.Option(dict.Dict(String, String)),
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Exit with result
pub fn get_current_step(
flow: Flow(step_type, session, error),
instance: FlowInstance,
) -> Result(step_type, Nil)
Get current step
pub fn get_data(
instance: FlowInstance,
key key: String,
) -> option.Option(String)
Get data from the flow instance
pub fn get_scene_data(
instance: FlowInstance,
key key: String,
) -> option.Option(String)
Get scene data
pub fn goto(
ctx: bot.Context(session, error),
instance: FlowInstance,
step step: step_type,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Type-safe goto navigation (clears scene data)
pub fn is_callback_passed(
instance: FlowInstance,
key key: String,
callback_id callback_id: String,
) -> option.Option(Bool)
pub fn message_step(
message_fn: fn(FlowInstance) -> String,
next_step: option.Option(step_type),
) -> fn(bot.Context(session, error), FlowInstance) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Create a message display step
pub fn new(
flow_name: String,
storage: FlowStorage(error),
step_to_string: fn(step_type) -> String,
string_to_step: fn(String) -> Result(step_type, Nil),
) -> FlowBuilder(step_type, session, error)
Create a new flow builder
pub fn new_registry() -> FlowRegistry(session, error)
Create a new empty flow registry
pub fn new_with_default_converters(
flow_name: String,
storage: FlowStorage(error),
steps: List(#(String, step_type)),
) -> FlowBuilder(step_type, session, error)
Create a flow builder with default string conversion (uses string.inspect)
pub fn next(
ctx: bot.Context(session, error),
instance: FlowInstance,
step step: step_type,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Next navigation
pub fn next_with_data(
ctx: bot.Context(session, error),
instance: FlowInstance,
step step: step_type,
key key: String,
value value: String,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Update instance data and continue to next step
pub fn on_complete(
builder: FlowBuilder(step_type, session, error),
handler: fn(bot.Context(session, error), FlowInstance) -> Result(
bot.Context(session, error),
error,
),
) -> FlowBuilder(step_type, session, error)
Set completion handler
pub fn on_error(
builder: FlowBuilder(step_type, session, error),
handler: fn(
bot.Context(session, error),
FlowInstance,
option.Option(error),
) -> Result(bot.Context(session, error), error),
) -> FlowBuilder(step_type, session, error)
Set error handler
pub fn parallel(
builder: FlowBuilder(step_type, session, error),
from from: step_type,
steps steps: List(step_type),
join join: step_type,
) -> FlowBuilder(step_type, session, error)
Add parallel step execution (simplified API).
This is the recommended way to add parallel steps to a flow.
When the flow reaches the `from` step, it executes every step listed in `steps` in parallel
and automatically transitions to the `join` step once all parallel steps complete.
Example
flow.new("kyc_verification", storage, to_string, from_string)
|> flow.add_step(Start, start_handler)
|> flow.add_step(EmailVerify, email_handler)
|> flow.add_step(PhoneVerify, phone_handler)
|> flow.add_step(DocumentVerify, document_handler)
|> flow.parallel(
from: Start,
steps: [EmailVerify, PhoneVerify, DocumentVerify],
join: AllComplete,
)
|> flow.add_step(AllComplete, complete_handler)
|> flow.build(initial: Start)
pub fn register(
registry: FlowRegistry(session, error),
trigger: FlowTrigger,
flow: Flow(step_type, session, error),
) -> FlowRegistry(session, error)
Add a flow to the registry with a trigger
pub fn register_callable(
registry: FlowRegistry(session, error),
flow: Flow(step_type, session, error),
) -> FlowRegistry(session, error)
Register a flow without a trigger (for calling from handlers)
pub fn register_with_data(
registry: FlowRegistry(session, error),
trigger: FlowTrigger,
flow: Flow(step_type, session, error),
initial_data: dict.Dict(String, String),
) -> FlowRegistry(session, error)
Add a flow to the registry with a trigger and initial data
pub fn store_data(
instance: FlowInstance,
key key: String,
value value: String,
) -> FlowInstance
Store data in the flow instance
pub fn store_scene_data(
instance: FlowInstance,
key key: String,
value value: String,
) -> FlowInstance
Store scene data
pub fn text_step(
prompt: String,
data_key: String,
next_step: step_type,
) -> fn(bot.Context(session, error), FlowInstance) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Create a text input step
pub fn to_handler(
flow flow: Flow(step_type, session, error),
) -> fn(bot.Context(session, error), update.Command) -> Result(
bot.Context(session, error),
error,
)
Create a router handler that starts a flow
pub fn unsafe_next(
ctx: bot.Context(session, error),
instance: FlowInstance,
step step: String,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Next navigation with string step (for dynamic navigation)
pub fn validation_middleware(
validator: fn(FlowInstance) -> Result(Nil, String),
) -> fn(
bot.Context(session, error),
FlowInstance,
fn() -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
),
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
pub fn wait(
ctx: bot.Context(session, error),
instance: FlowInstance,
token token: String,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Wait for user input
pub fn wait_callback(
ctx: bot.Context(session, error),
instance: FlowInstance,
token token: String,
) -> Result(
#(
bot.Context(session, error),
FlowAction(step_type),
FlowInstance,
),
error,
)
Wait for callback