19. Конкурентност

19. Конкурентност

19. Конкурентност

13 януари 2014

Днес

Disclaimer

Няма да говорим за паралелни алгоритми и други подобни неща

Ще говорим за конкурентност на по-практическо ниво

IO-bound vs. CPU-bound

Processes vs. Threads (Green & Native)

Паралелизация на многоядрени процесори

Нишки

GVL/GIL?

Импликации от съществуването на GVL

Бъдещето на GVL

1. Паралелизация с процеси

Първият начин за паралелизация и фоново изпълнение на задачи, който ще разгледаме, е базиран на процеси.

fork

Това е интерфейс към системен примитив. Не всички операционни системи го имат.

child_id = fork()
if child_id
  puts "This is the parent"
else
  puts "This is the child speaking!"
end

fork (2)

Има и друга версия:

fork do
  puts "This is the child"
end

puts "This is the parent"

module Process

Всъщност, повечето неща са в модул Process:

Process.fork
Process.wait
Process.waitall
Process.waitpid

Process.wait

Process.wait чака някое от децата да приключи и връща pid-а му, а $? съдържа Process::Status.

fork { exit 99 } # 32480
Process.wait     # 32480
$?.exitstatus    # 99

Process.wait2

Process.wait2 е сходно, но връща масив от pid и Process::Status:

fork { exit 99 } # 32534
Process.wait2    # [32534, #<Process::Status: pid 32534 exit 99>]

Process.waitpid

Process.waitpid(pid) чака дадено конкретно дете да приключи:

pid = fork do
  puts "Child: I'm starting..."
  sleep 1
  puts "Child: I'm done."
end

puts "Parent: Child running. Waiting for it to complete."
Process.waitpid(pid)
puts "Parent: Child is done."

Process.waitall

Process.waitall чака всички деца да приключат

fork { sleep 1; puts '1' }
fork { sleep 2; puts '2' }

puts '0'
Process.waitall
puts '3'

Process.exec

Process.exec заменя текущия процес с изпълнение на команда:

fork do
  exec 'date'
  puts 'Unreachable code'
end

Process.daemon

Process.daemon "откача" процеса от терминала и го пуска в background.

fork do
  Process.daemon
  loop do
    system 'echo Spam >> ~/spam'
    sleep 1
  end
end

Process.methods

2. Паралелизация с нишки

Втори начин за паралелизация и фоново изпълнение на задачи е употребата на нишки.

Thread.new

Създаването на нишка в Ruby е лесно:

thread = Thread.new do
  puts 'This is run in the thread'
  sleep 1
  puts 'The thread is started immediatelly'
end

puts 'This is run in the main thread'

Thread#join

Процесът приключва, когато основната нишка приключи. Ако искате да изчакате някоя от създадените нишки да приключи преди процеса да излезе, ползвайте Thread#join.

thread = Thread.new do
  puts 'This is run in the thread'
  sleep 1
  puts 'The thread is started immediatelly'
end

puts 'This is run in the main thread'
thread.join

Thread#value

Thread#value блокира докато нишката не приключи и връща последния оценен израз

thread = Thread.new do
  2 + 2
end

# Can be called multiple times, will block only on the first call.
thread.value # 4
thread.value # 4

Изключения

Ако една нишка предизвика изключение, то няма да убие интерпретатора. Вместо това, ще се появи в нишката, извикваща #value или #join.

thread = Thread.new do
  raise 'Oh noes!'
end

thread.join # error: RuntimeError

Изключения (2)

Можете да промените последното с Thread.abort_on_exception.

Thread.abort_on_exception = true

Thread.methods

Thread#priority

Променливи (1)

Променливи, дефинирани в блока на нишката, са (очевидно) локални за нея:

thread = Thread.new { something = 1 }
thread.join

something # error: NameError

Променливи (2)

Блокът на нишката вижда променливите отвън:

answer = 1
thread = Thread.new { answer = 2 }

thread.join
answer # 2

Променливи (3)

Можете да подавате стойности на нишката през Thread.new

n = 10
thread = Thread.new(n) do |number|
  n      # 20
  number # 10
end
n = 20

thread.join

Променливи (4)

Всяка нишка функционира като хеш от символи. Така може да правите thread-local променливи:

Thread.current[:x] = 10
thread = Thread.new do
  Thread.current[:x] # nil
  Thread.current[:x] = 20
end
thread.join
Thread.current[:x]   # 10

Thread-local променливи

Thread-local променливи (2)

Ето примерна имплементация на I18n.locale методите:

class I18n
  class << self
    def locale
      Thread.current[:locale]
    end

    def locale=(new_locale)
      Thread.current[:locale] = new_locale
    end
  end
end

Thread-safety

Конкурентният достъп до споделени ресурси създава проблеми, когато операциите по промяна на ресурс не са атомарни. Нека разгледаме един пример:

items_in_stock = 4000

threads = []
40.times do
  threads << Thread.new do
    100.times do
      items_in_stock -= 1
    end
  end
end
threads.each(&:join)

items_in_stock # 0

Thread-safety (2)

Thread-safety (3)

Нека разгледаме един още по-очевиден пример:

username = 'larodi'

50.times do
  Thread.new do
    unless User.username_taken?(username)
      User.create username: username
    end
  end
end

Какво става тук? Няколко нишки могат да изпълнят username_taken?, преди да се е стигнало до създаване на потребител и да решат, че няма пробелм да създадат такъв, понеже потребителското име е свободно и хоп – дублирани данни.

Синхронизация на нишки

Mutex (1)

Mutex (2)

$mutex = Mutex.new

def stuff
  # pre-lock
  $mutex.lock
  # critical - access and/or modify a shared resource
  $mutex.unlock
  # post-lock
end

t1 = Thread.new { loop { stuff } }
t2 = Thread.new { loop { stuff } }

t1.join
t2.join

Mutex (3)

Има и по-удобна форма, приемаща блок:

$mutex = Mutex.new

def stuff
  # pre-lock
  $mutex.synchronize do
    # critical
  end
  # post-lock
end

t1 = Thread.new { loop { stuff } }
t2 = Thread.new { loop { stuff } }

t1.join
t2.join

Обърнете внимание, че ако възникне изключение в блока, подаден на synchronize, mutex-ът ще бъде коректно отключен.

Писане на thread-safe код

Thread-safe структури от данни

Fibers

Fibers

Най-простият възможен пример:

fiber = Fiber.new { :larodi }

fiber.resume # :larodi
fiber.resume # error: FiberError

Fibonacci

Кодът по-долу няма да приключи никога:

class FibonacciNumbers
  def each
    current, previous = 1, 0

    while true
      yield current
      current, previous = current + previous, current
    end
  end
end

FibonacciNumbers.new.each { |number| puts number }

Fibonacci с fibers

Ако заменим yield с Fiber.yield, можем да направим нещо като безкраен поток от числа на Фибоначи:

class FibonacciNumbers
  def each
    current, previous = 1, 0

    while true
      Fiber.yield current
      current, previous = current + previous, current
    end
  end
end

fibonacci_stream = Fiber.new { FibonacciNumbers.new.each }

fibonacci_stream.resume # 1
fibonacci_stream.resume # 1
fibonacci_stream.resume # 2
fibonacci_stream.resume # 3

Приложение на fibers

Enumerator-и и fibers

Enumerator класът в Ruby се възползва от Fiber.

Това се случва, когато направите (1..100_000).each.

Въпроси