Tuesday, April 7, 2009

Adding ambient transactions to NCommon

Well.. here goes another enhancement for NCommon framework..

One of the requirements of the project in which I'm using NCommon is supporting transactions over web services based on WS-AT standard.

This should not suppose a big deal as we are using WCF as our underlaying web services stack, which in turns support WS-AT "nearly" out of the box, relying on System.Transactions infrastructure, so if a remote transaction is in use it will be available as an ambient transaction accessible via System.Transactions.Transaction.Current. If you want, you can read the details about WCF and WS-AT here and here.

Also, if you read a bit about transactions and Linq2Sql you'll find that L2S supports three different ways of interacting with transactions:
1. Linq2Sql's DataContext.SubmitChanges() method checks if DataContext.Connection.Transaction (which holds an ADO.NET DbTransaction object) is not null, and if so, it will make use of such transaction object to coordinate db operations.
2. If DataContext.Connection.Transaction is null, L2S will try to find if SubmitChanges() was called inside an implicit transaction created by instantiating a System.Transactions.TransactionScope object. If so, L2S will coordinate db operations within the transaction identified by such TransactionScope object.
3. At this point, if you have read about System.Transactions and WCF, you will know that WS-AT Ambient transactions are wrapped into an implicit transactions exposed as TransactionScope instances. So, if you are working with L2S within a web service call which is participating on a propagated transaction, you will have access to such remote transaction as if it were a local implicit transaction started with TransactionScope.

Up till now everything looked promising, but then, I found that NCommon currently does not support implicit transactions, nor it is capable of interacting with System.Transactions infrastructure. From reading NCommon's source code, I found that NCommon's Linq2Sql transaction handling code relies only on method #1: Creating a new DbTransaction instance and assigning it to DataContext.Connection.Transaction during UnitOfWork initialization.
Obviouslly, this doesn't work for what I needed, so I started extending NCommon to support Ambient Transactions..

Here I will outline the steps I performed in order to add Ambient Transactions support into NCommon. However, you must take into account that, the following code only adds Ambient Transactions support to NCommon's Linq2Sql code. Adding such support to NCommon.EntityFramework and/or NCommon.NHibernate should not be difficult, but by now, I will left this task to the reader. ;)

Step One: Implement an Ambient enabled ITransaction object

This class will our bridge between NCommon.Data.ITransaction infrastructure and System.Transactions, as thus, this class will wrapp a TransactionScope instance which will depend on an existing ambient TransactionScope, this way we can relay ITransacion events like Commit and/or Rollback to System.Transactions.


#region license
//Copyright 2009 Pablo Ruiz Garcia

//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at

//http://www.apache.org/licenses/LICENSE-2.0

//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
#endregion

using System;
using System.Data;
using System.Transactions;

