# Transferring data from a Apache Kafka® source endpoint

Yandex Data Transfer enables you to migrate data from an Apache Kafka® queue and implement various data transfer, processing, and transformation scenarios. To implement a transfer:

1. [Explore possible data transfer scenarios](#scenarios).
1. [Prepare the Apache Kafka®](#prepare) database for the transfer.
1. [Set up a source endpoint](#endpoint-settings) in Yandex Data Transfer.
1. [Set up one of the supported data targets](#supported-targets).
1. [Create](../../transfer.md#create) a transfer and [start](../../transfer.md#activate) it.
1. Perform required operations with the database and [control the transfer](../../monitoring.md).
1. In case of any issues, [use ready-made solutions](../../../troubleshooting/index.md) to resolve them.

## Scenarios for transferring data from Apache Kafka® {#scenarios}

1. Migration: Moving data from one storage to another. Migration often means migrating a database from obsolete local databases to managed cloud ones.

   Mirroring data across queues is a separate migration task.

    * [Apache Kafka®](../../../tutorials/mkf-to-mkf.md) mirroring.

1. Data delivery is a process of delivering arbitrary data to target storage. It includes data retrieval from a queue and its deserialization with subsequent transformation to target storage format.

    * [Apache Kafka® to ClickHouse®](../../../tutorials/mkf-to-mch.md).
    * [Apache Kafka® to PostgreSQL](../../../tutorials/mkf-to-mpg.md).
    * [Apache Kafka® to Greenplum®](../../../tutorials/managed-kafka-to-greenplum.md).
    * [Apache Kafka® to MongoDB](../../../tutorials/mkf-to-mmg.md).
    * [Apache Kafka® to MySQL®](../../../tutorials/mkf-to-mmy.md).
    * [Apache Kafka® to OpenSearch](../../../tutorials/mkf-to-mos.md).
    * [Apache Kafka® to YDB](../../../tutorials/mkf-to-ydb.md).

    
    * [Apache Kafka® in YDS](../../../tutorials/mkf-to-yds.md).



For a detailed description of possible Yandex Data Transfer scenarios, see [Tutorials](../../../tutorials/index.md).

## Preparing the source database {#prepare}

{% list tabs %}

- Managed Service for Apache Kafka®
    
    
    [Create a user](../../../../managed-kafka/operations/cluster-accounts.md#create-account) with the `ACCESS_ROLE_CONSUMER` role for the source topic.


- Apache Kafka®
    
    1. If not planning to use [Cloud Interconnect](../../../../interconnect/concepts/index.md) or [VPN](https://en.wikipedia.org/wiki/Virtual_private_network) for connections to an external cluster, make such cluster accessible from the Internet from [IP addresses used by Data Transfer](../../../../overview/concepts/public-ips.md#virtual-private-cloud).
       
       For details on linking your network up with external resources, see [this concept](../../../concepts/network.md#source-external).
    
    1. [Configure access to the source cluster from Yandex Cloud](../../../concepts/network.md#source-external).
    
    1. [Configure user access rights](https://kafka.apache.org/42/security/authorization-and-acls/#examples) to the topic you need.
    
    1. Grant the `READ` permissions to the consumer group whose ID matches the transfer ID.
    
        ```text
        bin/kafka-acls --bootstrap-server localhost:9092 \
          --command-config adminclient-configs.conf \
          --add \
          --allow-principal User:username \
          --operation Read \
          --group <transfer_ID>
        ```
    
    1. Optionally, to log in with a username and password, [configure SASL authentication](https://kafka.apache.org/42/security/authentication-using-sasl/).

{% endlist %}

## Configuring the Apache Kafka® source endpoint

When [creating](../index.md#create) or [updating](../index.md#update) an endpoint, you can define:

* [Yandex Managed Service for Apache Kafka® cluster](#managed-service) connection or [custom installation](#on-premise) settings, including those based on Yandex Compute Cloud VMs. These are required parameters.
* [Advanced settings](#additional-settings).

### Managed Service for Apache Kafka® cluster {#managed-service}


{% note warning %}

To create or edit an endpoint of a managed database, you will need the [`managed-kafka.viewer`](../../../../managed-kafka/security/index.md#mkf-viewer) role or the primitive [`viewer`](../../../../iam/roles-reference.md#viewer) role for the folder the cluster of this managed database resides in.

{% endnote %}


Connection with the cluster specified in Yandex Cloud.

{% list tabs group=instructions %}

- Management console {#console}

    * **Connection type**: Select a cluster connection option:
    
      * **Self-managed**: Allows you to specify connection settings manually.
    
        Select **Managed Service for Apache Kafka cluster** as the installation type and configure these settings:
    
        * **Managed Service for Apache Kafka cluster**: Select the cluster to connect to.
        * **Authentication**: Select the connection type (`SASL` or `No authentication`).
          
          If you select `SASL`:
          
          * **Username**: Specify the name of the account, under which Data Transfer will connect to the topic.
          * **Password**: Enter the account password.
    
      * **Connection Manager**: Allows connecting to the cluster via [Yandex Connection Manager](../../../../metadata-hub/quickstart/connection-manager.md):
    
        * Select the folder with the Managed Service for Apache Kafka® cluster.
        * Select **Managed DB cluster** as the installation type and configure these settings:
    
          * **Cluster for Managed DB**: Select the cluster to connect to.
          * **Connection**: Select or create a connection in Connection Manager.
    
        {% note warning %}
        
        To use a connection from Connection Manager, the user must have [access permissions](../../../../metadata-hub/operations/connection-access.md) for this connection of `connection-manager.user` or higher.
        
        {% endnote %}
    
    * **Topic full name**: Specify the name of the topic to connect to.
    
    * **Security groups**: Select the cloud network to host the endpoint and security groups for network traffic.
      
      Thus, you will be able to apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see [Networking in Yandex Data Transfer](../../../concepts/network.md).

- Terraform {#tf}

    * Endpoint type: `kafka_source`.

    * `security_groups`: [Security groups](../../../../vpc/concepts/security-groups.md) for network traffic.
    
       Security group rules apply to a transfer. They allow opening network access from the transfer VM to the cluster. For more information, see [Networking in Yandex Data Transfer](../../../concepts/network.md).
    
       Security groups must belong to the same network as the cluster.
    
       {% note info %}
       
       In Terraform, it is not required to specify a network for security groups.
       
       {% endnote %}
    
    
    * `connection.cluster_id`: ID of the cluster to connect to.
    
    * `auth`: Authentication method used to connect to broker hosts:
      
      * `sasl`: SASL authentication.
      * `no_auth`: Without authentication.
    
    * `topic_names`: Names of the topics to connect to.

    Here is an example of the configuration file structure:

    
    ```hcl
    resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
      name = "<endpoint_name>"
      settings {
        kafka_source {
          security_groups = ["<list_of_security_group_IDs>"]
          connection {
            cluster_id = "<cluster_ID>"
          }
          auth {
            <authentication_method>
          }
          topic_names = ["<topic_name_list>"]
          <endpoint_advanced_settings>
        }
      }
    }
    ```


    For more information, see [this Terraform provider guide](../../../../terraform/resources/datatransfer_endpoint.md).

- API {#api}

    * `securityGroups`: Network traffic security groups whose rules apply to VMs and clusters without altering their configurations. For more information, see [Networking in Yandex Data Transfer](../../../concepts/network.md).
    
    * `connection.clusterId`: ID of the cluster to connect to.
    
    * `auth`: Authentication method used to connect to broker hosts:
      
      * `sasl`: SASL authentication. The following parameters are required:
      
         * `user`: Name of the account Data Transfer will use to connect to the topic.
         * `password.raw`: Password for the account in text form.
         * `mechanism`: Hashing mechanism.
      
      * `noAuth`: Without authentication.
    
    * `topicNames`: Names of the topics to connect to.

{% endlist %}

### Custom installation {#on-premise}

Connection to the Apache Kafka® cluster with explicitly specified network addresses and broker host ports.

{% list tabs group=instructions %}

- Management console {#console}

    * **Connection type**: Select a database connection option:
    
        * **Self-managed**: Allows you to specify connection settings manually.
    
            Select **Custom installation** as the installation type and configure these settings:
    
            * **Broker URLs**: Specify the IP addresses or FQDNs of the broker hosts.
              
              If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:
              
              ```text
              <broker_host_IP_address_or_FQDN>:<port_number>
              ```
            * **SSL**: Use encryption to protect the connection.
            * **PEM Certificate**: If transmitted data has to be be encrypted, e.g., to meet the PCI DSS, upload the [certificate](../../../../managed-kafka/operations/connect/index.md#get-ssl-cert) file or add its contents as text.
              
              {% note warning %}
              
              If no certificate is added, the transfer may [fail with an error](../../../troubleshooting/index.md#failed-to-connect).
              
              {% endnote %}
            * **Endpoint network interface**: Select or [create](../../../../vpc/operations/subnet-create.md) a subnet in the required [availability zone](../../../../overview/concepts/geo-scope.md). The transfer will use this subnet to access the source.
    
              If this field has a value specified for both endpoints, both subnets must be hosted in the same availability zone.
    
            * **Authentication**: Select the connection type (`SASL` or `No authentication`).
              
              If you select `SASL`:
              
              * **Username**: Specify the name of the account, under which Data Transfer will connect to the topic.
              * **Password**: Enter the account password.
              * **Mechanism**: Select the hashing mechanism (SHA 256 or SHA 512).
    
        * **Connection Manager**: Allows connecting to the database using [Yandex Connection Manager](../../../../metadata-hub/quickstart/connection-manager.md):
    
            * Select the folder where the Connection Manager connection was created.
            * Select **Custom installation** as the installation type and configure these settings:
    
              * **Connection**: Select or create a connection in Connection Manager.
              * **Subnet ID**: Select or [create](../../../../vpc/operations/subnet-create.md) a subnet in the required [availability zone](../../../../overview/concepts/geo-scope.md). The transfer will use this subnet to access the database.
    
                If this field has a value specified for both endpoints, both subnets must be hosted in the same availability zone.
    
    * **Topic full name**: Specify the name of the topic to connect to.
    * **Security groups**: Select the cloud network to host the endpoint and security groups for network traffic.
      
      Thus, you will be able to apply the specified security group rules to the VMs and clusters in the selected network without changing the settings of these VMs and clusters. For more information, see [Networking in Yandex Data Transfer](../../../concepts/network.md).

- Terraform {#tf}

    * Endpoint type: `kafka_source`.

    * `security_groups`: [Security groups](../../../../vpc/concepts/security-groups.md) for network traffic.
    
       Security group rules apply to a transfer. They allow opening network access from the transfer VM to the broker hosts. For more information, see [Networking in Yandex Data Transfer](../../../concepts/network.md).
    
       Security groups must belong to the same network as the `subnet_id` subnet, if the latter is specified.
    
       {% note info %}
       
       In Terraform, it is not required to specify a network for security groups.
       
       {% endnote %}
    
    
    * `connection.on_premise`: Parameters for connecting to the broker hosts:
    
       * `broker_urls`: IP addresses or FQDNs of the broker hosts.
         
         If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:
         
         ```text
         <broker_host_IP_address_or_FQDN>:<port_number>
         ```
       * `tls_mode`: Parameters for encrypting the data to transfer, if required, e.g., for compliance with the PCI DSS requirements.
         
         * `disabled`: Disabled.
         * `enabled`: Enabled.
             * `ca_certificate`: CA certificate.
         
               {% note warning %}
               
               If no certificate is added, the transfer may [fail with an error](../../../troubleshooting/index.md#failed-to-connect).
               
               {% endnote %}
       * `subnet_id`: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.
    
    * `auth`: Authentication method used to connect to broker hosts:
      
      * `sasl`: SASL authentication.
      * `no_auth`: Without authentication.
    
    * `topic_names`: Names of the topics to connect to.

    Here is an example of the configuration file structure:

    
    ```hcl
    resource "yandex_datatransfer_endpoint" "<endpoint_name_in_Terraform>" {
      name = "<endpoint_name>"
      settings {
        kafka_source {
          security_groups = ["<list_of_security_group_IDs>"]
          connection {
            on_premise {
              broker_urls = ["<list_of_IP_addresses_or_broker_host_FQDNs>"]
              subnet_id  = "<ID_of_subnet_with_broker_hosts>"
            }
          }
          auth = {
            <authentication_method>
          }
          topic_names = ["<topic_name_list>"]
          <endpoint_advanced_settings>
        }
      }
    }
    ```


    For more information, see [this Terraform provider guide](../../../../terraform/resources/datatransfer_endpoint.md).

- API {#api}

    * `securityGroups`: Network traffic security groups whose rules apply to VMs and clusters without altering their configurations. For more information, see [Networking in Yandex Data Transfer](../../../concepts/network.md).
    
    * `connection.onPremise`: Parameters for connecting to the broker hosts:
    
       * `brokerUrls`: IP addresses or FQDNs of the broker hosts.
         
         If the Apache Kafka® port number differs from the standard one, specify it with a colon after the host name:
         
         ```text
         <broker_host_IP_address_or_FQDN>:<port_number>
         ```
       * `tlsMode`: Parameters for encrypting the data to transfer, if required, e.g., for compliance with the PCI DSS requirements.
         
         * `disabled`: Disabled.
         * `enabled`: Enabled.
             * `caCertificate`: CA certificate.
         
               {% note warning %}
               
               If no certificate is added, the transfer may [fail with an error](../../../troubleshooting/index.md#failed-to-connect).
               
               {% endnote %}
       * `subnetId`: ID of the subnet hosting the broker hosts. The transfer will use this subnet to access them.
    
    * `auth`: Authentication method used to connect to broker hosts:
      
      * `sasl`: SASL authentication. The following parameters are required:
      
         * `user`: Name of the account Data Transfer will use to connect to the topic.
         * `password.raw`: Password for the account in text form.
         * `mechanism`: Hashing mechanism.
      
      * `noAuth`: Without authentication.
    
    * `topicNames`: Names of the topics to connect to.

{% endlist %}

### Advanced settings {#additional-settings}

Use advanced settings to specify transformation and conversion rules. Data is processed in the following order:

1. **Transformation**. Data in JSON format is provided to a [Yandex Cloud Functions](../../../../functions/index.md) function. The function body contains metadata and raw data from the queue. The function handles the data and sends it back to Data Transfer.

1. **Conversion**. Data is parsed as a preparation for delivery to the target.

If no transformation rules are set, parsing is applied to raw data from the queue. If no conversion rules are set, the data goes directly to the target.

{% list tabs group=instructions %}

- Management console {#console}

    
    * **Transformation rules**:

        * **Cloud function**: Select one of the functions created in Cloud Functions.

        
        * **Service account**: Select or [create](../../../../iam/operations/sa/create.md) a service account you are going to use to start the processing function.


        * **Number of retries**: Set the number of attempts to invoke the processing function.
        * **Buffer size for function**: Set the size of the buffer (in bytes) which when full data will be transferred to the processing function.

            The maximum buffer size is 3.5 MB. For more information about restrictions that apply to functions in Cloud Functions, see the [relevant section](../../../../functions/concepts/limits.md).

        * **Flush interval**: Set the interval (in seconds) to wait before transferring stream data to the processing function.

            {% note info %}

            If the buffer is full or the sending interval expires, data will be transferred to the processing function.

            {% endnote %}

        * **Invocation timeout**: Set the allowed timeout of the response from the processing function (in seconds).

        {% note warning %}

        Values in the **Flush interval** and **Invocation timeout** fields are specified with the `s` postfix, e.g., `10s`.

        {% endnote %}
    

    * **Conversion rules**:
      
         * **Data format**: Select one of the available formats:
             * `JSON`: JSON. To optimize throughput, the system supports single-line JSON messages, i.e., those without line breaks introduced with `\n`. Example: `{"attr": "value"}`.
             * `AuditTrails.v1 parser`: [Audit Trails](../../../../audit-trails/index.md) log format.
             * `CloudLogging parser`: [Cloud Logging](../../../../logging/index.md) log format.
             * `Debezium CDC parser`: Debezium CDC. It allows you to specify settings for connection to Schema Registry: `On Premise Schema Registry` for [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html), [namespace](../../../../metadata-hub/operations/list-name-space.md) ID for `Yandex Schema Registry`.
             * `Raw to table parser` allows saving whole messages "as is" to a separate table on the target.
      
                For **JSON**, specify:
      
                 * **Data scheme**: Specify the schema as a list of fields or upload a file with the description of the schema in JSON format.
                  
                 {% cut "Sample data schema" %}
                 ```json
                 [
                     {
                         "name": "request",
                         "type": "string"
                       }
                 ]
                 ```
                 {% endcut %}
          
                 * **Enable NULL values in keys**: Select this option to allow the `null` value in key columns.
                 * **Add a column for missing keys**: Select this option to add the fields missing in the schema to the `_rest` column.
                 * **Unescape string values**: Select this option to remove quotation marks from string variables. Otherwise, the string field values will remain unchanged.
      
               For **Debezium CDC**, specify the following:
                 * For `On Premise Schema Registry`: Schema Registry URL, authentication method (including username and password if authentication is used), and CA certificate.
                 * For `Yandex Schema Registry`: Schema registry [namespace](../../../../metadata-hub/operations/list-name-space.md) ID.
      
               For **Raw-to-table**, specify:
                 * **Target table name**: Name of the table the messages are saved to. By default, matches the the topic name.
                 * **Store message key**: Select this option to write the message key to a separate column.
                 * **Key type**: Select suitable format to write the key to the table.
                 * **Key type**: Select suitable format to write the value to the table.
                 * **Advanced settings**:
                   * **Store write timestamp**: Select this option to save message write time in a separate column.
                   * **Store message headers**: Select this option to save message headers in a separate column.
                   * **DLQ table name suffix**: Specify a suffix to append to the name of the separate undelivered message table according to the `<message_table_name>_<suffix>` pattern. The default suffix is `_dlq`.

- Terraform {#tf}

    
    * `transformer`: Transformation rules:


    * `parser`: Conversion rules which depend on the selected parser:
      
      * `audit_trails_v1_parser`: [Audit Trails](../../../../audit-trails/index.md) log parser.
      * `cloud_logging_parser`: [Cloud Logging](../../../../logging/index.md) log parser.
      * `json_parser` or `tskv_parser`: JSON and TSKV parsers, respectively.
      
         Both parsers use the same attributes to define parameters:
      
         * `data_schema`: Data schema represented either as a JSON object with a schema description or as a list of fields:
      
            * `fields`: Schema represented as a JSON object. The object contains an array of JSON objects that describe individual columns.
            * `json_fields`: Schema represented as a list of fields.
      
         * `null_keys_allowed`: Set to `true` to allow the `null` value in key columns.
         * `add_rest_column`: Set to `true` to add the fields missing in the schema to the `_rest` column.
         * `unescape_string_values`: Set to `true` to remove quotation marks from string variables. Otherwise, the string field values will remain unchanged.

- API {#api}

    
    * `transformer`: Transformation rules:
      
      * `cloudFunction`: ID of the function created in Cloud Functions.
      
      
      * `serviceAccountId`: ID of the service account you want to use to invoke the processing function.
      
      
      * `numberOfRetries`: Number of attempts to invoke the processing function.
      * `bufferSize`: Buffer size (in bytes) at which data will be transferred to the processing function.
      
          The maximum buffer size is 3.5 MB. For more information about restrictions that apply to functions in Cloud Functions, see the [relevant section](../../../../functions/concepts/limits.md).
      
      * `bufferFlushInterval`: Interval (in seconds) to wait before transferring stream data to the processing function.
      
          {% note info %}
      
          If the buffer is full or the sending interval expires, data will be transferred to the processing function.
      
          {% endnote %}
      
      * `invocationTimeout`: Allowed timeout of a processing function response, in seconds.
      
      {% note warning %}
      
      Specify values ​​for `bufferFlushInterval` and `invocationTimeout` with the `s` postfix, e.g., `10s`.
      
      {% endnote %}


    * `parser`: Conversion rules which depend on the selected parser:
      
      * `auditTrailsV1Parser`: [Audit Trails](../../../../audit-trails/index.md) log parser.
      
      * `cloudLoggingParser`: [Cloud Logging](../../../../logging/index.md) log parser.
      
      * `jsonParser` or `tskvParser`: JSON and TSKV parsers, respectively.
      
         Both parsers share the same parameter fields:
      
         * `dataSchema`: Data schema represented either as a JSON object with a schema description or as a list of fields:
      
            * `fields`: Schema represented as a JSON object. The object contains an array of JSON objects that describe individual columns.
      
            * `jsonFields`: Schema represented as a list of fields.
      
         * `nullKeysAllowed`: Set to `true` to allow the `null` value in key columns.
      
         * `addRestColumn`: Set to `true` to add the fields missing in the schema to the `_rest` column.
      
         * `unescapeStringValues`: Set to `true` to remove quotation marks from string variables. Otherwise, the string field values will remain unchanged.

{% endlist %}

## Configuring the data target {#supported-targets}

Configure one of the supported data targets:

* [PostgreSQL](../target/postgresql.md)
* [MySQL®](../target/mysql.md)
* [MongoDB](../target/mongodb.md)
* [ClickHouse®](../target/clickhouse.md)
* [Greenplum®](../target/greenplum.md)
* [Yandex Managed Service for YDB](../target/yandex-database.md)
* [Yandex Object Storage](../target/object-storage.md)
* [Apache Kafka®](../target/kafka.md)
* [YDS](../target/data-streams.md)
* [YTsaurus](yt.md)
* [OpenSearch](../target/opensearch.md).

For a complete list of supported sources and targets in Yandex Data Transfer, see [Available transfers](../../../transfer-matrix.md).

After configuring the data source and target, [create and start the transfer](../../transfer.md#create).