Intro Azure Storage Data Movement Library

先日(2015/9/23)、Microsoft Azure Storage Data Movement Library (Preview 以下 DML) というAzure Storage用のファイル転送ライブラリが公開されました。[1]

DML は、従来 Storage Client Library だけでは難しかったような、高速なファイル転送を簡単にアプリで実装できるようなライブライです。(ベースは AzCopyと同じらしい) Azure File、Blob、Local の 3つの間のデータ移動ができます。

brick

下記のような、関数が揃っていて、sourceとdestをセットして呼ぶとその間でデータが転送されるって感じです。

public static Task CopyAsync(
     Uri sourceUri,
     CloudBlob destBlob,
     bool isServiceCopy,
     CopyOptions options,
     TransferContext context,
     CancellationToken cancellationToken
)

どんな組み合わせがサポートされているかとを表にまとめるとこんな感じ。このうちUrl->Blob, Url->FileのCopyだけ isServerCopy (サーバー側で実行される非同期コピー) のオプションがあります。Local to Local はサポートされていません。現在のバージョンでは、すべての処理はファイル(オブジェクト)1つの単位です。例えば、ディレクトリを指定して、その下をまとめてUploadなどのようなことはできません。表は横が転送元で縦が転送先です。

DML Matrix
元/先 Local Blob File Url
Local なし DownloadAsync DownloadAsync なし
Blob UploadAsync CopyAsync CopyAsync CopyAsync※
File UploadAsync CopyAsync CopyAsync CopyAsync※

isServerCopy(※付きのやつ) は、URLとして指定できるようなものならコピー元(ソース)にすることができます。public に公開されているURLなら、そのまま。private のものでも、S3のように、pre-signed URLが使えるならば、そのURLからAzure Storageにコピーすることが可能です。TransferManager.CopyAsync Method (Uri, CloudBlob, Boolean, CopyOptions, TransferContext, CancellationToken)

これは、DMLの機能というよりAzure Storage Blobの機能で、2012/6 に追加されたAsynchronous Cross-Account Copy Blobの副産物(?)です。[2](気が付いて無かったですが orz)

Introducing Asynchronous Cross-Account Copy Blob から引用
Note: The source blob could even be a blob outside of Windows Azure, as long as it is publicly accessible or accessible via some form of a Signed URL. For source blobs outside of Windows Azure, they will be copied to block blobs.

「Note: 公開された url あるいは、なんらかのSigned URLのようなものなら Azure の外側でもかまいません。それは、block blob として copy されます」という感じでしょうか。DMLのAPIを見ると、Azure Fileにもコピーできますね、これは、2015/8/3 に公開された新機能です、Azure Files Preview Updateversion 2015-02-21 から利用できます。[7]

DML のGitHub レポジトリを見ると、S3から Azure Blobにコピーするサンプルコードが公開されています。S3ToAzureSample

余談ですが、このサンプルを読んでいて気になったことがあったので PR を出してみたらさくっとマージされました。[3](軽微な修正ですが)

WAD Custom Log Downloader

とりあえず、なにか作ってみようと思い Azure Diagonestics の Custom Log をローカルにDownloadするアプリWADCustomLogDownloaderを書きました。AzCopyだけだと、ダウンロード対象のファイルのリストがどこか外部にあって参照しながらダウンロードするというのは効率悪くWADのCustom Logなどは不得意パターンでした。それをライブラリになったので、ちょっとやってみようという感じです。

環境変数AZURE_STORAGE_CONNECTION_STRINGに、StorageのConnection文字列を入れて、下記のように実行します。-f から、-t の間のFileTimeのログを、WADDirectoriesTable から探し対応するファイルをblobから、-d で指定したローカルディレクトリにダウンロードします。ちょっと動かしてみた感じだとAzCopyと同じような速度を期待できそうです。(ちゃんと計っていないのですが)

$ $env:AZURE_STORAGE_CONNECTION_STRING="YOUR_STORAGE_KEY"
$ ./WADCustomLogDownloader -container diagnostics-custom-logs -d c:\temp\logs -f 2015-09-10 -t 2015-09-23 -v

ここから、コードの解説をしながらコーディングしていて気が付いた点をいくつか並べていきます。

基本的な作り

ダウンロード対象のBlobのリストを、Azure Table の WADDirectoriesTable から読んで、BlockingCollectionに突っ込み、別TaskでBlockingCollection を読んで対象のBlobをダウンロードするという流れになっています。Producer が、ListWadCustomLogs()で、Consumer が、DownloadFromBlob()、その間を繋ぐ queue が、BlockingCollection _jobQueueです。

今回は、Producer が軽いので割と序盤で全部 queue に入ってしまって、順次 Consumer が処理するということになります。大量に対象のファイルがある場合、BlockingCollection の本領発揮なのでしょうが、今回は Producer–consumer problem[4]には引っかからない気がするので枠組みだけ使ってさくっと進みます。この基本構造は、DMLのS3のサンプルからいただきました、王道ですね。

Best Practice

パフォーマンスを出すには、DefaultConnectionLimit を増やすのと、 100-continue をOffにする必要があります。Azure Storage を使っている人にはお馴染みかと思いますが。

ServicePointManager.DefaultConnectionLimit = Environment.ProcessorCount * 8;
ServicePointManager.Expect100Continue = false;

ちなみに、ASP.NET環境下では、HttpRuntimeの初期化が走ると、HttpRuntime.InitializeHostingFeatures() から、いろいろ呼ばれて、HttpRuntime.SetAutoConfigLimits()で、下記のコードが実行されるので、DefaultConnectionLimit の変更は不要です。Consoleアプリや、WorkerRoleでは必須なので要注意です。

System.Net.ServicePointManager.DefaultConnectionLimit = Int32.MaxValue;

