Exporting Databricks cluster events to Log Analytics

For running analytics and alerting on Azure Databricks events, the best practice is to process cluster logs using cluster log delivery and to set up the Spark monitoring library to ingest events into Azure Log Analytics.

However, in some cases it might be sufficient to set up a lightweight event ingestion pipeline that pushes events from the Databricks Cluster Events API into Log Analytics. The PowerShell script below demonstrates this approach, and can be run on a schedule from Azure Functions, Azure Automation, or an Azure DevOps pipeline (make sure to move the secrets out of the script and manage them securely, for example in Azure Key Vault as sketched below).
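
For example, when the script runs under a managed identity, the secrets can be fetched from Azure Key Vault at startup rather than hard-coded. A minimal sketch, assuming the Az.KeyVault module (version 3.0 or later for -AsPlainText); the vault and secret names are placeholders:

# Sketch: fetch secrets from Key Vault instead of hard-coding them.
# Assumes a managed identity with read access to the secrets;
# the vault and secret names below are placeholders.
Connect-AzAccount -Identity
$DATABRICKS_TOKEN = Get-AzKeyVaultSecret -VaultName "my-key-vault" `
    -Name "databricks-token" -AsPlainText
$storageConnectionString = Get-AzKeyVaultSecret -VaultName "my-key-vault" `
    -Name "storage-connection-string" -AsPlainText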

The script also demonstrates how to use an Azure Storage Table as a particularly cost-effective Idempotent Receiver repository. Events might not be surfaced by the API at the exact time they are generated, and we don't want to miss any, so the script processes overlapping time windows. Since Log Analytics does not deduplicate messages, this integration pattern ensures that only unique events are stored.
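
Before the first run, the two tables need to exist. A one-time setup sketch, assuming the Az.Storage module and the table names used in the configuration below:

# Sketch: one-time creation of the watermark and idempotent repository tables
$ctx = New-AzStorageContext -ConnectionString $storageConnectionString
New-AzStorageTable -Name "watermark" -Context $ctx
New-AzStorageTable -Name "idempotentRepository" -Context $ctx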

# Requires the Az.Storage and AzTable modules.
############################
# Databricks configuration
#
# Replace with your Databricks workspace location
$DATABRICKS_HOST = "https://northeurope.azuredatabricks.net"
# Replace with your Databricks Personal Access Token
$DATABRICKS_TOKEN = "dapi00000000000000000000000000000000"
# Max latency expected for events to be returned in API results
$latencyMs = 10 * 60 * 1000
############################

############################
# Storage table configuration (for storing previously processed state)
#
# Replace with your storage connection string
$storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=aaaaaaaaaaaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa==;EndpointSuffix=core.windows.net"
# Replace with your watermark table name
$watermarkTableName = "watermark"
# Replace with your idempotent repository table name
$idrepTableName = "idempotentRepository"
############################


############################
# Log Analytics configuration
#
# Replace with your Log Analytics Workspace ID
$customerId = "00000000-0000-0000-0000-000000000000"
# Replace with your Log Analytics Primary Key
$sharedKey = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa=="
# Specify the name of the record type that you'll be creating
$logType = "ClusterEvent"
# You can use an optional field to specify the timestamp from the data.
# If the time field is not specified, Azure Monitor assumes the time is the message ingestion time
$timeStampField = "timestamp"
############################



$ErrorActionPreference = "Stop"
[Net.ServicePointManager]::SecurityProtocol = 'tls12'

# Create the function to create the authorization signature
Function Build-Signature ($customerId, $sharedKey, $date, $contentLength, $method,
   $contentType, $resource)
{
    $xHeaders = "x-ms-date:" + $date
    $stringToHash = $method + "`n" + $contentLength + "`n" + $contentType `
        + "`n" + $xHeaders + "`n" + $resource

    $bytesToHash = [Text.Encoding]::UTF8.GetBytes($stringToHash)
    $keyBytes = [Convert]::FromBase64String($sharedKey)

    $sha256 = New-Object System.Security.Cryptography.HMACSHA256
    $sha256.Key = $keyBytes
    $calculatedHash = $sha256.ComputeHash($bytesToHash)
    $encodedHash = [Convert]::ToBase64String($calculatedHash)
    $authorization = 'SharedKey {0}:{1}' -f $customerId,$encodedHash
    return $authorization
}


# Create the function to create and post the request
Function Post-LogAnalyticsData($customerId, $sharedKey, $body, $logType, $TimeStampField)
{
    $method = "POST"
    $contentType = "application/json"
    $resource = "/api/logs"
    $rfc1123date = [DateTime]::UtcNow.ToString("r")
    $contentLength = $body.Length
    $signature = Build-Signature `
        -customerId $customerId `
        -sharedKey $sharedKey `
        -date $rfc1123date `
        -contentLength $contentLength `
        -method $method `
        -contentType $contentType `
        -resource $resource
    $uri = "https://" + $customerId + ".ods.opinsights.azure.com" `
        + $resource + "?api-version=2016-04-01"


    $headers = @{
        "Authorization" = $signature;
        "Log-Type" = $logType;
        "x-ms-date" = $rfc1123date;
        "time-generated-field" = $TimeStampField;
    }

    $response = Invoke-WebRequest `
        -Uri $uri `
        -Method $method `
        -ContentType $contentType `
        -Headers $headers `
        -Body $body `
        -UseBasicParsing
    return $response.StatusCode

}


$ctx = New-AzStorageContext -ConnectionString $storageConnectionString

# A partition key is required when writing to a storage table; a single fixed key is sufficient here
$partitionKey = "partitionKey"

$watermark_table = (Get-AzStorageTable -Name $watermarkTableName -Context $ctx).CloudTable
$idrep_table = (Get-AzStorageTable -Name $idrepTableName -Context $ctx).CloudTable

$watermark = Get-AzTableRow `
  -Table $watermark_table `
  -PartitionKey $partitionKey `
  -RowKey "watermark"

if (!$watermark) {

    Add-AzTableRow `
        -Table $watermark_table `
        -PartitionKey $partitionKey `
        -RowKey "watermark" `
        -property @{"start_time"=0}

    $watermark = Get-AzTableRow `
        -Table $watermark_table `
        -PartitionKey $partitionKey `
        -RowKey "watermark"

}

Write-Host "Starting at time watermark $($watermark.start_time)"


$end_time=[int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalMilliseconds

$clusters = Invoke-RestMethod `
    -Uri $DATABRICKS_HOST/api/2.0/clusters/list `
    -Method GET `
    -Headers @{Authorization = "Bearer $DATABRICKS_TOKEN"}

$clusters.clusters.ForEach({
    Write-Host $_.cluster_id

    $next_page = @{
        "cluster_id"=$_.cluster_id
        "order"="ASC"
        "start_time"=($watermark.start_time - $latencyMs)
        "end_time"=$end_time
        "limit"=100
    }

    while ($next_page) {
        $query = ConvertTo-Json $next_page -Depth 100

        $ret = Invoke-RestMethod `
            -Uri $DATABRICKS_HOST/api/2.0/clusters/events `
            -Method POST `
            -Body $query -Headers @{Authorization = "Bearer $DATABRICKS_TOKEN"}
        $next_page = $ret.next_page

        Write-Host "Got $($ret.events.Count) events for cluster '$($_.cluster_id)'"

        $ret.events.ForEach({
            $eventId=$_.cluster_id + "/" + $_.timestamp + "/" + $_.type
            $rowKey=[uri]::EscapeDataString($eventId)
            $seenRow=Get-AzTableRow `
                -Table $idrep_table `
                -PartitionKey $partitionKey `
                -RowKey $rowKey
            if ($seenRow) {
                # return acts like "continue" within a .ForEach() script block,
                # skipping events already forwarded in a previous overlapping window
                Write-Host "Ignoring already seen event"
                return
            }

            # Convert event timestamp to ISO 8601 (required by Log Analytics)
            $epoch = New-Object -Type DateTime -ArgumentList 1970, 1, 1, 0, 0, 0, 0
            $eventTime = $epoch.AddMilliseconds($_.timestamp)
            $_.timestamp = Get-Date -Date $eventTime -Format "yyyy-MM-ddTHH:mm:ssZ"

            $json=ConvertTo-Json $_ -Depth 100

            Post-LogAnalyticsData `
                -customerId $customerId `
                -sharedKey $sharedKey `
                -body ([System.Text.Encoding]::UTF8.GetBytes($json)) `
                -logType $logType `
                -TimeStampField $timeStampField
            Add-AzTableRow `
                -Table $idrep_table `
                -PartitionKey $partitionKey `
                -RowKey $rowKey

        })
    }

})

Write-Host "Updating time watermark to $end_time"
$watermark.start_time = $end_time
Update-AzTableRow -Table $watermark_table -entity $watermark
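
Once the script has run, the events surface in Log Analytics under the custom log type ClusterEvent_CL (Azure Monitor appends _CL to custom log types, and type suffixes such as _s to string fields from the JSON payload). A quick way to verify ingestion, assuming the Az.OperationalInsights module and a signed-in Az session:

# Sketch: verify ingestion by counting events per event type.
# Note that newly ingested data can take a few minutes to become queryable.
$results = Invoke-AzOperationalInsightsQuery `
    -WorkspaceId $customerId `
    -Query "ClusterEvent_CL | summarize count() by type_s"
$results.Results | Format-Table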
