0
0
mirror of https://github.com/django/django.git synced 2024-11-21 19:09:18 +01:00

Refs #35718 -- Add JSONArray to django.db.models.functions.

Co-authored-by: Sarah Boyce <42296566+sarahboyce@users.noreply.github.com>
This commit is contained in:
John Parton 2024-09-04 18:13:05 -05:00
parent c4c076223e
commit 6dc6906c17
5 changed files with 325 additions and 1 deletions

View File

@ -1,4 +1,13 @@
from .comparison import Cast, Coalesce, Collate, Greatest, JSONObject, Least, NullIf
from .comparison import (
Cast,
Coalesce,
Collate,
Greatest,
JSONArray,
JSONObject,
Least,
NullIf,
)
from .datetime import (
Extract,
ExtractDay,
@ -97,6 +106,7 @@ __all__ = [
"Coalesce",
"Collate",
"Greatest",
"JSONArray",
"JSONObject",
"Least",
"NullIf",

View File

@ -143,6 +143,74 @@ class Greatest(Func):
return super().as_sqlite(compiler, connection, function="MAX", **extra_context)
class JSONArray(Func):
function = "JSON_ARRAY"
output_field = JSONField()
def as_sql(self, compiler, connection, **extra_context):
if not connection.features.has_json_object_function:
raise NotSupportedError(
"JSONArray() is not supported on this database backend."
)
return super().as_sql(compiler, connection, **extra_context)
def as_native(self, compiler, connection, *, returning, **extra_context):
"""Modify JSON_ARRAY for Oracle and Postgres 16 and later.
Postgres and Oracle both remove SQL NULL values from the array by
default. Adds the NULL ON NULL clause to keep NULL values in the array,
mapping them to JSON null values, which matches the behavior of SQLite.
"""
null_on_null = "NULL ON NULL" if len(self.get_source_expressions()) > 0 else ""
return self.as_sql(
compiler,
connection,
template=(
f"%(function)s(%(expressions)s {null_on_null} RETURNING {returning})"
),
**extra_context,
)
def as_postgresql(self, compiler, connection, **extra_context):
# Explicitly casting source expressions is only required using JSONB_BUILD_ARRAY
# or when using JSON_ARRAY on PostgreSQL 16+ with server-side bindings.
# This is done in all cases for consistency.
# This logic avoids wrapping any source expression in Cast twice.
if all(
isinstance(expression, Cast) for expression in self.get_source_expressions()
):
casted_obj = self
else:
casted_obj = self.copy()
casted_obj.set_source_expressions(
[
(
expression
if isinstance(expression, Cast)
else Cast(expression, expression.output_field)
)
for expression in casted_obj.get_source_expressions()
]
)
if connection.features.is_postgresql_16:
return casted_obj.as_native(
compiler, connection, returning="JSONB", **extra_context
)
return casted_obj.as_sql(
compiler,
connection,
function="JSONB_BUILD_ARRAY",
**extra_context,
)
def as_oracle(self, compiler, connection, **extra_context):
return self.as_native(compiler, connection, returning="CLOB", **extra_context)
class JSONObject(Func):
function = "JSON_OBJECT"
output_field = JSONField()

View File

@ -163,6 +163,34 @@ and ``comment.modified``.
The PostgreSQL behavior can be emulated using ``Coalesce`` if you know
a sensible minimum value to provide as a default.
``JSONArray``
--------------
.. versionadded:: 5.2
.. class:: JSONArray(*expressions)
Accepts a list of field names or expressions and returns a JSON array
containing those values. SQL ``NULL`` will be mapped to JSON scalar
``null`` on all supported backends.
Usage example:
.. code-block:: pycon
>>> from django.db.models import F
>>> from django.db.models.functions import JSONArray, Lower
>>> Author.objects.create(name="Margaret Smith", alias="msmith", age=25)
>>> author = Author.objects.annotate(
... json_array=JSONArray(
... Lower("name"),
... "alias",
... F("age") * 2,
... )
... ).get()
>>> author.json_array
['margaret smith', 'msmith', 50]
``JSONObject``
--------------

View File

@ -0,0 +1,122 @@
import re
import unittest
from django.db import NotSupportedError, connection
from django.db.models import CharField, F, Value
from django.db.models.functions import Cast, JSONArray, Lower
from django.test import TestCase
from django.test.testcases import skipIfDBFeature, skipUnlessDBFeature
from django.utils import timezone
from ..models import Article, Author
@skipUnlessDBFeature("has_json_object_function")
class JSONArrayTests(TestCase):
@classmethod
def setUpTestData(cls):
Author.objects.bulk_create(
[
Author(name="Ivan Ivanov", alias="iivanov"),
Author(name="Bertha Berthy", alias="bberthy"),
]
)
def test_empty(self):
obj = Author.objects.annotate(json_array=JSONArray()).first()
self.assertEqual(obj.json_array, [])
def test_basic(self):
obj = Author.objects.annotate(
json_array=JSONArray(Value("name"), F("name"))
).first()
self.assertEqual(obj.json_array, ["name", "Ivan Ivanov"])
def test_expressions(self):
obj = Author.objects.annotate(
json_array=JSONArray(
Lower("name"),
F("alias"),
F("goes_by"),
Value(30000.15),
F("age") * 2,
)
).first()
self.assertEqual(
obj.json_array,
[
"ivan ivanov",
"iivanov",
None,
30000.15,
60,
],
)
def test_nested_json_array(self):
obj = Author.objects.annotate(
json_array=JSONArray(
F("name"),
JSONArray(F("alias"), F("age")),
)
).first()
self.assertEqual(
obj.json_array,
[
"Ivan Ivanov",
["iivanov", 30],
],
)
def test_nested_empty_json_array(self):
obj = Author.objects.annotate(
json_array=JSONArray(
F("name"),
JSONArray(),
)
).first()
self.assertEqual(
obj.json_array,
[
"Ivan Ivanov",
[],
],
)
def test_textfield(self):
Article.objects.create(
title="The Title",
text="x" * 4000,
written=timezone.now(),
)
obj = Article.objects.annotate(json_array=JSONArray(F("text"))).first()
self.assertEqual(obj.json_array, ["x" * 4000])
@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests")
def test_explicit_cast(self):
obj = Author.objects.annotate(json_array=JSONArray(Cast("age", CharField())))
self.assertTrue(
bool(re.search(r"::varchar", str(obj.query))),
)
self.assertFalse(
bool(re.search(r"::varchar\)?::varchar", str(obj.query))),
)
def test_order_by_key(self):
qs = Author.objects.annotate(arr=JSONArray(F("alias"))).order_by("arr__0")
self.assertQuerySetEqual(qs, Author.objects.order_by("alias"))
def test_order_by_nested_key(self):
qs = Author.objects.annotate(arr=JSONArray(JSONArray(F("alias")))).order_by(
"-arr__0__0"
)
self.assertQuerySetEqual(qs, Author.objects.order_by("-alias"))
@skipIfDBFeature("has_json_object_function")
class JSONObjectNotSupportedTests(TestCase):
def test_not_supported(self):
msg = "JSONArray() is not supported on this database backend."
with self.assertRaisesMessage(NotSupportedError, msg):
Author.objects.annotate(json_array=JSONArray()).get()

View File

@ -0,0 +1,96 @@
from django.db.models import F
from django.db.models.functions import JSONArray, JSONObject
from django.test import TestCase
from django.test.testcases import skipUnlessDBFeature
from ..models import Author
@skipUnlessDBFeature("has_json_object_function")
class JSONArrayObjectTests(TestCase):
@classmethod
def setUpTestData(cls):
Author.objects.bulk_create(
[
Author(name="Ivan Ivanov", alias="iivanov"),
Author(name="Bertha Berthy", alias="bberthy"),
]
)
def test_nested_json_array_object(self):
obj = Author.objects.annotate(
json_array=JSONArray(
JSONObject(
name1="name",
nested_json_object1=JSONObject(
alias1="alias",
age1="age",
),
),
JSONObject(
name2="name",
nested_json_object2=JSONObject(
alias2="alias",
age2="age",
),
),
)
).first()
self.assertEqual(
obj.json_array,
[
{
"name1": "Ivan Ivanov",
"nested_json_object1": {
"alias1": "iivanov",
"age1": 30,
},
},
{
"name2": "Ivan Ivanov",
"nested_json_object2": {
"alias2": "iivanov",
"age2": 30,
},
},
],
)
def test_nested_json_object_array(self):
obj = Author.objects.annotate(
json_object=JSONObject(
name="name",
nested_json_array=JSONArray(
JSONObject(
alias1="alias",
age1="age",
),
JSONObject(
alias2="alias",
age2="age",
),
),
)
).first()
self.assertEqual(
obj.json_object,
{
"name": "Ivan Ivanov",
"nested_json_array": [
{
"alias1": "iivanov",
"age1": 30,
},
{
"alias2": "iivanov",
"age2": 30,
},
],
},
)
def test_order_by_nested_key(self):
qs = Author.objects.annotate(
arr=JSONArray(JSONObject(alias=F("alias")))
).order_by("-arr__0__alias")
self.assertQuerySetEqual(qs, Author.objects.order_by("-alias"))