# Yandex Managed Service for Apache Spark™ integration with Apache Hive™ Metastore

# Yandex Managed Service for Apache Spark™ integration with Apache Hive™ Metastore


In a PySpark job, you can use the global Hive catalog. To do it, you need to connect a Apache Hive™ Metastore cluster to your Yandex Managed Service for Apache Spark™ cluster.

Apache Hive™ Metastore provides for:
* Centralized storage of metadata on databases, tables, and partitions.
* Simplified access to data without specifying paths and schemas manually.
* Storing table and column statistics for query optimization.

This tutorial shows an example of working with a table in a Yandex Object Storage bucket from a PySpark job using the global Hive catalog. The database metadata is saved to the Apache Hive™ Metastore cluster before being exported to an output bucket.

To implement the above example:

1. [Set up your infrastructure](#infra).
1. [Prepare and run a PySpark job](#prepare-job).
1. [Check the result](#check-result).

If you no longer need the resources you created, [delete them](#clear-out).

{% note info %}

Yandex Managed Service for Apache Spark™ cluster integration with Apache Hive™ Metastore allows using the Apache Iceberg™ table format in Spark jobs. For more information, see [Working with an Apache Iceberg™ table from a PySpark job](../../managed-spark/tutorials/spark-simple-rw-job.md).

{% endnote %}


## Required paid resources {#paid-resources}

* Object Storage buckets: use of storage, data operations (see [Object Storage pricing](../../storage/pricing.md)).
* Yandex Cloud Logging: amount of written data and its retention time (see [Cloud Logging pricing](../../logging/pricing.md)).
* Yandex Managed Service for Apache Spark™ cluster: computing resources of cluster components (see [Yandex Managed Service for Apache Spark™ pricing](../../managed-spark/pricing.md)).
* Apache Hive™ Metastore cluster: computing resources of cluster components (see [Yandex MetaData Hub pricing](../pricing.md)).


## Set up your infrastructure {#infra}

{% list tabs group=instructions %}

- Management console {#console}

    1. [Create a service account](../../iam/operations/sa/create.md) named `spark-agent` for the Yandex Managed Service for Apache Spark™ cluster and assign it the [managed-spark.integrationProvider](../../iam/roles-reference.md#managed-spark-integrationProvider) role to enable the Yandex Managed Service for Apache Spark™ cluster to interact with other resources.

    1. [Create a service account](../../iam/operations/sa/create.md) named `metastore-agent` and assign it the [managed-metastore.integrationProvider](../../iam/roles-reference.md#managed-metastore-integrationProvider) and [storage.uploader](../../iam/roles-reference.md#storage-uploader) roles to enable your Apache Hive™ Metastore cluster to [interact with other resources](../concepts/metastore-impersonation.md) and export metadata to the Object Storage bucket.

    1. [Create buckets](../../storage/operations/buckets/create.md):

        * One for the PySpark job source code.
        * One for output data.

    1. [Grant permissions](../../storage/operations/buckets/edit-acl.md) to the `spark-agent` service account for the created buckets:

        * Bucket for the PySpark job source code: `READ`.
        * Bucket for output data: `READ and WRITE`.

    1. [Grant](../../storage/operations/buckets/edit-acl.md) the `READ and WRITE` permissions for the output bucket to the `metastore-agent` service account.

    1. [Create a cloud network](../../vpc/operations/network-create.md) named `integration-network`.

        This will automatically create three subnets in different availability zones.

    1. For the Yandex Managed Service for Apache Spark™ cluster, [create a security group](../../vpc/operations/security-group-create.md) named `spark-sg` in `integration-network`. Add the following rule to it:

        * For outgoing traffic, to allow Yandex Managed Service for Apache Spark™ cluster connections to Apache Hive™ Metastore:

            * Port range: `9083`
            * Protocol: `Any`
            * Destination: `CIDR`
            * CIDR blocks: `0.0.0.0/0`

    1. For the Apache Hive™ Metastore cluster, [create a security group](../../vpc/operations/security-group-create.md) named `metastore-sg` in `integration-network`. Add the following rules to it:

        * For incoming client traffic:

            * Port range: `30000-32767`
            * Protocol: `Any`
            * Source: `CIDR`
            * CIDR blocks: `0.0.0.0/0`

        * For incoming load balancer traffic:

            * Port range: `10256`
            * Protocol: `Any`
            * Source: `Load balancer health checks`

    1. [Create a Apache Hive™ Metastore cluster](../operations/metastore/cluster-create.md) with the following parameters:

        * **Service account**: `metastore-agent`.
        * **Version**: `3.1`.
        * **Network**: `integration-network`.
        * **Subnet**: `integration-network-ru-central1-a`.
        * **Security groups**: `metastore-sg`.

    1. [Create a Yandex Managed Service for Apache Spark™ cluster](../../managed-spark/operations/cluster-create.md) with the following parameters:

        * **Service account**: `spark-agent`.
        * **Network**: `integration-network`.
        * **Subnet**: `integration-network-ru-central1-a`.
        * **Security groups**: `spark-sg`.
        * **Metastore**: Apache Hive™ Metastore cluster you created earlier.

{% endlist %}

## Prepare a PySpark job {#prepare-job}

For a PySpark job, we will use a Python script that creates a database named `database_1` and a table named `table_1`. To allow the Yandex Managed Service for Apache Spark™ cluster to access the global Apache Hive™ Metastore catalog, call the `enableHiveSupport()` method in the script. The script will be stored in the Object Storage bucket.

Prepare a script file:

{% list tabs group=instructions %}

- Management console {#console}

    1. Create a local file named `job-create-table.py` and paste the following script to it:

        {% cut "job-create-table.py" %}

        ```python
        import random
        import sys
        from pyspark.sql import SparkSession
        
        
        def prepare_table(spark, database, table):
            create_database_sql = "create database if not exists {database}"
            create_table_sql = """
            create table if not exists {database}.{table} (
                id int,
                value double
            )
            """
            truncate_table_sql = "truncate table {database}.{table}"
        
            spark.sql(create_database_sql.format(database=database))
            spark.sql(create_table_sql.format(database=database, table=table))
            spark.sql(truncate_table_sql.format(database=database, table=table))
        
        
        def write_data(spark, database, table):
            data = [(i, random.random()) for i in range(100_000)]
            # Creating a dataframe
            df = spark.createDataFrame(data, schema=['id', 'value'])
            table_full_name = "{database}.{table}".format(database=database, table=table)
            df.write.mode('overwrite').format('json').saveAsTable(table_full_name)
        
        
        def main():
            # Creating a Spark session
            spark = (
                SparkSession
                .builder
                .appName('job-create-table')
                .enableHiveSupport()
                .config('spark.sql.warehouse.dir', sys.argv[1])
                .getOrCreate()
            )
            database, table = 'database_1', 'table_1'
            prepare_table(spark, database, table)
            write_data(spark, database, table)
        
        
        if __name__ == '__main__':
            if len(sys.argv) != 2:
                print("Usage: job-create-table s3a://<bucket>/<folder>", file=sys.stderr)
                sys.exit(-1)
            main()
        ```

        {% endcut %}

    1. In the source code bucket, create a folder named `scripts` and [upload](../../storage/operations/objects/upload.md#simple) the `job-create-table.py` file to this folder.
    1. In the output bucket, create the `warehouse` folder to load data from `database_1` to.
    1. [Create a job](../../managed-spark/operations/jobs-pyspark.md) with the following settings:
        * **Job type**: **PySpark**
        * **Main python file**: `s3a://<source_code_bucket>/scripts/job-create-table.py`
        * **Arguments**: `s3a://<bucket_for_output_data>/warehouse`

{% endlist %}

## Check the result {#check-result}

{% list tabs group=instructions %}

- Management console {#console}

    1. In the [management console](https://console.yandex.cloud), select a folder.
    1. Navigate to **Managed Service for Apache Spark™**.
    1. Click the name of your cluster and select the **Jobs** tab.
    1. Wait for the PySpark job you created to change its status to **Done**.
    1. Make sure the file with data from `database_1` appears in the `warehouse` folder in your output data bucket.
    1. Make sure the Apache Hive™ Metastore cluster has the metadata on `database_1`:

        1. [Export the metadata](../operations/metastore/export-and-import.md#export) from the Apache Hive™ Metastore cluster to the output bucket.
        1. [Download the metadata file](../../storage/operations/objects/download.md) and make sure it mentions `database_1`.

{% endlist %}

## Delete the resources you created {#clear-out}

Some resources are not free of charge. Delete the resources you no longer need to avoid paying for them:

{% list tabs group=instructions %}

- Management console {#console}

    1. [Apache Hive™ Metastore cluster](../operations/metastore/cluster-delete.md).
    1. [Yandex Managed Service for Apache Spark™ cluster](../../managed-spark/operations/cluster-delete.md).
    1. [Object Storage buckets](../../storage/operations/buckets/delete.md). Before deleting your buckets, make sure to [have deleted](../../storage/operations/objects/delete.md) all objects from those buckets.

{% endlist %}