Pollutant Spread in a River Network

The Problem

Representing the River Network

# River network: 5 nodes in a directed tree (upstream to downstream).
# Node indices correspond to NODE_NAMES.
NODE_NAMES = ["upstream_a", "upstream_b", "junction", "reach", "outlet"]

# Node 4 (outlet) is the sink: pollutant that arrives there is not transported further.
SINK = 4

# Initial spill: unit concentration at upstream_a only.
SPILL_NODE = 0

# Fraction of concentration at each non-sink node transported downstream per step.
# At 0.4, roughly 40 % of mass moves forward each time step.
TRANSPORT_RATE = 0.4

N_STEPS = 20
SNAPSHOT_INTERVAL = 5
def build_network():
    """Return a directed graph representing the river network.

    Edge weights represent relative mean discharge (arbitrary units) and
    are used for visualization only; transport uses a uniform rate.
    """
    G = nx.DiGraph()
    G.add_nodes_from(range(len(NODE_NAMES)))
    # upstream_a and upstream_b both drain into the junction.
    G.add_edge(0, 2, weight=3.0)
    G.add_edge(1, 2, weight=5.0)
    # junction drains to reach, reach drains to outlet.
    G.add_edge(2, 3, weight=8.0)
    G.add_edge(3, 4, weight=8.0)
    return G

Why is a river network represented as a directed graph rather than an undirected graph?

Water can flow in either direction between any two connected nodes.
Wrong: rivers flow in one direction, from higher to lower elevation; edges must be directed to encode this.
The direction of each edge encodes which way water (and pollutant) flows.
Correct: directed edges from upstream to downstream nodes capture the unidirectional nature of river flow.
Directed graphs are always faster to compute with than undirected graphs.
Wrong: the choice of directed vs. undirected is based on the physical system, not computational speed.
Edge weights cannot be stored on undirected graphs.
Wrong: undirected graphs support weighted edges just as directed graphs do.

The Transport Model

$$c_v^{n+1} = (1 - \alpha)\,c_v^n + \sum_k f_k\,c_{u_k}^n$$

where $\alpha = \text{TRANSPORT_RATE}$ and $f_k = \alpha / \text{(number of downstream neighbours of } u_k\text{)}$.

Building the Upstream Table

def build_upstream(G):
    """Return upstream[v] = list of (u, fraction) pairs for each node v.

    For each edge u -> v, the fraction of u's concentration that flows to v
    in one step is TRANSPORT_RATE divided by the number of downstream
    neighbours of u.  A node with no upstream neighbours has an empty list.
    """
    n = len(NODE_NAMES)
    upstream = {v: [] for v in range(n)}
    for u in range(n):
        if u == SINK:
            continue
        succs = list(G.successors(u))
        if not succs:
            continue
        fraction = TRANSPORT_RATE / len(succs)
        for v in succs:
            upstream[v].append((u, fraction))
    return upstream

Put these steps in the correct order to fill in the upstream table for one non-sink node u.

  1. Find the list of downstream successors of u in the graph G
  2. Compute fraction = TRANSPORT_RATE / number of successors
  3. For each successor v, append (u, fraction) to upstream[v]
  4. Check that the fractions appended for u sum to TRANSPORT_RATE (conservation check)

In our 5-node tree every non-sink node has exactly one downstream successor. What fraction of its concentration does each non-sink node send downstream per step?

0.0
Wrong: that would mean no pollutant is ever transported.
TRANSPORT_RATE / 2
Wrong: dividing by 2 would only apply if the node had two successors.
TRANSPORT_RATE
Correct: with one successor, fraction = TRANSPORT_RATE / 1 = TRANSPORT_RATE.
1.0
Wrong: 1.0 would mean all concentration moves downstream and none is retained.

Running the Simulation

