# Confluent HTTP Sink

The easiest way to ingest data from Confluent Cloud-based Kafka pipelines is Confluent's HTTP Sink connector.


## Prerequisites

To set up this integration, you'll first need to:

  1. Set up an ingest using the Array of JSON Objects schema (see the payload sketch after this list).
  2. Create an API key with Write permissions to use with this integration. We'd recommend creating a dedicated Org-Wide API Key.
  3. Follow Confluent's instructions to install the HTTP Sink connector on Confluent Cloud.
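
For reference, the Array of JSON Objects schema means each HTTP request body is a single JSON array of event objects. Here's a minimal sketch of what the sink will end up POSTing; the event fields are hypothetical placeholders:

```python
import json

# Hypothetical events -- the field names are placeholders; use whatever
# schema your Aggregations.io ingest expects.
events = [
    {"event_name": "page_view", "user_id": "u-123", "value": 1},
    {"event_name": "checkout", "user_id": "u-456", "value": 42.5},
]

# With "Batch json as array" enabled (see step 4 below), the connector
# sends one JSON array like this per request body.
print(json.dumps(events))
```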

## Setting up the connector

### 1 - Topic Selection

First, create a new HTTP Sink connector and select the source topic(s).

### 2 - Kafka Credentials

Choose an appropriate authentication method. These credentials need read access to the source topic(s); they are not used by Aggregations.io.

### 3 - Authentication

Set the following options:

  • HTTP URL should be your Aggregations.io POST URL, something like https://ingest.aggregations.io/[ID]
  • Choose BASIC for the Endpoint Authentication Type
  • The Auth username should be left blank.
  • The Auth password should be your API key (a request sketch to verify these settings follows this list).
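
Before launching the connector, you can sanity-check the URL and credentials with a request that mimics what the sink sends. This is a sketch using the Python requests library; the [ID] placeholder and the sample event are illustrative:

```python
import requests

URL = "https://ingest.aggregations.io/[ID]"  # replace [ID] with your ingest's POST URL path
API_KEY = "<YOUR_API_KEY>"  # the Write-permission key from the prerequisites

# BASIC auth with a blank username and the API key as the password,
# matching the connector settings above.
resp = requests.post(
    URL,
    json=[{"event_name": "connectivity_test", "value": 1}],  # hypothetical event
    auth=("", API_KEY),
    timeout=10,
)
print(resp.status_code, resp.text)
```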

### 4 - Configuration

Choose the relevant Input record value format for your data.

Click Show advanced configuration to reveal all the options you'll need.

Important settings:

  • Request Body Format should be set to json
  • Batch max size should be set to ~250. Depending on the average size of your events, you may want to increase or decrease this so a single batch stays under 1MB (for example, at ~4KB per event, 250 events ≈ 1MB).
  • Batch json as array should be true
  • Retry-related settings can be adjusted to your preference for robustness and resiliency. A consolidated sketch of these settings follows this list.
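
Taken together, the settings above correspond roughly to the connector properties below. The property names follow Confluent's HTTP Sink connector configuration reference; the Cloud UI presents them under the labels used above, so treat this as an illustration rather than a copy-paste config:

```python
# Illustrative mapping of the UI settings to HTTP Sink connector
# properties (names per Confluent's HTTP Sink configuration reference).
connector_config = {
    "http.api.url": "https://ingest.aggregations.io/[ID]",
    "auth.type": "BASIC",
    "connection.user": "",               # blank username
    "connection.password": "<API_KEY>",  # your Aggregations.io API key
    "request.body.format": "json",
    "batch.json.as.array": "true",
    "batch.max.size": "250",
    "max.retries": "10",        # retry settings -- tune to taste
    "retry.backoff.ms": "3000",
}

# Rough batch sizing: keep a single batch under the 1MB payload limit.
avg_event_bytes = 4_000                 # assumed average serialized event size
print(1_000_000 // avg_event_bytes)     # ≈ 250 events per batch
```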

### 5 - Sizing

Select the number of tasks that makes sense for your throughput and setup. As a rule of thumb, a sink connector can run at most one task per topic partition, so tasks beyond the partition count will sit idle.

### 6 - Launch

Launch the connector. By default, Confluent Cloud creates an error/dead letter queue (DLQ) topic for the connector; monitor it to validate that everything works as expected.
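
To verify delivery end to end, you can watch the DLQ topic for failed records. Here's a sketch using the confluent-kafka Python client; the bootstrap servers, credentials, and DLQ topic name (Confluent Cloud typically names it after the connector ID, e.g. dlq-lcc-abc123) are placeholders:

```python
from confluent_kafka import Consumer

# Placeholder cluster settings -- substitute your bootstrap servers,
# cluster API credentials, and the DLQ topic created for your connector.
consumer = Consumer({
    "bootstrap.servers": "<BOOTSTRAP_SERVERS>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<CLUSTER_API_KEY>",
    "sasl.password": "<CLUSTER_API_SECRET>",
    "group.id": "dlq-check",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["dlq-lcc-abc123"])  # hypothetical DLQ topic name

# Any record here is an event the sink failed to deliver; error context
# is typically attached as record headers.
msg = consumer.poll(timeout=10.0)
if msg is None:
    print("DLQ empty -- no delivery failures observed.")
else:
    print("Failed record:", msg.value())
    print("Error headers:", msg.headers())
consumer.close()
```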


## Troubleshooting

### Error Codes

See the Ingest Docs for common errors when ingesting events.

### Events don't seem to be batching?

From the Sink Docs:

> The HTTP Sink connector does not batch requests for messages containing Kafka header values that are different.

Confirm your Kafka records carry identical header values (or no headers at all).
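
In particular, check how your producers set record headers. A minimal sketch with the confluent-kafka Python client (the broker address and topic name are hypothetical) showing the pattern to avoid:

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "<BOOTSTRAP_SERVERS>"})  # placeholder

for i in range(3):
    producer.produce(
        "events",  # hypothetical topic
        value=b'{"event_name": "page_view", "value": 1}',
        # headers=[("trace-id", str(i).encode())],  # <-- per-record header values
        # like this prevent the HTTP Sink from batching these records together
    )
producer.flush()
```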