freebsd-nq/module/zfs/zfs_sa.c
Brian Behlendorf ec2626ad3f Use SA_HDL_PRIVATE for SA xattrs
A private SA handle must be used to ensure we can drop the dbuf
hold on the spill block prior to calling dmu_tx_commit().  If we
call dmu_tx_commit() before sa_handle_destroy(), then our hold
will trigger a copy of the dbuf to be made.  This is done to
prevent data from leaking into the syncing txg.  As a result
the original dirty spill block will remain cached.

Additionally, relying on the shared zp->z_sa_hdl is unsafe in
the xattr context because the znode may be asynchronously dropped
from the cache.  It's far safer and simpler just to use a private
handle for xattrs.  Plus any additional overhead is offset by
the avoidance of the previously mentioned memory copy.
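
A minimal sketch of the required ordering, reusing the same DMU/SA
calls as zfs_sa_set_xattr() below (declarations, error handling, and
the nvlist packing are elided):

    /* private handle, not the shared zp->z_sa_hdl */
    sa_handle_get(zsb->z_os, zp->z_id, NULL, SA_HDL_PRIVATE, &sa);
    tx = dmu_tx_create(zsb->z_os);
    dmu_tx_hold_sa_create(tx, size);
    dmu_tx_hold_sa(tx, sa, B_TRUE);
    dmu_tx_assign(tx, TXG_WAIT);
    sa_update(sa, SA_ZPL_DXATTR(zsb), obj, size, tx);
    sa_handle_destroy(sa);  /* drop the spill dbuf hold first...       */
    dmu_tx_commit(tx);      /* ...then commit, so no dbuf copy is made */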

These forever-dirty buffers show up in the arcstats under
anon_size.  On a quiescent system the value should be zero.
Without this fix, an SA xattr write workload will cause anon_size
to increase.  Eventually, if enough dirty data builds up, your
system will appear to hang.  This occurs because the dmu won't
allow new txs to be assigned until that dirty data is flushed,
and it never will be flushed because it isn't part of an
assigned tx.

As an aside, I typically see anon_size lurk around 16k so I think
there is another place in the code which needs a similar fix.
However, this value doesn't grow over time so it isn't critical.

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Issue #503
Issue #513
2012-03-02 13:20:48 -08:00

/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/*
* Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
*/
#include <sys/types.h>
#include <sys/param.h>
#include <sys/vnode.h>
#include <sys/sa.h>
#include <sys/zfs_acl.h>
#include <sys/zfs_sa.h>
/*
* ZPL attribute registration table.
* The order of attributes doesn't matter; a unique value will be
* assigned for each attribute that is file system specific.
*
* This is just the set of ZPL attributes that this version of ZFS
* deals with natively. The file system could have other attributes
* stored in files, but they will be ignored. The SA framework will
* preserve them; this version of ZFS just won't change or delete them.
*/
sa_attr_reg_t zfs_attr_table[ZPL_END+1] = {
{"ZPL_ATIME", sizeof (uint64_t) * 2, SA_UINT64_ARRAY, 0},
{"ZPL_MTIME", sizeof (uint64_t) * 2, SA_UINT64_ARRAY, 1},
{"ZPL_CTIME", sizeof (uint64_t) * 2, SA_UINT64_ARRAY, 2},
{"ZPL_CRTIME", sizeof (uint64_t) * 2, SA_UINT64_ARRAY, 3},
{"ZPL_GEN", sizeof (uint64_t), SA_UINT64_ARRAY, 4},
{"ZPL_MODE", sizeof (uint64_t), SA_UINT64_ARRAY, 5},
{"ZPL_SIZE", sizeof (uint64_t), SA_UINT64_ARRAY, 6},
{"ZPL_PARENT", sizeof (uint64_t), SA_UINT64_ARRAY, 7},
{"ZPL_LINKS", sizeof (uint64_t), SA_UINT64_ARRAY, 8},
{"ZPL_XATTR", sizeof (uint64_t), SA_UINT64_ARRAY, 9},
{"ZPL_RDEV", sizeof (uint64_t), SA_UINT64_ARRAY, 10},
{"ZPL_FLAGS", sizeof (uint64_t), SA_UINT64_ARRAY, 11},
{"ZPL_UID", sizeof (uint64_t), SA_UINT64_ARRAY, 12},
{"ZPL_GID", sizeof (uint64_t), SA_UINT64_ARRAY, 13},
{"ZPL_PAD", sizeof (uint64_t) * 4, SA_UINT64_ARRAY, 14},
{"ZPL_ZNODE_ACL", 88, SA_UINT8_ARRAY, 15},
{"ZPL_DACL_COUNT", sizeof (uint64_t), SA_UINT64_ARRAY, 0},
{"ZPL_SYMLINK", 0, SA_UINT8_ARRAY, 0},
{"ZPL_SCANSTAMP", 32, SA_UINT8_ARRAY, 0},
{"ZPL_DACL_ACES", 0, SA_ACL, 0},
{"ZPL_DXATTR", 0, SA_UINT8_ARRAY, 0},
{NULL, 0, 0, 0}
};
#ifdef _KERNEL
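/*
* Read a symlink target stored in the old-style znode layout. If the
* target fits, it lives in the bonus buffer immediately after the
* znode_phys data; otherwise it is read from the object's first
* data block.
*/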
int
zfs_sa_readlink(znode_t *zp, uio_t *uio)
{
dmu_buf_t *db = sa_get_db(zp->z_sa_hdl);
size_t bufsz;
int error;
bufsz = zp->z_size;
if (bufsz + ZFS_OLD_ZNODE_PHYS_SIZE <= db->db_size) {
error = uiomove((caddr_t)db->db_data +
ZFS_OLD_ZNODE_PHYS_SIZE,
MIN((size_t)bufsz, uio->uio_resid), UIO_READ, uio);
} else {
dmu_buf_t *dbp;
if ((error = dmu_buf_hold(ZTOZSB(zp)->z_os, zp->z_id,
0, FTAG, &dbp, DMU_READ_NO_PREFETCH)) == 0) {
error = uiomove(dbp->db_data,
MIN((size_t)bufsz, uio->uio_resid), UIO_READ, uio);
dmu_buf_rele(dbp, FTAG);
}
}
return (error);
}
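/*
* Write a symlink target using the old-style znode layout. Short
* targets are copied into the bonus buffer after the znode_phys data;
* longer targets are written to the object's first data block after
* growing the block size to fit.
*/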
void
zfs_sa_symlink(znode_t *zp, char *link, int len, dmu_tx_t *tx)
{
dmu_buf_t *db = sa_get_db(zp->z_sa_hdl);
if (ZFS_OLD_ZNODE_PHYS_SIZE + len <= dmu_bonus_max()) {
VERIFY(dmu_set_bonus(db,
len + ZFS_OLD_ZNODE_PHYS_SIZE, tx) == 0);
if (len) {
bcopy(link, (caddr_t)db->db_data +
ZFS_OLD_ZNODE_PHYS_SIZE, len);
}
} else {
dmu_buf_t *dbp;
zfs_grow_blocksize(zp, len, tx);
VERIFY(0 == dmu_buf_hold(ZTOZSB(zp)->z_os,
zp->z_id, 0, FTAG, &dbp, DMU_READ_NO_PREFETCH));
dmu_buf_will_dirty(dbp, tx);
ASSERT3U(len, <=, dbp->db_size);
bcopy(link, dbp->db_data, len);
dmu_buf_rele(dbp, FTAG);
}
}
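/*
* Fetch the anti-virus scanstamp. SA-based znodes look it up as the
* SA_ZPL_SCANSTAMP attribute; old-style znodes keep it at the end of
* the bonus buffer, guarded by the ZFS_BONUS_SCANSTAMP flag.
*/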
void
zfs_sa_get_scanstamp(znode_t *zp, xvattr_t *xvap)
{
zfs_sb_t *zsb = ZTOZSB(zp);
xoptattr_t *xoap;
ASSERT(MUTEX_HELD(&zp->z_lock));
VERIFY((xoap = xva_getxoptattr(xvap)) != NULL);
if (zp->z_is_sa) {
if (sa_lookup(zp->z_sa_hdl, SA_ZPL_SCANSTAMP(zsb),
&xoap->xoa_av_scanstamp,
sizeof (xoap->xoa_av_scanstamp)) != 0)
return;
} else {
dmu_object_info_t doi;
dmu_buf_t *db = sa_get_db(zp->z_sa_hdl);
int len;
if (!(zp->z_pflags & ZFS_BONUS_SCANSTAMP))
return;
sa_object_info(zp->z_sa_hdl, &doi);
len = sizeof (xoap->xoa_av_scanstamp) +
ZFS_OLD_ZNODE_PHYS_SIZE;
if (len <= doi.doi_bonus_size) {
(void) memcpy(xoap->xoa_av_scanstamp,
(caddr_t)db->db_data + ZFS_OLD_ZNODE_PHYS_SIZE,
sizeof (xoap->xoa_av_scanstamp));
}
}
XVA_SET_RTN(xvap, XAT_AV_SCANSTAMP);
}
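/*
* Store the anti-virus scanstamp. SA-based znodes update the
* SA_ZPL_SCANSTAMP attribute; old-style znodes grow the bonus buffer
* if needed, copy the stamp after the znode_phys data, and set the
* ZFS_BONUS_SCANSTAMP flag.
*/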
void
zfs_sa_set_scanstamp(znode_t *zp, xvattr_t *xvap, dmu_tx_t *tx)
{
zfs_sb_t *zsb = ZTOZSB(zp);
xoptattr_t *xoap;
ASSERT(MUTEX_HELD(&zp->z_lock));
VERIFY((xoap = xva_getxoptattr(xvap)) != NULL);
if (zp->z_is_sa)
VERIFY(0 == sa_update(zp->z_sa_hdl, SA_ZPL_SCANSTAMP(zsb),
&xoap->xoa_av_scanstamp,
sizeof (xoap->xoa_av_scanstamp), tx));
else {
dmu_object_info_t doi;
dmu_buf_t *db = sa_get_db(zp->z_sa_hdl);
int len;
sa_object_info(zp->z_sa_hdl, &doi);
len = sizeof (xoap->xoa_av_scanstamp) +
ZFS_OLD_ZNODE_PHYS_SIZE;
if (len > doi.doi_bonus_size)
VERIFY(dmu_set_bonus(db, len, tx) == 0);
(void) memcpy((caddr_t)db->db_data + ZFS_OLD_ZNODE_PHYS_SIZE,
xoap->xoa_av_scanstamp, sizeof (xoap->xoa_av_scanstamp));
zp->z_pflags |= ZFS_BONUS_SCANSTAMP;
VERIFY(0 == sa_update(zp->z_sa_hdl, SA_ZPL_FLAGS(zsb),
&zp->z_pflags, sizeof (uint64_t), tx));
}
}
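/*
* Look up the SA_ZPL_DXATTR attribute and unpack it into the znode's
* cached xattr nvlist, allocating an empty nvlist if the attribute
* doesn't exist yet. A private SA handle is used so we never depend
* on the shared zp->z_sa_hdl in the xattr context.
*/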
int
zfs_sa_get_xattr(znode_t *zp)
{
zfs_sb_t *zsb = ZTOZSB(zp);
sa_handle_t *sa;
char *obj;
int size;
int error;
ASSERT(RW_LOCK_HELD(&zp->z_xattr_lock));
ASSERT(!zp->z_xattr_cached);
ASSERT(zp->z_is_sa);
error = sa_handle_get(zsb->z_os, zp->z_id, NULL, SA_HDL_PRIVATE, &sa);
if (error)
return (error);
error = sa_size(sa, SA_ZPL_DXATTR(zsb), &size);
if (error) {
sa_handle_destroy(sa);
if (error == ENOENT)
return nvlist_alloc(&zp->z_xattr_cached,
NV_UNIQUE_NAME, KM_SLEEP);
else
return (error);
}
obj = sa_spill_alloc(KM_SLEEP);
error = sa_lookup(sa, SA_ZPL_DXATTR(zsb), obj, size);
if (error == 0)
error = nvlist_unpack(obj, size, &zp->z_xattr_cached, KM_SLEEP);
sa_spill_free(obj);
sa_handle_destroy(sa);
return (error);
}
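/*
* Pack the znode's cached xattr nvlist and write it back as the
* SA_ZPL_DXATTR attribute in its own transaction. See the comment
* below for why a private SA handle is required here.
*/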
int
zfs_sa_set_xattr(znode_t *zp)
{
zfs_sb_t *zsb = ZTOZSB(zp);
sa_handle_t *sa;
dmu_tx_t *tx;
char *obj;
size_t size;
int error;
ASSERT(RW_WRITE_HELD(&zp->z_xattr_lock));
ASSERT(zp->z_xattr_cached);
ASSERT(zp->z_is_sa);
error = nvlist_size(zp->z_xattr_cached, &size, NV_ENCODE_XDR);
if (error)
goto out;
obj = sa_spill_alloc(KM_SLEEP);
error = nvlist_pack(zp->z_xattr_cached, &obj, &size,
NV_ENCODE_XDR, KM_SLEEP);
if (error)
goto out_free;
/*
* A private SA handle must be used to ensure we can drop the hold
* on the spill block prior to calling dmu_tx_commit(). If we call
* dmu_tx_commit() before sa_handle_destroy(), then our hold will
* trigger a copy of the buffer at txg sync time. This is done to
* prevent data from leaking into the syncing txg. As a result
* the original dirty spill block will remain dirty in the arc
* while the copy is written and laundered.
*/
error = sa_handle_get(zsb->z_os, zp->z_id, NULL, SA_HDL_PRIVATE, &sa);
if (error)
goto out_free;
tx = dmu_tx_create(zsb->z_os);
dmu_tx_hold_sa_create(tx, size);
dmu_tx_hold_sa(tx, sa, B_TRUE);
error = dmu_tx_assign(tx, TXG_WAIT);
if (error) {
dmu_tx_abort(tx);
sa_handle_destroy(sa);
} else {
error = sa_update(sa, SA_ZPL_DXATTR(zsb), obj, size, tx);
sa_handle_destroy(sa);
if (error)
dmu_tx_abort(tx);
else
dmu_tx_commit(tx);
}
out_free:
sa_spill_free(obj);
out:
return (error);
}
/*
* I'm not convinced we should do any of this upgrade, since the SA
* code can read both the old and new znode formats with probably
* little to no performance difference.
*
* All new files will be created with the new format.
*/
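/*
* Convert an old-style znode_phys bonus buffer to the SA layout: the
* known attributes are rewritten as SA attributes, any external ACL
* object is freed, and the bonus type is switched to DMU_OT_SA.
*/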
void
zfs_sa_upgrade(sa_handle_t *hdl, dmu_tx_t *tx)
{
dmu_buf_t *db = sa_get_db(hdl);
znode_t *zp = sa_get_userdata(hdl);
zfs_sb_t *zsb = ZTOZSB(zp);
int count = 0;
sa_bulk_attr_t *bulk, *sa_attrs;
zfs_acl_locator_cb_t locate = { 0 };
uint64_t uid, gid, mode, rdev, xattr, parent;
uint64_t crtime[2], mtime[2], ctime[2];
zfs_acl_phys_t znode_acl;
char scanstamp[AV_SCANSTAMP_SZ];
boolean_t drop_lock = B_FALSE;
/*
* No upgrade if the ACL isn't cached, since we won't know which
* locks are held and reading the ACL would require special
* "locked" interfaces that would be messy.
*/
if (zp->z_acl_cached == NULL || S_ISLNK(ZTOI(zp)->i_mode))
return;
/*
* If the z_lock is held and we aren't the owner, then just return
* since we don't want to deadlock trying to update the status of
* z_is_sa. This file can then be upgraded at a later time.
*
* Otherwise, we know we are doing the sa_update() that caused
* us to enter this function.
*/
if (mutex_owner(&zp->z_lock) != curthread) {
if (mutex_tryenter(&zp->z_lock) == 0)
return;
else
drop_lock = B_TRUE;
}
/* First do a bulk query of the attributes that aren't cached */
bulk = kmem_alloc(sizeof(sa_bulk_attr_t) * 20, KM_SLEEP);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_MTIME(zsb), NULL, &mtime, 16);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_CTIME(zsb), NULL, &ctime, 16);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_CRTIME(zsb), NULL, &crtime, 16);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_MODE(zsb), NULL, &mode, 8);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_PARENT(zsb), NULL, &parent, 8);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_XATTR(zsb), NULL, &xattr, 8);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_RDEV(zsb), NULL, &rdev, 8);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_UID(zsb), NULL, &uid, 8);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_GID(zsb), NULL, &gid, 8);
SA_ADD_BULK_ATTR(bulk, count, SA_ZPL_ZNODE_ACL(zsb), NULL,
&znode_acl, 88);
if (sa_bulk_lookup_locked(hdl, bulk, count) != 0) {
kmem_free(bulk, sizeof(sa_bulk_attr_t) * 20);
goto done;
}
/*
* While the order here doesn't matter, it's best to try to organize
* it in such a way as to pick up an already existing layout number.
*/
count = 0;
sa_attrs = kmem_zalloc(sizeof(sa_bulk_attr_t) * 20, KM_SLEEP);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_MODE(zsb), NULL, &mode, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_SIZE(zsb), NULL,
&zp->z_size, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_GEN(zsb),
NULL, &zp->z_gen, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_UID(zsb), NULL, &uid, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_GID(zsb), NULL, &gid, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_PARENT(zsb),
NULL, &parent, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_FLAGS(zsb), NULL,
&zp->z_pflags, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_ATIME(zsb), NULL,
zp->z_atime, 16);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_MTIME(zsb), NULL,
&mtime, 16);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_CTIME(zsb), NULL,
&ctime, 16);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_CRTIME(zsb), NULL,
&crtime, 16);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_LINKS(zsb), NULL,
&zp->z_links, 8);
if (S_ISBLK(ZTOI(zp)->i_mode) || S_ISCHR(ZTOI(zp)->i_mode))
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_RDEV(zsb), NULL,
&rdev, 8);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_DACL_COUNT(zsb), NULL,
&zp->z_acl_cached->z_acl_count, 8);
if (zp->z_acl_cached->z_version < ZFS_ACL_VERSION_FUID)
zfs_acl_xform(zp, zp->z_acl_cached, CRED());
locate.cb_aclp = zp->z_acl_cached;
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_DACL_ACES(zsb),
zfs_acl_data_locator, &locate, zp->z_acl_cached->z_acl_bytes);
if (xattr)
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_XATTR(zsb),
NULL, &xattr, 8);
/* If a scanstamp is present in the bonus buffer, migrate it to an SA attribute */
if (zp->z_pflags & ZFS_BONUS_SCANSTAMP) {
bcopy((caddr_t)db->db_data + ZFS_OLD_ZNODE_PHYS_SIZE,
scanstamp, AV_SCANSTAMP_SZ);
SA_ADD_BULK_ATTR(sa_attrs, count, SA_ZPL_SCANSTAMP(zsb),
NULL, scanstamp, AV_SCANSTAMP_SZ);
zp->z_pflags &= ~ZFS_BONUS_SCANSTAMP;
}
VERIFY(dmu_set_bonustype(db, DMU_OT_SA, tx) == 0);
VERIFY(sa_replace_all_by_template_locked(hdl, sa_attrs,
count, tx) == 0);
if (znode_acl.z_acl_extern_obj)
VERIFY(0 == dmu_object_free(zsb->z_os,
znode_acl.z_acl_extern_obj, tx));
zp->z_is_sa = B_TRUE;
kmem_free(sa_attrs, sizeof(sa_bulk_attr_t) * 20);
kmem_free(bulk, sizeof(sa_bulk_attr_t) * 20);
done:
if (drop_lock)
mutex_exit(&zp->z_lock);
}
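/*
* Add the tx holds needed for a possible SA upgrade: the znode's own
* SA/bonus buffer and, if one exists, the external ACL object that
* the upgrade would free.
*/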
void
zfs_sa_upgrade_txholds(dmu_tx_t *tx, znode_t *zp)
{
if (!ZTOZSB(zp)->z_use_sa || zp->z_is_sa)
return;
dmu_tx_hold_sa(tx, zp->z_sa_hdl, B_TRUE);
if (zfs_external_acl(zp)) {
dmu_tx_hold_free(tx, zfs_external_acl(zp), 0,
DMU_OBJECT_END);
}
}
EXPORT_SYMBOL(zfs_attr_table);
EXPORT_SYMBOL(zfs_sa_readlink);
EXPORT_SYMBOL(zfs_sa_symlink);
EXPORT_SYMBOL(zfs_sa_get_scanstamp);
EXPORT_SYMBOL(zfs_sa_set_scanstamp);
EXPORT_SYMBOL(zfs_sa_get_xattr);
EXPORT_SYMBOL(zfs_sa_set_xattr);
EXPORT_SYMBOL(zfs_sa_upgrade);
EXPORT_SYMBOL(zfs_sa_upgrade_txholds);
#endif