Skip to main content

Llama 3.1 405B와 W&B Weave로 실시간 답변 엔진 구축

Llama 3.1 405B에 인터넷 검색 기능 주입하기!! 이 글은 AI 번역본입니다. 오역이 있을 경우 댓글로 알려주세요.
Created on September 12|Last edited on September 12
광대한 인터넷에서 정보를 검색하고 찾아오는 능력은 인간 지능을 기초적 수준과 구별하는 특징입니다. 언어 모델사람은 최신 정보를 동적으로 검색할 수 있지만, 대부분의 언어 모델은 학습 당시의 데이터에만 의존하며 이는 빠르게 오래될 수 있습니다. 이 격차를 메우기 위해, 언어 모델에 실시간 웹 검색과 관련 정보 추출 기능을 부여하여 능력을 확장할 수 있습니다.
이 프로젝트에서는 Llama 3.1 405B 모델의 고급 기능과 웹 스크래핑을 결합한 답변 엔진을 구축합니다. 웹 검색, 스크린샷 캡처, 그리고 광학 문자 인식 (OCR)을 통해 모델이 최신이고 정확한 답변을 제공하도록 하여 활용성과 성능을 크게 향상시키겠습니다. 이 접근 방식은 검색 엔진의 폭넓은 도달 범위를 활용해 다양한 정보를 획득함으로써, 특화 도구와 API의 한계를 극복하고 모델이 시의적절하고 관련성 높은 응답을 제공하도록 보장합니다.



목차




Google Search: Llama 3.1 405B를 강화하는 궁극의 도구

API는 종종 특정 데이터셋이나 정보 유형으로 사용이 제한되는 한계를 갖습니다. 예를 들어, 소셜 미디어 플랫폼이나 특수 데이터베이스가 제공하는 API는 각자의 출처에서만 데이터를 조회하고 가져올 수 있습니다. 이러한 도구들은 유용하지만 본질적으로 범위가 제한되어, 서로 다른 도메인 전반에 걸친 폭넓은 정보를 제공하기는 어렵습니다.
반면 Google과 같은 검색 엔진은 인터넷 전반에 걸친 방대한 정보를 접근할 수 있는 범용 도구로 작동합니다. 검색 엔진은 특정 데이터셋이나 플랫폼에 한정되지 않고, 수백만 개의 웹사이트에서 광범위한 주제와 출처를 포괄하도록 정보를 색인화하고 검색합니다. 이러한 보편성 덕분에 검색 엔진은 최신의 포괄적인 정보를 수집하는 데 매우 강력한 수단이 됩니다.
이 프로젝트에서는 답변 엔진에 Google Search를 활용하여 Llama 3.1 405B 모델 가중치에 없는 정보를 검색해 가져오겠습니다.

환경 설정

먼저 Python이 설치되어 있는지 확인하고 가상 환경을 설정하세요. 아직 하지 않았다면, 빠른 자습서가 있습니다. 여기 시작하는 데 도움이 되도록
이제 준비가 되었으면, 다음 명령어로 pip를 사용해 필요한 라이브러리를 설치하세요.
pip install pytesseract Pillow google-search-python asyncio playwright nest-asyncio requests flask weave
다음으로 Tesseract OCR을 설치하세요. 사용 중인 운영 체제에 맞는 설치 프로그램을 공식 사이트에서 다운로드한 뒤 설치합니다. Tesseract 문서.

Google Cloud AI Platform 설정

Google Cloud AI Platform을 사용하려면 Google Cloud 프로젝트를 만들어야 합니다. 다음으로 이동하세요 Google Cloud Console 그리고 새 프로젝트를 만드세요. 프로젝트가 생성되면, 프로젝트 ID를 기록해 두세요. 인증과 API 요청에 필요하므로 반드시 기록해 두세요.
인증 설정도 필요합니다. 다음을 설치하세요 gcloud CLI 아직 하지 않았다면, 다음 명령어로 Google 계정에 인증하세요:
gcloud auth login
다음으로 gcloud 구성에서 프로젝트 ID를 설정하세요:
gcloud config set project YOUR_PROJECT_ID

Llama 3.1 405B API 서비스 사용하기

