def generate(args):
    """Load a checkpoint and print text sampled from the restored model.

    Args:
        args: Parsed command-line namespace providing ``checkpoint``,
            ``prompt``, ``tokens``, ``temperature`` and ``top_k``.

    Raises:
        ValueError: If the prompt contains a character that the tokenizer
            stored in the checkpoint cannot encode.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # SECURITY: weights_only=False unpickles arbitrary Python objects. It is
    # needed because the checkpoint stores the config and tokenizer objects
    # next to the weights, so only load checkpoints from a trusted source.
    ckpt = torch.load(args.checkpoint, map_location=device, weights_only=False)
    config = ckpt["config"]
    tokenizer = ckpt["tokenizer"]

    model = LLM(config).to(device)
    model.load_state_dict(ckpt["model"])
    model.eval()

    prompt = args.prompt or ""
    if prompt:
        try:
            ids = tokenizer.encode(prompt)
        except KeyError as err:
            raise ValueError(
                "Prompt contains a character that is not in the "
                f"checkpoint's vocabulary: {err}"
            ) from err
    else:
        # Seed with a real newline token when the vocabulary has one; id 0 is
        # merely the smallest character of the corpus, not always "\n".
        try:
            ids = tokenizer.encode("\n")
        except KeyError:
            ids = [0]
    idx = torch.tensor(ids, dtype=torch.long, device=device).unsqueeze(0)

    # The CLI documents 0 as "top-k off"; normalise it here as well so that
    # programmatic callers do not have to repeat the __main__ handling.
    top_k = args.top_k
    if top_k is not None and top_k <= 0:
        top_k = None

    out = model.generate(idx, max_new_tokens=args.tokens,
                         temperature=args.temperature, top_k=top_k)
    print(tokenizer.decode(out[0].tolist()))
class CausalSelfAttention(nn.Module):
    """Multi-head self-attention with a causal (look-ahead) mask.

    The config object must expose ``n_embd``, ``n_heads``, ``block_size`` and
    ``dropout``. Parameter and buffer names (``qkv``, ``proj``, ``mask``) are
    part of the state dict and therefore kept stable.
    """

    def __init__(self, config):
        super().__init__()
        # Explicit raise instead of assert: asserts vanish under `python -O`.
        if config.n_embd % config.n_heads != 0:
            raise ValueError(
                f"n_embd ({config.n_embd}) must be divisible by "
                f"n_heads ({config.n_heads})"
            )
        self.n_heads = config.n_heads
        self.head_dim = config.n_embd // config.n_heads
        self.n_embd = config.n_embd

        # Fused Q/K/V projection
        self.qkv = nn.Linear(config.n_embd, 3 * config.n_embd, bias=False)
        self.proj = nn.Linear(config.n_embd, config.n_embd, bias=False)
        self.dropout = nn.Dropout(config.dropout)

        # Causal mask: True strictly above the diagonal marks the "future"
        # positions a query must not attend to. Registered as a buffer so it
        # follows the module across devices without being a parameter.
        mask = torch.triu(torch.ones(config.block_size, config.block_size), diagonal=1).bool()
        self.register_buffer("mask", mask)

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: Tensor of shape (B, T, C) with C == n_embd and T <= block_size.

        Returns:
            Tensor of shape (B, T, C).

        Raises:
            ValueError: If T exceeds the block size the mask was built for.
        """
        B, T, C = x.shape
        max_len = self.mask.size(0)
        if T > max_len:
            # Without this guard the failure is an opaque broadcast error
            # between the (T, T) scores and the smaller mask.
            raise ValueError(
                f"Sequence length {T} exceeds block_size {max_len}"
            )

        q, k, v = self.qkv(x).split(self.n_embd, dim=2)

        # Reshape to (B, n_heads, T, head_dim)
        def reshape(t):
            return t.view(B, T, self.n_heads, self.head_dim).transpose(1, 2)

        q, k, v = reshape(q), reshape(k), reshape(v)

        # Scaled dot-product attention
        scale = math.sqrt(self.head_dim)
        scores = (q @ k.transpose(-2, -1)) / scale  # (B, nh, T, T)
        scores = scores.masked_fill(self.mask[:T, :T], float("-inf"))
        weights = F.softmax(scores, dim=-1)
        weights = self.dropout(weights)

        out = weights @ v  # (B, nh, T, hd)
        out = out.transpose(1, 2).contiguous().view(B, T, C)  # (B, T, C)
        return self.proj(out)
