# Working with the managed schema registry via the REST API

In Managed Service for Apache Kafka® clusters, you can work with [Managed Schema Registry](../concepts/managed-schema-registry.md#msr) either [using Apache Kafka® clients](managed-schema-registry.md) for various programming languages or via the [REST API](../concepts/available-apis.md#managed-kafka-api).

Managed Service for Apache Kafka® also provides the [REST API for Apache Kafka®](../concepts/available-apis.md#managed-kafka-api). Among other things, this API enables you to send and receive messages without using third-party producers and consumers. This tutorial will also demonstrate these features.

To explore the REST API features for Managed Schema Registry and Apache Kafka®:

1. [Create data format schemas](#create-schemas).
1. [Send messages to a topic](#send-messages).
1. [Read messages from the topic](#receive-messages).
1. [Delete the resources you created](#clear-out).


## Required paid resources {#paid-resources}

The support cost includes:

* Managed Service for Apache Kafka® cluster fee, which covers the use of computing resources allocated to hosts (including ZooKeeper hosts) and disk space (see [Apache Kafka® pricing](../pricing.md)).
* Fee for using public IP addresses (see [Virtual Private Cloud pricing](../../vpc/pricing.md)).


## Getting started {#before-you-begin}

### Set up your infrastructure {#deploy-infrastructure}

{% list tabs group=instructions %}

- Manually {#manual}

    1. [Create a Managed Service for Apache Kafka® cluster](../operations/cluster-create.md) of any suitable configuration.

        When creating a cluster, enable the following options:

        * **Schema registry**.

            The cluster will deploy a Managed Schema Registry schema registry, making the REST API for Managed Schema Registry available.

        * **Kafka Rest API**.

            The REST API for Apache Kafka® will become available in the cluster.

        * **Public access**.

            {% note info %}
            
            Public access to cluster hosts is required if you plan to connect to the cluster via the internet. This connection option is simpler and is recommended for the purposes of this guide. You can connect to non-public hosts as well but only from Yandex Cloud virtual machines located in the same cloud network as the cluster.
            
            {% endnote %}

    1. [Create a topic](../operations/cluster-topics.md#create-topic) named `messages` for exchanging messages between the producer and the consumer.

    1. [Create a user](../operations/cluster-accounts.md#create-account) named `user1` and [grant them permissions](../operations/cluster-accounts.md#grant-permission) for the `messages` topic:

        * `ACCESS_ROLE_CONSUMER`
        * `ACCESS_ROLE_PRODUCER`

        This user will be able to send and receive messages within the topic, as well as [perform any operations on Managed Schema Registry subjects](../concepts/managed-schema-registry.md#msr-auth) that are associated with the topic.

    1. [Complete all pre-configuration steps to connect to the cluster](../operations/connect/index.md).

{% endlist %}

### Install utilities {#install-utilities}

1. Install [cURL](https://curl.se/):

    ```bash
    sudo apt install curl -y
    ```

    It will be used to send API requests.

    For convenience, this tutorial will use the [--user](https://curl.se/docs/manpage.html#-u) cURL option when sending requests to the API. With this option used, cURL will automatically add the [Authorization](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Authorization) HTTP header with the value required for [authorization](../concepts/available-apis.md#managed-kafka-api-usage) to the request.

    {% note tip %}

    You can build the `Authorization` header yourself, e.g., if not using cURL.

    {% endnote %}

1. Install [jq](https://github.com/jqlang/jq):

    ```bash
    sudo apt install jq -y
    ```

    You can use it to convert schema descriptions to the required format.

    When using the REST API for Managed Schema Registry, you need to provide schema descriptions as an escaped character string, e.g.:

    ```json
    "schema": "{\"type\": \"record\", \"name\": \"Obj\", \"fields\":[...]}"
    ```

    For convenience, this tutorial shows schemas as JSON documents with indentations and line breaks. When sending API requests, schemas are formatted as required using `jq`.

    {% note tip %}

    When sending a REST API request via `cURL`, the server response comes as a single JSON string.

    You can further process the command output from this tutorial using `jq` to make the server response more readable.

    {% endnote %}

## Create data format schemas {#create-schemas}

{% note info %}

This tutorial uses the [Avro](https://avro.apache.org/docs/1.12.0/specification/) type schemas.

You can use other schema types supported in Managed Schema Registry.

{% endnote %}

Let's assume an Apache Kafka® message in the `messages` topic must consist of a key and a value in the following format:

#|
|| **Key** | **Value** ||
||

```json
{
  "id": <int>,
  "sid": "<string>"
}
```

|

```json
{
  "name": "<string>",
  "city": "<string>",
  "age": <int>
}
```

||
|#

Create the relevant data format schemas:

1. Create a file named `schema-key.json` containing the data format schema for the Apache Kafka® message key.

    {% cut "schema-key.json" %}

    ```json
    {
      "type": "record",
      "name": "my_key",
      "fields": [
        {
          "name": "id",
          "type": "int"
        },
        {
          "name": "sid",
          "type": "string"
        }
      ]
    }
    ```

    {% endcut %}

1. Create a data format schema for the Apache Kafka® message key.

    The [subject name for the schema](../concepts/managed-schema-registry.md#subjects) must consist of the name of the topic the schema will be used in (`messages`) and the `-key` suffix.

    Call the [POST /subjects/(subject)/versions](https://docs.confluent.io/platform/6.1/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions) REST API method for Managed Schema Registry, e.g., via the following request:

    ```bash
    jq \
        -n --slurpfile data schema-key.json \
        '{
           "schemaType": "AVRO",
           "schema": "\($data)"
        }' \
    | curl \
          --request POST \
          --url 'https://<broker_host_FQDN>:443/subjects/messages-key/versions' \
          --user user1:<user_password> \
          --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
          --data "@-"
    ```

    The response to the request will return the new schema ID, e.g., `{"id":1}`.

1. Create a file named `schema-value.json` containing the data format schema for the Apache Kafka® message value.

    {% cut "schema-value.json" %}

    ```json
    {
      "type": "record",
      "name": "my_value",
      "fields": [
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "city",
          "type": "string"
        },
        {
          "name": "age",
          "type": "int"
        }
      ]
    }
    ```

    {% endcut %}

1. Create a data format schema for the Apache Kafka® message value.

    The [subject name for the schema](../concepts/managed-schema-registry.md#subjects) must consist of the name of the topic the schema will be used in (`messages`) and the `-value` suffix.

    Call the [POST /subjects/(subject)/versions](https://docs.confluent.io/platform/6.1/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions) REST API method for Managed Schema Registry, e.g., via the following request:

    ```bash
    jq \
        -n --slurpfile data schema-value.json \
        '{
           "schemaType": "AVRO",
           "schema": "\($data)"
        }' \
    | curl \
          --request POST \
          --url 'https://<broker_host_FQDN>:443/subjects/messages-value/versions' \
          --user user1:<user_password> \
          --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
          --data "@-"
    ```

    The response to the request will return the new schema ID, e.g., `{"id":2}`.

## Send messages to a topic {#send-messages}

1. Get the data format schema IDs for the key and the value.

    Call the `GET /schemas` REST API method for Managed Schema Registry, e.g., via the following request:

    ```bash
    curl \
        --request GET \
        --url 'https://<broker_host_FQDN>:443/schemas' \
        --user user1:<user_password> \
        --header 'Accept: application/vnd.schemaregistry.v1+json'
    ```

    The response to the request contains data format schema IDs (`id`). You will need these IDs later.

    {% cut "Response example:" %}

     For brevity, the data format schema named `schema` in the form of JSON strings is not provided.

    ```json
    [
      {
        "id": 1,
        "schema": "<data_format_schema>",
        "schemaType": "AVRO",
        "subject": "messages-key",
        "version": 1
      },
      {
        "id": 2,
        "schema": "<data_format_schema>",
        "schemaType": "AVRO",
        "subject": "messages-value",
        "version": 1
      }
    ]
    ```

    {% endcut %}

1. Create a file named `message-list.json` containing two messages. For each message, the key and the value are specified according to the data format schemas created earlier.

    {% cut "message-list.json" %}

    ```json
    [
      {
        "key": {
          "id": 1111,
          "sid": "AAAAA-BBBBB-CCCCC"
        },
        "value": {
          "name": "Anna",
          "city": "Moscow",
          "age": 44
        }
      },
      {
        "key": {
          "id": 2222,
          "sid": "DDDDD-EEEEE-FFFFF"
        },
        "value": {
          "name": "Alex",
          "city": "London",
          "age": 32
        }
      }
    ]
    ```

    {% endcut %}

1. Send messages to the `messages` topic.

    Call the [POST /topics/(topic)](https://docs.confluent.io/platform/6.1/kafka-rest/api.html#post--topics-(string-topic_name)) REST API method for Apache Kafka®, e.g., via the following request:

    ```bash
    jq \
        -n --slurpfile data message-list.json \
        '{
          "key_schema_id": <messages-key_schema_ID>,
          "value_schema_id": <messages-value_schema_ID>,
          "records": $data.[]
        }' \
    | curl \
          --request POST \
          --url 'https://<broker_host_FQDN>:443/topics/messages' \
          --user user1:<user_password> \
          --header 'Content-Type: application/vnd.kafka.avro.v2+json' \
          --header 'Accept: application/vnd.kafka.v2+json' \
          --data "@-"
    ```

    The schema ID values ​​were obtained earlier by requesting the `GET /schemas` endpoint.

    {% cut "Response example:" %}

    ```json
    {
      "key_schema_id": 1,
      "offsets": [
        {
          "offset": 0,
          "partition": 0
        },
        {
          "offset": 0,
          "partition": 1
        }
      ],
      "value_schema_id": 2
    }
    ```

    {% endcut %}

## Read messages from the topic {#receive-messages}

1. Create a consumer named `my-consumer` in the consumer group named `my-group`.

    Call the [POST /consumers/(group)](https://docs.confluent.io/platform/6.1/kafka-rest/api.html#post--consumers-(string-group_name)) REST API method for Apache Kafka®, e.g., via the following request:

    ```bash
    curl \
        --request POST \
        --url 'https://<broker_host_FQDN>:443/consumers/my-group' \
        --user user1:<user_password> \
        --header 'Content-Type: application/vnd.kafka.v2+json' \
        --header 'Accept: application/vnd.kafka.v2+json' \
        --data '{
                  "name": "my-consumer",
                  "format": "avro",
                  "auto.offset.reset": "earliest"
                }'
    ```

    {% cut "Response example:" %}

    ```json
    {
      "base_uri": "https://<broker_host_FQDN>:443/consumers/my-group/instances/my-consumer",
      "instance_id": "my-consumer"
    }
    ```

    {% endcut %}

1. Subscribe to the `messages` topic as `my-consumer` from the`my-group` consumer group.

    Call the [POST /consumers/(group)/instances/(instance)/subscription](https://docs.confluent.io/platform/6.1/kafka-rest/api.html#post--consumers-(string-group_name)-instances-(string-instance)-subscription) REST API method for Apache Kafka®, e.g., via the following request:

    ```bash
    curl \
        --request POST \
        --url 'https://<broker_host_FQDN>:443/consumers/my-group/instances/my-consumer/subscription' \
        --user user1:<user_password> \
        --header 'Content-Type: application/vnd.kafka.v2+json' \
        --header 'Accept: application/vnd.kafka.v2+json' \
        --data '{"topics": ["messages"]}'
    ```

    The API server does not return a response body for this request, only an HTTP status code.

1. Read all messages from the `messages` topic as `my-consumer` from the `my-group` consumer group.

    Call the [GET /consumers/(group)/instances/(instance)/records](https://docs.confluent.io/platform/6.1/kafka-rest/api.html#get--consumers-(string-group_name)-instances-(string-instance)-records) REST API method for Apache Kafka®, e.g., via the following request:

    ```bash
    curl \
        --request GET \
        --url 'https://<broker_host_FQDN>:443/consumers/my-group/instances/my-consumer/records' \
        --user user1:<user_password> \
        --header 'Accept: application/vnd.kafka.avro.v2+json'
    ```

    If the response to the request contains messages that were sent earlier, this means the producer and consumer are successfully interpreting the messages in accordance with the specified data format schemas.

    {% cut "Response example:" %}

    ```json
    [
      {
        "key": {
          "id": 2222,
          "sid": "DDDDD-EEEEE-FFFFF"
        },
        "offset": 0,
        "partition": 1,
        "timestamp": 1726031054186,
        "topic": "messages",
        "value": {
          "age": 32,
          "city": "London",
          "name": "Alex"
        }
      },
      {
        "key": {
          "id": 1111,
          "sid": "AAAAA-BBBBB-CCCCC"
        },
        "offset": 0,
        "partition": 0,
        "timestamp": 1726031054186,
        "topic": "messages",
        "value": {
          "age": 44,
          "city": "Moscow",
          "name": "Anna"
        }
      }
    ]
    ```

    {% endcut %}

## Delete the resources you created {#clear-out}

Delete the resources you no longer need to avoid paying for them:

* [Delete the Managed Service for Apache Kafka® cluster](../operations/cluster-delete.md).
* If you reserved public static IP addresses, release and [delete them](../../vpc/operations/address-delete.md).