diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 28d51e4..8e37284 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -28,10 +28,10 @@ jobs: - name: Setup Chrome uses: browser-actions/setup-chrome@v1.7.2 with: - chrome-version: stable - - name: Set chrome in path + chrome-version: 126 + - name: Set Chrome in path run: | - echo "/opt/hostedtoolcache/chromium/stable/x64" >> $GITHUB_PATH + echo "/opt/hostedtoolcache/setup-chrome/chromium/stable/x64" >> $GITHUB_PATH - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v3 with: diff --git a/examples/get_sp500.multithread.py b/examples/get_sp500.mp.py similarity index 65% rename from examples/get_sp500.multithread.py rename to examples/get_sp500.mp.py index ac1950b..12f9284 100755 --- a/examples/get_sp500.multithread.py +++ b/examples/get_sp500.mp.py @@ -3,19 +3,21 @@ import msfinance as msf from concurrent.futures import ProcessPoolExecutor, as_completed from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import sessionmaker, scoped_session import math +import logging proxy = 'socks5://127.0.0.1:1088' -# Create a shared SQLAlchemy engine and session factory -engine = create_engine('sqlite:///sp500.db3', pool_size=5, max_overflow=10) -SessionFactory = sessionmaker(bind=engine) +# Create an engine +engine = create_engine('sqlite:///sp500.mp.db3', pool_size=5, max_overflow=10) +# Create a session factory +InitialSessionFactory = sessionmaker(bind=engine) # Fetch tickers outside the process pool initial_stock = msf.Stock( debug=False, - session_factory=SessionFactory, + session_factory=InitialSessionFactory, proxy=proxy, ) @@ -26,14 +28,21 @@ tickers_list['xnys'] = initial_stock.get_xnys_tickers() tickers_list['xase'] = initial_stock.get_xase_tickers() -def process_tickers(tickers, proxy, session_factory): - # Create a single Stock instance for each process +# 
Configure logging +logging.basicConfig(level=logging.INFO, format='%(processName)s - %(levelname)s - %(message)s') + +def process_tickers(tickers, proxy): + + SessionFactory = sessionmaker(bind=engine) + # Create a Stock instance backed by the per-process session factory stock = msf.Stock( debug=True, - session_factory=session_factory, + session_factory=SessionFactory, proxy=proxy, ) + logging.info(f"Processing tickers: {tickers}") + results = [] for ticker in tickers: if ticker in tickers_list['xnas']: @@ -50,16 +59,22 @@ def process_tickers(tickers, proxy, session_factory): continue results.append((f"Ticker: {ticker}", valuations, financials)) - + return results +# End of process_tickers + +def initializer(): + """ensure the parent proc's database connections are not touched + in the new connection pool""" + engine.dispose(close=False) # Use ProcessPoolExecutor to process tickers in parallel -max_workers = 3 # Adjust max_workers as needed +max_workers = 4 # Adjust max_workers as needed chunk_size = math.ceil(len(sp500_tickers) / max_workers) ticker_chunks = [sp500_tickers[i:i + chunk_size] for i in range(0, len(sp500_tickers), chunk_size)] -with ProcessPoolExecutor(max_workers=max_workers) as executor: - futures = {executor.submit(process_tickers, chunk, proxy, SessionFactory): chunk for chunk in ticker_chunks} +with ProcessPoolExecutor(max_workers=max_workers, initializer=initializer) as executor: + futures = {executor.submit(process_tickers, chunk, proxy): chunk for chunk in ticker_chunks} for future in as_completed(futures): results = future.result() for ticker_info, valuations, financials in results: