You want to chain blocks together so that the output of one block is fed into another.
Up until now, each block was run individually using the
.execute method of the block. While it is possible to manually collect the output from one block and provide it as an input parameter to the next, to reap the full benefits of all the features of inter-connecting blocks, you'll need to create a pipeline.
A Pipeline is essentially a DAG (Directed Acyclic Graph) of blocks where data flows only in one direction. An example of a pipeline is as shown below.
from razor.blocks import Block, inputs, outputs from razor.blocks import SocketTransport from razor.blocks import ThreadExecutor, ProcessExecutor from razor.pipeline import Pipeline import random @outputs.series.generic('number_list') class GenerateNumbers(Block): def run (self, number_list): for i in range(1, 100): n = random.randint(1, 1000) number_list.put(n) @inputs.series.generic('number_list') @inputs.atomic.generic('factor') @outputs.series.generic('factor_list') class MultiplyByFactor(Block): def run (self, number_list, factor, factor_list): for number in number_list: res = number * factor factor_list.put(res) @inputs.series.generic('factor_list') class SumListElements(Block): def run(self, factor_list): factor_sum = 0 for factor in factor_list: factor_sum += factor print(factor_sum)
To create a pipeline, the output of one block is chained as an input to the another block. In the following example of the pipeline, the output of
GenerateNumbers is passed as an input to the
MultiplyByFactor and the output of
MultiplyByFactor is passed as in input to the
A specific variable that needs to be passed to the next block can be accessed from the previous blocks object. As shown in the following pipeline, the number list which is passed to the
MultiplyByFactor block is accessed as
generate_nums = ( GenerateNumbers("generate_numbers") .number_list(SocketTransport) .executor(ThreadExecutor) ) multiply_by_factor = ( MultiplyByFactor("multiply_factor") .number_list(generate_nums.number_list) .factor(100) .factor_list(SocketTransport) .executor(ThreadExecutor) ) sum_list_elements = ( SumListElements("Sum elements") .factor_list(multiply_by_factor.factor_list) .executor(ThreadExecutor) )