Redshift Data API

In this lab you will learn how to query data through the Redshift Data API. The Amazon Redshift Data API simplifies data access, ingest, and egress from programming languages and platforms supported by the AWS SDK, such as Python, Go, Java, Node.js, PHP, Ruby, and C++. We will create a Python function in AWS Lambda that interacts with Redshift through the Data API.

The Data API doesn’t require a persistent connection to the cluster. Instead, it provides a secure HTTP endpoint and integration with AWS SDKs. You can use the endpoint to run SQL statements without managing connections. Calls to the Data API are asynchronous.
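As a rough illustration of this asynchronous pattern, the sketch below submits a statement, polls its status, and fetches the first page of results. The helper name run_statement, its polling defaults, and the example identifiers in the trailing comment are assumptions made for illustration; the client passed in is expected to behave like boto3's redshift-data client.

```python
import time


def run_statement(client, cluster_id, database, db_user, sql,
                  poll_seconds=1.0, max_polls=60):
    """Submit a statement, poll until it completes, then fetch the first result page."""
    # execute_statement returns immediately with a statement id
    statement_id = client.execute_statement(
        ClusterIdentifier=cluster_id, Database=database, DbUser=db_user, Sql=sql
    )["Id"]

    for _ in range(max_polls):
        desc = client.describe_statement(Id=statement_id)
        status = desc["Status"]
        if status == "FINISHED":
            # HasResultSet is false for statements that return no rows (e.g. DDL)
            if desc.get("HasResultSet"):
                return client.get_statement_result(Id=statement_id)
            return None
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(
                f"statement {statement_id} {status}: {desc.get('Error', '')}"
            )
        time.sleep(poll_seconds)

    raise TimeoutError(
        f"statement {statement_id} still running after {max_polls} polls"
    )


# Usage (assumes boto3 is installed and AWS credentials are configured;
# the cluster, database, and user names are hypothetical):
#   import boto3
#   client = boto3.client("redshift-data")
#   result = run_statement(client, "my-cluster", "dev", "awsuser", "select 1")
```

The Lambda function built later in this lab splits these three steps into separate actions so that no single invocation has to wait for the query to finish.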

This is a common scenario: customers want to run queries against a Redshift cluster programmatically without setting up Data Sharing or developing their own REST APIs.

Before You Begin

This lab assumes you have launched a Redshift cluster and loaded it with TPC benchmark data. If you have not launched a cluster, see LAB 1 - Creating Redshift Clusters. If you have not yet loaded it, see 2. Data Loading.

For this lab, you will need to gather the following information about your cluster from LAB 1 - Creating Redshift Clusters.

  • [Your-AWS_Account]
  • [Your-Redshift_Cluster_Identifier]
  • [Your-Redshift_Cluster_Database]
  • [Your-Redshift_Cluster_User]

Prepare IAM permissions

We need to give Lambda permission to access your Redshift cluster. In the top search bar of the AWS console, enter IAM, then right-click the result and open it in a new tab.

From the left menu, choose Policies and click the blue Create policy button.

Copy the JSON document below to the clipboard. It grants the privileges needed to get the Redshift cluster credentials and to call the Redshift Data API.

{
"Version": "2012-10-17",
"Statement": [
  {
    "Effect": "Allow",
    "Action": ["redshift:GetClusterCredentials"],
    "Resource": [
      "arn:aws:redshift:*:[AWS_Account]:dbname:[Redshift_Cluster_Identifier]/[Redshift_Cluster_Database]",
      "arn:aws:redshift:*:[AWS_Account]:dbuser:[Redshift_Cluster_Identifier]/[Redshift_Cluster_User]"
    ]
  },
  {
    "Effect": "Allow",
    "Action": "redshift-data:*",
    "Resource": "*"
  }
]
}

Select the JSON tab and paste the document above, replacing the existing content. Replace [AWS_Account], [Redshift_Cluster_Identifier], [Redshift_Cluster_Database], and [Redshift_Cluster_User] with your actual values.
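If you prefer to generate the filled-in document rather than edit it by hand, a small helper along the following lines could do the substitution. The function name build_data_api_policy and the example values are hypothetical; the structure mirrors the policy shown in this lab.

```python
import json


def build_data_api_policy(account_id, cluster_id, database, db_user):
    """Return the lab's IAM policy document with the placeholders filled in."""
    prefix = f"arn:aws:redshift:*:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["redshift:GetClusterCredentials"],
                "Resource": [
                    f"{prefix}:dbname:{cluster_id}/{database}",
                    f"{prefix}:dbuser:{cluster_id}/{db_user}",
                ],
            },
            {"Effect": "Allow", "Action": "redshift-data:*", "Resource": "*"},
        ],
    }


# Hypothetical example values; substitute the ones gathered from LAB 1.
policy = build_data_api_policy("123456789012", "my-cluster", "dev", "awsuser")
print(json.dumps(policy, indent=2))
```

The printed JSON can then be pasted into the JSON tab as-is.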

Click Next until you reach the final Create policy page. Name your policy RedshiftDataAPIPolicy.

Now let's create a new IAM role named Redshift-data-api-role, with Lambda as the trusted service, and attach the policy we just created.
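For reference, a role that Lambda can use needs a trust policy naming the Lambda service as its principal. The sketch below builds that document; the commented boto3 calls are one possible way to create the role outside the console and are not part of the original lab steps. The account ID in the policy ARN is left as a placeholder.

```python
import json

# Trust policy that lets the Lambda service assume the role.
LAMBDA_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

trust_policy_json = json.dumps(LAMBDA_TRUST_POLICY, indent=2)
print(trust_policy_json)

# Sketch of creating the role with boto3 (not executed here; needs credentials):
#   import boto3
#   iam = boto3.client("iam")
#   iam.create_role(RoleName="Redshift-data-api-role",
#                   AssumeRolePolicyDocument=trust_policy_json)
#   iam.attach_role_policy(
#       RoleName="Redshift-data-api-role",
#       PolicyArn="arn:aws:iam::<account>:policy/RedshiftDataAPIPolicy")
```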

Create AWS Lambda function

In the AWS console, search for Lambda and open it in a new tab. Click Create function on the right side.

Name the function redshift-data-api-demo and, from the Runtime drop-down, select Python 3.8 or the latest Python version shown. Expand the execution role settings, choose the existing role radio button, and pick Redshift-data-api-role from the drop-down. Click the Create function button at the bottom.

Under the Code source section, double-click the file lambda_function.py, which loads the function's code into the editor in the right-side panel.

Copy and paste the following code to replace the existing code entirely. Click Deploy to save and deploy your function's code.

#!/usr/bin/python
# -*- coding: utf-8 -*-

import os
import boto3
import json

# initialize redshift-data client in boto3
redshift_client = boto3.client("redshift-data")

def call_data_api(redshift_client, redshift_database, redshift_user, redshift_cluster_id, sql_statement, with_event=True, sql_parameters=None):
    # build the request; with_event controls whether an EventBridge event is sent after the statement runs
    request = {
        "Database": redshift_database,
        "DbUser": redshift_user,
        "ClusterIdentifier": redshift_cluster_id,
        "Sql": sql_statement,
        "WithEvent": with_event,
    }
    # only add Parameters when values were supplied
    if sql_parameters:
        request["Parameters"] = sql_parameters

    # submit the SQL statement; the call returns before the query has run
    api_response = redshift_client.execute_statement(**request)

    # return the query_id
    query_id = api_response["Id"]
    return query_id

def check_data_api_status(redshift_client, query_id):
    desc = redshift_client.describe_statement(Id=query_id)
    status = desc["Status"]

    if status == "FAILED":
        raise Exception('SQL query failed:' + query_id + ": " + desc.get("Error", ""))
    return status

def get_api_results(redshift_client, query_id):
    response = redshift_client.get_statement_result(Id=query_id)
    return response

def lambda_handler(event, context):
    redshift_cluster_id = os.environ['redshift_cluster_id']
    redshift_database = os.environ['redshift_database']
    redshift_user = os.environ['redshift_user']

    # queryStringParameters is None when the request carries no query string
    params = event.get('queryStringParameters') or {}
    action = params.get('action')
    try:
        if action == "execute_report":
            country = params.get('country_name')
            # sql report query to be submitted; :country is bound as a named
            # parameter instead of being concatenated into the SQL text
            sql_statement = (
                "select c.c_mktsegment as customer_segment, "
                "sum(o.o_totalprice) as total_order_price, "
                "extract(year from o.o_orderdate) as order_year, "
                "extract(month from o.o_orderdate) as order_month, "
                "r.r_name as region, n.n_name as country, "
                "o.o_orderpriority as order_priority "
                "from public.orders o "
                "inner join public.customer c on o.o_custkey = c.c_custkey "
                "inner join public.nation n on c.c_nationkey = n.n_nationkey "
                "inner join public.region r on n.n_regionkey = r.r_regionkey "
                "where n.n_name = :country "
                "group by 1,3,4,5,6,7 order by 2 desc limit 10"
            )
            sql_parameters = [{"name": "country", "value": country}]
            api_response = call_data_api(redshift_client, redshift_database, redshift_user, redshift_cluster_id, sql_statement, sql_parameters=sql_parameters)
            return_status = 200
            return_body = json.dumps(api_response)

        elif action == "check_report_status":
            # query_id to input for action check_report_status
            query_id = params.get('query_id')
            # check status of a previously executed query
            api_response = check_data_api_status(redshift_client, query_id)
            return_status = 200
            return_body = json.dumps(api_response)

        elif action == "get_report_results":
            # query_id to input for action get_report_results
            query_id = params.get('query_id')
            # get results of a previously executed query
            api_response = get_api_results(redshift_client, query_id)
            return_status = 200
            return_body = json.dumps(api_response)

            # total number of rows
            nrows=api_response["TotalNumRows"]
            # number of columns
            ncols=len(api_response["ColumnMetadata"])
            print("Number of rows: %d , columns: %d" % (nrows, ncols) )

            for record in api_response["Records"]:
                print (record)
        else:
            return_status = 500
            # str() keeps this from failing when no action was supplied
            return_body = "Invalid Action: " + str(action)
        return_headers = {
            "Access-Control-Allow-Headers": "Content-Type",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "GET"}
        return {'statusCode': return_status, 'headers': return_headers, 'body': return_body}
    except NameError as error:
        raise NameError(error)
    except Exception as exception:
        error_message = "Encountered exception on:" + str(action) + ":" + str(exception)
        raise Exception(error_message)

This Lambda function supports three actions:

  1. Execute report: calls the Redshift Data API with the Redshift client, database, user, cluster identifier, and query statement. The Python function call_data_api implements this.
  2. Check report status: uses the Redshift Data API to check the status of a previously submitted query statement. The query ID must be passed along with the Redshift client. The Python function check_data_api_status implements this.
  3. Get report results: uses the Redshift Data API to retrieve the results of a previously submitted query statement. The query ID must be passed along with the Redshift client. The Python function get_api_results implements this.
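The report query in this lab is limited to 10 rows, so a single get_statement_result call returns everything. For larger result sets the Data API returns a NextToken with each partial page. A possible way to collect all pages is sketched below; the helper names iter_result_pages and fetch_all_records are assumptions, not part of the lab's Lambda code.

```python
def iter_result_pages(client, query_id):
    """Yield each page returned by get_statement_result for one statement."""
    kwargs = {"Id": query_id}
    while True:
        page = client.get_statement_result(**kwargs)
        yield page
        # NextToken is absent on the last page
        token = page.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token


def fetch_all_records(client, query_id):
    """Concatenate the Records lists from every page of a result set."""
    records = []
    for page in iter_result_pages(client, query_id):
        records.extend(page["Records"])
    return records
```

Here client stands for the same boto3 redshift-data client the Lambda function creates.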

Select the Configuration tab and create the environment variables redshift_cluster_id, redshift_database, and redshift_user with the values for your Redshift cluster.
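The handler reads these three variables with os.environ and fails with a bare KeyError if one is missing. As an optional variation, a helper like the following (the name load_redshift_settings is hypothetical) could report all missing variables at once:

```python
import os

REQUIRED_VARIABLES = ("redshift_cluster_id", "redshift_database", "redshift_user")


def load_redshift_settings(environ=None):
    """Return the three settings, or raise KeyError naming every missing one."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARIABLES if not environ.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARIABLES}
```

Passing a plain dictionary as environ makes the helper easy to exercise outside Lambda.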

Click Test and select Configure test event. Copy the JSON input below to test the function, and save the event with the name EventExecuteReport.

{
  "queryStringParameters": {
    "action": "execute_report",
    "country_name": "UNITED STATES"
  }
}

Let's create this first test event and run it to fire the report query.

Note that the Lambda function has submitted the query and completed its execution. Redshift Data API executes statements asynchronously so we must circle back for status and results.

Note the query_id returned by the cluster; we will use it as input to the subsequent events, EventCheckReport and EventReportResults, which we create below.

Prepare the next test event, EventCheckReport, with the action check_report_status and the query_id noted above.

Prepare the next test event, EventReportResults, using EventCheckReport as a template and replacing the action with get_report_results.
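Based on the parameters the Lambda code reads, the two follow-up events would have the same shape as EventExecuteReport with a query_id in place of country_name. The sketch below prints both; make_test_event is a hypothetical helper and "<query_id>" is a placeholder for the value returned by your own run.

```python
import json


def make_test_event(action, **params):
    """Build a test event shaped like the EventExecuteReport example."""
    return {"queryStringParameters": {"action": action, **params}}


# "<query_id>" is a placeholder for the id returned by EventExecuteReport.
check_event = make_test_event("check_report_status", query_id="<query_id>")
results_event = make_test_event("get_report_results", query_id="<query_id>")

print(json.dumps(check_event, indent=2))
print(json.dumps(results_event, indent=2))
```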

Integrate with API Gateway

Now that we have successfully tested the function, let’s add this Lambda function to an API Gateway so we can query the report from a web browser.

In the AWS console search bar, type “API” and open API Gateway in a new tab. In the API Gateway console, choose REST API as the API type and click Build.

Now we define the resources for our API.

Let’s create a GET method on our resource and associate our Lambda function to it.

Finally, we deploy our API and create a stage for testing. Stages are used to isolate testing and production versions of APIs.

Test API in browser

Let’s now test the API through the gateway in a new browser tab or window. First we need a basic HTML page with code to call our API. Copy and paste the code below into a text editor and save the file as test.html on your desktop.

<html>
<head><meta http-equiv="Access-Control-Allow-Origin" content="*"></head>
<script>
var endpoint;
var counter;

function updateStatus(endpoint, querystring, callback) {
  var xhttp = new XMLHttpRequest();
  xhttp.onreadystatechange = function() {
    if (this.readyState == 4 && this.status == 200) {
     callback(this.responseText);
    }
  };
  console.log(endpoint+querystring);
  xhttp.open("GET", endpoint+"/"+querystring, true);
  xhttp.send();
}

function submitQuery() {
  endpoint = document.getElementById('endpoint').value;
  var country = document.getElementById('country').value;
  querystring = "?action=execute_report&country_name="+encodeURIComponent(country);

  updateStatus(endpoint, querystring, function(status){
    query_id = status.split('"').join('');
    document.getElementById("status").innerHTML = query_id;
    querystring = "?action=check_report_status&query_id="+query_id;
    counter = 1;
    checkStatus(endpoint, querystring, function(){
      querystring = "?action=get_report_results&query_id="+query_id;
      updateStatus(endpoint, querystring, function(status){
        var jsonString = "<pre>" + JSON.stringify(JSON.parse(status),null,2) + "</pre>";
        document.getElementById("status").innerHTML = jsonString;
      });
    });
  });
}

function checkStatus(endpoint, querystring, callback) {
  updateStatus(endpoint, querystring, function(status){
    if (status == "\"FINISHED\"")
      callback();
    else {
      document.getElementById("status").innerHTML = counter + ": " + status;
      counter++;
      // wait one second before polling again
      setTimeout(function() { checkStatus(endpoint, querystring, callback); }, 1000);
    }
  });
}

</script>
<label for=endpoint>Endpoint:</label><input id=endpoint type=text style="width:100%"><br>
<label for=country>Country:</label><input id=country type=text style="width:100%">
<button type="button" onclick=submitQuery()>Submit</button>
<div id=status>
</div>
</html>

Now open test.html in a browser. First, enter the Invoke URL from API Gateway in the Endpoint field. Second, enter a country name in uppercase, for example UNITED STATES, CANADA, or INDIA, and click Submit.
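The same requests can be built outside the browser. The sketch below only constructs the URLs the test page would call; build_report_url is a hypothetical helper and the Invoke URL shown is a made-up placeholder, so substitute the one from your own API Gateway stage.

```python
from urllib.parse import quote, urlencode


def build_report_url(invoke_url, action, **params):
    """Return the GET URL for one action, percent-encoding the parameters."""
    query = urlencode({"action": action, **params}, quote_via=quote)
    # the test page appends "/" before the query string, so do the same here
    return invoke_url.rstrip("/") + "/?" + query


# Hypothetical Invoke URL; replace with your own.
invoke_url = "https://abc123.execute-api.us-east-1.amazonaws.com/test"
print(build_report_url(invoke_url, "execute_report", country_name="UNITED STATES"))
print(build_report_url(invoke_url, "check_report_status", query_id="<query_id>"))
```

Using quote rather than the default quote_plus encodes the space in UNITED STATES as %20.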

Note the asynchronous nature of the Data API:

  1. The request is submitted to the cluster and the query_id is returned.
  2. The status shows as PICKED once Redshift has picked the report for execution.
  3. The status changes to STARTED while Redshift is executing your query.
  4. Finally, the status changes to FINISHED and the results are displayed.
  5. The ColumnMetadata section describes the columns being returned.
  6. The Records section contains the actual data elements from the query results.
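Each entry in Records is a list of single-key dictionaries such as {"stringValue": ...} or {"longValue": ...}, with {"isNull": true} for NULL values. A minimal sketch for combining ColumnMetadata and Records into plain rows follows; the helper names field_value and records_to_rows are assumptions for illustration.

```python
def field_value(field):
    """Return the Python value held by one Data API field dictionary."""
    if field.get("isNull"):
        return None
    # each field carries one typed key, e.g. stringValue or longValue
    for key, value in field.items():
        if key != "isNull":
            return value
    return None


def records_to_rows(result):
    """Convert a get_statement_result response into a list of dicts keyed by column name."""
    names = [column["name"] for column in result["ColumnMetadata"]]
    return [
        dict(zip(names, (field_value(field) for field in record)))
        for record in result["Records"]
    ]
```

Calling records_to_rows on the response from get_api_results would give one dictionary per report row, keyed by the column aliases used in the query.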

This completes the lab to demonstrate asynchronous query execution in Redshift via the Data API.

Before You Leave

If you are done using your cluster, please think about decommissioning it to avoid having to pay for unused resources.

For further details, refer to https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html

Feedback/Issue

We would love to hear from you

Please let us know if there's a way we can make this lab better, if there is a lab you would like added, or if you want help getting started with your Redshift deployment.
