From 857979135f580cedfccc935cada2b1e372f56c92 Mon Sep 17 00:00:00 2001 From: yvesgoeleven Date: Wed, 6 Feb 2013 09:07:47 +0100 Subject: [PATCH] Fixed effects of concurrent delete on databus message expiry --- .../BlobStorageDataBus.cs | 65 ++++++++++++------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/src/azure/DataBus/NServiceBus.DataBus.Azure.BlobStorage/BlobStorageDataBus.cs b/src/azure/DataBus/NServiceBus.DataBus.Azure.BlobStorage/BlobStorageDataBus.cs index eb80c1ddce8..340dd73807d 100644 --- a/src/azure/DataBus/NServiceBus.DataBus.Azure.BlobStorage/BlobStorageDataBus.cs +++ b/src/azure/DataBus/NServiceBus.DataBus.Azure.BlobStorage/BlobStorageDataBus.cs @@ -1,16 +1,16 @@ -using System; -using System.Collections.Generic; -using System.Globalization; -using System.IO; -using System.Linq; -using System.Net; -using System.Threading; -using log4net; -using Microsoft.WindowsAzure.StorageClient; -using Microsoft.WindowsAzure.StorageClient.Protocol; - namespace NServiceBus.DataBus.Azure.BlobStorage { + using System; + using System.Collections.Generic; + using System.Globalization; + using System.IO; + using System.Linq; + using System.Net; + using System.Threading; + using Microsoft.WindowsAzure.StorageClient; + using Microsoft.WindowsAzure.StorageClient.Protocol; + using log4net; + public class BlobStorageDataBus : IDataBus { private readonly ILog logger = LogManager.GetLogger(typeof(IDataBus)); @@ -78,6 +78,10 @@ private void DeleteExpiredBlobs() blockBlob.DeleteIfExists(); } } + catch (StorageClientException ex) // to handle race conditions between multiple active instances. + { + logger.Warn(ex.Message); + } catch (StorageServerException ex) // prevent azure hickups from hurting us. { logger.Warn(ex.Message); @@ -137,22 +141,33 @@ private void Commit(CloudBlockBlob blob, List originalOrder) private void DownloadBlobInParallel(CloudBlob blob, Stream stream) { - blob.FetchAttributes(); - var order = new List(); - var blocksToDownload = new Queue(); - CalculateBlocks(blocksToDownload, (int)blob.Properties.Length, order); - ExecuteInParallel(() => AsLongAsThereAre(blocksToDownload, block => + try { - var s = DownloadBlockFromBlob(blob, block, blocksToDownload); if (s == null) return; - var buffer = new byte[BlockSize]; - ExtractBytesFromBlockIntoBuffer(buffer, s, block); - lock (stream) + blob.FetchAttributes(); + var order = new List(); + var blocksToDownload = new Queue(); + CalculateBlocks(blocksToDownload, (int)blob.Properties.Length, order); + ExecuteInParallel(() => AsLongAsThereAre(blocksToDownload, block => { - stream.Position = block.Offset; - stream.Write(buffer, 0,block.Length); - } - })); - stream.Seek(0, SeekOrigin.Begin); + var s = DownloadBlockFromBlob(blob, block, blocksToDownload); if (s == null) return; + var buffer = new byte[BlockSize]; + ExtractBytesFromBlockIntoBuffer(buffer, s, block); + lock (stream) + { + stream.Position = block.Offset; + stream.Write(buffer, 0,block.Length); + } + })); + stream.Seek(0, SeekOrigin.Begin); + } + catch (StorageClientException ex) // to handle race conditions between multiple active instances. + { + logger.Warn(ex.Message); + } + catch (StorageServerException ex) // prevent azure hickups from hurting us. + { + logger.Warn(ex.Message); + } } private void CalculateBlocks(Queue blocksToUpload, int blobLength, ICollection order)