Context
When working with Azure WebJobs that log to Azure Storage, we may run into the exception below. Note that it may never show up during development, when we test with a single message. Let's see what can cause it.
System.InvalidOperationException: System.InvalidOperationException: Error while handling parameter writer after function returned: ---> Microsoft.WindowsAzure.Storage.StorageException: The remote server returned an error: (400) Bad Request. ---> System.Net.WebException: The remote server returned an error: (400) Bad Request.
at Microsoft.WindowsAzure.Storage.Shared.Protocol.HttpResponseParsers.ProcessExpectedStatusCodeNoException[T](HttpStatusCode expectedStatusCode, HttpStatusCode actualStatusCode, T retVal, StorageCommandBase`1 cmd, Exception ex)
at Microsoft.WindowsAzure.Storage.Blob.CloudBlockBlob.<PutBlockListImpl>b__44(RESTCommand`1 cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndGetResponse[T](IAsyncResult getResponseResult)
--- End of inner exception stack trace ---
at Microsoft.WindowsAzure.Storage.Core.Util.StorageAsyncResult`1.End()
at Microsoft.WindowsAzure.Storage.Blob.BlobWriteStream.EndCommit(IAsyncResult asyncResult)
at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.DelegatingCloudBlobStream.EndCommit(IAsyncResult asyncResult)
at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.CancellableTaskFactory.<>c__DisplayClass3.<FromAsync>b__2(IAsyncResult ar)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.WatchableCloudBlobStream.<CommitAsyncCore>d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.WatchableCloudBlobStream.<CompleteAsync>d__9.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.TextWriterArgumentBindingProvider.TextWriterArgumentBinding.TextWriterValueBinder.<SetValueAsync>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithWatchersAsync>d__23.MoveNext()
--- End of inner exception stack trace ---
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithWatchersAsync>d__23.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithOutputLogsAsync>d__1c.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithLogMessageAsync>d__c.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithLogMessageAsync>d__c.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<TryExecuteAsync>d__1.MoveNext()
Consequences
The major problem is that the job may execute more times than the number of messages that arrived. Alternatively, some messages are processed more than once while others are still left in the queue. The behavior is highly inconsistent.
Root cause
The stack trace clearly tells us that something goes wrong when dealing with storage, specifically with a blob. Below is our (probably wrong) code, which handles the queuing, the WebJob function and the associated logging.
// Code that queues messages into an Azure Storage queue.
// Requires: using System; using System.Xml.Linq; using Newtonsoft.Json;
//           using Microsoft.WindowsAzure.Storage.Queue;
public class CustomQueueMessage
{
    public string MessageType { get; set; }
    public int MessageId { get; set; }
    public int Priority { get; set; }
    public XElement Parameters { get; set; }
}
public void TestInsertQueueWithObject(int noOfMessagesToBeQueued = 1)
{
    // Look up and create the queue once, not on every iteration.
    CloudQueue testQWithMessageObject = GetCloudQueue("queue1");
    testQWithMessageObject.CreateIfNotExists();
    for (int counter = 0; counter < noOfMessagesToBeQueued; counter++)
    {
        CustomQueueMessage msg = GetSampleCustomQueueMessage(counter);
        CloudQueueMessage cloudMsg = new CloudQueueMessage(JsonConvert.SerializeObject(msg));
        testQWithMessageObject.AddMessage(cloudMsg);
        // The {0} placeholder is required, otherwise the id is never printed.
        Console.WriteLine("Message added. MessageId={0}", msg.MessageId);
    }
}
// Function invoked by the Azure WebJobs SDK when a new message arrives in queue1.
public static void ProcessQueueMessage(
    [QueueTrigger("queue1")] CustomQueueMessage customMsg,
    [Blob("logs/webjobs")] TextWriter log,
    IBinder binder)
{
    // DoOperation();
    log.WriteLine(customMsg);
    Console.WriteLine(customMsg);
}
What is happening here? Each time the JobHost sees a message, it starts the ProcessQueueMessage function on a new thread. If many messages are dequeued simultaneously, several threads may try to write to the same blob ('logs/webjobs') at once, which ends in the exception above.
Let's test this by removing the blob-writing code. Even if we don't write to the blob and stick with Console.WriteLine, the logs are still visible in the Azure WebJobs monitoring portal. We can navigate there from the Azure portal or use the direct URL. The URL format is:
https://<Webapp name>.scm.azurewebsites.net/azurejobs/#/jobs/<job type continuous/triggered/>/<job name>
e.g. https://mvc5application.scm.azurewebsites.net/azurejobs/#/jobs/triggered/AzureWebJob45
public static void ProcessQueueMessage(
    [QueueTrigger("queue1")] CustomQueueMessage customMsg,
    [Blob("logs/webjobs")] TextWriter log,
    IBinder binder)
{
    // DoOperation();
    // log.WriteLine(customMsg);
    Console.WriteLine(customMsg);
}
Yes, it works better without the blob write.
.NET locking is not a solution in Azure
Most of us may think that the easy solution is to put a 'lock {}' around the blob-writing code. But will that scale? Absolutely not. As the application grows we may launch multiple instances, and a .NET lock is limited to a single process, while the other instances run in different processes on different machines.
Logging WebJobs processing details to corresponding blobs
Sometimes we need to write processing insights to our own data store rather than to the console stream, which is shared with the Azure WebJobs SDK. So let's use the Blob attribute properly, so that it creates a separate blob for each message.
public static void ProcessQueueMessage(
    [QueueTrigger("queue1")] CustomQueueMessage customMsg,
    [Blob("logs/webjob{MessageId}")] TextWriter log,
    IBinder binder)
{
    // DoOperation();
    log.WriteLine(customMsg);
    Console.WriteLine(customMsg);
}
Yes. With the MessageId in the path, things start working properly and we get a separate logs/webjob{MessageId} blob for each message. There is no magic here: the WebJobs binding mechanism replaces {MessageId} with the MessageId property of the automatically deserialized customMsg object, and it creates the new blob for us as well.
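To make the substitution concrete, here is a minimal, self-contained sketch of the idea. It is not the SDK's actual implementation: BlobPathSketch and Resolve are hypothetical names made up for illustration, and the message class is trimmed to the one property the path uses.

```csharp
using System;
using System.Text.RegularExpressions;

// Same shape as the message class above, trimmed to the property the path uses.
public class CustomQueueMessage
{
    public int MessageId { get; set; }
}

// Hypothetical helper, for illustration only; the WebJobs SDK does this internally.
public static class BlobPathSketch
{
    // Replaces each {Name} token in the pattern with the value of the public
    // property called Name on the message. Throws if no such property exists.
    public static string Resolve(string pattern, object message)
    {
        return Regex.Replace(pattern, @"\{(\w+)\}", match =>
        {
            string name = match.Groups[1].Value;
            var property = message.GetType().GetProperty(name);
            if (property == null)
            {
                throw new InvalidOperationException("No property named " + name);
            }
            return Convert.ToString(property.GetValue(message));
        });
    }
}
```

Calling BlobPathSketch.Resolve("logs/webjob{MessageId}", new CustomQueueMessage { MessageId = 7 }) returns "logs/webjob7", so two messages with different ids end up in different blobs.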
The moral of the story is that we should always think about how our application runs at scale, and design for that.
Is this the end of the story? Certainly not. We may still get an HTTP 409 error, which indicates a conflict. This can happen when we write to the console stream, which internally writes concurrently to some blobs. Also, how do we assign a unique message id during queuing, so that the logs can be written to separate blobs?
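One possible answer to the last question, sketched under assumptions: generate the id with Guid.NewGuid() at enqueue time, so that multiple producers need no coordination. This assumes MessageId is changed from int to string, which the code above does not do. MessageIdSketch and its methods are hypothetical names used only for illustration.

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class MessageIdSketch
{
    // Guid.NewGuid() needs no shared counter between producers.
    // The "N" format yields 32 hexadecimal digits without dashes.
    public static string NewMessageId()
    {
        return Guid.NewGuid().ToString("N");
    }

    // Builds the per-message blob path in the same shape as logs/webjob{MessageId}.
    public static string LogBlobPath(string messageId)
    {
        return "logs/webjob" + messageId;
    }
}
```

The producer would set msg.MessageId = MessageIdSketch.NewMessageId() before serializing the message, and the {MessageId} binding would then pick up that value when the function runs.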