Use Apache Flink® with Aiven for Apache Kafka®
==============================================

`Apache Flink® <https://flink.apache.org/>`_ is an open source platform for processing distributed streaming and batch data. Where Apache Kafka® excels at receiving and sending event streams, Flink consumes, transforms, aggregates, and enriches your data.

.. Note::

    If you want to experience the power of streaming SQL transformations with Flink, Aiven provides a managed :doc:`/docs/products/flink` with built-in data flow integration with Aiven for Apache Kafka®.

The example in this article shows you how to create a simple Java Flink job that reads data from a Kafka topic, processes it, and then pushes it to a different Kafka topic. It uses the Java API on a `local installation of Apache Flink 1.15.1 <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/try-flink/local_installation/>`_, but the same approach works for connecting any self-hosted Flink cluster to Aiven for Apache Kafka.

.. _kafka-flink-java-prereq:

Prerequisites
-------------

You need an Aiven for Apache Kafka service up and running with two topics, named ``test-flink-input`` and ``test-flink-output``, already created.

Furthermore, for the example, you need to collect the following information about the Aiven for Apache Kafka service:

* ``APACHE_KAFKA_HOST``: The hostname of the Apache Kafka service
* ``APACHE_KAFKA_PORT``: The port of the Apache Kafka service

You need to have `Apache Maven™ <https://maven.apache.org/>`_ installed to build the example.

Set up the truststore and keystore
''''''''''''''''''''''''''''''''''

Create a Java keystore and truststore for the Aiven for Apache Kafka service. For the following example we assume:

* The keystore is available at ``KEYSTORE_PATH/client.keystore.p12``
* The truststore is available at ``TRUSTSTORE_PATH/client.truststore.jks``
* For simplicity, the same secret (password) is used for both the keystore and the truststore, and is shown here as ``KEY_TRUST_SECRET``
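If you have not created these files yet, the sketch below shows one possible way to build them with ``openssl`` and ``keytool`` from the SSL certificate files downloaded for the service; the ``service.key``, ``service.cert`` and ``ca.pem`` file names and the ``service_key`` alias are assumptions for illustration:

.. code:: shell

   # Sketch only: assumes service.key, service.cert and ca.pem have been
   # downloaded for the Kafka service; file names and alias are illustrative.

   # Bundle the client key and certificate into a PKCS12 keystore
   openssl pkcs12 -export                      \
       -inkey service.key                      \
       -in service.cert                        \
       -out KEYSTORE_PATH/client.keystore.p12  \
       -name service_key                       \
       -passout pass:KEY_TRUST_SECRET

   # Import the CA certificate into a JKS truststore
   keytool -importcert -noprompt -trustcacerts         \
       -alias CA                                       \
       -file ca.pem                                    \
       -keystore TRUSTSTORE_PATH/client.truststore.jks \
       -storepass KEY_TRUST_SECRET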
Use Apache Flink with Aiven for Apache Kafka
--------------------------------------------

The following example shows how to customize the ``DataStreamJob`` generated by the Flink quickstart Maven archetype to work with Aiven for Apache Kafka.

.. Note::

    The full code to run this example can be found in the `Aiven examples GitHub repository <https://github.com/aiven/aiven-examples>`_.

1. Generate a Flink job skeleton named ``flink-capitalizer`` using the Maven archetype:

   .. code:: shell

      mvn archetype:generate -DinteractiveMode=false \
        -DarchetypeGroupId=org.apache.flink          \
        -DarchetypeArtifactId=flink-quickstart-java  \
        -DarchetypeVersion=1.15.1                    \
        -DgroupId=io.aiven.example                   \
        -DartifactId=flink-capitalizer               \
        -Dpackage=io.aiven.example.flinkcapitalizer  \
        -Dversion=0.0.1-SNAPSHOT

2. Uncomment the Kafka connector dependency in ``pom.xml``:

   .. code:: xml

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
          <version>${flink.version}</version>
      </dependency>

Customize the ``DataStreamJob`` application
'''''''''''''''''''''''''''''''''''''''''''

In the generated code, ``DataStreamJob`` is the main entry point, and has already been configured with all of the context necessary to interact with the cluster for your processing.

1. Create a new class called ``io.aiven.example.flinkcapitalizer.StringCapitalizer`` that performs a simple ``MapFunction`` transformation, emitting every incoming string as uppercase:

   .. code:: java

      package io.aiven.example.flinkcapitalizer;

      import org.apache.flink.api.common.functions.MapFunction;

      public class StringCapitalizer implements MapFunction<String, String> {
          @Override
          public String map(String s) {
              return s.toUpperCase();
          }
      }

2. Add the following imports to ``DataStreamJob``:

   .. code:: java

      import java.util.Properties;

      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.base.DeliveryGuarantee;
      import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
      import org.apache.flink.connector.kafka.sink.KafkaSink;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

3. Modify the ``main`` method in ``DataStreamJob`` to read from and write to the Kafka topics, replacing the ``APACHE_KAFKA_HOST``, ``APACHE_KAFKA_PORT``, ``KEYSTORE_PATH``, ``TRUSTSTORE_PATH`` and ``KEY_TRUST_SECRET`` placeholders with the values from the :ref:`prerequisites <kafka-flink-java-prereq>`:

   .. code:: java

      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

          // SSL settings shared by the Kafka source and sink
          Properties props = new Properties();
          props.put("security.protocol", "SSL");
          props.put("ssl.keystore.type", "PKCS12");
          props.put("ssl.keystore.location", "KEYSTORE_PATH/client.keystore.p12");
          props.put("ssl.keystore.password", "KEY_TRUST_SECRET");
          props.put("ssl.key.password", "KEY_TRUST_SECRET");
          props.put("ssl.truststore.type", "JKS");
          props.put("ssl.truststore.location", "TRUSTSTORE_PATH/client.truststore.jks");
          props.put("ssl.truststore.password", "KEY_TRUST_SECRET");

          KafkaSource<String> source = KafkaSource.<String>builder()
              .setBootstrapServers("APACHE_KAFKA_HOST:APACHE_KAFKA_PORT")
              .setGroupId("test-flink-input-group")
              .setTopics("test-flink-input")
              .setProperties(props)
              .setStartingOffsets(OffsetsInitializer.earliest())
              .setValueOnlyDeserializer(new SimpleStringSchema())
              .build();

          KafkaSink<String> sink = KafkaSink.<String>builder()
              .setBootstrapServers("APACHE_KAFKA_HOST:APACHE_KAFKA_PORT")
              .setKafkaProducerConfig(props)
              .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                  .setTopic("test-flink-output")
                  .setValueSerializationSchema(new SimpleStringSchema())
                  .build()
              )
              .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
              .build();

          // ... processing continues here
      }

4. Tie the Kafka source and sink together with the ``StringCapitalizer`` into a single processing pipeline:

   .. code:: java

      // ... processing continues here
      env
          .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
          .map(new StringCapitalizer())
          .sinkTo(sink);

      env.execute("Flink Java capitalizer");

Build the application
'''''''''''''''''''''

From the main ``flink-capitalizer`` folder, execute the following Maven command to build the application:

.. code:: shell

   mvn -DskipTests=true clean package

The above command creates a ``jar`` file named ``target/flink-capitalizer-0.0.1-SNAPSHOT.jar``.

Run the application
'''''''''''''''''''

If you have a `local installation of Apache Flink 1.15.1 <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/try-flink/local_installation/>`_, you can launch the job on your local machine, where ``$FLINK_HOME`` is the Flink installation directory:

.. code:: shell

   $FLINK_HOME/bin/flink run target/flink-capitalizer-0.0.1-SNAPSHOT.jar

You can verify that the job is running in the Flink web UI at ``http://localhost:8081``.

By following the article :doc:`/docs/products/kafka/howto/connect-with-command-line`, you can send string events to the input topic and verify that the messages are forwarded to the output topic in upper case.
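For a quick end-to-end check from the terminal, the sketch below uses the console tools shipped with the Apache Kafka distribution; the ``client.properties`` file name is an illustrative assumption, and the file reuses the keystore and truststore from the :ref:`prerequisites <kafka-flink-java-prereq>`:

.. code:: shell

   # Hypothetical client.properties reusing the keystore and truststore
   # from the prerequisites; adjust paths and secrets to your environment.
   cat > client.properties <<'EOF'
   security.protocol=SSL
   ssl.keystore.type=PKCS12
   ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
   ssl.keystore.password=KEY_TRUST_SECRET
   ssl.key.password=KEY_TRUST_SECRET
   ssl.truststore.type=JKS
   ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
   ssl.truststore.password=KEY_TRUST_SECRET
   EOF

   # Send a test message to the input topic ...
   echo "hello flink" | kafka-console-producer.sh \
       --bootstrap-server APACHE_KAFKA_HOST:APACHE_KAFKA_PORT \
       --producer.config client.properties \
       --topic test-flink-input

   # ... and read the transformed result from the output topic.
   kafka-console-consumer.sh \
       --bootstrap-server APACHE_KAFKA_HOST:APACHE_KAFKA_PORT \
       --consumer.config client.properties \
       --topic test-flink-output \
       --from-beginning

If the pipeline is working, the consumer should print the message in upper case, for example ``HELLO FLINK``.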