namespace NCommon.Data
{
/// <summary>
/// System.Transactions ambient transactions wrapped into an ITransaction object.
/// </summary>
public class AmbientTransaction : ITransaction
{
#region fields
private bool _disposed;
private bool _deferEvents;
private readonly TransactionScope _internalTransactionScope;
private readonly Transaction _internalTransaction;
#endregion

#region ctor
/// <summary>
/// Default Constructor.
/// Creates a new instance of the <see cref="AmbientTransaction"/>
/// </summary>
public AmbientTransaction(System.Transactions.IsolationLevel? isolationLevel, TransactionScopeOption? scopeOptions)
{
TransactionOptions? options = null;

if (isolationLevel.HasValue)
options = new TransactionOptions() { IsolationLevel = isolationLevel.Value };

if (!scopeOptions.HasValue)
scopeOptions = TransactionScopeOption.Required;

if (options.HasValue && scopeOptions.HasValue)
_internalTransactionScope = new TransactionScope(scopeOptions.Value, options.Value);
else if (options.HasValue) // XXX: Not possible..
_internalTransactionScope = new TransactionScope(scopeOptions.Value, options.Value);
else if (scopeOptions.HasValue)
_internalTransactionScope = new TransactionScope(scopeOptions.Value);
else
_internalTransactionScope = new TransactionScope();

_internalTransaction = Transaction.Current;
_internalTransaction.TransactionCompleted += new TransactionCompletedEventHandler(_internalTransaction_TransactionCompleted);
}

/// <summary>
/// Initializes a new instance of the <see cref="AmbientTransaction"/> class.
/// </summary>
/// <param name="isolationLevel">The isolation level.</param>
public AmbientTransaction(System.Transactions.IsolationLevel isolationLevel)
: this(isolationLevel, null)
{
}

public AmbientTransaction(TransactionScopeOption scopeOptions)
: this(null, scopeOptions)
{
}

public AmbientTransaction()
: this(null, null)
{
}
#endregion

#region InternalTransactionEventArgs
private class InternalTransactionEventArgs : TransactionEventArgs
{
public Transaction transaction;

public InternalTransactionEventArgs(Transaction tr)
{
transaction = tr;
}
}
#endregion

#region Implementation of IDisposable

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Disposes off managed and un-managed transactions.
/// </summary>
/// <param name="disposing"></param>
private void Dispose (bool disposing)
{
if (disposing)
{
if (!_disposed)
{
#if false // Not needed as TransactionScope will take care of it..
var status = _internalTransaction.TransactionInformation.Status;

if (status != TransactionStatus.Committed)
_internalTransaction.Rollback();

_internalTransaction.Dispose();
#endif
_internalTransactionScope.Dispose();
_internalTransaction.TransactionCompleted -= new TransactionCompletedEventHandler(_internalTransaction_TransactionCompleted);
_disposed = true;
}
}
}
#endregion

#region Implementation of ITransaction
/// <summary>
/// Event raised when the transaction has been comitted.
/// </summary>
public event EventHandler TransactonComitted;

/// <summary>
/// Event raised when the transaction has been rolledback.
/// </summary>
public event EventHandler TransactionRolledback;

/// <summary>
/// Commits the changes made to the data store.
/// </summary>
/// <remarks>Implementors MUST raise the <see cref="ITransaction.TransactonComitted"/> event.</remarks>
public void Commit()
{
if (_disposed)
throw new ObjectDisposedException("AmbientTransaction", "Cannot commit a disposed transaction.");

Guard.Against<TransactionAbortedException>(
_internalTransaction.TransactionInformation.Status == TransactionStatus.Aborted,
"Ambient Transaction aborted, unable to commit!"
);

try
{
_deferEvents = true;
_internalTransactionScope.Complete();
}
finally
{
_deferEvents = false;
_internalTransaction_TransactionCompleted(this, new InternalTransactionEventArgs(_internalTransaction));
}
}

void _internalTransaction_TransactionCompleted(object sender, TransactionEventArgs e)
{
Transaction tr = e.Transaction;

Guard.Against<ObjectDisposedException>(_disposed, "Transaction is in disposed state!");

// If we are getting called by ourselves, just defer event propagatioon. (pruiz)
if (_deferEvents)
return;

if (e is InternalTransactionEventArgs)
tr = (e as InternalTransactionEventArgs).transaction;

if (tr.TransactionInformation.Status == TransactionStatus.Committed && TransactonComitted != null)
TransactonComitted(this, e);

if (tr.TransactionInformation.Status == TransactionStatus.Aborted && TransactionRolledback != null)
TransactionRolledback(this, e);
}

/// <summary>
/// Rollsback any changes made.
/// </summary>
/// <remarks>Implementors MUST raise the <see cref="ITransaction.TransactionRolledback"/> event.</remarks>
public void Rollback()
{
if (_disposed)
throw new ObjectDisposedException("AmbientTransaction", "Cannot rollback a disposed transaction.");

try
{
_deferEvents = true;
// XXX? Do nothing, as the transaction will be rolledback automatically
// when diposing the transactionScope. (pruiz)
//_internalTransaction.Rollback();

}
finally
{
_deferEvents = false;
_internalTransaction_TransactionCompleted(this, new InternalTransactionEventArgs(_internalTransaction));
}
}
#endregion

#region Utility methods
/// <summary>
/// Gets the current ambient transaction isolation level. Or returns a default one
/// if no current ambient transaction is available.
/// </summary>
/// <param name="default">The @default.</param>
/// <returns>The current ambient transaction isolation level, or @default if no there is not current ambient transaction.</returns>
public static System.Data.IsolationLevel GetCurrentIsolationLevel(System.Data.IsolationLevel @default)
{
if (System.Transactions.Transaction.Current == null)
return @default;

return (System.Data.IsolationLevel)
Enum.Parse(
typeof(System.Data.IsolationLevel),
System.Transactions.Transaction.Current.IsolationLevel.ToString()
);
}

/// <summary>
/// Gets the current ambient transaction isolation level.
/// </summary>
/// <returns>
/// The current ambient transaction isolation level.
/// </returns>
/// <exception cref="InvalidOperationException">
/// If no currently ambient transaction is found.
/// </exception>
public static System.Data.IsolationLevel GetCurrentIsolationLevel()
{
if (System.Transactions.Transaction.Current == null)
throw new InvalidOperationException("No current ambient transaction is started!");

return (System.Data.IsolationLevel)
Enum.Parse(
typeof(System.Data.IsolationLevel),
System.Transactions.Transaction.Current.IsolationLevel.ToString()
);
}
#endregion
}
}


Step Two: Modifying NCommon.LinqToSql to support Ambient Transactions

An important thing to note is that, when used directly, L2S supports System.Transactions infrastructure out of the box. How ever, if you already worked with such support, you also need to know that, while L2S's DataContext will care about ambient/implicit transactions during calls to SubmitChanges() method.
L2S operated under an NCommon's UnitOfWork/Repository infrastructure will try to find existing implicit transactions during UnitofWorkScope creation.

This means that you cannot use the same UoWS under two unrelated implicit/ambient transactions by calling IRepository.Save() once under one TransactionScope, and then under a new TS. This simply, wont work, as UoWS will look for an implicit transaction at UoWS creation time.

Having said that, here are the modifications I made to LinqToSqlUnitOfWork class:


#region license
//Copyright 2008 Ritesh Rao

//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at

//http://www.apache.org/licenses/LICENSE-2.0

//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
#endregion

using System;
using System.Data;
using System.Data.Common;
using System.Data.Linq;