+ + config fields: + vocab_size – number of tokens + block_size – maximum context length + n_embd – embedding dimension + n_heads – number of attention heads + n_layers – number of Transformer blocks + dropout – dropout probability + """ + + def __init__(self, config): + super().__init__() + self.config = config + + self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd) + self.pos_emb = nn.Embedding(config.block_size, config.n_embd) + self.drop = nn.Dropout(config.dropout) + self.blocks = nn.Sequential(*[TransformerBlock(config) for _ in range(config.n_layers)]) + self.ln_f = nn.LayerNorm(config.n_embd) + self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False) + + # Weight tying: share token embedding and output projection weights + self.head.weight = self.tok_emb.weight + + self.apply(self._init_weights) + + def _init_weights(self, module): + if isinstance(module, nn.Linear): + nn.init.normal_(module.weight, mean=0.0, std=0.02) + if module.bias is not None: + nn.init.zeros_(module.bias) + elif isinstance(module, nn.Embedding): + nn.init.normal_(module.weight, mean=0.0, std=0.02) + + def forward(self, idx, targets=None): + B, T = idx.shape + assert T <= self.config.block_size, "Sequence longer than block_size" + + positions = torch.arange(T, device=idx.device) + x = self.drop(self.tok_emb(idx) + self.pos_emb(positions)) + x = self.blocks(x) + x = self.ln_f(x) + logits = self.head(x) # (B, T, vocab_size) + + loss = None + if targets is not None: + loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1)) + + return logits, loss + + @torch.no_grad() + def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None): + """ + Autoregressively generate tokens given a conditioning sequence. 
class CharTokenizer:
    """Character-level tokenizer built from the characters of a corpus.

    Every distinct character of ``text`` gets an integer id, assigned in
    sorted character order. The attribute names ``vocab_size``, ``_stoi`` and
    ``_itos`` are kept as-is because instances are pickled into checkpoints.
    """

    def __init__(self, text: str):
        chars = sorted(set(text))
        self.vocab_size = len(chars)
        self._stoi = {ch: i for i, ch in enumerate(chars)}
        self._itos = dict(enumerate(chars))

    def encode(self, text: str) -> list[int]:
        """Return the list of ids for ``text``.

        Raises:
            KeyError: If ``text`` contains a character that was not present
                in the corpus the tokenizer was built from.
        """
        try:
            return [self._stoi[ch] for ch in text]
        except KeyError as err:
            # Same exception type as before, but say what actually went wrong
            # instead of surfacing only the bare character.
            raise KeyError(
                f"character {err.args[0]!r} is not in the tokenizer vocabulary"
            ) from None

    def decode(self, ids: list[int]) -> str:
        """Return the string spelled by ``ids``.

        Raises:
            KeyError: If an id is outside the vocabulary.
        """
        try:
            return "".join(self._itos[i] for i in ids)
        except KeyError as err:
            raise KeyError(
                f"token id {err.args[0]!r} is not in the tokenizer vocabulary "
                f"(vocab_size={self.vocab_size})"
            ) from None
