Skip to content

Commit

Permalink
Merge branch '3.3.5'
Browse files Browse the repository at this point in the history
  • Loading branch information
shlomii committed Feb 7, 2013
2 parents 9b607a2 + 8579791 commit 0f5a9d5
Showing 1 changed file with 40 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using log4net;
using Microsoft.WindowsAzure.StorageClient;
using Microsoft.WindowsAzure.StorageClient.Protocol;

namespace NServiceBus.DataBus.Azure.BlobStorage
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure.StorageClient;
using Microsoft.WindowsAzure.StorageClient.Protocol;
using log4net;

public class BlobStorageDataBus : IDataBus
{
private readonly ILog logger = LogManager.GetLogger(typeof(IDataBus));
Expand Down Expand Up @@ -78,6 +78,10 @@ private void DeleteExpiredBlobs()
blockBlob.DeleteIfExists();
}
}
catch (StorageClientException ex) // to handle race conditions between multiple active instances.
{
logger.Warn(ex.Message);
}
catch (StorageServerException ex) // prevent azure hickups from hurting us.
{
logger.Warn(ex.Message);
Expand Down Expand Up @@ -137,22 +141,33 @@ private void Commit(CloudBlockBlob blob, List<string> originalOrder)

private void DownloadBlobInParallel(CloudBlob blob, Stream stream)
{
blob.FetchAttributes();
var order = new List<string>();
var blocksToDownload = new Queue<Block>();
CalculateBlocks(blocksToDownload, (int)blob.Properties.Length, order);
ExecuteInParallel(() => AsLongAsThereAre(blocksToDownload, block =>
try
{
var s = DownloadBlockFromBlob(blob, block, blocksToDownload); if (s == null) return;
var buffer = new byte[BlockSize];
ExtractBytesFromBlockIntoBuffer(buffer, s, block);
lock (stream)
blob.FetchAttributes();
var order = new List<string>();
var blocksToDownload = new Queue<Block>();
CalculateBlocks(blocksToDownload, (int)blob.Properties.Length, order);
ExecuteInParallel(() => AsLongAsThereAre(blocksToDownload, block =>
{
stream.Position = block.Offset;
stream.Write(buffer, 0,block.Length);
}
}));
stream.Seek(0, SeekOrigin.Begin);
var s = DownloadBlockFromBlob(blob, block, blocksToDownload); if (s == null) return;
var buffer = new byte[BlockSize];
ExtractBytesFromBlockIntoBuffer(buffer, s, block);
lock (stream)
{
stream.Position = block.Offset;
stream.Write(buffer, 0,block.Length);
}
}));
stream.Seek(0, SeekOrigin.Begin);
}
catch (StorageClientException ex) // to handle race conditions between multiple active instances.
{
logger.Warn(ex.Message);
}
catch (StorageServerException ex) // prevent azure hickups from hurting us.
{
logger.Warn(ex.Message);
}
}

private void CalculateBlocks(Queue<Block> blocksToUpload, int blobLength, ICollection<string> order)
Expand Down

0 comments on commit 0f5a9d5

Please sign in to comment.