namespace NCommon.Data.LinqToSql
{
/// <summary>
/// Implements the <see cref="IUnitOfWork"/> interface to provide an implementation
/// of a IUnitOfWork that uses NHibernate to query and update the underlying store.
/// </summary>
public class LinqToSqlUnitOfWork : IUnitOfWork
{
#region fields
private bool _disposed;
private bool _transactionAborted;
private ILinqSession _linqContext;
private ITransaction _transaction;
#endregion

#region ctor
/// <summary>
/// Default Constructor.
/// Creates a new instance of the <see cref="LinqToSqlUnitOfWork"/> class that uses the specified data context.
/// </summary>
/// <param name="context">The <see cref="DataContext"/> instance that the LinqToSqlUnitOfWork instance uses.</param>
public LinqToSqlUnitOfWork(ILinqSession context)
{
Guard.Against<ArgumentNullException>(context == null, "Expected a non-nul DataContext instance");
_linqContext = context;
}
#endregion

#region properties
/// <summary>
/// Gets the <see cref="DataContext"/> that the LinqToSqlUnitOfWork instance wraps.
/// </summary>
public DataContext Context
{
get { return _linqContext.Context;}
}
#endregion

#region Implementation of IDisposable
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Disposes off manages resources used by the LinqToSqlUnitOfWork instance.
/// </summary>
/// <param name="disposing"></param>
private void Dispose(bool disposing)
{
if (!disposing) return;
if (_disposed) return;

if (_transaction != null)
{
_transaction.Dispose();
_transaction = null;
}
if (_linqContext != null)
{
_linqContext.Dispose();
_linqContext = null;
}
_disposed = true;
}
#endregion

#region Implementation of IUnitOfWork
/// <summary>
/// Gets a boolean value indicating whether the current unit of work is running under
/// a transaction.
/// </summary>
public bool IsInTransaction
{
get { return _transaction != null; }
}

/// <summary>
/// Instructs the <see cref="IUnitOfWork"/> instance to begin a new transaction.
/// </summary>
/// <returns></returns>
public ITransaction BeginTransaction()
{
return BeginTransaction(UnitOfWorkScope.DefaultIsolationLevel);
}

/// <summary>
/// Instructs the <see cref="IUnitOfWork"/> instance to begin a new transaction
/// with the specified isolation level.
/// </summary>
/// <param name="isolationLevel">One of the values of <see cref="IsolationLevel"/>
/// that specifies the isolation level of the transaction.</param>
/// <returns></returns>
public ITransaction BeginTransaction(IsolationLevel isolationLevel)
{
Guard.Against<InvalidOperationException>(_transaction != null,
"Cannot begin a new transaction while an existing transaction is still running. " +
"Please commit or rollback the existing transaction before starting a new one.");

if (System.Transactions.Transaction.Current == null)
{
if (_linqContext.Connection.State != ConnectionState.Open)
_linqContext.Connection.Open();

IDbTransaction transaction = _linqContext.Connection.BeginTransaction(isolationLevel);
_linqContext.Transaction = transaction;
_transaction = new LinqToSqlTransaction(transaction);
_transaction.TransactonComitted += TransactionCommitted;
_transaction.TransactionRolledback += TransactionRolledback;
}
else
{
// Convert System.Data.IsolationLevel to System.Transactions.IsolationLevel
var isolation = (System.Transactions.IsolationLevel)Enum.Parse(
typeof(System.Transactions.IsolationLevel), isolationLevel.ToString()
);
_transaction = new AmbientTransaction(isolation);
_transaction.TransactonComitted += TransactionCommitted;
_transaction.TransactionRolledback += TransactionRolledback;
}
return _transaction;
}

/// <summary>
/// Flushes the changes made in the unit of work to the data store.
/// </summary>
public void Flush()
{
Guard.Against<System.Transactions.TransactionAbortedException>(
_transactionAborted == true,
"Current transaction is in aborted state"
);

_linqContext.SubmitChanges();
}

/// <summary>
/// Flushes the changes made in the unit of work to the data store
/// within a transaction.
/// </summary>
public void TransactionalFlush()
{
TransactionalFlush(UnitOfWorkScope.DefaultIsolationLevel);
}

/// <summary>
/// Flushes the changes made in the unit of work to the data store
/// within a transaction with the specified isolation level.
/// </summary>
/// <param name="isolationLevel"></param>
public void TransactionalFlush(IsolationLevel isolationLevel)
{
// Start a transaction if one isn't already running.
if (!IsInTransaction)
BeginTransaction(isolationLevel);

try
{
_linqContext.SubmitChanges();
_transaction.Commit();
}
catch
{
_transaction.Rollback();
throw;
}
}
#endregion

#region methods
/// <summary>
/// Handles the <see cref="ITransaction.TransactionRolledback"/> event.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void TransactionRolledback(object sender, EventArgs e)
{
Guard.IsEqual<InvalidOperationException>(sender, _transaction,
"Expected the sender of TransactionRolledback event to be the transaction that was created by the LinqToSqlUnitOfWork instance.");
_transactionAborted = true;
ReleaseCurrentTransaction();
}

/// <summary>
/// Handles the <see cref="ITransaction.TransactonComitted"/> event.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void TransactionCommitted(object sender, EventArgs e)
{
Guard.IsEqual<InvalidOperationException>(sender, _transaction,
"Expected the sender of TransactionComitted event to be the transaction that was created by the LintToSqlUnitOfWork instance.");
ReleaseCurrentTransaction();
}

/// <summary>
/// Releases the current transaction in the <see cref="LinqToSqlUnitOfWork"/> instance.
/// </summary>
private void ReleaseCurrentTransaction()
{
if (_transaction != null)
{
_transaction.TransactonComitted -= TransactionCommitted;
_transaction.TransactionRolledback -= TransactionRolledback;
_transaction.Dispose();
}
_transaction = null;

//Closing the connection once the transaction has completed.
if (_linqContext.Connection.State == ConnectionState.Open)
_linqContext.Connection.Close();
}
#endregion
}
}


This way, each time a new transaction is demanded by calling BeginTransaction(), we will check if an implicit one is available at System.Transactions.Transaction.Current, and if so we will create a new AmbientTransaction class.

Step Three: Being able to use Ambient Transaction's IsolationLevel

Once I made this changes I found them to be not enough, because NCommon defines ReadCommited as the default IsolationLevel of newly created transactions, while System.Transactions uses a different default isolation level.