명령줄 인터페이스(CLI)로 Llama 3.1 API 서비스를 사용해 보려면, gcloud CLI가 설치된 Cloud Shell 또는 로컬 터미널 창을 여세요. 아래의 Python 스크립트를 수정하여 필요한 환경 변수를 구성하고, 다음 항목을 바꿔 입력하세요. YOUR_PROJECT_ID Google Cloud 프로젝트의 ID와 함께:
import requests
import json
import subprocess

# Set environment variables
ENDPOINT = "us-central1-aiplatform.googleapis.com"
REGION = "us-central1"
PROJECT_ID = "your google cloud project id"

# Get the access token using gcloud
access_token = subprocess.check_output("gcloud auth print-access-token", shell=True).decode('utf-8').strip()

# Set the headers for the request
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}

# Set the data payload for the request
data = {
"model": "meta/llama3-405b-instruct-maas",
"stream": True,
"messages": [
{
"role": "user",
"content": "Summer travel plan to Paris"
}
]
}

# Define the endpoint URL
url = f"https://{ENDPOINT}/v1beta1/projects/{PROJECT_ID}/locations/{REGION}/endpoints/openapi/chat/completions"

# Make the request and stream the response
response = requests.post(url, headers=headers, json=data, stream=True)

print("Response from the AI Platform:")

# Process the response in real-time
if response.status_code == 200:
for line in response.iter_lines():
if line:
try:
data = line.decode('utf-8').strip()
if data.startswith("data: "):
data_json = json.loads(data[6:])
if "choices" in data_json and len(data_json["choices"]) > 0:
delta_content = data_json["choices"][0]["delta"].get("content", "")
print(delta_content, end='', flush=True)
except json.JSONDecodeError:
continue
else:
print(f"Error: {response.status_code}")
print(response.text)
가격 및 제공 여부 안내
현재(2024년 8월 6일 기준) Llama 3.1 405B API 서비스는 퍼블릭 프리뷰 기간 동안 무료로 제공됩니다. 다만 향후 변경될 수 있으며, 일반 제공(GA) 시에는 100만 토큰당 달러 기준의 과금 방식으로 전환될 가능성이 높습니다. 준수와 비용 관리를 위해 가격 및 이용 약관에 대한 최신 정보는 Llama 3.1 문서를 수시로 확인하시기 바랍니다.
이러한 단계를 따르면 Google Cloud AI Platform에서 Llama 3.1 405B API를 설정하고 사용할 수 있으며, 실시간 정보 검색을 위한 강력한 기능을 활용할 수 있습니다. 자연어 처리.

Google와 Llama 3.1 405B로 답변 엔진 구축하기

우리의 답변 엔진이 웹에서 실시간 정보를 가져올 수 있도록 하기 위해, 우리는 다음을 구현합니다 Search 클래스입니다. 이 클래스는 웹 검색을 수행하고 웹 페이지의 스크린샷을 캡처하며, 여러 URL을 동시에 처리하는 작업을 관리합니다. 이를 통해 최신 답변을 제공하는 데 필요한 정보를 추출합니다.
웹 스크래핑은 웹 페이지의 구조와 서식이 제각각이어서 어려울 수 있습니다. 전통적인 스크래핑 방식은 HTML 파싱과 데이터 추출에 의존하기 때문에, 일관되지 않은 페이지 레이아웃이나 동적 콘텐츠에서 문제가 빈번히 발생합니다. 이러한 과제를 해결하기 위해 우리는 이미지 기반 스크래핑을 사용합니다.
웹 페이지를 이미지로 변환한 뒤 광학 문자 인식(OCR)을 적용하면, 완벽하게 정돈되거나 구조화되지 않은 텍스트를 해석하는 데서 언어 모델의 강점을 활용할 수 있습니다. 더 나아가 연구 이를 보여 주며 대형 언어 모델 완전히 뒤섞인 텍스트를 이해하는 데서 초인적인 능력을 보여 주기 때문에, 이 접근 방식은 특히 효과적입니다. 다음은 웹을 검색할 수 있게 해 주는 우리의 검색 클래스입니다:

class Search:
@staticmethod
def get_search_results(query, num_results=5):
return [url for url in search(query, num_results=num_results)]
@staticmethod
async def download_screenshot(url, delay, index):
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
file_name = f'{videos_folder}/Screenshot_{index}.png'
try:
await asyncio.wait_for(page.goto(url), timeout=5)
await page.set_viewport_size({"width": 1920, "height": 1080})
await page.wait_for_timeout(delay * 1000)
await page.screenshot(path=file_name, full_page=True)
print(f"Screenshot saved as {file_name}!")
except (PlaywrightTimeoutError, asyncio.TimeoutError):
print(f"Timeout occurred while loading {url}")
file_name = None
except Exception as e:
print(f"Unexpected error occurred: {e}")
file_name = None
finally:
await browser.close()
return file_name

@staticmethod
def process_urls(urls, delay):
if os.path.exists(videos_folder):
for file in os.listdir(videos_folder):
file_path = os.path.join(videos_folder, file)
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
os.rmdir(file_path)
async def _process_urls():
tasks = [Search.download_screenshot(url, delay, index) for index, url in enumerate(urls)]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(_process_urls())
return results

@staticmethod
def perform_ocr(image_path):
if image_path is None:
return None
img = Image.open(image_path)
tesseract_text = pytesseract.image_to_string(img)
print(f"Tesseract OCR text for {image_path}:")
print(tesseract_text)
return tesseract_text

@staticmethod
def ocr_results_from_screenshots(screenshots):
ocr_results = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(Search.perform_ocr, screenshot) for screenshot in screenshots]
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
ocr_results.append(result)
except Exception as e:
print(f"An error occurred during OCR processing: {e}")
return ocr_results

@staticmethod
def get_context_from_ocr_results():
screenshots = [os.path.join(videos_folder, f) for f in os.listdir(videos_folder) if os.path.isfile(os.path.join(videos_folder, f))]

if not screenshots:
print("No valid screenshots to process.")
return None

# Perform OCR on downloaded screenshots and prepare the context
ocr_results = Search.ocr_results_from_screenshots(screenshots)
ocr_results = [val[:1000] for val in ocr_results if isinstance(val, str)]
context = " ".join(ocr_results)[:3000]
return context


@staticmethod
def decide_search(query):
# Instantiate the model to decide if a web search is needed
model = Model(endpoint=API_ENDPOINT, region=REGION, project_id=PROJECT_ID)
context = ""
res = model.query_model_for_search_decision(query)
return res
Search 클래스에는 여러 핵심 메서드가 포함됩니다:
get_search_results 이 메서드는 Google Search 라이브러리를 사용해 주어진 쿼리를 기반으로 URL을 검색하며, 반환할 검색 결과 개수를 지정할 수 있습니다.
download_screenshot 메서드는 비동기로 동작하며 매개변수로 URL, 지연 시간, 그리고 고유한 파일 이름 생성을 위한 인덱스를 받습니다. Playwright를 사용해 헤드리스 브라우저에서 해당 URL을 열고, 지정된 지연 시간만큼 대기한 뒤 웹 페이지의 스크린샷을 캡처합니다. 이 스크린샷들은 인덱스와 스크롤 위치가 포함된 파일 이름으로 다운로드 폴더에 저장됩니다.
process_urls 메서드는 여러 URL을 동시에 처리하도록 관리합니다. 프로세스를 시작하기 전에 다운로드 폴더를 비우고, 이어서 사용합니다. asyncio 실행하기 위한 download_screenshot 각 URL에 대해 메서드를 병렬로 실행합니다. 스크린샷 결과는 리스트로 반환됩니다. 신뢰성을 보장하기 위해, download_screenshot 메서드는 다음과 같은 일반적인 예외에 대한 오류 처리를 포함합니다 PlaywrightTimeoutError 그리고 asyncio.TimeoutError오류가 발생하면 메서드는 오류 메시지를 출력하고 None을 반환합니다.
perform_ocr 메서드는 Tesseract를 사용해 이미지에서 텍스트를 추출합니다. 그 ocr_results_from_screenshots 메서드는 스레드 풀 실행기를 사용해 여러 스크린샷에 대한 OCR을 동시에 수행하여 OCR 처리 속도를 높입니다. The get_context_from_ocr_results 메서드는 모든 스크린샷에서 추출한 텍스트를 하나의 컨텍스트로 결합하고, 이를 언어 모델이 정확한 답변을 생성하는 데 사용합니다.
종합하면, 이들 메서드는 웹 스크래핑과 웹 페이지 스크린샷 캡처를 위한 견고한 솔루션을 제공하며, 이는 이후 단계의 answer engine 파이프라인에 필수적입니다. 이미지 기반 스크래핑과 OCR을 위한 청크 분할을 활용함으로써, 구조나 동적 콘텐츠에 상관없이 다양한 웹 페이지에서 정보를 안정적으로 추출할 수 있습니다. 이러한 접근 방식은 복잡하고 비정형화된 텍스트를 이해하는 LLM의 능력을 최대한 활용하여, 우리 answer engine의 정확성과 효율성을 높입니다.

