使用Python实现多进程调度器的案例

2023-08-04 16:56:22

本案例使用Python语言编写，通过自定义多进程调度器，并发获取指定路径下样本文件的元信息，并写入共享变量results中。由于博主技术水平有限，如果你在本地测试过程中发现BUG，纯属正常，感谢您的理解！（本案例使用Python語言編寫，通過自定義多進程調度器，並發獲取指定路徑下樣本文件的元信息，並寫入共享變量results中。由於博主技術水平有限，如果你在本地測試過程中發現BUG，純屬正常，感謝您的理解！）

多进程调度器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import multiprocessing


class ProcessScheduler(object):
    """Run one worker process per job, keeping a bounded number alive.

    Jobs are taken from the front of ``job_list``; each job is passed as the
    single positional argument to ``core`` in a new daemon process.

    Args:
        core: Callable executed in every worker process.
        job_list: List of jobs. It is consumed in place (jobs are popped).
            ``None`` is treated as an empty list.
        max_count: Maximum number of simultaneously running workers. A falsy
            value (``None`` or ``0``) disables the limit.
        poll_interval: Seconds to block waiting for a worker to exit before
            re-checking the pool when no new job can be launched.
    """

    # Manager-backed dict used by workers to publish their results.
    # NOTE(review): this is created when the class body runs (i.e. at import
    # time) and is shared by all instances. Workers reach it through the class
    # attribute, which presumably relies on the "fork" start method; with
    # "spawn" each child re-imports this module -- confirm on Windows/macOS.
    results = multiprocessing.Manager().dict()

    def __init__(self, core=None, job_list=None, max_count=3, poll_interval=0.05):
        self.running = True
        self.core = core  # target function run in each worker
        self._all_process = []  # plain list acting as the process pool
        self.total_process_count = 0  # number of workers started so far
        self.max_count = max_count  # pool capacity
        # Keep the caller's list object (it is consumed in place); only
        # substitute an empty list when nothing was supplied.
        self.job_list = [] if job_list is None else job_list
        self.poll_interval = poll_interval
        self.activity_process_count = 0  # number of workers currently tracked as alive

    def initialize(self):
        """Hook called once at the beginning of :meth:`start`; no-op here."""
        pass

    def get_job(self):
        """Pop and return the next job, or ``None`` when no jobs remain."""
        if self.job_list:
            return self.job_list.pop(0)
        return None

    def update(self, process=None):
        """Register a new worker, or prune workers that have finished.

        Args:
            process: When given, the process is added to the pool and the
                active counter is incremented. When omitted, every pooled
                process that is no longer alive is dropped and the active
                counter is decremented accordingly.
        """
        if process is not None:
            self._all_process.append(process)
            self.activity_process_count += 1
            return

        # Build the survivor list first: removing items from a list while
        # iterating over it skips the element after each removal, which left
        # finished workers in the pool and the counter too high.
        alive = [worker for worker in self._all_process if worker.is_alive()]
        self.activity_process_count -= len(self._all_process) - len(alive)
        self._all_process[:] = alive

    def _wait_for_exit(self):
        """Block briefly until the oldest pooled worker exits (or timeout)."""
        if self._all_process:
            self._all_process[0].join(self.poll_interval)

    def start(self):
        """Dispatch all jobs and return once every worker has finished."""
        self.initialize()
        while self.running:
            self.update()
            if not self.job_list and self.activity_process_count == 0:
                self.stop()
                break
            at_capacity = bool(
                self.max_count and self.activity_process_count >= self.max_count)
            if at_capacity or not self.job_list:
                # Nothing can be launched right now; wait on a worker instead
                # of spinning at 100% CPU.
                self._wait_for_exit()
                continue
            job_name = self.get_job()
            if job_name:
                self.total_process_count += 1
                process = multiprocessing.Process(target=self.core, args=(job_name,))
                process.daemon = True
                process.start()
                self.update(process)

    def stop(self):
        """Ask the scheduling loop in :meth:`start` to end."""
        self.running = False
ExifTool类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import sys
import subprocess
import os
import json
import warnings
import codecs

# Default program name used by ExifTool when no explicit executable is given.
executable = "exiftool"
# Byte marker that ExifTool.execute() looks for at the end of the child's output.
sentinel = b"{ready}"
# Number of bytes requested per os.read() call in ExifTool.execute().
block_size = 4096


def _fscodec():
    """Build and return the ``fsencode`` helper for this interpreter.

    The helper encodes text with the filesystem encoding. The
    ``surrogateescape`` error handler is used when the encoding is not
    ``mbcs`` and the handler is registered; otherwise ``strict`` is used.
    """
    fs_encoding = sys.getfilesystemencoding()
    error_policy = "strict"
    if fs_encoding != "mbcs":
        try:
            codecs.lookup_error("surrogateescape")
            error_policy = "surrogateescape"
        except LookupError:
            # Handler not registered on this interpreter; stay strict.
            pass

    def fsencode(filename):
        """Return *filename* as bytes; bytes input is passed through as is."""
        if not isinstance(filename, bytes):
            return filename.encode(fs_encoding, error_policy)
        return filename

    return fsencode