This was causing my code to fail as, our remote transaction's isolation level did not match NCommon's default one. An easy, but boring, solution was to instantiate NCommon's UnitOfWorkScope passing a different isolation level as constructor parameters, but I did not find my self confident with this solution, so I diced to implement a new SetIsolationLevelProvider method at UoWS so NCommon could call my application's code to know in each situation which isolation level should be used.


...

using System;
using System.Collections.Generic;
using System.Data;
using NCommon.Storage;

namespace NCommon.Data
{
/// <summary>
/// Helper class that allows starting and using a unit of work like:
/// <![CDATA[
/// using (UnitOfWorkScope scope = new UnitOfWorkScope()) {
/// //Do some stuff here.
/// scope.Commit();
/// }
///
/// ]]>
/// </summary>
public class UnitOfWorkScope : IDisposable
{
#region fields
private static readonly string UnitOfWorkScopeStackKey = typeof (UnitOfWorkScope).FullName +
".RunningScopeStack";
private UnitOfWorkScopeTransaction _currentTransaction;
private bool _disposed;
private static Func<IsolationLevel> _isolationLevelProvider = () => IsolationLevel.ReadCommitted;
private static readonly object _isolationLevelProviderLock = new object();
private bool _autocomplete = false;
#endregion

#region ctor

/// <summary>
/// Default Constuctor.
/// Creates a new <see cref="UnitOfWorkScope"/> with the <see cref="IsolationLevel.Serializable"/>
/// transaction isolation level.
/// </summary>
public UnitOfWorkScope() : this(DefaultIsolationLevel, DefaultOptions)
{
}

/// <summary>
/// Overloaded Constructor.
/// Creates a new instance of <see cref="UnitOfWorkScope"/> with the specified transaction
/// isolation level.
/// </summary>
/// <param name="isolationLevel">One of the values of <see cref="IsolationLevel"/> that specifies
/// the transation isolation level the scope should use.</param>
public UnitOfWorkScope(IsolationLevel isolationLevel)
: this(isolationLevel, DefaultOptions)
{
}

public UnitOfWorkScope(UnitOfWorkScopeTransactionOptions transactionOptions)
: this(DefaultIsolationLevel, transactionOptions)
{
}

/// <summary>
/// Overloaded Constructor.
/// Creates a new instance of <see cref="UnitOfWorkScope"/> with the specified transaction isolation level, option connection and
/// a transaction option that specifies if an existing transaction should be used or to create a new transaction.
/// </summary>
/// <param name="isolationLevel"></param>
/// <param name="transactionOptions"></param>
public UnitOfWorkScope(IsolationLevel isolationLevel, UnitOfWorkScopeTransactionOptions transactionOptions)
{
_disposed = false;

if ((transactionOptions & UnitOfWorkScopeTransactionOptions.AutoComplete) != 0)
_autocomplete = true;

_currentTransaction = UnitOfWorkScopeTransaction.GetTransactionForScope(this, isolationLevel,
transactionOptions);
RegisterScope(this);
}

#endregion

#region properties
/// <summary>
/// Checks if the current thread or request has a <see cref="UnitOfWorkScope"/> instance started.
/// </summary>
/// <value>True if a <see cref="UnitOfWorkScope"/> instance has started and is present.</value>
public static bool HasStarted
{
get
{
if (!Store.Local.Contains(UnitOfWorkScopeStackKey))
return false;
return RunningScopes.Count > 0;
}
}

/// <summary>
/// Gets the current <see cref="UnitOfWorkScope"/> instance for the current thread or request.
/// </summary>
/// <value>The current and most recent <see cref="UnitOfWorkScope"/> instance started for the current thread or request.
/// If none started, then a null reference is returned.</value>
public static UnitOfWorkScope Current
{
get
{
if (RunningScopes.Count == 0)
return null;
return RunningScopes.Peek();
}
}

/// <summary>
/// Gets a <see cref="Stack{TEntity}"/> of <see cref="UnitOfWorkScope"/> that is used to store and retrieve
/// running scope instances.
/// </summary>
private static Stack<UnitOfWorkScope> RunningScopes
{
get
{
//Note: No locking is required since the stack is stored either on the current thread or on the current request.
if (!Store.Local.Contains(UnitOfWorkScopeStackKey))
Store.Local.Set(UnitOfWorkScopeStackKey, new Stack<UnitOfWorkScope>());
return Store.Local.Get<Stack<UnitOfWorkScope>>(UnitOfWorkScopeStackKey);
}
}

/// <summary>
/// Gets the <see cref="UnitOfWorkScope"/> instance used by the <see cref="IUnitOfWork"/> instance.
/// </summary>
public IUnitOfWork UnitOfWork
{
get
{
return _currentTransaction.UnitOfWork;
}
}

/// <summary>
/// Gets the default isolation level.
/// </summary>
/// <value>The default isolation level.</value>
public static IsolationLevel DefaultIsolationLevel {
get {
lock (_isolationLevelProviderLock)
return _isolationLevelProvider();
}
}

/// <summary>
/// Gets the default unit of work scope transaction options.
/// </summary>
/// <value>The default unit of work scope transaction options.</value>
// TODO: Make this one provided by an external delegate as with DefaultIsolationLevel (pruiz)
public static UnitOfWorkScopeTransactionOptions DefaultOptions
{
get { return UnitOfWorkScopeTransactionOptions.UseCompatible; }
}
#endregion

#region methods

/// <summary>
/// Disposes off the <see cref="UnitOfWorkScope"/> insance.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Disposes off the managed and un-managed resources used.
/// </summary>
/// <param name="disposing"></param>
private void Dispose(bool disposing)
{
if (!disposing) return;
if (_disposed) return;

if (_currentTransaction != null)
{
if (_autocomplete == true)
_currentTransaction.Commit(this);
else
_currentTransaction.Rollback(this);

_currentTransaction = null;
}
UnRegisterScope(this);
_disposed = true;
}

/// <summary>
/// Registers a scope as the top level scope on the <see cref="RunningScopes"/> stack.
/// </summary>
/// <param name="scope">The <see cref="UnitOfWorkScope"/> instance to set as the top level scope on the stack.</param>
private static void RegisterScope(UnitOfWorkScope scope)
{
Guard.Against<ArgumentNullException>(scope == null,
"Cannot register a null UnitOfWorkScope instance as the top level scope.");
Data.UnitOfWork.Current = scope.UnitOfWork;
//Setting the UnitOfWork isntance held by the scope as the current scope.
RunningScopes.Push(scope);
}

/// <summary>
/// UnRegisters a <see cref="UnitOfWorkScope"/> as the top level scope on the stack.
/// </summary>
/// <param name="scope"></param>
private static void UnRegisterScope(UnitOfWorkScope scope)
{
Guard.Against<ArgumentNullException>(scope == null,
"Cannot Un-Register a null UnitOfWorkScope instance as the top level scope.");
Guard.Against<InvalidOperationException>(RunningScopes.Peek() != scope,
"The UnitOfWorkScope provided does not match the current top level scope. Cannot un-register the specified scope.");
RunningScopes.Pop();

if (RunningScopes.Count > 0)
{
//If the Stack has additional scopes, set the current unit of work to the UnitOfWork instance held by the top most scope.
UnitOfWorkScope currentScope = RunningScopes.Peek();
Data.UnitOfWork.Current = currentScope.UnitOfWork;
}
else
Data.UnitOfWork.Current = null;
}

///<summary>
/// Commits the current running transaction in the scope.
///</summary>
public void Commit()
{
Guard.Against<ObjectDisposedException>(_disposed, "Cannot commit a disposed UnitOfWorkScope instance.");
_currentTransaction.Commit(this);
_currentTransaction = null;
}

/// <summary>
/// Allows setting a default isolation level provider.
/// </summary>
/// <param name="provider">The provider.</param>
/// <returns>The current isolation level provider.</returns>
public static Func<IsolationLevel> SetIsolationLevelProvider(Func<IsolationLevel> provider)
{
lock (_isolationLevelProviderLock)
{
var ret = _isolationLevelProvider;
_isolationLevelProvider = provider;
return ret;
}
}
#endregion
}
}


