Messaging технологии

Presenter Notes

За мен

Емил Иванов

Presenter Notes

Твърдение

Threads + shared state

hard

Presenter Notes

Пример 1

concurrent-java

430 стр.

Presenter Notes

Пример 2

Presenter Notes

Да вземем една опашка

tail

Presenter Notes

Не точно:

not-tail

По скоро:

queue

Presenter Notes

Thread safe: прост случай

 1 class Queue(object):
 2     def __init__(self):
 3         self.lock = Lock()
 4         self.queue = []
 5 
 6     def pop(self):
 7         with self.lock:
 8             return self.queue.pop(0)
 9 
10     def push(self, v):
11         with self.lock:
12             self.queue.append(v)

Presenter Notes

Thread safe: бърз случай

work

Presenter Notes

Threads

Прекалено ниско ниво на абстракция.

Подобно на GC.

Не е нужно всеки път да откриваме топлата вода.

Presenter Notes

Алтернатива?

escape

Presenter Notes

Оставете на професионалистите

pro

Presenter Notes

Комуникация със съобщения

actor

Actor model

Presenter Notes

Erlang

Presenter Notes

Actors in Erlang

 1 -module(counter).
 2 -export([run/0, counter/1]).
 3 
 4 run() ->
 5     S = spawn(counter, counter, [0]),
 6     send_msgs(S, 100000),
 7     S.
 8 
 9 counter(Sum) ->
10     receive
11         value -> io:fwrite("Value is ~w~n", [Sum]);
12         {inc, Amount} -> counter(Sum+Amount)
13     end.
14 
15 send_msgs(_, 0) -> true;
16 send_msgs(S, Count) ->
17     S ! {inc, 1},
18     send_msgs(S, Count-1).

Presenter Notes

Erlang

Presenter Notes

Съобщения, но кога?

Presenter Notes

Email

email

Presenter Notes

При големи натоварвания

load

Presenter Notes

В началото...

load-simple

Presenter Notes

... по-късно...

load-bal

Presenter Notes

Решение със съобщения

load-msg

Map-Reduce?

Presenter Notes

Когато редът има значение

in-order

Presenter Notes

Ред със съобщения

pipeline

Pipeline

Presenter Notes

RabbitMQ

Presenter Notes

RabbitMQ

  • AMQP протокол
  • Монолитен сървър
  • Черна кутия
  • Написан на Erlang

Presenter Notes

rabbit

Presenter Notes

Как се работи с RabbitMQ?

Presenter Notes

1. Свързване

1 from pika.adapters import BlockingConnection
2 from pika import BasicProperties
3 
4 # Open a connection to RabbitMQ on localhost
5 # using all default parameters
6 connection = BlockingConnection()
7 
8 # Open the channel
9 channel = connection.channel()

Presenter Notes

2. Exchange

1 channel.exchange_declare(
2     exchange='fest-exchange',
3     type='direct')

Аналогия с Поща (Postal service)

Presenter Notes

3. Queue

1 channel.queue_declare(
2     queue="fest-queue",
3     durable=True,
4     auto_delete=False)

Presenter Notes

4. Bind

1 channel.queue_bind(
2     queue='fest-queue',
3     exchange='fest-exchange')

Presenter Notes

Exchanges types

Presenter Notes

Direct

mq-direct

Presenter Notes

Fanout

mq-fanout

Presenter Notes

Topic

mq-topic

Presenter Notes

5. Публикуване

1 byte[] messageBodyBytes =
2     "Hello, world!".getBytes();
3 
4 channel.basicPublish(
5     "fest-exchange", // Exchange
6     null, // Routing key
7     null, // Properties
8     messageBodyBytes);

Presenter Notes

6. Абониране

1 def handle_delivery(channel,
2     method_frame, header_frame, body):
3     # Process the message
4     log.debug('Got message with data: %s',
5               body)
6 
7 channel.basic_consume(
8     handle_delivery, queue='fest-queue')

Presenter Notes

7. Потвърждаване

1 def handle_delivery(channel,
2     method_frame, header_frame, body):
3     # Process the message and acknowledge it
4 
5     channel.basic_ack(
6         delivery_tag=method_frame.delivery_tag)
7 
8 channel.basic_consume(
9     handle_delivery, queue='fest-queue')

Presenter Notes

RabbitMQ: Плюсове

  • Инсталирате го и готово
  • Изцяло се конфигурира през протокола
  • Стабилен
  • Бърз
  • Сходно API независимо от езика

Presenter Notes

RabbitMQ: Минуси

  • Single point of failure - no redundancy (has clustering, though)

Не бива опашките или съобщенията в тях да стават много.

Presenter Notes

ØMQ (ZeroMQ)

