mirror of
https://github.com/codecrafters-io/build-your-own-x
synced 2026-07-02 16:59:25 +00:00
Add basic LLM implementation from scratch
Implements a character-level GPT-style Transformer:
- model.py: CausalSelfAttention, FeedForward, TransformerBlock, LLM
- tokenizer.py: CharTokenizer (char -> int mapping)
- train.py: training loop with AdamW, gradient clipping, checkpointing, sampling
- generate.py: load checkpoint and generate text from a prompt

Verified working on a built-in Shakespeare excerpt (805k param model).

https://claude.ai/code/session_01SWXLQb3nFTiygbp74dpjVa
This commit is contained in:
parent
c439f35496
commit
1d3ce8cff7
7 changed files with 391 additions and 0 deletions
BIN
llm/__pycache__/model.cpython-311.pyc
Normal file
BIN
llm/__pycache__/model.cpython-311.pyc
Normal file
Binary file not shown.
BIN
llm/__pycache__/tokenizer.cpython-311.pyc
Normal file
BIN
llm/__pycache__/tokenizer.cpython-311.pyc
Normal file
Binary file not shown.
BIN
llm/ckpt.pt
Normal file
BIN
llm/ckpt.pt
Normal file
Binary file not shown.
50
llm/generate.py
Normal file
50
llm/generate.py
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
"""
|
||||
Generate text from a saved checkpoint.
|
||||
|
||||
Usage:
|
||||
python generate.py --checkpoint ckpt.pt --prompt "First Citizen:" --tokens 300
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import torch
|
||||
from model import LLM
|
||||
|
||||
|
||||
def generate(args):
    """Load a checkpoint and print text sampled from the restored model.

    Args:
        args: Namespace providing ``checkpoint`` (path to the saved file),
            ``prompt`` (seed text, may be empty), ``tokens`` (number of new
            tokens), ``temperature`` and ``top_k`` (both forwarded to
            ``model.generate``).

    Raises:
        ValueError: If the prompt contains a character the checkpoint's
            tokenizer cannot encode.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # SECURITY: weights_only=False unpickles arbitrary Python objects. It is
    # required here because the checkpoint stores the config and tokenizer
    # objects next to the weights; only load checkpoints you trust.
    ckpt = torch.load(args.checkpoint, map_location=device, weights_only=False)
    config = ckpt["config"]
    tokenizer = ckpt["tokenizer"]

    model = LLM(config).to(device)
    model.load_state_dict(ckpt["model"])
    model.eval()

    prompt = args.prompt or ""
    if prompt:
        try:
            ids = tokenizer.encode(prompt)
        except KeyError as err:
            # The tokenizer only knows characters seen in the training text.
            raise ValueError(
                f"Prompt contains a character that is not in the vocabulary: {err}"
            ) from err
    else:
        # Seed with a newline when the vocabulary has one; otherwise fall back
        # to token id 0 (the lowest-sorted character of the training text).
        try:
            ids = tokenizer.encode("\n")
        except KeyError:
            ids = [0]

    idx = torch.tensor(ids, dtype=torch.long, device=device).unsqueeze(0)
    out = model.generate(
        idx,
        max_new_tokens=args.tokens,
        temperature=args.temperature,
        top_k=args.top_k,
    )
    print(tokenizer.decode(out[0].tolist()))
|
||||
|
||||
|
||||
def parse_args(argv=None):
    """Parse the command-line options for text generation.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behavior.

    Returns:
        argparse.Namespace with ``checkpoint``, ``prompt``, ``tokens``,
        ``temperature`` and ``top_k``.

    Exits with an argparse usage error when ``--temperature`` is not positive
    or ``--top_k`` is negative, because sampling divides the logits by the
    temperature and a negative k is not a valid top-k size.
    """
    p = argparse.ArgumentParser(description="Generate text from a trained LLM checkpoint")
    p.add_argument("--checkpoint", type=str, default="ckpt.pt", help="Path to checkpoint")
    p.add_argument("--prompt", type=str, default="", help="Seed text")
    p.add_argument("--tokens", type=int, default=300, help="Tokens to generate")
    p.add_argument("--temperature", type=float, default=0.8, help="Sampling temperature")
    p.add_argument("--top_k", type=int, default=20, help="Top-k sampling (0 = off)")
    args = p.parse_args(argv)

    if args.temperature <= 0:
        p.error("--temperature must be greater than 0")
    if args.top_k < 0:
        p.error("--top_k must be >= 0 (0 = off)")
    return args
