telega/flow

Type-safe, persistent flow system for building multi-step conversational interactions.

Core Concepts

Quick Start

// 1. Define flow steps
pub type OnboardingStep {
  Welcome
  CollectName
  CollectEmail
  Complete
}

// 2. Build flow with handlers
let onboarding_flow =
  flow.new("onboarding", storage, step_to_string, string_to_step)
  |> flow.add_step(Welcome, welcome_handler)
  |> flow.add_step(CollectName, name_handler)
  |> flow.add_step(CollectEmail, email_handler)
  |> flow.add_step(Complete, complete_handler)
  |> flow.build(initial: Welcome)

// 3. Register and apply to router
let flow_registry =
  flow.new_registry()
  |> flow.register(flow.OnCommand("/start"), onboarding_flow)

router |> flow.apply_to_router(flow_registry)

Advanced Usage

// Call flow from any handler
fn my_handler(ctx, _data) {
  let initial_data = dict.from_list([#("product_id", "123")])
  flow.call_flow(ctx, flow_registry, "checkout", initial_data)
}

// Conditional transitions
|> flow.add_conditional(
  CollectAge,
  fn(instance) {
    case flow.get_data(instance, "age") {
      Some(age) -> int.parse(age) |> result.unwrap(0) >= 18
      None -> False
    }
  },
  true: AdultFlow,
  false: MinorFlow
)

// Parallel step execution
|> flow.parallel(
  from: StartVerification,
  steps: [EmailVerify, PhoneVerify, DocumentVerify],
  join: VerificationComplete
)

Types

pub type ComposedStep {
  ComposedFlowStep(Int)
  ComposedSelectFlow
  ComposedStartParallel
  ComposedParallelFlow(Int)
  ComposedMergeResults
}

Constructors

  • ComposedFlowStep(Int)
  • ComposedSelectFlow
  • ComposedStartParallel
  • ComposedParallelFlow(Int)
  • ComposedMergeResults
pub type ConditionalTransition(step_type) {
  ConditionalTransition(
    from: String,
    conditions: List(#(fn(FlowInstance) -> Bool, step_type)),
    default: step_type,
  )
}

Constructors

  • ConditionalTransition(
      from: String,
      conditions: List(#(fn(FlowInstance) -> Bool, step_type)),
      default: step_type,
    )
pub opaque type Flow(step_type, session, error)
pub type FlowAction(step_type) {
  Next(step_type)
  NextString(String)
  Back
  Complete(dict.Dict(String, String))
  Cancel
  Wait(String)
  WaitCallback(String)
  GoTo(step_type)
  Exit(option.Option(dict.Dict(String, String)))
  ReturnFromSubflow(result: dict.Dict(String, String))
  StartParallel(steps: List(step_type), join_at: step_type)
  CompleteParallelStep(
    step: step_type,
    result: dict.Dict(String, String),
  )
}

Constructors

  • Next(step_type)

    Move to the next step

  • NextString(String)

    Move to next step by string name (for dynamic navigation)

  • Back

    Go back to previous step

  • Complete(dict.Dict(String, String))

    Complete the flow with data

  • Cancel

    Cancel the flow

  • Wait(String)

    Wait for user input

  • WaitCallback(String)

    Wait for callback query

  • GoTo(step_type)

    Jump to any step (clears scene data)

  • Exit(option.Option(dict.Dict(String, String)))

    Exit flow with result

  • ReturnFromSubflow(result: dict.Dict(String, String))

    Return from subflow

  • StartParallel(steps: List(step_type), join_at: step_type)

    Start parallel execution

  • CompleteParallelStep(
      step: step_type,
      result: dict.Dict(String, String),
    )

    Complete a parallel step

pub opaque type FlowBuilder(step_type, session, error)

Persistent flow instance with unique ID

pub type FlowInstance {
  FlowInstance(
    id: String,
    flow_name: String,
    user_id: Int,
    chat_id: Int,
    state: FlowState,
    scene_data: dict.Dict(String, String),
    wait_token: option.Option(String),
    created_at: Int,
    updated_at: Int,
  )
}

Constructors

  • FlowInstance(
      id: String,
      flow_name: String,
      user_id: Int,
      chat_id: Int,
      state: FlowState,
      scene_data: dict.Dict(String, String),
      wait_token: option.Option(String),
      created_at: Int,
      updated_at: Int,
    )

Flow registry for centralized flow management

pub opaque type FlowRegistry(session, error)

Stack frame for nested flow calls

pub type FlowStackFrame {
  FlowStackFrame(
    flow_name: String,
    return_step: String,
    saved_data: dict.Dict(String, String),
  )
}

Constructors

  • FlowStackFrame(
      flow_name: String,
      return_step: String,
      saved_data: dict.Dict(String, String),
    )

Flow state representing current step and data

pub type FlowState {
  FlowState(
    current_step: String,
    data: dict.Dict(String, String),
    history: List(String),
    flow_stack: List(FlowStackFrame),
    parallel_state: option.Option(ParallelState),
  )
}

Constructors

Storage interface for persistent flows

pub type FlowStorage(error) {
  FlowStorage(
    save: fn(FlowInstance) -> Result(Nil, error),
    load: fn(String) -> Result(option.Option(FlowInstance), error),
    delete: fn(String) -> Result(Nil, error),
    list_by_user: fn(Int, Int) -> Result(
      List(FlowInstance),
      error,
    ),
  )
}

Constructors

Trigger type for flow registration

pub type FlowTrigger {
  OnCommand(command: String)
  OnText(pattern: router.Pattern)
  OnCallback(pattern: router.Pattern)
  OnFiltered(filter: router.Filter)
  OnPhoto
  OnVideo
  OnAudio
  OnVoice
  OnAnyText
}

Constructors

pub type ParallelConfig(step_type) {
  ParallelConfig(
    trigger_step: String,
    parallel_steps: List(step_type),
    join_step: step_type,
  )
}

Constructors

  • ParallelConfig(
      trigger_step: String,
      parallel_steps: List(step_type),
      join_step: step_type,
    )

State for parallel step execution

pub type ParallelState {
  ParallelState(
    pending_steps: List(String),
    completed_steps: List(String),
    results: dict.Dict(String, dict.Dict(String, String)),
    join_step: String,
  )
}

Constructors

  • ParallelState(
      pending_steps: List(String),
      completed_steps: List(String),
      results: dict.Dict(String, dict.Dict(String, String)),
      join_step: String,
    )
pub type StepConfig(step_type, session, error) {
  StepConfig(
    handler: fn(bot.Context(session, error), FlowInstance) -> Result(
      #(
        bot.Context(session, error),
        FlowAction(step_type),
        FlowInstance,
      ),
      error,
    ),
    middlewares: List(
      fn(
        bot.Context(session, error),
        FlowInstance,
        fn() -> Result(
          #(
            bot.Context(session, error),
            FlowAction(step_type),
            FlowInstance,
          ),
          error,
        ),
      ) -> Result(
        #(
          bot.Context(session, error),
          FlowAction(step_type),
          FlowInstance,
        ),
        error,
      ),
    ),
  )
}

Constructors

pub type StepHandler(step_type, session, error) =
  fn(bot.Context(session, error), FlowInstance) -> Result(
    #(
      bot.Context(session, error),
      FlowAction(step_type),
      FlowInstance,
    ),
    error,
  )
