tipc: remove redundant code in topology server
[linux-2.6-block.git] / net / tipc / server.c
1 /*
2  * net/tipc/server.c: TIPC server infrastructure
3  *
4  * Copyright (c) 2012-2013, Wind River Systems
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions are met:
9  *
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. Neither the names of the copyright holders nor the names of its
16  *    contributors may be used to endorse or promote products derived from
17  *    this software without specific prior written permission.
18  *
19  * Alternatively, this software may be distributed under the terms of the
20  * GNU General Public License ("GPL") version 2 as published by the Free
21  * Software Foundation.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33  * POSSIBILITY OF SUCH DAMAGE.
34  */
35
36 #include "server.h"
37 #include "core.h"
38 #include "socket.h"
39 #include "addr.h"
40 #include "msg.h"
41 #include <net/sock.h>
42 #include <linux/module.h>
43
44 /* Number of messages to send before rescheduling */
45 #define MAX_SEND_MSG_COUNT      25
46 #define MAX_RECV_MSG_COUNT      25
47 #define CF_CONNECTED            1
48 #define CF_SERVER               2
49
50 #define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
51
52 /**
53  * struct tipc_conn - TIPC connection structure
54  * @kref: reference counter to connection object
55  * @conid: connection identifier
56  * @sock: socket handler associated with connection
57  * @flags: indicates connection state
58  * @server: pointer to connected server
59  * @rwork: receive work item
60  * @usr_data: user-specified field
61  * @rx_action: what to do when connection socket is active
62  * @outqueue: pointer to first outbound message in queue
63  * @outqueue_lock: control access to the outqueue
64  * @outqueue: list of connection objects for its server
65  * @swork: send work item
66  */
67 struct tipc_conn {
68         struct kref kref;
69         int conid;
70         struct socket *sock;
71         unsigned long flags;
72         struct tipc_server *server;
73         struct work_struct rwork;
74         int (*rx_action) (struct tipc_conn *con);
75         void *usr_data;
76         struct list_head outqueue;
77         spinlock_t outqueue_lock;
78         struct work_struct swork;
79 };
80
81 /* An entry waiting to be sent */
82 struct outqueue_entry {
83         struct list_head list;
84         struct kvec iov;
85 };
86
87 static void tipc_recv_work(struct work_struct *work);
88 static void tipc_send_work(struct work_struct *work);
89 static void tipc_clean_outqueues(struct tipc_conn *con);
90
91 static void tipc_conn_kref_release(struct kref *kref)
92 {
93         struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
94         struct tipc_server *s = con->server;
95         struct socket *sock = con->sock;
96         struct sock *sk;
97
98         if (sock) {
99                 sk = sock->sk;
100                 if (test_bit(CF_SERVER, &con->flags)) {
101                         __module_get(sock->ops->owner);
102                         __module_get(sk->sk_prot_creator->owner);
103                 }
104                 sock_release(sock);
105                 con->sock = NULL;
106         }
107         spin_lock_bh(&s->idr_lock);
108         idr_remove(&s->conn_idr, con->conid);
109         s->idr_in_use--;
110         spin_unlock_bh(&s->idr_lock);
111         tipc_clean_outqueues(con);
112         kfree(con);
113 }
114
115 static void conn_put(struct tipc_conn *con)
116 {
117         kref_put(&con->kref, tipc_conn_kref_release);
118 }
119
120 static void conn_get(struct tipc_conn *con)
121 {
122         kref_get(&con->kref);
123 }
124
125 static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
126 {
127         struct tipc_conn *con;
128
129         spin_lock_bh(&s->idr_lock);
130         con = idr_find(&s->conn_idr, conid);
131         if (con) {
132                 if (!test_bit(CF_CONNECTED, &con->flags) ||
133                     !kref_get_unless_zero(&con->kref))
134                         con = NULL;
135         }
136         spin_unlock_bh(&s->idr_lock);
137         return con;
138 }
139
140 static void sock_data_ready(struct sock *sk)
141 {
142         struct tipc_conn *con;
143
144         read_lock_bh(&sk->sk_callback_lock);
145         con = sock2con(sk);
146         if (con && test_bit(CF_CONNECTED, &con->flags)) {
147                 conn_get(con);
148                 if (!queue_work(con->server->rcv_wq, &con->rwork))
149                         conn_put(con);
150         }
151         read_unlock_bh(&sk->sk_callback_lock);
152 }
153
154 static void sock_write_space(struct sock *sk)
155 {
156         struct tipc_conn *con;
157
158         read_lock_bh(&sk->sk_callback_lock);
159         con = sock2con(sk);
160         if (con && test_bit(CF_CONNECTED, &con->flags)) {
161                 conn_get(con);
162                 if (!queue_work(con->server->send_wq, &con->swork))
163                         conn_put(con);
164         }
165         read_unlock_bh(&sk->sk_callback_lock);
166 }
167
168 static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
169 {
170         struct sock *sk = sock->sk;
171
172         write_lock_bh(&sk->sk_callback_lock);
173
174         sk->sk_data_ready = sock_data_ready;
175         sk->sk_write_space = sock_write_space;
176         sk->sk_user_data = con;
177
178         con->sock = sock;
179
180         write_unlock_bh(&sk->sk_callback_lock);
181 }
182
183 static void tipc_close_conn(struct tipc_conn *con)
184 {
185         struct tipc_server *s = con->server;
186         struct sock *sk = con->sock->sk;
187         bool disconnect = false;
188
189         write_lock_bh(&sk->sk_callback_lock);
190         disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
191         if (disconnect) {
192                 sk->sk_user_data = NULL;
193                 if (con->conid)
194                         s->tipc_conn_release(con->conid, con->usr_data);
195         }
196         write_unlock_bh(&sk->sk_callback_lock);
197
198         /* Handle concurrent calls from sending and receiving threads */
199         if (!disconnect)
200                 return;
201
202         /* Don't flush pending works, -just let them expire */
203         kernel_sock_shutdown(con->sock, SHUT_RDWR);
204         conn_put(con);
205 }
206
207 static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
208 {
209         struct tipc_conn *con;
210         int ret;
211
212         con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
213         if (!con)
214                 return ERR_PTR(-ENOMEM);
215
216         kref_init(&con->kref);
217         INIT_LIST_HEAD(&con->outqueue);
218         spin_lock_init(&con->outqueue_lock);
219         INIT_WORK(&con->swork, tipc_send_work);
220         INIT_WORK(&con->rwork, tipc_recv_work);
221
222         spin_lock_bh(&s->idr_lock);
223         ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
224         if (ret < 0) {
225                 kfree(con);
226                 spin_unlock_bh(&s->idr_lock);
227                 return ERR_PTR(-ENOMEM);
228         }
229         con->conid = ret;
230         s->idr_in_use++;
231         spin_unlock_bh(&s->idr_lock);
232
233         set_bit(CF_CONNECTED, &con->flags);
234         con->server = s;
235
236         return con;
237 }
238
239 static int tipc_receive_from_sock(struct tipc_conn *con)
240 {
241         struct tipc_server *s = con->server;
242         struct sock *sk = con->sock->sk;
243         struct sockaddr_tipc addr;
244         struct msghdr msg = {};
245         struct kvec iov;
246         void *buf;
247         int ret;
248
249         buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
250         if (!buf) {
251                 ret = -ENOMEM;
252                 goto out_close;
253         }
254
255         iov.iov_base = buf;
256         iov.iov_len = s->max_rcvbuf_size;
257         msg.msg_name = &addr;
258         iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
259         ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
260         if (ret <= 0) {
261                 kmem_cache_free(s->rcvbuf_cache, buf);
262                 goto out_close;
263         }
264
265         read_lock_bh(&sk->sk_callback_lock);
266         if (test_bit(CF_CONNECTED, &con->flags))
267                 ret = s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid,
268                                            &addr, con->usr_data, buf, ret);
269         read_unlock_bh(&sk->sk_callback_lock);
270         kmem_cache_free(s->rcvbuf_cache, buf);
271         if (ret < 0)
272                 tipc_conn_terminate(s, con->conid);
273         return ret;
274
275 out_close:
276         if (ret != -EWOULDBLOCK)
277                 tipc_close_conn(con);
278         else if (ret == 0)
279                 /* Don't return success if we really got EOF */
280                 ret = -EAGAIN;
281
282         return ret;
283 }
284
285 static int tipc_accept_from_sock(struct tipc_conn *con)
286 {
287         struct tipc_server *s = con->server;
288         struct socket *sock = con->sock;
289         struct socket *newsock;
290         struct tipc_conn *newcon;
291         int ret;
292
293         ret = kernel_accept(sock, &newsock, O_NONBLOCK);
294         if (ret < 0)
295                 return ret;
296
297         newcon = tipc_alloc_conn(con->server);
298         if (IS_ERR(newcon)) {
299                 ret = PTR_ERR(newcon);
300                 sock_release(newsock);
301                 return ret;
302         }
303
304         newcon->rx_action = tipc_receive_from_sock;
305         tipc_register_callbacks(newsock, newcon);
306
307         /* Notify that new connection is incoming */
308         newcon->usr_data = s->tipc_conn_new(newcon->conid);
309         if (!newcon->usr_data) {
310                 sock_release(newsock);
311                 conn_put(newcon);
312                 return -ENOMEM;
313         }
314
315         /* Wake up receive process in case of 'SYN+' message */
316         newsock->sk->sk_data_ready(newsock->sk);
317         return ret;
318 }
319
320 static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
321 {
322         struct tipc_server *s = con->server;
323         struct socket *sock = NULL;
324         int imp = TIPC_CRITICAL_IMPORTANCE;
325         int ret;
326
327         ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock);
328         if (ret < 0)
329                 return NULL;
330         ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
331                                 (char *)&imp, sizeof(imp));
332         if (ret < 0)
333                 goto create_err;
334         ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
335         if (ret < 0)
336                 goto create_err;
337
338         con->rx_action = tipc_accept_from_sock;
339         ret = kernel_listen(sock, 0);
340         if (ret < 0)
341                 goto create_err;
342
343         /* As server's listening socket owner and creator is the same module,
344          * we have to decrease TIPC module reference count to guarantee that
345          * it remains zero after the server socket is created, otherwise,
346          * executing "rmmod" command is unable to make TIPC module deleted
347          * after TIPC module is inserted successfully.
348          *
349          * However, the reference count is ever increased twice in
350          * sock_create_kern(): one is to increase the reference count of owner
351          * of TIPC socket's proto_ops struct; another is to increment the
352          * reference count of owner of TIPC proto struct. Therefore, we must
353          * decrement the module reference count twice to ensure that it keeps
354          * zero after server's listening socket is created. Of course, we
355          * must bump the module reference count twice as well before the socket
356          * is closed.
357          */
358         module_put(sock->ops->owner);
359         module_put(sock->sk->sk_prot_creator->owner);
360         set_bit(CF_SERVER, &con->flags);
361
362         return sock;
363
364 create_err:
365         kernel_sock_shutdown(sock, SHUT_RDWR);
366         sock_release(sock);
367         return NULL;
368 }
369
370 static int tipc_open_listening_sock(struct tipc_server *s)
371 {
372         struct socket *sock;
373         struct tipc_conn *con;
374
375         con = tipc_alloc_conn(s);
376         if (IS_ERR(con))
377                 return PTR_ERR(con);
378
379         sock = tipc_create_listen_sock(con);
380         if (!sock) {
381                 idr_remove(&s->conn_idr, con->conid);
382                 s->idr_in_use--;
383                 kfree(con);
384                 return -EINVAL;
385         }
386
387         tipc_register_callbacks(sock, con);
388         return 0;
389 }
390
391 static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
392 {
393         struct outqueue_entry *entry;
394         void *buf;
395
396         entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
397         if (!entry)
398                 return NULL;
399
400         buf = kmemdup(data, len, GFP_ATOMIC);
401         if (!buf) {
402                 kfree(entry);
403                 return NULL;
404         }
405
406         entry->iov.iov_base = buf;
407         entry->iov.iov_len = len;
408
409         return entry;
410 }
411
412 static void tipc_free_entry(struct outqueue_entry *e)
413 {
414         kfree(e->iov.iov_base);
415         kfree(e);
416 }
417
418 static void tipc_clean_outqueues(struct tipc_conn *con)
419 {
420         struct outqueue_entry *e, *safe;
421
422         spin_lock_bh(&con->outqueue_lock);
423         list_for_each_entry_safe(e, safe, &con->outqueue, list) {
424                 list_del(&e->list);
425                 tipc_free_entry(e);
426         }
427         spin_unlock_bh(&con->outqueue_lock);
428 }
429
430 int tipc_conn_sendmsg(struct tipc_server *s, int conid,
431                       void *data, size_t len)
432 {
433         struct outqueue_entry *e;
434         struct tipc_conn *con;
435
436         con = tipc_conn_lookup(s, conid);
437         if (!con)
438                 return -EINVAL;
439
440         if (!test_bit(CF_CONNECTED, &con->flags)) {
441                 conn_put(con);
442                 return 0;
443         }
444
445         e = tipc_alloc_entry(data, len);
446         if (!e) {
447                 conn_put(con);
448                 return -ENOMEM;
449         }
450
451         spin_lock_bh(&con->outqueue_lock);
452         list_add_tail(&e->list, &con->outqueue);
453         spin_unlock_bh(&con->outqueue_lock);
454
455         if (!queue_work(s->send_wq, &con->swork))
456                 conn_put(con);
457         return 0;
458 }
459
460 void tipc_conn_terminate(struct tipc_server *s, int conid)
461 {
462         struct tipc_conn *con;
463
464         con = tipc_conn_lookup(s, conid);
465         if (con) {
466                 tipc_close_conn(con);
467                 conn_put(con);
468         }
469 }
470
471 bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
472                              u32 upper, u32 filter, int *conid)
473 {
474         struct tipc_subscriber *scbr;
475         struct tipc_subscr sub;
476         struct tipc_server *s;
477         struct tipc_conn *con;
478
479         sub.seq.type = type;
480         sub.seq.lower = lower;
481         sub.seq.upper = upper;
482         sub.timeout = TIPC_WAIT_FOREVER;
483         sub.filter = filter;
484         *(u32 *)&sub.usr_handle = port;
485
486         con = tipc_alloc_conn(tipc_topsrv(net));
487         if (IS_ERR(con))
488                 return false;
489
490         *conid = con->conid;
491         s = con->server;
492         scbr = s->tipc_conn_new(*conid);
493         if (!scbr) {
494                 conn_put(con);
495                 return false;
496         }
497
498         con->usr_data = scbr;
499         con->sock = NULL;
500         s->tipc_conn_recvmsg(net, *conid, NULL, scbr, &sub, sizeof(sub));
501         return true;
502 }
503
504 void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
505 {
506         struct tipc_conn *con;
507         struct tipc_server *srv;
508
509         con = tipc_conn_lookup(tipc_topsrv(net), conid);
510         if (!con)
511                 return;
512
513         test_and_clear_bit(CF_CONNECTED, &con->flags);
514         srv = con->server;
515         if (con->conid)
516                 srv->tipc_conn_release(con->conid, con->usr_data);
517         conn_put(con);
518         conn_put(con);
519 }
520
521 static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
522 {
523         u32 port = *(u32 *)&evt->s.usr_handle;
524         u32 self = tipc_own_addr(net);
525         struct sk_buff_head evtq;
526         struct sk_buff *skb;
527
528         skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
529                               self, self, port, port, 0);
530         if (!skb)
531                 return;
532         msg_set_dest_droppable(buf_msg(skb), true);
533         memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
534         skb_queue_head_init(&evtq);
535         __skb_queue_tail(&evtq, skb);
536         tipc_sk_rcv(net, &evtq);
537 }
538
539 static void tipc_send_to_sock(struct tipc_conn *con)
540 {
541         struct tipc_server *s = con->server;
542         struct outqueue_entry *e;
543         struct tipc_event *evt;
544         struct msghdr msg;
545         int count = 0;
546         int ret;
547
548         spin_lock_bh(&con->outqueue_lock);
549         while (test_bit(CF_CONNECTED, &con->flags)) {
550                 e = list_entry(con->outqueue.next, struct outqueue_entry, list);
551                 if ((struct list_head *) e == &con->outqueue)
552                         break;
553
554                 spin_unlock_bh(&con->outqueue_lock);
555
556                 if (con->sock) {
557                         memset(&msg, 0, sizeof(msg));
558                         msg.msg_flags = MSG_DONTWAIT;
559                         ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
560                                              e->iov.iov_len);
561                         if (ret == -EWOULDBLOCK || ret == 0) {
562                                 cond_resched();
563                                 goto out;
564                         } else if (ret < 0) {
565                                 goto send_err;
566                         }
567                 } else {
568                         evt = e->iov.iov_base;
569                         tipc_send_kern_top_evt(s->net, evt);
570                 }
571
572                 /* Don't starve users filling buffers */
573                 if (++count >= MAX_SEND_MSG_COUNT) {
574                         cond_resched();
575                         count = 0;
576                 }
577
578                 spin_lock_bh(&con->outqueue_lock);
579                 list_del(&e->list);
580                 tipc_free_entry(e);
581         }
582         spin_unlock_bh(&con->outqueue_lock);
583 out:
584         return;
585
586 send_err:
587         tipc_close_conn(con);
588 }
589
590 static void tipc_recv_work(struct work_struct *work)
591 {
592         struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
593         int count = 0;
594
595         while (test_bit(CF_CONNECTED, &con->flags)) {
596                 if (con->rx_action(con))
597                         break;
598
599                 /* Don't flood Rx machine */
600                 if (++count >= MAX_RECV_MSG_COUNT) {
601                         cond_resched();
602                         count = 0;
603                 }
604         }
605         conn_put(con);
606 }
607
608 static void tipc_send_work(struct work_struct *work)
609 {
610         struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
611
612         if (test_bit(CF_CONNECTED, &con->flags))
613                 tipc_send_to_sock(con);
614
615         conn_put(con);
616 }
617
618 static void tipc_work_stop(struct tipc_server *s)
619 {
620         destroy_workqueue(s->rcv_wq);
621         destroy_workqueue(s->send_wq);
622 }
623
624 static int tipc_work_start(struct tipc_server *s)
625 {
626         s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
627         if (!s->rcv_wq) {
628                 pr_err("can't start tipc receive workqueue\n");
629                 return -ENOMEM;
630         }
631
632         s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
633         if (!s->send_wq) {
634                 pr_err("can't start tipc send workqueue\n");
635                 destroy_workqueue(s->rcv_wq);
636                 return -ENOMEM;
637         }
638
639         return 0;
640 }
641
642 int tipc_server_start(struct tipc_server *s)
643 {
644         int ret;
645
646         spin_lock_init(&s->idr_lock);
647         idr_init(&s->conn_idr);
648         s->idr_in_use = 0;
649
650         s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
651                                             0, SLAB_HWCACHE_ALIGN, NULL);
652         if (!s->rcvbuf_cache)
653                 return -ENOMEM;
654
655         ret = tipc_work_start(s);
656         if (ret < 0) {
657                 kmem_cache_destroy(s->rcvbuf_cache);
658                 return ret;
659         }
660         ret = tipc_open_listening_sock(s);
661         if (ret < 0) {
662                 tipc_work_stop(s);
663                 kmem_cache_destroy(s->rcvbuf_cache);
664                 return ret;
665         }
666         return ret;
667 }
668
669 void tipc_server_stop(struct tipc_server *s)
670 {
671         struct tipc_conn *con;
672         int id;
673
674         spin_lock_bh(&s->idr_lock);
675         for (id = 0; s->idr_in_use; id++) {
676                 con = idr_find(&s->conn_idr, id);
677                 if (con) {
678                         spin_unlock_bh(&s->idr_lock);
679                         tipc_close_conn(con);
680                         spin_lock_bh(&s->idr_lock);
681                 }
682         }
683         spin_unlock_bh(&s->idr_lock);
684
685         tipc_work_stop(s);
686         kmem_cache_destroy(s->rcvbuf_cache);
687         idr_destroy(&s->conn_idr);
688 }