AWS Glue

Credits: Educative.io

Let’s create:

- a DynamoDB table, and insert some data into it (see script below)
- an IAM role (and attach the AWSGlueServiceRole and AWSGluePolicy policies to it)
- a Glue crawler, pointed at the DynamoDB table as the source to crawl
- a new Glue database that will store the extracted metadata

Now we can run the crawler, which will create a table in the Glue Data Catalog.

Notice that we didn’t have to specify the schema of the table, Glue automatically inferred it from the data.
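Conceptually, the crawler samples items and maps DynamoDB attribute type descriptors to Glue catalog column types. The sketch below illustrates that idea only; the mapping table, the `infer_columns` helper, and the sample item are made up for illustration, and the real crawler handles far more cases.

```python
# Illustrative-only sketch of how a crawler might map DynamoDB
# attribute type descriptors to Glue catalog column types.
DDB_TO_GLUE = {
    'S': 'string',     # DynamoDB string
    'N': 'double',     # DynamoDB number
    'BOOL': 'boolean',
    'B': 'binary',
}

def infer_columns(item):
    """Derive Glue-style column definitions from one sample DynamoDB item."""
    return [
        {'Name': name, 'Type': DDB_TO_GLUE.get(next(iter(attr)), 'string')}
        for name, attr in item.items()
    ]

sample = {'index': {'S': 's1'}, 'title': {'S': 'Inception'}, 'rating': {'S': 'PG-13'}}
print(infer_columns(sample))
```

Since every attribute in our Movies data is stored as a string (`'S'`), the inferred schema is all `string` columns, which is why the SQL transform later has to cast the index to an integer.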

Next, we’ll set up a Glue ETL job that extracts data from the DynamoDB table, transforms it, and loads it into an S3 bucket. We:

- create an S3 bucket
- set up a “Visual ETL” job in Glue and add 3 nodes:
  - the DynamoDB “movies” table as the “Source”
  - a SQL query as the “Transform”
  - the S3 bucket as the “Target”
- configure the job with an example SQL transform script that removes the “s” prefix from the index column and selects only PG-13 movies (see below)

Finally, run the Visual ETL job. After it completes, check the S3 bucket for the results. You will see a bunch of files whose names start with “run…” that contain the output of the ETL job (we can query them with a SQL query). These files can then be used for any subsequent processing, such as ML training.
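If the S3 target node was configured with JSON as the output format, each “run…” file holds one JSON object per line, and the results are easy to inspect with plain Python. This is a sketch; the sample content below is made up, and the actual layout depends on the format you chose on the target node.

```python
import json

# Parse the contents of a "run..." output file, assuming the S3
# target was configured to write JSON (one object per line).
# The sample text here stands in for a downloaded file body.
sample_output = '\n'.join([
    '{"cleaned_index": 1, "title": "Movie A", "rating": "PG-13"}',
    '{"cleaned_index": 2, "title": "Movie B", "rating": "PG-13"}',
])

records = [json.loads(line) for line in sample_output.splitlines() if line]
print(records[0]['title'])
```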

We have successfully extracted data from DynamoDB, transformed it, and loaded it into S3.

Appendix

Insert data into DynamoDB table:

import csv
import boto3

# Initialize the DynamoDB client (the credentials below are
# placeholders -- replace them with your own, or omit them to
# use the default credential chain)
dynamodb = boto3.client(
    'dynamodb',
    aws_access_key_id='ZYX',
    aws_secret_access_key='XYZ',
    region_name='us-east-1'
)

# Insert every row of a CSV file into the Movies table,
# storing each column as a DynamoDB string ('S') attribute
def insert_csv():
    csv_file = "Movies.csv"

    with open(csv_file, 'r') as csvfile:
        csv_reader = csv.reader(csvfile)
        header = next(csv_reader)  # First row is the header
        for row in csv_reader:
            item = {header[i]: {'S': row[i]} for i in range(len(header))}
            dynamodb.put_item(
                TableName='Movies',
                Item=item
            )

# Populate the table from the CSV file
insert_csv()
print('Table populated successfully.')
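The row-to-item conversion inside the loop can be pulled out into a pure helper, which makes it easy to unit-test without touching DynamoDB. This is a sketch; `csv_row_to_item` is a hypothetical name, not part of the script above.

```python
def csv_row_to_item(header, row):
    """Convert one CSV row into a DynamoDB item dict, storing
    every column as a string ('S') attribute."""
    return {col: {'S': val} for col, val in zip(header, row)}

header = ['index', 'title', 'rating']
row = ['s1', 'Inception', 'PG-13']
print(csv_row_to_item(header, row))
# {'index': {'S': 's1'}, 'title': {'S': 'Inception'}, 'rating': {'S': 'PG-13'}}
```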

SQL query:

SELECT 
    -- Extract only numbers from the string type index column and convert it to an integer and rename it as cleaned_index
    CAST(REPLACE(index, 's', '') AS INT) AS cleaned_index,
    -- Include other columns you want to select
    listed_in,
    duration,
    date_added,
    country,
    director,
    rating,
    release_year,
    title
FROM 
    myDataSource
WHERE 
    -- Choose only the movies that have PG-13 rating
    rating = 'PG-13'
ORDER BY 
    -- Sort the result set based on the values in the cleaned_index column in ascending order
    cleaned_index;
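The same transform can be expressed in plain Python, which is handy for sanity-checking the SQL logic on a few sample rows before running the job. This is a sketch; the `transform` function and the sample rows are made up for illustration.

```python
def transform(rows):
    """Replicate the SQL transform: strip 's' from the index and
    cast it to an int, keep only PG-13 rows, sort by cleaned_index."""
    cleaned = [
        {**row, 'cleaned_index': int(row['index'].replace('s', ''))}
        for row in rows
        if row['rating'] == 'PG-13'
    ]
    return sorted(cleaned, key=lambda r: r['cleaned_index'])

rows = [
    {'index': 's2', 'title': 'Movie B', 'rating': 'PG-13'},
    {'index': 's1', 'title': 'Movie A', 'rating': 'PG-13'},
    {'index': 's3', 'title': 'Movie C', 'rating': 'R'},
]
print([r['title'] for r in transform(rows)])
# ['Movie A', 'Movie B']
```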