pub type StepMiddleware(step_type, session, error) =
  fn(
    bot.Context(session, error),
    FlowInstance,
    fn() -> Result(
      #(
        bot.Context(session, error),
        FlowAction(step_type),
        FlowInstance,
      ),
      error,
    ),
  ) -> Result(
    #(
      bot.Context(session, error),
      FlowAction(step_type),
      FlowInstance,
    ),
    error,
  )

Result type for flow steps

pub type StepResult(step_type, session, error) =
  Result(
    #(
      bot.Context(session, error),
      FlowAction(step_type),
      FlowInstance,
    ),
    error,
  )

Sub-flow configuration

pub type SubflowConfig(step_type, session, error) {
  SubflowConfig(
    trigger_step: String,
    flow: Flow(dynamic.Dynamic, session, error),
    return_step: step_type,
    map_args: fn(FlowInstance) -> dict.Dict(String, String),
    map_result: fn(dict.Dict(String, String), FlowInstance) -> FlowInstance,
  )
}

Constructors

Values

pub fn add_conditional(
  builder: FlowBuilder(step_type, session, error),
  from: step_type,
  condition: fn(FlowInstance) -> Bool,
  true on_true: step_type,
  false on_false: step_type,
) -> FlowBuilder(step_type, session, error)

Add conditional transition

