1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
import asyncio
import os
import random
import re

import pandas as pd
from playwright.async_api import async_playwright
# Number of profile pages crawled concurrently per batch.
CONCURRENCY = 5
# Bounds (seconds) for the random delay inserted before each request.
MIN_DELAY = 2
MAX_DELAY = 8
# Desktop browser user-agent strings; one is chosen at random for each
# browser context to vary the crawler's fingerprint.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
]
def parse_follower_count(text):
    """Parse a scraped follower-count string into an integer.

    Handles thousands separators (",") and the magnitude suffixes
    K (thousand), M (million) and B (billion), e.g. "1.2K" -> 1200.

    Args:
        text: Raw follower-count text taken from the page.

    Returns:
        The follower count as an int, or None if the text cannot be parsed.
    """
    try:
        text = text.strip().upper().replace(',', '')
        # Magnitude suffixes mapped to their multipliers.
        for suffix, multiplier in (('K', 1_000), ('M', 1_000_000), ('B', 1_000_000_000)):
            if suffix in text:
                return int(float(text.replace(suffix, '')) * multiplier)
        return int(float(text))
    except (ValueError, AttributeError):
        # ValueError: non-numeric text; AttributeError: text is None / not a str.
        # Narrowed from a bare `except:` so KeyboardInterrupt etc. still propagate.
        return None
async def _extract_follower_text(page):
    """Probe a series of selectors and return the follower-count text, or None.

    Tries the known data-e2e / title selectors first, then falls back to a
    <strong> element near the literal "Followers" label.
    """
    selectors = [
        '[data-e2e="followers-count"]',
        '[data-e2e="followers-number"]',
        'strong[data-e2e="followers-count"]',
        'strong[title*="Followers"]',
        '[title*="Followers"]',
    ]
    for selector in selectors:
        try:
            await page.wait_for_selector(selector, timeout=8000)
            text = await page.locator(selector).first.inner_text()
            if text:
                return text
        except Exception:
            # Narrowed from a bare `except:` so asyncio.CancelledError is no
            # longer swallowed; a missing selector just means "try the next".
            continue
    try:
        element = page.locator('strong:near(:text("Followers"))')
        return await element.first.inner_text(timeout=8000)
    except Exception:
        return None


async def crawl_followers(index, homepage):
    """Asynchronously scrape the follower count of a single profile page.

    Args:
        index: DataFrame row index, returned unchanged so the caller can map
            gathered results back to rows.
        homepage: Profile URL to visit.

    Returns:
        (index, count) on success, where count is an int; otherwise
        (index, message) with a human-readable failure string.
    """
    result_msg = "未抓取到粉丝数"
    try:
        # Random start delay so concurrent tasks don't hit the site in a burst.
        await asyncio.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=random.choice(USER_AGENTS),
                viewport={"width": 1920, "height": 1080},
            )
            page = await context.new_page()
            try:
                await page.goto(homepage, timeout=30000, wait_until="domcontentloaded")
                # Light human-like interaction ~70% of the time to reduce the
                # chance of bot detection.
                if random.random() > 0.3:
                    await page.mouse.move(
                        random.randint(0, 500),
                        random.randint(0, 500),
                    )
                    await page.evaluate("window.scrollBy(0, window.innerHeight/2)")
                    await asyncio.sleep(random.uniform(0.5, 1.5))
                follower_text = await _extract_follower_text(page)
                if follower_text:
                    count = parse_follower_count(follower_text)
                    if count is not None:
                        return index, count
                    result_msg = f"解析失败: {follower_text}"
                else:
                    result_msg = "页面元素未找到"
            except Exception as e:
                result_msg = f"页面加载失败: {str(e)[:50]}"
            finally:
                # Runs even on the early success `return` above.
                await browser.close()
    except Exception as e:
        result_msg = f"浏览器错误: {str(e)[:50]}"
    return index, result_msg
async def main():
    """Re-crawl follower counts for rows that previously failed and save results."""
    print(f"启动粉丝数重试爬虫,并发数: {CONCURRENCY}")
    file_name = "tiktok_links_table_followers"
    column_name = "tiktok_url"

    file_path = "./result/" + file_name + ".xlsx"
    df = pd.read_excel(file_path)
    # Normalize to str so failure messages and numbers compare uniformly.
    df["粉丝数"] = df["粉丝数"].astype(str)

    output_path = "./retry/" + file_name + ".xlsx"
    # Create the output directory up front; otherwise to_excel would crash
    # only after the entire (long) crawl has finished.
    os.makedirs("./retry", exist_ok=True)

    # Rows whose previous status text indicates any kind of failure.
    retry_condition = (
        df["粉丝数"].str.contains("失败", na=False)
        | df["粉丝数"].str.contains("未抓取到", na=False)
        | df["粉丝数"].str.contains("无效链接", na=False)
        | df["粉丝数"].str.contains("元素", na=False)
        | df["粉丝数"].str.contains("浏览器", na=False)
        | df["粉丝数"].str.lower().isin(["nan", "none", "null", ""])
    )
    retry_df = df[retry_condition]
    if len(retry_df) == 0:
        print("没有需要重试的记录")
        return
    print(f"发现 {len(retry_df)} 条需要重试的记录")

    # Build (row-index, url) work items, marking malformed URLs in place.
    tasks = []
    for index, row in retry_df.iterrows():
        homepage = row[column_name]
        if pd.isna(homepage) or not isinstance(homepage, str) or not homepage.startswith(('http', 'www')):
            df.at[index, "粉丝数"] = "无效链接格式"
            continue
        tasks.append((index, homepage.strip()))

    total = len(tasks)
    success_count = 0
    total_batches = (total + CONCURRENCY - 1) // CONCURRENCY  # ceiling division, hoisted out of the loop
    for i in range(0, total, CONCURRENCY):
        batch = tasks[i:i + CONCURRENCY]
        batch_num = i // CONCURRENCY + 1
        print(f"\n处理批次 {batch_num}/{total_batches} (任务 {i+1}-{min(i+CONCURRENCY, total)})")
        results = await asyncio.gather(*(crawl_followers(idx, url) for idx, url in batch))
        for index, result in results:
            original_status = df.at[index, "粉丝数"]
            df.at[index, "粉丝数"] = result
            if isinstance(result, int):
                status = "✅ 成功"
                success_count += 1
            else:
                status = "❌ 失败"
            # index+2: Excel rows are 1-based and row 1 is the header.
            print(f"行{index+2}: {status} | 原状态: {original_status} | 新结果: {result}")

    df.to_excel(output_path, index=False)
    print("\n" + "=" * 50)
    print(f"任务完成,文件已保存: {output_path}")
    print(f"总记录数: {len(df)}")
    print(f"本次处理: {len(retry_df)} 条")
    print(f"成功抓取: {success_count} 条")
    print(f"仍失败: {len(retry_df) - success_count} 条")
    if success_count > 0:
        # Coerce the whole column once: failure strings become NaN and are
        # dropped, so the stats cover every numeric count (old and new).
        # This replaces a broken isinstance/str.isdigit filter that matched
        # nothing (the column is all-str) and a chained assignment on a
        # filtered copy (SettingWithCopyWarning, never written back).
        numeric = pd.to_numeric(df["粉丝数"], errors="coerce").dropna()
        if len(numeric) > 0:
            print(f"- 平均粉丝数: {int(numeric.mean())}人")
            print(f"- 最高粉丝数: {int(numeric.max())}人")
            print(f"- 最低粉丝数: {int(numeric.min())}人")
    print("=" * 50)
if __name__ == "__main__":
    # Run the async entry point to completion.
    asyncio.run(main())
|