Merge pull request #15 from M1hacka/update-or-create-single-query
Update or create single query
M1ha-Shvn authored Sep 8, 2018
2 parents 392e66c + 805d5ec commit 33001a5
Showing 17 changed files with 857 additions and 461 deletions.
38 changes: 38 additions & 0 deletions .travis.yml
Expand Up @@ -43,6 +43,11 @@ env:
- DJANGO=2.0 PG=9.4
- DJANGO=2.0 PG=9.5
- DJANGO=2.0 PG=9.6
- DJANGO=2.1 PG=9.2
- DJANGO=2.1 PG=9.3
- DJANGO=2.1 PG=9.4
- DJANGO=2.1 PG=9.5
- DJANGO=2.1 PG=9.6

matrix:
exclude:
Expand All @@ -57,6 +62,16 @@ matrix:
env: DJANGO=2.0 PG=9.5
- python: 2.7
env: DJANGO=2.0 PG=9.6
- python: 2.7
env: DJANGO=2.1 PG=9.2
- python: 2.7
env: DJANGO=2.1 PG=9.3
- python: 2.7
env: DJANGO=2.1 PG=9.4
- python: 2.7
env: DJANGO=2.1 PG=9.5
- python: 2.7
env: DJANGO=2.1 PG=9.6

# Django 1.9+ doesn't support python 3.3
- python: 3.3
Expand Down Expand Up @@ -99,6 +114,29 @@ matrix:
env: DJANGO=2.0 PG=9.5
- python: 3.3
env: DJANGO=2.0 PG=9.6
- python: 3.3
env: DJANGO=2.1 PG=9.2
- python: 3.3
env: DJANGO=2.1 PG=9.3
- python: 3.3
env: DJANGO=2.1 PG=9.4
- python: 3.3
env: DJANGO=2.1 PG=9.5
- python: 3.3
env: DJANGO=2.1 PG=9.6

# Django 2.1 doesn't support python 3.4
- python: 3.4
env: DJANGO=2.1 PG=9.2
- python: 3.4
env: DJANGO=2.1 PG=9.3
- python: 3.4
env: DJANGO=2.1 PG=9.4
- python: 3.4
env: DJANGO=2.1 PG=9.5
- python: 3.4
env: DJANGO=2.1 PG=9.6


# Django 1.7 doesn't support python 3.5+
- python: 3.5
Expand Down
47 changes: 30 additions & 17 deletions README.md
Expand Up @@ -14,6 +14,7 @@ Django extension to update multiple table records with similar (but not equal) c
* PostgreSQL 9.2+
Previous versions may also work, but haven't been tested.
JSONB operations are available for PostgreSQL 9.4+.
INSERT .. ON CONFLICT is used for PostgreSQL 9.5+.

## Installation
Install via pip:
Expand All @@ -34,17 +35,21 @@ There are 3 query helpers in this library. There parameters are unified and desc
Function forms a raw SQL query for PostgreSQL. Its work is not guaranteed on other databases.
Function returns number of updated records.

* `bulk_update_or_create(model, values, key_fields='id', using=None, set_functions=None, update=True, batch_size=None, batch_delay=0)`
* `bulk_update_or_create(model, values, key_fields='id', using=None, set_functions=None, update=True, key_is_unique=True, batch_size=None, batch_delay=0)`
This function finds records by key_fields. It creates records that don't exist yet with the data given in values.
If the `update` flag is set, it updates existing records with the data given in values.
Update is performed with the bulk_update function above, so the function is guaranteed to work on PostgreSQL only.

Function is done in transaction in 3 queries:
+ Search for existing records
+ Create not existing records (if values have any)
+ Update existing records (if values have any and `update` flag is set)
There are two ways this function may work:
1) Use the INSERT ... ON CONFLICT statement. It is safe, but requires PostgreSQL 9.5+ and a unique index on the key fields.
This behavior is used by default.
2) 3-query transaction:
+ Search for existing records
+ Create records that don't exist yet (if values have any)
+ Update existing records (if values have any and the `update` flag is set)
This behavior is used on PostgreSQL before 9.5 and if the key_is_unique parameter is set to False.
Note that the transactional update has a known [race condition issue](https://github.com/M1hacka/django-pg-bulk-update/issues/14) that can't be fixed.

Function returns a tuple, containing number of records inserted and records updated.
Function returns number of records inserted or updated by query.

* `pdnf_clause(key_fields, field_values, key_fields_ops=())`
Pure django implementation of principal disjunctive normal form. It is based on combining Q() objects.
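
To illustrate what a principal disjunctive normal form over key fields looks like, here is a minimal sketch that builds the equivalent SQL condition as a string. This is not the library's implementation (which, per the description above, combines `Q()` objects); the `pdnf_sql` name and the accepted row formats (a dict keyed by field name, or a sequence ordered like `key_fields`) are assumptions made for the example.

```python
def pdnf_sql(key_fields, field_values):
    """Return (sql, params): an OR of AND-ed equality checks, one group per row.

    Hypothetical helper for illustration only; not part of django-pg-bulk-update.
    """
    conjunctions, params = [], []
    for row in field_values:
        # A row is either a dict keyed by field name or a sequence ordered like key_fields
        values = [row[f] for f in key_fields] if isinstance(row, dict) else list(row)
        checks = ' AND '.join('"%s" = %%s' % f for f in key_fields)
        conjunctions.append('(%s)' % checks)
        params.extend(values)
    return ' OR '.join(conjunctions), params


sql, params = pdnf_sql(['id', 'name'], [(1, 'a'), {'id': 2, 'name': 'b'}])
print(sql)
print(params)
```

Each row contributes one parenthesized conjunction, and the conjunctions are joined with OR, so a record matches if it equals any one of the given key combinations.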
Expand Down Expand Up @@ -137,6 +142,9 @@ There are 3 query helpers in this library. There parameters are unified and desc
* `update: bool`
If flag is not set, bulk_update_or_create function will not update existing records, only creating not existing.

* `key_is_unique: bool`
Defaults to True. Setting this flag to False forces the library to use the 3-query transactional update_or_create.

* `field_values: Iterable[Union[Iterable[Any], dict]]`
Field values to use in `pdnf_clause` function. They have simpler format than update functions.
It can come in 2 formats:
Expand Down Expand Up @@ -208,7 +216,7 @@ print(list(TestModel.objects.all().order_by("id").values("id", "name", "int_fiel
# ]


inserted, updated = bulk_update_or_create(TestModel, [{
res = bulk_update_or_create(TestModel, [{
"id": 3,
"name": "_concat1",
"int_field": 4
Expand All @@ -218,8 +226,8 @@ inserted, updated = bulk_update_or_create(TestModel, [{
"int_field": 5
}], set_functions={'name': '||'})

print(inserted, updated)
# Outputs: 1, 1
print(res)
# Outputs: 2

print(list(TestModel.objects.all().order_by("id").values("id", "name", "int_field")))
# Outputs: [
Expand Down Expand Up @@ -294,7 +302,7 @@ You can define your own clause operator, creating `AbstractClauseOperator` subcl
In order to simplify method usage of simple `field <op> value` operators,
by default `get_sql()` forms this condition, calling `get_sql_operator()` method, which returns <op>.

Optionally, you can change `def format_field_value(self, field, val, connection, **kwargs)` method,
Optionally, you can change `def format_field_value(self, field, val, connection, cast_type=True, **kwargs)` method,
which formats value according to field rules

Example:
Expand Down Expand Up @@ -336,16 +344,17 @@ You can define your own set function, creating `AbstractSetFunction` subclass an
* `names` attribute
* `supported_field_classes` attribute
* One of:
- `def get_sql_value(self, field, val, connection, val_as_param=True, **kwargs)` method
- `def get_sql_value(self, field, val, connection, val_as_param=True, with_table=False, for_update=True, **kwargs)` method
This method defines new value to set for parameter. It is called from `get_sql(...)` method by default.
- `def get_sql(self, field, val, connection, val_as_param=True, **kwargs)` method
- `def get_sql(self, field, val, connection, val_as_param=True, with_table=False, for_update=True, **kwargs)` method
This method sets the full sql and its params to use in the set section of the update query.
By default it returns: `"%s" = self.get_sql_value(...)`, params

Optionally, you can change:
* `def format_field_value(self, field, val, connection, **kwargs)` method, if input data needs special formatting.
* `def format_field_value(self, field, val, connection, cast_type=False, **kwargs)` method, if input data needs special formatting.
* `def modify_create_params(self, model, key, kwargs)` method, to change data before passing them to model constructor
in `bulk_update_or_create()`
in `bulk_update_or_create()`. This method is used in 3-query transactional update only. INSERT ... ON CONFLICT
uses for_update flag of `get_sql()` and `get_sql_value()` functions

Example:

Expand All @@ -360,7 +369,7 @@ class CustomSetFunction(AbstractSetFunction):
# Names of django field classes, this function supports. You can set None (default) to support any field.
supported_field_classes = {'IntegerField', 'FloatField', 'AutoField', 'BigAutoField'}

def get_sql_value(self, field, val, connection, val_as_param=True, **kwargs):
def get_sql_value(self, field, val, connection, val_as_param=True, with_table=False, for_update=True, **kwargs):
"""
Returns value sql to set into field and parameters for query execution
This method is called from get_sql() by default.
Expand All @@ -369,6 +378,8 @@ class CustomSetFunction(AbstractSetFunction):
:param connection: Connection used to update data
:param val_as_param: If flag is not set, value should be converted to string and inserted into query directly.
Otherwise a placeholder and query parameter will be used
:param with_table: If flag is set, column name in sql is prefixed by table name
:param for_update: If flag is set, returns update sql. Otherwise - insert SQL
:param kwargs: Additional arguments, if needed
:return: A tuple: sql, replacing value in update and a tuple of parameters to pass to cursor
"""
Expand Down Expand Up @@ -403,7 +414,7 @@ Library supports django.contrib.postgres.fields:
+ HStoreField

Note that ArrayField and HStoreField are available since django 1.8, JSONField - since django 1.9.
Also PostgreSQL before 9.4 doesn't support jsonb, and so - JSONField.
PostgreSQL before 9.4 doesn't support jsonb, and so - JSONField.
PostgreSQL 9.4 supports JSONB, but doesn't support concatenation operator (||).
In order to support this set function a special function for postgres 9.4 was written. Add a migration to create it:

Expand All @@ -419,6 +430,8 @@ class Migration(migrations.Migration):
]
```

PostgreSQL before 9.5 doesn't support the INSERT ... ON CONFLICT statement, so the 3-query transactional update is used there instead.
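
For reference, the general shape of a single-row INSERT ... ON CONFLICT statement can be sketched as follows. This is only an illustration of the PostgreSQL 9.5+ syntax, not the query the library actually generates; the `build_upsert_sql` helper and the table and column names are hypothetical.

```python
def build_upsert_sql(table, key_fields, update_fields):
    """Build a parameterized single-row upsert statement.

    Hypothetical helper for illustration; the conflict target must be backed
    by a unique index on key_fields for PostgreSQL to accept the statement.
    """
    columns = list(key_fields) + list(update_fields)
    col_sql = ', '.join('"%s"' % c for c in columns)
    placeholders = ', '.join(['%s'] * len(columns))
    conflict_sql = ', '.join('"%s"' % c for c in key_fields)
    if update_fields:
        # EXCLUDED refers to the row that was proposed for insertion
        set_sql = ', '.join('"%s" = EXCLUDED."%s"' % (c, c) for c in update_fields)
        action = 'DO UPDATE SET %s' % set_sql
    else:
        action = 'DO NOTHING'
    return 'INSERT INTO "%s" (%s) VALUES (%s) ON CONFLICT (%s) %s' % (
        table, col_sql, placeholders, conflict_sql, action)


print(build_upsert_sql('test_model', ['id'], ['name', 'int_field']))
```

Because the insert and the update happen in one statement, there is no window between "search" and "create" in which another transaction can insert the same key.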

## Performance
Test background:
- Django 2.0.2
Expand Down
2 changes: 1 addition & 1 deletion runtests.py
Expand Up @@ -15,7 +15,7 @@
if __name__ == "__main__":
print('Django: ', django.VERSION)
print('Python: ', sys.version)
os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.test_settings'
os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.settings'
django.setup()
TestRunner = get_runner(settings)
test_runner = TestRunner()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -13,7 +13,7 @@

setup(
name='django-pg-bulk-update',
version='1.1.0',
version='2.0.0',
packages=['django_pg_bulk_update'],
package_dir={'': 'src'},
url='https://github.com/M1hacka/django-pg-bulk-update',
Expand Down
34 changes: 8 additions & 26 deletions src/django_pg_bulk_update/clause_operators.py
Expand Up @@ -4,7 +4,7 @@
from typing import Type, Optional, Any, Tuple, Iterable, Dict

from django.db import DefaultConnectionProxy
from django.db.models import Field, Model
from django.db.models import Field

from .compatibility import array_available, get_field_db_type
from .utils import get_subclasses, format_field_value
Expand All @@ -17,15 +17,15 @@ class AbstractClauseOperator(object):
def get_django_filters(self, name, value):
# type: (str, Any) -> Dict[str, Any]
"""
This method should return parameter name to use in django QuerySet.fillter() kwargs
This method should return parameter name to use in django QuerySet.filter() kwargs
:param name: Name of the parameter
:param value: Value of the parameter
:return: kwargs to pass to Q() object constructor
"""
raise NotImplementedError("%s must implement get_django_filters method" % self.__class__.__name__)

@classmethod
def get_operation_by_name(cls, name): # type: (str) -> Optional[Type[AbstractClauseOperator]]
def get_operator_by_name(cls, name): # type: (str) -> Optional[Type[AbstractClauseOperator]]
"""
Finds subclass of AbstractClauseOperator applicable to given operator name
:param name: String name to search
Expand All @@ -34,7 +34,7 @@ def get_operation_by_name(cls, name): # type: (str) -> Optional[Type[AbstractCl
try:
return next(sub_cls for sub_cls in get_subclasses(cls, recursive=True) if name in sub_cls.names)
except StopIteration:
raise AssertionError("Operator with name '%s' doesn't exist" % name)
raise ValueError("Operator with name '%s' doesn't exist" % name)

def get_sql_operator(self): # type: () -> str
"""
Expand All @@ -52,42 +52,24 @@ def get_sql(self, table_field, value): # type: (str, str) -> str
"""
return "%s %s %s" % (table_field, self.get_sql_operator(), value)

def get_null_fix_sql(self, model, field_name, connection):
# type: (Type[Model], str, DefaultConnectionProxy) -> str
"""
Bug fix. Postgres wants to know exact type of field to save it
This fake update value is used for each saved column in order to get it's type
:param model: Django model subclass
:param field_name: Name of field fix is got for
:param connection: Database connection used
:return: SQL string
"""
db_table = model._meta.db_table
field = model._meta.get_field(field_name)
return '(SELECT "{key}" FROM "{table}" LIMIT 0)'.format(key=field.column, table=db_table)

def format_field_value(self, field, val, connection, **kwargs):
# type: (Field, Any, DefaultConnectionProxy, **Any) -> Tuple[str, Tuple[Any]]
def format_field_value(self, field, val, connection, cast_type=False, **kwargs):
# type: (Field, Any, DefaultConnectionProxy, bool, **Any) -> Tuple[str, Tuple[Any]]
"""
Formats value, according to field rules
:param field: Django field to take format from
:param val: Value to format
:param connection: Connection used to update data
:param cast_type: Adds type casting to sql if flag is True
:param kwargs: Additional arguments, if needed
:return: A tuple: sql, replacing value in update and a tuple of parameters to pass to cursor
"""
return format_field_value(field, val, connection)
return format_field_value(field, val, connection, cast_type=cast_type)


class AbstractArrayValueOperator(AbstractClauseOperator):
"""
Abstract class partial, that handles an array of field values as input
"""
def get_null_fix_sql(self, model, field_name, connection):
field = model._meta.get_field(field_name)
db_type = get_field_db_type(field, connection)
return '(SELECT ARRAY[]::%s[] LIMIT 0)' % db_type

def format_field_value(self, field, val, connection, **kwargs):
assert isinstance(val, Iterable), "'%s' value must be iterable" % self.__class__.__name__

Expand Down
45 changes: 29 additions & 16 deletions src/django_pg_bulk_update/compatibility.py
Expand Up @@ -2,7 +2,9 @@
This file contains a number of functions to handle compatibility between different software versions
"""
import json
from typing import Dict, Any, Optional, Union, Tuple

from django.db.models import Model, Field
from typing import Dict, Any, Optional, Union, Tuple, List, Type

import django
from django.db import connection, connections, models, DefaultConnectionProxy, migrations
Expand All @@ -28,7 +30,7 @@ def jsonb_available(): # type: () -> bool
It is available since django 1.9 and doesn't support Postgres < 9.4
:return: Bool
"""
return get_postgres_version(as_tuple=False) >= 90400 and (django.VERSION[0] > 1 or django.VERSION[1] > 8)
return get_postgres_version() >= (9, 4) and (django.VERSION[0] > 1 or django.VERSION[1] > 8)


def hstore_available(): # type: () -> bool
Expand Down Expand Up @@ -63,36 +65,47 @@ def hstore_serialize(value): # type: (Dict[Any, Any]) -> Dict[str, str]
return val


def get_postgres_version(using=None, as_tuple=True): # type: (Optional[str], bool) -> Union(Tuple[int], int)
def get_postgres_version(using=None, as_tuple=True):
# type: (Optional[str], bool) -> Union[Tuple[int], int]
"""
Returns Postgres server verion used
Returns Postgres server version used
:param using: Connection alias to use
:param as_tuple: If true, returns result as tuple, otherwize as concatenated integer
:param as_tuple: If true, returns result as tuple, otherwise as concatenated integer
:return: Database version as tuple (major, minor, revision) if as_tuple is true.
A single number major*10000 + minor*100 + revision if false.
"""
conn = connection if using is None else connections[using]
num = conn.cursor().connection.server_version
return (num / 10000, num % 10000 / 100, num % 100) if as_tuple else num
return (int(num / 10000), int(num % 10000 / 100), num % 100) if as_tuple else num
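
The version split above can also be written with integer arithmetic only, which avoids the float division and the `int()` casts. A minimal sketch, assuming the same `major*10000 + minor*100 + revision` encoding described in the docstring; the `split_server_version` name is hypothetical:

```python
def split_server_version(num):
    """Split an integer such as 90406 into (major, minor, revision)."""
    major, rest = divmod(num, 10000)
    minor, revision = divmod(rest, 100)
    return major, minor, revision


version = split_server_version(90406)
print(version)
# Tuples compare element by element, so a shorter tuple works as a lower bound
print(version >= (9, 4))
```

Comparing against a two-element tuple such as `(9, 4)` is what makes checks like the one in `jsonb_available()` work without spelling out the revision.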


def get_field_db_type(field, connection): # type: (models.Field, DefaultConnectionProxy) -> str
def get_field_db_type(field, conn):
# type: (models.Field, DefaultConnectionProxy) -> str
"""
Get database field type used for this field.
:param field: django.db.models.Field instance
:param connection: Datbase connection used
:param conn: Database connection used
:return: Database type name (str)
"""
# We should resolve value as array for IN operator.
# db_type() as id field returned 'serial' instead of 'integer' here
# reL_db_type() return integer, but it is not available before django 1.10
db_type = field.db_type(connection)
if db_type == 'serial':
db_type = 'integer'
elif db_type == 'bigserial':
db_type = 'biginteger'

return db_type
# rel_db_type() return integer, but it is not available before django 1.10
db_type = field.db_type(conn)
return db_type.replace('serial', 'integer')
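
One thing to watch with a plain substring replace: `'bigserial'.replace('serial', 'integer')` yields `'biginteger'`, whereas the PostgreSQL type behind `bigserial` is `bigint`. A minimal alternative sketch using an explicit mapping is shown below; the `normalize_db_type` name and the mapping constant are hypothetical and not part of this commit.

```python
# PostgreSQL serial pseudo-types and the integer types they are built on
SERIAL_TO_INTEGER = {
    'smallserial': 'smallint',
    'serial': 'integer',
    'bigserial': 'bigint',
}


def normalize_db_type(db_type):
    """Return the underlying integer type for serial pseudo-types, else the type unchanged."""
    return SERIAL_TO_INTEGER.get(db_type, db_type)


print(normalize_db_type('bigserial'))
print(normalize_db_type('varchar(50)'))
```

An exact-match lookup also leaves unrelated type names untouched, even if they happen to contain the substring `serial`.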


def get_model_fields(model):
# type: (Type[Model]) -> List[Field]
"""
Returns all model fields.
:param model: Model to get fields for
:return: A list of fields
"""
if hasattr(model._meta, 'get_fields'):
# Django 1.8+
return model._meta.get_fields()
else:
return [f[0] for f in model._meta.get_fields_with_model()]


# Postgres 9.4 has JSONB support, but doesn't support concat operator (||)
Expand Down