|
||||
|
||||
|
||||
if __name__ == "__main__":
    cli_args = parse_args()
    # On the command line 0 means "no top-k filtering"; the generate call
    # receives None for that case instead of 0.
    if cli_args.top_k == 0:
        cli_args.top_k = None
    generate(cli_args)
|
||||
171
llm/model.py
Normal file
171
llm/model.py
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
"""
|
||||
Basic LLM: a character-level GPT-style Transformer built from scratch.
|
||||
|
||||
Architecture:
|
||||
- Token + positional embeddings
|
||||
- N Transformer blocks (masked multi-head self-attention + feed-forward)
|
||||
- Layer norm + linear head
|
||||
"""
|
||||
|
||||
import math
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.nn.functional as F
|
||||
|
||||
|
||||
class CausalSelfAttention(nn.Module):
    """Multi-head self-attention where a position cannot attend to later ones."""

    def __init__(self, config):
        """Create the projections and the causal mask.

        Reads ``n_embd``, ``n_heads``, ``block_size`` and ``dropout`` from
        *config*; ``n_embd`` must be divisible by ``n_heads``.
        """
        super().__init__()
        width, heads = config.n_embd, config.n_heads
        assert width % heads == 0
        self.n_heads = heads
        self.head_dim = width // heads
        self.n_embd = width

        # One linear layer produces queries, keys and values side by side.
        self.qkv = nn.Linear(width, 3 * width, bias=False)
        self.proj = nn.Linear(width, width, bias=False)
        self.dropout = nn.Dropout(config.dropout)

        # True strictly above the diagonal, i.e. at the "future" positions
        # that must be hidden. Stored as a buffer, not a trainable parameter.
        size = config.block_size
        future = torch.ones(size, size).tril().logical_not()
        self.register_buffer("mask", future)

    def forward(self, x):
        """Apply masked attention to ``x`` of shape (B, T, C); returns (B, T, C)."""
        batch, steps, width = x.shape
        per_head = (batch, steps, self.n_heads, self.head_dim)

        # Split the fused projection, then move heads ahead of time:
        # (B, T, C) -> (B, n_heads, T, head_dim).
        q, k, v = (
            part.view(per_head).transpose(1, 2)
            for part in self.qkv(x).split(self.n_embd, dim=2)
        )

        # Scaled dot-product scores, (B, n_heads, T, T).
        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        scores = scores.masked_fill(self.mask[:steps, :steps], float("-inf"))
        attn = self.dropout(F.softmax(scores, dim=-1))

        # Weighted sum of values, then merge the heads back into C.
        merged = (attn @ v).transpose(1, 2).contiguous().view(batch, steps, width)
        return self.proj(merged)