Llama 3.1 405B 통합

Llama 3.1 405B 모델의 고급 기능을 통합하기 위해 Model 클래스를 개발하겠습니다. 이 클래스는 인증을 관리하고, 쿼리를 전송하며, Google Cloud AI Platform으로부터의 응답을 처리합니다.
다음은 우리의 모델 클래스입니다:
class Model:
def __init__(self, endpoint, region, project_id):
self.endpoint = endpoint
self.region = region
self.project_id = project_id

def get_access_token(self):
return subprocess.check_output("gcloud auth print-access-token", shell=True).decode('utf-8').strip()


@weave.op()
def query_model_non_stream(self, query, context):
if context != "":
q = "Answer the question {}. You can use this as help: {}".format(query, context)
else:
q = query

access_token = self.get_access_token()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
data = {
"model": "meta/llama3-405b-instruct-maas",
"stream": False,
"messages": [
{
"role": "user",
"content": q
}
]
}
url = f"https://{self.endpoint}/v1beta1/projects/{self.project_id}/locations/{self.region}/endpoints/openapi/chat/completions"
response = requests.post(url, headers=headers, json=data)

if response.status_code == 200:
data = response.json()
if "choices" in data and len(data["choices"]) > 0:
res = data["choices"][0]["message"]["content"]
return res
else:
print(f"Error: {response.status_code}")
print(response.text)
return ""


@weave.op()
def query_model_for_search_decision(self, query):
access_token = self.get_access_token()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
data = {
"model": "meta/llama3-405b-instruct-maas",
"stream": False,
"messages": [
{
"role": "user",
"content": f"Do we need a web search to answer the question: {query}? usually questions that are asking about time related details or new inforamtion that might be in you initial training set will require a web search. Also information that could be subject to change is also a good to double check with search. Respond with 'yes' or 'no'."
}
]
}
url = f"https://{self.endpoint}/v1beta1/projects/{self.project_id}/locations/{self.region}/endpoints/openapi/chat/completions"
response = requests.post(url, headers=headers, json=data)

if response.status_code == 200:
data = response.json()
if "choices" in data and len(data["choices"]) > 0:
decision = data["choices"][0]["message"]["content"].strip().lower()
return 'yes' in decision
else:
print(f"Error: {response.status_code}")
print(response.text)
return False


모델 클래스는 다음으로 시작합니다 __init__ 메서드로, 엔드포인트, 리전, 프로젝트 ID를 포함한 필요한 매개변수를 초기화합니다. The get_access_token 메서드는 Google Cloud SDK를 사용해 액세스 토큰을 생성하여 API 요청 인증을 가능하게 합니다.
query_model_non_stream 메서드는 비스트리밍 응답을 처리합니다. AI Platform 엔드포인트에 POST 요청을 보내고, JSON 응답을 처리하여 모델의 출력을 추출합니다.
query_model_for_search_decision 메서드는 주어진 쿼리에 웹 검색이 필요한지 판단합니다. 웹 검색이 필요한지 판단하기 위해 포맷된 쿼리를 Llama 모델에 보내고, 응답을 처리하여 결정을 내립니다. 이를 통해 웹에서 추가 정보가 필요한 시점을 판단함으로써 answer engine을 최적화합니다.
참고: 시스템을 더 개선하려면 모델에게 원래 질의를 Google 검색에 더 적합하도록 다시 표현하도록 요청할 수 있습니다. API 속도 제한 때문에 이 부분은 코드에서 제외했습니다.
💡

