-
Notifications
You must be signed in to change notification settings - Fork 0
/
couchbaseops.py
161 lines (105 loc) · 4.9 KB
/
couchbaseops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from couchbase.vector_search import VectorQuery, VectorSearch
import couchbase.search as search
from couchbase.options import SearchOptions
from dotenv import load_dotenv
import uuid
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, QueryOptions
from couchbase.auth import PasswordAuthenticator
from datetime import timedelta
import os
import couchbase.subdocument as SD
from sharedfunctions.print import print_success
from couchbase.n1ql import QueryScanConsistency
load_dotenv()
# Couchbase connection
auth = PasswordAuthenticator(os.getenv("CB_USERNAME"), os.getenv("CB_PASSWORD"))
cluster = Cluster(f'couchbase://{os.getenv("EE_HOSTNAME")}', ClusterOptions(auth))
cluster.wait_until_ready(timedelta(seconds=5))
print("Couchbase setup complete")
# CRUD operations
def get_doc(bucket, scope, collection, doc_id):
collection = cluster.bucket(bucket).scope(scope).collection(collection)
try:
result = collection.get(doc_id)
return result.content_as[dict]
except Exception as e:
print("exception:", e)
return None
def insert_doc(bucket, scope, collection, doc, doc_id=None, mute=False):
cb_collection = cluster.bucket(bucket).scope(scope).collection(collection)
try:
docid = doc_id if doc_id else str(generate_uuid())
cb_collection.insert(
docid,
doc
)
if not mute:
print(f"Insert {collection} successful: {docid}")
return docid
except Exception as e:
print("exception:", e)
return None
def generate_uuid():
return uuid.uuid4()
def cb_vector_search(bucket_name, scope_name, fts_index, embedding_field, vector, key_context_fields):
scope = cluster.bucket(bucket_name).scope(scope_name)
search_req = search.SearchRequest.create(search.MatchNoneQuery()).with_vector_search(
VectorSearch.from_vector_query(VectorQuery(embedding_field, vector, num_candidates=2)))
return scope.search(fts_index, search_req, SearchOptions(limit=13, fields=key_context_fields))
def delete_doc(bucket, scope, collection, doc_id):
collection = cluster.bucket(bucket).scope(scope).collection(collection)
try:
collection.remove(doc_id)
print("Deleted doc with id:", doc_id)
except Exception as e:
print("exception:", e)
return None
def flush_collection(bucket_name, scope_name, collection_name):
try:
result = cluster.query(
f"""
select meta().id
from `{bucket_name}`.`{scope_name}`.`{collection_name}`
"""
)
for row in result:
doc_id = row["id"]
delete_doc(bucket_name, scope_name, collection_name, doc_id)
except Exception as e:
print(f"An error occurred: {e}")
print_success(f"collection `{bucket_name}.{scope_name}.{collection_name}` flushed")
def subdocument_upsert(bucket, scope, collection, doc_id, path, value):
cb_collection = cluster.bucket(bucket).scope(scope).collection(collection)
try:
cb_collection.mutate_in(doc_id, [SD.upsert(path, value)])
print(f"Subdocument upsert successful for {doc_id}, collection {collection}, path {path} and value {value}")
except Exception as e:
print("exception with subdoc upsert:", e)
return None
def subdocument_insert(bucket, scope, collection, doc_id, path, value):
cb_collection = cluster.bucket(bucket).scope(scope).collection(collection)
try:
cb_collection.mutate_in(doc_id, [SD.insert(path, value)])
print(f"Subdocument insert successful for {doc_id}, collection {collection}, path {path} and value {value}")
except Exception as e:
print("exception with subdoc insert:", e)
return None
def mutliple_subdoc_upsert(bucket, scope, collection, doc_id, path_value_dict):
cb_collection = cluster.bucket(bucket).scope(scope).collection(collection)
try:
operations = [SD.upsert(path, value) for path, value in path_value_dict.items()]
cb_collection.mutate_in(doc_id, operations)
print(f"Multiple subdocument upsert successful for {doc_id}, collection {collection}, path_value_dict {path_value_dict}")
except Exception as e:
print("exception:", e)
return None
def run_query(query, scanConsistency=False):
try:
queryOptions = QueryOptions(scan_consistency=QueryScanConsistency.REQUEST_PLUS if scanConsistency else QueryScanConsistency.NOT_BOUNDED)
result = cluster.query(query, queryOptions).execute()
print(f"Query successful: {query}, result: {result}")
return result
except Exception as e:
print("exception:", e)
return None