"""
DataForSEO keyword enrichment. Adds exact volume, KD, CPC, intent, SERP features, and local pack details
to the high-intent keyword list. Supersedes dataforseo_local_pack_check.py
(this does the local_pack detection AND more). Setup: pip install requests pandas openpyxl export DATAFORSEO_LOGIN="your_login" export DATAFORSEO_PASSWORD="your_password" Run: python dataforseo_enrich.py Input: Remodeling_Keywords_HighIntent.xlsx (sheet: High-Intent Keywords)
Output: Remodeling_Keywords_Enriched.xlsx Cost estimate for 510 keywords: ~$1.50 total - keyword_overview: ~$0.05 (batched) - SERP advanced (only for keywords with local_pack): ~$1.00
Runtime: ~1-2 minutes
""" import os
import sys
import time
import json
import base64
from concurrent.futures import ThreadPoolExecutor, as_completed import requests
import pandas as pd # ---------- CONFIG ----------
INPUT_FILE = "Remodeling_Keywords_HighIntent.xlsx"
INPUT_SHEET = "High-Intent Keywords"
OUTPUT_FILE = "Remodeling_Keywords_Enriched.xlsx"

# Target market — change to match the client.
LOCATION_NAME = "Vancouver,Washington,United States"
# Alternatives: "Portland,Oregon,United States", "Washington,United States"
LANGUAGE_CODE = "en"

OVERVIEW_BATCH = 700  # keyword_overview allows up to 700 per task
SERP_BATCH = 100      # serp live/advanced allows up to 100 per task
MAX_PARALLEL = 5      # concurrent requests against the API

# Directory/review sites; their presence in a local pack is tracked separately
# from direct-business listings.
AGGREGATORS = {
    "yelp", "angi", "angieslist", "houzz", "thumbtack",
    "bbb", "yellowpages", "homeadvisor", "porch", "nextdoor",
}

# ---------- AUTH ----------
login = os.environ.get("DATAFORSEO_LOGIN")
password = os.environ.get("DATAFORSEO_PASSWORD")
if not login or not password:
    sys.exit("Set DATAFORSEO_LOGIN and DATAFORSEO_PASSWORD env vars first.")

# DataForSEO authenticates every request with HTTP Basic auth.
encoded_credentials = base64.b64encode(f"{login}:{password}".encode()).decode()
auth_header = "Basic " + encoded_credentials
HEADERS = {"Authorization": auth_header, "Content-Type": "application/json"}

OVERVIEW_URL = "https://api.dataforseo.com/v3/dataforseo_labs/google/keyword_overview/live"
SERP_URL = "https://api.dataforseo.com/v3/serp/google/organic/live/advanced"


def post_with_retry(url, payload, tries=3):
    """POST *payload* as JSON to *url* with exponential-backoff retries.

    Returns the API response's "tasks" list, or [] once all *tries*
    attempts have failed (the failure is printed, never raised).
    """
    for attempt in range(tries):
        try:
            response = requests.post(url, headers=HEADERS, data=json.dumps(payload), timeout=90)
            response.raise_for_status()
            return response.json().get("tasks", [])
        except Exception as exc:
            if attempt == tries - 1:
                print(f" request failed after {tries} tries: {exc}")
                return []
            # Back off 1s, 2s, 4s, ... between attempts.
            time.sleep(2 ** attempt)


# ---------- PHASE 1: keyword_overview ----------
def fetch_overview_batch(keywords):
    """Fetch DataForSEO Labs keyword_overview data for one keyword batch."""
    payload = [{
        "keywords": keywords,
        "location_name": LOCATION_NAME,
        "language_code": LANGUAGE_CODE,
        "include_serp_info": True,
        "include_clickstream_data": False,
    }]
    return post_with_retry(OVERVIEW_URL, payload)


