Connect Azure Databricks to CosmosDB Gremlin API and run queries

Ilse Epskamp · Azure Tutorials · Apr 4, 2022


Cosmos DB with Gremlin API is a graph database on Azure. You can interact with the database directly in the Azure portal, but when you ingest data as part of an automated flow this is not practical. In that scenario you want to create and run queries dynamically, based on variable input data. One approach is to use Azure Databricks to ingest the data into Cosmos DB. In this blog we describe how to connect Databricks to Cosmos DB with Gremlin API and run queries against the database. We also provide a scenario for usage in an automated end-to-end flow.

Data Explorer for Cosmos DB Gremlin API in Azure Portal.

Import the gremlin_python package

pip install gremlinpython

from gremlin_python import statics
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.strategies import *
from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.traversal import T
from gremlin_python.process.traversal import Order
from gremlin_python.process.traversal import Cardinality
from gremlin_python.process.traversal import Column
from gremlin_python.process.traversal import Direction
from gremlin_python.process.traversal import Operator
from gremlin_python.process.traversal import P
from gremlin_python.process.traversal import Pop
from gremlin_python.process.traversal import Scope
from gremlin_python.process.traversal import Barrier
from gremlin_python.process.traversal import Bindings
from gremlin_python.process.traversal import WithOptions
from gremlin_python.driver import client, serializer, protocol
statics.load_statics(globals())

Set up your credentials

ENDPOINT = "{ENDPOINT}"              # Gremlin endpoint of your Cosmos DB account, without wss:// or port
DATABASE = "{DATABASENAME}"
COLLECTION = "{COLLECTIONNAME}"
PRIMARY_KEY = "{PRIMARY MASTER KEY}"

Open a connection from Databricks to Cosmos DB

# Initialize the Cosmos DB Gremlin connection
def connect_cosmosdb(ENDPOINT, DATABASE, COLLECTION, PRIMARY_KEY):
    gremlin_client = client.Client(
        'wss://' + ENDPOINT + ':443/', 'g',
        username="/dbs/" + DATABASE + "/colls/" + COLLECTION,
        password=PRIMARY_KEY,
        message_serializer=serializer.GraphSONSerializersV2d0()
    )
    return gremlin_client

gremlin_client = connect_cosmosdb(ENDPOINT, DATABASE, COLLECTION, PRIMARY_KEY)
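
To verify the connection, you can submit a simple query through the client. A minimal sketch; the count query is just an example:

# Count the vertices in the graph to confirm the connection works
count = gremlin_client.submit("g.V().count()").all().result()
print(count)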

Create a function to run any query

def run_gremlin_query(query):
    # submit() sends the query string to the server; all() returns a future
    # that resolves to the full list of results once the traversal completes
    run = gremlin_client.submit(query).all()
    result = run.result()
    return result

Execute a query and handle the (JSON) result

For example:

Get details for a specific vertex ID

  • pass vertex-id
query="g.V('{vertex-id}')"result=run_gremlin_query(query)

Get vertices of a given label and with specific properties

  • pass vertex label
  • pass filter properties
query="g.V().hasLabel('{vertex-label'})
.has('{propertyname}','{property-value}') # ==
.has('{propertyname}',gt('{value}')) # >
.has('{propertyname}',lt('{value}')) # <
.has('{propertyname}',containing('{value}'))" # contains
result=run_gremlin_query(query)

Create a new vertex

  • add vertex-label
  • add the name of the partition to which the vertex must be assigned
  • add properties, either in or outside a list
query="g.addV('{vertex-label}')
.property('{partition-name}','{partition-value}')
.property('{property-name}','{property-value}')
.property(list, '{property-name}','{property-value}')"
result=run_gremlin_query(query)

Scenario for an automated end-to-end flow

  1. New data is pushed or pulled to the Storage account.
  2. A Data Factory blob trigger fires and passes the triggerFileName and triggerFolderPath to Databricks (see the sketch after step 3).
  3. Read the data from the Storage account into a Spark dataframe. Example file:
primary_key || label   || name
1 || persons || Ilse
2 || persons || Sagar
filepath = "{path}"
dataframe = spark.read.option("header", "true") \
    .option("multiline", "true") \
    .option("sep", "||") \
    .csv(filepath)

4. Collect the rows of the dataframe so you can iterate over them:

rows=dataframe.collect()

5. Create a vertex for every record, in the partition of the vertex label:

for row in rows:
    partitionname = "PARTITION"
    partitionvalue = row["label"]
    query = ("g.addV('{}')"
             ".property('{}','{}')"
             ".property('primary_key','{}')"
             ".property('name','{}')").format(
                 row["label"],
                 partitionname,
                 partitionvalue,
                 row["primary_key"],
                 row["name"]
             )

    run_gremlin_query(query)

Let’s make it more dynamic by parameterizing the keys as well, so your script is not customized for one source but generic for any source. Example:

for row in rows:
    partitionname = "PARTITION"
    partitionvalue = row["label"]
    rowdict = row.asDict()
    keys = list(rowdict.keys())
    query = ("g.addV('{}')"
             ".property('{}','{}')").format(row["label"], partitionname, partitionvalue)
    for key in keys:
        value = row[key]
        query += ".property('{}','{}')".format(key, value)
    run_gremlin_query(query)

Parallel vs. sequential ingestion

To increase ingestion speed, you can split your dataframe into several pieces and run N parallel ingestion jobs instead of row-by-row ingestion. This requires more Request Units on your database, which is more costly, so choosing an ingestion strategy means finding a balance between cost and performance.
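
A minimal sketch of one way to do this with a thread pool; the chunk size and the use of ThreadPoolExecutor are assumptions, not part of the original flow:

from concurrent.futures import ThreadPoolExecutor

def ingest_rows(row_chunk):
    # Build and submit one addV query per row in this chunk
    for row in row_chunk:
        query = ("g.addV('{}')"
                 ".property('PARTITION','{}')"
                 ".property('primary_key','{}')"
                 ".property('name','{}')").format(
                     row["label"], row["label"], row["primary_key"], row["name"])
        run_gremlin_query(query)

chunk_size = 100  # assumption: tune to your RU budget
chunks = [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]

# Run the chunks in parallel; more workers means higher RU consumption
with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(ingest_rows, chunks)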

Azure Tutorials frequently publishes tutorials, best practices, insights or updates about Azure Services, to contribute to the Azure Community. Azure Tutorials is driven by two enthusiastic Azure Cloud Engineers, combining over 15 years of IT experience in several domains. Stay tuned for weekly blog updates and follow us if you are interested!
https://www.linkedin.com/company/azure-tutorials
