Lesson 15 - Asynchronous Loop

Goal

In this lesson we’ll learn how to loop asynchronously. When looping asynchronously, a new branch is created for each value in a list and the branches run in parallel.

Get Started

We’ll be creating a new flow that calls the new_hire flow we’ve built in previous lessons as a subflow. Let’s begin by creating a new file named hire_all.sl in the tutorials/hiring folder for our new flow. We’ll also need new_hire.sl, because we’re going to make some minor changes to it as well. Finally, we’ll pass our flow inputs using a file, so let’s create a tutorials/inputs folder and add a hires.yaml file to it.

Outputs

Since we’ll be using the new_hire flow as a subflow, it will be helpful to add some flow outputs for a parent flow to use. We’ll simply add an outputs section at the bottom of new_hire.sl to output a bit of information. This outputs section is quite a distance from the flow key, so be extra careful to place it at the proper indentation: one level below flow, in line with the name, inputs and workflow keys.

outputs:
  - address
  - total_cost

Parent Flow

Our new hire_all flow is going to take in a list of names of people being hired and will call the new_hire flow for each one of them. It will be looping asynchronously, so all the new_hire flows will be running simultaneously.

In hire_all.sl we can start off as usual by declaring a namespace, specifying the imports and taking in the inputs, which in our case is a list of names.

namespace: tutorials.hiring

imports:
  base: tutorials.base

flow:
  name: hire_all

  inputs:
    - names_list

  workflow:

Loop Syntax

An asynchronous loop looks pretty similar to a normal for loop, but with a few key differences.

Let’s create a new task named process_all in which we’ll do our looping. Each branch of the loop will call the new_hire flow.

- process_all:
    async_loop:
      for: name in names_list
      do:
        new_hire:
          - first_name: name['first']
          - middle_name: name.get('middle','')
          - last_name: name['last']

As you can see, so far it is almost identical to a regular for loop, except the loop key has been replaced by async_loop.

The names_list input will be a list of dictionaries containing name information with the keys first, middle and last. For each name in names_list the new_hire flow will be called and passed the corresponding name values. The various branches running the new_hire flow will run in parallel and the rest of the flow will continue only after all the branches have completed.
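
The branching behavior described above can be approximated in plain Python. This is only a rough sketch of the idea, not how CloudSlang runs branches internally: the new_hire function, the run_branch helper, the example.com addresses and the cost value are all hypothetical stand-ins for the real subflow.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the new_hire subflow; the real flow does much more.
def new_hire(first_name, middle_name, last_name):
    address = first_name + "." + last_name + "@example.com"  # made-up format
    return {"address": address, "total_cost": 1000}          # made-up cost

names_list = [
    {"first": "joe", "middle": "p", "last": "bloggs"},
    {"first": "jane", "last": "doe"},
]

def run_branch(name):
    # Mirrors the argument expressions used in the process_all task.
    return new_hire(name["first"], name.get("middle", ""), name["last"])

# One branch per item in the list. Leaving the `with` block waits for
# every branch to finish, just as the flow continues only after all
# branches have completed.
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(run_branch, name) for name in names_list]

results = [f.result() for f in futures]
print(len(results), "branches completed")
```

Note that name.get('middle','') supplies an empty string when a name has no middle key, so every branch receives all three arguments.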

Publish

As in a regular loop, you can also include a publish section in an asynchronous loop, but it works differently. In an asynchronous loop, the published variables are not published to the flow’s scope. Instead, each branch publishes operation or subflow outputs that are collected for use in the aggregate section.

Here we are publishing the address and total_cost outputs that we just added to the new_hire flow.

- process_all:
    async_loop:
      for: name in names_list
      do:
        new_hire:
          - first_name: name['first']
          - middle_name: name.get('middle','')
          - last_name: name['last']
      publish:
        - address
        - total_cost

Aggregate

Whereas aggregation takes place in the publish section of a normal for loop, in an asynchronous loop there is an additional aggregate section.

The aggregate key is indented to be in line with the async_loop key, indicating that it does not run for each branch in the loop. Aggregation occurs only after all branches have completed.

In most cases the aggregation will make use of the branches_context list. This is a list that is populated with all of the published outputs from all of the branches. For example, in our case, branches_context[0] will contain keys corresponding to the published variables address and total_cost, mapped to the values output by the first branch to complete. Similarly, branches_context[1] will contain the keys address and total_cost mapped to the values output by the second branch to complete.

There is no way to predict the order in which branches will complete, so the branches_context is rarely accessed using particular indices. Instead, Python expressions are used to extract the desired aggregations.
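
To illustrate why order-independent expressions are preferred over fixed indices, here is a small Python sketch. The branch outputs are made-up values, the aggregate helper is hypothetical, and the shuffle merely simulates an arbitrary completion order. The sorted() call is added here only so the two email lists can be compared; the tutorial's own aggregation does not sort.

```python
import random

# Hypothetical published outputs from three branches (values are made up).
branch_outputs = [
    {"address": "a@example.com", "total_cost": 1000},
    {"address": "b@example.com", "total_cost": 1500},
    {"address": "c@example.com", "total_cost": 1800},
]

def aggregate(branches_context):
    # Neither result depends on which branch happened to finish first.
    emails = sorted(b["address"] for b in branches_context)
    cost = sum(b["total_cost"] for b in branches_context)
    return emails, cost

shuffled = branch_outputs[:]
random.shuffle(shuffled)  # simulate an arbitrary completion order

# The aggregated values match regardless of order, whereas
# shuffled[0] may refer to any of the three branches.
print(aggregate(branch_outputs) == aggregate(shuffled))
```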

- process_all:
    async_loop:
      for: name in names_list
      do:
        new_hire:
          - first_name: name['first']
          - middle_name: name.get('middle','')
          - last_name: name['last']
      publish:
        - address
        - total_cost
    aggregate:
      - email_list: filter(lambda x:x != '', map(lambda x:str(x['address']), branches_context))
      - cost: sum(map(lambda x:x['total_cost'], branches_context))

In our case we use the map(), filter() and sum() Python functions to create a list of all the email addresses that were created and a sum of all the equipment costs.
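
The two aggregation expressions can be tried outside of a flow on a sample list. The branches_context below is hypothetical: the addresses and costs are invented, and it assumes that a branch may publish an empty address, which is the case the filter() call guards against. On Python 3, filter() returns an iterator, so this sketch wraps it in list() to get an actual list.

```python
# Hypothetical branches_context; the second branch published an empty address.
branches_context = [
    {"address": "jbloggs@example.com", "total_cost": 1800},
    {"address": "", "total_cost": 1300},
    {"address": "jperez@example.com", "total_cost": 1600},
]

# Same expressions as in the aggregate section. list() is added so the
# result is a list on Python 3 as well.
email_list = list(filter(lambda x: x != '',
                         map(lambda x: str(x['address']), branches_context)))
cost = sum(map(lambda x: x['total_cost'], branches_context))

print(email_list)  # ['jbloggs@example.com', 'jperez@example.com']
print(cost)        # 4700
```

The map() call pulls one value out of each branch's dictionary, filter() drops the empty addresses, and sum() adds up the costs of all branches, including those whose address is empty.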

Input File

We’ll use an input file to send the flow our list of names. An input file is very similar to a system properties file. It is written in plain YAML, which makes a nested structure like our list of names easy to write and easy to read.

Here are the contents of our hires.yaml input file.

names_list:
  - first: joe
    middle: p
    last: bloggs
  - first: jane
    last: doe
  - first: juan
    last: perez
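
For reference, here is the same data written as the Python list of dictionaries that the flow's expressions operate on. This assumes the YAML is loaded into ordinary lists and dictionaries, which is what the name['first'] and name.get('middle','') expressions in the loop rely on.

```python
# The hires.yaml content expressed as the Python structure it is assumed
# to load into: a list with one dictionary per person.
names_list = [
    {"first": "joe", "middle": "p", "last": "bloggs"},
    {"first": "jane", "last": "doe"},
    {"first": "juan", "last": "perez"},
]

# Only joe has a middle key; get() falls back to '' for the others,
# whereas name['middle'] would raise a KeyError for them.
middles = [name.get("middle", "") for name in names_list]
print(middles)  # ['p', '', '']
```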

Tasks

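Finally, we have to add the tasks that the process_all task navigates to. As the complete hire_all.sl listing at the end of this lesson shows, process_all has a navigate section that sends SUCCESS to print_success and FAILURE to print_failure. We can put these two tasks right after the process_all task.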

- print_success:
    do:
      base.print:
        - text: >
            "All addresses were created successfully.\nEmail addresses created: "
            + str(email_list) + "\nTotal cost: " + str(cost)

- on_failure:
    - print_failure:
        do:
          base.print:
            - text: >
                "Some addresses were not created or there is an email issue.\nEmail addresses created: "
                + str(email_list) + "\nTotal cost: " + str(cost)

Run It

We can save the files and run the flow. It’s a bit harder to track what has happened now because there are quite a few things happening at once. On careful inspection you will see that each task in the new_hire flow, and in each of its subflows, is run for each of the people in the names_list input.

run --f <folder path>/tutorials/hiring/hire_all.sl --cp <folder path>/tutorials/base,<folder path>/tutorials/hiring,<content folder path>/base --if <folder path>/tutorials/inputs/hires.yaml --spf <folder path>/tutorials/properties/bcompany.yaml

New Code - Complete

new_hire.sl

namespace: tutorials.hiring

imports:
  base: tutorials.base
  mail: io.cloudslang.base.mail

flow:
  name: new_hire

  inputs:
    - first_name
    - middle_name:
        required: false
    - last_name
    - missing:
        default: "''"
        overridable: false
    - total_cost:
        default: 0
        overridable: false
    - order_map: >
        {'laptop': 1000, 'docking station':200, 'monitor': 500, 'phone': 100}
    - hostname:
        system_property: tutorials.hiring.hostname
    - port:
        system_property: tutorials.hiring.port
    - from:
        system_property: tutorials.hiring.system_address
    - to:
        system_property: tutorials.hiring.hr_address

  workflow:
    - print_start:
        do:
          base.print:
            - text: "'Starting new hire process'"

    - create_email_address:
        loop:
          for: attempt in range(1,5)
          do:
            create_user_email:
              - first_name
              - middle_name
              - last_name
              - attempt
          publish:
            - address
          break:
            - CREATED
            - FAILURE
        navigate:
          CREATED: get_equipment
          UNAVAILABLE: print_fail
          FAILURE: print_fail

    - get_equipment:
        loop:
          for: item, price in order_map
          do:
            order:
              - item
              - price
          publish:
            - missing: self['missing'] + unavailable
            - total_cost: self['total_cost'] + cost
        navigate:
          AVAILABLE: print_finish
          UNAVAILABLE: print_finish

    - print_finish:
        do:
          base.print:
            - text: >
                'Created address: ' + address + ' for: ' + first_name + ' ' + last_name + '\n' +
                'Missing items: ' + missing + ' Cost of ordered items: ' + str(total_cost)

    - fancy_name:
        do:
          fancy_text:
            - text: first_name + ' ' + last_name
        publish:
          - fancy_text: fancy

    - send_mail:
        do:
          mail.send_mail:
            - hostname
            - port
            - from
            - to
            - subject: "'New Hire: ' + first_name + ' ' + last_name"
            - body: >
                fancy_text + '<br>' +
                'Created address: ' + address + ' for: ' + first_name + ' ' + last_name + '<br>' +
                'Missing items: ' + missing + ' Cost of ordered items: ' + str(total_cost)
        navigate:
          FAILURE: FAILURE
          SUCCESS: SUCCESS

    - on_failure:
      - print_fail:
          do:
            base.print:
              - text: "'Failed to create address for: ' + first_name + ' ' + last_name"

  outputs:
    - address
    - total_cost

hire_all.sl

namespace: tutorials.hiring

imports:
  base: tutorials.base

flow:
  name: hire_all

  inputs:
    - names_list

  workflow:
    - process_all:
        async_loop:
          for: name in names_list
          do:
            new_hire:
              - first_name: name['first']
              - middle_name: name.get('middle','')
              - last_name: name['last']
          publish:
            - address
            - total_cost
        aggregate:
          - email_list: filter(lambda x:x != '', map(lambda x:str(x['address']), branches_context))
          - cost: sum(map(lambda x:x['total_cost'], branches_context))
        navigate:
          SUCCESS: print_success
          FAILURE: print_failure

    - print_success:
        do:
          base.print:
            - text: >
                "All addresses were created successfully.\nEmail addresses created: "
                + str(email_list) + "\nTotal cost: " + str(cost)

    - on_failure:
        - print_failure:
            do:
              base.print:
                - text: >
                    "Some addresses were not created or there is an email issue.\nEmail addresses created: "
                    + str(email_list) + "\nTotal cost: " + str(cost)

hires.yaml

names_list:
  - first: joe
    middle: p
    last: bloggs
  - first: jane
    last: doe
  - first: juan
    last: perez