Content Extraction at Scale: Building a Pipeline That Processes 100+ Sources
DevTrends started as a side project to track developer news. I figured I'd scrape a few RSS feeds and call it done. Six months later, it processes 120+ sources, extracts content from thousands of articles a week, and has a deduplication pipeline that catches the same article published across multiple outlets.
The architecture evolved out of necessity. Here's what I ended up with.
The Source Problem
Developer content comes from everywhere: RSS feeds, Atom feeds, JSON APIs, newsletters, Twitter/X accounts, Reddit, Hacker News. Each source has its own format, its own quirks, and its own failure modes.
The naive approach is a big switch statement:
// Don't do this
foreach ($sources as $source) {
    match ($source->type) {
        'rss' => $this->fetchRss($source),
        'atom' => $this->fetchAtom($source),
        'api' => $this->fetchApi($source),
        'reddit' => $this->fetchReddit($source),
        // ... 10 more types
    };
}
This doesn't scale. Each source type has different authentication, rate limiting, pagination, and error handling. You end up with a god class that knows about everything.
The Fetcher Pattern
Instead, each source type gets its own fetcher class that implements a common interface:
interface ContentFetcher
{
    /** @return Collection<ContentItem> */
    public function fetch(Source $source): Collection;

    public function supports(Source $source): bool;
}
class RssFetcher implements ContentFetcher
{
    public function __construct(
        private HttpClient $http,
        private RssParser $parser,
    ) {}

    public function fetch(Source $source): Collection
    {
        $response = $this->http->get($source->url, [
            'timeout' => 30,
            'headers' => $source->auth_headers ?? [],
        ]);

        return $this->parser->parse($response->body())
            ->map(fn (array $item) => new ContentItem(
                sourceId: $source->id,
                url: $item['link'],
                title: $item['title'],
                content: $item['description'] ?? '',
                publishedAt: Carbon::parse($item['pubDate']),
                metadata: $item,
            ));
    }

    public function supports(Source $source): bool
    {
        return $source->type === SourceType::Rss;
    }
}
class HackerNewsFetcher implements ContentFetcher
{
    public function __construct(private HttpClient $http) {}

    public function fetch(Source $source): Collection
    {
        $stories = $this->http
            ->get('https://hacker-news.firebaseio.com/v0/topstories.json')
            ->json();

        return collect(array_slice($stories, 0, 30))
            // fetchStory() hits /v0/item/{id}.json and returns null on failure
            ->map(fn (int $id) => $this->fetchStory($id))
            ->filter()
            ->map(fn (array $story) => new ContentItem(
                sourceId: $source->id,
                url: $story['url'] ?? "https://news.ycombinator.com/item?id={$story['id']}",
                title: $story['title'],
                content: '',
                publishedAt: Carbon::createFromTimestamp($story['time']),
                metadata: $story,
            ));
    }

    public function supports(Source $source): bool
    {
        return $source->type === SourceType::HackerNews;
    }
}
A resolver picks the right fetcher:
class FetcherResolver
{
    /** @param ContentFetcher[] $fetchers */
    public function __construct(private array $fetchers) {}

    public function resolve(Source $source): ContentFetcher
    {
        foreach ($this->fetchers as $fetcher) {
            if ($fetcher->supports($source)) {
                return $fetcher;
            }
        }

        throw new UnsupportedSourceException($source->type);
    }
}
Adding a new source type means writing one class. Nothing else changes.
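In Laravel, the resolver's fetcher list can come from tagged container bindings, so registering a new fetcher really is one line next to the class itself. This is a sketch rather than code from DevTrends; the provider name, the namespaces, and the `content.fetchers` tag are my assumptions:

```php
<?php

namespace App\Providers;

use App\Fetchers\{RssFetcher, HackerNewsFetcher};
use App\Support\FetcherResolver;
use Illuminate\Support\ServiceProvider;

class FetcherServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        // Tag every fetcher implementation under one name...
        $this->app->tag([
            RssFetcher::class,
            HackerNewsFetcher::class,
            // ...new fetchers get added here and nowhere else
        ], 'content.fetchers');

        // ...and hand the tagged set to the resolver's constructor
        $this->app->when(FetcherResolver::class)
            ->needs('$fetchers')
            ->give(fn ($app) => iterator_to_array($app->tagged('content.fetchers')));
    }
}
```

The container resolves each tagged class lazily, so a misconfigured fetcher only fails when a source of that type is actually fetched.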
URL Deduplication
The first line of defence against duplicates is the URL. Before processing any content, we check if we've seen this URL before:
class DeduplicateByUrl
{
    public function execute(ContentItem $item): bool
    {
        return ContentEntry::where('url_hash', $this->hashFor($item->url))->exists();
    }

    // Public so whatever stores url_hash can reuse the exact same
    // normalise-then-hash step; otherwise lookups never match.
    public function hashFor(string $url): string
    {
        return hash('xxh128', $this->normaliseUrl($url));
    }

    private function normaliseUrl(string $url): string
    {
        $parsed = parse_url($url);

        // Strip tracking parameters
        $query = [];
        if (isset($parsed['query'])) {
            parse_str($parsed['query'], $params);
            $query = collect($params)
                ->forget(['utm_source', 'utm_medium', 'utm_campaign', 'ref', 'source'])
                ->sortKeys()
                ->all();
        }

        return sprintf(
            '%s://%s%s%s',
            $parsed['scheme'] ?? 'https',
            strtolower($parsed['host'] ?? ''),
            rtrim($parsed['path'] ?? '/', '/'),
            $query ? '?' . http_build_query($query) : ''
        );
    }
}
URL normalisation is important. The same article might appear as:
- https://example.com/article?utm_source=twitter
- https://example.com/article?utm_source=newsletter
- https://example.com/article/
- https://EXAMPLE.com/article
All four should resolve to the same hash.
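The normalisation is plain PHP apart from the collection helpers, so it can be checked standalone. Here is a runnable sketch with the Laravel `collect()` chain swapped for `unset()`/`ksort()`, using the same stripped parameters and defaults as the pipeline code:

```php
// Standalone version of the normalisation rules above
function normaliseUrl(string $url): string
{
    $parsed = parse_url($url);

    $query = [];
    if (isset($parsed['query'])) {
        parse_str($parsed['query'], $params);
        foreach (['utm_source', 'utm_medium', 'utm_campaign', 'ref', 'source'] as $tracker) {
            unset($params[$tracker]);
        }
        ksort($params);
        $query = $params;
    }

    return sprintf(
        '%s://%s%s%s',
        $parsed['scheme'] ?? 'https',
        strtolower($parsed['host'] ?? ''),
        rtrim($parsed['path'] ?? '/', '/'),
        $query ? '?' . http_build_query($query) : ''
    );
}

$variants = [
    'https://example.com/article?utm_source=twitter',
    'https://example.com/article?utm_source=newsletter',
    'https://example.com/article/',
    'https://EXAMPLE.com/article',
];

// Every variant collapses to one canonical URL, hence one hash
$hashes = array_unique(array_map(
    fn (string $url) => hash('xxh128', normaliseUrl($url)),
    $variants
));
```

Sorting the remaining query keys matters too: `?a=1&b=2` and `?b=2&a=1` are the same request but would otherwise hash differently.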
Content Hash Deduplication
URL dedup catches the same article shared from different referrers. But what about syndicated content? The same article published on Medium, Dev.to, and the author's blog has three different URLs but identical content.
That's where content hashing comes in:
class DeduplicateByContent
{
    public function execute(ContentEntry $entry): bool
    {
        $content = $entry->extracted_content ?? '';

        if (strlen($content) < 200) {
            return false; // Too short to hash meaningfully
        }

        return ContentEntry::where('content_hash', $this->hashFor($content))
            ->where('id', '!=', $entry->id)
            ->exists();
    }

    // Public so whatever stores content_hash hashes the normalised
    // text, not the raw text, and lookups actually match
    public function hashFor(string $content): string
    {
        return hash('xxh128', $this->normaliseContent($content));
    }

    private function normaliseContent(string $content): string
    {
        // Strip HTML
        $text = strip_tags($content);

        // Normalise whitespace
        $text = preg_replace('/\s+/', ' ', $text);

        // Lowercase
        $text = strtolower(trim($text));

        // Take first 2000 chars for hashing
        // (syndicated content often has different footers)
        return substr($text, 0, 2000);
    }
}
We only hash the first 2000 characters because syndicated posts often have different author bios, CTAs, or footers. The meat of the article is usually identical from the start.
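Because the hash window stops at 2,000 characters, two syndicated copies with different footers still collide. A quick standalone check, with made-up article bodies and footers for illustration:

```php
// Same normalisation as above: strip tags, collapse whitespace,
// lowercase, then keep only the first 2000 characters
function normaliseContent(string $content): string
{
    $text = strip_tags($content);
    $text = preg_replace('/\s+/', ' ', $text);
    $text = strtolower(trim($text));

    return substr($text, 0, 2000);
}

// ~2,700 characters of identical body text, two different outlets
$body = str_repeat('The quick brown fox jumps over the lazy dog. ', 60);
$medium = "<p>{$body}</p><footer>Originally published on Medium.</footer>";
$devto  = "<p>{$body}</p><footer>Cross-posted from my blog. Follow me on Dev.to!</footer>";

// The differing footers fall outside the 2000-char window, so the
// normalised forms (and therefore the hashes) come out identical
$same = hash('xxh128', normaliseContent($medium))
    === hash('xxh128', normaliseContent($devto));
```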
The Pipeline
Each content item flows through a pipeline of stages:
class ProcessContentPipeline
{
    public function __construct(
        private DeduplicateByUrl $urlDedup,
        private DeduplicateByContent $contentDedup,
        private ExtractFullContent $extractor,
        private ClassifyContent $classifier,
    ) {}

    public function execute(ContentItem $item): ?ContentEntry
    {
        // Stage 1: URL dedup
        if ($this->urlDedup->execute($item)) {
            return null;
        }

        // Stage 2: Create entry with pending status. The stored url_hash
        // must be the hash of the *normalised* URL, or stage 1's lookup
        // will never find it on the next fetch.
        $entry = ContentEntry::create([
            'source_id' => $item->sourceId,
            'url' => $item->url,
            'url_hash' => $this->urlDedup->hashFor($item->url),
            'title' => $item->title,
            'raw_content' => $item->content,
            'published_at' => $item->publishedAt,
            'extraction_status' => ExtractionStatus::Pending,
        ]);

        // Stage 3: Extract full content
        try {
            $fullContent = $this->extractor->execute($entry);

            $entry->update([
                'extracted_content' => $fullContent,
                // Same rule: store the normalised hash stage 4 searches for
                'content_hash' => $this->contentDedup->hashFor($fullContent),
                'extraction_status' => ExtractionStatus::Complete,
            ]);
        } catch (ExtractionFailedException $e) {
            $entry->update([
                'extraction_status' => ExtractionStatus::Failed,
                'extraction_error' => $e->getMessage(),
            ]);

            return $entry;
        }

        // Stage 4: Content dedup (needs extracted content)
        if ($this->contentDedup->execute($entry)) {
            $entry->update(['is_duplicate' => true]);
            return $entry;
        }

        // Stage 5: Classify
        $this->classifier->execute($entry);

        return $entry;
    }
}
Extraction Status Tracking
Not every extraction succeeds first time. Sites go down, return CAPTCHAs, or have weird JavaScript rendering requirements. The extraction_status column tracks where each entry is:
enum ExtractionStatus: string
{
    case Pending = 'pending';
    case Complete = 'complete';
    case Failed = 'failed';
    case Retrying = 'retrying';
    case Skipped = 'skipped';
}
A scheduled command retries failed extractions:
// In routes/console.php
Schedule::command('content:retry-failed')
    ->hourly()
    ->withoutOverlapping();
class RetryFailedExtractions extends Command
{
    protected $signature = 'content:retry-failed {--limit=50}';

    public function handle(ExtractFullContent $extractor, DeduplicateByContent $contentDedup): int
    {
        $entries = ContentEntry::query()
            ->where('extraction_status', ExtractionStatus::Failed)
            ->where('extraction_attempts', '<', 3)
            ->where('updated_at', '<', now()->subHour())
            ->limit($this->option('limit'))
            ->get();

        foreach ($entries as $entry) {
            try {
                $content = $extractor->execute($entry);

                $entry->update([
                    'extracted_content' => $content,
                    'content_hash' => $contentDedup->hashFor($content),
                    'extraction_status' => ExtractionStatus::Complete,
                    'extraction_attempts' => $entry->extraction_attempts + 1,
                ]);
            } catch (ExtractionFailedException) {
                $entry->update([
                    'extraction_attempts' => $entry->extraction_attempts + 1,
                ]);
            }
        }

        $this->info("Retried {$entries->count()} entries.");

        return self::SUCCESS;
    }
}
After 3 failed attempts, we leave it alone. Some content just can't be extracted — paywalled articles, PDFs, interactive pages. That's fine.
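Nothing in the listings above ever sets `Skipped`, though. One way to make "leave it alone" explicit, sketched here rather than taken from the real command, is a final query that retires exhausted entries:

```php
// Hypothetical last step for the retry command: entries that have
// used all three attempts move to Skipped, so the retry query
// (which filters on Failed) stops picking them up
ContentEntry::query()
    ->where('extraction_status', ExtractionStatus::Failed)
    ->where('extraction_attempts', '>=', 3)
    ->update(['extraction_status' => ExtractionStatus::Skipped->value]);
```

That also makes "given up" queryable for a dashboard, instead of being implied by an attempt counter.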
Queue Architecture
Each source fetch is a queued job, running on a dedicated queue:
class FetchSourceContent implements ShouldQueue
{
    public int $tries = 2;
    public int $backoff = 60;

    public function __construct(public Source $source) {}

    public function handle(FetcherResolver $resolver, ProcessContentPipeline $pipeline): void
    {
        $fetcher = $resolver->resolve($this->source);
        $items = $fetcher->fetch($this->source);

        foreach ($items as $item) {
            $pipeline->execute($item);
        }

        $this->source->update(['last_fetched_at' => now()]);
    }
}
The scheduler dispatches these:
Schedule::call(function () {
    Source::where('active', true)
        ->where(function ($query) {
            $query->whereNull('last_fetched_at')
                ->orWhere('last_fetched_at', '<', now()->subMinutes(30));
        })
        ->each(fn (Source $source) => FetchSourceContent::dispatch($source));
})->everyFifteenMinutes();
Sources that fail don't block other sources. A broken RSS feed just means that job fails and retries, while the other 119 sources process normally.
The Numbers
On a typical day, the pipeline:
- Fetches from 120+ sources
- Processes 2,000-3,000 content items
- Deduplicates roughly 40% by URL (cross-posted content)
- Deduplicates another 5-10% by content hash (syndicated content)
- Successfully extracts full content from ~90% of remaining items
- Runs on a single 4GB VPS with Horizon managing the queues
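Taking the midpoints of those ranges, the daily funnel works out roughly as follows (my arithmetic, not measured figures):

```php
$fetched = 2500;                                          // midpoint of 2,000-3,000 items
$afterUrlDedup = intdiv($fetched * 60, 100);              // ~40% caught by URL hash
$afterContentDedup = intdiv($afterUrlDedup * 925, 1000);  // another ~7.5% by content hash
$extracted = intdiv($afterContentDedup * 90, 100);        // ~90% extraction success
// Leaves roughly 1,200-1,300 genuinely new, fully extracted articles a day
```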
The whole thing costs about four quid a month in compute. Not everything needs Kubernetes.
I write about Laravel, AI tooling, and building software. More at stuartmason.co.uk.