Create an Amazon S3 sink connector by Confluent from Apache Kafka®#
The Apache Kafka Connect® S3 sink connector by Confluent enables you to move data from an Aiven for Apache Kafka® cluster to Amazon S3 for long-term storage.
Note
There are two versions of the S3 sink connector available with Aiven for Apache Kafka Connect®: one developed by Aiven and one developed by Confluent. This article covers the Confluent version. To learn about the Aiven sink connector, check out the dedicated page.
Note
You can check the full set of available parameters and configuration options in the connector’s documentation.
Prerequisites#
To set up the S3 sink connector by Confluent, you need an Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect cluster.
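If Apache Kafka Connect is not yet enabled on the service, one way to enable it (as an alternative to creating a dedicated Aiven for Apache Kafka Connect service) is through the Aiven CLI, where SERVICE_NAME is a placeholder for your Aiven for Apache Kafka service name:
avn service update SERVICE_NAME -c kafka_connect=true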
Furthermore, you need to follow the steps to prepare the AWS account and S3 sink, and collect the following information about the target S3 bucket upfront (an AWS CLI sketch for creating these resources follows the list):
AWS_S3_NAME: The name of the S3 bucket
AWS_S3_REGION: The AWS region where the S3 bucket has been created
AWS_USER_ACCESS_KEY_ID: The AWS user access key ID
AWS_USER_SECRET_ACCESS_KEY: The AWS user secret access key
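If the bucket and the dedicated AWS user do not exist yet, the following AWS CLI commands are a minimal sketch of how they could be created. The resource names are placeholders, the LocationConstraint argument must be omitted for us-east-1, and the user still needs an IAM policy granting write access to the bucket as described in the preparation steps:
# Create the target bucket (omit --create-bucket-configuration for us-east-1)
aws s3api create-bucket --bucket <AWS_S3_NAME> --region <AWS_S3_REGION> --create-bucket-configuration LocationConstraint=<AWS_S3_REGION>
# Create a dedicated user and an access key; the output of the last command
# contains the AWS_USER_ACCESS_KEY_ID and AWS_USER_SECRET_ACCESS_KEY values
aws iam create-user --user-name <AWS_USER_NAME>
aws iam create-access-key --user-name <AWS_USER_NAME>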
Set up an S3 sink connector with Aiven CLI#
The following example demonstrates how to set up an Apache Kafka Connect® S3 sink connector using the dedicated Aiven CLI command.
Define a Kafka Connect® configuration file#
Define the connector configuration in a file (referred to here as s3_sink.json) with the following content:
{
"name": "<CONNECTOR_NAME>",
"topics": "<TOPIC_NAME>",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
"flush.size": "1",
"s3.bucket.name": "<AWS_S3_NAME>",
"s3.region": "<AWS_S3_REGION>",
"s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.credentials.provider.secret_access_key": "<AWS_USER_SECRET_ACCESS_KEY>",
"s3.credentials.provider.access_key_id": "<AWS_USER_ACCESS_KEY_ID>"
}
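Note that a stray character (for example a duplicated quote or a missing comma) makes the file invalid JSON and causes the connector creation to fail; if jq is installed, you can validate the file locally with:
jq . s3_sink.json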
The configuration file contains the following entries:
name: The connector name
topics: The list of Apache Kafka® topics to sink to the S3 bucket
key.converter and value.converter: Data converters, depending on the topic data format. Check the related documentation for more information (a JSON-based alternative is sketched after this list)
format.class: Defines the output data format in the S3 bucket. The io.confluent.connect.s3.format.bytearray.ByteArrayFormat writes messages in binary format
flush.size: Defines how many messages to write per file in the S3 bucket. For example, setting flush.size to 3 generates a file every three messages per topic partition
s3.bucket.name: The name of the S3 bucket
s3.region: The AWS region where the S3 bucket has been created
s3.credentials.provider.class: The name of the class implementing the com.amazonaws.auth.AWSCredentialsProvider and org.apache.kafka.common.Configurable interfaces. Use io.aiven.kafka.connect.util.AivenAWSCredentialsProvider
s3.credentials.provider.secret_access_key: The AWS user secret access key
s3.credentials.provider.access_key_id: The AWS user access key ID
Check out the dedicated documentation for the full list of configuration options.
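As an illustration of how the converter and format settings interact, the snippet below shows a possible alternative for a topic containing schemaless JSON messages instead of raw bytes; the class names are standard Apache Kafka® and Confluent S3 connector classes, but verify them against the connector documentation for your version:
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat"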
Create an S3 sink connector with Aiven CLI#
To create the connector, execute the following Aiven CLI command, replacing the SERVICE_NAME
with the name of the existing Aiven for Apache Kafka® service where the connector needs to run:
avn service connector create SERVICE_NAME @s3_sink.json
Check the connector status with the following command, replacing the SERVICE_NAME
with the existing Aiven for Apache Kafka® service and the CONNECTOR_NAME
with the name of the connector defined before:
avn service connector status SERVICE_NAME CONNECTOR_NAME
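If you do not remember the connector name, you can list all connectors defined on the service with:
avn service connector list SERVICE_NAME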
With the connector in place, verify that data is flowing to the target S3 bucket.
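A quick way to check is to list the bucket contents with the AWS CLI; with the default partitioner the connector typically writes objects under a topics/<topic>/partition=<n>/ prefix, although the exact layout depends on the partitioning options described in the connector documentation:
aws s3 ls s3://<AWS_S3_NAME> --recursive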
Example: define an S3 sink connector#
The example creates an S3 sink connector with the following properties:
connector name: my_s3_sink
source topics: students
target S3 bucket name: my-test-bucket
target S3 bucket region: eu-central-1
AWS user access key ID: AKIAXXXXXXXXXX
AWS user secret access key: hELuXXXXXXXXXXXXXXXXXXXXXXXXXX
a file generated in the S3 bucket every 10 messages
The connector configuration is the following:
{
"name": "my_s3_sink",
"topics": "students",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
"flush.size": "10",
"s3.bucket.name": "my-test-bucket",
"s3.region": "eu-central-1",
"s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.credentials.provider.secret_access_key": "hELuXXXXXXXXXXXXXXXXXXXXXXXXXX",
"s3.credentials.provider.access_key_id": "AKIAXXXXXXXXXX"
}
With the above configuration stored in the s3_sink.json
file, you can create the connector in the demo-kafka
instance with:
avn service connector create demo-kafka @s3_sink.json
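Since flush.size is set to 10, the connector only writes an object after ten messages have accumulated in a partition of the students topic. As a rough smoke test, assuming you have downloaded the service's TLS certificates (for example as service.key, service.cert and ca.pem) and replaced the broker address below with the one shown for your demo-kafka service, you could produce ten test messages with kcat and then list the bucket:
# Produce 10 newline-delimited test messages to the students topic
seq 1 10 | kcat -P \
  -b demo-kafka-myproject.aivencloud.com:12345 \
  -t students \
  -X security.protocol=ssl \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.cert \
  -X ssl.ca.location=ca.pem

# Check that a new object appeared in the target bucket
aws s3 ls s3://my-test-bucket --recursive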