1 package eu.fbk.knowledgestore.runtime;
2
3 import java.util.concurrent.Semaphore;
4
5 import javax.annotation.Nullable;
6
7 import com.google.common.base.Preconditions;
8 import com.google.common.base.Throwables;
9
10 public final class Synchronizer {
11
12 public static final int WX = -1;
13
14 public static final int CX = -2;
15
16 private final int maxConcurrentTx;
17
18 private final int maxWriteTx;
19
20 private final Semaphore mainSemaphore;
21
22 @Nullable
23 private final Semaphore writeSemaphore;
24
25 @Nullable
26 private final Semaphore commitSemaphore;
27
28 private Synchronizer(final int maxConcurrentTx, final int maxWriteTx) {
29
30 Preconditions.checkArgument(maxConcurrentTx > 0);
31 Preconditions.checkArgument(maxWriteTx >= 0 && maxWriteTx <= maxConcurrentTx
32 || maxWriteTx == WX || maxWriteTx == CX);
33
34 this.maxConcurrentTx = maxConcurrentTx;
35 this.maxWriteTx = maxWriteTx;
36 this.mainSemaphore = new Semaphore(maxConcurrentTx, true);
37 this.writeSemaphore = maxWriteTx != 0 ? new Semaphore(Math.max(maxWriteTx, 1), true)
38 : null;
39 this.commitSemaphore = maxWriteTx == CX ? new Semaphore(1, true) : null;
40 }
41
42 public static Synchronizer create(final String spec) {
43
44 final int index = spec.indexOf(':');
45 final String first = (index <= 0 ? spec : spec.substring(0, index)).trim();
46 final String second = index <= 0 ? null : spec.substring(index + 1).trim().toUpperCase();
47
48 try {
49 final int maxConcurrentTx = Integer.parseInt(first);
50 final int maxWriteTx = second == null ? 0 : second.equals("WX") ? WX :
51 second.equals("CX") ? CX : Integer.parseInt(second);
52 return new Synchronizer(maxConcurrentTx, maxWriteTx);
53 } catch (final Throwable ex) {
54 throw new IllegalArgumentException(
55 "Illegal synchronizer specification '" + spec + "'", ex);
56 }
57 }
58
59 public static Synchronizer create(final int maxConcurrentTx, final int maxWriteTx) {
60 return new Synchronizer(maxConcurrentTx, maxWriteTx);
61 }
62
63 public void beginExclusive() {
64 try {
65 this.mainSemaphore.acquire(this.maxConcurrentTx);
66 } catch (final Throwable ex) {
67 Throwables.propagate(ex);
68 }
69 }
70
71 public void endExclusive() {
72 this.mainSemaphore.release(this.maxConcurrentTx);
73 }
74
75 public void beginTransaction(final boolean readOnly) {
76 boolean writeAcquired = false;
77 try {
78 if (readOnly) {
79 this.mainSemaphore.acquire();
80 } else if (this.writeSemaphore == null) {
81 throw new IllegalStateException("Write transactions have been disabled");
82 } else {
83 this.writeSemaphore.acquire();
84 writeAcquired = true;
85 this.mainSemaphore.acquire(this.maxWriteTx == WX ? this.maxConcurrentTx : 1);
86 }
87 } catch (final Throwable ex) {
88 if (writeAcquired) {
89 this.writeSemaphore.release();
90 }
91 Throwables.propagate(ex);
92 }
93 }
94
95 public void endTransaction(final boolean readOnly) {
96 if (readOnly) {
97 this.mainSemaphore.release(1);
98 } else {
99 this.mainSemaphore.release(this.maxWriteTx == WX ? this.maxConcurrentTx : 1);
100 this.writeSemaphore.release();
101 }
102 }
103
104 public void beginCommit() {
105 if (this.maxWriteTx == CX) {
106 boolean commitAcquired = false;
107 try {
108 this.commitSemaphore.acquire();
109 commitAcquired = true;
110 this.mainSemaphore.acquire(this.maxConcurrentTx - 1);
111 } catch (final Throwable ex) {
112 if (commitAcquired) {
113 this.commitSemaphore.release();
114 }
115 Throwables.propagate(ex);
116 }
117 }
118 }
119
120 public void endCommit() {
121 if (this.maxWriteTx == CX) {
122 this.mainSemaphore.release(this.maxConcurrentTx - 1);
123 this.commitSemaphore.release();
124 }
125 }
126
127 @Override
128 public String toString() {
129 return this.maxConcurrentTx + ":"
130 + (this.maxWriteTx == WX ? "WX" : this.maxWriteTx == CX ? "CX" : this.maxWriteTx);
131 }
132
133 }