Step Four: Adding unit tests

In order to tests this new Ambient Transaction feature we need to add/modify some unit tests. Make the following modifications to LinqToSqlRepositoryTests class.

We will start creating a few helper methods:


#region Helper Methods
private Customer CreateCustomer()
{
Random rnd = new Random();
var newCustomer = new Customer
{
FirstName = ("John_" + rnd.Next(30001, 50000)),
LastName = ("Doe_" + rnd.Next(30001, 50000)),
StreetAddress1 = "This record was inserted via a test",
City = "Fictional city",
State = "LA",
ZipCode = "12345"
};

return newCustomer;
}
private Func<LinqToSqlRepository<Customer>, Customer> GetQueryForCustomer(Customer newCustomer)
{
//Re-usable query to query for the matching record.
var queryForCustomer = new Func<LinqToSqlRepository<Customer>, Customer>
(
x => (from cust in x
where cust.FirstName == newCustomer.FirstName && cust.LastName == newCustomer.LastName
select cust).FirstOrDefault()
);

return queryForCustomer;
}
#endregion


And now, let's add a few test which tries to add a new customer within a transactionScope.


[Test]
public void Save_New_Customer_Saves_Customer_When_UnitOfWork_Is_Committed_Within_Ambient_Transaction()
{
Random rnd = new Random();
var newCustomer = new Customer
{
FirstName = ("John_" + rnd.Next(0, 30000)),
LastName = ("Doe_" + rnd.Next(0, 30000)),
StreetAddress1 = "This record was inserted via a test",
City = "Fictional city",
State = "LA",
ZipCode = "12345"
};

// Set default isolation level provider.. (pruiz)
UnitOfWorkScope.SetIsolationLevelProvider(() =>
{ return AmbientTransaction.GetCurrentIsolationLevel(IsolationLevel.ReadCommitted); }
);

var queryForCustomer = new Func<LinqToSqlRepository<Customer>, Customer>
(
x => (from cust in x
where cust.FirstName == newCustomer.FirstName && cust.LastName == newCustomer.LastName
select cust).FirstOrDefault()
);

using (var tr = new System.Transactions.TransactionScope(System.Transactions.TransactionScopeOption.RequiresNew))
{
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = queryForCustomer(customerRepository);
Assert.That(recordCheckResult, Is.Null);

customerRepository.Add(newCustomer);
scope.Commit();
}
tr.Complete();
}

//Starting a completely new unit of work and repository to check for existance.
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = queryForCustomer(customerRepository);
Assert.That(recordCheckResult, Is.Not.Null);
Assert.That(recordCheckResult.FirstName, Is.EqualTo(newCustomer.FirstName));
Assert.That(recordCheckResult.LastName, Is.EqualTo(newCustomer.LastName));
scope.Commit();
}
}