Presenter Notes

ØMQ

  • Библиотека
  • Много бърз
  • Общо API на всички езици (поне 30)
  • API-то представлява супер-сокет

Presenter Notes

Broadcast

broadcast

Presenter Notes

Broadcast сървър

 1 import zmq, datetime, time
 2 
 3 ctx = zmq.Context()
 4 socket = ctx.socket(zmq.PUB)
 5 socket.bind("tcp://*:6000")
 6 
 7 while True:
 8     socket.send('Time is %s' %
 9         datetime.datetime.now())
10     time.sleep(1)

Presenter Notes

Broadcast клиент

 1 import zmq
 2 
 3 ctx = zmq.Context()
 4 socket = ctx.socket(zmq.SUB)
 5 socket.connect("tcp://127.0.0.1:6000")
 6 socket.setsockopt(zmq.SUBSCRIBE, '')
 7 
 8 for i in range(10):
 9     print socket.recv()
10 
11 # Time is 2011-10-27 17:30:27.811003
12 # Time is 2011-10-27 17:30:28.812346
13 # Time is 2011-10-27 17:30:29.813524

Изпуска съобщенията, ако няма кой да слуша (неможе с RabbitMQ)

Presenter Notes

Pipeline сървър (Scala)

 1 val ctx = ZMQ.context(2)
 2 val s = ctx.socket(ZMQ.PUSH)
 3 s.bind("tcp://*:6001")
 4 
 5 for (i <- 1 to 100) {
 6   var r1 = (Math.random() * 100).toInt
 7   var r2 = (Math.random() * 100).toInt
 8 
 9   s.send("%s+%s".format(r1, r2).getBytes(),
10          0) // flags
11 }

Presenter Notes

Pipeline клиент (PHP)

 1 <?php
 2 $ctx = new ZMQContext();
 3 $socket = $ctx->getSocket(ZMQ::SOCKET_PULL);
 4 $socket->connect('tcp://127.0.0.1:6001');
 5 
 6 for ($i = 0; $i < 100; $i++) {
 7     $data = $socket->recv();
 8     list($a, $b) = explode('+', $data);
 9     $c = $a + $b;
10     echo "$a + $b = $c\n";
11 }
12 
13 // 1 + 31 = 32
14 // 84 + 50 = 134
15 // 88 + 62 = 150

Presenter Notes

Socket types

PUSH → PULL

PUB ⇉ SUB

REQ ⇄ REP

И още няколко

Presenter Notes

Излагация Демо

Presenter Notes

URL Fetching pipeline

pipeline

Presenter Notes

Webserver

(Node.js + Coffeecript)

 1 zmq = require 'zmq'
 2 
 3 inPort = 5555
 4 outPort = 5556
 5 
 6 connectedClients = {}
 7 
 8 inSocket = zmq.createSocket 'push'
 9 inSocket.bind "tcp://127.0.0.1:#{inPort}", (e) ->
10 
11 outSocket = zmq.createSocket 'pull'
12 outSocket.bind "tcp://127.0.0.1:#{outPort}", (e) ->
13   outSocket.on 'message', (m) ->
14     title = m.toString('utf-8')
15     for id of connectedClients
16       connectedClients[id].send(title)

socket.io

Presenter Notes

Fetcher

 1 import time, zmq
 2 from lxml import html
 3 
 4 c = zmq.Context()
 5 in_socket = c.socket(zmq.PULL)
 6 in_socket.connect('tcp://127.0.0.1:5555')
 7 
 8 out_socket = c.socket(zmq.PUSH)
 9 out_socket.connect('tcp://127.0.0.1:5556')
10 
11 while True:
12     try:
13         url = in_socket.recv()
14         time.sleep(1) # Simulate latency
15         dom = html.parse(url)
16         title = dom.getroot().find('head/title').text
17 
18         out_socket.send(title)
19     except KeyboardInterrupt:
20         break

Presenter Notes

URL fetch

Демо

Presenter Notes

ØMQ - Плюсове

  • Лесен за работа
  • Добро и еднакво API
  • Мега-хипер-турбо-плюс бърз
  • Библиотека (не framework)

Presenter Notes

ØMQ - Минуси

  • Малко по-ниско ниво от RabbitMQ
  • Не бива да се използва за външна комуникация

Presenter Notes

Обобщение

Програмирането с нишки, семафори и други ЖП елементи е трудно. Една от алтернативите е различните процеси комуникират със съобщения. RabbitMQ и ZeroMQ са едни от примерите. Нека трудните проблеми да бъдат решени веднъж за винаги.

Presenter Notes

Благодаря

thank-you

Presenter Notes

Въпроси?

questions

emil.vladev@gmail.com

Presenter Notes