-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconcurrent_model.py
106 lines (92 loc) · 3.36 KB
/
concurrent_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from abc import ABC, abstractmethod
import multiprocessing
import multiprocessing.process
from queue import Queue
from pandas import DataFrame
from consumer import Consumer
from threading import Thread
from producer import Producer
from chunks_processing_info import ChunkFilteringInfo, ChunkInfo, merge_chunks_info
# Strategy interface for the concurrency back-ends below.
class ConcurrentModel(ABC):
    """A way of running a producer/consumer pipeline to completion."""

    @abstractmethod
    def start(self, producer: Producer, consumer: Consumer) -> list[ChunkInfo]:
        """Run *producer* and *consumer* concurrently and return the merged
        per-chunk statistics once both have finished."""
class ProcessesPoolModel(ConcurrentModel):
    """Runs one producer task and several consumer tasks on a process pool.

    The pool is sized ``num_consumers + 1`` so the producer and every
    consumer each get a dedicated worker and no task waits for a free slot.
    """

    def __init__(self, num_consumers: int = 3) -> None:
        # Default of 3 consumers (plus 1 producer -> pool of 4) preserves the
        # previously hard-coded configuration.
        self.num_consumers = num_consumers

    def start(self, producer: Producer, consumer: Consumer) -> list[ChunkInfo]:
        with multiprocessing.Manager() as manager:
            # Bounded queue applies back-pressure on the producer; the two
            # info queues are unbounded.
            chunks_queue = manager.Queue(maxsize=1000)
            reading_info_queue = manager.Queue()
            filtering_info_queue = manager.Queue()
            with multiprocessing.Pool(self.num_consumers + 1) as pool:
                # Keep the AsyncResult handles: dropping them (as the original
                # code did) silently swallows any exception raised in a worker
                # and can leave chunks_queue.get() below blocked forever.
                results = [
                    pool.apply_async(
                        producer.run,
                        args=(chunks_queue, reading_info_queue),
                    )
                ]
                results.extend(
                    pool.apply_async(
                        consumer.run,
                        args=(chunks_queue, filtering_info_queue),
                    )
                    for _ in range(self.num_consumers)
                )
                pool.close()
                pool.join()
                # Surface any worker exception instead of hanging later.
                for result in results:
                    result.get()
            # Presumably drains a leftover end-of-stream sentinel from the
            # queue -- TODO confirm against the Producer/Consumer protocol.
            chunks_queue.get()
            return merge_chunks_info(reading_info_queue, filtering_info_queue)
class MultiProcessingModel(ConcurrentModel):
    """Runs the producer and a single consumer as two dedicated processes."""

    def start(self, producer: Producer, consumer: Consumer) -> list[ChunkInfo]:
        with multiprocessing.Manager() as manager:
            # Bounded chunk queue throttles the producer; info queues are
            # unbounded.
            chunks_queue = manager.Queue(maxsize=1000)
            reading_info_queue = manager.Queue()
            filtering_info_queue = manager.Queue()
            workers = [
                multiprocessing.Process(
                    target=producer.run,
                    args=(chunks_queue, reading_info_queue),
                ),
                multiprocessing.Process(
                    target=consumer.run,
                    args=(chunks_queue, filtering_info_queue),
                ),
            ]
            for worker in workers:
                worker.start()
            for worker in workers:
                worker.join()
            # Discard the one leftover item (presumably the producer's
            # end-of-stream sentinel -- confirm against Producer.run) before
            # merging the collected statistics.
            chunks_queue.get()
            return merge_chunks_info(reading_info_queue, filtering_info_queue)
class MultiThreadingModel(ConcurrentModel):
    """Runs the producer and a single consumer as two in-process threads."""

    def start(self, producer: Producer, consumer: Consumer) -> list[ChunkInfo]:
        # Plain queue.Queue suffices here: both threads share one address
        # space, so no Manager proxies are needed.
        chunks_queue = Queue[tuple[int, DataFrame]](maxsize=1000)
        reading_info_queue = Queue[float]()
        filtering_info_queue = Queue[tuple[int, ChunkFilteringInfo]]()
        threads = [
            Thread(target=producer.run, args=(chunks_queue, reading_info_queue)),
            Thread(target=consumer.run, args=(chunks_queue, filtering_info_queue)),
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # Drop the one leftover queue item (presumably an end-of-stream
        # sentinel -- confirm against Producer.run) before merging the stats.
        chunks_queue.get()
        return merge_chunks_info(reading_info_queue, filtering_info_queue)