About the Apache Kafka connector

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. To learn more, please review Concepts → Apache Kafka.

With the Kafka connector, you can create an external data source for a Kafka topic available on a list of one or more Kafka brokers. The Kafka topic must contain messages in valid JavaScript Object Notation (JSON) format.

In order to ingest JSON using a defined schema, the Kafka connector requires an Apache Avro file. An Avro file is a map for how to serialize and flatten semi-structured, schema-less JSON into structured data for related physical schema tables. You can use the Avro Extractor Tool to generate an .avro file for the JSON messages related to a specific Kafka topic.

When enabled, the Kafka connector creates a Kafka Consumer in a Consumer Group. The consumer immediately begins ingesting an initial stream of messages from the Kafka topic, using the available list of Kafka Brokers.

Important

In this release, the Kafka Consumer in Incorta uses a Java client for Apache Kafka 2.1.

The Kafka connector applies the Avro mapping file, transforms the schema-less, unordered JSON into structured data, encrypts the structured data, and stores the encrypted data as Comma Separated Value (.csv) files in Shared Storage for the given tenant.

Because Kafka is a platform for streaming data, the Kafka connector inherently loads data incrementally. The Kafka connector supports specifying a rewind offset.

The Kafka connector supports the following Incorta specific functionality:

  • Encryption at Ingest
  • Incremental Loading
  • Multi-Source
  • Performance Optimization
  • Rewind to point in time
  • Single-Source
  • Webhook Callbacks

Deployment of the Kafka connector may require additional configuration, as described in Deployment Steps.

About the Kafka topic JSON message

As semi-structured data, JSON is schema-less and unordered. For this reason, the content of the JSON message for a given Kafka topic may vary by design. One message may contain only one JSON object in a nested array whereas another message may contain numerous nested objects within a nested array. In addition, some key-value pairs may be present, absent, and in differing order between messages.

The Kafka connector requires each JSON message to be valid JSON on a single line. In addition, the JSON message must contain a key labeled data whose value is the actual data you want to store in Incorta.

Although optional, it is strongly recommended to include an additional key-value pair that names the root entity in the physical schema table. For example, the JSON message can contain a key, arbitrarily labeled entity, whose value is the name of the root entity in the physical schema table.

Here is a valid example, shown across multiple lines for readability. The actual message must be on a single line:

{ "entity": "my_table",
  "data": { "name1": "value1",
            "name2": { "name3": "value4",
                       "name5": "value5"
                     }
          }
}

Important

Because of the key-value requirement for a ‘data’ key and for each message to be one line, you may need to make changes to the applications that produce and write JSON messages to a given Kafka topic.
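If a producer currently writes pretty-printed JSON, each message needs to be collapsed onto one line before it is written to the topic. The following bash sketch assumes the message is already valid JSON: because JSON strings cannot contain raw line breaks, deleting newline characters only removes formatting whitespace. The data key test is a simple pattern match, not a JSON parser, and the variable names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: collapse a pretty-printed JSON message onto a single line.
# Assumption: the message is already valid JSON, so its strings contain no raw
# line breaks and removing newline characters only removes formatting.
set -euo pipefail

MESSAGE_PRETTY='{ "entity": "my_table",
  "data": { "name1": "value1",
            "name2": { "name3": "value4",
                       "name5": "value5"
                     }
          }
}'

# Delete every newline character so the whole message is one line
MESSAGE_ONE_LINE=${MESSAGE_PRETTY//$'\n'/}

# Simple pattern check for the required "data" key (not a JSON parser)
data_key_re='"data"[[:space:]]*:'
if ! [[ "$MESSAGE_ONE_LINE" =~ $data_key_re ]]; then
  echo "message is missing the required data key" >&2
  exit 1
fi

printf '%s\n' "$MESSAGE_ONE_LINE"
```

The resulting line could then be written to the topic with a producer such as the kafka-console-producer.sh script that ships with Apache Kafka (in Kafka 2.1, with --broker-list <BROKER> --topic <TOPIC>).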

About Apache Avro and the Avro Extractor Tool

Apache Avro is a data serialization system whose schemas are written in JSON. In this context, the Avro file is a JSON document that defines how to serialize a given JSON object according to a schema. The Avro Extractor Tool allows you to generate an Avro file for the JSON messages related to a specific Kafka topic.

To learn more about Apache Avro, please review Concepts → Apache Avro.

To learn more about the Avro Extractor Tool, please review Tools → Avro Extractor Tool.

Deployment Steps

The Kafka connector ships with Incorta. In certain cases, you may need to make additional configurations to deploy the connector successfully as follows:

Specify a Kafka Consumer Service Name

For an Incorta Cluster with two or more Incorta Nodes that each run a Loader Service, you must specify a Kafka Consumer Service Name in the Cluster Management Console.

A Cluster Management Console (CMC) Administrator for your Incorta Cluster must configure the Kafka Consumer Service Name. Changes to this property require that the CMC Administrator restart each Loader Service.

In a cluster with multiple Incorta Nodes that each run a Loader Service, you must specify the Incorta Node and Loader Service to use. If you do not assign a Loader Service to consume the Kafka messages, Incorta assigns a Loader Service randomly, which can result in unexpected behavior and row duplication.

Here are the steps to specify the required properties for the Server Configurations:

  • As the CMC Administrator, sign in to the CMC.
  • In the Navigation bar, select Clusters.
  • In the cluster list, select a Cluster name.
  • In the canvas tabs, select Cluster Configurations.
  • In the panel tabs, select Server Configurations.
  • In the left pane, select Clustering.
  • In the right pane, for Kafka Consumer Service Name, enter <NODE_NAME>.<SERVICE_NAME>.
  • Select Save.
Important

When changing the Kafka consumer from one Loader Service (A) to another Loader Service (B), you must restart the current Loader Service (A) first, and then restart the other Loader Service (B).

A CMC Administrator must restart the Loader Services in the Incorta cluster.
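The Kafka Consumer Service Name value joins the Incorta Node name and the Loader Service name with a dot. The following bash sketch builds that value; the node and service names are hypothetical placeholders, the function name is hypothetical, and rejecting an empty part is an assumption of this sketch rather than a documented CMC rule.

```shell
#!/usr/bin/env bash
# Sketch: build the <NODE_NAME>.<SERVICE_NAME> value for Kafka Consumer Service Name.
# Hypothetical names: replace them with the node and Loader Service names shown in the CMC.
set -euo pipefail

NODE_NAME=incortanode1        # hypothetical Incorta Node name
SERVICE_NAME=loaderService1   # hypothetical Loader Service name

kafka_consumer_service_name() {
  local node="$1" service="$2"
  # Assumption of this sketch: both parts must be non-empty
  if [ -z "$node" ] || [ -z "$service" ]; then
    echo "a node name and a service name are both required" >&2
    return 1
  fi
  printf '%s.%s\n' "$node" "$service"
}

KAFKA_CONSUMER_SERVICE_NAME=$(kafka_consumer_service_name "$NODE_NAME" "$SERVICE_NAME")
echo "Kafka Consumer Service Name: $KAFKA_CONSUMER_SERVICE_NAME"
```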

Create a Kafka consumer properties file

Apache Kafka supports numerous consumer configurations including support for security protocols such as SSL and SASL.

On the Incorta Node that hosts the Loader Service specified in the Kafka Consumer Service Name for the given cluster, you can create a kafka-consumer.properties file.

The kafka-consumer.properties file affects all external data sources that use the Kafka connector for the given tenant. A System Administrator with root access to the operating system creates the file. The default location of the kafka-consumer.properties file is:

/home/incorta/IncortaAnalytics/Tenants/<TENANT_NAME>/KAFKA/kafka-consumer.properties

After a System Administrator creates the kafka-consumer.properties file, a CMC Administrator must restart each Loader Service in an Incorta cluster.

Note

Until you create an external data source that uses the Kafka connector with the Enable Kafka Consumer property enabled, the KAFKA directory does not exist.

To learn more about the available consumer configuration properties, please review the Apache Kafka 2.1 Consumer Configs documentation.

Here is an example of a bash shell script that creates the file as the incorta user:

#!/bin/bash
# Run this script as the incorta user, for example: sudo -u incorta bash <SCRIPT_NAME>.sh
if [ "$(id -un)" != "incorta" ]; then
  echo "This script must run as the incorta user." >&2
  exit 1
fi

# Change as needed the path
INCORTA_NODE_LOADER_SERVICE=/home/incorta/IncortaAnalytics

# Change as needed the name of the tenant
TENANT_NAME=demo

# Do not change this file name
FILE_NAME=kafka-consumer.properties

KAFKA_DIR="$INCORTA_NODE_LOADER_SERVICE/Tenants/$TENANT_NAME/KAFKA"

# Create the KAFKA directory; -p does not fail if the directory already exists
mkdir -p "$KAFKA_DIR"

# Add and remove properties as needed. The quoted heredoc writes each line as-is.
cat > "$KAFKA_DIR/$FILE_NAME" <<'EOF'
ssl.truststore.location=/home/incorta/IncortaAnalytics/certs/kafka-client.keystore
ssl.truststore.password=4546xd5f45fpYF345ghgMX
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
auto.commit.interval.ms=10000
request.timeout.ms=605000
max.poll.interval.ms=600000
EOF

# Read the file to verify the properties and values.
cat "$KAFKA_DIR/$FILE_NAME"

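Before a CMC Administrator restarts the Loader Services, it can help to sanity-check the file. The following bash sketch flags any non-blank, non-comment line that is not in key=value form. It is a hypothetical helper, not an Incorta tool: the Java properties format also accepts ':' or whitespace as separators, and this check rejects those forms. The function name check_kafka_consumer_properties is hypothetical, and the default path assumes the demo tenant.

```shell
#!/usr/bin/env bash
# Sketch: flag lines in kafka-consumer.properties that are not in key=value form.
# Assumption: only key=value lines are accepted; the Java properties format also
# allows ':' or whitespace separators, which this check rejects.
set -euo pipefail

check_kafka_consumer_properties() {
  local file="$1" line='' line_number=0 errors=0
  local comment_re='^[[:space:]]*[#!]'
  local key_value_re='^[[:space:]]*[^=[:space:]]+[[:space:]]*='
  while IFS= read -r line || [ -n "$line" ]; do
    line_number=$((line_number + 1))
    # Skip blank lines and comment lines that start with # or !
    if [[ -z "${line//[[:space:]]/}" || "$line" =~ $comment_re ]]; then
      continue
    fi
    # Every other line needs a non-empty key followed by '='
    if ! [[ "$line" =~ $key_value_re ]]; then
      echo "line $line_number is not key=value: $line" >&2
      errors=$((errors + 1))
    fi
  done < "$file"
  [ "$errors" -eq 0 ]
}

# Default path assumes the demo tenant; pass another path as the first argument
PROPERTIES_FILE="${1:-/home/incorta/IncortaAnalytics/Tenants/demo/KAFKA/kafka-consumer.properties}"
if [ -f "$PROPERTIES_FILE" ]; then
  if check_kafka_consumer_properties "$PROPERTIES_FILE"; then
    echo "OK: $PROPERTIES_FILE"
  else
    echo "Fix $PROPERTIES_FILE before restarting the Loader Services" >&2
  fi
fi
```

A non-zero return from the function means at least one line needs attention; the offending lines are printed to standard error.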
Restart the Loader Services in the CMC

Here are the steps to restart the Loader Services in an Incorta Cluster from the Cluster Management Console (CMC).

  • As the CMC Administrator, sign in to the CMC.
  • In the Navigation bar, select Clusters.
  • In the cluster list, select a Cluster name.
  • Select the Services tab.
  • In the list of services, select a service of the Loader type.
  • In Service details, in the footer, select Restart.
  • In the Action menu, select Refresh.
  • After the service restarts, in the Summary section, for cluster, select the Cluster name.
  • Select the Services tab, and select the next Loader Service in the cluster to restart.
  • Repeat these steps as necessary for all Loader Services in the cluster.

Steps to connect Apache Kafka and Incorta

To connect Apache Kafka and Incorta, here are the high-level steps, tools, and procedures:

Create an Apache Avro file

You must create an Avro file for an external data source that uses the Apache Kafka connector. To learn how to create an Avro file with the Avro Extractor Tool, please review Tools → Avro Extractor Tool.

Create an external data source

Here are the steps to create an external data source with the Kafka connector:

  • Sign in to the Incorta Direct Data Platform™.
  • In the Navigation bar, select Data.
  • In the Action bar, select + New → Add Data Source.
  • In the Choose a Data Source dialog, in Streaming, select Kafka.
  • In the New Data Source dialog, specify the applicable connector properties.
  • To test, select Test Connection.
  • Select Ok to save your changes.

Kafka connector properties

Here are the properties for the Kafka connector:

  • Data Source Name (text box): Enter the name of the data source.
  • Topic (text box): Enter the name of the Kafka topic.
  • Broker List (text box): Enter a comma-separated list of Kafka Brokers in the form <IPv4_ADDRESS>:<PORT>. The default port for a Kafka Broker is 9092.
  • Message Type Field (text box): Optional. If you created the .avro file with the corresponding input parameter of the avroExtractor.jar command, specify the same value for this property.
  • Trim messageType after dash (toggle): Optional. If you created the .avro file with the corresponding input parameter of the avroExtractor.jar command, enable this property.
  • Kafka Version (drop-down list): Select a supported Apache Kafka version. If left blank, the most current Kafka version is used.
  • Enable Kafka Consumer (toggle): When enabled, the Loader Service defined as the Kafka Consumer starts ingesting messages and writes encrypted .csv files to the KAFKA directory in the tenant’s shared directory.
  • Date Expiration (input number): This property is not functional.
  • Offset rewind to (text box): Optional. Using the yyyy-mm-dd hh:mm:ss.SSS zzz timestamp format, specify the time to rewind the consumer to. The consumer may then retrieve messages that it previously retrieved.
  • Mapping File (upload): Select Choose File, and then select a local .avro file to upload to Incorta.
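The Broker List and Offset rewind to values are plain strings. The following bash sketch prepares both: it joins hypothetical broker addresses with commas, and uses GNU coreutils date to format a UTC timestamp in the yyyy-mm-dd hh:mm:ss.SSS zzz pattern. It assumes GNU date (for -d and %3N), reads hh as a 24-hour hour, and assumes the connector accepts the UTC zone abbreviation; the one-hour rewind window is also hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: prepare values for the Broker List and Offset rewind to properties.
# Assumptions: GNU coreutils date (-d and %3N); hypothetical broker addresses.
set -euo pipefail

# Hypothetical brokers in <IPv4_ADDRESS>:<PORT> form (9092 is the default Kafka port)
BROKERS=("10.0.0.11:9092" "10.0.0.12:9092")

# Join the array with commas; IFS is changed only inside the command substitution
BROKER_LIST=$(IFS=,; printf '%s' "${BROKERS[*]}")
echo "Broker List: $BROKER_LIST"

# Rewind one hour (hypothetical window), formatted as yyyy-mm-dd hh:mm:ss.SSS zzz in UTC
REWIND_TO=$(date -u -d '1 hour ago' '+%Y-%m-%d %H:%M:%S.%3N %Z')
echo "Offset rewind to: $REWIND_TO"

# A fixed point in time; -u makes date parse and print it as UTC
FIXED_REWIND_TO=$(date -u -d '2021-03-15 08:30:00' '+%Y-%m-%d %H:%M:%S.%3N %Z')
echo "Offset rewind to: $FIXED_REWIND_TO"   # Offset rewind to: 2021-03-15 08:30:00.000 UTC
```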

Create a schema with the Schema Wizard

Here are the steps to create a Kafka schema with the Schema Wizard:

  • Sign in to the Incorta Direct Data Platform™.
  • In the Navigation bar, select Schema.
  • In the Action bar, select + New → Schema Wizard.
  • In (1) Choose a Source, specify the following:

    • For Enter a name, enter the schema name.
    • For Select a Datasource, select the Kafka external data source.
    • Optionally create a description.
  • In the Schema Wizard footer, select Next.
  • In (2) Manage Tables, in the Data Panel, first select the name of the Data Source, and then check the Select All checkbox.
  • In the Schema Wizard footer, select Next.
  • In (3) Finalize, in the Schema Wizard footer, select Create Schema.

Create a schema with the Schema Designer

Here are the steps to create a Kafka schema using the Schema Designer:

  • Sign in to the Incorta Direct Data Platform™.
  • In the Navigation bar, select Schema.
  • In the Action bar, select + New → Create Schema.
  • In Name, specify the schema name, and select Save.
  • In the Schema Designer, in Start adding tables to your schema, select Kafka.
  • In the Data Source dialog, specify the Kafka table data source properties.
  • Select Add.
  • In the Table Editor, in the Table Summary section, enter the table name.
  • To save your changes, select Done in the Action bar.

Kafka table data source properties

For a schema table in Incorta, you can define the following Kafka-specific data source properties:

  • Type (drop-down list): Default is Kafka.
  • Data Source (drop-down list): Select the Kafka external data source.
  • Query (text box): Select the text box. In the Edit Query dialog, specify a SELECT statement for the table as defined in the .avro file.
  • Callback (toggle): Enables the Callback URL field.
  • Callback URL (text box): This property appears when the Callback toggle is enabled. Specify the URL.

Here is an example query:

SELECT id, dt, item, quantity, price FROM tblFruitSales.tblFruitSales

Important

The Kafka connector natively supports incremental loads for streaming messages from Kafka. There are no configurations to enable an Update query other than specifying one or more key columns in the physical schema table. You can schedule an incremental load for a Kafka table in a schema as frequently as every 5 minutes.

View the schema diagram with the Schema Diagram Viewer

Here are the steps to view the schema diagram using the Schema Diagram Viewer:

  • Sign in to the Incorta Direct Data Platform™.
  • In the Navigation bar, select Schema.
  • In the list of schemas, select the Kafka schema.
  • In the Schema Designer, in the Action bar, select Diagram.

Load the schema

Here are the steps to perform a Full Load of the Kafka schema using the Schema Designer:

  • Sign in to the Incorta Direct Data Platform™.
  • In the Navigation bar, select Schema.
  • In the list of schemas, select the Kafka schema.
  • In the Schema Designer, in the Action bar, select Load → Load Now → Full.
  • To review the load status, in Last Load Status, select the date.

Here are the steps to perform an Incremental Load of the Kafka schema with the Schema Designer:

  • Sign in to the Incorta Direct Data Platform™.
  • In the Navigation bar, select Schema.
  • In the list of schemas, select the Kafka schema.
  • In the Schema Designer, in the Action bar, select Load → Load Now → Incremental.
  • To review the load status, in Last Load Status, select the date.

Explore the schema

With the full or incremental load of the Kafka schema complete, you can use the Analyzer to explore the schema, create your first insight, and save the insight to a new dashboard.

To open the Analyzer from the schema, follow these steps:

  • In the Navigation bar, select Schema.
  • In the Schema Manager, in the List view, select the Kafka schema.
  • In the Schema Designer, in the Action bar, select Explore Data.

Known issues

  • Incorta converts Boolean and Date data types to String. You can use a formula column in a physical schema table to convert a String to a specific type using built-in functions.

© Incorta, Inc. All Rights Reserved.