Running Azure Databricks notebooks in parallel

You can run multiple Azure Databricks notebooks in parallel by using the dbutils library. Here is a snippet based on the sample code from the Azure Databricks documentation on running notebooks concurrently and on Notebook workflows, as well as code by my colleague Abhishek Mehra, with additional parameterization, retry logic and error handling.

Note that all child notebooks will share resources on the cluster, which can cause bottlenecks and failures in case of resource contention. In that case, it might be better to run parallel jobs, each on its own dedicated cluster, using the Jobs API. You could use Azure Data Factory pipelines, which support parallel activities, to easily schedule and orchestrate such a graph of notebooks.

Scala version:

 %scala
import scala.concurrent.{Future, Await}
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.databricks.backend.common.rpc.CommandContext
/**
 * Describes a single child notebook run.
 *
 * @param path       path of the notebook, forwarded to `dbutils.notebook.run`
 * @param timeout    timeout value forwarded to `dbutils.notebook.run`
 * @param parameters arguments forwarded to the notebook; empty means "run without arguments"
 * @param retry      number of additional attempts allowed after a failed run
 */
case class NotebookData(
  path: String,
  timeout: Int,
  parameters: Map[String, String] = Map.empty,
  retry: Int = 0
)
/**
 * Runs one notebook asynchronously on the given execution context, retrying on failure.
 *
 * The returned future always completes successfully: the notebook's outcome is carried
 * inside the `Try` (`Success(output)` or, once the retry budget is exhausted, `Failure(e)`).
 *
 * @param notebook notebook to run, including its timeout, parameters and remaining retries
 * @param ctx      notebook context captured by the caller; it is re-applied on the worker
 *                 thread before `dbutils.notebook.run` is called
 * @param ec       execution context the run (and any retries) are scheduled on
 * @return the notebook's result wrapped in a `Try`
 */
def runNotebook(notebook: NotebookData, ctx: Object)(implicit ec: ExecutionContext): Future[Try[String]] =
  Future {
    // setContext is looked up reflectively; fail with a descriptive error instead of a
    // bare NoSuchElementException if it cannot be found.
    val setContext = dbutils.notebook.getClass.getMethods
      .find(_.getName == "setContext")
      .getOrElse(throw new NoSuchMethodException("dbutils.notebook.setContext"))
    setContext.invoke(dbutils.notebook, ctx)
    println(s"Running notebook ${notebook.path}")
    val output =
      if (notebook.parameters.nonEmpty)
        dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
      else
        dbutils.notebook.run(notebook.path, notebook.timeout)
    Success(output)
  }.recoverWith { case e =>
    // No `return` here: this callback runs on a pool thread after runNotebook has already
    // returned, so a non-local return would throw instead of yielding the failure.
    if (notebook.retry < 1) {
      Future.successful(Failure(e))
    } else {
      println(s"Retrying notebook ${notebook.path}")
      runNotebook(notebook.copy(retry = notebook.retry - 1), ctx)
    }
  }
/**
 * Runs the given notebooks concurrently, at most `numInParallel` at a time.
 *
 * @param notebooks     notebooks to run
 * @param numInParallel size of the thread pool, i.e. the maximum number of simultaneous runs
 * @return a future holding one `Try[String]` per notebook, in input order
 */
def parallelNotebooks(notebooks: Seq[NotebookData], numInParallel: Int=2): Future[Seq[Try[String]]] = {
  // Capture the caller's notebook context first so a failure here cannot leak a pool.
  val ctx = dbutils.notebook.getContext()
  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
  // This code limits the number of parallel notebooks.
  val pool = Executors.newFixedThreadPool(numInParallel)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  val all = Future.sequence(notebooks.map(runNotebook(_, ctx)))
  // Release the pool's threads once every run has finished; without this each call
  // leaves `numInParallel` idle non-daemon threads behind.
  all.onComplete(_ => pool.shutdown())
  all
}
// Notebooks to run: the first receives a parameter, the second may be retried twice.
val notebooks = Seq(
  NotebookData("../path/to/Notebook1", 1200, Map("Name" -> "John")),
  NotebookData("../path/to/Notebook2", 1200, retry = 2)
)
val res = parallelNotebooks(notebooks, 2)
// This is a blocking call. `60.minutes` (method call) avoids postfix-operator syntax,
// which requires the scala.language.postfixOps import.
val result = Await.result(res, 60.minutes)
// Manage the results. You can change this logic,
// for example to ignore exceptions.
val outputs = result.map {
  case Success(s) => s
  case Failure(e) => throw e
}

Python version:

 %python
from concurrent.futures import ThreadPoolExecutor
class NotebookData:
    """Describes a single child notebook run.

    Attributes:
        path: Notebook path passed to ``dbutils.notebook.run``.
        timeout: Timeout value passed to ``dbutils.notebook.run``.
        parameters: Optional dict of arguments for the notebook; a falsy value
            means the notebook is run without arguments.
        retry: Number of additional attempts allowed after a failed run.
    """

    # Must be the dunder ``__init__``; a plain ``init`` is never called by
    # ``NotebookData(...)`` and construction with arguments raises TypeError.
    def __init__(self, path, timeout, parameters=None, retry=0):
        self.path = path
        self.timeout = timeout
        self.parameters = parameters
        self.retry = retry
def submitNotebook(notebook):
    """Run one notebook via ``dbutils.notebook.run``, retrying on failure.

    Args:
        notebook: Object with ``path``, ``timeout``, ``parameters`` and ``retry``
            attributes. ``retry`` is the number of extra attempts allowed; it is
            read but not modified, so the same object can be submitted again.

    Returns:
        The value returned by ``dbutils.notebook.run`` for the first successful
        attempt (including when that attempt is a retry).

    Raises:
        Exception: Re-raises the last failure once the retry budget is exhausted.
    """
    retries_left = notebook.retry
    while True:
        print("Running notebook %s" % notebook.path)
        try:
            if notebook.parameters:
                return dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
            return dbutils.notebook.run(notebook.path, notebook.timeout)
        except Exception:
            if retries_left < 1:
                raise
            print("Retrying notebook %s" % notebook.path)
            retries_left -= 1
def parallelNotebooks(notebooks, numInParallel=2):
    """Run the given notebooks concurrently, at most ``numInParallel`` at a time.

    Args:
        notebooks: Iterable of notebook descriptors, each passed to ``submitNotebook``.
        numInParallel: Maximum number of worker threads (defaults to 2).

    Returns:
        A list of ``concurrent.futures.Future`` objects, one per notebook, in
        input order. Leaving the ``with`` block waits for the executor to shut
        down, so every future is already finished when this function returns.
    """
    # If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
    # This code limits the number of parallel notebooks.
    with ThreadPoolExecutor(max_workers=numInParallel) as ec:
        return [ec.submit(submitNotebook, notebook) for notebook in notebooks]
# Notebooks to run: the first receives a parameter, the second may be retried twice.
notebooks = [
    NotebookData("../path/to/Notebook1", 1200, {"Name": "John"}),
    NotebookData("../path/to/Notebook2", 1200, retry=2),
]
res = parallelNotebooks(notebooks, 2)
# This is a blocking call: collect each future's outcome, waiting up to 3600 seconds per future.
result = [pending.result(timeout=3600) for pending in res]
print(result)
Software Engineer at Microsoft, Data & AI, open source fan
#respond -->