[Test]
public void Save_Does_Not_Save_New_Customer_When_UnitOfWork_Is_Aborted_Within_Ambient_Transaction()
{
Random rnd = new Random();
var newCustomer = new Customer
{
FirstName = ("John_" + rnd.Next(30001, 50000)),
LastName = ("Doe_" + rnd.Next(30001, 50000)),
StreetAddress1 = "This record was inserted via a test",
City = "Fictional city",
State = "LA",
ZipCode = "12345"
};

// Set default isolation level provider.. (pruiz)
UnitOfWorkScope.SetIsolationLevelProvider(() =>
{ return AmbientTransaction.GetCurrentIsolationLevel(IsolationLevel.ReadCommitted); }
);

using (var tr = new System.Transactions.TransactionScope(System.Transactions.TransactionScopeOption.RequiresNew))
{
using (new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);

customerRepository.Add(newCustomer);
//DO NOT CALL COMMIT TO SIMMULATE A ROLLBACK.
}
}

//Starting a completely new unit of work and repository to check for existance.
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);
scope.Commit();
}
}

[Test]
public void Save_Does_Not_Save_New_Customer_When_UnitOfWork_Is_Aborted_But_Ambient_Transaction_Is_Commited()
{
Random rnd = new Random();
var newCustomer = new Customer
{
FirstName = ("John_" + rnd.Next(30001, 50000)),
LastName = ("Doe_" + rnd.Next(30001, 50000)),
StreetAddress1 = "This record was inserted via a test",
City = "Fictional city",
State = "LA",
ZipCode = "12345"
};

// Set default isolation level provider.. (pruiz)
UnitOfWorkScope.SetIsolationLevelProvider(() =>
{ return AmbientTransaction.GetCurrentIsolationLevel(IsolationLevel.ReadCommitted); }
);

using (var tr = new System.Transactions.TransactionScope(System.Transactions.TransactionScopeOption.RequiresNew))
{
using (new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);

customerRepository.Add(newCustomer);
//DO NOT CALL COMMIT TO SIMMULATE A ROLLBACK.
}
}

//Starting a completely new unit of work and repository to check for existance.
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);
scope.Commit();
}
}

[Test]
public void Save_Does_Not_Save_New_Customer_When_UnitOfWork_Is_Commited_But_Ambient_Transaction_Is_Rolledback()
{
Random rnd = new Random();
var newCustomer = new Customer
{
FirstName = ("John_" + rnd.Next(30001, 50000)),
LastName = ("Doe_" + rnd.Next(30001, 50000)),
StreetAddress1 = "This record was inserted via a test",
City = "Fictional city",
State = "LA",
ZipCode = "12345"
};

// Set default isolation level provider.. (pruiz)
UnitOfWorkScope.SetIsolationLevelProvider(() =>
{ return AmbientTransaction.GetCurrentIsolationLevel(IsolationLevel.ReadCommitted); }
);

Assert.Throws<System.Transactions.TransactionAbortedException>(() =>
{
using (var tr = new System.Transactions.TransactionScope(System.Transactions.TransactionScopeOption.RequiresNew))
{
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);

customerRepository.Add(newCustomer);
System.Transactions.Transaction.Current.Rollback(); // WE CALL THIS BEFORE SCOPE ON PURPOSE!! (pruiz)
scope.Commit();
}
}
});

//Starting a completely new unit of work and repository to check for existance.
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);
scope.Commit();
}
}

[Test]
public void Save_Does_Not_Save_New_Customer_When_UnitOfWork_Is_Commited_But_Ambient_Transaction_Is_Disposed()
{
var newCustomer = CreateCustomer();

// Set default isolation level provider.. (pruiz)
UnitOfWorkScope.SetIsolationLevelProvider(() =>
{ return AmbientTransaction.GetCurrentIsolationLevel(IsolationLevel.ReadCommitted); }
);

Assert.Throws<ObjectDisposedException>(() =>
{
using (var tr = new System.Transactions.TransactionScope(System.Transactions.TransactionScopeOption.RequiresNew))
{
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);

customerRepository.Add(newCustomer);
tr.Dispose(); // WE CALL THIS BEFORE SCOPE ON PURPOSE!! (pruiz)
scope.Commit();
}
}
});

//Starting a completely new unit of work and repository to check for existance.
using (var scope = new UnitOfWorkScope())
{
var customerRepository = new LinqToSqlRepository<Customer>();
var recordCheckResult = (from cust in customerRepository
where cust.FirstName == newCustomer.FirstName &&
cust.LastName == newCustomer.LastName
select cust).FirstOrDefault();
Assert.That(recordCheckResult, Is.Null);
scope.Commit();
}
}

[Test]
public void Not_Disposing_a_UoW_Scope_whithin_an_Ambient_Transaction_Causes_Transaction_Abort()
{
var newCustomer = CreateCustomer();
var queryForCustomer = GetQueryForCustomer(newCustomer);

// Set default isolation level provider.. (pruiz)
UnitOfWorkScope.SetIsolationLevelProvider(() =>
{ return AmbientTransaction.GetCurrentIsolationLevel(IsolationLevel.ReadCommitted); }
);

using (var ts = new System.Transactions.TransactionScope())
{
using (var scope = new UnitOfWorkScope())
{
// NOT CALLING scope.Commit() ON PURPOSE
}

Assert.Throws<System.Transactions.TransactionAbortedException>(() =>
{
using (var scope = new UnitOfWorkScope())
{
scope.Commit(); // This one is supposed to fail.
}
}
);
}
}




Step Five: Fixing bugs..

Ok, if you tried those tests created at the last step, you will find that some of them are not passing ok. This is mostly because NCommon is failing to handle properly aborted transactions, letting it's internal UoW stack corrupted.

Let's fix this bugs to make our tests work!.. To do so, we need to hack UnitOfWorkScopeTransaction class a bit:


...

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using Microsoft.Practices.ServiceLocation;
using NCommon.Storage;