# Bind the helper once at import time and drop the factory from the namespace.
fsencode = _fscodec()
del _fscodec


class ExifTool(object):
    """Drive a long-running ``exiftool -stay_open`` child process.

    The instance can be used as a context manager: entering starts the child
    process and leaving terminates it.

    Args:
        executable_: Path or name of the exiftool program. Defaults to the
            module-level ``executable``.
    """

    def __init__(self, executable_=None):
        if executable_ is None:
            self.executable = executable
        else:
            self.executable = executable_
        self.running = False

    def start(self):
        """Launch the exiftool child process.

        Emits a warning and does nothing when the process is already running.
        """
        if self.running:
            warnings.warn("ExifTool already running; doing nothing.")
            return
        with open(os.devnull, "w") as devnull:
            # "-stay_open True -@ -" keeps exiftool alive reading arguments
            # from stdin; the options after "-common_args" apply to every
            # command sent later.
            self._process = subprocess.Popen(
                [self.executable, "-stay_open", "True", "-@", "-",
                 "-common_args", "-G", "-n"],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=devnull)
        self.running = True

    def terminate(self):
        """Stop the child process; a no-op when it is not running.

        The instance is always left in the "not running" state, even when the
        child has already exited and its stdin pipe is broken. Without that,
        ``__exit__``/``__del__`` would raise again and mask the original error.
        """
        if not self.running:
            return
        process = self._process
        try:
            process.stdin.write(b"-stay_open\nFalse\n")
            process.stdin.flush()
        except (IOError, OSError):
            # The child is already gone, so there is nobody to ask to stop.
            pass
        finally:
            try:
                process.communicate()
            finally:
                del self._process
                self.running = False

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

    def __del__(self):
        # ``running`` may be missing if the object was never fully initialised.
        if getattr(self, "running", False):
            self.terminate()

    def execute(self, *params):
        """Send one command to exiftool and return its raw output.

        Args:
            *params: Command-line arguments as bytes, one per item.

        Returns:
            The bytes written by exiftool for this command, with the trailing
            sentinel removed.

        Raises:
            ValueError: If the child process has not been started.
            RuntimeError: If the child closes its output before the sentinel
                is seen (for example because it exited).
        """
        if not self.running:
            raise ValueError("ExifTool instance not running.")
        self._process.stdin.write(b"\n".join(params + (b"-execute\n",)))
        self._process.stdin.flush()
        output = b""
        fd = self._process.stdout.fileno()
        # Only the tail needs checking: the sentinel is the last thing written.
        while not output[-32:].strip().endswith(sentinel):
            chunk = os.read(fd, block_size)
            if not chunk:
                # An empty read means end-of-file; without this check the loop
                # would spin forever once the child has died.
                raise RuntimeError(
                    "ExifTool process closed its output before completing "
                    "the command.")
            output += chunk
        return output.strip()[:-len(sentinel)]

    def execute_json(self, *params):
        """Run exiftool with ``-j`` and return the decoded JSON output.

        Args:
            *params: Arguments (text or bytes); each is passed through
                ``fsencode`` before being sent.
        """
        encoded = [fsencode(param) for param in params]
        return json.loads(self.execute(b"-j", *encoded).decode("utf-8"))

    def get_metadata(self, filename):
        """Return the first entry of the JSON result for *filename*."""
        return self.execute_json(filename)[0]
主程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import os
import argparse
import logging

from exiftool import ExifTool
from process_scheduler import ProcessScheduler

# Module-level logger named after this module.
log = logging.getLogger(__name__)


def get_metadata(file_path):
    """Read the metadata of *file_path* and publish it in the shared results.

    Starts an ExifTool session for this single file and stores the returned
    metadata in ``ProcessScheduler.results`` under the file path.
    """
    with ExifTool() as tool:
        ProcessScheduler.results[file_path] = tool.get_metadata(file_path)


def main():
    """Parse the command line and extract metadata for a directory's entries.

    Every entry returned by ``os.listdir`` for ``--process_dir`` is joined
    with the directory path and handed to a ProcessScheduler, which runs
    ``get_metadata`` on each of them.
    """
    parser = argparse.ArgumentParser()
    # Required: without it process_dir would be None and os.path.isdir(None)
    # raises TypeError instead of reporting a usage error.
    parser.add_argument("-d", "--process_dir", required=True,
                        help="samples store path.")
    args = parser.parse_args()
    process_dir = args.process_dir
    if not os.path.isdir(process_dir):
        # Say why nothing happens instead of exiting silently.
        log.warning("%s is not a directory; nothing to process.", process_dir)
        return
    files = [os.path.join(process_dir, filename) for filename in os.listdir(process_dir)]
    process_scheduler = ProcessScheduler(core=get_metadata, job_list=files)
    process_scheduler.start()


if __name__ == '__main__':
    main()