Introduction
The document contains the code piece which converts an input kafka stream into a pandas dataframe. This code can be used with any custom block created in the RZT AI Platform.
Block Code
import pandas as pd # A temporary list to store read Records temp_list = [] # Getting the Input Topic's schema (Column Names and Their Types) input_schema = self.get_schema(topic=self.input_dict["DataSource"]["queueTopicName"]) # Creating a list of Column names to be used while creating DataFrame columns = list(input_schema.keys()) # Consumer is the consumer object created for reading records from input topic for row in consumer.receive(): temp_list.append(row) # Creating the data frame df = pd.DataFrame(temp_list, columns=columns)