Problem Statement

You want to define a block that works on a stream of data.


Solution

Use the series attribute of the @inputs decorator class. Razor SDK treats a series input as a Queue, and there are two ways to retrieve values from it:

  1. by treating the input as an iterator
  2. by using the .get method to take one value at a time
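
The two retrieval styles can be illustrated with Python's standard-library queue.Queue. This is only an analogy: the recipe does not show the Razor SDK series object itself, so the end-of-stream marker and the helper names below are assumptions made for illustration, not SDK API.

```python
import queue

SENTINEL = None  # hypothetical end-of-stream marker, used only in this sketch


def fill(values):
    """Build a queue holding the values followed by the end marker."""
    q = queue.Queue()
    for value in values:
        q.put(value)
    q.put(SENTINEL)
    return q


def drain_by_iteration(q):
    """Style 1: treat the queue as an iterator and loop over it."""
    # iter(callable, sentinel) keeps calling q.get() until it returns SENTINEL
    return [item for item in iter(q.get, SENTINEL)]


def drain_by_get(q):
    """Style 2: pull one value at a time with explicit .get() calls."""
    items = []
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        items.append(item)
    return items


print(drain_by_iteration(fill(['a/b', 'c/d'])))  # ['a/b', 'c/d']
print(drain_by_get(fill(['a/b', 'c/d'])))        # ['a/b', 'c/d']
```

Both helpers return the same values in the same order; the difference is only in who controls when the next value is taken.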

The following example shows you how to rewrite the block from the previous recipe to one that accepts a stream of strings as input:

@inputs.series.generic(name='texts', doc='A stream of strings to split')
@inputs.atomic.generic(name='delimiter', doc='A single character or a sequence of characters')
@outputs.series.generic(name='data', doc='Results of the split operation')
class SplitStringSeries(Block):
    def run(self, texts, delimiter, data):
        for text in texts:
            res = text.split(delimiter)
            data.put(res)


Stream blocks are usually used in pipelines, where blocks perform a series of operations one after another. When you run a single block with the .execute method, however, a series input can be provided as any iterable object, such as a list, tuple, or generator. In the above example, the data output is also declared as a series, so the block puts each processed value onto it with data.put as soon as it is ready rather than returning a single result; this causes the block to stream its output as well.

# inputs are set here with chained calls; they can also be specified as class parameters during instantiation
split_string_series = (
    SplitStringSeries()
    .delimiter('/')
    .texts(['1/1/2020', '3/1/2020', '23/1/2020'])
)

result = split_string_series.execute()
list(result['data'].values())


# using a generator expression to define a date generator
dates = (f'{i}/1/2000' for i in range(3))

result = split_string_series.texts(dates).execute()
list(result['data'].values())
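
The point about iterable inputs can be checked without the SDK. The sketch below uses a plain-Python stand-in for the loop in SplitStringSeries.run (the function split_series is hypothetical and not part of Razor SDK) to show that a list, a tuple, and a generator are all consumed the same way. It also shows a property of Python generators worth keeping in mind: they can be consumed only once. Whether Razor SDK buffers a generator input between runs is not stated in this recipe, so treat the last line as a statement about Python only.

```python
def split_series(texts, delimiter):
    """Plain-Python stand-in for the loop in SplitStringSeries.run."""
    for text in texts:
        yield text.split(delimiter)


as_list = ['1/1/2020', '3/1/2020']
as_tuple = ('1/1/2020', '3/1/2020')
as_generator = (f'{i}/1/2000' for i in range(3))

print(list(split_series(as_list, '/')))
# [['1', '1', '2020'], ['3', '1', '2020']]
print(list(split_series(as_tuple, '/')))
# [['1', '1', '2020'], ['3', '1', '2020']]
print(list(split_series(as_generator, '/')))
# [['0', '1', '2000'], ['1', '1', '2000'], ['2', '1', '2000']]

# A generator is exhausted after one pass, so a second pass yields nothing.
print(list(split_series(as_generator, '/')))
# []
```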


The .get method is typically used to retrieve values from the stream when you don't need to access them sequentially, for example in blocks that process values in parallel. Upcoming recipes describe such blocks in more detail.
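
As a rough illustration of why .get suits non-sequential processing, the sketch below uses the standard-library queue.Queue and threading modules rather than Razor SDK. Several workers pull from the same queue, and each takes the next available value as soon as it is free. The worker count, the stop marker, and the function names are assumptions made for this sketch; the blocks described in the upcoming recipes may work differently.

```python
import queue
import threading

NUM_WORKERS = 3   # hypothetical worker count
STOP = object()   # hypothetical end-of-stream marker, one per worker


def worker(texts, delimiter, results):
    """Pull values with .get() until the stop marker arrives."""
    while True:
        text = texts.get()
        if text is STOP:
            break
        results.put(text.split(delimiter))


def split_in_parallel(values, delimiter):
    """Split every value using several workers that share one input queue."""
    texts, results = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(texts, delimiter, results))
        for _ in range(NUM_WORKERS)
    ]
    for thread in threads:
        thread.start()
    for value in values:
        texts.put(value)
    for _ in threads:
        texts.put(STOP)  # one marker per worker so every worker exits
    for thread in threads:
        thread.join()
    # Completion order is not guaranteed, so sort for a stable result.
    return sorted(results.get() for _ in range(results.qsize()))


print(split_in_parallel(['1/1/2020', '3/1/2020', '23/1/2020'], '/'))
# [['1', '1', '2020'], ['23', '1', '2020'], ['3', '1', '2020']]
```

Because no worker depends on the position of a value in the stream, the results can arrive in any order, which is why the sketch sorts them before returning.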