Problem
You want to chain blocks together so that the output of one block is fed into another.
Solution
Up until now, each block was run individually using the .execute
method of the block. While it is possible to manually collect the output from one block and provide it as an input parameter to the next, to reap the full benefits of all the features of inter-connecting blocks, you'll need to create a pipeline.
A Pipeline is essentially a DAG (Directed Acyclic Graph) of blocks where data flows only in one direction. An example of a pipeline is as shown below.
from razor.blocks import Block, inputs, outputs from razor.blocks import SocketTransport from razor.blocks import ThreadExecutor, ProcessExecutor from razor.pipeline import Pipeline import random @outputs.series.generic('number_list') class GenerateNumbers(Block): def run (self, number_list): for i in range(1, 100): n = random.randint(1, 1000) number_list.put(n) @inputs.series.generic('number_list') @inputs.atomic.generic('factor') @outputs.series.generic('factor_list') class MultiplyByFactor(Block): def run (self, number_list, factor, factor_list): for number in number_list: res = number * factor factor_list.put(res) @inputs.series.generic('factor_list') class SumListElements(Block): def run(self, factor_list): factor_sum = 0 for factor in factor_list: factor_sum += factor print(factor_sum)
To create a pipeline, the output of one block is chained as an input to the another block. In the following example of the pipeline, the output of GenerateNumbers
is passed as an input to the MultiplyByFactor
and the output of MultiplyByFactor
is passed as in input to the SumListElements
A specific variable that needs to be passed to the next block can be accessed from the previous blocks object. As shown in the following pipeline, the number list which is passed to the MultiplyByFactor
block is accessed as generate_nums.number_list.
generate_nums = ( GenerateNumbers("generate_numbers") .number_list(SocketTransport) .executor(ThreadExecutor) ) multiply_by_factor = ( MultiplyByFactor("multiply_factor") .number_list(generate_nums.number_list) .factor(100) .factor_list(SocketTransport) .executor(ThreadExecutor) ) sum_list_elements = ( SumListElements("Sum elements") .factor_list(multiply_by_factor.factor_list) .executor(ThreadExecutor) )