tipc: introduce group unicast messaging
[linux-2.6-block.git] / net / tipc / group.c
CommitLineData
75da2163
JM
1/*
2 * net/tipc/group.c: TIPC group messaging code
3 *
4 * Copyright (c) 2017, Ericsson AB
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. Neither the names of the copyright holders nor the names of its
16 * contributors may be used to endorse or promote products derived from
17 * this software without specific prior written permission.
18 *
19 * Alternatively, this software may be distributed under the terms of the
20 * GNU General Public License ("GPL") version 2 as published by the Free
21 * Software Foundation.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "core.h"
37#include "addr.h"
38#include "group.h"
39#include "bcast.h"
40#include "server.h"
41#include "msg.h"
42#include "socket.h"
43#include "node.h"
44#include "name_table.h"
45#include "subscr.h"
46
/* Flow-control window sizes, all expressed in units of FLOWCTL_BLK_SZ */

/* Blocks covering one maximum-size message plus header, rounded up by one */
#define ADV_UNIT ((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ + 1)
/* Threshold below which a member is put on the congested list */
#define ADV_IDLE ADV_UNIT
/* Full window advertised to a member in MBR_JOINED state */
#define ADV_ACTIVE (12 * ADV_UNIT)
75da2163
JM
50
/* Life cycle of a peer member, as tracked by this socket.
 * The order is significant: tipc_group_is_receiver() and tipc_group_cong()
 * compare states with >= / <, so MBR_JOINED must follow the joining states.
 */
enum mbr_state {
	MBR_QUARANTINED,	/* JOIN message seen from an unknown member */
	MBR_DISCOVERED,		/* member known, neither JOIN nor PUBLISH matched yet */
	MBR_JOINING,		/* JOIN received, waiting for the PUBLISH event */
	MBR_PUBLISHED,		/* PUBLISH event received, waiting for JOIN */
	MBR_JOINED,		/* both JOIN and PUBLISH seen */
	MBR_LEAVING		/* LEAVE or WITHDRAW seen, waiting for the other */
};
59
60struct tipc_member {
61 struct rb_node tree_node;
62 struct list_head list;
b7d42635 63 struct list_head congested;
ae236fb2 64 struct sk_buff *event_msg;
b7d42635 65 struct tipc_group *group;
75da2163
JM
66 u32 node;
67 u32 port;
31c82a2d 68 u32 instance;
75da2163 69 enum mbr_state state;
b7d42635
JM
70 u16 advertised;
71 u16 window;
75da2163 72 u16 bc_rcv_nxt;
b7d42635 73 bool usr_pending;
75da2163
JM
74};
75
76struct tipc_group {
77 struct rb_root members;
b7d42635 78 struct list_head congested;
75da2163
JM
79 struct tipc_nlist dests;
80 struct net *net;
81 int subid;
82 u32 type;
83 u32 instance;
84 u32 domain;
85 u32 scope;
86 u32 portid;
87 u16 member_cnt;
88 u16 bc_snd_nxt;
89 bool loopback;
ae236fb2 90 bool events;
75da2163
JM
91};
92
/* Forward declaration; defined further down, used by earlier functions */
static void tipc_group_proto_xmit(struct tipc_group *grp,
				  struct tipc_member *m, int mtyp,
				  struct sk_buff_head *xmitq);
