[TIPC] Initial merge
[linux-block.git] / net / tipc / subscr.c
1 /*
2  * net/tipc/subscr.c: TIPC subscription service
3  * 
4  * Copyright (c) 2003-2005, Ericsson Research Canada
5  * Copyright (c) 2005, Wind River Systems
6  * Copyright (c) 2005-2006, Ericsson AB
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * Redistributions of source code must retain the above copyright notice, this 
13  * list of conditions and the following disclaimer.
14  * Redistributions in binary form must reproduce the above copyright notice, 
15  * this list of conditions and the following disclaimer in the documentation 
16  * and/or other materials provided with the distribution.
17  * Neither the names of the copyright holders nor the names of its 
18  * contributors may be used to endorse or promote products derived from this 
19  * software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
24  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "core.h"
35 #include "dbg.h"
36 #include "subscr.h"
37 #include "name_table.h"
38 #include "ref.h"
39
40 /**
41  * struct subscriber - TIPC network topology subscriber
42  * @ref: object reference to subscriber object itself
43  * @lock: pointer to spinlock controlling access to subscriber object
44  * @subscriber_list: adjacent subscribers in top. server's list of subscribers
45  * @subscription_list: list of subscription objects for this subscriber
46  * @port_ref: object reference to port used to communicate with subscriber
47  * @swap: indicates if subscriber uses opposite endianness in its messages
48  */
49  
50 struct subscriber {
51         u32 ref;
52         spinlock_t *lock;
53         struct list_head subscriber_list;
54         struct list_head subscription_list;
55         u32 port_ref;
56         int swap;
57 };
58
59 /**
60  * struct top_srv - TIPC network topology subscription service
61  * @user_ref: TIPC userid of subscription service
62  * @setup_port: reference to TIPC port that handles subscription requests
63  * @subscription_count: number of active subscriptions (not subscribers!)
64  * @subscriber_list: list of ports subscribing to service
65  * @lock: spinlock govering access to subscriber list
66  */
67
68 struct top_srv {
69         u32 user_ref;
70         u32 setup_port;
71         atomic_t subscription_count;
72         struct list_head subscriber_list;
73         spinlock_t lock;
74 };
75
76 static struct top_srv topsrv = { 0 };
77
78 /**
79  * htohl - convert value to endianness used by destination
80  * @in: value to convert
81  * @swap: non-zero if endianness must be reversed
82  * 
83  * Returns converted value
84  */
85
86 static inline u32 htohl(u32 in, int swap)
87 {
88         char *c = (char *)∈
89
90         return swap ? ((c[3] << 3) + (c[2] << 2) + (c[1] << 1) + c[0]) : in;
91 }
92
93 /**
94  * subscr_send_event - send a message containing a tipc_event to the subscriber
95  */
96
97 static void subscr_send_event(struct subscription *sub, 
98                               u32 found_lower, 
99                               u32 found_upper,
100                               u32 event, 
101                               u32 port_ref, 
102                               u32 node)
103 {
104         struct iovec msg_sect;
105
106         msg_sect.iov_base = (void *)&sub->evt;
107         msg_sect.iov_len = sizeof(struct tipc_event);
108
109         sub->evt.event = htohl(event, sub->owner->swap);
110         sub->evt.found_lower = htohl(found_lower, sub->owner->swap);
111         sub->evt.found_upper = htohl(found_upper, sub->owner->swap);
112         sub->evt.port.ref = htohl(port_ref, sub->owner->swap);
113         sub->evt.port.node = htohl(node, sub->owner->swap);
114         tipc_send(sub->owner->port_ref, 1, &msg_sect);
115 }
116
117 /**
118  * subscr_overlap - test for subscription overlap with the given values
119  *
120  * Returns 1 if there is overlap, otherwise 0.
121  */
122
123 int subscr_overlap(struct subscription *sub, 
124                    u32 found_lower, 
125                    u32 found_upper)
126
127 {
128         if (found_lower < sub->seq.lower)
129                 found_lower = sub->seq.lower;
130         if (found_upper > sub->seq.upper)
131                 found_upper = sub->seq.upper;
132         if (found_lower > found_upper)
133                 return 0;
134         return 1;
135 }
136
137 /**
138  * subscr_report_overlap - issue event if there is subscription overlap
139  * 
140  * Protected by nameseq.lock in name_table.c
141  */
142
143 void subscr_report_overlap(struct subscription *sub, 
144                            u32 found_lower, 
145                            u32 found_upper,
146                            u32 event, 
147                            u32 port_ref, 
148                            u32 node,
149                            int must)
150 {
151         dbg("Rep overlap %u:%u,%u<->%u,%u\n", sub->seq.type, sub->seq.lower,
152             sub->seq.upper, found_lower, found_upper);
153         if (!subscr_overlap(sub, found_lower, found_upper))
154                 return;
155         if (!must && (sub->filter != TIPC_SUB_PORTS))
156                 return;
157         subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
158 }
159
160 /**
161  * subscr_timeout - subscription timeout has occurred
162  */
163
164 static void subscr_timeout(struct subscription *sub)
165 {
166         struct subscriber *subscriber;
167         u32 subscriber_ref;
168
169         /* Validate subscriber reference (in case subscriber is terminating) */
170
171         subscriber_ref = sub->owner->ref;
172         subscriber = (struct subscriber *)ref_lock(subscriber_ref);
173         if (subscriber == NULL)
174                 return;
175
176         /* Unlink subscription from name table */
177
178         nametbl_unsubscribe(sub);
179
180         /* Notify subscriber of timeout, then unlink subscription */
181
182         subscr_send_event(sub, 
183                           sub->evt.s.seq.lower, 
184                           sub->evt.s.seq.upper,
185                           TIPC_SUBSCR_TIMEOUT, 
186                           0, 
187                           0);
188         list_del(&sub->subscription_list);
189
190         /* Now destroy subscription */
191
192         ref_unlock(subscriber_ref);
193         k_term_timer(&sub->timer);
194         kfree(sub);
195         atomic_dec(&topsrv.subscription_count);
196 }
197
198 /**
199  * subscr_terminate - terminate communication with a subscriber
200  * 
201  * Called with subscriber locked.  Routine must temporarily release this lock
202  * to enable subscription timeout routine(s) to finish without deadlocking; 
203  * the lock is then reclaimed to allow caller to release it upon return.
204  * (This should work even in the unlikely event some other thread creates 
205  * a new object reference in the interim that uses this lock; this routine will
206  * simply wait for it to be released, then claim it.)
207  */
208
209 static void subscr_terminate(struct subscriber *subscriber)
210 {
211         struct subscription *sub;
212         struct subscription *sub_temp;
213
214         /* Invalidate subscriber reference */
215
216         ref_discard(subscriber->ref);
217         spin_unlock_bh(subscriber->lock);
218
219         /* Destroy any existing subscriptions for subscriber */
220         
221         list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
222                                  subscription_list) {
223                 if (sub->timeout != TIPC_WAIT_FOREVER) {
224                         k_cancel_timer(&sub->timer);
225                         k_term_timer(&sub->timer);
226                 }
227                 nametbl_unsubscribe(sub);
228                 list_del(&sub->subscription_list);
229                 dbg("Term: Removed sub %u,%u,%u from subscriber %x list\n",
230                     sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
231                 kfree(sub);
232                 atomic_dec(&topsrv.subscription_count);
233         }
234
235         /* Sever connection to subscriber */
236
237         tipc_shutdown(subscriber->port_ref);
238         tipc_deleteport(subscriber->port_ref);
239
240         /* Remove subscriber from topology server's subscriber list */
241
242         spin_lock_bh(&topsrv.lock);
243         list_del(&subscriber->subscriber_list);
244         spin_unlock_bh(&topsrv.lock);
245
246         /* Now destroy subscriber */
247
248         spin_lock_bh(subscriber->lock);
249         kfree(subscriber);
250 }
251
252 /**
253  * subscr_subscribe - create subscription for subscriber
254  * 
255  * Called with subscriber locked
256  */
257
258 static void subscr_subscribe(struct tipc_subscr *s,
259                              struct subscriber *subscriber)
260 {
261         struct subscription *sub;
262
263         /* Refuse subscription if global limit exceeded */
264
265         if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) {
266                 warn("Failed: max %u subscriptions\n", tipc_max_subscriptions);
267                 subscr_terminate(subscriber);
268                 return;
269         }
270
271         /* Allocate subscription object */
272
273         sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
274         if (sub == NULL) {
275                 warn("Memory squeeze; ignoring subscription\n");
276                 subscr_terminate(subscriber);
277                 return;
278         }
279
280         /* Determine/update subscriber's endianness */
281
282         if ((s->filter == TIPC_SUB_PORTS) || (s->filter == TIPC_SUB_SERVICE))
283                 subscriber->swap = 0;
284         else
285                 subscriber->swap = 1;
286
287         /* Initialize subscription object */
288
289         memset(sub, 0, sizeof(*sub));
290         sub->seq.type = htohl(s->seq.type, subscriber->swap);
291         sub->seq.lower = htohl(s->seq.lower, subscriber->swap);
292         sub->seq.upper = htohl(s->seq.upper, subscriber->swap);
293         sub->timeout = htohl(s->timeout, subscriber->swap);
294         sub->filter = htohl(s->filter, subscriber->swap);
295         if ((((sub->filter != TIPC_SUB_PORTS) 
296               && (sub->filter != TIPC_SUB_SERVICE)))
297             || (sub->seq.lower > sub->seq.upper)) {
298                 warn("Rejecting illegal subscription %u,%u,%u\n",
299                      sub->seq.type, sub->seq.lower, sub->seq.upper);
300                 kfree(sub);
301                 subscr_terminate(subscriber);
302                 return;
303         }
304         memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
305         INIT_LIST_HEAD(&sub->subscription_list);
306         INIT_LIST_HEAD(&sub->nameseq_list);
307         list_add(&sub->subscription_list, &subscriber->subscription_list);
308         atomic_inc(&topsrv.subscription_count);
309         if (sub->timeout != TIPC_WAIT_FOREVER) {
310                 k_init_timer(&sub->timer,
311                              (Handler)subscr_timeout, (unsigned long)sub);
312                 k_start_timer(&sub->timer, sub->timeout);
313         }
314         sub->owner = subscriber;
315         nametbl_subscribe(sub);
316 }
317
318 /**
319  * subscr_conn_shutdown_event - handle termination request from subscriber
320  */
321
322 static void subscr_conn_shutdown_event(void *usr_handle,
323                                        u32 portref,
324                                        struct sk_buff **buf,
325                                        unsigned char const *data,
326                                        unsigned int size,
327                                        int reason)
328 {
329         struct subscriber *subscriber = ref_lock((u32)usr_handle);
330         spinlock_t *subscriber_lock;
331
332         if (subscriber == NULL)
333                 return;
334
335         subscriber_lock = subscriber->lock;
336         subscr_terminate(subscriber);
337         spin_unlock_bh(subscriber_lock);
338 }
339
340 /**
341  * subscr_conn_msg_event - handle new subscription request from subscriber
342  */
343
344 static void subscr_conn_msg_event(void *usr_handle,
345                                   u32 port_ref,
346                                   struct sk_buff **buf,
347                                   const unchar *data,
348                                   u32 size)
349 {
350         struct subscriber *subscriber = ref_lock((u32)usr_handle);
351         spinlock_t *subscriber_lock;
352
353         if (subscriber == NULL)
354                 return;
355
356         subscriber_lock = subscriber->lock;
357         if (size != sizeof(struct tipc_subscr))
358                 subscr_terminate(subscriber);
359         else
360                 subscr_subscribe((struct tipc_subscr *)data, subscriber);
361         
362         spin_unlock_bh(subscriber_lock);
363 }
364
365 /**
366  * subscr_named_msg_event - handle request to establish a new subscriber
367  */
368
369 static void subscr_named_msg_event(void *usr_handle,
370                                    u32 port_ref,
371                                    struct sk_buff **buf,
372                                    const unchar *data,
373                                    u32 size,
374                                    u32 importance, 
375                                    struct tipc_portid const *orig,
376                                    struct tipc_name_seq const *dest)
377 {
378         struct subscriber *subscriber;
379         struct iovec msg_sect = {0, 0};
380         spinlock_t *subscriber_lock;
381
382         dbg("subscr_named_msg_event: orig = %x own = %x,\n",
383             orig->node, tipc_own_addr);
384         if (size && (size != sizeof(struct tipc_subscr))) {
385                 warn("Received tipc_subscr of invalid size\n");
386                 return;
387         }
388
389         /* Create subscriber object */
390
391         subscriber = kmalloc(sizeof(struct subscriber), GFP_ATOMIC);
392         if (subscriber == NULL) {
393                 warn("Memory squeeze; ignoring subscriber setup\n");
394                 return;
395         }
396         memset(subscriber, 0, sizeof(struct subscriber));
397         INIT_LIST_HEAD(&subscriber->subscription_list);
398         INIT_LIST_HEAD(&subscriber->subscriber_list);
399         subscriber->ref = ref_acquire(subscriber, &subscriber->lock);
400         if (subscriber->ref == 0) {
401                 warn("Failed to acquire subscriber reference\n");
402                 kfree(subscriber);
403                 return;
404         }
405
406         /* Establish a connection to subscriber */
407
408         tipc_createport(topsrv.user_ref,
409                         (void *)subscriber->ref,
410                         importance,
411                         0,
412                         0,
413                         subscr_conn_shutdown_event,
414                         0,
415                         0,
416                         subscr_conn_msg_event,
417                         0,
418                         &subscriber->port_ref);
419         if (subscriber->port_ref == 0) {
420                 warn("Memory squeeze; failed to create subscription port\n");
421                 ref_discard(subscriber->ref);
422                 kfree(subscriber);
423                 return;
424         }
425         tipc_connect2port(subscriber->port_ref, orig);
426
427
428         /* Add subscriber to topology server's subscriber list */
429
430         ref_lock(subscriber->ref);
431         spin_lock_bh(&topsrv.lock);
432         list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
433         spin_unlock_bh(&topsrv.lock);
434
435         /*
436          * Subscribe now if message contains a subscription,
437          * otherwise send an empty response to complete connection handshaking
438          */
439
440         subscriber_lock = subscriber->lock;
441         if (size)
442                 subscr_subscribe((struct tipc_subscr *)data, subscriber);
443         else
444                 tipc_send(subscriber->port_ref, 1, &msg_sect);
445
446         spin_unlock_bh(subscriber_lock);
447 }
448
449 int subscr_start(void)
450 {
451         struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV};
452         int res = -1;
453
454         memset(&topsrv, 0, sizeof (topsrv));
455         topsrv.lock = SPIN_LOCK_UNLOCKED;
456         INIT_LIST_HEAD(&topsrv.subscriber_list);
457
458         spin_lock_bh(&topsrv.lock);
459         res = tipc_attach(&topsrv.user_ref, 0, 0);
460         if (res) {
461                 spin_unlock_bh(&topsrv.lock);
462                 return res;
463         }
464
465         res = tipc_createport(topsrv.user_ref,
466                               0,
467                               TIPC_CRITICAL_IMPORTANCE,
468                               0,
469                               0,
470                               0,
471                               0,
472                               subscr_named_msg_event,
473                               0,
474                               0,
475                               &topsrv.setup_port);
476         if (res)
477                 goto failed;
478
479         res = nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
480         if (res)
481                 goto failed;
482
483         spin_unlock_bh(&topsrv.lock);
484         return 0;
485
486 failed:
487         err("Unable to create subscription service\n");
488         tipc_detach(topsrv.user_ref);
489         topsrv.user_ref = 0;
490         spin_unlock_bh(&topsrv.lock);
491         return res;
492 }
493
494 void subscr_stop(void)
495 {
496         struct subscriber *subscriber;
497         struct subscriber *subscriber_temp;
498         spinlock_t *subscriber_lock;
499
500         if (topsrv.user_ref) {
501                 tipc_deleteport(topsrv.setup_port);
502                 list_for_each_entry_safe(subscriber, subscriber_temp, 
503                                          &topsrv.subscriber_list,
504                                          subscriber_list) {
505                         ref_lock(subscriber->ref);
506                         subscriber_lock = subscriber->lock;
507                         subscr_terminate(subscriber);
508                         spin_unlock_bh(subscriber_lock);
509                 }
510                 tipc_detach(topsrv.user_ref);
511                 topsrv.user_ref = 0;
512         }
513 }
514
515
516 int tipc_ispublished(struct tipc_name const *name)
517 {
518         u32 domain = 0;
519
520         return(nametbl_translate(name->type, name->instance,&domain) != 0);
521 }
522