diff --git a/cms/djangoapps/contentstore/tests/test_crud.py b/cms/djangoapps/contentstore/tests/test_crud.py
index a16251fa0f7c549c6e954d70ae85d62e8e4b4bbe..b95d58d913270183e7a15801fdd14b6d10effe11 100644
--- a/cms/djangoapps/contentstore/tests/test_crud.py
+++ b/cms/djangoapps/contentstore/tests/test_crud.py
@@ -2,7 +2,7 @@ import unittest
 from xmodule import templates
 from xmodule.modulestore.tests import persistent_factories
 from xmodule.course_module import CourseDescriptor
-from xmodule.modulestore.django import modulestore
+from xmodule.modulestore.django import modulestore, loc_mapper
 from xmodule.seq_module import SequenceDescriptor
 from xmodule.capa_module import CapaDescriptor
 from xmodule.modulestore.locator import CourseLocator, BlockUsageLocator
@@ -191,6 +191,26 @@ class TemplateTests(unittest.TestCase):
         version_history = modulestore('split').get_block_generations(second_problem.location)
         self.assertNotEqual(version_history.locator.version_guid, first_problem.location.version_guid)
 
+    def test_split_inject_loc_mapper(self):
+        """
+        Test that creating a loc_mapper causes it to automatically attach to the split mongo store
+        """
+        # instantiate location mapper before split
+        mapper = loc_mapper()
+        # split must inject the location mapper itself since the mapper existed before it did
+        self.assertEqual(modulestore('split').loc_mapper, mapper)
+
+    def test_loc_inject_into_split(self):
+        """
+        Test that creating a loc_mapper causes it to automatically attach to the split mongo store
+        """
+        # force instantiation of split modulestore before there's a location mapper and verify
+        # it has no pointer to loc mapper
+        self.assertIsNone(modulestore('split').loc_mapper)
+        # force instantiation of location mapper which must inject itself into the split
+        mapper = loc_mapper()
+        self.assertEqual(modulestore('split').loc_mapper, mapper)
+
     # ================================= JSON PARSING ===========================
     # These are example methods for creating xmodules in memory w/o persisting them.
     # They were in x_module but since xblock is not planning to support them but will
diff --git a/cms/envs/acceptance.py b/cms/envs/acceptance.py
index db7332ee227f1b31c8abf587b5d23cda7210d487..e5ed2261b5db73ffd3391d4dc469a343fef3ed0e 100644
--- a/cms/envs/acceptance.py
+++ b/cms/envs/acceptance.py
@@ -24,14 +24,17 @@ from random import choice, randint
 def seed():
     return os.getppid()
 
