Python и Twisted - Заметки о параллельной обработке данных (мультипроцессорности)

Michael_XIII аватар

Twisted это фреймворк для разработки сетевых приложений на Python, который среди многих других применений, может быть использован и для параллельной обработки данных - мультипроцессорности . Это замечательно, но мне пришлось попотеть, для того, чтобы найти то, что мне нужно. Я листал документацию Twisted и книгу O'Reilly Twisted. Существует также рецепт в Python Cookbook. Однако, самое интересное я нашел в статье Брюса Эккель - Параллельность с Python, Twisted и Flex. Также стоит прочитать первоначальные статьи Брюса Эккель про Twisted: Grokking Twisted).

Вот мои замечания о текущем примере Брюса.

Я убрал Flex - отчасти потому, что мне это не нужно и я ничего не хочу знать об этом. В примере запускается контроллер, который инициализирует ряд отдельных параллельных процессов-вычислителей, в которых уже запускаются какие-то сложные действия (эти процессы называют solvers). Также тут имеется взаимодействие между контроллером и вычислителями. Хотя этот пример запускается только на одной машине, те принципы, о которых говорится в статье - не трудно распространить и на систему из нескольких компьютеров.

Для хорошего примера, как это работает, пожалуйста, смотрите оригинал статьи .

Вот solver.py который скопирован с оригинала статьи. Настоящая "работа" происходит в методе step(). Я только добавил некоторую отладочную информацию для себя.

  1. """
  2. solver.py
  3. Original version by Bruce Eckel
  4. Solves one portion of a problem, in a separate process on a separate CPU
  5. """
  6. import sys, random, math
  7. from twisted.spread import pb
  8. from twisted.internet import reactor
  9.  
  10. class Solver(pb.Root):
  11.  
  12. def __init__(self, id):
  13. print "solver.py %s: solver init" % id
  14. self.id = id
  15.  
  16. def __str__(self): # String representation
  17. return "Solver %s" % self.id
  18.  
  19. def remote_initialize(self, initArg):
  20. return "%s initialized" % self
  21.  
  22. def step(self, arg):
  23. print "solver.py %s: solver step" % self.id
  24. "Simulate work and return result"
  25. result = 0
  26. for i in range(random.randint(1000000, 3000000)):
  27. angle = math.radians(random.randint(0, 45))
  28. result += math.tanh(angle)/math.cosh(angle)
  29. return "%s, %s, result: %.2f" % (self, str(arg), result)
  30.  
  31. # Alias methods, for demonstration version:
  32. remote_step1 = step
  33. remote_step2 = step
  34. remote_step3 = step
  35.  
  36. def remote_status(self):
  37. print "solver.py %s: remote_status" % self.id
  38. return "%s operational" % self
  39.  
  40. def remote_terminate(self):
  41. print "solver.py %s: remote_terminate" % self.id
  42. reactor.callLater(0.5, reactor.stop)
  43. return "%s terminating..." % self
  44.  
  45. if __name__ == "__main__":
  46. port = int(sys.argv[1])
  47. reactor.listenTCP(port, pb.PBServerFactory(Solver(sys.argv[1])))
  48. reactor.run()