95
b7d42635
JM
96static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
97{
98 int mcnt = grp->member_cnt + 1;
99
100 /* Scale to bytes, considering worst-case truesize/msgsize ratio */
101 return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
102}
103
75da2163
JM
104u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
105{
106 return grp->bc_snd_nxt;
107}
108
b7d42635
JM
109static bool tipc_group_is_enabled(struct tipc_member *m)
110{
111 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
112}
113
75da2163
JM
114static bool tipc_group_is_receiver(struct tipc_member *m)
115{
116 return m && m->state >= MBR_JOINED;
117}
118
119int tipc_group_size(struct tipc_group *grp)
120{
121 return grp->member_cnt;
122}
123
124struct tipc_group *tipc_group_create(struct net *net, u32 portid,
125 struct tipc_group_req *mreq)
126{
127 struct tipc_group *grp;
128 u32 type = mreq->type;
129
130 grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
131 if (!grp)
132 return NULL;
133 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
b7d42635 134 INIT_LIST_HEAD(&grp->congested);
75da2163
JM
135 grp->members = RB_ROOT;
136 grp->net = net;
137 grp->portid = portid;
138 grp->domain = addr_domain(net, mreq->scope);
139 grp->type = type;
140 grp->instance = mreq->instance;
141 grp->scope = mreq->scope;
142 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
ae236fb2 143 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
75da2163
JM
144 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
145 return grp;
146 kfree(grp);
147 return NULL;
148}
149
150void tipc_group_delete(struct net *net, struct tipc_group *grp)
151{
152 struct rb_root *tree = &grp->members;
153 struct tipc_member *m, *tmp;
154 struct sk_buff_head xmitq;
155
156 __skb_queue_head_init(&xmitq);
157
158 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
159 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
160 list_del(&m->list);
161 kfree(m);
162 }
163 tipc_node_distr_xmit(net, &xmitq);
164 tipc_nlist_purge(&grp->dests);
165 tipc_topsrv_kern_unsubscr(net, grp->subid);
166 kfree(grp);
167}
168
169struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
170 u32 node, u32 port)
171{
172 struct rb_node *n = grp->members.rb_node;
173 u64 nkey, key = (u64)node << 32 | port;
174 struct tipc_member *m;
175
176 while (n) {
177 m = container_of(n, struct tipc_member, tree_node);
178 nkey = (u64)m->node << 32 | m->port;
179 if (key < nkey)
180 n = n->rb_left;
181 else if (key > nkey)
182 n = n->rb_right;
183 else
184 return m;
185 }
186 return NULL;
187}
188
27bd9ec0
JM
189static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
190 u32 node, u32 port)
191{
192 struct tipc_member *m;
193
194 m = tipc_group_find_member(grp, node, port);
195 if (m && tipc_group_is_enabled(m))
196 return m;
197 return NULL;
198}
199
75da2163
JM
200static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
201 u32 node)
202{
203 struct tipc_member *m;
204 struct rb_node *n;
205
206 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
207 m = container_of(n, struct tipc_member, tree_node);
208 if (m->node == node)
209 return m;
210 }
211 return NULL;
212}
213
214static void tipc_group_add_to_tree(struct tipc_group *grp,
215 struct tipc_member *m)
216{
217 u64 nkey, key = (u64)m->node << 32 | m->port;
218 struct rb_node **n, *parent = NULL;
219 struct tipc_member *tmp;
220
221 n = &grp->members.rb_node;
222 while (*n) {
223 tmp = container_of(*n, struct tipc_member, tree_node);
224 parent = *n;
225 tmp = container_of(parent, struct tipc_member, tree_node);
226 nkey = (u64)tmp->node << 32 | tmp->port;
227 if (key < nkey)
228 n = &(*n)->rb_left;
229 else if (key > nkey)
230 n = &(*n)->rb_right;
231 else
232 return;
233 }
234 rb_link_node(&m->tree_node, parent, n);
235 rb_insert_color(&m->tree_node, &grp->members);
236}
237
238static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
239 u32 node, u32 port,
240 int state)
241{
242 struct tipc_member *m;
243
244 m = kzalloc(sizeof(*m), GFP_ATOMIC);
245 if (!m)
246 return NULL;
247 INIT_LIST_HEAD(&m->list);
b7d42635
JM
248 INIT_LIST_HEAD(&m->congested);
249 m->group = grp;
75da2163
JM
250 m->node = node;
251 m->port = port;
252 grp->member_cnt++;
253 tipc_group_add_to_tree(grp, m);
254 tipc_nlist_add(&grp->dests, m->node);
255 m->state = state;
256 return m;
257}
258
259void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
260{
261 tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
262}
263
264static void tipc_group_delete_member(struct tipc_group *grp,
265 struct tipc_member *m)
266{
267 rb_erase(&m->tree_node, &grp->members);
268 grp->member_cnt--;
269 list_del_init(&m->list);
b7d42635 270 list_del_init(&m->congested);
75da2163
JM
271
272 /* If last member on a node, remove node from dest list */
273 if (!tipc_group_find_node(grp, m->node))
274 tipc_nlist_del(&grp->dests, m->node);
275
276 kfree(m);
277}
278
279struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
280{
281 return &grp->dests;
282}
283
284void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
285 int *scope)
286{
287 seq->type = grp->type;
288 seq->lower = grp->instance;
289 seq->upper = grp->instance;
290 *scope = grp->scope;
291}
292
b7d42635
JM
293void tipc_group_update_member(struct tipc_member *m, int len)
294{
295 struct tipc_group *grp = m->group;
296 struct tipc_member *_m, *tmp;
297
298 if (!tipc_group_is_enabled(m))
299 return;
300
301 m->window -= len;
302
303 if (m->window >= ADV_IDLE)
304 return;
305
306 if (!list_empty(&m->congested))
307 return;
308
309 /* Sort member into congested members' list */
310 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
311 if (m->window > _m->window)
312 continue;
313 list_add_tail(&m->congested, &_m->congested);
314 return;
315 }
316 list_add_tail(&m->congested, &grp->congested);
317}
318
319void tipc_group_update_bc_members(struct tipc_group *grp, int len)
75da2163 320{
b7d42635
JM
321 struct tipc_member *m;
322 struct rb_node *n;
323
324 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
325 m = container_of(n, struct tipc_member, tree_node);
326 if (tipc_group_is_enabled(m))
327 tipc_group_update_member(m, len);
328 }
75da2163
JM
329 grp->bc_snd_nxt++;
330}
331
27bd9ec0
JM
332bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
333 int len, struct tipc_member **mbr)
b7d42635 334{
27bd9ec0 335 struct sk_buff_head xmitq;
b7d42635 336 struct tipc_member *m;
27bd9ec0
JM
337 int adv, state;
338
339 m = tipc_group_find_dest(grp, dnode, dport);
340 *mbr = m;
341 if (!m)
342 return false;
343 if (m->usr_pending)
344 return true;
345 if (m->window >= len)
346 return false;
347 m->usr_pending = true;
348
349 /* If not fully advertised, do it now to prevent mutual blocking */
350 adv = m->advertised;
351 state = m->state;
352 if (state < MBR_JOINED)
353 return true;
354 if (state == MBR_JOINED && adv == ADV_IDLE)
355 return true;
356 skb_queue_head_init(&xmitq);
357 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
358 tipc_node_distr_xmit(grp->net, &xmitq);
359 return true;
360}
361
362bool tipc_group_bc_cong(struct tipc_group *grp, int len)
363{
364 struct tipc_member *m = NULL;
b7d42635
JM
365
366 if (list_empty(&grp->congested))
367 return false;
368
369 m = list_first_entry(&grp->congested, struct tipc_member, congested);
370 if (m->window >= len)
371 return false;
372
27bd9ec0 373 return tipc_group_cong(grp, m->node, m->port, len, &m);
b7d42635
JM
374}
375
75da2163
JM
376/* tipc_group_filter_msg() - determine if we should accept arriving message
377 */
378void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
379 struct sk_buff_head *xmitq)
380{
381 struct sk_buff *skb = __skb_dequeue(inputq);
382 struct tipc_member *m;
383 struct tipc_msg *hdr;
384 u32 node, port;
385 int mtyp;
386
387 if (!skb)
388 return;
389
390 hdr = buf_msg(skb);
391 mtyp = msg_type(hdr);
392 node = msg_orignode(hdr);
393 port = msg_origport(hdr);
394
395 if (!msg_in_group(hdr))
396 goto drop;
397
ae236fb2
JM
398 if (mtyp == TIPC_GRP_MEMBER_EVT) {
399 if (!grp->events)
400 goto drop;
401 __skb_queue_tail(inputq, skb);
402 return;
403 }
404
75da2163
JM
405 m = tipc_group_find_member(grp, node, port);
406 if (!tipc_group_is_receiver(m))
407 goto drop;
408
31c82a2d 409 TIPC_SKB_CB(skb)->orig_member = m->instance;
75da2163
JM
410 __skb_queue_tail(inputq, skb);
411
412 m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
413 return;
414drop:
415 kfree_skb(skb);
416}
417
b7d42635
JM
418void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
419 u32 port, struct sk_buff_head *xmitq)
420{
421 struct tipc_member *m;
422
423 m = tipc_group_find_member(grp, node, port);
424 if (!m)
425 return;
426
427 m->advertised -= blks;
428
429 switch (m->state) {
430 case MBR_JOINED:
431 if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
432 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
433 break;
434 case MBR_DISCOVERED:
435 case MBR_JOINING:
436 case MBR_LEAVING:
437 default:
438 break;
439 }
440}
441
75da2163
JM
442static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
443 int mtyp, struct sk_buff_head *xmitq)
444{
445 struct tipc_msg *hdr;
446 struct sk_buff *skb;
b7d42635 447 int adv = 0;
75da2163
JM
448
449 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
450 m->node, tipc_own_addr(grp->net),
451 m->port, grp->portid, 0);
452 if (!skb)
453 return;
454
b7d42635
JM
455 if (m->state == MBR_JOINED)
456 adv = ADV_ACTIVE - m->advertised;
457
75da2163 458 hdr = buf_msg(skb);
b7d42635
JM
459
460 if (mtyp == GRP_JOIN_MSG) {
75da2163 461 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
b7d42635
JM
462 msg_set_adv_win(hdr, adv);
463 m->advertised += adv;
464 } else if (mtyp == GRP_ADV_MSG) {
465 msg_set_adv_win(hdr, adv);
466 m->advertised += adv;
467 }
75da2163
JM
468 __skb_queue_tail(xmitq, skb);
469}
470
b7d42635
JM
471void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
472 struct tipc_msg *hdr, struct sk_buff_head *inputq,
75da2163
JM
473 struct sk_buff_head *xmitq)
474{
475 u32 node = msg_orignode(hdr);
476 u32 port = msg_origport(hdr);
477 struct tipc_member *m;
478
479 if (!grp)
480 return;
481
482 m = tipc_group_find_member(grp, node, port);
483
484 switch (msg_type(hdr)) {
485 case GRP_JOIN_MSG:
486 if (!m)
487 m = tipc_group_create_member(grp, node, port,
488 MBR_QUARANTINED);
489 if (!m)
490 return;
491 m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
b7d42635 492 m->window += msg_adv_win(hdr);
75da2163
JM
493
494 /* Wait until PUBLISH event is received */
ae236fb2 495 if (m->state == MBR_DISCOVERED) {
75da2163 496 m->state = MBR_JOINING;
ae236fb2 497 } else if (m->state == MBR_PUBLISHED) {
75da2163 498 m->state = MBR_JOINED;
b7d42635
JM
499 *usr_wakeup = true;
500 m->usr_pending = false;
501 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
ae236fb2
JM
502 __skb_queue_tail(inputq, m->event_msg);
503 }
b7d42635
JM
504 if (m->window < ADV_IDLE)
505 tipc_group_update_member(m, 0);
506 else
507 list_del_init(&m->congested);
75da2163
JM
508 return;
509 case GRP_LEAVE_MSG:
510 if (!m)
511 return;
512
513 /* Wait until WITHDRAW event is received */
514 if (m->state != MBR_LEAVING) {
515 m->state = MBR_LEAVING;
516 return;
517 }
518 /* Otherwise deliver already received WITHDRAW event */
ae236fb2 519 __skb_queue_tail(inputq, m->event_msg);
b7d42635 520 *usr_wakeup = m->usr_pending;
75da2163 521 tipc_group_delete_member(grp, m);
b7d42635
JM
522 list_del_init(&m->congested);
523 return;
524 case GRP_ADV_MSG:
525 if (!m)
526 return;
527 m->window += msg_adv_win(hdr);
528 *usr_wakeup = m->usr_pending;
529 m->usr_pending = false;
530 list_del_init(&m->congested);
75da2163
JM
531 return;
532 default:
533 pr_warn("Received unknown GROUP_PROTO message\n");
534 }
535}
536
b7d42635
JM
537/* tipc_group_member_evt() - receive and handle a member up/down event
538 */
75da2163 539void tipc_group_member_evt(struct tipc_group *grp,
b7d42635
JM
540 bool *usr_wakeup,
541 int *sk_rcvbuf,
75da2163 542 struct sk_buff *skb,
ae236fb2 543 struct sk_buff_head *inputq,
75da2163
JM
544 struct sk_buff_head *xmitq)
545{
546 struct tipc_msg *hdr = buf_msg(skb);
547 struct tipc_event *evt = (void *)msg_data(hdr);
ae236fb2 548 u32 instance = evt->found_lower;
75da2163
JM
549 u32 node = evt->port.node;
550 u32 port = evt->port.ref;
ae236fb2 551 int event = evt->event;
75da2163
JM
552 struct tipc_member *m;
553 struct net *net;
554 u32 self;
555
556 if (!grp)
557 goto drop;
558
559 net = grp->net;
560 self = tipc_own_addr(net);
561 if (!grp->loopback && node == self && port == grp->portid)
562 goto drop;
563
ae236fb2
JM
564 /* Convert message before delivery to user */
565 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
566 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
567 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
568 msg_set_origport(hdr, port);
569 msg_set_orignode(hdr, node);
570 msg_set_nametype(hdr, grp->type);
571 msg_set_grp_evt(hdr, event);
572
75da2163
JM
573 m = tipc_group_find_member(grp, node, port);
574
ae236fb2 575 if (event == TIPC_PUBLISHED) {
75da2163
JM
576 if (!m)
577 m = tipc_group_create_member(grp, node, port,
578 MBR_DISCOVERED);
579 if (!m)
580 goto drop;
581
ae236fb2
JM
582 /* Hold back event if JOIN message not yet received */
583 if (m->state == MBR_DISCOVERED) {
584 m->event_msg = skb;
75da2163 585 m->state = MBR_PUBLISHED;
ae236fb2
JM
586 } else {
587 __skb_queue_tail(inputq, skb);
75da2163 588 m->state = MBR_JOINED;
b7d42635
JM
589 *usr_wakeup = true;
590 m->usr_pending = false;
ae236fb2
JM
591 }
592 m->instance = instance;
593 TIPC_SKB_CB(skb)->orig_member = m->instance;
75da2163 594 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
b7d42635
JM
595 if (m->window < ADV_IDLE)
596 tipc_group_update_member(m, 0);
597 else
598 list_del_init(&m->congested);
ae236fb2 599 } else if (event == TIPC_WITHDRAWN) {
75da2163
JM
600 if (!m)
601 goto drop;
602
ae236fb2
JM
603 TIPC_SKB_CB(skb)->orig_member = m->instance;
604
b7d42635
JM
605 *usr_wakeup = m->usr_pending;
606 m->usr_pending = false;
607
ae236fb2
JM
608 /* Hold back event if more messages might be expected */
609 if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
610 m->event_msg = skb;
75da2163 611 m->state = MBR_LEAVING;
ae236fb2
JM
612 } else {
613 __skb_queue_tail(inputq, skb);
75da2163 614 tipc_group_delete_member(grp, m);
ae236fb2 615 }
b7d42635 616 list_del_init(&m->congested);
75da2163 617 }
b7d42635 618 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
ae236fb2 619 return;
75da2163
JM
620drop:
621 kfree_skb(skb);
622}