-MODULESTORE_OPTIONS = {
-    'default_class': 'xmodule.raw_module.RawDescriptor',
+DOC_STORE_CONFIG = {
     'host': 'localhost',
     'db': 'acceptance_xmodule',
     'collection': 'acceptance_modulestore_%s' % seed(),
+}
+
+MODULESTORE_OPTIONS = dict({
+    'default_class': 'xmodule.raw_module.RawDescriptor',
     'fs_root': TEST_ROOT / "data",
     'render_template': 'mitxmako.shortcuts.render_to_string',
-}
+}, **DOC_STORE_CONFIG)
 
 MODULESTORE = {
     'default': {
@@ -97,7 +100,7 @@ SELENIUM_GRID = {
 #####################################################################
 # Lastly, see if the developer has any local overrides.
 try:
-    from .private import *      # pylint: disable=F0401
+    from .private import *  # pylint: disable=F0401
 except ImportError:
     pass
 
diff --git a/cms/envs/dev.py b/cms/envs/dev.py
index 42a6f706b6c3277ec0cdf6f28a1fac6dc61e0686..6e4ce460c3a4b16702cc71b8b714864df5e9b3d7 100644
--- a/cms/envs/dev.py
+++ b/cms/envs/dev.py
@@ -16,14 +16,17 @@ LOGGING = get_logger_config(ENV_ROOT / "log",
                             dev_env=True,
                             debug=True)
 
-modulestore_options = {
-    'default_class': 'xmodule.raw_module.RawDescriptor',
+DOC_STORE_CONFIG = {
     'host': 'localhost',
     'db': 'xmodule',
     'collection': 'modulestore',
+}
+
+modulestore_options = dict({
+    'default_class': 'xmodule.raw_module.RawDescriptor',
     'fs_root': GITHUB_REPO_ROOT,
     'render_template': 'mitxmako.shortcuts.render_to_string',
-}
+}, **DOC_STORE_CONFIG)
 
 MODULESTORE = {
     'default': {
@@ -185,6 +188,6 @@ if SEGMENT_IO_KEY:
 #####################################################################
 # Lastly, see if the developer has any local overrides.
 try:
-    from .private import *      # pylint: disable=F0401
+    from .private import *  # pylint: disable=F0401
 except ImportError:
     pass
diff --git a/cms/envs/test.py b/cms/envs/test.py
index ffbf9f5376d24c4ef6395c7c924d973920bb58a5..364bee2441cb95fecb3de4a746739b3acd5375c0 100644
--- a/cms/envs/test.py
+++ b/cms/envs/test.py
@@ -42,14 +42,17 @@ STATICFILES_DIRS += [
     if os.path.isdir(COMMON_TEST_DATA_ROOT / course_dir)
 ]
 
-MODULESTORE_OPTIONS = {
-    'default_class': 'xmodule.raw_module.RawDescriptor',
+DOC_STORE_CONFIG = {
     'host': 'localhost',
     'db': 'test_xmodule',
     'collection': 'test_modulestore',
+}
+
+MODULESTORE_OPTIONS = dict({
+    'default_class': 'xmodule.raw_module.RawDescriptor',
     'fs_root': TEST_ROOT / "data",
     'render_template': 'mitxmako.shortcuts.render_to_string',
-}
+}, **DOC_STORE_CONFIG)
 
 MODULESTORE = {
     'default': {
diff --git a/common/lib/xmodule/xmodule/modulestore/django.py b/common/lib/xmodule/xmodule/modulestore/django.py
index 93416cae17ee816531b0bf22c3450c2a375de9a8..284f5e4eb783acc7c6e7a55e3085ebe81d20f8dc 100644
--- a/common/lib/xmodule/xmodule/modulestore/django.py
+++ b/common/lib/xmodule/xmodule/modulestore/django.py
@@ -75,6 +75,9 @@ def modulestore(name='default'):
     if name not in _MODULESTORES:
         _MODULESTORES[name] = create_modulestore_instance(settings.MODULESTORE[name]['ENGINE'],
                                                           settings.MODULESTORE[name]['OPTIONS'])
+        # inject loc_mapper into newly created modulestore if it needs it
+        if name == 'split' and _loc_singleton is not None:
+            _MODULESTORES['split'].loc_mapper = _loc_singleton
 
     return _MODULESTORES[name]
 
@@ -90,7 +93,10 @@ def loc_mapper():
     # pylint: disable=W0212
     if _loc_singleton is None:
         # instantiate
-        _loc_singleton = LocMapperStore(settings.modulestore_options)
+        _loc_singleton = LocMapperStore(**settings.DOC_STORE_CONFIG)
+    # inject into split mongo modulestore
+    if 'split' in _MODULESTORES:
+        _MODULESTORES['split'].loc_mapper = _loc_singleton
     return _loc_singleton
 
 
diff --git a/common/lib/xmodule/xmodule/modulestore/exceptions.py b/common/lib/xmodule/xmodule/modulestore/exceptions.py
index 508599b677aa738454635eb8882a030d0eb3a138..411078d821e9b35dbdbfcd9e5e946b699ae201e6 100644
--- a/common/lib/xmodule/xmodule/modulestore/exceptions.py
+++ b/common/lib/xmodule/xmodule/modulestore/exceptions.py
@@ -28,7 +28,14 @@ class NoPathToItem(Exception):
 
 
 class DuplicateItemError(Exception):
-    pass
+    """
+    Attempted to create an item which already exists.
+    """
+    def __init__(self, element_id, store=None, collection=None):
+        super(DuplicateItemError, self).__init__()
+        self.element_id = element_id
+        self.store = store
+        self.collection = collection
 
 
 class VersionConflictError(Exception):
diff --git a/common/lib/xmodule/xmodule/modulestore/inheritance.py b/common/lib/xmodule/xmodule/modulestore/inheritance.py
index 78d584580fd3a946859440d8564b9cbf534bc421..d68c121b96ba25f5cbe2c53fa3e3e1d04807ffb7 100644
--- a/common/lib/xmodule/xmodule/modulestore/inheritance.py
+++ b/common/lib/xmodule/xmodule/modulestore/inheritance.py
@@ -56,8 +56,7 @@ def compute_inherited_metadata(descriptor):
         parent_metadata = descriptor.xblock_kvs.inherited_settings.copy()
         # add any of descriptor's explicitly set fields to the inheriting list
         for field in InheritanceMixin.fields.values():
-            # pylint: disable = W0212
-            if descriptor._field_data.has(descriptor, field.name):
+            if field.is_set_on(descriptor):
                 # inherited_settings values are json repr
                 parent_metadata[field.name] = field.read_json(descriptor)
 
diff --git a/common/lib/xmodule/xmodule/modulestore/loc_mapper_store.py b/common/lib/xmodule/xmodule/modulestore/loc_mapper_store.py
index 5967ee3d9d6a604021c7af54fe323f36fd46ab10..289e4ad1021f73204671f20c1155d6025f0fdc2b 100644
--- a/common/lib/xmodule/xmodule/modulestore/loc_mapper_store.py
+++ b/common/lib/xmodule/xmodule/modulestore/loc_mapper_store.py
@@ -28,7 +28,10 @@ class LocMapperStore(object):
 
     # C0103: varnames and attrs must be >= 3 chars, but db defined by long time usage
     # pylint: disable = C0103
-    def __init__(self, host, db, collection, port=27017, user=None, password=None, **kwargs):
+    def __init__(
+        self, host, db, collection, port=27017, user=None, password=None,
+        **kwargs
+    ):
         '''
         Constructor
         '''
@@ -100,6 +103,7 @@ class LocMapperStore(object):
             'prod_branch': prod_branch,
             'block_map': block_map or {},
         })
+        return course_id
 
     def translate_location(self, old_style_course_id, location, published=True, add_entry_if_missing=True):
         """
@@ -150,7 +154,7 @@ class LocMapperStore(object):
             if add_entry_if_missing:
                 usage_id = self._add_to_block_map(location, location_id, entry['block_map'])
             else:
-                raise ItemNotFoundError()
+                raise ItemNotFoundError(location)
         elif isinstance(usage_id, dict):
             # name is not unique, look through for the right category
             if location.category in usage_id:
@@ -244,7 +248,7 @@ class LocMapperStore(object):
                 if usage_id is None:
                     usage_id = map_entry['block_map'][location.name][location.category]
                 elif usage_id != map_entry['block_map'][location.name][location.category]:
-                    raise DuplicateItemError()
+                    raise DuplicateItemError(usage_id, self, 'location_map')
 
         computed_usage_id = usage_id
 
@@ -257,7 +261,7 @@ class LocMapperStore(object):
                 alt_usage_id = self._verify_uniqueness(computed_usage_id, map_entry['block_map'])
                 if alt_usage_id != computed_usage_id:
                     if usage_id is not None:
-                        raise DuplicateItemError()
+                        raise DuplicateItemError(usage_id, self, 'location_map')
                     else:
                         # revise already set ones and add to remaining ones
                         computed_usage_id = self.update_block_location_translator(
@@ -301,7 +305,7 @@ class LocMapperStore(object):
                     usage_id = self.update_block_location_translator(location, alt_usage_id, old_course_id, True)
                     return usage_id
                 else:
-                    raise DuplicateItemError()
+                    raise DuplicateItemError(usage_id, self, 'location_map')
 
             if location.category in map_entry['block_map'].setdefault(location.name, {}):
                 map_entry['block_map'][location.name][location.category] = usage_id
@@ -335,6 +339,8 @@ class LocMapperStore(object):
             # the block ids will likely be out of sync and collide from an id perspective. HOWEVER,
             # if there are few == org/course roots or their content is unrelated, this will work well.
             usage_id = self._verify_uniqueness(location.category + location.name[:3], block_map)
+        else:
+            usage_id = location.name
         block_map.setdefault(location.name, {})[location.category] = usage_id
         self.location_map.update(location_id, {'$set': {'block_map': block_map}})
         return usage_id
diff --git a/common/lib/xmodule/xmodule/modulestore/locator.py b/common/lib/xmodule/xmodule/modulestore/locator.py
index 916a970ad0d669206f62282349314390d6dc9c19..dfeafa5e1de3c0d456654b5fe7c7907a17455459 100644
--- a/common/lib/xmodule/xmodule/modulestore/locator.py
+++ b/common/lib/xmodule/xmodule/modulestore/locator.py
@@ -377,7 +377,7 @@ class BlockUsageLocator(CourseLocator):
 
         :param block_locator:
         """
-        if self.course_id and self.version_guid:
+        if self.version_guid:
             return BlockUsageLocator(version_guid=self.version_guid,
                                      branch=self.branch,
                                      usage_id=self.usage_id)
diff --git a/common/lib/xmodule/xmodule/modulestore/mongo/base.py b/common/lib/xmodule/xmodule/modulestore/mongo/base.py
index f4c88387f424bb903bfaafb1ed4ca2c4bc730b56..44865ab58e4421a447f3d8cc9d8b3910f6354ca6 100644
--- a/common/lib/xmodule/xmodule/modulestore/mongo/base.py
+++ b/common/lib/xmodule/xmodule/modulestore/mongo/base.py
@@ -324,16 +324,14 @@ class MongoModuleStore(ModuleStoreBase):
         for result in resultset:
             location = Location(result['_id'])
             # We need to collate between draft and non-draft
-            # i.e. draft verticals can have children which are not in non-draft versions
+            # i.e. draft verticals will have draft children but will have non-draft parents currently
             location = location.replace(revision=None)
             location_url = location.url()
             if location_url in results_by_url:
                 existing_children = results_by_url[location_url].get('definition', {}).get('children', [])
                 additional_children = result.get('definition', {}).get('children', [])
                 total_children = existing_children + additional_children
-                if 'definition' not in results_by_url[location_url]:
-                    results_by_url[location_url]['definition'] = {}
-                results_by_url[location_url]['definition']['children'] = total_children
+                results_by_url[location_url].setdefault('definition', {})['children'] = total_children
             results_by_url[location.url()] = result
             if location.category == 'course':
                 root = location.url()
@@ -643,12 +641,11 @@ class MongoModuleStore(ModuleStoreBase):
         """
         # Save any changes to the xmodule to the MongoKeyValueStore
         xmodule.save()
-        # split mongo's persist_dag is more general and useful.
         self.collection.save({
                 '_id': xmodule.location.dict(),
                 'metadata': own_metadata(xmodule),
                 'definition': {
-                    'data': xmodule.xblock_kvs._data,
+                    'data': xmodule.get_explicitly_set_fields_by_scope(Scope.content),
                     'children': xmodule.children if xmodule.has_children else []
                 }
             })
diff --git a/common/lib/xmodule/xmodule/modulestore/mongo/draft.py b/common/lib/xmodule/xmodule/modulestore/mongo/draft.py
index bed42b30a9093654b6c077305c14617e424919ae..b816ab246dc074e3860ba2f57db836494648230e 100644
--- a/common/lib/xmodule/xmodule/modulestore/mongo/draft.py
+++ b/common/lib/xmodule/xmodule/modulestore/mongo/draft.py
@@ -15,6 +15,7 @@ from xmodule.modulestore.inheritance import own_metadata
 from xmodule.modulestore.mongo.base import location_to_query, namedtuple_to_son, get_course_id_no_run, MongoModuleStore
 import pymongo
 from pytz import UTC
+from xblock.fields import Scope
 
 DRAFT = 'draft'
 # Things w/ these categories should never be marked as version='draft'
@@ -237,8 +238,9 @@ class DraftModuleStore(MongoModuleStore):
 
         draft.published_date = datetime.now(UTC)
         draft.published_by = published_by_id
-        super(DraftModuleStore, self).update_item(location, draft._field_data._kvs._data)
-        super(DraftModuleStore, self).update_children(location, draft._field_data._kvs._children)
+        super(DraftModuleStore, self).update_item(location, draft.get_explicitly_set_fields_by_scope(Scope.content))
+        if draft.has_children:
+            super(DraftModuleStore, self).update_children(location, draft.children)
         super(DraftModuleStore, self).update_metadata(location, own_metadata(draft))
         self.delete_item(location)
 
diff --git a/common/lib/xmodule/xmodule/modulestore/split_migrator.py b/common/lib/xmodule/xmodule/modulestore/split_migrator.py
new file mode 100644
index 0000000000000000000000000000000000000000..27a758c0839d05d56d1ad256d913d27ba544d2e0
--- /dev/null
+++ b/common/lib/xmodule/xmodule/modulestore/split_migrator.py
@@ -0,0 +1,181 @@
+'''
+Code for migrating from other modulestores to the split_mongo modulestore.
+
+Exists at the top level of modulestore b/c it needs to know about and access each modulestore.
+
+In general, it's strategy is to treat the other modulestores as read-only and to never directly
+manipulate storage but use existing api's.
+'''
+from xmodule.modulestore import Location
+from xmodule.modulestore.locator import CourseLocator
+from xmodule.modulestore.mongo import draft
+
+
+class SplitMigrator(object):
+    """
+    Copies courses from old mongo to split mongo and sets up location mapping so any references to the old
+    name will be able to find the new elements.
+    """
+    def __init__(self, split_modulestore, direct_modulestore, draft_modulestore, loc_mapper):
+        super(SplitMigrator, self).__init__()
+        self.split_modulestore = split_modulestore
+        self.direct_modulestore = direct_modulestore
+        self.draft_modulestore = draft_modulestore
+        self.loc_mapper = loc_mapper
+
+    def migrate_mongo_course(self, course_location, user_id, new_course_id=None):
+        """
+        Create a new course in split_mongo representing the published and draft versions of the course from the
+        original mongo store. And return the new_course_id (which the caller can also get by calling
+        self.loc_mapper.translate_location(old_course_location)
+
+        If the new course already exists, this raises DuplicateItemError
+
+        :param course_location: a Location whose category is 'course' and points to the course
+        :param user_id: the user whose action is causing this migration
+        :param new_course_id: (optional) the Locator.course_id for the new course. Defaults to
+        whatever translate_location_to_locator returns
+        """
+        new_course_id = self.loc_mapper.create_map_entry(course_location, course_id=new_course_id)
+        old_course_id = course_location.course_id
+        # the only difference in data between the old and split_mongo xblocks are the locations;
+        # so, any field which holds a location must change to a Locator; otherwise, the persistence
+        # layer and kvs's know how to store it.
+        # locations are in location, children, conditionals, course.tab
+
+        # create the course: set fields to explicitly_set for each scope, id_root = new_course_id, master_branch = 'production'
+        original_course = self.direct_modulestore.get_item(course_location)
+        new_course_root_locator = self.loc_mapper.translate_location(old_course_id, course_location)
+        new_course = self.split_modulestore.create_course(
+            course_location.org, original_course.display_name,
+            user_id, id_root=new_course_id,
+            fields=self._get_json_fields_translate_children(original_course, old_course_id, True),
+            root_usage_id=new_course_root_locator.usage_id,
+            master_branch=new_course_root_locator.branch
+        )
+
+        self._copy_published_modules_to_course(new_course, course_location, old_course_id, user_id)
+        self._add_draft_modules_to_course(new_course_id, old_course_id, course_location, user_id)
+
+        return new_course_id
+
+    def _copy_published_modules_to_course(self, new_course, old_course_loc, old_course_id, user_id):
+        """
+        Copy all of the modules from the 'direct' version of the course to the new split course.
+        """
+        course_version_locator = new_course.location.as_course_locator()
+
+        # iterate over published course elements. Wildcarding rather than descending b/c some elements are orphaned (e.g.,
+        # course about pages, conditionals)
+        for module in self.direct_modulestore.get_items(
+            old_course_loc.replace(category=None, name=None, revision=None),
+            old_course_id
+        ):
+            # don't copy the course again. No drafts should get here but check
+            if module.location != old_course_loc and not getattr(module, 'is_draft', False):
+                # create split_xblock using split.create_item
+                # where usage_id is computed by translate_location_to_locator
+                new_locator = self.loc_mapper.translate_location(
+                    old_course_id, module.location, True, add_entry_if_missing=True
+                )
+                _new_module = self.split_modulestore.create_item(
+                    course_version_locator, module.category, user_id,
+                    usage_id=new_locator.usage_id,
+                    fields=self._get_json_fields_translate_children(module, old_course_id, True),
+                    continue_version=True
+                )
+        # after done w/ published items, add version for 'draft' pointing to the published structure
+        index_info = self.split_modulestore.get_course_index_info(course_version_locator)
+        versions = index_info['versions']
+        versions['draft'] = versions['published']
+        self.split_modulestore.update_course_index(course_version_locator, {'versions': versions}, update_versions=True)
+
+        # clean up orphans in published version: in old mongo, parents pointed to the union of their published and draft
+        # children which meant some pointers were to non-existent locations in 'direct'
+        self.split_modulestore.internal_clean_children(course_version_locator)
+
+    def _add_draft_modules_to_course(self, new_course_id, old_course_id, old_course_loc, user_id):
+        """
+        update each draft. Create any which don't exist in published and attach to their parents.
+        """
+        # each true update below will trigger a new version of the structure. We may want to just have one new version
+        # but that's for a later date.
+        new_draft_course_loc = CourseLocator(course_id=new_course_id, branch='draft')
+        # to prevent race conditions of grandchilden being added before their parents and thus having no parent to
+        # add to
+        awaiting_adoption = {}
+        for module in self.draft_modulestore.get_items(
+            old_course_loc.replace(category=None, name=None, revision=draft.DRAFT),
+            old_course_id
+        ):
+            if getattr(module, 'is_draft', False):
+                new_locator = self.loc_mapper.translate_location(
+                    old_course_id, module.location, False, add_entry_if_missing=True
+                )
+                if self.split_modulestore.has_item(new_course_id, new_locator):
+                    # was in 'direct' so draft is a new version
+                    split_module = self.split_modulestore.get_item(new_locator)
+                    # need to remove any no-longer-explicitly-set values and add/update any now set values.
+                    for name, field in split_module.fields.iteritems():
+                        if field.is_set_on(split_module) and not module.fields[name].is_set_on(module):
+                            field.delete_from(split_module)
+                    for name, field in module.fields.iteritems():
+                        # draft children will insert themselves and the others are here already; so, don't do it 2x
+                        if name != 'children' and field.is_set_on(module):
+                            field.write_to(split_module, field.read_from(module))
+
+                    _new_module = self.split_modulestore.update_item(split_module, user_id)
+                else:
+                    # only a draft version (aka, 'private'). parent needs updated too.
+                    # create a new course version just in case the current head is also the prod head
+                    _new_module = self.split_modulestore.create_item(
+                        new_draft_course_loc, module.category, user_id,
+                        usage_id=new_locator.usage_id,
+                        fields=self._get_json_fields_translate_children(module, old_course_id, True)
+                    )
+                    awaiting_adoption[module.location] = new_locator.usage_id
+        for draft_location, new_usage_id in awaiting_adoption.iteritems():
+            for parent_loc in self.draft_modulestore.get_parent_locations(draft_location, old_course_id):
+                old_parent = self.draft_modulestore.get_item(parent_loc)
+                new_parent = self.split_modulestore.get_item(
+                    self.loc_mapper.translate_location(old_course_id, old_parent.location, False)
+                )
+                # this only occurs if the parent was also awaiting adoption
+                if new_usage_id in new_parent.children:
+                    break
+                # find index for module: new_parent may be missing quite a few of old_parent's children
+                new_parent_cursor = 0
+                draft_location = draft_location.url()  # need as string
+                for old_child_loc in old_parent.children:
+                    if old_child_loc == draft_location:
+                        break
+                    sibling_loc = self.loc_mapper.translate_location(old_course_id, Location(old_child_loc), False)
+                    # sibling may move cursor
+                    for idx in range(new_parent_cursor, len(new_parent.children)):
+                        if new_parent.children[idx] == sibling_loc.usage_id:
+                            new_parent_cursor = idx + 1
+                            break
+                new_parent.children.insert(new_parent_cursor, new_usage_id)
+                new_parent = self.split_modulestore.update_item(new_parent, user_id)
+
+    def _get_json_fields_translate_children(self, xblock, old_course_id, published):
+        """
+        Return the json repr for explicitly set fields but convert all children to their usage_id's
+        """
+        fields = self.get_json_fields_explicitly_set(xblock)
+        # this will too generously copy the children even for ones that don't exist in the published b/c the old mongo
+        # had no way of not having parents point to draft only children :-(
+        if 'children' in fields:
+            fields['children'] = [
+                self.loc_mapper.translate_location(
+                    old_course_id, Location(child), published, add_entry_if_missing=True
+                ).usage_id
+                for child in fields['children']]
+        return fields
+
+    def get_json_fields_explicitly_set(self, xblock):
+        """
+        Get the json repr for fields set on this specific xblock
+        :param xblock:
+        """
+        return {field.name: field.read_json(xblock) for field in xblock.fields.itervalues() if field.is_set_on(xblock)}
diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/caching_descriptor_system.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/caching_descriptor_system.py
index 020ebdbbfe577903b1b0e46365d676757a6a797c..ba81a5231d78f3e7ac515d99f10e0d8aa0871c59 100644
--- a/common/lib/xmodule/xmodule/modulestore/split_mongo/caching_descriptor_system.py
+++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/caching_descriptor_system.py
@@ -27,27 +27,27 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
         modulestore: the module store that can be used to retrieve additional
         modules
 
+        course_entry: the originally fetched enveloped course_structure w/ branch and course_id info.
+        Callers to _load_item provide an override but that function ignores the provided structure and
+        only looks at the branch and course_id
+
         module_data: a dict mapping Location -> json that was cached from the
             underlying modulestore
         """
-        # TODO find all references to resources_fs and make handle None
         super(CachingDescriptorSystem, self).__init__(load_item=self._load_item, **kwargs)
         self.modulestore = modulestore
         self.course_entry = course_entry
         self.lazy = lazy
         self.module_data = module_data
-        # TODO see if self.course_id is needed: is already in course_entry but could be > 1 value
         # Compute inheritance
         modulestore.inherit_settings(
-            course_entry.get('blocks', {}),
-            course_entry.get('blocks', {}).get(course_entry.get('root'))
+            course_entry['structure'].get('blocks', {}),
+            course_entry['structure'].get('blocks', {}).get(course_entry['structure'].get('root'))
         )
         self.default_class = default_class
         self.local_modules = {}
 
     def _load_item(self, usage_id, course_entry_override=None):
-        # TODO ensure all callers of system.load_item pass just the id
-
         if isinstance(usage_id, BlockUsageLocator) and isinstance(usage_id.usage_id, LocalId):
             try:
                 return self.local_modules[usage_id]
@@ -60,7 +60,7 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
             self.modulestore.cache_items(self, [usage_id], lazy=self.lazy)
             json_data = self.module_data.get(usage_id)
             if json_data is None:
-                raise ItemNotFoundError
+                raise ItemNotFoundError(usage_id)
 
         class_ = XModuleDescriptor.load_class(
             json_data.get('category'),
@@ -68,9 +68,24 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
         )
         return self.xblock_from_json(class_, usage_id, json_data, course_entry_override)
 
+    # xblock's runtime does not always pass enough contextual information to figure out
+    # which named container (course x branch) or which parent is requesting an item. Because split allows
+    # a many:1 mapping from named containers to structures and because item's identities encode
+    # context as well as unique identity, this function must sometimes infer whether the access is
+    # within an unspecified named container. In most cases, course_entry_override will give the
+    # explicit context; however, runtime.get_block(), e.g., does not. HOWEVER, there are simple heuristics
+    # which will work 99.999% of the time: a runtime is thread & even context specific. The likelihood that
+    # the thread is working with more than one named container pointing to the same specific structure is
+    # low; thus, the course_entry is most likely correct. If the thread is looking at > 1 named container
+    # pointing to the same structure, the access is likely to be chunky enough that the last known container
+    # is the intended one when not given a course_entry_override; thus, the caching of the last branch/course_id.
     def xblock_from_json(self, class_, usage_id, json_data, course_entry_override=None):
         if course_entry_override is None:
             course_entry_override = self.course_entry
+        else:
+            # most recent retrieval is most likely the right one for next caller (see comment above fn)
+            self.course_entry['branch'] = course_entry_override['branch']
+            self.course_entry['course_id'] = course_entry_override['course_id']
         # most likely a lazy loader or the id directly
         definition = json_data.get('definition', {})
         definition_id = self.modulestore.definition_locator(definition)
@@ -80,7 +95,7 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
             usage_id = LocalId()
 
         block_locator = BlockUsageLocator(
-            version_guid=course_entry_override['_id'],
+            version_guid=course_entry_override['structure']['_id'],
             usage_id=usage_id,
             course_id=course_entry_override.get('course_id'),
             branch=course_entry_override.get('branch')
@@ -105,7 +120,7 @@ class CachingDescriptorSystem(MakoDescriptorSystem):
                 json_data,
                 self,
                 BlockUsageLocator(
-                    version_guid=course_entry_override['_id'],
+                    version_guid=course_entry_override['structure']['_id'],
                     usage_id=usage_id
                 ),
                 error_msg=exc_info_to_str(sys.exc_info())
diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py
index bb9aa59016e2660f270fdd54f74a420977f81fe0..d85092b3636a244385d29988f1010030efd4d5c6 100644
--- a/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py
+++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/split.py
@@ -5,20 +5,21 @@ import pymongo
 import re
 from importlib import import_module
 from path import path
+import collections
+import copy
+from pytz import UTC
 
 from xmodule.errortracker import null_error_tracker
 from xmodule.x_module import XModuleDescriptor
 from xmodule.modulestore.locator import BlockUsageLocator, DescriptionLocator, CourseLocator, VersionTree, LocalId
-from xmodule.modulestore.exceptions import InsufficientSpecificationError, VersionConflictError
-from xmodule.modulestore import inheritance, ModuleStoreBase
+from xmodule.modulestore.exceptions import InsufficientSpecificationError, VersionConflictError, DuplicateItemError
+from xmodule.modulestore import inheritance, ModuleStoreBase, Location
 
 from ..exceptions import ItemNotFoundError
 from .definition_lazy_loader import DefinitionLazyLoader
 from .caching_descriptor_system import CachingDescriptorSystem
 from xblock.fields import Scope
 from xblock.runtime import Mixologist
-from pytz import UTC
-import collections
 
 log = logging.getLogger(__name__)
 #==============================================================================
@@ -49,14 +50,17 @@ class SplitMongoModuleStore(ModuleStoreBase):
     A Mongodb backed ModuleStore supporting versions, inheritance,
     and sharing.
     """
+    # pylint: disable=C0103
     def __init__(self, host, db, collection, fs_root, render_template,
                  port=27017, default_class=None,
                  error_tracker=null_error_tracker,
                  user=None, password=None,
                  mongo_options=None,
+                 loc_mapper=None,
                  **kwargs):
 
         super(SplitMongoModuleStore, self).__init__(**kwargs)
+        self.loc_mapper = loc_mapper
         if mongo_options is None:
             mongo_options = {}
 
@@ -113,17 +117,12 @@ class SplitMongoModuleStore(ModuleStoreBase):
         new_module_data = {}
         for usage_id in base_usage_ids:
             new_module_data = self.descendants(
-                system.course_entry['blocks'],
+                system.course_entry['structure']['blocks'],
                 usage_id,
                 depth,
                 new_module_data
             )
 
-        # remove any which were already in module_data (not sure if there's a better way)
-        for newkey in new_module_data.iterkeys():
-            if newkey in system.module_data:
-                del new_module_data[newkey]
-
         if lazy:
             for block in new_module_data.itervalues():
                 block['definition'] = DefinitionLazyLoader(self, block['definition'])
@@ -149,7 +148,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         given depth. Load the definitions into each block if lazy is False;
         otherwise, use the lazy definition placeholder.
         '''
-        system = self._get_cache(course_entry['_id'])
+        system = self._get_cache(course_entry['structure']['_id'])
         if system is None:
             system = CachingDescriptorSystem(
                 modulestore=self,
@@ -162,7 +161,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
                 resources_fs=None,
                 mixins=self.xblock_mixins
             )
-            self._add_cache(course_entry['_id'], system)
+            self._add_cache(course_entry['structure']['_id'], system)
             self.cache_items(system, usage_ids, depth, lazy)
         return [system.load_item(usage_id, course_entry) for usage_id in usage_ids]
 
@@ -187,11 +186,15 @@ class SplitMongoModuleStore(ModuleStoreBase):
         self.thread_cache.course_cache[course_version_guid] = system
         return system
 
-    def _clear_cache(self):
+    def _clear_cache(self, course_version_guid=None):
         """
-        Should only be used by testing or something which implements transactional boundary semantics
+        Should only be used by testing or something which implements transactional boundary semantics.
+        :param course_version_guid: if provided, clear only this entry
         """
-        self.thread_cache.course_cache = {}
+        if course_version_guid:
+            del self.thread_cache.course_cache[course_version_guid]
+        else:
+            self.thread_cache.course_cache = {}
 
     def _lookup_course(self, course_locator):
         '''
@@ -230,14 +233,15 @@ class SplitMongoModuleStore(ModuleStoreBase):
         version_guid = course_locator.as_object_id(version_guid)
         entry = self.structures.find_one({'_id': version_guid})
 
-        # b/c more than one course can use same structure, the 'course_id' is not intrinsic to structure
+        # b/c more than one course can use same structure, the 'course_id' and 'branch' are not intrinsic to structure
         # and the one assoc'd w/ it by another fetch may not be the one relevant to this fetch; so,
-        # fake it by explicitly setting it in the in memory structure.
-
-        if course_locator.course_id:
-            entry['course_id'] = course_locator.course_id
-            entry['branch'] = course_locator.branch
-        return entry
+        # add it in the envelope for the structure.
+        envelope = {
+            'course_id': course_locator.course_id,
+            'branch': course_locator.branch,
+            'structure': entry,
+        }
+        return envelope
 
     def get_courses(self, branch='published', qualifiers=None):
         '''
@@ -260,20 +264,23 @@ class SplitMongoModuleStore(ModuleStoreBase):
         # collect ids and then query for those
         version_guids = []
         id_version_map = {}
-        for course_entry in matching:
-            version_guid = course_entry['versions'][branch]
+        for structure in matching:
+            version_guid = structure['versions'][branch]
             version_guids.append(version_guid)
-            id_version_map[version_guid] = course_entry['_id']
+            id_version_map[version_guid] = structure['_id']
 
         course_entries = self.structures.find({'_id': {'$in': version_guids}})
 
         # get the block for the course element (s/b the root)
         result = []
         for entry in course_entries:
-            # structures are course agnostic but the caller wants to know course, so add it in here
-            entry['course_id'] = id_version_map[entry['_id']]
+            envelope = {
+                'course_id': id_version_map[entry['_id']],
+                'branch': branch,
+                'structure': entry,
+            }
             root = entry['root']
-            result.extend(self._load_items(entry, [root], 0, lazy=True))
+            result.extend(self._load_items(envelope, [root], 0, lazy=True))
         return result
 
     def get_course(self, course_locator):
@@ -284,7 +291,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         raises InsufficientSpecificationError
         '''
         course_entry = self._lookup_course(course_locator)
-        root = course_entry['root']
+        root = course_entry['structure']['root']
         result = self._load_items(course_entry, [root], 0, lazy=True)
         return result[0]
 
@@ -304,7 +311,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         if block_location.usage_id is None:
             raise InsufficientSpecificationError(block_location)
         try:
-            course_structure = self._lookup_course(block_location)
+            course_structure = self._lookup_course(block_location)['structure']
         except ItemNotFoundError:
             # this error only occurs if the course does not exist
             return False
@@ -320,6 +327,15 @@ class SplitMongoModuleStore(ModuleStoreBase):
             descendants.
         raises InsufficientSpecificationError or ItemNotFoundError
         """
+        # intended for temporary support of some pointers being old-style
+        if isinstance(location, Location):
+            if self.loc_mapper is None:
+                raise InsufficientSpecificationError('No location mapper configured')
+            else:
+                location = self.loc_mapper.translate_location(
+                    None, location, location.revision is None,
+                    add_entry_if_missing=False
+                )
         assert isinstance(location, BlockUsageLocator)
         if not location.is_initialized():
             raise InsufficientSpecificationError("Not yet initialized: %s" % location)
@@ -356,7 +372,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
             qualifiers = {}
         course = self._lookup_course(locator)
         items = []
-        for usage_id, value in course['blocks'].iteritems():
+        for usage_id, value in course['structure']['blocks'].iteritems():
             if self._block_matches(value, qualifiers):
                 items.append(usage_id)
 
@@ -378,7 +394,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         """
         return self.get_item(location, depth=depth)
 
-    def get_parent_locations(self, locator, usage_id=None):
+    def get_parent_locations(self, locator, course_id=None):
         '''
         Return the locations (Locators w/ usage_ids) for the parents of this location in this
         course. Could use get_items(location, {'children': usage_id}) but this is slightly faster.
@@ -387,10 +403,9 @@ class SplitMongoModuleStore(ModuleStoreBase):
         :param locator: BlockUsageLocator restricting search scope
         :param usage_id: ignored. Only included for API compatibility. Specify the usage_id within the locator.
         '''
-
         course = self._lookup_course(locator)
         items = []
-        for parent_id, value in course['blocks'].iteritems():
+        for parent_id, value in course['structure']['blocks'].iteritems():
             for child_id in value['fields'].get('children', []):
                 if locator.usage_id == child_id:
                     items.append(BlockUsageLocator(url=locator.as_course_locator(), usage_id=parent_id))
@@ -427,7 +442,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
             'edited_on': when the change was made
         }
         """
-        course = self._lookup_course(course_locator)
+        course = self._lookup_course(course_locator)['structure']
         return {'original_version': course['original_version'],
             'previous_version': course['previous_version'],
             'edited_by': course['edited_by'],
@@ -460,7 +475,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
             return None
         if course_locator.version_guid is None:
             course = self._lookup_course(course_locator)
-            version_guid = course.version_guid
+            version_guid = course['structure']['_id']
         else:
             version_guid = course_locator.version_guid
 
@@ -490,8 +505,8 @@ class SplitMongoModuleStore(ModuleStoreBase):
         The block's history tracks its explicit changes but not the changes in its children.
 
         '''
-        block_locator = block_locator.version_agnostic()
-        course_struct = self._lookup_course(block_locator)
+        # version_agnostic means we don't care if the head and version don't align, trust the version
+        course_struct = self._lookup_course(block_locator.version_agnostic())['structure']
         usage_id = block_locator.usage_id
         update_version_field = 'blocks.{}.edit_info.update_version'.format(usage_id)
         all_versions_with_block = self.structures.find({'original_version': course_struct['original_version'],
@@ -624,46 +639,61 @@ class SplitMongoModuleStore(ModuleStoreBase):
         else:
             return id_root
 
-    # TODO Should I rewrite this to take a new xblock instance rather than to construct it? That is, require the
+    # DHM: Should I rewrite this to take a new xblock instance rather than to construct it? That is, require the
     # caller to use XModuleDescriptor.load_from_json thus reducing similar code and making the object creation and
     # validation behavior a responsibility of the model layer rather than the persistence layer.
-    def create_item(self, course_or_parent_locator, category, user_id, definition_locator=None, fields=None,
-        force=False):
+    def create_item(
+        self, course_or_parent_locator, category, user_id,
+        usage_id=None, definition_locator=None, fields=None,
+        force=False, continue_version=False
+    ):
         """
         Add a descriptor to persistence as the last child of the optional parent_location or just as an element
         of the course (if no parent provided). Return the resulting post saved version with populated locators.
 
-        If the locator is a BlockUsageLocator, then it's assumed to be the parent. If it's a CourseLocator, then it's
+        :param course_or_parent_locator: If BlockUsageLocator, then it's assumed to be the parent.
+        If it's a CourseLocator, then it's
         merely the containing course.
 
         raises InsufficientSpecificationError if there is no course locator.
         raises VersionConflictError if course_id and version_guid given and the current version head != version_guid
             and force is not True.
-        force: fork the structure and don't update the course draftVersion if the above
+        :param force: fork the structure and don't update the course draftVersion if the above
+        :param continue_revision: for multistep transactions, continue revising the given version rather than creating
+        a new version. Setting force to True conflicts with setting this to True and will cause a VersionConflictError
 
-        The incoming definition_locator should either be None to indicate this is a brand new definition or
+        :param definition_locator: should either be None to indicate this is a brand new definition or
         a pointer to the existing definition to which this block should point or from which this was derived.
         If fields does not contain any Scope.content, then definition_locator must have a value meaning that this
         block points
         to the existing definition. If fields contains Scope.content and definition_locator is not None, then
         the Scope.content fields are assumed to be a new payload for definition_locator.
 
-        Creates a new version of the course structure, creates and inserts the new block, makes the block point
+        :param usage_id: if provided, must not already exist in the structure. Provides the block id for the
+        new item in this structure. Otherwise, one is computed using the category appended w/ a few digits.
+
+        :param continue_version: continue changing the current structure at the head of the course. Very dangerous
+        unless used in the same request as started the change! See below about version conflicts.
+
+        This method creates a new version of the course structure unless continue_version is True.
+        It creates and inserts the new block, makes the block point
         to the definition which may be new or a new version of an existing or an existing.
+
         Rules for course locator:
 
         * If the course locator specifies a course_id and either it doesn't
-          specify version_guid or the one it specifies == the current draft, it progresses the course to point
-          to the new draft and sets the active version to point to the new draft
-        * If the locator has a course_id but its version_guid != current draft, it raises VersionConflictError.
+          specify version_guid or the one it specifies == the current head of the branch,
+          it progresses the course to point
+          to the new head and sets the active version to point to the new head
+        * If the locator has a course_id but its version_guid != current head, it raises VersionConflictError.
 
         NOTE: using a version_guid will end up creating a new version of the course. Your new item won't be in
         the course id'd by version_guid but instead in one w/ a new version_guid. Ensure in this case that you get
         the new version_guid from the locator in the returned object!
         """
         # find course_index entry if applicable and structures entry
-        index_entry = self._get_index_if_valid(course_or_parent_locator, force)
-        structure = self._lookup_course(course_or_parent_locator)
+        index_entry = self._get_index_if_valid(course_or_parent_locator, force, continue_version)
+        structure = self._lookup_course(course_or_parent_locator)['structure']
 
         partitioned_fields = self._partition_fields_by_scope(category, fields)
         new_def_data = partitioned_fields.get(Scope.content, {})
@@ -674,17 +704,21 @@ class SplitMongoModuleStore(ModuleStoreBase):
             definition_locator, _ = self.update_definition_from_data(definition_locator, new_def_data, user_id)
 
         # copy the structure and modify the new one
-        new_structure = self._version_structure(structure, user_id)
+        if continue_version:
+            new_structure = structure
+        else:
+            new_structure = self._version_structure(structure, user_id)
+
         # generate an id
-        new_usage_id = self._generate_usage_id(new_structure['blocks'], category)
+        if usage_id is not None:
+            if usage_id in new_structure['blocks']:
+                raise DuplicateItemError(usage_id, self, 'structures')
+            else:
+                new_usage_id = usage_id
+        else:
+            new_usage_id = self._generate_usage_id(new_structure['blocks'], category)
+
         update_version_keys = ['blocks.{}.edit_info.update_version'.format(new_usage_id)]
-        if isinstance(course_or_parent_locator, BlockUsageLocator) and course_or_parent_locator.usage_id is not None:
-            parent = new_structure['blocks'][course_or_parent_locator.usage_id]
-            parent['fields'].setdefault('children', []).append(new_usage_id)
-            parent['edit_info']['edited_on'] = datetime.datetime.now(UTC)
-            parent['edit_info']['edited_by'] = user_id
-            parent['edit_info']['previous_version'] = parent['edit_info']['update_version']
-            update_version_keys.append('blocks.{}.edit_info.update_version'.format(course_or_parent_locator.usage_id))
         block_fields = partitioned_fields.get(Scope.settings, {})
         if Scope.children in partitioned_fields:
             block_fields.update(partitioned_fields[Scope.children])
@@ -698,26 +732,50 @@ class SplitMongoModuleStore(ModuleStoreBase):
                 'previous_version': None
             }
         }
-        new_id = self.structures.insert(new_structure)
+        # if given parent, add new block as child and update parent's version
+        parent = None
+        if isinstance(course_or_parent_locator, BlockUsageLocator) and course_or_parent_locator.usage_id is not None:
+            parent = new_structure['blocks'][course_or_parent_locator.usage_id]
+            parent['fields'].setdefault('children', []).append(new_usage_id)
+            if not continue_version or parent['edit_info']['update_version'] != structure['_id']:
+                parent['edit_info']['edited_on'] = datetime.datetime.now(UTC)
+                parent['edit_info']['edited_by'] = user_id
+                parent['edit_info']['previous_version'] = parent['edit_info']['update_version']
+                update_version_keys.append(
+                    'blocks.{}.edit_info.update_version'.format(course_or_parent_locator.usage_id)
+                )
+        if continue_version:
+            new_id = structure['_id']
+            # db update
+            self.structures.update({'_id': new_id}, new_structure)
+            # clear cache so things get refetched and inheritance recomputed
+            self._clear_cache(new_id)
+        else:
+            new_id = self.structures.insert(new_structure)
+
         update_version_payload = {key: new_id for key in update_version_keys}
-        self.structures.update({'_id': new_id},
+        self.structures.update(
+            {'_id': new_id},
             {'$set': update_version_payload})
 
         # update the index entry if appropriate
         if index_entry is not None:
-            self._update_head(index_entry, course_or_parent_locator.branch, new_id)
+            if not continue_version:
+                self._update_head(index_entry, course_or_parent_locator.branch, new_id)
             course_parent = course_or_parent_locator.as_course_locator()
         else:
             course_parent = None
 
-        # fetch and return the new item--fetching is unnecessary but a good qc step
+        # reconstruct the new_item from the cache
         return self.get_item(BlockUsageLocator(course_id=course_parent,
                                                usage_id=new_usage_id,
                                                version_guid=new_id))
 
     def create_course(
         self, org, prettyid, user_id, id_root=None, fields=None,
-        master_branch='draft', versions_dict=None, root_category='course'):
+        master_branch='draft', versions_dict=None, root_category='course',
+        root_usage_id='course'
+    ):
         """
         Create a new entry in the active courses index which points to an existing or new structure. Returns
         the course root of the resulting entry (the location has the course id)
@@ -749,7 +807,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         provide any fields overrides, see above). if not provided, will create a mostly empty course
         structure with just a category course root xblock.
         """
-        partitioned_fields = self._partition_fields_by_scope('course', fields)
+        partitioned_fields = self._partition_fields_by_scope(root_category, fields)
         block_fields = partitioned_fields.setdefault(Scope.settings, {})
         if Scope.children in partitioned_fields:
             block_fields.update(partitioned_fields[Scope.children])
@@ -773,13 +831,13 @@ class SplitMongoModuleStore(ModuleStoreBase):
             self.definitions.update({'_id': definition_id}, {'$set': {"edit_info.original_version": definition_id}})
 
             draft_structure = {
-                'root': 'course',
+                'root': root_usage_id,
                 'previous_version': None,
                 'edited_by': user_id,
                 'edited_on': datetime.datetime.now(UTC),
                 'blocks': {
-                    'course': {
-                        'category': 'course',
+                    root_usage_id: {
+                        'category': root_category,
                         'definition': definition_id,
                         'fields': block_fields,
                         'edit_info': {
@@ -792,9 +850,11 @@ class SplitMongoModuleStore(ModuleStoreBase):
             }
             new_id = self.structures.insert(draft_structure)
             draft_structure['original_version'] = new_id
-            self.structures.update({'_id': new_id},
+            self.structures.update(
+                {'_id': new_id},
                 {'$set': {"original_version": new_id,
-                    'blocks.course.edit_info.update_version': new_id}})
+                    'blocks.{}.edit_info.update_version'.format(root_usage_id): new_id}}
+            )
             if versions_dict is None:
                 versions_dict = {master_branch: new_id}
             else:
@@ -803,7 +863,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         else:
             # just get the draft_version structure
             draft_version = CourseLocator(version_guid=versions_dict[master_branch])
-            draft_structure = self._lookup_course(draft_version)
+            draft_structure = self._lookup_course(draft_version)['structure']
             if definition_fields or block_fields:
                 draft_structure = self._version_structure(draft_structure, user_id)
                 root_block = draft_structure['blocks'][draft_structure['root']]
@@ -855,7 +915,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         The implementation tries to detect which, if any changes, actually need to be saved and thus won't version
         the definition, structure, nor course if they didn't change.
         """
-        original_structure = self._lookup_course(descriptor.location)
+        original_structure = self._lookup_course(descriptor.location)['structure']
         index_entry = self._get_index_if_valid(descriptor.location, force)
 
         descriptor.definition_locator, is_updated = self.update_definition_from_data(
@@ -897,7 +957,9 @@ class SplitMongoModuleStore(ModuleStoreBase):
                 self._update_head(index_entry, descriptor.location.branch, new_id)
 
             # fetch and return the new item--fetching is unnecessary but a good qc step
-            return self.get_item(BlockUsageLocator(descriptor.location, version_guid=new_id))
+            new_locator = BlockUsageLocator(descriptor.location)
+            new_locator.version_guid = new_id
+            return self.get_item(new_locator)
         else:
             # nothing changed, just return the one sent in
             return descriptor
@@ -921,7 +983,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         """
         # find course_index entry if applicable and structures entry
         index_entry = self._get_index_if_valid(xblock.location, force)
-        structure = self._lookup_course(xblock.location)
+        structure = self._lookup_course(xblock.location)['structure']
         new_structure = self._version_structure(structure, user_id)
 
         changed_blocks = self._persist_subdag(xblock, user_id, new_structure['blocks'])
@@ -1067,7 +1129,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         the course but leaves the head pointer where it is (this change will not be in the course head).
         """
         assert isinstance(usage_locator, BlockUsageLocator) and usage_locator.is_initialized()
-        original_structure = self._lookup_course(usage_locator)
+        original_structure = self._lookup_course(usage_locator)['structure']
         if original_structure['root'] == usage_locator.usage_id:
             raise ValueError("Cannot delete the root of a course")
         index_entry = self._get_index_if_valid(usage_locator, force)
@@ -1153,7 +1215,13 @@ class SplitMongoModuleStore(ModuleStoreBase):
                 inheriting_settings[field_name] = block_fields[field_name]
 
         for child in block_fields.get('children', []):
-            self.inherit_settings(block_map, block_map[child], inheriting_settings)
+            try:
+                self.inherit_settings(block_map, block_map[child], inheriting_settings)
+            except KeyError:
+                # here's where we need logic for looking up in other structures when we allow cross pointers
+                # but it's also getting this during course creation if creating top down w/ children set or
+                # migration where the old mongo published had pointers to privates
+                pass
 
     def descendants(self, block_map, usage_id, depth, descendent_map):
         """
@@ -1188,6 +1256,24 @@ class SplitMongoModuleStore(ModuleStoreBase):
         else:
             return DescriptionLocator(definition['_id'])
 
+    def internal_clean_children(self, course_locator):
+        """
+        Only intended for rather low level methods to use. Goes through the children attrs of
+        each block removing any whose usage_id is not a member of the course. Does not generate
+        a new version of the course but overwrites the existing one.
+
+        :param course_locator: the course to clean
+        """
+        original_structure = self._lookup_course(course_locator)['structure']
+        for block in original_structure['blocks'].itervalues():
+            if 'fields' in block and 'children' in block['fields']:
+                block['fields']["children"] = [
+                    usage_id for usage_id in block['fields']["children"] if usage_id in original_structure['blocks']
+                ]
+        self.structures.update({'_id': original_structure['_id']}, original_structure)
+        # clear cache again b/c inheritance may be wrong over orphans
+        self._clear_cache(original_structure['_id'])
+
 
     def _block_matches(self, value, qualifiers):
         '''
@@ -1231,10 +1317,9 @@ class SplitMongoModuleStore(ModuleStoreBase):
         """
         if len(lista) != len(listb):
             return False
-        for idx in enumerate(lista):
-            if lista[idx] != listb[idx]:
-                itema = self._usage_id(lista[idx])
-                if itema != self._usage_id(listb[idx]):
+        for ele_a, ele_b in zip(lista, listb):
+            if ele_a != ele_b:
+                if self._usage_id(ele_a) != self._usage_id(ele_b):
                     return False
         return True
 
@@ -1248,30 +1333,41 @@ class SplitMongoModuleStore(ModuleStoreBase):
         else:
             return xblock_or_id
 
-    def _get_index_if_valid(self, locator, force=False):
+    def _get_index_if_valid(self, locator, force=False, continue_version=False):
         """
         If the locator identifies a course and points to its draft (or plausibly its draft),
         then return the index entry.
 
         raises VersionConflictError if not the right version
 
-        :param locator:
+        :param locator: a courselocator
+        :param force: if false, raises VersionConflictError if the current head of the course != the one identified
+        by locator. Cannot be True if continue_version is True
+        :param continue_version: if True, assumes this operation requires a head version and will not create a new
+        version but instead continue an existing transaction on this version. This flag cannot be True if force is True.
         """
         if locator.course_id is None or locator.branch is None:
-            return None
+            if continue_version:
+                raise InsufficientSpecificationError(
+                    "To continue a version, the locator must point to one ({}).".format(locator)
+                )
+            else:
+                return None
         else:
             index_entry = self.course_index.find_one({'_id': locator.course_id})
-            if (locator.version_guid is not None
-                and index_entry['versions'][locator.branch] != locator.version_guid
-                and not force):
+            is_head = (
+                locator.version_guid is None or
+                index_entry['versions'][locator.branch] == locator.version_guid
+            )
+            if (is_head or (force and not continue_version)):
+                return index_entry
+            else:
                 raise VersionConflictError(
                     locator,
                     CourseLocator(
                         course_id=index_entry['_id'],
                         version_guid=index_entry['versions'][locator.branch],
                         branch=locator.branch))
-            else:
-                return index_entry
 
     def _version_structure(self, structure, user_id):
         """
@@ -1279,8 +1375,7 @@ class SplitMongoModuleStore(ModuleStoreBase):
         :param structure:
         :param user_id:
         """
-        new_structure = structure.copy()
-        new_structure['blocks'] = new_structure['blocks'].copy()
+        new_structure = copy.deepcopy(structure)
         del new_structure['_id']
         new_structure['previous_version'] = structure['_id']
         new_structure['edited_by'] = user_id
diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/split_mongo_kvs.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/split_mongo_kvs.py
index e2d8fd30a9be7daa46f4d9f931665d8364f3e9c5..02f5aad9f5b5359d70dd40552d9b050434025e53 100644
--- a/common/lib/xmodule/xmodule/modulestore/split_mongo/split_mongo_kvs.py
+++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/split_mongo_kvs.py
@@ -1,7 +1,6 @@
 import copy
 from xblock.fields import Scope
 from collections import namedtuple
-from xblock.runtime import KeyValueStore
 from xblock.exceptions import InvalidScopeError
 from .definition_lazy_loader import DefinitionLazyLoader
 from xmodule.modulestore.inheritance import InheritanceKeyValueStore
@@ -25,7 +24,8 @@ class SplitMongoKVS(InheritanceKeyValueStore):
             Note, local fields may override and disagree w/ this b/c this says what the value
             should be if the field is undefined.
         """
-        super(SplitMongoKVS, self).__init__(copy.copy(fields), inherited_settings)
+        # deepcopy so that manipulations of fields does not pollute the source
+        super(SplitMongoKVS, self).__init__(copy.deepcopy(fields), inherited_settings)
         self._definition = definition  # either a DefinitionLazyLoader or the db id of the definition.
         # if the db id, then the definition is presumed to be loaded into _fields
 
diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_location_mapper.py b/common/lib/xmodule/xmodule/modulestore/tests/test_location_mapper.py
index 599bd6da5edeac2453a7f47d624f39342d211124..abe41eaaed9579f761026995b7fffe26e16c9d70 100644
--- a/common/lib/xmodule/xmodule/modulestore/tests/test_location_mapper.py
+++ b/common/lib/xmodule/xmodule/modulestore/tests/test_location_mapper.py
@@ -192,6 +192,14 @@ class TestLocationMapper(unittest.TestCase):
             add_entry_if_missing=True
         )
         self.assertEqual(prob_locator.course_id, new_style_course_id)
+        # create an entry w/o a guid name
+        other_location = Location('i4x', org, course, 'chapter', 'intro')
+        locator = loc_mapper().translate_location(
+            old_style_course_id,
+            other_location,
+            add_entry_if_missing=True
+        )
+        self.assertEqual(locator.usage_id, 'intro')
 
         # add a distractor course
         loc_mapper().create_map_entry(
diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_locators.py b/common/lib/xmodule/xmodule/modulestore/tests/test_locators.py
index 862b05cd53f3f16835525c8e024629f0d41e38ab..05f4d25283a05063ab65cc5f2ac2127d175322a0 100644
--- a/common/lib/xmodule/xmodule/modulestore/tests/test_locators.py
+++ b/common/lib/xmodule/xmodule/modulestore/tests/test_locators.py
@@ -208,6 +208,12 @@ class LocatorTest(TestCase):
                                      block=expected_block_ref)
         self.assertEqual(str(testobj), testurn)
         self.assertEqual(testobj.url(), 'edx://' + testurn)
+        agnostic = testobj.version_agnostic()
+        self.assertIsNone(agnostic.version_guid)
+        self.check_block_locn_fields(agnostic, 'test_block constructor',
+                                     course_id=expected_id,
+                                     branch=expected_branch,
+                                     block=expected_block_ref)
 
     def test_block_constructor_url_version_prefix(self):
         test_id_loc = '519665f6223ebd6980884f2b'
@@ -220,6 +226,14 @@ class LocatorTest(TestCase):
             block='lab2',
             version_guid=ObjectId(test_id_loc)
         )
+        agnostic = testobj.version_agnostic()
+        self.check_block_locn_fields(
+            agnostic, 'error parsing URL with version and block',
+            block='lab2',
+            course_id=None,
+            version_guid=ObjectId(test_id_loc)
+        )
+        self.assertIsNone(agnostic.course_id)
 
     def test_block_constructor_url_kitchen_sink(self):
         test_id_loc = '519665f6223ebd6980884f2b'
diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_migrator.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_migrator.py
new file mode 100644
index 0000000000000000000000000000000000000000..92b1304d733f7e9613092582d66bf42244c11017
--- /dev/null
+++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_migrator.py
@@ -0,0 +1,273 @@
+"""
+Created on Sep 10, 2013
+
+@author: dmitchell
+
+Tests for split_migrator
+
+"""
+import unittest
+import uuid
+import random
+import mock
+import datetime
+from xmodule.fields import Date
+from xmodule.modulestore import Location
+from xmodule.modulestore.inheritance import InheritanceMixin
+from xmodule.modulestore.loc_mapper_store import LocMapperStore
+from xmodule.modulestore.mongo.draft import DraftModuleStore
+from xmodule.modulestore.split_mongo.split import SplitMongoModuleStore
+from xmodule.modulestore.mongo.base import MongoModuleStore
+from xmodule.modulestore.split_migrator import SplitMigrator
+from xmodule.modulestore.mongo import draft
+
+
+class TestMigration(unittest.TestCase):
+    """
+    Test the split migrator
+    """
+
+    # Snippet of what would be in the django settings envs file
+    db_config = {
+        'host': 'localhost',
+        'db': 'test_xmodule',
+        'collection': 'modulestore{0}'.format(uuid.uuid4().hex),
+    }
+
+    modulestore_options = dict({
+        'default_class': 'xmodule.raw_module.RawDescriptor',
+        'fs_root': '',
+        'render_template': mock.Mock(return_value=""),
+        'xblock_mixins': (InheritanceMixin,)
+    }, **db_config)
+
+    def setUp(self):
+        super(TestMigration, self).setUp()
+        self.loc_mapper = LocMapperStore(**self.db_config)
+        self.old_mongo = MongoModuleStore(**self.modulestore_options)
+        self.draft_mongo = DraftModuleStore(**self.modulestore_options)
+        self.split_mongo = SplitMongoModuleStore(
+            loc_mapper=self.loc_mapper, **self.modulestore_options
+        )
+        self.migrator = SplitMigrator(self.split_mongo, self.old_mongo, self.draft_mongo, self.loc_mapper)
+        self.course_location = None
+        self.create_source_course()
+
+    def tearDown(self):
+        dbref = self.loc_mapper.db
+        dbref.drop_collection(self.loc_mapper.location_map)
+        split_db = self.split_mongo.db
+        split_db.drop_collection(split_db.course_index)
+        split_db.drop_collection(split_db.structures)
+        split_db.drop_collection(split_db.definitions)
+        # old_mongo doesn't give a db attr, but all of the dbs are the same
+        dbref.drop_collection(self.old_mongo.collection)
+
+        dbref.connection.close()
+
+        super(TestMigration, self).tearDown()
+
+    def _create_and_get_item(self, store, location, data, metadata, runtime=None):
+        store.create_and_save_xmodule(location, data, metadata, runtime)
+        return store.get_item(location)
+
+    def create_source_course(self):
+        """
+        A course testing all of the conversion mechanisms:
+        * some inheritable settings
+        * sequences w/ draft and live intermixed children to ensure all get to the draft but
+        only the live ones get to published. Some are only draft, some are both, some are only live.
+        * about, static_tab, and conditional documents
+        """
+        location = Location('i4x', 'test_org', 'test_course', 'course', 'runid')
+        self.course_location = location
+        date_proxy = Date()
+        metadata = {
+            'start': date_proxy.to_json(datetime.datetime(2000, 3, 13, 4)),
+            'display_name': 'Migration test course',
+        }
+        data = {
+            'wiki_slug': 'test_course_slug'
+        }
+        course_root = self._create_and_get_item(self.old_mongo, location, data, metadata)
+        runtime = course_root.runtime
+        # chapters
+        location = location.replace(category='chapter', name=uuid.uuid4().hex)
+        chapter1 = self._create_and_get_item(self.old_mongo, location, {}, {'display_name': 'Chapter 1'}, runtime)
+        course_root.children.append(chapter1.location.url())
+        location = location.replace(category='chapter', name=uuid.uuid4().hex)
+        chapter2 = self._create_and_get_item(self.old_mongo, location, {}, {'display_name': 'Chapter 2'}, runtime)
+        course_root.children.append(chapter2.location.url())
+        self.old_mongo.update_children(course_root.location, course_root.children)
+        # vertical in live only
+        location = location.replace(category='vertical', name=uuid.uuid4().hex)
+        live_vert = self._create_and_get_item(self.old_mongo, location, {}, {'display_name': 'Live vertical'}, runtime)
+        chapter1.children.append(live_vert.location.url())
+        self.create_random_units(self.old_mongo, live_vert)
+        # vertical in both live and draft
+        location = location.replace(category='vertical', name=uuid.uuid4().hex)
+        both_vert = self._create_and_get_item(
+            self.old_mongo, location, {}, {'display_name': 'Both vertical'}, runtime
+        )
+        draft_both = self._create_and_get_item(
+            self.draft_mongo, location, {}, {'display_name': 'Both vertical renamed'}, runtime
+        )
+        chapter1.children.append(both_vert.location.url())
+        self.create_random_units(self.old_mongo, both_vert, self.draft_mongo, draft_both)
+        # vertical in draft only (x2)
+        location = location.replace(category='vertical', name=uuid.uuid4().hex)
+        draft_vert = self._create_and_get_item(
+            self.draft_mongo,
+            location, {}, {'display_name': 'Draft vertical'}, runtime)
+        chapter1.children.append(draft_vert.location.url())
+        self.create_random_units(self.draft_mongo, draft_vert)
+        location = location.replace(category='vertical', name=uuid.uuid4().hex)
+        draft_vert = self._create_and_get_item(
+            self.draft_mongo,
+            location, {}, {'display_name': 'Draft vertical2'}, runtime)
+        chapter1.children.append(draft_vert.location.url())
+        self.create_random_units(self.draft_mongo, draft_vert)
+        # and finally one in live only (so published has to skip 2)
+        location = location.replace(category='vertical', name=uuid.uuid4().hex)
+        live_vert = self._create_and_get_item(
+            self.old_mongo,
+            location, {}, {'display_name': 'Live vertical end'}, runtime)
+        chapter1.children.append(live_vert.location.url())
+        self.create_random_units(self.old_mongo, live_vert)
+
+        # update the chapter
+        self.old_mongo.update_children(chapter1.location, chapter1.children)
+
+        # now the other one w/ the conditional
+        # first create some show children
+        indirect1 = self._create_and_get_item(
+            self.old_mongo,
+            location.replace(category='discussion', name=uuid.uuid4().hex),
+            "", {'display_name': 'conditional show 1'}, runtime
+        )
+        indirect2 = self._create_and_get_item(
+            self.old_mongo,
+            location.replace(category='html', name=uuid.uuid4().hex),
+            "", {'display_name': 'conditional show 2'}, runtime
+        )
+        location = location.replace(category='conditional', name=uuid.uuid4().hex)
+        metadata = {
+            'xml_attributes': {
+                'sources': [live_vert.location.url(), ],
+                'completed': True,
+            },
+        }
+        data = {
+            'show_tag_list': [indirect1.location.url(), indirect2.location.url()]
+        }
+        conditional = self._create_and_get_item(self.old_mongo, location, data, metadata, runtime)
+        conditional.children = [indirect1.location.url(), indirect2.location.url()]
+        # add direct children
+        self.create_random_units(self.old_mongo, conditional)
+        chapter2.children.append(conditional.location.url())
+        self.old_mongo.update_children(chapter2.location, chapter2.children)
+
+        # and the ancillary docs (not children)
+        location = location.replace(category='static_tab', name=uuid.uuid4().hex)
+        # the below automatically adds the tab to the course
+        _tab = self._create_and_get_item(self.old_mongo, location, "", {'display_name': 'Tab uno'}, runtime)
+
+        location = location.replace(category='about', name='overview')
+        _overview = self._create_and_get_item(self.old_mongo, location, "<p>test</p>", {}, runtime)
+        location = location.replace(category='course_info', name='updates')
+        _overview = self._create_and_get_item(
+            self.old_mongo,
+            location, "<ol><li><h2>Sep 22</h2><p>test</p></li></ol>", {}, runtime
+        )
+
+    def create_random_units(self, store, parent, cc_store=None, cc_parent=None):
+        """
+        Create a random selection of units under the given parent w/ random names & attrs
+        :param store: which store (e.g., direct/draft) to create them in
+        :param parent: the parent to have point to them
+        :param cc_store: (optional) if given, make a small change and save also to this store but w/ same location
+        (only makes sense if store is 'direct' and this is 'draft' or vice versa)
+        """
+        for _ in range(random.randrange(6)):
+            location = parent.location.replace(
+                category=random.choice(['html', 'video', 'problem', 'discussion']),
+                name=uuid.uuid4().hex
+            )
+            metadata = {'display_name': str(uuid.uuid4()), 'graded': True}
+            data = {}
+            element = self._create_and_get_item(store, location, data, metadata, parent.runtime)
+            parent.children.append(element.location.url())
+            if cc_store is not None:
+                # change display_name and remove graded to test the delta
+                element = self._create_and_get_item(
+                    cc_store, location, data, {'display_name': str(uuid.uuid4())}, parent.runtime
+                )
+                cc_parent.children.append(element.location.url())
+        store.update_children(parent.location, parent.children)
+        if cc_store is not None:
+            cc_store.update_children(cc_parent.location, cc_parent.children)
+
+    def compare_courses(self, presplit, published):
+        # descend via children to do comparison
+        old_root = presplit.get_item(self.course_location, depth=None)
+        new_root_locator = self.loc_mapper.translate_location(
+            self.course_location.course_id, self.course_location, published, add_entry_if_missing=False
+        )
+        new_root = self.split_mongo.get_course(new_root_locator)
+        self.compare_dags(presplit, old_root, new_root, published)
+
+        # grab the detached items to compare they should be in both published and draft
+        for category in ['conditional', 'about', 'course_info', 'static_tab']:
+            location = self.course_location.replace(name=None, category=category)
+            for conditional in presplit.get_items(location):
+                locator = self.loc_mapper.translate_location(
+                    self.course_location.course_id,
+                    conditional.location, published, add_entry_if_missing=False
+                )
+                self.compare_dags(presplit, conditional, self.split_mongo.get_item(locator), published)
+
+    def compare_dags(self, presplit, presplit_dag_root, split_dag_root, published):
+        # check that locations match
+        self.assertEqual(
+            presplit_dag_root.location,
+            self.loc_mapper.translate_locator_to_location(split_dag_root.location).replace(revision=None)
+        )
+        # compare all fields but children
+        for name in presplit_dag_root.fields.iterkeys():
+            if name != 'children':
+                self.assertEqual(
+                    getattr(presplit_dag_root, name),
+                    getattr(split_dag_root, name),
+                    "{}/{}: {} != {}".format(
+                        split_dag_root.location, name, getattr(presplit_dag_root, name), getattr(split_dag_root, name)
+                    )
+                )
+        # test split get_item using old Location: old draft store didn't set revision for things above vertical
+        # but split does distinguish these; so, set revision if not published
+        if not published:
+            location = draft.as_draft(presplit_dag_root.location)
+        else:
+            location = presplit_dag_root.location
+        refetched = self.split_mongo.get_item(location)
+        self.assertEqual(
+            refetched.location, split_dag_root.location,
+            "Fetch from split via old Location {} not same as new {}".format(
+                refetched.location, split_dag_root.location
+            )
+        )
+        # compare children
+        if presplit_dag_root.has_children:
+            self.assertEqual(
+                len(presplit_dag_root.get_children()), len(split_dag_root.get_children()),
+                "{0.category} '{0.display_name}': children count {1} != {2}".format(
+                    presplit_dag_root, len(presplit_dag_root.get_children()), split_dag_root.children
+                )
+            )
+            for pre_child, split_child in zip(presplit_dag_root.get_children(), split_dag_root.get_children()):
+                self.compare_dags(presplit, pre_child, split_child, published)
+
+    def test_migrator(self):
+        self.migrator.migrate_mongo_course(self.course_location, random.getrandbits(32))
+        # now compare the migrated to the original course
+        self.compare_courses(self.old_mongo, True)
+        self.compare_courses(self.draft_mongo, False)
diff --git a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py
index b299f56711e2ee96c645ee393d3f89a1c623fe7d..0f1b65d0a3d4cc8a797c2bfabcfd9f2c41d83dec 100644
--- a/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py
+++ b/common/lib/xmodule/xmodule/modulestore/tests/test_split_modulestore.py
@@ -11,12 +11,14 @@ from importlib import import_module
 
 from xblock.fields import Scope
 from xmodule.course_module import CourseDescriptor
-from xmodule.modulestore.exceptions import InsufficientSpecificationError, ItemNotFoundError, VersionConflictError
+from xmodule.modulestore.exceptions import InsufficientSpecificationError, ItemNotFoundError, VersionConflictError, \
+    DuplicateItemError
 from xmodule.modulestore.locator import CourseLocator, BlockUsageLocator, VersionTree, DescriptionLocator
 from xmodule.modulestore.inheritance import InheritanceMixin
 from pytz import UTC
 from path import path
 import re
+import random
 
 
 class SplitModuleTest(unittest.TestCase):
@@ -250,7 +252,6 @@ class SplitModuleCourseTests(SplitModuleTest):
         self.assertEqual(str(result.children[0].locator.version_guid), self.GUID_D1)
         self.assertEqual(len(result.children[0].children), 1)
 
-
 class SplitModuleItemTests(SplitModuleTest):
     '''
     Item read tests including inheritance
@@ -492,7 +493,7 @@ class TestItemCrud(SplitModuleTest):
     """
     Test create update and delete of items
     """
-    # TODO do I need to test this case which I believe won't work:
+    # DHM do I need to test this case which I believe won't work:
     #  1) fetch a course and some of its blocks
     #  2) do a series of CRUD operations on those previously fetched elements
     # The problem here will be that the version_guid of the items will be the version at time of fetch.
@@ -604,7 +605,91 @@ class TestItemCrud(SplitModuleTest):
         self.assertGreaterEqual(new_history['edited_on'], premod_time)
         another_history = modulestore().get_definition_history_info(another_module.definition_locator)
         self.assertEqual(another_history['previous_version'], 'problem12345_3_1')
-    # TODO check that default fields are set
+
+    def test_create_continue_version(self):
+        """
+        Test create_item using the continue_version flag
+        """
+        # start transaction w/ simple creation
+        user = random.getrandbits(32)
+        new_course = modulestore().create_course('test_org', 'test_transaction', user)
+        new_course_locator = new_course.location.as_course_locator()
+        index_history_info = modulestore().get_course_history_info(new_course.location)
+        course_block_prev_version = new_course.previous_version
+        course_block_update_version = new_course.update_version
+        self.assertIsNotNone(new_course_locator.version_guid, "Want to test a definite version")
+        versionless_course_locator = CourseLocator(
+            course_id=new_course_locator.course_id, branch=new_course_locator.branch
+        )
+
+        # positive simple case: no force, add chapter
+        new_ele = modulestore().create_item(
+            new_course.location, 'chapter', user,
+            fields={'display_name': 'chapter 1'},
+            continue_version=True
+        )
+        # version info shouldn't change
+        self.assertEqual(new_ele.update_version, course_block_update_version)
+        self.assertEqual(new_ele.update_version, new_ele.location.version_guid)
+        refetch_course = modulestore().get_course(versionless_course_locator)
+        self.assertEqual(refetch_course.location.version_guid, new_course.location.version_guid)
+        self.assertEqual(refetch_course.previous_version, course_block_prev_version)
+        self.assertEqual(refetch_course.update_version, course_block_update_version)
+        refetch_index_history_info = modulestore().get_course_history_info(refetch_course.location)
+        self.assertEqual(refetch_index_history_info, index_history_info)
+        self.assertIn(new_ele.location.usage_id, refetch_course.children)
+
+        # try to create existing item
+        with self.assertRaises(DuplicateItemError):
+            _fail = modulestore().create_item(
+                new_course.location, 'chapter', user,
+                usage_id=new_ele.location.usage_id,
+                fields={'display_name': 'chapter 2'},
+                continue_version=True
+            )
+
+        # start a new transaction
+        new_ele = modulestore().create_item(
+            new_course.location, 'chapter', user,
+            fields={'display_name': 'chapter 2'},
+            continue_version=False
+        )
+        transaction_guid = new_ele.location.version_guid
+        # ensure force w/ continue gives exception
+        with self.assertRaises(VersionConflictError):
+            _fail = modulestore().create_item(
+                new_course.location, 'chapter', user,
+                fields={'display_name': 'chapter 2'},
+                force=True, continue_version=True
+            )
+
+        # ensure trying to continue the old one gives exception
+        with self.assertRaises(VersionConflictError):
+            _fail = modulestore().create_item(
+                new_course.location, 'chapter', user,
+                fields={'display_name': 'chapter 3'},
+                continue_version=True
+            )
+
+        # add new child to old parent in continued (leave off version_guid)
+        course_module_locator = BlockUsageLocator(
+            course_id=new_course.location.course_id,
+            usage_id=new_course.location.usage_id,
+            branch=new_course.location.branch
+        )
+        new_ele = modulestore().create_item(
+            course_module_locator, 'chapter', user,
+            fields={'display_name': 'chapter 4'},
+            continue_version=True
+        )
+        self.assertNotEqual(new_ele.update_version, course_block_update_version)
+        self.assertEqual(new_ele.location.version_guid, transaction_guid)
+
+        # check children, previous_version
+        refetch_course = modulestore().get_course(versionless_course_locator)
+        self.assertIn(new_ele.location.usage_id, refetch_course.children)
+        self.assertEqual(refetch_course.previous_version, course_block_update_version)
+        self.assertEqual(refetch_course.update_version, transaction_guid)
 
     def test_update_metadata(self):
         """
@@ -967,6 +1052,27 @@ class TestCourseCreation(SplitModuleTest):
         course = modulestore().get_course(CourseLocator(course_id=locator.course_id, branch="published"))
         self.assertEqual(str(course.location.version_guid), self.GUID_D1)
 
+    def test_create_with_root(self):
+        """
+        Test create_course with a specified root id and category
+        """
+        user = random.getrandbits(32)
+        new_course = modulestore().create_course(
+            'test_org', 'test_transaction', user,
+            root_usage_id='top', root_category='chapter'
+        )
+        self.assertEqual(new_course.location.usage_id, 'top')
+        self.assertEqual(new_course.category, 'chapter')
+        # look at db to verify
+        db_structure = modulestore().structures.find_one({
+            '_id': new_course.location.as_object_id(new_course.location.version_guid)
+        })
+        self.assertIsNotNone(db_structure, "Didn't find course")
+        self.assertNotIn('course', db_structure['blocks'])
+        self.assertIn('top', db_structure['blocks'])
+        self.assertEqual(db_structure['blocks']['top']['category'], 'chapter')
+
+
 
 class TestInheritance(SplitModuleTest):
     """
diff --git a/common/lib/xmodule/xmodule/x_module.py b/common/lib/xmodule/xmodule/x_module.py
index cd8760a197e83056fa71c3c0676be5a61b6c6886..63a73c45308a582e0543cef88d063feb69965639 100644
--- a/common/lib/xmodule/xmodule/x_module.py
+++ b/common/lib/xmodule/xmodule/x_module.py
@@ -662,8 +662,8 @@ class XModuleDescriptor(XModuleFields, HTMLSnippet, ResourceTemplates, XBlock):
         """
         result = {}
         for field in self.fields.values():
-            if (field.scope == scope and self._field_data.has(self, field.name)):
-                result[field.name] = self._field_data.get(self, field.name)
+            if (field.scope == scope and field.is_set_on(self)):
+                result[field.name] = field.read_json(self)
         return result
 
     @property
diff --git a/lms/envs/acceptance.py b/lms/envs/acceptance.py
index 0383c291b9aaf26e55983c964a19e235ac385515..7d2ad15e6c8e47759614a93a1776fc26cbc875f3 100644
--- a/lms/envs/acceptance.py
+++ b/lms/envs/acceptance.py
@@ -26,14 +26,17 @@ def seed():
     return os.getppid()
 
 # Use the mongo store for acceptance tests
-modulestore_options = {
-    'default_class': 'xmodule.raw_module.RawDescriptor',
+DOC_STORE_CONFIG = {
     'host': 'localhost',
     'db': 'acceptance_xmodule',
     'collection': 'acceptance_modulestore_%s' % seed(),
+}
+
+modulestore_options = dict({
+    'default_class': 'xmodule.raw_module.RawDescriptor',
     'fs_root': TEST_ROOT / "data",
     'render_template': 'mitxmako.shortcuts.render_to_string',
-}
+}, **DOC_STORE_CONFIG)
 
 MODULESTORE = {
     'default': {
@@ -119,7 +122,7 @@ SELENIUM_GRID = {
 #####################################################################
 # See if the developer has any local overrides.
 try:
-    from .private import *      # pylint: disable=F0401
+    from .private import *  # pylint: disable=F0401
 except ImportError:
     pass
 
diff --git a/lms/envs/cms/dev.py b/lms/envs/cms/dev.py
index 0d11c373c9f4f647e8d580198de4cbb307f67537..7026d4536676a35de91a37cb56ecd7916b869654 100644
--- a/lms/envs/cms/dev.py
+++ b/lms/envs/cms/dev.py
@@ -22,14 +22,17 @@ MITX_FEATURES['ENABLE_LMS_MIGRATION'] = False
 
 META_UNIVERSITIES = {}
 
-modulestore_options = {
-    'default_class': 'xmodule.raw_module.RawDescriptor',
+DOC_STORE_CONFIG = {
     'host': 'localhost',
     'db': 'xmodule',
     'collection': 'modulestore',
+}
+
+modulestore_options = dict({
+    'default_class': 'xmodule.raw_module.RawDescriptor',
     'fs_root': DATA_DIR,
     'render_template': 'mitxmako.shortcuts.render_to_string',
-}
+}, **DOC_STORE_CONFIG)
 
 MODULESTORE = {
     'default': {