# Delivering data from a Data Streams queue to Managed Service for Apache Kafka® using Data Transfer

With Data Transfer, you can deliver data from a stream in Data Streams to a Managed Service for Apache Kafka® cluster.

To transfer data:

1. [Create a data stream in Data Streams](#prepare-source).
1. [Set up and activate the transfer](#prepare-transfer).
1. [Test your transfer](#verify-transfer).

If you no longer need the resources you created, [delete them](#clear-out).


## Required paid resources {#paid-resources}

* Managed Service for YDB database (see [Managed Service for YDB pricing](../../ydb/pricing/index.md)). Its cost depends on the deployment mode:

    * In serverless mode, you pay for data operations as well as the amount of stored data and backups.
    * In dedicated instance mode, you pay for the use of computing resources allocated to the database, storage size, and backups.

* Data Streams (see [Data Streams pricing](../pricing.md)). The cost depends on the pricing model:

    * [Based on allocated resources](../pricing.md#rules): You pay a fixed hourly rate for the established throughput limit and message retention period, and additionally for the number of units of actually written data.
    * [On-demand](../pricing.md#on-demand): You pay for the performed read/write operations, the amount of read or written data, and the actual storage used for messages that are still within their retention period.

* Managed Service for Apache Kafka® cluster: You pay for the computing resources allocated to hosts and for the size of storage and backups (see [Managed Service for Apache Kafka® pricing](../../managed-kafka/pricing.md)).
* Public IP addresses, if public access is enabled for cluster hosts (see [Virtual Private Cloud pricing](../../vpc/pricing.md)).
* Each transfer: You pay for the use of computing resources and for the number of data rows transferred (see [Data Transfer pricing](../../data-transfer/pricing.md)).


## Getting started {#before-you-begin}

Set up your data delivery infrastructure:

{% list tabs group=instructions %}

- Manually {#manual}

    1. [Create a Managed Service for YDB database](../../ydb/operations/manage-databases.md) of your preferred configuration.

    1. [Create a Managed Service for Apache Kafka® cluster](../../managed-kafka/operations/cluster-create.md) of any suitable configuration with publicly accessible hosts.

        {% note info %}
        
        Public access to cluster hosts is required if you plan to connect to the cluster via the internet. This connection option is simpler and is recommended for this guide. You can also connect to non-public hosts, but only from Yandex Cloud virtual machines located in the same cloud network as the cluster.
        
        {% endnote %}

    1. [In the Managed Service for Apache Kafka® cluster, create a topic](../../managed-kafka/operations/cluster-topics.md#create-topic) named `sensors`.

    1. [In the Managed Service for Apache Kafka® cluster, create a user](../../managed-kafka/operations/cluster-accounts.md#create-account) named `mkf-user` with the `ACCESS_ROLE_PRODUCER` and `ACCESS_ROLE_CONSUMER` access permissions for the new topic.


- Terraform {#tf}

    1. If you do not have Terraform yet, [install it](../../tutorials/infrastructure-management/terraform-quickstart.md#install-terraform).
    1. [Get the authentication credentials](../../tutorials/infrastructure-management/terraform-quickstart.md#get-credentials). You can add them to environment variables or specify them later in the provider configuration file.
    1. [Configure and initialize a provider](../../tutorials/infrastructure-management/terraform-quickstart.md#configure-provider). You do not need to create a provider configuration file manually: you can [download it](https://github.com/yandex-cloud-examples/yc-terraform-provider-settings/blob/main/provider.tf).
    1. Place the configuration file in a separate working directory and [specify the parameter values](../../tutorials/infrastructure-management/terraform-quickstart.md#configure-provider). If you did not add the authentication credentials to environment variables, specify them in the configuration file.

    1. Download the [yds-to-kafka.tf](https://github.com/yandex-cloud-examples/yc-data-transfer-from-yds-to-kafka/blob/main/yds-to-kafka.tf) configuration file to the same working directory.

        This file describes:

        * [Network](../../vpc/concepts/network.md#network).
        * [Subnet](../../vpc/concepts/network.md#subnet).
        * [Security group](../../vpc/concepts/security-groups.md) and rules for connecting to a Managed Service for Apache Kafka® cluster.
        * Managed Service for YDB database.
        * Managed Service for Apache Kafka® cluster.
        * Managed Service for Apache Kafka® topic named `sensors`.
        * Managed Service for Apache Kafka® user with the `ACCESS_ROLE_PRODUCER` and `ACCESS_ROLE_CONSUMER` access permissions for the `sensors` topic.
        * Transfer.

    1. In the `yds-to-kafka.tf` file, specify the following settings:

        * `mkf_version`: Apache Kafka® cluster version.
        * `ydb_name`: Managed Service for YDB database name.
        * `mkf_user_name`: Managed Service for Apache Kafka® cluster user name.
        * `mkf_user_password`: Managed Service for Apache Kafka® cluster user password.
        * `transfer_enabled`: Set to `0` to ensure no transfer is created until you [create endpoints manually](#prepare-transfer).

    1. Validate your Terraform configuration files using this command:

        ```bash
        terraform validate
        ```

        Terraform will display any configuration errors detected in your files.

    1. Create the required infrastructure:

        1. Run this command to view the planned changes:
        
           ```bash
           terraform plan
           ```
        
           If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
        
        1. If everything looks correct, apply the changes:
           1. Run this command:
        
              ```bash
              terraform apply
              ```
        
           1. Confirm updating the resources.
           1. Wait for the operation to complete.

        All the required resources will be created in the specified folder. You can check resource availability and their settings in the [management console](https://console.yandex.cloud).

{% endlist %}

## Create a data stream in Data Streams {#prepare-source}

[Create a data stream in Data Streams](../quickstart/create-stream.md).

## Set up and activate the transfer {#prepare-transfer}

1. [Create a Data Streams source endpoint](../../data-transfer/operations/endpoint/index.md#create).

    * **Database type**: `Yandex Data Streams`.
    * **Endpoint parameters**:

        * **Connection settings**:

            * **Database**: Select the Managed Service for YDB database from the list.
            * **Stream**: Specify the name of the stream in Data Streams.
            * **Service account**: Select an existing service account or create a new one with the `yds.editor` role.

        * **Advanced settings**:

            * **Conversion rules**: `JSON`.
            * **Data scheme**: `JSON specification`.

            Fill in the data schema:

            {% cut "Data schema" %}

            ```json
            [
                {
                    "name": "device_id",
                    "type": "string"
                },
                {
                    "name": "datetime",
                    "type": "datetime"
                },
                {
                    "name": "latitude",
                    "type": "double"
                },
                {
                    "name": "longitude",
                    "type": "double"
                },
                {
                    "name": "altitude",
                    "type": "double"
                },
                {
                    "name": "speed",
                    "type": "double"
                },
                {
                    "name": "battery_voltage",
                    "type": "any"
                },
                {
                    "name": "cabin_temperature",
                    "type": "double"
                },
                {
                    "name": "fuel_level",
                    "type": "any"
                }
            ]
            ```

            {% endcut %}

1. [Create a target endpoint in Managed Service for Apache Kafka®](../../data-transfer/operations/endpoint/index.md#create).

    * **Database type**: `Kafka`.
    * **Endpoint parameters**:

        * **Connection settings**:
        
            * **Connection type**: Select `Managed Service for Apache Kafka cluster`.
            * **Managed Service for Apache Kafka cluster**: Select your Managed Service for Apache Kafka® cluster from the list.
            * **Authentication**: Select **SASL**.
            * **Username**: Enter the Managed Service for Apache Kafka® cluster user name.
            * **Password**: Enter the Managed Service for Apache Kafka® cluster user password.
            * **Topic**: Select **Topic full name**.
            * **Topic full name**: Enter the name of the topic in the Managed Service for Apache Kafka® cluster.

1. Create a transfer:

    {% list tabs group=instructions %}

    - Manually {#manual}

        1. [Create a transfer](../../data-transfer/operations/transfer.md#create) of the **Replication** type that will use the endpoints you created.
        1. [Activate](../../data-transfer/operations/transfer.md#activate) the transfer.

    - Terraform {#tf}

        1. In the `yds-to-kafka.tf` file, specify the values of the following variables:

            * `source_endpoint_id`: Source endpoint ID.
            * `target_endpoint_id`: Target endpoint ID.
            * `transfer_enabled`: Set to `1` to create a transfer.

        1. Validate your Terraform configuration files using this command:

            ```bash
            terraform validate
            ```

            Terraform will display any configuration errors detected in your files.

        1. Create the required infrastructure:

            1. Run this command to view the planned changes:
            
               ```bash
               terraform plan
               ```
            
               If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
            
            1. If everything looks correct, apply the changes:
               1. Run this command:
            
                  ```bash
                  terraform apply
                  ```
            
               1. Confirm updating the resources.
               1. Wait for the operation to complete.

            The transfer will be activated automatically.

    {% endlist %}

## Test your transfer {#verify-transfer}

1. Wait for the transfer status to change to **Replicating**.

1. [Send test data to the stream in Data Streams](../operations/aws-cli/send.md):

    ```json
    {
        "device_id":"iv9a94th6rzt********",
        "datetime":"2020-06-05T17:27:00",
        "latitude":"55.70329032",
        "longitude":"37.65472196",
        "altitude":"427.5",
        "speed":"0",
        "battery_voltage":"23.5",
        "cabin_temperature":"17",
        "fuel_level":null
    }
    ```

1. Make sure the data has been transferred to the `sensors` topic in the Managed Service for Apache Kafka® cluster:

    1. [Get an SSL certificate](../../managed-kafka/operations/connect/index.md#get-ssl-cert) to connect to the Managed Service for Apache Kafka® cluster.
    1. [Install](../../managed-kafka/operations/connect/clients.md#bash-zsh) `kafkacat`.
    1. [Run](../../managed-kafka/operations/connect/clients.md#with-ssl_1) the command for receiving messages from a topic.
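
The test steps above can be sketched as a single shell script. This is a minimal sketch under stated assumptions, not the official procedure. It assumes that the AWS CLI and `kafkacat` are installed and configured as described in the linked guides, that the stream is reachable at the `https://yds.serverless.yandexcloud.net` endpoint, and that the cluster accepts SASL_SSL connections with SCRAM-SHA-512 on port 9091. The `run` helper, the function names, the environment variable names, and the certificate path are illustrative. Values in angle brackets are placeholders you must replace. By default, the script only prints the commands it would run.

```bash
#!/usr/bin/env bash
# Sketch only: sends one test record to the stream, then reads the topic.
# Angle-bracket values are placeholders (assumptions), not real identifiers.
set -euo pipefail

# Assumed stream path layout: /<region>/<folder_ID>/<database_ID>/<stream_name>
STREAM_PATH="${STREAM_PATH:-/ru-central1/<folder_ID>/<database_ID>/<stream_name>}"
BROKER_FQDN="${BROKER_FQDN:-<broker_FQDN>}"
KAFKA_USER="${KAFKA_USER:-mkf-user}"
KAFKA_PASSWORD="${KAFKA_PASSWORD:-<password>}"
# Assumed location of the SSL certificate obtained in the previous step.
CA_CERT="${CA_CERT:-/usr/local/share/ca-certificates/Yandex/YandexInternalRootCA.crt}"

# Test message from this guide, on one line.
PAYLOAD='{"device_id":"iv9a94th6rzt********","datetime":"2020-06-05T17:27:00","latitude":"55.70329032","longitude":"37.65472196","altitude":"427.5","speed":"0","battery_voltage":"23.5","cabin_temperature":"17","fuel_level":null}'

# Hypothetical helper: prints the command unless DRY_RUN=0 is set.
# The printed form is for inspection only (shell quoting is not preserved).
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Write one record to the stream through the Kinesis-compatible API.
send_test_record() {
  run aws kinesis put-record \
    --endpoint https://yds.serverless.yandexcloud.net \
    --stream-name "$STREAM_PATH" \
    --cli-binary-format raw-in-base64-out \
    --data "$PAYLOAD" \
    --partition-key 1
}

# Consume messages from the sensors topic over SASL_SSL.
read_topic() {
  run kafkacat -C \
    -b "${BROKER_FQDN}:9091" \
    -t sensors \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanism=SCRAM-SHA-512 \
    -X sasl.username="$KAFKA_USER" \
    -X sasl.password="$KAFKA_PASSWORD" \
    -X ssl.ca.location="$CA_CERT" \
    -Z -K:
}

send_test_record
read_topic
```

To run the commands against real resources, export `STREAM_PATH`, `BROKER_FQDN`, and `KAFKA_PASSWORD` with your own values and set `DRY_RUN=0`. Keeping the dry run as the default lets you review the exact arguments before any data is written.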

## Delete the resources you created {#clear-out}

{% note info %}

Before deleting the resources, [deactivate the transfer](../../data-transfer/operations/transfer.md#deactivate).

{% endnote %}

To reduce the consumption of resources, delete those you do not need:

1. [Delete the transfer](../../data-transfer/operations/transfer.md#delete).
1. [Delete](../../data-transfer/operations/endpoint/index.md#delete) the source and target endpoints.
1. If you created a service account when creating the source endpoint, [delete it](../../iam/operations/sa/delete.md).
1. Delete the other resources depending on how you created them:

   {% list tabs group=instructions %}

   - Manually {#manual}

       1. [Delete the Managed Service for Apache Kafka® cluster](../../managed-kafka/operations/cluster-delete.md).
       1. [Delete the Managed Service for YDB database](../../ydb/operations/manage-databases.md#delete-db).

   - Terraform {#tf}

       1. In the terminal window, go to the directory containing the infrastructure plan.
       
           {% note warning %}
       
           Make sure the directory has no Terraform manifests with the resources you want to keep. Terraform deletes all resources that were created using the manifests in the current directory.
       
           {% endnote %}
       
       1. Delete resources:
       
           1. Run this command:
       
               ```bash
               terraform destroy
               ```
       
           1. Confirm deleting the resources and wait for the operation to complete.
       
           All the resources described in the Terraform manifests will be deleted.

   {% endlist %}