This document contains a code snippet that converts an input Kafka stream into a pandas DataFrame. The code can be used in any custom block created on the RZT AI Platform.

Block Code

import pandas as pd

# A temporary list to store read Records
temp_list = []

# Getting the Input Topic's schema (Column Names and Their Types)
input_schema = self.get_schema(topic=self.input_dict["DataSource"]["queueTopicName"])

# Creating a list of Column names to be used while creating DataFrame
columns = list(input_schema.keys())

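# consumer is the consumer object created for reading records from the input topic
for row in consumer.receive():
    # Store each record; this assumes a record is a sequence of values in schema column order
    temp_list.append(row)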

# Creating the data frame
df = pd.DataFrame(temp_list, columns=columns)
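
The block code depends on objects supplied by the platform (self, consumer), so it cannot be run on its own. The following is a minimal sketch of the same pattern that runs outside the platform. FakeConsumer, records_to_dataframe, max_records, and the sample schema are hypothetical names introduced only for illustration; they are not part of the RZT AI Platform API. It also assumes that each record is a tuple of values in schema column order.

```python
import pandas as pd


class FakeConsumer:
    """Hypothetical stand-in for the platform's consumer object."""

    def __init__(self, records):
        self._records = records

    def receive(self):
        # Yield records one at a time, as a stream consumer would
        yield from self._records


def records_to_dataframe(consumer, schema, max_records=None):
    """Collect records from consumer.receive() into a pandas DataFrame."""
    # Column names come from the schema keys, as in the block code
    columns = list(schema.keys())
    rows = []
    for row in consumer.receive():
        rows.append(row)
        # Optional cap (an assumption) so an unbounded stream does not loop forever
        if max_records is not None and len(rows) >= max_records:
            break
    return pd.DataFrame(rows, columns=columns)


# Sample schema and records, made up for this example
schema = {"id": "int", "name": "string"}
consumer = FakeConsumer([(1, "a"), (2, "b"), (3, "c")])
df = records_to_dataframe(consumer, schema, max_records=2)
print(df)
```

The max_records cap is a design choice for this sketch: a Kafka topic is potentially unbounded, so a loop over the stream needs some stopping condition before the DataFrame can be built. Whether the platform's consumer.receive() stops on its own is not stated here.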