
2025年最新LangChain Agent教程:从入门到精通
随着数据建模成为处理随机性的强大资源,数据解决方案已经出现在整个社会的多项任务中。在本系列文章中,我们的目标是构建一个解决方案,用于根据通过网络收集的数据预测足球比赛结果。
首先,我们需要设定我们项目的主要目标,这将是我们工作的核心。因此,正如我所写的那样,国际足联世界杯临近,人们通常会聚集在一起为比赛和锦标赛的结果下注,希望赢得一些奖品或只是为了好玩。这种行为是一种全球现象,体育博彩是一个亿万富翁的行业,吸引了数百万寻求赔率的人。
在这种情况下出现了该项目的主要目标,即构建一种工具,可以帮助我们通过足球博彩市场的博彩策略获利。然后,根据我们的目标集,我们可以开始构建引导我们实现目标的步骤。
好的,我们的最终目标已经确定,但现在它是一个模糊而遥远的目标。做一些逆向工程可能是清理我们道路的好主意。我想到的第一件事是构建一个应用程序,它允许我们根据某种预测来模拟投注策略。因此,正如您所想象的那样,我们预测结果的框架将是一个机器或深度学习模型,因为我们希望使用过去的信息来预测未来事件。话虽如此,我们需要收集网络中某处输入的数据,顺便说一下,这是第一篇文章的主题。
在进一步研究网络抓取之前,让我们总结一下刚才描述的步骤。
我在网上查看时发现了一些不同的数据源,并且每个数据源都是针对特定目的而选择的。我将介绍并解释它们及其特点。我们要构建的第一个数据集是我们用来为我们的模型提供足球比赛过去统计数据和事件的数据集。我们从中抓取的来源是fbref ,您可以在我们的repo中的文件 sample.pkl 中预览我们的抓取结果。看一下fbref主页结构:
FBRef 主页
继续,我们的目标是获得一个时间序列数据集,其中包含此页面、几个日期中可用的信息,以及比赛报告链接中包含的信息,其中包含有关比赛的更多具体统计信息。下图中还有一个匹配报告示例。
Fbref 匹配报告
然后,通过查看网站及其结构,很明显我们不需要处理 JavaScript 代码,这会使我们的抓取任务稍微复杂一些,所以我们从现在开始使用BeautifulSoup 。我们现在应该根据我们需要的信息来规划我们的抓取结构,因为抓取器线性工作以捕获我们想要的信息。该代码嵌入在类“scrapper”中,并且在其中实现了它的功能。
class scrapper:
"""
Class used to scrap football data
:param path: The chrome driver path in your computer. Only used to get today matches information.
:def getMatches(): Gets past matches information from the leagues chosen in a certain period.
Uses beautifulSoup framework
:def getMatchesToday(): Gets predicted lineups and odds about matches to be played today.
Uses selenium framework
"""
def __init__(self, path='D:/chromedriver_win32/chromedriver.exe'):
self.originLink = 'https://fbref.com'
self.path=path
self.baseFolder = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
self.dataFolder = os.path.join(self.baseFolder, 'data')
self.scoresHome = []
self.scoresAway = []
self.homeTeams = []
self.awayTeams = []
self.scoresHome = []
self.scoresAway = []
self.dates = []
self.homeXG = []
self.awayXG = []
那么,让我们按照我所遵循的步骤进行:
在比赛页面中,到达指定日期
yearNow, monthNow, dayNow = self._getDate(day)
urlDay = self.originLink + "/en/matches/{year}-{month}-{day}".format(year=yearNow, month=monthNow, day=dayNow)
print(urlDay)
html = urlopen(urlDay)
bs = BeautifulSoup(html.read(), 'html.parser')
def _getDate(self, date):
"""
Helper function used to format url in the desired date in getMatches()
:param date: datetime.date object
:return: The formatted year, month and day of the date object
"""
year = str(date.year)
month = str(date.month) if date.month >= 10 else '0' + str(date.month)
day = str(date.day) if date.day >= 10 else '0' + str(date.day)
return year, month, day
这个过程和下面的所有过程都是在用户定义的迭代宇宙中每天进行的。函数 getMatches() 有一个开始日期和一个结束日期,它设置了抓取器将执行的边界。
2.获取每张冠军表
championshipTables = bs.find_all('div', {'class':'table_wrapper'})
errorList = []
for i in range(len(championshipTables)):
try:
championshipTables[i].find('a', {'href':re.compile('^/en/comps/')}).get_text()
except AttributeError:
errorList.append(i)
for error in errorList:
del championshipTables[error]
desiredTables = [ch for ch in championshipTables if ch.find('a', {'href':re.compile('^/en/comps/')}).get_text() in leagues]
按照第一步的例子,联赛变量可以由用户输入,所以他选择他想要报废的联赛。我们还可以在代码中看到一个 try-except 子句,它处理结构错误,例如网站中可能出现的假表。
3.从每个冠军表中,从比赛行中获取信息
for table in desiredTables:
time.sleep(4)
matchesLinks = []
homeTeams = table.find_all('td', {'data-stat':'home_team'})
for team in homeTeams:
self.homeTeams.append(team.get_text())
self.dates.append(day)
awayTeams = table.find_all('td', {'data-stat':'away_team'})
for team in awayTeams:
self.awayTeams.append(team.get_text())
scores = table.find_all('td', {'data-stat':'score'})
for score in scores:
scoreHome, scoreAway = self._getScore(score.get_text())
self.scoresHome.append(scoreHome)
self.scoresAway.append(scoreAway)
matchesLinks.append(score.find('a', {'href':re.compile('^/')})['href'])
if table.find_all('td', {'data-stat':'home_xg'}):
homeXG = table.find_all('td', {'data-stat':'home_xg'})
awayXG = table.find_all('td', {'data-stat':'away_xg'})
for xg in homeXG:
self.homeXG.append(xg.get_text())
for xg in awayXG:
self.awayXG.append(xg.get_text())
else:
for team in homeTeams:
self.homeXG.append(np.nan)
self.awayXG.append(np.nan)
在这里,除了在我们的列表中添加我们最开始想要的信息外,我突出显示了睡眠时间,用于控制我们在一定时间内发出的请求数量,避免我们的IP被禁止。另外值得注意的是每个比赛报告链接的存储,它包含在分数变量中。通过从分数变量而不是“匹配报告”中捕获链接,我们可以避免在内存中存储延迟或取消的匹配链接。这引导我们进入下一步:
4.获取每场比赛报告并检索信息
for link in matchesLinks:
dfMatchStats.loc[len(dfMatchStats)] = self._getMatchStats(link)
def _getMatchStats(self, url):
"""
Helper function to extract the match stats for each match in getMatches()
:param url: The match report url - is extracted in getMatches()
:return: List with match stats
"""
stats={"Fouls":[np.nan, np.nan], "Corners":[np.nan, np.nan], "Crosses":[np.nan, np.nan], "Touches":[np.nan, np.nan],
"Tackles":[np.nan, np.nan], "Interceptions":[np.nan, np.nan],"Aerials Won":[np.nan, np.nan],
"Clearances":[np.nan, np.nan], "Offsides":[np.nan, np.nan], "Goal Kicks":[np.nan, np.nan], "Throw Ins":[np.nan, np.nan],
"Long Balls":[np.nan, np.nan]}
matchStatsList = []
htmlMatch = urlopen(self.originLink + url)
bsMatch = BeautifulSoup(htmlMatch.read(), 'html.parser')
homeLineup = bsMatch.find('div', {'class':'lineup', 'id':'a'})
if not homeLineup:
homePlayers = []
awayPlayers = []
for i in range(0,11):
homePlayers.append(np.nan)
awayPlayers.append(np.nan)
yellowCardsHome = np.nan
redCardsHome = np.nan
yellowCardsAway = np.nan
redCardsAway = np.nan
matchStatsList.extend([yellowCardsHome, redCardsHome, yellowCardsAway, redCardsAway])
for key, value in stats.items():
matchStatsList.extend(value)
return homePlayers + awayPlayers + matchStatsList
homePlayers = homeLineup.find_all('a', {'href':re.compile('^/en/players')})[0:11]
homePlayers = [player.get_text() for player in homePlayers]
awayLineup = bsMatch.find('div', {'class':'lineup', 'id':'b'})
awayPlayers = awayLineup.find_all('a', {'href':re.compile('^/en/players')})[0:11]
awayPlayers = [player.get_text() for player in awayPlayers]
matchCards = bsMatch.find_all('div', {'class':'cards'})
yellowCardsHome = len(matchCards[0].find_all('span', {'class':'yellow_card'})) + len(matchCards[0].find_all('span', {'class':'yellow_red_card'}))
redCardsHome = len(matchCards[0].find_all('span', {'class':'red_card'})) + len(matchCards[0].find_all('span', {'class':'yellow_red_card'}))
yellowCardsAway = len(matchCards[1].find_all('span', {'class':'yellow_card'})) + len(matchCards[1].find_all('span', {'class':'yellow_red_card'}))
redCardsAway = len(matchCards[1].find_all('span', {'class':'red_card'})) + len(matchCards[1].find_all('span', {'class':'yellow_red_card'}))
matchStatsList.extend([yellowCardsHome, redCardsHome, yellowCardsAway, redCardsAway])
extraStatsPanel = bsMatch.find("div", {"id":"team_stats_extra"})
for statColumn in extraStatsPanel.find_all("div", recursive=False):
column = statColumn.find_all("div")
columnValues = [value.get_text() for value in column]
for index, value in enumerate(columnValues):
if not value.isdigit() and value in stats:
stats[value] = [int(columnValues[index-1]), int(columnValues[index+1])]
for key, value in stats.items():
matchStatsList.extend(value)
return homePlayers + awayPlayers + matchStatsList
正如您所看到的,这个过程有点棘手,所以让我们做一个简单的解释。黄色和红色卡片是通过将黄色或红色类别的卡片对象的数量相加而得出的。其他统计数据来自:
作为一个额外的步骤,我意识到需要创建一个检查点触发器,因为爬虫可能会面临无法预料的错误,或者 fbref 可能会不允许您的 IP 发出新请求,而这种情况将意味着大量时间的浪费。然后,每个月的每个第一天,我们都会保存到目前为止的爬虫工作,以防万一发生意外错误,我们有一个安全检查点可以检索。
仅此而已。在下面代码的底部,您可以看到日期更新 iteraroe 和格式化最终数据框所需的操作。
if day.day == 1:
# if the process crashes, we have a checkpoint every month starter
dfCheckpoint = dfMatchStats.copy()
dfCheckpoint["homeTeam"] = self.homeTeams
dfCheckpoint["awayTeam"] = self.awayTeams
dfCheckpoint["scoreHome"] = self.scoresHome
dfCheckpoint["scoreAway"] = self.scoresAway]
dfCheckpoint["homeXG"] = self.homeXG
dfCheckpoint["awayXG"] = self.awayXG
dfCheckpoint["date"] = self.dates
dfCheckpoint.to_pickle(os.path.join(self.dataFolder, 'checkPoint.pkl'))
day = day + timedelta(days=1)
dfMatchStats["homeTeam"] = self.homeTeams
dfMatchStats["awayTeam"] = self.awayTeams
dfMatchStats["scoreHome"] = self.scoresHome
dfMatchStats["scoreAway"] = self.scoresAway
dfMatchStats["homeXG"] = self.homeXG
dfMatchStats["awayXG"] = self.awayXG
dfMatchStats["date"] = self.dates
return dfMatchStats
数据框预览
整个过程允许我们抓取一些数据来建立模型来预测足球比赛,但我们仍然需要抓取有关即将举行的比赛的数据,以便我们可以对已经收集的数据做一些有用的事情。我为此找到的最佳来源是SofaScore,该应用程序还收集和存储有关比赛和球员的信息,但不仅如此,它们还在Bet365中提供每场比赛的实际赔率。
SofaScore 特别处理 JavaScript 代码,这意味着 html 脚本并不完全可供我们与 BeautifulSoup 一起使用。这意味着我们需要使用另一个框架来抓取他们的信息。我选择了广泛使用的Selenium包,它使我们能够像人类用户一样通过 Python 代码上网冲浪。您实际上可以看到网络驱动程序在您选择的浏览器中点击和导航——我选择了 Chrome。
在下图中,您可以看到 SofaScore 主页以及正在进行或即将进行的比赛,在右侧,您可以看到当您点击特定比赛然后点击“LINEUPS”时会发生什么。
SofaScore 界面
使用 Selenium,正如我所解释的,它的工作方式就像人类用户在网上冲浪一样,您可能会认为这个过程会慢一点,这是事实。因此,我们必须在每个步骤中更加小心,这样我们就不会点击不存在的按钮,一旦 JavaScript 代码仅在用户执行某些操作后呈现,例如当我们点击特定匹配项时,服务器会采取需要一些时间来渲染我们在第二张图片中看到的侧边菜单,如果代码在此期间尝试单击阵容按钮,则会返回错误。现在,让我们来看看代码。
def _getDriver(self, path='D:/chromedriver_win32/chromedriver.exe'):
chrome_options = Options()
return webdriver.Chrome(executable_path=path, options=chrome_options)
def getMatchesToday(self):
self.driver = self._getDriver(path=self.path)
self.driver.get("https://www.sofascore.com/")
WebDriverWait(self.driver, 20).until(EC.element_to_be_clickable((By.CLASS_NAME, "slider")))
oddsButton = self.driver.find_element(By.CLASS_NAME, "slider")
oddsButton.click()
homeTeam=[]
awayTeam=[]
odds=[]
homeOdds = []
drawOdds = []
awayOdds = []
正如我提到的,在启动驱动程序并到达 SofaScore 的 URL 后,我们需要等到赔率按钮呈现后才能单击它。我们还为我们创建了列表来存储抓取的信息。
2.店铺匹配主要信息
WebDriverWait(self.driver, 5).until(EC.visibility_of_element_located((By.CLASS_NAME, 'fvgWCd')))
matches = self.driver.find_elements(By.CLASS_NAME, 'js-list-cell-target')
for match in matches:
if self._checkExistsByClass('blXay'):
homeTeam.append(match.find_element(By.CLASS_NAME, 'blXay').text)
awayTeam.append(match.find_element(By.CLASS_NAME, 'crsngN').text)
if match.find_element(By.CLASS_NAME, 'haEAMa').text == '-':
oddsObject = match.find_elements(By.CLASS_NAME, 'fvgWCd')
for odd in oddsObject:
odds.append(odd.text)
while(len(odds) > 0):
homeOdds.append(odds.pop(0))
drawOdds.append(odds.pop(0))
awayOdds.append(odds.pop(0))
这里没有什么特别的,但是考虑到在第 8 行我们只过滤还没有开始的匹配是很好的。我这样做是因为处理正在进行的比赛会使赔率变得更加棘手,而且目前还不清楚未来的投注模拟器将如何工作,而且它可能无法在实时结果中正常工作。
3.获得阵容
df = pd.DataFrame({"homeTeam":homeTeam, "awayTeam":awayTeam, "homeOdds":homeOdds, "drawOdds":drawOdds, "awayOdds":awayOdds})
lineups = self._getLineups()
df = pd.concat([df, lineups], axis=1).iloc[:,:-1]
return df
def _getLineups(self):
matches = self.driver.find_elements(By.CLASS_NAME, "kusmLq")
nameInPanel = ""
df = pd.DataFrame(columns=["{team}Player{i}".format(team="home" if i <=10 else "away", i=i+1 if i <=10 else i-10) for i in range(0,22)])
df["homeTeam"] = []
for match in matches:
self.driver.execute_script("arguments[0].click()", match)
#wait until panel is refreshed
waiter = WebDriverWait(driver=self.driver, timeout=10, poll_frequency=1)
waiter.until(lambda drv: drv.find_element(By.CLASS_NAME, "dsMMht").text != nameInPanel)
nameInPanel = self.driver.find_element(By.CLASS_NAME, "dsMMht").text
if self._checkExistsByClass("jwanNG") and self.driver.find_element(By.CLASS_NAME, "jwanNG").text == "LINEUPS":
lineupButton = self.driver.find_element(By.CLASS_NAME, "jwanNG")
lineupButton.click()
# wait until players are avilable
WebDriverWait(self.driver, 20).until(EC.visibility_of_element_located((By.CLASS_NAME, "kDQXnl")))
players = self.driver.find_elements(By.CLASS_NAME, "kDQXnl")
playerNames=[]
for player in players:
playerNames.append(player.find_elements(By.CLASS_NAME, "sc-eDWCr")[2].accessible_name)
playerNames = [self._isCaptain(playerName) for playerName in playerNames]
playerNames.append(nameInPanel)
df.loc[len(df)] = playerNames
else:
df.loc[len(df), "homeTeam"] = nameInPanel
return df
def _isCaptain(self, name):
if name.startswith("(c) "):
name = name[4:]
return name
数据框预览
总结上面的代码块,我们等到比赛的侧边菜单加载完毕,单击阵容按钮并获取球员姓名。我们需要注意一下,因为每个团队的队长的名字在网站上都是格式化的,所以我们创建了一个辅助函数来处理它。然后,我们将每场比赛的球员姓名存储在数据框中,最后在整个过程之后,我们将比赛信息与预测阵容连接起来。
那么,今天就到此为止。在这篇文章中,我们构建了两个抓取工具,可以收集过去的足球比赛信息,也可以收集未来的比赛信息。这只是项目的开始,一旦您可以期待有关获取包含玩家信息的数据集、预测器建模和最后的投注策略模拟器的新文章。
本文转载自微信公众号@python学研大本营