y.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "bytes"
  8. "math/rand"
  9. "sync"
  10. "time"
  11. "gitee.com/chunanyong/dm/util"
  12. )
  13. /**
  14. * dm_svc.conf中配置的服务名对应的一组实例, 以及相关属性和状态信息
  15. *
  16. * 需求:
  17. * 1. 连接均匀分布在各个节点上
  18. * 2. loginMode,loginStatus匹配
  19. * 3. 连接异常节点比较耗时,在DB列表中包含异常节点时异常连接尽量靠后,减少对建连接速度的影响
  20. *
  21. *
  22. * DB 连接顺序:
  23. * 1. well distribution,每次连接都从列表的下一个节点开始
  24. * 2. 用DB sort值按从大到小排序,sort为一个四位数XXXX,个位--serverStatus,十位--serverMode,共 有三种模式,最优先的 *100, 次优先的*10
  25. */
  26. type epGroup struct {
  27. name string
  28. epList []*ep
  29. props *Properties
  30. epStartPos int32 // wellDistribute 起始位置
  31. lock sync.Mutex
  32. }
  33. func newEPGroup(name string, serverList []*ep) *epGroup {
  34. g := new(epGroup)
  35. g.name = name
  36. g.epList = serverList
  37. if serverList == nil || len(serverList) == 0 {
  38. g.epStartPos = -1
  39. } else {
  40. // 保证进程间均衡,起始位置采用随机值
  41. g.epStartPos = rand.Int31n(int32(len(serverList))) - 1
  42. }
  43. return g
  44. }
  45. func (g *epGroup) connect(connector *DmConnector) (*DmConnection, error) {
  46. var dbSelector = g.getEPSelector(connector)
  47. var ex error = nil
  48. // 如果配置了loginMode的主、备等优先策略,而未找到最高优先级的节点时持续循环switchtimes次,如果最终还是没有找到最高优先级则选择次优先级的
  49. // 如果只有一个节点,一轮即可决定是否连接;多个节点时保证switchTimes轮尝试,最后一轮决定用哪个节点(由于节点已经按照模式优先级排序,最后一轮理论上就是连第一个节点)
  50. var cycleCount int32
  51. if len(g.epList) == 1 {
  52. cycleCount = 1
  53. } else {
  54. cycleCount = connector.switchTimes + 1
  55. }
  56. for i := int32(0); i < cycleCount; i++ {
  57. // 循环了一遍,如果没有符合要求的, 重新排序, 再尝试连接
  58. conn, err := g.traverseServerList(connector, dbSelector, i == 0, i == cycleCount-1)
  59. if err != nil {
  60. ex = err
  61. time.Sleep(time.Duration(connector.switchInterval) * time.Millisecond)
  62. continue
  63. }
  64. return conn, nil
  65. }
  66. return nil, ex
  67. }
  68. func (g *epGroup) getEPSelector(connector *DmConnector) *epSelector {
  69. if connector.epSelector == TYPE_HEAD_FIRST {
  70. return newEPSelector(g.epList)
  71. } else {
  72. serverCount := int32(len(g.epList))
  73. sortEPs := make([]*ep, serverCount)
  74. g.lock.Lock()
  75. defer g.lock.Unlock()
  76. g.epStartPos = (g.epStartPos + 1) % serverCount
  77. for i := int32(0); i < serverCount; i++ {
  78. sortEPs[i] = g.epList[(i+g.epStartPos)%serverCount]
  79. }
  80. return newEPSelector(sortEPs)
  81. }
  82. }
  83. /**
  84. * 从指定编号开始,遍历一遍服务名中的ip列表,只连接指定类型(主机或备机)的ip
  85. * @param servers
  86. * @param checkTime
  87. *
  88. * @exception
  89. * DBError.ECJDBC_INVALID_SERVER_MODE 有站点的模式不匹配
  90. * DBError.ECJDBC_COMMUNITION_ERROR 所有站点都连不上
  91. */
  92. func (g *epGroup) traverseServerList(connector *DmConnector, epSelector *epSelector, first bool, last bool) (*DmConnection, error) {
  93. epList := epSelector.sortDBList(first)
  94. errorMsg := bytes.NewBufferString("")
  95. var ex error = nil // 第一个错误
  96. for _, server := range epList {
  97. conn, err := server.connect(connector)
  98. if err != nil {
  99. if ex == nil {
  100. ex = err
  101. }
  102. errorMsg.WriteString("[")
  103. errorMsg.WriteString(server.String())
  104. errorMsg.WriteString("]")
  105. errorMsg.WriteString(err.Error())
  106. errorMsg.WriteString(util.StringUtil.LineSeparator())
  107. continue
  108. }
  109. valid, err := epSelector.checkServerMode(conn, last)
  110. if err != nil {
  111. if ex == nil {
  112. ex = err
  113. }
  114. errorMsg.WriteString("[")
  115. errorMsg.WriteString(server.String())
  116. errorMsg.WriteString("]")
  117. errorMsg.WriteString(err.Error())
  118. errorMsg.WriteString(util.StringUtil.LineSeparator())
  119. continue
  120. }
  121. if !valid {
  122. conn.close()
  123. err = ECGO_INVALID_SERVER_MODE.throw()
  124. if ex == nil {
  125. ex = err
  126. }
  127. errorMsg.WriteString("[")
  128. errorMsg.WriteString(server.String())
  129. errorMsg.WriteString("]")
  130. errorMsg.WriteString(err.Error())
  131. errorMsg.WriteString(util.StringUtil.LineSeparator())
  132. continue
  133. }
  134. return conn, nil
  135. }
  136. if ex != nil {
  137. return nil, ex
  138. }
  139. return nil, ECGO_COMMUNITION_ERROR.addDetail(errorMsg.String()).throw()
  140. }