Tuesday, September 29, 2015

Azure WebJobs - One WebJob SDK exe instance to process one message request.

Problem

The problem of dealing with legacy code is always a burden in software development. Whenever a new technology arrives or an existing one is upgraded, we have to spend time making sure that our existing code can use the features of the new technology. Most of the time business stakeholders don't see value in it, especially in enterprise applications where people can do data entry and see reports without the new technology.

Azure WebJobs is a good technology to offload long running processes from the mainstream web apps. Microsoft provides the WebJobs SDK to write functions which will be automatically called on events. Events can be the arrival of a new message in a storage queue, blobs, Service Bus etc. This model uses a single process (.exe) to host the processing logic, and for each and every invocation the SDK creates a new thread on which the processing function is called. This is really good, because there is no need to create a new process or AppDomain, which is time consuming, for each and every request. Since different invocations run on different threads, they can run in parallel.

All will work fine if we are starting a new project and writing the functions which are going to handle the back end operations. But what about legacy systems which are using a one process / request model? If those were coded with enough design and best practices, they can work even when running side by side in the same process. Still we need to test all the back end processors, as QA is not going to certify based on the programmers' assurance of good coding practice. That needs more QA time, hence more budget, and there are chances that stakeholders deny/delay the migration to Azure WebJobs.

At least in the above case, we as developers are sure that it will work and it is a matter of QA to certify. But what about the other case? The code was developed years back, where developers didn't expect their code to run side by side in the same process, and they extensively used static variables to ease their development. Also they didn't care about detaching events or disposing unmanaged objects, on the assumption that the process would be closed after the task was done. We, the present development team, are screwed.

Solution

As passionate developers who want to produce quality code, our answer will always be "Let's rewrite the code with thread safety".

Revisit

If there are only a couple of pieces of back end logic, this might work out. But think about 100 different types of back end services running without any issue in the existing system. Even if developers rewrite the code, it needs to go through the same testing cycle, including load testing. Can we quickly deliver this change in this Agile world?

There are chances of memory leaks even in the new code. What if it is first observed under a very high load? It requires a considerable amount of time if a memory leak is first reported in production rather than in development. Why should we take the risk, if we can have one process do one message in the WebJobs world? We are free from many things and can deliver phase 1 faster. Maybe in phase 2 we can think about changing the approach to thread based, if there are performance complaints from the field or our Azure budget is exceeded.

So let's see if there are any ways to make one exe process one message from the Azure storage queue.

One exe to process one request message

Batch size of 1

The easy option that comes to our minds is to make the batch size 1. But this won't work if there are static variables shared by different types of queue messages / handler functions.
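For reference, the batch size can be set on the JobHostConfiguration in the WebJobs SDK 1.x; a minimal sketch, assuming a standard continuous WebJob host:

```csharp
using Microsoft.Azure.WebJobs;

class Program
{
    static void Main()
    {
        var config = new JobHostConfiguration();

        // Fetch only one queue message at a time per host instance.
        config.Queues.BatchSize = 1;

        var host = new JobHost(config);
        host.RunAndBlock();
    }
}
```

Note that even with BatchSize set to 1, the same process handles all messages one after another, so state left behind by one handler (static variables, undisposed objects) can still leak into the next.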

BatchSize of 1, JobHost.Stop() and Environment.Exit() after completing the function

The message is deleted from the queue by the SDK only after our WebJob function returns. In this case, if we stop the JobHost inside the function, the message will not be deleted from the queue. I am still trying to figure out from the source code what the relation is between the JobHost running state and deleting the message.
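One way this experiment could be wired up is sketched below, with a hypothetical static host reference; as noted above, exiting before the function returns risks the message never being deleted from the queue:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

class Program
{
    internal static JobHost Host;

    static void Main()
    {
        var config = new JobHostConfiguration();
        config.Queues.BatchSize = 1;

        Host = new JobHost(config);
        Host.RunAndBlock();
    }
}

public class Functions
{
    public static void ProcessQueueMessage([QueueTrigger("queue1")] string message)
    {
        // Process the message...

        // Tear the host down once this message is done. Stop() waits for
        // running functions, so it is issued from another task; the message
        // may still be left in the queue if the process exits first.
        Task.Run(() =>
        {
            Program.Host.Stop();
            Environment.Exit(0);
        });
    }
}
```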

Start another exe from WebJob Host exe for each message

Here we invoke the same or another exe from the WebJob function and pass the required parameters. We listen to its Console / standard out and relay the same to the Azure WebJob log stream. Once the child processing exe completes, it will just go away along with whatever memory it has.

The parameters required for the child processor exe can be transmitted from the WebJob SDK exe using standard in, as mentioned in the below link.
http://joymonscode.blogspot.com/2015/09/ipc-via-standard-in-and-out-in-cnet.html
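A minimal sketch of this approach (the child executable name ChildProcessor.exe is an assumption):

```csharp
using System;
using System.Diagnostics;

public class Functions
{
    public static void ProcessQueueMessage(/* [QueueTrigger("queue1")] */ string message)
    {
        var startInfo = new ProcessStartInfo("ChildProcessor.exe")
        {
            UseShellExecute = false,
            RedirectStandardInput = true,
            RedirectStandardOutput = true
        };

        using (var child = Process.Start(startInfo))
        {
            // Relay the child's output to the WebJob log stream.
            child.OutputDataReceived += (s, e) =>
            {
                if (e.Data != null) Console.WriteLine(e.Data);
            };
            child.BeginOutputReadLine();

            // Pass the message to the child via standard in.
            child.StandardInput.WriteLine(message);

            child.WaitForExit();
        }
        // The child process, along with whatever memory it leaked, is gone here.
    }
}
```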

Tuesday, September 22, 2015

IPC via Standard In and Out in C#.Net

When to communicate via console instead of service based IPC

IPC is an essential feature of large software systems. We cannot build large systems with just one process. Technically we can, but it will not scale to a larger audience. Another problem is stability. There are many ways to achieve inter process communication, which are listed in the wiki. Normally people prefer service based communication as it scales easily to more machines.

If we are sure that the 2 processes are
  • Always going to run on the same machine.
  • Not always available. Sometimes the destination process might be spawned by the source. Sometimes the source may need to go down after initiating the destination and getting the 'take over' signal.
we can use simple mechanisms to establish the communication.

Command line arguments

If the communication is
  • From source to destination only
  • One time
  • Destination is spawned by the source process
  • With small data
we can use the command line argument passing mechanism from the source to the destination.
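A minimal sketch, assuming a hypothetical Destination.exe that echoes its first argument:

```csharp
using System;
using System.Diagnostics;

class Source
{
    static void Main()
    {
        // Pass a small payload as a command line argument.
        var startInfo = new ProcessStartInfo("Destination.exe", "\"hello from source\"")
        {
            UseShellExecute = false
        };

        using (var destination = Process.Start(startInfo))
        {
            destination.WaitForExit();
        }
    }
}

// In Destination.exe, the payload arrives via Main's args:
// static void Main(string[] args) { Console.WriteLine(args[0]); }
```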

Using Process.StandardInput.WriteLine

If the communication payload is big, passing it via command line arguments is not a good option. We can use the standard input and output streams instead. Below are C# code sketches for the source as well as the destination. They can even be 2 instances of the same executable.
There are multiple ways to write to the standard in stream, such as using a StreamWriter. If we use a writer in the source and ReadLine in the destination, there are chances that the programs may wait on the ReadLine statements unless we explicitly flush. So better use the simple console write and read line methods.
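A minimal sketch of both sides (each Main lives in its own executable; the name Destination.exe is an assumption):

```csharp
using System;
using System.Diagnostics;

class Source
{
    static void Main()
    {
        var startInfo = new ProcessStartInfo("Destination.exe")
        {
            UseShellExecute = false,
            RedirectStandardInput = true,
            RedirectStandardOutput = true
        };

        using (var destination = Process.Start(startInfo))
        {
            // WriteLine flushes the line; a buffered StreamWriter
            // would need an explicit Flush() here.
            destination.StandardInput.WriteLine("payload line 1");
            destination.StandardInput.WriteLine("exit");

            Console.WriteLine(destination.StandardOutput.ReadToEnd());
            destination.WaitForExit();
        }
    }
}

// Destination.exe reads lines until it sees "exit":
class Destination
{
    static void Main()
    {
        string line;
        while ((line = Console.ReadLine()) != null && line != "exit")
        {
            Console.WriteLine("Received: " + line);
        }
    }
}
```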

Using unique/temp files and passing the file name via command line args

If the communication payload is enormous, or it's binary and mainly one way from source to destination, we can use temp files to pass data. Things to remember:

  • Better to generate a GUID and use that as the file name.
  • Close the file after writing at the source.
  • Keep it in a temp location.
  • Let the destination process delete the temp file once it's read.
  • Still have a cleanup routine to delete the temp files if anything remains.
  • Do not use this mechanism for 2 way communication with FileSystemWatcher or by passing file names via the standard in/out streams.
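A minimal sketch of the source side, following the conventions above (Destination.exe is an assumption):

```csharp
using System;
using System.Diagnostics;
using System.IO;

class Source
{
    static void Main()
    {
        // GUID-based file name in the temp location.
        string path = Path.Combine(Path.GetTempPath(),
                                   Guid.NewGuid().ToString("N") + ".dat");

        // WriteAllBytes closes the file after writing.
        File.WriteAllBytes(path, new byte[] { 1, 2, 3 });

        // Pass only the file name via the command line.
        var startInfo = new ProcessStartInfo("Destination.exe", "\"" + path + "\"")
        {
            UseShellExecute = false
        };
        Process.Start(startInfo);

        // The destination reads args[0], consumes the file, then deletes it.
    }
}
```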

Why to subscribe to OutputDataReceived event

There are now 2 parties involved in the communication, and there are chances that both of them wait for input from the other. So better use the OutputDataReceived event to avoid such deadlocks.
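A sketch of the event based read, which avoids blocking on a synchronous read while the child is itself waiting for input (Destination.exe is an assumption):

```csharp
using System;
using System.Diagnostics;

class Source
{
    static void Main()
    {
        var startInfo = new ProcessStartInfo("Destination.exe")
        {
            UseShellExecute = false,
            RedirectStandardInput = true,
            RedirectStandardOutput = true
        };

        using (var destination = Process.Start(startInfo))
        {
            // Output is delivered asynchronously, line by line,
            // so this process never blocks waiting for the child.
            destination.OutputDataReceived += (sender, e) =>
            {
                if (e.Data != null) Console.WriteLine("Child says: " + e.Data);
            };
            destination.BeginOutputReadLine();

            destination.StandardInput.WriteLine("ping");
            destination.StandardInput.WriteLine("exit");
            destination.WaitForExit();
        }
    }
}
```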

References

http://stackoverflow.com/questions/21270842/reading-writing-to-a-command-line-program-in-c-sharp
http://stackoverflow.com/questions/285760/how-to-spawn-a-process-and-capture-its-stdout-in-net
http://stackoverflow.com/questions/26199441/redirect-standard-output-for-n-level-of-child-processes

Tuesday, September 15, 2015

Azure WebJobs - HTTP (409) Conflict causing Microsoft.WindowsAzure.Storage.StorageException

Below is another exception which can happen if we are using shared storage resources in WebJobs. HTTP 409 is not commonly implemented by web servers, as it's reserved for applications to convey specific conflicts in requests.

[07/14/2015 18:28:28 > 65f7a4: SYS INFO] Status changed to Initializing
[07/14/2015 18:28:32 > 65f7a4: SYS INFO] Run script 'AzureWebJob45.exe' with script host - 'WindowsScriptHost'
[07/14/2015 18:28:32 > 65f7a4: SYS INFO] Status changed to Running
[07/14/2015 18:28:33 > 65f7a4: ERR ]
[07/14/2015 18:28:33 > 65f7a4: ERR ] Unhandled Exception: Microsoft.WindowsAzure.Storage.StorageException: The remote server returned an error: (409) Conflict. ---> System.Net.WebException: The remote server returned an error: (409) Conflict.
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.WindowsAzure.Storage.Shared.Protocol.HttpResponseParsers.ProcessExpectedStatusCodeNoException[T](HttpStatusCode expectedStatusCode, HttpStatusCode actualStatusCode, T retVal, StorageCommandBase`1 cmd, Exception ex)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.WindowsAzure.Storage.Blob.CloudBlobContainer.<CreateContainerImpl>b__27(RESTCommand`1 cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndGetResponse[T](IAsyncResult getResponseResult)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    --- End of inner exception stack trace ---
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.WindowsAzure.Storage.Core.Util.StorageAsyncResult`1.End()
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.WindowsAzure.Storage.Blob.CloudBlobContainer.EndCreateIfNotExists(IAsyncResult asyncResult)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass1`1.<CreateCallback>b__0(IAsyncResult ar)
[07/14/2015 18:28:33 > 65f7a4: ERR ] --- End of stack trace from previous location where exception was thrown ---
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.Azure.WebJobs.Host.Executors.DynamicHostIdProvider.<TryInitializeIdAsync>d__10.MoveNext()
[07/14/2015 18:28:33 > 65f7a4: ERR ] --- End of stack trace from previous location where exception was thrown ---
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.Azure.WebJobs.Host.Executors.DynamicHostIdProvider.<GetOrCreateHostIdAsync>d__0.MoveNext()
[07/14/2015 18:28:33 > 65f7a4: ERR ] --- End of stack trace from previous location where exception was thrown ---
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.Azure.WebJobs.Host.Executors.JobHostContext.<CreateAndLogHostStartedAsync>d__2.MoveNext()
[07/14/2015 18:28:33 > 65f7a4: ERR ] --- End of stack trace from previous location where exception was thrown ---
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.Azure.WebJobs.JobHost.<CreateContextAndLogHostStartedAsync>d__f.MoveNext()
[07/14/2015 18:28:33 > 65f7a4: ERR ] --- End of stack trace from previous location where exception was thrown ---
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.Azure.WebJobs.JobHost.<StartAsyncCore>d__0.MoveNext()
[07/14/2015 18:28:33 > 65f7a4: ERR ] --- End of stack trace from previous location where exception was thrown ---
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.Azure.WebJobs.JobHost.Start()
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at Microsoft.Azure.WebJobs.JobHost.RunAndBlock()
[07/14/2015 18:28:33 > 65f7a4: ERR ]    at AzureWebJob45.Program.Main()
[07/14/2015 18:28:33 > 65f7a4: SYS INFO] Status changed to Failed
[07/14/2015 18:28:33 > 65f7a4: SYS ERR ] Job failed due to exit code -532462766

To resolve this in WebJobs
  • Check for any console logging statements which are shared by different WebJob executions. Avoid them if any.
  • Also make sure the blobs, tables and so on are not shared between the executions. We can share common SQL tables, as the concurrency is handled by SQL Server. The same holds for other databases such as Oracle.

Tuesday, September 8, 2015

Azure WebJobs - Error while handling parameter after function returned

Context

There are chances that we may get the below exception when playing around with Azure WebJobs which log into Azure storage. Note that it might not have thrown any issue during development with a single message. Let's see what may cause this issue.

System.InvalidOperationException: System.InvalidOperationException: Error while handling parameter writer after function returned: ---> Microsoft.WindowsAzure.Storage.StorageException: The remote server returned an error: (400) Bad Request. ---> System.Net.WebException: The remote server returned an error: (400) Bad Request.
   at Microsoft.WindowsAzure.Storage.Shared.Protocol.HttpResponseParsers.ProcessExpectedStatusCodeNoException[T](HttpStatusCode expectedStatusCode, HttpStatusCode actualStatusCode, T retVal, StorageCommandBase`1 cmd, Exception ex)
   at Microsoft.WindowsAzure.Storage.Blob.CloudBlockBlob.<PutBlockListImpl>b__44(RESTCommand`1 cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
   at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndGetResponse[T](IAsyncResult getResponseResult)
   --- End of inner exception stack trace ---
   at Microsoft.WindowsAzure.Storage.Core.Util.StorageAsyncResult`1.End()
   at Microsoft.WindowsAzure.Storage.Blob.BlobWriteStream.EndCommit(IAsyncResult asyncResult)
   at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.DelegatingCloudBlobStream.EndCommit(IAsyncResult asyncResult)
   at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.CancellableTaskFactory.<>c__DisplayClass3.<FromAsync>b__2(IAsyncResult ar)
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.WatchableCloudBlobStream.<CommitAsyncCore>d__0.MoveNext()
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.WatchableCloudBlobStream.<CompleteAsync>d__9.MoveNext()
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.WebJobs.Host.Blobs.Bindings.TextWriterArgumentBindingProvider.TextWriterArgumentBinding.TextWriterValueBinder.<SetValueAsync>d__5.MoveNext()
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithWatchersAsync>d__23.MoveNext()
   --- End of inner exception stack trace ---
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithWatchersAsync>d__23.MoveNext()
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithOutputLogsAsync>d__1c.MoveNext()
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithLogMessageAsync>d__c.MoveNext()
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<ExecuteWithLogMessageAsync>d__c.MoveNext()
   --- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.WebJobs.Host.Executors.FunctionExecutor.<TryExecuteAsync>d__1.MoveNext()

Consequences

The major problem with this is that we can see our job may have executed more times than the number of messages that arrived. Or messages are processed more than once but some are still left in the queue. It's highly inconsistent.

Root cause

It clearly tells that something is wrong when dealing with storage, especially blobs. Below can be our code (probably wrong) which handles the queuing, the WebJob and the associated logging.

//Code that is used to queue messages into the Azure storage queue.
public class CustomQueueMessage
{
    public string MessageType { get; set; }
    public int MessageId { get; set; }
    public int Priority { get; set; }
    public XElement Parameters { get; set; }
}
public void TestInsertQueueWithObject(int noOfMessagesToBeQueued=1)
{
    for (int counter = 0; counter < noOfMessagesToBeQueued; counter++)
    {
        CloudQueue testQWithMessageObject = GetCloudQueue("queue1");
        testQWithMessageObject.CreateIfNotExists();
        CustomQueueMessage msg = GetSampleCustomQueueMessage(counter);
        CloudQueueMessage cloudMsg = new CloudQueueMessage(JsonConvert.SerializeObject(msg));
        testQWithMessageObject.AddMessage(cloudMsg);
        Console.WriteLine("Message added. MessageId={0}", msg.MessageId);
    }
}

//Function which is invoked by the Azure WebJobs SDK when a new message arrives in queue1.
public static void ProcessQueueMessage(
    [QueueTrigger("queue1")]
    CustomQueueMessage customMsg
,   [Blob("logs/webjobs")]
    TextWriter log,
    IBinder binder)
{
    //DoOperation();
    log.WriteLine(customMsg);
    Console.WriteLine(customMsg);
}

What is happening here? Every time a message is seen by the JobHost, it starts the ProcessQueueMessage function in a new thread. If there are many messages getting de-queued simultaneously, there are chances that every thread tries to get the same blob ('logs/webjobs') and it ends up in the exception.

Let's see by removing the blob writing code. Even if we don't write to the blob and stick only with Console.WriteLine, logs will be visible from the Azure WebJobs monitoring portal. We can navigate from the Azure portal or use the direct URL. The URL format is below:
https://<Webapp name>.scm.azurewebsites.net/azurejobs/#/jobs/<job type continuous/triggered/>/<job name>
e.g. https://mvc5application.scm.azurewebsites.net/azurejobs/#/jobs/triggered/AzureWebJob45


public static void ProcessQueueMessage(
    [QueueTrigger("queue1")]
    CustomQueueMessage customMsg
,   [Blob("logs/webjobs")]
    TextWriter log,
    IBinder binder)
{
    //DoOperation();
    //log.WriteLine(customMsg);
    Console.WriteLine(customMsg);
}

Yes. It worked better.

.Net Locking is not a solution in Azure

Most of us may think that the easy solution to this is to put a 'lock {}' around the blob writing code. But will that scale? Absolutely not: when the application grows we may launch multiple instances, and .Net locking is limited to the process, while the other instance will be running inside a different machine and a different process.

Logging WebJobs processing details to corresponding blobs

Sometimes we may need to write the processing insights to our own data stores rather than the console stream, which is shared with the Azure WebJobs SDK as well. So let's try to use the Blob attribute properly so that it creates a separate blob for each message.

public static void ProcessQueueMessage(
    [QueueTrigger("queue1")]
    CustomQueueMessage customMsg
,   [Blob("logs/webjob{MessageId}")]
    TextWriter log,
    IBinder binder)
{
    //DoOperation();
    log.WriteLine(customMsg);
    Console.WriteLine(customMsg);
}

Yes. With the MessageId, now we can see that things started working properly and we get separate logs/webjob{MessageId} blobs. There is no magic actually; the WebJobs binding mechanism replaces the {MessageId} with the MessageId property of the automatically deserialized customMsg object. Also it creates the new blob for us too.

The moral of the story is that we should always think about running our application at scale and design for that.

Is this the end of the story? Certainly not. There are chances that we may get an HTTP (409) error which says there is a conflict. This can happen if we are writing into the Console stream, which is internally writing concurrently into some blobs. Also, how do we uniquely assign a message id during queuing so that the logs can be written to separate blobs?

Tuesday, September 1, 2015

Passing parameters & variables into SSIS package when invoked from C# .Net

SSIS has 2 confusing concepts around dealing with variables. There were Variables earlier, and now we can see Parameters also.
They are not really the same, and there are many places on the internet talking about it. Below is a simple code snippet on how both can be set if we are executing an SSIS package from a C# application.

Sample setup

This sample uses a simple SSIS package which calculates the square of all the numbers from 0 to N.
  • One parameter named 'number'. This will be used as the loop's higher end.
  • One variable named 'counter'. Used as the looping variable.
  • For Loop Container - This loops from 1 to the number parameter. Initiates the FindSquareAndLog script block.
  • FindSquareAndLog script block - This finds the square of the variable counter and logs it.
There is C# code which invokes this .dtsx file.

FindSquareAndLog script block

This is part of the .dtsx package. Editing the C# code in an SSIS Script Task in Visual Studio is as simple as clicking on the 'Edit Script' button. As seen, this is a simple C# script which writes to Dts.Log.
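The script body presumably looks something like the sketch below; the variable name 'counter' comes from the sample setup above, and Dts.Log takes (messageText, dataCode, dataBytes):

```csharp
// Inside the SSIS Script Task's ScriptMain (SSIS supplies the Dts object).
public void Main()
{
    int counter = (int)Dts.Variables["counter"].Value;
    int square = counter * counter;

    // Log the result; an enabled log provider is needed to actually see it.
    Dts.Log(string.Format("Square of {0} is {1}", counter, square), 0, null);

    Dts.TaskResult = (int)ScriptResults.Success;
}
```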

C# to pass parameter and variable values

This runs from a .Net application which invokes the .dtsx package. Very straightforward. Every execution will create an ExecutionInstanceGUID internally, which is captured here. Very much useful if an operation involves a series of SSIS package invocations and the logs need to be correlated.
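A minimal sketch of the invoking side, assuming a reference to the Microsoft.SqlServer.ManagedDTS assembly and a package file path of C:\Packages\Package.dtsx:

```csharp
using System;
using Microsoft.SqlServer.Dts.Runtime;

class Program
{
    static void Main()
    {
        var app = new Application();
        Package pkg = app.LoadPackage(@"C:\Packages\Package.dtsx", null);

        // Parameter and variable values must be set before Execute().
        pkg.Parameters["number"].Value = 3;
        pkg.Variables["counter"].Value = 10;

        DTSExecResult result = pkg.Execute();

        // Each execution gets its own id; useful for correlating logs
        // across a series of package invocations.
        Console.WriteLine("ExecutionInstanceGUID: " + pkg.ExecutionInstanceGUID);
        Console.WriteLine("Result: " + result);
    }
}
```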

Experiments

Let's try adding new variables and parameters before invoking from the .Net app.

private static void AddParametersIntoPackage(Package pkg)
{
    pkg.Parameters["number"].Value = 3;
    pkg.Variables["counter"].Value = 10;

    pkg.Parameters.Add("dfd", TypeCode.Boolean);
    pkg.Variables.Add("dfd", false, "", 3);
    pkg.Parameters.Add("df", TypeCode.Decimal);
}

We can see it just runs, though there are no variables or parameters with these names defined in the .dtsx package. Try more, and we will end up at the conclusion that .dtsx packages can simply be generated from our C# code.

http://www.sqlchick.com/entries/2013/9/15/getting-started-with-parameters-variables-configurations-in.html
Happy coding