# Managing connectors

[Connectors](../concepts/connectors.md) manage the transfer of Apache Kafka® topics to a different cluster or data storage system.

You can:

* [Get a list of connectors](#list).
* [Get detailed information about a connector](#get).
* [Create a connector](#create) of the right type:
    * [MirrorMaker](#settings-mm2).
    * [S3 Sink](#settings-s3).
    * [Iceberg Sink](#settings-iceberg).
* [Edit a connector](#update).
* [Pause a connector](#pause).
* [Resume a connector](#resume).
* [Import a connector to Terraform](#import).
* [Delete a connector](#delete).

## Getting a list of connectors {#list}

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), navigate to the relevant folder.
    1. Navigate to **Managed Service for&nbsp;Kafka**.
    1. Select the cluster and open the **Connectors** tab.

- CLI {#cli}

    If you do not have the Yandex Cloud CLI yet, [install and initialize it](../../cli/quickstart.md#install).

    The folder used by default is the one specified when [creating](../../cli/operations/profile/profile-create.md) the CLI profile. To change the default folder, use the `yc config set folder-id <folder_ID>` command. You can also specify a different folder for any command using `--folder-name` or `--folder-id`. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.

    To get the list of cluster connectors, run this command:

    ```bash
    yc managed-kafka connector list --cluster-name=<cluster_name>
    ```

    Result:

    ```text
    +--------------+-----------+
    |     NAME     | TASKS MAX |
    +--------------+-----------+
    | connector559 |         1 |
    | ...          |           |
    +--------------+-----------+
    ```

    You can get the cluster name with the [list of clusters in the folder](cluster-list.md#list-clusters).

- REST API {#api}

  1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

     ```bash
     export IAM_TOKEN="<IAM_token>"
     ```

  1. Call the [Connector.list](../api-ref/Connector/list.md) method, e.g., via the following [cURL](https://curl.se/) request:

     ```bash
     curl \
       --request GET \
       --header "Authorization: Bearer $IAM_TOKEN" \
       --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors'
     ```

     You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters).

  1. Check the [server response](../api-ref/Connector/list.md#yandex.cloud.mdb.kafka.v1.ListConnectorsResponse) to make sure your request was successful.

- gRPC API {#grpc-api}

  1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

     ```bash
     export IAM_TOKEN="<IAM_token>"
     ```

  1. Clone the [cloudapi](https://github.com/yandex-cloud/cloudapi) repository:
     
     ```bash
     cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
     ```
     
     Below, we assume that the repository contents reside in the `~/cloudapi/` directory.
  1. Call the [ConnectorService/List](../api-ref/grpc/Connector/list.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

     ```bash
     grpcurl \
       -format json \
       -import-path ~/cloudapi/ \
       -import-path ~/cloudapi/third_party/googleapis/ \
       -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
       -rpc-header "Authorization: Bearer $IAM_TOKEN" \
       -d '{
             "cluster_id": "<cluster_ID>"
           }' \
       mdb.api.cloud.yandex.net:443 \
       yandex.cloud.mdb.kafka.v1.ConnectorService.List
     ```

     You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters).

  1. Check the [server response](../api-ref/grpc/Connector/list.md#yandex.cloud.mdb.kafka.v1.ListConnectorsResponse) to make sure your request was successful.

{% endlist %}

## Getting detailed information about a connector {#get}

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), navigate to the relevant folder.
    1. Navigate to **Managed Service for&nbsp;Kafka**.
    1. Select the cluster and open the **Connectors** tab.
    1. Click the connector name.

- CLI {#cli}

    If you do not have the Yandex Cloud CLI yet, [install and initialize it](../../cli/quickstart.md#install).

    The folder used by default is the one specified when [creating](../../cli/operations/profile/profile-create.md) the CLI profile. To change the default folder, use the `yc config set folder-id <folder_ID>` command. You can also specify a different folder for any command using `--folder-name` or `--folder-id`. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.

    To get detailed information about a connector, run this command:

    ```bash
    yc managed-kafka connector get <connector_name> \
       --cluster-name=<cluster_name>
    ```

    Result:

    ```text
    name: connector785
    tasks_max: "1"
    cluster_id: c9qbkmoiimsl********
    ...
    ```

    You can get the connector name with the [list of cluster connectors](#list), and the cluster name, with the [list of clusters in the folder](cluster-list.md#list-clusters).

- REST API {#api}

  1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

     ```bash
     export IAM_TOKEN="<IAM_token>"
     ```

  1. Call the [Connector.get](../api-ref/Connector/get.md) method, e.g., via the following [cURL](https://curl.se/) request:

     ```bash
     curl \
       --request GET \
       --header "Authorization: Bearer $IAM_TOKEN" \
       --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>'
     ```

     You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

  1. Check the [server response](../api-ref/Connector/get.md#yandex.cloud.mdb.kafka.v1.Connector) to make sure your request was successful.

- gRPC API {#grpc-api}

  1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

     ```bash
     export IAM_TOKEN="<IAM_token>"
     ```

  1. Clone the [cloudapi](https://github.com/yandex-cloud/cloudapi) repository:
     
     ```bash
     cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
     ```
     
     Below, we assume that the repository contents reside in the `~/cloudapi/` directory.
  1. Call the [ConnectorService/Get](../api-ref/grpc/Connector/get.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

     ```bash
     grpcurl \
       -format json \
       -import-path ~/cloudapi/ \
       -import-path ~/cloudapi/third_party/googleapis/ \
       -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
       -rpc-header "Authorization: Bearer $IAM_TOKEN" \
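       -d '{
             "cluster_id": "<cluster_ID>",
             "connector_name": "<connector_name>"
           }' \
       mdb.api.cloud.yandex.net:443 \
       yandex.cloud.mdb.kafka.v1.ConnectorService.Get
     ```

     You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).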

  1. Check the [server response](../api-ref/grpc/Connector/get.md#yandex.cloud.mdb.kafka.v1.Connector) to make sure your request was successful.

{% endlist %}
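
The REST requests for listing connectors and getting a single connector differ only in the URL path. As a minimal sketch, the hypothetical `connector_url` helper below (not part of the `yc` CLI or the API) builds either URL from a cluster ID and an optional connector name. The base URL is taken from the requests above.

```bash
# Hypothetical helper, for illustration only: prints the REST URL for the
# connectors of a cluster, or for one connector when a name is also given.
API_BASE='https://mdb.api.cloud.yandex.net/managed-kafka/v1'

connector_url() {
  local cluster_id="${1:-}" connector_name="${2:-}"
  if [ -z "$cluster_id" ]; then
    echo "connector_url: cluster ID is required" >&2
    return 1
  fi
  local url="${API_BASE}/clusters/${cluster_id}/connectors"
  if [ -n "$connector_name" ]; then
    url="${url}/${connector_name}"
  fi
  printf '%s\n' "$url"
}
```

You could then pass the result to cURL, e.g., `curl --request GET --header "Authorization: Bearer $IAM_TOKEN" --url "$(connector_url <cluster_ID> <connector_name>)"`.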

## Creating a connector {#create}

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), navigate to the relevant folder.
    1. Navigate to **Managed Service for&nbsp;Kafka**.
    1. Select the cluster and open the **Connectors** tab.
    1. Click **Create connector**.
    1. Under **Basic parameters**, specify:

        * Connector name.
        * Task limit: Number of concurrent tasks. To distribute replication load evenly, we recommend a value of at least `2`.

    1. Under **Additional properties**, specify the connector properties in the following format:

        ```text
        <key>:<value>
        ```

        The key can either be a simple string or include a prefix that indicates whether it belongs to the source or target (a cluster alias in the connector configuration):

        ```text
        <cluster_alias>.<key_body>:<value>
        ```

    1. Select the connector type, [MirrorMaker](#settings-mm2), [S3 Sink](#settings-s3), or [Iceberg Sink](#settings-iceberg), and set up its configuration.

        For more information about the supported connector types, see [Connectors](../concepts/connectors.md).

    1. Click **Create**.

- CLI {#cli}

  If you do not have the Yandex Cloud CLI yet, [install and initialize it](../../cli/quickstart.md#install).

  The folder used by default is the one specified when [creating](../../cli/operations/profile/profile-create.md) the CLI profile. To change the default folder, use the `yc config set folder-id <folder_ID>` command. You can also specify a different folder for any command using `--folder-name` or `--folder-id`. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.

  To create a [MirrorMaker](#settings-mm2) connector:

  1. See the description of the CLI command for creating a connector:

      ```bash
      yc managed-kafka connector-mirrormaker create --help
      ```

  1. Create a connector:

      ```bash
      yc managed-kafka connector-mirrormaker create <connector_name> \
         --cluster-name=<cluster_name> \
         --direction=<connector_direction> \
         --tasks-max=<task_limit> \
         --properties=<advanced_properties> \
         --replication-factor=<replication_factor> \
         --topics=<topic_pattern> \
         --this-cluster-alias=<this_cluster_prefix> \
         --external-cluster alias=<external_cluster_prefix>,`
                           `bootstrap-servers=<list_of_broker_host_FQDNs>,`
                           `security-protocol=<security_protocol>,`
                           `sasl-mechanism=<authentication_mechanism>,`
                           `sasl-username=<username>,`
                           `sasl-password=<user_password>,`
                           `ssl-truststore-certificates=<certificates_in_PEM_format>
      ```

      To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

      You can get the cluster name with the [list of clusters in the folder](cluster-list.md#list-clusters).

      `--direction` takes these values:

       * `egress`: If the current cluster is a source cluster.
       * `ingress`: If the current cluster is a target cluster.

  To create an [S3 Sink](#settings-s3) connector:

  1. View the description of the CLI command to create a connector:

      ```bash
      yc managed-kafka connector-s3-sink create --help
      ```

  1. Create a connector:

      ```bash
      yc managed-kafka connector-s3-sink create <connector_name> \
         --cluster-name=<cluster_name> \
         --tasks-max=<task_limit> \
         --properties=<advanced_properties> \
         --topics=<topic_pattern> \
         --file-compression-type=<compression_codec> \
         --file-max-records=<file_max_records> \
         --bucket-name=<bucket_name> \
         --access-key-id=<AWS_compatible_static_key_ID> \
         --secret-access-key=<AWS_compatible_static_key_contents> \
         --storage-endpoint=<S3_compatible_storage_endpoint> \
         --region=<S3_compatible_storage_region>
      ```

     You can get the cluster name with the [list of clusters in the folder](cluster-list.md#list-clusters).

  To create an [Iceberg Sink](#settings-iceberg) connector:

  1. See the description of the CLI command for creating a connector:

      ```bash
      yc managed-kafka connector-iceberg-sink create --help
      ```

  1. Create a connector:

      ```bash
      yc managed-kafka connector-iceberg-sink create <connector_name> \
         --cluster-name=<cluster_name> \
         --tasks-max=<task_limit> \
         --properties=<advanced_properties> \
         --topics=<topic_pattern> \
         --file-compression-type=<compression_codec> \
         --file-max-records=<file_max_records> \
         --bucket-name=<bucket_name> \
         --access-key-id=<AWS_compatible_static_key_ID> \
         --secret-access-key=<AWS_compatible_static_key_contents> \
         --storage-endpoint=<S3_compatible_storage_endpoint> \
         --region=<S3_compatible_storage_region>
      ```

     You can get the cluster name with the [list of clusters in the folder](cluster-list.md#list-clusters).

- Terraform {#tf}

    1. Check the list of [MirrorMaker](#settings-mm2), [S3 Sink](#settings-s3), and [Iceberg Sink](#settings-iceberg) connector settings.

    1. Open the current Terraform configuration file describing your infrastructure.

        For information about creating this file, see [Creating an Apache Kafka® cluster](cluster-create.md).

    1. To create a MirrorMaker connector, add the `yandex_mdb_kafka_connector` resource with the `connector_config_mirrormaker` configuration section:

        ```hcl
        resource "yandex_mdb_kafka_connector" "<connector_name>" {
          cluster_id = "<cluster_ID>"
          name       = "<connector_name>"
          tasks_max  = <task_limit>
          properties = {
            <advanced_properties>
          }
          connector_config_mirrormaker {
            topics             = "<topic_pattern>"
            replication_factor = <replication_factor>
            source_cluster {
              alias = "<cluster_prefix>"
              external_cluster {
                bootstrap_servers           = "<list_of_broker_host_FQDNs>"
                sasl_username               = "<username>"
                sasl_password               = "<user_password>"
                sasl_mechanism              = "<authentication_mechanism>"
                security_protocol           = "<security_protocol>"
                ssl_truststore_certificates = "<PEM_certificate_contents>"
              }
            }
            target_cluster {
              alias = "<cluster_prefix>"
              this_cluster {}
            }
          }
        }
        ```

        To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

    1. To create an S3 Sink connector, add the `yandex_mdb_kafka_connector` resource with the `connector_config_s3_sink` configuration section:

        ```hcl
        resource "yandex_mdb_kafka_connector" "<connector_name>" {
          cluster_id = "<cluster_ID>"
          name       = "<connector_name>"
          tasks_max  = <task_limit>
          properties = {
            <advanced_properties>
          }
          connector_config_s3_sink {
            topics                = "<topic_pattern>"
            file_compression_type = "<compression_codec>"
            file_max_records      = <file_max_records>
            s3_connection {
              bucket_name = "<bucket_name>"
              external_s3 {
                endpoint          = "<S3_compatible_storage_endpoint>"
                access_key_id     = "<AWS_compatible_static_key_ID>"
                secret_access_key = "<AWS_compatible_static_key_contents>"
              }
            }
          }
        }
        ```
    
    1. To create an Iceberg Sink connector, add the `yandex_mdb_kafka_connector` resource with the `connector_config_iceberg_sink` configuration section:
       
       ```hcl
       resource "yandex_mdb_kafka_connector" "<connector_name>" {
          cluster_id = "<cluster_ID>"
          name       = "<connector_name>"
          tasks_max  = <task_limit>
          properties = {
            <advanced_properties>
          }
          connector_config_iceberg_sink {
            topics        = "<comma_separated_list_of_topics>"
            control_topic = "<management_topic_name>"

            metastore_connection {
              catalog_uri = "<URI_for_connecting_to_Metastore_cluster>"
              warehouse   = "<root_directory_for_storing_managed_table_data_in_S3>"
            }

            s3_connection {
              external_s3 {
                endpoint          = "<S3_compatible_storage_endpoint>"
                access_key_id     = "<AWS_compatible_static_key_ID>"
                secret_access_key = "<AWS_compatible_static_key_contents>"
                region            = "<region_name>"
              }
            }

            static_tables {
              tables = "<comma_separated_table_names>"
            }

            tables_config {
              default_commit_branch    = "<default_branch_name>"
              default_id_columns       = "<comma_separated_default_column_list>"
              default_partition_by     = "<list_of_columns_or_transformation_expressions>"
              evolve_schema_enabled    = <automatically_update_Iceberg_table_schema>
              schema_force_optional    = <make_Iceberg_table_schema_fields_optional>
              schema_case_insensitive  = <ignore_case_when_matching_fields>
            }

            control_config {
              group_id_prefix      = "<prefix_for_Consumer_Group_ID>"
              commit_interval_ms   = <Iceberg_table_data_commit_interval>
              commit_timeout_ms    = <how_long_the_coordinator_waits_for_confirmation>
              commit_threads       = <number_of_threads_for_committing_data_to_Iceberg_table>
              transactional_prefix = "<prefix_for_Transactional_ID>"
            }
          }
       }
       ```

    1. Make sure the settings are correct.

        1. In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
        1. Run this command:
        
           ```bash
           terraform validate
           ```
        
           Terraform will show any errors found in your configuration files.

    1. Confirm resource changes.

        1. Run this command to view the planned changes:
        
           ```bash
           terraform plan
           ```
        
           If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
        
        1. If everything looks correct, apply the changes:
           1. Run this command:
        
              ```bash
              terraform apply
              ```
        
           1. Confirm updating the resources.
           1. Wait for the operation to complete.

    For more information, see [this Terraform provider guide](../../terraform/resources/mdb_kafka_connector.md).

- REST API {#api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. To create a [MirrorMaker](#settings-mm2) connector, call the [Connector.create](../api-ref/Connector/create.md) method, e.g., via the following [cURL](https://curl.se/) request:

       ```bash
       curl \
         --request POST \
         --header "Authorization: Bearer $IAM_TOKEN" \
         --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors' \
         --data '{
                   "connectorSpec": {
                     "name": "<connector_name>",
                     "tasksMax": "<task_limit>",
                     "properties": "<advanced_connector_properties>",
                     "connectorConfigMirrormaker": {
                       <Mirrormaker_connector_settings>
                     }
                   }
                 }'
       ```

       You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters).

    1. To create an [S3 Sink](#settings-s3) connector, call the [Connector.create](../api-ref/Connector/create.md) method, e.g., via the following [cURL](https://curl.se/) request:

       ```bash
       curl \
         --request POST \
         --header "Authorization: Bearer $IAM_TOKEN" \
         --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors' \
         --data '{
                   "connectorSpec": {
                     "name": "<connector_name>",
                     "tasksMax": "<task_limit>",
                     "properties": "<advanced_connector_properties>",
                     "connectorConfigS3Sink": {
                       <S3_Sink_connector_settings>
                     }
                   }
                 }'
       ```

       You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters).

    1. Check the [server response](../api-ref/Connector/create.md#yandex.cloud.operation.Operation) to make sure your request was successful.

- gRPC API {#grpc-api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Clone the [cloudapi](https://github.com/yandex-cloud/cloudapi) repository:
       
       ```bash
       cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
       ```
       
       Below, we assume that the repository contents reside in the `~/cloudapi/` directory.

    1. To create a [MirrorMaker](#settings-mm2) connector, call the [ConnectorService/Create](../api-ref/grpc/Connector/create.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

        ```bash
        grpcurl \
          -format json \
          -import-path ~/cloudapi/ \
          -import-path ~/cloudapi/third_party/googleapis/ \
          -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
          -rpc-header "Authorization: Bearer $IAM_TOKEN" \
          -d '{
                "cluster_id": "<cluster_ID>",
                "connector_spec": {
                  "name": "<connector_name>",
                  "tasks_max": {
                    "value": "<task_limit>"
                  },
                  "properties": "<advanced_connector_properties>",
                  "connector_config_mirrormaker": {
                    <Mirrormaker_connector_settings>
                  }
                }
              }' \
          mdb.api.cloud.yandex.net:443 \
          yandex.cloud.mdb.kafka.v1.ConnectorService.Create
        ```

        You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters).

    1. To create an [S3 Sink](#settings-s3) connector, call the [ConnectorService/Create](../api-ref/grpc/Connector/create.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

        ```bash
        grpcurl \
          -format json \
          -import-path ~/cloudapi/ \
          -import-path ~/cloudapi/third_party/googleapis/ \
          -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
          -rpc-header "Authorization: Bearer $IAM_TOKEN" \
          -d '{
                "cluster_id": "<cluster_ID>",
                "connector_spec": {
                  "name": "<connector_name>",
                  "tasks_max": {
                    "value": "<task_limit>"
                  },
                  "properties": "<advanced_connector_properties>",
                  "connector_config_s3_sink": {
                    <S3_Sink_connector_settings>
                  }
                }
              }' \
          mdb.api.cloud.yandex.net:443 \
          yandex.cloud.mdb.kafka.v1.ConnectorService.Create
        ```

        You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters).

    1. Check the [server response](../api-ref/grpc/Connector/create.md#yandex.cloud.operation.Operation) to make sure your request was successful.

{% endlist %}
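
The CLI commands above take additional connector settings through `--properties` as a comma-separated list of `<key>:<value>` pairs. The sketch below shows one way to assemble that string in a script. `join_properties` is a hypothetical helper, not part of the `yc` CLI, and it assumes that values contain no commas.

```bash
# Hypothetical helper: joins <key>:<value> pairs into one comma-separated
# string for --properties and rejects arguments without a key or a value.
join_properties() {
  local result="" pair
  for pair in "$@"; do
    case "$pair" in
      ?*:?*) ;;  # non-empty key, a colon, non-empty value
      *)
        echo "join_properties: expected <key>:<value>, got '$pair'" >&2
        return 1
        ;;
    esac
    # Add a comma separator before every pair except the first one.
    result="${result:+${result},}${pair}"
  done
  printf '%s\n' "$result"
}
```

For example, `join_properties key.converter:org.apache.kafka.connect.storage.StringConverter value.converter:org.apache.kafka.connect.json.JsonConverter` prints both pairs joined by a comma, ready to pass as `--properties="$(join_properties ...)"`.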

### MirrorMaker {#settings-mm2}

Specify the MirrorMaker connector parameters as follows:

{% list tabs group=instructions %}

- Management console {#console}

  * **Topics**: Pattern for selecting topics to replicate. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To replicate all topics, specify `.*`.
  * **Replication factor**: Number of replicas the cluster stores for each topic.
  * Under **Source cluster**, specify the parameters for connecting to the source cluster:
    * **Alias**: Source cluster prefix in the connector settings.

      {% note info %}

      Topics in the target cluster will be created with the specified prefix.

      {% endnote %}

    * **Use this cluster**: Select this option to use the current cluster as the source.
    * **Bootstrap servers**: Comma-separated list of the FQDNs of the source cluster broker hosts with the port numbers for connection, e.g., `broker1.example.com:9091,broker2.example.com:9091`.

       To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

    * **SASL username**: Username for the connector to access the source cluster.
    * **SASL password**: User password for the connector to access the source cluster.
    * **SASL mechanism**: Authentication mechanism for username and password validation.
    * **Security protocol**: Select the connection protocol for the connector:
      * `PLAINTEXT`, `SASL_PLAINTEXT`: To connect without SSL.
      * `SSL`, `SASL_SSL`: To connect with SSL.
    * **Certificate in PEM format**: Upload a PEM certificate to access the external cluster.

  * Under **Target cluster**, specify the parameters for connecting to the target cluster:
    * **Alias**: Target cluster prefix in the connector settings.
    * **Use this cluster**: Select this option to use the current cluster as the target.
    * **Bootstrap servers**: Comma-separated list of the FQDNs of the target cluster broker hosts with the port numbers for connection.

       To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

    * **SASL username**: Username for the connector to access the target cluster.
    * **SASL password**: User password for the connector to access the target cluster.
    * **SASL mechanism**: Authentication mechanism for username and password validation.
    * **Security protocol**: Select the connection protocol for the connector:
      * `PLAINTEXT`, `SASL_PLAINTEXT`: To connect without SSL.
      * `SSL`, `SASL_SSL`: To connect with SSL.
    * **Certificate in PEM format**: Upload a PEM certificate to access the external cluster.

  * To specify additional settings not listed above, create the relevant keys and set their values under **Additional properties** when [creating](#create) or [updating](#update) the connector. Here are some examples of keys:

    * `key.converter`
    * `value.converter`

    For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).

- CLI {#cli}

    * `--cluster-name`: Cluster name.
    * `--direction`: Connector direction:

        * `ingress`: For a target cluster.
        * `egress`: For a source cluster.

    * `--tasks-max`: Maximum number of concurrently running connector tasks.
    * `--properties`: Comma-separated list of additional connector settings in `<key>:<value>` format. Here are some examples of keys:

        * `key.converter`
        * `value.converter`

        For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).

    * `--replication-factor`: Number of replicas the cluster stores for each topic.
    * `--topics`: Pattern for selecting topics to replicate. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To replicate all topics, specify `.*`.
    * `--this-cluster-alias`: This cluster prefix in the connector settings.
    * `--external-cluster`: External cluster parameters:

        * `alias`: External cluster prefix in the connector settings.
        * `bootstrap-servers`: Comma-separated list of the FQDNs of the external cluster broker hosts with the port numbers for connection.

           To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

        * `security-protocol`: Connection protocol for the connector:

            * `plaintext`, `sasl_plaintext`: To connect without SSL.
            * `ssl`, `sasl_ssl`: To connect with SSL.

        * `sasl-mechanism`: Authentication mechanism for username and password validation.
        * `sasl-username`: Username for the connector to access the external cluster.
        * `sasl-password`: User password for the connector to access the external cluster.
        * `ssl-truststore-certificates`: List of PEM certificates.
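
    Before creating a connector, it can help to preview which topics a `--topics` pattern would select. The sketch below is a rough local approximation only: `preview_topics` is a hypothetical helper, it treats commas like `|`, it matches whole topic names, and it uses `grep -E` in place of the connector's own regular expression engine, which may behave differently for advanced patterns.

    ```bash
    # Hypothetical helper: prints the topic names (arguments 2..N) that the
    # pattern in argument 1 would select under the assumptions stated above.
    preview_topics() {
      local pattern="${1:-}" topic
      if [ -z "$pattern" ]; then
        echo "preview_topics: pattern is required" >&2
        return 1
      fi
      shift
      # Treat a comma-separated list of names as alternatives.
      pattern="$(printf '%s' "$pattern" | tr ',' '|')"
      for topic in "$@"; do
        # -x makes grep match the whole topic name, not a substring.
        if printf '%s\n' "$topic" | grep -Eqx -- "($pattern)"; then
          printf '%s\n' "$topic"
        fi
      done
    }
    ```

    For example, `preview_topics 'analysis.*' analysis-2024 raw` prints only `analysis-2024`, while the `.*` pattern prints every name passed to the function.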

- Terraform {#tf}

  * **properties**: Comma-separated list of additional connector settings in `<key>:<value>` format. Here are some examples of keys:

     * `key.converter`
     * `value.converter`

     For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).
  
   * **connector_config_mirrormaker**: MirrorMaker connector settings:
      
      * **replication_factor**: Number of replicas the cluster stores for each topic.
      * **topics**: Pattern for selecting topics to replicate. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To replicate all topics, specify `.*`.
      * **source_cluster** and **target_cluster**: Parameters for connecting to the source and target clusters:
         * **alias**: Cluster prefix in the connector settings.

            {% note info %}

            Topics in the target cluster will be created with the specified prefix.

            {% endnote %}

         * **external_cluster**: Parameters for connecting to the external cluster:
            * **bootstrap_servers**: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.

               To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

            * **sasl_username**: Username for the connector to access the cluster.
            * **sasl_password**: User password for the connector to access the cluster.
            * **sasl_mechanism**: Authentication mechanism for username and password validation.
            * **security_protocol**: Connection protocol for the connector:
               * `PLAINTEXT`, `SASL_PLAINTEXT`: To connect without SSL.
               * `SSL`, `SASL_SSL`: To connect with SSL.
            * **ssl_truststore_certificates**: PEM certificate contents.
         * **this_cluster**: Option to use the current cluster as the source or target.

- REST API {#api}

    To configure the MirrorMaker connector, use the `connectorSpec.connectorConfigMirrormaker` parameter:

    * `sourceCluster` and `targetCluster`: Parameters for connecting to the source and target clusters:

        * `alias`: Cluster prefix in the connector settings.

            {% note info %}

            Topics in the target cluster will be created with the specified prefix.

            {% endnote %}

        * `thisCluster`: Option to use the current cluster as the source or target.

        * `externalCluster`: Parameters for connecting to the external cluster:

            * `bootstrapServers`: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.

                To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

            * `saslUsername`: Username for the connector to access the cluster.
            * `saslPassword`: User password for the connector to access the cluster.
            * `saslMechanism`: Authentication mechanism for username and password validation.
            * `securityProtocol`: Connection protocol for the connector:
                * `PLAINTEXT`, `SASL_PLAINTEXT`: To connect without SSL.
                * `SSL`, `SASL_SSL`: To connect with SSL.
            * `sslTruststoreCertificates`: PEM certificate contents.

    * `topics`: Pattern for selecting topics to replicate. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To migrate all topics, specify `.*`.
    * `replicationFactor`: Number of replicas the cluster stores for each topic.

- gRPC API {#grpc-api}

    To configure the MirrorMaker connector, use the `connector_spec.connector_config_mirrormaker` parameter:

    * `source_cluster` and `target_cluster`: Parameters for connecting to the source and target clusters:

        * `alias`: Cluster prefix in the connector settings.

            {% note info %}

            Topics in the target cluster will be created with the specified prefix.

            {% endnote %}

        * `this_cluster`: Option to use the current cluster as the source or target.

        * `external_cluster`: Parameters for connecting to the external cluster:

            * `bootstrap_servers`: Comma-separated list of the FQDNs of the cluster broker hosts with the port numbers for connection.

                To learn how to get a broker host FQDN, see [this guide](connect/index.md#get-fqdn).

            * `sasl_username`: Username for the connector to access the cluster.
            * `sasl_password`: User password for the connector to access the cluster.
            * `sasl_mechanism`: Authentication mechanism for username and password validation.
            * `security_protocol`: Connection protocol for the connector:
                * `PLAINTEXT`, `SASL_PLAINTEXT`: To connect without SSL.
                * `SSL`, `SASL_SSL`: To connect with SSL.
            * `ssl_truststore_certificates`: PEM certificate contents.

    * `topics`: Pattern for selecting topics to replicate. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To migrate all topics, specify `.*`.
    * `replication_factor`: Number of replicas the cluster stores for each topic, provided as an object with the `value` field.

{% endlist %}
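
As a rough illustration of how the REST API fields above nest, the following Python sketch assembles a request body that replicates topics from an external cluster into the current one. It is not an official client. The `name` and `tasksMax` fields, the empty-object value of `thisCluster`, the numeric form of `replicationFactor`, the `SCRAM-SHA-512` mechanism, and all host names and credentials are assumptions made for this example.

```python
import json


def mirrormaker_config(bootstrap_servers, username, password, ca_pem, topics):
    """Return a connectorConfigMirrormaker dict: external source -> this cluster."""
    return {
        "sourceCluster": {
            "alias": "source",
            "externalCluster": {
                "bootstrapServers": bootstrap_servers,
                "saslUsername": username,
                "saslPassword": password,
                "saslMechanism": "SCRAM-SHA-512",  # assumed mechanism
                "securityProtocol": "SASL_SSL",    # connect with SSL
                "sslTruststoreCertificates": ca_pem,
            },
        },
        "targetCluster": {
            "alias": "target",
            "thisCluster": {},  # assumption: an empty object selects the current cluster
        },
        "topics": topics,
        "replicationFactor": 1,  # assumption: a plain number in the REST body
    }


body = {
    "connectorSpec": {
        "name": "replication",  # hypothetical connector name
        "tasksMax": 1,          # assumed top-level field
        "connectorConfigMirrormaker": mirrormaker_config(
            bootstrap_servers="broker1.example.com:9091,broker2.example.com:9091",
            username="mm2-user",
            password="<password>",
            ca_pem="<PEM certificate contents>",
            topics="analysis.*",
        ),
    }
}

print(json.dumps(body, indent=2))
```

With these values, topics matching `analysis.*` would be created in the target cluster with the `source` prefix, as described in the note on `alias`.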

### S3 Sink {#settings-s3}

Specify the S3 Sink connector parameters as follows:

{% list tabs group=instructions %}

- Management console {#console}

  * **Topics**: Pattern for selecting topics to export. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To migrate all topics, specify `.*`.
  * **Compression type**: Message compression codec:

      * `none` (default): No compression
      * `gzip`: [gzip](https://www.gzip.org/) codec
      * `snappy`: [snappy](https://github.com/google/snappy) codec
      * `zstd`: [zstd](https://facebook.github.io/zstd/) codec

      You cannot change this setting after the cluster is created.

  * **Max record per file**: Maximum number of records that can be written to a single file in an S3-compatible storage. This is an optional setting.
  * Under **S3 connection**, specify the storage connection parameters:
      * **Bucket**: Storage bucket name.
      * **Endpoint**: Endpoint for storage access. Get it from your storage provider.
      * **Region**: Region name. This is an optional setting. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

          {% note info %}
          
          Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
          
          {% endnote %}

      
      * **Access key ID**, **Secret access key**: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md). This is an optional setting.


  * To specify additional settings not listed above, create the relevant keys and set their values under **Additional properties** when [creating](#create) or [updating](#update) the connector. Here are some examples of keys:

      * `key.converter`
      * `value.converter`
      * `value.converter.schemas.enable`
      * `format.output.type`

      For the list of all connector settings, see [this connector guide](https://github.com/aiven/s3-connector-for-apache-kafka). For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).

- CLI {#cli}

    * `--cluster-name`: Cluster name.
    * `--tasks-max`: Maximum number of concurrently running connector tasks.
    * `--properties`: Comma-separated list of additional connector settings in `<key>:<value>` format. Here are some examples of keys:

      * `key.converter`
      * `value.converter`
      * `value.converter.schemas.enable`
      * `format.output.type`

      For the list of all connector settings, see [this connector guide](https://github.com/aiven/s3-connector-for-apache-kafka). For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).

    * `--topics`: Pattern for selecting topics to export. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To migrate all topics, specify `.*`.
    * `--file-compression-type`: Message compression codec. You cannot change this setting after the cluster is created. Valid values:

        * `none` (default): No compression
        * `gzip`: [gzip](https://www.gzip.org/) codec
        * `snappy`: [snappy](https://github.com/google/snappy) codec
        * `zstd`: [zstd](https://facebook.github.io/zstd/) codec

    * `--file-max-records`: Maximum number of records that can be written to a single file in an S3-compatible storage.
    * `--bucket-name`: Name of the S3-compatible storage bucket to write data to.
    * `--storage-endpoint`: Endpoint for storage access (get it from your storage provider), e.g., `storage.yandexcloud.net`.
    * `--region`: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

        {% note info %}
        
        Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
        
        {% endnote %}

    
    * `--access-key-id`, `--secret-access-key`: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).


- Terraform {#tf}

  * **properties**: Comma-separated list of additional connector settings in `<key>:<value>` format. Here are some examples of keys:

     * `key.converter`
     * `value.converter`
     * `value.converter.schemas.enable`
     * `format.output.type`

     For the list of all connector settings, see [this connector guide](https://github.com/aiven/s3-connector-for-apache-kafka). For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).

  * **connector_config_s3_sink**: S3 Sink connector settings:
     * **file_compression_type**: Message compression codec. You cannot change this setting after the cluster is created. Valid values:

        * `none` (default): No compression
        * `gzip`: [gzip](https://www.gzip.org/) codec
        * `snappy`: [snappy](https://github.com/google/snappy) codec
        * `zstd`: [zstd](https://facebook.github.io/zstd/) codec

     * **topics**: Pattern for selecting topics to export. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To migrate all topics, specify `.*`.
     * **file_max_records**: Maximum number of records that can be written to a single file in an S3-compatible storage.
     * **s3_connection**: S3-compatible storage connection parameters:

        * **bucket_name**: Name of the bucket to write data to.
        * **external_s3**: External S3-compatible storage connection parameters:

            * **endpoint**: Endpoint for storage access (get it from your storage provider), e.g., `storage.yandexcloud.net`.
            * **region**: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

                {% note info %}
                
                Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
                
                {% endnote %}

            
            * **access_key_id**, **secret_access_key**: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).


- REST API {#api}

    To configure the S3 Sink connector, use the `connectorSpec.connectorConfigS3Sink` parameter:

    * `topics`: Pattern for selecting topics to export. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To migrate all topics, specify `.*`.
    * `fileCompressionType`: Message compression codec. You cannot change this setting after the cluster is created. Valid values:

        * `none` (default): No compression
        * `gzip`: [gzip](https://www.gzip.org/) codec
        * `snappy`: [snappy](https://github.com/google/snappy) codec
        * `zstd`: [zstd](https://facebook.github.io/zstd/) codec

    * `fileMaxRecords`: Maximum number of records that can be written to a single file in an S3-compatible storage.
    * `s3Connection`: S3-compatible storage connection parameters:
        * `bucketName`: Name of the bucket to write data to.
        * `externalS3`: External storage parameters:
            * `endpoint`: Endpoint for storage access (get it from your storage provider), e.g., `storage.yandexcloud.net`.
            * `region`: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

                {% note info %}
                
                Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
                
                {% endnote %}

            
            * `accessKeyId`, `secretAccessKey`: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).


- gRPC API {#grpc-api}

    To configure the S3 Sink connector, use the `connector_spec.connector_config_s3_sink` parameter:

    * `topics`: Pattern for selecting topics to export. List topic names separated by commas or `|`. You can also use a regular expression (`.*`), e.g., `analysis.*`. To migrate all topics, specify `.*`.
    * `file_compression_type`: Message compression codec. You cannot change this setting after the cluster is created. Valid values:

        * `none` (default): No compression
        * `gzip`: [gzip](https://www.gzip.org/) codec
        * `snappy`: [snappy](https://github.com/google/snappy) codec
        * `zstd`: [zstd](https://facebook.github.io/zstd/) codec

    * `file_max_records`: Maximum number of records that can be written to a single file in an S3-compatible storage, provided as an object with the `value` field.
    * `s3_connection`: S3-compatible storage connection parameters:
        * `bucket_name`: Name of the bucket to write data to.
        * `external_s3`: External storage parameters:
            * `endpoint`: Endpoint for storage access (get it from your storage provider), e.g., `storage.yandexcloud.net`.
            * `region`: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

                {% note info %}
                
                Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
                
                {% endnote %}

            
            * `access_key_id`, `secret_access_key`: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).


{% endlist %}
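
To show how the REST API fields above fit together, here is a minimal Python sketch that assembles a `connectorConfigS3Sink` object and rejects compression codecs outside the documented list. The helper name `s3_sink_config`, the bucket name, and the credentials are hypothetical; the client-side validation is an illustration and not a statement about what the service itself checks.

```python
import json

# Codecs listed for fileCompressionType in this section.
VALID_CODECS = {"none", "gzip", "snappy", "zstd"}


def s3_sink_config(topics, bucket, endpoint, access_key_id, secret_access_key,
                   codec="none", max_records=None, region="ru-central1"):
    """Return a connectorConfigS3Sink dict built from the documented fields."""
    if codec not in VALID_CODECS:
        raise ValueError(f"Unsupported compression codec: {codec!r}")
    config = {
        "topics": topics,
        "fileCompressionType": codec,
        "s3Connection": {
            "bucketName": bucket,
            "externalS3": {
                "endpoint": endpoint,
                "region": region,
                "accessKeyId": access_key_id,
                "secretAccessKey": secret_access_key,
            },
        },
    }
    # fileMaxRecords is only added when a limit is requested.
    if max_records is not None:
        config["fileMaxRecords"] = max_records
    return config


config = s3_sink_config(
    topics="analysis.*",
    bucket="my-bucket",                  # hypothetical bucket name
    endpoint="storage.yandexcloud.net",
    access_key_id="<key ID>",
    secret_access_key="<secret key>",
    codec="gzip",
    max_records=1000,
)

print(json.dumps({"connectorConfigS3Sink": config}, indent=2))
```

Because the compression codec cannot be changed later, checking it before sending the request avoids creating a connector with an unintended value.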

### Iceberg Sink {#settings-iceberg}

Specify the Iceberg Sink connector parameters as follows:

{% list tabs group=instructions %}

- Management console {#console}

  * **Control topic**: Select or create a management topic. This topic will be used for coordination and managing the data writing process to Iceberg tables.
  * **Topic source**: Select the topic source from which the data will be transferred to Iceberg tables:
     * **Topic list**: Comma-separated topic names.
     * **Topic Regex**: Regular expression for selecting topics, e.g., `analysis.*`. To migrate all topics, specify `.*`.
  * **Table routing**: Select the rule for routing each message from an Apache Kafka® topic to Iceberg tables:
     * **Static**: Destination tables are predetermined. Each topic, with all its messages, will be routed to a separate Iceberg table. 
        
        In the **Tables** field, list the names of the Iceberg tables separated by commas.
    
     * **Dynamic**: The destination table is determined by the content of the message itself.
       
       In the **Route field** field, specify the field in the message whose value determines the target table.
  * Under **Metastore connection**, specify the Apache Hive™ Metastore connection properties:
     * **Catalog URI**: URI for connection to the Apache Hive™ Metastore cluster in `thrift://<host>:<port>` format.
     * **Warehouse**: Root directory for storing managed table data in S3 in `s3a://bucket-name/path/to/warehouse` format.
  * Under **S3 connection**, specify the storage connection parameters:
     * **Endpoint**: Endpoint for storage access. Get it from your storage provider.
     * **Region**: Region name. This is an optional setting. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

        {% note info %}
        
        Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
        
        {% endnote %}

     * **Access key ID**, **Secret access key**: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md). 

  * Optionally, under **Optional settings**:
     * Section **Table settings**:
        * **Default commit branch**: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is `main`.
        * **Default column list**: Comma-separated list of default columns that identify a row in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
        * **Default partition**: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. Defines the physical placement of data to optimize queries, e.g., `date`, `year`, `month`, `year (timestamp)`, `month (timestamp)`, `days (timestamp)`, and `bucket (16, user_id)`.
        * **Enable automatic schema evolution**: This setting specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming messages from Apache Kafka® changes.
        * **Force all columns to be nullable**: This setting indicates whether to make all fields of the Iceberg table schema nullable (`nullable`), regardless of how they are defined in the incoming message schema.
        * **Enable case-insensitive field name matching**: This setting specifies whether the connector should ignore case when matching fields of the incoming message to columns of the Iceberg table.

     * Section **Control settings**:
        * **Consumer group prefix**: Prefix for the `Consumer Group ID` that the connector uses when reading from Apache Kafka® topics. The default value is `cg-control`.
        * **Commit interval, ms**: How often the connector commits data to the Iceberg table, in milliseconds. The default value is `300000`.
        * **Commit timeout, ms**: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is `30000`.
        * **Commit threads**: Number of threads used to commit data to the Iceberg table.
        * **Transactional prefix**: Prefix for `Transactional ID` that the connector uses when writing to Apache Kafka® within transactions.

- CLI {#cli}

  * `--cluster-id`: Cluster ID.
  * `--cluster-name`: Cluster name.
  * `--tasks-max`: Maximum number of concurrently running connector tasks.
  * `--properties`: Comma-separated list of additional connector settings in `<key>:<value>` format. Here are some examples of keys:

     * `key.converter`
     * `value.converter`
     * `value.converter.schemas.enable`

     For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).

  * `--topics`: Comma-separated list of topics whose data will be transferred to Iceberg tables.
  * `--topics-regex`: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., `analysis.*`. To migrate all topics, specify `.*`.
  * `--control-topic`: Name of the management topic used for coordination and managing the data writing process to Iceberg tables.
  * `--catalog-uri`: URI for connection to the Apache Hive™ Metastore cluster in `thrift://<host>:<port>` format.
  * `--warehouse`: Root directory for storing managed table data in S3 in `s3a://bucket-name/path/to/warehouse` format.
  * `--access-key-id`, `--secret-access-key`: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).
  * `--storage-endpoint`: Endpoint for storage access (get it from your storage provider), e.g., `storage.yandexcloud.net`.
  * `--region`: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

     {% note info %}
     
     Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
     
     {% endnote %}
  
  * `--tables`: Comma-separated names of Iceberg tables for static table routing.
  * `--route-field`: Field in the message that determines the target table for dynamic routing.
  * `--default-commit-branch`: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is `main`.
  * `--default-id-columns`: Comma-separated list of default columns that identify a row in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
  * `--default-partition-by`: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. Defines the physical placement of data to optimize queries, e.g., `date`, `year`, `month`, `year (timestamp)`, `month (timestamp)`, `days (timestamp)`, and `bucket (16, user_id)`.
  * `--evolve-schema-enabled`: This setting specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming messages from Apache Kafka® changes. The default value is `false`.
  * `--schema-force-optional`: This setting indicates whether to make all fields of the Iceberg table schema nullable (`nullable`), regardless of how they are defined in the incoming message schema. The default value is `false`.
  * `--schema-case-insensitive`: This setting specifies whether the connector should ignore case when matching fields of the incoming message to columns of the Iceberg table. The default value is `false`.
  * `--group-id-prefix`: Prefix for the `Consumer Group ID` that the connector uses when reading from Apache Kafka® topics. The default value is `cg-control`.
  * `--commit-interval-ms`: How often the connector commits data to the Iceberg table, in milliseconds. The default value is `300000`.
  * `--commit-timeout-ms`: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is `30000`.
  * `--commit-threads`: Number of threads used to commit data to the Iceberg table. The default value is `vCPU × 2`.
  * `--transactional-prefix`: Prefix for `Transactional ID` that the connector uses when writing to Apache Kafka® within transactions.

- Terraform {#tf}

  * **properties**: Comma-separated list of additional connector settings in `<key>:<value>` format. Here are some examples of keys:

     * `key.converter`
     * `value.converter`
     * `value.converter.schemas.enable`

     For the list of general connector settings, see [this Apache Kafka® guide](https://kafka.apache.org/42/configuration/kafka-connect-configs/).

  * **tasks_max**: Maximum number of concurrently running connector tasks.
  * **connector_config_iceberg_sink**: Section with the Iceberg Sink connector configuration:
     * **control_topic**: Name of the management topic used for coordination and managing the data writing process to Iceberg tables.
     * **topics**: Comma-separated list of topics whose data will be transferred to Iceberg tables.
     * **topics_regex**: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., `analysis.*`. To migrate all topics, specify `.*`.
     * **control_config**: Section with additional settings:
        * **commit_interval_ms**: How often the connector commits data to the Iceberg table, in milliseconds. The default value is `300000`.
        * **commit_threads**: Number of threads used to commit data to the Iceberg table. The default value is `vCPU × 2`.
        * **commit_timeout_ms**: How long the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is `30000`.
        * **group_id_prefix**: Prefix for the `Consumer Group ID` that the connector uses when reading from Apache Kafka® topics. The default value is `cg-control`.
        * **transactional_prefix**: Prefix for `Transactional ID` that the connector uses when writing to Apache Kafka® within transactions.
     * **dynamic_tables**: Section with settings for dynamic table routing:
        * **route_field**: Field in the message that determines the target table for dynamic routing.
     * **metastore_connection**: Section with Apache Hive™ Metastore connection settings:
        * **catalog_uri**: URI for connection to the Apache Hive™ Metastore cluster in `thrift://<host>:<port>` format.
        * **warehouse**: Root directory for storing managed table data in S3 in `s3a://bucket-name/path/to/warehouse` format.
     * **s3_connection**: Section with S3-compatible storage connection settings:
        * **external_s3**: Section with external S3-compatible storage connection settings:
           * **endpoint**: Endpoint for storage access (get it from your storage provider). Example: `storage.yandexcloud.net`.
           * **region**: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

              {% note info %}
              
              Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
              
              {% endnote %}

           * **access_key_id**, **secret_access_key**: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).

     * **static_tables**: Section with settings for static table routing:
        * **tables**: Comma-separated names of Iceberg tables for static table routing.
     * **tables_config**: Section with table settings:
        * **default_commit_branch**: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is `main`.
        * **default_id_columns**: Comma-separated list of default columns that identify a row in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
        * **default_partition_by**: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. Defines the physical placement of data to optimize queries, e.g., `date`, `year`, `month`, `year (timestamp)`, `month (timestamp)`, `days (timestamp)`, and `bucket (16, user_id)`.
        * **evolve_schema_enabled**: This setting specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming messages from Apache Kafka® changes. The default value is `false`.
        * **schema_case_insensitive**: This setting specifies whether the connector should ignore case when matching fields of the incoming message to columns of the Iceberg table. The default value is `false`.
        * **schema_force_optional**: This setting indicates whether to make all fields of the Iceberg table schema nullable (`nullable`), regardless of how they are defined in the incoming message schema. The default value is `false`.

- REST API {#api}

  The Iceberg Sink connector settings are set in the `connectorSpec.connectorConfigIcebergSink` parameter:

  * `topics`: Comma-separated list of topics whose data will be transferred to Iceberg tables.
  * `topicsRegex`: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., `analysis.*`. To migrate all topics, specify `.*`.

  To select topics, use either the `topics` or `topicsRegex` parameter.
    
  * `controlTopic`: Name of the management topic used for coordination and managing the data writing process to Iceberg tables.
  * `metastoreConnection`: Apache Hive™ Metastore connection settings:
      * `catalogUri`: URI for connection to the Apache Hive™ Metastore cluster in `thrift://<host>:<port>` format.
      * `warehouse`: Root directory for storing managed table data in S3 in `s3a://bucket-name/path/to/warehouse` format.
  * `s3Connection`: S3-compatible storage connection parameters:
      * `externalS3`: External storage parameters:
          * `endpoint`: Endpoint for storage access (get it from your storage provider). Example: `storage.yandexcloud.net`.
          * `region`: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

             {% note info %}
             
             Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
             
             {% endnote %}

          * `accessKeyId`, `secretAccessKey`: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).
  
  * `staticTables`: Section with settings for static table routing:
      * `tables`: Comma-separated names of Iceberg tables for static table routing.
  * `dynamicTables`: Section with settings for dynamic table routing:
      * `routeField`: Field in the message that determines the target table for dynamic routing.

  To set up table routing, use either the `staticTables` or `dynamicTables` parameter.

  * `tablesConfig`: Section with table settings:
      * `defaultCommitBranch`: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is `main`.
      * `defaultIdColumns`: Comma-separated list of default columns that identify a row in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
      * `defaultPartitionBy`: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. Defines the physical placement of data to optimize queries, e.g., `date`, `year`, `month`, `year (timestamp)`, `month (timestamp)`, `days (timestamp)`, and `bucket (16, user_id)`.
      * `evolveSchemaEnabled`: This setting specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming messages from Apache Kafka® changes. The default value is `false`.
      * `schemaForceOptional`: This setting indicates whether to make all fields of the Iceberg table schema nullable (`nullable`), regardless of how they are defined in the incoming message schema. The default value is `false`.
      * `schemaCaseInsensitive`: This setting specifies whether the connector should ignore case when matching fields of the incoming message to columns of the Iceberg table. The default value is `false`.
  * `controlConfig`: Section with additional settings:
      * `groupIdPrefix`: Prefix for the `Consumer Group ID` that the connector uses when reading from Apache Kafka® topics. The default value is `cg-control`.
      * `commitIntervalMs`: Interval at which the connector commits data to the Iceberg table, in milliseconds. The default value is `300000`.
      * `commitTimeoutMs`: Time the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is `30000`.
      * `commitThreads`: Number of threads used to commit data to the Iceberg table. The default value is `vCPU × 2`.
      * `transactionalPrefix`: Prefix for `Transactional ID` that the connector uses when writing to Apache Kafka® within transactions.
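
  The routing and table settings above can be combined into a fragment of the `connectorConfigIcebergSink` section. Below is a minimal sketch that only prints such a fragment and sends nothing. The `iceberg_sink_fragment` helper and the sample table and column names are hypothetical, and the `tables` field name under `staticTables` is an assumption.

  ```bash
  # Hypothetical helper: prints a JSON fragment with static table routing
  # and a few table settings. It does not call the API.
  iceberg_sink_fragment() {
    tables="$1"      # comma-separated Iceberg table names
    id_columns="$2"  # comma-separated primary key columns
    printf '{\n'
    printf '  "staticTables": { "tables": "%s" },\n' "$tables"
    printf '  "tablesConfig": {\n'
    printf '    "defaultCommitBranch": "main",\n'
    printf '    "defaultIdColumns": "%s",\n' "$id_columns"
    printf '    "evolveSchemaEnabled": true\n'
    printf '  }\n'
    printf '}\n'
  }

  # Sample call with made-up names.
  iceberg_sink_fragment "db.events" "event_id"
  ```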

- gRPC API {#grpc-api}

  The Iceberg Sink connector settings are set in the `connector_spec.connector_config_iceberg_sink` parameter:

  * `topics`: Comma-separated list of topics whose data will be transferred to Iceberg tables.
  * `topics_regex`: Regular expression to select topics whose data will be transferred to Iceberg tables, e.g., `analysis.*`. To migrate all topics, specify `.*`.

  To select topics, use either the `topics` or `topics_regex` parameter.
    
  * `control_topic`: Name of the management topic used for coordination and managing the data writing process to Iceberg tables.
  * `metastore_connection`: Apache Hive™ Metastore connection settings:
      * `catalog_uri`: URI for connection to the Apache Hive™ Metastore cluster in `thrift://<host>:<port>` format.
      * `warehouse`: Root directory for storing managed table data in S3 in `s3a://bucket-name/path/to/warehouse` format.
    * `s3_connection`: S3-compatible storage connection parameters:
        * `externalS3`: External storage parameters:
            * `endpoint`: Endpoint for storage access (get it from your storage provider). Example: `storage.yandexcloud.net`.
            * `region`: Region where the S3-compatible storage bucket resides. The default value is `ru-central1`. You can find the list of available regions [here](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/regions/Regions.html).

                {% note info %}
                
                Some apps designed to work with Amazon S3 do not allow you to specify the region; this is why [Yandex Object Storage](../../storage/index.md) may also accept the main AWS region value, which is the [first row in the table of regions](https://docs.aws.amazon.com/global-infrastructure/latest/regions/aws-regions.html#available-regions).
                
                {% endnote %}

            * `access_key_id`, `secret_access_key`: [AWS-compatible key ID and contents](../../iam/concepts/authorization/access-key.md).
  
    * `static_tables`: Section with settings for static table routing:
       * `tables`: Comma-separated names of Iceberg tables for static table routing.
    * `dynamic_tables`: Section with settings for dynamic table routing:
       * `route_field`: Field in the message that determines the target table for dynamic routing.

    To set up table routing, use either the `static_tables` or `dynamic_tables` parameter.

    * `tables_config`: Section with table settings:
       * `default_commit_branch`: Default branch name. The connector will commit data to this branch of the Iceberg table. The default value is `main`.
       * `default_id_columns`: Comma-separated list of default columns that identify a row in Iceberg tables (primary key). This parameter is required when UPSERT mode is enabled.
       * `default_partition_by`: Comma-separated list of columns or transformation expressions for partitioning data in the Iceberg table. Defines the physical placement of data to optimize queries, e.g., `date`, `year`, `month`, `year (timestamp)`, `month (timestamp)`, `days (timestamp)`, and `bucket (16, user_id)`.
       * `evolve_schema_enabled`: This setting specifies whether the connector should automatically update the Iceberg table schema if the schema of incoming messages from Apache Kafka® changes. The default value is `false`.
       * `schema_force_optional`: This setting indicates whether to make all fields of the Iceberg table schema nullable (`nullable`), regardless of how they are defined in the incoming message schema. The default value is `false`.
       * `schema_case_insensitive`: This setting specifies whether the connector should ignore case when matching fields of the incoming message to columns of the Iceberg table. The default value is `false`.
    * `control_config`: Section with additional settings:
       * `group_id_prefix`: Prefix for the `Consumer Group ID` that the connector uses when reading from Apache Kafka® topics. The default value is `cg-control`.
       * `commit_interval_ms`: Interval at which the connector commits data to the Iceberg table, in milliseconds. The default value is `300000`.
       * `commit_timeout_ms`: Time the coordinator waits for confirmation from all workers before considering the commit failed, in milliseconds. The default value is `30000`.
       * `commit_threads`: Number of threads used to commit data to the Iceberg table. The default value is `vCPU × 2`.
       * `transactional_prefix`: Prefix for `Transactional ID` that the connector uses when writing to Apache Kafka® within transactions.
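
  Since static and dynamic routing are alternatives, a request should carry only one of the two sections. The sketch below illustrates that rule with a hypothetical `routing_fragment` helper that prints a single routing section. The `tables` field name and the sample field value `target_table` are assumptions.

  ```bash
  # Hypothetical helper: prints exactly one routing section, because
  # static_tables and dynamic_tables are mutually exclusive.
  routing_fragment() {
    mode="$1"   # "static" or "dynamic"
    value="$2"  # table list (static) or message field name (dynamic)
    case "$mode" in
      static)  printf '"static_tables": { "tables": "%s" }\n' "$value" ;;
      dynamic) printf '"dynamic_tables": { "route_field": "%s" }\n' "$value" ;;
      *) echo "routing mode must be static or dynamic" >&2; return 1 ;;
    esac
  }

  # Sample call with a made-up message field.
  routing_fragment dynamic "target_table"
  ```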

{% endlist %}

## Editing a connector {#update}

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), navigate to the relevant folder.
    1. Navigate to **Managed Service for&nbsp;Kafka**.
    1. Select the cluster and open the **Connectors** tab.
    1. In the connector row, click ![image](../../_assets/console-icons/ellipsis.svg) and select **Edit connector**.
    1. Edit the connector properties as needed.
    1. Click **Save**.

- CLI {#cli}

    If you do not have the Yandex Cloud CLI yet, [install and initialize it](../../cli/quickstart.md#install).

    The folder used by default is the one specified when [creating](../../cli/operations/profile/profile-create.md) the CLI profile. To change the default folder, use the `yc config set folder-id <folder_ID>` command. You can also specify a different folder for any command using `--folder-name` or `--folder-id`. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.

    To edit a [MirrorMaker](#settings-mm2) connector:

    1. View a description of the CLI command to edit a connector:

        ```bash
        yc managed-kafka connector-mirrormaker update --help
        ```

    1. Run this command, e.g., to update the task limit:

        ```bash
        yc managed-kafka connector-mirrormaker update <connector_name> \
           --cluster-name=<cluster_name> \
           --direction=<connector_direction> \
           --tasks-max=<new_task_limit>
        ```

        Where `--direction` is the connector direction, either `ingress` or `egress`.

        You can get the connector name with the [list of cluster connectors](#list), and the cluster name, with the [list of clusters in the folder](cluster-list.md#list-clusters).

    To update an [S3 Sink](#settings-s3) connector:

    1. View the description of the CLI command to edit a connector:

        ```bash
        yc managed-kafka connector-s3-sink update --help
        ```

    1. Run this command, e.g., to update the task limit:

        ```bash
        yc managed-kafka connector-s3-sink update <connector_name> \
           --cluster-name=<cluster_name> \
           --tasks-max=<new_task_limit>
        ```

        You can get the connector name with the [list of cluster connectors](#list), and the cluster name, with the [list of clusters in the folder](cluster-list.md#list-clusters).

    To update an [Iceberg Sink](#settings-iceberg) connector:

    1. View the description of the CLI command to edit a connector:

        ```bash
        yc managed-kafka connector-iceberg-sink update --help
        ```

    1. Run this command, e.g., to update the task limit:

        ```bash
        yc managed-kafka connector-iceberg-sink update <connector_name> \
           --cluster-name=<cluster_name> \
           --tasks-max=<new_task_limit>
        ```

        You can get the connector name with the [list of cluster connectors](#list) and the cluster name with the [list of clusters in the folder](cluster-list.md#list-clusters).
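
    The three update commands above differ only in the subcommand name and in the `--direction` flag used for MirrorMaker. As a rough sketch, a hypothetical `print_update_cmd` helper could assemble the right command for a given connector type. It only prints the command so you can review it before running it yourself.

    ```bash
    # Hypothetical helper: prints (does not run) the update command
    # for a connector type, using only the flags shown above.
    print_update_cmd() {
      kind="$1"; name="$2"; cluster="$3"; tasks="$4"; direction="${5:-}"
      case "$kind" in
        mirrormaker)
          case "$direction" in
            ingress|egress) extra=" --direction=$direction" ;;
            *) echo "mirrormaker needs a direction: ingress or egress" >&2; return 1 ;;
          esac ;;
        s3-sink|iceberg-sink) extra="" ;;
        *) echo "unknown connector type: $kind" >&2; return 1 ;;
      esac
      printf 'yc managed-kafka connector-%s update %s --cluster-name=%s%s --tasks-max=%s\n' \
        "$kind" "$name" "$cluster" "$extra" "$tasks"
    }

    # Sample call with placeholder names.
    print_update_cmd iceberg-sink "<connector_name>" "<cluster_name>" 3
    ```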

- Terraform {#tf}

    1. Check the list of [MirrorMaker](#settings-mm2), [S3 Sink](#settings-s3), and [Iceberg Sink](#settings-iceberg) connector settings.

    1. Open the current Terraform configuration file describing your infrastructure.

        For information about creating this file, see [Creating an Apache Kafka® cluster](cluster-create.md).

    1. Edit the parameter values in the `yandex_mdb_kafka_connector` resource description:

        * For a MirrorMaker connector:

            ```hcl
            resource "yandex_mdb_kafka_connector" "<connector_name>" {
              cluster_id = "<cluster_ID>"
              name       = "<connector_name>"
              tasks_max  = <task_limit>
              properties = {
                <advanced_properties>
              }
              connector_config_mirrormaker {
                topics             = "<topic_pattern>"
                replication_factor = <replication_factor>
                source_cluster {
                  alias = "<cluster_prefix>"
                  external_cluster {
                    bootstrap_servers           = "<list_of_broker_host_FQDNs>"
                    sasl_username               = "<username>"
                    sasl_password               = "<user_password>"
                    sasl_mechanism              = "<authentication_mechanism>"
                    security_protocol           = "<security_protocol>"
                    ssl_truststore_certificates = "<PEM_certificate_contents>"
                  }
                }
                target_cluster {
                  alias = "<cluster_prefix>"
                  this_cluster {}
                }
              }
            }
            ```

        * For an S3 Sink connector:

            ```hcl
            resource "yandex_mdb_kafka_connector" "<S3_Sink_connector_name>" {
              cluster_id = "<cluster_ID>"
              name       = "<S3_Sink_connector_name>"
              tasks_max  = <task_limit>
              properties = {
                <advanced_properties>
              }
              connector_config_s3_sink {
                topics                = "<topic_pattern>"
                file_max_records      = <file_max_records>
                s3_connection {
                  bucket_name = "<bucket_name>"
                  external_s3 {
                    endpoint          = "<S3_compatible_storage_endpoint>"
                    access_key_id     = "<AWS_compatible_static_key_ID>"
                    secret_access_key = "<AWS_compatible_static_key_contents>"
                  }
                }
              }
            }
            ```
          
        * For an Iceberg Sink connector:

           ```hcl
           resource "yandex_mdb_kafka_connector" "<connector_name>" {
             cluster_id = "<cluster_ID>"
             name       = "<connector_name>"
             tasks_max  = <task_limit>
             properties = {
               <advanced_properties>
             }
             connector_config_iceberg_sink {
               topics        = "<topic_list>"
               control_topic = "<management_topic_name>"

               metastore_connection {
                 catalog_uri = "<URI_for_connecting_to_Metastore_cluster>"
                 warehouse   = "<root_directory_for_storing_managed_table_data_in_S3>"
               }

               s3_connection {
                 external_s3 {
                   endpoint          = "<S3_compatible_storage_endpoint>"
                   access_key_id     = "<AWS_compatible_static_key_ID>"
                   secret_access_key = "<AWS_compatible_static_key_contents>"
                   region            = "<region_name>"
                 }
               }

               tables_config {
                 default_commit_branch    = "<default_branch_name>"
                 default_id_columns       = "<comma_separated_default_column_list>"
                 default_partition_by     = "<list_of_columns_or_transformation_expressions>"
                 evolve_schema_enabled    = <automatically_update_Iceberg_table_schema>
                 schema_force_optional    = <make_Iceberg_table_schema_fields_optional>
                 schema_case_insensitive  = <ignore_case_when_matching_fields>
               }

               control_config {
                 group_id_prefix      = "<prefix_for_Consumer_Group_ID>"
                 commit_interval_ms   = <Iceberg_table_data_commit_interval>
                 commit_timeout_ms    = <how_long_the_coordinator_waits_for_confirmation>
                 commit_threads       = <number_of_threads_for_committing_data_to_Iceberg_table>
                 transactional_prefix = "<prefix_for_Transactional_ID>"
               }
             }
           }
           ```

    1. Make sure the settings are correct.

       1. In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
       1. Run this command:
       
          ```bash
          terraform validate
          ```
       
          Terraform will show any errors found in your configuration files.

    1. Confirm resource changes.

       1. Run this command to view the planned changes:
       
          ```bash
          terraform plan
          ```
       
          If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
       
       1. If everything looks correct, apply the changes:
          1. Run this command:
       
             ```bash
             terraform apply
             ```
       
          1. Confirm updating the resources.
          1. Wait for the operation to complete.

    For more information, see [this Terraform provider guide](../../terraform/resources/mdb_kafka_connector.md).

- REST API {#api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Call the [Connector.update](../api-ref/Connector/update.md) method, e.g., via the following [cURL](https://curl.se/) request:

       {% note warning %}
       
       The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the `updateMask` parameter as a single comma-separated string.
       
       {% endnote %}

       ```bash
       curl \
         --request PATCH \
         --header "Authorization: Bearer $IAM_TOKEN" \
         --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>' \
         --data '{
                   "updateMask": "connectorSpec.tasksMax,connectorSpec.properties,connectorSpec.connectorConfigMirrormaker.<Mirrormaker_1_connector_setting>,...,connectorSpec.connectorConfigMirrormaker.<Mirrormaker_N_connector_setting>,connectorSpec.connectorConfigS3Sink.<S3_Sink_1_connector_setting>,...,connectorSpec.connectorConfigS3Sink.<S3_Sink_N_connector_setting>,connectorSpec.connectorConfigIcebergSink.<IcebergSink_1_connector_setting>,...,connectorSpec.connectorConfigIcebergSink.<IcebergSink_N_connector_setting>",
                   "connectorSpec": {
                     "tasksMax": "<task_limit>",
                     "properties": "<advanced_connector_properties>",
                     "connectorConfigMirrormaker": {
                       <Mirrormaker_connector_settings>
                     },
                     "connectorConfigS3Sink": {
                       <S3_Sink_connector_settings>
                     },
                     "connectorConfigIcebergSink": {
                       <IcebergSink_connector_settings>
                     }
                   }
                 }'
       ```

       Where:

       * `updateMask`: Comma-separated string of connector settings to update.

            Specify the relevant parameters:
            * `connectorSpec.tasksMax`: To change the connector task limit.
            * `connectorSpec.properties`: To change the connector’s advanced properties.
            * `connectorSpec.connectorConfigMirrormaker.<configuring_Mirrormaker_connector>`: To update the [MirrorMaker](#settings-mm2) connector settings.
            * `connectorSpec.connectorConfigS3Sink.<configuring_S3_Sink_connector>`: To update the [S3 Sink](#settings-s3) connector settings.
            * `connectorSpec.connectorConfigIcebergSink.<IcebergSink_connector_configuration_setup>`: To update the [Iceberg Sink](#settings-iceberg) connector settings.

       * `connectorSpec`: Specify the MirrorMaker, S3 Sink, or Iceberg Sink connector settings.

       You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/Connector/update.md#yandex.cloud.operation.Operation) to make sure your request was successful.
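
    Because `updateMask` is a single comma-separated string, it can help to build it from a list of setting paths. The following is a minimal sketch under that assumption. The `build_update_mask` and `tasks_max_body` helpers are hypothetical and only print a request body that changes the task limit.

    ```bash
    # Hypothetical helper: joins setting paths into one comma-separated string.
    build_update_mask() {
      mask=""
      for path in "$@"; do
        if [ -z "$mask" ]; then mask="$path"; else mask="$mask,$path"; fi
      done
      printf '%s' "$mask"
    }

    # Hypothetical helper: prints a body that updates only the task limit.
    tasks_max_body() {
      printf '{"updateMask": "%s", "connectorSpec": {"tasksMax": "%s"}}\n' \
        "$(build_update_mask connectorSpec.tasksMax)" "$1"
    }

    # Sample call: new task limit of 3.
    tasks_max_body 3
    ```

    You could then pass the printed body to the `--data` option of the cURL request shown above.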

- gRPC API {#grpc-api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Clone the [cloudapi](https://github.com/yandex-cloud/cloudapi) repository:
       
       ```bash
       cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
       ```
       
       Below, we assume that the repository contents reside in the `~/cloudapi/` directory.

    1. Call the [ConnectorService/Update](../api-ref/grpc/Connector/update.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

        {% note warning %}
        
        The API method will assign default values to all the parameters of the object you are modifying unless you explicitly provide them in your request. To avoid this, list the settings you want to change in the `update_mask` parameter as an array of `paths[]` strings.
        
        {% cut "Format for listing settings" %}
        
        ```yaml
        "update_mask": {
            "paths": [
                "<setting_1>",
                "<setting_2>",
                ...
                "<setting_N>"
            ]
        }
        ```
        
        {% endcut %}
        
        {% endnote %}

        ```bash
        grpcurl \
          -format json \
          -import-path ~/cloudapi/ \
          -import-path ~/cloudapi/third_party/googleapis/ \
          -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
          -rpc-header "Authorization: Bearer $IAM_TOKEN" \
          -d '{
                "cluster_id": "<cluster_ID>",
                "connector_name": "<connector_name>",
                "update_mask": {
                  "paths": [
                    "connector_spec.tasks_max",
                    "connector_spec.properties",
                    "connector_spec.connector_config_mirrormaker.<Mirrormaker_1_connector_setting>",
                    ...,
                    "connector_spec.connector_config_mirrormaker.<Mirrormaker_N_connector_setting>",
                    "connector_spec.connector_config_s3_sink.<S3_Sink_1_connector_setting>",
                    ...,
                    "connector_spec.connector_config_s3_sink.<S3_Sink_N_connector_setting>",
                    "connector_spec.connector_config_iceberg_sink.<IcebergSink_1_connector_setting>",
                    ...,
                    "connector_spec.connector_config_iceberg_sink.<IcebergSink_N_connector_setting>"
                  ]
                },
                "connector_spec": {
                  "tasks_max": {
                    "value": "<task_limit>"
                  },
                  "properties": "<advanced_connector_properties>",
                  "connector_config_mirrormaker": {
                    <Mirrormaker_connector_settings>
                  },
                  "connector_config_s3_sink": {
                    <S3_Sink_connector_settings>
                  },
                  "connector_config_iceberg_sink": {
                    <IcebergSink_connector_settings>
                  }
                }
              }' \
          mdb.api.cloud.yandex.net:443 \
          yandex.cloud.mdb.kafka.v1.ConnectorService.Update
        ```

        Where:

        * `update_mask`: List of connector settings you want to update as an array of strings (`paths[]`).

            Specify the relevant parameters:
            * `connector_spec.tasks_max`: To change the connector task limit.
            * `connector_spec.properties`: To change the connector’s advanced properties.
            * `connector_spec.connector_config_mirrormaker.<configuring_Mirrormaker_connector>`: To update the [MirrorMaker](#settings-mm2) connector settings.
            * `connector_spec.connector_config_s3_sink.<configuring_S3_Sink_connector>`: To update the [S3 Sink](#settings-s3) connector settings.
            * `connector_spec.connector_config_iceberg_sink.<IcebergSink_connector_configuration_setup>`: To update the [Iceberg Sink](#settings-iceberg) connector settings.
        * `connector_spec`: Specify the MirrorMaker, S3 Sink, or Iceberg Sink connector settings.

        You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/grpc/Connector/update.md#yandex.cloud.operation.Operation) to make sure your request was successful.
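
    In the gRPC request, the settings to update go into `update_mask` as a `paths[]` array rather than a single string. As an illustration, the hypothetical `build_update_mask_json` helper below turns a list of setting paths into that section. It only prints text and does not call the API.

    ```bash
    # Hypothetical helper: prints the update_mask section with one
    # quoted array element per setting path.
    build_update_mask_json() {
      printf '"update_mask": {"paths": ['
      sep=""
      for path in "$@"; do
        printf '%s"%s"' "$sep" "$path"
        sep=", "
      done
      printf ']}\n'
    }

    # Sample call: update the task limit and advanced properties.
    build_update_mask_json connector_spec.tasks_max connector_spec.properties
    ```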

{% endlist %}

## Pausing a connector {#pause}

When you pause a connector, the system:

* Terminates the sink connection.
* Deletes data from the connector service topics.

To pause a connector:

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), navigate to the relevant folder.
    1. Navigate to **Managed Service for&nbsp;Kafka**.
    1. Select the cluster and open the **Connectors** tab.
    1. Click ![ellipsis](../../_assets/console-icons/ellipsis.svg) next to the connector name and select **Pause**.

- CLI {#cli}

    If you do not have the Yandex Cloud CLI yet, [install and initialize it](../../cli/quickstart.md#install).

    The folder used by default is the one specified when [creating](../../cli/operations/profile/profile-create.md) the CLI profile. To change the default folder, use the `yc config set folder-id <folder_ID>` command. You can also specify a different folder for any command using `--folder-name` or `--folder-id`. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.

    To pause a connector, run this command:

    ```bash
    yc managed-kafka connector pause <connector_name> \
       --cluster-name=<cluster_name>
    ```

- REST API {#api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Call the [Connector.pause](../api-ref/Connector/pause.md) method, e.g., via the following [cURL](https://curl.se/) request:

       ```bash
       curl \
         --request POST \
         --header "Authorization: Bearer $IAM_TOKEN" \
         --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/pause/<connector_name>'
       ```

       You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/Connector/pause.md#yandex.cloud.operation.Operation) to make sure your request was successful.

- gRPC API {#grpc-api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Clone the [cloudapi](https://github.com/yandex-cloud/cloudapi) repository:
       
       ```bash
       cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
       ```
       
       Below, we assume that the repository contents reside in the `~/cloudapi/` directory.

    1. Call the [ConnectorService/Pause](../api-ref/grpc/Connector/pause.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

        ```bash
        grpcurl \
          -format json \
          -import-path ~/cloudapi/ \
          -import-path ~/cloudapi/third_party/googleapis/ \
          -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
          -rpc-header "Authorization: Bearer $IAM_TOKEN" \
          -d '{
                "cluster_id": "<cluster_ID>",
                "connector_name": "<connector_name>"
              }' \
          mdb.api.cloud.yandex.net:443 \
          yandex.cloud.mdb.kafka.v1.ConnectorService.Pause
        ```

        You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/grpc/Connector/pause.md#yandex.cloud.operation.Operation) to make sure your request was successful.

{% endlist %}

## Resuming a connector {#resume}

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), navigate to the relevant folder.
    1. Navigate to **Managed Service for&nbsp;Kafka**.
    1. Select the cluster and open the **Connectors** tab.
    1. Click ![ellipsis](../../_assets/console-icons/ellipsis.svg) next to the connector name and select **Resume**.

- CLI {#cli}

    If you do not have the Yandex Cloud CLI yet, [install and initialize it](../../cli/quickstart.md#install).

    The folder used by default is the one specified when [creating](../../cli/operations/profile/profile-create.md) the CLI profile. To change the default folder, use the `yc config set folder-id <folder_ID>` command. You can also specify a different folder for any command using `--folder-name` or `--folder-id`. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.

    To resume a connector, run this command:

    ```bash
    yc managed-kafka connector resume <connector_name> \
       --cluster-name=<cluster_name>
    ```

- REST API {#api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Call the [Connector.resume](../api-ref/Connector/resume.md) method, e.g., via the following [cURL](https://curl.se/) request:

       ```bash
       curl \
         --request POST \
         --header "Authorization: Bearer $IAM_TOKEN" \
         --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/resume/<connector_name>'
       ```

       You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/Connector/resume.md#yandex.cloud.operation.Operation) to make sure your request was successful.

- gRPC API {#grpc-api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Clone the [cloudapi](https://github.com/yandex-cloud/cloudapi) repository:
       
       ```bash
       cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
       ```
       
       Below, we assume that the repository contents reside in the `~/cloudapi/` directory.

    1. Call the [ConnectorService/Resume](../api-ref/grpc/Connector/resume.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

        ```bash
        grpcurl \
          -format json \
          -import-path ~/cloudapi/ \
          -import-path ~/cloudapi/third_party/googleapis/ \
          -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
          -rpc-header "Authorization: Bearer $IAM_TOKEN" \
          -d '{
                "cluster_id": "<cluster_ID>",
                "connector_name": "<connector_name>"
              }' \
          mdb.api.cloud.yandex.net:443 \
          yandex.cloud.mdb.kafka.v1.ConnectorService.Resume
        ```

        You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/grpc/Connector/resume.md#yandex.cloud.operation.Operation) to make sure your request was successful.

{% endlist %}
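
The REST requests for pausing and resuming a connector differ only in one path segment of the URL. The sketch below shows this with a hypothetical `connector_action_url` helper that builds the URL for either action. It assumes the URL layout used in the cURL examples in this guide and sends no requests.

```bash
# Hypothetical helper: prints the REST URL for pausing or resuming a connector.
connector_action_url() {
  action="$1"; cluster_id="$2"; connector="$3"
  case "$action" in
    pause|resume) ;;
    *) echo "unsupported action: $action" >&2; return 1 ;;
  esac
  printf 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/%s/connectors/%s/%s\n' \
    "$cluster_id" "$action" "$connector"
}

# Sample call with placeholder values.
connector_action_url resume "<cluster_ID>" "<connector_name>"
```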

## Importing a connector to Terraform {#import}

You can import existing connectors to manage them with Terraform.

{% list tabs group=instructions %}

- Terraform {#tf}

    1. In the Terraform configuration file, specify the connector you want to import:

        ```hcl
        resource "yandex_mdb_kafka_connector" "<connector_name>" {}
        ```

    1. Run the following command to import your connector:

        ```bash
        terraform import yandex_mdb_kafka_connector.<connector_name> <cluster_ID>:<connector_name>
        ```

        To learn more about importing connectors, see [this Terraform provider guide](../../terraform/resources/mdb_kafka_connector.md#import).

{% endlist %}
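
The import ID consists of the cluster ID and the connector name separated by a colon. As a small sketch, the hypothetical `print_import_cmd` helper below assembles the import command from its parts and prints it for review. It assumes the Terraform resource name may differ from the connector name.

```bash
# Hypothetical helper: prints (does not run) the terraform import command.
print_import_cmd() {
  resource_name="$1"  # name of the resource in the Terraform configuration
  cluster_id="$2"
  connector="$3"
  printf 'terraform import yandex_mdb_kafka_connector.%s %s:%s\n' \
    "$resource_name" "$cluster_id" "$connector"
}

# Sample call with placeholder values.
print_import_cmd "<connector_name>" "<cluster_ID>" "<connector_name>"
```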

## Deleting a connector {#delete}

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), navigate to the relevant folder.
    1. Navigate to **Managed Service for&nbsp;Kafka**.
    1. Select the cluster and open the **Connectors** tab.
    1. Click ![ellipsis](../../_assets/console-icons/ellipsis.svg) next to the connector name and select **Delete**.
    1. Click **Delete**.

- CLI {#cli}

    If you do not have the Yandex Cloud CLI yet, [install and initialize it](../../cli/quickstart.md#install).

    The folder used by default is the one specified when [creating](../../cli/operations/profile/profile-create.md) the CLI profile. To change the default folder, use the `yc config set folder-id <folder_ID>` command. You can also specify a different folder for any command using `--folder-name` or `--folder-id`. If you access a resource by its name, the search will be limited to the default folder. If you access a resource by its ID, the search will be global, i.e., through all folders based on access permissions.

    To delete a connector, run this command:

    ```bash
    yc managed-kafka connector delete <connector_name> \
       --cluster-name <cluster_name>
    ```

- Terraform {#tf}

    1. Open the current Terraform configuration file describing your infrastructure.

        For information about creating this file, see [Creating an Apache Kafka® cluster](cluster-create.md).

    1. Delete the `yandex_mdb_kafka_connector` resource with the connector description.
    1. Make sure the settings are correct.

        1. In the command line, navigate to the directory that contains the current Terraform configuration files defining the infrastructure.
        1. Run this command:
        
           ```bash
           terraform validate
           ```
        
           Terraform will show any errors found in your configuration files.

    1. Confirm resource changes.

        1. Run this command to view the planned changes:
        
           ```bash
           terraform plan
           ```
        
           If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
        
        1. If everything looks correct, apply the changes:
           1. Run this command:
        
              ```bash
              terraform apply
              ```
        
           1. Confirm updating the resources.
           1. Wait for the operation to complete.

    For more information, see [this Terraform provider guide](../../terraform/resources/mdb_kafka_connector.md).

- REST API {#api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Call the [Connector.delete](../api-ref/Connector/delete.md) method, e.g., via the following [cURL](https://curl.se/) request:

       ```bash
       curl \
         --request DELETE \
         --header "Authorization: Bearer $IAM_TOKEN" \
         --url 'https://mdb.api.cloud.yandex.net/managed-kafka/v1/clusters/<cluster_ID>/connectors/<connector_name>'
       ```

       You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/Connector/delete.md#yandex.cloud.operation.Operation) to make sure your request was successful.

- gRPC API {#grpc-api}

    1. [Get an IAM token for API authentication](../api-ref/authentication.md) and put it into an environment variable:

       ```bash
       export IAM_TOKEN="<IAM_token>"
       ```

    1. Clone the [cloudapi](https://github.com/yandex-cloud/cloudapi) repository:
       
       ```bash
       cd ~/ && git clone --depth=1 https://github.com/yandex-cloud/cloudapi
       ```
       
       Below, we assume that the repository contents reside in the `~/cloudapi/` directory.

    1. Call the [ConnectorService/Delete](../api-ref/grpc/Connector/delete.md) method, e.g., via the following [gRPCurl](https://github.com/fullstorydev/grpcurl) request:

        ```bash
        grpcurl \
          -format json \
          -import-path ~/cloudapi/ \
          -import-path ~/cloudapi/third_party/googleapis/ \
          -proto ~/cloudapi/yandex/cloud/mdb/kafka/v1/connector_service.proto \
          -rpc-header "Authorization: Bearer $IAM_TOKEN" \
          -d '{
                "cluster_id": "<cluster_ID>",
                "connector_name": "<connector_name>"
              }' \
          mdb.api.cloud.yandex.net:443 \
          yandex.cloud.mdb.kafka.v1.ConnectorService.Delete
        ```

        You can get the cluster ID with the [list of clusters in the folder](cluster-list.md#list-clusters), and the connector name, with the [list of connectors in the cluster](#list).

    1. Check the [server response](../api-ref/grpc/Connector/delete.md#yandex.cloud.operation.Operation) to make sure your request was successful.

{% endlist %}
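
Both API tabs above end with checking the server response. As a rough sketch of that check, the helper below reads the response body on standard input and reports the operation state. `operation_state` is a hypothetical name. The sketch assumes the response is an Operation object in JSON with a boolean `done` field and an `error` object on failure. It uses simple pattern matching rather than a real JSON parser, so treat it as an illustration; a tool such as `jq` is a more robust choice.

```bash
#!/usr/bin/env bash

# Hypothetical helper: reads an Operation JSON document from stdin
# and prints "failed", "done", or "in-progress".
# Assumption: the body contains a boolean "done" field and,
# if the operation failed, an "error" object.
operation_state() {
  local body
  body="$(cat)"

  if printf '%s' "$body" | grep -Eq '"error"[[:space:]]*:'; then
    # An "error" key is present, so the operation did not succeed.
    echo "failed"
  elif printf '%s' "$body" | grep -Eq '"done"[[:space:]]*:[[:space:]]*true'; then
    # The operation finished without an error.
    echo "done"
  else
    # No error and "done" is not true: the operation is still running.
    echo "in-progress"
  fi
}
```

You could pipe the output of the cURL or gRPCurl request into `operation_state` to get a one-word summary of the result.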