Files
Reverso/src/scoring.py
Junior B. 3417f85eb1 v1.1: full gene space + specificity z-score; hydroxyurea recovers
Post-hoc improvement after the pre-registered v1 recovery test failed.
Two changes, diagnosing v1's failure:
- score on the full 12,328-gene LINCS space (week2_lincs_extract.py),
  lifting signature overlap from 12% to 85% (brings erythroid markers in)
- src/scoring.py: KS connectivity + per-drug specificity z-score
  (spec_z = SDs below a 1,000 random-query null). Primary ranking is
  now spec_z. (Textbook tau saturated at +/-100 for a coherent query —
  documented; needs a reference-signature library, a v2 item.)
- week3_scoring.py: spec_z primary + WTCS reference + prior-blended
- tests: tau/spec_z calibration test; 19 passing
- scripts/exp_genespace.py: the BING vs all-12,328 comparison

Result: hydroxyurea recovers (rank 40 -> 18, top 6%, passes top-10%),
confirming the v1 failure was the landmark bottleneck not the algorithm.
Overall STILL FAILS: L-glutamine does not reverse (rank 213, metabolite),
and negative controls (norethindrone, ciprofloxacin) rank top-3 —
connectivity != therapeutic relatedness. v1.1 is post-hoc/exploratory,
not a confirmatory test; reported as such in recovery_test_report.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-23 22:57:30 +02:00

229 lines
9.7 KiB
Python