|
||||
|
||||
|
||||
class FeedForward(nn.Module):
    """Per-position MLP: widen to 4x ``n_embd``, GELU, project back, dropout."""

    def __init__(self, config):
        """Build the network from ``config.n_embd`` and ``config.dropout``."""
        super().__init__()
        width = config.n_embd
        hidden = 4 * width
        layers = [
            nn.Linear(width, hidden),
            nn.GELU(),
            nn.Linear(hidden, width),
            nn.Dropout(config.dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the network and return the result."""
        out = self.net(x)
        return out
|
||||
|
||||
|
||||
class TransformerBlock(nn.Module):
    """One Transformer layer: attention then feed-forward, each as a residual.

    Layer norm is applied to the input of each sub-layer (pre-norm), and the
    sub-layer output is added back onto the un-normalized stream.
    """

    def __init__(self, config):
        """Create the two layer norms and the two sub-layers from *config*."""
        super().__init__()
        width = config.n_embd
        self.ln1 = nn.LayerNorm(width)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(width)
        self.ff = FeedForward(config)

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        attended = x + self.attn(self.ln1(x))
        return attended + self.ff(self.ln2(attended))
|
||||
|
||||
|
||||
class LLM(nn.Module):
    """
    Tiny GPT-style language model.

    config fields:
        vocab_size  – number of tokens
        block_size  – maximum context length
        n_embd      – embedding dimension
        n_heads     – number of attention heads
        n_layers    – number of Transformer blocks
        dropout     – dropout probability
    """

    def __init__(self, config):
        """Build embeddings, the stack of blocks, the final norm and the head."""
        super().__init__()
        self.config = config

        self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd)
        self.pos_emb = nn.Embedding(config.block_size, config.n_embd)
        self.drop = nn.Dropout(config.dropout)
        self.blocks = nn.Sequential(*[TransformerBlock(config) for _ in range(config.n_layers)])
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

        # Weight tying: the output projection reuses the token-embedding
        # matrix, so both names refer to one shared parameter.
        self.head.weight = self.tok_emb.weight

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialize Linear/Embedding weights from N(0, 0.02) and zero biases."""
        if isinstance(module, nn.Linear):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, optionally, the training loss.

        Args:
            idx     – (B, T) tensor of token ids, with T <= block_size
            targets – optional (B, T) tensor of target token ids

        Returns:
            (logits, loss): logits has shape (B, T, vocab_size); loss is the
            cross-entropy over all positions, or None when targets is None.
        """
        B, T = idx.shape
        assert T <= self.config.block_size, "Sequence longer than block_size"

        positions = torch.arange(T, device=idx.device)
        x = self.drop(self.tok_emb(idx) + self.pos_emb(positions))
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.head(x)  # (B, T, vocab_size)

        loss = None
        if targets is not None:
            # Flatten batch and time so every position is one classification.
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Autoregressively generate tokens given a conditioning sequence.

        Args:
            idx            – (B, T) tensor of starting token ids
            max_new_tokens – number of tokens to generate
            temperature    – >1 = more random, <1 = more focused; must be > 0
            top_k          – if set, restrict sampling to the top-k logits;
                             None or a value <= 0 disables the restriction

        Returns:
            (B, T + max_new_tokens) tensor: the input followed by the samples.

        Raises:
            ValueError: if temperature is not greater than 0.
        """
        if temperature <= 0:
            # Logits are divided by the temperature below; zero or negative
            # values would yield inf/NaN or an inverted distribution.
            raise ValueError(f"temperature must be > 0, got {temperature}")
        if top_k is not None and top_k <= 0:
            # torch.topk with k <= 0 cannot supply a cut-off value.
            top_k = None

        for _ in range(max_new_tokens):
            # Crop to block_size if needed
            idx_cond = idx[:, -self.config.block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / temperature  # last time-step

            if top_k is not None:
                k = min(top_k, logits.size(-1))
                kth_best = torch.topk(logits, k).values[:, [-1]]
                # Everything below the k-th best logit gets zero probability.
                logits = logits.masked_fill(logits < kth_best, float("-inf"))

            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            idx = torch.cat([idx, next_token], dim=1)

        return idx

    def num_params(self):
        """Return the total number of elements across the model's parameters."""
        return sum(p.numel() for p in self.parameters())
|
||||
20
llm/tokenizer.py
Normal file
20
llm/tokenizer.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
"""
|
||||
Character-level tokenizer.
|
||||
|
||||
Maps every unique character in the training corpus to an integer id.
|
||||
Simple, requires no external libraries, and good enough for a tiny LLM.
|
||||
"""
|
||||
|
||||
|
||||
class CharTokenizer:
    """Two-way mapping between characters and integer ids.

    The vocabulary is the sorted set of distinct characters in the text
    passed to the constructor; a character's id is its rank in that order.
    """

    def __init__(self, text: str):
        """Build the vocabulary from every distinct character in *text*."""
        alphabet = sorted(set(text))
        self.vocab_size = len(alphabet)
        self._stoi = dict(zip(alphabet, range(len(alphabet))))
        self._itos = dict(enumerate(alphabet))

    def encode(self, text: str) -> list[int]:
        """Return the id of each character in *text*.

        Raises KeyError for a character that is not in the vocabulary.
        """
        lookup = self._stoi
        return [lookup[ch] for ch in text]

    def decode(self, ids: list[int]) -> str:
        """Return the string spelled by *ids*.

        Raises KeyError for an id that is not in the vocabulary.
        """
        pieces = [self._itos[token_id] for token_id in ids]
        return "".join(pieces)
|
||||
150
llm/train.py
Normal file
150
llm/train.py
Normal file
|
|
@ -0,0 +1,150 @@
|
|||
"""
|
||||
Training script for the basic LLM.
|
||||
|
||||
Usage:
|
||||
# Train on a text file (defaults to a tiny built-in dataset if omitted)
|
||||
python train.py --data path/to/corpus.txt
|
||||
|
||||
# Quick smoke-test on the built-in dataset
|
||||
python train.py
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import types
|
||||
import random
|
||||
import torch
|
||||
from model import LLM
|
||||
from tokenizer import CharTokenizer
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tiny built-in corpus (Shakespeare excerpt) used when no file is provided
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fallback training corpus. The leading backslash suppresses the newline right
# after the opening quotes, so the text starts directly at "First Citizen".
BUILTIN_TEXT = """\
First Citizen: Before we proceed any further, hear me speak.
All: Speak, speak.
First Citizen: You are all resolved rather to die than to famish?
All: Resolved. Resolved.
First Citizen: First, you know Caius Marcius is chief enemy to the people.
All: We know't, we know't.
First Citizen: Let us kill him, and we'll have corn at our own price.
Is't a verdict?
All: No more talking on't; let it be done: away, away!
Second Citizen: One word, good citizens.
First Citizen: We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were wholesome,
we might guess they relieved us humanely; but they think we are
too dear: the leanness that afflicts us, the object of our
misery, is as an inventory to particularise their abundance;
our sufferance is a gain to them. Let us revenge this with
our pikes, ere we become rakes: for the gods know I speak this
in hunger for bread, not in thirst for revenge.
"""
|
||||
|
||||
|
||||
def get_batch(data: torch.Tensor, block_size: int, batch_size: int, device: str):
    """Sample a random batch of (input, target) sequences.

    Each input is a window of ``block_size`` consecutive tokens from *data*;
    its target is the same window shifted one token to the right.

    Args:
        data: 1-D tensor of token ids.
        block_size: Length of every sampled window.
        batch_size: Number of windows to sample.
        device: Device the returned tensors are moved to.

    Returns:
        Tuple ``(x, y)`` of tensors with shape (batch_size, block_size).

    Raises:
        ValueError: If *data* has ``block_size`` tokens or fewer, so that no
            window together with its shifted target fits.
    """
    n_starts = len(data) - block_size
    if n_starts <= 0:
        # Without this check torch.randint fails with an opaque range error.
        raise ValueError(
            f"data has {len(data)} tokens but more than block_size={block_size} "
            "are needed to sample a batch"
        )

    ix = torch.randint(n_starts, (batch_size,))
    # torch.stack yields contiguous tensors, which callers may flatten with view().
    x = torch.stack([data[i : i + block_size] for i in ix])
    y = torch.stack([data[i + 1 : i + block_size + 1] for i in ix])
    return x.to(device), y.to(device)
|
||||
|
||||
|
||||
def _load_corpus(path):
    """Return the text stored at *path*, or the built-in excerpt if *path* is falsy."""
    if path:
        with open(path, encoding="utf-8") as f:
            return f.read()
    print("No --data file provided. Using built-in Shakespeare excerpt.")
    return BUILTIN_TEXT


def _estimate_val_loss(model, val_data, args, device):
    """Return the model's loss on one random validation batch as a float."""
    model.eval()
    with torch.no_grad():
        vx, vy = get_batch(val_data, args.block_size, args.batch_size, device)
        _, val_loss = model(vx, vy)
    return val_loss.item()


def train(args):
    """Train the model, checkpoint the best validation loss, then print a sample.

    Args:
        args: Namespace providing ``data``, ``checkpoint``, ``block_size``,
            ``batch_size``, ``n_embd``, ``n_heads``, ``n_layers``,
            ``dropout``, ``lr``, ``steps`` and ``eval_interval``.

    Raises:
        ValueError: If ``args.eval_interval`` is smaller than 1.
    """
    if args.eval_interval < 1:
        # The loop below computes step % eval_interval.
        raise ValueError("eval_interval must be >= 1")

    # ------------------------------------------------------------------
    # 1. Load / prepare data
    # ------------------------------------------------------------------
    text = _load_corpus(args.data)

    tokenizer = CharTokenizer(text)
    data = torch.tensor(tokenizer.encode(text), dtype=torch.long)

    # First 90% of the tokens for training, the remainder for validation.
    split = int(0.9 * len(data))
    train_data, val_data = data[:split], data[split:]

    print(f"Corpus: {len(text):,} chars | vocab: {tokenizer.vocab_size} | "
          f"train tokens: {len(train_data):,} | val tokens: {len(val_data):,}")

    # ------------------------------------------------------------------
    # 2. Build model
    # ------------------------------------------------------------------
    device = "cuda" if torch.cuda.is_available() else "cpu"

    config = types.SimpleNamespace(
        vocab_size=tokenizer.vocab_size,
        block_size=args.block_size,
        n_embd=args.n_embd,
        n_heads=args.n_heads,
        n_layers=args.n_layers,
        dropout=args.dropout,
    )

    model = LLM(config).to(device)
    print(f"Model: {model.num_params():,} parameters | device: {device}")

    optimizer = torch.optim.AdamW(model.parameters(), lr=args.lr)

    # ------------------------------------------------------------------
    # 3. Training loop
    # ------------------------------------------------------------------
    best_val_loss = float("inf")
    # Tracks whether THIS run wrote args.checkpoint, so a stale or missing
    # file is never reloaded in step 4.
    saved_checkpoint = False

    for step in range(1, args.steps + 1):
        model.train()
        x, y = get_batch(train_data, args.block_size, args.batch_size, device)
        _, loss = model(x, y)
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        if step % args.eval_interval == 0 or step == args.steps:
            val_loss = _estimate_val_loss(model, val_data, args, device)
            print(f"step {step:>6} | train loss {loss.item():.4f} | val loss {val_loss:.4f}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save({"model": model.state_dict(), "config": config, "tokenizer": tokenizer},
                           args.checkpoint)
                saved_checkpoint = True

    # ------------------------------------------------------------------
    # 4. Sample from the trained model
    # ------------------------------------------------------------------
    print("\n--- Generated sample ---")
    if saved_checkpoint:
        # weights_only=False unpickles Python objects (the checkpoint holds the
        # config and tokenizer); acceptable because this run wrote the file.
        ckpt = torch.load(args.checkpoint, map_location=device, weights_only=False)
        model.load_state_dict(ckpt["model"])
    model.eval()

    # Slicing already copes with a text shorter than block_size.
    seed_text = text[:args.block_size]
    idx = torch.tensor(tokenizer.encode(seed_text), dtype=torch.long, device=device).unsqueeze(0)
    out = model.generate(idx, max_new_tokens=200, temperature=0.8, top_k=20)
    print(tokenizer.decode(out[0].tolist()))
|
||||
|
||||
|
||||
def parse_args(argv=None):
    """Parse the command-line options for training.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behavior.

    Returns:
        argparse.Namespace with ``data``, ``checkpoint``, ``block_size``,
        ``batch_size``, ``n_embd``, ``n_heads``, ``n_layers``, ``dropout``,
        ``lr``, ``steps`` and ``eval_interval``.

    Exits with an argparse usage error when ``--eval_interval`` is below 1
    (the training loop takes ``step % eval_interval``) or when ``--n_embd``
    is not a multiple of a positive ``--n_heads`` (the embedding is split
    evenly across heads).
    """
    p = argparse.ArgumentParser(description="Train a basic character-level LLM")
    p.add_argument("--data", type=str, default=None, help="Path to training text file")
    p.add_argument("--checkpoint", type=str, default="ckpt.pt", help="Where to save the best model")
    p.add_argument("--block_size", type=int, default=64, help="Context length")
    p.add_argument("--batch_size", type=int, default=32, help="Batch size")
    p.add_argument("--n_embd", type=int, default=128, help="Embedding dimension")
    p.add_argument("--n_heads", type=int, default=4, help="Number of attention heads")
    p.add_argument("--n_layers", type=int, default=4, help="Number of Transformer blocks")
    p.add_argument("--dropout", type=float, default=0.1, help="Dropout probability")
    p.add_argument("--lr", type=float, default=3e-4, help="Learning rate")
    p.add_argument("--steps", type=int, default=2000, help="Training steps")
    p.add_argument("--eval_interval", type=int, default=200, help="Steps between evaluations")
    args = p.parse_args(argv)

    if args.eval_interval < 1:
        p.error("--eval_interval must be >= 1")
    if args.n_heads < 1 or args.n_embd % args.n_heads != 0:
        p.error("--n_embd must be divisible by a positive --n_heads")
    return args
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Script entry point: parse the command line, then run training.
    cli_args = parse_args()
    train(cli_args)
|
||||
Loading…
Reference in a new issue