LLM 입력과 출력 추적하기

우리는 사용합니다 Weave 이 메서드들의 입력과 출력을 추적하기 위해서입니다. 메서드에 주석을 추가하여 @weave.op()을 통해 이러한 함수들을 거치는 데이터를 Weave가 기록하도록 합니다. 여기에는 모델에 보낸 질의, 제공된 컨텍스트, 웹 검색 수행 여부에 대한 결정, 그리고 모델이 생성한 최종 응답이 모두 포함됩니다.
전체 내용을 보고 싶다면 logic.py 필수 임포트와 앱 실행을 위한 몇 가지 추가 로직이 포함된 파일인데, 여기에서 공유하겠습니다:
import os
import pytesseract
from PIL import Image
from googlesearch import search
import asyncio
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from concurrent.futures import ThreadPoolExecutor
import threading
import nest_asyncio
import requests
import json
import subprocess
import concurrent
import weave

# Project configuration
PROJECT_ID = "your google cloud poject id"
API_ENDPOINT = "us-central1-aiplatform.googleapis.com"
REGION = "us-central1"

weave.init("answer_engine")

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Set default download folder for screenshots
videos_folder = r"./download"

# Clear the download folder
if os.path.exists(videos_folder):
for file in os.listdir(videos_folder):
file_path = os.path.join(videos_folder, file)
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
else:
os.makedirs(videos_folder)

# Global stop event
stop_flag = threading.Event()


class Search:
@staticmethod
def get_search_results(query, num_results=5):
return [url for url in search(query, num_results=num_results)]
@staticmethod
async def download_screenshot(url, delay, index):
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
file_name = f'{videos_folder}/Screenshot_{index}.png'
try:
await asyncio.wait_for(page.goto(url), timeout=5)
await page.set_viewport_size({"width": 1920, "height": 1080})
await page.wait_for_timeout(delay * 1000)
await page.screenshot(path=file_name, full_page=True)
print(f"Screenshot saved as {file_name}!")
except (PlaywrightTimeoutError, asyncio.TimeoutError):
print(f"Timeout occurred while loading {url}")
file_name = None
except Exception as e:
print(f"Unexpected error occurred: {e}")
file_name = None
finally:
await browser.close()
return file_name

@staticmethod
def process_urls(urls, delay):
if os.path.exists(videos_folder):
for file in os.listdir(videos_folder):
file_path = os.path.join(videos_folder, file)
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
os.rmdir(file_path)
async def _process_urls():
tasks = [Search.download_screenshot(url, delay, index) for index, url in enumerate(urls)]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(_process_urls())
return results

@staticmethod
def perform_ocr(image_path):
if image_path is None:
return None
img = Image.open(image_path)
tesseract_text = pytesseract.image_to_string(img)
print(f"Tesseract OCR text for {image_path}:")
print(tesseract_text)
return tesseract_text

@staticmethod
def ocr_results_from_screenshots(screenshots):
ocr_results = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(Search.perform_ocr, screenshot) for screenshot in screenshots]
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
ocr_results.append(result)
except Exception as e:
print(f"An error occurred during OCR processing: {e}")
return ocr_results

@staticmethod
def get_context_from_ocr_results():
screenshots = [os.path.join(videos_folder, f) for f in os.listdir(videos_folder) if os.path.isfile(os.path.join(videos_folder, f))]

if not screenshots:
print("No valid screenshots to process.")
return None

# Perform OCR on downloaded screenshots and prepare the context
ocr_results = Search.ocr_results_from_screenshots(screenshots)
ocr_results = [val[:1000] for val in ocr_results if isinstance(val, str)]
context = " ".join(ocr_results)[:3000]
return context


@staticmethod
def decide_search(query):
# Instantiate the model to decide if a web search is needed
model = Model(endpoint=API_ENDPOINT, region=REGION, project_id=PROJECT_ID)
context = ""
res = model.query_model_for_search_decision(query)
return res


class Model:
def __init__(self, endpoint, region, project_id):
self.endpoint = endpoint
self.region = region
self.project_id = project_id