def simulate(G):
    """Simulate pollutant transport and return concentration snapshots.

    For each time step the new concentration at node v is:

        new_c[v] = c[v] * (1 - outflow_rate[v])
                   + sum(fraction * c[u] for (u, fraction) in upstream[v])

    where outflow_rate[v] is TRANSPORT_RATE for non-sink nodes (0 for the sink).
    The sink retains everything it receives and sends nothing downstream.
    """
    n = len(NODE_NAMES)
    c = [0.0] * n
    c[SPILL_NODE] = 1.0

    upstream = build_upstream(G)

    # Outflow rate for each node: TRANSPORT_RATE for non-sink nodes that have
    # at least one downstream successor; 0 for the sink and for isolated nodes.
    outflow = [0.0] * n
    for u in range(n):
        if u != SINK and list(G.successors(u)):
            outflow[u] = TRANSPORT_RATE

    records = _snapshot(c, 0)
    for step in range(1, N_STEPS + 1):
        new_c = [0.0] * n
        for v in range(n):
            retained = c[v] * (1.0 - outflow[v])
            inflow = sum(fraction * c[u] for (u, fraction) in upstream[v])
            new_c[v] = retained + inflow
        c = new_c
        if step % SNAPSHOT_INTERVAL == 0:
            records += _snapshot(c, step)
    return pl.DataFrame(records)


def _snapshot(c, step):
    return [
        {"node": NODE_NAMES[i], "concentration": float(c[i]), "step": step}
        for i in range(len(NODE_NAMES))
    ]

Visualizing the Results

def plot(df):
    """Return an Altair line chart of concentration at each node over time."""
    return (
        alt.Chart(df)
        .mark_line(point=True)
        .encode(
            x=alt.X("step:O", title="Time step"),
            y=alt.Y(
                "concentration:Q",
                title="Concentration",
                scale=alt.Scale(domain=[0.0, 1.0]),
            ),
            color=alt.Color("node:N", title="Node", sort=NODE_NAMES),
        )
        .properties(width=350, height=250)
    )
Line chart showing concentration at each node over 20 time steps. upstream_a decays from 1.0; junction and reach rise then fall; outlet rises monotonically.
Figure 1: Concentration at each node over 20 time steps. The pollutant pulse travels from the spill site (upstream_a) through the network toward the outlet.

Testing

Mass conservation

Non-negativity

Monotone decay at the spill node

Uncontaminated node stays zero

Monotone accumulation at the outlet

Upstream fractions sum to transport rate

from pollute import (
    NODE_NAMES,
    N_STEPS,
    SINK,
    SPILL_NODE,
    TRANSPORT_RATE,
    build_network,
    build_upstream,
)


def _run(n_steps=N_STEPS):
    """Return concentration list after n_steps using the same loop as simulate."""
    G = build_network()
    upstream = build_upstream(G)
    n = len(NODE_NAMES)
    c = [0.0] * n
    c[SPILL_NODE] = 1.0

    # Outflow rates mirror those in simulate().
    outflow = [0.0] * n
    for u in range(n):
        if u != SINK and list(G.successors(u)):
            outflow[u] = TRANSPORT_RATE

    for _ in range(n_steps):
        new_c = [0.0] * n
        for v in range(n):
            retained = c[v] * (1.0 - outflow[v])
            inflow = sum(fraction * c[u] for (u, fraction) in upstream[v])
            new_c[v] = retained + inflow
        c = new_c
    return c


def test_mass_conservation():
    # Each step, every unit of concentration either stays at its node or moves
    # to a downstream neighbour.  No mass is created or destroyed, so the total
    # must equal 1.0 throughout.  Floating-point rounding in repeated addition
    # can accumulate, but for 5 nodes and 20 steps the error stays below 1e-12.
    c = _run()
    assert abs(sum(c) - 1.0) < 1e-12


def test_all_nonnegative():
    # Every update is a weighted sum of non-negative values with non-negative
    # weights, so concentration cannot go negative at any node or step.
    G = build_network()
    upstream = build_upstream(G)
    n = len(NODE_NAMES)
    c = [0.0] * n
    c[SPILL_NODE] = 1.0

    outflow = [0.0] * n
    for u in range(n):
        if u != SINK and list(G.successors(u)):
            outflow[u] = TRANSPORT_RATE

    for step in range(N_STEPS):
        new_c = [0.0] * n
        for v in range(n):
            retained = c[v] * (1.0 - outflow[v])
            inflow = sum(fraction * c[u] for (u, fraction) in upstream[v])
            new_c[v] = retained + inflow
        c = new_c
        assert all(x >= 0.0 for x in c), f"negative concentration at step {step + 1}"


