在《 Spark Connector Reader 原理与实践》中我们提过 Spark Connector 是一个 Spark 的数据连接器,可以通过该连接器进行外部数据系统的读写操作,Spark Connector 包含两部分,分别是 Reader 和 Writer,而本文主要讲述如何利用 Spark Connector 进行 Nebula Graph 数据的写入。
Spark SQL 允许用户自定义数据源,支持对外部数据源进行扩展。
Nebula 的 Spark Connector 单条数据写入是基于 DatasourceV2 实现的,需要以下几个步骤:
WriteSupport
并重写 createWriter
,创建自定义的 DataSourceWriter
。DataSourceWriter
创建 NebulaDataSourceVertexWriter
类和 NebulaDataSourceEdgeWriter
类,重写 createWriterFactory
方法并返回自定义的 DataWriterFactory
,重写 commit
方法,用来提交整个事务。重写 abort
方法,用来做事务回滚。Nebula Graph 1.x 不支持事务操作,故该实现中 commit
和 abort
无实质性操作。DataWriterFactory
创建 NebulaVertexWriterFactory
类和 NebulaEdgeWriterFactory
类,重写 createWriter
方法返回自定义的 DataWriter
。DataWriter
创建 NebulaVertexWriter
类和 NebulaEdgeWriter
类,重写 write
方法,用来将数据写出,重写 commit
方法用来提交事务,重写 abort
方法用来做事务回滚 ,同样 DataWriter
中的 commit
方法和 abort
方法无实质性操作。Nebula 的 Spark Connector Writer 的实现类图如下:
具体写入逻辑在 NebulaVertexWriter
和 NebulaEdgeWriter
的 write
方法中,一次写入的逻辑如下:
Nebula 的 Spark Connector 的批量数据写入与 Exchange 工具类似,是通过对 DataFrame 进行 map
操作批量数据累计提交实现的。
Spark Connector 的 Writer 功能提供了两类接口供用户编程进行数据写入。写入的数据源为 DataFrame,Spark Writer 提供了单条写入和批量写入两类接口。
拉取 GitHub 上 Spark Connector 代码:
git clone -b v1.0 https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
将编译打成的包 copy 到本地 maven 库。
应用示例如下:
nebula-spark
依赖<dependency>
<groupId>com.vesoft</groupId>
<artifactId>nebula-spark</artifactId>
<version>1.0.1</version>
</dependency>
// 构造点和边数据的 DataFrame,示例数据在 nebula-java/examples/src/main/resources 目录下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()
// 写入点
vertexDF.write
.nebula("127.0.0.1:3699", "nb", "100")
.writeVertices("player", "vertexId", "hash")
// 写入边
edgeDF.write
.nebula("127.0.0.1:3699", "nb", "100")
.wirteEdges("follow", "source", "target")
配置说明:
// 构造点和边数据的 DataFrame,示例数据在 nebula-java/examples/src/main/resources 目录下
val vertexDF = spark.read.json("examples/src/main/resources/vertex")
vertexDF.show()
val edgeDF = spark.read.json("examples/src/main/resources/edge")
edgeDF.show()
// 批量写入点
new NebulaBatchWriterUtils()
.batchInsert("127.0.0.1:3699", "nb", 2000)
.batchToNebulaVertex(vertexDF, "player", "vertexId")
// 批量写入边
new NebulaBatchWriterUtils()
.batchInsert("127.0.0.1:3699", "nb", 2000)
.batchToNebulaEdge(edgeDF, "follow", "source", "target")
配置说明:
至此,Nebula Spark Connector Writer 讲解完毕,欢迎前往 GitHub:https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/nebula-spark 试用。
喜欢这篇文章?来来来,给我们的 GitHub 点个 star 表鼓励啦~~ 🙇♂️🙇♀️ [手动跪谢]
交流图数据库技术?交个朋友,Nebula Graph 官方小助手微信:NebulaGraphbot 拉你进交流群~~