You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
import datetime
import enum
import json
import logging
import os
from concurrent.futures.thread import ThreadPoolExecutor
from timeit import default_timer as timer
import aiohttp
import requests
import slugify
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from requests.cookies import cookiejar_from_dict
class CollectionFilter(enum.Enum):
FacetFilter = 'facet-value-filter'
def get_product_by_id(params):
product_id = params['id']
token = params['token']
query = """
query Query($productId: ID) {
product(id: $productId) {
enabled
channels {
id
code
}
id
createdAt
updatedAt
languageCode
name
slug
description
featuredAsset {
id
}
assets {
id
}
variants {
enabled
trackInventory
stockOnHand
stockAllocated
useGlobalOutOfStockThreshold
outOfStockThreshold
channels {
id
}
id
product {
id
}
productId
createdAt
updatedAt
languageCode
sku
name
featuredAsset {
id
}
assets {
id
}
price
currencyCode
priceWithTax
taxRateApplied {
id
name
value
}
taxCategory {
id
name
}
options {
id
code
name
groupId
languageCode
}
facetValues {
id
facet {
id
}
name
code
}
translations {
id
name
languageCode
}
}
optionGroups {
id
code
name
}
facetValues {
id
facet {
id
}
name
code
}
collections {
id
name
slug
}
customFields
}
}
"""
request = requests.post("https://devroooms.ru/admin-api",
{"query": query, 'variables': json.dumps({'productId': product_id})},
headers={'authorization': 'Bearer {}'.format(token)})
if request.status_code == 200:
response = request.json()
if response.get('data'):
if response['data'].get('product'):
return response['data']['product']
else:
raise Exception
class VendureAdminApi:
def __init__(self, graphql_url="http://localhost/admin-api"):
self.graphql_url = graphql_url
jar = aiohttp.CookieJar(unsafe=True)
self.transport = AIOHTTPTransport(url=graphql_url, client_session_args={'cookie_jar': jar},)
self.client = Client(transport=self.transport, fetch_schema_from_transport=True)
self.shop_id = None
self.token_datetime = None
self.cookie_login()
self.__take = 100
self.no_image_id = 1
self.tax_category_id = '1'
def bearer_login(self):
query = """
mutation {
login(username: "username", password: "password", rememberMe: true) {
...CurrentUser
...ErrorResult
__typename
}
}fragment CurrentUser on CurrentUser {
id
identifier
channels {
id
code
token
permissions
__typename
}
__typename
}
fragment ErrorResult on ErrorResult {
errorCode
message
__typename
}
"""
p = requests.post(self.graphql_url, {"query": query})
if p.status_code == 200:
self.token_datetime = datetime.datetime.now()
return p.headers.get('vendure-auth-token')
else:
raise Exception
def cookie_login(self):
query = gql("""
mutation AttemptLogin(
$username: String!
$password: String!
$rememberMe: Boolean!
) {
login(username: $username, password: $password, rememberMe: $rememberMe) {
...CurrentUser
...ErrorResult
__typename
}
}fragment CurrentUser on CurrentUser {
id
identifier
channels {
id
code
token
permissions
__typename
}
__typename
}
fragment ErrorResult on ErrorResult {
errorCode
message
__typename
}
""")
variable_values = {'username': 'username', 'password': 'password', 'rememberMe': True}
result = self.client.execute(query, variable_values=variable_values)
self.token_datetime = datetime.datetime.now()
def __execute(self, query, variable_values=None, upload_files=False):
if (datetime.datetime.now() - self.token_datetime).seconds > 3000:
self.cookie_login()
return self.client.execute(query, variable_values=variable_values, upload_files=upload_files)
def get_all_facets(self):
query = gql("""
query Query {
facets {
items {
isPrivate
id
createdAt
updatedAt
languageCode
name
code
values {
id
createdAt
updatedAt
languageCode
name
code
customFields
}
customFields
}
totalItems
}
}
""")
result = self.__execute(query, variable_values={})
if result:
if result.get('facets'):
if result['facets'].get('items'):
return result['facets']['items']
return None
def create_facet(self, name, slug, is_private=False):
query = gql("""
mutation CreateFacetMutation($createFacetInput: CreateFacetInput!) {
createFacet(input: $createFacetInput) {
id
}
}""")
if not name:
raise ValueError('name is required!')
if not slug:
raise ValueError('slug is required!')
variable_values = {
"createFacetInput": {
"code": slug,
"isPrivate": is_private,
"translations": {
'languageCode': 'en',
'name': name
}
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Facet {} was added'.format(slug))
if result:
if result.get('createFacet'):
if result['createFacet'].get('id'):
return result['createFacet']['id']
return None
def create_facet_value(self, facet_id, name, slug):
query = gql("""
mutation CreateFacetValuesMutation($createFacetValuesInput: [CreateFacetValueInput!]!) {
createFacetValues(input: $createFacetValuesInput) {
id
}
}""")
if not name:
raise ValueError('name is required!')
if not slug:
raise ValueError('slug is required!')
if not facet_id:
raise ValueError('facet_id is required!')
variable_values = {
"createFacetValuesInput": {
"code": slug,
"facetId": facet_id,
"translations": {
'languageCode': 'en',
'name': name
}
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Facet value {} was added'.format(slug))
if result:
if result.get('createFacetValues'):
if result['createFacetValues'][0].get('id'):
return result['createFacetValues'][0]['id']
return None
def update_facet_value(self, facet_value_id, name, slug):
query = gql("""
mutation UpdateFacetValuesMutation($updateFacetValuesInput: [UpdateFacetValueInput!]!) {
updateFacetValues(input: $updateFacetValuesInput) {
id
}
}""")
if not name:
raise ValueError('name is required!')
if not slug:
raise ValueError('slug is required!')
if not facet_value_id:
raise ValueError('facet_value_id is required!')
variable_values = {
"updateFacetValuesInput": {
"code": slug,
"id": facet_value_id,
"translations": {
'languageCode': 'en',
'name': name
}
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Facet value {} was updated'.format(slug))
if result:
if result.get('updateFacetValues'):
if result['updateFacetValues'][0].get('id'):
return result['updateFacetValues'][0]['id']
return None
def delete_facet_value(self, facet_values_ids, force=False):
query = gql("""
mutation DeleteFacetValuesMutation($deleteFacetValuesIds: [ID!]!, $deleteFacetValuesForce: Boolean) {
deleteFacetValues(ids: $deleteFacetValuesIds, force: $deleteFacetValuesForce) {
result
}
}""")
if not facet_values_ids:
raise ValueError('facet_values_ids is required!')
variable_values = {
"deleteFacetValuesIds": facet_values_ids,
"deleteFacetValuesForce": force
}
result = self.__execute(query, variable_values=variable_values)
if result:
deleted_values = []
delete_result = result['deleteFacetValues']
for index, facet_value_id in enumerate(facet_values_ids):
if delete_result[index]['result'] == 'DELETED':
logging.info('Facet value {} was deleted'.format(facet_value_id))
deleted_values.append(facet_value_id)
else:
raise NameError('Error deleting facet value {}'.format(facet_value_id))
return deleted_values
return None
def get_all_collections(self):
query = gql("""
query Query {
collections {
items {
isPrivate
id
createdAt
updatedAt
languageCode
name
slug
breadcrumbs {
id
name
slug
}
position
description
parent {
id
}
children {
id
}
translations {
name
slug
languageCode
}
filters {
code
args {
value
name
}
}
assets {
id
}
}
}
}
""")
result = self.__execute(query, variable_values={})
if result:
if result.get('collections'):
if result['collections'].get('items'):
return result['collections']['items']
return None
def get_collection(self, collection_id):
query = gql("""
query Query ($Id: ID) {
collection(id: $Id) {
isPrivate
id
createdAt
updatedAt
languageCode
name
slug
breadcrumbs {
id
name
slug
}
position
description
parent {
id
}
children {
id
}
translations {
name
slug
languageCode
}
filters {
code
args {
value
name
}
}
assets {
id
}
}
}
""")
result = self.__execute(query, variable_values={'Id':collection_id})
if result:
if result.get('collection'):
return result['collection']
return None
def get_all_assets(self, tags=None, take=None, limit = None):
if limit:
if limit < 1:
raise ValueError('limit arg can be less than 1')
_take = take or self.__take
query = gql("""query Query($assetsOptions: AssetListOptions) {
assets(options: $assetsOptions) {
totalItems
}
}""")
result = self.__execute(query, variable_values={
"assetsOptions": {
"tags": tags
}
})
total_items = result.get('assets', dict()).get('totalItems')
if limit:
if total_items > limit:
total_items = limit
items = list()
if total_items:
for i in range(int(total_items / _take) + (total_items % _take > 0)):
skip = i * _take
query = gql("""
query Query($assetsOptions: AssetListOptions) {
assets(options: $assetsOptions) {
items {
id
type
name
mimeType
fileSize
width
height
source
preview
tags {
id
createdAt
updatedAt
value
}
focalPoint {
x
y
}
}
}
}
""")
result = self.__execute(query,
variable_values={"assetsOptions": {"skip": skip, "take": _take, "tags": tags}})
if result:
if result.get('assets'):
if result['assets'].get('items'):
items += result['assets']['items']
if items:
return items
else:
return None
def add_asset(self, url, filename=None, tags=None):
query = gql("""
mutation CreateAssetsMutation($createAssetsInput: [CreateAssetInput!]!) {
createAssets(input: $createAssetsInput) {
... on Asset {
id
}
}
}
""")
r = requests.get(url, allow_redirects=True)
if not filename:
if url.find('/'):
filename = url.rsplit('/', 1)[1]
else:
filename = slugify.slugify(url)
open(filename, 'wb').write(r.content)
file = open(filename, "rb")
variable_values = {
"createAssetsInput": [
{
"file": file,
"tags": tags
}
]
}
try:
result = self.client.execute(query, variable_values=variable_values, upload_files=True)
ret = None
if result:
if result.get('createAssets'):
ret = result['createAssets']
return ret
except Exception as e:
logging.warning(e)
finally:
os.remove(filename)
def update_asset(self, asset_id, name, focal_point=None, tags=None):
query = gql("""
mutation UpdateAssetMutation($updateAssetInput: UpdateAssetInput!) {
updateAsset(input: $updateAssetInput) {
id
}
}""")
if not name:
raise ValueError('name is required!')
if not asset_id:
raise ValueError('asset_id is required!')
variable_values = {
"updateAssetInput": {
"id": asset_id,
"name": name,
"focalPoint": focal_point,
"tags": tags
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Asset {} was updated'.format(name))
if result:
if result.get('updateAsset'):
if result['updateAsset'].get('id'):
return result['updateAsset']['id']
return None
def delete_asset(self, asset_id, force=False):
query = gql("""
mutation DeleteAssetMutation($deleteAssetInput: DeleteAssetInput!) {
deleteAsset(input: $deleteAssetInput) {
result
}
}""")
if not asset_id:
raise ValueError('asset_id is required!')
variable_values = {
"deleteAssetInput": {
"assetId": asset_id,
"force": force,
"deleteFromAllChannels": True
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Asset {} was deleted'.format(asset_id))
if result:
if result.get('deleteAsset'):
if result['deleteAsset'].get('result'):
return result['deleteAsset']['result']
return None
def create_collection_item(self, name, slug, description, asset_ids, facet_filter_ids, parent_id=None,
featured_asset_id=None,
is_private=False):
query = gql("""
mutation CreateCollectionMutation($createCollectionInput: CreateCollectionInput!) {
createCollection(input: $createCollectionInput) {
id
}
}""")
if not name:
raise ValueError('name is required!')
if not asset_ids:
asset_ids = [self.no_image_id]
if not facet_filter_ids:
raise ValueError('facet_filter_ids is required!')
if not slug:
raise ValueError('slug is required!')
if not description:
raise ValueError('description is required!')
variable_values = {
"createCollectionInput": {
"isPrivate": is_private,
"featuredAssetId": featured_asset_id if featured_asset_id else asset_ids[0],
"assetIds": asset_ids,
"parentId": parent_id,
"translations": [{
'languageCode': 'en',
'name': name,
"slug": slug,
"description": description
}],
"filters": [{
"code": CollectionFilter.FacetFilter.value,
"arguments": [{
"name": 'facetValueIds',
"value": json.dumps([json.dumps(v) for v in sorted(facet_filter_ids)])
#
},
{
"name": 'containsAny',
"value": 'true'
},
]
}]
}
}
if not parent_id:
del variable_values['createCollectionInput']['parentId']
result = self.__execute(query, variable_values=variable_values)
logging.info('Collection {} was added'.format(slug))
if result:
if result.get('createCollection'):
if result['createCollection'].get('id'):
return result['createCollection']['id']
return None
def update_collection_item(self, collection_id, name, slug, description, asset_ids, facet_filter_ids,
parent_id=None, featured_asset_id=None,
is_private=False):
query = gql("""
mutation UpdateCollectionMutation($updateCollectionInput: UpdateCollectionInput!) {
updateCollection(input: $updateCollectionInput) {
id
}
}""")
if not name:
raise ValueError('name is required!')
if not asset_ids:
asset_ids = [self.no_image_id]
if not facet_filter_ids:
raise ValueError('facet_filter_ids is required!')
if not slug:
raise ValueError('slug is required!')
if not description:
raise ValueError('descriprion is required!')
if not collection_id:
raise ValueError('collection_id is required!')
variable_values = {
"updateCollectionInput": {
"isPrivate": is_private,
"featuredAssetId": featured_asset_id if featured_asset_id else asset_ids[0],
"assetIds": asset_ids,
"parentId": parent_id,
'id': collection_id,
"translations": [{
'languageCode': 'en',
'name': name,
"slug": slug,
'id': collection_id,
"description": description
}],
"filters": [{
"code": 'facet-value-filter',
"arguments": [{
"name": 'facetValueIds',
"value": '[{}]'.format(','.join(str(i) for i in sorted(facet_filter_ids)))
# json.dumps([json.dumps(int(v)) for v in facet_filter_ids])
},
{
"name": 'containsAny',
"value": 'true'
},
]
}]
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Collection {} was updated'.format(slug))
if result:
if result.get('updateCollection'):
if result['updateCollection'].get('id'):
return result['updateCollection']['id']
return None
def delete_collection_item(self, collection_id):
query = gql("""
mutation DeleteCollectionMutation($deleteCollectionId: ID!) {
deleteCollection(id: $deleteCollectionId) {
result
}
}""")
if not collection_id:
raise ValueError('collection_id is required!')
variable_values = {
"deleteCollectionId": collection_id
}
result = self.__execute(query, variable_values=variable_values)
if result:
if result.get('deleteCollection'):
if result['deleteCollection'].get('result'):
return result['deleteCollection']['result']
return None
def move_collection_item(self, collection_id, parent_id, index=0):
query = gql("""
mutation IdMutation($moveCollectionInput: MoveCollectionInput!) {
moveCollection(input: $moveCollectionInput) {
id
}
}""")
if not collection_id:
raise ValueError('collection_id is required!')
if not parent_id:
raise ValueError('parent_id is required!')
variable_values = {
"moveCollectionInput": {
'collectionId': collection_id,
'index': index,
'parent_id': parent_id
}
}
result = self.__execute(query, variable_values=variable_values)
if result:
if result.get('moveCollection'):
if result['moveCollection'].get('id'):
return result['moveCollection']['id']
return None
def get_all_products(self):
query = gql("""query Query {
products {
totalItems
}
}""")
result = self.__execute(query, variable_values={})
total_items = result.get('products', dict()).get('totalItems')
take = 50
items = list()
if total_items:
for i in range(int(total_items / take) + (total_items % take > 0)):
skip = i * take
query = gql("""
query Query($productsOptions: ProductListOptions) {
products(options: $productsOptions) {
items {
enabled
channels {
id
code
}
id
createdAt
updatedAt
languageCode
name
slug
description
featuredAsset {
id
}
assets {
id
}
variants {
enabled
trackInventory
stockOnHand
stockAllocated
useGlobalOutOfStockThreshold
outOfStockThreshold
channels {
id
}
id
product {
id
}
productId
createdAt
updatedAt
languageCode
sku
name
featuredAsset {
id
}
assets {
id
}
price
currencyCode
priceWithTax
taxRateApplied {
id
name
value
}
taxCategory {
id
name
}
options {
id
code
name
groupId
languageCode
}
facetValues {
id
facet {
id
}
name
code
}
translations {
id
name
languageCode
}
}
optionGroups {
id
code
name
}
facetValues {
id
facet {
id
}
name
code
}
collections {
id
name
slug
}
customFields
}
}
}
""")
result = self.__execute(query, variable_values={'productsOptions': {"skip": skip,
"take": take}})
logging.debug('Take:{} Skip: {}'.format(take,skip))
if result:
if result.get('products'):
if result['products'].get('items'):
items += result['products']['items']
# time.sleep(1)
if items:
return items
else:
return None
def _get_product_by_id(self, product_id):
query = """
query Query($productId: ID) {
product(id: $productId) {
enabled
channels {
id
code
}
id
createdAt
updatedAt
languageCode
name
slug
description
featuredAsset {
id
}
assets {
id
}
variants {
enabled
trackInventory
stockOnHand
stockAllocated
useGlobalOutOfStockThreshold
outOfStockThreshold
channels {
id
}
id
product {
id
}
productId
createdAt
updatedAt
languageCode
sku
name
featuredAsset {
id
}
assets {
id
}
price
currencyCode
priceWithTax
taxRateApplied {
id
name
value
}
taxCategory {
id
name
}
options {
id
code
name
groupId
languageCode
}
facetValues {
id
facet {
id
}
name
code
}
translations {
id
name
languageCode
}
}
optionGroups {
id
code
name
}
facetValues {
id
facet {
id
}
name
code
}
collections {
id
name
slug
}
customFields
}
}
"""
s = requests.Session()
# noinspection PyUnresolvedReferences
s.cookies = cookiejar_from_dict(
{c.key: c.value for c in self.client.transport.client_session_args['cookie_jar']})
request = s.post(self.graphql_url,
{"query": query, 'variables': json.dumps({'productId': product_id})},
)
if request.status_code == 200:
response = request.json()
if response.get('data'):
if response['data'].get('product'):
return response['data']['product']
else:
raise Exception
def get_all_products_by_facet_value_id(self, facet_value_id):
query = gql("""query Query($searchInput: SearchInput!) {
search(input: $searchInput) {
totalItems
}
}""")
result = self.__execute(query, variable_values={"searchInput": {
"facetValueIds": [facet_value_id]
}})
total_items = result.get('search', dict()).get('totalItems')
products_ids = set()
if total_items:
for i in range(int(total_items / self.__take) + (total_items % self.__take > 0)):
skip = i * self.__take
query = gql("""
query Query($searchInput: SearchInput!) {
search(input: $searchInput) {
items {
productId
}
}
}""")
result = self.__execute(query, variable_values={
"searchInput": {
"facetValueIds": [facet_value_id],
"take": self.__take,
"skip": skip
}
})
if result:
if result.get('search'):
if result['search'].get('items'):
for item in result['search']['items']:
id = item.get('productId')
if id:
products_ids.update({id})
items = list()
if products_ids:
with ThreadPoolExecutor(4) as executor:
items = list(
executor.map(self._get_product_by_id, products_ids))
if items:
return [i for i in items if i]
else:
return None
def get_all_product_option_groups(self, filter=None):
query = gql("""
query Query($productOptionGroupsFilterTerm: String) {
productOptionGroups(filterTerm: $productOptionGroupsFilterTerm) {
id
code
name
options {
id
code
name
}
}
}
""")
result = self.__execute(query, variable_values={"filter": filter})
if result:
if result.get('productOptionGroups'):
return result['productOptionGroups']
return None
def create_product(self, name, slug, description, asset_ids, facet_ids,
featured_asset_id=None):
query = gql("""
mutation CreateProductMutation($createProductInput: CreateProductInput!) {
createProduct(input: $createProductInput) {
id
}
}""")
if not name:
raise ValueError('name is required!')
if not asset_ids:
asset_ids = [self.no_image_id]
if not facet_ids:
raise ValueError('facet_ids is required!')
if not slug:
raise ValueError('slug is required!')
if not description:
raise ValueError('description is required!')
variable_values = {
"createProductInput": {
"featuredAssetId": featured_asset_id if featured_asset_id else asset_ids[0],
"enabled": True,
"assetIds": asset_ids,
"facetValueIds": facet_ids,
"customFields": None,
"translations": [{
"languageCode": 'en',
"name": name,
"slug": slug,
"description": description,
"customFields": None,
}]
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product {} was added'.format(slug))
if result:
if result.get('createProduct'):
if result['createProduct'].get('id'):
return result['createProduct']['id']
return None
def update_product(self, product_id, name, slug, description, asset_ids, facet_ids,
featured_asset_id=None):
query = gql("""
mutation UpdateProductMutation($updateProductInput: UpdateProductInput!) {
updateProduct(input: $updateProductInput) {
id
}
}""")
if not product_id:
raise ValueError('product_id is required!')
if not name:
raise ValueError('name is required!')
if not asset_ids:
asset_ids = [self.no_image_id]
if not facet_ids:
raise ValueError('facet_ids is required!')
if not slug:
raise ValueError('slug is required!')
if not description:
raise ValueError('description is required!')
variable_values = {
"updateProductInput": {
"id": product_id,
"featuredAssetId": featured_asset_id if featured_asset_id else asset_ids[0],
"enabled": True,
"assetIds": asset_ids,
"facetValueIds": facet_ids,
"customFields": None,
"translations": [{
"languageCode": 'en',
"name": name,
"slug": slug,
"description": description,
"customFields": None,
}]
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product {} was updated'.format(slug))
if result:
if result.get('updateProduct'):
if result['updateProduct'].get('id'):
return result['updateProduct']['id']
return None
def delete_product(self, product_id):
query = gql("""
mutation DeleteProductMutation($deleteProductId: ID!) {
deleteProduct(id: $deleteProductId) {
result
message
}
}""")
if not product_id:
raise ValueError('product_id is required!')
variable_values = {
"deleteProductId": product_id
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product {} was deleted'.format(product_id))
if result:
if result.get('deleteProduct'):
if result['deleteProduct'].get('result'):
return result['deleteProduct']['result']
return None
def create_product_variant(self, product_id, sku, price, name, asset_ids, option_ids, facet_ids=None,
featured_asset_id=None):
query = gql("""
mutation CreateProductVariantsMutation($createProductVariantsInput: [CreateProductVariantInput!]!) {
createProductVariants(input: $createProductVariantsInput) {
id
}
}""")
if not product_id:
raise ValueError('product_id is required!')
if not sku:
raise ValueError('sku is required!')
if not price:
raise ValueError('price is required!')
if not name:
raise ValueError('name is required!')
if not asset_ids:
asset_ids = [self.no_image_id]
if not option_ids:
raise ValueError('option_ids is required!')
variable_values = {
"createProductVariantsInput": [{
"productId": product_id,
"translations": [{
"languageCode": 'en',
"name": name,
"customFields": None,
}],
"facetValueIds": facet_ids,
"sku": sku,
"price": price,
"customFields": None,
"assetIds": asset_ids,
"featuredAssetId": featured_asset_id if featured_asset_id else asset_ids[0],
"optionIds": option_ids,
"taxCategoryId": self.tax_category_id
}]
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product variant {} was added'.format(name))
if result:
if result.get('createProductVariants'):
if result['createProductVariants'][0].get('id'):
return result['createProductVariants'][0]['id']
return None
def update_product_variant(self, variant_id, sku, price, name, asset_ids, facet_ids=None,
featured_asset_id=None):
query = gql("""
mutation UpdateProductVariantsMutation($updateProductVariantsInput: [UpdateProductVariantInput!]!) {
updateProductVariants(input: $updateProductVariantsInput) {
id
}
}""")
if not variant_id:
raise ValueError('variant_id is required!')
if not sku:
raise ValueError('sku is required!')
if not price:
raise ValueError('price is required!')
if not name:
raise ValueError('name is required!')
if not asset_ids:
asset_ids = [self.no_image_id]
variable_values = {
"updateProductVariantsInput": [{
"id": variant_id,
"translations": [{
"languageCode": 'en',
"name": name,
"customFields": None,
}],
"facetValueIds": facet_ids,
"sku": sku,
"price": price,
"customFields": None,
"assetIds": asset_ids,
"featuredAssetId": featured_asset_id if featured_asset_id else asset_ids[0],
"taxCategoryId": self.tax_category_id
}]
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product variant {} was updated'.format(name))
if result:
if result.get('updateProductVariants'):
if result['updateProductVariants'][0].get('id'):
return result['updateProductVariants'][0]['id']
return None
def get_all_jobs(self):
query = gql("""query Query {
jobs {
totalItems
}
}""")
total_items = 100000
items = list()
if total_items:
for i in range(int(total_items / self.__take) + (total_items % self.__take > 0)):
skip = i * self.__take
query = gql("""
query Query($jobsOptions: JobListOptions) {
jobs(options: $jobsOptions) {
items {
id
createdAt
queueName
state
progress
data
result
error
isSettled
duration
}
}
}
""")
result = self.__execute(query, variable_values={'jobsOptions': {"skip": skip,
"take": self.__take,
"filter": {
'state': {'eq': 'PENDING'}}, }})
if result:
if result.get('jobs'):
if result['jobs'].get('items'):
items += result['jobs']['items']
if items:
return items
else:
return None
def cancel_job(self, job_id):
query = gql("""
mutation CancelJobMutation($cancelJobJobId: ID!) {
cancelJob(jobId: $cancelJobJobId) {
id
}
}""")
if not job_id:
raise ValueError('job_id is required!')
variable_values = {
"cancelJobJobId": job_id
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Job {} was deleted'.format(job_id))
if result:
if result.get('cancelJob'):
if result['cancelJob'].get('id'):
return result['cancelJob']['id']
return None
def delete_product_variant(self, variant_id):
query = gql("""
mutation DeleteProductVariantMutation($deleteProductVariantId: ID!) {
deleteProductVariant(id: $deleteProductVariantId) {
result
}
}""")
if not variant_id:
raise ValueError('variant_id is required!')
variable_values = {
"deleteProductVariantId": variant_id
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product variant {} was deleted'.format(variant_id))
if result:
if result.get('deleteProductVariant'):
if result['deleteProductVariant'].get('result'):
return result['deleteProductVariant']['result']
return None
def create_product_option_group(self, slug, name, options=None, custom_fields=None):
query = gql("""
mutation CreateProductOptionGroupMutation($createProductOptionGroupInput: CreateProductOptionGroupInput!) {
createProductOptionGroup(input: $createProductOptionGroupInput) {
id
code
options {
id
code
name
}
}
}""")
if not slug:
raise ValueError('slug is required!')
if not name:
raise ValueError('name is required!')
variable_values = {
"createProductOptionGroupInput": {
"code": slug,
"translations": [{
"languageCode": 'en',
"name": name,
"customFields": None
}],
"options": [{"code": opt['slug'], "translations": [{"languageCode": 'en', "name": opt['name']}]} for opt
in options],
"customFields": custom_fields
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product option group {} was added'.format(name))
if result:
if result.get('createProductOptionGroup'):
return result['createProductOptionGroup']
return None
def update_product_option_group(self, option_group_id, slug, name, custom_fields=None):
query = gql("""
mutation UpdateProductOptionGroupMutation($updateProductOptionGroupInput: UpdateProductOptionGroupInput!) {
updateProductOptionGroup(input: $updateProductOptionGroupInput) {
id
}
}""")
if not slug:
raise ValueError('slug is required!')
if not name:
raise ValueError('name is required!')
variable_values = {
"updateProductOptionGroupInput": {
"id": option_group_id,
"code": slug,
"translations": [{
"languageCode": 'en',
"name": name,
"customFields": None
}],
"customFields": custom_fields
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product option group {} was added'.format(name))
if result:
if result.get('createProductOptionGroup'):
if result['createProductOptionGroup'].get('id'):
return result['createProductOptionGroup']['id']
return None
def create_product_option(self, option_group_id, slug, name, custom_fields=None):
query = gql("""
mutation CreateProductOptionMutation($createProductOptionInput: CreateProductOptionInput!) {
createProductOption(input: $createProductOptionInput) {
id
}
}""")
if not option_group_id:
raise ValueError('option_group_id is required!')
if not name:
raise ValueError('name is required!')
if not slug:
raise ValueError('slug is required!')
variable_values = {
"createProductOptionInput": {
"productOptionGroupId": option_group_id,
"code": slug,
"translations": [{
"languageCode": 'en',
"name": name,
"customFields": None
}],
"customFields": custom_fields
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product option group {} was added'.format(name))
if result:
if result.get('createProductOption'):
if result['createProductOption'].get('id'):
return result['createProductOption']['id']
return None
def update_product_option(self, option_id, slug, name, custom_fields=None):
query = gql("""
mutation UpdateProductOptionMutation($updateProductOptionInput: UpdateProductOptionInput!) {
updateProductOption(input: $updateProductOptionInput) {
id
}
}""")
if not option_id:
raise ValueError('option_id is required!')
if not name:
raise ValueError('name is required!')
if not slug:
raise ValueError('slug is required!')
variable_values = {
"createProductOptionInput": {
"id": option_id,
"code": slug,
"translations": [{
"languageCode": 'en',
"name": name,
"customFields": None
}],
"customFields": custom_fields
}
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product option group {} was added'.format(name))
if result:
if result.get('createProductOptionGroup'):
if result['createProductOptionGroup'].get('id'):
return result['createProductOptionGroup']['id']
return None
def add_product_option_group_to_product(self, product_id, option_group_id):
query = gql("""
mutation AddOptionGroupToProductMutation($addOptionGroupToProductProductId: ID!, $addOptionGroupToProductOptionGroupId: ID!) {
addOptionGroupToProduct(productId: $addOptionGroupToProductProductId, optionGroupId: $addOptionGroupToProductOptionGroupId) {
id
}
}""")
if not product_id:
raise ValueError('product_id is required!')
if not option_group_id:
raise ValueError('option_group_id is required!')
variable_values = {
"addOptionGroupToProductProductId": product_id,
"addOptionGroupToProductOptionGroupId": option_group_id
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product option group {} was added to product {}'.format(option_group_id, product_id))
if result:
if result.get('addOptionGroupToProduct'):
if result['addOptionGroupToProduct'].get('id'):
return result['addOptionGroupToProduct']['id']
return None
def remove_product_option_group_from_product(self, product_id, option_group_id):
query = gql("""
mutation RemoveOptionGroupFromProductMutation($removeOptionGroupFromProductProductId: ID!, $removeOptionGroupFromProductOptionGroupId: ID!) {
removeOptionGroupFromProduct(productId: $removeOptionGroupFromProductProductId, optionGroupId: $removeOptionGroupFromProductOptionGroupId) {
... on Product {
id
}
}
}""")
if not product_id:
raise ValueError('product_id is required!')
if not option_group_id:
raise ValueError('option_group_id is required!')
variable_values = {
"removeOptionGroupFromProductProductId": product_id,
"removeOptionGroupFromProductOptionGroupId": option_group_id
}
result = self.__execute(query, variable_values=variable_values)
logging.info('Product option group {} was removed from product {}'.format(option_group_id, product_id))
if result:
if result.get('removeOptionGroupFromProduct'):
if result['removeOptionGroupFromProduct'].get('id'):
return result['removeOptionGroupFromProduct']['id']
return None
cancel_all_jobs.py
import logging
import sys
from vendure_api import VendureAdminApi
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
aiohttp_logger = logging.getLogger('gql.transport.aiohttp')
aiohttp_logger.setLevel(logging.WARNING)
if __name__ == "__main__":
api = VendureAdminApi()
jobs = api.get_all_jobs() or []
logging.info('Successfully get jobs for canceling')
for job in jobs:
try:
api.cancel_job(job['id'])
except:
pass
delete_products_without_assets.py
import logging
import sys
from vendure_api import VendureAdminApi
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
if __name__ == "__main__":
api = VendureAdminApi()
assets = api.get_all_assets()
products = api.get_all_products()
used_assets_ids = set()
for product in products:
for asset in product.get('assets', []):
used_assets_ids.add(asset['id'])
if product.get('featuredAsset'):
used_assets_ids.add(product['featuredAsset']['id'])
for asset in assets:
if asset['id'] not in used_assets_ids:
api.delete_asset(asset['id'])
move_data_for_dev_to_prod.py
import logging
import sys
from vendure_api import VendureAdminApi
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
aiohttp_logger = logging.getLogger('gql.transport.aiohttp')
aiohttp_logger.setLevel(logging.WARNING)
urlib_logger = logging.getLogger('urllib3')
urlib_logger.setLevel(logging.WARNING)
if __name__ == "__main__":
load_assets = False
load_facets = False
load_options_group = False
load_products = True
dev_api = VendureAdminApi(graphql_url='old_url')
prod_api = VendureAdminApi(graphql_url='new_url')
if load_assets:
dev_assets = dev_api.get_all_assets(take=1000) or []
prod_assets = prod_api.get_all_assets(take=500) or []
prod_assets_names = [a['name'] for a in prod_assets]
assets_to_load = [a for a in dev_assets if a['name'] not in prod_assets_names]
print('Assets to load: {}'.format(len(assets_to_load)))
for idx, asset in enumerate(assets_to_load):
print('load {} asset'.format(idx))
tags = asset['tags']
if tags:
prod_api.add_asset(asset['source'], asset['name'], tags[0]['value'])
else:
prod_api.add_asset(asset['source'], asset['name'])
if load_facets:
dev_facets = dev_api.get_all_facets()
prod_facets = prod_api.get_all_facets()
for facet in dev_facets:
prod_facet = next((f for f in prod_facets if f['code'] == facet['code']), None)
if not prod_facet:
prod_api.create_facet(facet['name'], facet['code'])
prod_facets = prod_api.get_all_facets()
prod_facet = next((f for f in prod_facets if f['code'] == facet['code']), None)
for value in facet['values']:
prod_value = next((v for v in prod_facet['values'] if v['code'] == value['code']), None)
if not prod_value:
prod_api.create_facet_value(prod_facet['id'], value['name'], value['code'])
if load_options_group:
dev_options_groups = dev_api.get_all_product_option_groups()
prod_options_groups = prod_api.get_all_product_option_groups() or []
for dev_option_group in dev_options_groups:
prod_options_group = next((f for f in prod_options_groups if f['code'] == dev_option_group['code']), None)
if not prod_options_group:
dev_option_group_values = [{'slug': og['code'], 'name': og['name']} for og in
dev_option_group['options']]
prod_api.create_product_option_group(dev_option_group['code'], dev_option_group['name'],
dev_option_group_values)
else:
for value in dev_option_group['options']:
prod_option_group_value = next(
(v for v in prod_options_group['options'] if v['code'] == value['code']),
None)
if not prod_option_group_value:
prod_api.create_product_option(prod_options_group['id'], value['code'], value['name'])
if load_products:
dev_assets = dev_api.get_all_assets(take=1000) or []
prod_assets = prod_api.get_all_assets(take=1000) or []
dev_assets_names = {a['id']: a['name'] for a in dev_assets}
prod_assets_names = {a['name']: int(a['id']) for a in prod_assets}
dev_facets = dev_api.get_all_facets() or []
prod_facets = prod_api.get_all_facets() or []
prod_facet_values_names = {a['code']: int(a['id']) for f in prod_facets for a in f['values']}
dev_options_groups = dev_api.get_all_product_option_groups() or []
prod_options_groups = prod_api.get_all_product_option_groups() or []
prod_options_groups_names = {a['code']: int(a['id']) for a in prod_options_groups}
dev_brands = next((f['values'] for f in dev_facets if f['code'] == 'brand'), None)
prod_brands = next((f['values'] for f in prod_facets if f['code'] == 'brand'), None)
for dev_brand in dev_brands:
prod_brand = next((b for b in prod_brands if b['code'] == dev_brand['code']), None)
if not prod_brand:
raise Exception
dev_brand_products = dev_api.get_all_products_by_facet_value_id(dev_brand['id'])
if dev_brand_products:
prod_brand_products = prod_api.get_all_products_by_facet_value_id(prod_brand['id']) or []
for dev_product in dev_brand_products:
prod_product_assets = [prod_assets_names[dev_assets_names[a['id']]] for a in dev_product['assets']]
prod_product_facets = [prod_facet_values_names[a['code']] for a in dev_product['facetValues']]
prod_product_featured_asset_id = prod_assets_names[
dev_assets_names[dev_product['featuredAsset']['id']]]
dev_product_have_og = False
prod_product = next((p for p in prod_brand_products if p['slug'] == dev_product['slug']), None)
variant_oooptions_group_codes = [
next((g for g in dev_options_groups if g['id'] == ogv['groupId']), {'code': None})['code']
for variant in dev_product['variants'] for ogv in variant['options']]
oooptions_group_codes = list(
set([g['code'] for g in dev_product['optionGroups']] + [ogv for ogv in
variant_oooptions_group_codes
if ogv is not None]))
if not prod_product:
product_id = prod_api.create_product(dev_product['name'], dev_product['slug'],
dev_product['description'],
prod_product_assets, prod_product_facets,
featured_asset_id=prod_product_featured_asset_id)
for og in oooptions_group_codes:
prod_api.add_product_option_group_to_product(product_id,
prod_options_groups_names[og])
for variant in dev_product['variants']:
option_ids = list()
for ogv in variant['options']:
group_id = ogv['groupId']
dev_og = next((g for g in dev_options_groups if g['id'] == group_id), None)
prod_og_id = prod_options_groups_names[dev_og['code']]
prod_og = next(
(asda for asda in prod_options_groups if asda['id'] == str(prod_og_id)),
None)
prod_ogv = next(
(vvv['id'] for vvv in prod_og['options'] if vvv['code'] == ogv['code']),
None)
if prod_ogv:
option_ids.append(prod_ogv)
prod_api.create_product_variant(product_id=product_id, sku=variant['sku'],
price=variant['price'],
name=variant['name'],
asset_ids=[prod_assets_names[dev_assets_names[a['id']]] for
a in variant['assets']],
option_ids=option_ids,
featured_asset_id=prod_assets_names[
dev_assets_names[variant['featuredAsset']['id']]]
)
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
vendure_api.py
cancel_all_jobs.py
delete_products_without_assets.py
move_data_for_dev_to_prod.py
Beta Was this translation helpful? Give feedback.
All reactions