/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent, and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and thereby
 * split into two parts, and that two adjacent locks from the same process
 * may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 *   PR and PW for READ and WRITE locks
 *   NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */
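
/*
 * Illustration (added; not part of the original source): under the mapping
 * described above, a client request such as
 *
 *   struct flock fl = { .l_type = F_RDLCK, .l_whence = SEEK_SET,
 *                       .l_start = 0, .l_len = 4096 };
 *   fcntl(fd, F_SETLKW, &fl);
 *
 * reaches this code as an LDLM_FLOCK lock with mode LCK_PR and policy
 * extent start = 0, end = 4095 (ends are inclusive), tagged with the
 * requesting pid/owner. An F_UNLCK over part of that range arrives as
 * LCK_NL covering just the sub-extent to release.
 */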

#define DEBUG_SUBSYSTEM S_LDLM

#include "../include/lustre_dlm.h"
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_lib.h"
#include <linux/list.h>
#include "ldlm_internal.h"

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *            and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop cursor. pos MUST
 *            have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
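
/*
 * Usage sketch (added; illustrative only): resume scanning a resource's
 * granted list from a previously found position, tolerating removal of
 * the entry currently at pos:
 *
 *   struct list_head *pos = ownlocks;   (pos must already be initialized)
 *   struct list_head *n;
 *
 *   list_for_remaining_safe(pos, n, &res->lr_granted) {
 *           ...   (the entry at pos may safely be removed here)
 *   }
 */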

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return ((new->l_policy_data.l_flock.owner ==
                 lock->l_policy_data.l_flock.owner) &&
                (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return ((new->l_policy_data.l_flock.start <=
                 lock->l_policy_data.l_flock.end) &&
                (new->l_policy_data.l_flock.end >=
                 lock->l_policy_data.l_flock.start));
}
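
/*
 * Worked example (added for clarity): the test above treats flock extents
 * as closed intervals, so [s1, e1] and [s2, e2] overlap iff
 * s2 <= e1 && e2 >= s1. Thus [0, 99] and [99, 199] overlap (they share
 * byte 99), while [0, 99] and [100, 199] do not.
 */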

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
{
        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC && !ldlm_is_failed(lock)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(). Thus,
                 * we need to call the nolock version of
                 * ldlm_lock_decref_internal()
                 */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
}

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the namespace lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of it
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks would be collected and ASTs sent.
 */
static int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
                                   int first_enq, enum ldlm_error *err,
                                   struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        enum ldlm_mode mode = req->l_req_mode;
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int split = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };

        CDEBUG(D_DLMTRACE,
               "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
               *flags, new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        /* No blocking ASTs are sent to the clients for
         * POSIX file & record locks
         */
        req->l_blocking_ast = NULL;

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list.
                 */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                int reprocess_failed = 0;

                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request.
                 */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq) {
                                reprocess_failed = 1;
                                continue;
                        }

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                return LDLM_ITER_STOP;
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                return LDLM_ITER_STOP;
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        return LDLM_ITER_STOP;
                }
                if (reprocess_failed)
                        return LDLM_ITER_CONTINUE;
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                return LDLM_ITER_STOP;
        }

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks.
         */
        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow.
                         */
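                        /* Example (added for clarity): a lock ending at
                         * OBD_OBJECT_EOF (~0ULL) makes "end + 1" wrap to 0,
                         * and a lock starting at 0 makes "start - 1" wrap
                         * to ~0ULL; the extra comparisons below keep such
                         * locks from being skipped or ending the scan
                         * incorrectly.
                         */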
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1)) &&
                            (lock->l_policy_data.l_flock.end != OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1)) &&
                            (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }
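
                /* Summary of the remaining case (added for clarity): new
                 * lies strictly inside lock and the modes differ, e.g. a
                 * granted PW lock over [100, 199] hit by a PR request over
                 * [120, 129]. Neither end of lock can simply be trimmed,
                 * so the existing lock must be split around new.
                 */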

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request, so
                 * it must see the original lock data in the reply.
                 */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock.
                 */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                return LDLM_ITER_STOP;
                        }
                        goto reprocess;
                }

                split = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 just before lock in the granted list */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }
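
        /* Worked example of a split (added for clarity): if the granted
         * lock covered [100, 199] and new covers [120, 129], new2 takes
         * the lower remainder [100, 119], the existing lock is shrunk to
         * the upper remainder [130, 199], and new2 is linked into the
         * granted list just before it.
         */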

        /* if new2 was created but never used, destroy it */
        if (split == 0 && new2)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                /* The only possible client-side caller of the flock policy
                 * function is ldlm_flock_completion_ast(), which always
                 * carries the LDLM_FL_WAIT_NOREPROC flag.
                 */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that
         * laawi() can bump the reference count on \a req. Otherwise \a req
         * could be freed before the completion AST can be sent.
         */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        return LDLM_ITER_CONTINUE;
}

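/*
 * Context passed to the interrupt handler while a flock enqueue sleeps.
 * fwd_generation snapshots the import generation before going to sleep;
 * a later mismatch would indicate that the import was invalidated (e.g.
 * after an eviction) while we waited. Note (added for clarity): within
 * this file the field is only recorded, not re-checked.
 */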
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        lock_res_and_lock(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: the lock to be handled
 * \param flags    [in]: LDLM flags
 * \param data     [in]: ldlm_work_cp_ast_lock() passes ldlm_cb_set_arg here
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock                *getlk = lock->l_ast_data;
        struct obd_device               *obd;
        struct obd_import               *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        enum ldlm_error                 err;
        int                             rc = 0;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. There is no
         * point in holding the lock even if the app still believes it
         * has it, since the server already dropped it anyway. This
         * applies only to granted locks.
         */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL && !data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);
                return 0;
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (!data)
                        /* the MDS granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: the lock got granted, wake it up */
                wake_up(&lock->l_waitq);
                return 0;
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (obd)
                imp = obd->u.cli.cl_import;

        if (imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
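
        /* Note (added for clarity): a zero timeout here means wait
         * indefinitely; if the sleep is interrupted by a signal,
         * ldlm_flock_interrupted_wait() runs and marks the lock
         * CBPENDING so it will not linger on the LRU list.
         */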

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                return rc;
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (ldlm_is_failed(lock)) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                return -EIO;
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /*
         * Protect against race where lock could have been just destroyed
         * due to overlap in ldlm_process_flock_lock().
         */
        if (ldlm_is_destroyed(lock)) {
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                return 0;
        }

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (ldlm_is_flock_deadlock(lock)) {
                LDLM_DEBUG(lock, "client-side enqueue deadlock received");
                rc = -EDEADLK;
        } else if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount.
                 */
                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        getlk->fl_type = F_RDLCK;
                        break;
                case LCK_PW:
                        getlk->fl_type = F_WRLCK;
                        break;
                default:
                        getlk->fl_type = F_UNLCK;
                }
                getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
                getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
                getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process.
                 */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

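/*
 * Policy converters (note added for clarity): the lfw_* fields are the
 * on-the-wire form of the flock policy carried in LDLM RPCs, while
 * ldlm_policy_data_t is the in-memory form. The wire18 variant serves
 * old clients that predate the owner field (see the compat comment
 * below); the wire21 variant serves clients from Lustre 2.1 onward.
 */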
void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients had no idea about the owner field and
         * relied solely on pid for ownership. Introduced in LU-104, 2.1,
         * April 2011.
         */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}