

430 стр.
Да вземем една опашка

Не точно:

По скоро:

1 class Queue(object):
2 def __init__(self):
3 self.lock = Lock()
4 self.queue = []
5
6 def pop(self):
7 with self.lock:
8 return self.queue.pop(0)
9
10 def push(self, v):
11 with self.lock:
12 self.queue.append(v)

Прекалено ниско ниво на абстракция.
Подобно на GC.
Не е нужно всеки път да откриваме топлата вода.



Actor model
1 -module(counter).
2 -export([run/0, counter/1]).
3
4 run() ->
5 S = spawn(counter, counter, [0]),
6 send_msgs(S, 100000),
7 S.
8
9 counter(Sum) ->
10 receive
11 value -> io:fwrite("Value is ~w~n", [Sum]);
12 {inc, Amount} -> counter(Sum+Amount)
13 end.
14
15 send_msgs(_, 0) -> true;
16 send_msgs(S, Count) ->
17 S ! {inc, 1},
18 send_msgs(S, Count-1).





Map-Reduce?


Pipeline

1 from pika.adapters import BlockingConnection
2 from pika import BasicProperties
3
4 # Open a connection to RabbitMQ on localhost
5 # using all default parameters
6 connection = BlockingConnection()
7
8 # Open the channel
9 channel = connection.channel()
1 channel.exchange_declare(
2 exchange='fest-exchange',
3 type='direct')
Аналогия с Поща (Postal service)
1 channel.queue_declare(
2 queue="fest-queue",
3 durable=True,
4 auto_delete=False)
1 channel.queue_bind(
2 queue='fest-queue',
3 exchange='fest-exchange')



1 byte[] messageBodyBytes =
2 "Hello, world!".getBytes();
3
4 channel.basicPublish(
5 "fest-exchange", // Exchange
6 null, // Routing key
7 null, // Properties
8 messageBodyBytes);
1 def handle_delivery(channel,
2 method_frame, header_frame, body):
3 # Process the message
4 log.debug('Got message with data: %s',
5 body)
6
7 channel.basic_consume(
8 handle_delivery, queue='fest-queue')
1 def handle_delivery(channel,
2 method_frame, header_frame, body):
3 # Process the message and acknowledge it
4
5 channel.basic_ack(
6 delivery_tag=method_frame.delivery_tag)
7
8 channel.basic_consume(
9 handle_delivery, queue='fest-queue')
Не бива опашките или съобщенията в тях да стават много.
1 import zmq, datetime, time
2
3 ctx = zmq.Context()
4 socket = ctx.socket(zmq.PUB)
5 socket.bind("tcp://*:6000")
6
7 while True:
8 socket.send('Time is %s' %
9 datetime.datetime.now())
10 time.sleep(1)
1 import zmq
2
3 ctx = zmq.Context()
4 socket = ctx.socket(zmq.SUB)
5 socket.connect("tcp://127.0.0.1:6000")
6 socket.setsockopt(zmq.SUBSCRIBE, '')
7
8 for i in range(10):
9 print socket.recv()
10
11 # Time is 2011-10-27 17:30:27.811003
12 # Time is 2011-10-27 17:30:28.812346
13 # Time is 2011-10-27 17:30:29.813524
Изпуска съобщенията, ако няма кой да слуша (неможе с RabbitMQ)
1 val ctx = ZMQ.context(2)
2 val s = ctx.socket(ZMQ.PUSH)
3 s.bind("tcp://*:6001")
4
5 for (i <- 1 to 100) {
6 var r1 = (Math.random() * 100).toInt
7 var r2 = (Math.random() * 100).toInt
8
9 s.send("%s+%s".format(r1, r2).getBytes(),
10 0) // flags
11 }
1 <?php
2 $ctx = new ZMQContext();
3 $socket = $ctx->getSocket(ZMQ::SOCKET_PULL);
4 $socket->connect('tcp://127.0.0.1:6001');
5
6 for ($i = 0; $i < 100; $i++) {
7 $data = $socket->recv();
8 list($a, $b) = explode('+', $data);
9 $c = $a + $b;
10 echo "$a + $b = $c\n";
11 }
12
13 // 1 + 31 = 32
14 // 84 + 50 = 134
15 // 88 + 62 = 150
PUSH → PULLPUB ⇉ SUBREQ ⇄ REPИ още няколко
(Node.js + Coffeecript)
1 zmq = require 'zmq'
2
3 inPort = 5555
4 outPort = 5556
5
6 connectedClients = {}
7
8 inSocket = zmq.createSocket 'push'
9 inSocket.bind "tcp://127.0.0.1:#{inPort}", (e) ->
10
11 outSocket = zmq.createSocket 'pull'
12 outSocket.bind "tcp://127.0.0.1:#{outPort}", (e) ->
13 outSocket.on 'message', (m) ->
14 title = m.toString('utf-8')
15 for id of connectedClients
16 connectedClients[id].send(title)
socket.io
1 import time, zmq
2 from lxml import html
3
4 c = zmq.Context()
5 in_socket = c.socket(zmq.PULL)
6 in_socket.connect('tcp://127.0.0.1:5555')
7
8 out_socket = c.socket(zmq.PUSH)
9 out_socket.connect('tcp://127.0.0.1:5556')
10
11 while True:
12 try:
13 url = in_socket.recv()
14 time.sleep(1) # Simulate latency
15 dom = html.parse(url)
16 title = dom.getroot().find('head/title').text
17
18 out_socket.send(title)
19 except KeyboardInterrupt:
20 break
Демо
Програмирането с нишки, семафори и други ЖП елементи е трудно. Една от алтернативите е различните процеси комуникират със съобщения. RabbitMQ и ZeroMQ са едни от примерите. Нека трудните проблеми да бъдат решени веднъж за винаги.


emil.vladev@gmail.com
| Table of Contents | t |
|---|---|
| Exposé | ESC |
| Full screen slides | e |
| Presenter View | p |
| Source Files | s |
| Slide Numbers | n |
| Toggle screen blanking | b |
| Show/hide slide context | c |
| Notes | 2 |
| Help | h |