def test_spill_node_decreases():
    # upstream_a (node 0) has no upstream neighbours, so every step it loses
    # TRANSPORT_RATE of whatever concentration it holds and gains nothing back.
    # Its concentration must be strictly decreasing from its starting value of 1.
    G = build_network()
    upstream = build_upstream(G)
    n = len(NODE_NAMES)
    c = [0.0] * n
    c[SPILL_NODE] = 1.0

    outflow = [0.0] * n
    for u in range(n):
        if u != SINK and list(G.successors(u)):
            outflow[u] = TRANSPORT_RATE

    prev = c[SPILL_NODE]
    for _ in range(N_STEPS):
        new_c = [0.0] * n
        for v in range(n):
            retained = c[v] * (1.0 - outflow[v])
            inflow = sum(fraction * c[u] for (u, fraction) in upstream[v])
            new_c[v] = retained + inflow
        c = new_c
        assert c[SPILL_NODE] < prev
        prev = c[SPILL_NODE]


def test_uncontaminated_node_stays_zero():
    # upstream_b (node 1) starts at zero concentration and has no upstream
    # neighbours feeding into it.  It loses TRANSPORT_RATE each step, so its
    # concentration is 0 * (1 - TRANSPORT_RATE)^step = 0 throughout.
    c = _run()
    assert c[1] == 0.0  # upstream_b is never contaminated


def test_outlet_nondecreasing():
    # The outlet (sink) only receives mass; it never sends mass downstream.
    # Its concentration therefore cannot decrease between steps.
    G = build_network()
    upstream = build_upstream(G)
    n = len(NODE_NAMES)
    c = [0.0] * n
    c[SPILL_NODE] = 1.0

    outflow = [0.0] * n
    for u in range(n):
        if u != SINK and list(G.successors(u)):
            outflow[u] = TRANSPORT_RATE

    prev = c[SINK]
    for _ in range(N_STEPS):
        new_c = [0.0] * n
        for v in range(n):
            retained = c[v] * (1.0 - outflow[v])
            inflow = sum(fraction * c[u] for (u, fraction) in upstream[v])
            new_c[v] = retained + inflow
        c = new_c
        assert c[SINK] >= prev
        prev = c[SINK]


def test_upstream_fractions_sum():
    # For each non-sink node u that has downstream neighbours, the total
    # fraction of concentration it sends out equals TRANSPORT_RATE exactly.
    # This can be checked by summing the fractions stored in build_upstream.
    G = build_network()
    upstream = build_upstream(G)
    # Collect total outflow fraction assigned to each source node.
    sent = {u: 0.0 for u in range(len(NODE_NAMES))}
    for v in range(len(NODE_NAMES)):
        for (u, fraction) in upstream[v]:
            sent[u] += fraction
    for u, total in sent.items():
        if total > 0.0:
            assert abs(total - TRANSPORT_RATE) < 1e-12, (
                f"node {NODE_NAMES[u]} sends {total}, expected {TRANSPORT_RATE}"
            )

Starting from $c = [1, 0, 0, 0, 0]$ and using TRANSPORT_RATE = 0.4, what is the concentration at junction (node 2) after exactly 2 time steps? Give your answer to two decimal places.

Hint: trace through the update rule step by step. At step 1, junction receives $0.4 \times c_0[\text{upstream_a}]$ from upstream_a. At step 2, junction retains $0.6$ of its step-1 value and receives another inflow from upstream_a.

Exercises

Adding a tributary

Add a sixth node tributary (node 5) that joins at reach (node 3) with edge weight 2.0. Update build_network and rerun the simulation. Does test_mass_conservation still pass? Does the pollutant reach the outlet faster or slower?

Variable transport rate

Replace the uniform TRANSPORT_RATE with per-edge rates stored in a dictionary keyed by (src, dst). Assign higher rates to high-flow edges (e.g., rate proportional to edge weight / max edge weight). Modify build_upstream to use per-edge rates and update simulate accordingly. How does the arrival time at the outlet change compared to the uniform-rate model?

Continuous spill

Change the initial condition so that upstream_a receives a constant input of 0.1 concentration units per step (a continuous source rather than an instantaneous spill). Modify simulate to add this source term to new_c[SPILL_NODE] after each update step. Is mass still conserved? At what step does the outlet concentration stabilise?

Two-spill comparison

Run two separate simulations: one with the spill at upstream_a and one with a spill at upstream_b. Add the two concentration lists at each step. Compare the result to a single simulation that starts with concentration 1.0 at both headwater nodes simultaneously. Are the results identical? Explain why in terms of the structure of the update rule.