Skip to content

Commit

Permalink
Merge pull request #19 from Broomva/feat/vortex-flows-prefect
Browse files Browse the repository at this point in the history
Feat/vortex flows prefect
  • Loading branch information
broomva authored Mar 12, 2024
2 parents a466993 + 3845997 commit b7d1194
Show file tree
Hide file tree
Showing 24 changed files with 1,193 additions and 57 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
41 changes: 41 additions & 0 deletions example_flows/prefect_flows/.prefectignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# prefect artifacts
.prefectignore

# python artifacts
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
*.egg

# Type checking artifacts
.mypy_cache/
.dmypy.json
dmypy.json
.pyre/

# IPython
profile_default/
ipython_config.py
*.ipynb_checkpoints/*

# Environments
.python-version
.env
.venv
env/
venv/

# MacOS
.DS_Store

# Dask
dask-worker-space/

# Editors
.idea/
.vscode/

# VCS
.git/
.hg/
15 changes: 15 additions & 0 deletions example_flows/prefect_flows/gcs_filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from prefect.filesystems import GCS
from dotenv import load_dotenv
import os

load_dotenv()

block = GCS(
bucket_path="vortex-flows/broomva-flows",
service_account_info=os.environ.get("GCS_SERVICE_ACCOUNT"),
project="broomva-vortex-ai",
)
block.save("gcs-vortex-flows", overwrite=True)

# %%
# # prefect deployment build web_rag.py:web_rag_flow --name web_rag_deployment --tag gcs-vortex-flows -sb gcs/gcs-vortex-flows
17 changes: 17 additions & 0 deletions example_flows/prefect_flows/minio_filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from prefect.filesystems import RemoteFileSystem
import os

minio_block = RemoteFileSystem(
basepath=f"s3://vortex-flows/{os.environ.get('MINIO_FLOWS_FOLDER', 'broomva-flows')}",
settings={
"key": os.environ.get("MINIO_USER", "minio"),
"secret": os.environ.get("MINIO_PASSWORD", "minio123"),
"client_kwargs": {
"endpoint_url": os.environ.get("MINIO_HOST", "http://localhost:9001")
},
},
)
minio_block.save("minio")


# prefect deployment build web_rag.py:web_rag_flow --name web_rag_deployment --tag minio -sb remote-file-system/minio
46 changes: 46 additions & 0 deletions example_flows/prefect_flows/web_rag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from prefect import task, flow
from vortex.ai.tools import scrape_website, scrape_website_selenium
from prefect.artifacts import create_markdown_artifact
from vortex.flows.resources import OpenAIResource

openai_resource = OpenAIResource()


@task
def get_article(input_url: str) -> str:
if not input_url:
return None
try:
try:
response = scrape_website(input_url)
except Exception:
response = None
if response is None:
response = scrape_website_selenium(input_url)
except Exception as e:
raise e
return response


@task
def summarize_article(article_text: str) -> str:
if not article_text:
return None
user_query = f"Please generate a new fresh article of similar length based on this information: \n{article_text}"
response = openai_resource.get(user_query)
return response


@flow(log_prints=True)
def web_rag_flow(input_url: str = "https://github.com/Broomva/vortex"):
article_text = get_article(input_url)
article_summary = summarize_article(article_text)
print(article_summary)


if __name__ == "__main__":
web_rag_flow.deploy(
name="web-rag-flow",
work_pool_name="broomva-worker",
parameters={"input_url": "https://github.com/Broomva/vortex"},
)
Empty file.
Empty file removed examples/wapp_dag/__init__.py
Empty file.
1,075 changes: 1,056 additions & 19 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ numexpr = "^2.9.0"
langchain-fireworks = "^0.1.1"
semantic-router = "^0.0.27"
dagster-postgres = "^0.22.8"


prefect = "^2.16.2"
gcsfs = "^2024.2.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"
Expand Down
12 changes: 6 additions & 6 deletions vortex/flows/resources/completion.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
# %%
from typing import Optional

from dagster import ConfigurableResource, EnvVar
from pydantic import BaseModel
from dotenv import load_dotenv
from openai import OpenAI
import os

load_dotenv()


class OpenAIResource(ConfigurableResource):
class OpenAIResource(BaseModel):
api_key: Optional[str] = (
None or EnvVar("TOGETHER_API_KEY") or EnvVar("OPENAI_API_KEY")
None or os.environ.get("TOGETHER_API_KEY") or os.environ.get("OPENAI_API_KEY")
)
base_url: Optional[str] = EnvVar("TOGETHER_API_BASE_URL") or EnvVar(
base_url: Optional[str] = os.environ.get("TOGETHER_API_BASE_URL") or os.environ.get(
"OPENAI_API_BASE_URL"
)
# client: Optional[OpenAI] = None
model: Optional[str] = (
EnvVar("TOGETHER_MODEL_NAME") or "mistralai/Mixtral-8x7B-Instruct-v0.1"
os.environ.get("TOGETHER_MODEL_NAME") or "mistralai/Mixtral-8x7B-Instruct-v0.1"
)
# respone_model: Optional[BaseModel] = None
temperature: Optional[float] = 0.8
Expand Down
7 changes: 3 additions & 4 deletions vortex/flows/resources/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
import contextlib
import os
from typing import Optional

from dagster import ConfigurableResource, EnvVar
from pydantic import BaseModel
from sqlalchemy import create_engine, text


class SQLAlchemyResource(ConfigurableResource):
class SQLAlchemyResource(BaseModel):
"""
Represents a resource for executing SQL queries using SQLAlchemy.
Attributes:
url (Optional[str]): The URL of the SQLAlchemy database connection.
"""

url: Optional[str] = EnvVar("SQLALCHEMY_URL")
url: Optional[str] = os.environ.get("SQLALCHEMY_URL")

@contextlib.contextmanager
def connect(self):
Expand Down
33 changes: 7 additions & 26 deletions vortex/flows/resources/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,12 @@
import contextlib
import os
from typing import Optional

from pydantic import BaseModel
import psycopg2
from dagster import ConfigurableResource, EnvVar

# from dagster import (
# AssetExecutionContext,
# Definitions,
# InitResourceContext,
# asset,
# resource,
# )

# class FancyDbResource:
# def __init__(self, conn_string: str) -> None:
# self.conn_string = conn_string

# def execute(self, query: str) -> None:
# ...

# @resource(config_schema={"conn_string": str})
# def fancy_db_resource(context: InitResourceContext) -> FancyDbResource:
# return FancyDbResource(context.resource_config["conn_string"])


class PostgresResource(ConfigurableResource):
class PostgresResource(BaseModel):
"""
The `PostgresResource` class represents a resource for connecting to a PostgreSQL database.
It provides methods for executing queries and fetching results from the database.
Expand All @@ -53,11 +34,11 @@ class PostgresResource(ConfigurableResource):
"""

database: Optional[str] = EnvVar("POSTGRES_DATABASE")
username: Optional[str] = EnvVar("POSTGRES_USERNAME")
password: Optional[str] = EnvVar("POSTGRES_PASSWORD")
host: Optional[str] = EnvVar("POSTGRES_HOST")
port: Optional[str] = EnvVar("POSTGRES_PORT")
database: Optional[str] = os.environ.get("POSTGRES_DATABASE")
username: Optional[str] = os.environ.get("POSTGRES_USERNAME")
password: Optional[str] = os.environ.get("POSTGRES_PASSWORD")
host: Optional[str] = os.environ.get("POSTGRES_HOST")
port: Optional[str] = os.environ.get("POSTGRES_PORT")

@contextlib.contextmanager
def connect(self):
Expand Down

0 comments on commit b7d1194

Please sign in to comment.