def get_access_token(self):
return subprocess.check_output("gcloud auth print-access-token", shell=True).decode('utf-8').strip()


@weave.op()
def query_model_non_stream(self, query, context):
if context != "":
q = "Answer the question {}. You can use this as help: {}".format(query, context)
else:
q = query

access_token = self.get_access_token()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
data = {
"model": "meta/llama3-405b-instruct-maas",
"stream": False,
"messages": [
{
"role": "user",
"content": q
}
]
}
url = f"https://{self.endpoint}/v1beta1/projects/{self.project_id}/locations/{self.region}/endpoints/openapi/chat/completions"
response = requests.post(url, headers=headers, json=data)

if response.status_code == 200:
data = response.json()
if "choices" in data and len(data["choices"]) > 0:
res = data["choices"][0]["message"]["content"]
return res
else:
print(f"Error: {response.status_code}")
print(response.text)
return ""


@weave.op()
def query_model_for_search_decision(self, query):
access_token = self.get_access_token()
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
data = {
"model": "meta/llama3-405b-instruct-maas",
"stream": False,
"messages": [
{
"role": "user",
"content": f"Do we need a web search to answer the question: {query}? usually questions that are asking about time related details or new inforamtion that might be in you initial training set will require a web search. Also information that could be subject to change is also a good to double check with search. Respond with 'yes' or 'no'."
}
]
}
url = f"https://{self.endpoint}/v1beta1/projects/{self.project_id}/locations/{self.region}/endpoints/openapi/chat/completions"
response = requests.post(url, headers=headers, json=data)

if response.status_code == 200:
data = response.json()
if "choices" in data and len(data["choices"]) > 0:
decision = data["choices"][0]["message"]["content"].strip().lower()
return 'yes' in decision
else:
print(f"Error: {response.status_code}")
print(response.text)
return False

웹 인터페이스 개발

우리의 답변 엔진과 사용자가 쉽게 상호작용할 수 있도록 Flask 애플리케이션을 개발했습니다. Flask는 가볍고 효율적인 웹 프레임워크로, 웹 애플리케이션을 빠르고 효율적으로 만들 수 있게 해줍니다.
우리의 Flask 앱은 사용자와 답변 엔진의 백엔드 프로세스 사이를 이어주는 인터페이스 역할을 합니다. 애플리케이션은 요청을 처리하는 라우트, 질의를 처리하는 로직, 결과를 반환하는 엔드포인트 등 몇 가지 핵심 구성 요소로 이루어져 있습니다. 이 튜토리얼에서는 프런트엔드의 세부 사항까지는 다루지 않지만, Flask 앱의 기본 구조와 동작을 이해하는 것이 중요합니다.
다음은 app.py 앱의 동작 로직을 실행하는 함수를 선언한 파일입니다:

import os
import threading
import nest_asyncio
import asyncio
from flask import Flask, request, render_template, jsonify
from logic import Search, Model

# Project configuration
PROJECT_ID = "your google cloud poject id"
API_ENDPOINT = "us-central1-aiplatform.googleapis.com"
REGION = "us-central1"

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Set default download folder for screenshots
videos_folder = r"./download"

# Clear the download folder
if os.path.exists(videos_folder):
for file in os.listdir(videos_folder):
file_path = os.path.join(videos_folder, file)
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
else:
os.makedirs(videos_folder)

# Global stop event
stop_flag = threading.Event()

# Global variable for response storage
response_storage = ""

app = Flask(__name__)

@app.route('/')
def index():
return render_template('index.html')

@app.route('/search', methods=['POST'])
def search():
global response_storage
query = request.form.get('query')
delay = 1

# Clear the stop flag before running the function
stop_flag.clear()

asyncio.run(run_search_and_ocr(query, delay))
return jsonify({'status': 'Search started'})

async def run_search_and_ocr(query, delay):
global response_storage
context = ""
if Search.decide_search(query):
urls = Search.get_search_results(query, num_results=20)
process_thread = threading.Thread(target=Search.process_urls, args=(urls, delay))
process_thread.start()
await asyncio.sleep(15)
stop_flag.set()
if process_thread.is_alive():
process_thread.join(timeout=0)

context = Search.get_context_from_ocr_results()

