SQLAzureShardManager.php 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. <?php
  2. /*
  3. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  4. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  5. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  6. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  7. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  8. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  9. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  10. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  11. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  12. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  13. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  14. *
  15. * This software consists of voluntary contributions made by many individuals
  16. * and is licensed under the MIT license. For more information, see
  17. * <http://www.doctrine-project.org>.
  18. */
  19. namespace Doctrine\DBAL\Sharding\SQLAzure;
  20. use Doctrine\DBAL\Sharding\ShardManager;
  21. use Doctrine\DBAL\Sharding\ShardingException;
  22. use Doctrine\DBAL\Connection;
  23. use Doctrine\DBAL\Types\Type;
  24. /**
  25. * Sharding using the SQL Azure Federations support.
  26. *
  27. * @author Benjamin Eberlei <kontakt@beberlei.de>
  28. */
  29. class SQLAzureShardManager implements ShardManager
  30. {
  31. /**
  32. * @var string
  33. */
  34. private $federationName;
  35. /**
  36. * @var boolean
  37. */
  38. private $filteringEnabled;
  39. /**
  40. * @var string
  41. */
  42. private $distributionKey;
  43. /**
  44. * @var string
  45. */
  46. private $distributionType;
  47. /**
  48. * @var \Doctrine\DBAL\Connection
  49. */
  50. private $conn;
  51. /**
  52. * @var string
  53. */
  54. private $currentDistributionValue;
  55. /**
  56. * @param \Doctrine\DBAL\Connection $conn
  57. *
  58. * @throws \Doctrine\DBAL\Sharding\ShardingException
  59. */
  60. public function __construct(Connection $conn)
  61. {
  62. $this->conn = $conn;
  63. $params = $conn->getParams();
  64. if ( ! isset($params['sharding']['federationName'])) {
  65. throw ShardingException::missingDefaultFederationName();
  66. }
  67. if ( ! isset($params['sharding']['distributionKey'])) {
  68. throw ShardingException::missingDefaultDistributionKey();
  69. }
  70. if ( ! isset($params['sharding']['distributionType'])) {
  71. throw ShardingException::missingDistributionType();
  72. }
  73. $this->federationName = $params['sharding']['federationName'];
  74. $this->distributionKey = $params['sharding']['distributionKey'];
  75. $this->distributionType = $params['sharding']['distributionType'];
  76. $this->filteringEnabled = (isset($params['sharding']['filteringEnabled'])) ? (bool)$params['sharding']['filteringEnabled'] : false;
  77. }
  78. /**
  79. * Gets the name of the federation.
  80. *
  81. * @return string
  82. */
  83. public function getFederationName()
  84. {
  85. return $this->federationName;
  86. }
  87. /**
  88. * Gets the distribution key.
  89. *
  90. * @return string
  91. */
  92. public function getDistributionKey()
  93. {
  94. return $this->distributionKey;
  95. }
  96. /**
  97. * Gets the Doctrine Type name used for the distribution.
  98. *
  99. * @return string
  100. */
  101. public function getDistributionType()
  102. {
  103. return $this->distributionType;
  104. }
  105. /**
  106. * Sets Enabled/Disable filtering on the fly.
  107. *
  108. * @param boolean $flag
  109. *
  110. * @return void
  111. */
  112. public function setFilteringEnabled($flag)
  113. {
  114. $this->filteringEnabled = (bool)$flag;
  115. }
  116. /**
  117. * {@inheritDoc}
  118. */
  119. public function selectGlobal()
  120. {
  121. if ($this->conn->isTransactionActive()) {
  122. throw ShardingException::activeTransaction();
  123. }
  124. $sql = "USE FEDERATION ROOT WITH RESET";
  125. $this->conn->exec($sql);
  126. $this->currentDistributionValue = null;
  127. }
  128. /**
  129. * {@inheritDoc}
  130. */
  131. public function selectShard($distributionValue)
  132. {
  133. if ($this->conn->isTransactionActive()) {
  134. throw ShardingException::activeTransaction();
  135. }
  136. if ($distributionValue === null || is_bool($distributionValue) || !is_scalar($distributionValue)) {
  137. throw ShardingException::noShardDistributionValue();
  138. }
  139. $platform = $this->conn->getDatabasePlatform();
  140. $sql = sprintf(
  141. "USE FEDERATION %s (%s = %s) WITH RESET, FILTERING = %s;",
  142. $platform->quoteIdentifier($this->federationName),
  143. $platform->quoteIdentifier($this->distributionKey),
  144. $this->conn->quote($distributionValue),
  145. ($this->filteringEnabled ? 'ON' : 'OFF')
  146. );
  147. $this->conn->exec($sql);
  148. $this->currentDistributionValue = $distributionValue;
  149. }
  150. /**
  151. * {@inheritDoc}
  152. */
  153. public function getCurrentDistributionValue()
  154. {
  155. return $this->currentDistributionValue;
  156. }
  157. /**
  158. * {@inheritDoc}
  159. */
  160. public function getShards()
  161. {
  162. $sql = "SELECT member_id as id,
  163. distribution_name as distribution_key,
  164. CAST(range_low AS CHAR) AS rangeLow,
  165. CAST(range_high AS CHAR) AS rangeHigh
  166. FROM sys.federation_member_distributions d
  167. INNER JOIN sys.federations f ON f.federation_id = d.federation_id
  168. WHERE f.name = " . $this->conn->quote($this->federationName);
  169. return $this->conn->fetchAll($sql);
  170. }
  171. /**
  172. * {@inheritDoc}
  173. */
  174. public function queryAll($sql, array $params = array(), array $types = array())
  175. {
  176. $shards = $this->getShards();
  177. if (!$shards) {
  178. throw new \RuntimeException("No shards found for " . $this->federationName);
  179. }
  180. $result = array();
  181. $oldDistribution = $this->getCurrentDistributionValue();
  182. foreach ($shards as $shard) {
  183. $this->selectShard($shard['rangeLow']);
  184. foreach ($this->conn->fetchAll($sql, $params, $types) as $row) {
  185. $result[] = $row;
  186. }
  187. }
  188. if ($oldDistribution === null) {
  189. $this->selectGlobal();
  190. } else {
  191. $this->selectShard($oldDistribution);
  192. }
  193. return $result;
  194. }
  195. /**
  196. * Splits Federation at a given distribution value.
  197. *
  198. * @param mixed $splitDistributionValue
  199. *
  200. * @return void
  201. */
  202. public function splitFederation($splitDistributionValue)
  203. {
  204. $type = Type::getType($this->distributionType);
  205. $sql = "ALTER FEDERATION " . $this->getFederationName() . " " .
  206. "SPLIT AT (" . $this->getDistributionKey() . " = " .
  207. $this->conn->quote($splitDistributionValue, $type->getBindingType()) . ")";
  208. $this->conn->exec($sql);
  209. }
  210. }