pub fn add_global_middleware(
  builder: FlowBuilder(step_type, session, error),
  middleware: fn(
    bot.Context(session, error),
    FlowInstance,
    fn() -> Result(
      #(
        bot.Context(session, error),
        FlowAction(step_type),
        FlowInstance,
      ),
      error,
    ),
  ) -> Result(
    #(
      bot.Context(session, error),
      FlowAction(step_type),
      FlowInstance,
    ),
    error,
  ),
) -> FlowBuilder(step_type, session, error)

Add global middleware that applies to all steps

pub fn add_multi_conditional(
  builder: FlowBuilder(step_type, session, error),
  from: step_type,
  conditions: List(#(fn(FlowInstance) -> Bool, step_type)),
  default: step_type,
) -> FlowBuilder(step_type, session, error)

Add multi-way conditional

pub fn add_parallel_steps(
  builder: FlowBuilder(step_type, session, error),
  trigger_step: step_type,
  parallel_steps: List(step_type),
  join_at: step_type,
) -> FlowBuilder(step_type, session, error)

Add parallel step execution.

Deprecated: use parallel() instead for a cleaner API.

pub fn add_step(
  builder: FlowBuilder(step_type, session, error),
  step: step_type,
  handler: fn(bot.Context(session, error), FlowInstance) -> Result(
    #(
      bot.Context(session, error),
      FlowAction(step_type),
      FlowInstance,
    ),
    error,
  ),
) -> FlowBuilder(step_type, session, error)

Add a step to the flow

pub fn add_step_with_middleware(
  builder: FlowBuilder(step_type, session, error),
  step: step_type,
  middlewares: List(
    fn(
      bot.Context(session, error),
      FlowInstance,
      fn() -> Result(
        #(
          bot.Context(session, error),
          FlowAction(step_type),
          FlowInstance,
        ),
        error,
      ),
    ) -> Result(
      #(
        bot.Context(session, error),
        FlowAction(step_type),
        FlowInstance,
      ),
      error,
    ),
  ),
  handler: fn(bot.Context(session, error), FlowInstance) -> Result(
    #(
      bot.Context(session, error),
      FlowAction(step_type),
      FlowInstance,
    ),
    error,
  ),
) -> FlowBuilder(step_type, session, error)

Add a step with middleware

pub fn add_subflow(
  builder: FlowBuilder(step_type, session, error),
  trigger_step: step_type,
  subflow subflow: Flow(dynamic.Dynamic, session, error),
  return_to return_to: step_type,
  map_args map_args: fn(FlowInstance) -> dict.Dict(String, String),
  map_result map_result: fn(
    dict.Dict(String, String),
    FlowInstance,
  ) -> FlowInstance,
) -> FlowBuilder(step_type, session, error)

Add sub-flow

pub fn apply_to_router(
  router: router.Router(session, error),
  registry: FlowRegistry(session, error),
) -> router.Router(session, error)

Apply all registered flows to a router