def parse_overview(tasks):
    """Flatten keyword_overview task responses into {keyword: metrics dict}.

    Tasks whose status_code is not 20000 (DataForSEO "ok") are skipped.
    """
    out = {}
    for task in tasks:
        if task.get("status_code") != 20000:
            continue
        for result in (task.get("result") or []):
            for item in (result.get("items") or []):
                keyword = item.get("keyword")
                if not keyword:
                    continue
                info = item.get("keyword_info") or {}
                props = item.get("keyword_properties") or {}
                intent = (item.get("search_intent_info") or {}).get("main_intent")
                serp_info = item.get("serp_info") or {}
                features = serp_info.get("item_types") or []
                monthly = info.get("monthly_searches") or []
                # Compact history string: last 12 entries, in the order the
                # API returned them.
                trend = ", ".join(str(m.get("search_volume", 0)) for m in monthly[-12:])
                out[keyword] = {
                    "DFS Volume": info.get("search_volume"),
                    "DFS KD": props.get("keyword_difficulty"),
                    "DFS CPC ($)": info.get("cpc"),
                    "DFS Competition": info.get("competition_level"),
                    "DFS Competition (0-1)": info.get("competition"),
                    "DFS Low Bid ($)": info.get("low_top_of_page_bid"),
                    "DFS High Bid ($)": info.get("high_top_of_page_bid"),
                    "Search Intent": intent,
                    "Has Local Pack": "local_pack" in features,
                    "Has Paid Results": "paid" in features,
                    "Has Featured Snippet": "featured_snippet" in features,
                    "Has People Also Ask": "people_also_ask" in features,
                    "SERP Features": ", ".join(sorted(set(features))),
                    "SERP Results Count": serp_info.get("se_results_count"),
                    "12mo Volume Trend": trend,
                }
    return out


# ---------- PHASE 2: full SERP for local_pack keywords ----------
def fetch_serp_batch(keywords):
    """Fetch live advanced Google SERPs (top 10, desktop), one task per keyword."""
    tasks = []
    for kw in keywords:
        tasks.append({
            "keyword": kw,
            "location_name": LOCATION_NAME,
            "language_code": LANGUAGE_CODE,
            "depth": 10,
            "device": "desktop",
        })
    return post_with_retry(SERP_URL, tasks)


def parse_serp(tasks):
    """Extract local-pack business names and top-3 organic domains per keyword."""
    out = {}
    for task in tasks:
        if task.get("status_code") != 20000:
            continue
        keyword = (task.get("data") or {}).get("keyword", "")
        results = task.get("result") or []
        if not keyword or not results:
            continue
        pack_titles = []
        organic_domains = []
        for entry in (results[0].get("items") or []):
            entry_type = entry.get("type")
            if entry_type == "local_pack":
                pack_titles.append(entry.get("title", ""))
            elif entry_type == "organic" and len(organic_domains) < 3:
                organic_domains.append(entry.get("domain", ""))
        titles_lower = " ".join(pack_titles).lower()
        out[keyword] = {
            "Local Pack Businesses": " | ".join(pack_titles),
            "Local Pack Count": len(pack_titles),
            "Pack Has Aggregator": any(name in titles_lower for name in AGGREGATORS),
            "Top 3 Organic Domains": " | ".join(organic_domains),
        }
    return out


