我需要合并多个字典,这是我有例如:

dict1 = {1:{"a":{A}}, 2:{"b":{B}}}

dict2 = {2:{"c":{C}}, 3:{"d":{D}}}

A、B、C和D是树的叶子,比如{"info1":"value", "info2":"value2"}

字典的级别(深度)未知,可能是{2:{"c":{"z":{"y":{c}}}}}

在我的例子中,它表示一个目录/文件结构,节点是文档,叶子是文件。

我想将它们合并得到:

 dict3 = {1:{"a":{A}}, 2:{"b":{B},"c":{C}}, 3:{"d":{D}}}

我不确定如何用Python轻松做到这一点。


当前回答

正如在许多其他答案中提到的,递归算法在这里最有意义。一般来说,在使用递归时,最好创建新值,而不是试图修改任何输入数据结构。

我们需要定义在每个合并步骤中发生的事情。如果两个输入都是字典,这很简单:我们从每一边复制唯一键,然后递归合并重复键的值。导致问题的是基本情况。如果我们拿出一个单独的函数,逻辑会更容易理解。作为占位符,我们可以将这两个值包装在一个元组中:

def merge_leaves(x, y):
    return (x, y)

现在我们的逻辑核心是这样的:

def merge(x, y):
    if not(isinstance(x, dict) and isinstance(y, dict)):
        return merge_leaves(x, y)
    x_keys, y_keys = x.keys(), y.keys()
    result = { k: merge(x[k], y[k]) for k in x_keys & y_keys }
    result.update({k: x[k] for k in x_keys - y_keys})
    result.update({k: y[k] for k in y_keys - x_keys})
    return result

让我们来测试一下:

>>> x = {'a': {'b': 'c', 'd': 'e'}, 'f': 1, 'g': {'h', 'i'}, 'j': None}
>>> y = {'a': {'d': 'e', 'h': 'i'}, 'f': {'b': 'c'}, 'g': 1, 'k': None}
>>> merge(x, y)
{'f': (1, {'b': 'c'}), 'g': ({'h', 'i'}, 1), 'a': {'d': ('e', 'e'), 'b': 'c', 'h': 'i'}, 'j': None, 'k': None}
>>> x # The originals are unmodified.
{'a': {'b': 'c', 'd': 'e'}, 'f': 1, 'g': {'h', 'i'}, 'j': None}
>>> y
{'a': {'d': 'e', 'h': 'i'}, 'f': {'b': 'c'}, 'g': 1, 'k': None}

我们可以很容易地修改叶子归并规则,例如:

def merge_leaves(x, y):
    try:
        return x + y
    except TypeError:
        return Ellipsis

并观察效果:

>>> merge(x, y)
{'f': Ellipsis, 'g': Ellipsis, 'a': {'d': 'ee', 'b': 'c', 'h': 'i'}, 'j': None, 'k': None}

我们还可以通过使用第三方库来根据输入的类型进行分派来潜在地清理这个问题。例如,使用multidispatch,我们可以这样做:

@dispatch(dict, dict)
def merge(x, y):
    x_keys, y_keys = x.keys(), y.keys()
    result = { k: merge(x[k], y[k]) for k in x_keys & y_keys }
    result.update({k: x[k] for k in x_keys - y_keys})
    result.update({k: y[k] for k in y_keys - x_keys})
    return result

@dispatch(str, str)
def merge(x, y):
    return x + y

@dispatch(tuple, tuple)
def merge(x, y):
    return x + y

@dispatch(list, list)
def merge(x, y):
    return x + y

@dispatch(int, int):
def merge(x, y):
    raise ValueError("integer value conflict")

@dispatch(object, object):
    return (x, y)

这允许我们在不编写自己的类型检查的情况下处理叶类型特殊情况的各种组合,并在主递归函数中替换类型检查。

其他回答

在不影响输入字典的情况下返回一个合并。

def _merge_dicts(dictA: Dict = {}, dictB: Dict = {}) -> Dict:
    # it suffices to pass as an argument a clone of `dictA`
    return _merge_dicts_aux(dictA, dictB, copy(dictA))


def _merge_dicts_aux(dictA: Dict = {}, dictB: Dict = {}, result: Dict = {}, path: List[str] = None) -> Dict:

    # conflict path, None if none
    if path is None:
        path = []

    for key in dictB:

        # if the key doesn't exist in A, add the B element to A
        if key not in dictA:
            result[key] = dictB[key]

        else:
            # if the key value is a dict, both in A and in B, merge the dicts
            if isinstance(dictA[key], dict) and isinstance(dictB[key], dict):
                _merge_dicts_aux(dictA[key], dictB[key], result[key], path + [str(key)])

            # if the key value is the same in A and in B, ignore
            elif dictA[key] == dictB[key]:
                pass

            # if the key value differs in A and in B, raise error
            else:
                err: str = f"Conflict at {'.'.join(path + [str(key)])}"
                raise Exception(err)

    return result