Вот controller.py . Он также скопирован из оригинальной статьи, но я убрал Flex интерфейс и создал сигналы start и terminate в классе контроллера. Я не уверен, что это имеет смысл, но, по крайней мере, это позволило мне использовать пример. Я тоже перенес метод terminate из FlexInterface в Controller.
  1. """
  2. Controller.py
  3. Original version by Bruce Eckel
  4. Starts and manages solvers in separate processes for parallel processing.
  5. """
  6. import sys
  7. from subprocess import Popen
  8. from twisted.spread import pb
  9. from twisted.internet import reactor, defer
  10.  
  11. START_PORT = 5566
  12. MAX_PROCESSES = 2
  13.  
  14. class Controller(object):
  15.  
  16. def broadcastCommand(self, remoteMethodName, arguments, nextStep, failureMessage):
  17. print "controller.py: broadcasting..."
  18. deferreds = [solver.callRemote(remoteMethodName, arguments)
  19. for solver in self.solvers.values()]
  20. print "controller.py: broadcasted"
  21. reactor.callLater(3, self.checkStatus)
  22.  
  23. defer.DeferredList(deferreds, consumeErrors=True).addCallbacks(
  24. nextStep, self.failed, errbackArgs=(failureMessage))
  25.  
  26. def checkStatus(self):
  27. print "controller.py: checkStatus"
  28. for solver in self.solvers.values():
  29. solver.callRemote("status").addCallbacks(
  30. lambda r: sys.stdout.write(r + "\n"), self.failed,
  31. errbackArgs=("Status Check Failed"))
  32.  
  33. def failed(self, results, failureMessage="Call Failed"):
  34. print "controller.py: failed"
  35. for (success, returnValue), (address, port) in zip(results, self.solvers):
  36. if not success:
  37. raise Exception("address: %s port: %d %s" % (address, port, failureMessage))
  38.  
  39. def __init__(self):
  40. print "controller.py: init"
  41. self.solvers = dict.fromkeys(
  42. [("localhost", i) for i in range(START_PORT, START_PORT+MAX_PROCESSES)])
  43. self.pids = [Popen(["python", "solver.py", str(port)]).pid
  44. for ip, port in self.solvers]
  45. print "PIDS: ", self.pids
  46. self.connected = False
  47. reactor.callLater(1, self.connect)
  48.  
  49. def connect(self):
  50. print "controller.py: connect"
  51. connections = []
  52. for address, port in self.solvers:
  53. factory = pb.PBClientFactory()
  54. reactor.connectTCP(address, port, factory)
  55. connections.append(factory.getRootObject())
  56. defer.DeferredList(connections, consumeErrors=True).addCallbacks(
  57. self.storeConnections, self.failed, errbackArgs=("Failed to Connect"))
  58.  
  59. print "controller.py: starting parallel jobs"
  60. self.start()
  61.  
  62. def storeConnections(self, results):
  63. print "controller.py: storeconnections"
  64. for (success, solver), (address, port) in zip(results, self.solvers):
  65. self.solvers[address, port] = solver
  66. print "controller.py: Connected; self.solvers:", self.solvers
  67. self.connected = True
  68.  
  69. def start(self):
  70. "controller.py: Begin the solving process"
  71. if not self.connected:
  72. return reactor.callLater(0.5, self.start)
  73. self.broadcastCommand("step1", ("step 1"), self.step2, "Failed Step 1")
  74.  
  75. def step2(self, results):
  76. print "controller.py: step 1 results:", results
  77. self.broadcastCommand("step2", ("step 2"), self.step3, "Failed Step 2")
  78.  
  79. def step3(self, results):
  80. print "controller.py: step 2 results:", results
  81. self.broadcastCommand("step3", ("step 3"), self.collectResults, "Failed Step 3")
  82.  
  83. def collectResults(self, results):
  84. print "controller.py: step 3 results:", results
  85. self.terminate()
  86.  
  87. def terminate(self):
  88. print "controller.py: terminate"
  89. for solver in self.solvers.values():
  90. solver.callRemote("terminate").addErrback(self.failed, "Termination Failed")
  91. reactor.callLater(1, reactor.stop)
  92. return "Terminating remote solvers"
  93.  
  94. if __name__ == "__main__":
  95. controller = Controller()
  96. reactor.run()

Чтобы запустить программу, положите оба файла в одну папку и запустите

  1. python controller.py

Вы должны увидеть, как загрузка двух процессоров (если их, конечно, у вас - 2 ;-) ) поднимется до 100%. А вот и вывод скрипта на экран:
  1. controller.py: init
  2. PIDS: [12173, 12174]
  3. solver.py 5567: solver init
  4. solver.py 5566: solver init
  5. controller.py: connect
  6. controller.py: starting parallel jobs
  7. controller.py: storeconnections
  8. controller.py: Connected; self.solvers: {('localhost', 5567): , ('localhost', 5566): }
  9. controller.py: broadcasting...
  10. controller.py: broadcasted
  11. solver.py 5566: solver step
  12. solver.py 5567: solver step
  13. controller.py: checkStatus
  14. solver.py 5566: remote_status
  15. Solver 5566 operational
  16. solver.py 5567: remote_status
  17. controller.py: step 1 results: [(True, 'Solver 5567, step 1, result: 683825.75'), (True, 'Solver 5566, step 1, result: 543177.17')]
  18. controller.py: broadcasting...
  19. controller.py: broadcasted
  20. Solver 5567 operational
  21. solver.py 5566: solver step
  22. solver.py 5567: solver step
  23. controller.py: checkStatus
  24. solver.py 5566: remote_status
  25. Solver 5566 operational
  26. solver.py 5567: remote_status
  27. controller.py: step 2 results: [(True, 'Solver 5567, step 2, result: 636793.90'), (True, 'Solver 5566, step 2, result: 335358.16')]
  28. controller.py: broadcasting...
  29. controller.py: broadcasted
  30. Solver 5567 operational
  31. solver.py 5566: solver step
  32. solver.py 5567: solver step
  33. controller.py: checkStatus
  34. solver.py 5566: remote_status
  35. Solver 5566 operational
  36. solver.py 5567: remote_status
  37. controller.py: step 3 results: [(True, 'Solver 5567, step 3, result: 847386.43'), (True, 'Solver 5566, step 3, result: 512120.15')]
  38. controller.py: terminate
  39. Solver 5567 operational
  40. solver.py 5566: remote_terminate
  41. solver.py 5567: remote_terminate

Оригинал

0
No votes yet
Your rating: None

Комментарии

Настройки просмотра комментариев

Выберите нужный метод показа комментариев и нажмите "Сохранить установки".
sanjek65 аватар

Миш, сорри, но руки бы

Миш, сорри, но руки бы оторвал и голову отбил за такой код))))), вобщем, як кажуть у нас на батьковщине: "поубывав бы":D
ПС: ну жабир же мой знаешь и тилипон, ну спросил бы, есличо))))

Michael_XIII аватар

Есть лучше пример для

Есть лучше пример для Twisted? Не нашел...

Отправить комментарий