checkpoint new implementation

This commit is contained in:
Kacper Marzecki 2025-06-18 14:29:40 +02:00
parent 967eb93aa8
commit 0880bb338f

798
new.exs Normal file
View File

@ -0,0 +1,798 @@
defmodule Tdd.TypeSpec do
@moduledoc """
Defines the `TypeSpec` structure and functions for its manipulation.
A `TypeSpec` is a stable, structural, and declarative representation of a type.
It serves as the primary language for defining and interacting with types in the
higher-level system, abstracting away the underlying TDD implementation.
All `TypeSpec`s should be passed through `normalize/1` before being used in
caching or compilation to ensure a canonical representation.
"""
@typedoc "A stable, structural representation of a type."
# --- Core Types ---
@type t ::
:any
| :none
| :atom
| :integer
| :list
| :tuple
# (Add :binary, :function, :pid, etc. here as they are implemented)
# --- Literal Value Type ---
| {:literal, term()}
# --- Set-Theoretic Combinators ---
# Note: The `normalize/1` function guarantees that the lists in
# :union and :intersect are sorted, unique, and flattened.
| {:union, [t()]}
| {:intersect, [t()]}
| {:negation, t()}
# --- Parameterized Structural Types ---
| {:tuple, [t()]}
| {:cons, head :: t(), tail :: t()}
| {:list_of, element :: t()}
# --- Integer Range (Example of property-based type) ---
| {:integer_range, min :: integer() | :neg_inf, max :: integer() | :pos_inf}
# --- For Polymorphism (Future Use) ---
| {:type_var, atom()}
@doc """
Converts a `TypeSpec` into its canonical (normalized) form.
Normalization is crucial for reliable caching and simplifying downstream logic.
It performs several key operations:
1. **Flattens nested unions and intersections:**
`{:union, [A, {:union, [B, C]}]}` becomes `{:union, [A, B, C]}`.
2. **Sorts and uniqs members of unions and intersections:**
`{:union, [C, A, A]}` becomes `{:union, [A, C]}`.
3. **Applies logical simplification rules (idempotency, annihilation):**
- `A | A` -> `A`
- `A | none` -> `A`
- `A & any` -> `A`
- `A & none` -> `none`
- `A | any` -> `any`
- `¬(¬A)` -> `A`
- An intersection containing both `A` and `¬A` simplifies. (This is
better handled by the TDD compiler, but basic checks can happen here).
4. **Recursively normalizes all sub-specs.**
"""
@spec normalize(t()) :: t()
def normalize(spec) do
case spec do
# --- Base cases: already normalized ---
:any ->
:any
:none ->
:none
:atom ->
:atom
:integer ->
:integer
:list ->
:list
:tuple ->
:tuple
{:literal, _} ->
spec
{:type_var, _} ->
spec
# Assume range is already canonical
{:integer_range, _, _} ->
spec
# --- Recursive cases ---
{:negation, sub_spec} ->
normalized_sub = normalize(sub_spec)
# Apply double negation law: ¬(¬A) -> A
case normalized_sub do
{:negation, inner_spec} -> inner_spec
_ -> {:negation, normalized_sub}
end
{:tuple, elements} ->
{:tuple, Enum.map(elements, &normalize/1)}
{:cons, head, tail} ->
{:cons, normalize(head), normalize(tail)}
{:list_of, element} ->
{:list_of, normalize(element)}
# --- Complex cases: Union and Intersection ---
{:union, members} ->
# 1. Recursively normalize, then flatten nested unions.
normalized_and_flattened =
Enum.flat_map(members, fn member ->
normalized = normalize(member)
case normalized do
{:union, sub_members} -> sub_members
_ -> [normalized]
end
end)
# 2. Apply simplification rules and sort.
simplified_members =
normalized_and_flattened
# Annihilation: A | none -> A
|> Enum.reject(&(&1 == :none))
# Uniq members
|> MapSet.new()
# Annihilation: if `any` is a member, the whole union is `any`.
if MapSet.member?(simplified_members, :any) do
:any
else
# 3. Finalize the structure.
case MapSet.to_list(simplified_members) do
# An empty union is the empty set.
[] -> :none
# Idempotency: A | A -> A
[single_member] -> single_member
sorted_members -> {:union, Enum.sort(sorted_members)}
end
end
{:intersect, members} ->
# 1. Recursively normalize, then flatten.
normalized_and_flattened =
Enum.flat_map(members, fn member ->
normalized = normalize(member)
case normalized do
{:intersect, sub_members} -> sub_members
_ -> [normalized]
end
end)
# 2. Apply simplification rules and sort.
simplified_members =
normalized_and_flattened
# Annihilation: A & any -> A
|> Enum.reject(&(&1 == :any))
|> MapSet.new()
# Annihilation: if `none` is a member, the whole intersection is `none`.
if MapSet.member?(simplified_members, :none) do
:none
else
# 3. Finalize the structure.
case MapSet.to_list(simplified_members) do
# An empty intersection is the universal set.
[] -> :any
# Idempotency: A & A -> A
[single_member] -> single_member
sorted_members -> {:intersect, Enum.sort(sorted_members)}
end
end
end
end
end
defmodule Tdd.Store do
@moduledoc """
Manages the state of the TDD system's node graph and operation cache.
This module acts as the stateful backend for the TDD algorithms. It is
responsible for creating unique, shared nodes (ensuring structural canonicity)
and for memoizing the results of expensive operations.
It is intentionally agnostic about the *meaning* of the variables within the
nodes; it treats them as opaque, comparable terms. The logic for interpreting
these variables resides in higher-level modules like `Tdd.Algo` and
`Tdd.Consistency.Engine`.
For simplicity, this implementation uses the Process dictionary for state.
In a production, concurrent application, this would be replaced by a `GenServer`
to ensure safe, serialized access to the shared TDD state.
"""
# --- State Keys ---
@nodes_key :tdd_nodes
@node_by_id_key :tdd_node_by_id
@next_id_key :tdd_next_id
@op_cache_key :tdd_op_cache
# --- Terminal Node IDs ---
@false_node_id 0
@true_node_id 1
# --- Public API ---
@doc "Initializes the TDD store in the current process."
def init do
# The main lookup table: {variable, y, n, d} -> id
Process.put(@nodes_key, %{})
# The reverse lookup table: id -> {variable, y, n, d} or :terminal
Process.put(@node_by_id_key, %{
@false_node_id => :false_terminal,
@true_node_id => :true_terminal
})
# The next available integer ID for a new node.
Process.put(@next_id_key, 2)
# The cache for memoizing operation results: {op, args} -> id
Process.put(@op_cache_key, %{})
:ok
end
@doc "Returns the ID for the TRUE terminal node (the 'any' type)."
@spec true_node_id() :: non_neg_integer()
def true_node_id, do: @true_node_id
@doc "Returns the ID for the FALSE terminal node (the 'none' type)."
@spec false_node_id() :: non_neg_integer()
def false_node_id, do: @false_node_id
@doc "Retrieves the details of a node by its ID."
@spec get_node(non_neg_integer()) ::
{:ok,
{variable :: term(), yes_id :: non_neg_integer(), no_id :: non_neg_integer(),
dc_id :: non_neg_integer()}}
| {:ok, :true_terminal | :false_terminal}
| {:error, :not_found}
def get_node(id) do
case Process.get(@node_by_id_key, %{}) do
%{^id => details} -> {:ok, details}
%{} -> {:error, :not_found}
end
end
@doc """
Finds an existing node that matches the structure or creates a new one.
This is the core function for ensuring structural sharing (part of the "Reduced"
property of ROBDDs). It also implements a fundamental reduction rule: if all
children of a node are identical, the node is redundant and is replaced by
its child.
"""
@spec find_or_create_node(
variable :: term(),
yes_id :: non_neg_integer(),
no_id :: non_neg_integer(),
dc_id :: non_neg_integer()
) :: non_neg_integer()
def find_or_create_node(variable, yes_id, no_id, dc_id) do
# Basic reduction rule: a node whose test is irrelevant is redundant.
if yes_id == no_id && yes_id == dc_id do
yes_id
else
node_tuple = {variable, yes_id, no_id, dc_id}
nodes = Process.get(@nodes_key, %{})
case Map.get(nodes, node_tuple) do
# Node already exists, return its ID for structural sharing.
id when is_integer(id) ->
id
# Node does not exist, create it.
nil ->
next_id = Process.get(@next_id_key)
node_by_id = Process.get(@node_by_id_key)
# Update all state tables
Process.put(@nodes_key, Map.put(nodes, node_tuple, next_id))
Process.put(@node_by_id_key, Map.put(node_by_id, next_id, node_tuple))
Process.put(@next_id_key, next_id + 1)
next_id
end
end
end
@doc "Retrieves a result from the operation cache."
@spec get_op_cache(term()) :: {:ok, term()} | :not_found
def get_op_cache(cache_key) do
case Process.get(@op_cache_key, %{}) do
%{^cache_key => result} -> {:ok, result}
%{} -> :not_found
end
end
@doc "Puts a result into the operation cache."
@spec put_op_cache(term(), term()) :: :ok
def put_op_cache(cache_key, result) do
# Using `get_and_update_in` would be safer but this is fine for this context.
cache = Process.get(@op_cache_key, %{})
Process.put(@op_cache_key, Map.put(cache, cache_key, result))
:ok
end
end
defmodule Tdd.Variable do
@moduledoc """
Defines the canonical structure for all Tdd predicate variables.
The structure `{category, predicate, value, padding}` is used to enforce a
stable global ordering. All variables are 4-element tuples to ensure that
Elixir's tuple-size-first comparison rule does not interfere with the
intended predicate ordering within a category.
"""
alias Tdd.TypeSpec
# --- Category 0: Primary Type Discriminators ---
# Padding with `nil` to make them 4-element tuples.
@spec v_is_atom() :: term()
def v_is_atom, do: {0, :is_atom, nil, nil}
@spec v_is_integer() :: term()
def v_is_integer, do: {0, :is_integer, nil, nil}
@spec v_is_list() :: term()
def v_is_list, do: {0, :is_list, nil, nil}
@spec v_is_tuple() :: term()
def v_is_tuple, do: {0, :is_tuple, nil, nil}
# --- Category 1: Atom Properties ---
@spec v_atom_eq(atom()) :: term()
def v_atom_eq(atom_val) when is_atom(atom_val), do: {1, :value, atom_val, nil}
# --- Category 2: Integer Properties ---
@spec v_int_lt(integer()) :: term()
def v_int_lt(n) when is_integer(n), do: {2, :alt, n, nil}
@spec v_int_eq(integer()) :: term()
def v_int_eq(n) when is_integer(n), do: {2, :beq, n, nil}
@spec v_int_gt(integer()) :: term()
def v_int_gt(n) when is_integer(n), do: {2, :cgt, n, nil}
# --- Category 4: Tuple Properties ---
# The most complex var here is `:b_element` with index and nested var.
# So all vars in this category need to be at least 4-element.
@spec v_tuple_size_eq(non_neg_integer()) :: term()
def v_tuple_size_eq(size) when is_integer(size) and size >= 0, do: {4, :a_size, size, nil}
@spec v_tuple_elem_pred(non_neg_integer(), term()) :: term()
def v_tuple_elem_pred(index, nested_pred_var) when is_integer(index) and index >= 0 do
{4, :b_element, index, nested_pred_var}
end
# --- Category 5: List Properties ---
# All are now 4-element tuples. The sorting will be correct.
@spec v_list_all_elements_are(TypeSpec.t()) :: term()
def v_list_all_elements_are(element_spec), do: {5, :a_all_elements, element_spec, nil}
@spec v_list_is_empty() :: term()
def v_list_is_empty, do: {5, :b_is_empty, nil, nil}
@spec v_list_head_pred(term()) :: term()
def v_list_head_pred(nested_pred_var), do: {5, :c_head, nested_pred_var, nil}
@spec v_list_tail_pred(term()) :: term()
def v_list_tail_pred(nested_pred_var), do: {5, :d_tail, nested_pred_var, nil}
end
defmodule TddStoreTests do
def test(name, expected, result) do
if expected == result do
IO.puts("[PASS] #{name}")
else
IO.puts("[FAIL] #{name}")
IO.puts(" Expected: #{inspect(expected)}")
IO.puts(" Got: #{inspect(result)}")
Process.put(:test_failures, [name | Process.get(:test_failures, [])])
end
end
def run() do
IO.puts("\n--- Running Tdd.Store Tests ---")
Process.put(:test_failures, [])
# --- Test Setup ---
Tdd.Store.init()
# --- Test Cases ---
IO.puts("\n--- Section: Initialization and Terminals ---")
test("true_node_id returns 1", 1, Tdd.Store.true_node_id())
test("false_node_id returns 0", 0, Tdd.Store.false_node_id())
test("get_node for ID 1 returns true_terminal", {:ok, :true_terminal}, Tdd.Store.get_node(1))
test(
"get_node for ID 0 returns false_terminal",
{:ok, :false_terminal},
Tdd.Store.get_node(0)
)
test(
"get_node for unknown ID returns not_found",
{:error, :not_found},
Tdd.Store.get_node(99)
)
IO.puts("\n--- Section: Node Creation and Structural Sharing ---")
# Define some opaque variables
var_a = {:is_atom}
var_b = {:is_integer}
true_id = Tdd.Store.true_node_id()
false_id = Tdd.Store.false_node_id()
# Create a new node. It should get ID 2.
id1 = Tdd.Store.find_or_create_node(var_a, true_id, false_id, false_id)
test("First created node gets ID 2", 2, id1)
# Verify its content
test(
"get_node for ID 2 returns the correct tuple",
{:ok, {var_a, true_id, false_id, false_id}},
Tdd.Store.get_node(id1)
)
# Create another, different node. It should get ID 3.
id2 = Tdd.Store.find_or_create_node(var_b, id1, false_id, false_id)
test("Second created node gets ID 3", 3, id2)
# Attempt to create the first node again.
id1_again = Tdd.Store.find_or_create_node(var_a, true_id, false_id, false_id)
test(
"Attempting to create an existing node returns the same ID (Structural Sharing)",
id1,
id1_again
)
# Check that next_id was not incremented by the shared call
id3 = Tdd.Store.find_or_create_node(var_b, true_id, false_id, false_id)
test("Next new node gets the correct ID (4)", 4, id3)
IO.puts("\n--- Section: Basic Reduction Rule ---")
# Create a node where all children are the same.
id_redundant = Tdd.Store.find_or_create_node(var_a, id3, id3, id3)
test(
"A node with identical children reduces to the child's ID",
id3,
id_redundant
)
IO.puts("\n--- Section: Caching ---")
cache_key = {:my_op, 1, 2}
test("Cache is initially empty for a key", :not_found, Tdd.Store.get_op_cache(cache_key))
Tdd.Store.put_op_cache(cache_key, :my_result)
test(
"Cache returns the stored value after put",
{:ok, :my_result},
Tdd.Store.get_op_cache(cache_key)
)
Tdd.Store.put_op_cache(cache_key, :new_result)
test("Cache can be updated", {:ok, :new_result}, Tdd.Store.get_op_cache(cache_key))
# --- Final Report ---
failures = Process.get(:test_failures, [])
if failures == [] do
IO.puts("\n✅ All Tdd.Store tests passed!")
else
IO.puts("\n❌ Found #{length(failures)} test failures.")
end
end
end
defmodule TypeSpecTests do
alias Tdd.TypeSpec
# Simple test helper function
defp test(name, expected, tested) do
current_failures = Process.get(:test_failures, [])
result = TypeSpec.normalize(tested)
# Use a custom comparison to handle potentially unsorted lists in expected values
# The normalize function *should* sort, but this makes tests more robust.
is_equal =
case {expected, result} do
{{:union, list1}, {:union, list2}} -> Enum.sort(list1) == Enum.sort(list2)
{{:intersect, list1}, {:intersect, list2}} -> Enum.sort(list1) == Enum.sort(list2)
_ -> expected == result
end
if is_equal do
IO.puts("[PASS] #{name}")
else
IO.puts("[FAIL] #{name}")
IO.puts(" tested: #{inspect(tested)}")
IO.puts(" Expected: #{inspect(expected)}")
IO.puts(" Got: #{inspect(result)}")
Process.put(:test_failures, [name | current_failures])
end
end
def run() do
IO.puts("\n--- Running Tdd.TypeSpec.normalize/1 Tests ---")
Process.put(:test_failures, [])
# --- Test Section: Base & Simple Types ---
IO.puts("\n--- Section: Base & Simple Types ---")
test("Normalizing :any is idempotent", :any, :any)
test("Normalizing :none is idempotent", :none, :none)
test("Normalizing :atom is idempotent", :atom, :atom)
test("Normalizing a literal is idempotent", {:literal, :foo}, {:literal, :foo})
# --- Test Section: Double Negation ---
IO.puts("\n--- Section: Double Negation ---")
test("¬(¬atom) simplifies to atom", :atom, {:negation, {:negation, :atom}})
test("A single negation is preserved", {:negation, :integer}, {:negation, :integer})
test(
"¬(¬(¬atom)) simplifies to ¬atom",
{:negation, :atom},
{:negation, {:negation, {:negation, :atom}}}
)
# --- Test Section: Union Normalization ---
IO.puts("\n--- Section: Union Normalization ---")
test(
"Flattens nested unions",
{:union, [:atom, :integer, :list]},
{:union, [:integer, {:union, [:list, :atom]}]}
)
test(
"Sorts members of a union",
{:union, [:atom, :integer, :list]},
{:union, [:list, :integer, :atom]}
)
test(
"Removes duplicates in a union",
{:union, [:atom, :integer]},
{:union, [:integer, :atom, :integer]}
)
test("Simplifies a union with :none (A | none -> A)", :atom, {:union, [:atom, :none]})
test("Simplifies a union with :any (A | any -> any)", :any, {:union, [:atom, :any]})
test("An empty union simplifies to :none", :none, {:union, []})
test("A union containing only :none simplifies to :none", :none, {:union, [:none, :none]})
test("A union of a single element simplifies to the element itself", :atom, {:union, [:atom]})
# --- Test Section: Intersection Normalization ---
IO.puts("\n--- Section: Intersection Normalization ---")
test(
"Flattens nested intersections",
{:intersect, [:atom, :integer, :list]},
{:intersect, [:integer, {:intersect, [:list, :atom]}]}
)
test(
"Sorts members of an intersection",
{:intersect, [:atom, :integer, :list]},
{:intersect, [:list, :integer, :atom]}
)
test(
"Removes duplicates in an intersection",
{:intersect, [:atom, :integer]},
{:intersect, [:integer, :atom, :integer]}
)
test(
"Simplifies an intersection with :any (A & any -> A)",
:atom,
{:intersect, [:atom, :any]}
)
test(
"Simplifies an intersection with :none (A & none -> none)",
:none,
{:intersect, [:atom, :none]}
)
test("An empty intersection simplifies to :any", :any, {:intersect, []})
test(
"An intersection of a single element simplifies to the element itself",
:atom,
{:intersect, [:atom]}
)
# --- Test Section: Recursive Normalization ---
IO.puts("\n--- Section: Recursive Normalization ---")
test(
"Recursively normalizes elements in a tuple",
{:tuple, [:atom, {:union, [{:literal, :a}, {:literal, :b}]}]},
{:tuple, [{:union, [:atom]}, {:union, [{:literal, :a}, {:literal, :b}]}]}
)
test(
"Recursively normalizes head and tail in a cons",
{:cons, :any, {:negation, :integer}},
{:cons, {:union, [:atom, :any]}, {:negation, {:union, [:integer]}}}
)
test(
"Recursively normalizes element in list_of",
{:list_of, :list},
{:list_of, {:intersect, [:any, :list]}}
)
test(
"Recursively normalizes sub-spec in negation",
{:negation, {:union, [{:literal, :a}, {:literal, :b}]}},
{:negation, {:union, [{:literal, :a}, {:literal, :b}]}}
)
# --- Test Section: Complex Nested Cases ---
IO.puts("\n--- Section: Complex Nested Cases ---")
complex_spec =
{:union,
[
:atom,
# simplifies to :integer
{:intersect, [:any, :integer, {:intersect, [:integer]}]},
# simplifies to :list
{:union, [:none, :list]}
]}
test(
"Handles complex nested simplifications correctly",
{:union, [:atom, :integer, :list]},
complex_spec
)
# --- Final Report ---
failures = Process.get(:test_failures, [])
if failures == [] do
IO.puts("\n✅ All TypeSpec tests passed!")
else
IO.puts("\n❌ Found #{length(failures)} test failures:")
Enum.each(failures, &IO.puts(" - #{&1}"))
end
end
end
defmodule TddVariableTests do
alias Tdd.Variable
alias Tdd.TypeSpec
# Simple test helper function
defp test(name, expected, result) do
current_failures = Process.get(:test_failures, [])
if expected == result do
IO.puts("[PASS] #{name}")
else
IO.puts("[FAIL] #{name}")
IO.puts(" Expected: #{inspect(expected)}")
IO.puts(" Got: #{inspect(result)}")
Process.put(:test_failures, [name | current_failures])
end
end
def run() do
IO.puts("\n--- Running Tdd.Variable Tests ---")
Process.put(:test_failures, [])
# --- Test Section: Variable Structure ---
IO.puts("\n--- Section: Variable Structure ---")
test("v_is_atom returns correct tuple", {0, :is_atom, nil, nil}, Variable.v_is_atom())
test("v_atom_eq returns correct tuple", {1, :value, :foo, nil}, Variable.v_atom_eq(:foo))
test("v_int_lt returns correct tuple", {2, :alt, 10, nil}, Variable.v_int_lt(10))
test("v_tuple_size_eq returns correct tuple", {4, :a_size, 2, nil}, Variable.v_tuple_size_eq(2))
test(
"v_tuple_elem_pred nests a variable correctly",
{4, :b_element, 0, {0, :is_integer, nil, nil }},
Variable.v_tuple_elem_pred(0, Variable.v_is_integer())
)
test("v_list_is_empty returns correct tuple", {5, :b_is_empty, nil, nil}, Variable.v_list_is_empty())
test(
"v_list_head_pred nests a variable correctly",
{5, :c_head, {0, :is_atom, nil, nil}, nil},
Variable.v_list_head_pred(Variable.v_is_atom())
)
test(
"v_list_all_elements_are nests a TypeSpec correctly",
{5, :a_all_elements, {:union, [:atom, :integer]}, nil},
Variable.v_list_all_elements_are(TypeSpec.normalize({:union, [:integer, :atom]}))
)
# --- Test Section: Global Ordering ---
IO.puts("\n--- Section: Global Ordering (Based on Elixir Term Comparison) ---")
# Category 0 < Category 1
test(
"Primary type var < Atom property var",
true,
Variable.v_is_tuple() < Variable.v_atom_eq(:anything)
)
# Within Category 2: alt < beq < cgt
test(
"Integer :lt var < Integer :eq var",
true,
Variable.v_int_lt(10) < Variable.v_int_eq(10)
)
test(
"Integer :eq var < Integer :gt var",
true,
Variable.v_int_eq(10) < Variable.v_int_gt(10)
)
# Within Category 2: comparison of value
test(
"Integer :eq(5) var < Integer :eq(10) var",
true,
Variable.v_int_eq(5) < Variable.v_int_eq(10)
)
# Within Category 4: comparison of index
test(
"Tuple elem(0) var < Tuple elem(1) var",
true,
Variable.v_tuple_elem_pred(0, Variable.v_is_atom()) <
Variable.v_tuple_elem_pred(1, Variable.v_is_atom())
)
# Within Category 4, same index: comparison of nested var
test(
"Tuple elem(0, atom) var < Tuple elem(0, int) var",
true,
Variable.v_tuple_elem_pred(0, Variable.v_is_atom()) <
Variable.v_tuple_elem_pred(0, Variable.v_is_integer())
)
IO.inspect(Variable.v_list_all_elements_are(:atom), label: "Variable.v_list_all_elements_are(:atom)")
IO.inspect(Variable.v_list_is_empty(), label: "Variable.v_list_is_empty()")
test(
"List :a_all_elements var < List :b_is_empty var",
true,
Variable.v_list_all_elements_are(:atom) < Variable.v_list_is_empty()
)
test(
"List :b_is_empty var < List :c_head var",
true,
Variable.v_list_is_empty() < Variable.v_list_head_pred(Variable.v_is_atom())
)
test(
"List :c_head var < List :tail var",
true,
Variable.v_list_head_pred(Variable.v_is_atom()) <
Variable.v_list_tail_pred(Variable.v_is_atom())
)
# --- Final Report ---
failures = Process.get(:test_failures, [])
if failures == [] do
IO.puts("\n✅ All Tdd.Variable tests passed!")
else
IO.puts("\n❌ Found #{length(failures)} test failures.")
end
end
end
# To run the tests, you would call this function:
TypeSpecTests.run()
TddStoreTests.run()
TddVariableTests.run()