Tutorial: Managing queries

This self-service tutorial guides you through a more advanced task: monitoring and managing long-running queries.

Difficulty: Advanced
Time needed: 4 hours

Introduction

After setting up an ODAS cluster and configuring it according to the Cluster Sizing guidelines, a common operational challenge is how to manage long-running queries. This short tutorial introduces the necessary resources provided by the ODAS services and then presents a specific approach on how to use them for that purpose.

Useful Cluster Information

Please refer to the Architecture Overview for details on the services provided by an ODAS cluster. For this tutorial, we will focus on two of them, namely the Planner and Worker. Both of those expose an internal, web-based management service, commonly available on port 11050 for the Planner, and 11051 for the Worker(s).

Note

These ports are usually not exposed as public endpoints for services. You will need to access them locally or enable access as needed.
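
If you have SSH access to the respective node, one simple way to reach these pages from a local browser is an SSH tunnel. The user and host names below are placeholders for your environment:

# Forward the Planner web UI (port 11050) to the local machine;
# the Worker UI on port 11051 can be forwarded the same way
ssh -N -L 11050:localhost:11050 <user>@<planner-host>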

The Planner's page has a tab named /membership, which provides the list of all currently known Worker nodes (among other things):

Planner UI - Known Servers
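
The same membership information is also available in machine-readable form by appending the ?json parameter to the endpoint, which is useful for scripting. A quick check from the command line could look like this (the Planner host name is a placeholder):

# List the Worker nodes currently known to the Planner
curl -s "http://<planner-host>:11050/membership?json" | jq '.workers'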

For the Worker(s), the web pages look very similar to the Planner's, but have slight differences in the number of available tabs, as well as in what is displayed inside them. Most notably, the /queries tab for the Worker(s) shows the running queries:

Worker UI - Queries

Of further note is that the Planner, while also having a /queries tab, does not show running queries, since the purpose of the Planner service is to plan queries, not to execute them. In particular, planning includes a step in which the query is parallelized into many tasks that are similar to the original query but operate on a subset of the data. These tasks are then executed across the cluster by the available Worker(s), and they are what is listed on the /queries tab. The example screenshot above shows a list of in-flight queries, which are really separate tasks of the same original query. They are already in the state FINISHED, waiting for the client to collect the results.

Monitoring and managing queries is then a three-step process:

  1. Determine the Worker nodes - This is needed to communicate with each of them and manage the running tasks.

  2. Get the list of in-flight queries for each Worker, as shown below.

  3. Terminate the queries (that is, the tasks) that exceed a certain threshold.
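
Step 1 corresponds to the /membership?json call shown earlier, and step 3 uses the cancellation endpoint described in the next section. Step 2 can likewise be done manually from a shell (the Worker host name is a placeholder; the returned JSON is shown in the next section):

# Step 2: list the in-flight queries on a specific Worker
curl -s "http://<worker-host>:11051/queries?json" | jq '.in_flight_queries'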

Management APIs

For step #3, you can use the /cancel_query?query_id=${qid} endpoint of the Worker(s), passing the query ID as the query_id parameter to instruct the server to terminate a particular task.

The JSON returned by the /queries?json endpoint (which is the same as used earlier in the browser screenshot, but with the added ?json parameter) contains the necessary details, including the query_id and duration:

...
"in_flight_queries": [
  {
    "application": "okera-presto (1.5.1)",
    "effective_user": "okera",
    "default_db": "default",
    "stmt": "SELECT `trip_distance` FROM `analyticsdb`.`nytaxi_yellow_raw` WHERE ((`trip_distance` < '') OR (`trip_distance` > ''))",
    "stmt_type": "QUERY",
    "start_time": "2019-11-14 15:52:40.201877000",
    "end_time": "00:00:00",
    "duration": "2m7s",
    "progress": "789800373 / 1328614131 (59.4454%)",
    "state": "FINISHED",
    "rows_fetched": 4644157,
    "query_id": "5b4d903384e59692:2a8657b90c942891",
    "last_event": "Rows available"
  },
  ...
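
With jq, the relevant fields can be extracted from this output for a quick overview, for example the ID, elapsed time, and state of each in-flight query (the Worker host name is a placeholder):

# Print the ID, duration, and state of every in-flight query on a Worker
curl -s "http://<worker-host>:11051/queries?json" | \
  jq -r '.in_flight_queries[] | "\(.query_id)  \(.duration)  \(.state)"'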

Using the query ID with the /cancel_query endpoint invokes the cancellation request. Here is an example of calling the endpoint directly from a browser, which shows the confirmation that is returned when the cancellation was successful:

Cancelling Queries
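
The same cancellation can be issued from the command line. The query ID below is the one from the JSON example above, and the Worker host name is a placeholder:

# Cancel a single task on the Worker that is executing it
curl -s "http://<worker-host>:11051/cancel_query?query_id=5b4d903384e59692:2a8657b90c942891"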

A client would receive an error indicating that the rows that were queued up can no longer be fetched. For example, when using the PrestoDB JDBC driver to query data provided by an ODAS cluster, the error stack for a cancelled query would look similar to this:

org.jkiss.dbeaver.model.exec.DBCException: SQL Error [65536]: Query failed (#20191114_155114_00028_95hx8): Unable to fetch records.
TRecordServiceException(code:INVALID_HANDLE, message:Invalid handle)
...
Caused by: com.facebook.presto.spi.PrestoException: Unable to fetch records.
TRecordServiceException(code:INVALID_HANDLE, message:Invalid handle)
    at com.okera.recordservice.presto.RecordServiceRecordSet$RecordServiceCursor.advanceNextPosition(RecordServiceRecordSet.java:171)
    at com.facebook.presto.$gen.CursorProcessor_20191114_150803_31.process(Unknown Source)
    at com.facebook.presto.operator.ScanFilterAndProjectOperator.processColumnSource(ScanFilterAndProjectOperator.java:237)
    at com.facebook.presto.operator.ScanFilterAndProjectOperator.getOutput(ScanFilterAndProjectOperator.java:229)
    at com.facebook.presto.operator.Driver.processInternal(Driver.java:379)
    ...

Automation

The cancellation process can be automated in many ways, for example with Bash scripting. The following script implements the above process and could be run through a cron job on a management machine.

These requirements are needed for the script to work:

  • cURL needs to be installed.

  • jq, which is used to process the JSON data, needs to be installed.

  • Access to the Worker endpoints must be set up properly. This includes bypassing any optional load-balancer in front of the service.

#!/bin/bash
# Usage : ./query-terminator <planner-ip or planner-hostname>

echo "Terminating queries running for more than 1 hour"
planner=${1}

# Get the list of currently active Workers from the Planner membership endpoint;
# strip the JSON array brackets, quotes, and the Worker RPC port (13050)
curl -s http://${planner}:11050/membership?json | jq -r '.workers' | \
  tail -n +2 | head -n -1 | sed 's/\"//g' | sed 's/\:13050,//g' | \
  sed 's/\:13050//g' > workers.list

# Iterate over current worker list
while read worker; do
  echo "----------------------------------"
  echo "Worker:"${worker}
  echo "----------------------------------"

  # Get list of in-flight queries for current worker
  curl -s http://${worker}:11051/queries?json | \
    jq -r '.in_flight_queries[]' > worker.queries

  # Iterate over the in-flight queries
  jq -c '.' worker.queries | while read qrow; do
    # Analyze the query info to get the elapsed time
    qduration=$(echo "${qrow}" | jq -r '.duration')
    # Check the duration against the maximum allowed; matches any duration
    # whose leading hour component is 1 or more, e.g. "1h2m7s"
    if [[ $(echo "${qduration}" | grep -i -E '^[1-9][0-9]*h') ]]; then
      # Get the query ID
      qid=$(echo "${qrow}" | jq -r '.query_id')
      # Kill the query using the ID
      curl -s http://${worker}:11051/cancel_query?query_id=${qid}
      # Print the ID, duration, and statement of the terminated query
      echo "Terminated query: ${qid}"
      echo "${qduration}"
      echo "${qrow}" | jq -r '.stmt'
      echo "-----------------"
    fi
  done
done < workers.list

# Remove temporary files
rm -f workers.list worker.queries
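
To run the script periodically, a crontab entry on the management machine could look like the following (the schedule, script path, and Planner host name are assumptions for illustration):

# Run the query terminator every 15 minutes and append its output to a log file
*/15 * * * * /opt/okera/scripts/query-terminator planner.example.com >> /var/log/query-terminator.log 2>&1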

Some notes concerning the script:

  • The script ignores the confirmation returned by the endpoint as it is not needed.

  • All of the ports (like the Worker RPC port) are hard-coded, but ideally they would be configurable.

  • The duration is determined by comparing the time as a string, which is not as flexible as actual date/time handling (see the sketch after these notes).

  • Cancelled queries are logged for later analysis by an administrator.
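
As mentioned in the notes, a more precise threshold check could convert the duration string into seconds before comparing. Here is a minimal sketch, assuming the duration uses whole h/m/s components as in the JSON output above:

# Convert a duration string such as "1h2m7s" into seconds
duration_to_seconds() {
  local d=${1} h=0 m=0 s=0
  [[ ${d} =~ ([0-9]+)h ]] && h=${BASH_REMATCH[1]}
  [[ ${d} =~ ([0-9]+)m ]] && m=${BASH_REMATCH[1]}
  [[ ${d} =~ ([0-9]+)s ]] && s=${BASH_REMATCH[1]}
  # Force base-10 arithmetic in case a component has a leading zero
  echo $(( 10#${h} * 3600 + 10#${m} * 60 + 10#${s} ))
}

# Example: check a duration against a one hour threshold
if [[ $(duration_to_seconds "1h2m7s") -gt 3600 ]]; then
  echo "exceeds threshold"
fi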

Server-side Settings

Besides the management API, there are other parameters an administrator can use to influence long-running queries and/or slow clients. These are server-side settings that need to be added to the configuration of the cluster itself, specified in the configuration YAML file using the RS_ARGS key.

  • --idle_query_timeout=0

    The time, in seconds, that a query may be idle for (that is, no processing work is done and no updates are received from the client) before it is cancelled. If 0, idle queries are never expired. The query option QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents the maximum allowable timeout.

  • --idle_session_timeout=120

    The time, in seconds, that a session may be idle for before it is closed (and all running queries cancelled). If 0, idle sessions are never expired.

For example, in the cluster configuration YAML file, add (or amend) this line:

  RS_ARGS: "--idle_session_timeout=120 --idle_query_timeout=120"

Note

These settings are cluster-wide and may have an adverse effect on other query behavior. Okera recommends being very careful and not changing these settings without proper testing and/or professional advice. Also inform Okera Support of your changes to avoid any delays in later troubleshooting sessions.

Summary

With the APIs provided by the ODAS processes, you can check the currently running queries and implement a safeguard against runaway tasks. The Bash script shown here should only serve as a starting point; more checks could be added to make it more robust.