Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi threading / Parallel Processing #80

Open
s2t2 opened this issue May 21, 2021 · 1 comment
Open

Multi threading / Parallel Processing #80

s2t2 opened this issue May 21, 2021 · 1 comment

Comments

@s2t2
Copy link
Member

s2t2 commented May 21, 2021

Some students need threading in their final projects, so the repo should provide some notes about parallel processing using the `concurrent.futures` module.

References

Threads and Thread Pool Executors:

Locks and Semaphores:

Threading on Heroku

Example:

# super h/t: https://www.youtube.com/watch?v=IEEhzQoKtQU

import os
import time
import random
from dotenv import load_dotenv

from concurrent.futures import ThreadPoolExecutor, as_completed # see: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
from threading import Thread, Lock, BoundedSemaphore, current_thread

# Load variables from a local .env file (if present) into the process environment,
# so the os.getenv() calls below can be overridden without code changes.
load_dotenv()

LIMIT = int(os.getenv("USERS_LIMIT", default=500))  # number of simulated users to process
MAX_THREADS = int(os.getenv("MAX_THREADS", default=200)) # heroku supports max 256, see: https://devcenter.heroku.com/articles/dynos#process-thread-limits
BATCH_SIZE = int(os.getenv("BATCH_SIZE", default=20))  # results accumulated before the batch is "cleared"

def fetch_friends(user_id, sleep_seconds=1):
    """Simulate a slow network request for one user.

    Sleeps for ``sleep_seconds``, then returns a small report dict that
    includes which pool worker handled the call (a 1-based id derived from
    the executor's ``THREAD_<n>`` thread naming).
    """
    worker_name = current_thread().name
    # "THREAD_0" -> 1, "THREAD_1" -> 2, ... (human-friendly 1-based id)
    worker_number = int(worker_name.replace("THREAD_", "")) + 1
    time.sleep(sleep_seconds)
    return {"thread_id": worker_number, "user_id": user_id, "duration": sleep_seconds}

if __name__ == "__main__":
    # Demo driver: fan one simulated "fetch" per user out across the thread
    # pool, consume results as they complete, and group them into batches.

    user_ids = range(1, LIMIT + 1)
    start_at = time.perf_counter()
    print(f"USERS: {len(user_ids)}")
    print(f"THREADS: {MAX_THREADS}")

    results = []  # every completed result, in completion order
    batch = []    # working batch, emptied whenever it reaches BATCH_SIZE

    with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
        # Submit all tasks up front; each one sleeps a random 1, 5, or 10 seconds.
        futures = [executor.submit(fetch_friends, user_id, random.choice([1,5,10])) for user_id in user_ids]

        for future in as_completed(futures):
            result = future.result()
            print(result)
            batch.append(result)
            results.append(result)

            # Flush the working batch once it reaches the configured size
            # (a final partial batch is simply left unreported).
            if len(batch) == BATCH_SIZE:
                print(f"CLEARING BATCH OF {len(batch)}...")
                batch = []

    end_at = time.perf_counter()
    clock_seconds = round(end_at - start_at, 2)
    # Wall-clock time vs. the serial time the sleeps would have taken.
    total_seconds = sum(result["duration"] for result in results)
    print(f"PROCESSED {len(user_ids)} USERS IN {clock_seconds} SECONDS (OTHERWISE {total_seconds} SECONDS)")

Use a simpler example than this though.

@s2t2
Copy link
Member Author

s2t2 commented May 21, 2021

# NOTE(review): the three snippets below are fragments lifted from other jobs —
# `batches`, `service`, `bq_service`, `bas_service`, `job`, `storage`, `groupby`,
# `tokenize`, `process_and_save_batch`, and `perform` are defined elsewhere.

# Pattern 1: fire-and-forget — submit each batch and ignore the returned futures.
with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:            
    for batch in batches:                
        executor.submit(process_and_save_batch, batch, service)
# Pattern 2: consume results as they complete and accumulate a running count
# (presumably `perform` returns an int to add to `job.counter` — verify against caller).
with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
        futures = [executor.submit(perform, batch, bq_service, bas_service) for batch in batches]        
        for future in as_completed(futures):            
            job.counter += future.result()            
            job.progress_report()
# Pattern 3: one task per pandas groupby group, collecting each result as it finishes.
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="THREAD") as executor:            
    futures = [executor.submit(perform, group_name, filtered_df, storage.local_dirpath, tokenize) for group_name, filtered_df in groupby]            
    for future in as_completed(futures):                
        result = future.result()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

1 participant