model = Model(endpoint=API_ENDPOINT, region=REGION, project_id=PROJECT_ID)
response = model.query_model_non_stream(query, context) # Replaced with query_model_nonstream to just return the response
response_storage = response


@app.route('/results', methods=['GET'])
def get_results():
global response_storage
return jsonify({'results': response_storage.splitlines()})

if __name__ == "__main__":
app.run(debug=True)

먼저 Flask 앱을 초기화하고 필요한 설정을 구성합니다. 또한 스크린샷의 기본 다운로드 폴더가 생성되어 있고, 애플리케이션을 시작하기 전에 비워져 있도록 확인합니다. 이렇게 하면 새 질의를 처리할 수 있도록 환경이 준비됩니다.
주요 라우트를 정의합니다 (`/`) 프런트엔드 인터페이스를 렌더링합니다. 이 인터페이스를 통해 사용자는 질의를 입력하고 검색 프로세스를 시작할 수 있습니다. POST 요청으로 접근하는 `search` 라우트는 사용자의 질의를 처리합니다. 이 라우트는 질의를 받아 이전 중지 플래그를 모두 초기화하고, 검색 및 OCR 프로세스를 비동기로 실행합니다. 이는 다음 함수를 호출하여 수행됩니다 run_search_and_ocr 검색부터 OCR, 컨텍스트 준비까지의 흐름을 조율하는 함수입니다.
run_search_and_ocr 함수는 다음을 사용하여 웹 검색이 필요한지 확인합니다 Search.decide_search 메서드입니다. 검색이 필요하면 다음을 사용해 URL을 가져옵니다 get_search_results 메서드를 통해 가져온 URL을 처리해 스크린샷을 캡처합니다. 스크린샷을 캡처한 뒤에는 이미지에 대해 OCR을 수행하여 텍스트를 추출하고 컨텍스트를 준비합니다. 마지막으로 다음을 사용해 Llama 3.1 405B 모델에 질의를 보냅니다 query_model_non_stream 메서드를 사용해 응답을 저장합니다.
results GET 요청으로 접근하는 라우트는 처리된 결과를 사용자에게 반환합니다. 이를 통해 사용자는 자신의 질의를 바탕으로 Llama 3.1 405B 모델이 생성한 답변을 확인할 수 있습니다.
이 Flask 애플리케이션을 개발함으로써, 사용자가 우리의 answer engine 기능을 원활하고 상호작용적으로 활용할 수 있는 방법을 제공합니다. Flask 앱은 데이터 흐름을 관리하고 비동기 작업을 처리하며, 사용자가 정확하고 최신의 답변을 받을 수 있도록 보장합니다. 프런트엔드는 사용자 상호작용에 필수적이지만, 이 튜토리얼의 초점은 answer engine을 구동하는 백엔드 프로세스에 맞춰져 있습니다.
직접 앱을 사용해 보고 싶다면 리포지토리를 자유롭게 클론하세요 여기코드를 실행하려면 에 PROJECT_ID를 추가해야 합니다 logic.py 그리고 the app.py 파일들을 준비한 다음 실행하세요 python app.py. 참고로, 앱을 실행하려면 필요한 pip 패키지를 모두 설치해야 합니다. 앱을 실행한 후에는 아래 예시처럼 콘솔에 웹사이트에 접속할 수 있는 URL이 출력됩니다.

이제 http://127.0.0.1:5000/ 로 이동하면 앱을 볼 수 있습니다.

Weave로 모델 응답과 검색 결정을 추적하기

Weave는 모델의 성능을 추적하고 모니터링할 수 있게 해 주는 강력한 도구로, 생성된 응답과 웹 검색이 필요한지에 대한 의사결정까지 포함해 기록할 수 있습니다. 우리의 answer engine에 Weave를 통합하면 모델이 어떻게 동작하는지에 대한 유용한 인사이트를 얻을 수 있으며, 이를 바탕으로 근거 있는 개선을 진행할 수 있습니다.
우리는 Llama 3.1 405B 모델이 생성한 응답을 기록하기 위해 Weave를 사용합니다. 여기에는 사용자 질의에 대한 최종 답변뿐 아니라 OCR 처리 결과와 같은 중간 산출물도 포함됩니다. 이러한 응답을 추적함으로써 답변의 품질과 정확성을 분석하고, 패턴이나 문제를 식별하며, 성능 개선을 위한 조정을 수행할 수 있습니다.
한 사례로, “휴스턴의 날씨가 어떤가요?”라는 질의에 대해 모델이 상세한 응답을 생성했습니다. 이 응답에는 날씨 정보, 예보 상세, 그리고 사용자에게 제공할 추가 권장 사항이 포함되어 있습니다. 이 응답을 Weave에 기록해 두면, 나중에 검토하여 품질 기준을 충족하는지 확인하고 필요한 조정을 수행할 수 있습니다.

아래와 같이 앱이 휴스턴의 일기 예보를 보여주는 웹페이지를 찾아냈습니다:

검색 클래스가 가져온 이 웹사이트(및 몇몇 다른 웹사이트)의 텍스트가 추출되어 앱으로 전달되었습니다. Weave 내부에서는 모델의 “검색 결정”(모델이 검색을 할지 말지 판단하는 단계)과 모델이 질문에 답하는 단계(검색 결과를 사용하든 사용하지 않든)에서 사용된 정확한 입력값을 모두 분석할 수 있습니다.
다음은 Weave에서 가져온 일부 로그입니다:

위에서 보듯이, 우리의 모델은 해당 질의에 대해 웹 검색을 수행하는 것이 적절하다고 판단했습니다. 이어서 검색 결과의 컨텍스트가 수집되어 모델에 전달되었습니다. 다음은 Weave 내부의 다음 로그입니다:


맥락이 전부다

정적 데이터로 ���습된 언어 모델에 실시간 웹 검색 기능을 결합하는 일은, 고정된 지식과 시시각각 변하는 최신 정보 검색 사이의 격차를 메우는 중요한 도약입니다. Llama 3.1 405B 모델의 고급 능력에 정교한 웹 스크래핑 기법을 통합함으로써, 우리는 정확할 뿐 아니라 최신 데이터에 민첩하게 반응하는 answer engine을 구축했습니다.
웹 스크래핑 구현은 Optical Character Recognition(OCR) 같은 현대적 혁신과 전통적 방법을 결합했을 때의 강력을 잘 보여 줍니다. 웹페이지를 이미지로 변환하고, 복잡하고 비정형적인 텍스트를 해석하는 데서 언어 모델의 강점을 활용함으로써, 우리의 answer engine은 다양한 형태의 웹페이지로부터 정보를 효과적으로 추출하고 처리할 수 있습니다.
우리 시스템의 판도를 바꾸는 핵심은 Weave의 통합입니다. 이 강력한 도구는 모델 성능의 모든 측면을 추적하고 모니터링할 수 있게 해 줍니다. 프로세스의 입력과 출력을 기록함으로써, Weave는 모델의 의사 결정 과정을 깊이 있게 들여다볼 수 있는 귀중한 인사이트를 제공합니다. 이를 통해 생성된 응답의 품질과 정확도를 분석하고, 우리의 answer engine이 항상 높은 성능 기준을 지속적으로 충족하도록 보장할 수 있습니다.
Weave는 모델이 웹 검색이 필요하다고 판단하는 빈도와 그 결정의 효과성을 파악하는 데에도 핵심적인 역할을 합니다. 이러한 정교한 추적은 디버깅, 성능 최적화, 그리고 근거 있는 개선에 도움이 됩니다. 이는 신뢰성과 안정성을 유지하는 데 필수적인 투명성과 책임성을 우리 시스템에 부여합니다.
이러한 고급 도구와 기법을 활용해 우리는 실시간 정보 검색을 위한 강력하고 유연한 솔루션을 구축했습니다. 이 통합을 통해 우리의 answer engine은 끊임없이 변화하는 환경에서도 사용자에게 정확하고 시의적절한 정보를 제공하며, 기술의 최전선에 머무를 수 있습니다.
다음은 저장소 프로젝트를 위해.

이 기사는 AI로 번역되었습니다. 오역이 있을 수 있으니 댓글로 알려 주세요. 원문 보고서는 다음 링크에서 확인할 수 있습니다: 원문 보고서 보기