"""CMap-style connectivity scoring — the matching engine (Week 3, PLAN §6).
Scores each drug's LINCS consensus signature against the disease signature using the weighted
Kolmogorov-Smirnov / GSEA enrichment statistic (Lamb 2006; Subramanian 2017). The query is the
disease up/down gene sets; the reference is each drug's 978 landmark genes ranked by z-score.
Sign convention (PLAN §6): strongly **negative** connectivity = strong **reversal** of the
disease signature = candidate match. A drug that down-regulates the disease's up-genes and
up-regulates its down-genes scores negative.
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
from . import RANDOM_SEED, RESULTS_DIR
# Sickle-cell-relevant target pathways for the mechanistic prior (PLAN §6 Week 3 task 3).
# Keys are pathway categories; values are substrings matched (case-insensitive) against a
# drug's ChEMBL target names.
SICKLE_PATHWAYS: dict[str, tuple[str, ...]] = {
"hbf_epigenetic": ("histone deacetylase", "hdac", "methyltransferase", "dnmt",
"ribonucleoside-diphosphate reductase", "ribonucleotide reductase"),
"hemoglobin": ("hemoglobin", "globin"),
"no_signaling": ("nitric oxide", "guanylate cyclase", "phosphodiesterase 5", "pde5"),
"inflammation": ("cyclooxygenase", "prostaglandin", "nf-kappa", "interleukin",
"leukotriene", "selectin", "tumor necrosis factor"),
"oxidative_stress": ("glutathione", "superoxide", "nadph oxidase", "thioredoxin", "nrf2"),
}
def _enrichment_score(drug_profile: pd.Series, gene_set: set[str], weight: float = 1.0) -> float:
"""Weighted GSEA/KS enrichment score of ``gene_set`` in a drug's ranked profile.
The profile is ranked by z-score (most up-regulated first). Hits increment a running sum in
proportion to ``|z|**weight``; misses decrement uniformly. ES is the max signed deviation
from zero. ES>0 => set enriched among up-regulated genes; ES<0 => among down-regulated.
"""
s = drug_profile.sort_values(ascending=False)
genes = s.index.to_numpy()
vals = s.to_numpy(dtype=float)
hit = np.fromiter((g in gene_set for g in genes), dtype=bool, count=len(genes))
n_hit = int(hit.sum())
n = len(genes)
if n_hit == 0 or n_hit == n:
return 0.0
w = (np.abs(vals) ** weight) * hit
sum_hit = w.sum()
if sum_hit == 0:
return 0.0
inc = w / sum_hit
dec = (~hit) / (n - n_hit)
running = np.cumsum(inc - dec)
hi, lo = running.max(), running.min()
return float(hi if abs(hi) >= abs(lo) else lo)
def connectivity_score(
up_genes: list[str],
down_genes: list[str],
drug_signature: pd.Series,
) -> float:
"""Weighted connectivity score (WTCS) for one drug vs the disease up/down sets.
Only query genes present in the drug's profile index (the 978 landmarks) are used — callers
should record the overlap count (PLAN §6 Week 3 task 2). Returns the WTCS: if the two
enrichment scores share a sign the result is 0 (ambiguous), else ``(ES_up - ES_down)/2``.
Negative => reversal => candidate.
"""
profile_genes = set(drug_signature.index)
up = set(up_genes) & profile_genes
down = set(down_genes) & profile_genes
es_up = _enrichment_score(drug_signature, up)
es_down = _enrichment_score(drug_signature, down)
if np.sign(es_up) == np.sign(es_down):
return 0.0
return (es_up - es_down) / 2.0
def normalize_scores(scores: pd.Series) -> pd.Series:
"""Signed normalization (NCS, Subramanian 2017): divide by the mean magnitude of same-sign
scores, so positive and negative tails are separately scaled to a mean magnitude of 1."""
out = scores.astype(float).copy()
pos_mean = scores[scores > 0].mean()
neg_mean = scores[scores < 0].abs().mean()
if pos_mean and not np.isnan(pos_mean):
out[scores > 0] = scores[scores > 0] / pos_mean
if neg_mean and not np.isnan(neg_mean):
out[scores < 0] = scores[scores < 0] / neg_mean
return out
def rank_drugs(
up_genes: list[str],
down_genes: list[str],
signature_matrix: pd.DataFrame,
) -> pd.DataFrame:
"""Score and rank all drugs (rows of ``signature_matrix``: drug x landmark-gene z-scores).
Returns a table indexed by drug with ``rank`` (1 = strongest reversal = most negative),
``connectivity_score`` and ``normalized_score``. Drugs are expected to all have signatures
here; signature-less drugs are handled (marked not-scored) by the orchestration layer per
PLAN §6 Week 3 task 2.
"""
scores = pd.Series(
{drug: connectivity_score(up_genes, down_genes, signature_matrix.loc[drug])
for drug in signature_matrix.index},
name="connectivity_score",
)
df = pd.DataFrame({"connectivity_score": scores, "normalized_score": normalize_scores(scores)})
df = df.sort_values("connectivity_score") # most negative (reversal) first
df.insert(0, "rank", range(1, len(df) + 1))
return df
def mechanistic_prior(targets: list[str]) -> float:
"""Count of sickle-cell-relevant pathway categories a drug's targets hit (PLAN §6 task 3).
Higher = more mechanistically plausible. Used to build the secondary, prior-weighted ranking
alongside the raw connectivity ranking.
"""
if not targets:
return 0.0
text = " ; ".join(str(t) for t in targets).lower()
return float(sum(any(kw in text for kw in kws) for kws in SICKLE_PATHWAYS.values()))
# --- KS connectivity + tau calibration (v1.1) -----------------------------------------------
# Unweighted Kolmogorov-Smirnov connectivity (Lamb 2006) is O(k) per query (depends only on the
# ranks of the query genes), which makes a permutation null over many random queries cheap. tau
# expresses each drug's real connectivity as a signed percentile within its own null — so
# broad-effect drugs that connect to *random* signatures too get down-weighted (specificity).
def _ks_es(rank_matrix: np.ndarray, query_cols: np.ndarray, n_genes: int) -> np.ndarray:
"""Vectorized unweighted KS enrichment score of a query gene set for every drug.
``rank_matrix`` is (n_drugs, n_genes) of 1..N rank positions (1 = most up-regulated).
Returns one ES per drug; ES>0 => query enriched among up-regulated genes.
"""
k = len(query_cols)
if k == 0:
return np.zeros(rank_matrix.shape[0])
p = np.sort(rank_matrix[:, query_cols], axis=1) # (n_drugs, k), positions ascending
j = np.arange(1, k + 1)
a = (j / k - p / n_genes).max(axis=1)
b = (p / n_genes - (j - 1) / k).max(axis=1)
return np.where(a >= b, a, -b)
def _ks_connectivity(rank_matrix: np.ndarray, up_cols: np.ndarray, down_cols: np.ndarray,
n_genes: int) -> np.ndarray:
"""KS connectivity per drug: (ES_up - ES_down)/2. Negative=reversal.
Note: unlike WTCS, this does NOT zero same-sign (ambiguous) connections — same-sign ES
partially cancel and land near 0 naturally. Hard-zeroing would collapse the random-query
null to a spike at 0 and make tau saturate, so the continuous form is required for calibration.
"""
es_up = _ks_es(rank_matrix, up_cols, n_genes)
es_down = _ks_es(rank_matrix, down_cols, n_genes)
return (es_up - es_down) / 2.0
def tau_calibrate(
up_genes: list[str],
down_genes: list[str],
signature_matrix: pd.DataFrame,
n_null: int = 1000,
seed: int = RANDOM_SEED,
) -> pd.DataFrame:
"""Rank drugs by tau: each drug's KS connectivity as a signed percentile vs a null of
random queries of the same size (PLAN §6; CMap tau, Subramanian 2017).
tau in [-100, 100]: -100 => reverses our signature more specifically than any random query
(strong, specific candidate); ~0 => connects to our signature no more than to random ones
(broad-effect / non-specific). Ranked by tau ascending (rank 1 = most specific reversal).
"""
genes = list(signature_matrix.columns)
gene_to_col = {g: i for i, g in enumerate(genes)}
n = len(genes)
rank_matrix = signature_matrix.rank(axis=1, ascending=False).to_numpy()
up_cols = np.array([gene_to_col[g] for g in set(up_genes) if g in gene_to_col], dtype=int)
down_cols = np.array([gene_to_col[g] for g in set(down_genes) if g in gene_to_col], dtype=int)
real = _ks_connectivity(rank_matrix, up_cols, down_cols, n)
rng = np.random.default_rng(seed)
k_up, k_down = len(up_cols), len(down_cols)
null = np.empty((rank_matrix.shape[0], n_null))
for m in range(n_null):
samp = rng.choice(n, size=k_up + k_down, replace=False)
null[:, m] = _ks_connectivity(rank_matrix, samp[:k_up], samp[k_up:], n)
null_mean = null.mean(axis=1)
null_std = null.std(axis=1)
null_std[null_std == 0] = np.nan
# Per-drug standardized connectivity: how many SDs the real reversal is below what random
# queries of the same size produce against THIS drug. Continuous (no percentile floor), so it
# discriminates within the strong-reversal tail. Negative = specific reversal.
spec_z = (real - null_mean) / null_std
frac = (null <= real[:, None]).mean(axis=1)
tau = 100.0 * (2.0 * frac - 1.0) # retained for reference; saturates at +/-100 in the tail
df = pd.DataFrame(
{"connectivity_ks": real, "null_mean": null_mean, "spec_z": spec_z, "tau": tau},
index=signature_matrix.index,
)
df = df.sort_values("spec_z") # most negative z = most specific reversal
df.insert(0, "rank", range(1, len(df) + 1))
return df
def persist_ranking(ranking: pd.DataFrame, out_path: Path | None = None) -> Path:
"""Write the ranked candidate list to ``data/results/ranked_candidates_v1.csv``."""
out_path = out_path or (RESULTS_DIR / "ranked_candidates_v1.csv")
out_path.parent.mkdir(parents=True, exist_ok=True)
ranking.to_csv(out_path, index=False)
return out_path