namespace NCommon.Data
{
/// <summary>
/// The <see cref="UnitOfWorkScopeTransaction"/> identifies a unique transaciton that can
/// be shared by multiple <see cref="UnitOfWorkScope"/> instances.
/// </summary>
public class UnitOfWorkScopeTransaction : IDisposable
{
#region fields

private readonly Stack<UnitOfWorkScope> _attachedScopes;
private readonly IsolationLevel _isolationLevel;
private readonly ITransaction _runningTransaction;
private readonly Guid _transactionID;
private readonly IUnitOfWork _unitOfWork;
private bool _disposed;
private bool _transactionRolledback;
private bool _transactionCommited;
private bool _commitFailed;

#endregion

#region ctor

/// <summary>
/// Overloaded Constructor.
/// Creates a new instance of the <see cref="UnitOfWorkScopeTransaction"/> that takes in a
/// <see cref="IUnitOfWorkFactory"/> instance that is responsible for creating instances of <see cref="IUnitOfWork"/> and
/// a <see cref="IDbConnection"/> that is used by the instance to connect to the data store.
/// </summary>
/// <param name="unitOfWorkFactory">The <see cref="IUnitOfWorkFactory"/> implementation that is responsible
/// for creating instances of <see cref="IUnitOfWork"/> instances.</param>
/// <param name="isolationLevel">One of the values of <see cref="IsolationLevel"/> that specifies the transaction
/// isolation level of the <see cref="UnitOfWorkScopeTransaction"/> instance.</param>
public UnitOfWorkScopeTransaction(IUnitOfWorkFactory unitOfWorkFactory, IsolationLevel isolationLevel)
{
Guard.Against<ArgumentNullException>(unitOfWorkFactory == null,
"A valid non-null instance that implements the IUnitOfWorkFactory is required.");
_transactionID = Guid.NewGuid();
_transactionRolledback = false;
_transactionCommited = false;
_commitFailed = false;
_disposed = false;
_unitOfWork = unitOfWorkFactory.Create();
_runningTransaction = _unitOfWork.BeginTransaction(isolationLevel);
_isolationLevel = isolationLevel;
_attachedScopes = new Stack<UnitOfWorkScope>();
}

#endregion

#region properties

/// <summary>
/// Gets a <see cref="Guid"/> that uniqely identifies the transaction.
/// </summary>
/// <value>A <see cref="Guid"/> that uniquely identifies the transaction.</value>
public Guid TransactionID
{
get { return _transactionID; }
}

/// <summary>
/// Gets the <see cref="IsolationLevel"/> of the <see cref="UnitOfWorkScopeTransaction"/> instance.
/// </summary>
/// <value>One of the values of <see cref="IsolationLevel"/>.</value>
public IsolationLevel IsolationLevel
{
get { return _isolationLevel; }
}

/// <summary>
/// Gets the <see cref="IUnitOfWork"/> instance of the <see cref="UnitOfWorkScopeTransaction"/> instance.
/// </summary>
public IUnitOfWork UnitOfWork
{
get { return _unitOfWork; }
}

/// <summary>
/// Gets a <see cref="IList{TEntity}"/> containing instances of <see cref="UnitOfWorkScopeTransaction"/> currently
/// started for the current request / thread.
/// </summary>
private static IList<UnitOfWorkScopeTransaction> CurrentTransactions
{
get
{
string key = typeof (UnitOfWorkScopeTransaction).FullName;
if (!Store.Local.Contains(key))
Store.Local.Set<IList<UnitOfWorkScopeTransaction>>(key,
new List
<
UnitOfWorkScopeTransaction
>());
return Store.Local.Get<IList<UnitOfWorkScopeTransaction>>(key);
}
}

#endregion

#region methods

/// <summary>
/// Gets a <see cref="UnitOfWorkScopeTransaction"/> instance that can be used by a <see cref="UnitOfWorkScope"/> instance.
/// </summary>
/// <param name="scope">The <see cref="UnitOfWorkScope"/> instance that is requesting the transaction.</param>
/// <param name="isolationLevel">One of the values of <see cref="IsolationLevel"/> that specifies the transaction isolation level.</param>
/// <returns>A <see cref="UnitOfWorkScopeTransaction"/> instance.</returns>
public static UnitOfWorkScopeTransaction GetTransactionForScope(UnitOfWorkScope scope,
IsolationLevel isolationLevel)
{
return GetTransactionForScope(scope, isolationLevel, UnitOfWorkScopeTransactionOptions.UseCompatible);
}

/// <summary>
/// Gets a <see cref="UnitOfWorkScopeTransaction"/> instance that can be used by a <see cref="UnitOfWorkScope"/> instance.
/// </summary>
/// <param name="scope">The <see cref="UnitOfWorkScope"/> instance that is requesting the transaction.</param>
/// <param name="isolationLevel">One of the values of <see cref="IsolationLevel"/> that specifies the transaction isolation level.</param>
/// <param name="options">One of the values of <see cref="UnitOfWorkScopeTransactionOptions"/> that specifies options for using existing
/// transacitons or creating new ones.</param>
/// <returns>A <see cref="UnitOfWorkScopeTransaction"/> instance.</returns>
public static UnitOfWorkScopeTransaction GetTransactionForScope(UnitOfWorkScope scope,
IsolationLevel isolationLevel,
UnitOfWorkScopeTransactionOptions options)
{
if (options == UnitOfWorkScopeTransactionOptions.UseCompatible)
{
UnitOfWorkScopeTransaction transaction = (from t in CurrentTransactions
where t.IsolationLevel == isolationLevel
select t).FirstOrDefault();
if (transaction != null)
{
transaction.AttachScope(scope);
return transaction;
}
}

var factory = ServiceLocator.Current.GetInstance<IUnitOfWorkFactory>();
var newTransaction = new UnitOfWorkScopeTransaction(factory, isolationLevel);
newTransaction.AttachScope(scope);
CurrentTransactions.Add(newTransaction);
return newTransaction;
}

/// <summary>
/// Attaches a <see cref="UnitOfWorkScope"/> instance to the <see cref="UnitOfWorkScopeTransaction"/> instance.
/// </summary>
/// <param name="scope"></param>
private void AttachScope(UnitOfWorkScope scope)
{
Guard.Against<ObjectDisposedException>(_disposed,
"Transaction has been disposed. Cannot attach a scope to a disposed transaction.");
Guard.Against<ArgumentNullException>(scope == null,
"Cannot attach a null UnitOfWorkScope instance to the UnitOfWorkScopeTransaction instance.");
_attachedScopes.Push(scope); //Push the scope on to the top of the stack.
}

/// <summary>
/// Causes a comit operation on the <see cref="UnitOfWorkScopeTransaction"/> instance.
/// </summary>
/// <param name="scope">The <see cref="UnitOfWorkScope"/> instance that is calling the commit.</param>
/// <remarks>
/// This method can only by called by the scope currently on top of the stack. If Called by another scope then an
/// <see cref="InvalidOperationException"/> is called. If the calling scope is last in the attached scope hierarchy,
/// then a commit is called on the underling unit of work instance.
/// </remarks>
public void Commit(UnitOfWorkScope scope)
{
Guard.Against<ObjectDisposedException>(_disposed,
"Transaction has been disposed. Cannot commit a disposed transaction.");
Guard.Against<InvalidOperationException>(_transactionRolledback,
"Cannot call commit on a rolledback transaction. A child scope or current scope has already rolled back the transaction. Call Rollback()");
Guard.Against<ArgumentNullException>(scope == null,
"Cannot commit the transaction for a null UnitOfWorkScope instance.");
Guard.Against<InvalidOperationException>(_attachedScopes.Peek() != scope,
"Commit can only be called by the current UnitOfWorkScope instance. The UnitOfWorkScope provided does not match the current scope on the stack.");
Guard.Against<InvalidOperationException>(_transactionCommited,
"Cannot commit an already commited transaction.");

//TODO: Fix wording of exception.
var currentScope = _attachedScopes.Pop();
if (_attachedScopes.Count == 0)
{
//The calling UnitOfWorkScope is the root of the transaction.
try
{
_unitOfWork.Flush();
_runningTransaction.Commit();

// XXX: If I move this calls into finnaly, some tests will break.
// Looks like transaction is not expected to be removed
// untill rollback es called. (pruiz)
CurrentTransactions.Remove(this);
_runningTransaction.Dispose();
_unitOfWork.Dispose();
}
catch
{
_commitFailed = true;
_attachedScopes.Push(currentScope);
throw;
}
finally
{
_transactionCommited = true;
}
}
}

/// <summary>
/// Causes a Rollback operation on the <see cref="UnitOfWorkScopeTransaction"/> instance.
/// </summary>
/// <param name="scope">The <see cref="UnitOfWorkScope"/> instance that is calling the commit.</param>
/// <remarks>
/// This method can only be called by the scope currently on top of the stack. If called by another scope than the
/// current <see cref="UnitOfWorkScope"/> instance, then a <see cref="InvalidOperationException"/> is thrown. If the
/// calling scope is the last in the attached scope hierarchy, then a rollback is called on the underlying UnitOfWork
/// instance.
/// </remarks>
public void Rollback(UnitOfWorkScope scope)
{
Guard.Against<ObjectDisposedException>(_disposed,
"Transaction has been disposed. Cannot rollback a disposed transaction.");
Guard.Against<ArgumentNullException>(scope == null,
"Cannot rollback the transaction for a null UnitOfWork instance.");
Guard.Against<InvalidOperationException>(_attachedScopes.Peek() != scope,
"Rollback can only be called by the current UnitOfWorkScope instance. The UnitOfWorkScope provided does not match the current scope on the stack.");

if (_transactionRolledback)
Console.Error.WriteLine("Warning? Rolling back transaction {0} twice (commitFailed: {1})", this.GetHashCode(), _commitFailed);

if (_transactionCommited)
Console.Error.WriteLine("Warning? Rolling back an already commited transaction {0} (commitFailed: {1})", this.GetHashCode(), _commitFailed);

//TODO: Fix wording of exception.

var _currentScope = _attachedScopes.Pop();
_transactionRolledback = true;
if (_attachedScopes.Count == 0)
{
//The calling UnitOfWorkScope is the root of the transaction.
try
{
if (!_commitFailed)
_runningTransaction.Rollback();
}
catch
{
_attachedScopes.Push(_currentScope);
throw;
}
finally
{
CurrentTransactions.Remove(this);
_runningTransaction.Dispose();
_unitOfWork.Dispose();
}
}
}

#endregion

#region Implementation of IDisposable

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
GC.SuppressFinalize(this);
}
}

#endregion
}
}


NCommon manages in an internal stack object the list of nested UoW scopes that have been created in the current callstack. But, the problem was that it was not handling exceptions thrown during rollback or commit operations, corrupting it's internal UoW stack. What we have done is modifing the code to correctly handle such exceptions. So, now NCommon's and NCommon.Linq2Sql unit tests should run fine.

Ok, so that's it, this are the steps necessary to add Ambient Transaction support to NCommon. As usual you can access this code my svn repository located at https://netway.org/svn/NCommon.

Greets.

1 comment:

  1. can you provide a download link? https://netway.org/svn/NCommon requires username and password. Thanks

    ReplyDelete