zw.go 5.2 KB


  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "math/rand"
  8. "strconv"
  9. "time"
  10. "gitee.com/chunanyong/dm/util"
  11. )
  12. var rwMap = make(map[string]*rwCounter)
  13. type rwCounter struct {
  14. ntrx_primary int64
  15. ntrx_total int64
  16. primaryPercent float64
  17. standbyPercent float64
  18. standbyNTrxMap map[string]int64
  19. standbyIdMap map[string]int32
  20. standbyCount int32
  21. flag []int32
  22. increments []int32
  23. }
  24. func newRWCounter(primaryPercent int32, standbyCount int32) *rwCounter {
  25. rwc := new(rwCounter)
  26. rwc.standbyNTrxMap = make(map[string]int64)
  27. rwc.standbyIdMap = make(map[string]int32)
  28. rwc.reset(primaryPercent, standbyCount)
  29. return rwc
  30. }
  31. func (rwc *rwCounter) reset(primaryPercent int32, standbyCount int32) {
  32. rwc.ntrx_primary = 0
  33. rwc.ntrx_total = 0
  34. rwc.standbyCount = standbyCount
  35. rwc.increments = make([]int32, standbyCount+1)
  36. rwc.flag = make([]int32, standbyCount+1)
  37. var gcd = util.GCD(primaryPercent*standbyCount, 100-primaryPercent)
  38. rwc.increments[0] = primaryPercent * standbyCount / gcd
  39. for i, tmp := 1, (100-primaryPercent)/gcd; i < len(rwc.increments); i++ {
  40. rwc.increments[i] = tmp
  41. }
  42. copy(rwc.flag, rwc.increments)
  43. if standbyCount > 0 {
  44. rwc.primaryPercent = float64(primaryPercent) / 100.0
  45. rwc.standbyPercent = float64(100-primaryPercent) / 100.0 / float64(standbyCount)
  46. } else {
  47. rwc.primaryPercent = 1
  48. rwc.standbyPercent = 0
  49. }
  50. }
  51. // 连接创建成功后调用,需要服务器返回standbyCount
  52. func getRwCounterInstance(conn *DmConnection, standbyCount int32) *rwCounter {
  53. key := conn.dmConnector.host + "_" + strconv.Itoa(int(conn.dmConnector.port)) + "_" + strconv.Itoa(int(conn.dmConnector.rwPercent))
  54. rwc, ok := rwMap[key]
  55. if !ok {
  56. rwc = newRWCounter(conn.dmConnector.rwPercent, standbyCount)
  57. rwMap[key] = rwc
  58. } else if rwc.standbyCount != standbyCount {
  59. rwc.reset(conn.dmConnector.rwPercent, standbyCount)
  60. }
  61. return rwc
  62. }
  63. /**
  64. * @return 主机;
  65. */
  66. func (rwc *rwCounter) countPrimary() RWSiteEnum {
  67. rwc.adjustNtrx()
  68. rwc.increasePrimaryNtrx()
  69. return PRIMARY
  70. }
  71. /**
  72. * @param dest 主机; 备机; any;
  73. * @return 主机; 备机
  74. */
  75. func (rwc *rwCounter) count(dest RWSiteEnum, standby *DmConnection) RWSiteEnum {
  76. rwc.adjustNtrx()
  77. switch dest {
  78. case ANYSITE:
  79. {
  80. if rwc.primaryPercent == 1 || (rwc.flag[0] > rwc.getStandbyFlag(standby) && rwc.flag[0] > util.Sum(rwc.flag[1:])) {
  81. rwc.increasePrimaryNtrx()
  82. dest = PRIMARY
  83. } else {
  84. rwc.increaseStandbyNtrx(standby)
  85. dest = STANDBY
  86. }
  87. }
  88. case STANDBY:
  89. {
  90. rwc.increaseStandbyNtrx(standby)
  91. }
  92. case PRIMARY:
  93. {
  94. rwc.increasePrimaryNtrx()
  95. }
  96. }
  97. return dest
  98. }
  99. /**
  100. * 防止ntrx超出有效范围,等比调整
  101. */
  102. func (rwc *rwCounter) adjustNtrx() {
  103. if rwc.ntrx_total >= INT64_MAX {
  104. var min int64
  105. var i = 0
  106. for _, num := range rwc.standbyNTrxMap {
  107. if i == 0 || num < min {
  108. min = num
  109. }
  110. i++
  111. }
  112. if rwc.ntrx_primary < min {
  113. min = rwc.ntrx_primary
  114. }
  115. rwc.ntrx_primary /= min
  116. rwc.ntrx_total /= min
  117. for k, v := range rwc.standbyNTrxMap {
  118. rwc.standbyNTrxMap[k] = v / min
  119. }
  120. }
  121. if rwc.flag[0] <= 0 && util.Sum(rwc.flag[1:]) <= 0 {
  122. // 如果主库事务数以及所有备库事务数的总和 都 <= 0, 重置事务计数,给每个库的事务计数加上初始计数值
  123. for i := 0; i < len(rwc.flag); i++ {
  124. rwc.flag[i] += rwc.increments[i]
  125. }
  126. }
  127. }
  128. func (rwc *rwCounter) increasePrimaryNtrx() {
  129. rwc.ntrx_primary++
  130. rwc.flag[0]--
  131. rwc.ntrx_total++
  132. }
  133. //func (rwc *rwCounter) getStandbyNtrx(standby *DmConnection) int64 {
  134. // key := standby.dmConnector.host + ":" + strconv.Itoa(int(standby.dmConnector.port))
  135. // ret, ok := rwc.standbyNTrxMap[key]
  136. // if !ok {
  137. // ret = 0
  138. // }
  139. //
  140. // return ret
  141. //}
  142. func (rwc *rwCounter) getStandbyId(standby *DmConnection) int32 {
  143. key := standby.dmConnector.host + ":" + strconv.Itoa(int(standby.dmConnector.port))
  144. sid, ok := rwc.standbyIdMap[key]
  145. if !ok {
  146. sid = int32(len(rwc.standbyIdMap) + 1) // 下标0是primary
  147. if sid > rwc.standbyCount {
  148. // 不在有效备库中
  149. return -1
  150. }
  151. rwc.standbyIdMap[key] = sid
  152. }
  153. return sid
  154. }
  155. func (rwc *rwCounter) getStandbyFlag(standby *DmConnection) int32 {
  156. sid := rwc.getStandbyId(standby)
  157. if sid > 0 && sid < int32(len(rwc.flag)) {
  158. // 保证备库有效
  159. return rwc.flag[sid]
  160. }
  161. return 0
  162. }
  163. func (rwc *rwCounter) increaseStandbyNtrx(standby *DmConnection) {
  164. key := standby.dmConnector.host + ":" + strconv.Itoa(int(standby.dmConnector.port))
  165. ret, ok := rwc.standbyNTrxMap[key]
  166. if ok {
  167. ret += 1
  168. } else {
  169. ret = 1
  170. }
  171. rwc.standbyNTrxMap[key] = ret
  172. sid, ok := rwc.standbyIdMap[key]
  173. if !ok {
  174. sid = int32(len(rwc.standbyIdMap) + 1) // 下标0是primary
  175. rwc.standbyIdMap[key] = sid
  176. }
  177. rwc.flag[sid]--
  178. rwc.ntrx_total++
  179. }
  180. func (rwc *rwCounter) random(rowCount int32) int32 {
  181. rand.Seed(time.Now().UnixNano())
  182. if rowCount > rwc.standbyCount {
  183. return rand.Int31n(rwc.standbyCount)
  184. } else {
  185. return rand.Int31n(rowCount)
  186. }
  187. }
  188. func (rwc *rwCounter) String() string {
  189. return "PERCENT(P/S) : " + strconv.FormatFloat(rwc.primaryPercent, 'f', -1, 64) + "/" + strconv.FormatFloat(rwc.standbyPercent, 'f', -1, 64) + "\nNTRX_PRIMARY : " +
  190. strconv.FormatInt(rwc.ntrx_primary, 10) + "\nNTRX_TOTAL : " + strconv.FormatInt(rwc.ntrx_total, 10) + "\nNTRX_STANDBY : "
  191. }