Stateful Parallel Property Testing Example in Elixir

tags: elixir ; stateful property based testing ; concurrency
Posted on 2022-08-15

I think I first heard about stateful property testing in this excellent blog post[1]. I found the idea very interesting, but only managed to apply it (very poorly) once. Recently, I saw two great talks about stateful property-based testing[2][3], and was fascinated by QuickCheck’s ability to find concurrency errors.

I searched for toy examples of stateful parallel testing, but had some difficulty finding one[4], as most examples use sequential execution. I also find PropEr’s callback typespecs confusing: the module introduction does describe what each callback should do, but I found that hard to reconcile with the typespecs themselves. So, for my own reference, I simply reproduced Hughes’ ticket dispenser example in Elixir, using PropCheck, which is an Elixir wrapper around PropEr.

The ticket dispenser has two operations: take_ticket and reset. The first version needs to have a concurrency bug, obviously:

defmodule TicketDispenserTest do
  use ExUnit.Case, async: true
  use PropCheck, default_opt: &PropCheck.TestHelpers.config/0
  use PropCheck.StateM

  @issue_ticket {:call, __MODULE__, :take_ticket, []}
  @reset {:call, __MODULE__, :reset, []}

  property "issues unique tickets", [:verbose, numtests: 1_000] do
    forall cmds <- parallel_commands(__MODULE__) do
      {:ok, pid} = Agent.start_link(fn -> 0 end)
      Process.register(pid, Counter)
      {seq_history, par_history, result} = run_parallel_commands(__MODULE__, cmds)
      Agent.stop(pid)

      (result == :ok)
      |> aggregate(command_names(cmds))
      |> when_fail(
        (
          PropCheck.StateM.Reporter.print_report({seq_history, par_history, result}, cmds)
          IO.puts("Result: #{inspect(result)}")
        )
      )
    end
  end

  def take_ticket() do
    x = Agent.get(Counter, & &1)
    Agent.update(Counter, fn _ -> x + 1 end)
    x
  end

  def reset() do
    Agent.update(Counter, fn _ -> 0 end)
    :ok
  end

  def initial_state(), do: 0
  def command(_state), do: oneof([@issue_ticket, @reset])
  def next_state(state, _res, @issue_ticket), do: state + 1
  def next_state(_state, _res, @reset), do: 0
  def postcondition(state, @issue_ticket, res), do: state == res
  def postcondition(_state, @reset, _res), do: true
  def precondition(_state, @issue_ticket), do: true
  def precondition(_state, @reset), do: true
end

The command/1 callback must return a generator of commands, each of which is a symbolic call of the form {:call, module :: atom(), fun :: atom(), [term()]} (not what the command() typespec suggests it would be). Another detail to keep in mind (described in the proper_statem introduction) is that the postcondition/3 callback receives the state prior to command execution.
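
To make the shape of a symbolic call concrete, here is a minimal sketch in plain Elixir, with no PropCheck involved. The SymbolicCallSketch module and its run/1 helper are hypothetical names used only for illustration; PropEr’s real evaluation is more involved, since arguments may themselves contain symbolic variables.

```elixir
defmodule SymbolicCallSketch do
  # Hypothetical helper: turns a symbolic call into a real function call.
  # Assumes every argument is already a concrete value.
  def run({:call, mod, fun, args})
      when is_atom(mod) and is_atom(fun) and is_list(args) do
    apply(mod, fun, args)
  end
end

# A symbolic call is plain data until something evaluates it.
symbolic = {:call, String, :upcase, ["ticket"]}
IO.inspect(symbolic, label: "symbolic")
IO.inspect(SymbolicCallSketch.run(symbolic), label: "evaluated")
```

Because the command is just a tuple, the framework can generate, shrink and print it without ever running it.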

Instead of using Agent.get_and_update/2 to atomically read and update the state when issuing a new ticket, take_ticket does it in two separate steps. That leaves an unprotected critical section, so we can see how PropCheck/PropEr behaves.
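
The race can also be replayed by hand. The following sketch is my own illustration, not part of the test above: it uses an unregistered Agent and writes out one unlucky interleaving explicitly, then repeats the exercise with the atomic call. The variable names are made up for the example.

```elixir
{:ok, counter} = Agent.start_link(fn -> 0 end)

# Two-step version with the interleaving written out by hand:
# both clients read the counter before either one writes it back.
ticket_a = Agent.get(counter, & &1)
ticket_b = Agent.get(counter, & &1)
Agent.update(counter, fn _ -> ticket_a + 1 end)
Agent.update(counter, fn _ -> ticket_b + 1 end)
IO.inspect({ticket_a, ticket_b}, label: "two-step")

# Atomic version: the read and the increment happen inside a single
# Agent call, so no other client can slip in between them.
Agent.update(counter, fn _ -> 0 end)
ticket_c = Agent.get_and_update(counter, &{&1, &1 + 1})
ticket_d = Agent.get_and_update(counter, &{&1, &1 + 1})
IO.inspect({ticket_c, ticket_d}, label: "atomic")
```

The two-step replay hands out ticket 0 twice, while the atomic version hands out 0 and then 1.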

Indeed, PropCheck quickly finds the bug and shrinks it to a very minimal failing example:

================================================================================
Concurrency Failure, we don't show the state :-/

Sequential commands:
   var1 = TicketDispenserTest.reset()
        # -> :ok
        # Post state: 0


Process 1:
   var2 = TicketDispenserTest.reset()
        # -> :ok
   var3 = TicketDispenserTest.take_ticket()
        # -> 0
   var4 = TicketDispenserTest.reset()
        # -> :ok
   var5 = TicketDispenserTest.reset()
        # -> :ok
   var6 = TicketDispenserTest.reset()
        # -> :ok


Process 2:
   var7 = TicketDispenserTest.take_ticket()
        # -> 0
   var8 = TicketDispenserTest.take_ticket()
        # -> 1
   var9 = TicketDispenserTest.take_ticket()
        # -> 0
   var10 = TicketDispenserTest.take_ticket()
        # -> 1
   var11 = TicketDispenserTest.take_ticket()
        # -> 2
   var12 = TicketDispenserTest.take_ticket()
        # -> 3



Result: :no_possible_interleaving

Shrinking .......(7 time(s))
{[],
 [
   [{:set, {:var, 3}, {:call, TicketDispenserTest, :take_ticket, []}}],
   [{:set, {:var, 8}, {:call, TicketDispenserTest, :take_ticket, []}}]
 ]}

================================================================================
Concurrency Failure, we don't show the state :-/

Sequential commands:


Process 1:
   var3 = TicketDispenserTest.take_ticket()
        # -> 0


Process 2:
   var8 = TicketDispenserTest.take_ticket()
        # -> 0

The shrunk counter-example has no sequential prefix (that is, no shared sequence of commands run before the two processes start), and each process simply tries to take a ticket concurrently. It fails with the :no_possible_interleaving error because, according to the model, there’s no sequential ordering of the two calls in which both processes get the same ticket.
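
To illustrate what "no possible interleaving" means, here is a small hand-rolled check. It is only a sketch of the idea, assuming the same model as the test module; the InterleavingSketch module is hypothetical, and I am not claiming this is how PropEr implements the check internally. It takes the observed {command, result} history of each process and asks whether some merge of the two histories satisfies every postcondition.

```elixir
defmodule InterleavingSketch do
  # Same model as in the post: the state is the next ticket number.
  def next_state(state, :take_ticket), do: state + 1
  def next_state(_state, :reset), do: 0

  # The postcondition sees the state *before* the command runs.
  def postcondition(state, :take_ticket, res), do: state == res
  def postcondition(_state, :reset, _res), do: true

  # True if some merge of the two per-process histories agrees with the model.
  def explainable?(_state, [], []), do: true

  def explainable?(state, p1, p2) do
    step(state, p1, p2) or step(state, p2, p1)
  end

  # Try to run the next command of `current` first, then recurse on the rest.
  defp step(_state, [], _other), do: false

  defp step(state, [{cmd, res} | rest], other) do
    postcondition(state, cmd, res) and
      explainable?(next_state(state, cmd), rest, other)
  end
end

# Both processes observed ticket 0: no ordering explains that.
IO.inspect(InterleavingSketch.explainable?(0, [take_ticket: 0], [take_ticket: 0]))

# One process got 0 and the other 1: some ordering explains it.
IO.inspect(InterleavingSketch.explainable?(0, [take_ticket: 0], [take_ticket: 1]))
```

The first call returns false and the second returns true, which mirrors the failing and passing cases above.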

As mentioned before, after the simple fix…

def take_ticket() do
-  x = Agent.get(Counter, & &1)
-  Agent.update(Counter, fn _ -> x + 1 end)
-  x
+  Agent.get_and_update(Counter, & {&1, &1 + 1})
end

… the test passes:

OK: Passed 1000 test(s).

50.06% {TicketDispenserTest, :reset, 0}
49.94% {TicketDispenserTest, :take_ticket, 0}
.

Finished in 1.9 seconds (1.9s async, 0.00s sync)
1 property, 0 failures

One more toy example: the classic bank transaction. Here, I’ll model it with just one transfer command, and the initial state will contain some cash on three predetermined accounts. I’ll put the real implementation in a separate Transaction module (the reason for this will be clear shortly). For the initial obviously buggy version, we’ll use Mnesia dirty reads/writes without any transactions:

defmodule Transaction do
  def transfer(from, to, amount) do
    [{_, _, prev_from}] = :mnesia.dirty_read({:balances, from})
    [{_, _, prev_to}] = :mnesia.dirty_read({:balances, to})
    if prev_from >= amount do
      :mnesia.dirty_write({:balances, from, prev_from - amount})
      :mnesia.dirty_write({:balances, to, prev_to + amount})
      :ok
    else
      :no_funds
    end
  end
end

Test module for Bank Transaction
defmodule BankTransactionTest do
  use ExUnit.Case, async: true
  use PropCheck, default_opt: &PropCheck.TestHelpers.config/0
  use PropCheck.StateM

  setup_all do
    :ok = :mnesia.start()
    :ok
  end

  property "concurrent transfers agree with the model", [:verbose, numtests: 1_000] do
    {:atomic, :ok} = :mnesia.create_table(:balances, [])

    forall cmds <- parallel_commands(__MODULE__) do
      Enum.each([:a, :b, :c], fn account ->
        :mnesia.dirty_write({:balances, account, initial_balance()})
      end)

      {seq_history, par_history, result} = run_parallel_commands(__MODULE__, cmds)
      tab = :ets.tab2list(:balances)
      :mnesia.clear_table(:balances)

      (result == :ok)
      |> aggregate(command_names(cmds))
      |> when_fail(
        (
          PropCheck.StateM.Reporter.print_report({seq_history, par_history, result}, cmds)
          IO.puts("Result: #{inspect(result)}")
          IO.puts("Accounts: #{inspect(tab, pretty: true)}")
        )
      )
    end
  end

  def initial_state(),
    do: %{
      a: initial_balance(),
      b: initial_balance(),
      c: initial_balance()
    }

  def command(_state),
    do:
      oneof([
        {:call, Transaction, :transfer, [account(), account(), integer(1, 10)]}
      ])

  def next_state(state, _res, {:call, Transaction, :transfer, [from, to, amount]}) do
    if Map.fetch!(state, from) >= amount do
      state
      |> Map.update!(from, &(&1 - amount))
      |> Map.update!(to, &(&1 + amount))
    else
      state
    end
  end

  def postcondition(state, {:call, Transaction, :transfer, [from, _to, amount]}, res) do
    case res do
      :ok -> Map.fetch!(state, from) >= amount
      :no_funds -> Map.fetch!(state, from) < amount
    end
  end

  def precondition(_state, {:call, Transaction, :transfer, [from, to, _amount]}) do
    from != to
  end

  defp initial_balance(), do: 3

  defp account(), do: oneof([:a, :b, :c])
end

Here, we set a precondition on the transfer so we don’t transfer from an account to itself. That guides the command generation phase.

Let’s see it break!

OK: Passed 1000 test(s).

100.0% {Transaction, :transfer, 3}
.

Finished in 2.7 seconds (2.7s async, 0.00s sync)
1 property, 0 failures

What?! That implementation is obviously broken, yet all tests passed? How come?

The issue is that this concurrency bug only shows up under specific interleavings of the processes, and the scheduler rarely produces them, particularly when the system is not under heavy load.

To increase the chance of such rare interleavings, we have to sprinkle :erlang.yield/0 calls between the function calls that might race. Luckily, PropCheck has some facilities to do that for us: we just have to instrument the implementation module with Instrument.instrument_module(Impl, YieldInstrumenter). Unfortunately, Mnesia functions are not among the functions targeted by the default YieldInstrumenter, so we need to write our own instrumenter that extends it:

Custom Yield Instrumenter
defmodule PropCheck.MyYieldInstrumenter do
  require Logger

  alias PropCheck.Instrument

  @behaviour Instrument

  @impl true
  def handle_function_call(call) do
    Logger.debug("handle_function: #{inspect(call)}")
    Instrument.prepend_call(call, Instrument.call_yield())
  end

  @impl true
  def is_instrumentable_function(mod = {:atom, _meta1, module}, fun = {:atom, _, function}) do
    [
      {:mnesia, :dirty_read},
      {:mnesia, :dirty_write},
      {:mnesia, :read},
      {:mnesia, :wread},
      {:mnesia, :write},
      {:mnesia, :transaction}
    ]
    |> MapSet.new()
    |> MapSet.member?({module, function})
    |> Kernel.||(Instrument.instrumentable_function(mod, fun))
  end
  def is_instrumentable_function(_, _), do: false
end

And then we instrument our module before running the tests. That’s why I put the code in a separate module that’s compiled (not an .exs file):

setup_all do
+ PropCheck.Instrument.instrument_module(Transaction, PropCheck.MyYieldInstrumenter)
  :ok = :mnesia.start()
  :ok
end

Alternatively, we could sprinkle the :erlang.yield() calls ourselves.
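
As a rough sketch of that manual alternative, a hand-instrumented variant of the buggy module might look like the following. HandInstrumentedTransaction is a hypothetical name, and where exactly to place the yields is my own guess; I make no claim that these particular yield points are enough to expose the race on any given machine.

```elixir
defmodule HandInstrumentedTransaction do
  # Hypothetical variant of the buggy Transaction module: a yield before
  # each Mnesia call gives the scheduler a chance to switch processes
  # right inside the unprotected critical section.
  def transfer(from, to, amount) do
    :erlang.yield()
    [{_, _, prev_from}] = :mnesia.dirty_read({:balances, from})
    :erlang.yield()
    [{_, _, prev_to}] = :mnesia.dirty_read({:balances, to})

    if prev_from >= amount do
      :erlang.yield()
      :mnesia.dirty_write({:balances, from, prev_from - amount})
      :erlang.yield()
      :mnesia.dirty_write({:balances, to, prev_to + amount})
      :ok
    else
      :no_funds
    end
  end
end
```

The single-process behaviour is unchanged; only the scheduling opportunities differ.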

Running the instrumented code then yields the expected failure:

!
Failed: After 1 test(s).

# ✀ -- snip -- ✀

Shrinking ...(3 time(s))
{[],
 [
   [{:set, {:var, 2}, {:call, Transaction, :transfer, [:a, :c, 3]}}],
   [{:set, {:var, 4}, {:call, Transaction, :transfer, [:a, :b, 3]}}]
 ]}

================================================================================
Concurrency Failure, we don't show the state :-/

Sequential commands:


Process 1:
   var2 = Transaction.transfer(:a, :c, 3)
        # -> :ok


Process 2:
   var4 = Transaction.transfer(:a, :b, 3)
        # -> :ok



Result: :no_possible_interleaving
Accounts: [{:balances, :c, 6}, {:balances, :b, 6}, {:balances, :a, 0}]
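
The final balances in that report can be reproduced by hand. The sketch below is an illustration only: it swaps Mnesia for an Agent holding a map (an assumption made to keep the example dependency-free), and writes out one interleaving in which both transfers read account :a before either of them writes.

```elixir
# Stand-in for the :balances table: an Agent holding a map of balances.
{:ok, table} = Agent.start_link(fn -> %{a: 3, b: 3, c: 3} end)
read = fn account -> Agent.get(table, &Map.fetch!(&1, account)) end
write = fn account, balance -> Agent.update(table, &Map.put(&1, account, balance)) end

# Process 1, transfer(:a, :c, 3): reads both balances.
p1_from = read.(:a)
p1_to = read.(:c)

# Process 2, transfer(:a, :b, 3): reads before process 1 has written.
p2_from = read.(:a)
p2_to = read.(:b)

# Both funds checks pass, because both saw the original balance of 3.
true = p1_from >= 3 and p2_from >= 3

# Both processes now write their results.
write.(:a, p1_from - 3)
write.(:c, p1_to + 3)
write.(:a, p2_from - 3)
write.(:b, p2_to + 3)

balances = Agent.get(table, & &1)
IO.inspect(balances, label: "balances")
IO.inspect(Enum.sum(Map.values(balances)), label: "total")
```

The accounts end up at a: 0, b: 6 and c: 6, so the total has grown from 9 to 12: money was created out of thin air, which no sequential ordering of the two transfers allows.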

The minimal failing test in this case is simple: two processes try to transfer from the same account concurrently. Wrapping everything in a transaction should fix the problem:

Fixed Bank Transaction module
defmodule Transaction do
  def transfer(from, to, amount) do
    {:atomic, res} = :mnesia.transaction(fn ->
      [{_, _, prev_from}] = :mnesia.wread({:balances, from})
      [{_, _, prev_to}] = :mnesia.wread({:balances, to})
      if prev_from >= amount do
        :mnesia.write({:balances, from, prev_from - amount})
        :mnesia.write({:balances, to, prev_to + amount})
        :ok
      else
        :no_funds
      end
    end)
    res
  end
end

With the transaction in place, the instrumented test passes:

OK: Passed 1000 test(s).

100.0% {Transaction, :transfer, 3}
.

Finished in 11.5 seconds (11.5s async, 0.00s sync)
1 property, 0 failures

Very cool! I hope to be able to apply more of this to my real world problems! 🍻


  1. Reliable Systems Series: Model-Based Testing - Tyler Neely

  2. Testing the Hard Stuff and Staying Sane - John Hughes

  3. Introduction to Stateful Property Testing - Tomasz Kowal

  4. It seems that people use parallel execution so seldom that I even found a couple of bugs when exploring this simple example.