灵感来自@andrew cooke的解决方案

还有一个轻微的变化:

下面是一个纯粹的基于python3集的深度更新函数。它通过一次循环遍历一层来更新嵌套字典,并调用自己来更新下一层的字典值:

def deep_update(dict_original, dict_update):
    if isinstance(dict_original, dict) and isinstance(dict_update, dict):
        output=dict(dict_original)
        keys_original=set(dict_original.keys())
        keys_update=set(dict_update.keys())
        similar_keys=keys_original.intersection(keys_update)
        similar_dict={key:deep_update(dict_original[key], dict_update[key]) for key in similar_keys}
        new_keys=keys_update.difference(keys_original)
        new_dict={key:dict_update[key] for key in new_keys}
        output.update(similar_dict)
        output.update(new_dict)
        return output
    else:
        return dict_update

举个简单的例子:

x={'a':{'b':{'c':1, 'd':1}}}
y={'a':{'b':{'d':2, 'e':2}}, 'f':2}

print(deep_update(x, y))
>>> {'a': {'b': {'c': 1, 'd': 2, 'e': 2}}, 'f': 2}

这个问题的一个问题是字典的值可以是任意复杂的数据块。基于这些和其他答案,我得出了以下代码:

class YamlReaderError(Exception):
    pass

def data_merge(a, b):
    """merges b into a and return merged result

    NOTE: tuples and arbitrary objects are not handled as it is totally ambiguous what should happen"""
    key = None
    # ## debug output
    # sys.stderr.write("DEBUG: %s to %s\n" %(b,a))
    try:
        if a is None or isinstance(a, str) or isinstance(a, unicode) or isinstance(a, int) or isinstance(a, long) or isinstance(a, float):
            # border case for first run or if a is a primitive
            a = b
        elif isinstance(a, list):
            # lists can be only appended
            if isinstance(b, list):
                # merge lists
                a.extend(b)
            else:
                # append to list
                a.append(b)
        elif isinstance(a, dict):
            # dicts must be merged
            if isinstance(b, dict):
                for key in b:
                    if key in a:
                        a[key] = data_merge(a[key], b[key])
                    else:
                        a[key] = b[key]
            else:
                raise YamlReaderError('Cannot merge non-dict "%s" into dict "%s"' % (b, a))
        else:
            raise YamlReaderError('NOT IMPLEMENTED "%s" into "%s"' % (b, a))
    except TypeError, e:
        raise YamlReaderError('TypeError "%s" in key "%s" when merging "%s" into "%s"' % (e, key, b, a))
    return a

我的用例是合并YAML文件,其中我只需要处理可能的数据类型的子集。因此我可以忽略元组和其他对象。对我来说,合理的合并逻辑意味着

取代标量 添加列表 通过添加缺失键和更新现有键来合并字典

其他任何事情和不可预见的事情都会导致错误。

这应该有助于将所有项从dict2合并到dict1:

for item in dict2:
    if item in dict1:
        for leaf in dict2[item]:
            dict1[item][leaf] = dict2[item][leaf]
    else:
        dict1[item] = dict2[item]

请测试一下,告诉我们这是否是你想要的。

编辑:

上述解决方案只合并了一个级别,但正确地解决了op给出的例子。如果合并多个级别,应该使用递归。

这实际上是相当棘手的-特别是如果你想要一个有用的错误消息时,事情是不一致的,同时正确地接受重复但一致的条目(这是这里没有其他答案做的..)。

假设你没有大量的条目,递归函数是最简单的:

from functools import reduce

def merge(a, b, path=None):
    "merges b into a"
    if path is None: path = []
    for key in b:
        if key in a:
            if isinstance(a[key], dict) and isinstance(b[key], dict):
                merge(a[key], b[key], path + [str(key)])
            elif a[key] == b[key]:
                pass # same leaf value
            else:
                raise Exception('Conflict at %s' % '.'.join(path + [str(key)]))
        else:
            a[key] = b[key]
    return a

# works
print(merge({1:{"a":"A"},2:{"b":"B"}}, {2:{"c":"C"},3:{"d":"D"}}))
# has conflict
merge({1:{"a":"A"},2:{"b":"B"}}, {1:{"a":"A"},2:{"b":"C"}})

注意,这会使a发生变化——b的内容被添加到a(也会返回a)。如果你想保留a,你可以叫它merge(dict(a) b)

Agf指出(下面),你可能有两个以上的字典,在这种情况下,你可以使用:

reduce(merge, [dict1, dict2, dict3...])

所有内容都将被添加到dict1中。

注意:我编辑了我的初始答案以改变第一个参数;这使得“reduce”更容易解释