summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDarrick J. Wong <djwong@kernel.org>2021-06-08 09:39:47 -0700
committerDarrick J. Wong <djwong@kernel.org>2021-06-08 09:39:47 -0700
commit7e4311b04be46b71a2008d6922da60d08d05b8bb (patch)
treea948d2be4909ad6321980b51dddb1d382fb14805
parent68b2c8bcdb813cd7e520e8cf54912a3280deb74d (diff)
parent7017b129e69c1b451fa926f2cac507c4128608dc (diff)
Merge tag 'xfs-cil-scale-2-tag' of git://git.kernel.org/pub/scm/linux/kernel/git/dgc/linux-xfs into xfs-5.14-merge2-cilxfs-5.14-merge-2
xfs: CIL and log scalability improvements Performance improvements are largely documented in the change logs of the individual patches. Headline numbers are an increase in transaction rate from 700k commits/s to 1.7M commits/s, and a reduction in fua/flush operations by 2-3 orders of magnitude on metadata heavy workloads that don't use fsync. Summary of series: Patches Modifications ------- ------------- 1-7: log write FUA/FLUSH optimisations 8: bug fix 9-11: Async CIL pushes 12-25: xlog_write() rework 26-39: CIL commit scalability The log write FUA/FLUSH optimisations reduce the number of cache flushes required to flush the CIL to the journal. It extends the old pre-delayed logging ordering semantics required by writing individual transactions to the iclogs out to cover then CIL checkpoint transactions rather than individual writes to the iclogs. In doing so, we reduce the cache flush requirements to once per CIL checkpoint rather than once per iclog write. The async CIL pushes fix a pipeline limitation that only allowed a single CIL push to be processed at a time. This was causing CIL checkpoint writing to become CPU bound as only a single CIL checkpoint could be pushed at a time. The checkpoint pipleine was designed to allow multiple pushes to be in flight at once and use careful ordering of the commit records to ensure correct recovery order, but the workqueue implementation didn't allow concurrent works to be run. The concurrent works now extend out to 4 CIL checkpoints running at a time, hence removing the CPU usage limiations without introducing new lock contention issues. The xlog_write() rework is long overdue. The code is complex, difficult to understand, full of tricky, subtle corner cases and just generally really hard to modify. This patchset reworks the xlog_write() API to reduce the processing overhead of writing out long log vector chains, and factors the xlog_write() code into a simple, compact fast path along with a clearer slow path to handle the complex cases. The CIL commit scalability patchset removes spinlocks from the transaction commit fast path. These spinlocks are the performance limiting bottleneck in the transaction commit path, so we apply a variety of different techniques to do either atomic. lockless or per-cpu updates of the CIL tracking structures during commits. This greatly increases the throughput of the the transaction commit engine, moving the contention point to the log space tracking algorithms after doubling throughput on 32-way workloads. * tag 'xfs-cil-scale-2-tag' of git://git.kernel.org/pub/scm/linux/kernel/git/dgc/linux-xfs: (40 commits) xfs: expanding delayed logging design with background material xfs: xlog_sync() manually adjusts grant head space xfs: avoid cil push lock if possible xfs: move CIL ordering to the logvec chain xfs: convert log vector chain to use list heads xfs: convert CIL to unordered per cpu lists xfs: Add order IDs to log items in CIL xfs: convert CIL busy extents to per-cpu xfs: track CIL ticket reservation in percpu structure xfs: implement percpu cil space used calculation xfs: introduce per-cpu CIL tracking structure xfs: introduce CPU hotplug infrastructure xfs: rework per-iclog header CIL reservation xfs: lift init CIL reservation out of xc_cil_lock xfs: use the CIL space used counter for emptiness checks xfs: CIL context doesn't need to count iovecs xfs: xlog_write() doesn't need optype anymore xfs: xlog_write() no longer needs contwr state xfs:_introduce xlog_write_partial() xfs: introduce xlog_write_single() ...
-rw-r--r--Documentation/filesystems/xfs-delayed-logging-design.rst361
-rw-r--r--fs/xfs/libxfs/xfs_log_format.h4
-rw-r--r--fs/xfs/libxfs/xfs_types.h1
-rw-r--r--fs/xfs/xfs_bio_io.c35
-rw-r--r--fs/xfs/xfs_buf.c2
-rw-r--r--fs/xfs/xfs_buf_item.c39
-rw-r--r--fs/xfs/xfs_dquot_item.c2
-rw-r--r--fs/xfs/xfs_file.c20
-rw-r--r--fs/xfs/xfs_inode.c10
-rw-r--r--fs/xfs/xfs_inode_item.c18
-rw-r--r--fs/xfs/xfs_inode_item.h2
-rw-r--r--fs/xfs/xfs_linux.h2
-rw-r--r--fs/xfs/xfs_log.c1015
-rw-r--r--fs/xfs/xfs_log.h66
-rw-r--r--fs/xfs/xfs_log_cil.c804
-rw-r--r--fs/xfs/xfs_log_priv.h123
-rw-r--r--fs/xfs/xfs_super.c52
-rw-r--r--fs/xfs/xfs_super.h1
-rw-r--r--fs/xfs/xfs_sysfs.c1
-rw-r--r--fs/xfs/xfs_trace.c1
-rw-r--r--fs/xfs/xfs_trans.c18
-rw-r--r--fs/xfs/xfs_trans.h5
-rw-r--r--fs/xfs/xfs_trans_ail.c11
-rw-r--r--fs/xfs/xfs_trans_priv.h3
-rw-r--r--include/linux/cpuhotplug.h1
25 files changed, 1632 insertions, 965 deletions
diff --git a/Documentation/filesystems/xfs-delayed-logging-design.rst b/Documentation/filesystems/xfs-delayed-logging-design.rst
index 464405d2801e..4ef419f54663 100644
--- a/Documentation/filesystems/xfs-delayed-logging-design.rst
+++ b/Documentation/filesystems/xfs-delayed-logging-design.rst
@@ -1,29 +1,314 @@
.. SPDX-License-Identifier: GPL-2.0
-==========================
-XFS Delayed Logging Design
-==========================
-
-Introduction to Re-logging in XFS
-=================================
-
-XFS logging is a combination of logical and physical logging. Some objects,
-such as inodes and dquots, are logged in logical format where the details
-logged are made up of the changes to in-core structures rather than on-disk
-structures. Other objects - typically buffers - have their physical changes
-logged. The reason for these differences is to reduce the amount of log space
-required for objects that are frequently logged. Some parts of inodes are more
-frequently logged than others, and inodes are typically more frequently logged
-than any other object (except maybe the superblock buffer) so keeping the
-amount of metadata logged low is of prime importance.
-
-The reason that this is such a concern is that XFS allows multiple separate
-modifications to a single object to be carried in the log at any given time.
-This allows the log to avoid needing to flush each change to disk before
-recording a new change to the object. XFS does this via a method called
-"re-logging". Conceptually, this is quite simple - all it requires is that any
-new change to the object is recorded with a *new copy* of all the existing
-changes in the new transaction that is written to the log.
+==================
+XFS Logging Design
+==================
+
+Preamble
+========
+
+This document describes the design and algorithms that the XFS journalling
+subsystem is based on. This document describes the design and algorithms that
+the XFS journalling subsystem is based on so that readers may familiarize
+themselves with the general concepts of how transaction processing in XFS works.
+
+We begin with an overview of transactions in XFS, followed by describing how
+transaction reservations are structured and accounted, and then move into how we
+guarantee forwards progress for long running transactions with finite initial
+reservations bounds. At this point we need to explain how relogging works. With
+the basic concepts covered, the design of the delayed logging mechanism is
+documented.
+
+
+Introduction
+============
+
+XFS uses Write Ahead Logging for ensuring changes to the filesystem metadata
+are atomic and recoverable. For reasons of space and time efficiency, the
+logging mechanisms are varied and complex, combining intents, logical and
+physical logging mechanisms to provide the necessary recovery guarantees the
+filesystem requires.
+
+Some objects, such as inodes and dquots, are logged in logical format where the
+details logged are made up of the changes to in-core structures rather than
+on-disk structures. Other objects - typically buffers - have their physical
+changes logged. Long running atomic modifications have individual changes
+chained together by intents, ensuring that journal recovery can restart and
+finish an operation that was only partially done when the system stopped
+functioning.
+
+The reason for these differences is to keep the amount of log space and CPU time
+required to process objects being modified as small as possible and hence the
+logging overhead as low as possible. Some items are very frequently modified,
+and some parts of objects are more frequently modified than others, so keeping
+the overhead of metadata logging low is of prime importance.
+
+The method used to log an item or chain modifications together isn't
+particularly important in the scope of this document. It suffices to know that
+the method used for logging a particular object or chaining modifications
+together are different and are dependent on the object and/or modification being
+performed. The logging subsystem only cares that certain specific rules are
+followed to guarantee forwards progress and prevent deadlocks.
+
+
+Transactions in XFS
+===================
+
+XFS has two types of high level transactions, defined by the type of log space
+reservation they take. These are known as "one shot" and "permanent"
+transactions. Permanent transaction reservations can take reservations that span
+commit boundaries, whilst "one shot" transactions are for a single atomic
+modification.
+
+The type and size of reservation must be matched to the modification taking
+place. This means that permanent transactions can be used for one-shot
+modifications, but one-shot reservations cannot be used for permanent
+transactions.
+
+In the code, a one-shot transaction pattern looks somewhat like this::
+
+ tp = xfs_trans_alloc(<reservation>)
+ <lock items>
+ <join item to transaction>
+ <do modification>
+ xfs_trans_commit(tp);
+
+As items are modified in the transaction, the dirty regions in those items are
+tracked via the transaction handle. Once the transaction is committed, all
+resources joined to it are released, along with the remaining unused reservation
+space that was taken at the transaction allocation time.
+
+In contrast, a permanent transaction is made up of multiple linked individual
+transactions, and the pattern looks like this::
+
+ tp = xfs_trans_alloc(<reservation>)
+ xfs_ilock(ip, XFS_ILOCK_EXCL)
+
+ loop {
+ xfs_trans_ijoin(tp, 0);
+ <do modification>
+ xfs_trans_log_inode(tp, ip);
+ xfs_trans_roll(&tp);
+ }
+
+ xfs_trans_commit(tp);
+ xfs_iunlock(ip, XFS_ILOCK_EXCL);
+
+While this might look similar to a one-shot transaction, there is an important
+difference: xfs_trans_roll() performs a specific operation that links two
+transactions together::
+
+ ntp = xfs_trans_dup(tp);
+ xfs_trans_commit(tp);
+ xfs_log_reserve(ntp);
+
+This results in a series of "rolling transactions" where the inode is locked
+across the entire chain of transactions. Hence while this series of rolling
+transactions is running, nothing else can read from or write to the inode and
+this provides a mechanism for complex changes to appear atomic from an external
+observer's point of view.
+
+It is important to note that a series of rolling transactions in a permanent
+transaction does not form an atomic change in the journal. While each
+individual modification is atomic, the chain is *not atomic*. If we crash half
+way through, then recovery will only replay up to the last transactional
+modification the loop made that was committed to the journal.
+
+This affects long running permanent transactions in that it is not possible to
+predict how much of a long running operation will actually be recovered because
+there is no guarantee of how much of the operation reached stale storage. Hence
+if a long running operation requires multiple transactions to fully complete,
+the high level operation must use intents and deferred operations to guarantee
+recovery can complete the operation once the first transactions is persisted in
+the on-disk journal.
+
+
+Transactions are Asynchronous
+=============================
+
+In XFS, all high level transactions are asynchronous by default. This means that
+xfs_trans_commit() does not guarantee that the modification has been committed
+to stable storage when it returns. Hence when a system crashes, not all the
+completed transactions will be replayed during recovery.
+
+However, the logging subsystem does provide global ordering guarantees, such
+that if a specific change is seen after recovery, all metadata modifications
+that were committed prior to that change will also be seen.
+
+For single shot operations that need to reach stable storage immediately, or
+ensuring that a long running permanent transaction is fully committed once it is
+complete, we can explicitly tag a transaction as synchronous. This will trigger
+a "log force" to flush the outstanding committed transactions to stable storage
+in the journal and wait for that to complete.
+
+Synchronous transactions are rarely used, however, because they limit logging
+throughput to the IO latency limitations of the underlying storage. Instead, we
+tend to use log forces to ensure modifications are on stable storage only when
+a user operation requires a synchronisation point to occur (e.g. fsync).
+
+
+Transaction Reservations
+========================
+
+It has been mentioned a number of times now that the logging subsystem needs to
+provide a forwards progress guarantee so that no modification ever stalls
+because it can't be written to the journal due to a lack of space in the
+journal. This is achieved by the transaction reservations that are made when
+a transaction is first allocated. For permanent transactions, these reservations
+are maintained as part of the transaction rolling mechanism.
+
+A transaction reservation provides a guarantee that there is physical log space
+available to write the modification into the journal before we start making
+modifications to objects and items. As such, the reservation needs to be large
+enough to take into account the amount of metadata that the change might need to
+log in the worst case. This means that if we are modifying a btree in the
+transaction, we have to reserve enough space to record a full leaf-to-root split
+of the btree. As such, the reservations are quite complex because we have to
+take into account all the hidden changes that might occur.
+
+For example, a user data extent allocation involves allocating an extent from
+free space, which modifies the free space trees. That's two btrees. Inserting
+the extent into the inode's extent map might require a split of the extent map
+btree, which requires another allocation that can modify the free space trees
+again. Then we might have to update reverse mappings, which modifies yet
+another btree which might require more space. And so on. Hence the amount of
+metadata that a "simple" operation can modify can be quite large.
+
+This "worst case" calculation provides us with the static "unit reservation"
+for the transaction that is calculated at mount time. We must guarantee that the
+log has this much space available before the transaction is allowed to proceed
+so that when we come to write the dirty metadata into the log we don't run out
+of log space half way through the write.
+
+For one-shot transactions, a single unit space reservation is all that is
+required for the transaction to proceed. For permanent transactions, however, we
+also have a "log count" that affects the size of the reservation that is to be
+made.
+
+While a permanent transaction can get by with a single unit of space
+reservation, it is somewhat inefficient to do this as it requires the
+transaction rolling mechanism to re-reserve space on every transaction roll. We
+know from the implementation of the permanent transactions how many transaction
+rolls are likely for the common modifications that need to be made.
+
+For example, and inode allocation is typically two transactions - one to
+physically allocate a free inode chunk on disk, and another to allocate an inode
+from an inode chunk that has free inodes in it. Hence for an inode allocation
+transaction, we might set the reservation log count to a value of 2 to indicate
+that the common/fast path transaction will commit two linked transactions in a
+chain. Each time a permanent transaction rolls, it consumes an entire unit
+reservation.
+
+Hence when the permanent transaction is first allocated, the log space
+reservation is increases from a single unit reservation to multiple unit
+reservations. That multiple is defined by the reservation log count, and this
+means we can roll the transaction multiple times before we have to re-reserve
+log space when we roll the transaction. This ensures that the common
+modifications we make only need to reserve log space once.
+
+If the log count for a permanent transaction reaches zero, then it needs to
+re-reserve physical space in the log. This is somewhat complex, and requires
+an understanding of how the log accounts for space that has been reserved.
+
+
+Log Space Accounting
+====================
+
+The position in the log is typically referred to as a Log Sequence Number (LSN).
+The log is circular, so the positions in the log are defined by the combination
+of a cycle number - the number of times the log has been overwritten - and the
+offset into the log. A LSN carries the cycle in the upper 32 bits and the
+offset in the lower 32 bits. The offset is in units of "basic blocks" (512
+bytes). Hence we can do realtively simple LSN based math to keep track of
+available space in the log.
+
+Log space accounting is done via a pair of constructs called "grant heads". The
+position of the grant heads is an absolute value, so the amount of space
+available in the log is defined by the distance between the position of the
+grant head and the current log tail. That is, how much space can be
+reserved/consumed before the grant heads would fully wrap the log and overtake
+the tail position.
+
+The first grant head is the "reserve" head. This tracks the byte count of the
+reservations currently held by active transactions. It is a purely in-memory
+accounting of the space reservation and, as such, actually tracks byte offsets
+into the log rather than basic blocks. Hence it technically isn't using LSNs to
+represent the log position, but it is still treated like a split {cycle,offset}
+tuple for the purposes of tracking reservation space.
+
+The reserve grant head is used to accurately account for exact transaction
+reservations amounts and the exact byte count that modifications actually make
+and need to write into the log. The reserve head is used to prevent new
+transactions from taking new reservations when the head reaches the current
+tail. It will block new reservations in a FIFO queue and as the log tail moves
+forward it will wake them in order once sufficient space is available. This FIFO
+mechanism ensures no transaction is starved of resources when log space
+shortages occur.
+
+The other grant head is the "write" head. Unlike the reserve head, this grant
+head contains an LSN and it tracks the physical space usage in the log. While
+this might sound like it is accounting the same state as the reserve grant head
+- and it mostly does track exactly the same location as the reserve grant head -
+there are critical differences in behaviour between them that provides the
+forwards progress guarantees that rolling permanent transactions require.
+
+These differences when a permanent transaction is rolled and the internal "log
+count" reaches zero and the initial set of unit reservations have been
+exhausted. At this point, we still require a log space reservation to continue
+the next transaction in the sequeunce, but we have none remaining. We cannot
+sleep during the transaction commit process waiting for new log space to become
+available, as we may end up on the end of the FIFO queue and the items we have
+locked while we sleep could end up pinning the tail of the log before there is
+enough free space in the log to fulfil all of the pending reservations and
+then wake up transaction commit in progress.
+
+To take a new reservation without sleeping requires us to be able to take a
+reservation even if there is no reservation space currently available. That is,
+we need to be able to *overcommit* the log reservation space. As has already
+been detailed, we cannot overcommit physical log space. However, the reserve
+grant head does not track physical space - it only accounts for the amount of
+reservations we currently have outstanding. Hence if the reserve head passes
+over the tail of the log all it means is that new reservations will be throttled
+immediately and remain throttled until the log tail is moved forward far enough
+to remove the overcommit and start taking new reservations. In other words, we
+can overcommit the reserve head without violating the physical log head and tail
+rules.
+
+As a result, permanent transactions only "regrant" reservation space during
+xfs_trans_commit() calls, while the physical log space reservation - tracked by
+the write head - is then reserved separately by a call to xfs_log_reserve()
+after the commit completes. Once the commit completes, we can sleep waiting for
+physical log space to be reserved from the write grant head, but only if one
+critical rule has been observed::
+
+ Code using permanent reservations must always log the items they hold
+ locked across each transaction they roll in the chain.
+
+"Re-logging" the locked items on every transaction roll ensures that the items
+attached to the transaction chain being rolled are always relocated to the
+physical head of the log and so do not pin the tail of the log. If a locked item
+pins the tail of the log when we sleep on the write reservation, then we will
+deadlock the log as we cannot take the locks needed to write back that item and
+move the tail of the log forwards to free up write grant space. Re-logging the
+locked items avoids this deadlock and guarantees that the log reservation we are
+making cannot self-deadlock.
+
+If all rolling transactions obey this rule, then they can all make forwards
+progress independently because nothing will block the progress of the log
+tail moving forwards and hence ensuring that write grant space is always
+(eventually) made available to permanent transactions no matter how many times
+they roll.
+
+
+Re-logging Explained
+====================
+
+XFS allows multiple separate modifications to a single object to be carried in
+the log at any given time. This allows the log to avoid needing to flush each
+change to disk before recording a new change to the object. XFS does this via a
+method called "re-logging". Conceptually, this is quite simple - all it requires
+is that any new change to the object is recorded with a *new copy* of all the
+existing changes in the new transaction that is written to the log.
That is, if we have a sequence of changes A through to F, and the object was
written to disk after change D, we would see in the log the following series
@@ -42,16 +327,13 @@ transaction::
In other words, each time an object is relogged, the new transaction contains
the aggregation of all the previous changes currently held only in the log.
-This relogging technique also allows objects to be moved forward in the log so
-that an object being relogged does not prevent the tail of the log from ever
-moving forward. This can be seen in the table above by the changing
-(increasing) LSN of each subsequent transaction - the LSN is effectively a
-direct encoding of the location in the log of the transaction.
+This relogging technique allows objects to be moved forward in the log so that
+an object being relogged does not prevent the tail of the log from ever moving
+forward. This can be seen in the table above by the changing (increasing) LSN
+of each subsequent transaction, and it's the technique that allows us to
+implement long-running, multiple-commit permanent transactions.
-This relogging is also used to implement long-running, multiple-commit
-transactions. These transaction are known as rolling transactions, and require
-a special log reservation known as a permanent transaction reservation. A
-typical example of a rolling transaction is the removal of extents from an
+A typical example of a rolling transaction is the removal of extents from an
inode which can only be done at a rate of two extents per transaction because
of reservation size limitations. Hence a rolling extent removal transaction
keeps relogging the inode and btree buffers as they get modified in each
@@ -67,12 +349,13 @@ the log over and over again. Worse is the fact that objects tend to get
dirtier as they get relogged, so each subsequent transaction is writing more
metadata into the log.
-Another feature of the XFS transaction subsystem is that most transactions are
-asynchronous. That is, they don't commit to disk until either a log buffer is
-filled (a log buffer can hold multiple transactions) or a synchronous operation
-forces the log buffers holding the transactions to disk. This means that XFS is
-doing aggregation of transactions in memory - batching them, if you like - to
-minimise the impact of the log IO on transaction throughput.
+It should now also be obvious how relogging and asynchronous transactions go
+hand in hand. That is, transactions don't get written to the physical journal
+until either a log buffer is filled (a log buffer can hold multiple
+transactions) or a synchronous operation forces the log buffers holding the
+transactions to disk. This means that XFS is doing aggregation of transactions
+in memory - batching them, if you like - to minimise the impact of the log IO on
+transaction throughput.
The limitation on asynchronous transaction throughput is the number and size of
log buffers made available by the log manager. By default there are 8 log
diff --git a/fs/xfs/libxfs/xfs_log_format.h b/fs/xfs/libxfs/xfs_log_format.h
index 3e15ea29fb8d..78d5368a7caa 100644
--- a/fs/xfs/libxfs/xfs_log_format.h
+++ b/fs/xfs/libxfs/xfs_log_format.h
@@ -34,9 +34,6 @@ typedef uint32_t xlog_tid_t;
#define XLOG_MIN_RECORD_BSHIFT 14 /* 16384 == 1 << 14 */
#define XLOG_BIG_RECORD_BSHIFT 15 /* 32k == 1 << 15 */
#define XLOG_MAX_RECORD_BSHIFT 18 /* 256k == 1 << 18 */
-#define XLOG_BTOLSUNIT(log, b) (((b)+(log)->l_mp->m_sb.sb_logsunit-1) / \
- (log)->l_mp->m_sb.sb_logsunit)
-#define XLOG_LSUNITTOB(log, su) ((su) * (log)->l_mp->m_sb.sb_logsunit)
#define XLOG_HEADER_SIZE 512
@@ -72,7 +69,6 @@ static inline uint xlog_get_cycle(char *ptr)
/* Log Clients */
#define XFS_TRANSACTION 0x69
-#define XFS_VOLUME 0x2
#define XFS_LOG 0xaa
#define XLOG_UNMOUNT_TYPE 0x556e /* Un for Unmount */
diff --git a/fs/xfs/libxfs/xfs_types.h b/fs/xfs/libxfs/xfs_types.h
index 064bd6e8c922..0870ef6f933d 100644
--- a/fs/xfs/libxfs/xfs_types.h
+++ b/fs/xfs/libxfs/xfs_types.h
@@ -21,6 +21,7 @@ typedef int32_t xfs_suminfo_t; /* type of bitmap summary info */
typedef uint32_t xfs_rtword_t; /* word type for bitmap manipulations */
typedef int64_t xfs_lsn_t; /* log sequence number */
+typedef int64_t xfs_csn_t; /* CIL sequence number */
typedef uint32_t xfs_dablk_t; /* dir/attr block number (in file) */
typedef uint32_t xfs_dahash_t; /* dir/attr hash value */
diff --git a/fs/xfs/xfs_bio_io.c b/fs/xfs/xfs_bio_io.c
index 17f36db2f792..667e297f59b1 100644
--- a/fs/xfs/xfs_bio_io.c
+++ b/fs/xfs/xfs_bio_io.c
@@ -9,6 +9,41 @@ static inline unsigned int bio_max_vecs(unsigned int count)
return bio_max_segs(howmany(count, PAGE_SIZE));
}
+static void
+xfs_flush_bdev_async_endio(
+ struct bio *bio)
+{
+ complete(bio->bi_private);
+}
+
+/*
+ * Submit a request for an async cache flush to run. If the request queue does
+ * not require flush operations, just skip it altogether. If the caller needs
+ * to wait for the flush completion at a later point in time, they must supply a
+ * valid completion. This will be signalled when the flush completes. The
+ * caller never sees the bio that is issued here.
+ */
+void
+xfs_flush_bdev_async(
+ struct bio *bio,
+ struct block_device *bdev,
+ struct completion *done)
+{
+ struct request_queue *q = bdev->bd_disk->queue;
+
+ if (!test_bit(QUEUE_FLAG_WC, &q->queue_flags)) {
+ complete(done);
+ return;
+ }
+
+ bio_init(bio, NULL, 0);
+ bio_set_dev(bio, bdev);
+ bio->bi_opf = REQ_OP_WRITE | REQ_PREFLUSH | REQ_SYNC;
+ bio->bi_private = done;
+ bio->bi_end_io = xfs_flush_bdev_async_endio;
+
+ submit_bio(bio);
+}
int
xfs_rw_bdev(
struct block_device *bdev,
diff --git a/fs/xfs/xfs_buf.c b/fs/xfs/xfs_buf.c
index b4ee9d3532f0..1914103beeb2 100644
--- a/fs/xfs/xfs_buf.c
+++ b/fs/xfs/xfs_buf.c
@@ -1889,7 +1889,7 @@ xfs_free_buftarg(
percpu_counter_destroy(&btp->bt_io_count);
list_lru_destroy(&btp->bt_lru);
- xfs_blkdev_issue_flush(btp);
+ blkdev_issue_flush(btp->bt_bdev);
kmem_free(btp);
}
diff --git a/fs/xfs/xfs_buf_item.c b/fs/xfs/xfs_buf_item.c
index fb69879e4b2b..1cb087b320b1 100644
--- a/fs/xfs/xfs_buf_item.c
+++ b/fs/xfs/xfs_buf_item.c
@@ -74,14 +74,12 @@ xfs_buf_item_straddle(
}
/*
- * This returns the number of log iovecs needed to log the
- * given buf log item.
+ * Return the number of log iovecs and space needed to log the given buf log
+ * item segment.
*
- * It calculates this as 1 iovec for the buf log format structure
- * and 1 for each stretch of non-contiguous chunks to be logged.
- * Contiguous chunks are logged in a single iovec.
- *
- * If the XFS_BLI_STALE flag has been set, then log nothing.
+ * It calculates this as 1 iovec for the buf log format structure and 1 for each
+ * stretch of non-contiguous chunks to be logged. Contiguous chunks are logged
+ * in a single iovec.
*/
STATIC void
xfs_buf_item_size_segment(
@@ -168,11 +166,8 @@ slow_scan:
}
/*
- * This returns the number of log iovecs needed to log the given buf log item.
- *
- * It calculates this as 1 iovec for the buf log format structure and 1 for each
- * stretch of non-contiguous chunks to be logged. Contiguous chunks are logged
- * in a single iovec.
+ * Return the number of log iovecs and space needed to log the given buf log
+ * item.
*
* Discontiguous buffers need a format structure per region that is being
* logged. This makes the changes in the buffer appear to log recovery as though
@@ -182,7 +177,11 @@ slow_scan:
* what ends up on disk.
*
* If the XFS_BLI_STALE flag has been set, then log nothing but the buf log
- * format structures.
+ * format structures. If the item has previously been logged and has dirty
+ * regions, we do not relog them in stale buffers. This has the effect of
+ * reducing the size of the relogged item by the amount of dirty data tracked
+ * by the log item. This can result in the committing transaction reducing the
+ * amount of space being consumed by the CIL.
*/
STATIC void
xfs_buf_item_size(
@@ -199,9 +198,9 @@ xfs_buf_item_size(
ASSERT(atomic_read(&bip->bli_refcount) > 0);
if (bip->bli_flags & XFS_BLI_STALE) {
/*
- * The buffer is stale, so all we need to log
- * is the buf log format structure with the
- * cancel flag in it.
+ * The buffer is stale, so all we need to log is the buf log
+ * format structure with the cancel flag in it as we are never
+ * going to replay the changes tracked in the log item.
*/
trace_xfs_buf_item_size_stale(bip);
ASSERT(bip->__bli_format.blf_flags & XFS_BLF_CANCEL);
@@ -216,9 +215,9 @@ xfs_buf_item_size(
if (bip->bli_flags & XFS_BLI_ORDERED) {
/*
- * The buffer has been logged just to order it.
- * It is not being included in the transaction
- * commit, so no vectors are used at all.
+ * The buffer has been logged just to order it. It is not being
+ * included in the transaction commit, so no vectors are used at
+ * all.
*/
trace_xfs_buf_item_size_ordered(bip);
*nvecs = XFS_LOG_VEC_ORDERED;
@@ -714,7 +713,7 @@ xfs_buf_item_release(
STATIC void
xfs_buf_item_committing(
struct xfs_log_item *lip,
- xfs_lsn_t commit_lsn)
+ xfs_csn_t seq)
{
return xfs_buf_item_release(lip);
}
diff --git a/fs/xfs/xfs_dquot_item.c b/fs/xfs/xfs_dquot_item.c
index 8c1fdf37ee8f..8ed47b739b6c 100644
--- a/fs/xfs/xfs_dquot_item.c
+++ b/fs/xfs/xfs_dquot_item.c
@@ -188,7 +188,7 @@ xfs_qm_dquot_logitem_release(
STATIC void
xfs_qm_dquot_logitem_committing(
struct xfs_log_item *lip,
- xfs_lsn_t commit_lsn)
+ xfs_csn_t seq)
{
return xfs_qm_dquot_logitem_release(lip);
}
diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
index 9fd5a82a814c..3d64d99e64f9 100644
--- a/fs/xfs/xfs_file.c
+++ b/fs/xfs/xfs_file.c
@@ -119,8 +119,8 @@ xfs_dir_fsync(
return xfs_log_force_inode(ip);
}
-static xfs_lsn_t
-xfs_fsync_lsn(
+static xfs_csn_t
+xfs_fsync_seq(
struct xfs_inode *ip,
bool datasync)
{
@@ -128,7 +128,7 @@ xfs_fsync_lsn(
return 0;
if (datasync && !(ip->i_itemp->ili_fsync_fields & ~XFS_ILOG_TIMESTAMP))
return 0;
- return ip->i_itemp->ili_last_lsn;
+ return ip->i_itemp->ili_commit_seq;
}
/*
@@ -151,12 +151,12 @@ xfs_fsync_flush_log(
int *log_flushed)
{
int error = 0;
- xfs_lsn_t lsn;
+ xfs_csn_t seq;
xfs_ilock(ip, XFS_ILOCK_SHARED);
- lsn = xfs_fsync_lsn(ip, datasync);
- if (lsn) {
- error = xfs_log_force_lsn(ip->i_mount, lsn, XFS_LOG_SYNC,
+ seq = xfs_fsync_seq(ip, datasync);
+ if (seq) {
+ error = xfs_log_force_seq(ip->i_mount, seq, XFS_LOG_SYNC,
log_flushed);
spin_lock(&ip->i_itemp->ili_lock);
@@ -197,9 +197,9 @@ xfs_file_fsync(
* inode size in case of an extending write.
*/
if (XFS_IS_REALTIME_INODE(ip))
- xfs_blkdev_issue_flush(mp->m_rtdev_targp);
+ blkdev_issue_flush(mp->m_rtdev_targp->bt_bdev);
else if (mp->m_logdev_targp != mp->m_ddev_targp)
- xfs_blkdev_issue_flush(mp->m_ddev_targp);
+ blkdev_issue_flush(mp->m_ddev_targp->bt_bdev);
/*
* Any inode that has dirty modifications in the log is pinned. The
@@ -219,7 +219,7 @@ xfs_file_fsync(
*/
if (!log_flushed && !XFS_IS_REALTIME_INODE(ip) &&
mp->m_logdev_targp == mp->m_ddev_targp)
- xfs_blkdev_issue_flush(mp->m_ddev_targp);
+ blkdev_issue_flush(mp->m_ddev_targp->bt_bdev);
return error;
}
diff --git a/fs/xfs/xfs_inode.c b/fs/xfs/xfs_inode.c
index 3bee1cd20072..9ae5d2968274 100644
--- a/fs/xfs/xfs_inode.c
+++ b/fs/xfs/xfs_inode.c
@@ -2633,7 +2633,7 @@ xfs_iunpin(
trace_xfs_inode_unpin_nowait(ip, _RET_IP_);
/* Give the log a push to start the unpinning I/O */
- xfs_log_force_lsn(ip->i_mount, ip->i_itemp->ili_last_lsn, 0, NULL);
+ xfs_log_force_seq(ip->i_mount, ip->i_itemp->ili_commit_seq, 0, NULL);
}
@@ -3647,16 +3647,16 @@ int
xfs_log_force_inode(
struct xfs_inode *ip)
{
- xfs_lsn_t lsn = 0;
+ xfs_csn_t seq = 0;
xfs_ilock(ip, XFS_ILOCK_SHARED);
if (xfs_ipincount(ip))
- lsn = ip->i_itemp->ili_last_lsn;
+ seq = ip->i_itemp->ili_commit_seq;
xfs_iunlock(ip, XFS_ILOCK_SHARED);
- if (!lsn)
+ if (!seq)
return 0;
- return xfs_log_force_lsn(ip->i_mount, lsn, XFS_LOG_SYNC, NULL);
+ return xfs_log_force_seq(ip->i_mount, seq, XFS_LOG_SYNC, NULL);
}
/*
diff --git a/fs/xfs/xfs_inode_item.c b/fs/xfs/xfs_inode_item.c
index 6764d12342da..35de30849fcc 100644
--- a/fs/xfs/xfs_inode_item.c
+++ b/fs/xfs/xfs_inode_item.c
@@ -28,6 +28,20 @@ static inline struct xfs_inode_log_item *INODE_ITEM(struct xfs_log_item *lip)
return container_of(lip, struct xfs_inode_log_item, ili_item);
}
+/*
+ * The logged size of an inode fork is always the current size of the inode
+ * fork. This means that when an inode fork is relogged, the size of the logged
+ * region is determined by the current state, not the combination of the
+ * previously logged state + the current state. This is different relogging
+ * behaviour to most other log items which will retain the size of the
+ * previously logged changes when smaller regions are relogged.
+ *
+ * Hence operations that remove data from the inode fork (e.g. shortform
+ * dir/attr remove, extent form extent removal, etc), the size of the relogged
+ * inode gets -smaller- rather than stays the same size as the previously logged
+ * size and this can result in the committing transaction reducing the amount of
+ * space being consumed by the CIL.
+ */
STATIC void
xfs_inode_item_data_fork_size(
struct xfs_inode_log_item *iip,
@@ -629,9 +643,9 @@ xfs_inode_item_committed(
STATIC void
xfs_inode_item_committing(
struct xfs_log_item *lip,
- xfs_lsn_t commit_lsn)
+ xfs_csn_t seq)
{
- INODE_ITEM(lip)->ili_last_lsn = commit_lsn;
+ INODE_ITEM(lip)->ili_commit_seq = seq;
return xfs_inode_item_release(lip);
}
diff --git a/fs/xfs/xfs_inode_item.h b/fs/xfs/xfs_inode_item.h
index 4b926e32831c..403b45ab9aa2 100644
--- a/fs/xfs/xfs_inode_item.h
+++ b/fs/xfs/xfs_inode_item.h
@@ -33,7 +33,7 @@ struct xfs_inode_log_item {
unsigned int ili_fields; /* fields to be logged */
unsigned int ili_fsync_fields; /* logged since last fsync */
xfs_lsn_t ili_flush_lsn; /* lsn at last flush */
- xfs_lsn_t ili_last_lsn; /* lsn at last transaction */
+ xfs_csn_t ili_commit_seq; /* last transaction commit */
};
static inline int xfs_inode_clean(struct xfs_inode *ip)
diff --git a/fs/xfs/xfs_linux.h b/fs/xfs/xfs_linux.h
index 7688663b9773..c174262a074e 100644
--- a/fs/xfs/xfs_linux.h
+++ b/fs/xfs/xfs_linux.h
@@ -196,6 +196,8 @@ static inline uint64_t howmany_64(uint64_t x, uint32_t y)
int xfs_rw_bdev(struct block_device *bdev, sector_t sector, unsigned int count,
char *data, unsigned int op);
+void xfs_flush_bdev_async(struct bio *bio, struct block_device *bdev,
+ struct completion *done);
#define ASSERT_ALWAYS(expr) \
(likely(expr) ? (void)0 : assfail(NULL, #expr, __FILE__, __LINE__))
diff --git a/fs/xfs/xfs_log.c b/fs/xfs/xfs_log.c
index c19a82adea1e..63e2358f160a 100644
--- a/fs/xfs/xfs_log.c
+++ b/fs/xfs/xfs_log.c
@@ -47,21 +47,16 @@ xlog_state_get_iclog_space(
int len,
struct xlog_in_core **iclog,
struct xlog_ticket *ticket,
- int *continued_write,
int *logoffsetp);
STATIC void
-xlog_state_switch_iclogs(
- struct xlog *log,
- struct xlog_in_core *iclog,
- int eventual_size);
-STATIC void
xlog_grant_push_ail(
struct xlog *log,
int need_bytes);
STATIC void
xlog_sync(
struct xlog *log,
- struct xlog_in_core *iclog);
+ struct xlog_in_core *iclog,
+ struct xlog_ticket *ticket);
#if defined(DEBUG)
STATIC void
xlog_verify_dest_ptr(
@@ -94,6 +89,62 @@ xlog_iclogs_empty(
static int
xfs_log_cover(struct xfs_mount *);
+/*
+ * We need to make sure the buffer pointer returned is naturally aligned for the
+ * biggest basic data type we put into it. We have already accounted for this
+ * padding when sizing the buffer.
+ *
+ * However, this padding does not get written into the log, and hence we have to
+ * track the space used by the log vectors separately to prevent log space hangs
+ * due to inaccurate accounting (i.e. a leak) of the used log space through the
+ * CIL context ticket.
+ *
+ * We also add space for the xlog_op_header that describes this region in the
+ * log. This prepends the data region we return to the caller to copy their data
+ * into, so do all the static initialisation of the ophdr now. Because the ophdr
+ * is not 8 byte aligned, we have to be careful to ensure that we align the
+ * start of the buffer such that the region we return to the call is 8 byte
+ * aligned and packed against the tail of the ophdr.
+ */
+void *
+xlog_prepare_iovec(
+ struct xfs_log_vec *lv,
+ struct xfs_log_iovec **vecp,
+ uint type)
+{
+ struct xfs_log_iovec *vec = *vecp;
+ struct xlog_op_header *oph;
+ uint32_t len;
+ void *buf;
+
+ if (vec) {
+ ASSERT(vec - lv->lv_iovecp < lv->lv_niovecs);
+ vec++;
+ } else {
+ vec = &lv->lv_iovecp[0];
+ }
+
+ len = lv->lv_buf_len + sizeof(struct xlog_op_header);
+ if (!IS_ALIGNED(len, sizeof(uint64_t))) {
+ lv->lv_buf_len = round_up(len, sizeof(uint64_t)) -
+ sizeof(struct xlog_op_header);
+ }
+
+ vec->i_type = type;
+ vec->i_addr = lv->lv_buf + lv->lv_buf_len;
+
+ oph = vec->i_addr;
+ oph->oh_clientid = XFS_TRANSACTION;
+ oph->oh_res2 = 0;
+ oph->oh_flags = 0;
+
+ buf = vec->i_addr + sizeof(struct xlog_op_header);
+ ASSERT(IS_ALIGNED((unsigned long)buf, sizeof(uint64_t)));
+
+ *vecp = vec;
+ return buf;
+}
+
static void
xlog_grant_sub_space(
struct xlog *log,
@@ -326,30 +377,6 @@ xlog_grant_head_check(
return error;
}
-static void
-xlog_tic_reset_res(xlog_ticket_t *tic)
-{
- tic->t_res_num = 0;
- tic->t_res_arr_sum = 0;
- tic->t_res_num_ophdrs = 0;
-}
-
-static void
-xlog_tic_add_region(xlog_ticket_t *tic, uint len, uint type)
-{
- if (tic->t_res_num == XLOG_TIC_LEN_MAX) {
- /* add to overflow and start again */
- tic->t_res_o_flow += tic->t_res_arr_sum;
- tic->t_res_num = 0;
- tic->t_res_arr_sum = 0;
- }
-
- tic->t_res_arr[tic->t_res_num].r_len = len;
- tic->t_res_arr[tic->t_res_num].r_type = type;
- tic->t_res_arr_sum += len;
- tic->t_res_num++;
-}
-
bool
xfs_log_writable(
struct xfs_mount *mp)
@@ -399,8 +426,6 @@ xfs_log_regrant(
xlog_grant_push_ail(log, tic->t_unit_res);
tic->t_curr_res = tic->t_unit_res;
- xlog_tic_reset_res(tic);
-
if (tic->t_cnt > 0)
return 0;
@@ -438,10 +463,9 @@ out_error:
int
xfs_log_reserve(
struct xfs_mount *mp,
- int unit_bytes,
- int cnt,
+ int unit_bytes,
+ int cnt,
struct xlog_ticket **ticp,
- uint8_t client,
bool permanent)
{
struct xlog *log = mp->m_log;
@@ -449,15 +473,13 @@ xfs_log_reserve(
int need_bytes;
int error = 0;
- ASSERT(client == XFS_TRANSACTION || client == XFS_LOG);
-
if (XLOG_FORCED_SHUTDOWN(log))
return -EIO;
XFS_STATS_INC(mp, xs_try_logspace);
ASSERT(*ticp == NULL);
- tic = xlog_ticket_alloc(log, unit_bytes, cnt, client, permanent);
+ tic = xlog_ticket_alloc(log, unit_bytes, cnt, permanent);
*ticp = tic;
xlog_grant_push_ail(log, tic->t_cnt ? tic->t_unit_res * tic->t_cnt
@@ -513,10 +535,11 @@ __xlog_state_release_iclog(
* Flush iclog to disk if this is the last reference to the given iclog and the
* it is in the WANT_SYNC state.
*/
-static int
+int
xlog_state_release_iclog(
struct xlog *log,
- struct xlog_in_core *iclog)
+ struct xlog_in_core *iclog,
+ struct xlog_ticket *ticket)
{
lockdep_assert_held(&log->l_icloglock);
@@ -526,30 +549,13 @@ xlog_state_release_iclog(
if (atomic_dec_and_test(&iclog->ic_refcnt) &&
__xlog_state_release_iclog(log, iclog)) {
spin_unlock(&log->l_icloglock);
- xlog_sync(log, iclog);
+ xlog_sync(log, iclog, ticket);
spin_lock(&log->l_icloglock);
}
return 0;
}
-void
-xfs_log_release_iclog(
- struct xlog_in_core *iclog)
-{
- struct xlog *log = iclog->ic_log;
- bool sync = false;
-
- if (atomic_dec_and_lock(&iclog->ic_refcnt, &log->l_icloglock)) {
- if (iclog->ic_state != XLOG_STATE_IOERROR)
- sync = __xlog_state_release_iclog(log, iclog);
- spin_unlock(&log->l_icloglock);
- }
-
- if (sync)
- xlog_sync(log, iclog);
-}
-
/*
* Mount a log filesystem
*
@@ -786,10 +792,12 @@ xfs_log_mount_cancel(
}
/*
- * Wait for the iclog to be written disk, or return an error if the log has been
- * shut down.
+ * Wait for the iclog and all prior iclogs to be written disk as required by the
+ * log force state machine. Waiting on ic_force_wait ensures iclog completions
+ * have been ordered and callbacks run before we are woken here, hence
+ * guaranteeing that all the iclogs up to this one are on stable storage.
*/
-static int
+int
xlog_wait_on_iclog(
struct xlog_in_core *iclog)
__releases(iclog->ic_log->l_icloglock)
@@ -818,26 +826,49 @@ xlog_wait_on_iclog(
static int
xlog_write_unmount_record(
struct xlog *log,
- struct xlog_ticket *ticket,
- xfs_lsn_t *lsn,
- uint flags)
+ struct xlog_ticket *ticket)
{
- struct xfs_unmount_log_format ulf = {
- .magic = XLOG_UNMOUNT_TYPE,
+ struct {
+ struct xlog_op_header ophdr;
+ struct xfs_unmount_log_format ulf;
+ } unmount_rec = {
+ .ophdr = {
+ .oh_clientid = XFS_LOG,
+ .oh_tid = cpu_to_be32(ticket->t_tid),
+ .oh_flags = XLOG_UNMOUNT_TRANS,
+ },
+ .ulf = {
+ .magic = XLOG_UNMOUNT_TYPE,
+ },
};
struct xfs_log_iovec reg = {
- .i_addr = &ulf,
- .i_len = sizeof(ulf),
+ .i_addr = &unmount_rec,
+ .i_len = sizeof(unmount_rec),
.i_type = XLOG_REG_TYPE_UNMOUNT,
};
struct xfs_log_vec vec = {
.lv_niovecs = 1,
.lv_iovecp = &reg,
};
+ LIST_HEAD(lv_chain);
+ INIT_LIST_HEAD(&vec.lv_list);
+ list_add(&vec.lv_list, &lv_chain);
+
+ BUILD_BUG_ON((sizeof(struct xlog_op_header) +
+ sizeof(struct xfs_unmount_log_format)) !=
+ sizeof(unmount_rec));
/* account for space used by record data */
- ticket->t_curr_res -= sizeof(ulf);
- return xlog_write(log, &vec, ticket, lsn, NULL, flags, false);
+ ticket->t_curr_res -= sizeof(unmount_rec);
+
+ /*
+ * For external log devices, we need to flush the data device cache
+ * first to ensure all metadata writeback is on stable storage before we
+ * stamp the tail LSN into the unmount record.
+ */
+ if (log->l_targ != log->l_mp->m_ddev_targp)
+ blkdev_issue_flush(log->l_targ->bt_bdev);
+ return xlog_write(log, &lv_chain, ticket, NULL, NULL, reg.i_len);
}
/*
@@ -851,15 +882,13 @@ xlog_unmount_write(
struct xfs_mount *mp = log->l_mp;
struct xlog_in_core *iclog;
struct xlog_ticket *tic = NULL;
- xfs_lsn_t lsn;
- uint flags = XLOG_UNMOUNT_TRANS;
int error;
- error = xfs_log_reserve(mp, 600, 1, &tic, XFS_LOG, 0);
+ error = xfs_log_reserve(mp, 600, 1, &tic, 0);
if (error)
goto out_err;
- error = xlog_write_unmount_record(log, tic, &lsn, flags);
+ error = xlog_write_unmount_record(log, tic);
/*
* At this point, we're umounting anyway, so there's no point in
* transitioning log state to IOERROR. Just continue...
@@ -876,7 +905,12 @@ out_err:
else
ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC ||
iclog->ic_state == XLOG_STATE_IOERROR);
- error = xlog_state_release_iclog(log, iclog);
+ /*
+ * Ensure the journal is fully flushed and on stable storage once the
+ * iclog containing the unmount record is written.
+ */
+ iclog->ic_flags |= (XLOG_ICL_NEED_FLUSH | XLOG_ICL_NEED_FUA);
+ error = xlog_state_release_iclog(log, iclog, tic);
xlog_wait_on_iclog(iclog);
if (tic) {
@@ -1401,6 +1435,11 @@ xlog_alloc_log(
xlog_assign_atomic_lsn(&log->l_last_sync_lsn, 1, 0);
log->l_curr_cycle = 1; /* 0 is bad since this is initial value */
+ if (xfs_sb_version_haslogv2(&mp->m_sb) && mp->m_sb.sb_logsunit > 1)
+ log->l_iclog_roundoff = mp->m_sb.sb_logsunit;
+ else
+ log->l_iclog_roundoff = BBSIZE;
+
xlog_grant_head_init(&log->l_reserve_head);
xlog_grant_head_init(&log->l_write_head);
@@ -1532,9 +1571,14 @@ xlog_commit_record(
struct xlog_in_core **iclog,
xfs_lsn_t *lsn)
{
+ struct xlog_op_header ophdr = {
+ .oh_clientid = XFS_TRANSACTION,
+ .oh_tid = cpu_to_be32(ticket->t_tid),
+ .oh_flags = XLOG_COMMIT_TRANS,
+ };
struct xfs_log_iovec reg = {
- .i_addr = NULL,
- .i_len = 0,
+ .i_addr = &ophdr,
+ .i_len = sizeof(struct xlog_op_header),
.i_type = XLOG_REG_TYPE_COMMIT,
};
struct xfs_log_vec vec = {
@@ -1542,12 +1586,16 @@ xlog_commit_record(
.lv_iovecp = &reg,
};
int error;
+ LIST_HEAD(lv_chain);
+ INIT_LIST_HEAD(&vec.lv_list);
+ list_add(&vec.lv_list, &lv_chain);
if (XLOG_FORCED_SHUTDOWN(log))
return -EIO;
- error = xlog_write(log, &vec, ticket, lsn, iclog, XLOG_COMMIT_TRANS,
- false);
+ /* account for space used by record data */
+ ticket->t_curr_res -= reg.i_len;
+ error = xlog_write(log, &lv_chain, ticket, lsn, iclog, reg.i_len);
if (error)
xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
return error;
@@ -1753,8 +1801,7 @@ xlog_write_iclog(
struct xlog *log,
struct xlog_in_core *iclog,
uint64_t bno,
- unsigned int count,
- bool need_flush)
+ unsigned int count)
{
ASSERT(bno < log->l_logBBsize);
@@ -1792,10 +1839,12 @@ xlog_write_iclog(
* writeback throttle from throttling log writes behind background
* metadata writeback and causing priority inversions.
*/
- iclog->ic_bio.bi_opf = REQ_OP_WRITE | REQ_META | REQ_SYNC |
- REQ_IDLE | REQ_FUA;
- if (need_flush)
+ iclog->ic_bio.bi_opf = REQ_OP_WRITE | REQ_META | REQ_SYNC | REQ_IDLE;
+ if (iclog->ic_flags & XLOG_ICL_NEED_FLUSH)
iclog->ic_bio.bi_opf |= REQ_PREFLUSH;
+ if (iclog->ic_flags & XLOG_ICL_NEED_FUA)
+ iclog->ic_bio.bi_opf |= REQ_FUA;
+ iclog->ic_flags &= ~(XLOG_ICL_NEED_FLUSH | XLOG_ICL_NEED_FUA);
if (xlog_map_iclog_data(&iclog->ic_bio, iclog->ic_data, count)) {
xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
@@ -1854,29 +1903,15 @@ xlog_calc_iclog_size(
uint32_t *roundoff)
{
uint32_t count_init, count;
- bool use_lsunit;
-
- use_lsunit = xfs_sb_version_haslogv2(&log->l_mp->m_sb) &&
- log->l_mp->m_sb.sb_logsunit > 1;
/* Add for LR header */
count_init = log->l_iclog_hsize + iclog->ic_offset;
+ count = roundup(count_init, log->l_iclog_roundoff);
- /* Round out the log write size */
- if (use_lsunit) {
- /* we have a v2 stripe unit to use */
- count = XLOG_LSUNITTOB(log, XLOG_BTOLSUNIT(log, count_init));
- } else {
- count = BBTOB(BTOBB(count_init));
- }
-
- ASSERT(count >= count_init);
*roundoff = count - count_init;
- if (use_lsunit)
- ASSERT(*roundoff < log->l_mp->m_sb.sb_logsunit);
- else
- ASSERT(*roundoff < BBTOB(1));
+ ASSERT(count >= count_init);
+ ASSERT(*roundoff < log->l_iclog_roundoff);
return count;
}
@@ -1906,24 +1941,32 @@ xlog_calc_iclog_size(
STATIC void
xlog_sync(
struct xlog *log,
- struct xlog_in_core *iclog)
+ struct xlog_in_core *iclog,
+ struct xlog_ticket *ticket)
{
unsigned int count; /* byte count of bwrite */
unsigned int roundoff; /* roundoff to BB or stripe */
uint64_t bno;
unsigned int size;
- bool need_flush = true, split = false;
ASSERT(atomic_read(&iclog->ic_refcnt) == 0);
count = xlog_calc_iclog_size(log, iclog, &roundoff);
- /* move grant heads by roundoff in sync */
- xlog_grant_add_space(log, &log->l_reserve_head.grant, roundoff);
- xlog_grant_add_space(log, &log->l_write_head.grant, roundoff);
+ /*
+ * If we have a ticket, account for the roundoff via the ticket
+ * reservation to avoid touching the hot grant heads needlessly.
+ * Otherwise, we have to move grant heads directly.
+ */
+ if (ticket) {
+ ticket->t_curr_res -= roundoff;
+ } else {
+ xlog_grant_add_space(log, &log->l_reserve_head.grant, roundoff);
+ xlog_grant_add_space(log, &log->l_write_head.grant, roundoff);
+ }
/* put cycle number in every block */
- xlog_pack_data(log, iclog, roundoff);
+ xlog_pack_data(log, iclog, roundoff);
/* real byte length */
size = iclog->ic_offset;
@@ -1937,10 +1980,8 @@ xlog_sync(
bno = BLOCK_LSN(be64_to_cpu(iclog->ic_header.h_lsn));
/* Do we need to split this write into 2 parts? */
- if (bno + BTOBB(count) > log->l_logBBsize) {
+ if (bno + BTOBB(count) > log->l_logBBsize)
xlog_split_iclog(log, &iclog->ic_header, bno, count);
- split = true;
- }
/* calculcate the checksum */
iclog->ic_header.h_crc = xlog_cksum(log, &iclog->ic_header,
@@ -1961,22 +2002,8 @@ xlog_sync(
be64_to_cpu(iclog->ic_header.h_lsn));
}
#endif
-
- /*
- * Flush the data device before flushing the log to make sure all meta
- * data written back from the AIL actually made it to disk before
- * stamping the new log tail LSN into the log buffer. For an external
- * log we need to issue the flush explicitly, and unfortunately
- * synchronously here; for an internal log we can simply use the block
- * layer state machine for preflushes.
- */
- if (log->l_targ != log->l_mp->m_ddev_targp || split) {
- xfs_blkdev_issue_flush(log->l_mp->m_ddev_targp);
- need_flush = false;
- }
-
xlog_verify_iclog(log, iclog, count);
- xlog_write_iclog(log, iclog, bno, count, need_flush);
+ xlog_write_iclog(log, iclog, bno, count);
}
/*
@@ -2040,63 +2067,11 @@ xlog_print_tic_res(
struct xfs_mount *mp,
struct xlog_ticket *ticket)
{
- uint i;
- uint ophdr_spc = ticket->t_res_num_ophdrs * (uint)sizeof(xlog_op_header_t);
-
- /* match with XLOG_REG_TYPE_* in xfs_log.h */
-#define REG_TYPE_STR(type, str) [XLOG_REG_TYPE_##type] = str
- static char *res_type_str[] = {
- REG_TYPE_STR(BFORMAT, "bformat"),
- REG_TYPE_STR(BCHUNK, "bchunk"),
- REG_TYPE_STR(EFI_FORMAT, "efi_format"),
- REG_TYPE_STR(EFD_FORMAT, "efd_format"),
- REG_TYPE_STR(IFORMAT, "iformat"),
- REG_TYPE_STR(ICORE, "icore"),
- REG_TYPE_STR(IEXT, "iext"),
- REG_TYPE_STR(IBROOT, "ibroot"),
- REG_TYPE_STR(ILOCAL, "ilocal"),
- REG_TYPE_STR(IATTR_EXT, "iattr_ext"),
- REG_TYPE_STR(IATTR_BROOT, "iattr_broot"),
- REG_TYPE_STR(IATTR_LOCAL, "iattr_local"),
- REG_TYPE_STR(QFORMAT, "qformat"),
- REG_TYPE_STR(DQUOT, "dquot"),
- REG_TYPE_STR(QUOTAOFF, "quotaoff"),
- REG_TYPE_STR(LRHEADER, "LR header"),
- REG_TYPE_STR(UNMOUNT, "unmount"),
- REG_TYPE_STR(COMMIT, "commit"),
- REG_TYPE_STR(TRANSHDR, "trans header"),
- REG_TYPE_STR(ICREATE, "inode create"),
- REG_TYPE_STR(RUI_FORMAT, "rui_format"),
- REG_TYPE_STR(RUD_FORMAT, "rud_format"),
- REG_TYPE_STR(CUI_FORMAT, "cui_format"),
- REG_TYPE_STR(CUD_FORMAT, "cud_format"),
- REG_TYPE_STR(BUI_FORMAT, "bui_format"),
- REG_TYPE_STR(BUD_FORMAT, "bud_format"),
- };
- BUILD_BUG_ON(ARRAY_SIZE(res_type_str) != XLOG_REG_TYPE_MAX + 1);
-#undef REG_TYPE_STR
-
xfs_warn(mp, "ticket reservation summary:");
- xfs_warn(mp, " unit res = %d bytes",
- ticket->t_unit_res);
- xfs_warn(mp, " current res = %d bytes",
- ticket->t_curr_res);
- xfs_warn(mp, " total reg = %u bytes (o/flow = %u bytes)",
- ticket->t_res_arr_sum, ticket->t_res_o_flow);
- xfs_warn(mp, " ophdrs = %u (ophdr space = %u bytes)",
- ticket->t_res_num_ophdrs, ophdr_spc);
- xfs_warn(mp, " ophdr + reg = %u bytes",
- ticket->t_res_arr_sum + ticket->t_res_o_flow + ophdr_spc);
- xfs_warn(mp, " num regions = %u",
- ticket->t_res_num);
-
- for (i = 0; i < ticket->t_res_num; i++) {
- uint r_type = ticket->t_res_arr[i].r_type;
- xfs_warn(mp, "region[%u]: %s - %u bytes", i,
- ((r_type <= 0 || r_type > XLOG_REG_TYPE_MAX) ?
- "bad-rtype" : res_type_str[r_type]),
- ticket->t_res_arr[i].r_len);
- }
+ xfs_warn(mp, " unit res = %d bytes", ticket->t_unit_res);
+ xfs_warn(mp, " current res = %d bytes", ticket->t_curr_res);
+ xfs_warn(mp, " original count = %d", ticket->t_ocnt);
+ xfs_warn(mp, " remaining count = %d", ticket->t_cnt);
}
/*
@@ -2150,191 +2125,255 @@ xlog_print_trans(
}
/*
- * Calculate the potential space needed by the log vector. We may need a start
- * record, and each region gets its own struct xlog_op_header and may need to be
- * double word aligned.
+ * Write whole log vectors into a single iclog which is guaranteed to have
+ * either sufficient space for the entire log vector chain to be written or
+ * exclusive access to the remaining space in the iclog.
+ *
+ * Return the number of iovecs and data written into the iclog, as well as
+ * a pointer to the logvec that doesn't fit in the log (or NULL if we hit the
+ * end of the chain.
*/
-static int
-xlog_write_calc_vec_length(
- struct xlog_ticket *ticket,
+static struct xfs_log_vec *
+xlog_write_single(
+ struct list_head *lv_chain,
struct xfs_log_vec *log_vector,
- bool need_start_rec)
+ struct xlog_ticket *ticket,
+ struct xlog_in_core *iclog,
+ uint32_t *log_offset,
+ uint32_t *len,
+ uint32_t *record_cnt,
+ uint32_t *data_cnt)
{
struct xfs_log_vec *lv;
- int headers = need_start_rec ? 1 : 0;
- int len = 0;
- int i;
+ void *ptr;
+ int index;
- for (lv = log_vector; lv; lv = lv->lv_next) {
- /* we don't write ordered log vectors */
- if (lv->lv_buf_len == XFS_LOG_VEC_ORDERED)
- continue;
+ ASSERT(*log_offset + *len <= iclog->ic_size ||
+ iclog->ic_state == XLOG_STATE_WANT_SYNC);
+
+ ptr = iclog->ic_datap + *log_offset;
+ for (lv = log_vector;
+ !list_entry_is_head(lv, lv_chain, lv_list);
+ lv = list_next_entry(lv, lv_list)) {
+ /*
+ * If the entire log vec does not fit in the iclog, punt it to
+ * the partial copy loop which can handle this case.
+ */
+ if (lv->lv_niovecs &&
+ lv->lv_bytes > iclog->ic_size - *log_offset)
+ break;
- headers += lv->lv_niovecs;
+ /*
+ * Ordered log vectors have no regions to write so this
+ * loop will naturally skip them.
+ */
+ for (index = 0; index < lv->lv_niovecs; index++) {
+ struct xfs_log_iovec *reg = &lv->lv_iovecp[index];
+ struct xlog_op_header *ophdr = reg->i_addr;
- for (i = 0; i < lv->lv_niovecs; i++) {
- struct xfs_log_iovec *vecp = &lv->lv_iovecp[i];
+ ASSERT(reg->i_len % sizeof(int32_t) == 0);
+ ASSERT((unsigned long)ptr % sizeof(int32_t) == 0);
- len += vecp->i_len;
- xlog_tic_add_region(ticket, vecp->i_len, vecp->i_type);
+ ophdr->oh_tid = cpu_to_be32(ticket->t_tid);
+ ophdr->oh_len = cpu_to_be32(reg->i_len -
+ sizeof(struct xlog_op_header));
+ memcpy(ptr, reg->i_addr, reg->i_len);
+ xlog_write_adv_cnt(&ptr, len, log_offset, reg->i_len);
+ (*record_cnt)++;
+ *data_cnt += reg->i_len;
}
}
-
- ticket->t_res_num_ophdrs += headers;
- len += headers * sizeof(struct xlog_op_header);
-
- return len;
+ if (list_entry_is_head(lv, lv_chain, lv_list))
+ lv = NULL;
+ ASSERT(*len == 0 || lv);
+ return lv;
}
-static void
-xlog_write_start_rec(
- struct xlog_op_header *ophdr,
- struct xlog_ticket *ticket)
-{
- ophdr->oh_tid = cpu_to_be32(ticket->t_tid);
- ophdr->oh_clientid = ticket->t_clientid;
- ophdr->oh_len = 0;
- ophdr->oh_flags = XLOG_START_TRANS;
- ophdr->oh_res2 = 0;
-}
-
-static xlog_op_header_t *
-xlog_write_setup_ophdr(
+static int
+xlog_write_get_more_iclog_space(
struct xlog *log,
- struct xlog_op_header *ophdr,
struct xlog_ticket *ticket,
- uint flags)
+ struct xlog_in_core **iclogp,
+ uint32_t *log_offset,
+ uint32_t len,
+ uint32_t *record_cnt,
+ uint32_t *data_cnt)
{
- ophdr->oh_tid = cpu_to_be32(ticket->t_tid);
- ophdr->oh_clientid = ticket->t_clientid;
- ophdr->oh_res2 = 0;
-
- /* are we copying a commit or unmount record? */
- ophdr->oh_flags = flags;
+ struct xlog_in_core *iclog = *iclogp;
+ int error;
- /*
- * We've seen logs corrupted with bad transaction client ids. This
- * makes sure that XFS doesn't generate them on. Turn this into an EIO
- * and shut down the filesystem.
- */
- switch (ophdr->oh_clientid) {
- case XFS_TRANSACTION:
- case XFS_VOLUME:
- case XFS_LOG:
- break;
- default:
- xfs_warn(log->l_mp,
- "Bad XFS transaction clientid 0x%x in ticket "PTR_FMT,
- ophdr->oh_clientid, ticket);
- return NULL;
- }
+ spin_lock(&log->l_icloglock);
+ xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt);
+ ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC ||
+ iclog->ic_state == XLOG_STATE_IOERROR);
+ error = xlog_state_release_iclog(log, iclog, ticket);
+ spin_unlock(&log->l_icloglock);
+ if (error)
+ return error;
- return ophdr;
+ error = xlog_state_get_iclog_space(log, len, &iclog, ticket,
+ log_offset);
+ if (error)
+ return error;
+ *record_cnt = 0;
+ *data_cnt = 0;
+ *iclogp = iclog;
+ return 0;
}
/*
- * Set up the parameters of the region copy into the log. This has
- * to handle region write split across multiple log buffers - this
- * state is kept external to this function so that this code can
- * be written in an obvious, self documenting manner.
+ * Write log vectors into a single iclog which is smaller than the current chain
+ * length. We write until we cannot fit a full record into the remaining space
+ * and then stop. We return the log vector that is to be written that cannot
+ * wholly fit in the iclog.
*/
-static int
-xlog_write_setup_copy(
+static struct xfs_log_vec *
+xlog_write_partial(
+ struct xlog *log,
+ struct list_head *lv_chain,
+ struct xfs_log_vec *log_vector,
struct xlog_ticket *ticket,
- struct xlog_op_header *ophdr,
- int space_available,
- int space_required,
- int *copy_off,
- int *copy_len,
- int *last_was_partial_copy,
- int *bytes_consumed)
-{
- int still_to_copy;
-
- still_to_copy = space_required - *bytes_consumed;
- *copy_off = *bytes_consumed;
-
- if (still_to_copy <= space_available) {
- /* write of region completes here */
- *copy_len = still_to_copy;
- ophdr->oh_len = cpu_to_be32(*copy_len);
- if (*last_was_partial_copy)
- ophdr->oh_flags |= (XLOG_END_TRANS|XLOG_WAS_CONT_TRANS);
- *last_was_partial_copy = 0;
- *bytes_consumed = 0;
- return 0;
- }
+ struct xlog_in_core **iclogp,
+ uint32_t *log_offset,
+ uint32_t *len,
+ uint32_t *record_cnt,
+ uint32_t *data_cnt)
+{
+ struct xlog_in_core *iclog = *iclogp;
+ struct xfs_log_vec *lv = log_vector;
+ struct xfs_log_iovec *reg;
+ struct xlog_op_header *ophdr;
+ void *ptr;
+ int index = 0;
+ uint32_t rlen;
+ int error;
- /* partial write of region, needs extra log op header reservation */
- *copy_len = space_available;
- ophdr->oh_len = cpu_to_be32(*copy_len);
- ophdr->oh_flags |= XLOG_CONTINUE_TRANS;
- if (*last_was_partial_copy)
- ophdr->oh_flags |= XLOG_WAS_CONT_TRANS;
- *bytes_consumed += *copy_len;
- (*last_was_partial_copy)++;
+ /* walk the logvec, copying until we run out of space in the iclog */
+ ptr = iclog->ic_datap + *log_offset;
+ for (index = 0; index < lv->lv_niovecs; index++) {
+ uint32_t reg_offset = 0;
- /* account for new log op header */
- ticket->t_curr_res -= sizeof(struct xlog_op_header);
- ticket->t_res_num_ophdrs++;
+ reg = &lv->lv_iovecp[index];
+ ASSERT(reg->i_len % sizeof(int32_t) == 0);
- return sizeof(struct xlog_op_header);
-}
+ /*
+ * The first region of a continuation must have a non-zero
+ * length otherwise log recovery will just skip over it and
+ * start recovering from the next opheader it finds. Because we
+ * mark the next opheader as a continuation, recovery will then
+ * incorrectly add the continuation to the previous region and
+ * that breaks stuff.
+ *
+ * Hence if there isn't space for region data after the
+ * opheader, then we need to start afresh with a new iclog.
+ */
+ if (iclog->ic_size - *log_offset <=
+ sizeof(struct xlog_op_header)) {
+ error = xlog_write_get_more_iclog_space(log, ticket,
+ &iclog, log_offset, *len, record_cnt,
+ data_cnt);
+ if (error)
+ return ERR_PTR(error);
+ ptr = iclog->ic_datap + *log_offset;
+ }
-static int
-xlog_write_copy_finish(
- struct xlog *log,
- struct xlog_in_core *iclog,
- uint flags,
- int *record_cnt,
- int *data_cnt,
- int *partial_copy,
- int *partial_copy_len,
- int log_offset,
- struct xlog_in_core **commit_iclog)
-{
- int error;
+ ophdr = reg->i_addr;
+ rlen = min_t(uint32_t, reg->i_len, iclog->ic_size - *log_offset);
+
+ ophdr->oh_tid = cpu_to_be32(ticket->t_tid);
+ ophdr->oh_len = cpu_to_be32(rlen - sizeof(struct xlog_op_header));
+ if (rlen != reg->i_len)
+ ophdr->oh_flags |= XLOG_CONTINUE_TRANS;
+
+ ASSERT((unsigned long)ptr % sizeof(int32_t) == 0);
+ xlog_verify_dest_ptr(log, ptr);
+ memcpy(ptr, reg->i_addr, rlen);
+ xlog_write_adv_cnt(&ptr, len, log_offset, rlen);
+ (*record_cnt)++;
+ *data_cnt += rlen;
+
+ /* If we wrote the whole region, move to the next. */
+ if (rlen == reg->i_len)
+ continue;
- if (*partial_copy) {
/*
- * This iclog has already been marked WANT_SYNC by
- * xlog_state_get_iclog_space.
+ * We now have a partially written iovec, but it can span
+ * multiple iclogs so we loop here. First we release the iclog
+ * we currently have, then we get a new iclog and add a new
+ * opheader. Then we continue copying from where we were until
+ * we either complete the iovec or fill the iclog. If we
+ * complete the iovec, then we increment the index and go right
+ * back to the top of the outer loop. if we fill the iclog, we
+ * run the inner loop again.
+ *
+ * This is complicated by the tail of a region using all the
+ * space in an iclog and hence requiring us to release the iclog
+ * and get a new one before returning to the outer loop. We must
+ * always guarantee that we exit this inner loop with at least
+ * space for log transaction opheaders left in the current
+ * iclog, hence we cannot just terminate the loop at the end
+ * of the of the continuation. So we loop while there is no
+ * space left in the current iclog, and check for the end of the
+ * continuation after getting a new iclog.
*/
- spin_lock(&log->l_icloglock);
- xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt);
- *record_cnt = 0;
- *data_cnt = 0;
- goto release_iclog;
- }
+ do {
+ /*
+ * Account for the continuation opheader before we get
+ * a new iclog. This is necessary so that we reserve
+ * space in the iclog for it.
+ */
+ *len += sizeof(struct xlog_op_header);
+ ticket->t_curr_res -= sizeof(struct xlog_op_header);
- *partial_copy = 0;
- *partial_copy_len = 0;
+ error = xlog_write_get_more_iclog_space(log, ticket,
+ &iclog, log_offset, *len, record_cnt,
+ data_cnt);
+ if (error)
+ return ERR_PTR(error);
+ ptr = iclog->ic_datap + *log_offset;
- if (iclog->ic_size - log_offset <= sizeof(xlog_op_header_t)) {
- /* no more space in this iclog - push it. */
- spin_lock(&log->l_icloglock);
- xlog_state_finish_copy(log, iclog, *record_cnt, *data_cnt);
- *record_cnt = 0;
- *data_cnt = 0;
+ ophdr = ptr;
+ ophdr->oh_tid = cpu_to_be32(ticket->t_tid);
+ ophdr->oh_clientid = XFS_TRANSACTION;
+ ophdr->oh_res2 = 0;
+ ophdr->oh_flags = XLOG_WAS_CONT_TRANS;
- if (iclog->ic_state == XLOG_STATE_ACTIVE)
- xlog_state_switch_iclogs(log, iclog, 0);
- else
- ASSERT(iclog->ic_state == XLOG_STATE_WANT_SYNC ||
- iclog->ic_state == XLOG_STATE_IOERROR);
- if (!commit_iclog)
- goto release_iclog;
- spin_unlock(&log->l_icloglock);
- ASSERT(flags & XLOG_COMMIT_TRANS);
- *commit_iclog = iclog;
- }
+ xlog_write_adv_cnt(&ptr, len, log_offset,
+ sizeof(struct xlog_op_header));
+ *data_cnt += sizeof(struct xlog_op_header);
- return 0;
+ /*
+ * If rlen fits in the iclog, then end the region
+ * continuation. Otherwise we're going around again.
+ */
+ reg_offset += rlen;
+ rlen = reg->i_len - reg_offset;
+ if (rlen <= iclog->ic_size - *log_offset)
+ ophdr->oh_flags |= XLOG_END_TRANS;
+ else
+ ophdr->oh_flags |= XLOG_CONTINUE_TRANS;
-release_iclog:
- error = xlog_state_release_iclog(log, iclog);
- spin_unlock(&log->l_icloglock);
- return error;
+ rlen = min_t(uint32_t, rlen, iclog->ic_size - *log_offset);
+ ophdr->oh_len = cpu_to_be32(rlen);
+
+ xlog_verify_dest_ptr(log, ptr);
+ memcpy(ptr, reg->i_addr + reg_offset, rlen);
+ xlog_write_adv_cnt(&ptr, len, log_offset, rlen);
+ (*record_cnt)++;
+ *data_cnt += rlen;
+
+ } while (ophdr->oh_flags & XLOG_CONTINUE_TRANS);
+ }
+
+ /*
+ * No more iovecs remain in this logvec so return the next log vec to
+ * the caller so it can go back to fast path copying.
+ */
+ *iclogp = iclog;
+ lv = list_next_entry(lv, lv_list);
+ if (list_entry_is_head(lv, lv_chain, lv_list))
+ return NULL;
+ return lv;
}
/*
@@ -2380,32 +2419,19 @@ release_iclog:
int
xlog_write(
struct xlog *log,
- struct xfs_log_vec *log_vector,
+ struct list_head *lv_chain,
struct xlog_ticket *ticket,
xfs_lsn_t *start_lsn,
struct xlog_in_core **commit_iclog,
- uint flags,
- bool need_start_rec)
+ uint32_t len)
{
struct xlog_in_core *iclog = NULL;
- struct xfs_log_vec *lv = log_vector;
- struct xfs_log_iovec *vecp = lv->lv_iovecp;
- int index = 0;
- int len;
- int partial_copy = 0;
- int partial_copy_len = 0;
- int contwr = 0;
+ struct xfs_log_vec *lv;
int record_cnt = 0;
int data_cnt = 0;
int error = 0;
+ int log_offset;
- /*
- * If this is a commit or unmount transaction, we don't need a start
- * record to be written. We do, however, have to account for the
- * commit or unmount header that gets written. Hence we always have
- * to account for an extra xlog_op_header here.
- */
- ticket->t_curr_res -= sizeof(struct xlog_op_header);
if (ticket->t_curr_res < 0) {
xfs_alert_tag(log->l_mp, XFS_PTAG_LOGRES,
"ctx ticket reservation ran out. Need to up reservation");
@@ -2413,144 +2439,44 @@ xlog_write(
xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
}
- len = xlog_write_calc_vec_length(ticket, log_vector, need_start_rec);
- *start_lsn = 0;
- while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) {
- void *ptr;
- int log_offset;
-
- error = xlog_state_get_iclog_space(log, len, &iclog, ticket,
- &contwr, &log_offset);
- if (error)
- return error;
-
- ASSERT(log_offset <= iclog->ic_size - 1);
- ptr = iclog->ic_datap + log_offset;
-
- /* start_lsn is the first lsn written to. That's all we need. */
- if (!*start_lsn)
- *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn);
-
- /*
- * This loop writes out as many regions as can fit in the amount
- * of space which was allocated by xlog_state_get_iclog_space().
- */
- while (lv && (!lv->lv_niovecs || index < lv->lv_niovecs)) {
- struct xfs_log_iovec *reg;
- struct xlog_op_header *ophdr;
- int copy_len;
- int copy_off;
- bool ordered = false;
-
- /* ordered log vectors have no regions to write */
- if (lv->lv_buf_len == XFS_LOG_VEC_ORDERED) {
- ASSERT(lv->lv_niovecs == 0);
- ordered = true;
- goto next_lv;
- }
-
- reg = &vecp[index];
- ASSERT(reg->i_len % sizeof(int32_t) == 0);
- ASSERT((unsigned long)ptr % sizeof(int32_t) == 0);
-
- /*
- * Before we start formatting log vectors, we need to
- * write a start record. Only do this for the first
- * iclog we write to.
- */
- if (need_start_rec) {
- xlog_write_start_rec(ptr, ticket);
- xlog_write_adv_cnt(&ptr, &len, &log_offset,
- sizeof(struct xlog_op_header));
- }
-
- ophdr = xlog_write_setup_ophdr(log, ptr, ticket, flags);
- if (!ophdr)
- return -EIO;
+ error = xlog_state_get_iclog_space(log, len, &iclog, ticket,
+ &log_offset);
+ if (error)
+ return error;
- xlog_write_adv_cnt(&ptr, &len, &log_offset,
- sizeof(struct xlog_op_header));
+ /* start_lsn is the LSN of the first iclog written to. */
+ if (start_lsn)
+ *start_lsn = be64_to_cpu(iclog->ic_header.h_lsn);
- len += xlog_write_setup_copy(ticket, ophdr,
- iclog->ic_size-log_offset,
- reg->i_len,
- &copy_off, &copy_len,
- &partial_copy,
- &partial_copy_len);
- xlog_verify_dest_ptr(log, ptr);
-
- /*
- * Copy region.
- *
- * Unmount records just log an opheader, so can have
- * empty payloads with no data region to copy. Hence we
- * only copy the payload if the vector says it has data
- * to copy.
- */
- ASSERT(copy_len >= 0);
- if (copy_len > 0) {
- memcpy(ptr, reg->i_addr + copy_off, copy_len);
- xlog_write_adv_cnt(&ptr, &len, &log_offset,
- copy_len);
- }
- copy_len += sizeof(struct xlog_op_header);
- record_cnt++;
- if (need_start_rec) {
- copy_len += sizeof(struct xlog_op_header);
- record_cnt++;
- need_start_rec = false;
- }
- data_cnt += contwr ? copy_len : 0;
-
- error = xlog_write_copy_finish(log, iclog, flags,
- &record_cnt, &data_cnt,
- &partial_copy,
- &partial_copy_len,
- log_offset,
- commit_iclog);
- if (error)
- return error;
-
- /*
- * if we had a partial copy, we need to get more iclog
- * space but we don't want to increment the region
- * index because there is still more is this region to
- * write.
- *
- * If we completed writing this region, and we flushed
- * the iclog (indicated by resetting of the record
- * count), then we also need to get more log space. If
- * this was the last record, though, we are done and
- * can just return.
- */
- if (partial_copy)
- break;
+ lv = list_first_entry_or_null(lv_chain, struct xfs_log_vec, lv_list);
+ while (lv) {
+ lv = xlog_write_single(lv_chain, lv, ticket, iclog, &log_offset,
+ &len, &record_cnt, &data_cnt);
+ if (!lv)
+ break;
- if (++index == lv->lv_niovecs) {
-next_lv:
- lv = lv->lv_next;
- index = 0;
- if (lv)
- vecp = lv->lv_iovecp;
- }
- if (record_cnt == 0 && !ordered) {
- if (!lv)
- return 0;
- break;
- }
+ lv = xlog_write_partial(log, lv_chain, lv, ticket, &iclog,
+ &log_offset, &len, &record_cnt,
+ &data_cnt);
+ if (IS_ERR_OR_NULL(lv)) {
+ error = PTR_ERR_OR_ZERO(lv);
+ break;
}
}
+ ASSERT((len == 0 && !lv) || error);
- ASSERT(len == 0);
-
+ /*
+ * We've already been guaranteed that the last writes will fit inside
+ * the current iclog, and hence it will already have the space used by
+ * those writes accounted to it. Hence we do not need to update the
+ * iclog with the number of bytes written here.
+ */
spin_lock(&log->l_icloglock);
- xlog_state_finish_copy(log, iclog, record_cnt, data_cnt);
- if (commit_iclog) {
- ASSERT(flags & XLOG_COMMIT_TRANS);
+ xlog_state_finish_copy(log, iclog, record_cnt, 0);
+ if (commit_iclog)
*commit_iclog = iclog;
- } else {
- error = xlog_state_release_iclog(log, iclog);
- }
+ else
+ error = xlog_state_release_iclog(log, iclog, ticket);
spin_unlock(&log->l_icloglock);
return error;
@@ -2946,7 +2872,6 @@ xlog_state_get_iclog_space(
int len,
struct xlog_in_core **iclogp,
struct xlog_ticket *ticket,
- int *continued_write,
int *logoffsetp)
{
int log_offset;
@@ -2981,9 +2906,6 @@ restart:
*/
if (log_offset == 0) {
ticket->t_curr_res -= log->l_iclog_hsize;
- xlog_tic_add_region(ticket,
- log->l_iclog_hsize,
- XLOG_REG_TYPE_LRHEADER);
head->h_cycle = cpu_to_be32(log->l_curr_cycle);
head->h_lsn = cpu_to_be64(
xlog_assign_lsn(log->l_curr_cycle, log->l_curr_block));
@@ -3012,7 +2934,7 @@ restart:
* reference to the iclog.
*/
if (!atomic_add_unless(&iclog->ic_refcnt, -1, 1))
- error = xlog_state_release_iclog(log, iclog);
+ error = xlog_state_release_iclog(log, iclog, ticket);
spin_unlock(&log->l_icloglock);
if (error)
return error;
@@ -3025,13 +2947,10 @@ restart:
* iclogs (to mark it taken), this particular iclog will release/sync
* to disk in xlog_write().
*/
- if (len <= iclog->ic_size - iclog->ic_offset) {
- *continued_write = 0;
+ if (len <= iclog->ic_size - iclog->ic_offset)
iclog->ic_offset += len;
- } else {
- *continued_write = 1;
+ else
xlog_state_switch_iclogs(log, iclog, iclog->ic_size);
- }
*iclogp = iclog;
ASSERT(iclog->ic_offset <= iclog->ic_size);
@@ -3063,7 +2982,6 @@ xfs_log_ticket_regrant(
xlog_grant_sub_space(log, &log->l_write_head.grant,
ticket->t_curr_res);
ticket->t_curr_res = ticket->t_unit_res;
- xlog_tic_reset_res(ticket);
trace_xfs_log_ticket_regrant_sub(log, ticket);
@@ -3074,7 +2992,6 @@ xfs_log_ticket_regrant(
trace_xfs_log_ticket_regrant_exit(log, ticket);
ticket->t_curr_res = ticket->t_unit_res;
- xlog_tic_reset_res(ticket);
}
xfs_log_ticket_put(ticket);
@@ -3131,7 +3048,7 @@ xfs_log_ticket_ungrant(
* This routine will mark the current iclog in the ring as WANT_SYNC and move
* the current iclog pointer to the next iclog in the ring.
*/
-STATIC void
+void
xlog_state_switch_iclogs(
struct xlog *log,
struct xlog_in_core *iclog,
@@ -3151,10 +3068,9 @@ xlog_state_switch_iclogs(
log->l_curr_block += BTOBB(eventual_size)+BTOBB(log->l_iclog_hsize);
/* Round up to next log-sunit */
- if (xfs_sb_version_haslogv2(&log->l_mp->m_sb) &&
- log->l_mp->m_sb.sb_logsunit > 1) {
- uint32_t sunit_bb = BTOBB(log->l_mp->m_sb.sb_logsunit);
- log->l_curr_block = roundup(log->l_curr_block, sunit_bb);
+ if (log->l_iclog_roundoff > BBSIZE) {
+ log->l_curr_block = roundup(log->l_curr_block,
+ BTOBB(log->l_iclog_roundoff));
}
if (log->l_curr_block >= log->l_logBBsize) {
@@ -3246,7 +3162,7 @@ xfs_log_force(
atomic_inc(&iclog->ic_refcnt);
lsn = be64_to_cpu(iclog->ic_header.h_lsn);
xlog_state_switch_iclogs(log, iclog, 0);
- if (xlog_state_release_iclog(log, iclog))
+ if (xlog_state_release_iclog(log, iclog, NULL))
goto out_error;
if (be64_to_cpu(iclog->ic_header.h_lsn) != lsn)
@@ -3279,15 +3195,28 @@ out_error:
return -EIO;
}
+/*
+ * Force the log to a specific LSN.
+ *
+ * If an iclog with that lsn can be found:
+ * If it is in the DIRTY state, just return.
+ * If it is in the ACTIVE state, move the in-core log into the WANT_SYNC
+ * state and go to sleep or return.
+ * If it is in any other state, go to sleep or return.
+ *
+ * Synchronous forces are implemented with a wait queue. All callers trying
+ * to force a given lsn to disk must wait on the queue attached to the
+ * specific in-core log. When given in-core log finally completes its write
+ * to disk, that thread will wake up all threads waiting on the queue.
+ */
static int
-__xfs_log_force_lsn(
- struct xfs_mount *mp,
+xlog_force_lsn(
+ struct xlog *log,
xfs_lsn_t lsn,
uint flags,
int *log_flushed,
bool already_slept)
{
- struct xlog *log = mp->m_log;
struct xlog_in_core *iclog;
spin_lock(&log->l_icloglock);
@@ -3320,15 +3249,13 @@ __xfs_log_force_lsn(
if (!already_slept &&
(iclog->ic_prev->ic_state == XLOG_STATE_WANT_SYNC ||
iclog->ic_prev->ic_state == XLOG_STATE_SYNCING)) {
- XFS_STATS_INC(mp, xs_log_force_sleep);
-
xlog_wait(&iclog->ic_prev->ic_write_wait,
&log->l_icloglock);
return -EAGAIN;
}
atomic_inc(&iclog->ic_refcnt);
xlog_state_switch_iclogs(log, iclog, 0);
- if (xlog_state_release_iclog(log, iclog))
+ if (xlog_state_release_iclog(log, iclog, NULL))
goto out_error;
if (log_flushed)
*log_flushed = 1;
@@ -3345,39 +3272,38 @@ out_error:
}
/*
- * Force the in-core log to disk for a specific LSN.
- *
- * Find in-core log with lsn.
- * If it is in the DIRTY state, just return.
- * If it is in the ACTIVE state, move the in-core log into the WANT_SYNC
- * state and go to sleep or return.
- * If it is in any other state, go to sleep or return.
+ * Force the log to a specific checkpoint sequence.
*
- * Synchronous forces are implemented with a wait queue. All callers trying
- * to force a given lsn to disk must wait on the queue attached to the
- * specific in-core log. When given in-core log finally completes its write
- * to disk, that thread will wake up all threads waiting on the queue.
+ * First force the CIL so that all the required changes have been flushed to the
+ * iclogs. If the CIL force completed it will return a commit LSN that indicates
+ * the iclog that needs to be flushed to stable storage. If the caller needs
+ * a synchronous log force, we will wait on the iclog with the LSN returned by
+ * xlog_cil_force_seq() to be completed.
*/
int
-xfs_log_force_lsn(
+xfs_log_force_seq(
struct xfs_mount *mp,
- xfs_lsn_t lsn,
+ xfs_csn_t seq,
uint flags,
int *log_flushed)
{
+ struct xlog *log = mp->m_log;
+ xfs_lsn_t lsn;
int ret;
- ASSERT(lsn != 0);
+ ASSERT(seq != 0);
XFS_STATS_INC(mp, xs_log_force);
- trace_xfs_log_force(mp, lsn, _RET_IP_);
+ trace_xfs_log_force(mp, seq, _RET_IP_);
- lsn = xlog_cil_force_lsn(mp->m_log, lsn);
+ lsn = xlog_cil_force_seq(log, seq);
if (lsn == NULLCOMMITLSN)
return 0;
- ret = __xfs_log_force_lsn(mp, lsn, flags, log_flushed, false);
- if (ret == -EAGAIN)
- ret = __xfs_log_force_lsn(mp, lsn, flags, log_flushed, true);
+ ret = xlog_force_lsn(log, lsn, flags, log_flushed, false);
+ if (ret == -EAGAIN) {
+ XFS_STATS_INC(mp, xs_log_force_sleep);
+ ret = xlog_force_lsn(log, lsn, flags, log_flushed, true);
+ }
return ret;
}
@@ -3406,12 +3332,12 @@ xfs_log_ticket_get(
* Figure out the total log space unit (in bytes) that would be
* required for a log ticket.
*/
-int
-xfs_log_calc_unit_res(
- struct xfs_mount *mp,
- int unit_bytes)
+static int
+xlog_calc_unit_res(
+ struct xlog *log,
+ int unit_bytes,
+ int *niclogs)
{
- struct xlog *log = mp->m_log;
int iclog_space;
uint num_headers;
@@ -3487,18 +3413,22 @@ xfs_log_calc_unit_res(
/* for commit-rec LR header - note: padding will subsume the ophdr */
unit_bytes += log->l_iclog_hsize;
- /* for roundoff padding for transaction data and one for commit record */
- if (xfs_sb_version_haslogv2(&mp->m_sb) && mp->m_sb.sb_logsunit > 1) {
- /* log su roundoff */
- unit_bytes += 2 * mp->m_sb.sb_logsunit;
- } else {
- /* BB roundoff */
- unit_bytes += 2 * BBSIZE;
- }
+ /* roundoff padding for transaction data and one for commit record */
+ unit_bytes += 2 * log->l_iclog_roundoff;
+ if (niclogs)
+ *niclogs = num_headers;
return unit_bytes;
}
+int
+xfs_log_calc_unit_res(
+ struct xfs_mount *mp,
+ int unit_bytes)
+{
+ return xlog_calc_unit_res(mp->m_log, unit_bytes, NULL);
+}
+
/*
* Allocate and initialise a new log ticket.
*/
@@ -3507,7 +3437,6 @@ xlog_ticket_alloc(
struct xlog *log,
int unit_bytes,
int cnt,
- char client,
bool permanent)
{
struct xlog_ticket *tic;
@@ -3515,7 +3444,7 @@ xlog_ticket_alloc(
tic = kmem_cache_zalloc(xfs_log_ticket_zone, GFP_NOFS | __GFP_NOFAIL);
- unit_res = xfs_log_calc_unit_res(log->l_mp, unit_bytes);
+ unit_res = xlog_calc_unit_res(log, unit_bytes, &tic->t_iclog_hdrs);
atomic_set(&tic->t_ref, 1);
tic->t_task = current;
@@ -3525,12 +3454,9 @@ xlog_ticket_alloc(
tic->t_cnt = cnt;
tic->t_ocnt = cnt;
tic->t_tid = prandom_u32();
- tic->t_clientid = client;
if (permanent)
tic->t_flags |= XLOG_TIC_PERM_RESERV;
- xlog_tic_reset_res(tic);
-
return tic;
}
@@ -3698,11 +3624,12 @@ xlog_verify_iclog(
iclog->ic_header.h_cycle_data[idx]);
}
}
- if (clientid != XFS_TRANSACTION && clientid != XFS_LOG)
+ if (clientid != XFS_TRANSACTION && clientid != XFS_LOG) {
xfs_warn(log->l_mp,
- "%s: invalid clientid %d op "PTR_FMT" offset 0x%lx",
- __func__, clientid, ophead,
+ "%s: op %d invalid clientid %d op "PTR_FMT" offset 0x%lx",
+ __func__, i, clientid, ophead,
(unsigned long)field_offset);
+ }
/* check length */
p = &ophead->oh_len;
diff --git a/fs/xfs/xfs_log.h b/fs/xfs/xfs_log.h
index 044e02cb8921..93aaee7c276e 100644
--- a/fs/xfs/xfs_log.h
+++ b/fs/xfs/xfs_log.h
@@ -9,7 +9,8 @@
struct xfs_cil_ctx;
struct xfs_log_vec {
- struct xfs_log_vec *lv_next; /* next lv in build list */
+ struct list_head lv_list; /* CIL lv chain ptrs */
+ uint32_t lv_order_id; /* chain ordering info */
int lv_niovecs; /* number of iovecs in lv */
struct xfs_log_iovec *lv_iovecp; /* iovec array */
struct xfs_log_item *lv_item; /* owner */
@@ -21,42 +22,19 @@ struct xfs_log_vec {
#define XFS_LOG_VEC_ORDERED (-1)
-static inline void *
-xlog_prepare_iovec(struct xfs_log_vec *lv, struct xfs_log_iovec **vecp,
- uint type)
-{
- struct xfs_log_iovec *vec = *vecp;
-
- if (vec) {
- ASSERT(vec - lv->lv_iovecp < lv->lv_niovecs);
- vec++;
- } else {
- vec = &lv->lv_iovecp[0];
- }
-
- vec->i_type = type;
- vec->i_addr = lv->lv_buf + lv->lv_buf_len;
-
- ASSERT(IS_ALIGNED((unsigned long)vec->i_addr, sizeof(uint64_t)));
-
- *vecp = vec;
- return vec->i_addr;
-}
+void *xlog_prepare_iovec(struct xfs_log_vec *lv, struct xfs_log_iovec **vecp,
+ uint type);
-/*
- * We need to make sure the next buffer is naturally aligned for the biggest
- * basic data type we put into it. We already accounted for this padding when
- * sizing the buffer.
- *
- * However, this padding does not get written into the log, and hence we have to
- * track the space used by the log vectors separately to prevent log space hangs
- * due to inaccurate accounting (i.e. a leak) of the used log space through the
- * CIL context ticket.
- */
static inline void
xlog_finish_iovec(struct xfs_log_vec *lv, struct xfs_log_iovec *vec, int len)
{
- lv->lv_buf_len += round_up(len, sizeof(uint64_t));
+ struct xlog_op_header *oph = vec->i_addr;
+
+ /* opheader tracks payload length, logvec tracks region length */
+ oph->oh_len = len;
+
+ len += sizeof(struct xlog_op_header);
+ lv->lv_buf_len += len;
lv->lv_bytes += len;
vec->i_len = len;
}
@@ -104,9 +82,10 @@ struct xlog_ticket;
struct xfs_log_item;
struct xfs_item_ops;
struct xfs_trans;
+struct xlog;
int xfs_log_force(struct xfs_mount *mp, uint flags);
-int xfs_log_force_lsn(struct xfs_mount *mp, xfs_lsn_t lsn, uint flags,
+int xfs_log_force_seq(struct xfs_mount *mp, xfs_csn_t seq, uint flags,
int *log_forced);
int xfs_log_mount(struct xfs_mount *mp,
struct xfs_buftarg *log_target,
@@ -116,24 +95,17 @@ int xfs_log_mount_finish(struct xfs_mount *mp);
void xfs_log_mount_cancel(struct xfs_mount *);
xfs_lsn_t xlog_assign_tail_lsn(struct xfs_mount *mp);
xfs_lsn_t xlog_assign_tail_lsn_locked(struct xfs_mount *mp);
-void xfs_log_space_wake(struct xfs_mount *mp);
-void xfs_log_release_iclog(struct xlog_in_core *iclog);
-int xfs_log_reserve(struct xfs_mount *mp,
- int length,
- int count,
- struct xlog_ticket **ticket,
- uint8_t clientid,
- bool permanent);
-int xfs_log_regrant(struct xfs_mount *mp, struct xlog_ticket *tic);
-void xfs_log_unmount(struct xfs_mount *mp);
-int xfs_log_force_umount(struct xfs_mount *mp, int logerror);
+void xfs_log_space_wake(struct xfs_mount *mp);
+int xfs_log_reserve(struct xfs_mount *mp, int length, int count,
+ struct xlog_ticket **ticket, bool permanent);
+int xfs_log_regrant(struct xfs_mount *mp, struct xlog_ticket *tic);
+void xfs_log_unmount(struct xfs_mount *mp);
+int xfs_log_force_umount(struct xfs_mount *mp, int logerror);
bool xfs_log_writable(struct xfs_mount *mp);
struct xlog_ticket *xfs_log_ticket_get(struct xlog_ticket *ticket);
void xfs_log_ticket_put(struct xlog_ticket *ticket);
-void xfs_log_commit_cil(struct xfs_mount *mp, struct xfs_trans *tp,
- xfs_lsn_t *commit_lsn, bool regrant);
void xlog_cil_process_committed(struct list_head *list);
bool xfs_log_item_in_current_chkpt(struct xfs_log_item *lip);
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
index b0ef071b3cb5..705619e9dab4 100644
--- a/fs/xfs/xfs_log_cil.c
+++ b/fs/xfs/xfs_log_cil.c
@@ -37,16 +37,94 @@ xlog_cil_ticket_alloc(
{
struct xlog_ticket *tic;
- tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0);
+ tic = xlog_ticket_alloc(log, 0, 1, 0);
/*
* set the current reservation to zero so we know to steal the basic
* transaction overhead reservation from the first transaction commit.
*/
tic->t_curr_res = 0;
+ tic->t_iclog_hdrs = 0;
return tic;
}
+static inline void
+xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil)
+{
+ struct xlog *log = cil->xc_log;
+
+ atomic_set(&cil->xc_iclog_hdrs,
+ (XLOG_CIL_BLOCKING_SPACE_LIMIT(log) /
+ (log->l_iclog_size - log->l_iclog_hsize)));
+}
+
+/*
+ * Unavoidable forward declaration - xlog_cil_push_work() calls
+ * xlog_cil_ctx_alloc() itself.
+ */
+static void xlog_cil_push_work(struct work_struct *work);
+
+static struct xfs_cil_ctx *
+xlog_cil_ctx_alloc(void)
+{
+ struct xfs_cil_ctx *ctx;
+
+ ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
+ INIT_LIST_HEAD(&ctx->committing);
+ INIT_LIST_HEAD(&ctx->busy_extents);
+ INIT_LIST_HEAD(&ctx->log_items);
+ INIT_LIST_HEAD(&ctx->lv_chain);
+ INIT_WORK(&ctx->push_work, xlog_cil_push_work);
+ return ctx;
+}
+
+/*
+ * Aggregate the CIL per cpu structures into global counts, lists, etc and
+ * clear the percpu state ready for the next context to use.
+ */
+static void
+xlog_cil_pcp_aggregate(
+ struct xfs_cil *cil,
+ struct xfs_cil_ctx *ctx)
+{
+ struct xlog_cil_pcp *cilpcp;
+ int cpu;
+
+ for_each_online_cpu(cpu) {
+ cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+
+ ctx->ticket->t_curr_res += cilpcp->space_reserved;
+ ctx->ticket->t_unit_res += cilpcp->space_reserved;
+ cilpcp->space_reserved = 0;
+
+ if (!list_empty(&cilpcp->busy_extents)) {
+ list_splice_init(&cilpcp->busy_extents,
+ &ctx->busy_extents);
+ }
+ if (!list_empty(&cilpcp->log_items))
+ list_splice_init(&cilpcp->log_items, &ctx->log_items);
+
+ /*
+ * We're in the middle of switching cil contexts. Reset the
+ * counter we use to detect when the current context is nearing
+ * full.
+ */
+ cilpcp->space_used = 0;
+ }
+}
+
+static void
+xlog_cil_ctx_switch(
+ struct xfs_cil *cil,
+ struct xfs_cil_ctx *ctx)
+{
+ xlog_cil_set_iclog_hdr_count(cil);
+ set_bit(XLOG_CIL_EMPTY, &cil->xc_flags);
+ ctx->sequence = ++cil->xc_current_sequence;
+ ctx->cil = cil;
+ cil->xc_ctx = ctx;
+}
+
/*
* After the first stage of log recovery is done, we know where the head and
* tail of the log are. We need this log initialisation done before we can
@@ -63,6 +141,7 @@ xlog_cil_init_post_recovery(
{
log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
log->l_cilp->xc_ctx->sequence = 1;
+ xlog_cil_set_iclog_hdr_count(log->l_cilp);
}
static inline int
@@ -153,13 +232,20 @@ xlog_cil_alloc_shadow_bufs(
}
/*
- * We 64-bit align the length of each iovec so that the start
- * of the next one is naturally aligned. We'll need to
- * account for that slack space here. Then round nbytes up
- * to 64-bit alignment so that the initial buffer alignment is
- * easy to calculate and verify.
+ * We 64-bit align the length of each iovec so that the start of
+ * the next one is naturally aligned. We'll need to account for
+ * that slack space here.
+ *
+ * We also add the xlog_op_header to each region when
+ * formatting, but that's not accounted to the size of the item
+ * at this point. Hence we'll need an addition number of bytes
+ * for each vector to hold an opheader.
+ *
+ * Then round nbytes up to 64-bit alignment so that the initial
+ * buffer alignment is easy to calculate and verify.
*/
- nbytes += niovecs * sizeof(uint64_t);
+ nbytes += niovecs *
+ (sizeof(uint64_t) + sizeof(struct xlog_op_header));
nbytes = round_up(nbytes, sizeof(uint64_t));
/*
@@ -188,6 +274,7 @@ xlog_cil_alloc_shadow_bufs(
lv = kmem_alloc_large(buf_size, KM_NOFS);
memset(lv, 0, xlog_cil_iovec_space(niovecs));
+ INIT_LIST_HEAD(&lv->lv_list);
lv->lv_item = lip;
lv->lv_size = buf_size;
if (ordered)
@@ -203,7 +290,6 @@ xlog_cil_alloc_shadow_bufs(
else
lv->lv_buf_len = 0;
lv->lv_bytes = 0;
- lv->lv_next = NULL;
}
/* Ensure the lv is set up according to ->iop_size */
@@ -217,22 +303,18 @@ xlog_cil_alloc_shadow_bufs(
/*
* Prepare the log item for insertion into the CIL. Calculate the difference in
- * log space and vectors it will consume, and if it is a new item pin it as
- * well.
+ * log space it will consume, and if it is a new item pin it as well.
*/
STATIC void
xfs_cil_prepare_item(
struct xlog *log,
struct xfs_log_vec *lv,
struct xfs_log_vec *old_lv,
- int *diff_len,
- int *diff_iovecs)
+ int *diff_len)
{
/* Account for the new LV being passed in */
- if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) {
+ if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
*diff_len += lv->lv_bytes;
- *diff_iovecs += lv->lv_niovecs;
- }
/*
* If there is no old LV, this is the first time we've seen the item in
@@ -249,7 +331,6 @@ xfs_cil_prepare_item(
ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);
*diff_len -= old_lv->lv_bytes;
- *diff_iovecs -= old_lv->lv_niovecs;
lv->lv_item->li_lv_shadow = old_lv;
}
@@ -298,12 +379,10 @@ static void
xlog_cil_insert_format_items(
struct xlog *log,
struct xfs_trans *tp,
- int *diff_len,
- int *diff_iovecs)
+ int *diff_len)
{
struct xfs_log_item *lip;
-
/* Bail out if we didn't find a log item. */
if (list_empty(&tp->t_items)) {
ASSERT(0);
@@ -337,7 +416,6 @@ xlog_cil_insert_format_items(
if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
/* same or smaller, optimise common overwrite case */
lv = lip->li_lv;
- lv->lv_next = NULL;
if (ordered)
goto insert;
@@ -346,7 +424,6 @@ xlog_cil_insert_format_items(
* set the item up as though it is a new insertion so
* that the space reservation accounting is correct.
*/
- *diff_iovecs -= lv->lv_niovecs;
*diff_len -= lv->lv_bytes;
/* Ensure the lv is set up according to ->iop_size */
@@ -371,7 +448,7 @@ xlog_cil_insert_format_items(
ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
lip->li_ops->iop_format(lip, lv);
insert:
- xfs_cil_prepare_item(log, lv, old_lv, diff_len, diff_iovecs);
+ xfs_cil_prepare_item(log, lv, old_lv, diff_len);
}
}
@@ -391,9 +468,10 @@ xlog_cil_insert_items(
struct xfs_cil_ctx *ctx = cil->xc_ctx;
struct xfs_log_item *lip;
int len = 0;
- int diff_iovecs = 0;
- int iclog_space;
int iovhdr_res = 0, split_res = 0, ctx_res = 0;
+ int space_used;
+ int order;
+ struct xlog_cil_pcp *cilpcp;
ASSERT(tp);
@@ -401,51 +479,92 @@ xlog_cil_insert_items(
* We can do this safely because the context can't checkpoint until we
* are done so it doesn't matter exactly how we update the CIL.
*/
- xlog_cil_insert_format_items(log, tp, &len, &diff_iovecs);
+ xlog_cil_insert_format_items(log, tp, &len);
- spin_lock(&cil->xc_cil_lock);
+ /*
+ * We need to take the CIL checkpoint unit reservation on the first
+ * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't
+ * unnecessarily do an atomic op in the fast path here. We can clear the
+ * XLOG_CIL_EMPTY bit as we are under the xc_ctx_lock here and that
+ * needs to be held exclusively to reset the XLOG_CIL_EMPTY bit.
+ */
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) &&
+ test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
+ ctx_res = ctx->ticket->t_unit_res;
- /* account for space used by new iovec headers */
- iovhdr_res = diff_iovecs * sizeof(xlog_op_header_t);
- len += iovhdr_res;
- ctx->nvecs += diff_iovecs;
+ /*
+ * Check if we need to steal iclog headers. atomic_read() is not a
+ * locked atomic operation, so we can check the value before we do any
+ * real atomic ops in the fast path. If we've already taken the CIL unit
+ * reservation from this commit, we've already got one iclog header
+ * space reserved so we have to account for that otherwise we risk
+ * overrunning the reservation on this ticket.
+ *
+ * If the CIL is already at the hard limit, we might need more header
+ * space that originally reserved. So steal more header space from every
+ * commit that occurs once we are over the hard limit to ensure the CIL
+ * push won't run out of reservation space.
+ *
+ * This can steal more than we need, but that's OK.
+ */
+ space_used = atomic_read(&ctx->space_used);
+ if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
+ space_used + len >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log)) {
+ int split_res = log->l_iclog_hsize +
+ sizeof(struct xlog_op_header);
+ if (ctx_res)
+ ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1);
+ else
+ ctx_res = split_res * tp->t_ticket->t_iclog_hdrs;
+ atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
+ }
+ /*
+ * Update the CIL percpu pointer. This updates the global counter when
+ * over the percpu batch size or when the CIL is over the space limit.
+ * This means low lock overhead for normal updates, and when over the
+ * limit the space used is immediately accounted. This makes enforcing
+ * the hard limit much more accurate. The per cpu fold threshold is
+ * based on how close we are to the hard limit.
+ */
+ cilpcp = get_cpu_ptr(cil->xc_pcp);
+ cilpcp->space_reserved += ctx_res;
+ cilpcp->space_used += len;
+ if (space_used >= XLOG_CIL_SPACE_LIMIT(log) ||
+ cilpcp->space_used >
+ ((XLOG_CIL_BLOCKING_SPACE_LIMIT(log) - space_used) /
+ num_online_cpus())) {
+ atomic_add(cilpcp->space_used, &ctx->space_used);
+ cilpcp->space_used = 0;
+ }
/* attach the transaction to the CIL if it has any busy extents */
if (!list_empty(&tp->t_busy))
- list_splice_init(&tp->t_busy, &ctx->busy_extents);
-
+ list_splice_init(&tp->t_busy, &cilpcp->busy_extents);
/*
- * Now transfer enough transaction reservation to the context ticket
- * for the checkpoint. The context ticket is special - the unit
- * reservation has to grow as well as the current reservation as we
- * steal from tickets so we can correctly determine the space used
- * during the transaction commit.
+ * Now update the order of everything modified in the transaction
+ * and insert items into the CIL if they aren't already there.
+ * We do this here so we only need to take the CIL lock once during
+ * the transaction commit.
*/
- if (ctx->ticket->t_curr_res == 0) {
- ctx_res = ctx->ticket->t_unit_res;
- ctx->ticket->t_curr_res = ctx_res;
- tp->t_ticket->t_curr_res -= ctx_res;
- }
+ order = atomic_inc_return(&ctx->order_id);
+ list_for_each_entry(lip, &tp->t_items, li_trans) {
- /* do we need space for more log record headers? */
- iclog_space = log->l_iclog_size - log->l_iclog_hsize;
- if (len > 0 && (ctx->space_used / iclog_space !=
- (ctx->space_used + len) / iclog_space)) {
- split_res = (len + iclog_space - 1) / iclog_space;
- /* need to take into account split region headers, too */
- split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
- ctx->ticket->t_unit_res += split_res;
- ctx->ticket->t_curr_res += split_res;
- tp->t_ticket->t_curr_res -= split_res;
- ASSERT(tp->t_ticket->t_curr_res >= len);
+ /* Skip items which aren't dirty in this transaction. */
+ if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
+ continue;
+
+ lip->li_order_id = order;
+ if (!list_empty(&lip->li_cil))
+ continue;
+ list_add_tail(&lip->li_cil, &cilpcp->log_items);
}
- tp->t_ticket->t_curr_res -= len;
- ctx->space_used += len;
+ put_cpu_ptr(cilpcp);
/*
* If we've overrun the reservation, dump the tx details before we move
* the log items. Shutdown is imminent...
*/
+ tp->t_ticket->t_curr_res -= ctx_res + len;
if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
xfs_warn(log->l_mp, "Transaction log reservation overrun:");
xfs_warn(log->l_mp,
@@ -457,42 +576,20 @@ xlog_cil_insert_items(
xlog_print_trans(tp);
}
- /*
- * Now (re-)position everything modified at the tail of the CIL.
- * We do this here so we only need to take the CIL lock once during
- * the transaction commit.
- */
- list_for_each_entry(lip, &tp->t_items, li_trans) {
-
- /* Skip items which aren't dirty in this transaction. */
- if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
- continue;
-
- /*
- * Only move the item if it isn't already at the tail. This is
- * to prevent a transient list_empty() state when reinserting
- * an item that is already the only item in the CIL.
- */
- if (!list_is_last(&lip->li_cil, &cil->xc_cil))
- list_move_tail(&lip->li_cil, &cil->xc_cil);
- }
-
- spin_unlock(&cil->xc_cil_lock);
-
if (tp->t_ticket->t_curr_res < 0)
xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
}
static void
xlog_cil_free_logvec(
- struct xfs_log_vec *log_vector)
+ struct list_head *lv_chain)
{
struct xfs_log_vec *lv;
- for (lv = log_vector; lv; ) {
- struct xfs_log_vec *next = lv->lv_next;
+ while (!list_empty(lv_chain)) {
+ lv = list_first_entry(lv_chain, struct xfs_log_vec, lv_list);
+ list_del_init(&lv->lv_list);
kmem_free(lv);
- lv = next;
}
}
@@ -591,7 +688,7 @@ xlog_cil_committed(
spin_unlock(&ctx->cil->xc_push_lock);
}
- xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
+ xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, &ctx->lv_chain,
ctx->start_lsn, abort);
xfs_extent_busy_sort(&ctx->busy_extents);
@@ -602,7 +699,7 @@ xlog_cil_committed(
list_del(&ctx->committing);
spin_unlock(&ctx->cil->xc_push_lock);
- xlog_cil_free_logvec(ctx->lv_chain);
+ xlog_cil_free_logvec(&ctx->lv_chain);
if (!list_empty(&ctx->busy_extents))
xlog_discard_busy_extents(mp, ctx);
@@ -623,6 +720,89 @@ xlog_cil_process_committed(
}
}
+struct xlog_cil_trans_hdr {
+ struct xlog_op_header oph[2];
+ struct xfs_trans_header thdr;
+ struct xfs_log_iovec lhdr[2];
+};
+
+/*
+ * Build a checkpoint transaction header to begin the journal transaction. We
+ * need to account for the space used by the transaction header here as it is
+ * not accounted for in xlog_write().
+ *
+ * This is the only place we write a transaction header, so we also build the
+ * log opheaders that indicate the start of a log transaction and wrap the
+ * transaction header. We keep the start record in it's own log vector rather
+ * than compacting them into a single region as this ends up making the logic
+ * in xlog_write() for handling empty opheaders for start, commit and unmount
+ * records much simpler.
+ */
+static void
+xlog_cil_build_trans_hdr(
+ struct xfs_cil_ctx *ctx,
+ struct xlog_cil_trans_hdr *hdr,
+ struct xfs_log_vec *lvhdr,
+ int num_iovecs)
+{
+ struct xlog_ticket *tic = ctx->ticket;
+ uint32_t tid = cpu_to_be32(tic->t_tid);
+
+ memset(hdr, 0, sizeof(*hdr));
+
+ /* Log start record */
+ hdr->oph[0].oh_tid = tid;
+ hdr->oph[0].oh_clientid = XFS_TRANSACTION;
+ hdr->oph[0].oh_flags = XLOG_START_TRANS;
+
+ /* log iovec region pointer */
+ hdr->lhdr[0].i_addr = &hdr->oph[0];
+ hdr->lhdr[0].i_len = sizeof(struct xlog_op_header);
+ hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER;
+
+ /* log opheader */
+ hdr->oph[1].oh_tid = tid;
+ hdr->oph[1].oh_clientid = XFS_TRANSACTION;
+
+ /* transaction header */
+ hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
+ hdr->thdr.th_type = XFS_TRANS_CHECKPOINT;
+ hdr->thdr.th_tid = tid;
+ hdr->thdr.th_num_items = num_iovecs;
+
+ /* log iovec region pointer */
+ hdr->lhdr[1].i_addr = &hdr->oph[1];
+ hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) +
+ sizeof(struct xfs_trans_header);
+ hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR;
+
+ lvhdr->lv_niovecs = 2;
+ lvhdr->lv_iovecp = &hdr->lhdr[0];
+ lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
+
+ tic->t_curr_res -= lvhdr->lv_bytes;
+}
+
+/*
+ * CIL item reordering compare function. We want to order in ascending ID order,
+ * but we want to leave items with the same ID in the order they were added to
+ * the list. This is important for operations like reflink where we log 4 order
+ * dependent intents in a single transaction when we overwrite an existing
+ * shared extent with a new shared extent. i.e. BUI(unmap), CUI(drop),
+ * CUI (inc), BUI(remap)...
+ */
+static int
+xlog_cil_order_cmp(
+ void *priv,
+ const struct list_head *a,
+ const struct list_head *b)
+{
+ struct xfs_log_vec *l1 = container_of(a, struct xfs_log_vec, lv_list);
+ struct xfs_log_vec *l2 = container_of(b, struct xfs_log_vec, lv_list);
+
+ return l1->lv_order_id > l2->lv_order_id;
+}
+
/*
* Push the Committed Item List to the log.
*
@@ -641,36 +821,45 @@ static void
xlog_cil_push_work(
struct work_struct *work)
{
- struct xfs_cil *cil =
- container_of(work, struct xfs_cil, xc_push_work);
+ struct xfs_cil_ctx *ctx =
+ container_of(work, struct xfs_cil_ctx, push_work);
+ struct xfs_cil *cil = ctx->cil;
struct xlog *log = cil->xc_log;
struct xfs_log_vec *lv;
- struct xfs_cil_ctx *ctx;
struct xfs_cil_ctx *new_ctx;
struct xlog_in_core *commit_iclog;
- struct xlog_ticket *tic;
- int num_iovecs;
+ int num_iovecs = 0;
+ int num_bytes = 0;
int error = 0;
- struct xfs_trans_header thdr;
- struct xfs_log_iovec lhdr;
- struct xfs_log_vec lvhdr = { NULL };
+ struct xlog_cil_trans_hdr thdr;
+ struct xfs_log_vec lvhdr = {};
xfs_lsn_t commit_lsn;
xfs_lsn_t push_seq;
+ struct bio bio;
+ DECLARE_COMPLETION_ONSTACK(bdev_flush);
+ bool push_commit_stable;
+ struct xlog_ticket *ticket;
- new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_NOFS);
+ new_ctx = xlog_cil_ctx_alloc();
new_ctx->ticket = xlog_cil_ticket_alloc(log);
down_write(&cil->xc_ctx_lock);
- ctx = cil->xc_ctx;
spin_lock(&cil->xc_push_lock);
push_seq = cil->xc_push_seq;
ASSERT(push_seq <= ctx->sequence);
+ push_commit_stable = cil->xc_push_commit_stable;
+ cil->xc_push_commit_stable = false;
/*
- * Wake up any background push waiters now this context is being pushed.
+ * As we are about to switch to a new, empty CIL context, we no longer
+ * need to throttle tasks on CIL space overruns. Wake any waiters that
+ * the hard push throttle may have caught so they can start committing
+ * to the new context. The ctx->xc_push_lock provides the serialisation
+ * necessary for safely using the lockless waitqueue_active() check in
+ * this context.
*/
- if (ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log))
+ if (waitqueue_active(&cil->xc_push_wait))
wake_up_all(&cil->xc_push_wait);
/*
@@ -678,7 +867,7 @@ xlog_cil_push_work(
* move on to a new sequence number and so we have to be able to push
* this sequence again later.
*/
- if (list_empty(&cil->xc_cil)) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
cil->xc_push_seq = 0;
spin_unlock(&cil->xc_push_lock);
goto out_skip;
@@ -686,7 +875,7 @@ xlog_cil_push_work(
/* check for a previously pushed sequence */
- if (push_seq < cil->xc_ctx->sequence) {
+ if (push_seq < ctx->sequence) {
spin_unlock(&cil->xc_push_lock);
goto out_skip;
}
@@ -719,42 +908,36 @@ xlog_cil_push_work(
spin_unlock(&cil->xc_push_lock);
/*
- * pull all the log vectors off the items in the CIL, and
- * remove the items from the CIL. We don't need the CIL lock
- * here because it's only needed on the transaction commit
- * side which is currently locked out by the flush lock.
+ * The CIL is stable at this point - nothing new will be added to it
+ * because we hold the flush lock exclusively. Hence we can now issue
+ * a cache flush to ensure all the completed metadata in the journal we
+ * are about to overwrite is on stable storage.
*/
- lv = NULL;
- num_iovecs = 0;
- while (!list_empty(&cil->xc_cil)) {
+ xfs_flush_bdev_async(&bio, log->l_mp->m_ddev_targp->bt_bdev,
+ &bdev_flush);
+
+ xlog_cil_pcp_aggregate(cil, ctx);
+
+ while (!list_empty(&ctx->log_items)) {
struct xfs_log_item *item;
- item = list_first_entry(&cil->xc_cil,
+ item = list_first_entry(&ctx->log_items,
struct xfs_log_item, li_cil);
- list_del_init(&item->li_cil);
- if (!ctx->lv_chain)
- ctx->lv_chain = item->li_lv;
- else
- lv->lv_next = item->li_lv;
lv = item->li_lv;
- item->li_lv = NULL;
+ lv->lv_order_id = item->li_order_id;
num_iovecs += lv->lv_niovecs;
- }
+ /* we don't write ordered log vectors */
+ if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
+ num_bytes += lv->lv_bytes;
- /*
- * initialise the new context and attach it to the CIL. Then attach
- * the current context to the CIL committing list so it can be found
- * during log forces to extract the commit lsn of the sequence that
- * needs to be forced.
- */
- INIT_LIST_HEAD(&new_ctx->committing);
- INIT_LIST_HEAD(&new_ctx->busy_extents);
- new_ctx->sequence = ctx->sequence + 1;
- new_ctx->cil = cil;
- cil->xc_ctx = new_ctx;
+ list_add_tail(&lv->lv_list, &ctx->lv_chain);
+ list_del_init(&item->li_cil);
+ item->li_order_id = 0;
+ item->li_lv = NULL;
+ }
/*
- * The switch is now done, so we can drop the context lock and move out
+ * Switch the contexts so we can drop the context lock and move out
* of a shared context. We can't just go straight to the commit record,
* though - we need to synchronise with previous and future commits so
* that the commit records are correctly ordered in the log to ensure
@@ -772,41 +955,55 @@ xlog_cil_push_work(
* that higher sequences will wait for us to write out a commit record
* before they do.
*
- * xfs_log_force_lsn requires us to mirror the new sequence into the cil
+ * xfs_log_force_seq requires us to mirror the new sequence into the cil
* structure atomically with the addition of this sequence to the
* committing list. This also ensures that we can do unlocked checks
* against the current sequence in log forces without risking
* deferencing a freed context pointer.
*/
spin_lock(&cil->xc_push_lock);
- cil->xc_current_sequence = new_ctx->sequence;
+ xlog_cil_ctx_switch(cil, new_ctx);
spin_unlock(&cil->xc_push_lock);
up_write(&cil->xc_ctx_lock);
/*
+ * Sort the log vector chain before we add the transaction headers.
+ * This ensures we always have the transaction headers at the start
+ * of the chain.
+ */
+ list_sort(NULL, &ctx->lv_chain, xlog_cil_order_cmp);
+
+ /*
* Build a checkpoint transaction header and write it to the log to
* begin the transaction. We need to account for the space used by the
* transaction header here as it is not accounted for in xlog_write().
- *
- * The LSN we need to pass to the log items on transaction commit is
- * the LSN reported by the first log vector write. If we use the commit
- * record lsn then we can move the tail beyond the grant write head.
+ * Add the lvhdr to the head of the lv chain we pass to xlog_write() so
+ * it gets written into the iclog first.
+ */
+ xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
+ num_bytes += lvhdr.lv_bytes;
+ list_add(&lvhdr.lv_list, &ctx->lv_chain);
+
+ /*
+ * Before we format and submit the first iclog, we have to ensure that
+ * the metadata writeback ordering cache flush is complete.
*/
- tic = ctx->ticket;
- thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
- thdr.th_type = XFS_TRANS_CHECKPOINT;
- thdr.th_tid = tic->t_tid;
- thdr.th_num_items = num_iovecs;
- lhdr.i_addr = &thdr;
- lhdr.i_len = sizeof(xfs_trans_header_t);
- lhdr.i_type = XLOG_REG_TYPE_TRANSHDR;
- tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t);
-
- lvhdr.lv_niovecs = 1;
- lvhdr.lv_iovecp = &lhdr;
- lvhdr.lv_next = ctx->lv_chain;
-
- error = xlog_write(log, &lvhdr, tic, &ctx->start_lsn, NULL, 0, true);
+ wait_for_completion(&bdev_flush);
+
+ /*
+ * The LSN we need to pass to the log items on transaction commit is the
+ * LSN reported by the first log vector write, not the commit lsn. If we
+ * use the commit record lsn then we can move the tail beyond the grant
+ * write head.
+ */
+ error = xlog_write(log, &ctx->lv_chain, ctx->ticket, &ctx->start_lsn,
+ NULL, num_bytes);
+
+ /*
+ * Take the lvhdr back off the lv_chain as it should not be passed
+ * to log IO completion.
+ */
+ list_del(&lvhdr.lv_list);
if (error)
goto out_abort_free_ticket;
@@ -844,16 +1041,14 @@ restart:
}
spin_unlock(&cil->xc_push_lock);
- error = xlog_commit_record(log, tic, &commit_iclog, &commit_lsn);
+ error = xlog_commit_record(log, ctx->ticket, &commit_iclog, &commit_lsn);
if (error)
goto out_abort_free_ticket;
- xfs_log_ticket_ungrant(log, tic);
-
spin_lock(&commit_iclog->ic_callback_lock);
if (commit_iclog->ic_state == XLOG_STATE_IOERROR) {
spin_unlock(&commit_iclog->ic_callback_lock);
- goto out_abort;
+ goto out_abort_free_ticket;
}
ASSERT_ALWAYS(commit_iclog->ic_state == XLOG_STATE_ACTIVE ||
commit_iclog->ic_state == XLOG_STATE_WANT_SYNC);
@@ -870,8 +1065,45 @@ restart:
wake_up_all(&cil->xc_commit_wait);
spin_unlock(&cil->xc_push_lock);
- /* release the hounds! */
- xfs_log_release_iclog(commit_iclog);
+ /*
+ * Pull the ticket off the ctx so we can ungrant it after releasing the
+ * commit_iclog. The ctx may be freed by the time we return from
+ * releasing the commit_iclog (i.e. checkpoint has been completed and
+ * callback run) so we can't reference the ctx after the call to
+ * xlog_state_release_iclog().
+ */
+ ticket = ctx->ticket;
+
+ /*
+ * If the checkpoint spans multiple iclogs, wait for all previous
+ * iclogs to complete before we submit the commit_iclog. In this case,
+ * the commit_iclog write needs to issue a pre-flush so that the
+ * ordering is correctly preserved down to stable storage.
+ */
+ spin_lock(&log->l_icloglock);
+ if (ctx->start_lsn != commit_lsn) {
+ xlog_wait_on_iclog(commit_iclog->ic_prev);
+ spin_lock(&log->l_icloglock);
+ commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
+ }
+
+ /*
+ * The commit iclog must be written to stable storage to guarantee
+ * journal IO vs metadata writeback IO is correctly ordered on stable
+ * storage.
+ *
+ * If the push caller needs the commit to be immediately stable and the
+ * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it
+ * will be written when released, switch it's state to WANT_SYNC right
+ * now.
+ */
+ commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
+ if (push_commit_stable && commit_iclog->ic_state == XLOG_STATE_ACTIVE)
+ xlog_state_switch_iclogs(log, commit_iclog, 0);
+ xlog_state_release_iclog(log, commit_iclog, ticket);
+ spin_unlock(&log->l_icloglock);
+
+ xfs_log_ticket_ungrant(log, ticket);
return;
out_skip:
@@ -881,8 +1113,7 @@ out_skip:
return;
out_abort_free_ticket:
- xfs_log_ticket_ungrant(log, tic);
-out_abort:
+ xfs_log_ticket_ungrant(log, ctx->ticket);
ASSERT(XLOG_FORCED_SHUTDOWN(log));
xlog_cil_committed(ctx);
}
@@ -899,18 +1130,27 @@ xlog_cil_push_background(
struct xlog *log) __releases(cil->xc_ctx_lock)
{
struct xfs_cil *cil = log->l_cilp;
+ int space_used = atomic_read(&cil->xc_ctx->space_used);
/*
* The cil won't be empty because we are called while holding the
- * context lock so whatever we added to the CIL will still be there
+ * context lock so whatever we added to the CIL will still be there.
*/
- ASSERT(!list_empty(&cil->xc_cil));
+ ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
/*
- * don't do a background push if we haven't used up all the
- * space available yet.
+ * We are done if:
+ * - we haven't used up all the space available yet; or
+ * - we've already queued up a push; and
+ * - we're not over the hard limit; and
+ * - nothing has been over the hard limit.
+ *
+ * If so, we don't need to take the push lock as there's nothing to do.
*/
- if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
+ if (space_used < XLOG_CIL_SPACE_LIMIT(log) ||
+ (cil->xc_push_seq == cil->xc_current_sequence &&
+ space_used < XLOG_CIL_BLOCKING_SPACE_LIMIT(log) &&
+ !waitqueue_active(&cil->xc_push_wait))) {
up_read(&cil->xc_ctx_lock);
return;
}
@@ -918,7 +1158,7 @@ xlog_cil_push_background(
spin_lock(&cil->xc_push_lock);
if (cil->xc_push_seq < cil->xc_current_sequence) {
cil->xc_push_seq = cil->xc_current_sequence;
- queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
+ queue_work(log->l_mp->m_cil_workqueue, &cil->xc_ctx->push_work);
}
/*
@@ -931,11 +1171,18 @@ xlog_cil_push_background(
/*
* If we are well over the space limit, throttle the work that is being
- * done until the push work on this context has begun.
+ * done until the push work on this context has begun. Enforce the hard
+ * throttle on all transaction commits once it has been activated, even
+ * if the committing transactions have resulted in the space usage
+ * dipping back down under the hard limit.
+ *
+ * The ctx->xc_push_lock provides the serialisation necessary for safely
+ * using the lockless waitqueue_active() check in this context.
*/
- if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log)) {
+ if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
+ waitqueue_active(&cil->xc_push_wait)) {
trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
- ASSERT(cil->xc_ctx->space_used < log->l_logsize);
+ ASSERT(space_used < log->l_logsize);
xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
return;
}
@@ -947,13 +1194,26 @@ xlog_cil_push_background(
/*
* xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
* number that is passed. When it returns, the work will be queued for
- * @push_seq, but it won't be completed. The caller is expected to do any
- * waiting for push_seq to complete if it is required.
+ * @push_seq, but it won't be completed.
+ *
+ * If the caller is performing a synchronous force, we will flush the workqueue
+ * to get previously queued work moving to minimise the wait time they will
+ * undergo waiting for all outstanding pushes to complete. The caller is
+ * expected to do the required waiting for push_seq to complete.
+ *
+ * If the caller is performing an async push, we need to ensure that the
+ * checkpoint is fully flushed out of the iclogs when we finish the push. If we
+ * don't do this, then the commit record may remain sitting in memory in an
+ * ACTIVE iclog. This then requires another full log force to push to disk,
+ * which defeats the purpose of having an async, non-blocking CIL force
+ * mechanism. Hence in this case we need to pass a flag to the push work to
+ * indicate it needs to flush the commit record itself.
*/
static void
xlog_cil_push_now(
struct xlog *log,
- xfs_lsn_t push_seq)
+ xfs_lsn_t push_seq,
+ bool async)
{
struct xfs_cil *cil = log->l_cilp;
@@ -963,20 +1223,23 @@ xlog_cil_push_now(
ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
/* start on any pending background push to minimise wait time on it */
- flush_work(&cil->xc_push_work);
+ if (!async)
+ flush_workqueue(log->l_mp->m_cil_workqueue);
/*
* If the CIL is empty or we've already pushed the sequence then
* there's no work we need to do.
*/
spin_lock(&cil->xc_push_lock);
- if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) ||
+ push_seq <= cil->xc_push_seq) {
spin_unlock(&cil->xc_push_lock);
return;
}
cil->xc_push_seq = push_seq;
- queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
+ cil->xc_push_commit_stable = async;
+ queue_work(log->l_mp->m_cil_workqueue, &cil->xc_ctx->push_work);
spin_unlock(&cil->xc_push_lock);
}
@@ -988,7 +1251,7 @@ xlog_cil_empty(
bool empty = false;
spin_lock(&cil->xc_push_lock);
- if (list_empty(&cil->xc_cil))
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
empty = true;
spin_unlock(&cil->xc_push_lock);
return empty;
@@ -1008,16 +1271,14 @@ xlog_cil_empty(
* allowed again.
*/
void
-xfs_log_commit_cil(
- struct xfs_mount *mp,
+xlog_cil_commit(
+ struct xlog *log,
struct xfs_trans *tp,
- xfs_lsn_t *commit_lsn,
+ xfs_csn_t *commit_seq,
bool regrant)
{
- struct xlog *log = mp->m_log;
struct xfs_cil *cil = log->l_cilp;
struct xfs_log_item *lip, *next;
- xfs_lsn_t xc_commit_lsn;
/*
* Do all necessary memory allocation before we lock the CIL.
@@ -1031,10 +1292,6 @@ xfs_log_commit_cil(
xlog_cil_insert_items(log, tp);
- xc_commit_lsn = cil->xc_ctx->sequence;
- if (commit_lsn)
- *commit_lsn = xc_commit_lsn;
-
if (regrant && !XLOG_FORCED_SHUTDOWN(log))
xfs_log_ticket_regrant(log, tp->t_ticket);
else
@@ -1057,27 +1314,44 @@ xfs_log_commit_cil(
list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
xfs_trans_del_item(lip);
if (lip->li_ops->iop_committing)
- lip->li_ops->iop_committing(lip, xc_commit_lsn);
+ lip->li_ops->iop_committing(lip, cil->xc_ctx->sequence);
}
+ if (commit_seq)
+ *commit_seq = cil->xc_ctx->sequence;
/* xlog_cil_push_background() releases cil->xc_ctx_lock */
xlog_cil_push_background(log);
}
/*
+ * Flush the CIL to stable storage but don't wait for it to complete. This
+ * requires the CIL push to ensure the commit record for the push hits the disk,
+ * but otherwise is no different to a push done from a log force.
+ */
+void
+xlog_cil_flush(
+ struct xlog *log)
+{
+ xfs_csn_t seq = log->l_cilp->xc_current_sequence;
+
+ trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
+ xlog_cil_push_now(log, seq, true);
+}
+
+/*
* Conditionally push the CIL based on the sequence passed in.
*
- * We only need to push if we haven't already pushed the sequence
- * number given. Hence the only time we will trigger a push here is
- * if the push sequence is the same as the current context.
+ * We only need to push if we haven't already pushed the sequence number given.
+ * Hence the only time we will trigger a push here is if the push sequence is
+ * the same as the current context.
*
* We return the current commit lsn to allow the callers to determine if a
* iclog flush is necessary following this call.
*/
xfs_lsn_t
-xlog_cil_force_lsn(
+xlog_cil_force_seq(
struct xlog *log,
- xfs_lsn_t sequence)
+ xfs_csn_t sequence)
{
struct xfs_cil *cil = log->l_cilp;
struct xfs_cil_ctx *ctx;
@@ -1085,13 +1359,17 @@ xlog_cil_force_lsn(
ASSERT(sequence <= cil->xc_current_sequence);
+ if (!sequence)
+ sequence = cil->xc_current_sequence;
+ trace_xfs_log_force(log->l_mp, sequence, _RET_IP_);
+
/*
* check to see if we need to force out the current context.
* xlog_cil_push() handles racing pushes for the same sequence,
* so no need to deal with it here.
*/
restart:
- xlog_cil_push_now(log, sequence);
+ xlog_cil_push_now(log, sequence, false);
/*
* See if we can find a previous sequence still committing.
@@ -1115,6 +1393,7 @@ restart:
* It is still being pushed! Wait for the push to
* complete, then start again from the beginning.
*/
+ XFS_STATS_INC(log->l_mp, xs_log_force_sleep);
xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
goto restart;
}
@@ -1140,7 +1419,7 @@ restart:
* we would have found the context on the committing list.
*/
if (sequence == cil->xc_current_sequence &&
- !list_empty(&cil->xc_cil)) {
+ !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
spin_unlock(&cil->xc_push_lock);
goto restart;
}
@@ -1173,21 +1452,126 @@ bool
xfs_log_item_in_current_chkpt(
struct xfs_log_item *lip)
{
- struct xfs_cil_ctx *ctx;
+ struct xfs_cil *cil = lip->li_mountp->m_log->l_cilp;
- if (list_empty(&lip->li_cil))
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
return false;
- ctx = lip->li_mountp->m_log->l_cilp->xc_ctx;
-
/*
* li_seq is written on the first commit of a log item to record the
* first checkpoint it is written to. Hence if it is different to the
* current sequence, we're in a new checkpoint.
*/
- if (XFS_LSN_CMP(lip->li_seq, ctx->sequence) != 0)
- return false;
- return true;
+ return lip->li_seq == cil->xc_ctx->sequence;
+}
+
+#ifdef CONFIG_HOTPLUG_CPU
+static LIST_HEAD(xlog_cil_pcp_list);
+static DEFINE_SPINLOCK(xlog_cil_pcp_lock);
+
+/*
+ * Move dead percpu state to the relevant CIL context structures.
+ *
+ * We have to lock the CIL context here to ensure that nothing is modifying
+ * the percpu state, either addition or removal. Both of these are done under
+ * the CIL context lock, so grabbing that exclusively here will ensure we can
+ * safely drain the cilpcp for the CPU that is dying.
+ */
+void
+xlog_cil_pcp_dead(
+ unsigned int cpu)
+{
+ struct xfs_cil *cil, *n;
+
+ spin_lock(&xlog_cil_pcp_lock);
+ list_for_each_entry_safe(cil, n, &xlog_cil_pcp_list, xc_pcp_list) {
+ struct xlog_cil_pcp *cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+ struct xfs_cil_ctx *ctx;
+
+ spin_unlock(&xlog_cil_pcp_lock);
+ down_write(&cil->xc_ctx_lock);
+ ctx = cil->xc_ctx;
+
+ atomic_add(cilpcp->space_used, &ctx->space_used);
+ if (ctx->ticket) {
+ ctx->ticket->t_curr_res += cilpcp->space_reserved;
+ ctx->ticket->t_unit_res += cilpcp->space_reserved;
+ }
+ if (!list_empty(&cilpcp->busy_extents)) {
+ list_splice_init(&cilpcp->busy_extents,
+ &ctx->busy_extents);
+ }
+ if (!list_empty(&cilpcp->log_items))
+ list_splice_init(&cilpcp->log_items, &ctx->log_items);
+
+ cilpcp->space_used = 0;
+ cilpcp->space_reserved = 0;
+
+ up_write(&cil->xc_ctx_lock);
+ spin_lock(&xlog_cil_pcp_lock);
+ }
+ spin_unlock(&xlog_cil_pcp_lock);
+}
+
+static int
+xlog_cil_pcp_hpadd(
+ struct xfs_cil *cil)
+{
+ INIT_LIST_HEAD(&cil->xc_pcp_list);
+ spin_lock(&xlog_cil_pcp_lock);
+ list_add(&cil->xc_pcp_list, &xlog_cil_pcp_list);
+ spin_unlock(&xlog_cil_pcp_lock);
+ return 0;
+}
+
+static void
+xlog_cil_pcp_hpremove(
+ struct xfs_cil *cil)
+{
+ spin_lock(&xlog_cil_pcp_lock);
+ list_del(&cil->xc_pcp_list);
+ spin_unlock(&xlog_cil_pcp_lock);
+}
+
+#else /* !CONFIG_HOTPLUG_CPU */
+static inline int xlog_cil_pcp_hpadd(struct xfs_cil *cil) { return 0; }
+static inline void xlog_cil_pcp_hpremove(struct xfs_cil *cil) {}
+#endif
+
+static void __percpu *
+xlog_cil_pcp_alloc(
+ struct xfs_cil *cil)
+{
+ struct xlog_cil_pcp *cilpcp;
+ void __percpu *pcp;
+ int cpu;
+
+ pcp = alloc_percpu(struct xlog_cil_pcp);
+ if (!pcp)
+ return NULL;
+
+ if (xlog_cil_pcp_hpadd(cil) < 0) {
+ free_percpu(pcp);
+ return NULL;
+ }
+
+ for_each_possible_cpu(cpu) {
+ cilpcp = per_cpu_ptr(pcp, cpu);
+ INIT_LIST_HEAD(&cilpcp->busy_extents);
+ INIT_LIST_HEAD(&cilpcp->log_items);
+ }
+ return pcp;
+}
+
+static void
+xlog_cil_pcp_free(
+ struct xfs_cil *cil,
+ void __percpu *pcp)
+{
+ if (!pcp)
+ return;
+ xlog_cil_pcp_hpremove(cil);
+ free_percpu(pcp);
}
/*
@@ -1204,30 +1588,23 @@ xlog_cil_init(
if (!cil)
return -ENOMEM;
- ctx = kmem_zalloc(sizeof(*ctx), KM_MAYFAIL);
- if (!ctx) {
+ cil->xc_log = log;
+ cil->xc_pcp = xlog_cil_pcp_alloc(cil);
+ if (!cil->xc_pcp) {
kmem_free(cil);
return -ENOMEM;
}
- INIT_WORK(&cil->xc_push_work, xlog_cil_push_work);
- INIT_LIST_HEAD(&cil->xc_cil);
INIT_LIST_HEAD(&cil->xc_committing);
- spin_lock_init(&cil->xc_cil_lock);
spin_lock_init(&cil->xc_push_lock);
init_waitqueue_head(&cil->xc_push_wait);
init_rwsem(&cil->xc_ctx_lock);
init_waitqueue_head(&cil->xc_commit_wait);
+ log->l_cilp = cil;
- INIT_LIST_HEAD(&ctx->committing);
- INIT_LIST_HEAD(&ctx->busy_extents);
- ctx->sequence = 1;
- ctx->cil = cil;
- cil->xc_ctx = ctx;
- cil->xc_current_sequence = ctx->sequence;
+ ctx = xlog_cil_ctx_alloc();
+ xlog_cil_ctx_switch(cil, ctx);
- cil->xc_log = log;
- log->l_cilp = cil;
return 0;
}
@@ -1235,13 +1612,16 @@ void
xlog_cil_destroy(
struct xlog *log)
{
- if (log->l_cilp->xc_ctx) {
- if (log->l_cilp->xc_ctx->ticket)
- xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
- kmem_free(log->l_cilp->xc_ctx);
+ struct xfs_cil *cil = log->l_cilp;
+
+ if (cil->xc_ctx) {
+ if (cil->xc_ctx->ticket)
+ xfs_log_ticket_put(cil->xc_ctx->ticket);
+ kmem_free(cil->xc_ctx);
}
- ASSERT(list_empty(&log->l_cilp->xc_cil));
- kmem_free(log->l_cilp);
+ ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
+ xlog_cil_pcp_free(cil, cil->xc_pcp);
+ kmem_free(cil);
}
diff --git a/fs/xfs/xfs_log_priv.h b/fs/xfs/xfs_log_priv.h
index 1c6fdbf3d506..e4e421a70335 100644
--- a/fs/xfs/xfs_log_priv.h
+++ b/fs/xfs/xfs_log_priv.h
@@ -133,37 +133,20 @@ enum xlog_iclog_state {
#define XLOG_COVER_OPS 5
-/* Ticket reservation region accounting */
-#define XLOG_TIC_LEN_MAX 15
-
-/*
- * Reservation region
- * As would be stored in xfs_log_iovec but without the i_addr which
- * we don't care about.
- */
-typedef struct xlog_res {
- uint r_len; /* region length :4 */
- uint r_type; /* region's transaction type :4 */
-} xlog_res_t;
+#define XLOG_ICL_NEED_FLUSH (1 << 0) /* iclog needs REQ_PREFLUSH */
+#define XLOG_ICL_NEED_FUA (1 << 1) /* iclog needs REQ_FUA */
typedef struct xlog_ticket {
- struct list_head t_queue; /* reserve/write queue */
- struct task_struct *t_task; /* task that owns this ticket */
- xlog_tid_t t_tid; /* transaction identifier : 4 */
- atomic_t t_ref; /* ticket reference count : 4 */
- int t_curr_res; /* current reservation in bytes : 4 */
- int t_unit_res; /* unit reservation in bytes : 4 */
- char t_ocnt; /* original count : 1 */
- char t_cnt; /* current count : 1 */
- char t_clientid; /* who does this belong to; : 1 */
- char t_flags; /* properties of reservation : 1 */
-
- /* reservation array fields */
- uint t_res_num; /* num in array : 4 */
- uint t_res_num_ophdrs; /* num op hdrs : 4 */
- uint t_res_arr_sum; /* array sum : 4 */
- uint t_res_o_flow; /* sum overflow : 4 */
- xlog_res_t t_res_arr[XLOG_TIC_LEN_MAX]; /* array of res : 8 * 15 */
+ struct list_head t_queue; /* reserve/write queue */
+ struct task_struct *t_task; /* task that owns this ticket */
+ xlog_tid_t t_tid; /* transaction identifier */
+ atomic_t t_ref; /* ticket reference count */
+ int t_curr_res; /* current reservation */
+ int t_unit_res; /* unit reservation */
+ char t_ocnt; /* original count */
+ char t_cnt; /* current count */
+ char t_flags; /* properties of reservation */
+ int t_iclog_hdrs; /* iclog hdrs in t_curr_res */
} xlog_ticket_t;
/*
@@ -201,6 +184,7 @@ typedef struct xlog_in_core {
u32 ic_size;
u32 ic_offset;
enum xlog_iclog_state ic_state;
+ unsigned int ic_flags;
char *ic_datap; /* pointer to iclog data */
/* Callback structures need their own cacheline */
@@ -230,17 +214,29 @@ struct xfs_cil;
struct xfs_cil_ctx {
struct xfs_cil *cil;
- xfs_lsn_t sequence; /* chkpt sequence # */
+ xfs_csn_t sequence; /* chkpt sequence # */
xfs_lsn_t start_lsn; /* first LSN of chkpt commit */
xfs_lsn_t commit_lsn; /* chkpt commit record lsn */
struct xlog_ticket *ticket; /* chkpt ticket */
- int nvecs; /* number of regions */
- int space_used; /* aggregate size of regions */
+ atomic_t space_used; /* aggregate size of regions */
struct list_head busy_extents; /* busy extents in chkpt */
- struct xfs_log_vec *lv_chain; /* logvecs being pushed */
+ struct list_head log_items; /* log items in chkpt */
+ struct list_head lv_chain; /* logvecs being pushed */
struct list_head iclog_entry;
struct list_head committing; /* ctx committing list */
struct work_struct discard_endio_work;
+ struct work_struct push_work;
+ atomic_t order_id;
+};
+
+/*
+ * Per-cpu CIL tracking items
+ */
+struct xlog_cil_pcp {
+ uint32_t space_used;
+ uint32_t space_reserved;
+ struct list_head busy_extents;
+ struct list_head log_items;
};
/*
@@ -261,21 +257,29 @@ struct xfs_cil_ctx {
*/
struct xfs_cil {
struct xlog *xc_log;
- struct list_head xc_cil;
- spinlock_t xc_cil_lock;
+ unsigned long xc_flags;
+ atomic_t xc_iclog_hdrs;
struct rw_semaphore xc_ctx_lock ____cacheline_aligned_in_smp;
struct xfs_cil_ctx *xc_ctx;
spinlock_t xc_push_lock ____cacheline_aligned_in_smp;
- xfs_lsn_t xc_push_seq;
+ xfs_csn_t xc_push_seq;
+ bool xc_push_commit_stable;
struct list_head xc_committing;
wait_queue_head_t xc_commit_wait;
- xfs_lsn_t xc_current_sequence;
- struct work_struct xc_push_work;
+ xfs_csn_t xc_current_sequence;
wait_queue_head_t xc_push_wait; /* background push throttle */
+
+ void __percpu *xc_pcp; /* percpu CIL structures */
+#ifdef CONFIG_HOTPLUG_CPU
+ struct list_head xc_pcp_list;
+#endif
} ____cacheline_aligned_in_smp;
+/* xc_flags bit values */
+#define XLOG_CIL_EMPTY 1
+
/*
* The amount of log space we allow the CIL to aggregate is difficult to size.
* Whatever we choose, we have to make sure we can get a reservation for the
@@ -436,6 +440,8 @@ struct xlog {
#endif
/* log recovery lsn tracking (for buffer submission */
xfs_lsn_t l_recovery_lsn;
+
+ uint32_t l_iclog_roundoff;/* padding roundoff */
};
#define XLOG_BUF_CANCEL_BUCKET(log, blkno) \
@@ -458,13 +464,8 @@ extern __le32 xlog_cksum(struct xlog *log, struct xlog_rec_header *rhead,
char *dp, int size);
extern kmem_zone_t *xfs_log_ticket_zone;
-struct xlog_ticket *
-xlog_ticket_alloc(
- struct xlog *log,
- int unit_bytes,
- int count,
- char client,
- bool permanent);
+struct xlog_ticket *xlog_ticket_alloc(struct xlog *log, int unit_bytes,
+ int count, bool permanent);
static inline void
xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
@@ -476,15 +477,20 @@ xlog_write_adv_cnt(void **ptr, int *len, int *off, size_t bytes)
void xlog_print_tic_res(struct xfs_mount *mp, struct xlog_ticket *ticket);
void xlog_print_trans(struct xfs_trans *);
-int xlog_write(struct xlog *log, struct xfs_log_vec *log_vector,
+int xlog_write(struct xlog *log, struct list_head *lv_chain,
struct xlog_ticket *tic, xfs_lsn_t *start_lsn,
- struct xlog_in_core **commit_iclog, uint flags,
- bool need_start_rec);
+ struct xlog_in_core **commit_iclog, uint32_t len);
int xlog_commit_record(struct xlog *log, struct xlog_ticket *ticket,
struct xlog_in_core **iclog, xfs_lsn_t *lsn);
+
void xfs_log_ticket_ungrant(struct xlog *log, struct xlog_ticket *ticket);
void xfs_log_ticket_regrant(struct xlog *log, struct xlog_ticket *ticket);
+void xlog_state_switch_iclogs(struct xlog *log, struct xlog_in_core *iclog,
+ int eventual_size);
+int xlog_state_release_iclog(struct xlog *xlog, struct xlog_in_core *iclog,
+ struct xlog_ticket *ticket);
+
/*
* When we crack an atomic LSN, we sample it first so that the value will not
* change while we are cracking it into the component values. This means we
@@ -547,19 +553,19 @@ int xlog_cil_init(struct xlog *log);
void xlog_cil_init_post_recovery(struct xlog *log);
void xlog_cil_destroy(struct xlog *log);
bool xlog_cil_empty(struct xlog *log);
+void xlog_cil_commit(struct xlog *log, struct xfs_trans *tp,
+ xfs_csn_t *commit_seq, bool regrant);
/*
* CIL force routines
*/
-xfs_lsn_t
-xlog_cil_force_lsn(
- struct xlog *log,
- xfs_lsn_t sequence);
+void xlog_cil_flush(struct xlog *log);
+xfs_lsn_t xlog_cil_force_seq(struct xlog *log, xfs_csn_t sequence);
static inline void
xlog_cil_force(struct xlog *log)
{
- xlog_cil_force_lsn(log, log->l_cilp->xc_current_sequence);
+ xlog_cil_force_seq(log, log->l_cilp->xc_current_sequence);
}
/*
@@ -582,6 +588,8 @@ xlog_wait(
remove_wait_queue(wq, &wait);
}
+int xlog_wait_on_iclog(struct xlog_in_core *iclog);
+
/*
* The LSN is valid so long as it is behind the current LSN. If it isn't, this
* means that the next log record that includes this metadata could have a
@@ -633,4 +641,13 @@ xlog_valid_lsn(
return valid;
}
+/*
+ * CIL CPU dead notifier
+ */
+#ifdef CONFIG_HOTPLUG_CPU
+void xlog_cil_pcp_dead(unsigned int cpu);
+#else
+static inline void xlog_cil_pcp_dead(unsigned int cpu) {}
+#endif /* CONFIG_HOTPLUG_CPU */
+
#endif /* __XFS_LOG_PRIV_H__ */
diff --git a/fs/xfs/xfs_super.c b/fs/xfs/xfs_super.c
index 3a7fd4f02aa7..c30b077a1ede 100644
--- a/fs/xfs/xfs_super.c
+++ b/fs/xfs/xfs_super.c
@@ -340,13 +340,6 @@ xfs_blkdev_put(
blkdev_put(bdev, FMODE_READ|FMODE_WRITE|FMODE_EXCL);
}
-void
-xfs_blkdev_issue_flush(
- xfs_buftarg_t *buftarg)
-{
- blkdev_issue_flush(buftarg->bt_bdev);
-}
-
STATIC void
xfs_close_devices(
struct xfs_mount *mp)
@@ -508,9 +501,13 @@ xfs_init_mount_workqueues(
if (!mp->m_unwritten_workqueue)
goto out_destroy_buf;
+ /*
+ * Limit the CIL pipeline depth to 4 concurrent works to bound the
+ * concurrency the log spinlocks will be exposed to.
+ */
mp->m_cil_workqueue = alloc_workqueue("xfs-cil/%s",
XFS_WQFLAGS(WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_UNBOUND),
- 0, mp->m_super->s_id);
+ 4, mp->m_super->s_id);
if (!mp->m_cil_workqueue)
goto out_destroy_unwritten;
@@ -2127,6 +2124,36 @@ xfs_destroy_workqueues(void)
destroy_workqueue(xfs_alloc_wq);
}
+static int
+xfs_cpu_dead(
+ unsigned int cpu)
+{
+ xlog_cil_pcp_dead(cpu);
+ return 0;
+}
+
+static int __init
+xfs_cpu_hotplug_init(void)
+{
+ int error;
+
+ error = cpuhp_setup_state_nocalls(CPUHP_XFS_DEAD,
+ "xfs:dead", NULL,
+ xfs_cpu_dead);
+ if (error < 0) {
+ xfs_alert(NULL,
+"Failed to initialise CPU hotplug, error %d. XFS is non-functional.",
+ error);
+ }
+ return error;
+}
+
+static void
+xfs_cpu_hotplug_destroy(void)
+{
+ cpuhp_remove_state_nocalls(CPUHP_XFS_DEAD);
+}
+
STATIC int __init
init_xfs_fs(void)
{
@@ -2139,10 +2166,14 @@ init_xfs_fs(void)
xfs_dir_startup();
- error = xfs_init_zones();
+ error = xfs_cpu_hotplug_init();
if (error)
goto out;
+ error = xfs_init_zones();
+ if (error)
+ goto out_destroy_hp;
+
error = xfs_init_workqueues();
if (error)
goto out_destroy_zones;
@@ -2222,6 +2253,8 @@ init_xfs_fs(void)
xfs_destroy_workqueues();
out_destroy_zones:
xfs_destroy_zones();
+ out_destroy_hp:
+ xfs_cpu_hotplug_destroy();
out:
return error;
}
@@ -2244,6 +2277,7 @@ exit_xfs_fs(void)
xfs_destroy_workqueues();
xfs_destroy_zones();
xfs_uuid_table_free();
+ xfs_cpu_hotplug_destroy();
}
module_init(init_xfs_fs);
diff --git a/fs/xfs/xfs_super.h b/fs/xfs/xfs_super.h
index d2b40dc60dfc..167d23f92ffe 100644
--- a/fs/xfs/xfs_super.h
+++ b/fs/xfs/xfs_super.h
@@ -87,7 +87,6 @@ struct xfs_buftarg;
struct block_device;
extern void xfs_flush_inodes(struct xfs_mount *mp);
-extern void xfs_blkdev_issue_flush(struct xfs_buftarg *);
extern xfs_agnumber_t xfs_set_inode_alloc(struct xfs_mount *,
xfs_agnumber_t agcount);
diff --git a/fs/xfs/xfs_sysfs.c b/fs/xfs/xfs_sysfs.c
index f1bc88f4367c..18dc5eca6c04 100644
--- a/fs/xfs/xfs_sysfs.c
+++ b/fs/xfs/xfs_sysfs.c
@@ -10,6 +10,7 @@
#include "xfs_log_format.h"
#include "xfs_trans_resv.h"
#include "xfs_sysfs.h"
+#include "xfs_log.h"
#include "xfs_log_priv.h"
#include "xfs_mount.h"
diff --git a/fs/xfs/xfs_trace.c b/fs/xfs/xfs_trace.c
index 7e01e00550ac..4c86afad1617 100644
--- a/fs/xfs/xfs_trace.c
+++ b/fs/xfs/xfs_trace.c
@@ -20,6 +20,7 @@
#include "xfs_bmap.h"
#include "xfs_attr.h"
#include "xfs_trans.h"
+#include "xfs_log.h"
#include "xfs_log_priv.h"
#include "xfs_buf_item.h"
#include "xfs_quota.h"
diff --git a/fs/xfs/xfs_trans.c b/fs/xfs/xfs_trans.c
index 586f2992b789..0f8300adb12d 100644
--- a/fs/xfs/xfs_trans.c
+++ b/fs/xfs/xfs_trans.c
@@ -9,7 +9,6 @@
#include "xfs_shared.h"
#include "xfs_format.h"
#include "xfs_log_format.h"
-#include "xfs_log_priv.h"
#include "xfs_trans_resv.h"
#include "xfs_mount.h"
#include "xfs_extent_busy.h"
@@ -17,6 +16,7 @@
#include "xfs_trans.h"
#include "xfs_trans_priv.h"
#include "xfs_log.h"
+#include "xfs_log_priv.h"
#include "xfs_trace.h"
#include "xfs_error.h"
#include "xfs_defer.h"
@@ -194,11 +194,9 @@ xfs_trans_reserve(
ASSERT(resp->tr_logflags & XFS_TRANS_PERM_LOG_RES);
error = xfs_log_regrant(mp, tp->t_ticket);
} else {
- error = xfs_log_reserve(mp,
- resp->tr_logres,
+ error = xfs_log_reserve(mp, resp->tr_logres,
resp->tr_logcount,
- &tp->t_ticket, XFS_TRANSACTION,
- permanent);
+ &tp->t_ticket, permanent);
}
if (error)
@@ -737,7 +735,7 @@ xfs_log_item_batch_insert(
void
xfs_trans_committed_bulk(
struct xfs_ail *ailp,
- struct xfs_log_vec *log_vector,
+ struct list_head *lv_chain,
xfs_lsn_t commit_lsn,
bool aborted)
{
@@ -752,7 +750,7 @@ xfs_trans_committed_bulk(
spin_unlock(&ailp->ail_lock);
/* unpin all the log items */
- for (lv = log_vector; lv; lv = lv->lv_next ) {
+ list_for_each_entry(lv, lv_chain, lv_list) {
struct xfs_log_item *lip = lv->lv_item;
xfs_lsn_t item_lsn;
@@ -839,7 +837,7 @@ __xfs_trans_commit(
bool regrant)
{
struct xfs_mount *mp = tp->t_mountp;
- xfs_lsn_t commit_lsn = -1;
+ xfs_csn_t commit_seq = 0;
int error = 0;
int sync = tp->t_flags & XFS_TRANS_SYNC;
@@ -881,7 +879,7 @@ __xfs_trans_commit(
xfs_trans_apply_sb_deltas(tp);
xfs_trans_apply_dquot_deltas(tp);
- xfs_log_commit_cil(mp, tp, &commit_lsn, regrant);
+ xlog_cil_commit(mp->m_log, tp, &commit_seq, regrant);
xfs_trans_free(tp);
@@ -890,7 +888,7 @@ __xfs_trans_commit(
* log out now and wait for it.
*/
if (sync) {
- error = xfs_log_force_lsn(mp, commit_lsn, XFS_LOG_SYNC, NULL);
+ error = xfs_log_force_seq(mp, commit_seq, XFS_LOG_SYNC, NULL);
XFS_STATS_INC(mp, xs_trans_sync);
} else {
XFS_STATS_INC(mp, xs_trans_async);
diff --git a/fs/xfs/xfs_trans.h b/fs/xfs/xfs_trans.h
index ee42d98d9011..2d1cc1ff93c7 100644
--- a/fs/xfs/xfs_trans.h
+++ b/fs/xfs/xfs_trans.h
@@ -43,7 +43,8 @@ struct xfs_log_item {
struct list_head li_cil; /* CIL pointers */
struct xfs_log_vec *li_lv; /* active log vector */
struct xfs_log_vec *li_lv_shadow; /* standby vector */
- xfs_lsn_t li_seq; /* CIL commit seq */
+ xfs_csn_t li_seq; /* CIL commit seq */
+ uint32_t li_order_id; /* CIL commit order */
};
/*
@@ -69,7 +70,7 @@ struct xfs_item_ops {
void (*iop_pin)(struct xfs_log_item *);
void (*iop_unpin)(struct xfs_log_item *, int remove);
uint (*iop_push)(struct xfs_log_item *, struct list_head *);
- void (*iop_committing)(struct xfs_log_item *, xfs_lsn_t commit_lsn);
+ void (*iop_committing)(struct xfs_log_item *lip, xfs_csn_t seq);
void (*iop_release)(struct xfs_log_item *);
xfs_lsn_t (*iop_committed)(struct xfs_log_item *, xfs_lsn_t);
int (*iop_recover)(struct xfs_log_item *lip,
diff --git a/fs/xfs/xfs_trans_ail.c b/fs/xfs/xfs_trans_ail.c
index dbb69b4bf3ed..69aac416e2ce 100644
--- a/fs/xfs/xfs_trans_ail.c
+++ b/fs/xfs/xfs_trans_ail.c
@@ -17,6 +17,7 @@
#include "xfs_errortag.h"
#include "xfs_error.h"
#include "xfs_log.h"
+#include "xfs_log_priv.h"
#ifdef DEBUG
/*
@@ -429,8 +430,12 @@ xfsaild_push(
/*
* If we encountered pinned items or did not finish writing out all
- * buffers the last time we ran, force the log first and wait for it
- * before pushing again.
+ * buffers the last time we ran, force a background CIL push to get the
+ * items unpinned in the near future. We do not wait on the CIL push as
+ * that could stall us for seconds if there is enough background IO
+ * load. Stalling for that long when the tail of the log is pinned and
+ * needs flushing will hard stop the transaction subsystem when log
+ * space runs out.
*/
if (ailp->ail_log_flush && ailp->ail_last_pushed_lsn == 0 &&
(!list_empty_careful(&ailp->ail_buf_list) ||
@@ -438,7 +443,7 @@ xfsaild_push(
ailp->ail_log_flush = 0;
XFS_STATS_INC(mp, xs_push_ail_flush);
- xfs_log_force(mp, XFS_LOG_SYNC);
+ xlog_cil_flush(mp->m_log);
}
spin_lock(&ailp->ail_lock);
diff --git a/fs/xfs/xfs_trans_priv.h b/fs/xfs/xfs_trans_priv.h
index 3004aeac9110..fc8667c728e3 100644
--- a/fs/xfs/xfs_trans_priv.h
+++ b/fs/xfs/xfs_trans_priv.h
@@ -18,7 +18,8 @@ void xfs_trans_add_item(struct xfs_trans *, struct xfs_log_item *);
void xfs_trans_del_item(struct xfs_log_item *);
void xfs_trans_unreserve_and_mod_sb(struct xfs_trans *tp);
-void xfs_trans_committed_bulk(struct xfs_ail *ailp, struct xfs_log_vec *lv,
+void xfs_trans_committed_bulk(struct xfs_ail *ailp,
+ struct list_head *lv_chain,
xfs_lsn_t commit_lsn, bool aborted);
/*
* AIL traversal cursor.
diff --git a/include/linux/cpuhotplug.h b/include/linux/cpuhotplug.h
index 4a62b3980642..bf8f29ad9bf8 100644
--- a/include/linux/cpuhotplug.h
+++ b/include/linux/cpuhotplug.h
@@ -52,6 +52,7 @@ enum cpuhp_state {
CPUHP_FS_BUFF_DEAD,
CPUHP_PRINTK_DEAD,
CPUHP_MM_MEMCQ_DEAD,
+ CPUHP_XFS_DEAD,
CPUHP_PERCPU_CNT_DEAD,
CPUHP_RADIX_DEAD,
CPUHP_PAGE_ALLOC_DEAD,