Producer ListWadCustomLogs

WADDirectoriesTable から、該当日付近のデータを取り出します。WAD(Windows Azure Diagonestics)では、PartitionKey にTickを使っているので日付をUTCにしてTickを19桁の文字列に展開します。ここは、Partition Queryなので、速度が必要なら分割してパラレルで実行した方が良いですが、実測で DC内で 100 ms/1000件 程度、西から東を見た場合でも、140ms/1000件、の時間でした。そのためあまり気にせず単純なクエリーで書きます。件数が多くなるのが想定できる場合は、適時分割した方が良いと思います。

ソース:WADDirectoriesTable から該当期間のログのエンティティを取得

そのあと、ExecuteSegmentedAsync() を使って、1000件毎にエンティティを取得し、jobQueueに入れます。全部入れ終わたら、_jobQueue.CompleteAdding()します。Consumer 側では、_jobQueue.IsCompletedがtrueなら処理終了と判断します。true になるのは、complete とマークされてて queue が空の時です。[5]これだけでもBlockingCollectionちょっと便利ですね。

今回は、BlockingCollectionの maximum capacity を設定していないので_jobQueue.Add(result.Results)はブロックしません。Producer と Consumer のバランスをとる必要がある場合は適時設定するといいでしょう。

Consumer DownloadFromBlob

_jobQueue に乗っている WADDirectoriesTable をTransferManager.DownloadAsyncに渡してダウンロードするタスクを作成します。ここは、Task の作成しすぎとかは、気にせずにグルグル回してしまって良く、実際のTaskのスケジューリングは適時 DML内でやってくれます。基本的には、4MB chunk で、DownloadRangeToStreamAsync を使って分割ダウンロード、chunk は、buffer pool を利用、core 数*8 を並列実行という感じです。

DMLソース:BlokBlob ダウンロードの実装部分

並列度は、TransferConfigurationsで設定できます。デフォルトは、core数*8、その下の行を見ると、chunk size は、4MB(固定?)です。使うメモリーは、Native Method の GlobalMemoryStatusEx()を呼んで物理メモリーサイズを拾っています。(なんと!)

そして、DMLの一番の肝は、 buffer pool と並列実行度をリンクさせたスケジューラーTransferSchedulerTransferSchedulerですね。Blobでパフォーマンスを上げるなら並列度を上げるのが重要で、大きなファイルを扱うためにはGCを避けるのが王道なので、こんな感じになるのかという気がします。

DMLの中に脱線していました、まだあまり読めてないのですが、いろいろ参考になるコードだと思います。

Consumerの細かいところを説明すると、大体4点

  1. ダウンロード済みのファイルがあればSkipする:TransferContext.OverwriteCallbackで、ダウンロード先にファイルがあるかどうかを見ています。WADDirectoriesTable の中のFileTime、FileSize と比較して同じだったらスキップとしています。WADDirectoriesTable のデータを参照するために、_sourceFileDictionary を参照しているのがイマイチな気がします。OverwriteCallback に、オプションのobject とか渡せれば解決なのですが…
  2. MD5 Validation をOffにする:DownloadOptions.DisableContentMD5ValidationWADのCustom log 転送を使って転送したBlobにはMD5Sumが付いていないので仕方ない。ここはOffです。WADDirectoriesTable の FileSize と同じかどうかを確認できるので大丈夫かな?と思いますが、選択できても良い気がします。MD5Sumの計算はコストが高いのでなるべく避けたい気持ちもわかりますが。
  3. TransferException をハンドルする:TransferExceptionここでは、TransferErrorCode の TransferAlreadyExists、NotOverwriteExistingDestination を特別扱いしています。 TransferContext.OverwriteCallback で、false を返すと、NotOverwriteExistingDestination が帰ります。今回はすでにダウンロード済みの場合は、スキップしているのでエラーではありません。 TransferManager.DownloadAsync に、同じ source, dest のセットが追加されると TransferAlreadyExists が発生します。今回は、WADDirectoriesTable に残っている転送履歴からBlobを探しているので、複数回転送されると TransferAlreadyExists になります。これは、WAD の Custom Log転送をどう使うのかという話にも絡みますが、今回はスキップで扱います。[6]
  4. ダウンロード後のファイルのタイムスタンプを合わせる:File.SetLastWriteTimeUtcここは、見て通りです。よく見ると、余計なcaptureがあるし、cancel とかの場合の分岐が出来てませんね orz ….

あとは、 CountdownEvent を使って、全部のTaskの終了を待ちます。ProgressHandler をフックすると、転送量とか転送ファイル数のステータスをもらえるのは便利です。

まとめ

ちょっと長いコードになりましたが、割と簡単に効率的な転送プログラムが実装できました。DMLの内部構造を見てわかる通り、CPUとメモリーがある程度あるマシンでないと本来の性能が発揮できない傾向があります。合計1.6GB の50本ほどのlog を D1とD3の両方でダウンロードしてみたところ、D1で 12 Mbps ぐらい、D3では 800 Mbps ぐらいのスループットになりました。(速度を見るにはもう少し本格的にやってみないといけないので参考程度ですが)

exe 単体で動作するように、 Release Build では、ILMergeで、wadcldl.exe を作っています。

環境変数 AZURE_STORAGE_CONNECTION_STRING にStorage Account Keyをセットするスクリプトsetenv.ps1を入れてあるので参考にしてください。実行前に、Add-AzureAccount をします。

[1]2015/9/23 Introducing Azure Storage Data Movement Library Preview
[2]Introducing Asynchronous Cross-Account Copy Blob
[3]Dispose GetObjectResponse object. Here are many run.
[4]Producer–consumer problem
[5]Reference Source BlockingCollection.IsCompleted
[6]TransferErrorCode Enumeration
[7]MSDN Copy File