pub fn back(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Go back to previous step

pub fn build(
  builder: FlowBuilder(step_type, session, error),
  initial initial_step: step_type,
) -> Flow(step_type, session, error)

Build the flow

pub fn call_flow(
  ctx ctx: bot.Context(session, error),
  registry registry: FlowRegistry(session, error),
  name flow_name: String,
  initial initial_data: dict.Dict(String, String),
) -> Result(bot.Context(session, error), error)

Call a registered flow from any handler

Parameters

  • ctx: Current context
  • registry: The flow registry containing the flow
  • flow_name: Name of the flow to call
  • initial_data: Initial data to pass to the flow

Example

fn my_handler(ctx, registry, _data) {
  let initial_data = dict.from_list([
    #("user_name", "John"),
    #("product_id", "123")
  ])
  flow.call_flow(ctx, registry, "checkout", initial_data)
}
pub fn cancel(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Cancel the flow

pub fn clear_scene_data(instance: FlowInstance) -> FlowInstance

Clear all scene data

pub fn clear_scene_data_key(
  instance: FlowInstance,
  key key: String,
) -> FlowInstance

Clear specific scene data key

pub fn complete(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Complete the flow

pub fn compose_conditional(
  name: String,
  condition: fn(FlowInstance) -> String,
  flows: dict.Dict(String, Flow(dynamic.Dynamic, session, error)),
  storage: FlowStorage(error),
) -> Flow(ComposedStep, session, error)

Compose flows with conditional selection

pub fn compose_parallel(
  name: String,
  flows: List(Flow(dynamic.Dynamic, session, error)),
  merge_results: fn(List(dict.Dict(String, String))) -> dict.Dict(
    String,
    String,
  ),
  storage: FlowStorage(error),
) -> Flow(ComposedStep, session, error)

Compose flows for parallel execution

pub fn compose_sequential(
  name: String,
  flows: List(Flow(dynamic.Dynamic, session, error)),
  storage: FlowStorage(error),
) -> Flow(ComposedStep, session, error)

Compose flows sequentially

pub fn create_memory_storage() -> FlowStorage(error)

Create in-memory storage (for testing)

pub fn create_resume_handler(
  flow: Flow(step_type, session, error),
) -> fn(bot.Context(session, error), update.Update) -> Result(
  bot.Context(session, error),
  error,
)

Create a router handler for resuming flows from callback queries

pub fn create_resume_handler_with_keyboard(
  flow: Flow(step_type, session, error),
  callback_data: keyboard.KeyboardCallbackData(String),
) -> fn(bot.Context(session, error), update.Update) -> Result(
  bot.Context(session, error),
  error,
)

Create a router handler for resuming flows from callback queries with keyboard parsing

pub fn create_text_handler(
  flow: Flow(step_type, session, error),
) -> fn(bot.Context(session, error), update.Update) -> Result(
  bot.Context(session, error),
  error,
)

Create a text handler for resuming flows

pub fn exit(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
  result result: option.Option(dict.Dict(String, String)),
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Exit with result

pub fn get_current_step(
  flow: Flow(step_type, session, error),
  instance: FlowInstance,
) -> Result(step_type, Nil)

Get current step

pub fn get_data(
  instance: FlowInstance,
  key key: String,
) -> option.Option(String)

Get data from the flow instance

pub fn get_scene_data(
  instance: FlowInstance,
  key key: String,
) -> option.Option(String)

Get scene data

pub fn goto(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
  step step: step_type,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Type-safe goto navigation (clears scene data)

pub fn is_callback_passed(
  instance: FlowInstance,
  key key: String,
  callback_id callback_id: String,
) -> option.Option(Bool)
pub fn message_step(
  message_fn: fn(FlowInstance) -> String,
  next_step: option.Option(step_type),
) -> fn(bot.Context(session, error), FlowInstance) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Create a message display step

pub fn new(
  flow_name: String,
  storage: FlowStorage(error),
  step_to_string: fn(step_type) -> String,
  string_to_step: fn(String) -> Result(step_type, Nil),
) -> FlowBuilder(step_type, session, error)

Create a new flow builder

pub fn new_registry() -> FlowRegistry(session, error)

Create a new empty flow registry

pub fn new_with_default_converters(
  flow_name: String,
  storage: FlowStorage(error),
  steps: List(#(String, step_type)),
) -> FlowBuilder(step_type, session, error)

Create a flow builder with default string conversion (uses string.inspect)

pub fn next(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
  step step: step_type,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Type-safe navigation to the given next step

pub fn next_with_data(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
  step step: step_type,
  key key: String,
  value value: String,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Update instance data and continue to next step

pub fn on_complete(
  builder: FlowBuilder(step_type, session, error),
  handler: fn(bot.Context(session, error), FlowInstance) -> Result(
    bot.Context(session, error),
    error,
  ),
) -> FlowBuilder(step_type, session, error)

Set completion handler

pub fn on_error(
  builder: FlowBuilder(step_type, session, error),
  handler: fn(
    bot.Context(session, error),
    FlowInstance,
    option.Option(error),
  ) -> Result(bot.Context(session, error), error),
) -> FlowBuilder(step_type, session, error)

Set error handler

pub fn parallel(
  builder: FlowBuilder(step_type, session, error),
  from from: step_type,
  steps steps: List(step_type),
  join join: step_type,
) -> FlowBuilder(step_type, session, error)

Add parallel step execution (simplified API).

This is the recommended way to add parallel steps to a flow. When the flow reaches the `from` step, it executes all of the `steps` in parallel and automatically transitions to the `join` step once every parallel step has completed.

Example

flow.new("kyc_verification", storage, to_string, from_string)
|> flow.add_step(Start, start_handler)
|> flow.add_step(EmailVerify, email_handler)
|> flow.add_step(PhoneVerify, phone_handler)
|> flow.add_step(DocumentVerify, document_handler)
|> flow.parallel(
    from: Start,
    steps: [EmailVerify, PhoneVerify, DocumentVerify],
    join: AllComplete,
  )
|> flow.add_step(AllComplete, complete_handler)
|> flow.build(initial: Start)
pub fn register(
  registry: FlowRegistry(session, error),
  trigger: FlowTrigger,
  flow: Flow(step_type, session, error),
) -> FlowRegistry(session, error)

Add a flow to the registry with a trigger

pub fn register_callable(
  registry: FlowRegistry(session, error),
  flow: Flow(step_type, session, error),
) -> FlowRegistry(session, error)

Register a flow without a trigger (for calling from handlers)

pub fn register_with_data(
  registry: FlowRegistry(session, error),
  trigger: FlowTrigger,
  flow: Flow(step_type, session, error),
  initial_data: dict.Dict(String, String),
) -> FlowRegistry(session, error)

Add a flow to the registry with a trigger and initial data

pub fn store_data(
  instance: FlowInstance,
  key key: String,
  value value: String,
) -> FlowInstance

Store data in the flow instance

pub fn store_scene_data(
  instance: FlowInstance,
  key key: String,
  value value: String,
) -> FlowInstance

Store scene data

pub fn text_step(
  prompt: String,
  data_key: String,
  next_step: step_type,
) -> fn(bot.Context(session, error), FlowInstance) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Create a text input step

pub fn to_handler(
  flow flow: Flow(step_type, session, error),
) -> fn(bot.Context(session, error), update.Command) -> Result(
  bot.Context(session, error),
  error,
)

Create a router handler that starts a flow

pub fn unsafe_next(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
  step step: String,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Next navigation with string step (for dynamic navigation)

pub fn validation_middleware(
  validator: fn(FlowInstance) -> Result(Nil, String),
) -> fn(
  bot.Context(session, error),
  FlowInstance,
  fn() -> Result(
    #(
      bot.Context(session, error),
      FlowAction(step_type),
      FlowInstance,
    ),
    error,
  ),
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Validation middleware

pub fn wait(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
  token token: String,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Wait for user input

pub fn wait_callback(
  ctx: bot.Context(session, error),
  instance: FlowInstance,
  token token: String,
) -> Result(
  #(
    bot.Context(session, error),
    FlowAction(step_type),
    FlowInstance,
  ),
  error,
)

Wait for callback

Search Document