
A Deep Dive into the Dgraph Graph Database Source Code: the live Import

First, let's look at the Dgraph live command.

    dgraph live -f <path-to-encrypted-gzipped-RDF-or-JSON-file> -s <path-to-encrypted-schema> --encryption_keyfile <path-to-keyfile-to-decrypt-files>

Flags:

  1. --new_uids (default: false): Assign new UIDs instead of using the existing UIDs in the data files. This is useful for avoiding overwriting data in a database that is already running.
  2. -f, --files: Location of the *.rdf(.gz) or *.json(.gz) file(s) to load. Multiple files can be loaded from the given path. If the path is a directory, all files ending in .rdf, .rdf.gz, .json, and .json.gz will be loaded.
  3. --format: Specify the file format (rdf or json) instead of deriving it from the filename. This is useful if you need to force a specific format.
  4. -b, --batch (default: 1000): Number of N-Quads to send as part of each mutation.
  5. -c, --conc (default: 10): Number of concurrent requests to make to Dgraph. Do not confuse this with -C.
  6. -C, --use_compression (default: false): Enable compression on connections to Alpha servers.
  7. -a, --alpha (default: localhost:9080): Dgraph Alpha gRPC server address for the live load. This can be a comma-separated list of Alpha addresses in the same cluster to distribute the load, e.g. "alpha:grpc_port,alpha2:grpc_port,alpha3:grpc_port".
  8. -x, --xidmap (default: disabled; requires a path): Store the xid-to-uid mapping in a directory. Dgraph saves all identifiers used during the load for later use in other data-ingest operations. The mapping is saved in the path you provide, and the same path must be given on the next load. Using this flag is recommended if you have full control over your identifiers (blank nodes), because each identifier will be mapped to a specific UID.
  9. --vault_* flags: Specify the Vault server address, role ID, secret ID, and the field containing the encryption key that can be used to decrypt encrypted exports.
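
As an illustration of how these flags combine, here is a hypothetical invocation (the file paths and alpha addresses below are made up for the example):

    dgraph live -f /data/1million.rdf.gz -s /data/1million.schema \
      -a alpha1:9080,alpha2:9080 -c 20 -b 1000 -x /data/xidmap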

Having seen the usage examples, let's get to the main topic.

When we run dgraph live -f /var/local/dgraph-data/1million.rdf.gz -s /var/local/dgraph-data/2million.schema -a localhost:9080 --zero localhost:5080, cmd.Execute() in ./dgraph/main.go determines that the command is live and moves on to ./dgraph/cmd/live/run.go to execute the run() method. Before run() executes, the init() method runs first; its main job is to accept the live command's flags and register them on Live.Cmd. We won't analyze it in depth here. Here is the code:

    func init() {
        Live.Cmd = &cobra.Command{
            Use:   "live",
            Short: "Run Dgraph live loader",
            Run: func(cmd *cobra.Command, args []string) {
                defer x.StartProfile(Live.Conf).Stop()
                if err := run(); err != nil {
                    x.Check2(fmt.Fprintf(os.Stderr, "%s", err.Error()))
                    os.Exit(1)
                }
            },
        }
        Live.EnvPrefix = "DGRAPH_LIVE"

        flag := Live.Cmd.Flags()
        flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load")
        flag.StringP("schema", "s", "", "Location of schema file")
        flag.String("format", "", "Specify file format (rdf or json) instead of getting it "+
            "from filename")
        flag.StringP("alpha", "a", "127.0.0.1:9080",
            "Comma-separated list of Dgraph alpha gRPC server addresses")
        flag.StringP("zero", "z", "127.0.0.1:5080", "Dgraph zero gRPC server address")
        flag.IntP("conc", "c", 10,
            "Number of concurrent requests to make to Dgraph")
        flag.IntP("batch", "b", 1000,
            "Number of N-Quads to send as part of a mutation.")
        flag.StringP("xidmap", "x", "", "Directory to store xid to uid mapping")
        flag.StringP("auth_token", "t", "",
            "The auth token passed to the server for Alter operation of the schema file")
        flag.BoolP("use_compression", "C", false,
            "Enable compression on connection to alpha server")
        flag.Bool("new_uids", false,
            "Ignore UIDs in load files and assign new ones.")
        flag.String("http", "localhost:6060", "Address to serve http (pprof).")
        flag.Bool("verbose", false, "Run the live loader in verbose mode")
        flag.StringP("user", "u", "", "Username if login is required.")
        flag.StringP("password", "p", "", "Password of the user.")
        flag.StringP("bufferSize", "m", "100", "Buffer for each thread")
        flag.Bool("ludicrous_mode", false, "Run live loader in ludicrous mode (Should only be done when alpha is under ludicrous mode)")

        // Encryption and Vault options
        enc.RegisterFlags(flag)
        // TLS configuration
        x.RegisterClientTLSFlags(flag)
    }

Next, let's take a detailed look at the run() method.

    func run() error {
        var err error
        x.PrintVersion()
        opt = options{
            dataFiles:      Live.Conf.GetString("files"),
            dataFormat:     Live.Conf.GetString("format"),
            schemaFile:     Live.Conf.GetString("schema"),
            zero:           Live.Conf.GetString("zero"),
            concurrent:     Live.Conf.GetInt("conc"),
            batchSize:      Live.Conf.GetInt("batch"),
            clientDir:      Live.Conf.GetString("xidmap"),
            authToken:      Live.Conf.GetString("auth_token"),
            useCompression: Live.Conf.GetBool("use_compression"),
            newUids:        Live.Conf.GetBool("new_uids"),
            verbose:        Live.Conf.GetBool("verbose"),
            httpAddr:       Live.Conf.GetString("http"),
            bufferSize:     Live.Conf.GetInt("bufferSize"),
            ludicrousMode:  Live.Conf.GetBool("ludicrous_mode"),
        }

        if opt.key, err = enc.ReadKey(Live.Conf); err != nil {
            fmt.Printf("unable to read key %v", err)
            return err
        }

        go func() {
            if err := http.ListenAndServe(opt.httpAddr, nil); err != nil {
                glog.Errorf("Error while starting HTTP server: %+v", err)
            }
        }()

        ctx := context.Background()
        bmOpts := batchMutationOptions{
            Size:          opt.batchSize,
            Pending:       opt.concurrent,
            PrintCounters: true,
            Ctx:           ctx,
            MaxRetries:    math.MaxUint32,
            bufferSize:    opt.bufferSize,
        }

        // Create the Dgraph client; it wraps the list of alpha clients.
        dg, closeFunc := x.GetDgraphClient(Live.Conf, true)
        defer closeFunc()

        // Connect to the zero server and set up the consumers that take reqs and write the data.
        l := setup(bmOpts, dg)
        defer l.zeroconn.Close()

        // Process the schema file.
        if len(opt.schemaFile) > 0 {
            // Read the schema file and apply the schema.
            err := processSchemaFile(ctx, opt.schemaFile, opt.key, dg)
            if err != nil {
                if err == context.Canceled {
                    fmt.Printf("Interrupted while processing schema file %q\n", opt.schemaFile)
                    return nil
                }
                fmt.Printf("Error while processing schema file %q: %s\n", opt.schemaFile, err)
                return err
            }
            fmt.Printf("Processed schema file %q\n\n", opt.schemaFile)
        }

        // Fetch the schema.
        l.schema, err = getSchema(ctx, dg)
        if err != nil {
            fmt.Printf("Error while loading schema from alpha %s\n", err)
            return err
        }

        if opt.dataFiles == "" {
            return errors.New("RDF or JSON file(s) location must be specified")
        }

        // Find the RDF/JSON files; the path may be a directory.
        filesList := x.FindDataFiles(opt.dataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
        totalFiles := len(filesList)
        if totalFiles == 0 {
            return errors.Errorf("No data files found in %s", opt.dataFiles)
        }
        fmt.Printf("Found %d data file(s) to process\n", totalFiles)

        // x.Check(dgraphClient.NewSyncMarks(filesList))
        errCh := make(chan error, totalFiles)

        // Iterate over the data files.
        for _, file := range filesList {
            fmt.Println(file)
            file = strings.Trim(file, " \t")
            // Start one goroutine per file to process it.
            go func(file string) {
                errCh <- l.processFile(ctx, file, opt.key)
            }(file)
        }
        fmt.Println("Load File Finsh!")

        // PrintCounters should be called after schema has been updated.
        if bmOpts.PrintCounters {
            go l.printCounters()
        }

        for i := 0; i < totalFiles; i++ {
            if err := <-errCh; err != nil {
                fmt.Printf("Error while processing data file %q: %s\n", filesList[i], err)
                return err
            }
        }
        close(l.reqs)

        // First we wait for requestsWg, when it is done we know all retry requests have been added
        // to retryRequestsWg. We can't have the same waitgroup as by the time we call Wait, we can't
        // be sure that all retry requests have been added to the waitgroup.
        l.requestsWg.Wait()
        l.retryRequestsWg.Wait()

        c := l.Counter()
        var rate uint64
        if c.Elapsed.Seconds() < 1 {
            rate = c.Nquads
        } else {
            rate = c.Nquads / uint64(c.Elapsed.Seconds())
        }

        // Lets print an empty line, otherwise Interrupted or Number of Mutations overwrites the
        // previous printed line.
        fmt.Printf("%100s\r", "")
        fmt.Printf("Number of TXs run : %d\n", c.TxnsDone)
        fmt.Printf("Number of N-Quads processed : %d\n", c.Nquads)
        fmt.Printf("Time spent : %v\n", c.Elapsed)
        fmt.Printf("N-Quads processed per second : %d\n", rate)

        if l.db != nil {
            if err := l.alloc.Flush(); err != nil {
                return err
            }
            if err := l.db.Close(); err != nil {
                return err
            }
        }
        return nil
    }

1. Reading the flags

run() first reads the flags passed to the live command, parses them, and assigns them to a new options object. It then starts an HTTP server that serves pprof profiling data.
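
To make that pprof listener concrete, here is a minimal, self-contained sketch of the pattern (not the loader's actual code): a blank import of net/http/pprof registers the /debug/pprof/* handlers on the default mux, and a background goroutine serves them, so a profile can later be fetched with, for example, go tool pprof http://localhost:6060/debug/pprof/heap.

    package main

    import (
        "log"
        "net/http"
        _ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
        "time"
    )

    func main() {
        // Serve the pprof endpoints in the background, the way run() does with opt.httpAddr.
        go func() {
            if err := http.ListenAndServe("localhost:6060", nil); err != nil {
                log.Printf("pprof server error: %v", err)
            }
        }()

        // ... the rest of the program would do its real work here ...
        time.Sleep(time.Minute)
    }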

2. Creating the Dgraph client

It creates the Dgraph client, which wraps multiple alpha gRPC connection clients; their number is determined by the alpha flag passed in. When creating each alpha connection client, the number of connection retries on failure can be configured. Once connected, all the alpha clients are handed to the Dgraph client, and the function finally returns the Dgraph client together with a close callback.
Source:

    func GetDgraphClient(conf *viper.Viper, login bool) (*dgo.Dgraph, CloseFunc) {
        alphas := conf.GetString("alpha")
        if len(alphas) == 0 {
            glog.Fatalf("The --alpha option must be set in order to connect to Dgraph")
        }

        fmt.Printf("\nRunning transaction with dgraph endpoint: %v\n", alphas)

        tlsCfg, err := LoadClientTLSConfig(conf)
        Checkf(err, "While loading TLS configuration")

        // Split the comma-separated alpha addresses into a slice.
        ds := strings.Split(alphas, ",")
        var conns []*grpc.ClientConn
        var clients []api.DgraphClient

        // Number of connection attempts on failure.
        retries := 1
        if conf.IsSet("retries") {
            retries = conf.GetInt("retries")
            if retries < 1 {
                retries = 1
            }
        }

        for _, d := range ds {
            var conn *grpc.ClientConn
            for i := 0; i < retries; i++ {
                // Set up the gRPC connection.
                conn, err = SetupConnection(d, tlsCfg, false)
                if err == nil {
                    break
                }
                fmt.Printf("While trying to setup connection: %v. Retrying...\n", err)
                time.Sleep(time.Second)
            }
            if conn == nil {
                Fatalf("Could not setup connection after %d retries", retries)
            }

            conns = append(conns, conn)
            // Create a client instance for this connection.
            dc := api.NewDgraphClient(conn)
            clients = append(clients, dc)
        }

        /**
         * Inside NewDgraphClient:
         *
         * func NewDgraphClient(clients ...api.DgraphClient) *Dgraph {
         *     dg := &Dgraph{
         *         dc: clients,
         *     }
         *
         *     return dg
         * }
         */
        dg := dgo.NewDgraphClient(clients...)

        user := conf.GetString("user")
        if login && len(user) > 0 {
            err = GetPassAndLogin(dg, &CredOpt{
                Conf:        conf,
                UserID:      user,
                PasswordOpt: "password",
            })
            Checkf(err, "While retrieving password and logging in")
        }

        closeFunc := func() {
            for _, c := range conns {
                if err := c.Close(); err != nil {
                    glog.Warningf("Error closing connection to Dgraph client: %v", err)
                }
            }
        }

        // Return the client and the callback that closes all connections.
        return dg, closeFunc
    }
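
For comparison, building a dgo client by hand follows the same shape. The sketch below is a minimal version (a single alpha, no TLS, no login), assuming the public dgo v2 module path rather than Dgraph's internal x helpers:

    package main

    import (
        "log"

        "github.com/dgraph-io/dgo/v2"
        "github.com/dgraph-io/dgo/v2/protos/api"
        "google.golang.org/grpc"
    )

    func newClient(alpha string) (*dgo.Dgraph, func(), error) {
        // Dial one alpha; GetDgraphClient does this for every address in --alpha.
        conn, err := grpc.Dial(alpha, grpc.WithInsecure())
        if err != nil {
            return nil, nil, err
        }
        // Wrap the gRPC connection in an api.DgraphClient, then in a dgo.Dgraph.
        dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
        closeFunc := func() {
            if err := conn.Close(); err != nil {
                log.Printf("error closing connection: %v", err)
            }
        }
        return dg, closeFunc, nil
    }

    func main() {
        dg, closeFn, err := newClient("localhost:9080")
        if err != nil {
            log.Fatal(err)
        }
        defer closeFn()
        _ = dg // the client would now be used for Alter / NewTxn calls
    }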

3. Connecting to the zero server and starting the listeners that write the RDF data to the server

It first connects to the zero server and then creates a loader instance. This instance holds the Dgraph client, and that is also where the client is taken from when the data is written later. Next, n goroutines are started to process the RDF data, where n is configurable via the conc flag.

We'll discuss the RDF-processing logic later; for now it's enough to know that by this point the program already has conc goroutines listening and waiting for data to write.

Source:

    func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader {
        var db *badger.DB
        if len(opt.clientDir) > 0 {
            x.Check(os.MkdirAll(opt.clientDir, 0700))

            var err error
            db, err = badger.Open(badger.DefaultOptions(opt.clientDir).
                WithTableLoadingMode(bopt.MemoryMap).
                WithCompression(bopt.ZSTD).
                WithSyncWrites(false).
                WithLoadBloomsOnOpen(false).
                WithZSTDCompressionLevel(3))
            x.Checkf(err, "Error while creating badger KV posting store")
        }

        // compression with zero server actually makes things worse
        // Connect to the zero server.
        connzero, err := x.SetupConnection(opt.zero, nil, false)
        x.Checkf(err, "Unable to connect to zero, Is it running at %s?", opt.zero)

        alloc := xidmap.New(connzero, db)

        // Create a loader instance.
        l := &loader{
            opts:      opts,
            dc:        dc,
            start:     time.Now(),
            reqs:      make(chan request, opts.Pending*100),
            conflicts: make(map[uint64]struct{}),
            alloc:     alloc,
            db:        db,
            zeroconn:  connzero,
        }

        // Number of pending request workers.
        l.requestsWg.Add(opts.Pending)
        for i := 0; i < opts.Pending; i++ {
            // Start multiple goroutines to process the RDF data.
            go l.makeRequests()
        }

        rand.Seed(time.Now().Unix())
        return l
    }
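
Stripped of the Dgraph-specific pieces, the concurrency structure that setup() builds is the classic channel-plus-WaitGroup worker pool. Below is a simplified, self-contained sketch of that pattern with made-up request and loader types; it is an illustration, not the loader's real code:

    package main

    import (
        "fmt"
        "sync"
    )

    type request struct{ payload string }

    type loader struct {
        reqs       chan request
        requestsWg sync.WaitGroup
    }

    func (l *loader) makeRequests(id int) {
        defer l.requestsWg.Done()
        // Each worker blocks on the channel until a request arrives or the channel is closed.
        for req := range l.reqs {
            fmt.Printf("worker %d handling %q\n", id, req.payload)
        }
    }

    func main() {
        pending := 10 // corresponds to the --conc flag
        l := &loader{reqs: make(chan request, pending*100)}

        // Start `pending` workers, just like the loop over opts.Pending in setup().
        l.requestsWg.Add(pending)
        for i := 0; i < pending; i++ {
            go l.makeRequests(i)
        }

        // Producer side: push work, then close the channel and wait, as run() does at the end.
        for i := 0; i < 25; i++ {
            l.reqs <- request{payload: fmt.Sprintf("batch-%d", i)}
        }
        close(l.reqs)
        l.requestsWg.Wait()
    }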

4. Reading the schema file and processing the schema

When processing the schema file, one alpha client is obtained to perform the operation.

    func processSchemaFile(ctx context.Context, file string, key x.SensitiveByteSlice,
        dgraphClient *dgo.Dgraph) error {
        fmt.Printf("\nProcessing schema file %q\n", file)
        if len(opt.authToken) > 0 {
            md := metadata.New(nil)
            md.Append("auth-token", opt.authToken)
            ctx = metadata.NewOutgoingContext(ctx, md)
        }

        // Open the schema file.
        f, err := os.Open(file)
        x.CheckfNoTrace(err)
        defer f.Close()

        reader, err := enc.GetReader(key, f)
        x.Check(err)
        if strings.HasSuffix(strings.ToLower(file), ".gz") {
            reader, err = gzip.NewReader(reader)
            x.Check(err)
        }

        b, err := ioutil.ReadAll(reader)
        if err != nil {
            x.Checkf(err, "Error while reading file")
        }

        op := &api.Operation{}
        op.Schema = string(b)
        // Run the Alter operation to update the schema.
        return dgraphClient.Alter(ctx, op)
    }

    func (d *Dgraph) Alter(ctx context.Context, op *api.Operation) error {
        dc := d.anyClient() // Pick a random alpha client.
        ctx = d.getContext(ctx)
        _, err := dc.Alter(ctx, op)
        if isJwtExpired(err) {
            err = d.retryLogin(ctx)
            if err != nil {
                return err
            }
            ctx = d.getContext(ctx)
            _, err = dc.Alter(ctx, op)
        }
        return err
    }

    // NewTxn creates a new transaction.
    func (d *Dgraph) NewTxn() *Txn {
        return &Txn{
            dg:      d,
            dc:      d.anyClient(),
            context: &api.TxnContext{},
        }
    }

    func (d *Dgraph) anyClient() api.DgraphClient {
        return d.dc[rand.Intn(len(d.dc))]
    }
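
Put together, applying a schema through dgo comes down to a single Alter call. Here is a minimal sketch of that call; the schema string is hypothetical and the dg client is assumed to come from a helper like the newClient sketch shown earlier:

    import (
        "context"

        "github.com/dgraph-io/dgo/v2"
        "github.com/dgraph-io/dgo/v2/protos/api"
    )

    // applySchema sends a schema string to the cluster; this is the same Alter call that
    // processSchemaFile ends with after reading and decompressing the schema file.
    func applySchema(ctx context.Context, dg *dgo.Dgraph) error {
        op := &api.Operation{
            // Hypothetical schema, for illustration only.
            Schema: "name: string @index(exact) .\nage: int .",
        }
        return dg.Alter(ctx, op)
    }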

5. Reading the RDF files and processing the RDF data

First the -f flag is parsed; it can be a string of multiple paths joined by commas, or a directory path. Once the file list has been obtained, the files are iterated over and one goroutine is started per file to process its data.

    // Find the RDF/JSON files; the path may be a directory.
    filesList := x.FindDataFiles(opt.dataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
    totalFiles := len(filesList)
    if totalFiles == 0 {
        return errors.Errorf("No data files found in %s", opt.dataFiles)
    }
    fmt.Printf("Found %d data file(s) to process\n", totalFiles)

    // x.Check(dgraphClient.NewSyncMarks(filesList))
    errCh := make(chan error, totalFiles)

    // Iterate over the data files.
    for _, file := range filesList {
        fmt.Println(file)
        file = strings.Trim(file, " \t")
        // Start one goroutine per file to process it.
        go func(file string) {
            errCh <- l.processFile(ctx, file, opt.key)
        }(file)
    }

The file-processing flow

It first determines the type of the data being imported, and then, based on the file type, calls the corresponding method to process the file.

    // processFile forwards a file to the RDF or JSON processor as appropriate
    func (l *loader) processFile(ctx context.Context, filename string, key x.SensitiveByteSlice) error {
        fmt.Printf("Processing data file %q\n", filename)

        rd, cleanup := chunker.FileReader(filename, key)
        defer cleanup()

        // Determine the file type: rdf or json.
        loadType := chunker.DataFormat(filename, opt.dataFormat)
        if loadType == chunker.UnknownFormat {
            if isJson, err := chunker.IsJSONData(rd); err == nil {
                if isJson {
                    loadType = chunker.JsonFormat
                } else {
                    return errors.Errorf("need --format=rdf or --format=json to load %s", filename)
                }
            }
        }

        // Do the actual file processing.
        return l.processLoadFile(ctx, rd, chunker.NewChunker(loadType, opt.batchSize))
    }
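
The exact rules of chunker.DataFormat live in Dgraph's chunker package; as a rough, simplified illustration of extension-based detection (my own sketch, not the actual implementation), the idea amounts to:

    import (
        "path/filepath"
        "strings"
    )

    // guessFormat is a simplified stand-in for chunker.DataFormat: an explicit --format value
    // wins, otherwise strip a possible .gz suffix and look at the remaining extension.
    func guessFormat(filename, explicit string) string {
        if explicit != "" {
            return explicit // --format=rdf or --format=json
        }
        name := strings.TrimSuffix(strings.ToLower(filename), ".gz")
        switch filepath.Ext(name) {
        case ".rdf":
            return "rdf"
        case ".json":
            return "json"
        default:
            return "" // unknown; the loader then sniffs the content with chunker.IsJSONData
        }
    }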

The file is read in chunks of 100,000 lines each; once a chunk has been read, it is parsed. During parsing, whenever the count reaches buf.batchSize the data is handed to ck.nqs.nqCh, which is essentially a chan []*api.NQuad. Finally, the prepared data is passed on for the last step of shaping it into the format the API expects.

Let's look at the source:

    func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker) error {
        var wg sync.WaitGroup
        wg.Add(1)

        // This actually returns ck.nqs, i.e.:
        /**
         * type NQuadBuffer struct {
         *     batchSize int
         *     nquads    []*api.NQuad
         *     nqCh      chan []*api.NQuad
         *     predHints map[string]pb.Metadata_HintType
         * }
         **/
        nqbuf := ck.NQuads()

        // Spin a goroutine to push NQuads to mutation channel.
        go func() {
            defer wg.Done()

            // A buffer of *api.NQuad with capacity opt.bufferSize*opt.batchSize, both passed in via flags.
            buffer := make([]*api.NQuad, 0, opt.bufferSize*opt.batchSize)
            drain := func() {
                // We collect opt.bufferSize requests and preprocess them. For the requests
                // to not confict between themself, we sort them on the basis of their predicates.
                // Predicates with count index will conflict among themselves, so we keep them at
                // end, making room for other predicates to load quickly.
                sort.Slice(buffer, func(i, j int) bool {
                    iPred := sch.preds[buffer[i].Predicate]
                    jPred := sch.preds[buffer[j].Predicate]
                    t := func(a *predicate) int {
                        if a != nil && a.Count {
                            return 1
                        }
                        return 0
                    }

                    // Sorts the nquads on basis of their predicates, while keeping the
                    // predicates with count index later than those without it.
                    if t(iPred) != t(jPred) {
                        return t(iPred) < t(jPred)
                    }
                    return buffer[i].Predicate < buffer[j].Predicate
                })

                for len(buffer) > 0 {
                    sz := opt.batchSize
                    if len(buffer) < opt.batchSize {
                        sz = len(buffer)
                    }
                    mu := request{Mutation: &api.Mutation{Set: buffer[:sz]}}
                    // The prepared requests are pushed into l.reqs; the makeRequests workers
                    // consume them and write the data via the API.
                    l.reqs <- mu
                    buffer = buffer[sz:]
                }
            }

            // nqbuf.Ch() is ck.nqs.nqCh.
            for nqs := range nqbuf.Ch() {
                if len(nqs) == 0 {
                    continue
                }

                // Reserve the largest UID that might be needed and remember it; the UIDs
                // allocated next will all be larger than this one.
                l.allocateUids(nqs)

                // Fill in the remaining api.NQuad fields.
                for _, nq := range nqs {
                    nq.Subject = l.uid(nq.Subject)
                    if len(nq.ObjectId) > 0 {
                        nq.ObjectId = l.uid(nq.ObjectId)
                    }
                }

                // Append to the buffer; once it reaches opt.bufferSize*opt.batchSize, call drain()
                // to hand the data over to the request goroutines.
                buffer = append(buffer, nqs...)
                if len(buffer) < opt.bufferSize*opt.batchSize {
                    continue
                }
                drain()
            }
            drain()
        }()

        // Read the file and catch errors.
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }

            // Read a chunk of the file; this returns when one of the following holds:
            // 1) the end of the file was reached
            // 2) 1e5 lines have been read
            // 3) an error occurred
            chunkBuf, err := ck.Chunk(rd)

            // Parses the rdf entries from the chunk, groups them into batches (each one
            // containing opt.batchSize entries) and sends the batches to the loader.reqs channel (see
            // above).
            // When the count reaches buf.batchSize, the parsed data is handed to ck.nqs.nqCh,
            // which is essentially a chan []*api.NQuad.
            if oerr := ck.Parse(chunkBuf); oerr != nil {
                return errors.Wrap(oerr, "During parsing chunk in processLoadFile")
            }
            if err == io.EOF {
                break
            } else {
                x.Check(err)
            }
        }

        // Flush the remaining data into ck.nqs.nqCh.
        nqbuf.Flush()
        wg.Wait()
        return nil
    }

Once the data has been prepared, it is sent to the write-service goroutines that were set up earlier:

    for len(buffer) > 0 {
        sz := opt.batchSize
        if len(buffer) < opt.batchSize {
            sz = len(buffer)
        }
        mu := request{Mutation: &api.Mutation{Set: buffer[:sz]}}
        // The prepared requests are pushed into l.reqs; the makeRequests workers
        // consume them and write the data via the API.
        l.reqs <- mu
        buffer = buffer[sz:]
    }
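
The buffer-and-drain logic above is plain slice batching: accumulate items, then repeatedly cut off prefixes of at most batchSize elements and send each one downstream. A tiny standalone sketch of that pattern, using strings instead of *api.NQuad:

    package main

    import "fmt"

    // drainInBatches sends the buffer downstream in slices of at most batchSize elements,
    // mirroring how drain() slices `buffer` into api.Mutation requests for l.reqs.
    func drainInBatches(buffer []string, batchSize int, out chan<- []string) {
        for len(buffer) > 0 {
            sz := batchSize
            if len(buffer) < batchSize {
                sz = len(buffer)
            }
            out <- buffer[:sz]
            buffer = buffer[sz:]
        }
    }

    func main() {
        out := make(chan []string, 10)
        drainInBatches([]string{"a", "b", "c", "d", "e"}, 2, out)
        close(out)
        for batch := range out {
            fmt.Println(batch) // [a b], [c d], [e]
        }
    }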

6. Flushing the data into the alpha service

Before receiving the prepared RDF data, a buffer is created to hold it; once the buffer reaches bufferSize, the data is written out.

While receiving the prepared RDF data, conflict keys are computed for each request to avoid conflicting writes. (The comments in the pasted code are fairly complete; see the code.)

    func (l *loader) makeRequests() {
        defer l.requestsWg.Done()

        buffer := make([]request, 0, l.opts.bufferSize)
        drain := func(maxSize int) {
            for len(buffer) > maxSize {
                i := 0
                for _, req := range buffer {
                    // If there is no conflict in req, we will use it
                    // and then it would shift all the other reqs in buffer
                    // As above: if its keys still conflict, keep it in the buffer;
                    // otherwise send the request so the data gets flushed.
                    if !l.addConflictKeys(&req) {
                        buffer[i] = req
                        i++
                        continue
                    }
                    // Req will no longer be part of a buffer
                    l.request(req)
                }
                buffer = buffer[:i]
            }
        }

        // l.reqs is a channel receiving the data written by the other goroutine.
        for req := range l.reqs {
            req.conflicts = l.conflictKeysForReq(&req)
            // If the keys are already held (a conflict), add the request to the buffer;
            // otherwise handle it with request() and record its keys in l.conflicts.
            if l.addConflictKeys(&req) {
                l.request(req)
            } else {
                buffer = append(buffer, req)
            }
            // When the number of buffered reqs reaches l.opts.bufferSize, call l.request(req)
            // to write the data.
            drain(l.opts.bufferSize - 1)
        }

        // Drain whatever is left.
        drain(0)
    }
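
The conflict handling is essentially a set of uint64 keys held by in-flight requests: a request may be sent only if none of its keys is currently held, and its keys are released once it finishes. Below is a simplified, self-contained sketch of that idea (my own illustration, not the loader's exact addConflictKeys/deregister code):

    package main

    import (
        "fmt"
        "sync"
    )

    // conflictSet tracks keys owned by in-flight requests, similar in spirit to loader.conflicts.
    type conflictSet struct {
        mu   sync.Mutex
        held map[uint64]struct{}
    }

    // tryAcquire claims all keys if none of them is already held; otherwise it claims nothing
    // and returns false, and the caller buffers the request to retry later.
    func (c *conflictSet) tryAcquire(keys []uint64) bool {
        c.mu.Lock()
        defer c.mu.Unlock()
        for _, k := range keys {
            if _, ok := c.held[k]; ok {
                return false
            }
        }
        for _, k := range keys {
            c.held[k] = struct{}{}
        }
        return true
    }

    // release frees the keys once the request has been committed.
    func (c *conflictSet) release(keys []uint64) {
        c.mu.Lock()
        defer c.mu.Unlock()
        for _, k := range keys {
            delete(c.held, k)
        }
    }

    func main() {
        cs := &conflictSet{held: make(map[uint64]struct{})}
        fmt.Println(cs.tryAcquire([]uint64{1, 2})) // true: keys free, send the request
        fmt.Println(cs.tryAcquire([]uint64{2, 3})) // false: key 2 conflicts, buffer the request
        cs.release([]uint64{1, 2})
        fmt.Println(cs.tryAcquire([]uint64{2, 3})) // true once the conflicting keys are released
    }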

Finally, when the data is flushed, a random alpha client is again picked to insert the data; if the insert fails, the insert operation is retried indefinitely.

    func (l *loader) request(req request) {
        atomic.AddUint64(&l.reqNum, 1)

        // Pick a random alpha client.
        txn := l.dc.NewTxn()
        req.CommitNow = true

        // Call the API to import the data.
        _, err := txn.Mutate(l.opts.Ctx, req.Mutation)
        if err == nil {
            atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
            atomic.AddUint64(&l.txns, 1)
            l.deregister(&req)
            return
        }
        handleError(err, false)
        atomic.AddUint64(&l.aborts, 1)

        // Register with the retry wait group...
        l.retryRequestsWg.Add(1)
        // ...and retry.
        go l.infinitelyRetry(req)
    }
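
Seen through the public dgo API, each request is a one-shot transaction with CommitNow set. The sketch below shows that mutate-and-retry idea in a minimal form: finite retries instead of the loader's infinitelyRetry, and the N-Quads passed as SetNquads for simplicity (the names and retry policy here are illustrative, not the loader's code):

    import (
        "context"
        "time"

        "github.com/dgraph-io/dgo/v2"
        "github.com/dgraph-io/dgo/v2/protos/api"
    )

    // mutateWithRetry commits one batch of N-Quads, retrying on failure.
    func mutateWithRetry(ctx context.Context, dg *dgo.Dgraph, nquads []byte, attempts int) error {
        var err error
        for i := 0; i < attempts; i++ {
            txn := dg.NewTxn()
            _, err = txn.Mutate(ctx, &api.Mutation{
                SetNquads: nquads,
                CommitNow: true, // commit in the same call, as request() does
            })
            if err == nil {
                return nil
            }
            _ = txn.Discard(ctx) // clean up the aborted transaction before retrying
            time.Sleep(time.Second)
        }
        return err
    }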

At this point a live import has finished; what follows is just error monitoring and the Wait calls.

Finally, the annotated code is shared here: https://github.com/1920853199/Dgraph-Source-code-analysis

2020-10-28 17:53:40  LeeChan  Tags: Dgraph  Category: Original