+ +Usage: + # Train on a text file (defaults to a tiny built-in dataset if omitted) + python train.py --data path/to/corpus.txt + + # Quick smoke-test on the built-in dataset + python train.py +""" + +import argparse +import types +import random +import torch +from model import LLM +from tokenizer import CharTokenizer + +# --------------------------------------------------------------------------- +# Tiny built-in corpus (Shakespeare excerpt) used when no file is provided +# --------------------------------------------------------------------------- +BUILTIN_TEXT = """\ +First Citizen: Before we proceed any further, hear me speak. +All: Speak, speak. +First Citizen: You are all resolved rather to die than to famish? +All: Resolved. Resolved. +First Citizen: First, you know Caius Marcius is chief enemy to the people. +All: We know't, we know't. +First Citizen: Let us kill him, and we'll have corn at our own price. +Is't a verdict? +All: No more talking on't; let it be done: away, away! +Second Citizen: One word, good citizens. +First Citizen: We are accounted poor citizens, the patricians good. +What authority surfeits on would relieve us: if they +would yield us but the superfluity, while it were wholesome, +we might guess they relieved us humanely; but they think we are +too dear: the leanness that afflicts us, the object of our +misery, is as an inventory to particularise their abundance; +our sufferance is a gain to them. Let us revenge this with +our pikes, ere we become rakes: for the gods know I speak this +in hunger for bread, not in thirst for revenge. 
def get_batch(data: torch.Tensor, block_size: int, batch_size: int, device: str):
    """Sample a random batch of (input, target) sequences.

    Each target row is its input row shifted one position to the right in
    ``data`` (next-token prediction).

    Args:
        data: Token ids, indexed along the first dimension.
        block_size: Length of every sampled sequence.
        batch_size: Number of sequences in the batch.
        device: Device the returned tensors are moved to.

    Returns:
        Tuple ``(x, y)``, each with ``batch_size`` rows of ``block_size``
        tokens, located on ``device``.

    Raises:
        ValueError: If ``data`` holds fewer than ``block_size + 1`` tokens,
            so that not even one (input, target) pair fits.
    """
    n_starts = len(data) - block_size
    if n_starts <= 0:
        # torch.randint would otherwise fail with an unrelated-looking
        # "from must be less than to" error.
        raise ValueError(
            f"Need at least block_size + 1 = {block_size + 1} tokens to "
            f"sample a batch, got {len(data)}"
        )

    starts = torch.randint(n_starts, (batch_size,))
    # One gather for the whole batch instead of a Python loop over samples.
    positions = starts.unsqueeze(1) + torch.arange(block_size)
    x = data[positions]
    y = data[positions + 1]
    return x.to(device), y.to(device)
def parse_args(argv=None):
    """Parse the training script's command-line options.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the previous behaviour.

    Returns:
        ``argparse.Namespace`` with ``data``, ``checkpoint``, ``block_size``,
        ``batch_size``, ``n_embd``, ``n_heads``, ``n_layers``, ``dropout``,
        ``lr``, ``steps`` and ``eval_interval``.

    Raises:
        SystemExit: Raised by argparse for unparsable or inconsistent options.
    """
    p = argparse.ArgumentParser(description="Train a basic character-level LLM")
    p.add_argument("--data", type=str, default=None, help="Path to training text file")
    p.add_argument("--checkpoint", type=str, default="ckpt.pt", help="Where to save the best model")
    p.add_argument("--block_size", type=int, default=64, help="Context length")
    p.add_argument("--batch_size", type=int, default=32, help="Batch size")
    p.add_argument("--n_embd", type=int, default=128, help="Embedding dimension")
    p.add_argument("--n_heads", type=int, default=4, help="Number of attention heads")
    p.add_argument("--n_layers", type=int, default=4, help="Number of Transformer blocks")
    p.add_argument("--dropout", type=float, default=0.1, help="Dropout probability")
    p.add_argument("--lr", type=float, default=3e-4, help="Learning rate")
    p.add_argument("--steps", type=int, default=2000, help="Training steps")
    p.add_argument("--eval_interval", type=int, default=200, help="Steps between evaluations")
    args = p.parse_args(argv)

    # Reject combinations that are guaranteed to crash later, with a usage
    # message instead of a traceback from deep inside model code or the loop.
    if args.n_heads <= 0 or args.n_embd % args.n_heads != 0:
        p.error("--n_embd must be divisible by a positive --n_heads")
    if args.eval_interval <= 0:
        # The training loop computes `step % eval_interval`.
        p.error("--eval_interval must be a positive integer")
    return args