# ---------- MAIN ----------
def _chunk(items, size):
    """Split *items* into consecutive lists of at most *size* elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _run_batches(fetch_fn, parse_fn, batches):
    """Fetch *batches* in parallel and merge the parsed per-keyword dicts.

    fetch_fn(batch) returns raw API tasks; parse_fn(tasks) returns a dict
    keyed by keyword. Returns the union of all parsed dicts.
    """
    merged = {}
    if not batches:
        return merged
    with ThreadPoolExecutor(max_workers=MAX_PARALLEL) as pool:
        futures = {pool.submit(fetch_fn, batch): i for i, batch in enumerate(batches)}
        for fut in as_completed(futures):
            i = futures[fut]
            merged.update(parse_fn(fut.result()))
            print(f" batch {i+1}/{len(batches)} done")
    return merged


def _bool_col_count(frame, col):
    """Count True values in a possibly-missing boolean column.

    Rows with no API data hold NaN; summing a mixed True/False/NaN object
    column yields NaN, so fill with False first.
    """
    if col not in frame.columns:
        return 0
    return int(frame[col].fillna(False).sum())


def main():
    """Read the high-intent keyword sheet, enrich it via DataForSEO, write xlsx.

    Phase 1 pulls cheap batched keyword_overview data for every keyword;
    Phase 2 pulls full SERPs only for keywords whose SERP shows a local pack.
    """
    df = pd.read_excel(INPUT_FILE, sheet_name=INPUT_SHEET)
    # Dedupe while preserving order: duplicate keywords would be sent to
    # (and billed by) the API more than once for identical data.
    keywords = list(dict.fromkeys(df["Keyword"].dropna().astype(str)))
    if not keywords:
        # Guard: an empty sheet would otherwise crash later (empty API
        # payloads, division by zero in the summary).
        sys.exit(f"No keywords found in {INPUT_FILE} (sheet: {INPUT_SHEET}).")
    print(f"Enriching {len(keywords)} keywords against {LOCATION_NAME}")

    # Phase 1
    print("\n[1/2] keyword_overview...")
    overview_data = _run_batches(
        fetch_overview_batch, parse_overview, _chunk(keywords, OVERVIEW_BATCH))
    print(f" got overview for {len(overview_data)}/{len(keywords)} keywords")

    # Phase 2 — only for keywords where local_pack appears
    local_pack_kws = [kw for kw, d in overview_data.items() if d.get("Has Local Pack")]
    print(f"\n[2/2] Full SERP for {len(local_pack_kws)} keywords with local_pack...")
    serp_data = _run_batches(
        fetch_serp_batch, parse_serp, _chunk(local_pack_kws, SERP_BATCH))

    # Merge API data back onto the original rows; unmatched rows keep NaN.
    print("\nMerging...")
    enriched_rows = []
    for _, row in df.iterrows():
        kw = row["Keyword"]
        base = row.to_dict()
        base.update(overview_data.get(kw, {}))
        base.update(serp_data.get(kw, {}))
        enriched_rows.append(base)
    enriched = pd.DataFrame(enriched_rows)

    # Put key DFS columns up front for readability
    priority_cols = ["Keyword", "High Intent Signal", "DFS Volume", "DFS KD",
                     "DFS CPC ($)", "DFS Competition", "Search Intent",
                     "Has Local Pack", "Pack Has Aggregator",
                     "Local Pack Businesses", "Top 3 Organic Domains"]
    existing = [c for c in priority_cols if c in enriched.columns]
    rest = [c for c in enriched.columns if c not in existing]
    enriched = enriched[existing + rest]

    # Sort by DFS Volume desc. The column only exists if the overview phase
    # returned data — without the guard an all-failed run raises KeyError.
    if "DFS Volume" in enriched.columns:
        enriched = enriched.sort_values("DFS Volume", ascending=False, na_position="last")

    enriched.to_excel(OUTPUT_FILE, sheet_name="Enriched Keywords", index=False)

    # Summary stats (all NaN/missing-column safe).
    total = len(enriched)
    with_vol = enriched["DFS Volume"].notna().sum() if "DFS Volume" in enriched.columns else 0
    kd_values = enriched["DFS KD"].dropna() if "DFS KD" in enriched.columns else []
    with_pack = _bool_col_count(enriched, "Has Local Pack")
    with_agg = _bool_col_count(enriched, "Pack Has Aggregator")
    print(f"\n--- Summary ---")
    print(f"Keywords enriched: {with_vol}/{total}")
    if len(kd_values):
        print(f"Avg Keyword Difficulty: {kd_values.mean():.1f}")
    print(f"Has local pack: {with_pack}/{total} ({with_pack/total:.0%})")
    print(f"Pack has aggregator: {with_agg}/{with_pack}")
    print(f"\nSaved: {OUTPUT_FILE}")
    print("Upload this file back to the chat and I'll run priority scoring + final filters.")


if __name__ == "__main__":
    main()