Problem

You want to chain blocks together so that the output of one block is fed into another.


Solution

Up until now, each block was run individually using the .execute method of the block. While it is possible to manually collect the output from one block and provide it as an input parameter to the next, to reap the full benefits of all the features of inter-connecting blocks, you'll need to create a pipeline.

A Pipeline is essentially a DAG (Directed Acyclic Graph) of blocks where data flows only in one direction. An example of a pipeline is as shown below.

from razor.blocks import Block, inputs, outputs
from razor.blocks import SocketTransport
from razor.blocks import ThreadExecutor, ProcessExecutor
from razor.pipeline import Pipeline

import random

@outputs.series.generic('number_list')
class GenerateNumbers(Block):
    def run (self, number_list):
        for i in range(1, 100):
            n = random.randint(1, 1000)
            number_list.put(n)
                
                
@inputs.series.generic('number_list')
@inputs.atomic.generic('factor')
@outputs.series.generic('factor_list')
class MultiplyByFactor(Block):
    def run (self, number_list, factor, factor_list):
        for number in number_list:
            res = number * factor
            factor_list.put(res)

@inputs.series.generic('factor_list')
class SumListElements(Block):
    def run(self, factor_list):
        factor_sum = 0
        for factor in factor_list:
            factor_sum += factor
        
        print(factor_sum)


To create a pipeline, the output of one block is chained as an input to the another block. In the following example of the pipeline, the output of GenerateNumbers is passed as an input to the MultiplyByFactor and the output of MultiplyByFactor is passed as in input to the SumListElements

A specific variable that needs to be passed to the next block can be accessed from the previous blocks object. As shown in the following pipeline, the number list which is passed to the MultiplyByFactor block is accessed as generate_nums.number_list.

generate_nums = (
    GenerateNumbers("generate_numbers")
    .number_list(SocketTransport)
    .executor(ThreadExecutor)
)

multiply_by_factor = (
    MultiplyByFactor("multiply_factor")
    .number_list(generate_nums.number_list)
    .factor(100)
    .factor_list(SocketTransport)
    .executor(ThreadExecutor)
)

sum_list_elements = (
    SumListElements("Sum elements")
    .factor_list(multiply_by_factor.factor_list)
    .executor(ThreadExecutor)
)